1 module hunt.http.client.Http1ClientConnection; 2 3 import hunt.http.client.ClientHttpHandler; 4 import hunt.http.client.ClientHttp2SessionListener; 5 import hunt.http.client.HttpClientConnection; 6 import hunt.http.client.Http1ClientResponseHandler; 7 import hunt.http.client.Http2ClientConnection; 8 import hunt.http.client.Http2ClientResponseHandler; 9 import hunt.http.client.Http2ClientSession; 10 11 import hunt.http.codec.http.decode.HttpParser; 12 import hunt.http.codec.http.encode.HttpGenerator; 13 import hunt.http.codec.http.frame.SettingsFrame; 14 import hunt.http.codec.http.model; 15 import hunt.http.codec.http.stream; 16 import hunt.http.codec.websocket.model.IncomingFrames; 17 import hunt.http.codec.websocket.model.common; 18 import hunt.http.codec.websocket.stream.WebSocketConnection; 19 import hunt.http.codec.websocket.stream.WebSocketPolicy; 20 import hunt.http.codec.websocket.stream.WebSocketConnectionImpl; 21 22 import hunt.net.ConnectionType; 23 import hunt.net.secure.SecureSession; 24 import hunt.net.Session; 25 26 // import hunt.util.Assert; 27 import hunt.container; 28 import hunt.io; 29 import hunt.lang.exception; 30 import hunt.logging; 31 import hunt.string; 32 import hunt.util.codec; 33 import hunt.util.concurrent.Promise; 34 35 import std.array; 36 import std.base64; 37 import std.conv; 38 import std.random; 39 import std.socket; 40 41 alias HttpProtocol = hunt.http.codec.http.model.Protocol.Protocol; 42 alias SessionListener = StreamSession.Listener; 43 44 // import hunt.http.codec.websocket.model.WebSocketConstants.SPEC_VERSION; 45 46 /** 47 */ 48 class Http1ClientConnection : AbstractHttp1Connection, HttpClientConnection { 49 50 private Promise!(WebSocketConnection) webSocketConnectionPromise; 51 private IncomingFrames incomingFrames; 52 private WebSocketPolicy policy; 53 private Promise!(HttpClientConnection) http2ConnectionPromise; 54 private Http2ClientConnection http2Connection; 55 private ClientHttp2SessionListener http2SessionListener; 56 private bool upgradeHttp2Complete = false; // new bool(false); 57 private bool upgradeWebSocketComplete = false; // new bool(false); 58 private ResponseHandlerWrap wrap; 59 60 this(Http2Configuration config, TcpSession tcpSession, SecureSession secureSession) { 61 this(config, secureSession, tcpSession, new ResponseHandlerWrap()); 62 } 63 64 private this(Http2Configuration config, SecureSession secureSession, 65 TcpSession tcpSession, ResponseHandler responseHandler) { 66 67 super(config, secureSession, tcpSession, null, responseHandler); 68 wrap = cast(ResponseHandlerWrap) responseHandler; 69 wrap.connection = this; 70 } 71 72 override protected HttpParser initHttpParser(Http2Configuration config, 73 HttpRequestHandler requestHandler, ResponseHandler responseHandler) { 74 return new HttpParser(responseHandler, config.getMaxRequestHeadLength()); 75 } 76 77 override ConnectionType getConnectionType() { 78 return ConnectionType.HTTP1; 79 } 80 81 override bool isEncrypted() { 82 return super.isEncrypted(); 83 } 84 85 HttpParser getParser() { 86 return parser; 87 } 88 89 Http2Configuration getHttp2Configuration() { 90 return config; 91 } 92 93 // dfmt off 94 override void upgradeHttp2(Request request, SettingsFrame settings, 95 Promise!(HttpClientConnection) promise, ClientHttpHandler upgradeHandler, 96 ClientHttpHandler http2ResponseHandler) { 97 98 Promise!(Stream) initStream = new Http2ClientResponseHandler.ClientStreamPromise(request, 99 new class DefaultPromise!(HttpOutputStream) { 100 101 override 102 void failed(Exception x) { 103 errorf("Create client output stream exception", x); 104 } 105 }); 106 Stream.Listener initStreamListener = new Http2ClientResponseHandler(request, http2ResponseHandler, this); 107 ClientHttp2SessionListener listener = new class ClientHttp2SessionListener { 108 109 override 110 Map!(int, int) onPreface(StreamSession session) { 111 return settings.getSettings(); 112 } 113 114 }; 115 upgradeHttp2(request, settings, promise, initStream, initStreamListener, listener, upgradeHandler); 116 } 117 118 void upgradeHttp2(Request request, SettingsFrame settings, 119 Promise!(HttpClientConnection) promise, Promise!(Stream) initStream, 120 Stream.Listener initStreamListener, 121 ClientHttp2SessionListener listener, ClientHttpHandler handler) { 122 if (isEncrypted()) { 123 throw new IllegalStateException("The TLS TCP connection must use ALPN to upgrade HTTP2"); 124 } 125 126 this.http2ConnectionPromise = promise; 127 this.http2SessionListener = listener; 128 http2Connection = new class Http2ClientConnection { 129 this() { 130 super(getHttp2Configuration(), 131 getTcpSession(), null, http2SessionListener); 132 } 133 override 134 protected Http2Session initHttp2Session(Http2Configuration config, FlowControlStrategy flowControl, 135 StreamSession.Listener listener) { 136 return Http2ClientSession.initSessionForUpgradingHTTP2(null, this.tcpSession, generator, 137 listener, flowControl, 3, config.getStreamIdleTimeout(), initStream, 138 initStreamListener); 139 } 140 }; 141 142 // generate http2 upgrading headers 143 request.getFields().add(new HttpField(HttpHeader.CONNECTION, "Upgrade, HTTP2-Settings")); 144 request.getFields().add(new HttpField(HttpHeader.UPGRADE, "h2c")); 145 if (settings !is null) { 146 List!(ByteBuffer) byteBuffers = http2Generator.control(settings); 147 if (byteBuffers !is null && byteBuffers.size() > 0) { 148 try { 149 // ByteArrayOutputStream ot = new ByteArrayOutputStream(); 150 // foreach (ByteBuffer buffer ; byteBuffers) { 151 // ot.write(BufferUtils.toArray(buffer)); 152 // } 153 Appender!(byte[]) ot; 154 foreach (ByteBuffer buffer; byteBuffers) { 155 byte[] bufferArray = BufferUtils.toArray(buffer); 156 // writeln("before1:\t" ~ TypeUtils.toHexString(bufferArray)); 157 // writefln("before1:\t%(%02X %)" , bufferArray); 158 ot.put(bufferArray); 159 } 160 byte[] settingsFrame = ot.data; //ot.toByteArray(); 161 byte[] settingsPayload = new byte[settingsFrame.length - 9]; 162 // System.arraycopy(settingsFrame, 9, settingsPayload, 0, settingsPayload.length); 163 settingsPayload[0 .. settingsPayload.length] = settingsFrame[9 .. 9 + settingsPayload.length]; 164 165 request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, 166 // Base64Utils.encodeToUrlSafeString(settingsPayload) 167 Base64URL.encode(settingsPayload))); 168 } catch (IOException e) { 169 errorf("generate http2 upgrading settings exception", e); 170 } 171 } else { 172 request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, "")); 173 } 174 } else { 175 request.getFields().add(new HttpField(HttpHeader.HTTP2_SETTINGS, "")); 176 } 177 178 send(request, handler); 179 } 180 // dfmt on 181 182 bool upgradeProtocolComplete(HttpRequest request, HttpResponse response) { 183 switch (ProtocolHelper.from(response)) { 184 case HttpProtocol.H2: { 185 if (http2ConnectionPromise !is null 186 && http2SessionListener !is null && http2Connection !is null) { 187 upgradeHttp2Complete = true; 188 getTcpSession().attachObject(http2Connection); 189 http2SessionListener.setConnection(http2Connection); 190 http2Connection.initialize(getHttp2Configuration(), 191 http2ConnectionPromise, http2SessionListener); 192 return true; 193 } else { 194 resetUpgradeProtocol(); 195 return false; 196 } 197 } 198 case HttpProtocol.WEB_SOCKET: { 199 if (webSocketConnectionPromise !is null && incomingFrames !is null && policy !is null) { 200 upgradeWebSocketComplete = true; 201 WebSocketConnection webSocketConnection = new WebSocketConnectionImpl(secureSession, 202 tcpSession, incomingFrames, policy, request, response, config); 203 getTcpSession().attachObject(cast(Object) webSocketConnection); 204 webSocketConnectionPromise.succeeded(webSocketConnection); 205 return true; 206 } else { 207 resetUpgradeProtocol(); 208 return false; 209 } 210 } 211 default: 212 resetUpgradeProtocol(); 213 return false; 214 } 215 } 216 217 private void resetUpgradeProtocol() { 218 if (http2ConnectionPromise !is null) { 219 http2ConnectionPromise.failed(new IllegalStateException("upgrade h2 failed")); 220 http2ConnectionPromise = null; 221 } 222 http2SessionListener = null; 223 http2Connection = null; 224 if (webSocketConnectionPromise !is null) { 225 webSocketConnectionPromise.failed( 226 new IllegalStateException("The websocket handshake failed")); 227 webSocketConnectionPromise = null; 228 } 229 incomingFrames = null; 230 policy = null; 231 } 232 233 override void upgradeWebSocket(Request request, WebSocketPolicy policy, 234 Promise!(WebSocketConnection) promise, 235 ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) { 236 assert(HttpMethod.GET.asString() == request.getMethod(), 237 "The method of the request MUST be GET in the websocket handshake."); 238 239 assert(policy.getBehavior() == WebSocketBehavior.CLIENT, 240 "The websocket behavior MUST be client"); 241 242 request.getFields().put(HttpHeader.SEC_WEBSOCKET_VERSION, 243 WebSocketConstants.SPEC_VERSION.to!string()); 244 request.getFields().put(HttpHeader.UPGRADE, "websocket"); 245 request.getFields().put(HttpHeader.CONNECTION, "Upgrade"); 246 request.getFields().put(HttpHeader.SEC_WEBSOCKET_KEY, genRandomKey()); 247 webSocketConnectionPromise = promise; 248 this.incomingFrames = incomingFrames; 249 this.policy = policy; 250 send(request, upgradeHandler); 251 } 252 253 private string genRandomKey() { 254 byte[] bytes = new byte[16]; 255 // ThreadLocalRandom.current().nextBytes(bytes); 256 auto rnd = Random(2018); 257 for (int i; i < bytes.length; i++) 258 bytes[i] = cast(byte) uniform(byte.min, byte.max, rnd); 259 return cast(string)(B64Code.encode(bytes)); 260 } 261 262 override HttpOutputStream sendRequestWithContinuation(Request request, ClientHttpHandler handler) { 263 request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE); 264 HttpOutputStream outputStream = getHttpOutputStream(request, handler); 265 try { 266 outputStream.commit(); 267 } catch (IOException e) { 268 errorf("client generates the HTTP message exception", e); 269 } 270 return outputStream; 271 } 272 273 override void send(Request request, ClientHttpHandler handler) { 274 try { 275 tracef("client request and does not send data"); 276 HttpOutputStream output = getHttpOutputStream(request, handler); 277 output.close(); 278 } catch (Exception e) { 279 errorf("client generates the HTTP message exception", e); 280 } 281 } 282 283 override void send(Request request, ByteBuffer buffer, ClientHttpHandler handler) { 284 try { 285 HttpOutputStream output = getHttpOutputStream(request, handler); 286 if (buffer !is null) { 287 output.writeWithContentLength(buffer); 288 } 289 } catch (IOException e) { 290 errorf("client generates the HTTP message exception", e); 291 } 292 } 293 294 override void send(Request request, ByteBuffer[] buffers, ClientHttpHandler handler) { 295 try { 296 HttpOutputStream output = getHttpOutputStream(request, handler); 297 if (buffers !is null) { 298 output.writeWithContentLength(buffers); 299 } 300 } catch (IOException e) { 301 errorf("client generates the HTTP message exception", e); 302 } 303 } 304 305 override HttpOutputStream getHttpOutputStream(Request request, ClientHttpHandler handler) { 306 Http1ClientResponseHandler http1ClientResponseHandler = new Http1ClientResponseHandler( 307 handler); 308 checkWrite(request, http1ClientResponseHandler); 309 http1ClientResponseHandler.outputStream = new Http1ClientRequestOutputStream(this, 310 wrap.writing.request); 311 return http1ClientResponseHandler.outputStream; 312 } 313 314 override void send(Request request, Promise!(HttpOutputStream) promise, 315 ClientHttpHandler handler) { 316 promise.succeeded(getHttpOutputStream(request, handler)); 317 } 318 319 private void checkWrite(Request request, Http1ClientResponseHandler handler) { 320 assert(request, "The http client request is null."); 321 assert(handler, "The http1 client response handler is null."); 322 assert(isOpen(), "The current connection " ~ tcpSession.getSessionId() 323 .to!string ~ " has been closed."); 324 assert(!upgradeHttp2Complete, "The current connection " ~ tcpSession.getSessionId() 325 .to!string ~ " has upgraded HTTP2."); 326 assert(!upgradeWebSocketComplete, "The current connection " ~ tcpSession.getSessionId() 327 .to!string ~ " has upgraded WebSocket."); 328 329 if (wrap.writing is null) { 330 wrap.writing = handler; 331 request.getFields().put(HttpHeader.HOST, tcpSession.getRemoteAddress().toAddrString()); 332 handler.connection = this; 333 handler.request = request; 334 } else { 335 throw new WritePendingException(""); 336 } 337 } 338 339 override void close() { 340 if (isOpen()) { 341 super.close(); 342 } 343 } 344 345 override bool isClosed() { 346 return !isOpen(); 347 } 348 349 override bool isOpen() { 350 version (HUNT_DEBUG) { 351 tracef("Connection status: isOpen=%s, upgradeHttp2Complete=%s, upgradeWebSocketComplete=%s", 352 tcpSession.isOpen(), upgradeHttp2Complete, upgradeWebSocketComplete); 353 } 354 return tcpSession.isOpen() && !upgradeHttp2Complete && !upgradeWebSocketComplete; 355 } 356 357 bool getUpgradeHttp2Complete() { 358 return upgradeHttp2Complete; 359 } 360 361 bool getUpgradeWebSocketComplete() { 362 return upgradeWebSocketComplete; 363 } 364 } 365 366 /** 367 */ 368 private class ResponseHandlerWrap : ResponseHandler { 369 370 private Http1ClientResponseHandler writing; // = new AtomicReference<)(); 371 private int status; 372 private string reason; 373 private Http1ClientConnection connection; 374 375 void badMessage(BadMessageException failure) { 376 badMessage(failure.getCode(), failure.getReason()); 377 } 378 379 override void earlyEOF() { 380 Http1ClientResponseHandler h = writing; 381 if (h !is null) { 382 h.earlyEOF(); 383 } else { 384 IOUtils.close(connection); 385 } 386 387 writing = null; 388 } 389 390 override void parsedHeader(HttpField field) { 391 writing.parsedHeader(field); 392 } 393 394 override bool headerComplete() { 395 return writing.headerComplete(); 396 } 397 398 override bool content(ByteBuffer item) { 399 return writing.content(item); 400 } 401 402 override bool contentComplete() { 403 return writing.contentComplete(); 404 } 405 406 override void parsedTrailer(HttpField field) { 407 writing.parsedTrailer(field); 408 } 409 410 override bool messageComplete() { 411 if (status == 100 && "Continue".equalsIgnoreCase(reason)) { 412 tracef("client received the 100 Continue response"); 413 connection.getParser().reset(); 414 return true; 415 } else { 416 auto r = writing.messageComplete(); 417 writing = null; 418 return r; 419 } 420 } 421 422 override void badMessage(int status, string reason) { 423 Http1ClientResponseHandler h = writing; 424 writing = null; 425 if (h !is null) { 426 h.badMessage(status, reason); 427 } else { 428 IOUtils.close(connection); 429 } 430 } 431 432 override int getHeaderCacheSize() { 433 return 1024; 434 } 435 436 override bool startResponse(HttpVersion ver, int status, string reason) { 437 this.status = status; 438 this.reason = reason; 439 return writing.startResponse(ver, status, reason); 440 } 441 442 } 443 444 /** 445 */ 446 class Http1ClientRequestOutputStream : AbstractHttp1OutputStream { 447 448 private Http1ClientConnection connection; 449 private HttpGenerator httpGenerator; 450 451 private this(Http1ClientConnection connection, Request request) { 452 super(request, true); 453 this.connection = connection; 454 httpGenerator = new HttpGenerator(); 455 } 456 457 override protected void generateHttpMessageSuccessfully() { 458 tracef("client session %s generates the HTTP message completely", connection.getSessionId()); 459 } 460 461 override protected void generateHttpMessageExceptionally(HttpGenerator.Result actualResult, 462 HttpGenerator.State actualState, HttpGenerator.Result expectedResult, 463 HttpGenerator.State expectedState) { 464 errorf("http1 generator error, actual: [%s, %s], expected: [%s, %s]", 465 actualResult, actualState, expectedResult, expectedState); 466 throw new IllegalStateException("client generates http message exception."); 467 } 468 469 override protected ByteBuffer getHeaderByteBuffer() { 470 return BufferUtils.allocate(connection.getHttp2Configuration().getMaxRequestHeadLength()); 471 } 472 473 override protected ByteBuffer getTrailerByteBuffer() { 474 return BufferUtils.allocate(connection.getHttp2Configuration() 475 .getMaxRequestTrailerLength()); 476 } 477 478 override protected TcpSession getSession() { 479 return connection.getTcpSession(); 480 } 481 482 override protected HttpGenerator getHttpGenerator() { 483 return httpGenerator; 484 } 485 }