module hunt.http.codec.websocket.stream.IOState;

import hunt.http.codec.websocket.model.CloseInfo;
import hunt.http.codec.websocket.model.common;
import hunt.http.codec.websocket.model.StatusCode;

import hunt.container;
import hunt.logging;
import hunt.lang.exception;
import hunt.string;

import std.array;
import std.conv;

/**
 * Simple state tracker for Input / Output and {@link ConnectionState}.
 * <p>
 * Use the various known .on*() methods to trigger a state change.
 * <ul>
 * <li>{@link #onConnected()} - upgrade succeeded, end-user open callback not yet run</li>
 * <li>{@link #onOpened()} - connection has been opened</li>
 * <li>{@link #onCloseLocal(CloseInfo)} / {@link #onCloseRemote(CloseInfo)} - close handshake halves</li>
 * <li>{@link #onAbnormalClose(CloseInfo)}, {@link #onReadFailure(Throwable)},
 * {@link #onWriteFailure(Throwable)}, {@link #onDisconnected()} - abnormal terminations</li>
 * <li>{@link #onFailedUpgrade()} - upgrade handshake failed</li>
 * </ul>
 * <p>
 * Every state transition is performed inside {@code synchronized (this)}; registered
 * listeners are always notified after the monitor has been released.
 */
class IOState {
    /**
     * The source of a close handshake. (ie: who initiated it).
     */
    private enum CloseHandshakeSource {
        /**
         * No close handshake initiated (yet)
         */
        NONE,
        /**
         * Local side initiated the close handshake
         */
        LOCAL,
        /**
         * Remote side initiated the close handshake
         */
        REMOTE,
        /**
         * An abnormal close situation (disconnect, timeout, etc...)
         */
        ABNORMAL
    }

    /**
     * Callback invoked each time this tracker moves to a new {@link ConnectionState}.
     */
    interface ConnectionStateListener {
        void onConnectionStateChange(ConnectionState state);
    }


    private ConnectionState state;
    private List!(ConnectionStateListener) listeners;

    /**
     * Is input on websocket available (for reading frames).
     * Used to determine close handshake completion, and track half-close states
     */
    private bool inputAvailable;
    /**
     * Is output on websocket available (for writing frames).
     * Used to determine close handshake completion, and track half-closed states.
     */
    private bool outputAvailable;
    /**
     * Initiator of the close handshake.
     * Used to determine who initiated a close handshake for reply reasons.
     */
    private CloseHandshakeSource closeHandshakeSource;
    /**
     * The close info for the initiator of the close handshake.
     * It is possible in abnormal close scenarios to have a different
     * final close info that is used to notify the WS-Endpoint's onClose()
     * events with.
     */
    private CloseInfo closeInfo;
    /**
     * The final close info.
     * This can only be set once (see {@link #setFinalCloseOnce(CloseInfo)}), and is
     * used for the WS-Endpoint's onClose() event. It is only written while holding
     * this object's monitor.
     */
    private CloseInfo finalClose;
    /**
     * Tracker for if the close handshake was completed successfully by
     * both sides. False if close was sudden or abnormal.
     */
    private bool cleanClose;

    /**
     * Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
     * with neither input nor output available and no close information recorded.
     */
    this() {
        listeners = new ArrayList!(ConnectionStateListener)();

        this.state = ConnectionState.CONNECTING;
        this.inputAvailable = false;
        this.outputAvailable = false;
        this.closeHandshakeSource = CloseHandshakeSource.NONE;
        this.closeInfo = null;
        this.finalClose = null;
        this.cleanClose = false;
    }

    /**
     * Register a listener that will be told about every subsequent state change.
     *
     * @param listener the listener to add
     */
    void addListener(ConnectionStateListener listener) {
        listeners.add(listener);
    }

    /**
     * Verify that frames can currently be read.
     *
     * @throws IOException if input is not available
     */
    void assertInputOpen() {
        if (!isInputAvailable()) {
            throw new IOException("Connection input is closed");
        }
    }

    /**
     * Verify that frames can currently be written.
     *
     * @throws IOException if output is not available
     */
    void assertOutputOpen() {
        if (!isOutputAvailable()) {
            throw new IOException("Connection output is closed");
        }
    }

    /**
     * Get the close information for this connection.
     *
     * @return the final close info when one has been recorded, otherwise the close
     *         info of the handshake initiator (which may be null)
     */
    CloseInfo getCloseInfo() {
        CloseInfo ci = finalClose;
        if (ci !is null) {
            return ci;
        }
        return closeInfo;
    }

