module hunt.http.codec.websocket.stream.IOState;

import hunt.http.codec.websocket.model.CloseInfo;
import hunt.http.WebSocketStatusCode;
import hunt.http.WebSocketConnection;

import hunt.collection;
import hunt.logging;
import hunt.Exceptions;
import hunt.text.Common;
import hunt.util.StringBuilder;

import std.array;
import std.conv;


/// Callback invoked with the new state whenever the tracked connection state changes.
alias ConnectionStateListener = void delegate(WebSocketConnectionState state);

/**
 * Simple state tracker for Input / Output and {@link WebSocketConnectionState}.
 * <p>
 * Use the various known .on*() methods to trigger a state change.
 * <ul>
 * <li>{@link #onOpened()} - connection has been opened</li>
 * </ul>
 * <p>
 * State transitions are performed while holding this object's monitor;
 * listeners are notified after the monitor has been released.
 */
class IOState {
    /**
     * The source of a close handshake. (ie: who initiated it).
     */
    private enum CloseHandshakeSource {
        /**
         * No close handshake initiated (yet)
         */
        NONE,
        /**
         * Local side initiated the close handshake
         */
        LOCAL,
        /**
         * Remote side initiated the close handshake
         */
        REMOTE,
        /**
         * An abnormal close situation (disconnect, timeout, etc...)
         */
        ABNORMAL
    }

    private WebSocketConnectionState state;
    private List!(ConnectionStateListener) listeners;

    /**
     * Is input on websocket available (for reading frames).
     * Used to determine close handshake completion, and track half-closed states.
     */
    private bool inputAvailable;
    /**
     * Is output on websocket available (for writing frames).
     * Used to determine close handshake completion, and track half-closed states.
     */
    private bool outputAvailable;
    /**
     * Initiator of the close handshake.
     * Used to determine who initiated a close handshake for reply reasons.
     */
    private CloseHandshakeSource closeHandshakeSource;
    /**
     * The close info for the initiator of the close handshake.
     * It is possible in abnormal close scenarios to have a different
     * final close info that is used to notify the WS-Endpoint's onClose()
     * events with.
     */
    private CloseInfo closeInfo;
    /**
     * The final close info.
     * This can only be set once (see setFinalCloseOnce), and is used for the
     * WS-Endpoint's onClose() event.
     */
    private CloseInfo finalClose;
    /**
     * Tracker for if the close handshake was completed successfully by
     * both sides. False if close was sudden or abnormal.
     */
    private bool cleanClose;

    /**
     * Create a new IOState, initialized to {@link WebSocketConnectionState#CONNECTING}
     */
    this() {
        // NOTE(review): a plain ArrayList is used here (a copy-on-write list was
        // considered originally); registering listeners concurrently with a
        // notification is presumably not expected -- confirm against callers.
        listeners = new ArrayList!(ConnectionStateListener)();

        this.state = WebSocketConnectionState.CONNECTING;
        this.inputAvailable = false;
        this.outputAvailable = false;
        this.closeHandshakeSource = CloseHandshakeSource.NONE;
        this.closeInfo = null;
        this.cleanClose = false;
    }

    /**
     * Register a listener to be notified of connection state changes.
     *
     * @param listener the listener to add, must not be null
     */
    void addListener(ConnectionStateListener listener) {
        assert(listener !is null);
        listeners.add(listener);
    }

    /**
     * Verify that input is currently available.
     *
     * @throws IOException if input is not available
     */
    void assertInputOpen() {
        if (!isInputAvailable()) {
            throw new IOException("Connection input is closed");
        }
    }

    /**
     * Verify that output is currently available.
     *
     * @throws IOException if output is not available
     */
    void assertOutputOpen() {
        if (!isOutputAvailable()) {
            throw new IOException("Connection output is closed");
        }
    }

    /**
     * Get the close info: the final close info when one has been recorded,
     * otherwise the close info of the close handshake initiator (may be null).
     */
    CloseInfo getCloseInfo() {
        CloseInfo ci = finalClose;
        if (ci !is null) {
            return ci;
        }
        return closeInfo;
    }

