1 module hunt.http.codec.http.stream.AbstractHttp2OutputStream; 2 3 import hunt.http.HttpOutputStream; 4 import hunt.http.codec.http.stream.Stream; 5 6 import hunt.http.codec.http.frame; 7 import hunt.http.HttpFields; 8 import hunt.http.HttpHeader; 9 import hunt.http.HttpMetaData; 10 import hunt.http.HttpRequest; 11 import hunt.http.HttpVersion; 12 13 import hunt.io.ByteBuffer; 14 import hunt.collection.LinkedList; 15 16 import hunt.Functions; 17 import hunt.Exceptions; 18 import hunt.util.Common; 19 20 import hunt.logging; 21 22 /** 23 * 24 */ 25 abstract class AbstractHttp2OutputStream : HttpOutputStream , Callback 26 { 27 28 private long size; 29 private bool isWriting; 30 private LinkedList!Frame frames; 31 private bool noContent = true; 32 33 this(HttpMetaData metaData, bool clientMode) { 34 super(metaData, clientMode); 35 frames = new LinkedList!Frame(); 36 } 37 38 override 39 void write(ByteBuffer data) { 40 Stream stream = getStream(); 41 assert(!closed, "The stream " ~ stream.toString() ~ " output is closed."); 42 43 noContent = false; 44 commit(); 45 writeFrame(new DataFrame(stream.getId(), data, isLastFrame(data))); 46 } 47 48 override 49 void commit() { 50 if (committed || closed) { 51 return; 52 } 53 54 HeadersFrame headersFrame = new HeadersFrame(getStream().getId(), metaData, null, noContent); 55 version(HUNT_DEBUG) { 56 tracef("http2 output stream %s commits the header frame %s", getStream().toString(), headersFrame.toString()); 57 } 58 writeFrame(headersFrame); 59 committed = true; 60 } 61 62 override 63 void close() { 64 if (closed) { 65 return; 66 } 67 68 commit(); 69 if (isChunked()) { 70 // Optional.ofNullable(metaData.getTrailerSupplier()) 71 // .map(Supplier::get) 72 // .ifPresent(trailer -> { 73 // HttpMetaData metaData = new HttpMetaData(HttpVersion.HTTP_1_1, trailer); 74 // HeadersFrame trailerFrame = new HeadersFrame(getStream().getId(), metaData, null, true); 75 // frames.offer(trailerFrame); 76 // }); 77 78 Supplier!HttpFields supplier= metaData.getTrailerSupplier(); 79 if(supplier !is null) 80 { 81 HttpFields trailer = supplier(); 82 if(trailer !is null) 83 { 84 HttpMetaData metaData = new HttpMetaData(HttpVersion.HTTP_1_1, trailer); 85 HeadersFrame trailerFrame = new HeadersFrame(getStream().getId(), metaData, null, true); 86 frames.offer(trailerFrame); 87 } 88 } 89 90 91 DisconnectFrame disconnectFrame = new DisconnectFrame(); 92 frames.offer(disconnectFrame); 93 if (!isWriting) { 94 succeeded(); 95 } 96 } 97 closed = true; 98 } 99 100 void writeFrame(Frame frame) { 101 if (isChunked()) { 102 frames.offer(frame); 103 if (!isWriting) { 104 succeeded(); 105 } 106 } else { 107 if (isWriting) { 108 frames.offer(frame); 109 } else { 110 _writeFrame(frame); 111 } 112 } 113 } 114 115 override 116 void succeeded() { 117 if (isChunked()) { 118 if (frames.size() > 2) { 119 _writeFrame(frames.poll()); 120 } else if (frames.size() == 2) { 121 Frame frame = frames.getLast(); 122 if (frame.getType() == FrameType.DISCONNECT) { 123 Frame lastFrame = frames.poll(); 124 frames.clear(); 125 switch (lastFrame.getType()) { 126 case FrameType.DATA: { 127 DataFrame dataFrame = cast(DataFrame) lastFrame; 128 if (dataFrame.isEndStream()) { 129 _writeFrame(dataFrame); 130 } else { 131 DataFrame lastDataFrame = new DataFrame(dataFrame.getStreamId(), dataFrame.getData(), true); 132 _writeFrame(lastDataFrame); 133 } 134 } 135 break; 136 case FrameType.HEADERS: { 137 HeadersFrame headersFrame = cast(HeadersFrame) lastFrame; 138 if (headersFrame.isEndStream()) { 139 _writeFrame(headersFrame); 140 } else { 141 HeadersFrame lastHeadersFrame = new HeadersFrame(headersFrame.getStreamId(), 142 headersFrame.getMetaData(), headersFrame.getPriority(), true); 143 _writeFrame(lastHeadersFrame); 144 } 145 } 146 break; 147 default: 148 throw new IllegalStateException("The last frame must be data frame or header frame"); 149 } 150 } else { 151 _writeFrame(frames.poll()); 152 } 153 } else if (frames.size() == 1) { 154 Frame frame = frames.getLast(); 155 if (isLastFrame(frame)) { 156 _writeFrame(frames.poll()); 157 } else { 158 isWriting = false; 159 } 160 } else { 161 isWriting = false; 162 } 163 } else { 164 Frame frame = frames.poll(); 165 if (frame !is null) { 166 _writeFrame(frame); 167 } else { 168 isWriting = false; 169 } 170 } 171 } 172 173 bool isLastFrame(Frame frame) { 174 switch (frame.getType()) { 175 case FrameType.HEADERS: 176 HeadersFrame headersFrame = cast(HeadersFrame) frame; 177 return headersFrame.isEndStream(); 178 case FrameType.DATA: 179 DataFrame dataFrame = cast(DataFrame) frame; 180 return dataFrame.isEndStream(); 181 default: break; 182 } 183 return false; 184 } 185 186 // override 187 void failed(Exception x) { 188 frames.clear(); 189 getStream().getSession().close(cast(int)ErrorCode.INTERNAL_ERROR, "Write frame failure", Callback.NOOP); 190 closed = true; 191 errorf("Write frame failure", x); 192 } 193 194 bool isNonBlocking() { 195 return false; 196 } 197 198 protected void _writeFrame(Frame frame) { 199 isWriting = true; 200 switch (frame.getType()) { 201 case FrameType.HEADERS: { 202 HeadersFrame headersFrame = cast(HeadersFrame) frame; 203 closed = headersFrame.isEndStream(); 204 getStream().headers(headersFrame, this); 205 break; 206 } 207 case FrameType.DATA: { 208 DataFrame dataFrame = cast(DataFrame) frame; 209 closed = dataFrame.isEndStream(); 210 getStream().data(dataFrame, this); 211 break; 212 } 213 214 default: break; 215 } 216 } 217 218 protected bool isLastFrame(ByteBuffer data) { 219 long contentLength = getContentLength(); 220 if (contentLength < 0) { 221 return false; 222 } else { 223 size += data.remaining(); 224 tracef("http2 output size: %s, content length: %s", size, contentLength); 225 return size >= contentLength; 226 } 227 } 228 229 protected long getContentLength() { 230 return metaData.getFields().getLongField(HttpHeader.CONTENT_LENGTH.asString()); 231 } 232 233 bool isNoContent() { 234 return noContent; 235 } 236 237 protected bool isChunked() { 238 return !noContent && getContentLength() < 0; 239 } 240 241 abstract protected Stream getStream(); 242 243 }