1 module hunt.http.codec.websocket.stream.IOState;
2 
3 import hunt.http.codec.websocket.model.CloseInfo;
4 import hunt.http.codec.websocket.model.common;
5 import hunt.http.codec.websocket.model.StatusCode;
6 
7 import hunt.container;
8 import hunt.logging;
9 import hunt.lang.exception;
10 import hunt.string;
11 
12 import std.array;
13 import std.conv;
14 
/**
 * Simple state tracker for Input / Output and {@link ConnectionState}.
 * <p>
 * Use the various .on*() methods to trigger a state change:
 * <ul>
 * <li>{@link #onConnected()} - upgrade succeeded, end-user open callback not run yet</li>
 * <li>{@link #onOpened()} - connection has been opened</li>
 * <li>{@link #onCloseLocal(CloseInfo)} / {@link #onCloseRemote(CloseInfo)} - close handshake steps</li>
 * <li>{@link #onAbnormalClose(CloseInfo)}, {@link #onReadFailure(Throwable)},
 * {@link #onWriteFailure(Throwable)}, {@link #onDisconnected()} - abnormal terminations</li>
 * <li>{@link #onFailedUpgrade()} - the upgrade handshake failed</li>
 * </ul>
 * <p>
 * State mutations are performed inside {@code synchronized (this)}; listeners are
 * always notified after the monitor has been released.
 */
class IOState {
    /**
     * The source of a close handshake. (ie: who initiated it).
     */
    private enum CloseHandshakeSource {
        /**
         * No close handshake initiated (yet)
         */
        NONE,
        /**
         * Local side initiated the close handshake
         */
        LOCAL,
        /**
         * Remote side initiated the close handshake
         */
        REMOTE,
        /**
         * An abnormal close situation (disconnect, timeout, etc...)
         */
        ABNORMAL
    }

    /**
     * Callback invoked whenever this tracker moves to a new {@link ConnectionState}.
     */
    interface ConnectionStateListener {
        void onConnectionStateChange(ConnectionState state);
    }


    private ConnectionState state;
    private List!(ConnectionStateListener) listeners;

    /**
     * Is input on websocket available (for reading frames).
     * Used to determine close handshake completion, and track half-close states
     */
    private bool inputAvailable;
    /**
     * Is output on websocket available (for writing frames).
     * Used to determine close handshake completion, and track half-closed states.
     */
    private bool outputAvailable;
    /**
     * Initiator of the close handshake.
     * Used to determine who initiated a close handshake for reply reasons.
     */
    private CloseHandshakeSource closeHandshakeSource;
    /**
     * The close info for the initiator of the close handshake.
     * It is possible in abnormal close scenarios to have a different
     * final close info that is used to notify the WS-Endpoint's onClose()
     * events with.
     */
    private CloseInfo closeInfo;
    /**
     * The final close info, used for the WS-Endpoint's onClose() event.
     * This is only ever assigned through {@link #setFinalCloseOnce(CloseInfo)},
     * so the first non-null value recorded wins and later close events cannot
     * replace it.
     */
    private CloseInfo finalClose;
    /**
     * Tracker for if the close handshake was completed successfully by
     * both sides.  False if close was sudden or abnormal.
     */
    private bool cleanClose;

    /**
     * Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
     */
    this() {
        listeners = new ArrayList!(ConnectionStateListener)();

        this.state = ConnectionState.CONNECTING;
        this.inputAvailable = false;
        this.outputAvailable = false;
        this.closeHandshakeSource = CloseHandshakeSource.NONE;
        this.closeInfo = null;
        this.finalClose = null;
        this.cleanClose = false;
    }

    /**
     * Register a listener that is called on every state change notification.
     *
     * @param listener the listener to add
     */
    void addListener(ConnectionStateListener listener) {
        listeners.add(listener);
    }

    /**
     * @throws IOException if input is currently not available
     */
    void assertInputOpen() {
        if (!isInputAvailable()) {
            throw new IOException("Connection input is closed");
        }
    }

    /**
     * @throws IOException if output is currently not available
     */
    void assertOutputOpen() {
        if (!isOutputAvailable()) {
            throw new IOException("Connection output is closed");
        }
    }

