module hunt.http.client.Http1ClientConnection;

import hunt.http.client.ClientHttpHandler;
import hunt.http.client.ClientHttp2SessionListener;
import hunt.http.client.HttpClientConnection;
import hunt.http.client.Http1ClientResponseHandler;
import hunt.http.client.Http2ClientConnection;
import hunt.http.client.Http2ClientResponseHandler;
import hunt.http.client.Http2ClientSession;

import hunt.http.codec.http.decode.HttpParser;
import hunt.http.codec.http.encode.HttpGenerator;
import hunt.http.codec.http.frame.SettingsFrame;
import hunt.http.codec.http.model;
import hunt.http.codec.http.stream;
import hunt.http.WebSocketCommon;
import hunt.http.WebSocketConnection;
import hunt.http.codec.websocket.stream.IOState;
import hunt.http.codec.websocket.stream.WebSocketConnectionImpl;

import hunt.http.HttpConnection;
import hunt.http.HttpField;
import hunt.http.HttpFields;
import hunt.http.HttpHeader;
import hunt.http.HttpMethod;
import hunt.http.HttpOptions;
import hunt.http.HttpOutputStream;
import hunt.http.HttpRequest;
import hunt.http.HttpResponse;
import hunt.http.HttpVersion;
import hunt.http.WebSocketPolicy;

import hunt.net.secure.SecureSession;
import hunt.net.Connection;

import hunt.collection;
import hunt.concurrency.Promise;
import hunt.stream;
import hunt.Exceptions;
import hunt.logging;
import hunt.text.Common;
import hunt.text.Codec;

import std.array;
import std.base64;
import std.conv;
import std.random;
import std.socket;

alias HttpProtocol = hunt.http.codec.http.model.Protocol.Protocol;
alias SessionListener = StreamSession.Listener;

// import hunt.http.codec.websocket.model.WebSocketConstants.SPEC_VERSION;

/**
 * HTTP/1.x client connection.
 *
 * Sends requests over the underlying TCP connection and dispatches the parsed
 * response to the handler of the request that is currently in flight (only one
 * request may be pending at a time, see `checkWrite`). The connection can also
 * be upgraded in place to HTTP/2 cleartext ("h2c") or to WebSocket; once an
 * upgrade has completed, `isOpen()` reports `false` for this object.
 */
class Http1ClientConnection : AbstractHttp1Connection, HttpClientConnection {

    // State of a pending WebSocket upgrade (set by upgradeWebSocket).
    private Promise!(WebSocketConnection) webSocketConnectionPromise;
    private IncomingFrames incomingFrames;
    private WebSocketPolicy policy;

    // State of a pending HTTP/2 upgrade (set by upgradeHttp2).
    private Promise!(HttpClientConnection) http2ConnectionPromise;
    private Http2ClientConnection http2Connection;
    private ClientHttp2SessionListener http2SessionListener;

    private bool upgradeHttp2Complete = false;
    private bool upgradeWebSocketComplete = false;

    // Parser callback target; tracks the handler of the in-flight request.
    private ResponseHandlerWrap wrap;

    /**
     * Creates a connection on top of `tcpSession` with a fresh response handler wrapper.
     */
    this(HttpOptions config, Connection tcpSession) { // , SecureSession secureSession
        this(config, tcpSession, new ResponseHandlerWrap()); // secureSession,
    }

    private this(HttpOptions config,
            Connection tcpSession, HttpResponseParsingHandler responseHandler) {

        super(config, tcpSession, null, responseHandler);
        wrap = cast(ResponseHandlerWrap) responseHandler;
        wrap.connection = this;
    }

    /**
     * Builds the response parser. `requestHandler` is not used on the client side.
     */
    override protected HttpParser initHttpParser(HttpOptions config,
            HttpRequestParsingHandler requestHandler, HttpResponseParsingHandler responseHandler) {
        return new HttpParser(responseHandler, config.getMaxRequestHeadLength());
    }

