1 module hunt.http.codec.http.stream.AbstractHttp1OutputStream;
2 
3 import hunt.http.codec.http.stream.HttpOutputStream;
4 import hunt.http.codec.http.encode.HttpGenerator;
5 import hunt.http.codec.http.model.MetaData;
6 
7 import hunt.container.ByteBuffer;
8 import hunt.container.BufferUtils;
9 import hunt.net.Session;
10 import hunt.lang.exception;
11 import hunt.logging;
12 
13 
/**
 * Base class for HTTP/1.x output streams.
 *
 * It drives the HttpGenerator returned by getHttpGenerator() through the
 * commit / write / close sequence and hands every generated buffer to the
 * Session returned by getSession(). After each generation step the pair
 * (result, generator state) is compared with the pair this class expects;
 * a mismatch is reported through generateHttpMessageExceptionally().
 *
 * Subclasses provide the header/trailer buffers, the session, the generator
 * and the two completion callbacks.
 */
abstract class AbstractHttp1OutputStream : HttpOutputStream {

    /**
     * Params:
     *   metaData   = message meta data, forwarded to the HttpOutputStream constructor
     *   clientMode = forwarded to the HttpOutputStream constructor; generate()
     *                reads it to choose between request and response generation
     */
    this(MetaData metaData, bool clientMode) {
        super(metaData, clientMode);
    }

    /**
     * Commits the message head without any body data.
     */
    override void commit() {
        commit(cast(ByteBuffer) null);
    }

    /**
     * Generates the message head and sends it, together with `data` when
     * given, to the session. Does nothing if the stream is already closed or
     * committed.
     *
     * Params:
     *   data = first piece of body data to send along with the head; may be
     *          null (commit() passes null)
     */
    protected void commit(ByteBuffer data) {
        if (closed || committed)
            return;

        version(HUNT_DEBUG) {
            // data is null when called through commit(); do not dereference it.
            infof("committing data: %s", (data is null) ? "null" : data.toString());
        }

        HttpGenerator generator = getHttpGenerator();
        Session tcpSession = getSession();
        ByteBuffer header = getHeaderByteBuffer();

        HttpGenerator.Result result = generate(metaData, header, null, data, false);
        if (!checkGeneratorResult(generator, result,
                HttpGenerator.Result.FLUSH, HttpGenerator.State.COMMITTED))
            return;

        if (data is null) {
            tcpSession.encode(header);
        } else {
            ByteBuffer[] headerAndData = [header, data];
            tcpSession.encode(headerAndData);
        }
        committed = true;
    }

    /**
     * Sends body data. If the head has not been committed yet, the data is
     * sent together with it via commit(data). Otherwise the data is sent on
     * its own, preceded by a freshly allocated chunk buffer when the generator
     * reports that it is chunking.
     *
     * Params:
     *   data = body data; a null buffer or one without remaining bytes is ignored
     */
    override void write(ByteBuffer data) {
        if (closed)
            return;

        // Nothing to send for a missing or fully drained buffer.
        if (data is null || !data.hasRemaining())
            return;

        if (!committed) {
            commit(data);
            return;
        }

        HttpGenerator generator = getHttpGenerator();
        Session tcpSession = getSession();

        if (generator.isChunking()) {
            ByteBuffer chunk = BufferUtils.allocate(HttpGenerator.CHUNK_SIZE);
            HttpGenerator.Result result = generate(null, null, chunk, data, false);
            if (checkGeneratorResult(generator, result,
                    HttpGenerator.Result.FLUSH, HttpGenerator.State.COMMITTED)) {
                ByteBuffer[] chunkAndData = [chunk, data];
                tcpSession.encode(chunkAndData);
            }
        } else {
            HttpGenerator.Result result = generate(null, null, null, data, false);
            if (checkGeneratorResult(generator, result,
                    HttpGenerator.Result.FLUSH, HttpGenerator.State.COMMITTED)) {
                tcpSession.encode(data);
            }
        }
    }

