1 module hunt.http.client.Http1ClientConnection;
2 
3 import hunt.http.client.ClientHttpHandler;
4 import hunt.http.client.ClientHttp2SessionListener;
5 import hunt.http.client.HttpClientConnection;
6 import hunt.http.client.Http1ClientResponseHandler;
7 import hunt.http.client.Http2ClientConnection;
8 import hunt.http.client.Http2ClientResponseHandler;
9 import hunt.http.client.Http2ClientSession;
10 
11 import hunt.http.codec.http.decode.HttpParser;
12 import hunt.http.codec.http.encode.HttpGenerator;
13 import hunt.http.codec.http.frame.SettingsFrame;
14 import hunt.http.codec.http.model;
15 import hunt.http.codec.http.stream;
16 import hunt.http.codec.websocket.model.IncomingFrames;
17 import hunt.http.codec.websocket.model.common;
18 import hunt.http.codec.websocket.stream.WebSocketConnection;
19 import hunt.http.codec.websocket.stream.WebSocketPolicy;
20 import hunt.http.codec.websocket.stream.WebSocketConnectionImpl;
21 
22 import hunt.net.ConnectionType;
23 import hunt.net.secure.SecureSession;
24 import hunt.net.Session;
25 
26 // import hunt.util.Assert;
27 import hunt.container;
28 import hunt.io;
29 import hunt.lang.exception;
30 import hunt.logging;
31 import hunt.string;
32 import hunt.util.codec;
33 import hunt.util.concurrent.Promise;
34 
35 import std.array;
36 import std.base64;
37 import std.conv;
38 import std.random;
39 import std.socket;
40 
// Short local name for the Protocol type, referenced through its fully
// qualified path hunt.http.codec.http.model.Protocol.Protocol.
alias HttpProtocol = hunt.http.codec.http.model.Protocol.Protocol;
// Shorthand for the nested StreamSession.Listener type.
alias SessionListener = StreamSession.Listener;
43 
44 // import hunt.http.codec.websocket.model.WebSocketConstants.SPEC_VERSION;
45 
/**
 * Client side HTTP/1.x connection.
 *
 * Besides sending plain HTTP/1 requests, the connection can initiate an
 * upgrade to clear-text HTTP/2 (h2c) or to WebSocket. After either upgrade
 * has completed, isOpen() reports false for this object because the TCP
 * session has been handed over to the upgraded connection.
 *
 * Only one request may be in flight at a time: checkWrite() throws a
 * WritePendingException while a previous response handler is still
 * registered in the ResponseHandlerWrap.
 */
class Http1ClientConnection : AbstractHttp1Connection, HttpClientConnection {

    /// Length in bytes of the HTTP/2 frame header that precedes the SETTINGS payload.
    private enum size_t HTTP2_FRAME_HEADER_LENGTH = 9;

    // State of a pending WebSocket upgrade (set by upgradeWebSocket).
    private Promise!(WebSocketConnection) webSocketConnectionPromise;
    private IncomingFrames incomingFrames;
    private WebSocketPolicy policy;

    // State of a pending HTTP/2 upgrade (set by upgradeHttp2).
    private Promise!(HttpClientConnection) http2ConnectionPromise;
    private Http2ClientConnection http2Connection;
    private ClientHttp2SessionListener http2SessionListener;

    private bool upgradeHttp2Complete = false; // new bool(false);
    private bool upgradeWebSocketComplete = false; // new bool(false);

    // The parser-facing handler; wrap.writing holds the handler of the request in flight.
    private ResponseHandlerWrap wrap;

    /**
     * Creates a connection over the given TCP session.
     *
     * Params:
     *   config        = HTTP configuration shared with the parser and output streams
     *   tcpSession    = underlying TCP session
     *   secureSession = TLS session passed through to the base class
     */
    this(Http2Configuration config, TcpSession tcpSession, SecureSession secureSession) {
        this(config, secureSession, tcpSession, new ResponseHandlerWrap());
    }