    override HttpConnectionType getConnectionType() {
        return HttpConnectionType.HTTP1;
    }

    // override bool isSecured() {
    //     return super.isSecured();
    // }

    /// Returns: the response parser owned by this connection.
    HttpParser getParser() {
        return parser;
    }

    /// Returns: the options this connection was created with.
    HttpOptions getHttpOptions() {
        return config;
    }

    // dfmt off
    /**
     * Starts an h2c upgrade using default stream/session listeners.
     *
     * Params:
     *   request              = the HTTP/1.1 request that carries the upgrade headers
     *   settings             = HTTP/2 settings announced to the server
     *   promise              = completed when the upgrade succeeds or fails
     *   upgradeHandler       = handler for the HTTP/1.1 upgrade response
     *   http2ResponseHandler = handler for the response delivered on the initial HTTP/2 stream
     *
     * NOTE(review): the session listener created here calls `settings.getSettings()`
     * on preface, so unlike the overload below it appears to require a non-null
     * `settings` - confirm with callers.
     */
    override void upgradeHttp2(HttpRequest request, SettingsFrame settings,
            Promise!(HttpClientConnection) promise, ClientHttpHandler upgradeHandler,
            ClientHttpHandler http2ResponseHandler) {

        Promise!(Stream) initStream = new Http2ClientResponseHandler.ClientStreamPromise(request,
            new class DefaultPromise!(HttpOutputStream) {

                override
                bool failed(Throwable x) {
                    // Include the failure reason; the previous format string had
                    // no specifier, so the throwable was never printed.
                    errorf("Create client output stream exception: %s", x.msg);
                    return true;
                }
            });

        Stream.Listener initStreamListener = new Http2ClientResponseHandler(request, http2ResponseHandler, this);
        ClientHttp2SessionListener listener = new class ClientHttp2SessionListener {

            override
            Map!(int, int) onPreface(StreamSession session) {
                return settings.getSettings();
            }

        };
        upgradeHttp2(request, settings, promise, initStream, initStreamListener, listener, upgradeHandler);
    }

