1 module hunt.http.client.Http1ClientConnection;
2 
3 import hunt.http.client.ClientHttpHandler;
4 import hunt.http.client.ClientHttp2SessionListener;
5 import hunt.http.client.HttpClientConnection;
6 import hunt.http.client.Http1ClientResponseHandler;
7 import hunt.http.client.Http2ClientConnection;
8 import hunt.http.client.Http2ClientResponseHandler;
9 import hunt.http.client.Http2ClientSession;
10 
11 import hunt.http.codec.http.decode.HttpParser;
12 import hunt.http.codec.http.encode.HttpGenerator;
13 import hunt.http.codec.http.frame.SettingsFrame;
14 import hunt.http.codec.http.model;
15 import hunt.http.codec.http.stream;
16 import hunt.http.WebSocketCommon;
17 import hunt.http.WebSocketConnection;
18 import hunt.http.codec.websocket.stream.IOState;
19 import hunt.http.codec.websocket.stream.WebSocketConnectionImpl;
20 
21 import hunt.http.HttpConnection;
22 import hunt.http.HttpField;
23 import hunt.http.HttpFields;
24 import hunt.http.HttpHeader;
25 import hunt.http.HttpMethod;
26 import hunt.http.HttpOptions;
27 import hunt.http.HttpOutputStream;
28 import hunt.http.HttpRequest;
29 import hunt.http.HttpResponse;
30 import hunt.http.HttpVersion;
31 import hunt.http.WebSocketPolicy;
32 
33 import hunt.net.secure.SecureSession;
34 import hunt.net.Connection;
35 
36 import hunt.collection;
37 import hunt.concurrency.Promise;
38 import hunt.stream;
39 import hunt.Exceptions;
40 import hunt.logging;
41 import hunt.text.Common;
42 import hunt.text.Codec;
43 
44 import std.array;
45 import std.base64;
46 import std.conv;
47 import std.random;
48 import std.socket;
49 
50 alias HttpProtocol = hunt.http.codec.http.model.Protocol.Protocol;
51 alias SessionListener = StreamSession.Listener;
52 
53 // import hunt.http.codec.websocket.model.WebSocketConstants.SPEC_VERSION;
54 
55 /**
56 */
57 class Http1ClientConnection : AbstractHttp1Connection, HttpClientConnection {
58 
59     private Promise!(WebSocketConnection) webSocketConnectionPromise;
60     private IncomingFrames incomingFrames;
61     private WebSocketPolicy policy;
62     private Promise!(HttpClientConnection) http2ConnectionPromise;
63     private Http2ClientConnection http2Connection;
64     private ClientHttp2SessionListener http2SessionListener;
65     private bool upgradeHttp2Complete = false; 
66     private bool upgradeWebSocketComplete = false;
67     private ResponseHandlerWrap wrap;
68 
69     this(HttpOptions config, Connection tcpSession) { // , SecureSession secureSession
70         this(config, tcpSession, new ResponseHandlerWrap()); // secureSession, 
71     }
72 
73     private this(HttpOptions config,
74             Connection tcpSession, HttpResponseParsingHandler responseHandler) {
75 
76         super(config, tcpSession, null, responseHandler);
77         wrap = cast(ResponseHandlerWrap) responseHandler;
78         wrap.connection = this;
79     }
80 
81     override protected HttpParser initHttpParser(HttpOptions config,
82             HttpRequestParsingHandler requestHandler, HttpResponseParsingHandler responseHandler) {
83         return new HttpParser(responseHandler, config.getMaxRequestHeadLength());
84     }
85 
86     override HttpConnectionType getConnectionType() {
87         return HttpConnectionType.HTTP1;
88     }
89 
90     // override bool isSecured() {
91     //     return super.isSecured();
92     // }
93 
94     HttpParser getParser() {
95         return parser;
96     }
97 
98     HttpOptions getHttpOptions() {
99         return config;
100     }
101 
102     // dfmt off
103     override void upgradeHttp2(HttpRequest request, SettingsFrame settings, 
104             Promise!(HttpClientConnection) promise, ClientHttpHandler upgradeHandler,
105             ClientHttpHandler http2ResponseHandler) {
106 
107         Promise!(Stream) initStream = new Http2ClientResponseHandler.ClientStreamPromise(request, 
108             new class DefaultPromise!(HttpOutputStream) {
109 
110             override
111             bool failed(Throwable x) {
112                 errorf("Create client output stream exception", x);
113                 return true;
114             }
115         });
116 
117         Stream.Listener initStreamListener = new Http2ClientResponseHandler(request, http2ResponseHandler, this);
118         ClientHttp2SessionListener listener = new class ClientHttp2SessionListener {
119 
120             override
121             Map!(int, int) onPreface(StreamSession session) {
122                 return settings.getSettings();
123             }
124 
125         };
126         upgradeHttp2(request, settings, promise, initStream, initStreamListener, listener, upgradeHandler);
127     }
128 
129     void upgradeHttp2(HttpRequest request, SettingsFrame settings,
130             Promise!(HttpClientConnection) promise, Promise!(Stream) initStream,
131             Stream.Listener initStreamListener,
132             ClientHttp2SessionListener listener, ClientHttpHandler handler) {
133         if (isSecured()) {
134             throw new IllegalStateException("The TLS TCP connection must use ALPN to upgrade HTTP2");
135         }
136 
137         this.http2ConnectionPromise = promise;
138         this.http2SessionListener = listener;
139         http2Connection = new class Http2ClientConnection {
140             this() {
141                 super(getHttpOptions(), this.outer.getTcpConnection(), http2SessionListener);
142             }
143             
144             override
145             protected Http2Session initHttp2Session(HttpOptions config, FlowControlStrategy flowControl,
146                                                     StreamSession.Listener listener) {
147                 return Http2ClientSession.initSessionForUpgradingHTTP2(null, this.outer.getTcpConnection(), generator,
148                         listener, flowControl, 3, config.getStreamIdleTimeout(), initStream,
149                         initStreamListener);
150             }
151         };        
152 
153         // generate http2 upgrading headers
154         request.getFields().add(new HttpField(HttpHeader.CONNECTION, "Upgrade, HTTP2-Settings"));
155         request.getFields().add(new HttpField(HttpHeader.UPGRADE, "h2c"));
156         if (settings !is null) {
157             List!(ByteBuffer) byteBuffers = http2Generator.control(settings);
158             if (byteBuffers !is null && byteBuffers.size() > 0) {
159                 try {
160                     // ByteArrayOutputStream ot = new ByteArrayOutputStream();
161                     // foreach (ByteBuffer buffer ; byteBuffers) {
162                     //     ot.write(BufferUtils.toArray(buffer));
163                     // }
164                     Appender!(byte[]) ot;
165                     foreach (ByteBuffer buffer; byteBuffers) {
166                         byte[] bufferArray = BufferUtils.toArray(buffer);
167                         // writeln("before1:\t" ~ TypeUtils.toHexString(bufferArray));
168                         // writefln("before1:\t%(%02X %)" , bufferArray);
169                         ot.put(bufferArray);
170                     }
171                     byte[] settingsFrame = ot.data; //ot.toByteArray();
172                     byte[] settingsPayload = new byte[settingsFrame.length - 9];
173                     // System.arraycopy(settingsFrame, 9, settingsPayload, 0, settingsPayload.length);
174                     settingsPayload[0 .. settingsPayload.length] = settingsFrame[9 .. 9 + settingsPayload.length];
175 
176                     request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS,
177                             // Base64Utils.encodeToUrlSafeString(settingsPayload)
178                             Base64URL.encode(settingsPayload)));
179                 } catch (IOException e) {
180                     errorf("generate http2 upgrading settings exception", e);
181                 }
182             } else {
183                 request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
184             }
185         } else {
186             request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
187         }
188 
189         send(request, handler);
190     }
191     // dfmt on
192 
193     bool upgradeProtocolComplete(HttpRequest request, HttpResponse response) {
194         switch (ProtocolHelper.from(response)) {
195         case HttpProtocol.H2: {
196                 if (http2ConnectionPromise !is null
197                         && http2SessionListener !is null && http2Connection !is null) {
198                     upgradeHttp2Complete = true;
199                     // tcpSession.attachObject(http2Connection);
200                     setAttribute(HttpConnection.NAME, http2Connection);
201                     http2SessionListener.setConnection(http2Connection);
202                     http2Connection.initialize(getHttpOptions(),
203                             http2ConnectionPromise, http2SessionListener);
204                     return true;
205                 } else {
206                     resetUpgradeProtocol();
207                     return false;
208                 }
209             }
210         case HttpProtocol.WEB_SOCKET: {
211                 if (webSocketConnectionPromise !is null && incomingFrames !is null && policy !is null) {
212                     upgradeWebSocketComplete = true;
213                     WebSocketConnection webSocketConnection = new WebSocketConnectionImpl(
214                             getTcpConnection(), incomingFrames, policy, request, response, config);
215                     // tcpSession.attachObject(cast(Object) webSocketConnection);
216                     setAttribute(HttpConnection.NAME, cast(Object)webSocketConnection);
217                     IOState state = webSocketConnection.getIOState();
218                     state.onConnected();
219                     state.onOpened();
220                     webSocketConnectionPromise.succeeded(webSocketConnection);
221                     return true;
222                 } else {
223                     resetUpgradeProtocol();
224                     return false;
225                 }
226             }
227         default:
228             resetUpgradeProtocol();
229             return false;
230         }
231     }
232 
233     private void resetUpgradeProtocol() {
234         if (http2ConnectionPromise !is null) {
235             http2ConnectionPromise.failed(new IllegalStateException("upgrade h2 failed"));
236             http2ConnectionPromise = null;
237         }
238         http2SessionListener = null;
239         http2Connection = null;
240         if (webSocketConnectionPromise !is null) {
241             webSocketConnectionPromise.failed(
242                     new IllegalStateException("The websocket handshake failed"));
243             webSocketConnectionPromise = null;
244         }
245         incomingFrames = null;
246         policy = null;
247     }
248 
249     override void upgradeWebSocket(HttpRequest request, WebSocketPolicy policy,
250             Promise!(WebSocketConnection) promise,
251             ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) {
252         assert(HttpMethod.GET.asString() == request.getMethod(),
253                 "The method of the request MUST be GET in the websocket handshake.");
254 
255         assert(policy.getBehavior() == WebSocketBehavior.CLIENT,
256                 "The websocket behavior MUST be client");
257 
258         request.getFields().put(HttpHeader.SEC_WEBSOCKET_VERSION,
259                 WebSocketConstants.SPEC_VERSION.to!string());
260         request.getFields().put(HttpHeader.UPGRADE, "websocket");
261         request.getFields().put(HttpHeader.CONNECTION, "Upgrade");
262         request.getFields().put(HttpHeader.SEC_WEBSOCKET_KEY, genRandomKey());
263         webSocketConnectionPromise = promise;
264         this.incomingFrames = incomingFrames;
265         this.policy = policy;
266         send(request, upgradeHandler);
267     }
268 
269     private string genRandomKey() {
270         byte[] bytes = new byte[16];
271         // ThreadLocalRandom.current().nextBytes(bytes);
272         auto rnd = Random(2018);
273         for (int i; i < bytes.length; i++)
274             bytes[i] = cast(byte) uniform(byte.min, byte.max, rnd);
275         return cast(string)(B64Code.encode(bytes));
276     }
277 
278     override HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) {
279         request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE);
280         HttpOutputStream outputStream = getHttpOutputStream(request, handler);
281         try {
282             outputStream.commit();
283         } catch (IOException e) {
284             errorf("client generates the HTTP message exception", e);
285         }
286         return outputStream;
287     }
288 
289     override void send(HttpRequest request, ClientHttpHandler handler) {
290         try {
291             version (HUNT_HTTP_DEBUG) tracef("client request and does not send data");
292             HttpOutputStream output = getHttpOutputStream(request, handler);
293             output.close();
294         } catch (Exception e) {
295             errorf("client generates the HTTP message exception", e);
296         }
297     }
298 
299     override void send(HttpRequest request, ByteBuffer buffer, ClientHttpHandler handler) {
300         try {
301             HttpOutputStream output = getHttpOutputStream(request, handler);
302             if (buffer !is null) {
303                 output.writeWithContentLength(buffer);
304             }
305         } catch (IOException e) {
306             errorf("client generates the HTTP message exception", e);
307         }
308     }
309 
310     override void send(HttpRequest request, ByteBuffer[] buffers, ClientHttpHandler handler) {
311         try {
312             HttpOutputStream output = getHttpOutputStream(request, handler);
313             if (buffers !is null) {
314                 output.writeWithContentLength(buffers);
315             }
316         } catch (IOException e) {
317             errorf("client generates the HTTP message exception", e);
318         }
319     }
320 
321     override void send(HttpRequest request, Promise!(HttpOutputStream) promise,
322             ClientHttpHandler handler) {
323         promise.succeeded(getHttpOutputStream(request, handler));
324     }
325 
326     override HttpOutputStream getHttpOutputStream(HttpRequest request, ClientHttpHandler handler) {
327         Http1ClientResponseHandler http1ClientResponseHandler = 
328             new Http1ClientResponseHandler(handler);
329         checkWrite(request, http1ClientResponseHandler);
330         http1ClientResponseHandler.outputStream = new Http1ClientRequestOutputStream(this,
331                 wrap.writing.request);
332         return http1ClientResponseHandler.outputStream;
333     }
334 
335     private void checkWrite(HttpRequest request, Http1ClientResponseHandler handler) {
336         assert(request, "The http client request is null.");
337         assert(handler, "The http1 client response handler is null.");
338         assert(isOpen(), "The current connection " ~ getId()
339                 .to!string ~ " has been closed.");
340         assert(!upgradeHttp2Complete, "The current connection " ~ getId()
341                 .to!string ~ " has upgraded HTTP2.");
342         assert(!upgradeWebSocketComplete, "The current connection " ~ getId()
343                 .to!string ~ " has upgraded WebSocket.");
344 
345         if (wrap.writing is null) {
346             wrap.writing = handler;
347             HttpFields headerFields = request.getFields();
348             
349             if(!headerFields.contains(HttpHeader.HOST)) {
350                 headerFields.put(HttpHeader.HOST, request.getURI.getHost());
351             }
352             handler.connection = this;
353             handler.request = request;
354             handler.onReady();
355         } else {
356             throw new WritePendingException("");
357         }
358     }
359 
360     override void close() {
361         if (isOpen()) {
362             super.close();
363         }
364     }
365 
366     // override bool isClosed() {
367     //     return !isOpen();
368     // }
369 
370     // override 
371     bool isOpen() {
372         version (HUNT_HTTP_DEBUG) {
373             tracef("Connection status: isOpen=%s, upgradeHttp2Complete=%s, upgradeWebSocketComplete=%s",
374                     getTcpConnection().isConnected(), upgradeHttp2Complete, upgradeWebSocketComplete);
375         }
376         return getTcpConnection().isConnected() && !upgradeHttp2Complete && !upgradeWebSocketComplete;
377     }
378 
379     bool getUpgradeHttp2Complete() {
380         return upgradeHttp2Complete;
381     }
382 
383     bool getUpgradeWebSocketComplete() {
384         return upgradeWebSocketComplete;
385     }
386 }
387 
388 /**
389 */
390 private class ResponseHandlerWrap : HttpResponseParsingHandler {
391 
392     private Http1ClientResponseHandler writing; // = new AtomicReference<)();
393     private int status;
394     private string reason;
395     private Http1ClientConnection connection;
396 
397     void badMessage(BadMessageException failure) {
398         badMessage(failure.getCode(), failure.getReason());
399     }
400 
401     override void earlyEOF() {
402         Http1ClientResponseHandler h = writing;
403         if (h !is null) {
404             h.earlyEOF();
405         } else {
406             IOUtils.close(connection);
407         }
408 
409         writing = null;
410     }
411 
412     override void parsedHeader(HttpField field) {
413         writing.parsedHeader(field);
414     }
415 
416     override bool headerComplete() {
417         return writing.headerComplete();
418     }
419 
420     override bool content(ByteBuffer item) {
421         return writing.content(item);
422     }
423 
424     override bool contentComplete() {
425         return writing.contentComplete();
426     }
427 
428     override void parsedTrailer(HttpField field) {
429         writing.parsedTrailer(field);
430     }
431 
432     override bool messageComplete() {
433         if (status == 100 && "Continue".equalsIgnoreCase(reason)) {
434             tracef("client received the 100 Continue response");
435             connection.getParser().reset();
436             return true;
437         } else {
438             auto r = writing.messageComplete();
439             writing = null;
440             return r;
441         }
442     }
443 
444     override void badMessage(int status, string reason) {
445         Http1ClientResponseHandler h = writing;
446         writing = null;
447         if (h !is null) {
448             h.badMessage(status, reason);
449         } else {
450             IOUtils.close(connection);
451         }
452     }
453 
454     override int getHeaderCacheSize() {
455         return 1024;
456     }
457 
458     override bool startResponse(HttpVersion ver, int status, string reason) {
459         this.status = status;
460         this.reason = reason;
461         return writing.startResponse(ver, status, reason);
462     }
463 
464 }
465 
466 /** 
467  * 
468  */
469 class Http1ClientRequestOutputStream : AbstractHttp1OutputStream {
470     private Http1ClientConnection connection;
471     private HttpGenerator httpGenerator;
472 
473     private this(Http1ClientConnection connection, HttpRequest request) {
474         super(request, true);
475         this.connection = connection;
476         httpGenerator = new HttpGenerator();
477     }
478 
479 
480     override protected void generateHttpMessageSuccessfully() {
481         version(HUNT_DEBUG) tracef("client session %s generates the HTTP message completely", connection.getId());
482     }
483 
484     override protected void generateHttpMessageExceptionally(HttpGenerator.Result actualResult,
485             HttpGenerator.State actualState, HttpGenerator.Result expectedResult,
486             HttpGenerator.State expectedState) {
487         errorf("http1 generator error, actual: [%s, %s], expected: [%s, %s]",
488                 actualResult, actualState, expectedResult, expectedState);
489         throw new IllegalStateException("client generates http message exception.");
490     }
491 
492     override protected ByteBuffer getHeaderByteBuffer() {
493         return BufferUtils.allocate(connection.getHttpOptions().getMaxRequestHeadLength());
494     }
495 
496     override protected ByteBuffer getTrailerByteBuffer() {
497         return BufferUtils.allocate(connection.getHttpOptions()
498                 .getMaxRequestTrailerLength());
499     }
500 
501     override protected Connection getSession() {
502         return connection.getTcpConnection();
503     }
504 
505     override protected HttpGenerator getHttpGenerator() {
506         return httpGenerator;
507     }
508 }