1 module hunt.http.codec.http.stream.Http2Stream;
2 
3 import hunt.http.codec.http.stream.CloseState;
4 import hunt.http.codec.http.stream.Http2Session;
5 import hunt.http.codec.http.stream.SessionSPI;
6 import hunt.http.codec.http.stream.Stream;
7 import hunt.http.codec.http.stream.StreamSPI;
8 
9 import hunt.http.codec.http.frame;
10 // import hunt.http.utils.concurrent.IdleTimeout;
11 import hunt.util.concurrent.Promise;
12 // import hunt.http.utils.concurrent.Scheduler;
13 
14 import hunt.util.concurrent.Scheduler;
15 import hunt.util.functional;
16 import hunt.lang.exception;
17 
18 import hunt.logging;
19 import std.format;
20 
21 alias Listener = hunt.http.codec.http.stream.Stream.Stream.Listener;
22 
/**
 * HTTP/2 stream implementation of StreamSPI.
 *
 * A stream keeps its own close state, send/receive window counters, reset
 * flags, attribute map and listener, and hands every outgoing frame to the
 * owning SessionSPI.
 *
 * The Java code this was ported from kept the attribute map, close state and
 * window counters in atomic holders. Here they are plain fields and this class
 * performs no synchronization of its own.
 */
class Http2Stream : StreamSPI { // IdleTimeout, 

    private Object[string] attributes;
    private CloseState closeState;
    private int sendWindow;
    private int recvWindow;

    private SessionSPI session;
    private int streamId;
    private bool local;
    private Listener listener;
    private bool localReset;
    private bool remoteReset;

    /**
     * Creates a stream in the NOT_CLOSED state.
     *
     * Params:
     *   scheduler = currently unused (the idle-timeout base class is disabled)
     *   session   = session that owns this stream and sends its frames
     *   streamId  = identifier returned by getId()
     *   local     = value returned by isLocal()
     */
    this(Scheduler scheduler, SessionSPI session, int streamId, bool local) {
        // super(scheduler);
        closeState = CloseState.NOT_CLOSED; 
        this.session = session;
        this.streamId = streamId;
        this.local = local;
    }

    /// Returns the stream identifier given at construction.
    // override
    int getId() {
        return streamId;
    }

    /// Returns the `local` flag given at construction.
    // override
    bool isLocal() {
        return local;
    }

    /// Returns the session that owns this stream.
    // override
    SessionSPI getSession() {
        return session;
    }

    /// Sends a HEADERS frame through the session.
    // override
    void headers(HeadersFrame frame, Callback callback) {
        session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
    }

    /// Asks the session to push a new stream described by `frame`.
    // override
    void push(PushPromiseFrame frame, Promise!Stream promise, Listener listener) {
        session.push(this, promise, frame, listener);
    }

    /// Sends a DATA frame through the session.
    // override
    void data(DataFrame frame, Callback callback) {
        session.data(this, callback, frame);
    }

    /**
     * Sends a RST_STREAM frame and marks the stream as locally reset.
     *
     * Does nothing (and does not touch `callback`) when the stream has
     * already been reset by either side.
     */
    // override
    void reset(ResetFrame frame, Callback callback) {
        if (isReset())
            return;
        localReset = true;
        session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
    }

    /**
     * Looks up an attribute.
     *
     * Returns: the stored value, or null when `key` has no entry.
     */
    // override
    Object getAttribute(string key) {
        // `in` yields a pointer to the value or null; indexing directly would
        // raise a RangeError for an absent key.
        Object* found = key in attributes;
        return found is null ? null : *found;
    }

    /// Stores `value` under `key`, replacing any previous value.
    // override
    void setAttribute(string key, Object value) {
        attributes[key] = value;
    }

    /**
     * Removes an attribute.
     *
     * Returns: the value that was stored, or null when `key` had no entry.
     */
    // override
    Object removeAttribute(string key) {
        Object* found = key in attributes;
        if (found is null)
            return null;
        // Copy the reference before remove() invalidates the pointer.
        Object previous = *found;
        attributes.remove(key);
        return previous;
    }

    /// True once the stream was reset locally (reset) or remotely (onReset).
    // override
    bool isReset() {
        return localReset || remoteReset;
    }

    /// True when the close state is CLOSED.
    // override
    bool isClosed() {
        return closeState == CloseState.CLOSED;
    }

    /// True when the close state is exactly REMOTELY_CLOSED.
    // override
    bool isRemotelyClosed() {
        return closeState == CloseState.REMOTELY_CLOSED;
    }

    /// True when the close state is exactly LOCALLY_CLOSED.
    bool isLocallyClosed() {
        return closeState == CloseState.LOCALLY_CLOSED;
    }

    /// True while the stream is not CLOSED.
    // override
    bool isOpen() {
        return !isClosed();
    }