    /**
     * Starts an h2c upgrade with caller-supplied stream and session listeners.
     *
     * Prepares the HTTP/2 connection object, adds the `Connection`, `Upgrade`
     * and `HTTP2-Settings` headers to `request` and sends it. The upgrade itself
     * is finished later by `upgradeProtocolComplete`.
     *
     * Throws: IllegalStateException if the connection is secured (TLS must
     *         negotiate HTTP/2 through ALPN instead).
     */
    void upgradeHttp2(HttpRequest request, SettingsFrame settings,
            Promise!(HttpClientConnection) promise, Promise!(Stream) initStream,
            Stream.Listener initStreamListener,
            ClientHttp2SessionListener listener, ClientHttpHandler handler) {
        if (isSecured()) {
            throw new IllegalStateException("The TLS TCP connection must use ALPN to upgrade HTTP2");
        }

        this.http2ConnectionPromise = promise;
        this.http2SessionListener = listener;
        http2Connection = new class Http2ClientConnection {
            this() {
                super(getHttpOptions(), this.outer.getTcpConnection(), http2SessionListener);
            }

            override
            protected Http2Session initHttp2Session(HttpOptions config, FlowControlStrategy flowControl,
                    StreamSession.Listener listener) {
                return Http2ClientSession.initSessionForUpgradingHTTP2(null, this.outer.getTcpConnection(), generator,
                    listener, flowControl, 3, config.getStreamIdleTimeout(), initStream,
                    initStreamListener);
            }
        };

        // generate http2 upgrading headers
        request.getFields().add(new HttpField(HttpHeader.CONNECTION, "Upgrade, HTTP2-Settings"));
        request.getFields().add(new HttpField(HttpHeader.UPGRADE, "h2c"));

        // Number of leading bytes of the serialized SETTINGS frame that are
        // skipped so that only the payload is put into the header.
        enum size_t frameHeaderLength = 9;

        if (settings !is null) {
            List!(ByteBuffer) byteBuffers = http2Generator.control(settings);
            if (byteBuffers !is null && byteBuffers.size() > 0) {
                try {
                    Appender!(byte[]) ot;
                    foreach (ByteBuffer buffer; byteBuffers) {
                        byte[] bufferArray = BufferUtils.toArray(buffer);
                        ot.put(bufferArray);
                    }
                    byte[] settingsFrame = ot.data;

                    if (settingsFrame.length >= frameHeaderLength) {
                        // Slice instead of copying; the encoder only reads the data.
                        byte[] settingsPayload = settingsFrame[frameHeaderLength .. $];
                        request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS,
                            Base64URL.encode(settingsPayload)));
                    } else {
                        // Guard against size_t underflow of `length - 9` on a
                        // truncated frame; fall back to an empty settings value.
                        errorf("generated http2 settings frame is too short: %d bytes",
                            settingsFrame.length);
                        request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
                    }
                } catch (IOException e) {
                    errorf("generate http2 upgrading settings exception: %s", e.msg);
                }
            } else {
                request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
            }
        } else {
            request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, ""));
        }

        send(request, handler);
    }
    // dfmt on

    /**
     * Finishes a pending protocol upgrade according to the protocol reported
     * by `ProtocolHelper.from(response)`.
     *
     * Returns: `true` when the upgrade was completed; `false` when the response
     *          does not match a pending upgrade, in which case all pending
     *          upgrade state is reset and the pending promises are failed.
     */
    bool upgradeProtocolComplete(HttpRequest request, HttpResponse response) {
        switch (ProtocolHelper.from(response)) {
        case HttpProtocol.H2: {
                if (http2ConnectionPromise !is null
                        && http2SessionListener !is null && http2Connection !is null) {
                    upgradeHttp2Complete = true;
                    // tcpSession.attachObject(http2Connection);
                    setAttribute(HttpConnection.NAME, http2Connection);
                    http2SessionListener.setConnection(http2Connection);
                    http2Connection.initialize(getHttpOptions(),
                            http2ConnectionPromise, http2SessionListener);
                    return true;
                } else {
                    resetUpgradeProtocol();
                    return false;
                }
            }
        case HttpProtocol.WEB_SOCKET: {
                if (webSocketConnectionPromise !is null && incomingFrames !is null && policy !is null) {
                    upgradeWebSocketComplete = true;
                    WebSocketConnection webSocketConnection = new WebSocketConnectionImpl(
                            getTcpConnection(), incomingFrames, policy, request, response, config);
                    // tcpSession.attachObject(cast(Object) webSocketConnection);
                    setAttribute(HttpConnection.NAME, cast(Object) webSocketConnection);
                    IOState state = webSocketConnection.getIOState();
                    state.onConnected();
                    state.onOpened();
                    webSocketConnectionPromise.succeeded(webSocketConnection);
                    return true;
                } else {
                    resetUpgradeProtocol();
                    return false;
                }
            }
        default:
            resetUpgradeProtocol();
            return false;
        }
    }

    /// Fails any pending upgrade promises and clears all pending upgrade state.
    private void resetUpgradeProtocol() {
        if (http2ConnectionPromise !is null) {
            http2ConnectionPromise.failed(new IllegalStateException("upgrade h2 failed"));
            http2ConnectionPromise = null;
        }
        http2SessionListener = null;
        http2Connection = null;
        if (webSocketConnectionPromise !is null) {
            webSocketConnectionPromise.failed(
                    new IllegalStateException("The websocket handshake failed"));
            webSocketConnectionPromise = null;
        }
        incomingFrames = null;
        policy = null;
    }

    /**
     * Starts a WebSocket handshake.
     *
     * Adds the `Sec-WebSocket-Version`, `Upgrade`, `Connection` and
     * `Sec-WebSocket-Key` headers to `request`, remembers the promise, frame
     * sink and policy for `upgradeProtocolComplete`, and sends the request.
     * The request method must be GET and the policy behavior must be CLIENT
     * (checked with `assert`).
     */
    override void upgradeWebSocket(HttpRequest request, WebSocketPolicy policy,
            Promise!(WebSocketConnection) promise,
            ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) {
        assert(HttpMethod.GET.asString() == request.getMethod(),
                "The method of the request MUST be GET in the websocket handshake.");

        assert(policy.getBehavior() == WebSocketBehavior.CLIENT,
                "The websocket behavior MUST be client");

        request.getFields().put(HttpHeader.SEC_WEBSOCKET_VERSION,
                WebSocketConstants.SPEC_VERSION.to!string());
        request.getFields().put(HttpHeader.UPGRADE, "websocket");
        request.getFields().put(HttpHeader.CONNECTION, "Upgrade");
        request.getFields().put(HttpHeader.SEC_WEBSOCKET_KEY, genRandomKey());
        webSocketConnectionPromise = promise;
        this.incomingFrames = incomingFrames;
        this.policy = policy;
        send(request, upgradeHandler);
    }

    /**
     * Generates the value of the `Sec-WebSocket-Key` header: 16 random bytes,
     * encoded with `B64Code`.
     *
     * The bytes come from std.random's default generator so that every
     * handshake gets a different nonce. The previous implementation seeded a
     * generator with a constant, which produced the same key on every call,
     * and its half-open range could never yield `byte.max`.
     */
    private string genRandomKey() {
        byte[] bytes = new byte[16];
        foreach (ref b; bytes) {
            // uniform!byte() covers the full byte range using the default generator.
            b = uniform!byte();
        }
        return cast(string)(B64Code.encode(bytes));
    }

    /**
     * Sends the request head with an `Expect` header set to
     * `HttpHeaderValue.CONTINUE` and commits it.
     *
     * Returns: the output stream on which the caller can write the body.
     */
    override HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) {
        request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE);
        HttpOutputStream outputStream = getHttpOutputStream(request, handler);
        try {
            outputStream.commit();
        } catch (IOException e) {
            errorf("client generates the HTTP message exception: %s", e.msg);
        }
        return outputStream;
    }

    /**
     * Sends a request without a body: the output stream is created and closed
     * immediately. Exceptions are logged, not propagated.
     */
    override void send(HttpRequest request, ClientHttpHandler handler) {
        try {
            version (HUNT_HTTP_DEBUG) tracef("client request and does not send data");
            HttpOutputStream output = getHttpOutputStream(request, handler);
            output.close();
        } catch (Exception e) {
            errorf("client generates the HTTP message exception: %s", e.msg);
        }
    }

    /**
     * Sends a request whose body is a single buffer (may be `null` for no body).
     */
    override void send(HttpRequest request, ByteBuffer buffer, ClientHttpHandler handler) {
        try {
            HttpOutputStream output = getHttpOutputStream(request, handler);
            if (buffer !is null) {
                // NOTE(review): assumes writeWithContentLength() finishes the
                // stream itself - confirm in AbstractHttp1OutputStream.
                output.writeWithContentLength(buffer);
            } else {
                // No body: close the stream like send(request, handler) does,
                // instead of leaving it open.
                output.close();
            }
        } catch (IOException e) {
            errorf("client generates the HTTP message exception: %s", e.msg);
        }
    }

    /**
     * Sends a request whose body is a sequence of buffers (may be `null` for no body).
     */
    override void send(HttpRequest request, ByteBuffer[] buffers, ClientHttpHandler handler) {
        try {
            HttpOutputStream output = getHttpOutputStream(request, handler);
            if (buffers !is null) {
                // NOTE(review): assumes writeWithContentLength() finishes the
                // stream itself - confirm in AbstractHttp1OutputStream.
                output.writeWithContentLength(buffers);
            } else {
                // No body: close the stream like send(request, handler) does,
                // instead of leaving it open.
                output.close();
            }
        } catch (IOException e) {
            errorf("client generates the HTTP message exception: %s", e.msg);
        }
    }

    /**
     * Creates the output stream for `request` and hands it to `promise`.
     */
    override void send(HttpRequest request, Promise!(HttpOutputStream) promise,
            ClientHttpHandler handler) {
        promise.succeeded(getHttpOutputStream(request, handler));
    }

    /**
     * Registers `request` as the in-flight request and returns the stream used
     * to write it.
     *
     * Throws: WritePendingException (from `checkWrite`) if another request is
     *         still in flight.
     */
    override HttpOutputStream getHttpOutputStream(HttpRequest request, ClientHttpHandler handler) {
        Http1ClientResponseHandler http1ClientResponseHandler =
            new Http1ClientResponseHandler(handler);
        checkWrite(request, http1ClientResponseHandler);
        http1ClientResponseHandler.outputStream = new Http1ClientRequestOutputStream(this,
                wrap.writing.request);
        return http1ClientResponseHandler.outputStream;
    }

    /**
     * Validates the connection state and installs `handler` as the handler of
     * the in-flight request. A `Host` header is added from the request URI
     * when the request does not already contain one.
     *
     * Throws: WritePendingException if a request is already in flight.
     */
    private void checkWrite(HttpRequest request, Http1ClientResponseHandler handler) {
        assert(request, "The http client request is null.");
        assert(handler, "The http1 client response handler is null.");
        assert(isOpen(), "The current connection " ~ getId()
                .to!string ~ " has been closed.");
        assert(!upgradeHttp2Complete, "The current connection " ~ getId()
                .to!string ~ " has upgraded HTTP2.");
        assert(!upgradeWebSocketComplete, "The current connection " ~ getId()
                .to!string ~ " has upgraded WebSocket.");

        if (wrap.writing is null) {
            wrap.writing = handler;
            HttpFields headerFields = request.getFields();

            if (!headerFields.contains(HttpHeader.HOST)) {
                headerFields.put(HttpHeader.HOST, request.getURI.getHost());
            }
            handler.connection = this;
            handler.request = request;
            handler.onReady();
        } else {
            throw new WritePendingException("");
        }
    }

    /// Closes the connection if it is still open as an HTTP/1 connection.
    override void close() {
        if (isOpen()) {
            super.close();
        }
    }

    // override bool isClosed() {
    //     return !isOpen();
    // }

    /**
     * Returns: `true` while the TCP connection is connected and no protocol
     *          upgrade has completed on this connection.
     */
    // override
    bool isOpen() {
        version (HUNT_HTTP_DEBUG) {
            tracef("Connection status: isOpen=%s, upgradeHttp2Complete=%s, upgradeWebSocketComplete=%s",
                    getTcpConnection().isConnected(), upgradeHttp2Complete, upgradeWebSocketComplete);
        }
        return getTcpConnection().isConnected() && !upgradeHttp2Complete && !upgradeWebSocketComplete;
    }

    /// Returns: whether an HTTP/2 upgrade has completed on this connection.
    bool getUpgradeHttp2Complete() {
        return upgradeHttp2Complete;
    }

    /// Returns: whether a WebSocket upgrade has completed on this connection.
    bool getUpgradeWebSocketComplete() {
        return upgradeWebSocketComplete;
    }
}

