1 module hunt.http.codec.websocket.decode.Parser;
2 
3 import hunt.http.codec.websocket.exception;
4 import hunt.http.codec.websocket.frame;
5 import hunt.http.codec.websocket.model;
6 import hunt.http.codec.websocket.decode.payload;
7 import hunt.http.codec.websocket.stream.WebSocketPolicy;
8 
9 import hunt.logging;
10 import hunt.container;
11 import hunt.string;
12 
13 import std.algorithm;
14 import std.conv;
15 
16 
/**
 * Parsing of frames in WebSocket land.
 */
20 class Parser {
    /// Position of the parser inside the frame currently being decoded.
    private enum State {
        START,              // next byte is the first header byte (FIN, RSV1-3, opcode)
        PAYLOAD_LEN,        // next byte carries the MASK bit and the 7-bit payload length
        PAYLOAD_LEN_BYTES,  // reading the 2- or 8-byte extended payload length
        MASK,               // about to read the 4-byte masking key
        MASK_BYTES,         // reading the masking key one byte at a time
        PAYLOAD             // reading payload bytes
    }
29 
    // Supplies the endpoint behavior and the text/binary message size checks used while parsing.
    private WebSocketPolicy policy;

    // State specific
    // Current step of the frame state machine; kept between parse() calls.
    private State state = State.START;
    // Bytes still to be read for a multi-byte field (extended length or masking key).
    private int cursor = 0;
    // Frame
    // Frame being assembled; set to null by reset().
    private WebSocketFrame frame;
    // True when the most recent data frame did not have FIN set, i.e. a CONTINUATION is expected.
    private bool priorDataFrame;
    // payload specific
    // Collects the payload when it does not arrive in a single buffer.
    private ByteBuffer payload;
    // Payload length declared in the frame header.
    private int payloadLength;
    // Applied to each payload window; reset for every frame that has a payload.
    private PayloadProcessor maskProcessor;
    // private PayloadProcessor strictnessProcessor;

    /**
     * Is there an extension using RSV flag?
     * <p>
     * Bit layout, matching the position of the flags in the first header byte:
     * <pre>
     *   0100_0000 (0x40) = rsv1
     *   0010_0000 (0x20) = rsv2
     *   0001_0000 (0x10) = rsv3
     * </pre>
     */
    private byte flagsInUse = 0x00;

    // Receives every completed frame; frames are dropped while this is null.
    private IncomingFrames incomingFramesHandler;
57 
58     this(WebSocketPolicy wspolicy) {
59         this.policy = wspolicy;
60          maskProcessor = new DeMaskProcessor();
61     }
62 
63     private void assertSanePayloadLength(long len) {
64         version(HUNT_DEBUG) {
65             tracef("%s Payload Length: %s - %s", policy.getBehavior(), 
66                 len.to!string(), this.toString());
67         }
68 
69         // Since we use ByteBuffer so often, having lengths over int.max is really impossible.
70         if (len > int.max) {
71             // OMG! Sanity Check! DO NOT WANT! Won't anyone think of the memory!
72             throw new MessageTooLargeException("[int-sane!] cannot handle payload lengths larger than " 
73                 ~ to!string(int.max));
74         }
75 
76         switch (frame.getOpCode()) {
77             case OpCode.CLOSE:
78                 if (len == 1) {
79                     throw new ProtocolException("Invalid close frame payload length, [" ~ 
80                         payloadLength.to!string() ~ "]");
81                 }
82                 goto case;
83                 // fall thru
84             case OpCode.PING:
85                 goto case;
86             case OpCode.PONG:
87                 if (len > ControlFrame.MAX_CONTROL_PAYLOAD) {
88                     throw new ProtocolException("Invalid control frame payload length, [" ~ 
89                         payloadLength.to!string() ~ "] cannot exceed [" ~ 
90                         ControlFrame.MAX_CONTROL_PAYLOAD.to!string() ~ "]");
91                 }
92                 break;
93             case OpCode.TEXT:
94                 policy.assertValidTextMessageSize(cast(int) len);
95                 break;
96             case OpCode.BINARY:
97                 policy.assertValidBinaryMessageSize(cast(int) len);
98                 break;
99 
100             default:
101                 break;
102         }
103     }
104 
105     void configureFromExtensions(Extension[] exts) {
106         // default
107         flagsInUse = 0x00;
108 
109         // configure from list of extensions in use
110         foreach (Extension ext ; exts) {
111             if (ext.isRsv1User()) {
112                 flagsInUse = cast(byte) (flagsInUse | 0x40);
113             }
114             if (ext.isRsv2User()) {
115                 flagsInUse = cast(byte) (flagsInUse | 0x20);
116             }
117             if (ext.isRsv3User()) {
118                 flagsInUse = cast(byte) (flagsInUse | 0x10);
119             }
120         }
121     }
122 
123     IncomingFrames getIncomingFramesHandler() {
124         return incomingFramesHandler;
125     }
126 
127     WebSocketPolicy getPolicy() {
128         return policy;
129     }
130 
131     bool isRsv1InUse() {
132         return (flagsInUse & 0x40) != 0;
133     }
134 
135     bool isRsv2InUse() {
136         return (flagsInUse & 0x20) != 0;
137     }
138 
139     bool isRsv3InUse() {
140         return (flagsInUse & 0x10) != 0;
141     }
142 
143     protected void notifyFrame(Frame f) {
144         version(HUNT_DEBUG)
145             tracef("%s Notify %s", policy.getBehavior(), getIncomingFramesHandler());
146 
147         if (policy.getBehavior() == WebSocketBehavior.SERVER) {
148             /* Parsing on server.
149              * 
150              * Then you MUST make sure all incoming frames are masked!
151              * 
152              * Technically, this test is in violation of RFC-6455, Section 5.1
153              * http://tools.ietf.org/html/rfc6455#section-5.1
154              * 
155              * But we can't trust the client at this point, so hunt opts to close
156              * the connection as a Protocol error.
157              */
158             if (!f.isMasked()) {
159                 throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
160             }
161         } else if (policy.getBehavior() == WebSocketBehavior.CLIENT) {
162             // Required by RFC-6455 / Section 5.1
163             if (f.isMasked()) {
164                 throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
165             }
166         }
167         
168         if (incomingFramesHandler is null) {
169             version(HUNT_DEBUG) warning("incomingFramesHandler is null");
170             return;
171         }
172 
173         try {
174             incomingFramesHandler.incomingFrame(f);
175         } catch (WebSocketException e) {
176             throw e;
177         } catch (Exception t) {
178             throw new WebSocketException(t);
179         }
180     }
181 
182     void parse(ByteBuffer buffer) {
183         if (buffer.remaining() <= 0) {
184             return;
185         }
186         try {
187             // parse through all the frames in the buffer
188             while (parseFrame(buffer)) {
189                 version(HUNT_DEBUG) {
190                     tracef("%s Parsed Frame: %s", policy.getBehavior(), frame);
191                 }
192                 notifyFrame(frame);
193                 if (frame.isDataFrame()) {
194                     priorDataFrame = !frame.isFin();
195                 }
196                 reset();
197             }
198         } catch (WebSocketException e) {
199             buffer.position(buffer.limit()); // consume remaining
200             reset();
201             // need to throw for proper close behavior in connection
202             throw e;
203         } catch (Exception t) {
204             buffer.position(buffer.limit()); // consume remaining
205             reset();
206             // need to throw for proper close behavior in connection
207             throw new WebSocketException(t);
208         }
209     }
210 
211     private void reset() {
212         if (frame !is null)
213             frame.reset();
214         frame = null;
215         payload = null;
216     }
217 
218     /**
219      * Parse the base framing protocol buffer.
220      * <p>
221      * Note the first byte (fin,rsv1,rsv2,rsv3,opcode) are parsed by the {@link Parser#parse(ByteBuffer)} method
222      * <p>
223      * Not overridable
224      *
225      * @param buffer the buffer to parse from.
226      * @return true if done parsing base framing protocol and ready for parsing of the payload. false if incomplete parsing of base framing protocol.
227      */
228     private bool parseFrame(ByteBuffer buffer) {
229         version(HUNT_DEBUG) {
230             tracef("%s Parsing %s bytes", policy.getBehavior(), buffer.remaining());
231         }
232         while (buffer.hasRemaining()) {
233             switch (state) {
234                 case State.START: {
235                     // peek at byte
236                     byte b = buffer.get();
237                     bool fin = ((b & 0x80) != 0);
238 
239                     byte opcode = cast(byte) (b & 0x0F);
240 
241                     if (!OpCode.isKnown(opcode)) {
242                         throw new ProtocolException("Unknown opcode: " ~ opcode);
243                     }
244 
245                     version(HUNT_DEBUG)
246                         tracef("%s OpCode %s, fin=%s rsv=%s%s%s",
247                                 policy.getBehavior(),
248                                 OpCode.name(opcode),
249                                 fin,
250                                 (((b & 0x40) != 0) ? '1' : '.'),
251                                 (((b & 0x20) != 0) ? '1' : '.'),
252                                 (((b & 0x10) != 0) ? '1' : '.'));
253 
254                     // base framing flags
255                     switch (opcode) {
256                         case OpCode.TEXT:
257                             frame = new TextFrame();
258                             // data validation
259                             if (priorDataFrame) {
260                                 throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~ 
261                                     " frame, was expecting CONTINUATION");
262                             }
263                             break;
264                         case OpCode.BINARY:
265                             frame = new BinaryFrame();
266                             // data validation
267                             if (priorDataFrame) {
268                                 throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~ 
269                                     " frame, was expecting CONTINUATION");
270                             }
271                             break;
272                         case OpCode.CONTINUATION:
273                             frame = new ContinuationFrame();
274                             // continuation validation
275                             if (!priorDataFrame) {
276                                 throw new ProtocolException("CONTINUATION frame without prior !FIN");
277                             }
278                             // Be careful to use the original opcode
279                             break;
280                         case OpCode.CLOSE:
281                             frame = new CloseFrame();
282                             // control frame validation
283                             if (!fin) {
284                                 throw new ProtocolException("Fragmented Close Frame [" ~ 
285                                     OpCode.name(opcode) ~ "]");
286                             }
287                             break;
288                         case OpCode.PING:
289                             frame = new PingFrame();
290                             // control frame validation
291                             if (!fin) {
292                                 throw new ProtocolException("Fragmented Ping Frame [" ~ 
293                                     OpCode.name(opcode) ~ "]");
294                             }
295                             break;
296                         case OpCode.PONG:
297                             frame = new PongFrame();
298                             // control frame validation
299                             if (!fin) {
300                                 throw new ProtocolException("Fragmented Pong Frame [" ~ 
301                                     OpCode.name(opcode) ~ "]");
302                             }
303                             break;
304 
305                         default: break;
306                     }
307 
308                     frame.setFin(fin);
309 
310                     // Are any flags set?
311                     if ((b & 0x70) != 0) {
312                         /*
313                          * RFC 6455 Section 5.2
314                          * 
315                          * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the
316                          * negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
317                          */
318                         if ((b & 0x40) != 0) {
319                             if (isRsv1InUse())
320                                 frame.setRsv1(true);
321                             else {
322                                 string err = "RSV1 not allowed to be set";
323                                 version(HUNT_DEBUG) {
324                                     tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
325                                 }
326                                 throw new ProtocolException(err);
327                             }
328                         }
329                         if ((b & 0x20) != 0) {
330                             if (isRsv2InUse())
331                                 frame.setRsv2(true);
332                             else {
333                                 string err = "RSV2 not allowed to be set";
334                                 version(HUNT_DEBUG) {
335                                     tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
336                                 }
337                                 throw new ProtocolException(err);
338                             }
339                         }
340                         if ((b & 0x10) != 0) {
341                             if (isRsv3InUse())
342                                 frame.setRsv3(true);
343                             else {
344                                 string err = "RSV3 not allowed to be set";
345                                 version(HUNT_DEBUG) {
346                                     tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
347                                 }
348                                 throw new ProtocolException(err);
349                             }
350                         }
351                     }
352 
353                     state = State.PAYLOAD_LEN;
354                     break;
355                 }
356 
357                 case State.PAYLOAD_LEN: {
358                     byte b = buffer.get();
359                     frame.setMasked((b & 0x80) != 0);
360                     payloadLength = cast(byte) (0x7F & b);
361 
362                     if (payloadLength == 127) // 0x7F
363                     {
364                         // length 8 bytes (extended payload length)
365                         payloadLength = 0;
366                         state = State.PAYLOAD_LEN_BYTES;
367                         cursor = 8;
368                         break; // continue onto next state
369                     } else if (payloadLength == 126) // 0x7E
370                     {
371                         // length 2 bytes (extended payload length)
372                         payloadLength = 0;
373                         state = State.PAYLOAD_LEN_BYTES;
374                         cursor = 2;
375                         break; // continue onto next state
376                     }
377 
378                     assertSanePayloadLength(payloadLength);
379                     if (frame.isMasked()) {
380                         state = State.MASK;
381                     } else {
382                         // special case for empty payloads (no more bytes left in buffer)
383                         if (payloadLength == 0) {
384                             state = State.START;
385                             return true;
386                         }
387 
388                         maskProcessor.reset(frame);
389                         state = State.PAYLOAD;
390                     }
391 
392                     break;
393                 }
394 
395                 case State.PAYLOAD_LEN_BYTES: {
396                     byte b = buffer.get();
397                     --cursor;
398                     payloadLength |= (b & 0xFF) << (8 * cursor);
399                     if (cursor == 0) {
400                         assertSanePayloadLength(payloadLength);
401                         if (frame.isMasked()) {
402                             state = State.MASK;
403                         } else {
404                             // special case for empty payloads (no more bytes left in buffer)
405                             if (payloadLength == 0) {
406                                 state = State.START;
407                                 return true;
408                             }
409 
410                             maskProcessor.reset(frame);
411                             state = State.PAYLOAD;
412                         }
413                     }
414                     break;
415                 }
416 
417                 case State.MASK: {
418                     byte[] m = new byte[4];
419                     frame.setMask(m);
420                     if (buffer.remaining() >= 4) {
421                         buffer.get(m, 0, 4);
422                         // special case for empty payloads (no more bytes left in buffer)
423                         if (payloadLength == 0) {
424                             state = State.START;
425                             return true;
426                         }
427 
428                         maskProcessor.reset(frame);
429                         state = State.PAYLOAD;
430                     } else {
431                         state = State.MASK_BYTES;
432                         cursor = 4;
433                     }
434                     break;
435                 }
436 
437                 case State.MASK_BYTES: {
438                     byte b = buffer.get();
439                     frame.getMask()[4 - cursor] = b;
440                     --cursor;
441                     if (cursor == 0) {
442                         // special case for empty payloads (no more bytes left in buffer)
443                         if (payloadLength == 0) {
444                             state = State.START;
445                             return true;
446                         }
447 
448                         maskProcessor.reset(frame);
449                         state = State.PAYLOAD;
450                     }
451                     break;
452                 }
453 
454                 case State.PAYLOAD: {
455                     frame.assertValid();
456                     if (parsePayload(buffer)) {
457                         // special check for close
458                         if (frame.getOpCode() == OpCode.CLOSE) {
459                             // TODO: yuck. Don't create an object to do validation checks!
460                             new CloseInfo(frame);
461                         }
462                         state = State.START;
463                         // we have a frame!
464                         return true;
465                     }
466                     break;
467                 }
468 
469                 default: break;
470             }
471         }
472 
473         return false;
474     }
475 
476     /**
477      * Implementation specific parsing of a payload
478      *
479      * @param buffer the payload buffer
480      * @return true if payload is done reading, false if incomplete
481      */
482     private bool parsePayload(ByteBuffer buffer) {
483         if (payloadLength == 0) {
484             return true;
485         }
486 
487         if (buffer.hasRemaining()) {
488             // Create a small window of the incoming buffer to work with.
489             // this should only show the payload itself, and not any more
490             // bytes that could belong to the start of the next frame.
491             int bytesSoFar = payload is null ? 0 : payload.position();
492             int bytesExpected = payloadLength - bytesSoFar;
493             int bytesAvailable = buffer.remaining();
494             int windowBytes = std.algorithm.min(bytesAvailable, bytesExpected);
495             int limit = buffer.limit();
496             buffer.limit(buffer.position() + windowBytes);
497             ByteBuffer window = buffer.slice();
498             buffer.limit(limit);
499             buffer.position(buffer.position() + window.remaining());
500 
501             version(HUNT_DEBUG) {
502                 tracef("%s Window: %s", policy.getBehavior(), BufferUtils.toDetailString(window));
503             }
504 
505             maskProcessor.process(window);
506 
507             if (window.remaining() == payloadLength) {
508                 // We have the whole content, no need to copy.
509                 frame.setPayload(window);
510                 return true;
511             } else {
512                 if (payload is null) {
513                     payload = BufferUtils.allocate(payloadLength);
514                     BufferUtils.clearToFill(payload);
515                 }
516                 // Copy the payload.
517                 payload.put(window);
518 
519                 if (payload.position() == payloadLength) {
520                     BufferUtils.flipToFlush(payload, 0);
521                     frame.setPayload(payload);
522                     return true;
523                 }
524             }
525         }
526         return false;
527     }
528 
529     void setIncomingFramesHandler(IncomingFrames incoming) {
530         this.incomingFramesHandler = incoming;
531     }
532 
533     override
534     string toString() {
535         StringBuilder builder = new StringBuilder();
536         builder.append("Parser@").append(toHash().to!string(16));
537         builder.append("[");
538         if (incomingFramesHandler is null) {
539             builder.append("NO_HANDLER");
540         } else {
541             builder.append(typeid(incomingFramesHandler).name);
542         }
543         builder.append(",s=").append(state.to!string());
544         builder.append(",c=").append(cursor.to!string());
545         builder.append(",len=").append(payloadLength);
546         builder.append(",f=").append(frame.toString());
547         // builder.append(",p=").append(policy);
548         builder.append("]");
549         return builder.toString();
550     }
551 }