module hunt.http.client.HttpClient;

import hunt.http.client.ClientHttp2SessionListener;
import hunt.http.client.HttpClientConnection;
import hunt.http.client.Http1ClientDecoder;
import hunt.http.client.Http2ClientContext;
import hunt.http.client.Http2ClientDecoder;
import hunt.http.client.Http2ClientHandler;

import hunt.http.codec.CommonDecoder;
import hunt.http.codec.CommonEncoder;
import hunt.http.codec.http.stream.Http2Configuration;
import hunt.http.codec.websocket.decode.WebSocketDecoder;

import hunt.util.Lifecycle;
import hunt.lang.exception;
import hunt.util.concurrent.CompletableFuture;
import hunt.util.concurrent.Promise;

import hunt.container.ByteBuffer;
import hunt.container.Map;
import hunt.container.HashMap;

import hunt.net.AsynchronousTcpSession;
import hunt.net.Client;
import hunt.net;

import hunt.logging;

/**
 * HTTP client built on top of a hunt.net TCP client.
 *
 * The constructor installs a decoder chain (HTTP/1 decoder wrapping a
 * WebSocket decoder and an HTTP/2 decoder), an encoder and an
 * Http2ClientHandler on the TCP configuration. Each call to `connect`
 * registers a new Http2ClientContext under a fresh session id and asks the
 * TCP client to connect; the outcome is reported through the supplied promise.
 */
class HttpClient : AbstractLifecycle {

    private AbstractClient client;

    // Session id -> per-connection context. The same map instance is handed
    // to Http2ClientHandler in the constructor.
    // NOTE(review): this is a plain HashMap; if the handler touches it from
    // another thread a synchronized map may be needed -- confirm.
    private Map!(int, Http2ClientContext) http2ClientContext;

    // Process-wide session id counter. Declared `shared` and only advanced
    // through core.atomic so that concurrent connect() calls never observe
    // the same id.
    private shared static int sessionId = 0;

    private Http2Configuration http2Configuration;

    /// Context created by the most recent call to `connect`.
    Http2ClientContext clientContext;

    // Target of the most recent connect(); only read for the log message
    // emitted by the connection handler.
    private string _host;
    private int _port;

    /**
     * Creates the client and wires decoder, encoder and handler into the
     * TCP configuration carried by `c`.
     *
     * Params:
     *   c = HTTP/2 configuration; its TCP configuration is modified in place.
     *
     * Throws: IllegalArgumentException if `c` is null.
     */
    this(Http2Configuration c) {
        if (c is null) {
            throw new IllegalArgumentException("the http2 configuration is null");
        }
        http2ClientContext = new HashMap!(int, Http2ClientContext)();

        Http1ClientDecoder httpClientDecoder =
            new Http1ClientDecoder(new WebSocketDecoder(), new Http2ClientDecoder());
        CommonDecoder commonDecoder = new CommonDecoder(httpClientDecoder);

        c.getTcpConfiguration().setDecoder(commonDecoder);
        c.getTcpConfiguration().setEncoder(new CommonEncoder());
        c.getTcpConfiguration().setHandler(new Http2ClientHandler(c, http2ClientContext));

        // Keep a NetClient-typed local: setConfig/connectHandler are invoked
        // through it, while the field stores the same object as AbstractClient.
        NetClient netClient = NetUtil.createNetClient();
        this.client = netClient;
        netClient.setConfig(c.getTcpConfiguration());

        netClient.connectHandler((NetSocket sock) {
            infof("A connection created with %s:%d", _host, _port);
            AsynchronousTcpSession session = cast(AsynchronousTcpSession) sock;

            // Every received chunk is wrapped in a ByteBuffer and pushed
            // through the decoder chain for this session.
            session.handler((const ubyte[] data) {
                infof("data received (%d bytes): ", data.length);
                version (HUNT_DEBUG) {
                    // Dump at most the first 64 bytes as hex.
                    if (data.length <= 64)
                        infof("%(%02X %)", data[0 .. $]);
                    else
                        infof("%(%02X %) ...", data[0 .. 64]);
                }

                ByteBuffer buf = ByteBuffer.wrap(cast(byte[]) data);
                commonDecoder.decode(buf, session);
            });
        });

        this.http2Configuration = c;
    }

    /**
     * Connects to `host:port` and returns a completable that is passed on as
     * the connection promise.
     *
     * Params:
     *   host = remote host
     *   port = remote port
     *
     * Returns: a new Completable whose id is set to "http2client".
     */
    Completable!(HttpClientConnection) connect(string host, int port) {
        Completable!(HttpClientConnection) completable = new Completable!(HttpClientConnection)();
        completable.id = "http2client";
        connect(host, port, completable);
        return completable;
    }

    /**
     * Connects to `host:port` using a freshly created
     * ClientHttp2SessionListener.
     *
     * Params:
     *   host    = remote host
     *   port    = remote port
     *   promise = promise stored on the connection context
     */
    void connect(string host, int port, Promise!(HttpClientConnection) promise) {
        connect(host, port, promise, new ClientHttp2SessionListener());
    }

    /**
     * Connects to `host:port`.
     *
     * Calls `start()`, creates a new Http2ClientContext holding `promise`
     * and `listener`, registers it under a fresh session id and passes that
     * id to the TCP client's `connect`.
     *
     * Params:
     *   host     = remote host
     *   port     = remote port
     *   promise  = promise stored on the connection context
     *   listener = HTTP/2 session listener stored on the connection context
     */
    void connect(string host, int port, Promise!(HttpClientConnection) promise,
            ClientHttp2SessionListener listener) {
        import core.atomic : atomicOp;

        _host = host;
        _port = port;
        start();

        clientContext = new Http2ClientContext();
        clientContext.setPromise(promise);
        clientContext.setListener(listener);

        // atomicOp yields the incremented value; subtract 1 to keep the
        // original post-increment numbering (first id is 0).
        int id = atomicOp!"+="(sessionId, 1) - 1;
        version (HUNT_DEBUG) tracef("Client sessionId = %d", id);

        http2ClientContext.put(id, clientContext);
        client.connect(host, port, id);
    }

    /// Returns the configuration passed to the constructor.
    Http2Configuration getHttp2Configuration() {
        return http2Configuration;
    }

    /// Lifecycle hook; nothing to initialize here.
    override protected void initialize() {
    }

    /// Lifecycle hook; stops the underlying TCP client if one exists.
    override protected void destroy() {
        if (client !is null) {
            client.stop();
        }
    }

}