1 module hunt.http.client.Http1ClientConnection;
2 
3 import hunt.http.client.ClientHttpHandler;
4 import hunt.http.client.ClientHttp2SessionListener;
5 import hunt.http.client.HttpClientConnection;
6 import hunt.http.client.Http1ClientResponseHandler;
7 import hunt.http.client.Http2ClientConnection;
8 import hunt.http.client.Http2ClientResponseHandler;
9 import hunt.http.client.Http2ClientSession;
10 
11 import hunt.http.codec.http.decode.HttpParser;
12 import hunt.http.codec.http.encode.HttpGenerator;
13 import hunt.http.codec.http.frame.SettingsFrame;
14 import hunt.http.codec.http.model;
15 import hunt.http.codec.http.stream;
16 import hunt.http.WebSocketCommon;
17 import hunt.http.WebSocketConnection;
18 import hunt.http.codec.websocket.stream.IOState;
19 import hunt.http.codec.websocket.stream.WebSocketConnectionImpl;
20 
21 import hunt.http.HttpConnection;
22 import hunt.http.HttpField;
23 import hunt.http.HttpFields;
24 import hunt.http.HttpHeader;
25 import hunt.http.HttpMethod;
26 import hunt.http.HttpOptions;
27 import hunt.http.HttpOutputStream;
28 import hunt.http.HttpRequest;
29 import hunt.http.HttpResponse;
30 import hunt.http.HttpVersion;
31 import hunt.http.WebSocketPolicy;
32 
33 import hunt.net.secure.SecureSession;
34 import hunt.net.Connection;
35 
36 import hunt.collection;
37 import hunt.concurrency.Promise;
38 import hunt.stream;
39 import hunt.Exceptions;
40 import hunt.logging;
41 import hunt.text.Common;
42 import hunt.text.Codec;
43 
44 import std.array;
45 import std.base64;
46 import std.conv;
47 import std.random;
48 import std.socket;
49 
// Shorthand for the Protocol type declared in hunt.http.codec.http.model.Protocol;
// its H2 and WEB_SOCKET members are matched when a protocol upgrade completes.
alias HttpProtocol = hunt.http.codec.http.model.Protocol.Protocol;
// Shorthand for the Listener type nested in StreamSession.
alias SessionListener = StreamSession.Listener;
52 
53 // import hunt.http.codec.websocket.model.WebSocketConstants.SPEC_VERSION;
54 
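/**
 * Client-side HTTP/1 connection. It sends requests over the underlying TCP
 * connection and can upgrade that connection to cleartext HTTP/2 (h2c) or to WebSocket.
 */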
57 class Http1ClientConnection : AbstractHttp1Connection, HttpClientConnection {
58 
59     private Promise!(WebSocketConnection) webSocketConnectionPromise;
60     private IncomingFrames incomingFrames;
61     private WebSocketPolicy policy;
62     private Promise!(HttpClientConnection) http2ConnectionPromise;
63     private Http2ClientConnection http2Connection;
64     private ClientHttp2SessionListener http2SessionListener;
65     private bool upgradeHttp2Complete = false; 
66     private bool upgradeWebSocketComplete = false;
67     private ResponseHandlerWrap wrap;
68 
69     this(HttpOptions config, Connection tcpSession) { // , SecureSession secureSession
70         this(config, tcpSession, new ResponseHandlerWrap()); // secureSession, 
71     }
72 
73     private this(HttpOptions config,
74             Connection tcpSession, HttpResponseParsingHandler responseHandler) {
75 
76         super(config, tcpSession, null, responseHandler);
77         wrap = cast(ResponseHandlerWrap) responseHandler;
78         wrap.connection = this;
79     }
80 
81     override protected HttpParser initHttpParser(HttpOptions config,
82             HttpRequestParsingHandler requestHandler, HttpResponseParsingHandler responseHandler) {
83         return new HttpParser(responseHandler, config.getMaxRequestHeadLength());
84     }
85 
86     override HttpConnectionType getConnectionType() {
87         return HttpConnectionType.HTTP1;
88     }
89 
90     // override bool isSecured() {
91     //     return super.isSecured();
92     // }
93 
94     HttpParser getParser() {
95         return parser;
96     }
97 
98     HttpOptions getHttpOptions() {
99         return config;
100     }
101 
102     // dfmt off
103     override void upgradeHttp2(HttpRequest request, SettingsFrame settings, 
104             Promise!(HttpClientConnection) promise, ClientHttpHandler upgradeHandler,
105             ClientHttpHandler http2ResponseHandler) {
106 
107         Promise!(Stream) initStream = new Http2ClientResponseHandler.ClientStreamPromise(request, 
108             new class DefaultPromise!(HttpOutputStream) {
109 
110             override
111             void failed(Exception x) {
112                 errorf("Create client output stream exception", x);
113             }
114         });
115         Stream.Listener initStreamListener = new Http2ClientResponseHandler(request, http2ResponseHandler, this);
116         ClientHttp2SessionListener listener = new class ClientHttp2SessionListener {
117 
118             override
119             Map!(int, int) onPreface(StreamSession session) {
120                 return settings.getSettings();
121             }
122 
123         };
124         upgradeHttp2(request, settings, promise, initStream, initStreamListener, listener, upgradeHandler);
125     }
126 
127     void upgradeHttp2(HttpRequest request, SettingsFrame settings,
128             Promise!(HttpClientConnection) promise, Promise!(Stream) initStream,
129             Stream.Listener initStreamListener,
130             ClientHttp2SessionListener listener, ClientHttpHandler handler) {
131         if (isSecured()) {
132             throw new IllegalStateException("The TLS TCP connection must use ALPN to upgrade HTTP2");
133         }
134 
135         this.http2ConnectionPromise = promise;
136         this.http2SessionListener = listener;
137         http2Connection = new class Http2ClientConnection {
138             this() {
139                 super(getHttpOptions(), this.outer.getTcpConnection(), http2SessionListener);
140             }
141             override
142             protected Http2Session initHttp2Session(HttpOptions config, FlowControlStrategy flowControl,
143                                                     StreamSession.Listener listener) {
144                 return Http2ClientSession.initSessionForUpgradingHTTP2(null, this.outer.getTcpConnection(), generator,
145                         listener, flowControl, 3, config.getStreamIdleTimeout(), initStream,
146                         initStreamListener);
147             }
148         };        
149 
150         // generate http2 upgrading headers
151         request.getFields().add(new HttpField(HttpHeader.CONNECTION, "Upgrade, HTTP2-Settings"));
152         request.getFields().add(new HttpField(HttpHeader.UPGRADE, "h2c"));
153         if (settings !is null) {
154             List!(ByteBuffer) byteBuffers = http2Generator.control(settings);
155             if (byteBuffers !is null && byteBuffers.size() > 0) {
156                 try {
157                     // ByteArrayOutputStream ot = new ByteArrayOutputStream();
158                     // foreach (ByteBuffer buffer ; byteBuffers) {
159                     //     ot.write(BufferUtils.toArray(buffer));
160                     // }
161                     Appender!(byte[]) ot;
162                     foreach (ByteBuffer buffer; byteBuffers) {
163                         byte[] bufferArray = BufferUtils.toArray(buffer);
164                         // writeln("before1:\t" ~ TypeUtils.toHexString(bufferArray));
165                         // writefln("before1:\t%(%02X %)" , bufferArray);
166                         ot.put(bufferArray);
167                     }
168                     byte[] settingsFrame = ot.data; //ot.toByteArray();
169                     byte[] settingsPayload = new byte[settingsFrame.length - 9];
170                     // System.arraycopy(settingsFrame, 9, settingsPayload, 0, settingsPayload.length);
171                     settingsPayload[0 .. settingsPayload.length] = settingsFrame[9 .. 9 + settingsPayload.length];
172 
173                     request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS,
174                             // Base64Utils.encodeToUrlSafeString(settingsPayload)
175                             Base64URL.encode(settingsPayload)));
176                 } catch (IOException e) {
177                     errorf("generate http2 upgrading settings exception", e);
178                 }
179             } else {
180                 request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
181             }
182         } else {
183             request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
184         }
185 
186         send(request, handler);
187     }
188     // dfmt on
189 
190     bool upgradeProtocolComplete(HttpRequest request, HttpResponse response) {
191         switch (ProtocolHelper.from(response)) {
192         case HttpProtocol.H2: {
193                 if (http2ConnectionPromise !is null
194                         && http2SessionListener !is null && http2Connection !is null) {
195                     upgradeHttp2Complete = true;
196                     // tcpSession.attachObject(http2Connection);
197                     setAttribute(HttpConnection.NAME, http2Connection);
198                     http2SessionListener.setConnection(http2Connection);
199                     http2Connection.initialize(getHttpOptions(),
200                             http2ConnectionPromise, http2SessionListener);
201                     return true;
202                 } else {
203                     resetUpgradeProtocol();
204                     return false;
205                 }
206             }
207         case HttpProtocol.WEB_SOCKET: {
208                 if (webSocketConnectionPromise !is null && incomingFrames !is null && policy !is null) {
209                     upgradeWebSocketComplete = true;
210                     WebSocketConnection webSocketConnection = new WebSocketConnectionImpl(
211                             getTcpConnection(), incomingFrames, policy, request, response, config);
212                     // tcpSession.attachObject(cast(Object) webSocketConnection);
213                     setAttribute(HttpConnection.NAME, cast(Object)webSocketConnection);
214                     IOState state = webSocketConnection.getIOState();
215                     state.onConnected();
216                     state.onOpened();
217                     webSocketConnectionPromise.succeeded(webSocketConnection);
218                     return true;
219                 } else {
220                     resetUpgradeProtocol();
221                     return false;
222                 }
223             }
224         default:
225             resetUpgradeProtocol();
226             return false;
227         }
228     }
229 
230     private void resetUpgradeProtocol() {
231         if (http2ConnectionPromise !is null) {
232             http2ConnectionPromise.failed(new IllegalStateException("upgrade h2 failed"));
233             http2ConnectionPromise = null;
234         }
235         http2SessionListener = null;
236         http2Connection = null;
237         if (webSocketConnectionPromise !is null) {
238             webSocketConnectionPromise.failed(
239                     new IllegalStateException("The websocket handshake failed"));
240             webSocketConnectionPromise = null;
241         }
242         incomingFrames = null;
243         policy = null;
244     }
245 
246     override void upgradeWebSocket(HttpRequest request, WebSocketPolicy policy,
247             Promise!(WebSocketConnection) promise,
248             ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) {
249         assert(HttpMethod.GET.asString() == request.getMethod(),
250                 "The method of the request MUST be GET in the websocket handshake.");
251 
252         assert(policy.getBehavior() == WebSocketBehavior.CLIENT,
253                 "The websocket behavior MUST be client");
254 
255         request.getFields().put(HttpHeader.SEC_WEBSOCKET_VERSION,
256                 WebSocketConstants.SPEC_VERSION.to!string());
257         request.getFields().put(HttpHeader.UPGRADE, "websocket");
258         request.getFields().put(HttpHeader.CONNECTION, "Upgrade");
259         request.getFields().put(HttpHeader.SEC_WEBSOCKET_KEY, genRandomKey());
260         webSocketConnectionPromise = promise;
261         this.incomingFrames = incomingFrames;
262         this.policy = policy;
263         send(request, upgradeHandler);
264     }
265 
266     private string genRandomKey() {
267         byte[] bytes = new byte[16];
268         // ThreadLocalRandom.current().nextBytes(bytes);
269         auto rnd = Random(2018);
270         for (int i; i < bytes.length; i++)
271             bytes[i] = cast(byte) uniform(byte.min, byte.max, rnd);
272         return cast(string)(B64Code.encode(bytes));
273     }
274 
275     override HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) {
276         request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE);
277         HttpOutputStream outputStream = getHttpOutputStream(request, handler);
278         try {
279             outputStream.commit();
280         } catch (IOException e) {
281             errorf("client generates the HTTP message exception", e);
282         }
283         return outputStream;
284     }
285 
286     override void send(HttpRequest request, ClientHttpHandler handler) {
287         try {
288             version (HUNT_HTTP_DEBUG) tracef("client request and does not send data");
289             HttpOutputStream output = getHttpOutputStream(request, handler);
290             output.close();
291         } catch (Exception e) {
292             errorf("client generates the HTTP message exception", e);
293         }
294     }
295 
296     override void send(HttpRequest request, ByteBuffer buffer, ClientHttpHandler handler) {
297         try {
298             HttpOutputStream output = getHttpOutputStream(request, handler);
299             if (buffer !is null) {
300                 output.writeWithContentLength(buffer);
301             }
302         } catch (IOException e) {
303             errorf("client generates the HTTP message exception", e);
304         }
305     }
306 
307     override void send(HttpRequest request, ByteBuffer[] buffers, ClientHttpHandler handler) {
308         try {
309             HttpOutputStream output = getHttpOutputStream(request, handler);
310             if (buffers !is null) {
311                 output.writeWithContentLength(buffers);
312             }
313         } catch (IOException e) {
314             errorf("client generates the HTTP message exception", e);
315         }
316     }
317 
318     override void send(HttpRequest request, Promise!(HttpOutputStream) promise,
319             ClientHttpHandler handler) {
320         promise.succeeded(getHttpOutputStream(request, handler));
321     }
322 
323     override HttpOutputStream getHttpOutputStream(HttpRequest request, ClientHttpHandler handler) {
324         Http1ClientResponseHandler http1ClientResponseHandler = 
325             new Http1ClientResponseHandler(handler);
326         checkWrite(request, http1ClientResponseHandler);
327         http1ClientResponseHandler.outputStream = new Http1ClientRequestOutputStream(this,
328                 wrap.writing.request);
329         return http1ClientResponseHandler.outputStream;
330     }
331 
332     private void checkWrite(HttpRequest request, Http1ClientResponseHandler handler) {
333         assert(request, "The http client request is null.");
334         assert(handler, "The http1 client response handler is null.");
335         assert(isOpen(), "The current connection " ~ getId()
336                 .to!string ~ " has been closed.");
337         assert(!upgradeHttp2Complete, "The current connection " ~ getId()
338                 .to!string ~ " has upgraded HTTP2.");
339         assert(!upgradeWebSocketComplete, "The current connection " ~ getId()
340                 .to!string ~ " has upgraded WebSocket.");
341 
342         if (wrap.writing is null) {
343             wrap.writing = handler;
344             HttpFields headerFields = request.getFields();
345             
346             if(!headerFields.contains(HttpHeader.HOST)) {
347                 headerFields.put(HttpHeader.HOST, request.getURI.getHost());
348             }
349             handler.connection = this;
350             handler.request = request;
351             handler.onReady();
352         } else {
353             throw new WritePendingException("");
354         }
355     }
356 
357     override void close() {
358         if (isOpen()) {
359             super.close();
360         }
361     }
362 
363     // override bool isClosed() {
364     //     return !isOpen();
365     // }
366 
367     // override 
368     bool isOpen() {
369         version (HUNT_HTTP_DEBUG) {
370             tracef("Connection status: isOpen=%s, upgradeHttp2Complete=%s, upgradeWebSocketComplete=%s",
371                     getTcpConnection().isConnected(), upgradeHttp2Complete, upgradeWebSocketComplete);
372         }
373         return getTcpConnection().isConnected() && !upgradeHttp2Complete && !upgradeWebSocketComplete;
374     }
375 
376     bool getUpgradeHttp2Complete() {
377         return upgradeHttp2Complete;
378     }
379 
380     bool getUpgradeWebSocketComplete() {
381         return upgradeWebSocketComplete;
382     }
383 }
384 
/**
 * Parser-facing response handler that forwards every parsing event to the
 * handler of the request currently pending (`writing`). It also remembers the
 * status line so that a "100 Continue" response can be recognized.
 */
private class ResponseHandlerWrap : HttpResponseParsingHandler {

    // Handler of the pending request; null when no request is pending.
    private Http1ClientResponseHandler writing;
    // Status code and reason of the most recent startResponse() call.
    private int status;
    private string reason;
    // Owning connection; assigned by Http1ClientConnection's constructor.
    private Http1ClientConnection connection;

    /// Forwards to badMessage(int, string) using the failure's code and reason.
    void badMessage(BadMessageException failure) {
        badMessage(failure.getCode(), failure.getReason());
    }

    /// Notifies the pending handler, or closes the connection when none is pending.
    override void earlyEOF() {
        Http1ClientResponseHandler pending = writing;
        if (pending is null) {
            IOUtils.close(connection);
        } else {
            pending.earlyEOF();
        }

        writing = null;
    }

    /// Forwards a parsed header field to the pending handler.
    override void parsedHeader(HttpField field) {
        writing.parsedHeader(field);
    }

    /// Forwards the end-of-headers event to the pending handler.
    override bool headerComplete() {
        auto result = writing.headerComplete();
        return result;
    }

    /// Forwards a chunk of body content to the pending handler.
    override bool content(ByteBuffer item) {
        auto result = writing.content(item);
        return result;
    }

    /// Forwards the end-of-content event to the pending handler.
    override bool contentComplete() {
        auto result = writing.contentComplete();
        return result;
    }

    /// Forwards a parsed trailer field to the pending handler.
    override void parsedTrailer(HttpField field) {
        writing.parsedTrailer(field);
    }

    /**
     * Ends the current message. A "100 Continue" response only resets the
     * parser and keeps the pending handler; any other response is forwarded
     * and the pending handler is cleared afterwards.
     */
    override bool messageComplete() {
        immutable bool isContinue = status == 100 && "Continue".equalsIgnoreCase(reason);
        if (!isContinue) {
            auto result = writing.messageComplete();
            writing = null;
            return result;
        }

        tracef("client received the 100 Continue response");
        connection.getParser().reset();
        return true;
    }

    /// Clears the pending handler, then notifies it, or closes the connection when none was pending.
    override void badMessage(int status, string reason) {
        Http1ClientResponseHandler pending = writing;
        writing = null;
        if (pending is null) {
            IOUtils.close(connection);
        } else {
            pending.badMessage(status, reason);
        }
    }

    /// Returns: the fixed value 1024.
    override int getHeaderCacheSize() {
        return 1024;
    }

    /// Records the status line and forwards it to the pending handler.
    override bool startResponse(HttpVersion ver, int status, string reason) {
        this.status = status;
        this.reason = reason;
        auto result = writing.startResponse(ver, status, reason);
        return result;
    }

}
462 
/**
 * Output stream for one client request on an Http1ClientConnection. Buffer
 * sizes and the TCP connection come from the owning connection; each stream
 * has its own HttpGenerator.
 */
class Http1ClientRequestOutputStream : AbstractHttp1OutputStream {
    private Http1ClientConnection connection;
    private HttpGenerator httpGenerator;

    /**
     * Params:
     *   connection = the owning client connection
     *   request    = the request written through this stream
     */
    private this(Http1ClientConnection connection, HttpRequest request) {
        super(request, true);
        this.connection = connection;
        this.httpGenerator = new HttpGenerator();
    }

    /// Only traces the completed message when built with HUNT_DEBUG.
    override protected void generateHttpMessageSuccessfully() {
        version(HUNT_DEBUG) tracef("client session %s generates the HTTP message completely", connection.getId());
    }

    /**
     * Logs the mismatch between the actual and the expected generator
     * result/state, then throws.
     *
     * Throws: IllegalStateException always.
     */
    override protected void generateHttpMessageExceptionally(HttpGenerator.Result actualResult,
            HttpGenerator.State actualState, HttpGenerator.Result expectedResult,
            HttpGenerator.State expectedState) {
        errorf("http1 generator error, actual: [%s, %s], expected: [%s, %s]",
                actualResult, actualState, expectedResult, expectedState);
        throw new IllegalStateException("client generates http message exception.");
    }

    /// Returns: a new buffer sized by the configured maximum request head length.
    override protected ByteBuffer getHeaderByteBuffer() {
        auto headLength = connection.getHttpOptions().getMaxRequestHeadLength();
        return BufferUtils.allocate(headLength);
    }

    /// Returns: a new buffer sized by the configured maximum request trailer length.
    override protected ByteBuffer getTrailerByteBuffer() {
        auto trailerLength = connection.getHttpOptions().getMaxRequestTrailerLength();
        return BufferUtils.allocate(trailerLength);
    }

    /// Returns: the TCP connection of the owning client connection.
    override protected Connection getSession() {
        Connection tcp = connection.getTcpConnection();
        return tcp;
    }

    /// Returns: the generator owned by this stream.
    override protected HttpGenerator getHttpGenerator() {
        return this.httpGenerator;
    }
}