1 module hunt.http.codec.http.stream.Http2Session;
2 
3 import hunt.http.codec.http.stream.CloseState;
4 import hunt.http.codec.http.stream.FlowControlStrategy;
5 import hunt.http.codec.http.stream.Http2Flusher;
6 import hunt.http.codec.http.stream.Http2Stream;
7 import hunt.http.codec.http.stream.Session;
8 import hunt.http.codec.http.stream.SessionSPI;
9 import hunt.http.codec.http.stream.Stream;
10 import hunt.http.codec.http.stream.StreamSPI;
11 
12 import hunt.http.codec.http.decode.Parser;
13 import hunt.http.codec.http.encode.Http2Generator;
14 import hunt.http.codec.http.frame;
15 
16 import hunt.container;
17 
18 import hunt.datetime;
19 import hunt.lang.common;
20 import hunt.lang.exception;
21 import hunt.util.functional;
22 
23 import hunt.util.concurrent.atomic;
24 import hunt.util.concurrent.CountingCallback;
25 import hunt.util.concurrent.Promise;
26 import hunt.util.concurrent.Scheduler;
27 
28 import hunt.net.Session;
29 
30 import hunt.logging;
31 
32 // import core.exception;
33 
34 import std.algorithm;
35 import std.conv;
36 import std.datetime;
37 // import std.exception;
38 import std.format;
39 import std.range;
40 import std.typecons;
41 
42 alias TcpSession = hunt.net.Session.Session;
43 alias StreamSession = hunt.http.codec.http.stream.Session.Session;
44 
45 
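/**
 * Base implementation of an HTTP/2 session.
 *
 * It receives parsed frames through the Parser.Listener callbacks, keeps the
 * map of streams together with the session-level flow control windows, and
 * queues outgoing frames on an Http2Flusher. Handling of HEADERS frames is
 * left to subclasses (see onHeaders).
 */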
48 abstract class Http2Session : SessionSPI, Parser.Listener {
49 
50     // = new ConcurrentHashMap<>();
51     // private AtomicInteger streamIds = new AtomicInteger();
52     // private AtomicInteger lastStreamId = new AtomicInteger();
53     // private AtomicInteger localStreamCount = new AtomicInteger();
54     // private AtomicBiInteger remoteStreamCount = new AtomicBiInteger();
55     // private AtomicInteger sendWindow = new AtomicInteger();
56     // private AtomicInteger recvWindow = new AtomicInteger();
57     // private AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
58     // private AtomicLong bytesWritten = new AtomicLong();
59 
60     private StreamSPI[int] streams;
61     private int streamIds ;
62     private int lastStreamId ;
63     private int localStreamCount ;
64     private long remoteStreamCount ;
65     private int sendWindow ;
66     private int recvWindow ;
67     private CloseState closed = CloseState.NOT_CLOSED;
68     private long bytesWritten;
69 
70     private Scheduler scheduler;
71     private TcpSession endPoint;
72     private Http2Generator generator;
73     private StreamSession.Listener listener;
74     private FlowControlStrategy flowControl;
75     private Http2Flusher flusher;
76     private int maxLocalStreams;
77     private int maxRemoteStreams;
78     private long streamIdleTimeout;
79     private int initialSessionRecvWindow;
80     private bool pushEnabled;
81     private long idleTime;
82 
83     alias convertToMillisecond = convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond);
84 
85     this(Scheduler scheduler, TcpSession endPoint, Http2Generator generator,
86                         StreamSession.Listener listener, FlowControlStrategy flowControl,
87                         int initialStreamId, int streamIdleTimeout) {
88         this.scheduler = scheduler;
89         this.endPoint = endPoint;
90         this.generator = generator;
91         this.listener = listener;
92         this.flowControl = flowControl;
93         this.flusher = new Http2Flusher(this);
94         this.maxLocalStreams = -1;
95         this.maxRemoteStreams = -1;
96         this.streamIds = (initialStreamId);
97         this.streamIdleTimeout = streamIdleTimeout > 0 ? streamIdleTimeout : endPoint.getMaxIdleTimeout();
98         this.sendWindow = (FlowControlStrategy.DEFAULT_WINDOW_SIZE);
99         this.recvWindow = (FlowControlStrategy.DEFAULT_WINDOW_SIZE);
100         this.pushEnabled = true; // SPEC: by default, push is enabled.
101         this.idleTime = convertToMillisecond(Clock.currStdTime); //Millisecond100Clock.currentTimeMillis();
102     }
103 
104     FlowControlStrategy getFlowControlStrategy() {
105         return flowControl;
106     }
107 
108     int getMaxLocalStreams() {
109         return maxLocalStreams;
110     }
111 
112     void setMaxLocalStreams(int maxLocalStreams) {
113         this.maxLocalStreams = maxLocalStreams;
114     }
115 
116     int getMaxRemoteStreams() {
117         return maxRemoteStreams;
118     }
119 
120     void setMaxRemoteStreams(int maxRemoteStreams) {
121         this.maxRemoteStreams = maxRemoteStreams;
122     }
123 
124     long getStreamIdleTimeout() {
125         return streamIdleTimeout;
126     }
127 
128     void setStreamIdleTimeout(long streamIdleTimeout) {
129         this.streamIdleTimeout = streamIdleTimeout;
130     }
131 
132     int getInitialSessionRecvWindow() {
133         return initialSessionRecvWindow;
134     }
135 
136     void setInitialSessionRecvWindow(int initialSessionRecvWindow) {
137         this.initialSessionRecvWindow = initialSessionRecvWindow;
138     }
139 
140     TcpSession getEndPoint() {
141         return endPoint;
142     }
143 
144     Http2Generator getGenerator() {
145         return generator;
146     }
147 
148     override
149     long getBytesWritten() {
150         return bytesWritten;
151     }
152 
153     override
154     void onData(DataFrame frame) {
155         version(HUNT_DEBUG) {
156             tracef("Received %s", frame.toString());
157         }
158         int streamId = frame.getStreamId();
159         StreamSPI stream = cast(StreamSPI) getStream(streamId);
160 
161         // SPEC: the session window must be updated even if the stream is null.
162         // The flow control length includes the padding bytes.
163         int flowControlLength = frame.remaining() + frame.padding();
164         flowControl.onDataReceived(this, stream, flowControlLength);
165 
166         class DataCallback : NoopCallback
167         {
168             override
169             void succeeded() {
170                 complete();
171             }
172 
173             override
174             void failed(Exception x) {
175                 // Consume also in case of failures, to free the
176                 // session flow control window for other streams.
177                 complete();
178             }
179 
180             private void complete() {
181                 // notIdle();
182                 // stream.notIdle();
183                 flowControl.onDataConsumed(this.outer, stream, flowControlLength);
184             }
185         }
186 
187         if (stream !is null) {
188             if (getRecvWindow() < 0) {
189                 close(cast(int)ErrorCode.FLOW_CONTROL_ERROR, "session_window_exceeded", Callback.NOOP);
190             } else {
191                 stream.process(frame, new DataCallback() );
192             }
193         } else {
194             version(HUNT_DEBUG) {
195                 tracef("Ignoring %s, stream #%s not found", frame.toString(), streamId);
196             }
197             // We must enlarge the session flow control window,
198             // otherwise other requests will be stalled.
199             flowControl.onDataConsumed(this, null, flowControlLength);
200         }
201     }
202 
203     override
204     abstract void onHeaders(HeadersFrame frame);
205 
206     override
207     void onPriority(PriorityFrame frame) {
208         version(HUNT_DEBUG) {
209             tracef("Received %s", frame.toString());
210         }
211     }
212 
213     override
214     void onReset(ResetFrame frame) {
215         version(HUNT_DEBUG) {
216             tracef("Received %s", frame.toString());
217         }
218         StreamSPI stream = cast(StreamSPI)getStream(frame.getStreamId());
219         if (stream !is null)
220             stream.process(frame, new ResetCallback());
221         else
222             notifyReset(this, frame);
223     }
224 
225     override
226     void onSettings(SettingsFrame frame) {
227         // SPEC: SETTINGS frame MUST be replied.
228         onSettings(frame, true);
229     }
230 
231     void onSettings(SettingsFrame frame, bool reply) {
232         version(HUNT_DEBUG) {
233             tracef("Received %s", frame.toString());
234         }
235         if (frame.isReply())
236             return;
237 
238         // Iterate over all settings
239         //for (Map.Entry!(int, int) entry : frame.getSettings().entrySet()) 
240         foreach(int key, int value; frame.getSettings())
241         {
242             // int key = entry.getKey();
243             // int value = entry.getValue();
244             switch (key) {
245                 case SettingsFrame.HEADER_TABLE_SIZE: {
246                     version(HUNT_DEBUG) {
247                         tracef("Update HPACK header table size to %s for %s", value, this.toString());
248                     }
249                     generator.setHeaderTableSize(value);
250                     break;
251                 }
252                 case SettingsFrame.ENABLE_PUSH: {
253                     // SPEC: check the value is sane.
254                     if (value != 0 && value != 1) {
255                         onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "invalid_settings_enable_push");
256                         return;
257                     }
258                     pushEnabled = value == 1;
259                     version(HUNT_DEBUG) {
260                         tracef("%s push for %s", pushEnabled ? "Enable" : "Disable", this.toString());
261                     }
262                     break;
263                 }
264                 case SettingsFrame.MAX_CONCURRENT_STREAMS: {
265                     maxLocalStreams = value;
266                     version(HUNT_DEBUG) {
267                         tracef("Update max local concurrent streams to %s for %s", maxLocalStreams, this.toString());
268                     }
269                     break;
270                 }
271                 case SettingsFrame.INITIAL_WINDOW_SIZE: {
272                     version(HUNT_DEBUG) {
273                         tracef("Update initial window size to %s for %s", value, this.toString());
274                     }
275                     flowControl.updateInitialStreamWindow(this, value, false);
276                     break;
277                 }
278                 case SettingsFrame.MAX_FRAME_SIZE: {
279                     version(HUNT_DEBUG) {
280                         tracef("Update max frame size to %s for %s", value, this.toString());
281                     }
282                     // SPEC: check the max frame size is sane.
283                     if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH) {
284                         onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "invalid_settings_max_frame_size");
285                         return;
286                     }
287                     generator.setMaxFrameSize(value);
288                     break;
289                 }
290                 case SettingsFrame.MAX_HEADER_LIST_SIZE: {
291                     version(HUNT_DEBUG) {
292                         tracef("Update max header list size to %s for %s", value, this.toString());
293                     }
294                     generator.setMaxHeaderListSize(value);
295                     break;
296                 }
297                 default: {
298                     version(HUNT_DEBUG) {
299                         tracef("Unknown setting %s:%s for %s", key, value, this.toString());
300                     }
301                     break;
302                 }
303             }
304         }
305         notifySettings(this, frame);
306 
307         if (reply) {
308             SettingsFrame replyFrame = new SettingsFrame(Collections.emptyMap!(int, int)(), true);
309             settings(replyFrame, Callback.NOOP);
310         }
311     }
312 
313     override
314     void onPing(PingFrame frame) {
315         version(HUNT_DEBUG) {
316             tracef("Received %s", frame.toString());
317         }
318         if (frame.isReply()) {
319             info("The session %s received ping reply", endPoint.getSessionId());
320             notifyPing(this, frame);
321         } else {
322             PingFrame reply = new PingFrame(frame.getPayload(), true);
323             control(null, Callback.NOOP, reply);
324         }
325     }
326 
327     /**
328      * This method is called when receiving a GO_AWAY from the other peer.
329      * We check the close state to act appropriately:
330      * <ul>
331      * <li>NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so
332      * that the content of the queue is written, and then the connection
333      * closed. We notify the application after being terminated.
334      * See <code>Http2Session.ControlEntry#succeeded()</code></li>
335      * <li>In all other cases, we do nothing since other methods are already
336      * performing their actions.</li>
337      * </ul>
338      *
339      * @param frame the GO_AWAY frame that has been received.
340      * @see #close(int, string, Callback)
341      * @see #onShutdown()
342      * @see #onIdleTimeout()
343      */
344     override
345     void onGoAway(GoAwayFrame frame) {
346         version(HUNT_DEBUG) {
347             tracef("Received %s", frame.toString());
348         }
349         while (true) {
350             CloseState current = closed;
351             switch (current) {
352                 case CloseState.NOT_CLOSED: {
353                     if (closed == current) {
354                         closed = CloseState.REMOTELY_CLOSED;
355                         // We received a GO_AWAY, so try to write
356                         // what's in the queue and then disconnect.
357                         notifyClose(this, frame, new DisconnectCallback());
358                         return;
359                     }
360                     break;
361                 }
362                 default: {
363                     version(HUNT_DEBUG) {
364                         tracef("Ignored %s, already closed", frame.toString());
365                     }
366                     return;
367                 }
368             }
369         }
370     }
371 
372     override
373     void onWindowUpdate(WindowUpdateFrame frame) {
374         version(HUNT_DEBUG) {
375             tracef("Received %s", frame.toString());
376         }
377         int streamId = frame.getStreamId();
378         if (streamId > 0) {
379             StreamSPI stream = cast(StreamSPI)getStream(streamId);
380             if (stream !is null) {
381                 stream.process(frame, Callback.NOOP);
382                 onWindowUpdate(stream, frame);
383             }
384         } else {
385             onWindowUpdate(null, frame);
386         }
387     }
388 
389     override
390     void onConnectionFailure(int error, string reason) {
391         notifyFailure(this, new IOException(format("%d/%s", error, reason)), new CloseCallback(error, reason));
392     }
393 
394     override
395     void newStream(HeadersFrame frame, Promise!Stream promise, Stream.Listener listener) {
396         // Synchronization is necessary to atomically create
397         // the stream id and enqueue the frame to be sent.
398         bool queued;
399         synchronized (this) {
400             int streamId = frame.getStreamId();
401             if (streamId <= 0) {
402                 streamId = streamIds; streamIds += 2; // streamIds.getAndAdd(2);
403                 PriorityFrame priority = frame.getPriority();
404                 priority = priority is null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
405                         priority.getWeight(), priority.isExclusive());
406                 frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
407             }
408             StreamSPI stream = createLocalStream(streamId, promise);
409             if (stream is null)
410                 return;
411             stream.setListener(listener);
412 
413             ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback!Stream(promise, stream));
414             queued = flusher.append(entry);
415         }
416         // Iterate outside the synchronized block.
417         if (queued)
418             flusher.iterate();
419     }
420 
421     override
422     int priority(PriorityFrame frame, Callback callback) {
423         int streamId = frame.getStreamId();
424         StreamSPI stream = streams[streamId];
425         if (stream is null) {
426             streamId = streamIds; streamIds += 2; //.getAndAdd(2);
427             frame = new PriorityFrame(streamId, frame.getParentStreamId(),
428                     frame.getWeight(), frame.isExclusive());
429         }
430         control(stream, callback, frame);
431         return streamId;
432     }
433 
434     override
435     void push(StreamSPI stream, Promise!Stream promise, PushPromiseFrame frame, Stream.Listener listener) {
436         // Synchronization is necessary to atomically create
437         // the stream id and enqueue the frame to be sent.
438         bool queued;
439         synchronized (this) {
440             int streamId = streamIds; streamIds += 2; //.getAndAdd(2);
441             frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
442 
443             StreamSPI pushStream = createLocalStream(streamId, promise);
444             if (pushStream is null)
445                 return;
446             pushStream.setListener(listener);
447 
448             ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback!Stream(promise, pushStream));
449             queued = flusher.append(entry);
450         }
451         // Iterate outside the synchronized block.
452         if (queued)
453             flusher.iterate();
454     }
455 
456     override
457     void settings(SettingsFrame frame, Callback callback) {
458         control(null, callback, frame);
459     }
460 
461     override
462     void ping(PingFrame frame, Callback callback) {
463         if (frame.isReply())
464             callback.failed(new IllegalArgumentException(""));
465         else
466             control(null, callback, frame);
467     }
468 
469     protected void reset(ResetFrame frame, Callback callback) {
470         control(cast(StreamSPI)getStream(frame.getStreamId()), callback, frame);
471     }
472 
473     /**
474      * Invoked internally and by applications to send a GO_AWAY frame to the
475      * other peer. We check the close state to act appropriately:
476      * <ul>
477      * <li>NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the
478      * GO_AWAY has been written, it will only cause the output to be shut
479      * down (not the connection closed), so that the application can still
480      * read frames arriving from the other peer.
481      * Ideally the other peer will notice the GO_AWAY and close the connection.
482      * When that happen, we close the connection from {@link #onShutdown()}.
483      * Otherwise, the idle timeout mechanism will close the connection, see
484      * {@link #onIdleTimeout()}.</li>
485      * <li>In all other cases, we do nothing since other methods are already
486      * performing their actions.</li>
487      * </ul>
488      *
489      * @param error    the error code
490      * @param reason   the reason
491      * @param callback the callback to invoke when the operation is complete
492      * @see #onGoAway(GoAwayFrame)
493      * @see #onShutdown()
494      * @see #onIdleTimeout()
495      */
496     override
497     bool close(int error, string reason, Callback callback) {
498         while (true) {
499             CloseState current = closed;
500             switch (current) {
501                 case CloseState.NOT_CLOSED: {
502                     if (closed == current) {
503                         closed =  CloseState.LOCALLY_CLOSED;
504                         GoAwayFrame frame = newGoAwayFrame(error, reason);
505                         control(null, callback, frame);
506                         return true;
507                     }
508                     break;
509                 }
510                 default: {
511                     version(HUNT_DEBUG)
512                         tracef("Ignoring close %s/%s, already closed", error, reason);
513                     callback.succeeded();
514                     return false;
515                 }
516             }
517         }
518     }
519 
520     private GoAwayFrame newGoAwayFrame(int error, string reason) {
521         byte[] payload = null;
522         if (!reason.empty) {
523             // Trim the reason to avoid attack vectors.
524             int len = min(reason.length, 32);
525             payload = cast(byte[])reason[0..len].dup;
526         }
527         return new GoAwayFrame(lastStreamId, error, payload);
528     }
529 
530     override
531     bool isClosed() {
532         return closed != CloseState.NOT_CLOSED;
533     }
534 
535     private void control(StreamSPI stream, Callback callback, Frame frame) {
536         frames(stream, callback, frame, Frame.EMPTY_ARRAY);
537     }
538 
539     override
540     void frames(StreamSPI stream, Callback callback, Frame frame, Frame[] frames... ) {
541         // We want to generate as late as possible to allow re-prioritization;
542         // generation will happen while processing the entries.
543 
544         // The callback needs to be notified only when the last frame completes.
545 
546         int length = cast(int)frames.length;
547         if (length == 0) {
548             onFrame(new ControlEntry(frame, stream, callback), true);
549         } else {
550             callback = new CountingCallback(callback, 1 + length);
551             onFrame(new ControlEntry(frame, stream, callback), false);
552             for (int i = 1; i <= length; ++i)
553                 onFrame(new ControlEntry(frames[i - 1], stream, callback), i == length);
554         }
555     }
556 
557     override
558     void data(StreamSPI stream, Callback callback, DataFrame frame) {
559         // We want to generate as late as possible to allow re-prioritization.
560         onFrame(new DataEntry(frame, stream, callback), true);
561     }
562 
563     private void onFrame(Http2Flusher.Entry entry, bool flush) {
564         version(HUNT_DEBUG) {
565             tracef("%s %s", flush ? "Sending" : "Queueing", entry.frame.toString());
566         }
567         // Ping frames are prepended to process them as soon as possible.
568         bool queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
569         if (queued && flush) {
570             if (entry.stream !is null) {
571                 // entry.stream.notIdle();
572             }
573             flusher.iterate();
574         }
575     }
576 
577     protected StreamSPI createLocalStream(int streamId, Promise!Stream promise) {
578         while (true) {
579             int localCount = localStreamCount;
580             int maxCount = getMaxLocalStreams();
581             if (maxCount >= 0 && localCount >= maxCount) {
582                 promise.failed(new IllegalStateException("Max local stream count " ~ maxCount.to!string() ~ " exceeded"));
583                 return null;
584             }
585             if (localStreamCount == localCount)
586             {
587                 localStreamCount =  localCount + 1;
588                 break;
589             }
590         }
591 
592         StreamSPI stream = newStream(streamId, true);
593         auto itemPtr = streamId in streams;
594         if (itemPtr is null) {
595             streams[streamId] = stream;
596             // stream.setIdleTimeout(getStreamIdleTimeout());
597             flowControl.onStreamCreated(stream);
598             version(HUNT_DEBUG) {
599                 tracef("Created local %s", stream.toString());
600             }
601             return stream;
602         } else {
603             promise.failed(new IllegalStateException("Duplicate stream " ~ streamId.to!string()));
604             return null;
605         }
606     }
607 
608     protected StreamSPI createRemoteStream(int streamId) {
609         // SPEC: exceeding max concurrent streams is treated as stream error.
610         while (true) {
611             long encoded = remoteStreamCount;
612             int remoteCount = AtomicBiInteger.getHi(encoded);
613             int remoteClosing = AtomicBiInteger.getLo(encoded);
614             int maxCount = getMaxRemoteStreams();
615             if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) {
616                 reset(new ResetFrame(streamId, cast(int)ErrorCode.REFUSED_STREAM_ERROR), Callback.NOOP);
617                 return null;
618             }
619             // if (remoteStreamCount.compareAndSet(encoded, remoteCount + 1, remoteClosing))
620             if(remoteStreamCount == encoded)
621             {
622                 remoteStreamCount = AtomicBiInteger.encode(remoteCount + 1, remoteClosing);
623                 break;
624             }
625         }
626 
627         StreamSPI stream = newStream(streamId, false);
628 
629         // SPEC: duplicate stream is treated as connection error.
630         //streams.putIfAbsent(streamId, stream)
631         auto itemPtr = streamId in streams;
632         if ( itemPtr is null) {
633             streams[streamId] = stream;
634             updateLastStreamId(streamId);
635             // stream.setIdleTimeout(getStreamIdleTimeout());
636             flowControl.onStreamCreated(stream);
637             version(HUNT_DEBUG) {
638                 tracef("Created remote %s", stream.toString());
639             }
640             return stream;
641         } else {
642             close(cast(int)ErrorCode.PROTOCOL_ERROR, "duplicate_stream", Callback.NOOP);
643             return null;
644         }
645     }
646 
647     void updateStreamCount(bool local, int deltaStreams, int deltaClosing) {
648         if (local)
649             localStreamCount += (deltaStreams);
650         else
651         {
652             remoteStreamCount = AtomicBiInteger.encode(remoteStreamCount, deltaStreams, deltaClosing);
653             // remoteStreamCount.add(deltaStreams, deltaClosing);
654         }
655     }
656 
657     protected StreamSPI newStream(int streamId, bool local) {
658         return new Http2Stream(scheduler, this, streamId, local);
659     }
660 
661     override
662     void removeStream(StreamSPI stream) {
663         bool removed = streams.remove(stream.getId());
664         if (removed) {
665             onStreamClosed(stream);
666             flowControl.onStreamDestroyed(stream);
667             version(HUNT_DEBUG) {
668                 tracef("Removed %s %s", stream.isLocal() ? "local" : "remote", stream);
669             }
670         }
671     }
672 
673     override
674     Stream[] getStreams() {
675         return cast(Stream[])(streams.values());
676     }
677 
678     int getStreamCount() {
679         return cast(int)streams.length;
680     }
681 
682     override
683     StreamSPI getStream(int streamId) {
684         return streams[streamId];
685     }
686 
687     int getSendWindow() {
688         return sendWindow;
689     }
690 
691     int getRecvWindow() {
692         return recvWindow;
693     }
694 
695     override
696     int updateSendWindow(int delta) {
697         int old = sendWindow;
698         sendWindow += delta;
699         return old; 
700     }
701 
702     override
703     int updateRecvWindow(int delta) {
704         int old = recvWindow;
705         recvWindow += delta;
706         return old; 
707     }
708 
709     override
710     void onWindowUpdate(StreamSPI stream, WindowUpdateFrame frame) {
711         // WindowUpdateFrames arrive concurrently with writes.
712         // Increasing (or reducing) the window size concurrently
713         // with writes requires coordination with the flusher, that
714         // decides how many frames to write depending on the available
715         // window sizes. If the window sizes vary concurrently, the
716         // flusher may take non-optimal or wrong decisions.
717         // Here, we "queue" window updates to the flusher, so it will
718         // be the only component responsible for window updates, for
719         // both increments and reductions.
720         flusher.window(stream, frame);
721     }
722 
723     override
724     bool isPushEnabled() {
725         return pushEnabled;
726     }
727 
728     /**
729      * A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN.
730      * This method is invoked when the TCP FIN is received, or when an exception is
731      * thrown while reading, and we check the close state to act appropriately:
732      * <ul>
733      * <li>NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close)
734      * or there was an exception while reading, and therefore we terminate.</li>
735      * <li>LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received
736      * it and closed the connection; we queue a disconnect to close the connection
737      * on the local side.
738      * The GO_AWAY just shutdown the output, so we need this step to make sure the
739      * connection is closed. See {@link #close(int, string, Callback)}.</li>
740      * <li>REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we
741      * do nothing since the handling of the GO_AWAY will take care of closing the
742      * connection. See {@link #onGoAway(GoAwayFrame)}.</li>
743      * </ul>
744      *
745      * @see #onGoAway(GoAwayFrame)
746      * @see #close(int, string, Callback)
747      * @see #onIdleTimeout()
748      */
749     override
750     void onShutdown() {
751         version(HUNT_DEBUG) {
752             tracef("Shutting down %s", this.toString());
753         }
754         switch (closed) {
755             case CloseState.NOT_CLOSED: {
756                 // The other peer did not send a GO_AWAY, no need to be gentle.
757                 version(HUNT_DEBUG) {
758                     tracef("Abrupt close for %s", this.toString());
759                 }
760                 abort(new ClosedChannelException(""));
761                 break;
762             }
763             case CloseState.LOCALLY_CLOSED: {
764                 // We have closed locally, and only shutdown
765                 // the output; now queue a disconnect.
766                 control(null, Callback.NOOP, new DisconnectFrame());
767                 break;
768             }
769             case CloseState.REMOTELY_CLOSED: {
770                 // Nothing to do, the GO_AWAY frame we
771                 // received will close the connection.
772                 break;
773             }
774             default: {
775                 break;
776             }
777         }
778     }
779 
780     /**
781      * This method is invoked when the idle timeout triggers. We check the close state
782      * to act appropriately:
783      * <ul>
784      * <li>NOT_CLOSED: it's a real idle timeout, we just initiate a close, see
785      * {@link #close(int, string, Callback)}.</li>
786      * <li>LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the
787      * other peer did not close the connection so we never received the TCP FIN, and
788      * therefore we terminate.</li>
789      * <li>REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a
790      * disconnect, but for some reason it was not processed (for example, queue was
791      * stuck because of TCP congestion), therefore we terminate.
792      * See {@link #onGoAway(GoAwayFrame)}.</li>
793      * </ul>
794      *
795      * @return true if the session should be closed, false otherwise
796      * @see #onGoAway(GoAwayFrame)
797      * @see #close(int, string, Callback)
798      * @see #onShutdown()
799      */
800     override
801     bool onIdleTimeout() {
802         switch (closed) {
803             case CloseState.NOT_CLOSED: {
804                 long elapsed = convertToMillisecond(Clock.currStdTime) - idleTime;
805                 version(HUNT_DEBUG) {
806                     tracef("HTTP2 session on idle timeout. The elapsed time is %s - %s", elapsed, endPoint.getMaxIdleTimeout());
807                 }
808                 return elapsed >= endPoint.getMaxIdleTimeout() && notifyIdleTimeout(this);
809             }
810             case CloseState.LOCALLY_CLOSED:
811             case CloseState.REMOTELY_CLOSED: {
812                 abort(new TimeoutException("Idle timeout " ~ endPoint.getMaxIdleTimeout().to!string() ~ " ms"));
813                 return false;
814             }
815             default: {
816                 return false;
817             }
818         }
819     }
820 
821     private void notIdle() {
822         idleTime = convertToMillisecond(Clock.currStdTime);
823     }
824 
825     override
826     void onFrame(Frame frame) {
827         onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "upgrade");
828     }
829 
830     protected void onStreamOpened(StreamSPI stream) {
831     }
832 
833     protected void onStreamClosed(StreamSPI stream) {
834     }
835 
836     void disconnect() {
837         version(HUNT_DEBUG) {
838             tracef("Disconnecting %s", this.toString());
839         }
840         endPoint.close();
841     }
842 
    /**
     * Moves the session to CloseState.CLOSED and tears it down: terminates
     * the flusher with the given cause, closes and forgets every stream,
     * then disconnects the endpoint. Does nothing for any state other than
     * NOT_CLOSED, LOCALLY_CLOSED or REMOTELY_CLOSED.
     *
     * Params:
     *   cause = the exception handed to flusher.terminate()
     */
    private void terminate(Exception cause) {
        // Retry loop: re-read the state until the transition succeeds or the
        // state is found to be one that needs no teardown.
        while (true) {
            CloseState current = closed;
            switch (current) {
                case CloseState.NOT_CLOSED:
                case CloseState.LOCALLY_CLOSED:
                case CloseState.REMOTELY_CLOSED: {
                    // NOTE(review): this check-then-set on `closed` is two separate
                    // steps; it looks like a stand-in for an atomic compare-and-set.
                    // Confirm whether concurrent callers are possible.
                    if (closed == current) {
                        closed = CloseState.CLOSED;
                        flusher.terminate(cause);
                        foreach (StreamSPI stream ; streams.byValue())
                            stream.close();
                        streams.clear();
                        disconnect();
                        return;
                    }
                    // State changed between the read and the check: loop again.
                    break;
                }
                default: {
                    // Any other state: nothing to tear down here.
                    return;
                }
            }
        }
    }
867 
868     void abort(Exception failure) {
869         notifyFailure(this, failure, new TerminateCallback(failure));
870     }
871 
872     bool isDisconnected() {
873         return !endPoint.isOpen();
874     }
875 
876     private void updateLastStreamId(int streamId) {
877         // Atomics.updateMax(lastStreamId, streamId);
878         if(streamId>lastStreamId) lastStreamId = streamId;
879     }
880 
881     protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame) {
882         try {
883             return listener.onNewStream(stream, frame);
884         } catch (Exception x) {
885             info("Failure while notifying listener " ~ listener.toString(), x);
886             return null;
887         }
888     }
889 
890     protected void notifySettings(StreamSession session, SettingsFrame frame) {
891         try {
892             listener.onSettings(session, frame);
893         } catch (Exception x) {
894             info("Failure while notifying listener " ~ listener.toString(), x);
895         }
896     }
897 
898     protected void notifyPing(StreamSession session, PingFrame frame) {
899         try {
900             listener.onPing(session, frame);
901         } catch (Exception x) {
902             info("Failure while notifying listener " ~ listener.toString(), x);
903         }
904     }
905 
906     protected void notifyReset(StreamSession session, ResetFrame frame) {
907         try {
908             listener.onReset(session, frame);
909         } catch (Exception x) {
910             info("Failure while notifying listener " ~ listener.toString(), x);
911         }
912     }
913 
914     protected void notifyClose(StreamSession session, GoAwayFrame frame, Callback callback) {
915         try {
916             listener.onClose(session, frame, callback);
917         } catch (Exception x) {
918             info("Failure while notifying listener " ~ listener.toString(), x);
919         }
920     }
921 
922     protected bool notifyIdleTimeout(StreamSession session) {
923         try {
924             return listener.onIdleTimeout(session);
925         } catch (Exception x) {
926             info("Failure while notifying listener " ~ listener.toString(), x);
927             return true;
928         }
929     }
930 
931     protected void notifyFailure(StreamSession session, Exception failure, Callback callback) {
932         try {
933             listener.onFailure(session, failure, callback);
934         } catch (Exception x) {
935             info("Failure while notifying listener " ~ listener.toString(), x);
936         }
937     }
938 
939     protected void notifyHeaders(StreamSPI stream, HeadersFrame frame) {
940         // Optional.ofNullable(stream.getListener()).ifPresent(listener -> {
941         //     try {
942         //         listener.onHeaders(stream, frame);
943         //     } catch (Exception x) {
944         //         info("Failure while notifying listener " ~ listener.toString(), x);
945         //     }
946         // });
947         auto listener = stream.getListener();
948         if(listener !is null)
949         {
950             try {
951                 listener.onHeaders(stream, frame);
952             } catch (Exception x) {
953                 info("Failure while notifying listener " ~ listener.toString(), x);
954             }            
955         }
956     }
957 
958     override
959     string toString() {
960         return format("%s@%s{l:%s <-> r:%s,sendWindow=%s,recvWindow=%s,streams=%d,%s}",
961                 typeof(this).stringof,
962                 toHash(),
963                 getEndPoint().getLocalAddress(),
964                 getEndPoint().getRemoteAddress(),
965                 sendWindow,
966                 recvWindow,
967                 streams.length,
968                 closed);
969     }
970 
    /**
     * Flusher entry for frames generated through generator.control().
     * Besides producing the bytes, it applies the session/stream state
     * changes tied to each frame type, either just before the frame is
     * written (beforeSend) or after the write completed (succeeded).
     */
    private class ControlEntry : Http2Flusher.Entry {
        // Size in bytes of the generated frame; added to bytesWritten on success.
        private int bytes;

        private this(Frame frame, StreamSPI stream, Callback callback) {
            super(frame, stream, callback);
        }

        /**
         * Generates the frame into the given buffer queue and runs the
         * pre-send actions. Always returns true.
         */
        override
        protected bool generate(Queue!ByteBuffer buffers) {
            List!(ByteBuffer) controlFrame = generator.control(frame);
            bytes = cast(int) BufferUtils.remaining(controlFrame);
            buffers.addAll(controlFrame);
            version(HUNT_DEBUG) {
                tracef("Generated %s", frame.toString());
            }
            beforeSend();
            return true;
        }

        /**
         * <p>Performs actions just before writing the frame to the network.</p>
         * <p>Some frames, when sent over the network, cause the receiver
         * to react and send back frames that may be processed by the original
         * sender *before* {@link #succeeded()} is called.
         * <p>If the action to perform updates some state, this update may
         * not be seen by the received frames and cause errors.</p>
         * <p>For example, suppose the action updates the stream window to a
         * larger value; the sender sends the frame; the receiver is now entitled
         * to send back larger data; when the data is received by the original
         * sender, the action may have not been performed yet, causing the larger
         * data to be rejected, when it should have been accepted.</p>
         */
        private void beforeSend() {
            switch (frame.getType()) {
                case FrameType.HEADERS: {
                    // Record the BEFORE_SEND close event for the stream.
                    HeadersFrame headersFrame = cast(HeadersFrame) frame;
                    stream.updateClose(headersFrame.isEndStream(), CloseStateEvent.BEFORE_SEND);
                    break;
                }
                case FrameType.SETTINGS: {
                    // Apply a new INITIAL_WINDOW_SIZE to flow control before the
                    // frame goes out.
                    SettingsFrame settingsFrame = cast(SettingsFrame) frame;
                    Map!(int, int) settings = settingsFrame.getSettings();
                    if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE))
                    {
                        int initialWindow = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE);
                        flowControl.updateInitialStreamWindow(this.outer, initialWindow, true);
                    }
                    break;
                }
                default: {
                    break;
                }
            }
        }

        /**
         * Called once the frame has been written: accounts for the written
         * bytes, applies the post-send action for the frame type, then
         * completes the entry through super.succeeded().
         */
        override
        void succeeded() {
            bytesWritten += (bytes);
            switch (frame.getType()) {
                case FrameType.HEADERS: {
                    onStreamOpened(stream);
                    HeadersFrame headersFrame = cast(HeadersFrame) frame;
                    // Remove the stream when updateClose() returns true for AFTER_SEND.
                    if (stream.updateClose(headersFrame.isEndStream(), CloseStateEvent.AFTER_SEND))
                        removeStream(stream);
                    break;
                }
                case FrameType.RST_STREAM: {
                    // The entry may carry no stream; only close/remove when it does.
                    if (stream !is null) {
                        stream.close();
                        removeStream(stream);
                    }
                    break;
                }
                case FrameType.PUSH_PROMISE: {
                    // Pushed streams are implicitly remotely closed.
                    // They are closed when sending an end-stream DATA frame.
                    stream.updateClose(true, CloseStateEvent.RECEIVED);
                    break;
                }
                case FrameType.GO_AWAY: {
                    // We just sent a GO_AWAY, only shutdown the
                    // output without closing yet, to allow reads.
                    getEndPoint().shutdownOutput();
                    break;
                }
                case FrameType.WINDOW_UPDATE: {
                    // Let flow control account for the window update just sent.
                    flowControl.windowUpdate(this.outer, stream, cast(WindowUpdateFrame) frame);
                    break;
                }
                case FrameType.DISCONNECT: {
                    // The queued disconnect was reached: tear the session down.
                    terminate(new ClosedChannelException(""));
                    break;
                }
                default: {
                    break;
                }
            }
            super.succeeded();
        }

        /**
         * Called when writing the frame failed. A failed DISCONNECT still
         * terminates the session; the failure is then passed on through
         * super.failed().
         */
        override
        void failed(Exception x) {
            if (frame.getType() == FrameType.DISCONNECT) {
                terminate(new ClosedChannelException(""));
            }
            super.failed(x);
        }
    }
1079 
1080     private class DataEntry :Http2Flusher.Entry {
1081         private int bytes;
1082         private int _dataRemaining;
1083         private int dataWritten;
1084 
1085         private this(DataFrame frame, StreamSPI stream, Callback callback) {
1086             super(frame, stream, callback);
1087             // We don't do any padding, so the flow control length is
1088             // always the data remaining. This simplifies the handling
1089             // of data frames that cannot be completely written due to
1090             // the flow control window exhausting, since in that case
1091             // we would have to count the padding only once.
1092             _dataRemaining = frame.remaining();
1093         }
1094 
1095         override
1096         int dataRemaining() {
1097             return _dataRemaining;
1098         }
1099 
1100         override
1101         protected bool generate(Queue!ByteBuffer buffers) {
1102             int remaining = dataRemaining();
1103 
1104             int sessionSendWindow = getSendWindow();
1105             int streamSendWindow = stream.updateSendWindow(0);
1106             int window = min(streamSendWindow, sessionSendWindow);
1107             if (window <= 0 && remaining > 0)
1108                 return false;
1109 
1110             int length = min(remaining, window);
1111 
1112             // Only one DATA frame is generated.
1113             DataFrame dataFrame = cast(DataFrame) frame;
1114             Tuple!(int, List!(ByteBuffer)) pair = generator.data(dataFrame, length);
1115             bytes = pair[0];
1116             buffers.addAll(pair[1]);
1117             int written = bytes - Frame.HEADER_LENGTH;
1118             version(HUNT_DEBUG) {
1119                 tracef("Generated %s, length/window/data=%s/%s/%s", dataFrame, written, window, remaining);
1120             }
1121             this.dataWritten = written;
1122             this._dataRemaining -= written;
1123 
1124             flowControl.onDataSending(stream, written);
1125             stream.updateClose(dataFrame.isEndStream(), CloseStateEvent.BEFORE_SEND);
1126 
1127             return true;
1128         }
1129 
1130         override
1131         void succeeded() {
1132             bytesWritten += bytes;
1133             flowControl.onDataSent(stream, dataWritten);
1134 
1135             // Do we have more to send ?
1136             DataFrame dataFrame = cast(DataFrame) frame;
1137             if (_dataRemaining == 0) {
1138                 // Only now we can update the close state
1139                 // and eventually remove the stream.
1140                 if (stream.updateClose(dataFrame.isEndStream(), CloseStateEvent.AFTER_SEND))
1141                     removeStream(stream);
1142                 super.succeeded();
1143             }
1144         }
1145     }
1146 
1147     private static class PromiseCallback(C) : NoopCallback {
1148         private Promise!C promise;
1149         private C value;
1150 
1151         private this(Promise!C promise, C value) {
1152             this.promise = promise;
1153             this.value = value;
1154         }
1155 
1156         override
1157         void succeeded() {
1158             promise.succeeded(value);
1159         }
1160 
1161         override
1162         void failed(Exception x) {
1163             promise.failed(x);
1164         }
1165     }
1166 
1167     private class ResetCallback : NoopCallback {
1168         override
1169         void succeeded() {
1170             complete();
1171         }
1172 
1173         override
1174         void failed(Exception x) {
1175             complete();
1176         }
1177 
1178         private void complete() {
1179             flusher.iterate();
1180         }
1181     }
1182 
1183     private class CloseCallback : NoopCallback {
1184         private int error;
1185         private string reason;
1186 
1187         private this(int error, string reason) {
1188             this.error = error;
1189             this.reason = reason;
1190         }
1191 
1192         override
1193         void succeeded() {
1194             complete();
1195         }
1196 
1197         override
1198         void failed(Exception x) {
1199             complete();
1200         }
1201 
1202         private void complete() {
1203             close(error, reason, Callback.NOOP);
1204         }
1205     }
1206 
1207     private class DisconnectCallback : NoopCallback {
1208         override
1209         void succeeded() {
1210             complete();
1211         }
1212 
1213         override
1214         void failed(Exception x) {
1215             complete();
1216         }
1217 
1218         private void complete() {
1219             frames(null, Callback.NOOP, newGoAwayFrame(cast(int)ErrorCode.NO_ERROR, null), new DisconnectFrame());
1220         }
1221     }
1222 
1223     private class TerminateCallback : NoopCallback {
1224         private Exception failure;
1225 
1226         private this(Exception failure) {
1227             this.failure = failure;
1228         }
1229 
1230         override
1231         void succeeded() {
1232             complete();
1233         }
1234 
1235         override
1236         void failed(Exception x) {
1237             // failure.addSuppressed(x);
1238             Throwable.chainTogether(failure, x);
1239             complete();
1240         }
1241 
1242         private void complete() {
1243             terminate(failure);
1244         }
1245     }
1246 }