    /**
     * Internal constructor. The response handler must be a ResponseHandlerWrap;
     * it is wired back to this connection after the base class is initialised.
     */
    private this(Http2Configuration config, SecureSession secureSession,
            TcpSession tcpSession, ResponseHandler responseHandler) {

        super(config, secureSession, tcpSession, null, responseHandler);
        wrap = cast(ResponseHandlerWrap) responseHandler;
        wrap.connection = this;
    }

    /// Builds the response parser; the request handler argument is not used on the client side.
    override protected HttpParser initHttpParser(Http2Configuration config,
            HttpRequestHandler requestHandler, ResponseHandler responseHandler) {
        return new HttpParser(responseHandler, config.getMaxRequestHeadLength());
    }

    /// Always ConnectionType.HTTP1.
    override ConnectionType getConnectionType() {
        return ConnectionType.HTTP1;
    }

    /// Delegates to the base class.
    override bool isEncrypted() {
        return super.isEncrypted();
    }

    /// Returns the HTTP parser of this connection.
    HttpParser getParser() {
        return parser;
    }

    /// Returns the configuration this connection was created with.
    Http2Configuration getHttp2Configuration() {
        return config;
    }

    // dfmt off
    /**
     * Starts an h2c upgrade using default stream/session listeners built from
     * the given handlers, then delegates to the full upgradeHttp2 overload.
     *
     * Params:
     *   request              = the HTTP/1 request that carries the upgrade headers
     *   settings             = SETTINGS frame advertised to the server
     *   promise              = completed when the HTTP/2 connection is initialised,
     *                          failed when the upgrade is rejected
     *   upgradeHandler       = handler for the HTTP/1 response to the upgrade request
     *   http2ResponseHandler = handler for the response delivered on the initial HTTP/2 stream
     */
    override void upgradeHttp2(Request request, SettingsFrame settings, 
            Promise!(HttpClientConnection) promise, ClientHttpHandler upgradeHandler,
            ClientHttpHandler http2ResponseHandler) {

        Promise!(Stream) initStream = new Http2ClientResponseHandler.ClientStreamPromise(request, 
            new class DefaultPromise!(HttpOutputStream) {

            override
            void failed(Exception x) {
                // Include the failure message; the bare format string used to drop it.
                errorf("Create client output stream exception: %s", x.msg);
            }
        });
        Stream.Listener initStreamListener = new Http2ClientResponseHandler(request, http2ResponseHandler, this);
        ClientHttp2SessionListener listener = new class ClientHttp2SessionListener {

            override
            Map!(int, int) onPreface(StreamSession session) {
                return settings.getSettings();
            }

        };
        upgradeHttp2(request, settings, promise, initStream, initStreamListener, listener, upgradeHandler);
    }

