1 module hunt.http.codec.http.stream.Http2Stream;
2 
3 import hunt.http.codec.http.stream.CloseState;
4 import hunt.http.codec.http.stream.Http2Session;
5 import hunt.http.codec.http.stream.SessionSPI;
6 import hunt.http.codec.http.stream.Stream;
7 import hunt.http.codec.http.stream.StreamSPI;
8 
9 import hunt.http.codec.http.frame;
10 // import hunt.http.utils.concurrent.IdleTimeout;
11 import hunt.concurrency.Promise;
12 // import hunt.http.utils.concurrent.Scheduler;
13 
14 import hunt.concurrency.Scheduler;
15 import hunt.util.Common;
16 import hunt.Exceptions;
17 
18 import hunt.logging;
19 import std.format;
20 
21 import core.atomic : atomicOp, atomicLoad;
22 
23 alias Listener = hunt.http.codec.http.stream.Stream.Stream.Listener;
24 
25 /**
26 */
27 class Http2Stream : StreamSPI { // IdleTimeout,
28 
29     // private AtomicReference<ConcurrentMap<string, Object>> attributes = new AtomicReference<>();
30     // private AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
31     // private AtomicInteger sendWindow = new AtomicInteger();
32     // private AtomicInteger recvWindow = new AtomicInteger();
33 
34     private Object[string] attributes;
35     private CloseState closeState;
36     private shared int sendWindow;
37     private shared int recvWindow;
38 
39     private SessionSPI session;
40     private int streamId;
41     private bool local;
42     private  Listener listener;
43     private  bool localReset;
44     private  bool remoteReset;
45 
46     this(Scheduler scheduler, SessionSPI session, int streamId, bool local) {
47         // super(scheduler);
48         closeState = CloseState.NOT_CLOSED; 
49         this.session = session;
50         this.streamId = streamId;
51         this.local = local;
52     }
53 
54     // override
55     int getId() {
56         return streamId;
57     }
58 
59     // override
60     bool isLocal() {
61         return local;
62     }
63 
64     // override
65     SessionSPI getSession() {
66         return session;
67     }
68 
69     // override
70     void headers(HeadersFrame frame, Callback callback) {
71         session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
72     }
73 
74     // override
75     void push(PushPromiseFrame frame, Promise!Stream promise, Listener listener) {
76         session.push(this, promise, frame, listener);
77     }
78 
79     // override
80     void data(DataFrame frame, Callback callback) {
81         session.data(this, callback, frame);
82     }
83 
84     // override
85     void reset(ResetFrame frame, Callback callback) {
86         if (isReset())
87             return;
88         localReset = true;
89         session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
90     }
91 
92     // override
93     Object getAttribute(string key) {
94         return attributes[key];
95     }
96 
97     // override
98     void setAttribute(string key, Object value) {
99         attributes[key] = value;
100     }
101 
102     // override
103     Object removeAttribute(string key) {
104         auto r = attributes[key];
105         attributes.remove(key);
106         return r;
107     }
108 
109     // override
110     bool isReset() {
111         return localReset || remoteReset;
112     }
113 
114     // override
115     bool isClosed() {
116         return closeState == CloseState.CLOSED;
117     }
118 
119     // override
120     bool isRemotelyClosed() {
121         return closeState == CloseState.REMOTELY_CLOSED;
122     }
123 
124     bool isLocallyClosed() {
125         return closeState == CloseState.LOCALLY_CLOSED;
126     }
127 
128     // override
129     bool isOpen() {
130         return !isClosed();
131     }
132 
133     // override
134     protected void onIdleExpired(TimeoutException timeout) {
135         version(HUNT_DEBUG) {
136             tracef("Idle timeout %sms expired on %s", 0, this.toString()); // getIdleTimeout()
137         }
138 
139         // Notify the application.
140         if (notifyIdleTimeout(this, timeout)) {
141             // Tell the other peer that we timed out.
142             reset(new ResetFrame(getId(), cast(int)ErrorCode.CANCEL_STREAM_ERROR), Callback.NOOP);
143         }
144     }
145 
146     // private ConcurrentMap<string, Object> attributes() {
147     //     ConcurrentMap<string, Object> map = attributes;
148     //     if (map == null) {
149     //         map = new ConcurrentHashMap<>();
150     //         if (!attributes.compareAndSet(null, map)) {
151     //             map = attributes;
152     //         }
153     //     }
154     //     return map;
155     // }
156 
157     // override
158     Listener getListener() {
159         return listener;
160     }
161 
162     // override
163     void setListener(Listener listener) {
164         this.listener = listener;
165     }
166 
167     // override
168     void process(Frame frame, Callback callback) {
169         // notIdle();
170         switch (frame.getType()) {
171             case FrameType.HEADERS: {
172                 onHeaders(cast(HeadersFrame) frame, callback);
173                 break;
174             }
175             case FrameType.DATA: {
176                 onData(cast(DataFrame) frame, callback);
177                 break;
178             }
179             case FrameType.RST_STREAM: {
180                 onReset(cast(ResetFrame) frame, callback);
181                 break;
182             }
183             case FrameType.PUSH_PROMISE: {
184                 onPush(cast(PushPromiseFrame) frame, callback);
185                 break;
186             }
187             case FrameType.WINDOW_UPDATE: {
188                 onWindowUpdate(cast(WindowUpdateFrame) frame, callback);
189                 break;
190             }
191             default: {
192                 throw new UnsupportedOperationException("");
193             }
194         }
195     }
196 
197     private void onHeaders(HeadersFrame frame, Callback callback) {
198         if (updateClose(frame.isEndStream(), CloseStateEvent.RECEIVED))
199             session.removeStream(this);
200         callback.succeeded();
201     }
202 
203     private void onData(DataFrame frame, Callback callback) {
204         if (getRecvWindow() < 0) {
205             // It's a bad client, it does not deserve to be
206             // treated gently by just resetting the stream.
207             session.close(ErrorCode.FLOW_CONTROL_ERROR, "stream_window_exceeded", Callback.NOOP);
208             callback.failed(new IOException("stream_window_exceeded"));
209             return;
210         }
211 
212         // SPEC: remotely closed streams must be replied with a reset.
213         if (isRemotelyClosed()) {
214             reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR), Callback.NOOP);
215             callback.failed(new EOFException("stream_closed"));
216             return;
217         }
218 
219         if (isReset()) {
220             // Just drop the frame.
221             callback.failed(new IOException("stream_reset"));
222             return;
223         }
224 
225         if (updateClose(frame.isEndStream(), CloseStateEvent.RECEIVED))
226             session.removeStream(this);
227         notifyData(this, frame, callback);
228     }
229 
230     private void onReset(ResetFrame frame, Callback callback) {
231         remoteReset = true;
232         close();
233         session.removeStream(this);
234         notifyReset(this, frame, callback);
235     }
236 
237     private void onPush(PushPromiseFrame frame, Callback callback) {
238         // Pushed streams are implicitly locally closed.
239         // They are closed when receiving an end-stream DATA frame.
240         updateClose(true, CloseStateEvent.AFTER_SEND);
241         callback.succeeded();
242     }
243 
244     private void onWindowUpdate(WindowUpdateFrame frame, Callback callback) {
245         callback.succeeded();
246     }
247 
248     override
249     bool updateClose(bool update, CloseStateEvent event) {
250         version(HUNT_DEBUG) {
251             tracef("Update close for %s update=%s event=%s", this, update, event);
252         }
253 
254         if (!update)
255             return false;
256 
257         switch (event) {
258             case CloseStateEvent.RECEIVED:
259                 return updateCloseAfterReceived();
260             case CloseStateEvent.BEFORE_SEND:
261                 return updateCloseBeforeSend();
262             case CloseStateEvent.AFTER_SEND:
263                 return updateCloseAfterSend();
264             default:
265                 return false;
266         }
267     }
268 
269     private bool updateCloseAfterReceived() {
270         while (true) {
271             CloseState current = closeState;
272             switch (current) {
273                 case CloseState.NOT_CLOSED: {
274                     if (closeState == current)
275                     {
276                         closeState = CloseState.REMOTELY_CLOSED;
277                         return false;
278                     }
279                     break;
280                 }
281                 case CloseState.LOCALLY_CLOSING: {
282                     // if (closeState.compareAndSet(current, CloseState.CLOSING)) {
283                     if (closeState == current)
284                     {
285                         closeState = CloseState.CLOSING;
286                         updateStreamCount(0, 1);
287                         return false;
288                     }
289                     break;
290                 }
291                 case CloseState.LOCALLY_CLOSED: {
292                     close();
293                     return true;
294                 }
295                 default: {
296                     return false;
297                 }
298             }
299         }
300     }
301 
302     private bool updateCloseBeforeSend() {
303         while (true) {
304             CloseState current = closeState;
305             switch (current) {
306                 case CloseState.NOT_CLOSED: {
307                     // if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSING))
308                     if (closeState == current)
309                     {
310                         closeState = CloseState.LOCALLY_CLOSING;
311                         return false;
312                     }
313                     break;
314                 }
315                 case CloseState.REMOTELY_CLOSED: {
316                     // if (closeState.compareAndSet(current, CloseState.CLOSING)) {
317                     if (closeState == current)
318                     {
319                         closeState = CloseState.CLOSING;
320                         updateStreamCount(0, 1);
321                         return false;
322                     }
323                     break;
324                 }
325                 default: {
326                     return false;
327                 }
328             }
329         }
330     }
331 
332     private bool updateCloseAfterSend() {
333         while (true) {
334             CloseState current = closeState;
335             switch (current) {
336                 case CloseState.NOT_CLOSED:
337                 case CloseState.LOCALLY_CLOSING: {
338                     // if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSED))
339                     if (closeState == current)
340                     {
341                         closeState = CloseState.LOCALLY_CLOSING;
342                         return false;
343                     }
344                     break;
345                 }
346                 case CloseState.REMOTELY_CLOSED:
347                 case CloseState.CLOSING: {
348                     close();
349                     return true;
350                 }
351                 default: {
352                     return false;
353                 }
354             }
355         }
356     }
357 
358     int getSendWindow() {
359         return sendWindow.atomicLoad!();
360     }
361 
362     int getRecvWindow() {
363         return recvWindow.atomicLoad!();
364     }
365 
366     override
367     int updateSendWindow(int delta) {
368         int r = sendWindow.atomicLoad!(); sendWindow.atomicOp!"+="(delta);
369         return r;
370         // return sendWindow.getAndAdd(delta);
371     }
372 
373     override
374     int updateRecvWindow(int delta) {
375         int r = recvWindow.atomicLoad!(); recvWindow.atomicOp!"+="(delta);
376         return r;
377     }
378 
379     override
380     void close() {
381         CloseState oldState = closeState;
382         closeState = CloseState.CLOSED;
383         if (oldState != CloseState.CLOSED) {
384             int deltaClosing = oldState == CloseState.CLOSING ? -1 : 0;
385             updateStreamCount(-1, deltaClosing);
386             // onClose();
387         }
388     }
389 
390     private void updateStreamCount(int deltaStream, int deltaClosing) {
391         (cast(Http2Session) session).updateStreamCount(isLocal(), deltaStream, deltaClosing);
392     }
393 
394     private void notifyData(Stream stream, DataFrame frame, Callback callback) {
395         Listener listener = this.listener;
396         if (listener is null)
397             return;
398         try {
399             listener.onData(stream, frame, callback);
400         } catch (Exception x) {
401             info("Failure while notifying listener " ~ listener.toString(), x);
402         }
403     }
404 
405     private void notifyReset(Stream stream, ResetFrame frame, Callback callback) {
406         Listener listener = this.listener;
407         if (listener is null)
408             return;
409         try {
410             listener.onReset(stream, frame, callback);
411         } catch (Exception x) {
412             info("Failure while notifying listener " ~ listener.toString(), x);
413         }
414     }
415 
416     private bool notifyIdleTimeout(Stream stream, Exception failure) {
417         Listener listener = this.listener;
418         if (listener is null)
419             return true;
420         try {
421             return listener.onIdleTimeout(stream, failure);
422         } catch (Exception x) {
423             info("Failure while notifying listener " ~ listener.toString(), x);
424             return true;
425         }
426     }
427 
428     override
429     string toString() {
430         return format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b,%s}", typeof(this).stringof,
431                 toHash(), getId(), sendWindow, recvWindow, isReset(), closeState);
432     }
433 }