1 module hunt.http.client.HttpClient;
2
3 import hunt.http.client.Call;
4 import hunt.http.client.ClientHttp2SessionListener;
5 import hunt.http.client.ClientHttpHandler;
6 import hunt.http.client.CookieStore;
7 import hunt.http.client.HttpClientConnection;
8 import hunt.http.client.HttpClientHandler;
9 import hunt.http.client.Http1ClientDecoder;
10 import hunt.http.client.HttpClientContext;
11 import hunt.http.client.Http2ClientDecoder;
12 import hunt.http.client.HttpClientOptions;
13 import hunt.http.client.HttpClientResponse;
14 import hunt.http.client.HttpClientRequest;
15 import hunt.http.client.InMemoryCookieStore;
16 import hunt.http.client.RealCall;
17
18 import hunt.http.codec.CommonDecoder;
19 import hunt.http.codec.CommonEncoder;
20 import hunt.http.codec.websocket.decode.WebSocketDecoder;
21 import hunt.http.codec.websocket.frame.DataFrame;
22
23 import hunt.http.HttpBody;
24 import hunt.http.HttpConnection;
25 import hunt.http.HttpOptions;
26 import hunt.http.HttpOutputStream;
27 import hunt.http.HttpRequest;
28 import hunt.http.HttpResponse;
29 import hunt.http.WebSocketConnection;
30 import hunt.http.WebSocketFrame;
31 import hunt.http.WebSocketPolicy;
32
33 import hunt.Exceptions;
34 import hunt.concurrency.Future;
35 import hunt.concurrency.FuturePromise;
36 import hunt.concurrency.Promise;
37
38 import hunt.io.BufferUtils;
39 import hunt.io.ByteBuffer;
40 import hunt.collection.Map;
41 import hunt.collection.HashMap;
42
43 import hunt.logging;
44 import hunt.net;
45 import hunt.util.AbstractLifecycle;
46
47 import core.atomic;
48 import core.time;
49
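/**
 * HTTP client built on top of hunt.net NetClient instances.
 *
 * Every call to connect() creates a new NetClient; the clients are tracked by id
 * and closed together when this HttpClient is destroyed. The class also creates
 * calls (newCall) and WebSocket connections (newWebSocket).
 */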
53 class HttpClient : AbstractLifecycle {
54
/** Idle timeout applied when the TCP options are missing or report Duration.zero. */
enum Duration DEFAULT_IDLE_TIMEOUT = 15.seconds;
/** Connect timeout applied when the TCP options are missing or report Duration.zero. */
enum Duration DEFAULT_CONNECT_TIMEOUT = 10.seconds;

/** Signature of the callback registered through setOnClosed(). */
alias Callback = void delegate();

/** TCP options handed to NetUtil.createNetClient() for every new connection. */
private NetClientOptions clientOptions;
/** NetClient instances created by connect(), keyed by NetClient.getId(). */
private NetClient[int] _netClients;
/** HTTP options supplied at construction; exposed through getHttpOptions(). */
private HttpClientOptions _httpOptions;
/** Callback passed to NetClient.onClosed() at connect time; null until setOnClosed() is called. */
private Callback _onClosed = null;
/** Cookie store selected through useCookieStore(); not assigned anywhere else in this class. */
private CookieStore _cookieStore;
65
/**
 * Creates a client with freshly built TCP options that use
 * DEFAULT_IDLE_TIMEOUT and DEFAULT_CONNECT_TIMEOUT, then delegates to the
 * HttpClientOptions constructor.
 */
this() {
    NetClientOptions defaults = new NetClientOptions();
    defaults.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
    defaults.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
    clientOptions = defaults;

    this(new HttpClientOptions(defaults));
}
73
/**
 * Creates a client from the given HTTP options and starts its lifecycle.
 *
 * When the options carry no TCP configuration, a new one with the default
 * timeouts is created and stored back into the options. When they do, only
 * timeouts equal to Duration.zero are replaced by the defaults.
 *
 * Params:
 *   c = the HTTP options; must not be null
 *
 * Throws:
 *   IllegalArgumentException if c is null
 */
this(HttpClientOptions c) {
    if (c is null) {
        throw new IllegalArgumentException("http configuration is null");
    }

    NetClientOptions tcpOptions = c.getTcpConfiguration();
    if (tcpOptions !is null) {
        // Keep caller-provided values; fill in only the ones left at zero.
        if (tcpOptions.getIdleTimeout == Duration.zero) {
            tcpOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
        }
        if (tcpOptions.getConnectTimeout == Duration.zero) {
            tcpOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
        }
    } else {
        tcpOptions = new NetClientOptions();
        tcpOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
        tcpOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
        c.setTcpConfiguration(tcpOptions);
    }

    clientOptions = tcpOptions;
    _httpOptions = c;

    start();
}
97
/**
 * Starts connecting to host:port and returns a future for the resulting
 * HttpClientConnection.
 *
 * Params:
 *   host = remote host
 *   port = remote port
 *
 * Returns: the FuturePromise (named "httpclient") passed to the promise-based overload
 */
Future!(HttpClientConnection) connect(string host, int port) {
    auto pending = new FuturePromise!(HttpClientConnection)("httpclient");
    connect(host, port, pending);
    return pending;
}
103
/**
 * Starts connecting to host:port using a newly created
 * ClientHttp2SessionListener.
 *
 * Params:
 *   host    = remote host
 *   port    = remote port
 *   promise = promise stored in the connection's HttpClientContext
 */
void connect(string host, int port, Promise!(HttpClientConnection) promise) {
    ClientHttp2SessionListener defaultListener = new ClientHttp2SessionListener();
    connect(host, port, promise, defaultListener);
}
107
/**
 * Creates a new NetClient, wires the HTTP codec and handler into it, registers
 * it in the client table and starts connecting to host:port.
 *
 * Params:
 *   host     = remote host
 *   port     = remote port
 *   promise  = promise stored in the HttpClientContext given to the handler
 *   listener = HTTP/2 session listener stored in the same context
 */
void connect(string host, int port, Promise!(HttpClientConnection) promise,
        ClientHttp2SessionListener listener) {

    HttpClientContext context = new HttpClientContext();
    context.setPromise(promise);
    context.setListener(listener);

    NetClient netClient = NetUtil.createNetClient(clientOptions);
    const int clientId = netClient.getId();
    if ((clientId in _netClients) !is null) {
        // An entry with the same id is about to be overwritten below.
        warningf("clientId existes: %d", clientId);
    }
    _netClients[clientId] = netClient;

    // Codec pairing the common encoder with a decoder chain
    // (HTTP/1 decoder constructed with the WebSocket and HTTP/2 decoders).
    Codec httpCodec = new class Codec {

        private CommonEncoder _encoder;
        private CommonDecoder _decoder;

        this() {
            _encoder = new CommonEncoder();
            _decoder = new CommonDecoder(
                    new Http1ClientDecoder(new WebSocketDecoder(), new Http2ClientDecoder()));
        }

        Encoder getEncoder() {
            return _encoder;
        }

        Decoder getDecoder() {
            return _decoder;
        }
    };

    netClient.setCodec(httpCodec)
        .setHandler(new HttpClientHandler(_httpOptions, context));

    // The callback value current at this moment is handed over; a later
    // setOnClosed() call does not reach this NetClient from here.
    netClient.onClosed(_onClosed);
    netClient.connect(host, port);
}
150
/**
 * Returns: the HTTP options this client was constructed with
 */
HttpClientOptions getHttpOptions() {
    return this._httpOptions;
}
154
/**
 * Closes this client by stopping its lifecycle.
 */
void close() {
    this.stop();
}
158
/**
 * Stores the callback that connect() passes to NetClient.onClosed().
 *
 * Params:
 *   callback = delegate to register; replaces any previously stored one
 */
void setOnClosed(Callback callback) {
    this._onClosed = callback;
}
162
/**
 * Lifecycle hook; intentionally empty.
 */
override protected void initialize() {
    // Nothing to set up here.
}
166
/**
 * Lifecycle hook: closes every NetClient registered by connect() and then
 * drops the client table.
 */
override void destroy() {
    foreach (NetClient netClient; _netClients) {
        netClient.close();
    }

    _netClients = null;
}
174
/**
 * Deprecated: connection state is no longer tracked by this class.
 *
 * Returns: always false
 */
deprecated("Unsupported anymore!")
bool isConnected() {
    enum bool connected = false;
    return connected;
}
180
/**
 * Installs a new InMemoryCookieStore as this client's cookie store,
 * replacing any store set earlier.
 *
 * Returns: this client, so calls can be chained
 */
HttpClient useCookieStore() {
    this._cookieStore = new InMemoryCookieStore();
    return this;
}
191
/**
 * Installs the given cookie store, replacing any store set earlier.
 *
 * Params:
 *   store = the cookie store to use; must not be null
 *
 * Returns: this client, so calls can be chained
 *
 * Throws:
 *   NullPointerException if store is null
 */
HttpClient useCookieStore(CookieStore store) {
    if (store is null) {
        throw new NullPointerException("CookieStore is null");
    }

    this._cookieStore = store;
    return this;
}
203
/**
 * Returns: the store installed through useCookieStore(); this class assigns
 *          the field nowhere else
 */
CookieStore getCookieStore() {
    return this._cookieStore;
}
207
/**
 * Prepares the {@code request} to be executed at some point in the future.
 *
 * Params:
 *   request = the request to wrap
 *
 * Returns: the Call produced by RealCall.newRealCall for this client
 */
Call newCall(Request request) {
    enum bool forWebSocket = false;
    return RealCall.newRealCall(this, request, forWebSocket);
}
214
/**
 * Uses {@code request} to connect a new web socket.
 *
 * The call blocks: it first waits for the HTTP connection to the request's
 * host and port, then sends the upgrade request and waits for the WebSocket
 * promise. Incoming frames are dispatched to handler according to their type.
 *
 * Waiting for the upgrade depends on the idle timeout of the TCP options:
 * $(UL
 *   $(LI negative: waits without a limit; an exception from the promise
 *        propagates to the caller)
 *   $(LI otherwise: waits at most that long; an exception is reported through
 *        handler.onError and the method returns whatever has been assigned
 *        so far (null if the upgrade did not complete))
 * )
 *
 * Params:
 *   request = request whose URI supplies host, port and scheme
 *   handler = receiver of open/message/error events; must not be null
 *
 * Returns: the established WebSocketConnection, or null as described above
 */
WebSocketConnection newWebSocket(Request request, WebSocketMessageHandler handler) {
    assert(handler !is null);

    HttpURI uri = request.getURI();
    string host = uri.getHost();
    int port = uri.getPort();
    if (port <= 0) {
        // Assumes HttpURI.getPort() yields a non-positive value when the URI
        // carries no explicit port -- TODO confirm. In that case fall back to
        // the scheme's default port (RFC 6455: 80 for ws, 443 for wss).
        import std.uni : icmp;

        string scheme = uri.getScheme();
        bool secure = icmp(scheme, "wss") == 0 || icmp(scheme, "https") == 0;
        port = secure ? 443 : 80;
    }

    Future!(HttpClientConnection) conn = connect(host, port);

    TcpSslOptions tcpOptions = _httpOptions.getTcpConfiguration();
    Duration idleTimeout = tcpOptions.getIdleTimeout();

    // Blocks until the underlying HTTP connection is available.
    HttpClientConnection connection = conn.get();
    assert(connection !is null);

    FuturePromise!WebSocketConnection promise = new FuturePromise!WebSocketConnection();
    // Captured by the frame callbacks below; assigned once the upgrade completes.
    WebSocketConnection webSocket;

    AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler {
        override bool messageComplete(HttpRequest request,
                HttpResponse response, HttpOutputStream output, HttpConnection connection) {
            // Pass the response as an argument so that '%' in it is not
            // interpreted as a format specifier.
            version(HUNT_HTTP_DEBUG) tracef("Upgrade to WebSocket successfully: %s", response.toString());
            return true;
        }
    };

    IncomingFrames incomingFrames = new class IncomingFrames {
        void incomingError(Exception ex) {
            version(HUNT_DEBUG) warningf("%s", ex.msg);
            handler.onError(webSocket, ex);
        }

        void incomingFrame(WebSocketFrame frame) {
            WebSocketFrameType type = frame.getType();
            version(HUNT_HTTP_DEBUG) tracef("new frame comming: %s", type);
            switch (type) {
                case WebSocketFrameType.TEXT:
                    handler.onText(webSocket, (cast(DataFrame) frame).getPayloadAsUTF8());
                    break;

                case WebSocketFrameType.BINARY:
                    handler.onBinary(webSocket, frame.getPayload());
                    break;

                case WebSocketFrameType.CLOSE:
                    handler.onClosed(webSocket);
                    break;

                case WebSocketFrameType.PING:
                    handler.onPing(webSocket);
                    break;

                case WebSocketFrameType.PONG:
                    handler.onPong(webSocket);
                    break;

                case WebSocketFrameType.CONTINUATION:
                    handler.onContinuation(webSocket, frame.getPayload());
                    break;

                default:
                    // The original format string had no specifier, so the
                    // frame type never made it into the message.
                    warningf("Can't handle the frame of %s", type);
                    break;
            }
        }
    };

    connection.upgradeWebSocket(request, WebSocketPolicy.newClientPolicy(),
            promise, httpHandler, incomingFrames);

    if (idleTimeout.isNegative()) {
        version (HUNT_HTTP_DEBUG) infof("waitting for response...");
        webSocket = promise.get();
        handler.onOpen(webSocket);
    } else {
        version (HUNT_HTTP_DEBUG) infof("waitting for response in %s ...", idleTimeout);
        try {
            webSocket = promise.get(idleTimeout);
            handler.onOpen(webSocket);
        } catch (Exception ex) {
            version(HUNT_HTTP_DEBUG) warningf("%s", ex.msg);
            handler.onError(webSocket, ex);
        }
    }
    return webSocket;
}
318
319 }