    /**
     * Starts an h2c upgrade: prepares the HTTP/2 connection object, adds the
     * Connection / Upgrade / HTTP2-Settings headers to the request and sends it.
     * The upgrade is finished later by upgradeProtocolComplete().
     *
     * Throws: IllegalStateException when the connection is encrypted.
     */
    void upgradeHttp2(Request request, SettingsFrame settings,
            Promise!(HttpClientConnection) promise, Promise!(Stream) initStream,
            Stream.Listener initStreamListener,
            ClientHttp2SessionListener listener, ClientHttpHandler handler) {
        if (isEncrypted()) {
            throw new IllegalStateException("The TLS TCP connection must use ALPN to upgrade HTTP2");
        }

        this.http2ConnectionPromise = promise;
        this.http2SessionListener = listener;
        http2Connection = new class Http2ClientConnection {
            this() {
                super(getHttp2Configuration(),
                getTcpSession(), null, http2SessionListener);
            }
            override
            protected Http2Session initHttp2Session(Http2Configuration config, FlowControlStrategy flowControl,
                                                    StreamSession.Listener listener) {
                return Http2ClientSession.initSessionForUpgradingHTTP2(null, this.tcpSession, generator,
                        listener, flowControl, 3, config.getStreamIdleTimeout(), initStream,
                        initStreamListener);
            }
        };        

        // generate http2 upgrading headers
        request.getFields().add(new HttpField(HttpHeader.CONNECTION, "Upgrade, HTTP2-Settings"));
        request.getFields().add(new HttpField(HttpHeader.UPGRADE, "h2c"));
        if (settings !is null) {
            List!(ByteBuffer) byteBuffers = http2Generator.control(settings);
            if (byteBuffers !is null && byteBuffers.size() > 0) {
                try {
                    // Concatenate the generated buffers into one SETTINGS frame.
                    Appender!(byte[]) ot;
                    foreach (ByteBuffer buffer; byteBuffers) {
                        byte[] bufferArray = BufferUtils.toArray(buffer);
                        ot.put(bufferArray);
                    }
                    byte[] settingsFrame = ot.data;

                    // Only the payload (everything after the frame header) goes into the
                    // header value. Guard the slice: subtracting from a shorter length
                    // would wrap around because array lengths are unsigned.
                    byte[] settingsPayload = settingsFrame.length > HTTP2_FRAME_HEADER_LENGTH
                        ? settingsFrame[HTTP2_FRAME_HEADER_LENGTH .. $] : null;

                    request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS,
                            Base64URL.encode(settingsPayload)));
                } catch (IOException e) {
                    errorf("generate http2 upgrading settings exception: %s", e.msg);
                }
            } else {
                request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
            }
        } else {
            request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
        }

        send(request, handler);
    }
    // dfmt on

    /**
     * Completes a pending protocol upgrade according to the protocol derived
     * from the response.
     *
     * Returns: true when the connection was switched to HTTP/2 or WebSocket;
     *          false otherwise, in which case every pending upgrade promise is
     *          failed and the upgrade state is cleared.
     */
    bool upgradeProtocolComplete(HttpRequest request, HttpResponse response) {
        switch (ProtocolHelper.from(response)) {
        case HttpProtocol.H2: {
                if (http2ConnectionPromise !is null
                        && http2SessionListener !is null && http2Connection !is null) {
                    upgradeHttp2Complete = true;
                    // From here on the TCP session belongs to the HTTP/2 connection.
                    getTcpSession().attachObject(http2Connection);
                    http2SessionListener.setConnection(http2Connection);
                    http2Connection.initialize(getHttp2Configuration(),
                            http2ConnectionPromise, http2SessionListener);
                    return true;
                } else {
                    resetUpgradeProtocol();
                    return false;
                }
            }
        case HttpProtocol.WEB_SOCKET: {
                if (webSocketConnectionPromise !is null && incomingFrames !is null && policy !is null) {
                    upgradeWebSocketComplete = true;
                    WebSocketConnection webSocketConnection = new WebSocketConnectionImpl(secureSession,
                            tcpSession, incomingFrames, policy, request, response, config);
                    getTcpSession().attachObject(cast(Object) webSocketConnection);
                    webSocketConnectionPromise.succeeded(webSocketConnection);
                    return true;
                } else {
                    resetUpgradeProtocol();
                    return false;
                }
            }
        default:
            resetUpgradeProtocol();
            return false;
        }
    }

    /// Fails any pending upgrade promise and clears all upgrade related state.
    private void resetUpgradeProtocol() {
        if (http2ConnectionPromise !is null) {
            http2ConnectionPromise.failed(new IllegalStateException("upgrade h2 failed"));
            http2ConnectionPromise = null;
        }
        http2SessionListener = null;
        http2Connection = null;
        if (webSocketConnectionPromise !is null) {
            webSocketConnectionPromise.failed(
                    new IllegalStateException("The websocket handshake failed"));
            webSocketConnectionPromise = null;
        }
        incomingFrames = null;
        policy = null;
    }

    /**
     * Starts a WebSocket handshake: adds the version, Upgrade, Connection and
     * Sec-WebSocket-Key headers to the request, remembers the promise, frame
     * sink and policy for upgradeProtocolComplete(), and sends the request.
     *
     * The GET-method and client-behavior preconditions are asserts and are
     * therefore only checked in builds that keep assertions enabled.
     */
    override void upgradeWebSocket(Request request, WebSocketPolicy policy,
            Promise!(WebSocketConnection) promise,
            ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) {
        assert(HttpMethod.GET.asString() == request.getMethod(),
                "The method of the request MUST be GET in the websocket handshake.");

        assert(policy.getBehavior() == WebSocketBehavior.CLIENT,
                "The websocket behavior MUST be client");

        request.getFields().put(HttpHeader.SEC_WEBSOCKET_VERSION,
                WebSocketConstants.SPEC_VERSION.to!string());
        request.getFields().put(HttpHeader.UPGRADE, "websocket");
        request.getFields().put(HttpHeader.CONNECTION, "Upgrade");
        request.getFields().put(HttpHeader.SEC_WEBSOCKET_KEY, genRandomKey());
        webSocketConnectionPromise = promise;
        this.incomingFrames = incomingFrames;
        this.policy = policy;
        send(request, upgradeHandler);
    }

    /**
     * Generates the value for the Sec-WebSocket-Key header: 16 random bytes,
     * base64 encoded.
     *
     * The bytes come from the default generator (rndGen), which is seeded
     * unpredictably, so each handshake gets a fresh key. The closed interval
     * "[]" makes byte.max reachable as well.
     */
    private string genRandomKey() {
        byte[] bytes = new byte[16];
        foreach (ref b; bytes)
            b = uniform!"[]"(byte.min, byte.max);
        return cast(string)(B64Code.encode(bytes));
    }

    /**
     * Sends the request head with an "Expect: 100-continue" header and returns
     * the still open output stream so the caller can write the body later.
     */
    override HttpOutputStream sendRequestWithContinuation(Request request, ClientHttpHandler handler) {
        request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE);
        HttpOutputStream outputStream = getHttpOutputStream(request, handler);
        try {
            outputStream.commit();
        } catch (IOException e) {
            errorf("client generates the HTTP message exception: %s", e.msg);
        }
        return outputStream;
    }

    /// Sends a request without a body. Failures are logged, not rethrown.
    override void send(Request request, ClientHttpHandler handler) {
        try {
            tracef("client request and does not send data");
            HttpOutputStream output = getHttpOutputStream(request, handler);
            output.close();
        } catch (Exception e) {
            errorf("client generates the HTTP message exception: %s", e.msg);
        }
    }

    /**
     * Sends a request whose body is the given buffer. A null buffer sends the
     * request without a body. IOExceptions are logged, not rethrown.
     */
    override void send(Request request, ByteBuffer buffer, ClientHttpHandler handler) {
        try {
            HttpOutputStream output = getHttpOutputStream(request, handler);
            if (buffer !is null) {
                output.writeWithContentLength(buffer);
            } else {
                // No body: close the stream as the body-less send() does, otherwise
                // the request is never written and wrap.writing stays occupied.
                output.close();
            }
        } catch (IOException e) {
            errorf("client generates the HTTP message exception: %s", e.msg);
        }
    }

    /**
     * Sends a request whose body is the given buffers. A null array sends the
     * request without a body. IOExceptions are logged, not rethrown.
     */
    override void send(Request request, ByteBuffer[] buffers, ClientHttpHandler handler) {
        try {
            HttpOutputStream output = getHttpOutputStream(request, handler);
            if (buffers !is null) {
                output.writeWithContentLength(buffers);
            } else {
                // Same reasoning as the single-buffer overload.
                output.close();
            }
        } catch (IOException e) {
            errorf("client generates the HTTP message exception: %s", e.msg);
        }
    }

    /**
     * Registers a response handler for the request and returns the output
     * stream used to write it.
     *
     * Throws: WritePendingException (from checkWrite) when another request is
     *         still in flight.
     */
    override HttpOutputStream getHttpOutputStream(Request request, ClientHttpHandler handler) {
        Http1ClientResponseHandler http1ClientResponseHandler = new Http1ClientResponseHandler(
                handler);
        checkWrite(request, http1ClientResponseHandler);
        http1ClientResponseHandler.outputStream = new Http1ClientRequestOutputStream(this,
                wrap.writing.request);
        return http1ClientResponseHandler.outputStream;
    }

    /// Hands the output stream for the request to the promise.
    override void send(Request request, Promise!(HttpOutputStream) promise,
            ClientHttpHandler handler) {
        promise.succeeded(getHttpOutputStream(request, handler));
    }

    /**
     * Validates that a new request may be written and registers its handler as
     * the one in flight. Also overwrites the Host header with the remote
     * address of the TCP session.
     *
     * The null / open / not-upgraded checks are asserts (active only when
     * assertions are compiled in); the single-request-in-flight rule is always
     * enforced.
     *
     * Throws: WritePendingException when a handler is already registered.
     */
    private void checkWrite(Request request, Http1ClientResponseHandler handler) {
        assert(request, "The http client request is null.");
        assert(handler, "The http1 client response handler is null.");
        assert(isOpen(), "The current connection " ~ tcpSession.getSessionId()
                .to!string ~ " has been closed.");
        assert(!upgradeHttp2Complete, "The current connection " ~ tcpSession.getSessionId()
                .to!string ~ " has upgraded HTTP2.");
        assert(!upgradeWebSocketComplete, "The current connection " ~ tcpSession.getSessionId()
                .to!string ~ " has upgraded WebSocket.");

        if (wrap.writing is null) {
            wrap.writing = handler;
            request.getFields().put(HttpHeader.HOST, tcpSession.getRemoteAddress().toAddrString());
            handler.connection = this;
            handler.request = request;
        } else {
            throw new WritePendingException("");
        }
    }

    /// Closes the connection if it is still open; a no-op after an upgrade or a close.
    override void close() {
        if (isOpen()) {
            super.close();
        }
    }

    /// Inverse of isOpen().
    override bool isClosed() {
        return !isOpen();
    }

    /**
     * True while the TCP session is open and no protocol upgrade has completed.
     */
    override bool isOpen() {
        version (HUNT_DEBUG) {
            tracef("Connection status: isOpen=%s, upgradeHttp2Complete=%s, upgradeWebSocketComplete=%s",
                    tcpSession.isOpen(), upgradeHttp2Complete, upgradeWebSocketComplete);
        }
        return tcpSession.isOpen() && !upgradeHttp2Complete && !upgradeWebSocketComplete;
    }

    /// Whether the upgrade to HTTP/2 has completed.
    bool getUpgradeHttp2Complete() {
        return upgradeHttp2Complete;
    }

    /// Whether the upgrade to WebSocket has completed.
    bool getUpgradeWebSocketComplete() {
        return upgradeWebSocketComplete;
    }
}
365 
/**
 * Parser-facing response handler of an Http1ClientConnection.
 *
 * Every parser callback is forwarded to `writing`, the response handler of
 * the request currently in flight. `writing` is cleared once the response
 * is complete, fails, or ends early, which frees the connection for the
 * next request.
 */
