1 module hunt.http.client.HttpClient; 2 3 import hunt.http.client.Call; 4 import hunt.http.client.ClientHttp2SessionListener; 5 import hunt.http.client.ClientHttpHandler; 6 import hunt.http.client.CookieStore; 7 import hunt.http.client.HttpClientConnection; 8 import hunt.http.client.HttpClientHandler; 9 import hunt.http.client.Http1ClientDecoder; 10 import hunt.http.client.HttpClientContext; 11 import hunt.http.client.Http2ClientDecoder; 12 import hunt.http.client.HttpClientOptions; 13 import hunt.http.client.HttpClientResponse; 14 import hunt.http.client.HttpClientRequest; 15 import hunt.http.client.InMemoryCookieStore; 16 import hunt.http.client.RealCall; 17 18 import hunt.http.codec.CommonDecoder; 19 import hunt.http.codec.CommonEncoder; 20 import hunt.http.codec.websocket.decode.WebSocketDecoder; 21 import hunt.http.codec.websocket.frame.DataFrame; 22 23 import hunt.http.HttpBody; 24 import hunt.http.HttpConnection; 25 import hunt.http.HttpOptions; 26 import hunt.http.HttpOutputStream; 27 import hunt.http.HttpRequest; 28 import hunt.http.HttpResponse; 29 import hunt.http.WebSocketConnection; 30 import hunt.http.WebSocketFrame; 31 import hunt.http.WebSocketPolicy; 32 33 import hunt.Exceptions; 34 import hunt.concurrency.Future; 35 import hunt.concurrency.FuturePromise; 36 import hunt.concurrency.Promise; 37 38 import hunt.io.BufferUtils; 39 import hunt.io.ByteBuffer; 40 import hunt.collection.Map; 41 import hunt.collection.HashMap; 42 43 import hunt.logging; 44 import hunt.net; 45 import hunt.util.AbstractLifecycle; 46 47 import core.atomic; 48 import core.time; 49 50 /** 51 * 52 */ 53 class HttpClient : AbstractLifecycle { 54 55 enum Duration DEFAULT_IDLE_TIMEOUT = 15.seconds; 56 enum Duration DEFAULT_CONNECT_TIMEOUT = 10.seconds; 57 58 alias Callback = void delegate(); 59 60 private NetClientOptions clientOptions; 61 private NetClient[int] _netClients; 62 private HttpClientOptions _httpOptions; 63 private Callback _onClosed = null; 64 private CookieStore _cookieStore; 65 66 this() { 67 clientOptions = new NetClientOptions(); 68 clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT); 69 clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); 70 HttpClientOptions config = new HttpClientOptions(clientOptions); 71 this(config); 72 } 73 74 this(HttpClientOptions c) { 75 if (c is null) { 76 throw new IllegalArgumentException("http configuration is null"); 77 } 78 79 clientOptions = c.getTcpConfiguration(); 80 if(clientOptions is null) { 81 clientOptions = new NetClientOptions(); 82 clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT); 83 clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); 84 c.setTcpConfiguration(clientOptions); 85 } else { 86 if(clientOptions.getIdleTimeout == Duration.zero) 87 clientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT); 88 89 if(clientOptions.getConnectTimeout == Duration.zero) 90 clientOptions.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT); 91 } 92 93 this._httpOptions = c; 94 95 start(); 96 } 97 98 Future!(HttpClientConnection) connect(string host, int port) { 99 FuturePromise!(HttpClientConnection) completable = new FuturePromise!(HttpClientConnection)("httpclient"); 100 connect(host, port, completable); 101 return completable; 102 } 103 104 void connect(string host, int port, Promise!(HttpClientConnection) promise) { 105 connect(host, port, promise, new ClientHttp2SessionListener()); 106 } 107 108 void connect(string host, int port, Promise!(HttpClientConnection) promise, 109 ClientHttp2SessionListener listener) { 110 111 HttpClientContext clientContext = new HttpClientContext(); 112 clientContext.setPromise(promise); 113 clientContext.setListener(listener); 114 115 NetClient client = NetUtil.createNetClient(clientOptions); 116 int clientId = client.getId(); 117 if(clientId in _netClients) { 118 warningf("clientId existes: %d", clientId); 119 } 120 _netClients[client.getId()] = client; 121 122 client.setCodec(new class Codec { 123 124 private CommonEncoder encoder; 125 private CommonDecoder decoder; 126 127 this() { 128 encoder = new CommonEncoder(); 129 130 Http1ClientDecoder httpClientDecoder = new Http1ClientDecoder( 131 new WebSocketDecoder(), 132 new Http2ClientDecoder()); 133 decoder = new CommonDecoder(httpClientDecoder); 134 } 135 136 Encoder getEncoder() { 137 return encoder; 138 } 139 140 Decoder getDecoder() { 141 return decoder; 142 } 143 }) 144 .setHandler(new HttpClientHandler(_httpOptions, clientContext)); 145 146 client.onClosed(_onClosed); 147 client.connect(host, port); 148 // _isConnected = client.isConnected(); 149 } 150 151 HttpClientOptions getHttpOptions() { 152 return _httpOptions; 153 } 154 155 void close() { 156 stop(); 157 } 158 159 void setOnClosed(Callback callback) { 160 _onClosed = callback; 161 } 162 163 override protected void initialize() { 164 // do nothing 165 } 166 167 override void destroy() { 168 foreach(NetClient client; _netClients) { 169 client.close(); 170 } 171 172 _netClients = null; 173 } 174 175 deprecated("Unsupported anymore!") 176 bool isConnected() { 177 // return _isConnected; 178 return false; 179 } 180 181 /** 182 * Sets the handler that can accept cookies from incoming HTTP responses and provides cookies to 183 * outgoing HTTP requests. 184 * 185 * <p>If unset, {@linkplain CookieStore#NO_COOKIES no cookies} will be accepted nor provided. 186 */ 187 HttpClient useCookieStore() { 188 _cookieStore = new InMemoryCookieStore(); 189 return this; 190 } 191 192 /** 193 * 194 * Params: 195 * store = 196 * Returns: 197 */ 198 HttpClient useCookieStore(CookieStore store) { 199 if (store is null) throw new NullPointerException("CookieStore is null"); 200 _cookieStore = store; 201 return this; 202 } 203 204 CookieStore getCookieStore() { 205 return _cookieStore; 206 } 207 208 /** 209 * Prepares the {@code request} to be executed at some point in the future. 210 */ 211 Call newCall(Request request) { 212 return RealCall.newRealCall(this, request, false /* for web socket */); 213 } 214 215 /** 216 * Uses {@code request} to connect a new web socket. 217 */ 218 WebSocketConnection newWebSocket(Request request, WebSocketMessageHandler handler) { 219 assert(handler !is null); 220 221 // import core.atomic; 222 // import core.sync.condition; 223 // import core.sync.mutex; 224 225 // Mutex responseLocker = new Mutex(); 226 // Condition responseCondition = new Condition(responseLocker); 227 228 HttpURI uri = request.getURI(); 229 string scheme = uri.getScheme(); 230 string host = uri.getHost(); 231 int port = uri.getPort(); 232 233 234 // responseLocker.lock(); 235 // scope(exit) { 236 // responseLocker.unlock(); 237 // } 238 239 Future!(HttpClientConnection) conn = connect(host, port); 240 241 TcpSslOptions tcpOptions = _httpOptions.getTcpConfiguration(); 242 Duration idleTimeout = tcpOptions.getIdleTimeout(); 243 244 HttpClientConnection connection = conn.get(); 245 assert(connection !is null); 246 247 FuturePromise!WebSocketConnection promise = new FuturePromise!WebSocketConnection(); 248 WebSocketConnection webSocket; 249 250 AbstractClientHttpHandler httpHandler = new class AbstractClientHttpHandler { 251 override bool messageComplete(HttpRequest request, 252 HttpResponse response, HttpOutputStream output, HttpConnection connection) { 253 version(HUNT_HTTP_DEBUG) tracef("Upgrade to WebSocket successfully: " ~ response.toString()); 254 return true; 255 } 256 }; 257 258 IncomingFrames incomingFrames = new class IncomingFrames { 259 void incomingError(Exception ex) { 260 version(HUNT_DEBUG) warningf(ex.msg); 261 handler.onError(webSocket, ex); 262 } 263 264 void incomingFrame(WebSocketFrame frame) { 265 WebSocketFrameType type = frame.getType(); 266 version(HUNT_HTTP_DEBUG) tracef("new frame comming: %s", type); 267 switch (type) { 268 case WebSocketFrameType.TEXT: 269 handler.onText(webSocket, (cast(DataFrame) frame).getPayloadAsUTF8()); 270 break; 271 272 case WebSocketFrameType.BINARY: 273 handler.onBinary(webSocket, frame.getPayload()); 274 break; 275 276 case WebSocketFrameType.CLOSE: 277 handler.onClosed(webSocket); 278 break; 279 280 case WebSocketFrameType.PING: 281 handler.onPing(webSocket); 282 break; 283 284 case WebSocketFrameType.PONG: 285 handler.onPong(webSocket); 286 break; 287 288 case WebSocketFrameType.CONTINUATION: 289 handler.onContinuation(webSocket, frame.getPayload()); 290 break; 291 292 default: 293 warningf("Can't handle the frame of ", type); 294 break; 295 } 296 } 297 }; 298 299 connection.upgradeWebSocket(request, WebSocketPolicy.newClientPolicy(), 300 promise, httpHandler, incomingFrames); 301 302 if(idleTimeout.isNegative()) { 303 version (HUNT_HTTP_DEBUG) infof("waitting for response..."); 304 webSocket = promise.get(); 305 handler.onOpen(webSocket); 306 } else { 307 version (HUNT_HTTP_DEBUG) infof("waitting for response in %s ...", idleTimeout); 308 try { 309 webSocket = promise.get(idleTimeout); 310 handler.onOpen(webSocket); 311 } catch(Exception ex ) { 312 version(HUNT_HTTP_DEBUG) warningf(ex.msg); 313 handler.onError(webSocket, ex); 314 } 315 } 316 return webSocket; 317 } 318 319 }