module hunt.http.codec.http.stream.Http2Flusher;

import hunt.http.codec.http.stream.FlowControlStrategy;
import hunt.http.codec.http.stream.Http2Session;
import hunt.http.codec.http.stream.StreamSPI;

import hunt.http.codec.http.frame.Frame;
import hunt.http.codec.http.frame.FrameType;
import hunt.http.codec.http.frame.WindowUpdateFrame;

import hunt.collection;
import hunt.concurrency.Locker;
import hunt.concurrency.IteratingCallback;
import hunt.Exceptions;
import hunt.logging;
import hunt.net.Connection;
import hunt.util.Common;

import std.format;


/**
 * Queues frame entries and window updates for an `Http2Session` and, on each
 * `process()` pass, generates the queued entries into byte buffers and hands
 * those buffers to the session's endpoint.
 *
 * The `windows` and `frames` queues and the `terminated` marker are accessed
 * under `synchronized (this)`; the remaining fields are used by the
 * processing/completion methods without taking the monitor.
 */
class Http2Flusher : IteratingCallback {

    alias Action = IteratingCallback.Action;

    /// Pending window updates; drained at the start of every `process()` pass.
    private Queue!(WindowEntry) windows;
    /// Entries queued by `prepend`/`append`, waiting for the next `process()` pass.
    private Deque!(Entry) frames;
    /// Entries being generated during the current `process()` pass.
    private Queue!(Entry) entries;
    /// Entries moved out of `frames` that have not been completed or failed yet.
    private List!(Entry) actives;
    private Http2Session session;
    /// Output buffers filled by `Entry.generate`.
    private Queue!ByteBuffer buffers;
    /// First entry whose `generate` returned false during the current pass, if any.
    private Entry stalled;
    /// Non-null once the flusher has failed or `terminate` has been called.
    private Exception terminated;

    /**
     * Creates a flusher bound to the given session.
     *
     * Params:
     *  session = the session whose endpoint receives the generated buffers
     */
    this(Http2Session session) {
        windows = new LinkedList!(WindowEntry)();
        frames = new LinkedList!(Entry)();
        entries = new LinkedList!(Entry)();
        actives = new ArrayList!(Entry)();
        buffers = new LinkedList!(ByteBuffer)();
        this.session = session;
    }