    /**
     * @return the final close info when one has been recorded, otherwise the
     *         close info of the handshake initiator (which may be null)
     */
    CloseInfo getCloseInfo() {
        CloseInfo ci = finalClose;
        if (ci !is null) {
            return ci;
        }
        return closeInfo;
    }

    ConnectionState getConnectionState() {
        return state;
    }

    bool isClosed() {
        synchronized (this) {
            return (state == ConnectionState.CLOSED);
        }
    }

    bool isInputAvailable() {
        return inputAvailable;
    }

    /**
     * @return true for every state other than {@link ConnectionState#CLOSED}
     */
    bool isOpen() {
        return !isClosed();
    }

    bool isOutputAvailable() {
        return outputAvailable;
    }

    /**
     * Record the final close info unless one has already been recorded.
     * <p>
     * Must be called while holding this object's monitor. This replaces the
     * compare-and-set-from-null semantics that a plain assignment would lose:
     * without it a later failure event would overwrite the close reason that
     * was captured first.
     *
     * @param close the candidate final close info (a null candidate is a no-op)
     */
    private void setFinalCloseOnce(CloseInfo close) {
        if (finalClose is null) {
            finalClose = close;
        }
    }

    /**
     * Build the close reason text for a read/write failure.
     *
     * @param t the failure (may be null, in which case defaultReason is used)
     * @param defaultReason reason used when the failure carries no message
     * @param eofReason reason used for an EOFException without a usable chained cause
     * @return the reason text
     */
    private static string describeFailure(Throwable t, string defaultReason, string eofReason) {
        if (t is null) {
            return defaultReason;
        }

        if ((cast(EOFException) t) !is null) {
            // Prefer the message of the chained throwable when there is one.
            Throwable cause = t.next();
            if ((cause !is null) && (!cause.message().empty())) {
                return "EOF: " ~ cast(string) cause.message();
            }
            return eofReason;
        }

        if (!t.message().empty()) {
            return cast(string) t.message();
        }
        return defaultReason;
    }

    /**
     * Deliver a state to every registered listener, in registration order.
     * Callers invoke this after leaving their synchronized block.
     */
    private void notifyStateListeners(ConnectionState state) {
        version(HUNT_DEBUG)
            tracef("Notify State Listeners: %s", state);
        foreach (ConnectionStateListener listener ; listeners) {
            version(HUNT_DEBUG) {
                tracef("%s.onConnectionStateChange(%s)", typeid(listener).name, to!string(state));
            }
            listener.onConnectionStateChange(state);
        }
    }

