1 module hunt.http.codec.websocket.stream.IOState;
2 
3 import hunt.http.codec.websocket.model.CloseInfo;
4 import hunt.http.WebSocketStatusCode;
5 import hunt.http.WebSocketConnection;
6 
7 import hunt.collection;
8 import hunt.logging;
9 import hunt.Exceptions;
10 import hunt.text.Common;
11 import hunt.util.StringBuilder;
12 
13 import std.array;
14 import std.conv;
15 
16 
/**
 * Callback type for connection state notifications.
 * IOState invokes each registered listener with the state being reported.
 */
alias ConnectionStateListener = void delegate(WebSocketConnectionState state);
18 
19 /**
20  * Simple state tracker for Input / Output and {@link WebSocketConnectionState}.
21  * <p>
22  * Use the various known .on*() methods to trigger a state change.
23  * <ul>
24  * <li>{@link #onOpened()} - connection has been opened</li>
25  * </ul>
26  */
27 class IOState {
    /**
     * The source of a close handshake (i.e. who initiated it).
     * Stored in {@code closeHandshakeSource} and queried through the
     * {@code was*Close*()} accessors of the enclosing class.
     */
    private enum CloseHandshakeSource {
        /**
         * No close handshake initiated (yet). This is the value assigned by the constructor.
         */
        NONE,
        /**
         * Local side initiated the close handshake
         */
        LOCAL,
        /**
         * Remote side initiated the close handshake
         */
        REMOTE,
        /**
         * An abnormal close situation (disconnect, timeout, read/write failure, etc...)
         */
        ABNORMAL
    }
49 
    /** Current connection state; mutated by the various on*() methods. */
    private WebSocketConnectionState state;
    /** Listeners invoked by notifyStateListeners() on state change events. */
    private List!(ConnectionStateListener) listeners;

    /**
     * Is input on websocket available (for reading frames).
     * Used to determine close handshake completion, and track half-closed states.
     */
    private bool inputAvailable;
    /**
     * Is output on websocket available (for writing frames).
     * Used to determine close handshake completion, and track half-closed states.
     */
    private bool outputAvailable;
    /**
     * Initiator of the close handshake.
     * Used to determine who initiated a close handshake for reply reasons.
     */
    private CloseHandshakeSource closeHandshakeSource;
    /**
     * The close info for the initiator of the close handshake.
     * It is possible in abnormal close scenarios to have a different
     * final close info that is used to notify the WS-Endpoint's onClose()
     * events with.
     */
    private CloseInfo closeInfo;
    /**
     * The final close info, used for the WS-Endpoint's onClose() event.
     * getCloseInfo() prefers this value over {@code closeInfo} when it is set.
     * NOTE(review): this is a plain reference, not an atomic one; it is
     * intended to be set only once.
     */
    private CloseInfo finalClose;
    /**
     * Tracker for if the close handshake was completed successfully by
     * both sides.  False if close was sudden or abnormal.
     */
    private bool cleanClose;
86 
87     /**
88      * Create a new IOState, initialized to {@link WebSocketConnectionState#CONNECTING}
89      */
90     this() {
91         // finalClose = new AtomicReference!(CloseInfo)();
92         // listeners = new CopyOnWriteArrayList!(ConnectionStateListener)();
93         listeners = new ArrayList!(ConnectionStateListener)();
94 
95         this.state = WebSocketConnectionState.CONNECTING;
96         this.inputAvailable = false;
97         this.outputAvailable = false;
98         this.closeHandshakeSource = CloseHandshakeSource.NONE;
99         this.closeInfo = null;
100         this.cleanClose = false;
101     }
102 
103     void addListener(ConnectionStateListener listener) {
104         assert(listener !is null);
105         listeners.add(listener);
106     }
107 
108     void assertInputOpen() {
109         if (!isInputAvailable()) {
110             throw new IOException("Connection input is closed");
111         }
112     }
113 
114     void assertOutputOpen() {
115         if (!isOutputAvailable()) {
116             throw new IOException("Connection output is closed");
117         }
118     }
119 
120     CloseInfo getCloseInfo() {
121         CloseInfo ci = finalClose;
122         if (ci !is null) {
123             return ci;
124         }
125         return closeInfo;
126     }
127 
128     WebSocketConnectionState getConnectionState() {
129         return state;
130     }
131 
132     bool isClosed() {
133         synchronized (this) {
134             return (state == WebSocketConnectionState.CLOSED);
135         }
136     }
137 
138     bool isInputAvailable() {
139         return inputAvailable;
140     }
141 
142     bool isOpen() {
143         return !isClosed();
144     }
145 
146     bool isOutputAvailable() {
147         return outputAvailable;
148     }
149 
150     private void notifyStateListeners(WebSocketConnectionState state) {
151         version(HUNT_HTTP_DEBUG)
152             tracef("Notify State Listeners(%d): %s", listeners.size(), state);
153         foreach (ConnectionStateListener listener ; listeners) {
154             listener(state);
155         }
156     }
157 
158     /**
159      * A websocket connection has been disconnected for abnormal close reasons.
160      * <p>
161      * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
162      *
163      * @param close the close information
164      */
165     void onAbnormalClose(CloseInfo close) {
166         version(HUNT_HTTP_DEBUG)
167             tracef("onAbnormalClose(%s)", close);
168         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
169         synchronized (this) {
170             if (this.state == WebSocketConnectionState.CLOSED) {
171                 // already closed
172                 return;
173             }
174 
175             if (this.state == WebSocketConnectionState.OPEN) {
176                 this.cleanClose = false;
177             }
178 
179             this.state = WebSocketConnectionState.CLOSED;
180             finalClose = close;
181             // finalClose.compareAndSet(null, close);
182             this.inputAvailable = false;
183             this.outputAvailable = false;
184             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
185             event = this.state;
186         }
187         notifyStateListeners(event);
188     }
189 
190     /**
191      * A close handshake has been issued from the local endpoint
192      *
193      * @param closeInfo the close information
194      */
195     void onCloseLocal(CloseInfo closeInfo) {
196         bool open = false;
197         synchronized (this) {
198             WebSocketConnectionState initialState = this.state;
199             version(HUNT_HTTP_DEBUG)
200                 tracef("onCloseLocal(%s) : %s", closeInfo, initialState);
201             if (initialState == WebSocketConnectionState.CLOSED) {
202                 // already closed
203                 version(HUNT_HTTP_DEBUG)
204                     tracef("already closed");
205                 return;
206             }
207 
208             if (initialState == WebSocketConnectionState.CONNECTED) {
209                 // fast close. a local close request from end-user onConnect/onOpen method
210                 version(HUNT_HTTP_DEBUG)
211                     tracef("FastClose in CONNECTED detected");
212                 open = true;
213             }
214         }
215 
216         if (open)
217             openAndCloseLocal(closeInfo);
218         else
219             closeLocal(closeInfo);
220     }
221 
222     private void openAndCloseLocal(CloseInfo closeInfo) {
223         // Force the state open (to allow read/write to endpoint)
224         onOpened();
225         version(HUNT_HTTP_DEBUG)
226             tracef("FastClose continuing with Closure");
227         closeLocal(closeInfo);
228     }
229 
230     private void closeLocal(CloseInfo closeInfo) {
231         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
232         WebSocketConnectionState abnormalEvent = WebSocketConnectionState.Unknown;
233         synchronized (this) {
234             version(HUNT_HTTP_DEBUG)
235                 tracef("onCloseLocal(), input=%s, output=%s", inputAvailable, outputAvailable);
236 
237             this.closeInfo = closeInfo;
238 
239             // Turn off further output.
240             outputAvailable = false;
241 
242             if (closeHandshakeSource == CloseHandshakeSource.NONE) {
243                 closeHandshakeSource = CloseHandshakeSource.LOCAL;
244             }
245 
246             if (!inputAvailable) {
247                 version(HUNT_HTTP_DEBUG)
248                     tracef("Close Handshake satisfied, disconnecting");
249                 cleanClose = true;
250                 this.state = WebSocketConnectionState.CLOSED;
251                 // finalClose.compareAndSet(null, closeInfo);
252                 finalClose = closeInfo;
253                 event = this.state;
254             } else if (this.state == WebSocketConnectionState.OPEN) {
255                 // We are now entering CLOSING (or half-closed).
256                 this.state = WebSocketConnectionState.CLOSING;
257                 event = this.state;
258 
259                 // If abnormal, we don't expect an answer.
260                 if (closeInfo.isAbnormal()) {
261                     abnormalEvent = WebSocketConnectionState.CLOSED;
262                     // finalClose.compareAndSet(null, closeInfo);
263                     finalClose = closeInfo;
264                     cleanClose = false;
265                     outputAvailable = false;
266                     inputAvailable = false;
267                     closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
268                 }
269             }
270         }
271 
272         // Only notify on state change events
273         if (event != WebSocketConnectionState.Unknown) {
274             notifyStateListeners(event);
275             if (abnormalEvent != WebSocketConnectionState.Unknown) {
276                 notifyStateListeners(abnormalEvent);
277             }
278         }
279     }
280 
281     /**
282      * A close handshake has been received from the remote endpoint
283      *
284      * @param closeInfo the close information
285      */
286     void onCloseRemote(CloseInfo closeInfo) {
287         version(HUNT_HTTP_DEBUG_MORE)
288             tracef("onCloseRemote(%s)", closeInfo);
289         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
290         synchronized (this) {
291             if (this.state == WebSocketConnectionState.CLOSED) {
292                 // already closed
293                 return;
294             }
295 
296             version(HUNT_HTTP_DEBUG_MORE)
297                 tracef("onCloseRemote(), input=%s, output=%s", inputAvailable, outputAvailable);
298 
299             this.closeInfo = closeInfo;
300 
301             // turn off further input
302             inputAvailable = false;
303 
304             if (closeHandshakeSource == CloseHandshakeSource.NONE) {
305                 closeHandshakeSource = CloseHandshakeSource.REMOTE;
306             }
307 
308             if (!outputAvailable) {
309                 version(HUNT_HTTP_DEBUG) tracef("Close Handshake satisfied, disconnecting");
310                 cleanClose = true;
311                 state = WebSocketConnectionState.CLOSED;
312                 finalClose = closeInfo;
313                 // finalClose.compareAndSet(null, closeInfo);
314                 event = this.state;
315             } else if (this.state == WebSocketConnectionState.OPEN) {
316                 // We are now entering CLOSING (or half-closed)
317                 this.state = WebSocketConnectionState.CLOSING;
318                 event = this.state;
319             }
320         }
321 
322         // Only notify on state change events
323         if (event != WebSocketConnectionState.Unknown) {
324             notifyStateListeners(event);
325         }
326     }
327 
328     /**
329      * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
330      * <p>
331      * This is an intermediate state between the RFC's {@link WebSocketConnectionState#CONNECTING} and {@link WebSocketConnectionState#OPEN}
332      */
333     void onConnected() {
334         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
335         synchronized (this) {
336             if (this.state != WebSocketConnectionState.CONNECTING) {
337                 tracef("Unable to set to connected, not in CONNECTING state: %s", this.state);
338                 return;
339             }
340 
341             this.state = WebSocketConnectionState.CONNECTED;
342             inputAvailable = false; // cannot read (yet)
343             outputAvailable = true; // write allowed
344             event = this.state;
345         }
346         notifyStateListeners(event);
347     }
348 
349     /**
350      * A websocket connection has failed its upgrade handshake, and is now closed.
351      */
352     void onFailedUpgrade() {
353         assert (this.state == WebSocketConnectionState.CONNECTING);
354         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
355         synchronized (this) {
356             this.state = WebSocketConnectionState.CLOSED;
357             cleanClose = false;
358             inputAvailable = false;
359             outputAvailable = false;
360             event = this.state;
361         }
362         notifyStateListeners(event);
363     }
364 
365     /**
366      * A websocket connection has finished its upgrade handshake, and is now open.
367      */
368     void onOpened() {
369         version(HUNT_HTTP_DEBUG) tracef("state: %s", this.state);
370 
371         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
372         synchronized (this) {
373             if (this.state == WebSocketConnectionState.OPEN || 
374                 this.state == WebSocketConnectionState.CONNECTING) {
375                 // already opened
376                 return;
377             }
378 
379             if (this.state != WebSocketConnectionState.CONNECTED) {
380                 warningf("Unable to open, not in CONNECTED state: %s", this.state);
381                 return;
382             }
383 
384             this.state = WebSocketConnectionState.OPEN;
385             this.inputAvailable = true;
386             this.outputAvailable = true;
387             event = this.state;
388         }
389         notifyStateListeners(event);
390     }
391 
392     /**
393      * The local endpoint has reached a read failure.
394      * <p>
395      * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
396      *
397      * @param t the read failure
398      */
399     void onReadFailure(Throwable t) {
400         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
401         synchronized (this) {
402             if (this.state == WebSocketConnectionState.CLOSED) {
403                 // already closed
404                 return;
405             }
406 
407             // Build out Close Reason
408             string reason = "WebSocket Read Failure";
409             EOFException ee = cast(EOFException)t;
410             if (ee !is null) {
411                 reason = "WebSocket Read EOF";
412                 Throwable cause = t.next();
413                 if ((cause !is null) && (!cause.message().empty())) {
414                     reason = "EOF: " ~ cast(string)cause.message();
415                 }
416             } else {
417                 if (!t.message().empty()) {
418                     reason = cast(string)t.message();
419                 }
420             }
421 
422             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, reason);
423             finalClose = close;
424             // finalClose.compareAndSet(null, close);
425 
426             this.cleanClose = false;
427             this.state = WebSocketConnectionState.CLOSED;
428             this.closeInfo = close;
429             this.inputAvailable = false;
430             this.outputAvailable = false;
431             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
432             event = this.state;
433         }
434         notifyStateListeners(event);
435     }
436 
437     /**
438      * The local endpoint has reached a write failure.
439      * <p>
440      * A low level I/O failure, or even a hunt side EndPoint close (from idle timeout) are a few scenarios
441      *
442      * @param t the throwable that caused the write failure
443      */
444     void onWriteFailure(Throwable t) {
445         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
446         synchronized (this) {
447             if (this.state == WebSocketConnectionState.CLOSED) {
448                 // already closed
449                 return;
450             }
451 
452             // Build out Close Reason
453             string reason = "WebSocket Write Failure";
454             EOFException ee = cast(EOFException)t;
455             if (ee !is null) {
456                 reason = "WebSocket Write EOF";
457                 Throwable cause = t.next();
458 
459                 if ((cause !is null) && (!cause.message().empty())) {
460                     reason = "EOF: " ~ cast(string)cause.message();
461                 }
462             } else {
463                 if (!t.message().empty()) {
464                     reason = cast(string)t.message();
465                 }
466             }
467 
468             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, reason);
469             finalClose = close;
470             // finalClose.compareAndSet(null, close);
471 
472             this.cleanClose = false;
473             this.state = WebSocketConnectionState.CLOSED;
474             this.inputAvailable = false;
475             this.outputAvailable = false;
476             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
477             event = this.state;
478         }
479         notifyStateListeners(event);
480     }
481 
482     void onDisconnected() {
483         WebSocketConnectionState event = WebSocketConnectionState.Unknown;
484         synchronized (this) {
485             if (this.state == WebSocketConnectionState.CLOSED) {
486                 // already closed
487                 return;
488             }
489 
490             CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, "Disconnected");
491 
492             this.cleanClose = false;
493             this.state = WebSocketConnectionState.CLOSED;
494             this.closeInfo = close;
495             this.inputAvailable = false;
496             this.outputAvailable = false;
497             this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
498             event = this.state;
499         }
500         notifyStateListeners(event);
501     }
502 
503     override
504     string toString() {
505         StringBuilder str = new StringBuilder();
506         str.append(typeid(this).name);
507         str.append("@").append(to!string(toHash(), 16));
508         str.append("[").append(state);
509         str.append(',');
510         if (!inputAvailable) {
511             str.append('!');
512         }
513         str.append("in,");
514         if (!outputAvailable) {
515             str.append('!');
516         }
517         str.append("out");
518         if ((state == WebSocketConnectionState.CLOSED) || (state == WebSocketConnectionState.CLOSING)) {
519             CloseInfo ci = finalClose;
520             if (ci !is null) {
521                 str.append(",finalClose=").append(ci.toString());
522             } else {
523                 str.append(",close=").append(closeInfo.toString());
524             }
525             str.append(",clean=").append(cleanClose);
526             str.append(",closeSource=").append(closeHandshakeSource);
527         }
528         str.append(']');
529         return str.toString();
530     }
531 
532     bool wasAbnormalClose() {
533         return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
534     }
535 
536     bool wasCleanClose() {
537         return cleanClose;
538     }
539 
540     bool wasLocalCloseInitiated() {
541         return closeHandshakeSource == CloseHandshakeSource.LOCAL;
542     }
543 
544     bool wasRemoteCloseInitiated() {
545         return closeHandshakeSource == CloseHandshakeSource.REMOTE;
546     }
547 
548 }