1 module hunt.http.codec.http.stream.Http2Flusher;
2 
3 import hunt.http.codec.http.stream.FlowControlStrategy;
4 import hunt.http.codec.http.stream.Http2Session;
5 import hunt.http.codec.http.stream.StreamSPI;
6 
7 import hunt.http.codec.http.frame.Frame;
8 import hunt.http.codec.http.frame.FrameType;
9 import hunt.http.codec.http.frame.WindowUpdateFrame;
10 
11 import hunt.util.concurrent.Locker;
12 import hunt.util.concurrent.IteratingCallback;
13 import hunt.lang.exception;
14 import hunt.util.functional;
15 
16 import hunt.container;
17 import hunt.logging;
18 import std.format;
19 
20 
/**
 * Queues outgoing frames for an Http2Session and writes them to the
 * session's end point in batches, driven by the IteratingCallback
 * iterate()/process() cycle.
 *
 * Producers enqueue work through window(), prepend() and append(); those
 * methods, and every other access to the `windows`, `frames` and
 * `terminated` fields, run under `synchronized (this)`.
 *
 * Once `terminated` is set (by terminate() or by onCompleteFailure()),
 * newly offered entries are failed immediately instead of being queued.
 */
class Http2Flusher : IteratingCallback {

    alias Action = IteratingCallback.Action;

    /// Pending window updates; accessed only while holding the monitor.
    private Queue!(WindowEntry) windows;
    /// Frames offered by producers and not yet picked up by process();
    /// accessed only while holding the monitor.
    private Deque!(Entry) frames;
    /// Entries still to be generated during the current process() call.
    private Queue!(Entry) entries;
    /// Entries picked up by process() and not yet completed or failed.
    private List!(Entry) actives;
    private Http2Session session;
    /// Buffers produced by Entry.generate() that are waiting to be written.
    private Queue!ByteBuffer buffers;
    /// First entry whose generate() returned false in the current batch, if any.
    private Entry stalled;
    /// Non-null once the flusher has been terminated or has failed.
    private Exception terminated;

    /**
     * Creates a flusher bound to the given session.
     *
     * Params:
     *   session = the session whose end point the frames are written to and
     *             whose flow control strategy receives window updates.
     */
    this(Http2Session session) {
        windows = new LinkedList!(WindowEntry)();
        frames = new LinkedList!(Entry)();
        entries = new LinkedList!(Entry)();
        actives = new ArrayList!(Entry)();
        buffers = new LinkedList!(ByteBuffer)();
        this.session = session;
    }

