module hunt.http.codec.http.stream.Http2Flusher;

import hunt.http.codec.http.stream.FlowControlStrategy;
import hunt.http.codec.http.stream.Http2Session;
import hunt.http.codec.http.stream.StreamSPI;

import hunt.http.codec.http.frame.Frame;
import hunt.http.codec.http.frame.FrameType;
import hunt.http.codec.http.frame.WindowUpdateFrame;

import hunt.util.concurrent.Locker;
import hunt.util.concurrent.IteratingCallback;
import hunt.lang.exception;
import hunt.util.functional;

import hunt.container;
import hunt.logging;
import std.format;


/**
 * Queues outgoing frame entries for an Http2Session and writes them out.
 *
 * Callers enqueue work through window(), prepend() and append(). Each
 * process() pass applies the pending window updates, moves the queued
 * entries into the working queue, asks every entry to generate its bytes
 * into `buffers`, and passes the buffers to the session's endpoint.
 *
 * `windows`, `frames` and `terminated` are accessed under
 * `synchronized (this)`.
 */
class Http2Flusher : IteratingCallback {

    alias Action = IteratingCallback.Action;

    /// Window updates waiting to be applied at the start of process().
    private Queue!(WindowEntry) windows;
    /// Entries queued by prepend()/append() and not yet picked up by process().
    private Deque!(Entry) frames;
    /// Entries being generated during the current process() pass.
    private Queue!(Entry) entries;
    /// Entries picked up by process() and not yet completed or failed.
    private List!(Entry) actives;
    private Http2Session session;
    /// Buffers filled by Entry.generate() and written by process().
    private Queue!ByteBuffer buffers;
    /// First entry whose generate() returned false in the current pass.
    private Entry stalled;
    /// Termination cause; a non-null value means no new work is accepted.
    private Exception terminated;

    /**
     * Creates a flusher bound to the given session.
     *
     * Params:
     *   session = session used for flow control, the endpoint and abort()
     */
    this(Http2Session session) {
        windows = new LinkedList!(WindowEntry)();
        frames = new LinkedList!(Entry)();
        entries = new LinkedList!(Entry)();
        actives = new ArrayList!(Entry)();
        buffers = new LinkedList!(ByteBuffer)();
        this.session = session;
    }

