1 module hunt.http.client.RealCall;
2 
3 import hunt.http.client.Call;
4 import hunt.http.client.ClientHttpHandler;
5 import hunt.http.client.CookieStore;
6 import hunt.http.client.HttpClient;
7 import hunt.http.client.HttpClientConnection;
8 import hunt.http.client.HttpClientResponse;
9 import hunt.http.client.HttpClientRequest;
10 import hunt.http.client.Http1ClientConnection;
11 
12 import hunt.http.Cookie;
13 import hunt.http.HttpBody;
14 import hunt.http.HttpConnection;
15 import hunt.http.HttpConnection;
16 import hunt.http.HttpFields;
17 import hunt.http.HttpField;
18 import hunt.http.HttpHeader;
19 import hunt.http.HttpMethod;
20 import hunt.http.HttpOptions;
21 import hunt.http.HttpOutputStream;
22 import hunt.http.HttpRequest;
23 import hunt.http.HttpResponse;
24 import hunt.http.HttpStatus;
25 import hunt.http.HttpVersion;
26 
27 import hunt.io.ByteBuffer;
28 import hunt.io.BufferUtils;
29 import hunt.concurrency.FuturePromise;
30 import hunt.Exceptions;
31 // import hunt.net.NetUtil;
32 import hunt.logging;
33 import hunt.net.KeyCertOptions;
34 import hunt.net.PemKeyCertOptions;
35 import hunt.net.TcpSslOptions;
36 import hunt.net.NetClientOptions;
37 import hunt.net.util.HttpURI;
38 import hunt.util.Traits;
39 
40 import core.atomic;
41 import core.sync.condition;
42 import core.sync.mutex;
43 
44 import std.conv;
45 import std.format;
46 import std.parallelism;
47 
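/**
 * A Call implementation that sends its request through an HttpClient, either
 * blocking the caller in execute() or reporting to a callback in enqueue().
 */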
50 class RealCall : Call {
51     private HttpClient client;
52 
53     /** The application's original request unadulterated by redirects or auth headers. */
54     private Request originalRequest;
55     private bool forWebSocket;
56 
    // Set inside synchronized(this) by execute() and enqueue(), so a call can only be started once.
58     private bool executed;
59     private Mutex responseLocker;
60     private Condition responseCondition;
61 
62     private this(HttpClient client, Request request, bool forWebSocket) {
63         this.client = client;
64         this.originalRequest = request;
65         this.forWebSocket = forWebSocket;
66         
67         version(WITH_HUNT_SECURITY) {
68             if(request.isHttps()) {
69                 client.getHttpOptions().isSecureConnectionEnabled(true);
70             }
71         }
72 		responseLocker = new Mutex();
73 		responseCondition = new Condition(responseLocker);
74     }
75 
76     static RealCall newRealCall(HttpClient client, Request request, bool forWebSocket) {
77         // Safely publish the Call instance to the EventListener.
78         RealCall call = new RealCall(client, request, forWebSocket);
79         return call;
80     }
81 
82     Request request() {
83         return originalRequest;
84     }
85 
86     Response execute() {
87         synchronized (this) {
88             if (executed) throw new IllegalStateException("Already Executed");
89                 executed = true;
90         }
91 
92 
93         version(WITH_HUNT_TRACE) {
94             originalRequest.startSpan();
95         }
96         
97         HttpClientResponse hcr;
98 
99         AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler {
100 
101             override bool headerComplete(HttpRequest request, HttpResponse response,
102                     HttpOutputStream output, HttpConnection connection) {
103                 version(HUNT_HTTP_DEBUG) info("headerComplete!");
104                     
105                 HttpClientRequest req = cast(HttpClientRequest)request;
106                 assert(req !is null);
107 
108                 HttpClientResponse res = cast(HttpClientResponse)response;
109                 assert(res !is null);
110 
111                 if(req.isCookieStoreEnabled()) {
112                     CookieStore store = client.getCookieStore();
113                     if(store !is null) {
114                         foreach(Cookie c; res.cookies()) {
115                             store.add(request.getURI(), c);
116                         }
117                     }
118                 }
119 
120                 return true;
121             }
122 
123             override bool content(ByteBuffer item, HttpRequest request, HttpResponse response, 
124                     HttpOutputStream output, HttpConnection connection) {
125                 
126                 HttpClientResponse clientResponse = cast(HttpClientResponse)response;
127                 assert(clientResponse !is null);
128                 
129                 version (HUNT_HTTP_DEBUG) {
130                     tracef("ContentType: %s, ContentLength: %d, current content size: %d", 
131                         response.getContentType(), response.getContentLength(), item.remaining());
132                 }
133 
134                 version (HUNT_HTTP_DEBUG_MORE) {
135                     tracef("content: %s", cast(string)item.peekRemaining());
136                 }
137 
138                 // clientResponse.setBody(new ResponseBody(response.getContentType(), 
139                 //     response.getContentLength(), BufferUtils.clone(item)));
140 
141                 HttpBody hb;
142                 if(clientResponse.haveBody()) {
143                     hb = clientResponse.getBody();
144                 } else {
145                     hb = HttpBody.create(response.getContentType(), response.getContentLength());
146                     clientResponse.setBody(hb); 
147                 } 
148                 hb.append(item);
149 
150                 return false;
151             }
152 
153             override bool messageComplete(HttpRequest request, HttpResponse response,
154                     HttpOutputStream output, HttpConnection connection) {
155                 version (HUNT_HTTP_DEBUG) trace(response.getFields());
156                 
157                 synchronized {
158                     hcr = cast(HttpClientResponse)response;
159                     assert(hcr !is null);
160                 }
161 
162                 responseLocker.lock();
163                 scope(exit) {
164                     responseLocker.unlock();
165                 }
166                 responseCondition.notifyAll();
167                 version(HUNT_HTTP_DEBUG) infof("All the message content received.");
168                 return true;
169             }
170 
171         };
172 
173         HttpOptions options = client.getHttpOptions();
174         // options = new HttpOptions(options); // clone the options
175         // FIXME: Needing refactor or cleanup -@zhangxueping at 2019-12-12T18:13:36+08:00
176         // clone the options
177         if(originalRequest.isCertificateAuth()) {
178             options.isCertificateAuth = originalRequest.isCertificateAuth();
179             options.setKeyCertOptions(originalRequest.getKeyCertOptions());
180         }
181 
182         doRequestTask(httpHandler);
183 
184         responseLocker.lock();
185         scope(exit) {
186             responseLocker.unlock();
187         }
188 
189         if(hcr is null) {        
190             TcpSslOptions tcpOptions = options.getTcpConfiguration();     
191             Duration idleTimeout = tcpOptions.getIdleTimeout();    
192 
193             if(idleTimeout.isNegative()) {
194                 version (HUNT_DEBUG) infof("waitting for response...");
195                 responseCondition.wait();
196             } else {  
197                 version (HUNT_DEBUG) infof("waitting for response in %s ...", idleTimeout);
198                 bool r = responseCondition.wait(idleTimeout);
199                 if(!r) {
200                     string msg = format("No any response in %s", idleTimeout);
201                     warningf(msg);
202                     if(!client.isStopped())
203                         client.close();
204 
205                     version(WITH_HUNT_TRACE) {
206                         originalRequest.endTraceSpan(HttpStatus.INTERNAL_SERVER_ERROR_500, msg);
207                     }                    
208                     throw new TimeoutException();
209                 }
210             }
211         }
212 
213         version (HUNT_HTTP_DEBUG) info("response normally");
214         version(WITH_HUNT_TRACE) {
215             originalRequest.endTraceSpan(hcr.getStatus(), null);
216         }
217         return hcr;
218     }
219 
220     void enqueue(Callback responseCallback) {
221         synchronized (this) {
222             if (executed) throw new IllegalStateException("Already Executed");
223                 executed = true;
224         }
225         // transmitter.callStart();
226         // client.dispatcher().enqueue(new AsyncCall(responseCallback));
227 
228         AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler {
229             override bool content(ByteBuffer item, HttpRequest request, HttpResponse response, 
230                     HttpOutputStream output, HttpConnection connection) {
231                 HttpClientResponse hcr = cast(HttpClientResponse)response;
232                 // hcr.setBody(new ResponseBody(response.getContentType(), 
233                 //     response.getContentLength(), BufferUtils.clone(item)));
234 
235                 HttpBody hb = hcr.getBody();
236                 if(hb is null) {
237                     hb = HttpBody.create(response.getContentType(), response.getContentLength());
238                     hcr.setBody(hb); 
239                 } 
240                 hb.append(item);
241 
242                 return false;
243             }
244 
245             override bool messageComplete(HttpRequest request, HttpResponse response,
246                     HttpOutputStream output, HttpConnection connection) {
247                 version (HUNT_HTTP_DEBUG) trace(response.getFields());
248                 responseCallback.onResponse(this.outer, cast(HttpClientResponse)response);
249                 return true;
250             }
251 
252         };
253 
254         httpHandler.badMessage((int status, string reason, HttpRequest request,
255                     HttpResponse response, HttpOutputStream output, HttpConnection connection) {
256             import std.format;
257             string msg = format("status: %d, reason: %s", status, reason);
258             responseCallback.onFailure(this, new IOException(msg));
259         });
260 
261         try {
262             doRequestTask(httpHandler);
263             // auto requestTask = task(&doRequestTask, httpHandler);
264             // requestTask.executeInNewThread();            
265         } catch(IOException ex) {
266             responseCallback.onFailure(this, ex);
267         }
268     }
269 
270     void doRequestTask(AbstractClientHttpHandler httpHandler) {
271         HttpURI uri = originalRequest.getURI();
272         string scheme = uri.getScheme();
273 
274         // port
275         int port = uri.getPort();
276         version(HUNT_HTTP_DEBUG) infof("new request: scheme=%s, host=%s, port=%d", 
277             scheme, uri.getHost(), port);
278         if(port <= 0) {
279             auto itemPtr = scheme in SchemePortMap;
280             if(itemPtr is null) {
281                 throw new Exception("Invalid http scheme: " ~ scheme);
282             }
283             port = *itemPtr;
284         }
285 
286         // set cookie from cookie store
287         if(originalRequest.isCookieStoreEnabled()) {
288             CookieStore store = client.getCookieStore();
289             HttpFields fields = originalRequest.getFields();
290             
291             if(store !is null && fields !is null) {
292                 auto cookies = store.getCookies();
293                 
294                 if(cookies !is null)
295                     fields.put(HttpHeader.COOKIE, generateCookies(store.getCookies()));
296             }
297         }
298 
299         // opentracing: initialize TraceContext
300 
301 
302         FuturePromise!HttpClientConnection promise = new FuturePromise!HttpClientConnection();
303         HttpConnection connection;
304         try {
305             NetClientOptions tcpConfig = cast(NetClientOptions)client.getHttpOptions().getTcpConfiguration();
306             client.connect(uri.getHost(), port, promise);
307             connection = promise.get(tcpConfig.getConnectTimeout());
308         } catch(Exception ex) {
309             string msg = "Failed to open " ~ uri.toString();
310             version(HUNT_DEBUG) {
311                 warning(msg, ". The reason: ", ex.msg);
312             }
313             version(HUNT_HTTP_DEBUG) warning(ex);
314             if(!client.isStopped())
315                 client.close();
316 
317             version(WITH_HUNT_TRACE) {
318                 originalRequest.endTraceSpan(HttpStatus.INTERNAL_SERVER_ERROR_500, msg);
319             }
320             throw new IOException(msg);
321         }
322         
323         version (HUNT_HTTP_DEBUG) info(connection.getHttpVersion());
324 
325         if (connection.getHttpVersion() == HttpVersion.HTTP_1_1) {
326 
327             Http1ClientConnection http1ClientConnection = cast(Http1ClientConnection) connection;
328             // HttpBody rb = originalRequest.getBody();
329             if(HttpMethod.permitsRequestBody(originalRequest.getMethod())) { // && rb !is null
330                 // http1ClientConnection.send(originalRequest, rb.content(), httpHandler);
331                 HttpOutputStream output = http1ClientConnection.getHttpOutputStream(originalRequest, httpHandler);
332                 // rb.writeTo(output);
333                 output.close(); // End a request, and keep the connection for waiting for the respons.
334             } else {
335                 http1ClientConnection.send(originalRequest, httpHandler);
336             }
337         } else {
338             // TODO: Tasks pending completion -@zxp at 6/4/2019, 5:55:40 PM
339             // 
340             string msg = "Unsupported " ~ connection.getHttpVersion().toString();
341             
342             version(WITH_HUNT_TRACE) {
343                 originalRequest.endTraceSpan(HttpStatus.INTERNAL_SERVER_ERROR_500, msg);
344             }
345             throw new IOException(msg);
346         }
347 
348         version (HUNT_HTTP_DEBUG) info("waitting response...");        
349     }
350 
351     
352     void cancel() {
353         implementationMissing(false);
354         // transmitter.cancel();
355     }
356 
357     // Timeout timeout() {
358     //     return transmitter.timeout();
359     // }
360 
361     bool isExecuted() {
362         return executed;
363     }
364 
365     bool isCanceled() {
366         // return transmitter.isCanceled();
367         
368         return false;
369     }
370           
371 }