private class ResponseHandlerWrap : ResponseHandler {

    // Handler of the request in flight; null when the connection is idle.
    private Http1ClientResponseHandler writing; // = new AtomicReference<)();
    // Status line of the most recent response, recorded by startResponse.
    private int status;
    private string reason;
    private Http1ClientConnection connection;

    /// Unpacks the exception and forwards to badMessage(int, string).
    void badMessage(BadMessageException failure) {
        badMessage(failure.getCode(), failure.getReason());
    }

    /// Forwards the early EOF to the pending handler, or closes the connection when idle.
    override void earlyEOF() {
        Http1ClientResponseHandler pending = writing;
        if (pending is null) {
            IOUtils.close(connection);
        } else {
            pending.earlyEOF();
        }

        writing = null;
    }

    override void parsedHeader(HttpField field) {
        writing.parsedHeader(field);
    }

    override bool headerComplete() {
        return writing.headerComplete();
    }

    override bool content(ByteBuffer item) {
        return writing.content(item);
    }

    override bool contentComplete() {
        return writing.contentComplete();
    }

    override void parsedTrailer(HttpField field) {
        writing.parsedTrailer(field);
    }

    /**
     * A "100 Continue" interim response only resets the parser and keeps the
     * pending handler; any other response is completed and the handler released.
     */
    override bool messageComplete() {
        const bool interimContinue = status == 100 && "Continue".equalsIgnoreCase(reason);
        if (!interimContinue) {
            auto completed = writing.messageComplete();
            writing = null;
            return completed;
        }

        tracef("client received the 100 Continue response");
        connection.getParser().reset();
        return true;
    }