    /**
     * Queues a window update and triggers an iteration.
     * Nothing is queued, and no iteration is triggered, once the flusher is
     * terminated.
     *
     * Params:
     *  stream = the stream passed on to the flow control strategy
     *  frame  = the window update frame to apply
     */
    void window(StreamSPI stream, WindowUpdateFrame frame) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            if (closed is null)
                windows.offer(new WindowEntry(stream, frame));
        }
        // Flush stalled data.
        if (closed is null)
            iterate();
    }

    /**
     * Queues an entry at the head of the frame queue.
     *
     * Params:
     *  entry = the entry to queue
     *
     * Returns: true if the entry was queued; false if the flusher is
     *          terminated, in which case the entry has been failed with the
     *          termination exception.
     */
    bool prepend(Entry entry) {
        Exception e;
        synchronized (this) {
            e = terminated;
            if (e is null) {
                frames.offerFirst(entry);
                version(HUNT_HTTP_DEBUG) {
                    tracef("Prepended %s, frames=%s", entry.toString(), frames.size());
                }
            }
        }
        if (e is null)
            return true;
        onClosed(entry, e);
        return false;
    }

    /**
     * Queues an entry at the tail of the frame queue.
     *
     * Params:
     *  entry = the entry to queue
     *
     * Returns: true if the entry was queued; false if the flusher is
     *          terminated, in which case the entry has been failed with the
     *          termination exception.
     */
    bool append(Entry entry) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            if (closed is null) {
                frames.offer(entry);
                version(HUNT_DEBUG) {
                    // tracef("Appended %s, frames=%s", entry.toString(), frames.size());
                }
            }
        }
        if (closed is null) {
            return true;
        }
        onClosed(entry, closed);
        return false;
    }

    /// Returns: the number of pending window updates.
    private int getWindowQueueSize() {
        // `windows` is mutated under this monitor everywhere else, so read it under the same lock.
        synchronized (this) {
            return windows.size();
        }
    }

    /// Returns: the number of entries waiting in the frame queue.
    int getFrameQueueSize() {
        // `frames` is mutated under this monitor everywhere else, so read it under the same lock.
        synchronized (this) {
            return frames.size();
        }
    }

    /**
     * Runs one flush pass: applies pending window updates, moves the queued
     * frames into the working queue, generates them into `buffers` and passes
     * every buffer to `session.getEndPoint().encode`.
     *
     * Returns: `Action.IDLE` when there is nothing to generate or nothing was
     *          generated; `Action.SUCCEEDED` after a generation failure has
     *          been reported through `failed`; `Action.SCHEDULED` after the
     *          buffers have been handed to the endpoint.
     *
     * Throws: the stored termination exception if the flusher is terminated.
     */
    override protected Action process() {
        version(HUNT_DEBUG) {
            // tracef("Flushing %s", session.toString());
        }
        synchronized (this) {
            if (terminated !is null) {
                throw terminated;
            }

            while (!windows.isEmpty()) {
                //logInfo("!isEmpty--------------------------------");
                WindowEntry entry = windows.poll();
                entry.perform();
            }

            foreach (Entry entry ; frames) {
                entries.offer(entry);
                actives.add(entry);
            }
            frames.clear();
        }


        if (entries.isEmpty()) {
            version(HUNT_DEBUG) {
                // tracef("Flushed %s", session.toString());
            }
            return Action.IDLE;
        }

        while (!entries.isEmpty()) {
            Entry entry = entries.poll();
            version(HUNT_DEBUG) {
                // tracef("Processing %s", entry.toString());
            }
            // If the stream has been reset or removed, don't send the frame.
            if (entry.isStale()) {
                version(HUNT_DEBUG) {
                    tracef("Stale %s", entry.toString());
                }
                continue;
            }

            try {
                if (entry.generate(buffers)) {
                    // Generated, but the entry still has data left: revisit it in this pass.
                    if (entry.dataRemaining() > 0)
                        entries.offer(entry);
                } else {
                    // Remember only the first entry that could not be generated.
                    if (stalled is null)
                        stalled = entry;
                }
            } catch (Exception failure) {
                // Failure to generate the entry is catastrophic.
                version(HUNT_DEBUG) {
                    trace("Failure generating frame " ~ entry.frame.toString(), failure.toString());
                }
                failed(failure);
                return Action.SUCCEEDED;
            }
        }

        if (buffers.isEmpty()) {
            complete();
            return Action.IDLE;
        }

        version(HUNT_HTTP_DEBUG) {
            tracef("Writing %s buffers (%s bytes) for %s frames %s",
                    buffers.size(), BufferUtils.remaining(buffers.toArray()), actives.size(), actives.toString());
        }

        Connection tcpSession = session.getEndPoint();
        foreach(ByteBuffer buffer; buffers) {
            tcpSession.encode(buffer);
        }
        // Completion is signalled right here, before SCHEDULED is returned.
        this.succeeded();
        return Action.SCHEDULED;
    }

    /**
     * Completes the active entries for the pass that was just written and then
     * forwards to `IteratingCallback.succeeded`.
     */
    override void succeeded() {
        version(HUNT_HTTP_DEBUG) {
            tracef("Written %s frames for %s", actives.size(), actives.toString());
        }
        complete();

        super.succeeded();
    }

    /**
     * Clears the output buffers, completes every active entry and, if an entry
     * stalled during the pass, re-queues the entries that still have data
     * remaining.
     */
    private void complete() {
        buffers.clear();

        // actives.forEach(Entry::complete);
        foreach(Entry ac; actives)
            ac.complete();

        if (stalled !is null) {
            // We have written part of the frame, but there is more to write.
            // The API will not allow sending two data frames for the same
            // stream, so we append the unfinished frame at the end to allow
            // better interleaving with other streams.
            int index = actives.indexOf(stalled);
            // Re-queue from the stalled entry to the end first ...
            for (int i = index; i < actives.size(); ++i) {
                Entry entry = actives.get(i);
                if (entry.dataRemaining() > 0)
                    append(entry);
            }
            // ... then the entries that preceded it.
            for (int i = 0; i < index; ++i) {
                Entry entry = actives.get(i);
                if (entry.dataRemaining() > 0)
                    append(entry);
            }
            stalled = null;
        }

        actives.clear();
    }

    /// Always throws `IllegalStateException`.
    override protected void onCompleteSuccess() {
        throw new IllegalStateException("");
    }

    /**
     * Records `x` as the termination cause, fails every active and queued
     * entry with it and, if the flusher was not already terminated, aborts
     * the session.
     *
     * Params:
     *  x = the failure that completed this callback
     */
    override protected void onCompleteFailure(Exception x) {
        buffers.clear();

        Exception closed;
        synchronized (this) {
            closed = terminated;
            terminated = x;
            version(HUNT_DEBUG) {
                tracef("%s, active/queued=%s/%s", closed !is null ? "Closing" : "Failing", actives.size(), frames.size());
            }
            actives.addAll(frames);
            frames.clear();
        }

        foreach(Entry entry; actives)
            entry.failed(x);
        actives.clear();

        // If the failure came from within the
        // flusher, we need to close the connection.
        if (closed is null)
            session.abort(x);
    }

    /**
     * Marks the flusher as terminated with the given cause. When it was not
     * terminated before, an iteration is triggered; `process()` throws the
     * stored cause when it runs on a terminated flusher.
     *
     * Params:
     *  cause = the exception to store as the termination cause
     */
    void terminate(Exception cause) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            terminated = cause;
            version(HUNT_DEBUG) {
                tracef("%s", closed !is null ? "Terminated" : "Terminating");
            }
        }
        if (closed is null)
            iterate();
    }

    /// Fails an entry that was offered after termination.
    private void onClosed(Entry entry, Exception failure) {
        entry.failed(failure);
    }

    /// Returns: the base description followed by the window, frame and active counts.
    override string toString() {
        return format("%s[window_queue=%d,frame_queue=%d,actives=%d]",
                super.toString(),
                getWindowQueueSize(),
                getFrameQueueSize(),
                actives.size());
    }

    /**
     * A frame queued for flushing, together with its stream and the callback
     * that is notified when the entry succeeds or fails.
     */
    static abstract class Entry : NestedCallback {
        Frame frame;
        StreamSPI stream;

        /**
         * Params:
         *  frame    = the frame to generate
         *  stream   = the stream the frame belongs to; checked for null before use
         *  callback = the callback passed to the `NestedCallback` constructor
         */
        protected this(Frame frame, StreamSPI stream, Callback callback) {
            super(callback);
            this.frame = frame;
            this.stream = stream;
        }

        /// Returns: the amount of data still to be generated; 0 in this base class.
        int dataRemaining() {
            return 0;
        }

        /**
         * Generates this entry into the given buffer queue.
         *
         * Returns: false if the entry could not be generated in this pass
         *          (the flusher then treats it as stalled), true otherwise.
         */
        protected abstract bool generate(Queue!ByteBuffer buffers);

        /// Succeeds the entry, or fails it with an `EofException` if it is stale.
        private void complete() {
            if (isStale())
                failed(new EofException("reset"));
            else
                succeeded();
        }

        /**
         * Closes the stream and removes it from its session (when a stream is
         * present), then forwards the failure to the nested callback.
         */
        override void failed(Exception x) {
            if (stream !is null) {
                stream.close();
                stream.getSession().removeStream(stream);
            }
            super.failed(x);
        }

        /// Returns: true for a non-protocol frame whose stream is present and reset.
        private bool isStale() {
            return !isProtocol() && stream !is null && stream.isReset();
        }

        /**
         * Returns: false for DATA, HEADERS, PUSH_PROMISE and CONTINUATION
         *          frames, true for the other listed frame types.
         *
         * Throws: `IllegalStateException` for a frame type that is not listed.
         */
        private bool isProtocol() {
            switch (frame.getType()) {
                case FrameType.DATA:
                case FrameType.HEADERS:
                case FrameType.PUSH_PROMISE:
                case FrameType.CONTINUATION:
                    return false;
                case FrameType.PRIORITY:
                case FrameType.RST_STREAM:
                case FrameType.SETTINGS:
                case FrameType.PING:
                case FrameType.GO_AWAY:
                case FrameType.WINDOW_UPDATE:
                case FrameType.PREFACE:
                case FrameType.DISCONNECT:
                    return true;
                default:
                    throw new IllegalStateException("");
            }
        }

        /// Returns: the string form of the wrapped frame.
        override string toString() {
            return frame.toString();
        }
    }

    /// A queued window update, applied through the session's flow control strategy.
    private class WindowEntry {
        private StreamSPI stream;
        private WindowUpdateFrame frame;

        this(StreamSPI stream, WindowUpdateFrame frame) {
            this.stream = stream;
            this.frame = frame;
        }

        /// Passes the update to `session.getFlowControlStrategy().onWindowUpdate`.
        void perform() {
            FlowControlStrategy flowControl = session.getFlowControlStrategy();
            flowControl.onWindowUpdate(session, stream, frame);
        }
    }

}