1 module hunt.http.client.Http2ClientConnection;
2 
3 import hunt.http.client.ClientHttpHandler;
4 import hunt.http.client.HttpClientConnection;
5 import hunt.http.client.Http2ClientResponseHandler;
6 import hunt.http.client.Http2ClientSession;
7 
8 import hunt.http.codec.http.decode.Parser;
9 import hunt.http.codec.http.encode.Http2Generator;
10 import hunt.http.codec.http.frame;
11 import hunt.http.codec.http.stream;
12 
13 import hunt.http.HttpConnection;
14 import hunt.http.HttpHeader;
15 import hunt.http.HttpOptions;
16 import hunt.http.HttpOutputStream;
17 import hunt.http.HttpRequest;
18 import hunt.http.WebSocketConnection;
19 import hunt.http.WebSocketPolicy;
20 import hunt.http.Util;
21 
22 import hunt.collection;
23 import hunt.concurrency.Delayed;
24 import hunt.logging;
25 import hunt.net.secure.SecureSession;
26 import hunt.net.Connection;
27 
28 import hunt.concurrency.FuturePromise;
29 import hunt.concurrency.Promise;
30 import hunt.Exceptions;
31 import hunt.util.Common;
32 import hunt.util.Runnable;
33 
34 import core.time;
35 import std.conv;
36 
37 alias SessionListener = StreamSession.Listener;
38 
39 /**
40  * 
41  */
42 class Http2ClientConnection : AbstractHttp2Connection , HttpClientConnection {
43     void initialize(HttpOptions config, Promise!(HttpClientConnection) promise,
44                            SessionListener listener) {
45         Map!(int, int) settings = listener.onPreface(getHttp2Session());
46         if (settings is null) {
47             settings = Collections.emptyMap!(int, int)();
48         }
49         PrefaceFrame prefaceFrame = new PrefaceFrame();
50         SettingsFrame settingsFrame = new SettingsFrame(settings, false);
51         SessionSPI sessionSPI = getSessionSPI();
52         int windowDelta = config.getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE;
53         Callback callback = new class NoopCallback {
54 
55             override
56             void succeeded() {
57                 promise.succeeded(this.outer);
58             }
59 
60             override
61             void failed(Exception x) {
62                 this.outer.close();
63                 promise.failed(x);
64             }
65         };
66 
67         if (windowDelta > 0) {
68             sessionSPI.updateRecvWindow(windowDelta);
69             sessionSPI.frames(null, callback, prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta));
70         } else {
71             sessionSPI.frames(null, callback, prefaceFrame, settingsFrame);
72         }
73 
74         executor = CommonUtil.scheduler();
75         executor.setRemoveOnCancelPolicy(true);
76         ScheduledFuture!(void) pingFuture = executor.scheduleWithFixedDelay(new class Runnable {
77             void run() {
78                 PingFrame pingFrame = new PingFrame(false);
79 
80                 getHttp2Session().ping(pingFrame, new class NoopCallback {
81                     override void succeeded() {
82                         version(HUNT_HTTP_DEBUG) infof("The session %s sent ping frame success", getId());
83                     }
84 
85                     override void failed(Exception x) {
86                         debug warningf("the session %s sends ping frame failure. %s", getId(), x.msg);
87                         version(HUNT_HTTP_DEBUG)  warning(x);
88                     }
89                 });
90             }
91         }, 
92         
93         msecs(config.getHttp2PingInterval()), 
94         msecs(config.getHttp2PingInterval()));
95 
96         onClose( (c) { pingFuture.cancel(false); });        
97     }
98 
99     this(HttpOptions config, Connection tcpSession, SessionListener listener) {
100         super(config, tcpSession, listener);
101     }
102 
103     override
104     protected Http2Session initHttp2Session(HttpOptions config, FlowControlStrategy flowControl,
105                                             SessionListener listener) {
106         return new Http2ClientSession(null, this._tcpSession, this.generator, listener, flowControl, config.getStreamIdleTimeout());
107     }
108 
109     override
110     protected Parser initParser(HttpOptions config) {
111         return new Parser(http2Session, config.getMaxDynamicTableSize(), config.getMaxRequestHeadLength());
112     }
113 
114     
115     override
116     HttpConnectionType getConnectionType() {
117         return super.getConnectionType();
118     }
119 
120     // override
121     // bool isSecured() {
122     //     return super.isSecured();
123     // }
124 
125 
126     Parser getParser() {
127         return parser;
128     }
129 
130     Http2Generator getGenerator() {
131         return generator;
132     }
133 
134     SessionSPI getSessionSPI() {
135         return http2Session;
136     }
137 
138     override
139     void send(HttpRequest request, ClientHttpHandler handler) {
140         Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) {
141 
142             bool succeeded(HttpOutputStream output) {
143                 try {
144                     output.close();
145                     return true;
146                 } catch (IOException e) {
147                     errorf("write data unsuccessfully", e);
148                     return false;
149                 }
150 
151             }
152 
153             bool failed(Throwable x) {
154                 errorf("write data unsuccessfully", x);
155                 return true;
156             }
157 
158             string id() { return "HttpOutputStream close"; }
159         };
160 
161         this.request(request, true, promise, handler);
162     }
163 
164     override
165     void send(HttpRequest request, ByteBuffer buffer, ClientHttpHandler handler) {
166         send(request, [buffer], handler);
167     }
168 
169     override
170     void send(HttpRequest request, ByteBuffer[] buffers, ClientHttpHandler handler) {
171         long contentLength = BufferUtils.remaining(buffers);
172         request.getFields().put(HttpHeader.CONTENT_LENGTH, contentLength.to!string);
173 
174         Promise!(HttpOutputStream) promise = new class Promise!(HttpOutputStream) {
175 
176             bool succeeded(HttpOutputStream output) {
177                 try {
178                     output.writeWithContentLength(buffers);
179                     return true;
180                 } catch (IOException e) {
181                     errorf("write data unsuccessfully", e);
182                     return false;
183                 }
184             }
185 
186             bool failed(Throwable x) {
187                 errorf("write data unsuccessfully", x);
188                 return true;
189             }
190 
191             string id() { return "writeWithContentLength"; }
192         };
193 
194         send(request, promise, handler);
195     }
196 
197     override
198     HttpOutputStream sendRequestWithContinuation(HttpRequest request, ClientHttpHandler handler) {
199         request.getFields().put(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE);
200         return getHttpOutputStream(request, handler);
201     }
202 
203     override
204     HttpOutputStream getHttpOutputStream(HttpRequest request, ClientHttpHandler handler) {
205         FuturePromise!(HttpOutputStream) promise = new FuturePromise!(HttpOutputStream)();
206         send(request, promise, handler);
207         try {
208             return promise.get();
209         } catch (Exception e) {
210             errorf("get http output stream unsuccessfully", e);
211             return null;
212         }
213     }
214 
215     override
216     void send(HttpRequest request, Promise!(HttpOutputStream) promise, ClientHttpHandler handler) {
217         this.request(request, false, promise, handler);
218     }
219 
220     void request(HttpRequest request, bool endStream,
221                         Promise!(HttpOutputStream) promise,
222                         ClientHttpHandler handler) {
223         http2Session.newStream(new HeadersFrame(request, null, endStream),
224                 new Http2ClientResponseHandler.ClientStreamPromise(request, promise),
225                 new Http2ClientResponseHandler(request, handler, this));
226     }
227 
228     override
229     void upgradeHttp2(HttpRequest request, SettingsFrame settings, Promise!(HttpClientConnection) promise,
230                              ClientHttpHandler upgradeHandler,
231                              ClientHttpHandler http2ResponseHandler) {
232         throw new CommonRuntimeException("The current connection version is http2, it does not need to upgrading.");
233     }
234 
235     override
236     void upgradeWebSocket(HttpRequest request, WebSocketPolicy policy, Promise!(WebSocketConnection) promise,
237                                  ClientHttpHandler upgradeHandler, IncomingFrames incomingFrames) {
238         throw new CommonRuntimeException("The current connection version is http2, it can not upgrade WebSocket.");
239     }
240 
241 }