    /// Releases the pending handler and notifies it, or closes the connection when idle.
    override void badMessage(int status, string reason) {
        Http1ClientResponseHandler pending = writing;
        writing = null;
        if (pending is null) {
            IOUtils.close(connection);
            return;
        }
        pending.badMessage(status, reason);
    }

    override int getHeaderCacheSize() {
        return 1024;
    }

    /// Records the status line (used by messageComplete) before forwarding it.
    override bool startResponse(HttpVersion ver, int status, string reason) {
        this.reason = reason;
        this.status = status;
        return writing.startResponse(ver, status, reason);
    }

}
443 
/**
 * Output stream that writes one HTTP/1 request over the TCP session of an
 * Http1ClientConnection, using its own HttpGenerator instance.
 */
class Http1ClientRequestOutputStream : AbstractHttp1OutputStream {

    private Http1ClientConnection connection;
    private HttpGenerator httpGenerator;

    /**
     * Params:
     *   connection = the owning client connection
     *   request    = the request to write
     */
    private this(Http1ClientConnection connection, Request request) {
        // NOTE(review): the `true` flag presumably selects client mode in
        // AbstractHttp1OutputStream; confirm against the base class.
        super(request, true);
        this.httpGenerator = new HttpGenerator();
        this.connection = connection;
    }