    /**
     * Idle-timeout hook: notifies the listener and, when it agrees (or there
     * is no listener), resets the stream with CANCEL_STREAM_ERROR.
     */
    // override
    protected void onIdleExpired(TimeoutException timeout) {
        version(HUNT_DEBUG) {
            tracef("Idle timeout %sms expired on %s", 0, this.toString()); // getIdleTimeout()
        }

        // Notify the application.
        if (notifyIdleTimeout(this, timeout)) {
            // Tell the other peer that we timed out.
            reset(new ResetFrame(getId(), cast(int)ErrorCode.CANCEL_STREAM_ERROR), Callback.NOOP);
        }
    }

    /// Returns the listener, which may be null.
    // override
    Listener getListener() {
        return listener;
    }

    /// Sets the listener that receives data, reset and idle-timeout events.
    // override
    void setListener(Listener listener) {
        this.listener = listener;
    }

    /**
     * Dispatches a received frame to the matching handler.
     *
     * Throws: UnsupportedOperationException for any frame type other than
     *         HEADERS, DATA, RST_STREAM, PUSH_PROMISE and WINDOW_UPDATE.
     */
    // override
    void process(Frame frame, Callback callback) {
        // notIdle();
        switch (frame.getType()) {
            case FrameType.HEADERS: {
                onHeaders(cast(HeadersFrame) frame, callback);
                break;
            }
            case FrameType.DATA: {
                onData(cast(DataFrame) frame, callback);
                break;
            }
            case FrameType.RST_STREAM: {
                onReset(cast(ResetFrame) frame, callback);
                break;
            }
            case FrameType.PUSH_PROMISE: {
                onPush(cast(PushPromiseFrame) frame, callback);
                break;
            }
            case FrameType.WINDOW_UPDATE: {
                onWindowUpdate(cast(WindowUpdateFrame) frame, callback);
                break;
            }
            default: {
                throw new UnsupportedOperationException("");
            }
        }
    }

    /// Applies end-stream bookkeeping for a received HEADERS frame.
    private void onHeaders(HeadersFrame frame, Callback callback) {
        if (updateClose(frame.isEndStream(), CloseStateEvent.RECEIVED))
            session.removeStream(this);
        callback.succeeded();
    }

