1 module hunt.http.codec.http.stream.AbstractFlowControlStrategy;
2 
3 import hunt.http.codec.http.stream.FlowControlStrategy;
4 import hunt.http.codec.http.stream.Stream;
5 import hunt.http.codec.http.stream.StreamSPI;
6 import hunt.http.codec.http.stream.SessionSPI;
7 
8 import hunt.http.codec.http.frame.WindowUpdateFrame;
9 
10 import std.datetime;
11 import core.time;
12 
13 import hunt.collection.Map;
14 import hunt.Exceptions;
15 import hunt.logging;
16 import hunt.util.DateTime;
17 
18 
19 abstract class AbstractFlowControlStrategy : FlowControlStrategy {
20     
21     // private AtomicLong sessionStall = new AtomicLong();
22     // private AtomicLong sessionStallTime = new AtomicLong();
23     // private Map!(StreamSPI, long) streamsStalls = new ConcurrentHashMap!(StreamSPI, long)();
24     // private AtomicLong streamsStallTime = new AtomicLong();
25     private long sessionStall;
26     private long sessionStallTime;
27     private long streamsStallTime;
28     private long[StreamSPI] streamsStalls;
29 
30     private int initialStreamSendWindow;
31     private int initialStreamRecvWindow;
32 
33     this(int initialStreamSendWindow) {
34         this.initialStreamSendWindow = initialStreamSendWindow;
35         this.initialStreamRecvWindow = DEFAULT_WINDOW_SIZE; 
36     }
37 
38     int getInitialStreamSendWindow() {
39         return initialStreamSendWindow;
40     }
41 
42     int getInitialStreamRecvWindow() {
43         return initialStreamRecvWindow;
44     }
45 
46     override
47     void onStreamCreated(StreamSPI stream) {
48         stream.updateSendWindow(initialStreamSendWindow);
49         stream.updateRecvWindow(initialStreamRecvWindow);
50     }
51 
52     override
53     void onStreamDestroyed(StreamSPI stream) {
54         streamsStalls.remove(stream);
55     }
56 
57 	void onDataConsumed(SessionSPI session, StreamSPI stream, int length) { implementationMissing(); }
58 
59     override
60     void updateInitialStreamWindow(SessionSPI session, int initialStreamWindow, bool local) {
61         int previousInitialStreamWindow;
62         if (local) {
63             previousInitialStreamWindow = getInitialStreamRecvWindow();
64             this.initialStreamRecvWindow = initialStreamWindow;
65         } else {
66             previousInitialStreamWindow = getInitialStreamSendWindow();
67             this.initialStreamSendWindow = initialStreamWindow;
68         }
69         int delta = initialStreamWindow - previousInitialStreamWindow;
70 
71         // SPEC: updates of the initial window size only affect stream windows, not session's.
72         foreach (Stream stream ; session.getStreams()) {
73             if (local) {
74                 (cast(StreamSPI) stream).updateRecvWindow(delta);
75                 version(HUNT_DEBUG)
76                     tracef("Updated initial stream recv window %s -> %s for %s", previousInitialStreamWindow, initialStreamWindow, stream);
77             } else {
78                 session.onWindowUpdate(cast(StreamSPI) stream, new WindowUpdateFrame(stream.getId(), delta));
79             }
80         }
81     }
82 
83     override
84     void onWindowUpdate(SessionSPI session, StreamSPI stream, WindowUpdateFrame frame) {
85         int delta = frame.getWindowDelta();
86         if (frame.getStreamId() > 0) {
87             // The stream may have been removed concurrently.
88             if (stream !is null) {
89                 int oldSize = stream.updateSendWindow(delta);
90                 version(HUNT_DEBUG)
91                     tracef("Updated stream send window %s -> %s for %s", oldSize, oldSize + delta, stream);
92                 if (oldSize <= 0)
93                     onStreamUnstalled(stream);
94             }
95         } else {
96             int oldSize = session.updateSendWindow(delta);
97             //version(HUNT_DEBUG)
98             //tracef("Updated session send window %s -> %s for %s", oldSize, oldSize + delta, session);
99             if (oldSize <= 0)
100                 onSessionUnstalled(session);
101         }
102     }
103 
104     override
105     void onDataReceived(SessionSPI session, StreamSPI stream, int length) {
106         int oldSize = session.updateRecvWindow(-length);
107         version(HUNT_DEBUG)
108             tracef("Data received, %s bytes, updated session recv window %s -> %s for %s", length, oldSize, oldSize - length, session);
109 
110         if (stream !is null) {
111             oldSize = stream.updateRecvWindow(-length);
112             version(HUNT_DEBUG)
113                 tracef("Data received, %s bytes, updated stream recv window %s -> %s for %s", length, oldSize, oldSize - length, stream);
114         }
115     }
116 
117     override
118     void windowUpdate(SessionSPI session, StreamSPI stream, WindowUpdateFrame frame) {
119     }
120 
121     override
122     void onDataSending(StreamSPI stream, int length) {
123         if (length == 0)
124             return;
125 
126         SessionSPI session = stream.getSession();
127         int oldSessionWindow = session.updateSendWindow(-length);
128         int newSessionWindow = oldSessionWindow - length;
129         version(HUNT_DEBUG)
130             tracef("Sending, session send window %s -> %s for %s", oldSessionWindow, newSessionWindow, session);
131         if (newSessionWindow <= 0)
132             onSessionStalled(session);
133 
134         int oldStreamWindow = stream.updateSendWindow(-length);
135         int newStreamWindow = oldStreamWindow - length;
136         version(HUNT_DEBUG)
137             tracef("Sending, stream send window %s -> %s for %s", oldStreamWindow, newStreamWindow, stream);
138         if (newStreamWindow <= 0)
139             onStreamStalled(stream);
140     }
141 
142     override
143     void onDataSent(StreamSPI stream, int length) {
144     }
145 
146     protected void onSessionStalled(SessionSPI session) {
147         sessionStall = Clock.currStdTime;
148         version(HUNT_DEBUG)
149             tracef("Session stalled %s", session);
150     }
151 
152     protected void onStreamStalled(StreamSPI stream) {
153         streamsStalls[stream] = Clock.currStdTime;
154         version(HUNT_DEBUG)
155             tracef("Stream stalled %s", stream);
156     }
157 
158     protected void onSessionUnstalled(SessionSPI session) {
159         sessionStallTime += (Clock.currStdTime - sessionStall);
160         sessionStall = 0;
161         version(HUNT_DEBUG)
162             tracef("Session unstalled %s", session);
163     }
164 
165     protected void onStreamUnstalled(StreamSPI stream) {
166         auto itemPtr = stream in streamsStalls;
167         
168         if (itemPtr !is null)
169         {
170             long time = *itemPtr;
171             streamsStalls.remove(stream);
172             streamsStallTime += (Clock.currStdTime - time);
173 
174         }
175         version(HUNT_DEBUG)
176             tracef("Stream unstalled %s", stream);
177     }
178 
179     long getSessionStallTime() {
180         long pastStallTime = sessionStallTime;
181         long currentStallTime = sessionStall;
182         if (currentStallTime != 0)
183             currentStallTime = Clock.currStdTime - currentStallTime;
184         return convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond)(pastStallTime + currentStallTime);
185     }
186 
187     long getStreamsStallTime() {
188         long pastStallTime = streamsStallTime;
189         long now = Clock.currStdTime;
190         // long currentStallTime = streamsStalls.values().stream().reduce(0L, (result, time) => now - time);
191         long currentStallTime = 0L;
192         foreach(long v; streamsStalls.byValue)
193         {
194             currentStallTime =  now - v;           
195         }
196 
197         return convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond)(pastStallTime + currentStallTime);
198     }
199 
200     void reset() {
201         sessionStallTime = (0);
202         streamsStallTime = (0);
203     }
204 }