1 module hunt.http.client.Http2ClientResponseHandler; 2 3 import hunt.http.client.ClientHttpHandler; 4 import hunt.http.client.HttpClientConnection; 5 6 import hunt.http.codec.http.frame; 7 import hunt.http.codec.http.model.HttpStatus; 8 import hunt.http.codec.http.model.MetaData; 9 import hunt.http.codec.http.stream.AbstractHttp2OutputStream; 10 import hunt.http.codec.http.stream.DataFrameHandler; 11 import hunt.http.codec.http.stream.HttpOutputStream; 12 import hunt.http.codec.http.stream.Stream; 13 14 import hunt.container.LinkedList; 15 16 import hunt.logging; 17 import hunt.lang.common; 18 import hunt.lang.exception; 19 import hunt.util.functional; 20 import hunt.util.concurrent.Promise; 21 22 import std.conv; 23 import std.string; 24 25 alias Request = HttpRequest; 26 27 class Http2ClientResponseHandler : Stream.Listener.Adapter { // , Runnable 28 29 enum string OUTPUT_STREAM_KEY = "_outputStream"; 30 enum string RESPONSE_KEY = "_response"; 31 enum string RUN_TASK = "_runTask"; 32 33 private Request request; 34 private ClientHttpHandler handler; 35 private HttpClientConnection connection; 36 private LinkedList!(ReceivedFrame) receivedFrames; // = new LinkedList!()(); 37 38 this(Request request, ClientHttpHandler handler, HttpClientConnection connection) { 39 this.request = request; 40 this.handler = handler; 41 this.connection = connection; 42 receivedFrames = new LinkedList!(ReceivedFrame)(); 43 } 44 45 override 46 void onHeaders(Stream stream, HeadersFrame headersFrame) { 47 // Wait the stream is created. 48 receivedFrames.add(new ReceivedFrame(stream, headersFrame, Callback.NOOP)); 49 onFrames(stream); 50 } 51 52 override 53 void onData(Stream stream, DataFrame dataFrame, Callback callback) { 54 receivedFrames.add(new ReceivedFrame(stream, dataFrame, callback)); 55 onFrames(stream); 56 } 57 58 // override 59 void run() { 60 ReceivedFrame receivedFrame; 61 while ((receivedFrame = receivedFrames.poll()) !is null) { 62 onReceivedFrame(receivedFrame); 63 } 64 } 65 66 private void onFrames(Stream stream) { 67 HttpOutputStream output = getOutputStream(stream); 68 if (output !is null) { // the stream is created completely 69 run(); 70 } else { 71 stream.setAttribute(RUN_TASK, this); 72 } 73 } 74 75 private void onReceivedFrame(ReceivedFrame receivedFrame) { 76 Stream stream = receivedFrame.getStream(); 77 HttpOutputStream output = getOutputStream(stream); 78 79 switch (receivedFrame.getFrame().getType()) { 80 case FrameType.HEADERS: { 81 HeadersFrame headersFrame = cast(HeadersFrame) receivedFrame.getFrame(); 82 if (headersFrame.getMetaData() is null) { 83 throw new IllegalArgumentException("the stream " ~ stream.getId().to!string() ~ " received a null meta data"); 84 } 85 86 if (headersFrame.getMetaData().isResponse()) { 87 HttpResponse response = cast(HttpResponse) headersFrame.getMetaData(); 88 89 if (response.getStatus() == HttpStatus.CONTINUE_100) { 90 handler.continueToSendData(request, response, output, connection); 91 } else { 92 stream.setAttribute(RESPONSE_KEY, response); 93 handler.headerComplete(request, response, output, connection); 94 if (headersFrame.isEndStream()) { 95 handler.messageComplete(request, response, output, connection); 96 } 97 } 98 } else { 99 if (headersFrame.isEndStream()) { 100 HttpResponse response = getResponse(stream); 101 102 response.setTrailerSupplier(() => headersFrame.getMetaData().getFields()); 103 handler.contentComplete(request, response, output, connection); 104 handler.messageComplete(request, response, output, connection); 105 } else { 106 throw new IllegalArgumentException("the stream " ~ stream.getId().to!string() ~ " received illegal meta data"); 107 } 108 } 109 } 110 break; 111 112 case FrameType.DATA: { 113 DataFrame dataFrame = cast(DataFrame) receivedFrame.getFrame(); 114 Callback callback = receivedFrame.getCallback(); 115 HttpResponse response = getResponse(stream); 116 117 DataFrameHandler.handleDataFrame(dataFrame, callback, request, response, output, connection, handler); 118 } 119 break; 120 121 default: break; 122 } 123 } 124 125 override 126 void onReset(Stream stream, ResetFrame frame) { 127 // writeln("Client received reset frame: " ~ stream ~ ", " ~ frame); 128 HttpOutputStream output = getOutputStream(stream); 129 HttpResponse response = getResponse(stream); 130 131 int errorCode = frame.getError(); 132 string reason = isValidErrorCode(errorCode) ? (cast(ErrorCode)errorCode).to!string().toLower() : "error=" ~ errorCode.to!string(); 133 int status = HttpStatus.INTERNAL_SERVER_ERROR_500; 134 135 if(errorCode == ErrorCode.PROTOCOL_ERROR) 136 status = HttpStatus.BAD_REQUEST_400; 137 138 // if (isValidErrorCode(errorCode)) { 139 // switch (errorCode) { 140 // case ErrorCode.PROTOCOL_ERROR: 141 // status = HttpStatus.BAD_REQUEST_400; 142 // break; 143 // default: 144 // status = HttpStatus.INTERNAL_SERVER_ERROR_500; 145 // break; 146 // } 147 // } 148 handler.badMessage(status, reason, request, response, output, connection); 149 } 150 151 private HttpOutputStream getOutputStream(Stream stream) { 152 return cast(HttpOutputStream) stream.getAttribute(OUTPUT_STREAM_KEY); 153 } 154 155 private HttpResponse getResponse(Stream stream) { 156 return cast(HttpResponse) stream.getAttribute(RESPONSE_KEY); 157 } 158 159 static class ReceivedFrame { 160 private Stream stream; 161 private Frame frame; 162 private Callback callback; 163 164 this(Stream stream, Frame frame, Callback callback) { 165 this.stream = stream; 166 this.frame = frame; 167 this.callback = callback; 168 } 169 170 Stream getStream() { 171 return stream; 172 } 173 174 Frame getFrame() { 175 return frame; 176 } 177 178 Callback getCallback() { 179 return callback; 180 } 181 } 182 183 static class ClientHttp2OutputStream : AbstractHttp2OutputStream { 184 185 private Stream stream; 186 187 this(MetaData info, Stream stream) { 188 super(info, true); 189 committed = true; 190 this.stream = stream; 191 } 192 193 override 194 protected Stream getStream() { 195 return stream; 196 } 197 } 198 199 static class ClientStreamPromise : Promise!(Stream) { 200 201 private Request request; 202 private Promise!(HttpOutputStream) promise; 203 204 this(Request request, Promise!(HttpOutputStream) promise) { 205 this.request = request; 206 this.promise = promise; 207 } 208 209 void succeeded(Stream stream) { 210 version(HUNT_DEBUG) { 211 tracef("create a new stream %s", stream.getId()); 212 } 213 214 ClientHttp2OutputStream output = new ClientHttp2OutputStream(request, stream); 215 stream.setAttribute(OUTPUT_STREAM_KEY, output); 216 217 Runnable r = cast(Runnable) stream.getAttribute(RUN_TASK); 218 if(r !is null) 219 r.run(); 220 221 // Optional.ofNullable(cast(Runnable) stream.getAttribute(RUN_TASK)) 222 // .ifPresent(Runnable::run); 223 promise.succeeded(output); 224 } 225 226 void failed(Exception x) { 227 promise.failed(x); 228 errorf("client creates stream unsuccessfully", x); 229 } 230 231 string id() { return "undefined"; } 232 233 } 234 }