module hunt.http.client.Http2ClientConnection;

import hunt.http.client.ClientHttpHandler;
import hunt.http.client.HttpClientConnection;
import hunt.http.client.Http2ClientResponseHandler;
import hunt.http.client.Http2ClientSession;

import hunt.http.codec.http.decode.Parser;
import hunt.http.codec.http.encode.Http2Generator;
import hunt.http.codec.http.frame;
import hunt.http.codec.http.stream;

import hunt.http.HttpConnection;
import hunt.http.HttpHeader;
import hunt.http.HttpOptions;
import hunt.http.HttpOutputStream;
import hunt.http.HttpRequest;
import hunt.http.WebSocketConnection;
import hunt.http.WebSocketPolicy;
import hunt.http.Util;

import hunt.collection;
import hunt.concurrency.Delayed;
import hunt.logging;
import hunt.net.secure.SecureSession;
import hunt.net.Connection;

import hunt.concurrency.FuturePromise;
import hunt.concurrency.Promise;
import hunt.Exceptions;
import hunt.util.Common;
import hunt.util.Runnable;

import core.time;
import std.conv;

alias SessionListener = StreamSession.Listener;

/**
 * Client-side HTTP/2 connection.
 *
 * Sends the connection preface and initial SETTINGS, keeps the session alive
 * with periodic PING frames, and turns each outgoing `HttpRequest` into a new
 * stream on the underlying HTTP/2 session. Because the connection is already
 * HTTP/2, both upgrade entry points always throw.
 */
class Http2ClientConnection : AbstractHttp2Connection, HttpClientConnection {

    /**
     * Sends the preface and initial settings, then schedules periodic pings.
     *
     * The settings come from `listener.onPreface`; a null result is replaced
     * by an empty map. When the configured initial session receive window is
     * larger than `FlowControlStrategy.DEFAULT_WINDOW_SIZE`, the local receive
     * window is enlarged and a WINDOW_UPDATE for stream 0 is sent together
     * with the preface and settings frames.
     *
     * Params:
     *   config   = options providing the initial session receive window and
     *              the ping interval (passed to `msecs`, i.e. milliseconds).
     *   promise  = completed with this connection once the initial frames are
     *              written; failed (after this connection is closed) otherwise.
     *   listener = session listener asked for the initial settings.
     */
    void initialize(HttpOptions config, Promise!(HttpClientConnection) promise,
            SessionListener listener) {
        Map!(int, int) settings = listener.onPreface(getHttp2Session());
        if (settings is null) {
            settings = Collections.emptyMap!(int, int)();
        }
        PrefaceFrame prefaceFrame = new PrefaceFrame();
        SettingsFrame settingsFrame = new SettingsFrame(settings, false);
        SessionSPI sessionSPI = getSessionSPI();
        int windowDelta = config.getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
        Callback callback = new class NoopCallback {

            override
            void succeeded() {
                promise.succeeded(this.outer);
            }

            override
            void failed(Exception x) {
                // Close the connection before reporting the failure to the caller.
                this.outer.close();
                promise.failed(x);
            }
        };

        if (windowDelta > 0) {
            sessionSPI.updateRecvWindow(windowDelta);
            sessionSPI.frames(null, callback, prefaceFrame, settingsFrame,
                    new WindowUpdateFrame(0, windowDelta));
        } else {
            sessionSPI.frames(null, callback, prefaceFrame, settingsFrame);
        }

        executor = CommonUtil.scheduler();
        executor.setRemoveOnCancelPolicy(true);
        ScheduledFuture!(void) pingFuture = executor.scheduleWithFixedDelay(new class Runnable {
            void run() {
                PingFrame pingFrame = new PingFrame(false);

                getHttp2Session().ping(pingFrame, new class NoopCallback {
                    override void succeeded() {
                        version(HUNT_HTTP_DEBUG) infof("The session %s sent ping frame success", getId());
                    }

                    override void failed(Exception x) {
                        debug warningf("the session %s sends ping frame failure. %s", getId(), x.msg);
                        version(HUNT_HTTP_DEBUG) warning(x);
                    }
                });
            }
        },
        msecs(config.getHttp2PingInterval()),
        msecs(config.getHttp2PingInterval()));

        // Stop pinging once the connection is closed.
        onClose( (c) { pingFuture.cancel(false); });
    }

    /**
     * Creates the connection; all arguments are forwarded to the base class.
     */
    this(HttpOptions config, Connection tcpSession, SessionListener listener) {
        super(config, tcpSession, listener);
    }

    /**
     * Builds the client-side HTTP/2 session over this connection's TCP session
     * and generator, using the stream idle timeout from `config`.
     */
    override
    protected Http2Session initHttp2Session(HttpOptions config, FlowControlStrategy flowControl,
            SessionListener listener) {
        return new Http2ClientSession(null, this._tcpSession, this.generator, listener,
                flowControl, config.getStreamIdleTimeout());
    }

    /**
     * Builds the frame parser bound to the HTTP/2 session, configured with the
     * maximum dynamic table size and maximum request head length from `config`.
     */
    override
    protected Parser initParser(HttpOptions config) {
        return new Parser(http2Session, config.getMaxDynamicTableSize(), config.getMaxRequestHeadLength());
    }


