1 module hunt.http.client.Http2ClientConnection; 2 3 import hunt.http.client.ClientHttpHandler; 4 import hunt.http.client.HttpClientConnection; 5 import hunt.http.client.Http2ClientResponseHandler; 6 import hunt.http.client.Http2ClientSession; 7 8 import hunt.http.codec.http.decode.Parser; 9 import hunt.http.codec.http.encode.Http2Generator; 10 import hunt.http.codec.http.frame; 11 import hunt.http.codec.http.model.HttpHeader; 12 import hunt.http.codec.http.model.HttpHeaderValue; 13 import hunt.http.codec.http.model.MetaData; 14 import hunt.http.codec.http.stream; 15 import hunt.http.codec.websocket.model.IncomingFrames; 16 import hunt.http.codec.websocket.stream.WebSocketConnection; 17 import hunt.http.codec.websocket.stream.WebSocketPolicy; 18 19 import hunt.container; 20 import hunt.logging; 21 import hunt.net.ConnectionType; 22 import hunt.net.secure.SecureSession; 23 import hunt.net.Session; 24 25 import hunt.util.functional; 26 import hunt.util.concurrent.FuturePromise; 27 import hunt.util.concurrent.Promise; 28 // import hunt.util.concurrent.Scheduler; 29 import hunt.lang.exception; 30 31 import std.conv; 32 33 alias SessionListener = StreamSession.Listener; 34 35 class Http2ClientConnection : AbstractHttp2Connection , HttpClientConnection { 36 void initialize(Http2Configuration config, Promise!(HttpClientConnection) promise, 37 SessionListener listener) { 38 Map!(int, int) settings = listener.onPreface(getHttp2Session()); 39 if (settings is null) { 40 settings = Collections.emptyMap!(int, int)(); 41 } 42 PrefaceFrame prefaceFrame = new PrefaceFrame(); 43 SettingsFrame settingsFrame = new SettingsFrame(settings, false); 44 SessionSPI sessionSPI = getSessionSPI(); 45 int windowDelta = config.getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE; 46 Callback callback = new class NoopCallback { 47 48 override 49 void succeeded() { 50 promise.succeeded(this.outer); 51 } 52 53 override 54 void failed(Exception x) { 55 this.outer.close(); 56 promise.failed(x); 57 } 58 }; 59 60 if (windowDelta > 0) { 61 sessionSPI.updateRecvWindow(windowDelta); 62 sessionSPI.frames(null, callback, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)); 63 } else { 64 sessionSPI.frames(null, callback, prefaceFrame, settingsFrame); 65 } 66 67 // TODO: Tasks pending completion -@Administrator at 2018-7-16 14:41:31 68 // 69 // Scheduler.Future pingFuture = scheduler.scheduleAtFixedRate(() => getHttp2Session().ping(new PingFrame(false), new class Callback { 70 // void succeeded() { 71 // info("The session %s sent ping frame success", getSessionId()); 72 // } 73 74 // void failed(Throwable x) { 75 // warningf("the session %s sends ping frame failure. %s", getSessionId(), x.getMessage()); 76 // } 77 // }), config.getHttp2PingInterval(), config.getHttp2PingInterval(), TimeUnit.MILLISECONDS); 78 79 // onClose(c => pingFuture.cancel()); 80 } 81 82 this(Http2Configuration config, TcpSession tcpSession, SecureSession secureSession, 83 SessionListener listener) { 84 super(config, tcpSession, secureSession, listener); 85 } 86 87 override 88 protected Http2Session initHttp2Session(Http2Configuration config, FlowControlStrategy flowControl, 89 SessionListener listener) { 90 return new Http2ClientSession(null, this.tcpSession, this.generator, listener, flowControl, config.getStreamIdleTimeout()); 91 } 92 93 override 94 protected Parser initParser(Http2Configuration config) { 95 return new Parser(http2Session, config.getMaxDynamicTableSize(), config.getMaxRequestHeadLength()); 96 } 97 98 99 override 100 ConnectionType getConnectionType() { 101 return super.getConnectionType(); 102 } 103 104 override 105 bool isEncrypted() { 106 return super.isEncrypted(); 107 } 108 109 110 Parser getParser() { 111 return parser; 112 } 113 114 Http2Generator getGenerator() { 115 return generator; 116 } 117 118 SessionSPI getSessionSPI() { 119 return http2Session; 120 } 121 122 override 123 void send(Request request, ClientHttpHandler handler) { 124 Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) { 125 126 void succeeded(HttpOutputStream output) { 127 try { 128 output.close(); 129 } catch (IOException e) { 130 errorf("write data unsuccessfully", e); 131 } 132 133 } 134 135 void failed(Exception x) { 136 errorf("write data unsuccessfully", x); 137 } 138 139 string id() { return "HttpOutputStream close"; } 140 }; 141 142 this.request(request, true, promise, handler); 143 } 144 145 override 146 void send(Request request, ByteBuffer buffer, ClientHttpHandler handler) { 147 send(request, [buffer], handler); 148 } 149 150 override 151 void send(Request request, ByteBuffer[] buffers, ClientHttpHandler handler) { 152 long contentLength = BufferUtils.remaining(buffers); 153 request.getFields().put(HttpHeader.CONTENT_LENGTH, contentLength.to!string); 154 155 Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) { 156 157 void succeeded(HttpOutputStream output) { 158 try { 159 output.writeWithContentLength(buffers); 160 } catch (IOException e) { 161 errorf("write data unsuccessfully", e); 162 } 163 } 164 165 void failed(Exception x) { 166 errorf("write data unsuccessfully", x); 167 } 168 169 string id() { return "writeWithContentLength"; } 170 }; 171 172 send(request, promise, handler); 173 } 174 175 override 176 HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) { 177 request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE); 178 return getHttpOutputStream(request, handler); 179 } 180 181 override 182 HttpOutputStream getHttpOutputStream(Request request, ClientHttpHandler handler) { 183 FuturePromise!(HttpOutputStream) promise = new FuturePromise!(HttpOutputStream)(); 184 send(request, promise, handler); 185 try { 186 return promise.get(); 187 } catch (Exception e) { 188 errorf("get http output stream unsuccessfully", e); 189 return null; 190 } 191 } 192 193 override 194 void send(Request request, Promise!(HttpOutputStream) promise, ClientHttpHandler handler) { 195 this.request(request, false, promise, handler); 196 } 197 198 void request(Request request, bool endStream, 199 Promise!(HttpOutputStream) promise, 200 ClientHttpHandler handler) { 201 http2Session.newStream(new HeadersFrame(request, null, endStream), 202 new Http2ClientResponseHandler.ClientStreamPromise(request, promise), 203 new Http2ClientResponseHandler(request, handler, this)); 204 } 205 206 override 207 void upgradeHttp2(Request request, SettingsFrame settings, Promise!(HttpClientConnection) promise, 208 ClientHttpHandler upgradeHandler, 209 ClientHttpHandler http2ResponseHandler) { 210 throw new CommonRuntimeException("The current connection version is http2, it does not need to upgrading."); 211 } 212 213 override 214 void upgradeWebSocket(Request request, WebSocketPolicy policy, Promise!(WebSocketConnection) promise, 215 ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) { 216 throw new CommonRuntimeException("The current connection version is http2, it can not upgrade WebSocket."); 217 } 218 219 }