1 module hunt.http.client.Http2ClientSession; 2 3 import hunt.http.codec.http.encode.Http2Generator; 4 import hunt.http.codec.http.frame.HeadersFrame; 5 import hunt.http.codec.http.frame.PushPromiseFrame; 6 import hunt.http.codec.http.stream; 7 8 import hunt.net.Connection; 9 10 import hunt.util.Common; 11 import hunt.concurrency.Promise; 12 import hunt.concurrency.Scheduler; 13 import hunt.logging; 14 15 16 class Http2ClientSession : Http2Session { 17 18 this(Scheduler scheduler, Connection endPoint, Http2Generator generator, 19 Listener listener, FlowControlStrategy flowControl, int streamIdleTimeout) { 20 super(scheduler, endPoint, generator, listener, flowControl, 1, streamIdleTimeout); 21 } 22 23 static Http2ClientSession initSessionForUpgradingHTTP2(Scheduler scheduler, Connection endPoint, 24 Http2Generator generator, Listener listener, FlowControlStrategy flowControl, 25 int initialStreamId, int streamIdleTimeout, Promise!(Stream) initStream, 26 Stream.Listener initStreamListener) { 27 28 Http2ClientSession session = new Http2ClientSession(scheduler, endPoint, generator, listener, flowControl, 29 initialStreamId, streamIdleTimeout); 30 StreamSPI stream = session.createLocalStream(1, initStream); 31 stream.setListener(initStreamListener); 32 stream.updateClose(true, CloseStateEvent.AFTER_SEND); 33 initStream.succeeded(stream); 34 return session; 35 } 36 37 private this(Scheduler scheduler, Connection endPoint, Http2Generator generator, 38 Listener listener, FlowControlStrategy flowControl, int initialStreamId, int streamIdleTimeout) { 39 super(scheduler, endPoint, generator, listener, flowControl, initialStreamId, streamIdleTimeout); 40 } 41 42 override 43 void onHeaders(HeadersFrame frame) { 44 version(HUNT_DEBUG) { 45 tracef("Client received %s", frame); 46 } 47 48 auto stream = getStream(frame.getStreamId()); 49 if(stream !is null) 50 { 51 stream.process(frame, Callback.NOOP); 52 notifyHeaders(stream, frame); 53 } 54 55 // Optional.ofNullable(getStream(frame.getStreamId())) 56 // .ifPresent(stream -) { 57 // stream.process(frame, Callback.NOOP); 58 // notifyHeaders(stream, frame); 59 // }); 60 } 61 62 // override 63 void onPushPromise(PushPromiseFrame frame) { 64 version(HUNT_DEBUG) { 65 tracef("Client received %s", frame); 66 } 67 68 int streamId = frame.getStreamId(); 69 int pushStreamId = frame.getPromisedStreamId(); 70 StreamSPI stream = getStream(streamId); 71 if (stream is null) { 72 version(HUNT_DEBUG) 73 tracef("Ignoring %s, stream #%s not found", frame, streamId); 74 } else { 75 StreamSPI pushStream = createRemoteStream(pushStreamId); 76 pushStream.process(frame, Callback.NOOP); 77 Stream.Listener listener = notifyPush(stream, pushStream, frame); 78 pushStream.setListener(listener); 79 } 80 } 81 82 private Stream.Listener notifyPush(StreamSPI stream, StreamSPI pushStream, PushPromiseFrame frame) { 83 Stream.Listener listener = stream.getListener(); 84 if (listener is null) 85 return null; 86 try { 87 return listener.onPush(pushStream, frame); 88 } catch (Throwable x) { 89 errorf("Failure while notifying listener %s", x, listener); 90 return null; 91 } 92 } 93 }