1 module hunt.http.client.HttpClient;
2 
3 import hunt.http.client.Call;
4 import hunt.http.client.ClientHttp2SessionListener;
5 import hunt.http.client.ClientHttpHandler;
6 import hunt.http.client.CookieStore;
7 import hunt.http.client.HttpClientConnection;
8 import hunt.http.client.HttpClientHandler;
9 import hunt.http.client.Http1ClientDecoder;
10 import hunt.http.client.HttpClientContext;
11 import hunt.http.client.Http2ClientDecoder;
12 import hunt.http.client.HttpClientOptions;
13 import hunt.http.client.HttpClientResponse;
14 import hunt.http.client.HttpClientRequest;
15 import hunt.http.client.InMemoryCookieStore;
16 import hunt.http.client.RealCall;
17 
18 import hunt.http.codec.CommonDecoder;
19 import hunt.http.codec.CommonEncoder;
20 import hunt.http.codec.websocket.decode.WebSocketDecoder;
21 import hunt.http.codec.websocket.frame.DataFrame;
22 
23 import hunt.http.HttpBody;
24 import hunt.http.HttpConnection;
25 import hunt.http.HttpOptions;
26 import hunt.http.HttpOutputStream;
27 import hunt.http.HttpRequest;
28 import hunt.http.HttpResponse;
29 import hunt.http.WebSocketConnection;
30 import hunt.http.WebSocketFrame;
31 import hunt.http.WebSocketPolicy;
32 
33 import hunt.Exceptions;
34 import hunt.concurrency.Future;
35 import hunt.concurrency.FuturePromise;
36 import hunt.concurrency.Promise;
37 
38 import hunt.io.BufferUtils;
39 import hunt.io.ByteBuffer;
40 import hunt.collection.Map;
41 import hunt.collection.HashMap;
42 
43 import hunt.logging.ConsoleLogger;
44 import hunt.net;
45 import hunt.util.AbstractLifecycle;
46 
47 import core.atomic;
48 import core.time;
49 
/**
 * HTTP client entry point.
 *
 * Opens HTTP connections through freshly created `NetClient` instances,
 * prepares calls, and upgrades connections to WebSocket. Every `NetClient`
 * created by `connect` is remembered by its id so that `destroy` can close it.
 */
class HttpClient : AbstractLifecycle {

    /// Idle timeout applied when the TCP options carry none (`Duration.zero`).
    enum Duration DEFAULT_IDLE_TIMEOUT = 15.seconds;
    /// Connect timeout applied when the TCP options carry none (`Duration.zero`).
    enum Duration DEFAULT_CONNECT_TIMEOUT = 10.seconds;

    /// Signature of the close notification registered with `setOnClosed`.
    alias Callback = void delegate();

    // TCP-level options shared by every NetClient created in connect().
    private NetClientOptions clientOptions;
    // Net clients created by connect(), keyed by NetClient.getId().
    private NetClient[int] _netClients;
    private HttpClientOptions _httpOptions;
    // Forwarded to each new NetClient via onClosed(); null until setOnClosed() is called.
    private Callback _onClosed = null;
    // Null until one of the useCookieStore() overloads is called.
    private CookieStore _cookieStore;

    /**
     * Creates a client with default TCP options (`DEFAULT_IDLE_TIMEOUT`,
     * `DEFAULT_CONNECT_TIMEOUT`) and starts it.
     */
    this() {
        this(new HttpClientOptions(createDefaultNetOptions()));
    }

    /**
     * Creates a client from the given HTTP options and starts it.
     *
     * When `c` has no TCP configuration, a default one is created and stored
     * back into `c`. When it has one, zero idle/connect timeouts are replaced
     * by the defaults; note that this mutates the caller's options object.
     *
     * Params:
     *   c = the HTTP options; must not be null
     * Throws:
     *   IllegalArgumentException if `c` is null
     */
    this(HttpClientOptions c) {
        if (c is null) {
            throw new IllegalArgumentException("http configuration is null");
        }

        clientOptions = c.getTcpConfiguration();
        if (clientOptions is null) {
            clientOptions = createDefaultNetOptions();
            c.setTcpConfiguration(clientOptions);
        } else {
            if (clientOptions.getIdleTimeout() == Duration.zero) {
                clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
            }

            if (clientOptions.getConnectTimeout() == Duration.zero) {
                clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
            }
        }

        this._httpOptions = c;

        start();
    }