    WebSocketConnectionState getConnectionState() {
        return state;
    }

    bool isClosed() {
        synchronized (this) {
            return (state == WebSocketConnectionState.CLOSED);
        }
    }

    bool isInputAvailable() {
        return inputAvailable;
    }

    bool isOpen() {
        return !isClosed();
    }

    bool isOutputAvailable() {
        return outputAvailable;
    }

    /**
     * Invoke every registered listener with the given state.
     */
    private void notifyStateListeners(WebSocketConnectionState state) {
        version(HUNT_HTTP_DEBUG)
            tracef("Notify State Listeners(%d): %s", listeners.size(), state);
        foreach (ConnectionStateListener listener ; listeners) {
            listener(state);
        }
    }

    /**
     * Record the final close info, keeping the first value ever recorded.
     * <p>
     * Must be called while holding this object's monitor.
     *
     * @param close the candidate final close info
     */
    private void setFinalCloseOnce(CloseInfo close) {
        if (finalClose is null) {
            finalClose = close;
        }
    }

    /**
     * Build the close reason text for a read/write failure.
     *
     * @param t the failure (may be null, in which case defaultReason is used)
     * @param eofReason reason used for an EOFException that has no usable chained message
     * @param defaultReason reason used when the failure carries no message
     * @return the reason text
     */
    private static string describeFailure(Throwable t, string eofReason, string defaultReason) {
        if (t is null) {
            return defaultReason;
        }

        EOFException ee = cast(EOFException) t;
        if (ee !is null) {
            // Prefer the message of the chained throwable, when there is one.
            Throwable cause = t.next();
            if ((cause !is null) && (!cause.message().empty())) {
                return "EOF: " ~ cast(string) cause.message();
            }
            return eofReason;
        }

        if (!t.message().empty()) {
            return cast(string) t.message();
        }
        return defaultReason;
    }

