1 module hunt.http.codec.websocket.stream.WebSocketConnectionImpl;
2 
3 import hunt.http.codec.websocket.decode.Parser;
4 import hunt.http.codec.websocket.encode;
5 import hunt.http.codec.websocket.frame;
6 import hunt.http.codec.websocket.model.CloseInfo;
7 import hunt.http.codec.websocket.model.Extension;
8 import hunt.http.codec.websocket.model.extension.AbstractExtension;
9 import hunt.http.codec.websocket.stream.ExtensionNegotiator;
10 import hunt.http.codec.websocket.stream.IOState;
11 
12 import hunt.http.HttpConnection;
13 import hunt.http.HttpConnection;
14 import hunt.http.HttpConnection;
15 import hunt.http.HttpHeader;
16 import hunt.http.HttpMetaData;
17 import hunt.http.HttpRequest;
18 import hunt.http.HttpResponse;
19 import hunt.http.HttpOptions;
20 import hunt.http.HttpVersion;
21 import hunt.http.WebSocketCommon;
22 import hunt.http.WebSocketConnection;
23 import hunt.http.WebSocketPolicy;
24 
25 import hunt.http.Util;
26 
27 import hunt.collection;
28 import hunt.concurrency.FuturePromise;
29 import hunt.concurrency.Delayed;
30 import hunt.Exceptions;
31 import hunt.Functions;
32 import hunt.logging;
33 import hunt.net.AbstractConnection;
34 import hunt.net.Connection;
35 import hunt.util.Common;
36 import hunt.util.Runnable;
37 
38 import core.time;
39 
40 import std.random;
41 import std.socket;
42 import std.array;
43 
/**
 * WebSocket connection layered on top of an already-upgraded HTTP/1.1 connection.
 *
 * The connection owns a frame `Parser` (which delivers decoded frames back to
 * this object through `incomingFrame`) and a frame `Generator`. Outgoing frames
 * are routed through the `ExtensionNegotiator` chain and finally serialized and
 * handed to the underlying TCP connection. When the policy behavior is
 * `WebSocketBehavior.CLIENT`, outgoing frames are masked and a periodic ping
 * task is scheduled for the lifetime of the connection.
 */
class WebSocketConnectionImpl : AbstractHttpConnection, WebSocketConnection, IncomingFrames {

    // protected ConnectionEvent!(WebSocketConnection) connectionEvent;
    protected Parser parser;
    protected Generator generator;
    protected WebSocketPolicy policy;
    protected HttpRequest upgradeRequest;
    protected HttpResponse upgradeResponse;
    protected IOState ioState;
    protected HttpOptions config;
    protected ExtensionNegotiator extensionNegotiator;

    /**
     * Creates the connection and wires the incoming/outgoing frame pipelines.
     *
     * Params:
     *   tcpSession         = underlying connection that encoded frames are written to
     *   nextIncomingFrames = handler that receives frames after extension processing;
     *                        may be null, in which case no extension setup is done here
     *   policy             = WebSocket policy; its behavior decides masking and pinging
     *   upgradeRequest     = the HTTP upgrade request
     *   upgradeResponse    = the HTTP upgrade response
     *   config             = HTTP options; supplies the ping interval (interpreted as milliseconds)
     */
    this(Connection tcpSession, IncomingFrames nextIncomingFrames, WebSocketPolicy policy,
            HttpRequest upgradeRequest, HttpResponse upgradeResponse,
            HttpOptions config) {
        super(tcpSession, HttpVersion.HTTP_1_1);

        extensionNegotiator = new ExtensionNegotiator();
        // connectionEvent = new ConnectionEvent!(WebSocketConnection)(this);
        parser = new Parser(policy);
        parser.setIncomingFramesHandler(this);
        generator = new Generator(policy);
        this.policy = policy;
        this.upgradeRequest = upgradeRequest;
        this.upgradeResponse = upgradeResponse;
        this.config = config;
        ioState = new IOState();
        // ioState.onOpened();

        //dfmt off
        // Terminal stage of the outgoing chain: serialize the frame and write it to the TCP session.
        extensionNegotiator.setNextOutgoingFrames(
            new class OutgoingFrames { 

                void outgoingFrame(WebSocketFrame frame, Callback callback) {

                    // Client-side frames get a mask unless one is already set.
                    AbstractWebSocketFrame webSocketFrame = cast(AbstractWebSocketFrame) frame;
                    if (policy.getBehavior() == WebSocketBehavior.CLIENT && webSocketFrame !is null) {
                        if (!webSocketFrame.isMasked()) {
                            webSocketFrame.setMask(generateMask());
                        }
                    }

                    // Buffer is sized for the largest possible header plus the payload.
                    ByteBuffer buf = BufferUtils.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
                    generator.generateWholeFrame(frame, buf);
                    BufferUtils.flipToFlush(buf, 0);

                    // error(buf.toString());
                    
                    // tcpSession.encode(new ByteBufferOutputEntry(callback, buf));
                    try {
                        tcpSession.encode(buf);
                        callback.succeeded();
                    } catch(Exception ex ){
                        warning(ex);
                        callback.failed(ex);
                    }

                    // A locally sent CLOSE frame updates the IO state and closes the connection.
                    if (frame.getType() == WebSocketFrameType.CLOSE) {
                        CloseFrame closeFrame = cast(CloseFrame) frame;
                        if(closeFrame !is null) {
                            CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false);
                            getIOState().onCloseLocal(closeInfo);
                            this.outer.close();
                        }
                    }
                }
            }
        );

