1 module hunt.http.client.Http2ClientConnection;
2 
3 import hunt.http.client.ClientHttpHandler;
4 import hunt.http.client.HttpClientConnection;
5 import hunt.http.client.Http2ClientResponseHandler;
6 import hunt.http.client.Http2ClientSession;
7 
8 import hunt.http.codec.http.decode.Parser;
9 import hunt.http.codec.http.encode.Http2Generator;
10 import hunt.http.codec.http.frame;
11 import hunt.http.codec.http.model.HttpHeader;
12 import hunt.http.codec.http.model.HttpHeaderValue;
13 import hunt.http.codec.http.model.MetaData;
14 import hunt.http.codec.http.stream;
15 import hunt.http.codec.websocket.model.IncomingFrames;
16 import hunt.http.codec.websocket.stream.WebSocketConnection;
17 import hunt.http.codec.websocket.stream.WebSocketPolicy;
18 
19 import hunt.container;
20 import hunt.logging;
21 import hunt.net.ConnectionType;
22 import hunt.net.secure.SecureSession;
23 import hunt.net.Session;
24 
25 import hunt.util.functional;
26 import hunt.util.concurrent.FuturePromise;
27 import hunt.util.concurrent.Promise;
28 // import hunt.util.concurrent.Scheduler;
29 import hunt.lang.exception;
30 
31 import std.conv;
32 
33 alias SessionListener = StreamSession.Listener;
34 
/**
 * Client-side HTTP/2 connection.
 *
 * Wraps the HTTP/2 session created by AbstractHttp2Connection and exposes the
 * HttpClientConnection request API on top of it. Every request is sent on a
 * new stream of the underlying session (see request()).
 */
class Http2ClientConnection : AbstractHttp2Connection , HttpClientConnection {

    /**
     * Sends the initial frames of the connection: the preface frame, a SETTINGS
     * frame and, when the configured session receive window is larger than
     * FlowControlStrategy.DEFAULT_WINDOW_SIZE, a WINDOW_UPDATE frame for the
     * difference.
     *
     * Params:
     *   config   = supplies the initial session receive window.
     *   promise  = completed with this connection once the frames were written;
     *              failed (after closing this connection) if writing fails.
     *   listener = asked for the settings to advertise via onPreface(); a null
     *              result is treated as an empty settings map.
     */
    void initialize(Http2Configuration config, Promise!(HttpClientConnection) promise,
                           SessionListener listener) {
        Map!(int, int) settings = listener.onPreface(getHttp2Session());
        if (settings is null) {
            settings = Collections.emptyMap!(int, int)();
        }
        PrefaceFrame prefaceFrame = new PrefaceFrame();
        SettingsFrame settingsFrame = new SettingsFrame(settings, false);
        SessionSPI sessionSPI = getSessionSPI();
        // How much larger than the default the configured receive window is.
        int windowDelta = config.getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
        Callback callback = new class NoopCallback {

            override
            void succeeded() {
                promise.succeeded(this.outer);
            }

            override
            void failed(Exception x) {
                // Close the connection first, then report the failure to the caller.
                this.outer.close();
                promise.failed(x);
            }
        };

        if (windowDelta > 0) {
            // Enlarge the local window and announce the enlargement on stream 0.
            sessionSPI.updateRecvWindow(windowDelta);
            sessionSPI.frames(null, callback, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta));
        } else {
            sessionSPI.frames(null, callback, prefaceFrame, settingsFrame);
        }

        // TODO: Tasks pending completion -@Administrator at 2018-7-16 14:41:31
        // Periodic ping is not implemented yet; the intended logic is kept below.
        //
        // Scheduler.Future pingFuture = scheduler.scheduleAtFixedRate(() => getHttp2Session().ping(new PingFrame(false), new class Callback {
        //     void succeeded() {
        //         info("The session %s sent ping frame success", getSessionId());
        //     }

        //     void failed(Throwable x) {
        //         warningf("the session %s sends ping frame failure. %s", getSessionId(), x.getMessage());
        //     }
        // }), config.getHttp2PingInterval(), config.getHttp2PingInterval(), TimeUnit.MILLISECONDS);

