1 module hunt.http.codec.websocket.stream.WebSocketConnectionImpl;
2
3 import hunt.http.codec.websocket.decode.Parser;
4 import hunt.http.codec.websocket.encode;
5 import hunt.http.codec.websocket.frame;
6 import hunt.http.codec.websocket.model.CloseInfo;
7 import hunt.http.codec.websocket.model.Extension;
8 import hunt.http.codec.websocket.model.extension.AbstractExtension;
9 import hunt.http.codec.websocket.stream.ExtensionNegotiator;
10 import hunt.http.codec.websocket.stream.IOState;
11
12 import hunt.http.HttpConnection;
13 import hunt.http.HttpConnection;
14 import hunt.http.HttpConnection;
15 import hunt.http.HttpHeader;
16 import hunt.http.HttpMetaData;
17 import hunt.http.HttpRequest;
18 import hunt.http.HttpResponse;
19 import hunt.http.HttpOptions;
20 import hunt.http.HttpVersion;
21 import hunt.http.WebSocketCommon;
22 import hunt.http.WebSocketConnection;
23 import hunt.http.WebSocketPolicy;
24
25 import hunt.http.Util;
26
27 import hunt.collection;
28 import hunt.concurrency.FuturePromise;
29 import hunt.concurrency.Delayed;
30 import hunt.Exceptions;
31 import hunt.Functions;
32 import hunt.logging;
33 import hunt.net.AbstractConnection;
34 import hunt.net.Connection;
35 import hunt.util.Common;
36 import hunt.util.Runnable;
37
38 import core.time;
39
40 import std.random;
41 import std.socket;
42 import std.array;
43
44 /**
45 *
46 */
47 class WebSocketConnectionImpl : AbstractHttpConnection, WebSocketConnection, IncomingFrames {
48
49 // protected ConnectionEvent!(WebSocketConnection) connectionEvent;
50 protected Parser parser;
51 protected Generator generator;
52 protected WebSocketPolicy policy;
53 protected HttpRequest upgradeRequest;
54 protected HttpResponse upgradeResponse;
55 protected IOState ioState;
56 protected HttpOptions config;
57 protected ExtensionNegotiator extensionNegotiator;
58
59 this(Connection tcpSession, IncomingFrames nextIncomingFrames, WebSocketPolicy policy,
60 HttpRequest upgradeRequest, HttpResponse upgradeResponse,
61 HttpOptions config) {
62 super(tcpSession, HttpVersion.HTTP_1_1);
63
64 extensionNegotiator = new ExtensionNegotiator();
65 // connectionEvent = new ConnectionEvent!(WebSocketConnection)(this);
66 parser = new Parser(policy);
67 parser.setIncomingFramesHandler(this);
68 generator = new Generator(policy);
69 this.policy = policy;
70 this.upgradeRequest = upgradeRequest;
71 this.upgradeResponse = upgradeResponse;
72 this.config = config;
73 ioState = new IOState();
74 // ioState.onOpened();
75
76 //dfmt off
77 extensionNegotiator.setNextOutgoingFrames(
78 new class OutgoingFrames {
79
80 void outgoingFrame(WebSocketFrame frame, Callback callback) {
81
82 AbstractWebSocketFrame webSocketFrame = cast(AbstractWebSocketFrame) frame;
83 if (policy.getBehavior() == WebSocketBehavior.CLIENT && webSocketFrame !is null) {
84 if (!webSocketFrame.isMasked()) {
85 webSocketFrame.setMask(generateMask());
86 }
87 }
88 ByteBuffer buf = BufferUtils.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
89 generator.generateWholeFrame(frame, buf);
90 BufferUtils.flipToFlush(buf, 0);
91
92 // error(buf.toString());
93
94 // tcpSession.encode(new ByteBufferOutputEntry(callback, buf));
95 try {
96 tcpSession.encode(buf);
97 callback.succeeded();
98 } catch(Exception ex ){
99 warning(ex);
100 callback.failed(ex);
101 }
102
103 if (frame.getType() == WebSocketFrameType.CLOSE) {
104 CloseFrame closeFrame = cast(CloseFrame) frame;
105 if(closeFrame !is null) {
106 CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false);
107 getIOState().onCloseLocal(closeInfo);
108 this.outer.close();
109 }
110 }
111 }
112 }
113 );
114
115 setNextIncomingFrames(nextIncomingFrames);
116
117 if (this.policy.getBehavior() == WebSocketBehavior.CLIENT) {
118 executor = CommonUtil.scheduler();
119 executor.setRemoveOnCancelPolicy(true);
120 ScheduledFuture!(void) pingFuture = executor.scheduleWithFixedDelay(new class Runnable {
121 void run() {
122 PingFrame pingFrame = new PingFrame();
123
124 outgoingFrame(pingFrame, new class NoopCallback {
125 override void succeeded() {
126 version(HUNT_HTTP_DEBUG) infof("The websocket connection %s sent ping frame success", getId());
127 }
128
129 override void failed(Exception x) {
130 debug warningf("the websocket connection %s sends ping frame failure. %s", getId(), x.msg);
131 version(HUNT_HTTP_DEBUG) warning(x);
132 }
133 });
134 }
135 },
136 msecs(config.getWebsocketPingInterval()),
137 msecs(config.getWebsocketPingInterval()));
138
139 onClose( (c) {
140 version(HUNT_HTTP_DEBUG)
141 infof("Cancelling the ping task on connection %d with %s", this.getId(), this.getRemoteAddress());
142 pingFuture.cancel(false);
143 });
144 }
145
146 //dfmt on
147 }
148
149 override WebSocketConnection onClose(Action1!(HttpConnection) handler) {
150 super.onClose(handler);
151 return this;
152 }
153
154 override WebSocketConnection onException(Action2!(HttpConnection, Exception) handler) {
155 // return connectionEvent.onException(exceptionListener);
156 super.onException(handler);
157 return this;
158 }
159
160 override void notifyClose() {
161 version(HUNT_DEBUG) tracef("closing, state: %s", ioState.getConnectionState());
162 ioState.onDisconnected();
163 // connectionEvent.notifyClose();
164 super.notifyClose();
165 }
166
167 override void notifyException(Exception t) {
168 version(HUNT_DEBUG) warningf("exception, state: %s, error: %s",
169 ioState.getConnectionState(), t.msg);
170 ioState.onReadFailure(t);
171 // connectionEvent.notifyException(t);
172 super.notifyException(t);
173 }
174
175 bool isConnected() {
176 if(ioState !is null) {
177 WebSocketConnectionState state = ioState.getConnectionState();
178 version(HUNT_HTTP_DEBUG) tracef("io state: %s", state) ;
179 return state == WebSocketConnectionState.CONNECTED || state == WebSocketConnectionState.OPEN;
180 }
181 return false;
182 }
183
184 override IOState getIOState() {
185 return ioState;
186 }
187
188 override WebSocketPolicy getPolicy() {
189 return policy;
190 }
191
192 void outgoingFrame(WebSocketFrame frame, Callback callback) {
193 extensionNegotiator.getOutgoingFrames().outgoingFrame(frame, callback);
194 }
195
196 void setNextIncomingFrames(IncomingFrames nextIncomingFrames) {
197 if (nextIncomingFrames !is null) {
198 extensionNegotiator.setNextIncomingFrames(nextIncomingFrames);
199 HttpMetaData metaData;
200 if (upgradeResponse.getFields().contains(HttpHeader.SEC_WEBSOCKET_EXTENSIONS)) {
201 metaData = upgradeResponse;
202 } else {
203 metaData = upgradeRequest;
204 }
205 Extension[] extensions = extensionNegotiator.parse(metaData);
206 if (!extensions.empty()) {
207 generator.configureFromExtensions(extensions);
208 parser.configureFromExtensions(extensions);
209
210 foreach (Extension e; extensions) {
211 AbstractExtension ae = cast(AbstractExtension) e;
212 if (ae is null)
213 continue;
214 ae.setPolicy(policy);
215 }
216 // auto r = extensions.filter!(e => instanceof!(AbstractExtension)(e))
217 // .map!(e => cast(AbstractExtension)e);
218 // extensions.stream().filter(e -> e instanceof AbstractExtension)
219 // .map(e -> (AbstractExtension) e)
220 // .forEach(e -> e.setPolicy(policy));
221 }
222 }
223 }
224
225 void incomingError(Exception t) {
226 // Optional.ofNullable(extensionNegotiator.getIncomingFrames()).ifPresent(e -> e.incomingError(t));
227 IncomingFrames frames = extensionNegotiator.getIncomingFrames();
228 if (frames !is null)
229 frames.incomingError(t);
230 }
231
232 override void incomingFrame(WebSocketFrame frame) {
233 switch (frame.getType()) {
234 case WebSocketFrameType.PING: {
235 PongFrame pongFrame = new PongFrame();
236 outgoingFrame(pongFrame, Callback.NOOP);
237 }
238 break;
239
240 case WebSocketFrameType.CLOSE: {
241 CloseFrame closeFrame = cast(CloseFrame) frame;
242 CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false);
243 ioState.onCloseRemote(closeInfo);
244 this.close();
245 }
246 break;
247
248 case WebSocketFrameType.PONG: {
249 infof("The websocket connection %s received pong frame", getId());
250 }
251 break;
252
253 default:
254 break;
255 }
256
257 IncomingFrames e = extensionNegotiator.getIncomingFrames();
258 if (e !is null) {
259 version(HUNT_HTTP_DEBUG_MORE) {
260 trace(BufferUtils.toDetailString(frame.getPayload()));
261 }
262 e.incomingFrame(frame);
263 }
264 }
265
266 // override bool isSecured() {
267 // return secureSession !is null;
268 // }
269
270 override HttpConnectionType getConnectionType() {
271 return HttpConnectionType.WEB_SOCKET;
272 }
273
274 override byte[] generateMask() {
275 byte[] mask = new byte[4];
276 // ThreadLocalRandom.current().nextBytes(mask);
277 foreach (size_t i; 0 .. mask.length)
278 mask[i] = uniform(byte.min, byte.max);
279 return mask;
280 }
281
282 override FuturePromise!(bool) sendText(string text) {
283 TextFrame textFrame = new TextFrame();
284 textFrame.setPayload(text);
285 FuturePromise!(bool) future = new FuturePromise!bool();
286 //dfmt off
287 outgoingFrame(textFrame,
288 new class NoopCallback {
289 override void succeeded() {
290 future.succeeded(true);
291 }
292
293 override void failed(Exception x) {
294 future.failed(x);
295 }
296 });
297 //dfmt on
298 return future;
299 }
300
301 override FuturePromise!(bool) sendData(byte[] data) {
302 return _sendData(data);
303 }
304
305 override FuturePromise!(bool) sendData(ByteBuffer data) {
306 return _sendData(data);
307 }
308
309 private FuturePromise!(bool) _sendData(T)(T data) {
310 BinaryFrame binaryFrame = new BinaryFrame();
311 binaryFrame.setPayload(data);
312 FuturePromise!(bool) future = new FuturePromise!bool();
313 //dfmt off
314 outgoingFrame(binaryFrame,
315 new class NoopCallback {
316 override void succeeded() {
317 future.succeeded(true);
318 }
319
320 override void failed(Exception x) {
321 future.failed(x);
322 }
323 });
324 //dfmt on
325 return future;
326 }
327
328 override HttpRequest getUpgradeRequest() {
329 return upgradeRequest;
330 }
331
332 override HttpResponse getUpgradeResponse() {
333 return upgradeResponse;
334 }
335
336 ExtensionNegotiator getExtensionNegotiator() {
337 return extensionNegotiator;
338 }
339
340 Parser getParser() {
341 return parser;
342 }
343
344 Generator getGenerator() {
345 return generator;
346 }
347
348 // override Object getAttachment() {
349 // return super.getAttachment();
350 // }
351
352 // override void setAttachment(Object attachment) {
353 // super.setAttachment(attachment);
354 // }
355
356 override int getId() {
357 return super.getId();
358 }
359
360 override Connection getTcpConnection() {
361 return super.getTcpConnection();
362 }
363
364 override Address getLocalAddress() {
365 return super.getLocalAddress();
366 }
367
368 override Address getRemoteAddress() {
369 return super.getRemoteAddress();
370 }
371
372 override HttpVersion getHttpVersion() {
373 return super.getHttpVersion();
374 }
375
376 override void setAttribute(string key, Object value) {
377 super.setAttribute(key, value);
378 }
379
380 override Object getAttribute(string key) {
381 return super.getAttribute(key);
382 }
383
384 override Object removeAttribute(string key) {
385 return super.removeAttribute(key);
386 }
387
388 override bool containsAttribute(string key) {
389 return super.containsAttribute(key);
390 }
391
392 // version (HUNT_METRIC) {
393 // override long getOpenTime() {
394 // return super.getOpenTime();
395 // }
396
397 // override long getCloseTime() {
398 // return super.getCloseTime();
399 // }
400
401 // override long getDuration() {
402 // return super.getDuration();
403 // }
404
405 // override long getLastReadTime() {
406 // return super.getLastReadTime();
407 // }
408
409 // override long getLastWrittenTime() {
410 // return super.getLastWrittenTime();
411 // }
412
413 // override long getLastActiveTime() {
414 // return super.getLastActiveTime();
415 // }
416
417 // override long getReadBytes() {
418 // return super.getReadBytes();
419 // }
420
421 // override long getWrittenBytes() {
422 // return super.getWrittenBytes();
423 // }
424
425 // override long getIdleTimeout() {
426 // return super.getIdleTimeout();
427 // }
428 // }
429
430 // override Duration getMaxIdleTimeout() {
431 // return super.getMaxIdleTimeout();
432 // }
433
434 // override bool isConnected() {
435 // return super.isConnected();
436 // }
437
438 // override bool isClosing() {
439 // return super.isClosing();
440 // }
441
442 // override Address getLocalAddress() {
443 // return super.getLocalAddress();
444 // }
445
446 // override Address getRemoteAddress() {
447 // return super.getRemoteAddress();
448 // }
449 }