1 module hunt.http.client.Http2ClientResponseHandler;
2 
3 import hunt.http.client.ClientHttpHandler;
4 import hunt.http.client.HttpClientConnection;
5 
6 import hunt.http.codec.http.frame;
7 import hunt.http.codec.http.model.HttpStatus;
8 import hunt.http.codec.http.model.MetaData;
9 import hunt.http.codec.http.stream.AbstractHttp2OutputStream;
10 import hunt.http.codec.http.stream.DataFrameHandler;
11 import hunt.http.codec.http.stream.HttpOutputStream;
12 import hunt.http.codec.http.stream.Stream;
13 
14 import hunt.container.LinkedList;
15 
16 import hunt.logging;
17 import hunt.lang.common;
18 import hunt.lang.exception;
19 import hunt.util.functional;
20 import hunt.util.concurrent.Promise;
21 
22 import std.conv;
23 import std.string;
24 
25 alias Request = HttpRequest;
26 
27 class Http2ClientResponseHandler : Stream.Listener.Adapter { //  , Runnable
28 
29     enum string OUTPUT_STREAM_KEY = "_outputStream";
30     enum string RESPONSE_KEY = "_response";
31     enum string RUN_TASK = "_runTask";
32 
33     private Request request;
34     private ClientHttpHandler handler;
35     private HttpClientConnection connection;
36     private LinkedList!(ReceivedFrame) receivedFrames; // = new LinkedList!()();
37 
38     this(Request request, ClientHttpHandler handler, HttpClientConnection connection) {
39         this.request = request;
40         this.handler = handler;
41         this.connection = connection;
42         receivedFrames = new LinkedList!(ReceivedFrame)();
43     }
44 
45     override
46     void onHeaders(Stream stream, HeadersFrame headersFrame) {
47         // Wait the stream is created.
48         receivedFrames.add(new ReceivedFrame(stream, headersFrame, Callback.NOOP));
49         onFrames(stream);
50     }
51 
52     override
53     void onData(Stream stream, DataFrame dataFrame, Callback callback) {
54         receivedFrames.add(new ReceivedFrame(stream, dataFrame, callback));
55         onFrames(stream);
56     }
57 
58     // override
59     void run() {
60         ReceivedFrame receivedFrame;
61         while ((receivedFrame = receivedFrames.poll()) !is null) {
62             onReceivedFrame(receivedFrame);
63         }
64     }
65 
66     private void onFrames(Stream stream) {
67         HttpOutputStream output = getOutputStream(stream);
68         if (output !is null) { // the stream is created completely
69             run();
70         } else {
71             stream.setAttribute(RUN_TASK, this);
72         }
73     }
74 
75     private void onReceivedFrame(ReceivedFrame receivedFrame) {
76         Stream stream = receivedFrame.getStream();
77         HttpOutputStream output = getOutputStream(stream);
78 
79         switch (receivedFrame.getFrame().getType()) {
80             case FrameType.HEADERS: {
81                 HeadersFrame headersFrame = cast(HeadersFrame) receivedFrame.getFrame();
82                 if (headersFrame.getMetaData() is null) {
83                     throw new IllegalArgumentException("the stream " ~ stream.getId().to!string() ~ " received a null meta data");
84                 }
85 
86                 if (headersFrame.getMetaData().isResponse()) {
87                     HttpResponse response = cast(HttpResponse) headersFrame.getMetaData();
88 
89                     if (response.getStatus() == HttpStatus.CONTINUE_100) {
90                         handler.continueToSendData(request, response, output, connection);
91                     } else {
92                         stream.setAttribute(RESPONSE_KEY, response);
93                         handler.headerComplete(request, response, output, connection);
94                         if (headersFrame.isEndStream()) {
95                             handler.messageComplete(request, response, output, connection);
96                         }
97                     }
98                 } else {
99                     if (headersFrame.isEndStream()) {
100                         HttpResponse response = getResponse(stream);
101 
102                         response.setTrailerSupplier(() => headersFrame.getMetaData().getFields());
103                         handler.contentComplete(request, response, output, connection);
104                         handler.messageComplete(request, response, output, connection);
105                     } else {
106                         throw new IllegalArgumentException("the stream " ~ stream.getId().to!string() ~ " received illegal meta data");
107                     }
108                 }
109             }
110             break;
111 
112             case FrameType.DATA: {
113                 DataFrame dataFrame = cast(DataFrame) receivedFrame.getFrame();
114                 Callback callback = receivedFrame.getCallback();
115                 HttpResponse response = getResponse(stream);
116 
117                 DataFrameHandler.handleDataFrame(dataFrame, callback, request, response, output, connection, handler);
118             }
119             break;
120 
121             default: break;
122         }
123     }
124 
125     override
126     void onReset(Stream stream, ResetFrame frame) {
127         // writeln("Client received reset frame: " ~ stream ~ ", " ~ frame);
128         HttpOutputStream output = getOutputStream(stream);
129         HttpResponse response = getResponse(stream);
130 
131         int errorCode = frame.getError();
132         string reason = isValidErrorCode(errorCode) ? (cast(ErrorCode)errorCode).to!string().toLower() : "error=" ~ errorCode.to!string();
133         int status = HttpStatus.INTERNAL_SERVER_ERROR_500;
134 
135         if(errorCode == ErrorCode.PROTOCOL_ERROR)
136             status = HttpStatus.BAD_REQUEST_400;
137 
138         // if (isValidErrorCode(errorCode)) {
139         //     switch (errorCode) {
140         //         case ErrorCode.PROTOCOL_ERROR:
141         //             status = HttpStatus.BAD_REQUEST_400;
142         //             break;
143         //         default:
144         //             status = HttpStatus.INTERNAL_SERVER_ERROR_500;
145         //             break;
146         //     }
147         // }
148         handler.badMessage(status, reason, request, response, output, connection);
149     }
150 
151     private HttpOutputStream getOutputStream(Stream stream) {
152         return cast(HttpOutputStream) stream.getAttribute(OUTPUT_STREAM_KEY);
153     }
154 
155     private HttpResponse getResponse(Stream stream) {
156         return cast(HttpResponse) stream.getAttribute(RESPONSE_KEY);
157     }
158 
159     static class ReceivedFrame {
160         private Stream stream;
161         private Frame frame;
162         private Callback callback;
163 
164         this(Stream stream, Frame frame, Callback callback) {
165             this.stream = stream;
166             this.frame = frame;
167             this.callback = callback;
168         }
169 
170         Stream getStream() {
171             return stream;
172         }
173 
174         Frame getFrame() {
175             return frame;
176         }
177 
178         Callback getCallback() {
179             return callback;
180         }
181     }
182 
183     static class ClientHttp2OutputStream : AbstractHttp2OutputStream {
184 
185         private Stream stream;
186 
187         this(MetaData info, Stream stream) {
188             super(info, true);
189             committed = true;
190             this.stream = stream;
191         }
192 
193         override
194         protected Stream getStream() {
195             return stream;
196         }
197     }
198 
199     static class ClientStreamPromise : Promise!(Stream) {
200 
201         private Request request;
202         private Promise!(HttpOutputStream) promise;
203 
204         this(Request request, Promise!(HttpOutputStream) promise) {
205             this.request = request;
206             this.promise = promise;
207         }
208 
209         void succeeded(Stream stream) {
210             version(HUNT_DEBUG) {
211                 tracef("create a new stream %s", stream.getId());
212             }
213 
214             ClientHttp2OutputStream output = new ClientHttp2OutputStream(request, stream);
215             stream.setAttribute(OUTPUT_STREAM_KEY, output);
216 
217             Runnable r = cast(Runnable) stream.getAttribute(RUN_TASK);
218             if(r !is null)
219                 r.run();
220 
221             // Optional.ofNullable(cast(Runnable) stream.getAttribute(RUN_TASK))
222             //         .ifPresent(Runnable::run);
223             promise.succeeded(output);
224         }
225 
226         void failed(Exception x) {
227             promise.failed(x);
228             errorf("client creates stream unsuccessfully", x);
229         }
230 
231         string id() { return "undefined"; }
232 
233     }
234 }