1 module hunt.http.client.RealCall; 2 3 import hunt.http.client.Call; 4 import hunt.http.client.ClientHttpHandler; 5 import hunt.http.client.CookieStore; 6 import hunt.http.client.HttpClient; 7 import hunt.http.client.HttpClientConnection; 8 import hunt.http.client.HttpClientResponse; 9 import hunt.http.client.HttpClientRequest; 10 import hunt.http.client.Http1ClientConnection; 11 12 import hunt.http.Cookie; 13 import hunt.http.HttpBody; 14 import hunt.http.HttpConnection; 15 import hunt.http.HttpConnection; 16 import hunt.http.HttpFields; 17 import hunt.http.HttpField; 18 import hunt.http.HttpHeader; 19 import hunt.http.HttpMethod; 20 import hunt.http.HttpOptions; 21 import hunt.http.HttpOutputStream; 22 import hunt.http.HttpRequest; 23 import hunt.http.HttpResponse; 24 import hunt.http.HttpStatus; 25 import hunt.http.HttpVersion; 26 27 import hunt.io.ByteBuffer; 28 import hunt.io.BufferUtils; 29 import hunt.concurrency.FuturePromise; 30 import hunt.Exceptions; 31 // import hunt.net.NetUtil; 32 import hunt.logging; 33 import hunt.net.KeyCertOptions; 34 import hunt.net.PemKeyCertOptions; 35 import hunt.net.TcpSslOptions; 36 import hunt.net.NetClientOptions; 37 import hunt.net.util.HttpURI; 38 import hunt.util.Traits; 39 40 import core.atomic; 41 import core.sync.condition; 42 import core.sync.mutex; 43 44 import std.conv; 45 import std.format; 46 import std.parallelism; 47 48 /** 49 */ 50 class RealCall : Call { 51 private HttpClient client; 52 53 /** The application's original request unadulterated by redirects or auth headers. */ 54 private Request originalRequest; 55 private bool forWebSocket; 56 57 // Guarded by this. 58 private bool executed; 59 private Mutex responseLocker; 60 private Condition responseCondition; 61 62 private this(HttpClient client, Request request, bool forWebSocket) { 63 this.client = client; 64 this.originalRequest = request; 65 this.forWebSocket = forWebSocket; 66 67 version(WITH_HUNT_SECURITY) { 68 if(request.isHttps()) { 69 client.getHttpOptions().isSecureConnectionEnabled(true); 70 } 71 } 72 responseLocker = new Mutex(); 73 responseCondition = new Condition(responseLocker); 74 } 75 76 static RealCall newRealCall(HttpClient client, Request request, bool forWebSocket) { 77 // Safely publish the Call instance to the EventListener. 78 RealCall call = new RealCall(client, request, forWebSocket); 79 return call; 80 } 81 82 Request request() { 83 return originalRequest; 84 } 85 86 Response execute() { 87 synchronized (this) { 88 if (executed) throw new IllegalStateException("Already Executed"); 89 executed = true; 90 } 91 92 93 version(WITH_HUNT_TRACE) { 94 originalRequest.startSpan(); 95 } 96 97 HttpClientResponse hcr; 98 99 AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler { 100 101 override bool headerComplete(HttpRequest request, HttpResponse response, 102 HttpOutputStream output, HttpConnection connection) { 103 version(HUNT_HTTP_DEBUG) info("headerComplete!"); 104 105 HttpClientRequest req = cast(HttpClientRequest)request; 106 assert(req !is null); 107 108 HttpClientResponse res = cast(HttpClientResponse)response; 109 assert(res !is null); 110 111 if(req.isCookieStoreEnabled()) { 112 CookieStore store = client.getCookieStore(); 113 if(store !is null) { 114 foreach(Cookie c; res.cookies()) { 115 store.add(request.getURI(), c); 116 } 117 } 118 } 119 120 return true; 121 } 122 123 override bool content(ByteBuffer item, HttpRequest request, HttpResponse response, 124 HttpOutputStream output, HttpConnection connection) { 125 126 HttpClientResponse clientResponse = cast(HttpClientResponse)response; 127 assert(clientResponse !is null); 128 129 version (HUNT_HTTP_DEBUG) { 130 tracef("ContentType: %s, ContentLength: %d, current content size: %d", 131 response.getContentType(), response.getContentLength(), item.remaining()); 132 } 133 134 version (HUNT_HTTP_DEBUG_MORE) { 135 tracef("content: %s", cast(string)item.peekRemaining()); 136 } 137 138 // clientResponse.setBody(new ResponseBody(response.getContentType(), 139 // response.getContentLength(), BufferUtils.clone(item))); 140 141 HttpBody hb; 142 if(clientResponse.haveBody()) { 143 hb = clientResponse.getBody(); 144 } else { 145 hb = HttpBody.create(response.getContentType(), response.getContentLength()); 146 clientResponse.setBody(hb); 147 } 148 hb.append(item); 149 150 return false; 151 } 152 153 override bool messageComplete(HttpRequest request, HttpResponse response, 154 HttpOutputStream output, HttpConnection connection) { 155 version (HUNT_HTTP_DEBUG) trace(response.getFields()); 156 157 synchronized { 158 hcr = cast(HttpClientResponse)response; 159 assert(hcr !is null); 160 } 161 162 responseLocker.lock(); 163 scope(exit) { 164 responseLocker.unlock(); 165 } 166 responseCondition.notifyAll(); 167 version(HUNT_HTTP_DEBUG) infof("All the message content received."); 168 return true; 169 } 170 171 }; 172 173 HttpOptions options = client.getHttpOptions(); 174 // options = new HttpOptions(options); // clone the options 175 // FIXME: Needing refactor or cleanup -@zhangxueping at 2019-12-12T18:13:36+08:00 176 // clone the options 177 if(originalRequest.isCertificateAuth()) { 178 options.isCertificateAuth = originalRequest.isCertificateAuth(); 179 options.setKeyCertOptions(originalRequest.getKeyCertOptions()); 180 } 181 182 doRequestTask(httpHandler); 183 184 responseLocker.lock(); 185 scope(exit) { 186 responseLocker.unlock(); 187 } 188 189 if(hcr is null) { 190 TcpSslOptions tcpOptions = options.getTcpConfiguration(); 191 Duration idleTimeout = tcpOptions.getIdleTimeout(); 192 193 if(idleTimeout.isNegative()) { 194 version (HUNT_DEBUG) infof("waitting for response..."); 195 responseCondition.wait(); 196 } else { 197 version (HUNT_DEBUG) infof("waitting for response in %s ...", idleTimeout); 198 bool r = responseCondition.wait(idleTimeout); 199 if(!r) { 200 string msg = format("No any response in %s", idleTimeout); 201 warningf(msg); 202 if(!client.isStopped()) 203 client.close(); 204 205 version(WITH_HUNT_TRACE) { 206 originalRequest.endTraceSpan(HttpStatus.INTERNAL_SERVER_ERROR_500, msg); 207 } 208 throw new TimeoutException(); 209 } 210 } 211 } 212 213 version (HUNT_HTTP_DEBUG) info("response normally"); 214 version(WITH_HUNT_TRACE) { 215 originalRequest.endTraceSpan(hcr.getStatus(), null); 216 } 217 return hcr; 218 } 219 220 void enqueue(Callback responseCallback) { 221 synchronized (this) { 222 if (executed) throw new IllegalStateException("Already Executed"); 223 executed = true; 224 } 225 // transmitter.callStart(); 226 // client.dispatcher().enqueue(new AsyncCall(responseCallback)); 227 228 AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler { 229 override bool content(ByteBuffer item, HttpRequest request, HttpResponse response, 230 HttpOutputStream output, HttpConnection connection) { 231 HttpClientResponse hcr = cast(HttpClientResponse)response; 232 // hcr.setBody(new ResponseBody(response.getContentType(), 233 // response.getContentLength(), BufferUtils.clone(item))); 234 235 HttpBody hb = hcr.getBody(); 236 if(hb is null) { 237 hb = HttpBody.create(response.getContentType(), response.getContentLength()); 238 hcr.setBody(hb); 239 } 240 hb.append(item); 241 242 return false; 243 } 244 245 override bool messageComplete(HttpRequest request, HttpResponse response, 246 HttpOutputStream output, HttpConnection connection) { 247 version (HUNT_HTTP_DEBUG) trace(response.getFields()); 248 responseCallback.onResponse(this.outer, cast(HttpClientResponse)response); 249 return true; 250 } 251 252 }; 253 254 httpHandler.badMessage((int status, string reason, HttpRequest request, 255 HttpResponse response, HttpOutputStream output, HttpConnection connection) { 256 import std.format; 257 string msg = format("status: %d, reason: %s", status, reason); 258 responseCallback.onFailure(this, new IOException(msg)); 259 }); 260 261 try { 262 doRequestTask(httpHandler); 263 // auto requestTask = task(&doRequestTask, httpHandler); 264 // requestTask.executeInNewThread(); 265 } catch(IOException ex) { 266 responseCallback.onFailure(this, ex); 267 } 268 } 269 270 void doRequestTask(AbstractClientHttpHandler httpHandler) { 271 HttpURI uri = originalRequest.getURI(); 272 string scheme = uri.getScheme(); 273 274 // port 275 int port = uri.getPort(); 276 version(HUNT_HTTP_DEBUG) infof("new request: scheme=%s, host=%s, port=%d", 277 scheme, uri.getHost(), port); 278 if(port <= 0) { 279 auto itemPtr = scheme in SchemePortMap; 280 if(itemPtr is null) { 281 throw new Exception("Invalid http scheme: " ~ scheme); 282 } 283 port = *itemPtr; 284 } 285 286 // set cookie from cookie store 287 if(originalRequest.isCookieStoreEnabled()) { 288 CookieStore store = client.getCookieStore(); 289 HttpFields fields = originalRequest.getFields(); 290 291 if(store !is null && fields !is null) { 292 auto cookies = store.getCookies(); 293 294 if(cookies !is null) 295 fields.put(HttpHeader.COOKIE, generateCookies(store.getCookies())); 296 } 297 } 298 299 // opentracing: initialize TraceContext 300 301 302 FuturePromise!HttpClientConnection promise = new FuturePromise!HttpClientConnection(); 303 HttpConnection connection; 304 try { 305 NetClientOptions tcpConfig = cast(NetClientOptions)client.getHttpOptions().getTcpConfiguration(); 306 client.connect(uri.getHost(), port, promise); 307 connection = promise.get(tcpConfig.getConnectTimeout()); 308 } catch(Exception ex) { 309 string msg = "Failed to open " ~ uri.toString(); 310 version(HUNT_DEBUG) { 311 warning(msg, ". The reason: ", ex.msg); 312 } 313 version(HUNT_HTTP_DEBUG) warning(ex); 314 if(!client.isStopped()) 315 client.close(); 316 317 version(WITH_HUNT_TRACE) { 318 originalRequest.endTraceSpan(HttpStatus.INTERNAL_SERVER_ERROR_500, msg); 319 } 320 throw new IOException(msg); 321 } 322 323 version (HUNT_HTTP_DEBUG) info(connection.getHttpVersion()); 324 325 if (connection.getHttpVersion() == HttpVersion.HTTP_1_1) { 326 327 Http1ClientConnection http1ClientConnection = cast(Http1ClientConnection) connection; 328 // HttpBody rb = originalRequest.getBody(); 329 if(HttpMethod.permitsRequestBody(originalRequest.getMethod())) { // && rb !is null 330 // http1ClientConnection.send(originalRequest, rb.content(), httpHandler); 331 HttpOutputStream output = http1ClientConnection.getHttpOutputStream(originalRequest, httpHandler); 332 // rb.writeTo(output); 333 output.close(); // End a request, and keep the connection for waiting for the respons. 334 } else { 335 http1ClientConnection.send(originalRequest, httpHandler); 336 } 337 } else { 338 // TODO: Tasks pending completion -@zxp at 6/4/2019, 5:55:40 PM 339 // 340 string msg = "Unsupported " ~ connection.getHttpVersion().toString(); 341 342 version(WITH_HUNT_TRACE) { 343 originalRequest.endTraceSpan(HttpStatus.INTERNAL_SERVER_ERROR_500, msg); 344 } 345 throw new IOException(msg); 346 } 347 348 version (HUNT_HTTP_DEBUG) info("waitting response..."); 349 } 350 351 352 void cancel() { 353 implementationMissing(false); 354 // transmitter.cancel(); 355 } 356 357 // Timeout timeout() { 358 // return transmitter.timeout(); 359 // } 360 361 bool isExecuted() { 362 return executed; 363 } 364 365 bool isCanceled() { 366 // return transmitter.isCanceled(); 367 368 return false; 369 } 370 371 }