    /**
     * A websocket connection has been disconnected for abnormal close reasons.
     * <p>
     * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
     * <p>
     * Does nothing when the state is already CLOSED.
     *
     * @param close the close information
     */
    void onAbnormalClose(CloseInfo close) {
        version(HUNT_DEBUG)
            tracef("onAbnormalClose(%s)", close);
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            if (this.state == ConnectionState.OPEN) {
                this.cleanClose = false;
            }

            this.state = ConnectionState.CLOSED;
            setFinalCloseOnce(close);
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A close handshake has been issued from the local endpoint
     * <p>
     * When the state is still CONNECTED the connection is first forced OPEN
     * (a "fast close" requested from the end-user open callback) and then closed.
     *
     * @param closeInfo the close information
     */
    void onCloseLocal(CloseInfo closeInfo) {
        bool open = false;
        synchronized (this) {
            ConnectionState initialState = this.state;
            version(HUNT_DEBUG)
                tracef("onCloseLocal(%s) : %s", closeInfo, initialState);
            if (initialState == ConnectionState.CLOSED) {
                // already closed
                version(HUNT_DEBUG)
                    tracef("already closed");
                return;
            }

            if (initialState == ConnectionState.CONNECTED) {
                // fast close. a local close request from end-user onConnect/onOpen method
                version(HUNT_DEBUG)
                    tracef("FastClose in CONNECTED detected");
                open = true;
            }
        }

        if (open)
            openAndCloseLocal(closeInfo);
        else
            closeLocal(closeInfo);
    }

    private void openAndCloseLocal(CloseInfo closeInfo) {
        // Force the state open (to allow read/write to endpoint)
        onOpened();
        version(HUNT_DEBUG)
            tracef("FastClose continuing with Closure");
        closeLocal(closeInfo);
    }

    /**
     * Apply the local half of the close handshake and notify listeners.
     * <p>
     * Output is always turned off. If input is already off the handshake is
     * complete and the state becomes CLOSED; otherwise an OPEN connection moves
     * to CLOSING, and for an abnormal close info a CLOSED notification is sent
     * right after the CLOSING one because no reply is expected.
     */
    private void closeLocal(CloseInfo closeInfo) {
        ConnectionState event = ConnectionState.Unknown;
        ConnectionState abnormalEvent = ConnectionState.Unknown;
        synchronized (this) {
            version(HUNT_DEBUG)
                tracef("onCloseLocal(), input=%s, output=%s", inputAvailable, outputAvailable);

            this.closeInfo = closeInfo;

            // Turn off further output.
            outputAvailable = false;

            if (closeHandshakeSource == CloseHandshakeSource.NONE) {
                closeHandshakeSource = CloseHandshakeSource.LOCAL;
            }

            if (!inputAvailable) {
                version(HUNT_DEBUG)
                    tracef("Close Handshake satisfied, disconnecting");
                cleanClose = true;
                this.state = ConnectionState.CLOSED;
                setFinalCloseOnce(closeInfo);
                event = this.state;
            } else if (this.state == ConnectionState.OPEN) {
                // We are now entering CLOSING (or half-closed).
                this.state = ConnectionState.CLOSING;
                event = this.state;

                // If abnormal, we don't expect an answer.
                // (null guard: a missing close info is treated as not abnormal)
                if ((closeInfo !is null) && closeInfo.isAbnormal()) {
                    abnormalEvent = ConnectionState.CLOSED;
                    setFinalCloseOnce(closeInfo);
                    cleanClose = false;
                    outputAvailable = false;
                    inputAvailable = false;
                    closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
                }
            }
        }

        // Only notify on state change events
        if (event != ConnectionState.Unknown) {
            notifyStateListeners(event);
            if (abnormalEvent != ConnectionState.Unknown) {
                notifyStateListeners(abnormalEvent);
            }
        }
    }

    /**
     * A close handshake has been received from the remote endpoint
     * <p>
     * Input is always turned off. If output is already off the handshake is
     * complete and the state becomes CLOSED; otherwise an OPEN connection moves
     * to CLOSING. Does nothing when the state is already CLOSED.
     *
     * @param closeInfo the close information
     */
    void onCloseRemote(CloseInfo closeInfo) {
        version(HUNT_DEBUG)
            tracef("onCloseRemote(%s)", closeInfo);
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            version(HUNT_DEBUG)
                tracef("onCloseRemote(), input=%s, output=%s", inputAvailable, outputAvailable);

            this.closeInfo = closeInfo;

            // turn off further input
            inputAvailable = false;

            if (closeHandshakeSource == CloseHandshakeSource.NONE) {
                closeHandshakeSource = CloseHandshakeSource.REMOTE;
            }

            if (!outputAvailable) {
                version(HUNT_DEBUG)
                    tracef("Close Handshake satisfied, disconnecting");
                cleanClose = true;
                state = ConnectionState.CLOSED;
                setFinalCloseOnce(closeInfo);
                event = this.state;
            } else if (this.state == ConnectionState.OPEN) {
                // We are now entering CLOSING (or half-closed)
                this.state = ConnectionState.CLOSING;
                event = this.state;
            }
        }

        // Only notify on state change events
        if (event != ConnectionState.Unknown) {
            notifyStateListeners(event);
        }
    }

    /**
     * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
     * <p>
     * This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
     * <p>
     * Ignored (with a trace message) unless the current state is CONNECTING.
     */
    void onConnected() {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state != ConnectionState.CONNECTING) {
                tracef("Unable to set to connected, not in CONNECTING state: %s", this.state);
                return;
            }