    /**
     * A websocket connection has been disconnected for abnormal close reasons.
     * <p>
     * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
     *
     * @param close the close information
     */
    void onAbnormalClose(CloseInfo close) {
        version(HUNT_HTTP_DEBUG)
            tracef("onAbnormalClose(%s)", close);
        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        synchronized (this) {
            if (this.state == WebSocketConnectionState.CLOSED) {
                // already closed
                return;
            }

            if (this.state == WebSocketConnectionState.OPEN) {
                this.cleanClose = false;
            }

            this.state = WebSocketConnectionState.CLOSED;
            setFinalCloseOnce(close);
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A close handshake has been issued from the local endpoint
     *
     * @param closeInfo the close information
     */
    void onCloseLocal(CloseInfo closeInfo) {
        bool open = false;
        synchronized (this) {
            WebSocketConnectionState initialState = this.state;
            version(HUNT_HTTP_DEBUG)
                tracef("onCloseLocal(%s) : %s", closeInfo, initialState);
            if (initialState == WebSocketConnectionState.CLOSED) {
                // already closed
                version(HUNT_HTTP_DEBUG)
                    tracef("already closed");
                return;
            }

            if (initialState == WebSocketConnectionState.CONNECTED) {
                // fast close. a local close request from end-user onConnect/onOpen method
                version(HUNT_HTTP_DEBUG)
                    tracef("FastClose in CONNECTED detected");
                open = true;
            }
        }

        if (open)
            openAndCloseLocal(closeInfo);
        else
            closeLocal(closeInfo);
    }

    /**
     * Fast-close path: move to OPEN first, then perform the local close.
     */
    private void openAndCloseLocal(CloseInfo closeInfo) {
        // Force the state open (to allow read/write to endpoint)
        onOpened();
        version(HUNT_HTTP_DEBUG)
            tracef("FastClose continuing with Closure");
        closeLocal(closeInfo);
    }

    /**
     * Apply a locally initiated close: output is turned off, and the state moves
     * to CLOSED when input is already unavailable, or to CLOSING when currently OPEN.
     */
    private void closeLocal(CloseInfo closeInfo) {
        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        WebSocketConnectionState abnormalEvent = WebSocketConnectionState.Unknown;
        synchronized (this) {
            version(HUNT_HTTP_DEBUG)
                tracef("onCloseLocal(), input=%s, output=%s", inputAvailable, outputAvailable);

            this.closeInfo = closeInfo;

            // Turn off further output.
            outputAvailable = false;

            if (closeHandshakeSource == CloseHandshakeSource.NONE) {
                closeHandshakeSource = CloseHandshakeSource.LOCAL;
            }

            if (!inputAvailable) {
                version(HUNT_HTTP_DEBUG)
                    tracef("Close Handshake satisfied, disconnecting");
                cleanClose = true;
                this.state = WebSocketConnectionState.CLOSED;
                setFinalCloseOnce(closeInfo);
                event = this.state;
            } else if (this.state == WebSocketConnectionState.OPEN) {
                // We are now entering CLOSING (or half-closed).
                this.state = WebSocketConnectionState.CLOSING;
                event = this.state;

                // If abnormal, we don't expect an answer.
                if (closeInfo.isAbnormal()) {
                    abnormalEvent = WebSocketConnectionState.CLOSED;
                    setFinalCloseOnce(closeInfo);
                    cleanClose = false;
                    outputAvailable = false;
                    inputAvailable = false;
                    closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
                }
            }
        }

        // Only notify on state change events
        if (event != WebSocketConnectionState.Unknown) {
            notifyStateListeners(event);
            if (abnormalEvent != WebSocketConnectionState.Unknown) {
                notifyStateListeners(abnormalEvent);
            }
        }
    }

    /**
     * A close handshake has been received from the remote endpoint
     *
     * @param closeInfo the close information
     */
    void onCloseRemote(CloseInfo closeInfo) {
        version(HUNT_HTTP_DEBUG_MORE)
            tracef("onCloseRemote(%s)", closeInfo);
        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        synchronized (this) {
            if (this.state == WebSocketConnectionState.CLOSED) {
                // already closed
                return;
            }

            version(HUNT_HTTP_DEBUG_MORE)
                tracef("onCloseRemote(), input=%s, output=%s", inputAvailable, outputAvailable);

            this.closeInfo = closeInfo;

            // turn off further input
            inputAvailable = false;

            if (closeHandshakeSource == CloseHandshakeSource.NONE) {
                closeHandshakeSource = CloseHandshakeSource.REMOTE;
            }

            if (!outputAvailable) {
                version(HUNT_HTTP_DEBUG) tracef("Close Handshake satisfied, disconnecting");
                cleanClose = true;
                state = WebSocketConnectionState.CLOSED;
                setFinalCloseOnce(closeInfo);
                event = this.state;
            } else if (this.state == WebSocketConnectionState.OPEN) {
                // We are now entering CLOSING (or half-closed)
                this.state = WebSocketConnectionState.CLOSING;
                event = this.state;
            }
        }

        // Only notify on state change events
        if (event != WebSocketConnectionState.Unknown) {
            notifyStateListeners(event);
        }
    }

    /**
     * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
     * <p>
     * This is an intermediate state between the RFC's {@link WebSocketConnectionState#CONNECTING} and {@link WebSocketConnectionState#OPEN}
     */
    void onConnected() {
        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        synchronized (this) {
            if (this.state != WebSocketConnectionState.CONNECTING) {
                tracef("Unable to set to connected, not in CONNECTING state: %s", this.state);
                return;
            }

            this.state = WebSocketConnectionState.CONNECTED;
            inputAvailable = false; // cannot read (yet)
            outputAvailable = true; // write allowed
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A websocket connection has failed its upgrade handshake, and is now closed.
     */
    void onFailedUpgrade() {
        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        synchronized (this) {
            // Checked under the monitor so the precondition and the transition are atomic.
            assert (this.state == WebSocketConnectionState.CONNECTING);
            this.state = WebSocketConnectionState.CLOSED;
            cleanClose = false;
            inputAvailable = false;
            outputAvailable = false;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A websocket connection has finished its upgrade handshake, and is now open.
     */
    void onOpened() {
        version(HUNT_HTTP_DEBUG) tracef("state: %s", this.state);

        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        synchronized (this) {
            if (this.state == WebSocketConnectionState.OPEN ||
                    this.state == WebSocketConnectionState.CONNECTING) {
                // already opened
                return;
            }

            if (this.state != WebSocketConnectionState.CONNECTED) {
                warningf("Unable to open, not in CONNECTED state: %s", this.state);
                return;
            }

            this.state = WebSocketConnectionState.OPEN;
            this.inputAvailable = true;
            this.outputAvailable = true;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The local endpoint has reached a read failure.
     * <p>
     * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
     *
     * @param t the read failure
     */
    void onReadFailure(Throwable t) {
        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        synchronized (this) {
            if (this.state == WebSocketConnectionState.CLOSED) {
                // already closed
                return;
            }

            // Build out Close Reason
            string reason = describeFailure(t, "WebSocket Read EOF", "WebSocket Read Failure");

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, reason);
            setFinalCloseOnce(close);

            this.cleanClose = false;
            this.state = WebSocketConnectionState.CLOSED;
            this.closeInfo = close;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The local endpoint has reached a write failure.
     * <p>
     * A low level I/O failure, or even a hunt side EndPoint close (from idle timeout) are a few scenarios
     *
     * @param t the throwable that caused the write failure
     */
    void onWriteFailure(Throwable t) {
        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        synchronized (this) {
            if (this.state == WebSocketConnectionState.CLOSED) {
                // already closed
                return;
            }

            // Build out Close Reason
            string reason = describeFailure(t, "WebSocket Write EOF", "WebSocket Write Failure");

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, reason);
            setFinalCloseOnce(close);

            this.cleanClose = false;
            this.state = WebSocketConnectionState.CLOSED;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The connection has been disconnected; unless already closed, the state
     * moves to CLOSED with an abnormal "Disconnected" close info.
     */
    void onDisconnected() {
        WebSocketConnectionState event = WebSocketConnectionState.Unknown;
        synchronized (this) {
            if (this.state == WebSocketConnectionState.CLOSED) {
                // already closed
                return;
            }

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, "Disconnected");

            this.cleanClose = false;
            this.state = WebSocketConnectionState.CLOSED;
            this.closeInfo = close;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    override
    string toString() {
        StringBuilder str = new StringBuilder();
        str.append(typeid(this).name);
        str.append("@").append(to!string(toHash(), 16));
        str.append("[").append(state);
        str.append(',');
        if (!inputAvailable) {
            str.append('!');
        }
        str.append("in,");
        if (!outputAvailable) {
            str.append('!');
        }
        str.append("out");
        if ((state == WebSocketConnectionState.CLOSED) || (state == WebSocketConnectionState.CLOSING)) {
            CloseInfo ci = finalClose;
            if (ci !is null) {
                str.append(",finalClose=").append(ci.toString());
            } else if (closeInfo !is null) {
                str.append(",close=").append(closeInfo.toString());
            } else {
                // No close info was ever recorded (e.g. after onFailedUpgrade()).
                str.append(",close=null");
            }
            str.append(",clean=").append(cleanClose);
            str.append(",closeSource=").append(closeHandshakeSource);
        }
        str.append(']');
        return str.toString();
    }

    bool wasAbnormalClose() {
        return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
    }

    bool wasCleanClose() {
        return cleanClose;
    }

    bool wasLocalCloseInitiated() {
        return closeHandshakeSource == CloseHandshakeSource.LOCAL;
    }

    bool wasRemoteCloseInitiated() {
        return closeHandshakeSource == CloseHandshakeSource.REMOTE;
    }

}