module hunt.http.codec.http.stream.AbstractFlowControlStrategy;

import hunt.http.codec.http.stream.FlowControlStrategy;
import hunt.http.codec.http.stream.Stream;
import hunt.http.codec.http.stream.StreamSPI;
import hunt.http.codec.http.stream.SessionSPI;

import hunt.http.codec.http.frame.WindowUpdateFrame;

import std.datetime;
import core.time;

import hunt.collection.Map;
import hunt.Exceptions;
import hunt.logging;
import hunt.util.DateTime;


/**
 * Base flow-control strategy that keeps the initial stream send/receive
 * window sizes, applies window deltas to sessions and streams, and records
 * how long the session and the individual streams have been stalled
 * (send window at or below zero).
 *
 * Stall timestamps are taken from `Clock.currStdTime` and converted from
 * hecto-nanoseconds to milliseconds by the getters.
 *
 * NOTE(review): the commented-out declarations below show that an earlier
 * version used atomic counters and a concurrent map. The fields here are
 * plain values with no synchronization in this class — confirm that callers
 * serialize access.
 */
abstract class AbstractFlowControlStrategy : FlowControlStrategy {

    // private AtomicLong sessionStall = new AtomicLong();
    // private AtomicLong sessionStallTime = new AtomicLong();
    // private Map!(StreamSPI, long) streamsStalls = new ConcurrentHashMap!(StreamSPI, long)();
    // private AtomicLong streamsStallTime = new AtomicLong();

    /// Timestamp at which the session stalled; 0 means "not currently stalled".
    private long sessionStall;
    /// Accumulated duration of completed session stalls.
    private long sessionStallTime;
    /// Accumulated duration of completed stream stalls, summed over all streams.
    private long streamsStallTime;
    /// Timestamp at which each currently stalled stream stalled.
    private long[StreamSPI] streamsStalls;

    private int initialStreamSendWindow;
    private int initialStreamRecvWindow;

    /**
     * Params:
     *   initialStreamSendWindow = initial send window applied to new streams.
     *
     * The initial receive window starts at DEFAULT_WINDOW_SIZE.
     */
    this(int initialStreamSendWindow) {
        this.initialStreamSendWindow = initialStreamSendWindow;
        this.initialStreamRecvWindow = DEFAULT_WINDOW_SIZE;
    }

    /// Returns: the send window currently applied to newly created streams.
    int getInitialStreamSendWindow() {
        return initialStreamSendWindow;
    }

    /// Returns: the receive window currently applied to newly created streams.
    int getInitialStreamRecvWindow() {
        return initialStreamRecvWindow;
    }

    /// Applies the current initial send and receive windows to a new stream.
    override
    void onStreamCreated(StreamSPI stream) {
        stream.updateSendWindow(initialStreamSendWindow);
        stream.updateRecvWindow(initialStreamRecvWindow);
    }

    /// Drops any in-progress stall record kept for the stream.
    override
    void onStreamDestroyed(StreamSPI stream) {
        streamsStalls.remove(stream);
    }

    /// Not implemented in this base class; calls implementationMissing().
    void onDataConsumed(SessionSPI session, StreamSPI stream, int length) { implementationMissing(); }

    /**
     * Stores a new initial stream window and applies the difference from the
     * previous value to every stream of the session.
     *
     * Params:
     *   session             = session whose streams are adjusted.
     *   initialStreamWindow = the new initial window size.
     *   local               = true to change the receive window, false to
     *                         change the send window.
     */
    override
    void updateInitialStreamWindow(SessionSPI session, int initialStreamWindow, bool local) {
        int previousInitialStreamWindow;
        if (local) {
            previousInitialStreamWindow = getInitialStreamRecvWindow();
            this.initialStreamRecvWindow = initialStreamWindow;
        } else {
            previousInitialStreamWindow = getInitialStreamSendWindow();
            this.initialStreamSendWindow = initialStreamWindow;
        }
        int delta = initialStreamWindow - previousInitialStreamWindow;

        // SPEC: updates of the initial window size only affect stream windows, not session's.
        foreach (Stream stream ; session.getStreams()) {
            if (local) {
                (cast(StreamSPI) stream).updateRecvWindow(delta);
                version(HUNT_DEBUG)
                    tracef("Updated initial stream recv window %s -> %s for %s", previousInitialStreamWindow, initialStreamWindow, stream);
            } else {
                // Send-side changes are routed through the session as a window update frame.
                session.onWindowUpdate(cast(StreamSPI) stream, new WindowUpdateFrame(stream.getId(), delta));
            }
        }
    }

    /**
     * Applies a window update frame to the stream send window (stream id > 0)
     * or to the session send window (otherwise), and reports the target as
     * unstalled when its window was at or below zero before the update.
     */
    override
    void onWindowUpdate(SessionSPI session, StreamSPI stream, WindowUpdateFrame frame) {
        int delta = frame.getWindowDelta();
        if (frame.getStreamId() > 0) {
            // The stream may have been removed concurrently.
            if (stream !is null) {
                int oldSize = stream.updateSendWindow(delta);
                version(HUNT_DEBUG)
                    tracef("Updated stream send window %s -> %s for %s", oldSize, oldSize + delta, stream);
                if (oldSize <= 0)
                    onStreamUnstalled(stream);
            }
        } else {
            int oldSize = session.updateSendWindow(delta);
            //version(HUNT_DEBUG)
            //tracef("Updated session send window %s -> %s for %s", oldSize, oldSize + delta, session);
            if (oldSize <= 0)
                onSessionUnstalled(session);
        }
    }