    // Builds TCP options preloaded with the default idle and connect timeouts.
    private static NetClientOptions createDefaultNetOptions() {
        NetClientOptions options = new NetClientOptions();
        options.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
        options.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
        return options;
    }

    /**
     * Starts connecting to `host:port`.
     *
     * Params:
     *   host = the remote host
     *   port = the remote port
     * Returns:
     *   a future for the resulting connection
     */
    Future!(HttpClientConnection) connect(string host, int port) {
        FuturePromise!(HttpClientConnection) completable = new FuturePromise!(HttpClientConnection)();
        completable.id = "httpclient";
        connect(host, port, completable);
        return completable;
    }

    /**
     * Starts connecting to `host:port` with a new `ClientHttp2SessionListener`.
     *
     * Params:
     *   host = the remote host
     *   port = the remote port
     *   promise = the promise handed to the connection context
     */
    void connect(string host, int port, Promise!(HttpClientConnection) promise) {
        connect(host, port, promise, new ClientHttp2SessionListener());
    }

    /**
     * Starts connecting to `host:port`.
     *
     * A new `NetClient` is created for every call, given an HTTP/1 decoder
     * (constructed with WebSocket and HTTP/2 decoders), registered under its
     * id, and then asked to connect.
     *
     * Params:
     *   host = the remote host
     *   port = the remote port
     *   promise = the promise handed to the connection context
     *   listener = the HTTP/2 session listener handed to the connection context
     */
    void connect(string host, int port, Promise!(HttpClientConnection) promise,
            ClientHttp2SessionListener listener) {

        HttpClientContext clientContext = new HttpClientContext();
        clientContext.setPromise(promise);
        clientContext.setListener(listener);

        NetClient client = NetUtil.createNetClient(clientOptions);
        int clientId = client.getId();
        if (clientId in _netClients) {
            // The earlier entry is overwritten below, so destroy() will no longer close it.
            warningf("clientId existes: %d", clientId);
        }
        _netClients[clientId] = client;

        client.setCodec(new class Codec {

            private CommonEncoder encoder;
            private CommonDecoder decoder;

            this() {
                encoder = new CommonEncoder();

                Http1ClientDecoder httpClientDecoder = new Http1ClientDecoder(
                            new WebSocketDecoder(),
                            new Http2ClientDecoder());
                decoder = new CommonDecoder(httpClientDecoder);
            }

            Encoder getEncoder() {
                return encoder;
            }

            Decoder getDecoder() {
                return decoder;
            }
        })
        .setHandler(new HttpClientHandler(_httpOptions, clientContext));

        // Only the callback known at this moment is forwarded; a later
        // setOnClosed() does not reach clients that already exist.
        client.onClosed(_onClosed);
        client.connect(host, port);
    }

    /// Returns: the HTTP options this client was constructed with.
    HttpClientOptions getHttpOptions() {
        return _httpOptions;
    }

    /// Stops this client by calling `stop()`.
    void close() {
        stop();
    }

    /**
     * Sets the callback passed to every `NetClient` created by later
     * `connect` calls.
     *
     * Params:
     *   callback = the close notification; may be null
     */
    void setOnClosed(Callback callback) {
        _onClosed = callback;
    }

    /// Lifecycle hook; nothing to set up.
    override protected void initialize() {
        // do nothing
    }

    /// Closes every registered `NetClient` and clears the registry.
    override void destroy() {
        foreach (NetClient client; _netClients) {
            client.close();
        }

        _netClients = null;
    }

    /// Always returns false; connection state is no longer tracked here.
    deprecated("Unsupported anymore!")
    bool isConnected() {
        return false;
    }

    /**
     * Installs a new `InMemoryCookieStore` as the store that accepts cookies
     * from incoming HTTP responses and provides cookies to outgoing requests.
     *
     * Until one of the `useCookieStore` overloads is called,
     * `getCookieStore` returns null.
     *
     * Returns:
     *   this client, for chaining
     */
    HttpClient useCookieStore() {
        _cookieStore = new InMemoryCookieStore();
        return this;
    }

    /**
     * Installs the given cookie store.
     *
     * Params:
     *   store = the cookie store to use; must not be null
     * Returns:
     *   this client, for chaining
     * Throws:
     *   NullPointerException if `store` is null
     */
    HttpClient useCookieStore(CookieStore store) {
        if (store is null) throw new NullPointerException("CookieStore is null");
        _cookieStore = store;
        return this;
    }

