1 module hunt.http.codec.http.stream.BufferingFlowControlStrategy;
2 
3 import hunt.http.codec.http.stream.AbstractFlowControlStrategy;
4 import hunt.http.codec.http.stream.StreamSPI;
5 import hunt.http.codec.http.stream.SessionSPI;
6 
7 import hunt.http.codec.http.frame.Frame;
8 import hunt.http.codec.http.frame.WindowUpdateFrame;
9 
10 import hunt.collection.Map;
11 import hunt.logging;
12 import hunt.util.Common;
13 
14 import std.format;
15 
16 /**
17  * <p>
18  * A flow control strategy that accumulates updates and emits window control
19  * frames when the accumulated value reaches a threshold.
20  * </p>
21  * <p>
22  * The sender flow control window is represented in the receiver as two buckets:
23  * a bigger bucket, initially full, that is drained when data is received, and a
24  * smaller bucket, initially empty, that is filled when data is consumed. Only
25  * the smaller bucket can refill the bigger bucket.
26  * </p>
27  * <p>
28  * The smaller bucket is defined as a fraction of the bigger bucket.
29  * </p>
30  * <p>
31  * For a more visual representation, see the
32  * <a href="http://en.wikipedia.org/wiki/Shishi-odoshi">rocking bamboo
33  * fountain</a>.
34  * </p>
35  * <p>
36  * The algorithm works in this way.
37  * </p>
38  * <p>
39  * The initial bigger bucket (BB) capacity is 100, and let's imagine the smaller
40  * bucket (SB) being 40% of the bigger bucket: 40.
41  * </p>
42  * <p>
43  * The receiver receives a data frame of 60, so now BB=40; the data frame is
44  * passed to the application that consumes 25, so now SB=25. Since SB is not
45  * full, no window control frames are emitted.
46  * </p>
47  * <p>
 * The application consumes another 20, so now SB=45. Since SB is full, its 45 are
49  * transferred to BB, which is now BB=85, and a window control frame is sent
50  * with delta=45.
51  * </p>
52  * <p>
53  * The application consumes the remaining 15, so now SB=15, and no window
54  * control frame is emitted.
55  * </p>
56  */
class BufferingFlowControlStrategy :AbstractFlowControlStrategy {
    // NOTE(review): the Java original kept these as AtomicInteger / ConcurrentHashMap.
    // This port uses plain fields and no synchronization is visible in this class;
    // confirm that callers serialize access to a strategy instance.

    /// Largest session receive window observed so far (see windowUpdate).
    private int maxSessionRecvWindow;
    /// Bytes consumed at session level that have not yet been reported to the peer.
    private int sessionLevel;
    /// Bytes consumed per stream that have not yet been reported to the peer.
    private int[StreamSPI] streamLevels;
    /// Fraction of the window that must be consumed before a window update is emitted.
    private float bufferRatio;

    /**
     * Creates a strategy that uses DEFAULT_WINDOW_SIZE as the initial stream send window.
     *
     * Params:
     *   bufferRatio = fraction of the receive window that has to be consumed
     *                 before a window update frame is emitted
     */
    this(float bufferRatio) {
        this(DEFAULT_WINDOW_SIZE, bufferRatio);
    }

    /**
     * Creates a strategy with an explicit initial stream send window.
     *
     * Params:
     *   initialStreamSendWindow = value forwarded to the base class constructor
     *   bufferRatio             = fraction of the receive window that has to be
     *                             consumed before a window update frame is emitted
     */
    this(int initialStreamSendWindow, float bufferRatio) {
        super(initialStreamSendWindow);
        this.bufferRatio = bufferRatio;
        // The session window starts at the default size; starting from 0 would make
        // the session threshold 0 and emit an update on every consumed chunk until
        // the first session window update has been observed.
        this.maxSessionRecvWindow = DEFAULT_WINDOW_SIZE;
    }

    /// Returns: the currently configured buffer ratio.
    float getBufferRatio() {
        return bufferRatio;
    }

    /// Replaces the buffer ratio used by subsequent onDataConsumed calls.
    void setBufferRatio(float bufferRatio) {
        this.bufferRatio = bufferRatio;
    }

    /// Starts tracking the consumed-bytes level of a newly created stream.
    override
    void onStreamCreated(StreamSPI stream) {
        super.onStreamCreated(stream);
        streamLevels[stream] = 0;
    }

    /// Stops tracking the consumed-bytes level of a destroyed stream.
    override
    void onStreamDestroyed(StreamSPI stream) {
        streamLevels.remove(stream);
        super.onStreamDestroyed(stream);
    }