            this.state = ConnectionState.CONNECTED;
            inputAvailable = false; // cannot read (yet)
            outputAvailable = true; // write allowed
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A websocket connection has failed its upgrade handshake, and is now closed.
     * <p>
     * Expected to be called only while CONNECTING (checked by an assert).
     * No close info is recorded by this transition.
     */
    void onFailedUpgrade() {
        assert (this.state == ConnectionState.CONNECTING);
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            this.state = ConnectionState.CLOSED;
            cleanClose = false;
            inputAvailable = false;
            outputAvailable = false;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A websocket connection has finished its upgrade handshake, and is now open.
     * <p>
     * Only the CONNECTED -> OPEN transition is performed. The call returns
     * silently when the state is already OPEN or still CONNECTING, and logs a
     * warning for any other state.
     */
    void onOpened() {
        version(HUNT_DEBUG) tracef("state: %s", this.state);

        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.OPEN ||
                this.state == ConnectionState.CONNECTING) {
                // already opened, or not connected yet: nothing to do
                return;
            }

            if (this.state != ConnectionState.CONNECTED) {
                warningf("Unable to open, not in CONNECTED state: %s", this.state);
                return;
            }

            this.state = ConnectionState.OPEN;
            this.inputAvailable = true;
            this.outputAvailable = true;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The local endpoint has reached a read failure.
     * <p>
     * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
     * <p>
     * Moves the state to CLOSED with an ABNORMAL close info; does nothing when
     * already CLOSED.
     *
     * @param t the read failure
     */
    void onReadFailure(Throwable t) {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            // Build out Close Reason
            string reason = describeFailure(t, "WebSocket Read Failure", "WebSocket Read EOF");

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, reason);
            setFinalCloseOnce(close);

            this.cleanClose = false;
            this.state = ConnectionState.CLOSED;
            this.closeInfo = close;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The local endpoint has reached a write failure.
     * <p>
     * A low level I/O failure, or even a hunt side EndPoint close (from idle timeout) are a few scenarios
     * <p>
     * Moves the state to CLOSED with an ABNORMAL final close info; does nothing
     * when already CLOSED. Unlike a read failure, the initiator close info is
     * left untouched.
     *
     * @param t the throwable that caused the write failure
     */
    void onWriteFailure(Throwable t) {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            // Build out Close Reason
            string reason = describeFailure(t, "WebSocket Write Failure", "WebSocket Write EOF");

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, reason);
            setFinalCloseOnce(close);

            this.cleanClose = false;
            this.state = ConnectionState.CLOSED;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The underlying connection has been disconnected.
     * <p>
     * Moves the state to CLOSED and records an ABNORMAL "Disconnected" close
     * info as the initiator close info; does nothing when already CLOSED.
     */
    void onDisconnected() {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, "Disconnected");

            this.cleanClose = false;
            this.state = ConnectionState.CLOSED;
            this.closeInfo = close;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * Diagnostic rendering: type name, hash, state, in/out availability and,
     * once CLOSING or CLOSED, the close details.
     */
    override
    string toString() {
        StringBuilder str = new StringBuilder();
        str.append(typeid(this).name);
        str.append("@").append(to!string(toHash(), 16));
        str.append("[").append(state);
        str.append(',');
        if (!inputAvailable) {
            str.append('!');
        }
        str.append("in,");
        if (!outputAvailable) {
            str.append('!');
        }
        str.append("out");
        if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING)) {
            CloseInfo ci = finalClose;
            if (ci !is null) {
                str.append(",finalClose=").append(ci.toString());
            } else {
                // closeInfo is null after e.g. onFailedUpgrade(); never dereference it blindly
                CloseInfo initiator = closeInfo;
                str.append(",close=").append(initiator is null ? "null" : initiator.toString());
            }
            str.append(",clean=").append(cleanClose);
            str.append(",closeSource=").append(closeHandshakeSource);
        }
        str.append(']');
        return str.toString();
    }

    bool wasAbnormalClose() {
        return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
    }

    bool wasCleanClose() {
        return cleanClose;
    }

    bool wasLocalCloseInitiated() {
        return closeHandshakeSource == CloseHandshakeSource.LOCAL;
    }

    bool wasRemoteCloseInitiated() {
        return closeHandshakeSource == CloseHandshakeSource.REMOTE;
    }

}