module hunt.http.codec.http.stream.AbstractHttp1OutputStream;

import hunt.http.codec.http.stream.HttpOutputStream;
import hunt.http.codec.http.encode.HttpGenerator;
import hunt.http.codec.http.model.MetaData;

import hunt.container.ByteBuffer;
import hunt.container.BufferUtils;
import hunt.net.Session;
import hunt.lang.exception;
import hunt.logging;


/**
 * Base class for HTTP/1 output streams.
 *
 * Every operation asks the HttpGenerator returned by getHttpGenerator() to
 * produce the next piece of the message (header, chunk framing, trailer),
 * checks that the generator answered with the expected result/state pair and
 * then hands the produced buffers to the Session returned by getSession().
 * An unexpected pair is reported through generateHttpMessageExceptionally();
 * a completed message is reported through generateHttpMessageSuccessfully().
 *
 * The `closed`, `committed`, `metaData` and `clientMode` members used below
 * are not declared in this class; they are presumably inherited from
 * HttpOutputStream (confirm against that class).
 */
abstract class AbstractHttp1OutputStream : HttpOutputStream {

    /**
     * Params:
     *   metaData   = message meta data, forwarded unchanged to HttpOutputStream
     *   clientMode = forwarded unchanged to HttpOutputStream; generate() uses
     *                it to choose between request and response generation
     */
    this(MetaData metaData, bool clientMode) {
        super(metaData, clientMode);
    }

    /**
     * Commits the header without any body data.
     */
    override void commit() {
        commit(cast(ByteBuffer)null);
    }

    /**
     * Generates the header and sends it, together with `data` when given.
     *
     * Does nothing when the stream is already closed or committed.
     *
     * Params:
     *   data = first piece of body content to send along with the header;
     *          may be null (this is how commit() calls it)
     */
    protected void commit(ByteBuffer data) {
        if (closed)
            return;

        if (committed)
            return;

        version(HUNT_DEBUG) {
            // data is null when called through commit(); guard the log
            // argument so debug builds do not dereference a null reference.
            infof("committing data: %s", data is null ? "null" : data.toString());
        }

        HttpGenerator generator = getHttpGenerator();
        Session tcpSession = getSession();
        HttpGenerator.Result generatorResult;
        ByteBuffer header = getHeaderByteBuffer();

        generatorResult = generate(metaData, header, null, data, false);
        if (generatorResult == HttpGenerator.Result.FLUSH &&
                generator.getState() == HttpGenerator.State.COMMITTED) {
            if (data !is null) {
                // Header and first content are handed over in one call.
                ByteBuffer[] headerAndData = [header, data];
                tcpSession.encode(headerAndData);
            } else {
                tcpSession.encode(header);
            }
            committed = true;
        } else {
            generateHttpMessageExceptionally(generatorResult, generator.getState(),
                    HttpGenerator.Result.FLUSH, HttpGenerator.State.COMMITTED);
        }
    }

    /**
     * Writes body content.
     *
     * The first write on an uncommitted stream is routed through
     * commit(ByteBuffer) so the header goes out with the data. Later writes
     * are sent either with a freshly allocated chunk buffer in front (when the
     * generator reports chunking) or as the bare content buffer.
     *
     * Does nothing when the stream is closed or when `data` is null or has
     * nothing remaining.
     *
     * Params:
     *   data = content to send
     */
    override void write(ByteBuffer data) {
        if (closed)
            return;

        // A null buffer is treated like an empty one instead of being
        // dereferenced.
        if (data is null || !data.hasRemaining())
            return;

        HttpGenerator generator = getHttpGenerator();
        Session tcpSession = getSession();
        HttpGenerator.Result generatorResult;

        if (!committed) {
            commit(data);
        } else {
            if (generator.isChunking()) {
                ByteBuffer chunk = BufferUtils.allocate(HttpGenerator.CHUNK_SIZE);

                generatorResult = generate(null, null, chunk, data, false);
                if (generatorResult == HttpGenerator.Result.FLUSH &&
                        generator.getState() == HttpGenerator.State.COMMITTED) {
                    ByteBuffer[] chunkAndData = [chunk, data];
                    tcpSession.encode(chunkAndData);
                } else {
                    generateHttpMessageExceptionally(generatorResult, generator.getState(),
                            HttpGenerator.Result.FLUSH, HttpGenerator.State.COMMITTED);
                }
            } else {
                generatorResult = generate(null, null, null, data, false);
                if (generatorResult == HttpGenerator.Result.FLUSH &&
                        generator.getState() == HttpGenerator.State.COMMITTED) {
                    tcpSession.encode(data);
                } else {
                    generateHttpMessageExceptionally(generatorResult, generator.getState(),
                            HttpGenerator.Result.FLUSH, HttpGenerator.State.COMMITTED);
                }
            }
        }
    }

