module hunt.http.codec.websocket.stream.WebSocketConnectionImpl;

import hunt.http.codec.websocket.decode.Parser;
import hunt.http.codec.websocket.encode;
import hunt.http.codec.websocket.frame;
import hunt.http.codec.websocket.model.CloseInfo;
import hunt.http.codec.websocket.model.Extension;
import hunt.http.codec.websocket.model.extension.AbstractExtension;
import hunt.http.codec.websocket.stream.ExtensionNegotiator;
import hunt.http.codec.websocket.stream.IOState;

import hunt.http.HttpConnection;
import hunt.http.HttpConnection;
import hunt.http.HttpConnection;
import hunt.http.HttpHeader;
import hunt.http.HttpMetaData;
import hunt.http.HttpRequest;
import hunt.http.HttpResponse;
import hunt.http.HttpOptions;
import hunt.http.HttpVersion;
import hunt.http.WebSocketCommon;
import hunt.http.WebSocketConnection;
import hunt.http.WebSocketPolicy;

import hunt.http.Util;

import hunt.collection;
import hunt.concurrency.FuturePromise;
import hunt.concurrency.Delayed;
import hunt.Exceptions;
import hunt.Functions;
import hunt.logging;
import hunt.net.AbstractConnection;
import hunt.net.Connection;
import hunt.util.Common;
import hunt.util.Runnable;

import core.time;

import std.random;
import std.socket;
import std.array;

/**
 * WebSocket connection built on top of an already-upgraded HTTP/1.1 connection.
 *
 * Outgoing frames are handed to the extension negotiator, whose last stage
 * (installed by the constructor) serializes the frame with the Generator and
 * writes it to the TCP connection. Incoming frames arrive through
 * incomingFrame(), which handles PING / CLOSE / PONG itself and then forwards
 * every frame to the negotiator's incoming-frames handler.
 */
class WebSocketConnectionImpl : AbstractHttpConnection, WebSocketConnection, IncomingFrames {

    // protected ConnectionEvent!(WebSocketConnection) connectionEvent;
    protected Parser parser;
    protected Generator generator;
    protected WebSocketPolicy policy;
    protected HttpRequest upgradeRequest;
    protected HttpResponse upgradeResponse;
    protected IOState ioState;
    protected HttpOptions config;
    protected ExtensionNegotiator extensionNegotiator;

    /**
     * Creates the connection, wires the parser / generator / extension
     * negotiator together and, when the policy behavior is CLIENT, schedules a
     * periodic ping task that is cancelled when the connection closes.
     *
     * Params:
     *   tcpSession         = underlying connection that encoded frames are written to
     *   nextIncomingFrames = handler receiving frames after extension processing; may be null
     *   policy             = policy handed to the parser, the generator and the extensions
     *   upgradeRequest     = HTTP request of the upgrade handshake
     *   upgradeResponse    = HTTP response of the upgrade handshake
     *   config             = options; getWebsocketPingInterval() is used (as milliseconds)
     *                        for both the initial delay and the delay between pings
     */
    this(Connection tcpSession, IncomingFrames nextIncomingFrames, WebSocketPolicy policy,
            HttpRequest upgradeRequest, HttpResponse upgradeResponse,
            HttpOptions config) {
        super(tcpSession, HttpVersion.HTTP_1_1);

        extensionNegotiator = new ExtensionNegotiator();
        // connectionEvent = new ConnectionEvent!(WebSocketConnection)(this);
        parser = new Parser(policy);
        parser.setIncomingFramesHandler(this);
        generator = new Generator(policy);
        this.policy = policy;
        this.upgradeRequest = upgradeRequest;
        this.upgradeResponse = upgradeResponse;
        this.config = config;
        ioState = new IOState();
        // ioState.onOpened();

        //dfmt off
        // Last stage of the outgoing chain: mask (client only), serialize, write.
        extensionNegotiator.setNextOutgoingFrames(
            new class OutgoingFrames {

                void outgoingFrame(WebSocketFrame frame, Callback callback) {

                    AbstractWebSocketFrame webSocketFrame = cast(AbstractWebSocketFrame) frame;
                    // Only client-side frames get a mask, and only if none was set yet.
                    if (policy.getBehavior() == WebSocketBehavior.CLIENT && webSocketFrame !is null) {
                        if (!webSocketFrame.isMasked()) {
                            webSocketFrame.setMask(generateMask());
                        }
                    }
                    ByteBuffer buf = BufferUtils.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
                    generator.generateWholeFrame(frame, buf);
                    BufferUtils.flipToFlush(buf, 0);

                    // error(buf.toString());

                    // tcpSession.encode(new ByteBufferOutputEntry(callback, buf));
                    try {
                        tcpSession.encode(buf);
                        callback.succeeded();
                    } catch(Exception ex ){
                        warning(ex);
                        callback.failed(ex);
                    }

                    // After a CLOSE frame has been handed to the transport, record
                    // the local close and close this connection.
                    if (frame.getType() == WebSocketFrameType.CLOSE) {
                        CloseFrame closeFrame = cast(CloseFrame) frame;
                        if(closeFrame !is null) {
                            CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false);
                            getIOState().onCloseLocal(closeInfo);
                            this.outer.close();
                        }
                    }
                }
            }
        );

