1 module hunt.http.codec.http.stream.AbstractHttp2OutputStream;
2 
3 import hunt.http.codec.http.stream.HttpOutputStream;
4 import hunt.http.codec.http.stream.Stream;
5 
6 import hunt.http.codec.http.frame;
7 import hunt.http.codec.http.model.HttpFields;
8 import hunt.http.codec.http.model.HttpHeader;
9 import hunt.http.codec.http.model.HttpVersion;
10 import hunt.http.codec.http.model.MetaData;
11 
12 import hunt.container.ByteBuffer;
13 import hunt.container.LinkedList;
14 
15 import hunt.lang.common;
16 import hunt.lang.exception;
17 import hunt.util.functional;
18 
19 import hunt.logging;
20 
/**
 * Base class for HTTP/2 output streams.
 *
 * Turns the response/request meta data and the written buffers into
 * HEADERS and DATA frames and hands them to the underlying Stream one
 * at a time. The instance passes itself as the completion Callback of each
 * frame write, so succeeded() is what drains the pending frame queue.
 *
 * When the body length is not known up front (see isChunked()), the most
 * recently queued frame is held back until close() appends a DisconnectFrame
 * marker; that lets the final frame be re-created with the end-stream flag.
 */
abstract class AbstractHttp2OutputStream : HttpOutputStream , Callback 
{

    /// Number of body bytes counted so far by isLastFrame(ByteBuffer).
    private long size;
    /// True while a frame has been handed to the stream and its callback is pending.
    private bool isWriting;
    /// Frames waiting to be written.
    private LinkedList!Frame frames;
    /// Stays true until write(ByteBuffer) is called for the first time.
    private bool noContent = true;

    /**
     * Params:
     *   metaData   = meta data forwarded to the HttpOutputStream constructor
     *   clientMode = forwarded to the HttpOutputStream constructor
     */
    this(MetaData metaData, bool clientMode) {
        super(metaData, clientMode);
        frames = new LinkedList!Frame();
    }

    /**
     * Commits the headers if necessary and queues/writes `data` as a DATA frame.
     *
     * NOTE(review): the closed-check is an assert, which D compiles out in
     * release builds; consider throwing an exception if the check must hold
     * in production.
     */
    override
    void write(ByteBuffer data) {
        Stream stream = getStream();
        assert(!closed, "The stream " ~ stream.toString() ~ " output is closed.");

        // Must be cleared before commit() so the HEADERS frame is not flagged end-stream.
        noContent = false;
        commit();
        writeFrame(new DataFrame(stream.getId(), data, isLastFrame(data)));
    }

    /**
     * Writes the HEADERS frame once. Does nothing if the stream is already
     * committed or closed. The frame carries the end-stream flag when no
     * content has been written yet.
     */
    override
    void commit() {
        if (committed || closed) {
            return;
        }

        HeadersFrame headersFrame = new HeadersFrame(getStream().getId(), metaData, null, noContent);
        version(HUNT_DEBUG) {
            tracef("http2 output stream %s commits the header frame %s", getStream().toString(), headersFrame.toString());
        }
        writeFrame(headersFrame);
        committed = true;
    }

    /**
     * Closes the output. Commits the headers first; in chunked mode it also
     * queues the optional trailer HEADERS frame followed by a DisconnectFrame
     * marker and starts draining the queue if no write is in flight.
     * Calling it on an already closed stream is a no-op.
     */
    override
    void close() {
        if (closed) {
            return;
        }

        commit();
        if (isChunked()) {
            Supplier!HttpFields supplier = metaData.getTrailerSupplier();
            if (supplier !is null) {
                HttpFields trailer = supplier();
                if (trailer !is null) {
                    // Distinct name: the local must not shadow the `metaData` member used above.
                    MetaData trailerMetaData = new MetaData(HttpVersion.HTTP_1_1, trailer);
                    HeadersFrame trailerFrame = new HeadersFrame(getStream().getId(), trailerMetaData, null, true);
                    frames.offer(trailerFrame);
                }
            }

            // Marker telling succeeded() that the frame in front of it is the last one.
            DisconnectFrame disconnectFrame = new DisconnectFrame();
            frames.offer(disconnectFrame);
            if (!isWriting) {
                succeeded();
            }
        }
        closed = true;
    }

    /**
     * Queues or writes a frame.
     *
     * In chunked mode the frame is always queued and the queue is drained via
     * succeeded() when idle. Otherwise the frame is written immediately unless
     * another write is still in flight, in which case it is queued.
     */
    void writeFrame(Frame frame) {
        if (isChunked()) {
            frames.offer(frame);
            if (!isWriting) {
                succeeded();
            }
        } else {
            if (isWriting) {
                frames.offer(frame);
            } else {
                _writeFrame(frame);
            }
        }
    }

