1 module hunt.http.codec.http.stream.Http2Flusher;
2 
3 import hunt.http.codec.http.stream.FlowControlStrategy;
4 import hunt.http.codec.http.stream.Http2Session;
5 import hunt.http.codec.http.stream.StreamSPI;
6 
7 import hunt.http.codec.http.frame.Frame;
8 import hunt.http.codec.http.frame.FrameType;
9 import hunt.http.codec.http.frame.WindowUpdateFrame;
10 
11 import hunt.collection;
12 import hunt.concurrency.Locker;
13 import hunt.concurrency.IteratingCallback;
14 import hunt.Exceptions;
15 import hunt.logging;
16 import hunt.net.Connection;
17 import hunt.util.Common;
18 
19 import std.format;
20 
21 
22 /**
23 */
24 class Http2Flusher : IteratingCallback {
25 
26     alias Action = IteratingCallback.Action;
27 
28     private Queue!(WindowEntry) windows;// = new ArrayDeque!()();
29     private Deque!(Entry) frames;// = new ArrayDeque!()();
30     private Queue!(Entry) entries;// = new ArrayDeque!()();
31     private List!(Entry) actives;// = new ArrayList!()();
32     private Http2Session session;
33     private Queue!ByteBuffer buffers;// = new LinkedList!()();
34     private Entry stalled;
35     private Exception terminated;
36 
37     this(Http2Session session) {
38         windows = new LinkedList!(WindowEntry)();
39         frames = new LinkedList!(Entry)();
40         entries = new LinkedList!(Entry)();
41         actives = new ArrayList!(Entry)();
42         buffers = new LinkedList!(ByteBuffer)();
43         this.session = session;
44     }
45 
46     void window(StreamSPI stream, WindowUpdateFrame frame) {
47         Exception closed;
48         synchronized (this) {
49             closed = terminated;
50             if (closed is null)
51                 windows.offer(new WindowEntry(stream, frame));
52         }
53         // Flush stalled data.
54         if (closed is null)
55             iterate();
56     }
57 
58     bool prepend(Entry entry) {
59         Exception e;
60         synchronized (this) {
61             e = terminated;
62             if (e is null) {
63                 frames.offerFirst(entry);
64                 version(HUNT_HTTP_DEBUG) {
65                     tracef("Prepended %s, frames=%s", entry.toString(), frames.size());
66                 }
67             }
68         }
69         if (e is null)
70             return true;
71         onClosed(entry, e);
72         return false;
73     }
74 
75     bool append(Entry entry) {
76         Exception closed;
77         synchronized (this) {
78             closed = terminated;
79             if (closed is null) {
80                 frames.offer(entry);
81                 version(HUNT_DEBUG) {
82                     // tracef("Appended %s, frames=%s", entry.toString(), frames.size());
83                 }
84             }
85         }
86         if (closed is null) {
87             return true;
88         }
89         onClosed(entry, closed);
90         return false;
91     }
92 
93     private int getWindowQueueSize() {
94         return windows.size();
95     }
96 
97     int getFrameQueueSize() {
98         return frames.size();
99     }
100 
101     override
102     protected Action process() {
103         version(HUNT_DEBUG) {
104             // tracef("Flushing %s", session.toString());
105         }
106         synchronized (this) {
107             if (terminated !is null) {
108                 throw terminated;
109             }
110 
111             while (!windows.isEmpty()) {
112                 //logInfo("!isEmpty--------------------------------");
113                 WindowEntry entry = windows.poll();
114                 entry.perform();
115             }
116 
117             foreach (Entry entry ; frames) {
118                 entries.offer(entry);
119                 actives.add(entry);
120             }
121             frames.clear();
122         }
123 
124 
125         if (entries.isEmpty()) {
126             version(HUNT_DEBUG) {
127                 // tracef("Flushed %s", session.toString());
128             }
129             return Action.IDLE;
130         }
131 
132         while (!entries.isEmpty()) {
133             Entry entry = entries.poll();
134             version(HUNT_DEBUG) {
135                 // tracef("Processing %s", entry.toString());
136             }
137             // If the stream has been reset or removed, don't send the frame.
138             if (entry.isStale()) {
139                 version(HUNT_DEBUG) {
140                     tracef("Stale %s", entry.toString());
141                 }
142                 continue;
143             }
144 
145             try {
146                 if (entry.generate(buffers)) {
147                     if (entry.dataRemaining() > 0)
148                         entries.offer(entry);
149                 } else {
150                     if (stalled is null)
151                         stalled = entry;
152                 }
153             } catch (Exception failure) {
154                 // Failure to generate the entry is catastrophic.
155                 version(HUNT_DEBUG) {
156                     trace("Failure generating frame " ~ entry.frame.toString(), failure.toString());
157                 }
158                 failed(failure);
159                 return Action.SUCCEEDED;
160             }
161         }
162 
163         if (buffers.isEmpty()) {
164             complete();
165             return Action.IDLE;
166         }
167 
168         version(HUNT_HTTP_DEBUG) {
169             tracef("Writing %s buffers (%s bytes) for %s frames %s",
170                     buffers.size(), BufferUtils.remaining(buffers.toArray()), actives.size(), actives.toString());
171         }
172 
173         Connection tcpSession =  session.getEndPoint();
174         foreach(ByteBuffer buffer; buffers) {
175             tcpSession.encode(buffer);
176         }
177         this.succeeded();
178         return Action.SCHEDULED;
179     }
180 
181     override
182     void succeeded() {
183         version(HUNT_HTTP_DEBUG) {
184             tracef("Written %s frames for %s", actives.size(), actives.toString());
185         }
186         complete();
187 
188         super.succeeded();
189     }
190 
191     private void complete() {
192         buffers.clear();
193 
194         // actives.forEach(Entry::complete);
195         foreach(Entry ac; actives)
196             ac.complete();
197 
198         if (stalled !is null) {
199             // We have written part of the frame, but there is more to write.
200             // The API will not allow to send two data frames for the same
201             // stream so we append the unfinished frame at the end to allow
202             // better interleaving with other streams.
203             int index = actives.indexOf(stalled);
204             for (int i = index; i < actives.size(); ++i) {
205                 Entry entry = actives.get(i);
206                 if (entry.dataRemaining() > 0)
207                     append(entry);
208             }
209             for (int i = 0; i < index; ++i) {
210                 Entry entry = actives.get(i);
211                 if (entry.dataRemaining() > 0)
212                     append(entry);
213             }
214             stalled = null;
215         }
216 
217         actives.clear();
218     }
219 
220     override
221     protected void onCompleteSuccess() {
222         throw new IllegalStateException("");
223     }
224 
225     override
226     protected void onCompleteFailure(Exception x) {
227         buffers.clear();
228 
229         Exception closed;
230         synchronized (this) {
231             closed = terminated;
232             terminated = x;
233             version(HUNT_DEBUG) {
234                 tracef("%s, active/queued=%s/%s", closed !is null ? "Closing" : "Failing", actives.size(), frames.size());
235             }
236             actives.addAll(frames);
237             frames.clear();
238         }
239 
240         foreach(Entry entry; actives)
241             entry.failed(x);
242         actives.clear();
243 
244         // If the failure came from within the
245         // flusher, we need to close the connection.
246         if (closed is null)
247             session.abort(x);
248     }
249 
250     void terminate(Exception cause) {
251         Exception closed;
252         synchronized (this) {
253             closed = terminated;
254             terminated = cause;
255             version(HUNT_DEBUG) {
256                 tracef("%s", closed !is null ? "Terminated" : "Terminating");
257             }
258         }
259         if (closed is null)
260             iterate();
261     }
262 
263     private void onClosed(Entry entry, Exception failure) {
264         entry.failed(failure);
265     }
266 
267     override
268     string toString() {
269         return format("%s[window_queue=%d,frame_queue=%d,actives=%d]",
270                 super.toString(),
271                 getWindowQueueSize(),
272                 getFrameQueueSize(),
273                 actives.size());
274     }
275 
276     static abstract class Entry : NestedCallback {
277         Frame frame;
278         StreamSPI stream;
279 
280         protected this(Frame frame, StreamSPI stream, Callback callback) {
281             super(callback);
282             this.frame = frame;
283             this.stream = stream;
284         }
285 
286         int dataRemaining() {
287             return 0;
288         }
289 
290         protected abstract bool generate(Queue!ByteBuffer buffers);
291 
292         private void complete() {
293             if (isStale())
294                 failed(new EofException("reset"));
295             else
296                 succeeded();
297         }
298 
299         override
300         void failed(Exception x) {
301             if (stream !is null) {
302                 stream.close();
303                 stream.getSession().removeStream(stream);
304             }
305             super.failed(x);
306         }
307 
308         private bool isStale() {
309             return !isProtocol() && stream !is null && stream.isReset();
310         }
311 
312         private bool isProtocol() {
313             switch (frame.getType()) {
314                 case FrameType.DATA:
315                 case FrameType.HEADERS:
316                 case FrameType.PUSH_PROMISE:
317                 case FrameType.CONTINUATION:
318                     return false;
319                 case FrameType.PRIORITY:
320                 case FrameType.RST_STREAM:
321                 case FrameType.SETTINGS:
322                 case FrameType.PING:
323                 case FrameType.GO_AWAY:
324                 case FrameType.WINDOW_UPDATE:
325                 case FrameType.PREFACE:
326                 case FrameType.DISCONNECT:
327                     return true;
328                 default:
329                     throw new IllegalStateException("");
330             }
331         }
332 
333         override
334         string toString() {
335             return frame.toString();
336         }
337     }
338 
339     private class WindowEntry {
340         private StreamSPI stream;
341         private WindowUpdateFrame frame;
342 
343         this(StreamSPI stream, WindowUpdateFrame frame) {
344             this.stream = stream;
345             this.frame = frame;
346         }
347 
348         void perform() {
349             FlowControlStrategy flowControl = session.getFlowControlStrategy();
350             flowControl.onWindowUpdate(session, stream, frame);
351         }
352     }
353 
354 }