1 module hunt.http.client.HttpClient; 2 3 import hunt.http.client.Call; 4 import hunt.http.client.ClientHttp2SessionListener; 5 import hunt.http.client.ClientHttpHandler; 6 import hunt.http.client.CookieStore; 7 import hunt.http.client.HttpClientConnection; 8 import hunt.http.client.HttpClientHandler; 9 import hunt.http.client.Http1ClientDecoder; 10 import hunt.http.client.HttpClientContext; 11 import hunt.http.client.Http2ClientDecoder; 12 import hunt.http.client.HttpClientOptions; 13 import hunt.http.client.HttpClientResponse; 14 import hunt.http.client.HttpClientRequest; 15 import hunt.http.client.InMemoryCookieStore; 16 import hunt.http.client.RealCall; 17 18 import hunt.http.codec.CommonDecoder; 19 import hunt.http.codec.CommonEncoder; 20 import hunt.http.codec.websocket.decode.WebSocketDecoder; 21 import hunt.http.codec.websocket.frame.DataFrame; 22 23 import hunt.http.HttpBody; 24 import hunt.http.HttpConnection; 25 import hunt.http.HttpOptions; 26 import hunt.http.HttpOutputStream; 27 import hunt.http.HttpRequest; 28 import hunt.http.HttpResponse; 29 import hunt.http.WebSocketConnection; 30 import hunt.http.WebSocketFrame; 31 import hunt.http.WebSocketPolicy; 32 33 import hunt.Exceptions; 34 import hunt.concurrency.Future; 35 import hunt.concurrency.FuturePromise; 36 import hunt.concurrency.Promise; 37 38 import hunt.io.BufferUtils; 39 import hunt.io.ByteBuffer; 40 import hunt.collection.Map; 41 import hunt.collection.HashMap; 42 43 import hunt.logging.ConsoleLogger; 44 import hunt.net; 45 import hunt.util.AbstractLifecycle; 46 47 import core.atomic; 48 import core.time; 49 50 /** 51 * 52 */ 53 class HttpClient : AbstractLifecycle { 54 55 enum Duration DEFAULT_IDLE_TIMEOUT = 15.seconds; 56 enum Duration DEFAULT_CONNECT_TIMEOUT = 10.seconds; 57 58 alias Callback = void delegate(); 59 60 private NetClientOptions clientOptions; 61 private NetClient[int] _netClients; 62 private HttpClientOptions _httpOptions; 63 private Callback _onClosed = null; 64 private CookieStore _cookieStore; 65 66 this() { 67 clientOptions = new NetClientOptions(); 68 clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT); 69 clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); 70 HttpClientOptions config = new HttpClientOptions(clientOptions); 71 this(config); 72 } 73 74 this(HttpClientOptions c) { 75 if (c is null) { 76 throw new IllegalArgumentException("http configuration is null"); 77 } 78 79 clientOptions = c.getTcpConfiguration(); 80 if(clientOptions is null) { 81 clientOptions = new NetClientOptions(); 82 clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT); 83 clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); 84 c.setTcpConfiguration(clientOptions); 85 } else { 86 if(clientOptions.getIdleTimeout == Duration.zero) 87 clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT); 88 89 if(clientOptions.getConnectTimeout == Duration.zero) 90 clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); 91 } 92 93 this._httpOptions = c; 94 95 start(); 96 } 97 98 Future!(HttpClientConnection) connect(string host, int port) { 99 FuturePromise!(HttpClientConnection) completable = new FuturePromise!(HttpClientConnection)(); 100 completable.id = "httpclient"; 101 connect(host, port, completable); 102 return completable; 103 } 104 105 void connect(string host, int port, Promise!(HttpClientConnection) promise) { 106 connect(host, port, promise, new ClientHttp2SessionListener()); 107 } 108 109 void connect(string host, int port, Promise!(HttpClientConnection) promise, 110 ClientHttp2SessionListener listener) { 111 112 HttpClientContext clientContext = new HttpClientContext(); 113 clientContext.setPromise(promise); 114 clientContext.setListener(listener); 115 116 NetClient client = NetUtil.createNetClient(clientOptions); 117 int clientId = client.getId(); 118 if(clientId in _netClients) { 119 warningf("clientId existes: %d", clientId); 120 } 121 _netClients[client.getId()] = client; 122 123 client.setCodec(new class Codec { 124 125 private CommonEncoder encoder; 126 private CommonDecoder decoder; 127 128 this() { 129 encoder = new CommonEncoder(); 130 131 Http1ClientDecoder httpClientDecoder = new Http1ClientDecoder( 132 new WebSocketDecoder(), 133 new Http2ClientDecoder()); 134 decoder = new CommonDecoder(httpClientDecoder); 135 } 136 137 Encoder getEncoder() { 138 return encoder; 139 } 140 141 Decoder getDecoder() { 142 return decoder; 143 } 144 }) 145 .setHandler(new HttpClientHandler(_httpOptions, clientContext)); 146 147 client.onClosed(_onClosed); 148 client.connect(host, port); 149 // _isConnected = client.isConnected(); 150 } 151 152 HttpClientOptions getHttpOptions() { 153 return _httpOptions; 154 } 155 156 void close() { 157 stop(); 158 } 159 160 void setOnClosed(Callback callback) { 161 _onClosed = callback; 162 } 163 164 override protected void initialize() { 165 // do nothing 166 } 167 168 override void destroy() { 169 foreach(NetClient client; _netClients) { 170 client.close(); 171 } 172 173 _netClients = null; 174 } 175 176 deprecated("Unsupported anymore!") 177 bool isConnected() { 178 // return _isConnected; 179 return false; 180 } 181 182 /** 183 * Sets the handler that can accept cookies from incoming HTTP responses and provides cookies to 184 * outgoing HTTP requests. 185 * 186 * <p>If unset, {@linkplain CookieStore#NO_COOKIES no cookies} will be accepted nor provided. 187 */ 188 HttpClient useCookieStore() { 189 _cookieStore = new InMemoryCookieStore(); 190 return this; 191 } 192 193 /** 194 * 195 * Params: 196 * store = 197 * Returns: 198 */ 199 HttpClient useCookieStore(CookieStore store) { 200 if (store is null) throw new NullPointerException("CookieStore is null"); 201 _cookieStore = store; 202 return this; 203 } 204 205 CookieStore getCookieStore() { 206 return _cookieStore; 207 } 208 209 /** 210 * Prepares the {@code request} to be executed at some point in the future. 211 */ 212 Call newCall(Request request) { 213 return RealCall.newRealCall(this, request, false /* for web socket */); 214 } 215 216 /** 217 * Uses {@code request} to connect a new web socket. 218 */ 219 WebSocketConnection newWebSocket(Request request, WebSocketMessageHandler handler) { 220 assert(handler !is null); 221 222 // import core.atomic; 223 // import core.sync.condition; 224 // import core.sync.mutex; 225 226 // Mutex responseLocker = new Mutex(); 227 // Condition responseCondition = new Condition(responseLocker); 228 229 HttpURI uri = request.getURI(); 230 string scheme = uri.getScheme(); 231 string host = uri.getHost(); 232 int port = uri.getPort(); 233 234 235 // responseLocker.lock(); 236 // scope(exit) { 237 // responseLocker.unlock(); 238 // } 239 240 Future!(HttpClientConnection) conn = connect(host, port); 241 242 TcpSslOptions tcpOptions = _httpOptions.getTcpConfiguration(); 243 Duration idleTimeout = tcpOptions.getIdleTimeout(); 244 245 HttpClientConnection connection = conn.get(); 246 assert(connection !is null); 247 248 FuturePromise!WebSocketConnection promise = new FuturePromise!WebSocketConnection(); 249 WebSocketConnection webSocket; 250 251 AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler { 252 override bool messageComplete(HttpRequest request, 253 HttpResponse response, HttpOutputStream output, HttpConnection connection) { 254 version(HUNT_HTTP_DEBUG) tracef("Upgrade to WebSocket successfully: " ~ response.toString()); 255 return true; 256 } 257 }; 258 259 IncomingFrames incomingFrames = new class IncomingFrames { 260 void incomingError(Exception ex) { 261 version(HUNT_DEBUG) warningf(ex.msg); 262 handler.onError(webSocket, ex); 263 } 264 265 void incomingFrame(WebSocketFrame frame) { 266 WebSocketFrameType type = frame.getType(); 267 version(HUNT_HTTP_DEBUG) tracef("new frame comming: %s", type); 268 switch (type) { 269 case WebSocketFrameType.TEXT: 270 handler.onText(webSocket, (cast(DataFrame) frame).getPayloadAsUTF8()); 271 break; 272 273 case WebSocketFrameType.BINARY: 274 handler.onBinary(webSocket, frame.getPayload()); 275 break; 276 277 case WebSocketFrameType.CLOSE: 278 handler.onClosed(webSocket); 279 break; 280 281 case WebSocketFrameType.PING: 282 handler.onPing(webSocket); 283 break; 284 285 case WebSocketFrameType.PONG: 286 handler.onPong(webSocket); 287 break; 288 289 case WebSocketFrameType.CONTINUATION: 290 handler.onContinuation(webSocket, frame.getPayload()); 291 break; 292 293 default: 294 warningf("Can't handle the frame of ", type); 295 break; 296 } 297 } 298 }; 299 300 connection.upgradeWebSocket(request, WebSocketPolicy.newClientPolicy(), 301 promise, httpHandler, incomingFrames); 302 303 if(idleTimeout.isNegative()) { 304 version (HUNT_HTTP_DEBUG) infof("waitting for response..."); 305 webSocket = promise.get(); 306 handler.onOpen(webSocket); 307 } else { 308 version (HUNT_HTTP_DEBUG) infof("waitting for response in %s ...", idleTimeout); 309 try { 310 webSocket = promise.get(idleTimeout); 311 handler.onOpen(webSocket); 312 } catch(Exception ex ) { 313 version(HUNT_HTTP_DEBUG) warningf(ex.msg); 314 handler.onError(webSocket, ex); 315 } 316 } 317 return webSocket; 318 } 319 320 }