    /// Returns the connection type reported by the base class.
    override
    HttpConnectionType getConnectionType() {
        return super.getConnectionType();
    }

    // override
    // bool isSecured() {
    //     return super.isSecured();
    // }


    /// Returns the parser used by this connection.
    Parser getParser() {
        return parser;
    }

    /// Returns the HTTP/2 frame generator used by this connection.
    Http2Generator getGenerator() {
        return generator;
    }

    /// Returns the HTTP/2 session viewed through its `SessionSPI` interface.
    SessionSPI getSessionSPI() {
        return http2Session;
    }

    /**
     * Sends a request without a body.
     *
     * The headers frame is created with `endStream = true`, and the output
     * stream delivered to the internal promise is closed right away.
     */
    override
    void send(HttpRequest request, ClientHttpHandler handler) {
        Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) {

            bool succeeded(HttpOutputStream output) {
                try {
                    output.close();
                    return true;
                } catch (IOException e) {
                    // The format string needs a specifier, otherwise the cause is never printed.
                    errorf("write data unsuccessfully: %s", e.msg);
                    return false;
                }

            }

            bool failed(Throwable x) {
                errorf("write data unsuccessfully: %s", x.msg);
                return true;
            }

            string id() { return "HttpOutputStream close"; }
        };

        this.request(request, true, promise, handler);
    }

    /// Sends a request whose body is the single given buffer.
    override
    void send(HttpRequest request, ByteBuffer buffer, ClientHttpHandler handler) {
        send(request, [buffer], handler);
    }

    /**
     * Sends a request whose body is the given buffers.
     *
     * The Content-Length header is set from `BufferUtils.remaining(buffers)`
     * before the stream is opened; the body is written with
     * `writeWithContentLength` once the output stream becomes available.
     */
    override
    void send(HttpRequest request, ByteBuffer[] buffers, ClientHttpHandler handler) {
        long contentLength = BufferUtils.remaining(buffers);
        request.getFields().put(HttpHeader.CONTENT_LENGTH, contentLength.to!string);

        Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) {

            bool succeeded(HttpOutputStream output) {
                try {
                    output.writeWithContentLength(buffers);
                    return true;
                } catch (IOException e) {
                    // The format string needs a specifier, otherwise the cause is never printed.
                    errorf("write data unsuccessfully: %s", e.msg);
                    return false;
                }
            }

            bool failed(Throwable x) {
                errorf("write data unsuccessfully: %s", x.msg);
                return true;
            }

            string id() { return "writeWithContentLength"; }
        };

        send(request, promise, handler);
    }

    /**
     * Adds an `Expect` header with the `CONTINUE` value to the request and
     * returns the output stream obtained from `getHttpOutputStream`.
     */
    override
    HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) {
        request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE);
        return getHttpOutputStream(request, handler);
    }

    /**
     * Opens a stream for the request and returns its output stream.
     *
     * Returns: the stream obtained from the promise, or `null` when obtaining
     *          it throws; the failure is logged rather than rethrown.
     */
    override
    HttpOutputStream getHttpOutputStream(HttpRequest request, ClientHttpHandler handler) {
        FuturePromise!(HttpOutputStream) promise = new FuturePromise!(HttpOutputStream)();
        send(request, promise, handler);
        try {
            // NOTE(review): presumably blocks until the stream is ready - confirm against FuturePromise.
            return promise.get();
        } catch (Exception e) {
            // The format string needs a specifier, otherwise the cause is never printed.
            errorf("get http output stream unsuccessfully: %s", e.msg);
            return null;
        }
    }

    /**
     * Opens a stream for the request with `endStream = false` and hands the
     * resulting output stream to `promise`.
     */
    override
    void send(HttpRequest request, Promise!(HttpOutputStream) promise, ClientHttpHandler handler) {
        this.request(request, false, promise, handler);
    }

    /**
     * Opens a new stream on the HTTP/2 session for the request.
     *
     * Params:
     *   request   = request used to build the HEADERS frame.
     *   endStream = passed to the HEADERS frame constructor.
     *   promise   = receives the output stream via `ClientStreamPromise`.
     *   handler   = receives response events via `Http2ClientResponseHandler`.
     */
    void request(HttpRequest request, bool endStream,
            Promise!(HttpOutputStream) promise,
            ClientHttpHandler handler) {
        http2Session.newStream(new HeadersFrame(request, null, endStream),
                new Http2ClientResponseHandler.ClientStreamPromise(request, promise),
                new Http2ClientResponseHandler(request, handler, this));
    }

    /**
     * Not supported: the connection is already HTTP/2.
     *
     * Throws: CommonRuntimeException always.
     */
    override
    void upgradeHttp2(HttpRequest request, SettingsFrame settings, Promise!(HttpClientConnection) promise,
            ClientHttpHandler upgradeHandler,
            ClientHttpHandler http2ResponseHandler) {
        throw new CommonRuntimeException("The current connection version is http2, it does not need to upgrading.");
    }

    /**
     * Not supported on an HTTP/2 connection.
     *
     * Throws: CommonRuntimeException always.
     */
    override
    void upgradeWebSocket(HttpRequest request, WebSocketPolicy policy, Promise!(WebSocketConnection) promise,
            ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) {
        throw new CommonRuntimeException("The current connection version is http2, it can not upgrade WebSocket.");
    }

}