1 module hunt.http.codec.http.stream.AbstractFlowControlStrategy;
2
3 import hunt.http.codec.http.stream.FlowControlStrategy;
4 import hunt.http.codec.http.stream.Stream;
5 import hunt.http.codec.http.stream.StreamSPI;
6 import hunt.http.codec.http.stream.SessionSPI;
7
8 import hunt.http.codec.http.frame.WindowUpdateFrame;
9
10 import std.datetime;
11 import core.time;
12
13 import hunt.collection.Map;
14 import hunt.Exceptions;
15 import hunt.logging;
16 import hunt.util.DateTime;
17
18
19 abstract class AbstractFlowControlStrategy : FlowControlStrategy {
20
21 // private AtomicLong sessionStall = new AtomicLong();
22 // private AtomicLong sessionStallTime = new AtomicLong();
23 // private Map!(StreamSPI, long) streamsStalls = new ConcurrentHashMap!(StreamSPI, long)();
24 // private AtomicLong streamsStallTime = new AtomicLong();
25 private long sessionStall;
26 private long sessionStallTime;
27 private long streamsStallTime;
28 private long[StreamSPI] streamsStalls;
29
30 private int initialStreamSendWindow;
31 private int initialStreamRecvWindow;
32
33 this(int initialStreamSendWindow) {
34 this.initialStreamSendWindow = initialStreamSendWindow;
35 this.initialStreamRecvWindow = DEFAULT_WINDOW_SIZE;
36 }
37
38 int getInitialStreamSendWindow() {
39 return initialStreamSendWindow;
40 }
41
42 int getInitialStreamRecvWindow() {
43 return initialStreamRecvWindow;
44 }
45
46 override
47 void onStreamCreated(StreamSPI stream) {
48 stream.updateSendWindow(initialStreamSendWindow);
49 stream.updateRecvWindow(initialStreamRecvWindow);
50 }
51
52 override
53 void onStreamDestroyed(StreamSPI stream) {
54 streamsStalls.remove(stream);
55 }
56
57 void onDataConsumed(SessionSPI session, StreamSPI stream, int length) { implementationMissing(); }
58
59 override
60 void updateInitialStreamWindow(SessionSPI session, int initialStreamWindow, bool local) {
61 int previousInitialStreamWindow;
62 if (local) {
63 previousInitialStreamWindow = getInitialStreamRecvWindow();
64 this.initialStreamRecvWindow = initialStreamWindow;
65 } else {
66 previousInitialStreamWindow = getInitialStreamSendWindow();
67 this.initialStreamSendWindow = initialStreamWindow;
68 }
69 int delta = initialStreamWindow - previousInitialStreamWindow;
70
71 // SPEC: updates of the initial window size only affect stream windows, not session's.
72 foreach (Stream stream ; session.getStreams()) {
73 if (local) {
74 (cast(StreamSPI) stream).updateRecvWindow(delta);
75 version(HUNT_DEBUG)
76 tracef("Updated initial stream recv window %s -> %s for %s", previousInitialStreamWindow, initialStreamWindow, stream);
77 } else {
78 session.onWindowUpdate(cast(StreamSPI) stream, new WindowUpdateFrame(stream.getId(), delta));
79 }
80 }
81 }
82
83 override
84 void onWindowUpdate(SessionSPI session, StreamSPI stream, WindowUpdateFrame frame) {
85 int delta = frame.getWindowDelta();
86 if (frame.getStreamId() > 0) {
87 // The stream may have been removed concurrently.
88 if (stream !is null) {
89 int oldSize = stream.updateSendWindow(delta);
90 version(HUNT_DEBUG)
91 tracef("Updated stream send window %s -> %s for %s", oldSize, oldSize + delta, stream);
92 if (oldSize <= 0)
93 onStreamUnstalled(stream);
94 }
95 } else {
96 int oldSize = session.updateSendWindow(delta);
97 //version(HUNT_DEBUG)
98 //tracef("Updated session send window %s -> %s for %s", oldSize, oldSize + delta, session);
99 if (oldSize <= 0)
100 onSessionUnstalled(session);
101 }
102 }
103
104 override
105 void onDataReceived(SessionSPI session, StreamSPI stream, int length) {
106 int oldSize = session.updateRecvWindow(-length);
107 version(HUNT_DEBUG)
108 tracef("Data received, %s bytes, updated session recv window %s -> %s for %s", length, oldSize, oldSize - length, session);
109
110 if (stream !is null) {
111 oldSize = stream.updateRecvWindow(-length);
112 version(HUNT_DEBUG)
113 tracef("Data received, %s bytes, updated stream recv window %s -> %s for %s", length, oldSize, oldSize - length, stream);
114 }
115 }
116
117 override
118 void windowUpdate(SessionSPI session, StreamSPI stream, WindowUpdateFrame frame) {
119 }
120
121 override
122 void onDataSending(StreamSPI stream, int length) {
123 if (length == 0)
124 return;
125
126 SessionSPI session = stream.getSession();
127 int oldSessionWindow = session.updateSendWindow(-length);
128 int newSessionWindow = oldSessionWindow - length;
129 version(HUNT_DEBUG)
130 tracef("Sending, session send window %s -> %s for %s", oldSessionWindow, newSessionWindow, session);
131 if (newSessionWindow <= 0)
132 onSessionStalled(session);
133
134 int oldStreamWindow = stream.updateSendWindow(-length);
135 int newStreamWindow = oldStreamWindow - length;
136 version(HUNT_DEBUG)
137 tracef("Sending, stream send window %s -> %s for %s", oldStreamWindow, newStreamWindow, stream);
138 if (newStreamWindow <= 0)
139 onStreamStalled(stream);
140 }
141
142 override
143 void onDataSent(StreamSPI stream, int length) {
144 }
145
146 protected void onSessionStalled(SessionSPI session) {
147 sessionStall = Clock.currStdTime;
148 version(HUNT_DEBUG)
149 tracef("Session stalled %s", session);
150 }
151
152 protected void onStreamStalled(StreamSPI stream) {
153 streamsStalls[stream] = Clock.currStdTime;
154 version(HUNT_DEBUG)
155 tracef("Stream stalled %s", stream);
156 }
157
158 protected void onSessionUnstalled(SessionSPI session) {
159 sessionStallTime += (Clock.currStdTime - sessionStall);
160 sessionStall = 0;
161 version(HUNT_DEBUG)
162 tracef("Session unstalled %s", session);
163 }
164
165 protected void onStreamUnstalled(StreamSPI stream) {
166 auto itemPtr = stream in streamsStalls;
167
168 if (itemPtr !is null)
169 {
170 long time = *itemPtr;
171 streamsStalls.remove(stream);
172 streamsStallTime += (Clock.currStdTime - time);
173
174 }
175 version(HUNT_DEBUG)
176 tracef("Stream unstalled %s", stream);
177 }
178
179 long getSessionStallTime() {
180 long pastStallTime = sessionStallTime;
181 long currentStallTime = sessionStall;
182 if (currentStallTime != 0)
183 currentStallTime = Clock.currStdTime - currentStallTime;
184 return convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond)(pastStallTime + currentStallTime);
185 }
186
187 long getStreamsStallTime() {
188 long pastStallTime = streamsStallTime;
189 long now = Clock.currStdTime;
190 // long currentStallTime = streamsStalls.values().stream().reduce(0L, (result, time) => now - time);
191 long currentStallTime = 0L;
192 foreach(long v; streamsStalls.byValue)
193 {
194 currentStallTime = now - v;
195 }
196
197 return convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond)(pastStallTime + currentStallTime);
198 }
199
200 void reset() {
201 sessionStallTime = (0);
202 streamsStallTime = (0);
203 }
204 }