    /**
     * Completes the message and marks the stream closed.
     *
     * Three paths, all driving the generator with `last = true`:
     * $(UL
     *   $(LI not yet committed: generate and send the header, then finish;)
     *   $(LI committed and chunking: generate the last chunk or the trailer,
     *        send it, then finish;)
     *   $(LI committed and not chunking: finish directly.)
     * )
     * `closed` is set in a `finally` block, so the stream counts as closed
     * even when one of the steps throws. Calling close() on an already closed
     * stream does nothing.
     */
    override void close() {
        if (closed)
            return;

        try {
            version(HUNT_DEBUG) trace("http1 output stream is closing");
            HttpGenerator generator = getHttpGenerator();
            Session tcpSession = getSession();
            HttpGenerator.Result generatorResult;

            if (!committed) {
                ByteBuffer header = getHeaderByteBuffer();
                generatorResult = generate(metaData, header, null, null, true);
                if (generatorResult == HttpGenerator.Result.FLUSH &&
                        generator.getState() == HttpGenerator.State.COMPLETING) {
                    tcpSession.encode(header);
                    generateLastData(generator);
                } else {
                    generateHttpMessageExceptionally(generatorResult, generator.getState(),
                            HttpGenerator.Result.FLUSH, HttpGenerator.State.COMPLETING);
                }
                // Set on both outcomes, exactly as before.
                committed = true;
            } else {
                if (generator.isChunking()) {
                    version (HUNT_DEBUG) tracef("http1 output stream is generating chunk");
                    generatorResult = generate(null, null, null, null, true);
                    if (generatorResult == HttpGenerator.Result.CONTINUE &&
                            generator.getState() == HttpGenerator.State.COMPLETING) {
                        // Second call without a chunk buffer: the generator
                        // answers with which kind of buffer it needs next.
                        generatorResult = generate(null, null, null, null, true);
                        if (generatorResult == HttpGenerator.Result.NEED_CHUNK &&
                                generator.getState() == HttpGenerator.State.COMPLETING) {
                            generateLastChunk(generator, tcpSession);
                        } else if (generatorResult == HttpGenerator.Result.NEED_CHUNK_TRAILER &&
                                generator.getState() == HttpGenerator.State.COMPLETING) {
                            generateTrailer(generator, tcpSession);
                        }
                        // NOTE(review): any other result/state pair is ignored
                        // here; neither the success nor the failure hook is
                        // invoked. Confirm whether that is intended.
                    } else {
                        generateHttpMessageExceptionally(generatorResult, generator.getState(),
                                HttpGenerator.Result.CONTINUE, HttpGenerator.State.COMPLETING);
                    }
                } else {
                    generatorResult = generate(null, null, null, null, true);
                    if (generatorResult == HttpGenerator.Result.CONTINUE &&
                            generator.getState() == HttpGenerator.State.COMPLETING) {
                        generateLastData(generator);
                    } else {
                        generateHttpMessageExceptionally(generatorResult, generator.getState(),
                                HttpGenerator.Result.CONTINUE, HttpGenerator.State.COMPLETING);
                    }
                }
            }
        } finally {
            closed = true;
            version(HUNT_DEBUG) tracef("http1 output stream closed");
        }
    }

    /**
     * Generates the final chunk into a newly allocated chunk buffer, sends it
     * and then finishes the message.
     */
    private void generateLastChunk(HttpGenerator generator, Session tcpSession) {
        ByteBuffer chunk = BufferUtils.allocate(HttpGenerator.CHUNK_SIZE);
        HttpGenerator.Result generatorResult = generate(null, null, chunk, null, true);
        if (generatorResult == HttpGenerator.Result.FLUSH &&
                generator.getState() == HttpGenerator.State.COMPLETING) {
            tcpSession.encode(chunk);
            generateLastData(generator);
        } else {
            generateHttpMessageExceptionally(generatorResult, generator.getState(),
                    HttpGenerator.Result.FLUSH, HttpGenerator.State.COMPLETING);
        }
    }