        setNextIncomingFrames(nextIncomingFrames);

        if (this.policy.getBehavior() == WebSocketBehavior.CLIENT) {
            executor = CommonUtil.scheduler();
            executor.setRemoveOnCancelPolicy(true);
            ScheduledFuture!(void) pingFuture = executor.scheduleWithFixedDelay(new class Runnable {
                    void run() {
                        PingFrame pingFrame = new PingFrame();

                        outgoingFrame(pingFrame, new class NoopCallback {
                            override void succeeded() {
                                version(HUNT_HTTP_DEBUG) infof("The websocket connection %s sent ping frame success", getId());
                            }

                            override void failed(Exception x) {
                                debug warningf("the websocket connection %s sends ping frame failure. %s", getId(), x.msg);
                                version(HUNT_HTTP_DEBUG) warning(x);
                            }
                        });
                    }
                },
                msecs(config.getWebsocketPingInterval()),
                msecs(config.getWebsocketPingInterval()));

            // Stop pinging once the connection is closed.
            onClose( (c) {
                version(HUNT_HTTP_DEBUG)
                infof("Cancelling the ping task on connection %d with %s", this.getId(), this.getRemoteAddress());
                pingFuture.cancel(false);
            });
        }

        //dfmt on
    }

    /**
     * Registers a close handler on the base connection.
     *
     * Returns: this connection, to allow call chaining.
     */
    override WebSocketConnection onClose(Action1!(HttpConnection) handler) {
        super.onClose(handler);
        return this;
    }

    /**
     * Registers an exception handler on the base connection.
     *
     * Returns: this connection, to allow call chaining.
     */
    override WebSocketConnection onException(Action2!(HttpConnection, Exception) handler) {
        // return connectionEvent.onException(exceptionListener);
        super.onException(handler);
        return this;
    }

    /// Marks the IO state as disconnected, then forwards the close notification to the base class.
    override void notifyClose() {
        version(HUNT_DEBUG) tracef("closing, state: %s", ioState.getConnectionState());
        ioState.onDisconnected();
        // connectionEvent.notifyClose();
        super.notifyClose();
    }

    /// Reports the exception to the IO state as a read failure, then forwards it to the base class.
    override void notifyException(Exception t) {
        version(HUNT_DEBUG) warningf("exception, state: %s, error: %s",
            ioState.getConnectionState(), t.msg);
        ioState.onReadFailure(t);
        // connectionEvent.notifyException(t);
        super.notifyException(t);
    }

    /**
     * Returns: true when the IO state exists and its connection state is
     *          CONNECTED or OPEN; false otherwise.
     */
    bool isConnected() {
        if(ioState !is null) {
            WebSocketConnectionState state = ioState.getConnectionState();
            version(HUNT_HTTP_DEBUG) tracef("io state: %s", state) ;
            return state == WebSocketConnectionState.CONNECTED || state == WebSocketConnectionState.OPEN;
        }
        return false;
    }

    /// Returns: the IO state tracker of this connection.
    override IOState getIOState() {
        return ioState;
    }

    /// Returns: the policy this connection was created with.
    override WebSocketPolicy getPolicy() {
        return policy;
    }

    /**
     * Sends a frame through the extension negotiator's outgoing chain.
     *
     * Params:
     *   frame    = frame to send
     *   callback = notified by the final outgoing stage about success or failure of the write
     */
    void outgoingFrame(WebSocketFrame frame, Callback callback) {
        extensionNegotiator.getOutgoingFrames().outgoingFrame(frame, callback);
    }

    /**
     * Installs the handler that receives frames after extension processing and
     * configures the generator, the parser and the extensions from the
     * negotiated extension list. Does nothing when the handler is null.
     *
     * The extension list is parsed from the upgrade response when it carries a
     * Sec-WebSocket-Extensions header, otherwise from the upgrade request.
     */
    void setNextIncomingFrames(IncomingFrames nextIncomingFrames) {
        if (nextIncomingFrames !is null) {
            extensionNegotiator.setNextIncomingFrames(nextIncomingFrames);
            HttpMetaData metaData;
            if (upgradeResponse.getFields().contains(HttpHeader.SEC_WEBSOCKET_EXTENSIONS)) {
                metaData = upgradeResponse;
            } else {
                metaData = upgradeRequest;
            }
            Extension[] extensions = extensionNegotiator.parse(metaData);
            if (!extensions.empty()) {
                generator.configureFromExtensions(extensions);
                parser.configureFromExtensions(extensions);

                // Only AbstractExtension instances accept a policy.
                foreach (Extension e; extensions) {
                    AbstractExtension ae = cast(AbstractExtension) e;
                    if (ae is null)
                        continue;
                    ae.setPolicy(policy);
                }
                // auto r = extensions.filter!(e => instanceof!(AbstractExtension)(e))
                //     .map!(e => cast(AbstractExtension)e);
                // extensions.stream().filter(e -> e instanceof AbstractExtension)
                //         .map(e -> (AbstractExtension) e)
                //         .forEach(e -> e.setPolicy(policy));
            }
        }
    }

    /// Forwards an incoming-side error to the negotiator's incoming-frames handler, if one is set.
    void incomingError(Exception t) {
        // Optional.ofNullable(extensionNegotiator.getIncomingFrames()).ifPresent(e -> e.incomingError(t));
        IncomingFrames frames = extensionNegotiator.getIncomingFrames();
        if (frames !is null)
            frames.incomingError(t);
    }

    /**
     * Handles a parsed incoming frame.
     *
     * PING is answered with a PONG, CLOSE records the remote close and closes
     * the connection, PONG is only logged. Afterwards every frame, whatever its
     * type, is forwarded to the negotiator's incoming-frames handler if one is set.
     */
    override void incomingFrame(WebSocketFrame frame) {
        switch (frame.getType()) {
            case WebSocketFrameType.PING: {
                // NOTE(review): RFC 6455 section 5.5.3 expects a Pong to carry the
                // same application data as the Ping it answers; this reply is an
                // empty PongFrame. Confirm whether the payload should be echoed.
                PongFrame pongFrame = new PongFrame();
                outgoingFrame(pongFrame, Callback.NOOP);
            }
                break;

            case WebSocketFrameType.CLOSE: {
                CloseFrame closeFrame = cast(CloseFrame) frame;
                // The cast yields null for a CLOSE-typed frame that is not a
                // CloseFrame instance; guard it like the outgoing path does
                // instead of dereferencing null.
                if (closeFrame !is null) {
                    CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false);
                    ioState.onCloseRemote(closeInfo);
                    this.close();
                } else {
                    warningf("The websocket connection %s received a CLOSE frame that is not a CloseFrame instance",
                        getId());
                }
            }
                break;

            case WebSocketFrameType.PONG: {
                infof("The websocket connection %s received pong frame", getId());
            }
                break;

            default:
                break;
        }

        IncomingFrames e = extensionNegotiator.getIncomingFrames();
        if (e !is null) {
            version(HUNT_HTTP_DEBUG_MORE) {
                trace(BufferUtils.toDetailString(frame.getPayload()));
            }
            e.incomingFrame(frame);
        }
    }

    // override bool isSecured() {
    //     return secureSession !is null;
    // }

    /// Returns: HttpConnectionType.WEB_SOCKET.
    override HttpConnectionType getConnectionType() {
        return HttpConnectionType.WEB_SOCKET;
    }

    /**
     * Generates a fresh 4-byte masking key.
     *
     * Returns: a newly allocated array of 4 random bytes, each drawn from the
     *          full byte range.
     */
    override byte[] generateMask() {
        byte[] mask = new byte[4];
        // ThreadLocalRandom.current().nextBytes(mask);
        // uniform() defaults to "[)" bounds, which would exclude byte.max (0x7F);
        // request closed bounds so every byte value is possible.
        // NOTE(review): RFC 6455 section 5.3 asks for a strong entropy source for
        // masking keys; std.random's default generator is not cryptographic.
        foreach (size_t i; 0 .. mask.length)
            mask[i] = uniform!"[]"(byte.min, byte.max);
        return mask;
    }

    /**
     * Sends a text frame.
     *
     * Params:
     *   text = payload of the text frame
     *
     * Returns: a promise completed with true when the outgoing callback reports
     *          success, or failed with the reported exception.
     */
    override FuturePromise!(bool) sendText(string text) {
        TextFrame textFrame = new TextFrame();
        textFrame.setPayload(text);
        FuturePromise!(bool) future = new FuturePromise!bool();
        //dfmt off
        outgoingFrame(textFrame,
            new class NoopCallback {
                override void succeeded() {
                    future.succeeded(true);
                }

                override void failed(Exception x) {
                    future.failed(x);
                }
            });
        //dfmt on
        return future;
    }

    /// Sends a binary frame carrying the given bytes; see _sendData.
    override FuturePromise!(bool) sendData(byte[] data) {
        return _sendData(data);
    }

    /// Sends a binary frame carrying the given buffer; see _sendData.
    override FuturePromise!(bool) sendData(ByteBuffer data) {
        return _sendData(data);
    }

    /**
     * Shared implementation of the sendData overloads: wraps the data in a
     * BinaryFrame and sends it.
     *
     * Returns: a promise completed with true when the outgoing callback reports
     *          success, or failed with the reported exception.
     */
    private FuturePromise!(bool) _sendData(T)(T data) {
        BinaryFrame binaryFrame = new BinaryFrame();
        binaryFrame.setPayload(data);
        FuturePromise!(bool) future = new FuturePromise!bool();
        //dfmt off
        outgoingFrame(binaryFrame,
            new class NoopCallback {
                override void succeeded() {
                    future.succeeded(true);
                }

                override void failed(Exception x) {
                    future.failed(x);
                }
            });
        //dfmt on
        return future;
    }

    /// Returns: the HTTP request of the upgrade handshake.
    override HttpRequest getUpgradeRequest() {
        return upgradeRequest;
    }

    /// Returns: the HTTP response of the upgrade handshake.
    override HttpResponse getUpgradeResponse() {
        return upgradeResponse;
    }

    /// Returns: the extension negotiator owning the incoming and outgoing frame chains.
    ExtensionNegotiator getExtensionNegotiator() {
        return extensionNegotiator;
    }

    /// Returns: the frame parser of this connection.
    Parser getParser() {
        return parser;
    }

    /// Returns: the frame generator of this connection.
    Generator getGenerator() {
        return generator;
    }

    // override Object getAttachment() {
    //     return super.getAttachment();
    // }

    // override void setAttachment(Object attachment) {
    //     super.setAttachment(attachment);
    // }

    // The overrides below only delegate to the base class implementation.

    override int getId() {
        return super.getId();
    }

    override Connection getTcpConnection() {
        return super.getTcpConnection();
    }

    override Address getLocalAddress() {
        return super.getLocalAddress();
    }

    override Address getRemoteAddress() {
        return super.getRemoteAddress();
    }

    override HttpVersion getHttpVersion() {
        return super.getHttpVersion();
    }

    override void setAttribute(string key, Object value) {
        super.setAttribute(key, value);
    }

    override Object getAttribute(string key) {
        return super.getAttribute(key);
    }

    override Object removeAttribute(string key) {
        return super.removeAttribute(key);
    }

    override bool containsAttribute(string key) {
        return super.containsAttribute(key);
    }

    // version (HUNT_METRIC) {
    //     override long getOpenTime() {
    //         return super.getOpenTime();
    //     }

    //     override long getCloseTime() {
    //         return super.getCloseTime();
    //     }

    //     override long getDuration() {
    //         return super.getDuration();
    //     }

    //     override long getLastReadTime() {
    //         return super.getLastReadTime();
    //     }

    //     override long getLastWrittenTime() {
    //         return super.getLastWrittenTime();
    //     }

    //     override long getLastActiveTime() {
    //         return super.getLastActiveTime();
    //     }

    //     override long getReadBytes() {
    //         return super.getReadBytes();
    //     }

    //     override long getWrittenBytes() {
    //         return super.getWrittenBytes();
    //     }

    //     override long getIdleTimeout() {
    //         return super.getIdleTimeout();
    //     }
    // }

    // override Duration getMaxIdleTimeout() {
    //     return super.getMaxIdleTimeout();
    // }

    // override bool isConnected() {
    //     return super.isConnected();
    // }

    // override bool isClosing() {
    //     return super.isClosing();
    // }

    // override Address getLocalAddress() {
    //     return super.getLocalAddress();
    // }

    // override Address getRemoteAddress() {
    //     return super.getRemoteAddress();
    // }
}