1 module hunt.http.codec.websocket.stream.WebSocketConnectionImpl;
2 
3 import hunt.http.codec.http.model.HttpHeader;
4 import hunt.http.codec.http.model.MetaData;
5 import hunt.http.codec.http.stream.Http2Configuration;
6 import hunt.http.codec.websocket.decode.Parser;
7 import hunt.http.codec.websocket.encode;
8 import hunt.http.codec.websocket.frame;
9 import hunt.http.codec.websocket.model.CloseInfo;
10 import hunt.http.codec.websocket.model.common;
11 import hunt.http.codec.websocket.model.Extension;
12 import hunt.http.codec.websocket.model.extension.AbstractExtension;
13 import hunt.http.codec.websocket.model.IncomingFrames;
14 import hunt.http.codec.websocket.model.OutgoingFrames;
15 import hunt.http.codec.websocket.stream.ExtensionNegotiator;
16 import hunt.http.codec.websocket.stream.IOState;
17 import hunt.http.codec.websocket.stream.WebSocketConnection;
18 import hunt.http.codec.websocket.stream.WebSocketPolicy;
19 
20 import hunt.net.AbstractConnection;
21 import hunt.net.OutputEntry;
22 
23 ;
24 import hunt.net.ConnectionEvent;
25 import hunt.net.ConnectionType;
26 import hunt.net.secure.SecureSession;
27 import hunt.net.Session;
28 
29 import hunt.container;
30 import hunt.lang.common;
31 import hunt.logging;
32 import hunt.util.concurrent.CompletableFuture;
33 import hunt.lang.exception;
34 import hunt.util.functional;
35 
36 import std.random;
37 import std.socket;
38 import std.array;
39 
/**
 * WebSocket connection built on top of an already established TCP session
 * (optionally wrapped in a secure session).
 *
 * The connection owns a frame Parser and Generator configured from the given
 * policy, tracks the close handshake through an IOState, and routes frames
 * through an ExtensionNegotiator: outgoing frames end in an internal sink that
 * masks (client side only), serializes and writes them to the TCP session;
 * incoming frames are inspected here for control frames and then handed to the
 * next IncomingFrames handler, if one is set.
 */
class WebSocketConnectionImpl : AbstractConnection, WebSocketConnection, IncomingFrames {

    protected ConnectionEvent!(WebSocketConnection) connectionEvent;
    protected Parser parser;
    protected Generator generator;
    protected WebSocketPolicy policy;
    protected HttpRequest upgradeRequest;
    protected HttpResponse upgradeResponse;
    protected IOState ioState;
    protected Http2Configuration config;
    protected ExtensionNegotiator extensionNegotiator;

    /**
     * Creates the connection and marks its IOState as opened.
     *
     * Params:
     *   secureSession      = TLS session, or null for a plain connection
     *                        (see isEncrypted).
     *   tcpSession         = underlying session that encoded frames are written to.
     *   nextIncomingFrames = handler that receives incoming frames after this
     *                        connection has looked at them; may be null, in which
     *                        case no extension negotiation is performed here.
     *   policy             = WebSocket policy used by the parser, the generator
     *                        and the negotiated extensions.
     *   upgradeRequest     = HTTP request that initiated the upgrade.
     *   upgradeResponse    = HTTP response that completed the upgrade.
     *   config             = HTTP/2 configuration kept for later use.
     */
    this(SecureSession secureSession, Session tcpSession, IncomingFrames nextIncomingFrames, WebSocketPolicy policy,
            HttpRequest upgradeRequest, HttpResponse upgradeResponse,
            Http2Configuration config) {
        super(secureSession, tcpSession);

        extensionNegotiator = new ExtensionNegotiator();
        connectionEvent = new ConnectionEvent!(WebSocketConnection)(this);
        parser = new Parser(policy);
        parser.setIncomingFramesHandler(this);
        generator = new Generator(policy);
        this.policy = policy;
        this.upgradeRequest = upgradeRequest;
        this.upgradeResponse = upgradeResponse;
        this.config = config;
        ioState = new IOState();
        ioState.onOpened();

        // Terminal sink of the outgoing extension chain: mask if we are the
        // client, serialize the frame, write it out, and start the local side
        // of the close handshake when the frame is a CLOSE frame.
        //dfmt off
        extensionNegotiator.setNextOutgoingFrames(
            new class OutgoingFrames { 
                void outgoingFrame(Frame frame, Callback callback) {
                WebSocketFrame webSocketFrame = cast(WebSocketFrame) frame;
                if (policy.getBehavior() == WebSocketBehavior.CLIENT && webSocketFrame !is null) {
                    if (!webSocketFrame.isMasked()) {
                        webSocketFrame.setMask(generateMask());
                    }
                }
                ByteBuffer buf = ByteBuffer.allocate(Generator.MAX_HEADER_LENGTH + frame.getPayloadLength());
                generator.generateWholeFrame(frame, buf);
                BufferUtils.flipToFlush(buf, 0);
                tcpSession.encode(new ByteBufferOutputEntry(callback, buf));
                if (frame.getType() == Frame.Type.CLOSE) {
                    CloseFrame closeFrame = cast(CloseFrame) frame;
                    if(closeFrame !is null) {
                        CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false);
                        getIOState().onCloseLocal(closeInfo);
                        this.outer.close();
                    }
                }
            }
        });
//dfmt on
        setNextIncomingFrames(nextIncomingFrames);