        setNextIncomingFrames(nextIncomingFrames);

        if (this.policy.getBehavior() == WebSocketBehavior.CLIENT) {
            // The same interval is used for the initial delay and for the delay between pings.
            Duration pingInterval = msecs(config.getWebsocketPingInterval());

            executor = CommonUtil.scheduler();
            executor.setRemoveOnCancelPolicy(true);
            ScheduledFuture!(void) pingFuture = executor.scheduleWithFixedDelay(new class Runnable {
                    void run() {
                        PingFrame pingFrame = new PingFrame();

                        outgoingFrame(pingFrame, new class NoopCallback {
                            override void succeeded() {
                                version(HUNT_HTTP_DEBUG) infof("The websocket connection %s sent ping frame success", getId());
                            }

                            override void failed(Exception x) {
                                debug warningf("the websocket connection %s sends ping frame failure. %s", getId(), x.msg);
                                version(HUNT_HTTP_DEBUG)  warning(x);
                            }
                        });
                    }
                }, 
                pingInterval, 
                pingInterval);

            // Stop pinging once the connection is closed.
            onClose( (c) {
                version(HUNT_HTTP_DEBUG) {
                    infof("Cancelling the ping task on connection %d with %s", this.getId(), this.getRemoteAddress());
                }
                pingFuture.cancel(false); 
            });
        }        

//dfmt on        
    }

    /// Registers a close handler on the base connection and returns this connection for chaining.
    override WebSocketConnection onClose(Action1!(HttpConnection) handler) {
        super.onClose(handler);
        return this;
    }

    /// Registers an exception handler on the base connection and returns this connection for chaining.
    override WebSocketConnection onException(Action2!(HttpConnection, Exception) handler) {
        // return connectionEvent.onException(exceptionListener);
        super.onException(handler);
        return this;
    }

    /// Marks the IO state as disconnected, then forwards the close notification to the base class.
    override void notifyClose() {
        version(HUNT_DEBUG) tracef("closing, state: %s", ioState.getConnectionState());
        ioState.onDisconnected();
        // connectionEvent.notifyClose();
        super.notifyClose();
    }

    /// Records the failure on the IO state, then forwards the exception to the base class.
    override void notifyException(Exception t) {
        version(HUNT_DEBUG) warningf("exception, state: %s, error: %s", 
            ioState.getConnectionState(), t.msg);
        ioState.onReadFailure(t);
        // connectionEvent.notifyException(t);
        super.notifyException(t);
    }

    /// Returns true when the IO state is either CONNECTED or OPEN; false otherwise or when no IO state exists.
    bool isConnected() {
        if(ioState !is null) {
            WebSocketConnectionState state = ioState.getConnectionState();
            version(HUNT_HTTP_DEBUG) tracef("io state: %s", state) ;
            return state == WebSocketConnectionState.CONNECTED || state == WebSocketConnectionState.OPEN;
        }
        return false;
    }

    /// Returns the IO state tracker of this connection.
    override IOState getIOState() {
        return ioState;
    }

    /// Returns the WebSocket policy this connection was created with.
    override WebSocketPolicy getPolicy() {
        return policy;
    }

    /// Sends a frame through the outgoing chain of the extension negotiator.
    void outgoingFrame(WebSocketFrame frame, Callback callback) {
        extensionNegotiator.getOutgoingFrames().outgoingFrame(frame, callback);
    }

    /**
     * Installs the downstream incoming-frame handler and configures the parser,
     * generator and extensions from the negotiated extension list.
     *
     * Extensions are parsed from the upgrade response when it carries a
     * Sec-WebSocket-Extensions header, otherwise from the upgrade request.
     * Does nothing when `nextIncomingFrames` is null.
     */
    void setNextIncomingFrames(IncomingFrames nextIncomingFrames) {
        if (nextIncomingFrames !is null) {
            extensionNegotiator.setNextIncomingFrames(nextIncomingFrames);
            HttpMetaData metaData;
            if (upgradeResponse.getFields().contains(HttpHeader.SEC_WEBSOCKET_EXTENSIONS)) {
                metaData = upgradeResponse;
            } else {
                metaData = upgradeRequest;
            }
            Extension[] extensions = extensionNegotiator.parse(metaData);
            if (!extensions.empty()) {
                generator.configureFromExtensions(extensions);
                parser.configureFromExtensions(extensions);

                // Only AbstractExtension instances accept a policy.
                foreach (Extension e; extensions) {
                    AbstractExtension ae = cast(AbstractExtension) e;
                    if (ae is null)
                        continue;
                    ae.setPolicy(policy);
                }
                // auto r = extensions.filter!(e => instanceof!(AbstractExtension)(e))
                //     .map!(e => cast(AbstractExtension)e);
                // extensions.stream().filter(e -> e instanceof AbstractExtension)
                //           .map(e -> (AbstractExtension) e)
                //           .forEach(e -> e.setPolicy(policy));
            }
        }
    }

    /// Forwards a parsing/incoming error to the negotiator's incoming-frame handler, if one is set.
    void incomingError(Exception t) {
        // Optional.ofNullable(extensionNegotiator.getIncomingFrames()).ifPresent(e -> e.incomingError(t));
        IncomingFrames frames = extensionNegotiator.getIncomingFrames();
        if (frames !is null)
            frames.incomingError(t);
    }

    /**
     * Handles a decoded frame: answers PING with PONG, processes a remote CLOSE,
     * and then forwards every frame (control frames included) to the negotiator's
     * incoming-frame handler when one is set.
     */
    override void incomingFrame(WebSocketFrame frame) {
        switch (frame.getType()) {
        case WebSocketFrameType.PING: {
                // NOTE(review): RFC 6455 section 5.5.3 expects the pong to echo the ping's
                // application data; this reply carries no payload. Confirm the PongFrame
                // payload API and buffer ownership before changing it.
                PongFrame pongFrame = new PongFrame();
                outgoingFrame(pongFrame, Callback.NOOP);
            }
            break;

        case WebSocketFrameType.CLOSE: {
                // The cast yields null if the frame is not a CloseFrame instance; guard it
                // the same way the outgoing path does instead of dereferencing null.
                CloseFrame closeFrame = cast(CloseFrame) frame;
                if (closeFrame !is null) {
                    CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false);
                    ioState.onCloseRemote(closeInfo);
                } else {
                    warningf("The websocket connection %s received a CLOSE frame of unexpected type", getId());
                }
                this.close();
            }
            break;

        case WebSocketFrameType.PONG: {
                // Gated like the other per-frame trace logging in this class.
                version(HUNT_HTTP_DEBUG) infof("The websocket connection %s received pong frame", getId());
            }
            break;

        default:
            break;
        }

        IncomingFrames e = extensionNegotiator.getIncomingFrames();
        if (e !is null) {
            version(HUNT_HTTP_DEBUG_MORE) {
                trace(BufferUtils.toDetailString(frame.getPayload()));
            }
            e.incomingFrame(frame);
        }
    }

    // override bool isSecured() {
    //     return secureSession !is null;
    // }

    /// Identifies this connection as a WebSocket connection.
    override HttpConnectionType getConnectionType() {
        return HttpConnectionType.WEB_SOCKET;
    }

    /**
     * Generates a 4-byte masking key.
     *
     * Each byte is drawn uniformly from the full closed range
     * [byte.min, byte.max]; the default half-open range of `uniform` would
     * never produce byte.max.
     *
     * NOTE(review): std.random is not a cryptographically secure generator,
     * while RFC 6455 section 5.3 asks for a strong source of entropy for the
     * masking key — consider a stronger source if that matters here.
     */
    override byte[] generateMask() {
        byte[] mask = new byte[4];
        // ThreadLocalRandom.current().nextBytes(mask);
        foreach (size_t i; 0 .. mask.length)
            mask[i] = uniform!"[]"(byte.min, byte.max);
        return mask;
    }

    /**
     * Sends a text frame.
     *
     * Returns: a promise completed with `true` when the frame callback reports
     *          success, or failed with the reported exception.
     */
    override FuturePromise!(bool) sendText(string text) {
        TextFrame textFrame = new TextFrame();
        textFrame.setPayload(text);
        FuturePromise!(bool) future = new FuturePromise!bool();
        //dfmt off        
        outgoingFrame(textFrame, 
            new class NoopCallback {
            override void succeeded() {
                future.succeeded(true);
            }

            override void failed(Exception x) {
                future.failed(x);
            }
        });
//dfmt on        
        return future;
    }

    /// Sends the bytes as a binary frame; see `_sendData`.
    override FuturePromise!(bool) sendData(byte[] data) {
        return _sendData(data);
    }

    /// Sends the buffer as a binary frame; see `_sendData`.
    override FuturePromise!(bool) sendData(ByteBuffer data) {
        return _sendData(data);
    }

    /**
     * Shared implementation of the `sendData` overloads: wraps the payload in a
     * binary frame and sends it.
     *
     * Returns: a promise completed with `true` when the frame callback reports
     *          success, or failed with the reported exception.
     */
    private FuturePromise!(bool) _sendData(T)(T data) {
        BinaryFrame binaryFrame = new BinaryFrame();
        binaryFrame.setPayload(data);
        FuturePromise!(bool) future = new FuturePromise!bool();
        //dfmt off        
        outgoingFrame(binaryFrame, 
            new class NoopCallback {
            override void succeeded() {
                future.succeeded(true);
            }

            override void failed(Exception x) {
                future.failed(x);
            }
        });
//dfmt on          
        return future;
    }

    /// Returns the HTTP upgrade request this connection was created with.
    override HttpRequest getUpgradeRequest() {
        return upgradeRequest;
    }

    /// Returns the HTTP upgrade response this connection was created with.
    override HttpResponse getUpgradeResponse() {
        return upgradeResponse;
    }

    /// Returns the extension negotiator that owns the incoming/outgoing frame chains.
    ExtensionNegotiator getExtensionNegotiator() {
        return extensionNegotiator;
    }

    /// Returns the frame parser of this connection.
    Parser getParser() {
        return parser;
    }

    /// Returns the frame generator of this connection.
    Generator getGenerator() {
        return generator;
    }

    // override Object getAttachment() {
    //     return super.getAttachment();
    // }

    // override void setAttachment(Object attachment) {
    //     super.setAttachment(attachment);
    // }

    // The overrides below only delegate to the base class implementation.

    override int getId() {
        return super.getId();
    }

    override Connection getTcpConnection() {
        return super.getTcpConnection();
    }
    
    override Address getLocalAddress() {
        return  super.getLocalAddress();
    }

    override Address getRemoteAddress() {
        return super.getRemoteAddress();
    }

    override HttpVersion getHttpVersion() {
        return super.getHttpVersion();
    }

    override void setAttribute(string key, Object value) {
        super.setAttribute(key, value);
    }
    
    override Object getAttribute(string key) {
        return super.getAttribute(key);
    }
    
    override Object removeAttribute(string key) {
        return super.removeAttribute(key);
    }    
    
    override bool containsAttribute(string key) {
        return super.containsAttribute(key);
    }    

// version (HUNT_METRIC) {
//     override long getOpenTime() {
//         return super.getOpenTime();
//     }

//     override long getCloseTime() {
//         return super.getCloseTime();
//     }

//     override long getDuration() {
//         return super.getDuration();
//     }

//     override long getLastReadTime() {
//         return super.getLastReadTime();
//     }

//     override long getLastWrittenTime() {
//         return super.getLastWrittenTime();
//     }

//     override long getLastActiveTime() {
//         return super.getLastActiveTime();
//     }

//     override long getReadBytes() {
//         return super.getReadBytes();
//     }

//     override long getWrittenBytes() {
//         return super.getWrittenBytes();
//     }

//     override long getIdleTimeout() {
//         return super.getIdleTimeout();
//     }
// }

//     override Duration getMaxIdleTimeout() {
//         return super.getMaxIdleTimeout();
//     }

//     override bool isConnected() {
//         return super.isConnected();
//     }

//     override bool isClosing() {
//         return super.isClosing();
//     }

//     override Address getLocalAddress() {
//         return super.getLocalAddress();
//     }

//     override Address getRemoteAddress() {
//         return super.getRemoteAddress();
//     }
}