    /**
     * Finishes the message and marks the stream as closed.
     *
     * If nothing was committed yet, the head is generated and sent first.
     * `closed` is set in a finally block, so it becomes true even when a step
     * of the closing sequence throws. Calling close() on an already closed
     * stream is a no-op.
     */
    override void close() {
        if (closed)
            return;

        try {
            version(HUNT_DEBUG) trace("http1 output stream is closing");
            HttpGenerator generator = getHttpGenerator();
            Session tcpSession = getSession();

            if (!committed) {
                ByteBuffer header = getHeaderByteBuffer();
                HttpGenerator.Result result = generate(metaData, header, null, null, true);
                if (checkGeneratorResult(generator, result,
                        HttpGenerator.Result.FLUSH, HttpGenerator.State.COMPLETING)) {
                    tcpSession.encode(header);
                    generateLastData(generator);
                }
                // Reached whenever the mismatch callback returned normally as well.
                committed = true;
            } else if (generator.isChunking()) {
                finishChunkedMessage(generator, tcpSession);
            } else {
                HttpGenerator.Result result = generate(null, null, null, null, true);
                if (checkGeneratorResult(generator, result,
                        HttpGenerator.Result.CONTINUE, HttpGenerator.State.COMPLETING)) {
                    generateLastData(generator);
                }
            }
        } finally {
            closed = true;
            version(HUNT_DEBUG) tracef("http1 output stream closed");
        }
    }

    /**
     * Closing sequence for an already committed, chunked message: moves the
     * generator to COMPLETING, then emits either the last chunk or the
     * trailer, depending on what the generator asks for.
     */
    private void finishChunkedMessage(HttpGenerator generator, Session tcpSession) {
        version (HUNT_DEBUG) tracef("http1 output stream is generating chunk");

        HttpGenerator.Result result = generate(null, null, null, null, true);
        if (!checkGeneratorResult(generator, result,
                HttpGenerator.Result.CONTINUE, HttpGenerator.State.COMPLETING))
            return;

        result = generate(null, null, null, null, true);
        bool completing = generator.getState() == HttpGenerator.State.COMPLETING;

        if (completing && result == HttpGenerator.Result.NEED_CHUNK) {
            generateLastChunk(generator, tcpSession);
        } else if (completing && result == HttpGenerator.Result.NEED_CHUNK_TRAILER) {
            generateTrailer(generator, tcpSession);
        } else {
            // Neither a last chunk nor a trailer was requested: report it like
            // every other unexpected generator outcome instead of silently
            // closing the stream with an unterminated message.
            generateHttpMessageExceptionally(result, generator.getState(),
                HttpGenerator.Result.NEED_CHUNK, HttpGenerator.State.COMPLETING);
        }
    }

    /**
     * Generates the final chunk into a newly allocated buffer of
     * HttpGenerator.CHUNK_SIZE, sends it and completes the message.
     */
    private void generateLastChunk(HttpGenerator generator, Session tcpSession) {
        ByteBuffer chunk = BufferUtils.allocate(HttpGenerator.CHUNK_SIZE);
        HttpGenerator.Result result = generate(null, null, chunk, null, true);
        if (checkGeneratorResult(generator, result,
                HttpGenerator.Result.FLUSH, HttpGenerator.State.COMPLETING)) {
            tcpSession.encode(chunk);
            generateLastData(generator);
        }
    }

    /**
     * Generates the trailer into the buffer from getTrailerByteBuffer(),
     * sends it and completes the message.
     */
    private void generateTrailer(HttpGenerator generator, Session tcpSession) {
        ByteBuffer trailer = getTrailerByteBuffer();
        HttpGenerator.Result result = generate(null, null, trailer, null, true);
        if (checkGeneratorResult(generator, result,
                HttpGenerator.Result.FLUSH, HttpGenerator.State.COMPLETING)) {
            tcpSession.encode(trailer);
            generateLastData(generator);
        }
    }

