1 module hunt.http.codec.http.stream.AbstractHttp2OutputStream; 2 3 import hunt.http.codec.http.stream.HttpOutputStream; 4 import hunt.http.codec.http.stream.Stream; 5 6 import hunt.http.codec.http.frame; 7 import hunt.http.codec.http.model.HttpFields; 8 import hunt.http.codec.http.model.HttpHeader; 9 import hunt.http.codec.http.model.HttpVersion; 10 import hunt.http.codec.http.model.MetaData; 11 12 import hunt.container.ByteBuffer; 13 import hunt.container.LinkedList; 14 15 import hunt.lang.common; 16 import hunt.lang.exception; 17 import hunt.util.functional; 18 19 import hunt.logging; 20 21 /** 22 * 23 */ 24 abstract class AbstractHttp2OutputStream : HttpOutputStream , Callback 25 { 26 27 private long size; 28 private bool isWriting; 29 private LinkedList!Frame frames; 30 private bool noContent = true; 31 32 this(MetaData metaData, bool clientMode) { 33 super(metaData, clientMode); 34 frames = new LinkedList!Frame(); 35 } 36 37 override 38 void write(ByteBuffer data) { 39 Stream stream = getStream(); 40 assert(!closed, "The stream " ~ stream.toString() ~ " output is closed."); 41 42 noContent = false; 43 commit(); 44 writeFrame(new DataFrame(stream.getId(), data, isLastFrame(data))); 45 } 46 47 override 48 void commit() { 49 if (committed || closed) { 50 return; 51 } 52 53 HeadersFrame headersFrame = new HeadersFrame(getStream().getId(), metaData, null, noContent); 54 version(HUNT_DEBUG) { 55 tracef("http2 output stream %s commits the header frame %s", getStream().toString(), headersFrame.toString()); 56 } 57 writeFrame(headersFrame); 58 committed = true; 59 } 60 61 override 62 void close() { 63 if (closed) { 64 return; 65 } 66 67 commit(); 68 if (isChunked()) { 69 // Optional.ofNullable(metaData.getTrailerSupplier()) 70 // .map(Supplier::get) 71 // .ifPresent(trailer -> { 72 // MetaData metaData = new MetaData(HttpVersion.HTTP_1_1, trailer); 73 // HeadersFrame trailerFrame = new HeadersFrame(getStream().getId(), metaData, null, true); 74 // frames.offer(trailerFrame); 75 // }); 76 77 Supplier!HttpFields supplier= metaData.getTrailerSupplier(); 78 if(supplier !is null) 79 { 80 HttpFields trailer = supplier(); 81 if(trailer !is null) 82 { 83 MetaData metaData = new MetaData(HttpVersion.HTTP_1_1, trailer); 84 HeadersFrame trailerFrame = new HeadersFrame(getStream().getId(), metaData, null, true); 85 frames.offer(trailerFrame); 86 } 87 } 88 89 90 DisconnectFrame disconnectFrame = new DisconnectFrame(); 91 frames.offer(disconnectFrame); 92 if (!isWriting) { 93 succeeded(); 94 } 95 } 96 closed = true; 97 } 98 99 void writeFrame(Frame frame) { 100 if (isChunked()) { 101 frames.offer(frame); 102 if (!isWriting) { 103 succeeded(); 104 } 105 } else { 106 if (isWriting) { 107 frames.offer(frame); 108 } else { 109 _writeFrame(frame); 110 } 111 } 112 } 113 114 override 115 void succeeded() { 116 if (isChunked()) { 117 if (frames.size() > 2) { 118 _writeFrame(frames.poll()); 119 } else if (frames.size() == 2) { 120 Frame frame = frames.getLast(); 121 if (frame.getType() == FrameType.DISCONNECT) { 122 Frame lastFrame = frames.poll(); 123 frames.clear(); 124 switch (lastFrame.getType()) { 125 case FrameType.DATA: { 126 DataFrame dataFrame = cast(DataFrame) lastFrame; 127 if (dataFrame.isEndStream()) { 128 _writeFrame(dataFrame); 129 } else { 130 DataFrame lastDataFrame = new DataFrame(dataFrame.getStreamId(), dataFrame.getData(), true); 131 _writeFrame(lastDataFrame); 132 } 133 } 134 break; 135 case FrameType.HEADERS: { 136 HeadersFrame headersFrame = cast(HeadersFrame) lastFrame; 137 if (headersFrame.isEndStream()) { 138 _writeFrame(headersFrame); 139 } else { 140 HeadersFrame lastHeadersFrame = new HeadersFrame(headersFrame.getStreamId(), 141 headersFrame.getMetaData(), headersFrame.getPriority(), true); 142 _writeFrame(lastHeadersFrame); 143 } 144 } 145 break; 146 default: 147 throw new IllegalStateException("The last frame must be data frame or header frame"); 148 } 149 } else { 150 _writeFrame(frames.poll()); 151 } 152 } else if (frames.size() == 1) { 153 Frame frame = frames.getLast(); 154 if (isLastFrame(frame)) { 155 _writeFrame(frames.poll()); 156 } else { 157 isWriting = false; 158 } 159 } else { 160 isWriting = false; 161 } 162 } else { 163 Frame frame = frames.poll(); 164 if (frame !is null) { 165 _writeFrame(frame); 166 } else { 167 isWriting = false; 168 } 169 } 170 } 171 172 bool isLastFrame(Frame frame) { 173 switch (frame.getType()) { 174 case FrameType.HEADERS: 175 HeadersFrame headersFrame = cast(HeadersFrame) frame; 176 return headersFrame.isEndStream(); 177 case FrameType.DATA: 178 DataFrame dataFrame = cast(DataFrame) frame; 179 return dataFrame.isEndStream(); 180 default: break; 181 } 182 return false; 183 } 184 185 // override 186 void failed(Exception x) { 187 frames.clear(); 188 getStream().getSession().close(cast(int)ErrorCode.INTERNAL_ERROR, "Write frame failure", Callback.NOOP); 189 closed = true; 190 errorf("Write frame failure", x); 191 } 192 193 bool isNonBlocking() { 194 return false; 195 } 196 197 protected void _writeFrame(Frame frame) { 198 isWriting = true; 199 switch (frame.getType()) { 200 case FrameType.HEADERS: { 201 HeadersFrame headersFrame = cast(HeadersFrame) frame; 202 closed = headersFrame.isEndStream(); 203 getStream().headers(headersFrame, this); 204 break; 205 } 206 case FrameType.DATA: { 207 DataFrame dataFrame = cast(DataFrame) frame; 208 closed = dataFrame.isEndStream(); 209 getStream().data(dataFrame, this); 210 break; 211 } 212 213 default: break; 214 } 215 } 216 217 protected bool isLastFrame(ByteBuffer data) { 218 long contentLength = getContentLength(); 219 if (contentLength < 0) { 220 return false; 221 } else { 222 size += data.remaining(); 223 tracef("http2 output size: %s, content length: %s", size, contentLength); 224 return size >= contentLength; 225 } 226 } 227 228 protected long getContentLength() { 229 return metaData.getFields().getLongField(HttpHeader.CONTENT_LENGTH.asString()); 230 } 231 232 bool isNoContent() { 233 return noContent; 234 } 235 236 protected bool isChunked() { 237 return !noContent && getContentLength() < 0; 238 } 239 240 abstract protected Stream getStream(); 241 242 }