    /**
     * Generates the trailer into the buffer supplied by getTrailerByteBuffer()
     * (passed to the generator in the chunk position), sends it and then
     * finishes the message.
     */
    private void generateTrailer(HttpGenerator generator, Session tcpSession) {
        ByteBuffer trailer = getTrailerByteBuffer();
        HttpGenerator.Result generatorResult = generate(null, null, trailer, null, true);
        if (generatorResult == HttpGenerator.Result.FLUSH &&
                generator.getState() == HttpGenerator.State.COMPLETING) {
            tcpSession.encode(trailer);
            generateLastData(generator);
        } else {
            generateHttpMessageExceptionally(generatorResult, generator.getState(),
                    HttpGenerator.Result.FLUSH, HttpGenerator.State.COMPLETING);
        }
    }

    /**
     * Drives the generator one last time and evaluates the outcome.
     *
     * With the generator in state END: DONE invokes
     * generateHttpMessageSuccessfully(), SHUTDOWN_OUT closes the session, and
     * any other result is reported as a failure. Any other state is reported
     * as a failure as well.
     */
    private void generateLastData(HttpGenerator generator) {
        HttpGenerator.Result generatorResult = generate(null, null, null, null, true);
        if (generator.getState() == HttpGenerator.State.END) {
            if (generatorResult == HttpGenerator.Result.DONE) {
                generateHttpMessageSuccessfully();
            } else if (generatorResult == HttpGenerator.Result.SHUTDOWN_OUT) {
                getSession().close();
            } else {
                generateHttpMessageExceptionally(generatorResult, generator.getState(),
                        HttpGenerator.Result.DONE, HttpGenerator.State.END);
            }
        } else {
            generateHttpMessageExceptionally(generatorResult, generator.getState(),
                    HttpGenerator.Result.DONE, HttpGenerator.State.END);
        }
    }

    /**
     * Forwards one generation step to the HttpGenerator.
     *
     * In client mode the meta data is cast to HttpRequest and
     * generateRequest() is called; otherwise it is cast to HttpResponse and
     * generateResponse() is called.
     *
     * Params:
     *   metaData = message meta data, or null once the header has been generated
     *   header   = buffer receiving the header, or null
     *   chunk    = buffer receiving chunk framing or the trailer, or null
     *   content  = body content, or null
     *   last     = true when no further content will follow
     *
     * Returns: the result reported by the generator.
     */
    protected HttpGenerator.Result generate(MetaData metaData,
            ByteBuffer header, ByteBuffer chunk, ByteBuffer content, bool last) {
        HttpGenerator generator = getHttpGenerator();
        if (clientMode) {
            return generator.generateRequest(cast(HttpRequest) metaData, header, chunk, content, last);
        } else {
            // NOTE(review): the literal `false` is presumably the generator's
            // "head" flag -- confirm against HttpGenerator.generateResponse.
            return generator.generateResponse(cast(HttpResponse) metaData, false, header, chunk, content, last);
        }
    }

    /// Supplies the buffer the header is generated into.
    abstract protected ByteBuffer getHeaderByteBuffer();

    /// Supplies the buffer the trailer is generated into.
    abstract protected ByteBuffer getTrailerByteBuffer();

    /// Supplies the session that receives the generated buffers.
    abstract protected Session getSession();

    /// Supplies the generator that produces the message.
    abstract protected HttpGenerator getHttpGenerator();

    /// Invoked when the generator finished the message with DONE in state END.
    abstract protected void generateHttpMessageSuccessfully();

    /**
     * Invoked when the generator answered with a result/state pair other than
     * the expected one.
     *
     * Params:
     *   actualResult   = result returned by the generator
     *   actualState    = generator state after the call
     *   expectedResult = result this class expected at that step
     *   expectedState  = state this class expected at that step
     */
    abstract protected void generateHttpMessageExceptionally(HttpGenerator.Result actualResult,
            HttpGenerator.State actualState,
            HttpGenerator.Result expectedResult,
            HttpGenerator.State expectedState);

}