        if (this.policy.getBehavior() == WebSocketBehavior.CLIENT) {

            // Periodic client-side ping is not ported yet; the sketch below is
            // the intended behavior.
            implementationMissing(false);

            // Scheduler.Future pingFuture = scheduler.scheduleAtFixedRate(() {
            //     PingFrame pingFrame = new PingFrame();
            //     outgoingFrame(pingFrame, new class NoopCallback {
            //         void succeeded() {
            //             info("The websocket connection %s sent ping frame success", getSessionId());
            //         }

            //         void failed(Exception x) {
            //             log.warn("the websocket connection %s sends ping frame failure. %s", getSessionId(), x.getMessage());
            //         }
            //     });
            // }, config.getWebsocketPingInterval(), config.getWebsocketPingInterval(), TimeUnit.MILLISECONDS);
            // onClose(c -> pingFuture.cancel());
        }
    }

    /// Registers a listener on the connection event; returns what the event returns.
    override WebSocketConnection onClose(Action1!(WebSocketConnection) closedListener) {
        return connectionEvent.onClose(closedListener);
    }

    /// Registers an exception listener on the connection event.
    override WebSocketConnection onException(Action2!(WebSocketConnection,
            Exception) exceptionListener) {
        return connectionEvent.onException(exceptionListener);
    }

    /// Forwards a close notification to the connection event.
    void notifyClose() {
        connectionEvent.notifyClose();
    }

    /// Forwards an exception notification to the connection event.
    void notifyException(Exception t) {
        connectionEvent.notifyException(t);
    }

    /// Returns the IOState tracking this connection's open/close handshake.
    override IOState getIOState() {
        return ioState;
    }

    /// Returns the policy this connection was created with.
    override WebSocketPolicy getPolicy() {
        return policy;
    }

    /// Sends a frame through the head of the outgoing extension chain.
    void outgoingFrame(Frame frame, Callback callback) {
        extensionNegotiator.getOutgoingFrames().outgoingFrame(frame, callback);
    }

    /**
     * Installs the downstream incoming-frame handler and configures extensions.
     *
     * Does nothing when nextIncomingFrames is null. Otherwise the extensions
     * are parsed from the upgrade response if it carries a
     * Sec-WebSocket-Extensions header, and from the upgrade request if not;
     * the generator, the parser and every AbstractExtension are then
     * configured from the result.
     */
    void setNextIncomingFrames(IncomingFrames nextIncomingFrames) {
        if (nextIncomingFrames is null)
            return;

        extensionNegotiator.setNextIncomingFrames(nextIncomingFrames);

        MetaData metaData;
        if (upgradeResponse.getFields().contains(HttpHeader.SEC_WEBSOCKET_EXTENSIONS)) {
            metaData = upgradeResponse;
        } else {
            metaData = upgradeRequest;
        }

        Extension[] extensions = extensionNegotiator.parse(metaData);
        if (extensions.empty())
            return;

        generator.configureFromExtensions(extensions);
        parser.configureFromExtensions(extensions);

        // Only extensions derived from AbstractExtension accept a policy.
        foreach (Extension e; extensions) {
            AbstractExtension ae = cast(AbstractExtension) e;
            if (ae is null)
                continue;
            ae.setPolicy(policy);
        }
    }

    /// Passes an incoming error to the downstream handler, if there is one.
    void incomingError(Exception t) {
        IncomingFrames frames = extensionNegotiator.getIncomingFrames();
        if (frames !is null)
            frames.incomingError(t);
    }

    /**
     * Handles control frames locally, then forwards every frame downstream.
     *
     * PING is answered with a PONG, CLOSE records the remote close in the
     * IOState and closes this connection, PONG is only logged. The frame is
     * passed to the downstream IncomingFrames handler afterwards when one is
     * set.
     */
    override void incomingFrame(Frame frame) {
        switch (frame.getType()) {
        case FrameType.PING: {
                // NOTE(review): the reply carries no payload. RFC 6455 section
                // 5.5.3 expects a Pong to echo the Ping's application data;
                // confirm whether that is handled elsewhere.
                PongFrame pongFrame = new PongFrame();
                outgoingFrame(pongFrame, Callback.NOOP);
            }
            break;

        case FrameType.CLOSE: {
                // A failed downcast yields null in D, so guard it the same way
                // the outgoing path does instead of dereferencing null.
                CloseFrame closeFrame = cast(CloseFrame) frame;
                if (closeFrame !is null) {
                    CloseInfo closeInfo = new CloseInfo(closeFrame.getPayload(), false);
                    ioState.onCloseRemote(closeInfo);
                    this.close();
                }
            }
            break;

        case FrameType.PONG: {
                // infof, not info: the message is a format string.
                infof("The websocket connection %s received pong frame", getSessionId());
            }
            break;

        default:
            break;
        }

        IncomingFrames e = extensionNegotiator.getIncomingFrames();
        if (e !is null)
            e.incomingFrame(frame);
    }

    /// True when the connection was created with a secure session.
    override bool isEncrypted() {
        return secureSession !is null;
    }

    /// Always ConnectionType.WEB_SOCKET.
    override ConnectionType getConnectionType() {
        return ConnectionType.WEB_SOCKET;
    }

    /**
     * Generates a fresh 4-byte masking key.
     *
     * Every byte is drawn from the full byte range. The bounds are given as
     * "[]" explicitly because std.random.uniform defaults to "[)", which
     * would never produce byte.max.
     *
     * NOTE(review): std.random's default generator is not cryptographically
     * secure, while RFC 6455 section 5.3 asks for an unpredictable masking
     * key; consider a stronger entropy source.
     */
    override byte[] generateMask() {
        byte[] mask = new byte[4];
        foreach (ref byte b; mask)
            b = uniform!"[]"(byte.min, byte.max);
        return mask;
    }

    /**
     * Sends a text frame carrying the given string.
     *
     * Returns: a future completed with true once the write callback reports
     * success, or completed exceptionally when it reports a failure.
     */
    override CompletableFuture!(bool) sendText(string text) {
        TextFrame textFrame = new TextFrame();
        textFrame.setPayload(text);
        return _sendFrame(textFrame);
    }

    /// Sends a binary frame carrying the given bytes; see sendText for the result.
    override CompletableFuture!(bool) sendData(byte[] data) {
        return _sendData(data);
    }

    /// Sends a binary frame carrying the given buffer; see sendText for the result.
    override CompletableFuture!(bool) sendData(ByteBuffer data) {
        return _sendData(data);
    }

    /// Wraps data in a BinaryFrame and sends it.
    private CompletableFuture!(bool) _sendData(T)(T data) {
        BinaryFrame binaryFrame = new BinaryFrame();
        binaryFrame.setPayload(data);
        return _sendFrame(binaryFrame);
    }

    /// Sends a frame and exposes the outcome of its write callback as a future.
    private CompletableFuture!(bool) _sendFrame(Frame frame) {
        CompletableFuture!(bool) future = new CompletableFuture!bool();
        //dfmt off
        outgoingFrame(frame, 
            new class NoopCallback {
            override void succeeded() {
                future.complete(true);
            }

            override void failed(Exception x) {
                future.completeExceptionally(x);
            }
        });
//dfmt on
        return future;
    }

    /// Returns the HTTP request that initiated the upgrade.
    override HttpRequest getUpgradeRequest() {
        return upgradeRequest;
    }

    /// Returns the HTTP response that completed the upgrade.
    override HttpResponse getUpgradeResponse() {
        return upgradeResponse;
    }

    /// Returns the extension negotiator that routes frames for this connection.
    ExtensionNegotiator getExtensionNegotiator() {
        return extensionNegotiator;
    }

    /// Returns the frame parser of this connection.
    Parser getParser() {
        return parser;
    }

    /// Returns the frame generator of this connection.
    Generator getGenerator() {
        return generator;
    }

    // The remaining members only delegate to the base class implementation.

    override Object getAttachment() {
        return super.getAttachment();
    }

    override void setAttachment(Object attachment) {
        super.setAttachment(attachment);
    }

    override int getSessionId() {
        return super.getSessionId();
    }

version (HUNT_METRIC) {
    override long getOpenTime() {
        return super.getOpenTime();
    }

    override long getCloseTime() {
        return super.getCloseTime();
    }

    override long getDuration() {
        return super.getDuration();
    }

    override long getLastReadTime() {
        return super.getLastReadTime();
    }

    override long getLastWrittenTime() {
        return super.getLastWrittenTime();
    }

    override long getLastActiveTime() {
        return super.getLastActiveTime();
    }

    override long getReadBytes() {
        return super.getReadBytes();
    }

    override long getWrittenBytes() {
        return super.getWrittenBytes();
    }

    override long getIdleTimeout() {
        return super.getIdleTimeout();
    }
}

    override long getMaxIdleTimeout() {
        return super.getMaxIdleTimeout();
    }

    override bool isOpen() {
        return super.isOpen();
    }

    override bool isClosed() {
        return super.isClosed();
    }

    override Address getLocalAddress() {
        return super.getLocalAddress();
    }

    override Address getRemoteAddress() {
        return super.getRemoteAddress();
    }
}