    /**
     * Shrinks the session receive window, and the stream receive window when
     * a stream is given, by the number of bytes received.
     */
    override
    void onDataReceived(SessionSPI session, StreamSPI stream, int length) {
        int oldSize = session.updateRecvWindow(-length);
        version(HUNT_DEBUG)
            tracef("Data received, %s bytes, updated session recv window %s -> %s for %s", length, oldSize, oldSize - length, session);

        if (stream !is null) {
            oldSize = stream.updateRecvWindow(-length);
            version(HUNT_DEBUG)
                tracef("Data received, %s bytes, updated stream recv window %s -> %s for %s", length, oldSize, oldSize - length, stream);
        }
    }

    /// No-op in this base class.
    override
    void windowUpdate(SessionSPI session, StreamSPI stream, WindowUpdateFrame frame) {
    }

    /**
     * Shrinks the session and stream send windows by `length` and records a
     * stall for whichever of them drops to zero or below. A zero length is
     * ignored.
     */
    override
    void onDataSending(StreamSPI stream, int length) {
        if (length == 0)
            return;

        SessionSPI session = stream.getSession();
        int oldSessionWindow = session.updateSendWindow(-length);
        int newSessionWindow = oldSessionWindow - length;
        version(HUNT_DEBUG)
            tracef("Sending, session send window %s -> %s for %s", oldSessionWindow, newSessionWindow, session);
        if (newSessionWindow <= 0)
            onSessionStalled(session);

        int oldStreamWindow = stream.updateSendWindow(-length);
        int newStreamWindow = oldStreamWindow - length;
        version(HUNT_DEBUG)
            tracef("Sending, stream send window %s -> %s for %s", oldStreamWindow, newStreamWindow, stream);
        if (newStreamWindow <= 0)
            onStreamStalled(stream);
    }

    /// No-op in this base class.
    override
    void onDataSent(StreamSPI stream, int length) {
    }

    /// Records the moment the session stalled.
    protected void onSessionStalled(SessionSPI session) {
        sessionStall = Clock.currStdTime;
        version(HUNT_DEBUG)
            tracef("Session stalled %s", session);
    }

    /// Records the moment the given stream stalled.
    protected void onStreamStalled(StreamSPI stream) {
        streamsStalls[stream] = Clock.currStdTime;
        version(HUNT_DEBUG)
            tracef("Stream stalled %s", stream);
    }

    /**
     * Closes the current session stall, if there is one, and adds its
     * duration to the accumulated session stall time.
     */
    protected void onSessionUnstalled(SessionSPI session) {
        // A zero timestamp means no stall is in progress. Without this guard the
        // whole absolute clock value would be added to the accumulated time.
        if (sessionStall != 0) {
            sessionStallTime += (Clock.currStdTime - sessionStall);
            sessionStall = 0;
        }
        version(HUNT_DEBUG)
            tracef("Session unstalled %s", session);
    }

    /**
     * Closes the stall recorded for the stream, if any, and adds its duration
     * to the accumulated streams stall time.
     */
    protected void onStreamUnstalled(StreamSPI stream) {
        auto itemPtr = stream in streamsStalls;

        if (itemPtr !is null)
        {
            long time = *itemPtr;
            streamsStalls.remove(stream);
            streamsStallTime += (Clock.currStdTime - time);
        }
        version(HUNT_DEBUG)
            tracef("Stream unstalled %s", stream);
    }

    /**
     * Returns: the total session stall time in milliseconds, including the
     * stall currently in progress, if any.
     */
    long getSessionStallTime() {
        long pastStallTime = sessionStallTime;
        long currentStallTime = sessionStall;
        if (currentStallTime != 0)
            currentStallTime = Clock.currStdTime - currentStallTime;
        return convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond)(pastStallTime + currentStallTime);
    }

    /**
     * Returns: the total stream stall time in milliseconds: the accumulated
     * time of completed stalls plus the elapsed time of every stall that is
     * still in progress.
     */
    long getStreamsStallTime() {
        long pastStallTime = streamsStallTime;
        long now = Clock.currStdTime;
        // Sum the elapsed time of every in-progress stall. Assigning instead of
        // adding would keep only one arbitrary stream's value, since the
        // iteration order of an associative array is unspecified.
        long currentStallTime = 0L;
        foreach (long stalledSince; streamsStalls.byValue)
        {
            currentStallTime += now - stalledSince;
        }

        return convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond)(pastStallTime + currentStallTime);
    }

    /**
     * Clears the accumulated session and streams stall times. Stalls that are
     * currently in progress are left untouched.
     */
    void reset() {
        sessionStallTime = (0);
        streamsStallTime = (0);
    }
}