1 module hunt.http.client.Http2ClientConnection; 2 3 import hunt.http.client.ClientHttpHandler; 4 import hunt.http.client.HttpClientConnection; 5 import hunt.http.client.Http2ClientResponseHandler; 6 import hunt.http.client.Http2ClientSession; 7 8 import hunt.http.codec.http.decode.Parser; 9 import hunt.http.codec.http.encode.Http2Generator; 10 import hunt.http.codec.http.frame; 11 import hunt.http.codec.http.stream; 12 13 import hunt.http.HttpConnection; 14 import hunt.http.HttpHeader; 15 import hunt.http.HttpOptions; 16 import hunt.http.HttpOutputStream; 17 import hunt.http.HttpRequest; 18 import hunt.http.WebSocketConnection; 19 import hunt.http.WebSocketPolicy; 20 import hunt.http.Util; 21 22 import hunt.collection; 23 import hunt.concurrency.Delayed; 24 import hunt.logging.ConsoleLogger; 25 import hunt.net.secure.SecureSession; 26 import hunt.net.Connection; 27 28 import hunt.concurrency.FuturePromise; 29 import hunt.concurrency.Promise; 30 import hunt.Exceptions; 31 import hunt.util.Common; 32 import hunt.util.Runnable; 33 34 import core.time; 35 import std.conv; 36 37 alias SessionListener = StreamSession.Listener; 38 39 /** 40 * 41 */ 42 class Http2ClientConnection : AbstractHttp2Connection , HttpClientConnection { 43 void initialize(HttpOptions config, Promise!(HttpClientConnection) promise, 44 SessionListener listener) { 45 Map!(int, int) settings = listener.onPreface(getHttp2Session()); 46 if (settings is null) { 47 settings = Collections.emptyMap!(int, int)(); 48 } 49 PrefaceFrame prefaceFrame = new PrefaceFrame(); 50 SettingsFrame settingsFrame = new SettingsFrame(settings, false); 51 SessionSPI sessionSPI = getSessionSPI(); 52 int windowDelta = config.getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE; 53 Callback callback = new class NoopCallback { 54 55 override 56 void succeeded() { 57 promise.succeeded(this.outer); 58 } 59 60 override 61 void failed(Exception x) { 62 this.outer.close(); 63 promise.failed(x); 64 } 65 }; 66 67 if (windowDelta > 0) { 68 sessionSPI.updateRecvWindow(windowDelta); 69 sessionSPI.frames(null, callback, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)); 70 } else { 71 sessionSPI.frames(null, callback, prefaceFrame, settingsFrame); 72 } 73 74 executor = CommonUtil.scheduler(); 75 executor.setRemoveOnCancelPolicy(true); 76 ScheduledFuture!(void) pingFuture = executor.scheduleWithFixedDelay(new class Runnable { 77 void run() { 78 PingFrame pingFrame = new PingFrame(false); 79 80 getHttp2Session().ping(pingFrame, new class NoopCallback { 81 override void succeeded() { 82 version(HUNT_HTTP_DEBUG) infof("The session %s sent ping frame success", getId()); 83 } 84 85 override void failed(Exception x) { 86 debug warningf("the session %s sends ping frame failure. %s", getId(), x.msg); 87 version(HUNT_HTTP_DEBUG) warning(x); 88 } 89 }); 90 } 91 }, 92 93 msecs(config.getHttp2PingInterval()), 94 msecs(config.getHttp2PingInterval())); 95 96 onClose( (c) { pingFuture.cancel(false); }); 97 } 98 99 this(HttpOptions config, Connection tcpSession, SessionListener listener) { 100 super(config, tcpSession, listener); 101 } 102 103 override 104 protected Http2Session initHttp2Session(HttpOptions config, FlowControlStrategy flowControl, 105 SessionListener listener) { 106 return new Http2ClientSession(null, this._tcpSession, this.generator, listener, flowControl, config.getStreamIdleTimeout()); 107 } 108 109 override 110 protected Parser initParser(HttpOptions config) { 111 return new Parser(http2Session, config.getMaxDynamicTableSize(), config.getMaxRequestHeadLength()); 112 } 113 114 115 override 116 HttpConnectionType getConnectionType() { 117 return super.getConnectionType(); 118 } 119 120 // override 121 // bool isSecured() { 122 // return super.isSecured(); 123 // } 124 125 126 Parser getParser() { 127 return parser; 128 } 129 130 Http2Generator getGenerator() { 131 return generator; 132 } 133 134 SessionSPI getSessionSPI() { 135 return http2Session; 136 } 137 138 override 139 void send(HttpRequest request, ClientHttpHandler handler) { 140 Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) { 141 142 void succeeded(HttpOutputStream output) { 143 try { 144 output.close(); 145 } catch (IOException e) { 146 errorf("write data unsuccessfully", e); 147 } 148 149 } 150 151 void failed(Exception x) { 152 errorf("write data unsuccessfully", x); 153 } 154 155 string id() { return "HttpOutputStream close"; } 156 }; 157 158 this.request(request, true, promise, handler); 159 } 160 161 override 162 void send(HttpRequest request, ByteBuffer buffer, ClientHttpHandler handler) { 163 send(request, [buffer], handler); 164 } 165 166 override 167 void send(HttpRequest request, ByteBuffer[] buffers, ClientHttpHandler handler) { 168 long contentLength = BufferUtils.remaining(buffers); 169 request.getFields().put(HttpHeader.CONTENT_LENGTH, contentLength.to!string); 170 171 Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) { 172 173 void succeeded(HttpOutputStream output) { 174 try { 175 output.writeWithContentLength(buffers); 176 } catch (IOException e) { 177 errorf("write data unsuccessfully", e); 178 } 179 } 180 181 void failed(Exception x) { 182 errorf("write data unsuccessfully", x); 183 } 184 185 string id() { return "writeWithContentLength"; } 186 }; 187 188 send(request, promise, handler); 189 } 190 191 override 192 HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) { 193 request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE); 194 return getHttpOutputStream(request, handler); 195 } 196 197 override 198 HttpOutputStream getHttpOutputStream(HttpRequest request, ClientHttpHandler handler) { 199 FuturePromise!(HttpOutputStream) promise = new FuturePromise!(HttpOutputStream)(); 200 send(request, promise, handler); 201 try { 202 return promise.get(); 203 } catch (Exception e) { 204 errorf("get http output stream unsuccessfully", e); 205 return null; 206 } 207 } 208 209 override 210 void send(HttpRequest request, Promise!(HttpOutputStream) promise, ClientHttpHandler handler) { 211 this.request(request, false, promise, handler); 212 } 213 214 void request(HttpRequest request, bool endStream, 215 Promise!(HttpOutputStream) promise, 216 ClientHttpHandler handler) { 217 http2Session.newStream(new HeadersFrame(request, null, endStream), 218 new Http2ClientResponseHandler.ClientStreamPromise(request, promise), 219 new Http2ClientResponseHandler(request, handler, this)); 220 } 221 222 override 223 void upgradeHttp2(HttpRequest request, SettingsFrame settings, Promise!(HttpClientConnection) promise, 224 ClientHttpHandler upgradeHandler, 225 ClientHttpHandler http2ResponseHandler) { 226 throw new CommonRuntimeException("The current connection version is http2, it does not need to upgrading."); 227 } 228 229 override 230 void upgradeWebSocket(HttpRequest request, WebSocketPolicy policy, Promise!(WebSocketConnection) promise, 231 ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) { 232 throw new CommonRuntimeException("The current connection version is http2, it can not upgrade WebSocket."); 233 } 234 235 }