    /**
     * Queues a window update and starts an iteration, unless the flusher
     * has already been terminated (in which case the update is dropped).
     *
     * Params:
     *   stream = stream passed on to the flow control strategy
     *   frame  = the window update frame to apply
     */
    void window(StreamSPI stream, WindowUpdateFrame frame) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            if (closed is null)
                windows.offer(new WindowEntry(stream, frame));
        }
        // Flush stalled data.
        if (closed is null)
            iterate();
    }

    /**
     * Queues an entry at the head of the frame queue.
     *
     * Params:
     *   entry = the entry to queue
     *
     * Returns: true if the entry was queued; false if the flusher was
     *          already terminated, in which case the entry is failed
     *          with the termination cause.
     */
    bool prepend(Entry entry) {
        Exception e;
        synchronized (this) {
            e = terminated;
            if (e is null) {
                frames.offerFirst(entry);
                version(HUNT_DEBUG) {
                    tracef("Prepended %s, frames=%s", entry.toString(), frames.size());
                }
            }
        }
        if (e is null)
            return true;
        // The entry is failed outside the synchronized block.
        onClosed(entry, e);
        return false;
    }

    /**
     * Queues an entry at the tail of the frame queue.
     *
     * Params:
     *   entry = the entry to queue
     *
     * Returns: true if the entry was queued; false if the flusher was
     *          already terminated, in which case the entry is failed
     *          with the termination cause.
     */
    bool append(Entry entry) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            if (closed is null) {
                frames.offer(entry);
                version(HUNT_DEBUG) {
                    tracef("Appended %s, frames=%s", entry.toString(), frames.size());
                }
            }
        }
        if (closed is null) {
            return true;
        }
        // The entry is failed outside the synchronized block.
        onClosed(entry, closed);
        return false;
    }

    /**
     * Returns: the number of pending window updates.
     */
    private int getWindowQueueSize() {
        // `windows` is mutated under synchronized(this) in window() and
        // process(), so its size is read under the same monitor.
        synchronized (this) {
            return windows.size();
        }
    }

    /**
     * Returns: the number of entries queued and not yet picked up by process().
     */
    int getFrameQueueSize() {
        // `frames` is mutated under synchronized(this) in prepend(), append(),
        // process() and onCompleteFailure(), so its size is read under the
        // same monitor.
        synchronized (this) {
            return frames.size();
        }
    }

    /**
     * Runs one flush pass.
     *
     * Returns: Action.IDLE when there is nothing to process or nothing was
     *          generated; Action.SCHEDULED after the buffers were passed to
     *          the endpoint; Action.SUCCEEDED after a generation failure has
     *          been reported through failed().
     *
     * Throws: the stored termination cause if the flusher has been terminated.
     */
    override
    protected Action process() {
        version(HUNT_DEBUG) {
            tracef("Flushing %s", session.toString());
        }
        synchronized (this) {
            if (terminated !is null) {
                throw terminated;
            }

            // Apply all pending window updates before generating frames.
            while (!windows.isEmpty()) {
                WindowEntry entry = windows.poll();
                entry.perform();
            }

            // Move the queued entries into the working queue and track them
            // as active until complete() or onCompleteFailure() runs.
            foreach (Entry entry ; frames) {
                entries.offer(entry);
                actives.add(entry);
            }
            frames.clear();
        }


        if (entries.isEmpty()) {
            version(HUNT_DEBUG) {
                tracef("Flushed %s", session.toString());
            }
            return Action.IDLE;
        }

        while (!entries.isEmpty()) {
            Entry entry = entries.poll();
            version(HUNT_DEBUG) {
                tracef("Processing %s", entry.toString());
            }
            // If the stream has been reset or removed, don't send the frame.
            if (entry.isStale()) {
                version(HUNT_DEBUG) {
                    tracef("Stale %s", entry.toString());
                }
                continue;
            }

            try {
                if (entry.generate(buffers)) {
                    // Generated, but more data remains: process it again
                    // later in this same pass.
                    if (entry.dataRemaining() > 0)
                        entries.offer(entry);
                } else {
                    // Only the first entry that could not generate is
                    // remembered; complete() uses it as the re-queue pivot.
                    if (stalled is null)
                        stalled = entry;
                }
            } catch (Exception failure) {
                // Failure to generate the entry is catastrophic.
                version(HUNT_DEBUG) {
                    trace("Failure generating frame " ~ entry.frame.toString(), failure.toString());
                }
                failed(failure);
                return Action.SUCCEEDED;
            }
        }

        if (buffers.isEmpty()) {
            complete();
            return Action.IDLE;
        }

        version(HUNT_DEBUG) {
            tracef("Writing %s buffers (%s bytes) for %s frames %s",
                buffers.size(), BufferUtils.remaining(buffers), actives.size(), actives.toString());
        }

        TcpSession tcpSession = session.getEndPoint();
        tcpSession.encode(buffers.toArray());
        // succeeded() is invoked directly after encode() returns.
        this.succeeded();
        return Action.SCHEDULED;
    }

    /**
     * Completes the active entries of the pass that was just written, then
     * forwards to IteratingCallback.succeeded().
     */
    override
    void succeeded() {
        version(HUNT_DEBUG) {
            tracef("Written %s frames for %s", actives.size(), actives.toString());
        }
        complete();

        super.succeeded();
    }

    /**
     * Clears the buffers, completes every active entry and, if an entry
     * stalled during the pass, re-queues the entries that still have data.
     */
    private void complete() {
        buffers.clear();

        foreach(Entry ac; actives)
            ac.complete();

        if (stalled !is null) {
            // We have written part of the frame, but there is more to write.
            // The API will not allow to send two data frames for the same
            // stream so we append the unfinished frame at the end to allow
            // better interleaving with other streams.
            int index = actives.indexOf(stalled);
            // Re-queue from the stalled entry to the end first, then wrap
            // around to the entries that preceded it.
            for (int i = index; i < actives.size(); ++i) {
                Entry entry = actives.get(i);
                if (entry.dataRemaining() > 0)
                    append(entry);
            }
            for (int i = 0; i < index; ++i) {
                Entry entry = actives.get(i);
                if (entry.dataRemaining() > 0)
                    append(entry);
            }
            stalled = null;
        }

        actives.clear();
    }

    /**
     * Always throws: this flusher does not expect overall completion.
     *
     * Throws: IllegalStateException unconditionally.
     */
    override
    protected void onCompleteSuccess() {
        throw new IllegalStateException("");
    }

    /**
     * Records the failure as the termination cause, fails every active and
     * queued entry with it, and aborts the session when the flusher had not
     * been terminated before.
     *
     * Params:
     *   x = the failure cause
     */
    override
    protected void onCompleteFailure(Exception x) {
        buffers.clear();

        Exception closed;
        synchronized (this) {
            closed = terminated;
            terminated = x;
            version(HUNT_DEBUG) {
                tracef("%s, active/queued=%s/%s", closed !is null ? "Closing" : "Failing", actives.size(), frames.size());
            }
            // Entries still queued are failed together with the active ones.
            actives.addAll(frames);
            frames.clear();
        }

        foreach(Entry entry; actives)
            entry.failed(x);
        actives.clear();

        // If the failure came from within the
        // flusher, we need to close the connection.
        if (closed is null)
            session.abort(x);
    }

    /**
     * Marks the flusher as terminated with the given cause. On the first
     * termination an iteration is started; process() then throws the cause.
     *
     * Params:
     *   cause = the termination cause stored for later calls
     */
    void terminate(Exception cause) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            terminated = cause;
            version(HUNT_DEBUG) {
                tracef("%s", closed !is null ? "Terminated" : "Terminating");
            }
        }
        if (closed is null)
            iterate();
    }

    /**
     * Fails an entry that was offered after termination.
     *
     * Params:
     *   entry   = the rejected entry
     *   failure = the termination cause
     */
    private void onClosed(Entry entry, Exception failure) {
        entry.failed(failure);
    }

    /**
     * Returns: the base class description followed by the sizes of the
     *          window queue, the frame queue and the active list.
     */
    override
    string toString() {
        return format("%s[window_queue=%d,frame_queue=%d,actives=%d]",
                super.toString(),
                getWindowQueueSize(),
                getFrameQueueSize(),
                actives.size());
    }

    /**
     * A queued frame together with its stream and the callback that is
     * notified through NestedCallback when the entry succeeds or fails.
     */
    static abstract class Entry : NestedCallback {
        Frame frame;
        StreamSPI stream;

        /**
         * Params:
         *   frame    = the frame to write
         *   stream   = the associated stream; may be null (see failed() and isStale())
         *   callback = callback handed to NestedCallback
         */
        protected this(Frame frame, StreamSPI stream, Callback callback) {
            super(callback);
            this.frame = frame;
            this.stream = stream;
        }

        /**
         * Returns: the amount of data still to be generated; the base
         *          implementation always returns 0.
         */
        int dataRemaining() {
            return 0;
        }

        /**
         * Generates the bytes of this entry into the given buffers.
         *
         * Returns: false when the entry could not generate; the flusher then
         *          records it as stalled.
         */
        protected abstract bool generate(Queue!ByteBuffer buffers);

        /**
         * Finishes this entry after a pass: fails it with an EofException
         * when it is stale, otherwise signals success.
         */
        private void complete() {
            if (isStale())
                failed(new EofException("reset"));
            else
                succeeded();
        }

        /**
         * Closes the stream and removes it from its session, when a stream
         * is present, before forwarding the failure to NestedCallback.
         */
        override
        void failed(Exception x) {
            if (stream !is null) {
                stream.close();
                stream.getSession().removeStream(stream);
            }
            super.failed(x);
        }

        /**
         * Returns: true for a non-protocol frame whose stream is present and
         *          has been reset.
         */
        private bool isStale() {
            return !isProtocol() && stream !is null && stream.isReset();
        }

        /**
         * Classifies the frame type: DATA, HEADERS, PUSH_PROMISE and
         * CONTINUATION are not protocol frames; the other listed types are.
         *
         * Throws: IllegalStateException for a frame type that is not listed.
         */
        private bool isProtocol() {
            switch (frame.getType()) {
                case FrameType.DATA:
                case FrameType.HEADERS:
                case FrameType.PUSH_PROMISE:
                case FrameType.CONTINUATION:
                    return false;
                case FrameType.PRIORITY:
                case FrameType.RST_STREAM:
                case FrameType.SETTINGS:
                case FrameType.PING:
                case FrameType.GO_AWAY:
                case FrameType.WINDOW_UPDATE:
                case FrameType.PREFACE:
                case FrameType.DISCONNECT:
                    return true;
                default:
                    throw new IllegalStateException("");
            }
        }

        /**
         * Returns: the string form of the wrapped frame.
         */
        override
        string toString() {
            return frame.toString();
        }
    }

    /**
     * A pending window update. It is a non-static nested class because
     * perform() reads the enclosing flusher's session.
     */
    private class WindowEntry {
        private StreamSPI stream;
        private WindowUpdateFrame frame;

        this(StreamSPI stream, WindowUpdateFrame frame) {
            this.stream = stream;
            this.frame = frame;
        }

        /**
         * Hands the update to the session's flow control strategy.
         */
        void perform() {
            FlowControlStrategy flowControl = session.getFlowControlStrategy();
            flowControl.onWindowUpdate(session, stream, frame);
        }
    }

}