    /// Returns: the installed cookie store, or null when none was installed.
    CookieStore getCookieStore() {
        return _cookieStore;
    }

    /**
     * Prepares the `request` to be executed at some point in the future.
     *
     * Params:
     *   request = the request to wrap
     * Returns:
     *   a call created by `RealCall.newRealCall` with the web-socket flag off
     */
    Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
    }

    /**
     * Uses `request` to connect a new web socket.
     *
     * Blocks until the connection is established and then until the upgrade
     * completes. With a non-negative idle timeout the upgrade wait is bounded
     * by that timeout, and a failure is reported through `handler.onError`
     * (with a null connection) instead of being thrown. With a negative idle
     * timeout the wait is unbounded and failures propagate to the caller.
     *
     * Params:
     *   request = the upgrade request; its URI supplies host and port
     *   handler = receives open, frame and error notifications; must not be null
     * Returns:
     *   the upgraded connection, or null when the bounded wait failed
     */
    WebSocketConnection newWebSocket(Request request, WebSocketMessageHandler handler) {
        assert(handler !is null);

        HttpURI uri = request.getURI();
        string host = uri.getHost();
        int port = uri.getPort();
        if (port <= 0) {
            // No explicit port in the URI: fall back to the scheme's well-known port.
            string scheme = uri.getScheme();
            port = (scheme == "wss" || scheme == "https") ? 443 : 80;
        }

        Future!(HttpClientConnection) conn = connect(host, port);

        TcpSslOptions tcpOptions = _httpOptions.getTcpConfiguration();
        Duration idleTimeout = tcpOptions.getIdleTimeout();

        HttpClientConnection connection = conn.get();
        assert(connection !is null);

        FuturePromise!WebSocketConnection promise = new FuturePromise!WebSocketConnection();
        // Captured by the frame handler below; assigned once the upgrade completes.
        WebSocketConnection webSocket;

        AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler {
            override bool messageComplete(HttpRequest request,
                    HttpResponse response, HttpOutputStream output, HttpConnection connection) {
                // Runtime text is passed as an argument, never as the format string.
                version(HUNT_HTTP_DEBUG) tracef("Upgrade to WebSocket successfully: %s", response.toString());
                return true;
            }
        };

        IncomingFrames incomingFrames = new class IncomingFrames {
            void incomingError(Exception ex) {
                version(HUNT_DEBUG) warningf("%s", ex.msg);
                handler.onError(webSocket, ex);
            }

            void incomingFrame(WebSocketFrame frame) {
                WebSocketFrameType type = frame.getType();
                version(HUNT_HTTP_DEBUG) tracef("new frame comming: %s", type);
                switch (type) {
                    case WebSocketFrameType.TEXT:
                        handler.onText(webSocket, (cast(DataFrame) frame).getPayloadAsUTF8());
                        break;

                    case WebSocketFrameType.BINARY:
                        handler.onBinary(webSocket, frame.getPayload());
                        break;

                    case WebSocketFrameType.CLOSE:
                        handler.onClosed(webSocket);
                        break;

                    case WebSocketFrameType.PING:
                        handler.onPing(webSocket);
                        break;

                    case WebSocketFrameType.PONG:
                        handler.onPong(webSocket);
                        break;

                    case WebSocketFrameType.CONTINUATION:
                        handler.onContinuation(webSocket, frame.getPayload());
                        break;

                    default:
                        warningf("Can't handle the frame of %s", type);
                        break;
                }
            }
        };

        connection.upgradeWebSocket(request, WebSocketPolicy.newClientPolicy(),
                promise, httpHandler, incomingFrames);

        if (idleTimeout.isNegative()) {
            version (HUNT_HTTP_DEBUG) infof("waitting for response...");
            webSocket = promise.get();
            handler.onOpen(webSocket);
        } else {
            version (HUNT_HTTP_DEBUG) infof("waitting for response in %s ...", idleTimeout);
            try {
                webSocket = promise.get(idleTimeout);
                handler.onOpen(webSocket);
            } catch (Exception ex) {
                version(HUNT_HTTP_DEBUG) warningf("%s", ex.msg);
                // webSocket is still null if the wait itself failed.
                handler.onError(webSocket, ex);
            }
        }
        return webSocket;
    }

}