1 module hunt.http.codec.http.stream.BufferingFlowControlStrategy; 2 3 import hunt.http.codec.http.stream.AbstractFlowControlStrategy; 4 import hunt.http.codec.http.stream.StreamSPI; 5 import hunt.http.codec.http.stream.SessionSPI; 6 7 import hunt.http.codec.http.frame.Frame; 8 import hunt.http.codec.http.frame.WindowUpdateFrame; 9 10 import hunt.collection.Map; 11 import hunt.logging; 12 import hunt.util.Common; 13 14 import std.format; 15 16 /** 17 * <p> 18 * A flow control strategy that accumulates updates and emits window control 19 * frames when the accumulated value reaches a threshold. 20 * </p> 21 * <p> 22 * The sender flow control window is represented in the receiver as two buckets: 23 * a bigger bucket, initially full, that is drained when data is received, and a 24 * smaller bucket, initially empty, that is filled when data is consumed. Only 25 * the smaller bucket can refill the bigger bucket. 26 * </p> 27 * <p> 28 * The smaller bucket is defined as a fraction of the bigger bucket. 29 * </p> 30 * <p> 31 * For a more visual representation, see the 32 * <a href="http://en.wikipedia.org/wiki/Shishi-odoshi">rocking bamboo 33 * fountain</a>. 34 * </p> 35 * <p> 36 * The algorithm works in this way. 37 * </p> 38 * <p> 39 * The initial bigger bucket (BB) capacity is 100, and let's imagine the smaller 40 * bucket (SB) being 40% of the bigger bucket: 40. 41 * </p> 42 * <p> 43 * The receiver receives a data frame of 60, so now BB=40; the data frame is 44 * passed to the application that consumes 25, so now SB=25. Since SB is not 45 * full, no window control frames are emitted. 46 * </p> 47 * <p> 48 * The application consumes other 20, so now SB=45. Since SB is full, its 45 are 49 * transferred to BB, which is now BB=85, and a window control frame is sent 50 * with delta=45. 51 * </p> 52 * <p> 53 * The application consumes the remaining 15, so now SB=15, and no window 54 * control frame is emitted. 55 * </p> 56 */ 57 class BufferingFlowControlStrategy :AbstractFlowControlStrategy { 58 // private AtomicInteger maxSessionRecvWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE); 59 // private AtomicInteger sessionLevel = new AtomicInteger(); 60 // private AtomicInteger[StreamSPI] streamLevels = new ConcurrentHashMap<>(); 61 private int maxSessionRecvWindow; 62 private int sessionLevel; 63 private int[StreamSPI] streamLevels; // = new ConcurrentHashMap<>(); 64 private float bufferRatio; 65 66 this(float bufferRatio) { 67 this(DEFAULT_WINDOW_SIZE, bufferRatio); 68 } 69 70 this(int initialStreamSendWindow, float bufferRatio) { 71 super(initialStreamSendWindow); 72 this.bufferRatio = bufferRatio; 73 } 74 75 float getBufferRatio() { 76 return bufferRatio; 77 } 78 79 void setBufferRatio(float bufferRatio) { 80 this.bufferRatio = bufferRatio; 81 } 82 83 override 84 void onStreamCreated(StreamSPI stream) { 85 super.onStreamCreated(stream); 86 streamLevels[stream] = 0; 87 } 88 89 override 90 void onStreamDestroyed(StreamSPI stream) { 91 streamLevels.remove(stream); 92 super.onStreamDestroyed(stream); 93 } 94 95 override 96 void onDataConsumed(SessionSPI session, StreamSPI stream, int length) { 97 if (length <= 0) { 98 return; 99 } 100 float ratio = bufferRatio; 101 102 WindowUpdateFrame windowFrame = null; 103 sessionLevel += length; 104 int level = sessionLevel; 105 int maxLevel = cast(int) (maxSessionRecvWindow * ratio); 106 if (level > maxLevel) { 107 // if (sessionLevel.compareAndSet(level, 0)) 108 if(sessionLevel == level) 109 { 110 sessionLevel = 0; 111 session.updateRecvWindow(level); 112 version(HUNT_DEBUG) { 113 tracef("Data consumed, %s bytes, updated session recv window by %s/%s for %s", length, level, 114 maxLevel, session); 115 } 116 windowFrame = new WindowUpdateFrame(0, level); 117 } else { 118 version(HUNT_DEBUG) { 119 tracef("Data consumed, %s bytes, concurrent session recv window level %s/%s for %s", length, sessionLevel, maxLevel, session); 120 } 121 } 122 } else { 123 version(HUNT_DEBUG) { 124 tracef("Data consumed, %s bytes, session recv window level %s/%s for %s", length, level, maxLevel, session); 125 } 126 } 127 128 Frame[] windowFrames = Frame.EMPTY_ARRAY; 129 if (stream !is null) { 130 if (stream.isRemotelyClosed()) { 131 version(HUNT_DEBUG) { 132 tracef("Data consumed, %s bytes, ignoring update stream recv window for remotely closed %s", length, stream); 133 } 134 } else { 135 int streamLevel = streamLevels[stream]; 136 // if (streamLevel != null) { 137 streamLevel += length; 138 level = streamLevel; // streamLevel.addAndGet(length); 139 maxLevel = cast(int) (getInitialStreamRecvWindow() * ratio); 140 if (level > maxLevel) { 141 level = streamLevel; streamLevel = 0; 142 stream.updateRecvWindow(level); 143 version(HUNT_DEBUG) { 144 tracef("Data consumed, %s bytes, updated stream recv window by %s/%s for %s", length, level, maxLevel, stream); 145 } 146 WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level); 147 if (windowFrame is null) { 148 windowFrame = frame; 149 } else { 150 windowFrames = [frame]; 151 } 152 } else { 153 version(HUNT_DEBUG) { 154 tracef("Data consumed, %s bytes, stream recv window level %s/%s for %s", length, level, maxLevel, stream); 155 } 156 } 157 // } 158 } 159 } 160 161 if (windowFrame !is null) { 162 session.frames(stream, Callback.NOOP, windowFrame, windowFrames); 163 } 164 } 165 166 override 167 void windowUpdate(SessionSPI session, StreamSPI stream, WindowUpdateFrame frame) { 168 super.windowUpdate(session, stream, frame); 169 170 // Window updates cannot be negative. 171 // The SettingsFrame.INITIAL_WINDOW_SIZE setting 172 // only influences the *stream* window size. 173 // Therefore the session window can only be enlarged, 174 // and here we keep track of its max value. 175 176 // Updating the max session recv window is done here 177 // so that if a peer decides to send an unilateral 178 // window update to enlarge the session window, 179 // without the corresponding data consumption, here 180 // we can track it. 181 // Note that it is not perfect, since there is a time 182 // window between the session recv window being updated 183 // before the window update frame is sent, and the 184 // invocation of this method: in between data may arrive 185 // and reduce the session recv window size. 186 // But eventually the max value will be seen. 187 188 // Note that we cannot avoid the time window described 189 // above by updating the session recv window from here 190 // because there is a race between the sender and the 191 // receiver: the sender may receive a window update and 192 // send more data, while this method has not yet been 193 // invoked; when the data is received the session recv 194 // window may become negative and the connection will 195 // be closed (per specification). 196 197 if (frame.getStreamId() == 0) { 198 int sessionWindow = session.updateRecvWindow(0); 199 // Atomics.updateMax(maxSessionRecvWindow, sessionWindow); 200 if(sessionWindow > maxSessionRecvWindow) 201 maxSessionRecvWindow = sessionWindow; 202 } 203 } 204 205 override 206 string toString() { 207 return format("%s@%x[ratio=%.2f,sessionLevel=%s,sessionStallTime=%dms,streamsStallTime=%dms]", 208 typeof(this).stringof, toHash(), bufferRatio, sessionLevel, getSessionStallTime(), 209 getStreamsStallTime()); 210 } 211 }