1 module hunt.http.client.Http2ClientSession;
2 
3 import hunt.http.codec.http.encode.Http2Generator;
4 import hunt.http.codec.http.frame.HeadersFrame;
5 import hunt.http.codec.http.frame.PushPromiseFrame;
6 import hunt.http.codec.http.stream;
7 
8 import hunt.net.Connection;
9 
10 import hunt.util.Common;
11 import hunt.concurrency.Promise;
12 import hunt.concurrency.Scheduler;
13 import hunt.logging;
14 
15 
16 class Http2ClientSession : Http2Session {
17     
18     this(Scheduler scheduler, Connection endPoint, Http2Generator generator,
19                               Listener listener, FlowControlStrategy flowControl, int streamIdleTimeout) {
20         super(scheduler, endPoint, generator, listener, flowControl, 1, streamIdleTimeout);
21     }
22 
23     static Http2ClientSession initSessionForUpgradingHTTP2(Scheduler scheduler, Connection endPoint,
24         Http2Generator generator, Listener listener, FlowControlStrategy flowControl, 
25         int initialStreamId, int streamIdleTimeout, Promise!(Stream) initStream, 
26         Stream.Listener initStreamListener) {
27 
28         Http2ClientSession session = new Http2ClientSession(scheduler, endPoint, generator, listener, flowControl,
29                 initialStreamId, streamIdleTimeout);
30         StreamSPI stream = session.createLocalStream(1, initStream);
31         stream.setListener(initStreamListener);
32         stream.updateClose(true, CloseStateEvent.AFTER_SEND);
33         initStream.succeeded(stream);
34         return session;
35     }
36 
37     private this(Scheduler scheduler, Connection endPoint, Http2Generator generator,
38                                Listener listener, FlowControlStrategy flowControl, int initialStreamId, int streamIdleTimeout) {
39         super(scheduler, endPoint, generator, listener, flowControl, initialStreamId, streamIdleTimeout);
40     }
41 
42     override
43     void onHeaders(HeadersFrame frame) {
44         version(HUNT_DEBUG) {
45             tracef("Client received %s", frame);
46         }
47 
48         auto stream = getStream(frame.getStreamId());
49         if(stream !is null)
50         {
51             stream.process(frame, Callback.NOOP);
52             notifyHeaders(stream, frame);
53         }
54 
55         // Optional.ofNullable(getStream(frame.getStreamId()))
56         //         .ifPresent(stream -) {
57         //             stream.process(frame, Callback.NOOP);
58         //             notifyHeaders(stream, frame);
59         //         });
60     }
61 
62     // override
63     void onPushPromise(PushPromiseFrame frame) {
64         version(HUNT_DEBUG) {
65             tracef("Client received %s", frame);
66         }
67 
68         int streamId = frame.getStreamId();
69         int pushStreamId = frame.getPromisedStreamId();
70         StreamSPI stream = getStream(streamId);
71         if (stream is null) {
72             version(HUNT_DEBUG)
73                 tracef("Ignoring %s, stream #%s not found", frame, streamId);
74         } else {
75             StreamSPI pushStream = createRemoteStream(pushStreamId);
76             pushStream.process(frame, Callback.NOOP);
77             Stream.Listener listener = notifyPush(stream, pushStream, frame);
78             pushStream.setListener(listener);
79         }
80     }
81 
82     private Stream.Listener notifyPush(StreamSPI stream, StreamSPI pushStream, PushPromiseFrame frame) {
83         Stream.Listener listener = stream.getListener();
84         if (listener is null)
85             return null;
86         try {
87             return listener.onPush(pushStream, frame);
88         } catch (Throwable x) {
89             errorf("Failure while notifying listener %s", x, listener);
90             return null;
91         }
92     }
93 }