    /**
     * Accumulates consumed bytes at session and stream level and, when an
     * accumulated level exceeds its threshold (window size * buffer ratio),
     * enlarges the corresponding receive window and sends a window update frame.
     *
     * Params:
     *   session = the session the data was consumed on
     *   stream  = the stream the data was consumed on; may be null, in which
     *             case only the session level is updated
     *   length  = number of bytes consumed; values <= 0 are ignored
     */
    override
    void onDataConsumed(SessionSPI session, StreamSPI stream, int length) {
        if (length <= 0) {
            return;
        }
        float ratio = bufferRatio;

        WindowUpdateFrame windowFrame = null;
        sessionLevel += length;
        int level = sessionLevel;
        int maxLevel = cast(int) (maxSessionRecvWindow * ratio);
        if (level > maxLevel) {
            // Stand-in for the Java original's sessionLevel.compareAndSet(level, 0).
            if (sessionLevel == level) {
                sessionLevel = 0;
                session.updateRecvWindow(level);
                version(HUNT_DEBUG) {
                    tracef("Data consumed, %s bytes, updated session recv window by %s/%s for %s", length, level,
                            maxLevel, session);
                }
                windowFrame = new WindowUpdateFrame(0, level);
            } else {
                version(HUNT_DEBUG) {
                    tracef("Data consumed, %s bytes, concurrent session recv window level %s/%s for %s", length, sessionLevel, maxLevel, session);
                }
            }
        } else {
            version(HUNT_DEBUG) {
                tracef("Data consumed, %s bytes, session recv window level %s/%s for %s", length, level, maxLevel, session);
            }
        }

        Frame[] windowFrames = Frame.EMPTY_ARRAY;
        if (stream !is null) {
            if (stream.isRemotelyClosed()) {
                version(HUNT_DEBUG) {
                    tracef("Data consumed, %s bytes, ignoring update stream recv window for remotely closed %s", length, stream);
                }
            } else {
                // Look the entry up by pointer so the accumulated level is updated in
                // place; reading it into a local int would discard every update.
                // Streams that are not tracked are skipped instead of raising a RangeError.
                int* streamLevel = stream in streamLevels;
                if (streamLevel !is null) {
                    *streamLevel += length;
                    level = *streamLevel;
                    maxLevel = cast(int) (getInitialStreamRecvWindow() * ratio);
                    if (level > maxLevel) {
                        // Reset before calling out; the pointer is not used afterwards.
                        *streamLevel = 0;
                        stream.updateRecvWindow(level);
                        version(HUNT_DEBUG) {
                            tracef("Data consumed, %s bytes, updated stream recv window by %s/%s for %s", length, level, maxLevel, stream);
                        }
                        WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
                        if (windowFrame is null) {
                            windowFrame = frame;
                        } else {
                            // A session-level frame already exists: send the stream frame after it.
                            windowFrames = [frame];
                        }
                    } else {
                        version(HUNT_DEBUG) {
                            tracef("Data consumed, %s bytes, stream recv window level %s/%s for %s", length, level, maxLevel, stream);
                        }
                    }
                }
            }
        }

        if (windowFrame !is null) {
            session.frames(stream, Callback.NOOP, windowFrame, windowFrames);
        }
    }

    /**
     * Delegates to the base class and, for session-level frames (stream id 0),
     * records the largest session receive window seen so far.
     *
     * Params:
     *   session = the session the window update refers to
     *   stream  = the stream the window update refers to, forwarded to the base class
     *   frame   = the window update frame
     */
    override
    void windowUpdate(SessionSPI session, StreamSPI stream, WindowUpdateFrame frame) {
        super.windowUpdate(session, stream, frame);

        // Window updates cannot be negative.
        // The SettingsFrame.INITIAL_WINDOW_SIZE setting
        // only influences the *stream* window size.
        // Therefore the session window can only be enlarged,
        // and here we keep track of its max value.

        // Updating the max session recv window is done here
        // so that if a peer decides to send a unilateral
        // window update to enlarge the session window,
        // without the corresponding data consumption, here
        // we can track it.
        // Note that it is not perfect, since there is a time
        // window between the session recv window being updated
        // before the window update frame is sent, and the
        // invocation of this method: in between data may arrive
        // and reduce the session recv window size.
        // But eventually the max value will be seen.

        // Note that we cannot avoid the time window described
        // above by updating the session recv window from here
        // because there is a race between the sender and the
        // receiver: the sender may receive a window update and
        // send more data, while this method has not yet been
        // invoked; when the data is received the session recv
        // window may become negative and the connection will
        // be closed (per specification).

        if (frame.getStreamId() == 0) {
            // A delta of 0 is used to read the current session window.
            int sessionWindow = session.updateRecvWindow(0);
            if (sessionWindow > maxSessionRecvWindow) {
                maxSessionRecvWindow = sessionWindow;
            }
        }
    }

    /// Returns: a diagnostic string with the ratio, the session level and the stall times.
    override
    string toString() {
        return format("%s@%x[ratio=%.2f,sessionLevel=%s,sessionStallTime=%dms,streamsStallTime=%dms]",
                typeof(this).stringof, toHash(), bufferRatio, sessionLevel, getSessionStallTime(),
                getStreamsStallTime());
    }
}