1 module hunt.http.client.Http2ClientResponseHandler; 2 3 import hunt.http.client.ClientHttpHandler; 4 import hunt.http.client.HttpClientConnection; 5 6 import hunt.http.codec.http.frame; 7 import hunt.http.codec.http.stream.AbstractHttp2OutputStream; 8 import hunt.http.codec.http.stream.DataFrameHandler; 9 import hunt.http.HttpOutputStream; 10 import hunt.http.codec.http.stream.Stream; 11 12 import hunt.http.HttpMetaData; 13 import hunt.http.HttpStatus; 14 import hunt.http.HttpRequest; 15 import hunt.http.HttpResponse; 16 17 import hunt.collection.LinkedList; 18 import hunt.concurrency.Promise; 19 import hunt.logging; 20 import hunt.Exceptions; 21 import hunt.util.Common; 22 import hunt.util.Runnable; 23 24 import std.conv; 25 import std.string; 26 27 28 class Http2ClientResponseHandler : Stream.Listener.Adapter { // , Runnable 29 30 enum string OUTPUT_STREAM_KEY = "_outputStream"; 31 enum string RESPONSE_KEY = "_response"; 32 enum string RUN_TASK = "_runTask"; 33 34 private HttpRequest request; 35 private ClientHttpHandler handler; 36 private HttpClientConnection connection; 37 private LinkedList!(ReceivedFrame) receivedFrames; // = new LinkedList!()(); 38 39 this(HttpRequest request, ClientHttpHandler handler, HttpClientConnection connection) { 40 this.request = request; 41 this.handler = handler; 42 this.connection = connection; 43 receivedFrames = new LinkedList!(ReceivedFrame)(); 44 } 45 46 override 47 void onHeaders(Stream stream, HeadersFrame headersFrame) { 48 // Wait the stream is created. 49 receivedFrames.add(new ReceivedFrame(stream, headersFrame, Callback.NOOP)); 50 onFrames(stream); 51 } 52 53 override 54 void onData(Stream stream, DataFrame dataFrame, Callback callback) { 55 receivedFrames.add(new ReceivedFrame(stream, dataFrame, callback)); 56 onFrames(stream); 57 } 58 59 // override 60 void run() { 61 ReceivedFrame receivedFrame; 62 while ((receivedFrame = receivedFrames.poll()) !is null) { 63 onReceivedFrame(receivedFrame); 64 } 65 } 66 67 private void onFrames(Stream stream) { 68 HttpOutputStream output = getOutputStream(stream); 69 if (output !is null) { // the stream is created completely 70 run(); 71 } else { 72 stream.setAttribute(RUN_TASK, this); 73 } 74 } 75 76 private void onReceivedFrame(ReceivedFrame receivedFrame) { 77 Stream stream = receivedFrame.getStream(); 78 HttpOutputStream output = getOutputStream(stream); 79 80 switch (receivedFrame.getFrame().getType()) { 81 case FrameType.HEADERS: { 82 HeadersFrame headersFrame = cast(HeadersFrame) receivedFrame.getFrame(); 83 if (headersFrame.getMetaData() is null) { 84 throw new IllegalArgumentException("the stream " ~ stream.getId().to!string() ~ " received a null meta data"); 85 } 86 87 if (headersFrame.getMetaData().isResponse()) { 88 HttpResponse response = cast(HttpResponse) headersFrame.getMetaData(); 89 90 if (response.getStatus() == HttpStatus.CONTINUE_100) { 91 handler.continueToSendData(request, response, output, connection); 92 } else { 93 stream.setAttribute(RESPONSE_KEY, response); 94 handler.headerComplete(request, response, output, connection); 95 if (headersFrame.isEndStream()) { 96 handler.messageComplete(request, response, output, connection); 97 } 98 } 99 } else { 100 if (headersFrame.isEndStream()) { 101 HttpResponse response = getResponse(stream); 102 103 response.setTrailerSupplier(() => headersFrame.getMetaData().getFields()); 104 handler.contentComplete(request, response, output, connection); 105 handler.messageComplete(request, response, output, connection); 106 } else { 107 throw new IllegalArgumentException("the stream " ~ stream.getId().to!string() ~ " received illegal meta data"); 108 } 109 } 110 } 111 break; 112 113 case FrameType.DATA: { 114 DataFrame dataFrame = cast(DataFrame) receivedFrame.getFrame(); 115 Callback callback = receivedFrame.getCallback(); 116 HttpResponse response = getResponse(stream); 117 118 DataFrameHandler.handleDataFrame(dataFrame, callback, request, response, output, connection, handler); 119 } 120 break; 121 122 default: break; 123 } 124 } 125 126 override 127 void onReset(Stream stream, ResetFrame frame) { 128 // writeln("Client received reset frame: " ~ stream ~ ", " ~ frame); 129 HttpOutputStream output = getOutputStream(stream); 130 HttpResponse response = getResponse(stream); 131 132 int errorCode = frame.getError(); 133 string reason = isValidErrorCode(errorCode) ? (cast(ErrorCode)errorCode).to!string().toLower() : "error=" ~ errorCode.to!string(); 134 int status = HttpStatus.INTERNAL_SERVER_ERROR_500; 135 136 if(errorCode == ErrorCode.PROTOCOL_ERROR) 137 status = HttpStatus.BAD_REQUEST_400; 138 139 // if (isValidErrorCode(errorCode)) { 140 // switch (errorCode) { 141 // case ErrorCode.PROTOCOL_ERROR: 142 // status = HttpStatus.BAD_REQUEST_400; 143 // break; 144 // default: 145 // status = HttpStatus.INTERNAL_SERVER_ERROR_500; 146 // break; 147 // } 148 // } 149 handler.badMessage(status, reason, request, response, output, connection); 150 } 151 152 private HttpOutputStream getOutputStream(Stream stream) { 153 return cast(HttpOutputStream) stream.getAttribute(OUTPUT_STREAM_KEY); 154 } 155 156 private HttpResponse getResponse(Stream stream) { 157 return cast(HttpResponse) stream.getAttribute(RESPONSE_KEY); 158 } 159 160 static class ReceivedFrame { 161 private Stream stream; 162 private Frame frame; 163 private Callback callback; 164 165 this(Stream stream, Frame frame, Callback callback) { 166 this.stream = stream; 167 this.frame = frame; 168 this.callback = callback; 169 } 170 171 Stream getStream() { 172 return stream; 173 } 174 175 Frame getFrame() { 176 return frame; 177 } 178 179 Callback getCallback() { 180 return callback; 181 } 182 } 183 184 static class ClientHttp2OutputStream : AbstractHttp2OutputStream { 185 186 private Stream stream; 187 188 this(HttpMetaData info, Stream stream) { 189 super(info, true); 190 committed = true; 191 this.stream = stream; 192 } 193 194 override 195 protected Stream getStream() { 196 return stream; 197 } 198 } 199 200 static class ClientStreamPromise : Promise!(Stream) { 201 202 private HttpRequest request; 203 private Promise!(HttpOutputStream) promise; 204 205 this(HttpRequest request, Promise!(HttpOutputStream) promise) { 206 this.request = request; 207 this.promise = promise; 208 } 209 210 bool succeeded(Stream stream) { 211 version(HUNT_DEBUG) { 212 tracef("create a new stream %s", stream.getId()); 213 } 214 215 ClientHttp2OutputStream output = new ClientHttp2OutputStream(request, stream); 216 stream.setAttribute(OUTPUT_STREAM_KEY, output); 217 218 Runnable r = cast(Runnable) stream.getAttribute(RUN_TASK); 219 if(r !is null) 220 r.run(); 221 222 // Optional.ofNullable(cast(Runnable) stream.getAttribute(RUN_TASK)) 223 // .ifPresent(Runnable::run); 224 return promise.succeeded(output); 225 } 226 227 bool failed(Throwable x) { 228 errorf("client creates stream unsuccessfully", x); 229 return promise.failed(x); 230 } 231 232 string id() { return "undefined"; } 233 234 } 235 }