        // onClose(c => pingFuture.cancel());
    }

    /// Forwards all arguments unchanged to the AbstractHttp2Connection constructor.
    this(Http2Configuration config, TcpSession tcpSession, SecureSession secureSession,
                                 SessionListener listener) {
        super(config, tcpSession, secureSession, listener);
    }

    /**
     * Creates the client session for this connection from the connection's TCP
     * session and generator, the given listener and flow-control strategy, and
     * the stream idle timeout taken from config. The first constructor argument
     * is passed as null.
     */
    override
    protected Http2Session initHttp2Session(Http2Configuration config, FlowControlStrategy flowControl,
                                            SessionListener listener) {
        return new Http2ClientSession(null, this.tcpSession, this.generator, listener, flowControl, config.getStreamIdleTimeout());
    }

    /**
     * Creates the frame parser bound to this connection's session, using the
     * max dynamic table size and max request head length from config.
     */
    override
    protected Parser initParser(Http2Configuration config) {
        return new Parser(http2Session, config.getMaxDynamicTableSize(), config.getMaxRequestHeadLength());
    }


    /// Returns the connection type reported by the base class.
    override
    ConnectionType getConnectionType() {
        return super.getConnectionType();
    }

    /// Returns the encryption state reported by the base class.
    override
    bool isEncrypted() {
        return super.isEncrypted();
    }


    /// Returns the parser held by this connection.
    Parser getParser() {
        return parser;
    }

    /// Returns the HTTP/2 frame generator held by this connection.
    Http2Generator getGenerator() {
        return generator;
    }

    /// Returns the HTTP/2 session of this connection as a SessionSPI.
    SessionSPI getSessionSPI() {
        return http2Session;
    }

    /**
     * Sends a request without a body: the HEADERS frame is created with
     * endStream = true and the resulting output stream is closed as soon as it
     * becomes available. Failures are logged only.
     */
    override
    void send(Request request, ClientHttpHandler handler) {
        Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) {

            void succeeded(HttpOutputStream output) {
                try {
                    output.close();
                } catch (IOException e) {
                    // The format string needs a specifier, otherwise the cause is not logged.
                    errorf("write data unsuccessfully: %s", e.msg);
                }

            }

            void failed(Exception x) {
                errorf("write data unsuccessfully: %s", x.msg);
            }

            string id() { return "HttpOutputStream close"; }
        };

        this.request(request, true, promise, handler);
    }

    /// Sends a request whose body is a single buffer; delegates to the ByteBuffer[] overload.
    override
    void send(Request request, ByteBuffer buffer, ClientHttpHandler handler) {
        send(request, [buffer], handler);
    }

    /**
     * Sends a request whose body consists of the given buffers.
     *
     * The Content-Length header of the request is set to the value returned by
     * BufferUtils.remaining(buffers) before the stream is opened. Once the
     * output stream is available the buffers are written with
     * writeWithContentLength(). Failures are logged only.
     */
    override
    void send(Request request, ByteBuffer[] buffers, ClientHttpHandler handler) {
        long contentLength = BufferUtils.remaining(buffers);
        request.getFields().put(HttpHeader.CONTENT_LENGTH, contentLength.to!string);

        Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) {

            void succeeded(HttpOutputStream output) {
                try {
                    output.writeWithContentLength(buffers);
                } catch (IOException e) {
                    // The format string needs a specifier, otherwise the cause is not logged.
                    errorf("write data unsuccessfully: %s", e.msg);
                }
            }

            void failed(Exception x) {
                errorf("write data unsuccessfully: %s", x.msg);
            }

            string id() { return "writeWithContentLength"; }
        };

        send(request, promise, handler);
    }

    /**
     * Adds an "Expect" header with the CONTINUE value to the request and
     * returns the output stream obtained from getHttpOutputStream().
     */
    override
    HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) {
        request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE);
        return getHttpOutputStream(request, handler);
    }

    /**
     * Sends the request headers and waits on a FuturePromise for the output
     * stream of the new stream.
     *
     * Returns: the output stream, or null when waiting for it throws; the
     *          failure is logged in that case.
     */
    override
    HttpOutputStream getHttpOutputStream(Request request, ClientHttpHandler handler) {
        FuturePromise!(HttpOutputStream) promise = new FuturePromise!(HttpOutputStream)();
        send(request, promise, handler);
        try {
            return promise.get();
        } catch (Exception e) {
            // The format string needs a specifier, otherwise the cause is not logged.
            errorf("get http output stream unsuccessfully: %s", e.msg);
            return null;
        }
    }

    /**
     * Sends the request headers with endStream = false and hands the output
     * stream to the given promise so the caller can write the body.
     */
    override
    void send(Request request, Promise!(HttpOutputStream) promise, ClientHttpHandler handler) {
        this.request(request, false, promise, handler);
    }

    /**
     * Opens a new stream on the session for the given request.
     *
     * Params:
     *   request   = meta data placed in the HEADERS frame.
     *   endStream = end-stream flag of the HEADERS frame.
     *   promise   = wrapped in a ClientStreamPromise that is passed to newStream().
     *   handler   = wrapped in the Http2ClientResponseHandler registered as the
     *               stream listener.
     */
    void request(Request request, bool endStream,
                        Promise!(HttpOutputStream) promise,
                        ClientHttpHandler handler) {
        http2Session.newStream(new HeadersFrame(request, null, endStream),
                new Http2ClientResponseHandler.ClientStreamPromise(request, promise),
                new Http2ClientResponseHandler(request, handler, this));
    }

    /// Not supported on an HTTP/2 connection; always throws CommonRuntimeException.
    override
    void upgradeHttp2(Request request, SettingsFrame settings, Promise!(HttpClientConnection) promise,
                             ClientHttpHandler upgradeHandler,
                             ClientHttpHandler http2ResponseHandler) {
        throw new CommonRuntimeException("The current connection version is http2, it does not need to upgrading.");
    }

    /// Not supported on an HTTP/2 connection; always throws CommonRuntimeException.
    override
    void upgradeWebSocket(Request request, WebSocketPolicy policy, Promise!(WebSocketConnection) promise,
                                 ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) {
        throw new CommonRuntimeException("The current connection version is http2, it can not upgrade WebSocket.");
    }

}