    /**
     * Callback invoked after a frame write completed (also called directly to
     * start draining). Writes the next pending frame, or clears the writing
     * flag when nothing can be written yet.
     *
     * Throws: IllegalStateException if, in chunked mode, the frame preceding
     *         the DisconnectFrame marker is neither a DATA nor a HEADERS frame.
     */
    override
    void succeeded() {
        if (isChunked()) {
            if (frames.size() > 2) {
                _writeFrame(frames.poll());
            } else if (frames.size() == 2) {
                Frame frame = frames.getLast();
                if (frame.getType() == FrameType.DISCONNECT) {
                    // Head of the queue is the final real frame; drop the marker
                    // and make sure the frame goes out with end-stream set.
                    Frame lastFrame = frames.poll();
                    frames.clear();
                    switch (lastFrame.getType()) {
                        case FrameType.DATA: {
                            DataFrame dataFrame = cast(DataFrame) lastFrame;
                            if (dataFrame.isEndStream()) {
                                _writeFrame(dataFrame);
                            } else {
                                DataFrame lastDataFrame = new DataFrame(dataFrame.getStreamId(), dataFrame.getData(), true);
                                _writeFrame(lastDataFrame);
                            }
                        }
                        break;
                        case FrameType.HEADERS: {
                            HeadersFrame headersFrame = cast(HeadersFrame) lastFrame;
                            if (headersFrame.isEndStream()) {
                                _writeFrame(headersFrame);
                            } else {
                                HeadersFrame lastHeadersFrame = new HeadersFrame(headersFrame.getStreamId(),
                                        headersFrame.getMetaData(), headersFrame.getPriority(), true);
                                _writeFrame(lastHeadersFrame);
                            }
                        }
                        break;
                        default:
                            throw new IllegalStateException("The last frame must be data frame or header frame");
                    }
                } else {
                    _writeFrame(frames.poll());
                }
            } else if (frames.size() == 1) {
                // A single frame is held back unless it is already flagged end-stream.
                Frame frame = frames.getLast();
                if (isLastFrame(frame)) {
                    _writeFrame(frames.poll());
                } else {
                    isWriting = false;
                }
            } else {
                isWriting = false;
            }
        } else {
            Frame frame = frames.poll();
            if (frame !is null) {
                _writeFrame(frame);
            } else {
                isWriting = false;
            }
        }
    }

    /**
     * Returns: true if `frame` is a HEADERS or DATA frame with the end-stream
     *          flag set; false for every other frame.
     */
    bool isLastFrame(Frame frame) {
        switch (frame.getType()) {
            case FrameType.HEADERS: {
                HeadersFrame headersFrame = cast(HeadersFrame) frame;
                return headersFrame.isEndStream();
            }
            case FrameType.DATA: {
                DataFrame dataFrame = cast(DataFrame) frame;
                return dataFrame.isEndStream();
            }
            default: break;
        }
        return false;
    }

    /**
     * Callback invoked when a frame write failed. Drops all pending frames,
     * closes the session with INTERNAL_ERROR, marks the output closed and
     * logs the failure.
     */
    // override
    void failed(Exception x) {
        frames.clear();
        getStream().getSession().close(cast(int)ErrorCode.INTERNAL_ERROR, "Write frame failure", Callback.NOOP);
        closed = true;
        // The format string needs a specifier, otherwise the exception is never rendered.
        errorf("Write frame failure: %s", (x is null) ? "unknown" : x.msg);
    }

    /// Always reports false.
    bool isNonBlocking() {
        return false;
    }

    /**
     * Hands a HEADERS or DATA frame to the stream with this object as the
     * completion callback and marks a write as in flight. Frames of any other
     * type are ignored (the writing flag is still set).
     */
    protected void _writeFrame(Frame frame) {
        isWriting = true;
        switch (frame.getType()) {
            case FrameType.HEADERS: {
                HeadersFrame headersFrame = cast(HeadersFrame) frame;
                // Only ever move towards "closed": a queued non-final frame that is
                // flushed after close() must not flip the flag back to false.
                if (headersFrame.isEndStream()) {
                    closed = true;
                }
                getStream().headers(headersFrame, this);
                break;
            }
            case FrameType.DATA: {
                DataFrame dataFrame = cast(DataFrame) frame;
                // Same rule as for HEADERS frames above.
                if (dataFrame.isEndStream()) {
                    closed = true;
                }
                getStream().data(dataFrame, this);
                break;
            }

            default: break;
        }
    }

    /**
     * Adds the remaining bytes of `data` to the running total and reports
     * whether the declared content length has been reached.
     *
     * Returns: false when the content length is negative (the total is not
     *          updated in that case); otherwise whether the accumulated size
     *          is at least the content length.
     */
    protected bool isLastFrame(ByteBuffer data) {
        long contentLength = getContentLength();
        if (contentLength < 0) {
            return false;
        }

        size += data.remaining();
        version(HUNT_DEBUG) {
            tracef("http2 output size: %s, content length: %s", size, contentLength);
        }
        return size >= contentLength;
    }

    /// Returns: the Content-Length header of the meta data as a long.
    protected long getContentLength() {
        return metaData.getFields().getLongField(HttpHeader.CONTENT_LENGTH.asString());
    }

    /// Returns: true as long as write(ByteBuffer) has never been called.
    bool isNoContent() {
        return noContent;
    }

    /**
     * Returns: true when content has been written and the content length is
     *          negative (presumably meaning the header is absent — confirm
     *          against HttpFields.getLongField).
     */
    protected bool isChunked() {
        return !noContent && getContentLength() < 0;
    }

    /// Returns: the HTTP/2 stream the frames are written to.
    abstract protected Stream getStream();

}