1 module hunt.http.client.Http1ClientConnection; 2 3 import hunt.http.client.ClientHttpHandler; 4 import hunt.http.client.ClientHttp2SessionListener; 5 import hunt.http.client.HttpClientConnection; 6 import hunt.http.client.Http1ClientResponseHandler; 7 import hunt.http.client.Http2ClientConnection; 8 import hunt.http.client.Http2ClientResponseHandler; 9 import hunt.http.client.Http2ClientSession; 10 11 import hunt.http.codec.http.decode.HttpParser; 12 import hunt.http.codec.http.encode.HttpGenerator; 13 import hunt.http.codec.http.frame.SettingsFrame; 14 import hunt.http.codec.http.model; 15 import hunt.http.codec.http.stream; 16 import hunt.http.WebSocketCommon; 17 import hunt.http.WebSocketConnection; 18 import hunt.http.codec.websocket.stream.IOState; 19 import hunt.http.codec.websocket.stream.WebSocketConnectionImpl; 20 21 import hunt.http.HttpConnection; 22 import hunt.http.HttpField; 23 import hunt.http.HttpFields; 24 import hunt.http.HttpHeader; 25 import hunt.http.HttpMethod; 26 import hunt.http.HttpOptions; 27 import hunt.http.HttpOutputStream; 28 import hunt.http.HttpRequest; 29 import hunt.http.HttpResponse; 30 import hunt.http.HttpVersion; 31 import hunt.http.WebSocketPolicy; 32 33 import hunt.net.secure.SecureSession; 34 import hunt.net.Connection; 35 36 import hunt.collection; 37 import hunt.concurrency.Promise; 38 import hunt.stream; 39 import hunt.Exceptions; 40 import hunt.logging; 41 import hunt.text.Common; 42 import hunt.text.Codec; 43 44 import std.array; 45 import std.base64; 46 import std.conv; 47 import std.random; 48 import std.socket; 49 50 alias HttpProtocol = hunt.http.codec.http.model.Protocol.Protocol; 51 alias SessionListener = StreamSession.Listener; 52 53 // import hunt.http.codec.websocket.model.WebSocketConstants.SPEC_VERSION; 54 55 /** 56 */ 57 class Http1ClientConnection : AbstractHttp1Connection, HttpClientConnection { 58 59 private Promise!(WebSocketConnection) webSocketConnectionPromise; 60 private IncomingFrames incomingFrames; 61 private WebSocketPolicy policy; 62 private Promise!(HttpClientConnection) http2ConnectionPromise; 63 private Http2ClientConnection http2Connection; 64 private ClientHttp2SessionListener http2SessionListener; 65 private bool upgradeHttp2Complete = false; 66 private bool upgradeWebSocketComplete = false; 67 private ResponseHandlerWrap wrap; 68 69 this(HttpOptions config, Connection tcpSession) { // , SecureSession secureSession 70 this(config, tcpSession, new ResponseHandlerWrap()); // secureSession, 71 } 72 73 private this(HttpOptions config, 74 Connection tcpSession, HttpResponseParsingHandler responseHandler) { 75 76 super(config, tcpSession, null, responseHandler); 77 wrap = cast(ResponseHandlerWrap) responseHandler; 78 wrap.connection = this; 79 } 80 81 override protected HttpParser initHttpParser(HttpOptions config, 82 HttpRequestParsingHandler requestHandler, HttpResponseParsingHandler responseHandler) { 83 return new HttpParser(responseHandler, config.getMaxRequestHeadLength()); 84 } 85 86 override HttpConnectionType getConnectionType() { 87 return HttpConnectionType.HTTP1; 88 } 89 90 // override bool isSecured() { 91 // return super.isSecured(); 92 // } 93 94 HttpParser getParser() { 95 return parser; 96 } 97 98 HttpOptions getHttpOptions() { 99 return config; 100 } 101 102 // dfmt off 103 override void upgradeHttp2(HttpRequest request, SettingsFrame settings, 104 Promise!(HttpClientConnection) promise, ClientHttpHandler upgradeHandler, 105 ClientHttpHandler http2ResponseHandler) { 106 107 Promise!(Stream) initStream = new Http2ClientResponseHandler.ClientStreamPromise(request, 108 new class DefaultPromise!(HttpOutputStream) { 109 110 override 111 void failed(Exception x) { 112 errorf("Create client output stream exception", x); 113 } 114 }); 115 Stream.Listener initStreamListener = new Http2ClientResponseHandler(request, http2ResponseHandler, this); 116 ClientHttp2SessionListener listener = new class ClientHttp2SessionListener { 117 118 override 119 Map!(int, int) onPreface(StreamSession session) { 120 return settings.getSettings(); 121 } 122 123 }; 124 upgradeHttp2(request, settings, promise, initStream, initStreamListener, listener, upgradeHandler); 125 } 126 127 void upgradeHttp2(HttpRequest request, SettingsFrame settings, 128 Promise!(HttpClientConnection) promise, Promise!(Stream) initStream, 129 Stream.Listener initStreamListener, 130 ClientHttp2SessionListener listener, ClientHttpHandler handler) { 131 if (isSecured()) { 132 throw new IllegalStateException("The TLS TCP connection must use ALPN to upgrade HTTP2"); 133 } 134 135 this.http2ConnectionPromise = promise; 136 this.http2SessionListener = listener; 137 http2Connection = new class Http2ClientConnection { 138 this() { 139 super(getHttpOptions(), this.outer.getTcpConnection(), http2SessionListener); 140 } 141 override 142 protected Http2Session initHttp2Session(HttpOptions config, FlowControlStrategy flowControl, 143 StreamSession.Listener listener) { 144 return Http2ClientSession.initSessionForUpgradingHTTP2(null, this.outer.getTcpConnection(), generator, 145 listener, flowControl, 3, config.getStreamIdleTimeout(), initStream, 146 initStreamListener); 147 } 148 }; 149 150 // generate http2 upgrading headers 151 request.getFields().add(new HttpField(HttpHeader.CONNECTION, "Upgrade, HTTP2-Settings")); 152 request.getFields().add(new HttpField(HttpHeader.UPGRADE, "h2c")); 153 if (settings !is null) { 154 List!(ByteBuffer) byteBuffers = http2Generator.control(settings); 155 if (byteBuffers !is null && byteBuffers.size() > 0) { 156 try { 157 // ByteArrayOutputStream ot = new ByteArrayOutputStream(); 158 // foreach (ByteBuffer buffer ; byteBuffers) { 159 // ot.write(BufferUtils.toArray(buffer)); 160 // } 161 Appender!(byte[]) ot; 162 foreach (ByteBuffer buffer; byteBuffers) { 163 byte[] bufferArray = BufferUtils.toArray(buffer); 164 // writeln("before1:\t" ~ TypeUtils.toHexString(bufferArray)); 165 // writefln("before1:\t%(%02X %)" , bufferArray); 166 ot.put(bufferArray); 167 } 168 byte[] settingsFrame = ot.data; //ot.toByteArray(); 169 byte[] settingsPayload = new byte[settingsFrame.length - 9]; 170 // System.arraycopy(settingsFrame, 9, settingsPayload, 0, settingsPayload.length); 171 settingsPayload[0 .. settingsPayload.length] = settingsFrame[9 .. 9 + settingsPayload.length]; 172 173 request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, 174 // Base64Utils.encodeToUrlSafeString(settingsPayload) 175 Base64URL.encode(settingsPayload))); 176 } catch (IOException e) { 177 errorf("generate http2 upgrading settings exception", e); 178 } 179 } else { 180 request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, "")); 181 } 182 } else { 183 request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, "")); 184 } 185 186 send(request, handler); 187 } 188 // dfmt on 189 190 bool upgradeProtocolComplete(HttpRequest request, HttpResponse response) { 191 switch (ProtocolHelper.from(response)) { 192 case HttpProtocol.H2: { 193 if (http2ConnectionPromise !is null 194 && http2SessionListener !is null && http2Connection !is null) { 195 upgradeHttp2Complete = true; 196 // tcpSession.attachObject(http2Connection); 197 setAttribute(HttpConnection.NAME, http2Connection); 198 http2SessionListener.setConnection(http2Connection); 199 http2Connection.initialize(getHttpOptions(), 200 http2ConnectionPromise, http2SessionListener); 201 return true; 202 } else { 203 resetUpgradeProtocol(); 204 return false; 205 } 206 } 207 case HttpProtocol.WEB_SOCKET: { 208 if (webSocketConnectionPromise !is null && incomingFrames !is null && policy !is null) { 209 upgradeWebSocketComplete = true; 210 WebSocketConnection webSocketConnection = new WebSocketConnectionImpl( 211 getTcpConnection(), incomingFrames, policy, request, response, config); 212 // tcpSession.attachObject(cast(Object) webSocketConnection); 213 setAttribute(HttpConnection.NAME, cast(Object)webSocketConnection); 214 IOState state = webSocketConnection.getIOState(); 215 state.onConnected(); 216 state.onOpened(); 217 webSocketConnectionPromise.succeeded(webSocketConnection); 218 return true; 219 } else { 220 resetUpgradeProtocol(); 221 return false; 222 } 223 } 224 default: 225 resetUpgradeProtocol(); 226 return false; 227 } 228 } 229 230 private void resetUpgradeProtocol() { 231 if (http2ConnectionPromise !is null) { 232 http2ConnectionPromise.failed(new IllegalStateException("upgrade h2 failed")); 233 http2ConnectionPromise = null; 234 } 235 http2SessionListener = null; 236 http2Connection = null; 237 if (webSocketConnectionPromise !is null) { 238 webSocketConnectionPromise.failed( 239 new IllegalStateException("The websocket handshake failed")); 240 webSocketConnectionPromise = null; 241 } 242 incomingFrames = null; 243 policy = null; 244 } 245 246 override void upgradeWebSocket(HttpRequest request, WebSocketPolicy policy, 247 Promise!(WebSocketConnection) promise, 248 ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) { 249 assert(HttpMethod.GET.asString() == request.getMethod(), 250 "The method of the request MUST be GET in the websocket handshake."); 251 252 assert(policy.getBehavior() == WebSocketBehavior.CLIENT, 253 "The websocket behavior MUST be client"); 254 255 request.getFields().put(HttpHeader.SEC_WEBSOCKET_VERSION, 256 WebSocketConstants.SPEC_VERSION.to!string()); 257 request.getFields().put(HttpHeader.UPGRADE, "websocket"); 258 request.getFields().put(HttpHeader.CONNECTION, "Upgrade"); 259 request.getFields().put(HttpHeader.SEC_WEBSOCKET_KEY, genRandomKey()); 260 webSocketConnectionPromise = promise; 261 this.incomingFrames = incomingFrames; 262 this.policy = policy; 263 send(request, upgradeHandler); 264 } 265 266 private string genRandomKey() { 267 byte[] bytes = new byte[16]; 268 // ThreadLocalRandom.current().nextBytes(bytes); 269 auto rnd = Random(2018); 270 for (int i; i < bytes.length; i++) 271 bytes[i] = cast(byte) uniform(byte.min, byte.max, rnd); 272 return cast(string)(B64Code.encode(bytes)); 273 } 274 275 override HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) { 276 request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE); 277 HttpOutputStream outputStream = getHttpOutputStream(request, handler); 278 try { 279 outputStream.commit(); 280 } catch (IOException e) { 281 errorf("client generates the HTTP message exception", e); 282 } 283 return outputStream; 284 } 285 286 override void send(HttpRequest request, ClientHttpHandler handler) { 287 try { 288 version (HUNT_HTTP_DEBUG) tracef("client request and does not send data"); 289 HttpOutputStream output = getHttpOutputStream(request, handler); 290 output.close(); 291 } catch (Exception e) { 292 errorf("client generates the HTTP message exception", e); 293 } 294 } 295 296 override void send(HttpRequest request, ByteBuffer buffer, ClientHttpHandler handler) { 297 try { 298 HttpOutputStream output = getHttpOutputStream(request, handler); 299 if (buffer !is null) { 300 output.writeWithContentLength(buffer); 301 } 302 } catch (IOException e) { 303 errorf("client generates the HTTP message exception", e); 304 } 305 } 306 307 override void send(HttpRequest request, ByteBuffer[] buffers, ClientHttpHandler handler) { 308 try { 309 HttpOutputStream output = getHttpOutputStream(request, handler); 310 if (buffers !is null) { 311 output.writeWithContentLength(buffers); 312 } 313 } catch (IOException e) { 314 errorf("client generates the HTTP message exception", e); 315 } 316 } 317 318 override void send(HttpRequest request, Promise!(HttpOutputStream) promise, 319 ClientHttpHandler handler) { 320 promise.succeeded(getHttpOutputStream(request, handler)); 321 } 322 323 override HttpOutputStream getHttpOutputStream(HttpRequest request, ClientHttpHandler handler) { 324 Http1ClientResponseHandler http1ClientResponseHandler = 325 new Http1ClientResponseHandler(handler); 326 checkWrite(request, http1ClientResponseHandler); 327 http1ClientResponseHandler.outputStream = new Http1ClientRequestOutputStream(this, 328 wrap.writing.request); 329 return http1ClientResponseHandler.outputStream; 330 } 331 332 private void checkWrite(HttpRequest request, Http1ClientResponseHandler handler) { 333 assert(request, "The http client request is null."); 334 assert(handler, "The http1 client response handler is null."); 335 assert(isOpen(), "The current connection " ~ getId() 336 .to!string ~ " has been closed."); 337 assert(!upgradeHttp2Complete, "The current connection " ~ getId() 338 .to!string ~ " has upgraded HTTP2."); 339 assert(!upgradeWebSocketComplete, "The current connection " ~ getId() 340 .to!string ~ " has upgraded WebSocket."); 341 342 if (wrap.writing is null) { 343 wrap.writing = handler; 344 HttpFields headerFields = request.getFields(); 345 346 if(!headerFields.contains(HttpHeader.HOST)) { 347 headerFields.put(HttpHeader.HOST, request.getURI.getHost()); 348 } 349 handler.connection = this; 350 handler.request = request; 351 handler.onReady(); 352 } else { 353 throw new WritePendingException(""); 354 } 355 } 356 357 override void close() { 358 if (isOpen()) { 359 super.close(); 360 } 361 } 362 363 // override bool isClosed() { 364 // return !isOpen(); 365 // } 366 367 // override 368 bool isOpen() { 369 version (HUNT_HTTP_DEBUG) { 370 tracef("Connection status: isOpen=%s, upgradeHttp2Complete=%s, upgradeWebSocketComplete=%s", 371 getTcpConnection().isConnected(), upgradeHttp2Complete, upgradeWebSocketComplete); 372 } 373 return getTcpConnection().isConnected() && !upgradeHttp2Complete && !upgradeWebSocketComplete; 374 } 375 376 bool getUpgradeHttp2Complete() { 377 return upgradeHttp2Complete; 378 } 379 380 bool getUpgradeWebSocketComplete() { 381 return upgradeWebSocketComplete; 382 } 383 } 384 385 /** 386 */ 387 private class ResponseHandlerWrap : HttpResponseParsingHandler { 388 389 private Http1ClientResponseHandler writing; // = new AtomicReference<)(); 390 private int status; 391 private string reason; 392 private Http1ClientConnection connection; 393 394 void badMessage(BadMessageException failure) { 395 badMessage(failure.getCode(), failure.getReason()); 396 } 397 398 override void earlyEOF() { 399 Http1ClientResponseHandler h = writing; 400 if (h !is null) { 401 h.earlyEOF(); 402 } else { 403 IOUtils.close(connection); 404 } 405 406 writing = null; 407 } 408 409 override void parsedHeader(HttpField field) { 410 writing.parsedHeader(field); 411 } 412 413 override bool headerComplete() { 414 return writing.headerComplete(); 415 } 416 417 override bool content(ByteBuffer item) { 418 return writing.content(item); 419 } 420 421 override bool contentComplete() { 422 return writing.contentComplete(); 423 } 424 425 override void parsedTrailer(HttpField field) { 426 writing.parsedTrailer(field); 427 } 428 429 override bool messageComplete() { 430 if (status == 100 && "Continue".equalsIgnoreCase(reason)) { 431 tracef("client received the 100 Continue response"); 432 connection.getParser().reset(); 433 return true; 434 } else { 435 auto r = writing.messageComplete(); 436 writing = null; 437 return r; 438 } 439 } 440 441 override void badMessage(int status, string reason) { 442 Http1ClientResponseHandler h = writing; 443 writing = null; 444 if (h !is null) { 445 h.badMessage(status, reason); 446 } else { 447 IOUtils.close(connection); 448 } 449 } 450 451 override int getHeaderCacheSize() { 452 return 1024; 453 } 454 455 override bool startResponse(HttpVersion ver, int status, string reason) { 456 this.status = status; 457 this.reason = reason; 458 return writing.startResponse(ver, status, reason); 459 } 460 461 } 462 463 /** 464 * 465 */ 466 class Http1ClientRequestOutputStream : AbstractHttp1OutputStream { 467 private Http1ClientConnection connection; 468 private HttpGenerator httpGenerator; 469 470 private this(Http1ClientConnection connection, HttpRequest request) { 471 super(request, true); 472 this.connection = connection; 473 httpGenerator = new HttpGenerator(); 474 } 475 476 477 override protected void generateHttpMessageSuccessfully() { 478 version(HUNT_DEBUG) tracef("client session %s generates the HTTP message completely", connection.getId()); 479 } 480 481 override protected void generateHttpMessageExceptionally(HttpGenerator.Result actualResult, 482 HttpGenerator.State actualState, HttpGenerator.Result expectedResult, 483 HttpGenerator.State expectedState) { 484 errorf("http1 generator error, actual: [%s, %s], expected: [%s, %s]", 485 actualResult, actualState, expectedResult, expectedState); 486 throw new IllegalStateException("client generates http message exception."); 487 } 488 489 override protected ByteBuffer getHeaderByteBuffer() { 490 return BufferUtils.allocate(connection.getHttpOptions().getMaxRequestHeadLength()); 491 } 492 493 override protected ByteBuffer getTrailerByteBuffer() { 494 return BufferUtils.allocate(connection.getHttpOptions() 495 .getMaxRequestTrailerLength()); 496 } 497 498 override protected Connection getSession() { 499 return connection.getTcpConnection(); 500 } 501 502 override protected HttpGenerator getHttpGenerator() { 503 return httpGenerator; 504 } 505 }