    /**
     * @return the current connection state
     */
    ConnectionState getConnectionState() {
        return state;
    }

    /**
     * @return true if the state is {@link ConnectionState#CLOSED}
     */
    bool isClosed() {
        synchronized (this) {
            return (state == ConnectionState.CLOSED);
        }
    }

    /**
     * @return true if frames can currently be read
     */
    bool isInputAvailable() {
        return inputAvailable;
    }

    /**
     * @return true for every state other than {@link ConnectionState#CLOSED}
     */
    bool isOpen() {
        return !isClosed();
    }

    /**
     * @return true if frames can currently be written
     */
    bool isOutputAvailable() {
        return outputAvailable;
    }

    /**
     * Deliver a state change to every registered listener, in registration order.
     * Callers invoke this after leaving their synchronized section.
     *
     * @param state the state to announce
     */
    private void notifyStateListeners(ConnectionState state) {
        version(HUNT_DEBUG)
            tracef("Notify State Listeners: %s", state);
        foreach (ConnectionStateListener listener ; listeners) {
            version(HUNT_DEBUG) {
                tracef("%s.onConnectionStateChange(%s)", typeid(listener).name, to!string(state));
            }
            listener.onConnectionStateChange(state);
        }
    }

    /**
     * Record the final close info, keeping the first value ever recorded.
     * <p>
     * Must be called while holding this object's monitor; that is what makes the
     * check-then-set safe without an atomic reference.
     *
     * @param close the candidate final close info
     */
    private void setFinalCloseOnce(CloseInfo close) {
        if (finalClose is null) {
            finalClose = close;
        }
    }

    /**
     * Build the human readable close reason for a read or write failure.
     * <p>
     * For an {@code EOFException} the chained throwable's message (prefixed with
     * "EOF: ") is preferred, falling back to {@code eofReason}. For any other
     * throwable its own message is preferred, falling back to {@code defaultReason}.
     * A null throwable yields {@code defaultReason}.
     *
     * @param t the failure (may be null)
     * @param defaultReason reason used when the failure carries no message
     * @param eofReason reason used for an EOF without a usable chained message
     * @return the reason text
     */
    private static string failureReason(Throwable t, string defaultReason, string eofReason) {
        if (t is null) {
            return defaultReason;
        }

        EOFException eof = cast(EOFException)t;
        if (eof !is null) {
            Throwable cause = t.next();
            if ((cause !is null) && (!cause.message().empty())) {
                return "EOF: " ~ cast(string)cause.message();
            }
            return eofReason;
        }

        if (!t.message().empty()) {
            return cast(string)t.message();
        }
        return defaultReason;
    }