    /// Traces that the message was generated completely.
    override protected void generateHttpMessageSuccessfully() {
        tracef("client session %s generates the HTTP message completely", connection.getSessionId());
    }

    /// Logs the generator mismatch and aborts with an IllegalStateException.
    override protected void generateHttpMessageExceptionally(HttpGenerator.Result actualResult,
            HttpGenerator.State actualState, HttpGenerator.Result expectedResult,
            HttpGenerator.State expectedState) {
        errorf("http1 generator error, actual: [%s, %s], expected: [%s, %s]",
                actualResult, actualState, expectedResult, expectedState);
        throw new IllegalStateException("client generates http message exception.");
    }

    /// Allocates a buffer sized by the configured maximum request head length.
    override protected ByteBuffer getHeaderByteBuffer() {
        auto configuration = connection.getHttp2Configuration();
        return BufferUtils.allocate(configuration.getMaxRequestHeadLength());
    }

    /// Allocates a buffer sized by the configured maximum request trailer length.
    override protected ByteBuffer getTrailerByteBuffer() {
        auto configuration = connection.getHttp2Configuration();
        return BufferUtils.allocate(configuration.getMaxRequestTrailerLength());
    }

    /// The TCP session of the owning connection.
    override protected TcpSession getSession() {
        return connection.getTcpSession();
    }

    override protected HttpGenerator getHttpGenerator() {
        return httpGenerator;
    }
}