    /**
     * Handles a received DATA frame: rejects it when the receive window is
     * negative, the stream is remotely closed, or the stream was reset;
     * otherwise updates the close state and notifies the listener.
     */
    private void onData(DataFrame frame, Callback callback) {
        if (getRecvWindow() < 0) {
            // It's a bad client, it does not deserve to be
            // treated gently by just resetting the stream.
            session.close(ErrorCode.FLOW_CONTROL_ERROR, "stream_window_exceeded", Callback.NOOP);
            callback.failed(new IOException("stream_window_exceeded"));
            return;
        }

        // SPEC: remotely closed streams must be replied with a reset.
        if (isRemotelyClosed()) {
            reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR), Callback.NOOP);
            callback.failed(new EOFException("stream_closed"));
            return;
        }

        if (isReset()) {
            // Just drop the frame.
            callback.failed(new IOException("stream_reset"));
            return;
        }

        if (updateClose(frame.isEndStream(), CloseStateEvent.RECEIVED))
            session.removeStream(this);
        notifyData(this, frame, callback);
    }

    /// Handles a received RST_STREAM: closes and unregisters the stream, then notifies the listener.
    private void onReset(ResetFrame frame, Callback callback) {
        remoteReset = true;
        close();
        session.removeStream(this);
        notifyReset(this, frame, callback);
    }

    /// Handles a received PUSH_PROMISE by marking the stream as locally closed.
    private void onPush(PushPromiseFrame frame, Callback callback) {
        // Pushed streams are implicitly locally closed.
        // They are closed when receiving an end-stream DATA frame.
        updateClose(true, CloseStateEvent.AFTER_SEND);
        callback.succeeded();
    }

    /// WINDOW_UPDATE needs no stream-level work here; just completes the callback.
    private void onWindowUpdate(WindowUpdateFrame frame, Callback callback) {
        callback.succeeded();
    }

    /**
     * Advances the close state for an end-of-stream event.
     *
     * Params:
     *   update = false makes this a no-op that returns false
     *   event  = which side of the exchange triggered the update
     *
     * Returns: true when this call moved the stream to CLOSED.
     */
    override
    bool updateClose(bool update, CloseStateEvent event) {
        version(HUNT_DEBUG) {
            tracef("Update close for %s update=%s event=%s", this, update, event);
        }

        if (!update)
            return false;

        switch (event) {
            case CloseStateEvent.RECEIVED:
                return updateCloseAfterReceived();
            case CloseStateEvent.BEFORE_SEND:
                return updateCloseBeforeSend();
            case CloseStateEvent.AFTER_SEND:
                return updateCloseAfterSend();
            default:
                return false;
        }
    }

    // The three helpers below were compare-and-set retry loops in the Java
    // original. With a plain field the comparison always succeeded on the
    // first pass, so each one is a direct state transition.

    /// End-stream received: NOT_CLOSED -> REMOTELY_CLOSED, LOCALLY_CLOSING -> CLOSING, LOCALLY_CLOSED -> closed.
    private bool updateCloseAfterReceived() {
        switch (closeState) {
            case CloseState.NOT_CLOSED:
                closeState = CloseState.REMOTELY_CLOSED;
                return false;
            case CloseState.LOCALLY_CLOSING:
                closeState = CloseState.CLOSING;
                updateStreamCount(0, 1);
                return false;
            case CloseState.LOCALLY_CLOSED:
                close();
                return true;
            default:
                return false;
        }
    }

    /// End-stream about to be sent: NOT_CLOSED -> LOCALLY_CLOSING, REMOTELY_CLOSED -> CLOSING.
    private bool updateCloseBeforeSend() {
        switch (closeState) {
            case CloseState.NOT_CLOSED:
                closeState = CloseState.LOCALLY_CLOSING;
                return false;
            case CloseState.REMOTELY_CLOSED:
                closeState = CloseState.CLOSING;
                updateStreamCount(0, 1);
                return false;
            default:
                return false;
        }
    }

    /// End-stream sent: NOT_CLOSED/LOCALLY_CLOSING -> LOCALLY_CLOSED, REMOTELY_CLOSED/CLOSING -> closed.
    private bool updateCloseAfterSend() {
        switch (closeState) {
            case CloseState.NOT_CLOSED:
            case CloseState.LOCALLY_CLOSING:
                // The send has completed, so the local side is now fully
                // closed; updateCloseAfterReceived relies on seeing
                // LOCALLY_CLOSED to finish closing the stream.
                closeState = CloseState.LOCALLY_CLOSED;
                return false;
            case CloseState.REMOTELY_CLOSED:
            case CloseState.CLOSING:
                close();
                return true;
            default:
                return false;
        }
    }

    /// Returns the current send window counter.
    int getSendWindow() {
        return sendWindow;
    }

    /// Returns the current receive window counter.
    int getRecvWindow() {
        return recvWindow;
    }

    /**
     * Adds `delta` to the send window.
     *
     * Returns: the window value before the addition.
     */
    override
    int updateSendWindow(int delta) {
        int previous = sendWindow;
        sendWindow += delta;
        return previous;
    }

    /**
     * Adds `delta` to the receive window.
     *
     * Returns: the window value before the addition.
     */
    override
    int updateRecvWindow(int delta) {
        int previous = recvWindow;
        recvWindow += delta;
        return previous;
    }

    /**
     * Moves the stream to CLOSED. On the first transition the session's
     * stream count is decremented, and its closing count too when the stream
     * was in CLOSING. Later calls change nothing.
     */
    override
    void close() {
        CloseState oldState = closeState;
        closeState = CloseState.CLOSED;
        if (oldState != CloseState.CLOSED) {
            int deltaClosing = oldState == CloseState.CLOSING ? -1 : 0;
            updateStreamCount(-1, deltaClosing);
            // onClose();
        }
    }

    /// Forwards stream/closing count deltas to the session.
    private void updateStreamCount(int deltaStream, int deltaClosing) {
        // NOTE(review): assumes the session is always an Http2Session; the
        // cast result is not checked for null.
        (cast(Http2Session) session).updateStreamCount(isLocal(), deltaStream, deltaClosing);
    }

    /**
     * Passes a DATA frame to the listener, logging any exception it throws.
     *
     * NOTE(review): `callback` is neither succeeded nor failed when there is
     * no listener or when the listener throws - confirm callers tolerate that.
     */
    private void notifyData(Stream stream, DataFrame frame, Callback callback) {
        Listener listener = this.listener;
        if (listener is null)
            return;
        try {
            listener.onData(stream, frame, callback);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    /**
     * Passes a RST_STREAM frame to the listener, logging any exception it throws.
     *
     * NOTE(review): `callback` is not completed when there is no listener or
     * when the listener throws.
     */
    private void notifyReset(Stream stream, ResetFrame frame, Callback callback) {
        Listener listener = this.listener;
        if (listener is null)
            return;
        try {
            listener.onReset(stream, frame, callback);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    /**
     * Asks the listener whether an idle timeout should reset the stream.
     *
     * Returns: the listener's answer; true when there is no listener or the
     *          listener throws.
     */
    private bool notifyIdleTimeout(Stream stream, Exception failure) {
        Listener listener = this.listener;
        if (listener is null)
            return true;
        try {
            return listener.onIdleTimeout(stream, failure);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
            return true;
        }
    }

    /// Describes the stream: type name, hash, id, both windows, reset flag and close state.
    override
    string toString() {
        return format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b,%s}", typeof(this).stringof,
                toHash(), getId(), sendWindow, recvWindow, isReset(), closeState);
    }
}