1 module hunt.http.codec.http.stream.AbstractHttp2OutputStream;
2 
3 import hunt.http.HttpOutputStream;
4 import hunt.http.codec.http.stream.Stream;
5 
6 import hunt.http.codec.http.frame;
7 import hunt.http.HttpFields;
8 import hunt.http.HttpHeader;
9 import hunt.http.HttpMetaData;
10 import hunt.http.HttpRequest;
11 import hunt.http.HttpVersion;
12 
13 import hunt.io.ByteBuffer;
14 import hunt.collection.LinkedList;
15 
16 import hunt.Functions;
17 import hunt.Exceptions;
18 import hunt.util.Common;
19 
20 import hunt.logging;
21 
/**
 * Base class for HTTP/2 output streams.
 *
 * Turns the response/request meta data and body buffers into HEADERS and DATA
 * frames and writes them through the Stream returned by getStream(), passing
 * itself as the completion Callback so that the next queued frame is written
 * when the previous one succeeds.
 *
 * Two modes are distinguished by isChunked():
 *  - not chunked: frames are written immediately, or queued while a write is
 *    in flight;
 *  - chunked (body written, negative content length): the most recently
 *    queued frame is always held back so that, when close() is called, it can
 *    be re-issued with the end-stream flag set.
 */
abstract class AbstractHttp2OutputStream : HttpOutputStream , Callback
{

    /// Number of body bytes accounted for so far by isLastFrame(ByteBuffer).
    private long size;
    /// True while a frame handed to the stream has not yet reported success.
    private bool isWriting;
    /// Frames waiting to be written.
    private LinkedList!Frame frames;
    /// True until write(ByteBuffer) is called for the first time.
    private bool noContent = true;

    /**
     * Params:
     *   metaData   = meta data forwarded to the base class and later sent in
     *                the HEADERS frame
     *   clientMode = forwarded unchanged to the base class
     */
    this(HttpMetaData metaData, bool clientMode) {
        super(metaData, clientMode);
        frames = new LinkedList!Frame();
    }

    /**
     * Writes a body buffer as a DATA frame, committing the headers first if
     * that has not happened yet.
     *
     * Throws: IllegalStateException if the output is already closed. An
     *         explicit throw is used instead of assert so the guard is still
     *         enforced when asserts are compiled out.
     */
    override
    void write(ByteBuffer data) {
        Stream stream = getStream();
        if (closed) {
            throw new IllegalStateException("The stream " ~ stream.toString() ~ " output is closed.");
        }

        // Must be cleared before commit() so the HEADERS frame is not marked
        // as the end of the stream.
        noContent = false;
        commit();
        writeFrame(new DataFrame(stream.getId(), data, isLastFrame(data)));
    }

    /**
     * Sends the HEADERS frame once. Does nothing when already committed or
     * closed. If no content has been written yet, the HEADERS frame carries
     * the end-stream flag.
     */
    override
    void commit() {
        if (committed || closed) {
            return;
        }

        HeadersFrame headersFrame = new HeadersFrame(getStream().getId(), metaData, null, noContent);
        version(HUNT_DEBUG) {
            tracef("http2 output stream %s commits the header frame %s", getStream().toString(), headersFrame.toString());
        }
        writeFrame(headersFrame);
        committed = true;
    }

    /**
     * Closes the output. Commits the headers if necessary; in chunked mode it
     * additionally queues the optional trailer HEADERS frame and a
     * DisconnectFrame marker, then kicks the write loop if it is idle.
     */
    override
    void close() {
        if (closed) {
            return;
        }

        commit();
        if (isChunked()) {
            // Queue the trailer fields, when a supplier is present and yields
            // a non-null value, as a final HEADERS frame.
            Supplier!HttpFields supplier = metaData.getTrailerSupplier();
            if (supplier !is null) {
                HttpFields trailer = supplier();
                if (trailer !is null) {
                    HttpMetaData trailerMetaData = new HttpMetaData(HttpVersion.HTTP_1_1, trailer);
                    HeadersFrame trailerFrame = new HeadersFrame(getStream().getId(), trailerMetaData, null, true);
                    frames.offer(trailerFrame);
                }
            }

            // The DisconnectFrame is never sent; succeeded() uses it as a
            // marker that the frame queued before it is the last one.
            DisconnectFrame disconnectFrame = new DisconnectFrame();
            frames.offer(disconnectFrame);
            if (!isWriting) {
                succeeded();
            }
        }
        closed = true;
    }

    /**
     * Queues or writes a frame.
     *
     * In chunked mode the frame is always queued and the write loop is kicked
     * when idle. Otherwise the frame is written immediately unless another
     * write is in flight, in which case it is queued.
     */
    void writeFrame(Frame frame) {
        if (isChunked()) {
            frames.offer(frame);
            if (!isWriting) {
                succeeded();
            }
        } else {
            if (isWriting) {
                frames.offer(frame);
            } else {
                _writeFrame(frame);
            }
        }
    }

