1 module hunt.http.client.Http2ClientSession;
2
3 import hunt.http.codec.http.encode.Http2Generator;
4 import hunt.http.codec.http.frame.HeadersFrame;
5 import hunt.http.codec.http.frame.PushPromiseFrame;
6 import hunt.http.codec.http.stream;
7
8 import hunt.net.Connection;
9
10 import hunt.util.Common;
11 import hunt.concurrency.Promise;
12 import hunt.concurrency.Scheduler;
13 import hunt.logging;
14
15
16 class Http2ClientSession : Http2Session {
17
18 this(Scheduler scheduler, Connection endPoint, Http2Generator generator,
19 Listener listener, FlowControlStrategy flowControl, int streamIdleTimeout) {
20 super(scheduler, endPoint, generator, listener, flowControl, 1, streamIdleTimeout);
21 }
22
23 static Http2ClientSession initSessionForUpgradingHTTP2(Scheduler scheduler, Connection endPoint,
24 Http2Generator generator, Listener listener, FlowControlStrategy flowControl,
25 int initialStreamId, int streamIdleTimeout, Promise!(Stream) initStream,
26 Stream.Listener initStreamListener) {
27
28 Http2ClientSession session = new Http2ClientSession(scheduler, endPoint, generator, listener, flowControl,
29 initialStreamId, streamIdleTimeout);
30 StreamSPI stream = session.createLocalStream(1, initStream);
31 stream.setListener(initStreamListener);
32 stream.updateClose(true, CloseStateEvent.AFTER_SEND);
33 initStream.succeeded(stream);
34 return session;
35 }
36
37 private this(Scheduler scheduler, Connection endPoint, Http2Generator generator,
38 Listener listener, FlowControlStrategy flowControl, int initialStreamId, int streamIdleTimeout) {
39 super(scheduler, endPoint, generator, listener, flowControl, initialStreamId, streamIdleTimeout);
40 }
41
42 override
43 void onHeaders(HeadersFrame frame) {
44 version(HUNT_DEBUG) {
45 tracef("Client received %s", frame);
46 }
47
48 auto stream = getStream(frame.getStreamId());
49 if(stream !is null)
50 {
51 stream.process(frame, Callback.NOOP);
52 notifyHeaders(stream, frame);
53 }
54
55 // Optional.ofNullable(getStream(frame.getStreamId()))
56 // .ifPresent(stream -) {
57 // stream.process(frame, Callback.NOOP);
58 // notifyHeaders(stream, frame);
59 // });
60 }
61
62 // override
63 void onPushPromise(PushPromiseFrame frame) {
64 version(HUNT_DEBUG) {
65 tracef("Client received %s", frame);
66 }
67
68 int streamId = frame.getStreamId();
69 int pushStreamId = frame.getPromisedStreamId();
70 StreamSPI stream = getStream(streamId);
71 if (stream is null) {
72 version(HUNT_DEBUG)
73 tracef("Ignoring %s, stream #%s not found", frame, streamId);
74 } else {
75 StreamSPI pushStream = createRemoteStream(pushStreamId);
76 pushStream.process(frame, Callback.NOOP);
77 Stream.Listener listener = notifyPush(stream, pushStream, frame);
78 pushStream.setListener(listener);
79 }
80 }
81
82 private Stream.Listener notifyPush(StreamSPI stream, StreamSPI pushStream, PushPromiseFrame frame) {
83 Stream.Listener listener = stream.getListener();
84 if (listener is null)
85 return null;
86 try {
87 return listener.onPush(pushStream, frame);
88 } catch (Throwable x) {
89 errorf("Failure while notifying listener %s", x, listener);
90 return null;
91 }
92 }
93 }