/**
 * Parser callback target of `Http1ClientConnection`.
 *
 * Forwards every parsing event to `writing`, the response handler of the
 * request currently in flight, and clears `writing` when the message
 * completes, fails, or the stream ends early.
 *
 * NOTE(review): most callbacks dereference `writing` without a null check, so
 * they rely on the parser only producing events while a request is in flight -
 * confirm that unsolicited response data cannot reach them.
 */
private class ResponseHandlerWrap : HttpResponseParsingHandler {

    private Http1ClientResponseHandler writing;
    // Status line of the response currently being parsed (see startResponse).
    private int status;
    private string reason;
    private Http1ClientConnection connection;

    /// Convenience overload that unpacks the code and reason of `failure`.
    void badMessage(BadMessageException failure) {
        badMessage(failure.getCode(), failure.getReason());
    }

    /// Notifies the in-flight handler, or closes the connection when there is none.
    override void earlyEOF() {
        Http1ClientResponseHandler h = writing;
        if (h !is null) {
            h.earlyEOF();
        } else {
            IOUtils.close(connection);
        }

        writing = null;
    }

    override void parsedHeader(HttpField field) {
        writing.parsedHeader(field);
    }

    override bool headerComplete() {
        return writing.headerComplete();
    }

    override bool content(ByteBuffer item) {
        return writing.content(item);
    }