    /**
     * Callback invoked when the previous frame has been written; also called
     * directly to start the write loop. Writes the next eligible frame or
     * marks the stream as idle.
     *
     * Throws: IllegalStateException if, in chunked mode, the frame preceding
     *         the DisconnectFrame marker is neither a DATA nor a HEADERS frame.
     */
    override
    void succeeded() {
        if (isChunked()) {
            if (frames.size() > 2) {
                _writeFrame(frames.poll());
            } else if (frames.size() == 2) {
                Frame frame = frames.getLast();
                if (frame.getType() == FrameType.DISCONNECT) {
                    // The head of the queue is the final real frame: send it
                    // with the end-stream flag and drop the marker.
                    Frame lastFrame = frames.poll();
                    frames.clear();
                    switch (lastFrame.getType()) {
                        case FrameType.DATA: {
                            DataFrame dataFrame = cast(DataFrame) lastFrame;
                            if (dataFrame.isEndStream()) {
                                _writeFrame(dataFrame);
                            } else {
                                DataFrame lastDataFrame = new DataFrame(dataFrame.getStreamId(), dataFrame.getData(), true);
                                _writeFrame(lastDataFrame);
                            }
                        }
                        break;
                        case FrameType.HEADERS: {
                            HeadersFrame headersFrame = cast(HeadersFrame) lastFrame;
                            if (headersFrame.isEndStream()) {
                                _writeFrame(headersFrame);
                            } else {
                                HeadersFrame lastHeadersFrame = new HeadersFrame(headersFrame.getStreamId(),
                                        headersFrame.getMetaData(), headersFrame.getPriority(), true);
                                _writeFrame(lastHeadersFrame);
                            }
                        }
                        break;
                        default:
                            throw new IllegalStateException("The last frame must be data frame or header frame");
                    }
                } else {
                    _writeFrame(frames.poll());
                }
            } else if (frames.size() == 1) {
                // A single frame is held back unless it already ends the
                // stream; it may still need the end-stream flag on close().
                Frame frame = frames.getLast();
                if (isLastFrame(frame)) {
                    _writeFrame(frames.poll());
                } else {
                    isWriting = false;
                }
            } else {
                isWriting = false;
            }
        } else {
            Frame frame = frames.poll();
            if (frame !is null) {
                _writeFrame(frame);
            } else {
                isWriting = false;
            }
        }
    }

    /**
     * Returns: true if the frame is a HEADERS or DATA frame with the
     *          end-stream flag set; false for every other frame.
     */
    bool isLastFrame(Frame frame) {
        switch (frame.getType()) {
            case FrameType.HEADERS:
                HeadersFrame headersFrame = cast(HeadersFrame) frame;
                return headersFrame.isEndStream();
            case FrameType.DATA:
                DataFrame dataFrame = cast(DataFrame) frame;
                return dataFrame.isEndStream();
            default: break;
        }
        return false;
    }

    /**
     * Callback invoked when writing a frame failed. Discards all queued
     * frames, closes the session with INTERNAL_ERROR, marks this output as
     * closed and logs the failure together with the exception message.
     */
    // override
    void failed(Exception x) {
        frames.clear();
        getStream().getSession().close(cast(int)ErrorCode.INTERNAL_ERROR, "Write frame failure", Callback.NOOP);
        closed = true;
        // Include the cause: the format string previously had no specifier,
        // so the exception never made it into the log.
        errorf("Write frame failure: %s", x is null ? "unknown cause" : x.msg);
    }

    /// Always reports false.
    bool isNonBlocking() {
        return false;
    }

    /**
     * Hands a HEADERS or DATA frame to the stream with this object as the
     * completion callback, and marks a write as in flight. The closed flag is
     * set to the frame's end-stream flag. Other frame types are ignored,
     * although isWriting is still set.
     */
    protected void _writeFrame(Frame frame) {
        isWriting = true;
        switch (frame.getType()) {
            case FrameType.HEADERS: {
                HeadersFrame headersFrame = cast(HeadersFrame) frame;
                closed = headersFrame.isEndStream();
                getStream().headers(headersFrame, this);
                break;
            }
            case FrameType.DATA: {
                DataFrame dataFrame = cast(DataFrame) frame;
                closed = dataFrame.isEndStream();
                getStream().data(dataFrame, this);
                break;
            }

            default: break;
        }
    }

    /**
     * Accounts for the bytes remaining in data and decides whether the buffer
     * completes the body.
     *
     * Returns: false when the content length is negative; otherwise true once
     *          the accumulated size has reached the content length.
     *
     * Note: this method has a side effect (it increases the running size),
     * so it must be called exactly once per written buffer.
     */
    protected bool isLastFrame(ByteBuffer data) {
        long contentLength = getContentLength();
        if (contentLength < 0) {
            return false;
        } else {
            size += data.remaining();
            // Gated like the trace in commit() to keep the write path free of
            // logging in non-debug builds.
            version(HUNT_DEBUG) {
                tracef("http2 output size: %s, content length: %s", size, contentLength);
            }
            return size >= contentLength;
        }
    }

    /**
     * Returns: the Content-Length field of the meta data as a long. This class
     *          treats a negative value as "no declared length".
     */
    protected long getContentLength() {
        return metaData.getFields().getLongField(HttpHeader.CONTENT_LENGTH.asString());
    }

    /// Returns: true as long as write(ByteBuffer) has never been called.
    bool isNoContent() {
        return noContent;
    }

    /// Returns: true when content has been written and the content length is negative.
    protected bool isChunked() {
        return !noContent && getContentLength() < 0;
    }

    /// Returns: the HTTP/2 stream that frames are written to.
    abstract protected Stream getStream();

}