1 module hunt.http.codec.http.stream.Http2Session;
2 
3 import hunt.http.codec.http.stream.CloseState;
4 import hunt.http.codec.http.stream.FlowControlStrategy;
5 import hunt.http.codec.http.stream.Http2Flusher;
6 import hunt.http.codec.http.stream.Http2Stream;
7 import hunt.http.codec.http.stream.Session;
8 import hunt.http.codec.http.stream.SessionSPI;
9 import hunt.http.codec.http.stream.Stream;
10 import hunt.http.codec.http.stream.StreamSPI;
11 
12 import hunt.http.codec.http.decode.Parser;
13 import hunt.http.codec.http.encode.Http2Generator;
14 import hunt.http.codec.http.frame;
15 
16 import hunt.collection;
17 
18 import hunt.util.DateTime;
19 import hunt.util.Common;
20 import hunt.Exceptions;
21 import hunt.util.Common;
22 
23 import hunt.concurrency.atomic;
24 import hunt.concurrency.CountingCallback;
25 import hunt.concurrency.Promise;
26 import hunt.concurrency.Scheduler;
27 import core.atomic : atomicOp, atomicLoad;
28 import hunt.net.Connection;
29 
30 import hunt.logging;
31 
32 // import core.exception;
33 
34 import core.time;
35 import std.algorithm;
36 import std.conv;
37 import std.datetime;
38 import std.format;
39 import std.range;
40 import std.typecons;
41 
42 alias StreamSession = hunt.http.codec.http.stream.Session.Session;
43 
44 
45 /**
46 */
47 abstract class Http2Session : SessionSPI, Parser.Listener {
48 
49     // = new ConcurrentHashMap<>();
50     // private AtomicInteger streamIds = new AtomicInteger();
51     // private AtomicInteger lastStreamId = new AtomicInteger();
52     // private AtomicInteger localStreamCount = new AtomicInteger();
53     // private AtomicBiInteger remoteStreamCount = new AtomicBiInteger();
54     // private AtomicInteger sendWindow = new AtomicInteger();
55     // private AtomicInteger recvWindow = new AtomicInteger();
56     // private AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
57     // private AtomicLong bytesWritten = new AtomicLong();
58 
59     private StreamSPI[int] streams;
60     private int streamIds ;
61     private int lastStreamId ;
62     private int localStreamCount ;
63     private long remoteStreamCount ;
64     private shared int sendWindow ;
65     private shared int recvWindow ;
66     private CloseState closed = CloseState.NOT_CLOSED;
67     private long bytesWritten;
68 
69     private Scheduler scheduler;
70     private Connection endPoint;
71     private Http2Generator generator;
72     private StreamSession.Listener listener;
73     private FlowControlStrategy flowControl;
74     private Http2Flusher flusher;
75     private int maxLocalStreams;
76     private int maxRemoteStreams;
77     private long streamIdleTimeout;
78     private int initialSessionRecvWindow;
79     private bool pushEnabled;
80     private MonoTime idleTime;
81 
82     alias convertToMillisecond = convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond);
83 
84     this(Scheduler scheduler, Connection endPoint, Http2Generator generator,
85                         StreamSession.Listener listener, FlowControlStrategy flowControl,
86                         int initialStreamId, int streamIdleTimeout) {
87         this.scheduler = scheduler;
88         this.endPoint = endPoint;
89         this.generator = generator;
90         this.listener = listener;
91         this.flowControl = flowControl;
92         this.flusher = new Http2Flusher(this);
93         this.maxLocalStreams = -1;
94         this.maxRemoteStreams = -1;
95         this.streamIds = (initialStreamId);
96         this.streamIdleTimeout = streamIdleTimeout > 0 ? streamIdleTimeout : endPoint.getMaxIdleTimeout().total!(TimeUnit.Millisecond)();
97         this.sendWindow = (FlowControlStrategy.DEFAULT_WINDOW_SIZE);
98         this.recvWindow = (FlowControlStrategy.DEFAULT_WINDOW_SIZE);
99         this.pushEnabled = true; // SPEC: by default, push is enabled.
100         this.idleTime = MonoTime.currTime(); // convertToMillisecond(Clock.currStdTime); //Millisecond100Clock.currentTimeMillis();
101     }
102 
103     FlowControlStrategy getFlowControlStrategy() {
104         return flowControl;
105     }
106 
107     int getMaxLocalStreams() {
108         return maxLocalStreams;
109     }
110 
111     void setMaxLocalStreams(int maxLocalStreams) {
112         this.maxLocalStreams = maxLocalStreams;
113     }
114 
115     int getMaxRemoteStreams() {
116         return maxRemoteStreams;
117     }
118 
119     void setMaxRemoteStreams(int maxRemoteStreams) {
120         this.maxRemoteStreams = maxRemoteStreams;
121     }
122 
123     long getStreamIdleTimeout() {
124         return streamIdleTimeout;
125     }
126 
127     void setStreamIdleTimeout(long streamIdleTimeout) {
128         this.streamIdleTimeout = streamIdleTimeout;
129     }
130 
131     int getInitialSessionRecvWindow() {
132         return initialSessionRecvWindow;
133     }
134 
135     void setInitialSessionRecvWindow(int initialSessionRecvWindow) {
136         this.initialSessionRecvWindow = initialSessionRecvWindow;
137     }
138 
139     Connection getEndPoint() {
140         return endPoint;
141     }
142 
143     Http2Generator getGenerator() {
144         return generator;
145     }
146 
147     override
148     long getBytesWritten() {
149         return bytesWritten;
150     }
151 
152     override
153     void onData(DataFrame frame) {
154         version(HUNT_DEBUG) {
155             tracef("Received %s", frame.toString());
156         }
157         int streamId = frame.getStreamId();
158         StreamSPI stream = cast(StreamSPI) getStream(streamId);
159 
160         // SPEC: the session window must be updated even if the stream is null.
161         // The flow control length includes the padding bytes.
162         int flowControlLength = frame.remaining() + frame.padding();
163         flowControl.onDataReceived(this, stream, flowControlLength);
164 
165         class DataCallback : NoopCallback
166         {
167             override
168             void succeeded() {
169                 complete();
170             }
171 
172             override
173             void failed(Exception x) {
174                 // Consume also in case of failures, to free the
175                 // session flow control window for other streams.
176                 complete();
177             }
178 
179             private void complete() {
180                 // notIdle();
181                 // stream.notIdle();
182                 flowControl.onDataConsumed(this.outer, stream, flowControlLength);
183             }
184         }
185 
186         if (stream !is null && !isClosed) {
187             if (getRecvWindow() < 0) {
188                 close(cast(int)ErrorCode.FLOW_CONTROL_ERROR, "session_window_exceeded", Callback.NOOP);
189             } else {
190                 stream.process(frame, new DataCallback() );
191             }
192         } else {
193             tracef("Ignoring %s, stream #%s not found", frame.toString(), streamId);
194             // We must enlarge the session flow control window,
195             // otherwise other requests will be stalled.
196             flowControl.onDataConsumed(this, null, flowControlLength);
197         }
198     }
199 
200     override
201     abstract void onHeaders(HeadersFrame frame);
202 
203     override
204     void onPriority(PriorityFrame frame) {
205         version(HUNT_DEBUG) {
206             tracef("Received %s", frame.toString());
207         }
208     }
209 
210     override
211     void onReset(ResetFrame frame) {
212         version(HUNT_DEBUG) {
213             tracef("Received %s", frame.toString());
214         }
215         StreamSPI stream = cast(StreamSPI)getStream(frame.getStreamId());
216         if (stream !is null)
217             stream.process(frame, new ResetCallback());
218         else
219             notifyReset(this, frame);
220     }
221 
222     override
223     void onSettings(SettingsFrame frame) {
224         // SPEC: SETTINGS frame MUST be replied.
225         onSettings(frame, true);
226     }
227 
228     void onSettings(SettingsFrame frame, bool reply) {
229         version(HUNT_HTTP_DEBUG) {
230             tracef("Received %s", frame.toString());
231         }
232         if (frame.isReply())
233             return;
234 
235         // Iterate over all settings
236         //for (Map.Entry!(int, int) entry : frame.getSettings().entrySet()) 
237         foreach(int key, int value; frame.getSettings())
238         {
239             // int key = entry.getKey();
240             // int value = entry.getValue();
241             switch (key) {
242                 case SettingsFrame.HEADER_TABLE_SIZE: {
243                     version(HUNT_DEBUG) {
244                         tracef("Update HPACK header table size to %s for %s", value, this.toString());
245                     }
246                     generator.setHeaderTableSize(value);
247                     break;
248                 }
249                 case SettingsFrame.ENABLE_PUSH: {
250                     // SPEC: check the value is sane.
251                     if (value != 0 && value != 1) {
252                         onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "invalid_settings_enable_push");
253                         return;
254                     }
255                     pushEnabled = value == 1;
256                     version(HUNT_DEBUG) {
257                         tracef("%s push for %s", pushEnabled ? "Enable" : "Disable", this.toString());
258                     }
259                     break;
260                 }
261                 case SettingsFrame.MAX_CONCURRENT_STREAMS: {
262                     maxLocalStreams = value;
263                     version(HUNT_DEBUG) {
264                         tracef("Update max local concurrent streams to %s for %s", maxLocalStreams, this.toString());
265                     }
266                     break;
267                 }
268                 case SettingsFrame.INITIAL_WINDOW_SIZE: {
269                     version(HUNT_DEBUG) {
270                         tracef("Update initial window size to %s for %s", value, this.toString());
271                     }
272                     flowControl.updateInitialStreamWindow(this, value, false);
273                     break;
274                 }
275                 case SettingsFrame.MAX_FRAME_SIZE: {
276                     version(HUNT_DEBUG) {
277                         tracef("Update max frame size to %s for %s", value, this.toString());
278                     }
279                     // SPEC: check the max frame size is sane.
280                     if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH) {
281                         onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "invalid_settings_max_frame_size");
282                         return;
283                     }
284                     generator.setMaxFrameSize(value);
285                     break;
286                 }
287                 case SettingsFrame.MAX_HEADER_LIST_SIZE: {
288                     version(HUNT_DEBUG) {
289                         tracef("Update max header list size to %s for %s", value, this.toString());
290                     }
291                     generator.setMaxHeaderListSize(value);
292                     break;
293                 }
294                 default: {
295                     version(HUNT_DEBUG) {
296                         tracef("Unknown setting %s:%s for %s", key, value, this.toString());
297                     }
298                     break;
299                 }
300             }
301         }
302         notifySettings(this, frame);
303 
304         if (reply) {
305             SettingsFrame replyFrame = new SettingsFrame(Collections.emptyMap!(int, int)(), true);
306             settings(replyFrame, Callback.NOOP);
307         }
308     }
309 
310     override
311     void onPing(PingFrame frame) {
312         // version(HUNT_HTTP_DEBUG) {
313         //     tracef("Received %s", frame.toString());
314         // }
315         if (frame.isReply()) {
316             // version(HUNT_HTTP_DEBUG)
317              {
318                 infof("The session %s received ping reply", endPoint.getId());
319             }
320             notifyPing(this, frame);
321         } else {
322             PingFrame reply = new PingFrame(frame.getPayload(), true);
323             control(null, Callback.NOOP, reply);
324         }
325     }
326 
327     /**
328      * This method is called when receiving a GO_AWAY from the other peer.
329      * We check the close state to act appropriately:
330      * <ul>
331      * <li>NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so
332      * that the content of the queue is written, and then the connection
333      * closed. We notify the application after being terminated.
334      * See <code>Http2Session.ControlEntry#succeeded()</code></li>
335      * <li>In all other cases, we do nothing since other methods are already
336      * performing their actions.</li>
337      * </ul>
338      *
339      * @param frame the GO_AWAY frame that has been received.
340      * @see #close(int, string, Callback)
341      * @see #onShutdown()
342      * @see #onIdleTimeout()
343      */
344     override
345     void onGoAway(GoAwayFrame frame) {
346         version(HUNT_DEBUG) {
347             tracef("Received %s", frame.toString());
348         }
349         while (true) {
350             CloseState current = closed;
351             switch (current) {
352                 case CloseState.NOT_CLOSED: {
353                     if (closed == current) {
354                         closed = CloseState.REMOTELY_CLOSED;
355                         // We received a GO_AWAY, so try to write
356                         // what's in the queue and then disconnect.
357                         notifyClose(this, frame, new DisconnectCallback());
358                         return;
359                     }
360                     break;
361                 }
362                 default: {
363                     version(HUNT_DEBUG) {
364                         tracef("Ignored %s, already closed", frame.toString());
365                     }
366                     return;
367                 }
368             }
369         }
370     }
371 
372     override
373     void onWindowUpdate(WindowUpdateFrame frame) {
374         version(HUNT_DEBUG) {
375             tracef("Received %s ^^^^ ", frame.toString());
376         }
377         int streamId = frame.getStreamId();
378         if (streamId > 0) {
379             StreamSPI stream = cast(StreamSPI)getStream(streamId);
380             if (stream !is null) {
381                 stream.process(frame, Callback.NOOP);
382                 onWindowUpdate(stream, frame);
383             }
384         } else {
385             onWindowUpdate(null, frame);
386         }
387     }
388 
389     override
390     void onConnectionFailure(int error, string reason) {
391         notifyFailure(this, new IOException(format("%d/%s", error, reason)), new CloseCallback(error, reason));
392     }
393 
394     override
395     void newStream(HeadersFrame frame, Promise!Stream promise, Stream.Listener listener) {
396         // Synchronization is necessary to atomically create
397         // the stream id and enqueue the frame to be sent.
398         bool queued;
399         synchronized (this) {
400             int streamId = frame.getStreamId();
401             if (streamId <= 0) {
402                 streamId = streamIds; streamIds += 2; // streamIds.getAndAdd(2);
403                 PriorityFrame priority = frame.getPriority();
404                 priority = priority is null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
405                         priority.getWeight(), priority.isExclusive());
406                 frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
407             }
408             StreamSPI stream = createLocalStream(streamId, promise);
409             if (stream is null)
410                 return;
411             stream.setListener(listener);
412 
413             ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback!Stream(promise, stream));
414             queued = flusher.append(entry);
415         }
416         // Iterate outside the synchronized block.
417         if (queued)
418             flusher.iterate();
419     }
420 
421     override
422     int priority(PriorityFrame frame, Callback callback) {
423         int streamId = frame.getStreamId();
424         StreamSPI stream = streams[streamId];
425         if (stream is null) {
426             streamId = streamIds; streamIds += 2; //.getAndAdd(2);
427             frame = new PriorityFrame(streamId, frame.getParentStreamId(),
428                     frame.getWeight(), frame.isExclusive());
429         }
430         control(stream, callback, frame);
431         return streamId;
432     }
433 
434     override
435     void push(StreamSPI stream, Promise!Stream promise, PushPromiseFrame frame, Stream.Listener listener) {
436         // Synchronization is necessary to atomically create
437         // the stream id and enqueue the frame to be sent.
438         bool queued;
439         synchronized (this) {
440             int streamId = streamIds; streamIds += 2; //.getAndAdd(2);
441             frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
442 
443             StreamSPI pushStream = createLocalStream(streamId, promise);
444             if (pushStream is null)
445                 return;
446             pushStream.setListener(listener);
447 
448             ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback!Stream(promise, pushStream));
449             queued = flusher.append(entry);
450         }
451         // Iterate outside the synchronized block.
452         if (queued)
453             flusher.iterate();
454     }
455 
456     override
457     void settings(SettingsFrame frame, Callback callback) {
458         control(null, callback, frame);
459     }
460 
461     override
462     void ping(PingFrame frame, Callback callback) {
463         if (frame.isReply())
464             callback.failed(new IllegalArgumentException(""));
465         else
466             control(null, callback, frame);
467     }
468 
469     protected void reset(ResetFrame frame, Callback callback) {
470         control(cast(StreamSPI)getStream(frame.getStreamId()), callback, frame);
471     }
472 
473     /**
474      * Invoked internally and by applications to send a GO_AWAY frame to the
475      * other peer. We check the close state to act appropriately:
476      * <ul>
477      * <li>NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the
478      * GO_AWAY has been written, it will only cause the output to be shut
479      * down (not the connection closed), so that the application can still
480      * read frames arriving from the other peer.
481      * Ideally the other peer will notice the GO_AWAY and close the connection.
482      * When that happen, we close the connection from {@link #onShutdown()}.
483      * Otherwise, the idle timeout mechanism will close the connection, see
484      * {@link #onIdleTimeout()}.</li>
485      * <li>In all other cases, we do nothing since other methods are already
486      * performing their actions.</li>
487      * </ul>
488      *
489      * @param error    the error code
490      * @param reason   the reason
491      * @param callback the callback to invoke when the operation is complete
492      * @see #onGoAway(GoAwayFrame)
493      * @see #onShutdown()
494      * @see #onIdleTimeout()
495      */
496     override
497     bool close(int error, string reason, Callback callback) {
498         while (true) {
499             CloseState current = closed;
500             switch (current) {
501                 case CloseState.NOT_CLOSED: {
502                     if (closed == current) {
503                         closed =  CloseState.LOCALLY_CLOSED;
504                         GoAwayFrame frame = newGoAwayFrame(error, reason);
505                         control(null, callback, frame);
506                         return true;
507                     }
508                     break;
509                 }
510                 default: {
511                     version(HUNT_DEBUG)
512                         tracef("Ignoring close %s/%s, already closed", error, reason);
513                     callback.succeeded();
514                     return false;
515                 }
516             }
517         }
518     }
519 
520     private GoAwayFrame newGoAwayFrame(int error, string reason) {
521         byte[] payload = null;
522         if (!reason.empty) {
523             // Trim the reason to avoid attack vectors.
524             int len = min(reason.length, 32);
525             payload = cast(byte[])reason[0..len].dup;
526         }
527         return new GoAwayFrame(lastStreamId, error, payload);
528     }
529 
530     override
531     bool isClosed() {
532         return closed != CloseState.NOT_CLOSED;
533     }
534 
535     private void control(StreamSPI stream, Callback callback, Frame frame) {
536         frames(stream, callback, frame, Frame.EMPTY_ARRAY);
537     }
538 
539     override
540     void frames(StreamSPI stream, Callback callback, Frame frame, Frame[] frames... ) {
541         // We want to generate as late as possible to allow re-prioritization;
542         // generation will happen while processing the entries.
543 
544         // The callback needs to be notified only when the last frame completes.
545 
546         int length = cast(int)frames.length;
547         if (length == 0) {
548             // version(HUNT_HTTP_DEBUG) tracef("length == 0") ;
549             onFrame(new ControlEntry(frame, stream, callback), true);
550         } else {
551             callback = new CountingCallback(callback, 1 + length);
552             onFrame(new ControlEntry(frame, stream, callback), false);
553             for (int i = 1; i <= length; ++i)
554                 onFrame(new ControlEntry(frames[i - 1], stream, callback), i == length);
555         }
556     }
557 
558     override
559     void data(StreamSPI stream, Callback callback, DataFrame frame) {
560         // We want to generate as late as possible to allow re-prioritization.
561         if (!isClosed())
562         {
563             onFrame(new DataEntry(frame, stream, callback), true);
564         }
565     }
566 
567     private void onFrame(Http2Flusher.Entry entry, bool flush) {
568         version(HUNT_HTTP_DEBUG) {
569             tracef("%s %s", flush ? "Sending" : "Queueing", entry.frame.toString());
570         }
571         // Ping frames are prepended to process them as soon as possible.
572         bool queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
573         if (queued && flush) {
574             if (entry.stream !is null) {
575                  //entry.stream.notIdle();
576             }
577             flusher.iterate();
578         }
579     }
580 
581     protected StreamSPI createLocalStream(int streamId, Promise!Stream promise) {
582         while (true) {
583             int localCount = localStreamCount;
584             int maxCount = getMaxLocalStreams();
585             if (maxCount >= 0 && localCount >= maxCount) {
586                 promise.failed(new IllegalStateException("Max local stream count " ~ maxCount.to!string() ~ " exceeded"));
587                 return null;
588             }
589             if (localStreamCount == localCount)
590             {
591                 localStreamCount =  localCount + 1;
592                 break;
593             }
594         }
595 
596         StreamSPI stream = newStream(streamId, true);
597         auto itemPtr = streamId in streams;
598         if (itemPtr is null) {
599             streams[streamId] = stream;
600             // stream.setIdleTimeout(getStreamIdleTimeout());
601             flowControl.onStreamCreated(stream);
602             version(HUNT_DEBUG) {
603                 tracef("Created local %s", stream.toString());
604             }
605             return stream;
606         } else {
607             promise.failed(new IllegalStateException("Duplicate stream " ~ streamId.to!string()));
608             return null;
609         }
610     }
611 
612     protected StreamSPI createRemoteStream(int streamId) {
613         // SPEC: exceeding max concurrent streams is treated as stream error.
614         while (true) {
615             long encoded = remoteStreamCount;
616             int remoteCount = AtomicBiInteger.getHi(encoded);
617             int remoteClosing = AtomicBiInteger.getLo(encoded);
618             int maxCount = getMaxRemoteStreams();
619             if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) {
620                 reset(new ResetFrame(streamId, cast(int)ErrorCode.REFUSED_STREAM_ERROR), Callback.NOOP);
621                 return null;
622             }
623             // if (remoteStreamCount.compareAndSet(encoded, remoteCount + 1, remoteClosing))
624             if(remoteStreamCount == encoded)
625             {
626                 remoteStreamCount = AtomicBiInteger.encode(remoteCount + 1, remoteClosing);
627                 break;
628             }
629         }
630 
631         StreamSPI stream = newStream(streamId, false);
632 
633         // SPEC: duplicate stream is treated as connection error.
634         //streams.putIfAbsent(streamId, stream)
635         auto itemPtr = streamId in streams;
636         if ( itemPtr is null) {
637             streams[streamId] = stream;
638             updateLastStreamId(streamId);
639             // stream.setIdleTimeout(getStreamIdleTimeout());
640             flowControl.onStreamCreated(stream);
641             version(HUNT_DEBUG) {
642                 tracef("Created remote %s", stream.toString());
643             }
644             return stream;
645         } else {
646             close(cast(int)ErrorCode.PROTOCOL_ERROR, "duplicate_stream", Callback.NOOP);
647             return null;
648         }
649     }
650 
651     void updateStreamCount(bool local, int deltaStreams, int deltaClosing) {
652         if (local)
653             localStreamCount += (deltaStreams);
654         else
655         {
656             remoteStreamCount = AtomicBiInteger.encode(remoteStreamCount, deltaStreams, deltaClosing);
657             // remoteStreamCount.add(deltaStreams, deltaClosing);
658         }
659     }
660 
661     protected StreamSPI newStream(int streamId, bool local) {
662         return new Http2Stream(scheduler, this, streamId, local);
663     }
664 
665     override
666     void removeStream(StreamSPI stream) {
667         bool removed = streams.remove(stream.getId());
668         if (removed) {
669             onStreamClosed(stream);
670             flowControl.onStreamDestroyed(stream);
671             version(HUNT_DEBUG) {
672                 tracef("Removed %s %s", stream.isLocal() ? "local" : "remote", stream);
673             }
674         }
675     }
676 
677     override
678     Stream[] getStreams() {
679         return cast(Stream[])(streams.values());
680     }
681 
682     int getStreamCount() {
683         return cast(int)streams.length;
684     }
685 
686     override
687     StreamSPI getStream(int streamId)
688     {
689         return streams.get(streamId, null);
690     }
691 
692     int getSendWindow() {
693         return sendWindow.atomicLoad!();
694     }
695 
696     int getRecvWindow() {
697         return recvWindow.atomicLoad!();
698     }
699 
700     override
701     int updateSendWindow(int delta) {
702         //int old = sendWindow.atomicLoad!();
703         return sendWindow.atomicOp!"+="(delta);
704         //return old;
705     }
706 
707     override
708     int updateRecvWindow(int delta) {
709         //int old = recvWindow.atomicLoad!();
710         return  recvWindow.atomicOp!"+="(delta);
711         //return old;
712     }
713 
714     override
715     void onWindowUpdate(StreamSPI stream, WindowUpdateFrame frame) {
716         // WindowUpdateFrames arrive concurrently with writes.
717         // Increasing (or reducing) the window size concurrently
718         // with writes requires coordination with the flusher, that
719         // decides how many frames to write depending on the available
720         // window sizes. If the window sizes vary concurrently, the
721         // flusher may take non-optimal or wrong decisions.
722         // Here, we "queue" window updates to the flusher, so it will
723         // be the only component responsible for window updates, for
724         // both increments and reductions.
725         flusher.window(stream, frame);
726     }
727 
728     override
729     bool isPushEnabled() {
730         return pushEnabled;
731     }
732 
733     /**
734      * A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN.
735      * This method is invoked when the TCP FIN is received, or when an exception is
736      * thrown while reading, and we check the close state to act appropriately:
737      * <ul>
738      * <li>NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close)
739      * or there was an exception while reading, and therefore we terminate.</li>
740      * <li>LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received
741      * it and closed the connection; we queue a disconnect to close the connection
742      * on the local side.
743      * The GO_AWAY just shutdown the output, so we need this step to make sure the
744      * connection is closed. See {@link #close(int, string, Callback)}.</li>
745      * <li>REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we
746      * do nothing since the handling of the GO_AWAY will take care of closing the
747      * connection. See {@link #onGoAway(GoAwayFrame)}.</li>
748      * </ul>
749      *
750      * @see #onGoAway(GoAwayFrame)
751      * @see #close(int, string, Callback)
752      * @see #onIdleTimeout()
753      */
754     override
755     void onShutdown() {
756         version(HUNT_DEBUG) {
757             tracef("Shutting down %s", this.toString());
758         }
759         switch (closed) {
760             case CloseState.NOT_CLOSED: {
761                 // The other peer did not send a GO_AWAY, no need to be gentle.
762                 version(HUNT_DEBUG) {
763                     tracef("Abrupt close for %s", this.toString());
764                 }
765                 abort(new ClosedChannelException(""));
766                 break;
767             }
768             case CloseState.LOCALLY_CLOSED: {
769                 // We have closed locally, and only shutdown
770                 // the output; now queue a disconnect.
771                 control(null, Callback.NOOP, new DisconnectFrame());
772                 break;
773             }
774             case CloseState.REMOTELY_CLOSED: {
775                 // Nothing to do, the GO_AWAY frame we
776                 // received will close the connection.
777                 break;
778             }
779             default: {
780                 break;
781             }
782         }
783     }
784 
785     /**
786      * This method is invoked when the idle timeout triggers. We check the close state
787      * to act appropriately:
788      * <ul>
789      * <li>NOT_CLOSED: it's a real idle timeout, we just initiate a close, see
790      * {@link #close(int, string, Callback)}.</li>
791      * <li>LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the
792      * other peer did not close the connection so we never received the TCP FIN, and
793      * therefore we terminate.</li>
794      * <li>REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a
795      * disconnect, but for some reason it was not processed (for example, queue was
796      * stuck because of TCP congestion), therefore we terminate.
797      * See {@link #onGoAway(GoAwayFrame)}.</li>
798      * </ul>
799      *
800      * @return true if the session should be closed, false otherwise
801      * @see #onGoAway(GoAwayFrame)
802      * @see #close(int, string, Callback)
803      * @see #onShutdown()
804      */
805     override
806     bool onIdleTimeout() {
807         switch (closed) {
808             case CloseState.NOT_CLOSED: {
809                 // long elapsed = convertToMillisecond(Clock.currStdTime) - idleTime;
810                 Duration elapsed = MonoTime.currTime() - idleTime;
811                 version(HUNT_HTTP_DEBUG) {
812                     tracef("HTTP2 session on idle timeout. The elapsed time is %d - %d", 
813                         elapsed.total!(TimeUnit.Millisecond), 
814                         endPoint.getMaxIdleTimeout().total!(TimeUnit.Millisecond));
815                 }
816                 return elapsed >= endPoint.getMaxIdleTimeout() && notifyIdleTimeout(this);
817             }
818             case CloseState.LOCALLY_CLOSED:
819             case CloseState.REMOTELY_CLOSED: {
820                 abort(new TimeoutException("Idle timeout " ~ endPoint.getMaxIdleTimeout().to!string() ~ " ms"));
821                 return false;
822             }
823             default: {
824                 return false;
825             }
826         }
827     }
828 
829     private void notIdle() {
830         idleTime = MonoTime.currTime; // convertToMillisecond(Clock.currStdTime);
831     }
832 
833     override
834     void onFrame(Frame frame) {
835         onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "upgrade");
836     }
837 
838     protected void onStreamOpened(StreamSPI stream) {
839     }
840 
841     protected void onStreamClosed(StreamSPI stream) {
842     }
843 
844     void disconnect() {
845         version(HUNT_DEBUG) {
846             tracef("Disconnecting %s", this.toString());
847         }
848         endPoint.close();
849     }
850 
851     private void terminate(Exception cause) {
852         while (true) {
853             CloseState current = closed;
854             switch (current) {
855                 case CloseState.NOT_CLOSED:
856                 case CloseState.LOCALLY_CLOSED:
857                 case CloseState.REMOTELY_CLOSED: {
858                     if (closed == current) {
859                         closed = CloseState.CLOSED;
860                         flusher.terminate(cause);
861                         foreach (StreamSPI stream ; streams.byValue())
862                             stream.close();
863                         streams.clear();
864                         disconnect();
865                         return;
866                     }
867                     break;
868                 }
869                 default: {
870                     return;
871                 }
872             }
873         }
874     }
875 
876     void abort(Exception failure) {
877         notifyFailure(this, failure, new TerminateCallback(failure));
878     }
879 
880     bool isDisconnected() {
881         return !endPoint.isConnected();
882     }
883 
884     private void updateLastStreamId(int streamId) {
885         // Atomics.updateMax(lastStreamId, streamId);
886         if(streamId>lastStreamId) lastStreamId = streamId;
887     }
888 
889     protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame) {
890         try {
891             return listener.onNewStream(stream, frame);
892         } catch (Exception x) {
893             info("Failure while notifying listener " ~ listener.toString(), x);
894             return null;
895         }
896     }
897 
898     protected void notifySettings(StreamSession session, SettingsFrame frame) {
899         try {
900             listener.onSettings(session, frame);
901         } catch (Exception x) {
902             info("Failure while notifying listener " ~ listener.toString(), x);
903         }
904     }
905 
906     protected void notifyPing(StreamSession session, PingFrame frame) {
907         try {
908             listener.onPing(session, frame);
909         } catch (Exception x) {
910             info("Failure while notifying listener " ~ listener.toString(), x);
911         }
912     }
913 
914     protected void notifyReset(StreamSession session, ResetFrame frame) {
915         try {
916             listener.onReset(session, frame);
917         } catch (Exception x) {
918             info("Failure while notifying listener " ~ listener.toString(), x);
919         }
920     }
921 
922     protected void notifyClose(StreamSession session, GoAwayFrame frame, Callback callback) {
923         try {
924             listener.onClose(session, frame, callback);
925         } catch (Exception x) {
926             info("Failure while notifying listener " ~ listener.toString(), x);
927         }
928     }
929 
930     protected bool notifyIdleTimeout(StreamSession session) {
931         try {
932             return listener.onIdleTimeout(session);
933         } catch (Exception x) {
934             info("Failure while notifying listener " ~ listener.toString(), x);
935             return true;
936         }
937     }
938 
939     protected void notifyFailure(StreamSession session, Exception failure, Callback callback) {
940         try {
941             listener.onFailure(session, failure, callback);
942         } catch (Exception x) {
943             info("Failure while notifying listener " ~ listener.toString(), x);
944         }
945     }
946 
947     protected void notifyHeaders(StreamSPI stream, HeadersFrame frame) {
948         // Optional.ofNullable(stream.getListener()).ifPresent(listener -> {
949         //     try {
950         //         listener.onHeaders(stream, frame);
951         //     } catch (Exception x) {
952         //         info("Failure while notifying listener " ~ listener.toString(), x);
953         //     }
954         // });
955         auto listener = stream.getListener();
956         if(listener !is null)
957         {
958             try {
959                 listener.onHeaders(stream, frame);
960             } catch (Exception x) {
961                 info("Failure while notifying listener " ~ listener.toString(), x);
962             }            
963         }
964     }
965 
966     override
967     string toString() {
968         return format("%s@%s{l:%s <-> r:%s,sendWindow=%s,recvWindow=%s,streams=%d,%s}",
969                 typeof(this).stringof,
970                 toHash(),
971                 getEndPoint().getLocalAddress(),
972                 getEndPoint().getRemoteAddress(),
973                 sendWindow,
974                 recvWindow,
975                 streams.length,
976                 closed);
977     }
978 
979     private class ControlEntry : Http2Flusher.Entry {
980         private int bytes;
981 
982         private this(Frame frame, StreamSPI stream, Callback callback) {
983             super(frame, stream, callback);
984         }
985 
986         override
987         protected bool generate(Queue!ByteBuffer buffers) {
988             List!(ByteBuffer) controlFrame = generator.control(frame);
989             bytes = cast(int) BufferUtils.remaining(controlFrame.toArray());
990             buffers.addAll(controlFrame);
991             version(HUNT_HTTP_DEBUG) {
992                 tracef("Generated %s", frame.toString());
993             }
994             beforeSend();
995             return true;
996         }
997 
998         /**
999          * <p>Performs actions just before writing the frame to the network.</p>
1000          * <p>Some frame, when sent over the network, causes the receiver
1001          * to react and send back frames that may be processed by the original
1002          * sender *before* {@link #succeeded()} is called.
1003          * <p>If the action to perform updates some state, this update may
1004          * not be seen by the received frames and cause errors.</p>
1005          * <p>For example, suppose the action updates the stream window to a
1006          * larger value; the sender sends the frame; the receiver is now entitled
1007          * to send back larger data; when the data is received by the original
1008          * sender, the action may have not been performed yet, causing the larger
1009          * data to be rejected, when it should have been accepted.</p>
1010          */
1011         private void beforeSend() {
1012             switch (frame.getType()) {
1013                 case FrameType.HEADERS: {
1014                     HeadersFrame headersFrame = cast(HeadersFrame) frame;
1015                     stream.updateClose(headersFrame.isEndStream(), CloseStateEvent.BEFORE_SEND);
1016                     break;
1017                 }
1018                 case FrameType.SETTINGS: {
1019                     SettingsFrame settingsFrame = cast(SettingsFrame) frame;
1020                     Map!(int, int) settings = settingsFrame.getSettings();
1021                     if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE))
1022                     {
1023                         int initialWindow = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE);
1024                         flowControl.updateInitialStreamWindow(this.outer, initialWindow, true);
1025                     }
1026                     break;
1027                 }
1028                 default: {
1029                     break;
1030                 }
1031             }
1032         }
1033 
1034         override
1035         void succeeded() {
1036             bytesWritten += (bytes);
1037             switch (frame.getType()) {
1038                 case FrameType.HEADERS: {
1039                     onStreamOpened(stream);
1040                     HeadersFrame headersFrame = cast(HeadersFrame) frame;
1041                     if (stream.updateClose(headersFrame.isEndStream(), CloseStateEvent.AFTER_SEND))
1042                         removeStream(stream);
1043                     break;
1044                 }
1045                 case FrameType.RST_STREAM: {
1046                     if (stream !is null) {
1047                         stream.close();
1048                         removeStream(stream);
1049                     }
1050                     break;
1051                 }
1052                 case FrameType.PUSH_PROMISE: {
1053                     // Pushed streams are implicitly remotely closed.
1054                     // They are closed when sending an end-stream DATA frame.
1055                     stream.updateClose(true, CloseStateEvent.RECEIVED);
1056                     break;
1057                 }
1058                 case FrameType.GO_AWAY: {
1059                     // We just sent a GO_AWAY, only shutdown the
1060                     // output without closing yet, to allow reads.
1061                     getEndPoint().shutdownOutput();
1062                     break;
1063                 }
1064                 case FrameType.WINDOW_UPDATE: {
1065                     flowControl.windowUpdate(this.outer, stream, cast(WindowUpdateFrame) frame);
1066                     break;
1067                 }
1068                 case FrameType.DISCONNECT: {
1069                     terminate(new ClosedChannelException(""));
1070                     break;
1071                 }
1072                 default: {
1073                     break;
1074                 }
1075             }
1076             super.succeeded();
1077         }
1078 
1079         override
1080         void failed(Exception x) {
1081             if (frame.getType() == FrameType.DISCONNECT) {
1082                 terminate(new ClosedChannelException(""));
1083             }
1084             super.failed(x);
1085         }
1086     }
1087 
1088     private class DataEntry :Http2Flusher.Entry {
1089         private int bytes;
1090         private int _dataRemaining;
1091         private int dataWritten;
1092 
1093         private this(DataFrame frame, StreamSPI stream, Callback callback) {
1094             super(frame, stream, callback);
1095             // We don't do any padding, so the flow control length is
1096             // always the data remaining. This simplifies the handling
1097             // of data frames that cannot be completely written due to
1098             // the flow control window exhausting, since in that case
1099             // we would have to count the padding only once.
1100             _dataRemaining = frame.remaining();
1101         }
1102 
1103         override
1104         int dataRemaining() {
1105             return _dataRemaining;
1106         }
1107 
1108         override
1109         protected bool generate(Queue!ByteBuffer buffers) {
1110             int remaining = dataRemaining();
1111 
1112             int sessionSendWindow = getSendWindow();
1113             if (sessionSendWindow <= 0)
1114             {
1115                 updateSendWindow(65535);
1116                 sessionSendWindow = 65535;
1117             }
1118             int streamSendWindow = stream.updateSendWindow(0);
1119             if (streamSendWindow <= 0)
1120             {
1121                 stream.updateSendWindow(sessionSendWindow);
1122                 streamSendWindow = sessionSendWindow;
1123             }
1124             int window = min(streamSendWindow, sessionSendWindow);
1125             //tracef("&&&&&&&&&&&&&&&&& %d &&&& %d" ,sessionSendWindow , streamSendWindow) ;
1126 
1127             if (window <= 0 && remaining > 0)
1128             {
1129                 tracef("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %d &&&& %d" ,sessionSendWindow , streamSendWindow) ;
1130                  return false;
1131             }
1132 
1133 
1134             int length = min(remaining, window);
1135 
1136             // Only one DATA frame is generated.
1137             DataFrame dataFrame = cast(DataFrame) frame;
1138             Tuple!(int, List!(ByteBuffer)) pair = generator.data(dataFrame, length);
1139             bytes = pair[0];
1140             buffers.addAll(pair[1]);
1141             int written = bytes - Frame.HEADER_LENGTH;
1142             version(HUNT_DEBUG) {
1143                 tracef("Generated %s, length/window/data=%s/%s/%s", dataFrame, written, window, remaining);
1144             }
1145             this.dataWritten = written;
1146             //tracef("&&&&&&&&^^^^^ %d ^^^^^^ %d ^^^^^^%d",  written, _dataRemaining , bytes);
1147             this._dataRemaining -= written;
1148 
1149 
1150             flowControl.onDataSending(stream, written);
1151             stream.updateClose(dataFrame.isEndStream(), CloseStateEvent.BEFORE_SEND);
1152 
1153             return true;
1154         }
1155 
1156         override
1157         void succeeded() {
1158             bytesWritten += bytes;
1159             flowControl.onDataSent(stream, dataWritten);
1160 
1161             // Do we have more to send ?
1162             DataFrame dataFrame = cast(DataFrame) frame;
1163             if (_dataRemaining == 0) {
1164                 // Only now we can update the close state
1165                 // and eventually remove the stream.
1166                 if (stream.updateClose(dataFrame.isEndStream(), CloseStateEvent.AFTER_SEND))
1167                     removeStream(stream);
1168                 super.succeeded();
1169             }
1170         }
1171     }
1172 
1173     private static class PromiseCallback(C) : NoopCallback {
1174         private Promise!C promise;
1175         private C value;
1176 
1177         private this(Promise!C promise, C value) {
1178             this.promise = promise;
1179             this.value = value;
1180         }
1181 
1182         override
1183         void succeeded() {
1184             promise.succeeded(value);
1185         }
1186 
1187         override
1188         void failed(Exception x) {
1189             promise.failed(x);
1190         }
1191     }
1192 
1193     private class ResetCallback : NoopCallback {
1194         override
1195         void succeeded() {
1196             complete();
1197         }
1198 
1199         override
1200         void failed(Exception x) {
1201             complete();
1202         }
1203 
1204         private void complete() {
1205             flusher.iterate();
1206         }
1207     }
1208 
1209     private class CloseCallback : NoopCallback {
1210         private int error;
1211         private string reason;
1212 
1213         private this(int error, string reason) {
1214             this.error = error;
1215             this.reason = reason;
1216         }
1217 
1218         override
1219         void succeeded() {
1220             complete();
1221         }
1222 
1223         override
1224         void failed(Exception x) {
1225             complete();
1226         }
1227 
1228         private void complete() {
1229             close(error, reason, Callback.NOOP);
1230         }
1231     }
1232 
1233     private class DisconnectCallback : NoopCallback {
1234         override
1235         void succeeded() {
1236             complete();
1237         }
1238 
1239         override
1240         void failed(Exception x) {
1241             complete();
1242         }
1243 
1244         private void complete() {
1245             frames(null, Callback.NOOP, newGoAwayFrame(cast(int)ErrorCode.NO_ERROR, null), new DisconnectFrame());
1246         }
1247     }
1248 
1249     private class TerminateCallback : NoopCallback {
1250         private Exception failure;
1251 
1252         private this(Exception failure) {
1253             this.failure = failure;
1254         }
1255 
1256         override
1257         void succeeded() {
1258             complete();
1259         }
1260 
1261         override
1262         void failed(Exception x) {
1263             // failure.addSuppressed(x);
1264             Throwable.chainTogether(failure, x);
1265             complete();
1266         }
1267 
1268         private void complete() {
1269             terminate(failure);
1270         }
1271     }
1272 }