    /**
     * Queues a window update and triggers an iteration so that it is applied
     * by the next process() call.
     *
     * Does nothing when the flusher has already been terminated.
     *
     * Params:
     *   stream = the stream the update refers to (passed through unchanged
     *            to the flow control strategy).
     *   frame  = the window update frame.
     */
    void window(StreamSPI stream, WindowUpdateFrame frame) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            if (closed is null)
                windows.offer(new WindowEntry(stream, frame));
        }
        // Flush stalled data.
        if (closed is null)
            iterate();
    }

    /**
     * Inserts an entry at the head of the frame queue.
     *
     * This method does not call iterate() itself.
     *
     * Params:
     *   entry = the entry to queue.
     *
     * Returns: true if the entry was queued; false if the flusher was already
     *          terminated, in which case the entry has been failed with the
     *          termination cause.
     */
    bool prepend(Entry entry) {
        Exception e;
        synchronized (this) {
            e = terminated;
            if (e is null) {
                frames.offerFirst(entry);
                version(HUNT_DEBUG) {
                    tracef("Prepended %s, frames=%s", entry.toString(), frames.size());
                }
            }
        }
        if (e is null)
            return true;
        // The entry is failed outside the monitor.
        onClosed(entry, e);
        return false;
    }

    /**
     * Adds an entry at the tail of the frame queue.
     *
     * This method does not call iterate() itself.
     *
     * Params:
     *   entry = the entry to queue.
     *
     * Returns: true if the entry was queued; false if the flusher was already
     *          terminated, in which case the entry has been failed with the
     *          termination cause.
     */
    bool append(Entry entry) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            if (closed is null) {
                frames.offer(entry);
                version(HUNT_DEBUG) {
                    tracef("Appended %s, frames=%s", entry.toString(), frames.size());
                }
            }
        }
        if (closed is null) {
            return true;
        }
        // The entry is failed outside the monitor.
        onClosed(entry, closed);
        return false;
    }

    /**
     * Returns: the number of window updates currently queued.
     */
    private int getWindowQueueSize() {
        // `windows` is mutated under the monitor everywhere else, so the
        // read takes the monitor as well.
        synchronized (this) {
            return windows.size();
        }
    }

    /**
     * Returns: the number of frames queued and not yet picked up by process().
     */
    int getFrameQueueSize() {
        // `frames` is mutated under the monitor everywhere else, so the
        // read takes the monitor as well.
        synchronized (this) {
            return frames.size();
        }
    }

    /**
     * Performs one flush iteration:
     *
     * 1. Under the monitor: rethrows the termination cause if set, applies
     *    all queued window updates, and moves every queued frame into
     *    `entries` and `actives`.
     * 2. Generates buffers for each non-stale entry. An entry whose
     *    generate() returns true and still has data remaining is re-queued
     *    in `entries`; the first entry whose generate() returns false is
     *    remembered in `stalled`.
     * 3. Writes the generated buffers to the session's end point.
     *
     * Returns:
     *   Action.IDLE when there were no entries, or no buffers were generated;
     *   Action.SUCCEEDED after a generation failure was reported via failed();
     *   Action.SCHEDULED after the buffers were handed to the end point.
     *
     * Throws: the termination cause, if the flusher has been terminated.
     */
    override
    protected Action process() {
        version(HUNT_DEBUG) {
            tracef("Flushing %s", session.toString());
        }
        synchronized (this) {
            if (terminated !is null) {
                throw terminated;
            }

            while (!windows.isEmpty()) {
                WindowEntry entry = windows.poll();
                entry.perform();
            }

            foreach (Entry entry ; frames) {
                entries.offer(entry);
                actives.add(entry);
            }
            frames.clear();
        }


        if (entries.isEmpty()) {
            version(HUNT_DEBUG) {
                tracef("Flushed %s", session.toString());
            }
            return Action.IDLE;
        }

        while (!entries.isEmpty()) {
            Entry entry = entries.poll();
            version(HUNT_DEBUG) {
                tracef("Processing %s", entry.toString());
            }
            // If the stream has been reset or removed, don't send the frame.
            // The entry stays in `actives`, so complete() still sees it.
            if (entry.isStale()) {
                version(HUNT_DEBUG) {
                    tracef("Stale %s", entry.toString());
                }
                continue;
            }

            try {
                if (entry.generate(buffers)) {
                    // Generated, but more data is left: try again in this batch.
                    if (entry.dataRemaining() > 0)
                        entries.offer(entry);
                } else {
                    // Only the first entry that could not be generated is recorded.
                    if (stalled is null)
                        stalled = entry;
                }
            } catch (Exception failure) {
                // Failure to generate the entry is catastrophic.
                version(HUNT_DEBUG) {
                    trace("Failure generating frame " ~ entry.frame.toString(), failure.toString());
                }
                failed(failure);
                return Action.SUCCEEDED;
            }
        }

        if (buffers.isEmpty()) {
            // Nothing to write: settle the active entries right away.
            complete();
            return Action.IDLE;
        }

        version(HUNT_DEBUG) {
            tracef("Writing %s buffers (%s bytes) for %s frames %s",
                    buffers.size(), BufferUtils.remaining(buffers), actives.size(), actives.toString());
        }

        TcpSession tcpSession =  session.getEndPoint();
        tcpSession.encode(buffers.toArray());
        // NOTE(review): succeeded() is invoked directly after encode(), so
        // encode() is presumably treated as a completed write — confirm.
        this.succeeded();
        return Action.SCHEDULED;
    }

    /**
     * Settles the entries of the batch that was just written, then delegates
     * to IteratingCallback.succeeded().
     */
    override
    void succeeded() {
        version(HUNT_DEBUG) {
            tracef("Written %s frames for %s", actives.size(), actives.toString());
        }
        complete();

        super.succeeded();
    }

    /**
     * Clears the write buffers, calls complete() on every active entry,
     * re-appends entries that still have data when a stall was recorded,
     * and finally clears `actives`.
     */
    private void complete() {
        buffers.clear();

        foreach(Entry ac; actives)
            ac.complete();

        if (stalled !is null) {
            // We have written part of the frame, but there is more to write.
            // The API will not allow to send two data frames for the same
            // stream so we append the unfinished frame at the end to allow
            // better interleaving with other streams.
            // Entries from the stalled one onwards are re-queued first,
            // followed by the entries that preceded it.
            int index = actives.indexOf(stalled);
            for (int i = index; i < actives.size(); ++i) {
                Entry entry = actives.get(i);
                if (entry.dataRemaining() > 0)
                    append(entry);
            }
            for (int i = 0; i < index; ++i) {
                Entry entry = actives.get(i);
                if (entry.dataRemaining() > 0)
                    append(entry);
            }
            stalled = null;
        }

        actives.clear();
    }

    /**
     * Always throws: this flusher is not expected to reach the
     * "complete success" state.
     *
     * Throws: IllegalStateException unconditionally.
     */
    override
    protected void onCompleteSuccess() {
        throw new IllegalStateException("");
    }

    /**
     * Marks the flusher as terminated with the given failure and fails every
     * active and queued entry with it.
     *
     * Params:
     *   x = the failure; it becomes the new termination cause.
     */
    override
    protected void onCompleteFailure(Exception x) {
        buffers.clear();

        Exception closed;
        synchronized (this) {
            closed = terminated;
            terminated = x;
            version(HUNT_DEBUG) {
                tracef("%s, active/queued=%s/%s", closed !is null ? "Closing" : "Failing", actives.size(), frames.size());
            }
            // Queued frames will never be written; fail them with the actives.
            actives.addAll(frames);
            frames.clear();
        }

        foreach(Entry entry; actives)
            entry.failed(x);
        actives.clear();

        // If the failure came from within the
        // flusher, we need to close the connection.
        if (closed is null)
            session.abort(x);
    }

    /**
     * Records the termination cause and, if the flusher was not already
     * terminated, triggers an iteration; process() then throws the cause.
     *
     * The stored cause is overwritten even when one was already set.
     *
     * Params:
     *   cause = the reason for terminating.
     */
    void terminate(Exception cause) {
        Exception closed;
        synchronized (this) {
            closed = terminated;
            terminated = cause;
            version(HUNT_DEBUG) {
                tracef("%s", closed !is null ? "Terminated" : "Terminating");
            }
        }
        if (closed is null)
            iterate();
    }

    /**
     * Fails an entry that was offered after termination.
     *
     * Params:
     *   entry   = the rejected entry.
     *   failure = the termination cause.
     */
    private void onClosed(Entry entry, Exception failure) {
        entry.failed(failure);
    }

    /**
     * Returns: the base class description followed by the sizes of the
     *          window queue, the frame queue and the active list.
     */
    override
    string toString() {
        return format("%s[window_queue=%d,frame_queue=%d,actives=%d]",
                super.toString(),
                getWindowQueueSize(),
                getFrameQueueSize(),
                actives.size());
    }

    /**
     * A frame queued for writing, together with the stream it belongs to
     * (may be null) and the callback to notify about the outcome.
     */
    static abstract class Entry : NestedCallback {
        Frame frame;
        StreamSPI stream;

        /**
         * Params:
         *   frame    = the frame to write.
         *   stream   = the stream the frame belongs to; may be null.
         *   callback = the callback wrapped by this NestedCallback.
         */
        protected this(Frame frame, StreamSPI stream, Callback callback) {
            super(callback);
            this.frame = frame;
            this.stream = stream;
        }

        /**
         * Returns: the amount of data still to be written for this entry;
         *          0 in this base implementation.
         */
        int dataRemaining() {
            return 0;
        }

        /**
         * Generates the bytes for this entry into the given buffer queue.
         *
         * Params:
         *   buffers = the queue that receives the generated buffers.
         *
         * Returns: true if the entry was generated; false if it could not be
         *          generated now (the flusher then treats it as stalled).
         */
        protected abstract bool generate(Queue!ByteBuffer buffers);

        /**
         * Settles the entry after a batch: fails it with an
         * EofException("reset") when stale, otherwise calls succeeded().
         */
        private void complete() {
            if (isStale())
                failed(new EofException("reset"));
            else
                succeeded();
        }

        /**
         * Closes the stream (if any) and removes it from its session before
         * propagating the failure to the wrapped callback.
         *
         * Params:
         *   x = the failure.
         */
        override
        void failed(Exception x) {
            if (stream !is null) {
                stream.close();
                stream.getSession().removeStream(stream);
            }
            super.failed(x);
        }

        /**
         * Returns: true when this is not a protocol frame and its stream is
         *          non-null and has been reset.
         */
        private bool isStale() {
            return !isProtocol() && stream !is null && stream.isReset();
        }

        /**
         * Classifies the frame by type.
         *
         * Returns: false for DATA, HEADERS, PUSH_PROMISE and CONTINUATION;
         *          true for PRIORITY, RST_STREAM, SETTINGS, PING, GO_AWAY,
         *          WINDOW_UPDATE, PREFACE and DISCONNECT.
         *
         * Throws: IllegalStateException for any other frame type.
         */
        private bool isProtocol() {
            switch (frame.getType()) {
                case FrameType.DATA:
                case FrameType.HEADERS:
                case FrameType.PUSH_PROMISE:
                case FrameType.CONTINUATION:
                    return false;
                case FrameType.PRIORITY:
                case FrameType.RST_STREAM:
                case FrameType.SETTINGS:
                case FrameType.PING:
                case FrameType.GO_AWAY:
                case FrameType.WINDOW_UPDATE:
                case FrameType.PREFACE:
                case FrameType.DISCONNECT:
                    return true;
                default:
                    throw new IllegalStateException("");
            }
        }

        /**
         * Returns: the string form of the wrapped frame.
         */
        override
        string toString() {
            return frame.toString();
        }
    }

    /**
     * A queued window update. Non-static so that perform() can reach the
     * enclosing flusher's session.
     */
    private class WindowEntry {
        private StreamSPI stream;
        private WindowUpdateFrame frame;

        /**
         * Params:
         *   stream = the stream the update refers to.
         *   frame  = the window update frame.
         */
        this(StreamSPI stream, WindowUpdateFrame frame) {
            this.stream = stream;
            this.frame = frame;
        }

        /**
         * Forwards the update to the session's flow control strategy.
         */
        void perform() {
            FlowControlStrategy flowControl = session.getFlowControlStrategy();
            flowControl.onWindowUpdate(session, stream, frame);
        }
    }

}