    override bool contentComplete() {
        return writing.contentComplete();
    }

    override void parsedTrailer(HttpField field) {
        writing.parsedTrailer(field);
    }

    /**
     * Ends the current message. A "100 Continue" interim response only resets
     * the parser and keeps the in-flight handler; any other response is
     * forwarded and the in-flight handler is cleared.
     */
    override bool messageComplete() {
        if (status == 100 && "Continue".equalsIgnoreCase(reason)) {
            tracef("client received the 100 Continue response");
            connection.getParser().reset();
            return true;
        } else {
            auto r = writing.messageComplete();
            writing = null;
            return r;
        }
    }

    /// Notifies the in-flight handler, or closes the connection when there is none.
    override void badMessage(int status, string reason) {
        Http1ClientResponseHandler h = writing;
        writing = null;
        if (h !is null) {
            h.badMessage(status, reason);
        } else {
            IOUtils.close(connection);
        }
    }

    override int getHeaderCacheSize() {
        return 1024;
    }

    /// Records the status line (used by messageComplete) and forwards the event.
    override bool startResponse(HttpVersion ver, int status, string reason) {
        this.status = status;
        this.reason = reason;
        return writing.startResponse(ver, status, reason);
    }

}

/**
 * Output stream used to write one client request on an `Http1ClientConnection`.
 * Each stream owns its own `HttpGenerator`.
 */