    /**
     * A websocket connection has been disconnected for abnormal close reasons.
     * <p>
     * This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
     * <p>
     * Does nothing when already CLOSED; otherwise moves to CLOSED, shuts both
     * directions, marks the close source as abnormal and notifies listeners.
     *
     * @param close the close information
     */
    void onAbnormalClose(CloseInfo close) {
        version(HUNT_DEBUG)
            tracef("onAbnormalClose(%s)", close);
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            if (this.state == ConnectionState.OPEN) {
                this.cleanClose = false;
            }

            this.state = ConnectionState.CLOSED;
            setFinalCloseOnce(close);
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A close handshake has been issued from the local endpoint
     * <p>
     * When the close arrives while still in CONNECTED, the state is first forced
     * to OPEN and then the local close is processed.
     *
     * @param closeInfo the close information
     */
    void onCloseLocal(CloseInfo closeInfo) {
        bool open = false;
        synchronized (this) {
            ConnectionState initialState = this.state;
            version(HUNT_DEBUG)
                tracef("onCloseLocal(%s) : %s", closeInfo, initialState);
            if (initialState == ConnectionState.CLOSED) {
                // already closed
                version(HUNT_DEBUG)
                    tracef("already closed");
                return;
            }

            if (initialState == ConnectionState.CONNECTED) {
                // fast close. a local close request from end-user onConnect/onOpen method
                version(HUNT_DEBUG)
                    tracef("FastClose in CONNECTED detected");
                open = true;
            }
        }

        // Both continuations take the monitor themselves and notify listeners
        // outside of it, so they are invoked after the lock above is released.
        if (open)
            openAndCloseLocal(closeInfo);
        else
            closeLocal(closeInfo);
    }

    /**
     * Fast-close path: open the connection, then immediately process the local close.
     *
     * @param closeInfo the close information
     */
    private void openAndCloseLocal(CloseInfo closeInfo) {
        // Force the state open (to allow read/write to endpoint)
        onOpened();
        version(HUNT_DEBUG)
            tracef("FastClose continuing with Closure");
        closeLocal(closeInfo);
    }

    /**
     * Process the local half of the close handshake.
     * <p>
     * Output is always turned off. If input is already off the handshake is
     * complete and the state becomes CLOSED (clean). Otherwise, from OPEN, the
     * state becomes CLOSING; an abnormal close info additionally shuts input and
     * announces CLOSED to listeners right after CLOSING.
     *
     * @param closeInfo the close information
     */
    private void closeLocal(CloseInfo closeInfo) {
        ConnectionState event = ConnectionState.Unknown;
        ConnectionState abnormalEvent = ConnectionState.Unknown;
        synchronized (this) {
            version(HUNT_DEBUG)
                tracef("onCloseLocal(), input=%s, output=%s", inputAvailable, outputAvailable);

            this.closeInfo = closeInfo;

            // Turn off further output.
            outputAvailable = false;

            if (closeHandshakeSource == CloseHandshakeSource.NONE) {
                closeHandshakeSource = CloseHandshakeSource.LOCAL;
            }

            if (!inputAvailable) {
                version(HUNT_DEBUG)
                    tracef("Close Handshake satisfied, disconnecting");
                cleanClose = true;
                this.state = ConnectionState.CLOSED;
                setFinalCloseOnce(closeInfo);
                event = this.state;
            } else if (this.state == ConnectionState.OPEN) {
                // We are now entering CLOSING (or half-closed).
                this.state = ConnectionState.CLOSING;
                event = this.state;

                // If abnormal, we don't expect an answer.
                if (closeInfo.isAbnormal()) {
                    abnormalEvent = ConnectionState.CLOSED;
                    setFinalCloseOnce(closeInfo);
                    cleanClose = false;
                    outputAvailable = false;
                    inputAvailable = false;
                    closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
                }
            }
        }

        // Only notify on state change events
        if (event != ConnectionState.Unknown) {
            notifyStateListeners(event);
            if (abnormalEvent != ConnectionState.Unknown) {
                notifyStateListeners(abnormalEvent);
            }
        }
    }

    /**
     * A close handshake has been received from the remote endpoint
     * <p>
     * Input is always turned off. If output is already off the handshake is
     * complete and the state becomes CLOSED (clean); otherwise, from OPEN, the
     * state becomes CLOSING.
     *
     * @param closeInfo the close information
     */
    void onCloseRemote(CloseInfo closeInfo) {
        version(HUNT_DEBUG)
            tracef("onCloseRemote(%s)", closeInfo);
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            version(HUNT_DEBUG)
                tracef("onCloseRemote(), input=%s, output=%s", inputAvailable, outputAvailable);

            this.closeInfo = closeInfo;

            // turn off further input
            inputAvailable = false;

            if (closeHandshakeSource == CloseHandshakeSource.NONE) {
                closeHandshakeSource = CloseHandshakeSource.REMOTE;
            }

            if (!outputAvailable) {
                version(HUNT_DEBUG)
                    tracef("Close Handshake satisfied, disconnecting");
                cleanClose = true;
                state = ConnectionState.CLOSED;
                setFinalCloseOnce(closeInfo);
                event = this.state;
            } else if (this.state == ConnectionState.OPEN) {
                // We are now entering CLOSING (or half-closed)
                this.state = ConnectionState.CLOSING;
                event = this.state;
            }
        }

        // Only notify on state change events
        if (event != ConnectionState.Unknown) {
            notifyStateListeners(event);
        }
    }

    /**
     * WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
     * <p>
     * This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
     * <p>
     * Only valid from CONNECTING; in any other state the call is logged and ignored.
     */
    void onConnected() {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state != ConnectionState.CONNECTING) {
                tracef("Unable to set to connected, not in CONNECTING state: %s", this.state);
                return;
            }

