1 module hunt.http.client.Http2ClientResponseHandler;
2 
3 import hunt.http.client.ClientHttpHandler;
4 import hunt.http.client.HttpClientConnection;
5 
6 import hunt.http.codec.http.frame;
7 import hunt.http.codec.http.stream.AbstractHttp2OutputStream;
8 import hunt.http.codec.http.stream.DataFrameHandler;
9 import hunt.http.HttpOutputStream;
10 import hunt.http.codec.http.stream.Stream;
11 
12 import hunt.http.HttpMetaData;
13 import hunt.http.HttpStatus;
14 import hunt.http.HttpRequest;
15 import hunt.http.HttpResponse;
16 
17 import hunt.collection.LinkedList;
18 import hunt.concurrency.Promise;
19 import hunt.logging;
20 import hunt.Exceptions;
21 import hunt.util.Common;
22 import hunt.util.Runnable;
23 
24 import std.conv;
25 import std.string;
26 
27 
28 class Http2ClientResponseHandler : Stream.Listener.Adapter { //  , Runnable
29 
30     enum string OUTPUT_STREAM_KEY = "_outputStream";
31     enum string RESPONSE_KEY = "_response";
32     enum string RUN_TASK = "_runTask";
33 
34     private HttpRequest request;
35     private ClientHttpHandler handler;
36     private HttpClientConnection connection;
37     private LinkedList!(ReceivedFrame) receivedFrames; // = new LinkedList!()();
38 
39     this(HttpRequest request, ClientHttpHandler handler, HttpClientConnection connection) {
40         this.request = request;
41         this.handler = handler;
42         this.connection = connection;
43         receivedFrames = new LinkedList!(ReceivedFrame)();
44     }
45 
46     override
47     void onHeaders(Stream stream, HeadersFrame headersFrame) {
48         // Wait the stream is created.
49         receivedFrames.add(new ReceivedFrame(stream, headersFrame, Callback.NOOP));
50         onFrames(stream);
51     }
52 
53     override
54     void onData(Stream stream, DataFrame dataFrame, Callback callback) {
55         receivedFrames.add(new ReceivedFrame(stream, dataFrame, callback));
56         onFrames(stream);
57     }
58 
59     // override
60     void run() {
61         ReceivedFrame receivedFrame;
62         while ((receivedFrame = receivedFrames.poll()) !is null) {
63             onReceivedFrame(receivedFrame);
64         }
65     }
66 
67     private void onFrames(Stream stream) {
68         HttpOutputStream output = getOutputStream(stream);
69         if (output !is null) { // the stream is created completely
70             run();
71         } else {
72             stream.setAttribute(RUN_TASK, this);
73         }
74     }
75 
76     private void onReceivedFrame(ReceivedFrame receivedFrame) {
77         Stream stream = receivedFrame.getStream();
78         HttpOutputStream output = getOutputStream(stream);
79 
80         switch (receivedFrame.getFrame().getType()) {
81             case FrameType.HEADERS: {
82                 HeadersFrame headersFrame = cast(HeadersFrame) receivedFrame.getFrame();
83                 if (headersFrame.getMetaData() is null) {
84                     throw new IllegalArgumentException("the stream " ~ stream.getId().to!string() ~ " received a null meta data");
85                 }
86 
87                 if (headersFrame.getMetaData().isResponse()) {
88                     HttpResponse response = cast(HttpResponse) headersFrame.getMetaData();
89 
90                     if (response.getStatus() == HttpStatus.CONTINUE_100) {
91                         handler.continueToSendData(request, response, output, connection);
92                     } else {
93                         stream.setAttribute(RESPONSE_KEY, response);
94                         handler.headerComplete(request, response, output, connection);
95                         if (headersFrame.isEndStream()) {
96                             handler.messageComplete(request, response, output, connection);
97                         }
98                     }
99                 } else {
100                     if (headersFrame.isEndStream()) {
101                         HttpResponse response = getResponse(stream);
102 
103                         response.setTrailerSupplier(() => headersFrame.getMetaData().getFields());
104                         handler.contentComplete(request, response, output, connection);
105                         handler.messageComplete(request, response, output, connection);
106                     } else {
107                         throw new IllegalArgumentException("the stream " ~ stream.getId().to!string() ~ " received illegal meta data");
108                     }
109                 }
110             }
111             break;
112 
113             case FrameType.DATA: {
114                 DataFrame dataFrame = cast(DataFrame) receivedFrame.getFrame();
115                 Callback callback = receivedFrame.getCallback();
116                 HttpResponse response = getResponse(stream);
117 
118                 DataFrameHandler.handleDataFrame(dataFrame, callback, request, response, output, connection, handler);
119             }
120             break;
121 
122             default: break;
123         }
124     }
125 
126     override
127     void onReset(Stream stream, ResetFrame frame) {
128         // writeln("Client received reset frame: " ~ stream ~ ", " ~ frame);
129         HttpOutputStream output = getOutputStream(stream);
130         HttpResponse response = getResponse(stream);
131 
132         int errorCode = frame.getError();
133         string reason = isValidErrorCode(errorCode) ? (cast(ErrorCode)errorCode).to!string().toLower() : "error=" ~ errorCode.to!string();
134         int status = HttpStatus.INTERNAL_SERVER_ERROR_500;
135 
136         if(errorCode == ErrorCode.PROTOCOL_ERROR)
137             status = HttpStatus.BAD_REQUEST_400;
138 
139         // if (isValidErrorCode(errorCode)) {
140         //     switch (errorCode) {
141         //         case ErrorCode.PROTOCOL_ERROR:
142         //             status = HttpStatus.BAD_REQUEST_400;
143         //             break;
144         //         default:
145         //             status = HttpStatus.INTERNAL_SERVER_ERROR_500;
146         //             break;
147         //     }
148         // }
149         handler.badMessage(status, reason, request, response, output, connection);
150     }
151 
152     private HttpOutputStream getOutputStream(Stream stream) {
153         return cast(HttpOutputStream) stream.getAttribute(OUTPUT_STREAM_KEY);
154     }
155 
156     private HttpResponse getResponse(Stream stream) {
157         return cast(HttpResponse) stream.getAttribute(RESPONSE_KEY);
158     }
159 
160     static class ReceivedFrame {
161         private Stream stream;
162         private Frame frame;
163         private Callback callback;
164 
165         this(Stream stream, Frame frame, Callback callback) {
166             this.stream = stream;
167             this.frame = frame;
168             this.callback = callback;
169         }
170 
171         Stream getStream() {
172             return stream;
173         }
174 
175         Frame getFrame() {
176             return frame;
177         }
178 
179         Callback getCallback() {
180             return callback;
181         }
182     }
183 
184     static class ClientHttp2OutputStream : AbstractHttp2OutputStream {
185 
186         private Stream stream;
187 
188         this(HttpMetaData info, Stream stream) {
189             super(info, true);
190             committed = true;
191             this.stream = stream;
192         }
193 
194         override
195         protected Stream getStream() {
196             return stream;
197         }
198     }
199 
200     static class ClientStreamPromise : Promise!(Stream) {
201 
202         private HttpRequest request;
203         private Promise!(HttpOutputStream) promise;
204 
205         this(HttpRequest request, Promise!(HttpOutputStream) promise) {
206             this.request = request;
207             this.promise = promise;
208         }
209 
210         bool succeeded(Stream stream) {
211             version(HUNT_DEBUG) {
212                 tracef("create a new stream %s", stream.getId());
213             }
214 
215             ClientHttp2OutputStream output = new ClientHttp2OutputStream(request, stream);
216             stream.setAttribute(OUTPUT_STREAM_KEY, output);
217 
218             Runnable r = cast(Runnable) stream.getAttribute(RUN_TASK);
219             if(r !is null)
220                 r.run();
221 
222             // Optional.ofNullable(cast(Runnable) stream.getAttribute(RUN_TASK))
223             //         .ifPresent(Runnable::run);
224             return promise.succeeded(output);
225         }
226 
227         bool failed(Throwable x) {
228             errorf("client creates stream unsuccessfully", x);
229             return promise.failed(x);
230         }
231 
232         string id() { return "undefined"; }
233 
234     }
235 }