    /**
     * Runs the final generation step and dispatches on its outcome, which is
     * expected to leave the generator in state END:
     * DONE calls generateHttpMessageSuccessfully(), SHUTDOWN_OUT closes the
     * session, and anything else is reported as a mismatch against (DONE, END).
     */
    private void generateLastData(HttpGenerator generator) {
        HttpGenerator.Result result = generate(null, null, null, null, true);
        bool ended = generator.getState() == HttpGenerator.State.END;

        if (ended && result == HttpGenerator.Result.DONE) {
            generateHttpMessageSuccessfully();
        } else if (ended && result == HttpGenerator.Result.SHUTDOWN_OUT) {
            getSession().close();
        } else {
            generateHttpMessageExceptionally(result, generator.getState(),
                HttpGenerator.Result.DONE, HttpGenerator.State.END);
        }
    }

    /**
     * Compares the outcome of a generate() call with the expected pair and
     * reports a mismatch through generateHttpMessageExceptionally().
     *
     * Params:
     *   generator      = generator whose current state is inspected
     *   actualResult   = value returned by the preceding generate() call
     *   expectedResult = result required to continue
     *   expectedState  = generator state required to continue
     *
     * Returns: true when both result and state match; false when a mismatch
     *          was reported and the callback returned normally.
     */
    private bool checkGeneratorResult(HttpGenerator generator,
        HttpGenerator.Result actualResult,
        HttpGenerator.Result expectedResult,
        HttpGenerator.State expectedState) {
        if (actualResult == expectedResult && generator.getState() == expectedState)
            return true;

        generateHttpMessageExceptionally(actualResult, generator.getState(),
            expectedResult, expectedState);
        return false;
    }

    /**
     * Performs one generation step on the generator from getHttpGenerator().
     *
     * In client mode the meta data is cast to HttpRequest and passed to
     * generateRequest(); otherwise it is cast to HttpResponse and passed to
     * generateResponse().
     *
     * Params:
     *   metaData = message meta data, or null when the head is not being generated
     *   header   = buffer for the message head, or null
     *   chunk    = buffer for chunk framing or the trailer, or null
     *   content  = body data, or null
     *   last     = passed through to the generator; close() and its helpers
     *              pass true, commit() and write() pass false
     *
     * Returns: the result reported by the generator.
     */
    protected HttpGenerator.Result generate(MetaData metaData,
        ByteBuffer header, ByteBuffer chunk, ByteBuffer content, bool last) {
        HttpGenerator generator = getHttpGenerator();
        if (clientMode) {
            return generator.generateRequest(cast(HttpRequest) metaData, header, chunk, content, last);
        }
        // NOTE(review): the literal `false` is presumably the "response to a HEAD
        // request" flag of generateResponse - confirm against HttpGenerator.
        return generator.generateResponse(cast(HttpResponse) metaData, false, header, chunk, content, last);
    }

    /// Buffer that receives the generated message head before it is sent to the session.
    abstract protected ByteBuffer getHeaderByteBuffer();

    /// Buffer that receives the generated trailer when the generator requests NEED_CHUNK_TRAILER.
    abstract protected ByteBuffer getTrailerByteBuffer();

    /// Session that every generated buffer is encoded to.
    abstract protected Session getSession();

    /// Generator used for all generation steps of this stream.
    abstract protected HttpGenerator getHttpGenerator();

    /// Called when the final generation step returned DONE with the generator in state END.
    abstract protected void generateHttpMessageSuccessfully();

    /**
     * Called whenever a generation step did not produce the expected
     * (result, state) pair.
     *
     * NOTE(review): whether an implementation throws here is not visible in
     * this file; the callers above simply stop the current step when it
     * returns normally.
     *
     * Params:
     *   actualResult   = result returned by the generator
     *   actualState    = generator state after the step
     *   expectedResult = result that was required
     *   expectedState  = state that was required
     */
    abstract protected void generateHttpMessageExceptionally(HttpGenerator.Result actualResult,
                                                             HttpGenerator.State actualState,
                                                             HttpGenerator.Result expectedResult,
                                                             HttpGenerator.State expectedState);

}