class Http1ClientRequestOutputStream : AbstractHttp1OutputStream {
    private Http1ClientConnection connection;
    private HttpGenerator httpGenerator;

    private this(Http1ClientConnection connection, HttpRequest request) {
        super(request, true);
        this.connection = connection;
        httpGenerator = new HttpGenerator();
    }


    override protected void generateHttpMessageSuccessfully() {
        version(HUNT_DEBUG) tracef("client session %s generates the HTTP message completely", connection.getId());
    }

    /**
     * Logs the generator state mismatch.
     *
     * Throws: IllegalStateException always.
     */
    override protected void generateHttpMessageExceptionally(HttpGenerator.Result actualResult,
            HttpGenerator.State actualState, HttpGenerator.Result expectedResult,
            HttpGenerator.State expectedState) {
        errorf("http1 generator error, actual: [%s, %s], expected: [%s, %s]",
                actualResult, actualState, expectedResult, expectedState);
        throw new IllegalStateException("client generates http message exception.");
    }

    /// Allocates a buffer sized by the configured maximum request head length.
    override protected ByteBuffer getHeaderByteBuffer() {
        return BufferUtils.allocate(connection.getHttpOptions().getMaxRequestHeadLength());
    }

    /// Allocates a buffer sized by the configured maximum request trailer length.
    override protected ByteBuffer getTrailerByteBuffer() {
        return BufferUtils.allocate(connection.getHttpOptions()
                .getMaxRequestTrailerLength());
    }

    override protected Connection getSession() {
        return connection.getTcpConnection();
    }

    override protected HttpGenerator getHttpGenerator() {
        return httpGenerator;
    }
}