1 module hunt.http.codec.websocket.stream.WebSocketConnectionImpl; 2 3 import hunt.http.codec.http.model.HttpHeader; 4 import hunt.http.codec.http.model.MetaData; 5 import hunt.http.codec.http.stream.Http2Configuration; 6 import hunt.http.codec.websocket.decode.Parser; 7 import hunt.http.codec.websocket.encode; 8 import hunt.http.codec.websocket.frame; 9 import hunt.http.codec.websocket.model.CloseInfo; 10 import hunt.http.codec.websocket.model.common; 11 import hunt.http.codec.websocket.model.Extension; 12 import hunt.http.codec.websocket.model.extension.AbstractExtension; 13 import hunt.http.codec.websocket.model.IncomingFrames; 14 import hunt.http.codec.websocket.model.OutgoingFrames; 15 import hunt.http.codec.websocket.stream.ExtensionNegotiator; 16 import hunt.http.codec.websocket.stream.IOState; 17 import hunt.http.codec.websocket.stream.WebSocketConnection; 18 import hunt.http.codec.websocket.stream.WebSocketPolicy; 19 20 import hunt.net.AbstractConnection; 21 import hunt.net.OutputEntry; 22 23 ; 24 import hunt.net.ConnectionEvent; 25 import hunt.net.ConnectionType; 26 import hunt.net.secure.SecureSession; 27 import hunt.net.Session; 28 29 import hunt.container; 30 import hunt.lang.common; 31 import hunt.logging; 32 import hunt.util.concurrent.CompletableFuture; 33 import hunt.lang.exception; 34 import hunt.util.functional; 35 36 import std.random; 37 import std.socket; 38 import std.array; 39 40 /** 41 * 42 */ 43 class WebSocketConnectionImpl : AbstractConnection, WebSocketConnection, IncomingFrames { 44 45 protected ConnectionEvent!(WebSocketConnection) connectionEvent; 46 protected Parser parser; 47 protected Generator generator; 48 protected WebSocketPolicy policy; 49 protected HttpRequest upgradeRequest; 50 protected HttpResponse upgradeResponse; 51 protected IOState ioState; 52 protected Http2Configuration config; 53 protected ExtensionNegotiator extensionNegotiator; 54 55 this(SecureSession secureSession, Session tcpSession, IncomingFrames nextIncomingFrames, WebSocketPolicy policy, 56 HttpRequest upgradeRequest, HttpResponse upgradeResponse, 57 Http2Configuration config) { 58 super(secureSession, tcpSession); 59 60 extensionNegotiator = new ExtensionNegotiator(); 61 connectionEvent = new ConnectionEvent!(WebSocketConnection)(this); 62 parser = new Parser(policy); 63 parser.setIncomingFramesHandler(this); 64 generator = new Generator(policy); 65 this.policy = policy; 66 this.upgradeRequest = upgradeRequest; 67 this.upgradeResponse = upgradeResponse; 68 this.config = config; 69 ioState = new IOState(); 70 ioState.onOpened(); 71 72 //dfmt off 73 extensionNegotiator.setNextOutgoingFrames( 74 new class OutgoingFrames { 75 void outgoingFrame(Frame frame, Callback callback) { 76 WebSocketFrame webSocketFrame = cast(WebSocketFrame) frame; 77 if (policy.getBehavior() == WebSocketBehavior.CLIENT && webSocketFrame !is null) { 78 if (!webSocketFrame.isMasked()) { 79 webSocketFrame.setMask(generateMask()); 80 } 81 } 82 ByteBuffer buf = ByteBuffer.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength()); 83 generator.generateWholeFrame(frame, buf); 84 BufferUtils.flipToFlush(buf, 0); 85 tcpSession.encode(new ByteBufferOutputEntry(callback, buf)); 86 if (frame.getType() == Frame.Type.CLOSE) { 87 CloseFrame closeFrame = cast(CloseFrame) frame; 88 if(closeFrame !is null) { 89 CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false); 90 getIOState().onCloseLocal(closeInfo); 91 this.outer.close(); 92 } 93 } 94 } 95 }); 96 //dfmt on 97 setNextIncomingFrames(nextIncomingFrames); 98 99 if (this.policy.getBehavior() == WebSocketBehavior.CLIENT) { 100 101 implementationMissing(false); 102 103 // Scheduler.Future pingFuture = scheduler.scheduleAtFixedRate(() { 104 // PingFrame pingFrame = new PingFrame(); 105 // outgoingFrame(pingFrame, new class NoopCallback { 106 // void succeeded() { 107 // info("The websocket connection %s sent ping frame success", getSessionId()); 108 // } 109 110 // void failed(Exception x) { 111 // log.warn("the websocket connection %s sends ping frame failure. %s", getSessionId(), x.getMessage()); 112 // } 113 // }); 114 // }, config.getWebsocketPingInterval(), config.getWebsocketPingInterval(), TimeUnit.MILLISECONDS); 115 // onClose(c -> pingFuture.cancel()); 116 } 117 } 118 119 override WebSocketConnection onClose(Action1!(WebSocketConnection) closedListener) { 120 return connectionEvent.onClose(closedListener); 121 } 122 123 override WebSocketConnection onException(Action2!(WebSocketConnection, 124 Exception) exceptionListener) { 125 return connectionEvent.onException(exceptionListener); 126 } 127 128 void notifyClose() { 129 connectionEvent.notifyClose(); 130 } 131 132 void notifyException(Exception t) { 133 connectionEvent.notifyException(t); 134 } 135 136 override IOState getIOState() { 137 return ioState; 138 } 139 140 override WebSocketPolicy getPolicy() { 141 return policy; 142 } 143 144 void outgoingFrame(Frame frame, Callback callback) { 145 extensionNegotiator.getOutgoingFrames().outgoingFrame(frame, callback); 146 } 147 148 void setNextIncomingFrames(IncomingFrames nextIncomingFrames) { 149 if (nextIncomingFrames !is null) { 150 extensionNegotiator.setNextIncomingFrames(nextIncomingFrames); 151 MetaData metaData; 152 if (upgradeResponse.getFields().contains(HttpHeader.SEC_WEBSOCKET_EXTENSIONS)) { 153 metaData = upgradeResponse; 154 } else { 155 metaData = upgradeRequest; 156 } 157 Extension[] extensions = extensionNegotiator.parse(metaData); 158 if (!extensions.empty()) { 159 generator.configureFromExtensions(extensions); 160 parser.configureFromExtensions(extensions); 161 162 foreach (Extension e; extensions) { 163 AbstractExtension ae = cast(AbstractExtension) e; 164 if (ae is null) 165 continue; 166 ae.setPolicy(policy); 167 } 168 // auto r = extensions.filter!(e => instanceof!(AbstractExtension)(e)) 169 // .map!(e => cast(AbstractExtension)e); 170 // extensions.stream().filter(e -> e instanceof AbstractExtension) 171 // .map(e -> (AbstractExtension) e) 172 // .forEach(e -> e.setPolicy(policy)); 173 } 174 } 175 } 176 177 void incomingError(Exception t) { 178 // Optional.ofNullable(extensionNegotiator.getIncomingFrames()).ifPresent(e -> e.incomingError(t)); 179 IncomingFrames frames = extensionNegotiator.getIncomingFrames(); 180 if (frames !is null) 181 frames.incomingError(t); 182 } 183 184 override void incomingFrame(Frame frame) { 185 switch (frame.getType()) { 186 case FrameType.PING: { 187 PongFrame pongFrame = new PongFrame(); 188 outgoingFrame(pongFrame, Callback.NOOP); 189 } 190 break; 191 192 case FrameType.CLOSE: { 193 CloseFrame closeFrame = cast(CloseFrame) frame; 194 CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false); 195 ioState.onCloseRemote(closeInfo); 196 this.close(); 197 } 198 break; 199 200 case FrameType.PONG: { 201 info("The websocket connection %s received pong frame", getSessionId()); 202 } 203 break; 204 205 default: 206 break; 207 } 208 209 IncomingFrames e = extensionNegotiator.getIncomingFrames(); 210 if (e !is null) 211 e.incomingFrame(frame); 212 } 213 214 override bool isEncrypted() { 215 return secureSession !is null; 216 } 217 218 override ConnectionType getConnectionType() { 219 return ConnectionType.WEB_SOCKET; 220 } 221 222 override byte[] generateMask() { 223 byte[] mask = new byte[4]; 224 // ThreadLocalRandom.current().nextBytes(mask); 225 foreach (size_t i; 0 .. mask.length) 226 mask[i] = uniform(byte.min, byte.max); 227 return mask; 228 } 229 230 override CompletableFuture!(bool) sendText(string text) { 231 TextFrame textFrame = new TextFrame(); 232 textFrame.setPayload(text); 233 CompletableFuture!(bool) future = new CompletableFuture!bool(); 234 //dfmt off 235 outgoingFrame(textFrame, 236 new class NoopCallback { 237 override void succeeded() { 238 future.complete(true); 239 } 240 241 override void failed(Exception x) { 242 future.completeExceptionally(x); 243 } 244 }); 245 //dfmt on 246 return future; 247 } 248 249 override CompletableFuture!(bool) sendData(byte[] data) { 250 return _sendData(data); 251 } 252 253 override CompletableFuture!(bool) sendData(ByteBuffer data) { 254 return _sendData(data); 255 } 256 257 private CompletableFuture!(bool) _sendData(T)(T data) { 258 BinaryFrame binaryFrame = new BinaryFrame(); 259 binaryFrame.setPayload(data); 260 CompletableFuture!(bool) future = new CompletableFuture!bool(); 261 //dfmt off 262 outgoingFrame(binaryFrame, 263 new class NoopCallback { 264 override void succeeded() { 265 future.complete(true); 266 } 267 268 override void failed(Exception x) { 269 future.completeExceptionally(x); 270 } 271 }); 272 //dfmt on 273 return future; 274 } 275 276 override HttpRequest getUpgradeRequest() { 277 return upgradeRequest; 278 } 279 280 override HttpResponse getUpgradeResponse() { 281 return upgradeResponse; 282 } 283 284 ExtensionNegotiator getExtensionNegotiator() { 285 return extensionNegotiator; 286 } 287 288 Parser getParser() { 289 return parser; 290 } 291 292 Generator getGenerator() { 293 return generator; 294 } 295 296 override Object getAttachment() { 297 return super.getAttachment(); 298 } 299 300 override void setAttachment(Object attachment) { 301 super.setAttachment(attachment); 302 } 303 304 override int getSessionId() { 305 return super.getSessionId(); 306 } 307 308 version (HUNT_METRIC) { 309 override long getOpenTime() { 310 return super.getOpenTime(); 311 } 312 313 override long getCloseTime() { 314 return super.getCloseTime(); 315 } 316 317 override long getDuration() { 318 return super.getDuration(); 319 } 320 321 override long getLastReadTime() { 322 return super.getLastReadTime(); 323 } 324 325 override long getLastWrittenTime() { 326 return super.getLastWrittenTime(); 327 } 328 329 override long getLastActiveTime() { 330 return super.getLastActiveTime(); 331 } 332 333 override long getReadBytes() { 334 return super.getReadBytes(); 335 } 336 337 override long getWrittenBytes() { 338 return super.getWrittenBytes(); 339 } 340 341 override long getIdleTimeout() { 342 return super.getIdleTimeout(); 343 } 344 } 345 346 override long getMaxIdleTimeout() { 347 return super.getMaxIdleTimeout(); 348 } 349 350 override bool isOpen() { 351 return super.isOpen(); 352 } 353 354 override bool isClosed() { 355 return super.isClosed(); 356 } 357 358 override Address getLocalAddress() { 359 return super.getLocalAddress(); 360 } 361 362 override Address getRemoteAddress() { 363 return super.getRemoteAddress(); 364 } 365 }