1 module hunt.http.client.HttpClient;
2 
3 import hunt.http.client.Call;
4 import hunt.http.client.ClientHttp2SessionListener;
5 import hunt.http.client.ClientHttpHandler;
6 import hunt.http.client.CookieStore;
7 import hunt.http.client.HttpClientConnection;
8 import hunt.http.client.HttpClientHandler;
9 import hunt.http.client.Http1ClientDecoder;
10 import hunt.http.client.HttpClientContext;
11 import hunt.http.client.Http2ClientDecoder;
12 import hunt.http.client.HttpClientOptions;
13 import hunt.http.client.HttpClientResponse;
14 import hunt.http.client.HttpClientRequest;
15 import hunt.http.client.InMemoryCookieStore;
16 import hunt.http.client.RealCall;
17 
18 import hunt.http.codec.CommonDecoder;
19 import hunt.http.codec.CommonEncoder;
20 import hunt.http.codec.websocket.decode.WebSocketDecoder;
21 import hunt.http.codec.websocket.frame.DataFrame;
22 
23 import hunt.http.HttpBody;
24 import hunt.http.HttpConnection;
25 import hunt.http.HttpOptions;
26 import hunt.http.HttpOutputStream;
27 import hunt.http.HttpRequest;
28 import hunt.http.HttpResponse;
29 import hunt.http.WebSocketConnection;
30 import hunt.http.WebSocketFrame;
31 import hunt.http.WebSocketPolicy;
32 
33 import hunt.Exceptions;
34 import hunt.concurrency.Future;
35 import hunt.concurrency.FuturePromise;
36 import hunt.concurrency.Promise;
37 
38 import hunt.io.BufferUtils;
39 import hunt.io.ByteBuffer;
40 import hunt.collection.Map;
41 import hunt.collection.HashMap;
42 
43 import hunt.logging;
44 import hunt.net;
45 import hunt.util.AbstractLifecycle;
46 
47 import core.atomic;
48 import core.time;
49 
50 /**
51  * 
52  */
53 class HttpClient : AbstractLifecycle {
54 
55     enum Duration DEFAULT_IDLE_TIMEOUT = 15.seconds;
56     enum Duration DEFAULT_CONNECT_TIMEOUT = 10.seconds;
57 
58     alias Callback = void delegate();
59 
60     private NetClientOptions clientOptions;
61     private NetClient[int] _netClients;
62     private HttpClientOptions _httpOptions;
63     private Callback _onClosed = null;
64     private CookieStore _cookieStore;
65 
66     this() {
67         clientOptions = new NetClientOptions();
68         clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
69         clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
70         HttpClientOptions config = new HttpClientOptions(clientOptions);
71         this(config);
72     }
73 
74     this(HttpClientOptions c) {
75         if (c is null) {
76             throw new IllegalArgumentException("http configuration is null");
77         }
78 
79         clientOptions = c.getTcpConfiguration();
80         if(clientOptions is null) {
81             clientOptions = new NetClientOptions();
82             clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
83             clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
84             c.setTcpConfiguration(clientOptions);
85         } else {
86             if(clientOptions.getIdleTimeout == Duration.zero) 
87                 clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
88 
89             if(clientOptions.getConnectTimeout == Duration.zero)
90                 clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
91         }
92 
93         this._httpOptions = c;
94 
95         start();
96     }
97 
98     Future!(HttpClientConnection) connect(string host, int port) {
99         FuturePromise!(HttpClientConnection) completable = new FuturePromise!(HttpClientConnection)("httpclient");
100         connect(host, port, completable);
101         return completable;
102     }
103 
104     void connect(string host, int port, Promise!(HttpClientConnection) promise) {
105         connect(host, port, promise, new ClientHttp2SessionListener());
106     }
107 
108     void connect(string host, int port, Promise!(HttpClientConnection) promise,
109             ClientHttp2SessionListener listener) {
110 
111         HttpClientContext clientContext = new HttpClientContext();
112         clientContext.setPromise(promise);
113         clientContext.setListener(listener);
114 
115         NetClient client = NetUtil.createNetClient(clientOptions);
116         int clientId = client.getId();
117         if(clientId in _netClients) {
118             warningf("clientId existes: %d", clientId);
119         }
120         _netClients[client.getId()] = client;
121         
122         client.setCodec(new class Codec {
123 
124             private CommonEncoder encoder;
125             private CommonDecoder decoder;
126 
127             this() {
128                 encoder = new CommonEncoder();
129 
130                 Http1ClientDecoder httpClientDecoder = new Http1ClientDecoder(
131                             new WebSocketDecoder(),
132                             new Http2ClientDecoder());
133                 decoder = new CommonDecoder(httpClientDecoder);
134             }
135 
136             Encoder getEncoder() {
137                 return encoder;
138             }
139 
140             Decoder getDecoder() {
141                 return decoder;
142             }
143         })
144         .setHandler(new HttpClientHandler(_httpOptions, clientContext));
145 
146         client.onClosed(_onClosed);
147         client.connect(host, port);
148         // _isConnected = client.isConnected();
149     }
150 
151     HttpClientOptions getHttpOptions() {
152         return _httpOptions;
153     }
154 
155     void close() {
156         stop();
157     }
158 
159     void setOnClosed(Callback callback) {
160         _onClosed = callback;
161     }
162 
163     override protected void initialize() {
164         // do nothing
165     }
166 
167     override void destroy() {
168         foreach(NetClient client; _netClients) {
169             client.close();
170         }
171         
172         _netClients = null;
173     }
174 
175     deprecated("Unsupported anymore!")
176     bool isConnected() {
177         // return _isConnected;
178         return false;
179     }
180 
181     /**
182      * Sets the handler that can accept cookies from incoming HTTP responses and provides cookies to
183      * outgoing HTTP requests.
184      *
185      * <p>If unset, {@linkplain CookieStore#NO_COOKIES no cookies} will be accepted nor provided.
186      */
187     HttpClient useCookieStore() {
188         _cookieStore = new InMemoryCookieStore();
189         return this;
190     }
191     
192     /** 
193      * 
194      * Params:
195      *   store = 
196      * Returns: 
197      */
198     HttpClient useCookieStore(CookieStore store) {
199         if (store is null) throw new NullPointerException("CookieStore is null");
200         _cookieStore = store;
201         return this;
202     }
203 
204     CookieStore getCookieStore() {
205         return _cookieStore;
206     }
207 
208     /**
209      * Prepares the {@code request} to be executed at some point in the future.
210      */
211     Call newCall(Request request) {
212         return RealCall.newRealCall(this, request, false /* for web socket */);
213     }    
214 
215     /**
216      * Uses {@code request} to connect a new web socket.
217      */
218     WebSocketConnection newWebSocket(Request request, WebSocketMessageHandler handler) {
219         assert(handler !is null);
220         
221 // import core.atomic;
222 // import core.sync.condition;
223 // import core.sync.mutex;
224 
225 // 		Mutex responseLocker = new Mutex();
226 // 		Condition responseCondition = new Condition(responseLocker);
227         
228         HttpURI uri = request.getURI();
229         string scheme = uri.getScheme();
230         string host = uri.getHost();
231         int port = uri.getPort();
232         
233         
234         // responseLocker.lock();
235         // scope(exit) {
236         //     responseLocker.unlock();
237         // }
238         
239         Future!(HttpClientConnection) conn = connect(host, port);
240         
241         TcpSslOptions tcpOptions = _httpOptions.getTcpConfiguration(); 
242         Duration idleTimeout = tcpOptions.getIdleTimeout();   
243         
244         HttpClientConnection connection = conn.get();
245         assert(connection !is null);
246         
247         FuturePromise!WebSocketConnection promise = new FuturePromise!WebSocketConnection();
248         WebSocketConnection webSocket;
249 
250         AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler {
251             override bool messageComplete(HttpRequest request,
252                     HttpResponse response, HttpOutputStream output, HttpConnection connection) {
253                 version(HUNT_HTTP_DEBUG) tracef("Upgrade to WebSocket successfully: " ~ response.toString());
254                 return true;
255             }
256         };
257 
258         IncomingFrames incomingFrames = new class IncomingFrames {
259             void incomingError(Exception ex) {
260                 version(HUNT_DEBUG) warningf(ex.msg);
261                 handler.onError(webSocket, ex);
262             }
263 
264             void incomingFrame(WebSocketFrame frame) {
265                 WebSocketFrameType type = frame.getType();
266                 version(HUNT_HTTP_DEBUG) tracef("new frame comming: %s", type);
267                 switch (type) {
268                     case WebSocketFrameType.TEXT:
269                         handler.onText(webSocket, (cast(DataFrame) frame).getPayloadAsUTF8());
270                         break;
271                         
272                     case WebSocketFrameType.BINARY:
273                         handler.onBinary(webSocket, frame.getPayload());
274                         break;
275                         
276                     case WebSocketFrameType.CLOSE:
277                         handler.onClosed(webSocket);
278                         break;
279 
280                     case WebSocketFrameType.PING:
281                         handler.onPing(webSocket);
282                         break;
283 
284                     case WebSocketFrameType.PONG:
285                         handler.onPong(webSocket);
286                         break;
287 
288                     case WebSocketFrameType.CONTINUATION:
289                         handler.onContinuation(webSocket, frame.getPayload());
290                         break;
291 
292                     default:
293                         warningf("Can't handle the frame of ", type);
294                         break;
295                 }
296             }
297         };
298 
299         connection.upgradeWebSocket(request, WebSocketPolicy.newClientPolicy(),
300                 promise, httpHandler, incomingFrames);
301 
302         if(idleTimeout.isNegative()) {
303             version (HUNT_HTTP_DEBUG) infof("waitting for response...");
304             webSocket =  promise.get();
305             handler.onOpen(webSocket);
306         } else {
307             version (HUNT_HTTP_DEBUG) infof("waitting for response in %s ...", idleTimeout);
308             try {
309                 webSocket = promise.get(idleTimeout);
310                 handler.onOpen(webSocket);
311             } catch(Exception ex ) {
312                 version(HUNT_HTTP_DEBUG) warningf(ex.msg);
313                 handler.onError(webSocket, ex);
314             }
315         }
316         return webSocket;
317     }
318 
319 }