            this.state = ConnectionState.CONNECTED;
            inputAvailable = false; // cannot read (yet)
            outputAvailable = true; // write allowed
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A websocket connection has failed its upgrade handshake, and is now closed.
     * <p>
     * Expected to be called while still in CONNECTING (checked with an assert).
     * No close info is recorded by this transition.
     */
    void onFailedUpgrade() {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            // Checked under the monitor so the state read is consistent with the write below.
            assert (this.state == ConnectionState.CONNECTING);
            this.state = ConnectionState.CLOSED;
            cleanClose = false;
            inputAvailable = false;
            outputAvailable = false;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * A websocket connection has finished its upgrade handshake, and is now open.
     * <p>
     * Silently ignored when already OPEN or still CONNECTING; logged and ignored
     * for any other state except CONNECTED.
     */
    void onOpened() {
        version(HUNT_DEBUG) tracef("state: %s", this.state);

        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.OPEN ||
                this.state == ConnectionState.CONNECTING) {
                // already opened
                return;
            }

            if (this.state != ConnectionState.CONNECTED) {
                warningf("Unable to open, not in CONNECTED state: %s", this.state);
                return;
            }

            this.state = ConnectionState.OPEN;
            this.inputAvailable = true;
            this.outputAvailable = true;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The local endpoint has reached a read failure.
     * <p>
     * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
     * <p>
     * Does nothing when already CLOSED; otherwise records an ABNORMAL close info
     * built from the failure and moves to CLOSED.
     *
     * @param t the read failure
     */
    void onReadFailure(Throwable t) {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            // Build out Close Reason
            string reason = failureReason(t, "WebSocket Read Failure", "WebSocket Read EOF");

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, reason);
            setFinalCloseOnce(close);

            this.cleanClose = false;
            this.state = ConnectionState.CLOSED;
            this.closeInfo = close;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The local endpoint has reached a write failure.
     * <p>
     * A low level I/O failure, or even a hunt side EndPoint close (from idle timeout) are a few scenarios
     * <p>
     * Does nothing when already CLOSED; otherwise records an ABNORMAL final close
     * info built from the failure and moves to CLOSED.
     *
     * @param t the throwable that caused the write failure
     */
    void onWriteFailure(Throwable t) {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            // Build out Close Reason
            string reason = failureReason(t, "WebSocket Write Failure", "WebSocket Write EOF");

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, reason);
            setFinalCloseOnce(close);

            this.cleanClose = false;
            this.state = ConnectionState.CLOSED;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * The underlying connection has been disconnected.
     * <p>
     * Does nothing when already CLOSED; otherwise records an ABNORMAL
     * "Disconnected" close info and moves to CLOSED.
     */
    void onDisconnected() {
        ConnectionState event = ConnectionState.Unknown;
        synchronized (this) {
            if (this.state == ConnectionState.CLOSED) {
                // already closed
                return;
            }

            CloseInfo close = new CloseInfo(StatusCode.ABNORMAL, "Disconnected");

            this.cleanClose = false;
            this.state = ConnectionState.CLOSED;
            this.closeInfo = close;
            this.inputAvailable = false;
            this.outputAvailable = false;
            this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
            event = this.state;
        }
        notifyStateListeners(event);
    }

    /**
     * Render the tracked state for diagnostics:
     * {@code Type@hash[STATE,(!)in,(!)out(,finalClose=..|,close=..,clean=..,closeSource=..)]}.
     * Close details are only included while CLOSING or CLOSED.
     *
     * @return the diagnostic string
     */
    override
    string toString() {
        StringBuilder str = new StringBuilder();
        str.append(typeid(this).name);
        str.append("@").append(to!string(toHash(), 16));
        str.append("[").append(state);
        str.append(',');
        if (!inputAvailable) {
            str.append('!');
        }
        str.append("in,");
        if (!outputAvailable) {
            str.append('!');
        }
        str.append("out");
        if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING)) {
            CloseInfo ci = finalClose;
            if (ci !is null) {
                str.append(",finalClose=").append(ci.toString());
            } else if (closeInfo !is null) {
                str.append(",close=").append(closeInfo.toString());
            } else {
                // e.g. after onFailedUpgrade(): CLOSED without any close info recorded
                str.append(",close=null");
            }
            str.append(",clean=").append(cleanClose);
            str.append(",closeSource=").append(closeHandshakeSource);
        }
        str.append(']');
        return str.toString();
    }

    /**
     * @return true if the close was recorded as abnormal
     */
    bool wasAbnormalClose() {
        return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
    }

    /**
     * @return true if the close handshake was completed by both sides
     */
    bool wasCleanClose() {
        return cleanClose;
    }

    /**
     * @return true if the local side initiated the close handshake
     */
    bool wasLocalCloseInitiated() {
        return closeHandshakeSource == CloseHandshakeSource.LOCAL;
    }

    /**
     * @return true if the remote side initiated the close handshake
     */
    bool wasRemoteCloseInitiated() {
        return closeHandshakeSource == CloseHandshakeSource.REMOTE;
    }

}