1 module hunt.http.codec.websocket.decode.Parser;
3 import hunt.http.codec.websocket.frame;
4 import hunt.http.codec.websocket.model;
5 import hunt.http.codec.websocket.decode.payload;
7 import hunt.http.Exceptions;
8 import hunt.http.WebSocketCommon;
9 import hunt.http.WebSocketConnection;
10 import hunt.http.WebSocketFrame;
11 import hunt.http.WebSocketPolicy;
13 import hunt.logging;
14 import hunt.collection;
15 import hunt.text.Common;
16 import hunt.util.StringBuilder;
18 import std.algorithm;
19 import std.conv;
22 /**
 * Parsing of frames in WebSocket land.
24  */
25 class Parser {
26     private enum State {
27         START,
28         PAYLOAD_LEN,
30         MASK,
31         MASK_BYTES,
32         PAYLOAD
33     }
    // Policy given to the constructor; consulted for the endpoint behavior
    // and for the text/binary message size checks.
    private WebSocketPolicy policy;

    // State specific
    // Current step of the parsing state machine; kept between parse() calls.
    private State state = State.START;
    // Countdown of bytes still expected for the extended payload length or the mask.
    private int cursor = 0;
    // WebSocketFrame
    // Frame currently being assembled; set to null by reset().
    private AbstractWebSocketFrame frame;
    // True when the last data frame seen did not have FIN set, i.e. a CONTINUATION is expected.
    private bool priorDataFrame;
    // payload specific
    // Accumulation buffer, allocated only when a payload does not arrive in a single window.
    private ByteBuffer payload;
    // Payload length announced by the header of the current frame.
    private int payloadLength;
    // Processor applied to every payload window before it is stored in the frame.
    private PayloadProcessor maskProcessor;
    // private PayloadProcessor strictnessProcessor;

    /**
     * Is there an extension using RSV flag?
     * <p>
     * Bit mask built by configureFromExtensions():
     * <pre>
     *   0100_0000 (0x40) = rsv1
     *   0010_0000 (0x20) = rsv2
     *   0001_0000 (0x10) = rsv3
     * </pre>
     */
    private byte flagsInUse = 0x00;

    // Receiver of every completed frame; may be null, in which case frames are dropped.
    private IncomingFrames incomingFramesHandler;
63     this(WebSocketPolicy wspolicy) {
64         this.policy = wspolicy;
65          maskProcessor = new DeMaskProcessor();
66     }
68     private void assertSanePayloadLength(long len) {
69         version(HUNT_HTTP_DEBUG) {
70             tracef("%s Payload Length: %s - %s", policy.getBehavior(), 
71                 len.to!string(), this.toString());
72         }
74         // Since we use ByteBuffer so often, having lengths over int.max is really impossible.
75         if (len > int.max) {
76             // OMG! Sanity Check! DO NOT WANT! Won't anyone think of the memory!
77             throw new MessageTooLargeException("[int-sane!] cannot handle payload lengths larger than " 
78                 ~ to!string(int.max));
79         }
81         switch (frame.getOpCode()) {
82             case OpCode.CLOSE:
83                 if (len == 1) {
84                     throw new ProtocolException("Invalid close frame payload length, [" ~ 
85                         payloadLength.to!string() ~ "]");
86                 }
87                 goto case;
88                 // fall thru
89             case OpCode.PING:
90                 goto case;
91             case OpCode.PONG:
92                 if (len > ControlFrame.MAX_CONTROL_PAYLOAD) {
93                     throw new ProtocolException("Invalid control frame payload length, [" ~ 
94                         payloadLength.to!string() ~ "] cannot exceed [" ~ 
95                         ControlFrame.MAX_CONTROL_PAYLOAD.to!string() ~ "]");
96                 }
97                 break;
98             case OpCode.TEXT:
99                 policy.assertValidTextMessageSize(cast(int) len);
100                 break;
101             case OpCode.BINARY:
102                 policy.assertValidBinaryMessageSize(cast(int) len);
103                 break;
105             default:
106                 break;
107         }
108     }
110     void configureFromExtensions(Extension[] exts) {
111         // default
112         flagsInUse = 0x00;
114         // configure from list of extensions in use
115         foreach (Extension ext ; exts) {
116             if (ext.isRsv1User()) {
117                 flagsInUse = cast(byte) (flagsInUse | 0x40);
118             }
119             if (ext.isRsv2User()) {
120                 flagsInUse = cast(byte) (flagsInUse | 0x20);
121             }
122             if (ext.isRsv3User()) {
123                 flagsInUse = cast(byte) (flagsInUse | 0x10);
124             }
125         }
126     }
128     IncomingFrames getIncomingFramesHandler() {
129         return incomingFramesHandler;
130     }
132     WebSocketPolicy getPolicy() {
133         return policy;
134     }
136     bool isRsv1InUse() {
137         return (flagsInUse & 0x40) != 0;
138     }
140     bool isRsv2InUse() {
141         return (flagsInUse & 0x20) != 0;
142     }
144     bool isRsv3InUse() {
145         return (flagsInUse & 0x10) != 0;
146     }
148     protected void notifyFrame(WebSocketFrame f) {
149         version(HUNT_HTTP_DEBUG_MORE)
150             tracef("%s Notify %s", policy.getBehavior(), getIncomingFramesHandler());
152         if (policy.getBehavior() == WebSocketBehavior.SERVER) {
153             /* Parsing on server.
154              * 
155              * Then you MUST make sure all incoming frames are masked!
156              * 
157              * Technically, this test is in violation of RFC-6455, Section 5.1
158              * http://tools.ietf.org/html/rfc6455#section-5.1
159              * 
160              * But we can't trust the client at this point, so hunt opts to close
161              * the connection as a Protocol error.
162              */
163             if (!f.isMasked()) {
164                 throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
165             }
166         } else if (policy.getBehavior() == WebSocketBehavior.CLIENT) {
167             // Required by RFC-6455 / Section 5.1
168             if (f.isMasked()) {
169                 throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
170             }
171         }
173         if (incomingFramesHandler is null) {
174             version(HUNT_DEBUG) warning("incomingFramesHandler is null");
175             return;
176         }
178         try {
179             incomingFramesHandler.incomingFrame(f);
180         } catch (WebSocketException e) {
181             throw e;
182         } catch (Exception t) {
183             throw new WebSocketException(t);
184         }
185     }
187     void parse(ByteBuffer buffer) {
189         version(HUNT_HTTP_DEBUG) {
190             byte[] bufdata = buffer.peekRemaining();
191             tracef("remaining: %d,  date: %(%02X %)", buffer.remaining(), bufdata);
192         }
194         if (buffer.remaining() <= 0) {
195             return;
196         }
197         try {
198             // parse through all the frames in the buffer
199             while (parseFrame(buffer)) {
200                 version(HUNT_DEBUG) {
201                     tracef("%s Parsed WebSocketFrame: %s", policy.getBehavior(), frame);
202                     // info(BufferUtils.toDetailString(frame.getPayload()));
203                 }
204                 notifyFrame(frame);
205                 if (frame.isDataFrame()) {
206                     priorDataFrame = !frame.isFin();
207                 }
208                 reset();
209             }
210         } catch (WebSocketException e) {
211             buffer.position(buffer.limit()); // consume remaining
212             reset();
213             // need to throw for proper close behavior in connection
214             throw e;
215         } catch (Exception t) {
216             buffer.position(buffer.limit()); // consume remaining
217             reset();
218             // need to throw for proper close behavior in connection
219             throw new WebSocketException(t);
220         }
221     }
223     private void reset() {
224         if (frame !is null)
225             frame.reset();
226         frame = null;
227         payload = null;
228     }
230     /**
231      * Parse the base framing protocol buffer.
232      * <p>
233      * Note the first byte (fin,rsv1,rsv2,rsv3,opcode) are parsed by the {@link Parser#parse(ByteBuffer)} method
234      * <p>
235      * Not overridable
236      *
237      * @param buffer the buffer to parse from.
238      * @return true if done parsing base framing protocol and ready for parsing of the payload. false if incomplete parsing of base framing protocol.
239      */
240     private bool parseFrame(ByteBuffer buffer) {
241         version(HUNT_DEBUG) {
242             tracef("%s Parsing %s bytes", policy.getBehavior(), buffer.remaining());
243         }
244         while (buffer.hasRemaining()) {
245             switch (state) {
246                 case State.START: {
247                     // peek at byte
248                     byte b = buffer.get();
249                     bool fin = ((b & 0x80) != 0);
251                     byte opcode = cast(byte) (b & 0x0F);
253                     if (!OpCode.isKnown(opcode)) {
254                         throw new ProtocolException("Unknown opcode: " ~ opcode);
255                     }
257                     version(HUNT_DEBUG)
258                         tracef("%s OpCode %s, fin=%s rsv=%s%s%s",
259                                 policy.getBehavior(),
260                                 OpCode.name(opcode),
261                                 fin,
262                                 (((b & 0x40) != 0) ? '1' : '.'),
263                                 (((b & 0x20) != 0) ? '1' : '.'),
264                                 (((b & 0x10) != 0) ? '1' : '.'));
266                     // base framing flags
267                     switch (opcode) {
268                         case OpCode.TEXT:
269                             frame = new TextFrame();
270                             // data validation
271                             if (priorDataFrame) {
272                                 throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~ 
273                                     " frame, was expecting CONTINUATION");
274                             }
275                             break;
276                         case OpCode.BINARY:
277                             frame = new BinaryFrame();
278                             // data validation
279                             if (priorDataFrame) {
280                                 throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~ 
281                                     " frame, was expecting CONTINUATION");
282                             }
283                             break;
284                         case OpCode.CONTINUATION:
285                             frame = new ContinuationFrame();
286                             // continuation validation
287                             if (!priorDataFrame) {
288                                 throw new ProtocolException("CONTINUATION frame without prior !FIN");
289                             }
290                             // Be careful to use the original opcode
291                             break;
292                         case OpCode.CLOSE:
293                             frame = new CloseFrame();
294                             // control frame validation
295                             if (!fin) {
296                                 throw new ProtocolException("Fragmented Close WebSocketFrame [" ~ 
297                                     OpCode.name(opcode) ~ "]");
298                             }
299                             break;
300                         case OpCode.PING:
301                             frame = new PingFrame();
302                             // control frame validation
303                             if (!fin) {
304                                 throw new ProtocolException("Fragmented Ping WebSocketFrame [" ~ 
305                                     OpCode.name(opcode) ~ "]");
306                             }
307                             break;
308                         case OpCode.PONG:
309                             frame = new PongFrame();
310                             // control frame validation
311                             if (!fin) {
312                                 throw new ProtocolException("Fragmented Pong WebSocketFrame [" ~ 
313                                     OpCode.name(opcode) ~ "]");
314                             }
315                             break;
317                         default: break;
318                     }
320                     frame.setFin(fin);
322                     // Are any flags set?
323                     if ((b & 0x70) != 0) {
324                         /*
325                          * RFC 6455 Section 5.2
326                          * 
327                          * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the
328                          * negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
329                          */
330                         if ((b & 0x40) != 0) {
331                             if (isRsv1InUse())
332                                 frame.setRsv1(true);
333                             else {
334                                 string err = "RSV1 not allowed to be set";
335                                 version(HUNT_DEBUG) {
336                                     tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
337                                 }
338                                 throw new ProtocolException(err);
339                             }
340                         }
341                         if ((b & 0x20) != 0) {
342                             if (isRsv2InUse())
343                                 frame.setRsv2(true);
344                             else {
345                                 string err = "RSV2 not allowed to be set";
346                                 version(HUNT_DEBUG) {
347                                     tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
348                                 }
349                                 throw new ProtocolException(err);
350                             }
351                         }
352                         if ((b & 0x10) != 0) {
353                             if (isRsv3InUse())
354                                 frame.setRsv3(true);
355                             else {
356                                 string err = "RSV3 not allowed to be set";
357                                 version(HUNT_DEBUG) {
358                                     tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
359                                 }
360                                 throw new ProtocolException(err);
361                             }
362                         }
363                     }
365                     state = State.PAYLOAD_LEN;
366                     break;
367                 }
369                 case State.PAYLOAD_LEN: {
370                     byte b = buffer.get();
371                     frame.setMasked((b & 0x80) != 0);
372                     payloadLength = cast(byte) (0x7F & b);
374                     if (payloadLength == 127) // 0x7F
375                     {
376                         // length 8 bytes (extended payload length)
377                         payloadLength = 0;
378                         state = State.PAYLOAD_LEN_BYTES;
379                         cursor = 8;
380                         break; // continue onto next state
381                     } else if (payloadLength == 126) // 0x7E
382                     {
383                         // length 2 bytes (extended payload length)
384                         payloadLength = 0;
385                         state = State.PAYLOAD_LEN_BYTES;
386                         cursor = 2;
387                         break; // continue onto next state
388                     }
390                     assertSanePayloadLength(payloadLength);
391                     if (frame.isMasked()) {
392                         state = State.MASK;
393                     } else {
394                         // special case for empty payloads (no more bytes left in buffer)
395                         if (payloadLength == 0) {
396                             state = State.START;
397                             return true;
398                         }
400                         maskProcessor.reset(frame);
401                         state = State.PAYLOAD;
402                     }
404                     break;
405                 }
407                 case State.PAYLOAD_LEN_BYTES: {
408                     byte b = buffer.get();
409                     --cursor;
410                     payloadLength |= (b & 0xFF) << (8 * cursor);
411                     if (cursor == 0) {
412                         assertSanePayloadLength(payloadLength);
413                         if (frame.isMasked()) {
414                             state = State.MASK;
415                         } else {
416                             // special case for empty payloads (no more bytes left in buffer)
417                             if (payloadLength == 0) {
418                                 state = State.START;
419                                 return true;
420                             }
422                             maskProcessor.reset(frame);
423                             state = State.PAYLOAD;
424                         }
425                     }
426                     break;
427                 }
429                 case State.MASK: {
430                     byte[] m = new byte[4];
431                     frame.setMask(m);
432                     if (buffer.remaining() >= 4) {
433                         buffer.get(m, 0, 4);
434                         // special case for empty payloads (no more bytes left in buffer)
435                         if (payloadLength == 0) {
436                             state = State.START;
437                             return true;
438                         }
440                         maskProcessor.reset(frame);
441                         state = State.PAYLOAD;
442                     } else {
443                         state = State.MASK_BYTES;
444                         cursor = 4;
445                     }
446                     break;
447                 }
449                 case State.MASK_BYTES: {
450                     byte b = buffer.get();
451                     frame.getMask()[4 - cursor] = b;
452                     --cursor;
453                     if (cursor == 0) {
454                         // special case for empty payloads (no more bytes left in buffer)
455                         if (payloadLength == 0) {
456                             state = State.START;
457                             return true;
458                         }
460                         maskProcessor.reset(frame);
461                         state = State.PAYLOAD;
462                     }
463                     break;
464                 }
466                 case State.PAYLOAD: {
467                     frame.assertValid();
468                     if (parsePayload(buffer)) {
469                         // special check for close
470                         if (frame.getOpCode() == OpCode.CLOSE) {
471                             // TODO: yuck. Don't create an object to do validation checks!
472                             new CloseInfo(frame);
473                         }
474                         state = State.START;
475                         // we have a frame!
476                         return true;
477                     }
478                     break;
479                 }
481                 default: break;
482             }
483         }
485         return false;
486     }
488     /**
489      * Implementation specific parsing of a payload
490      *
491      * @param buffer the payload buffer
492      * @return true if payload is done reading, false if incomplete
493      */
494     private bool parsePayload(ByteBuffer buffer) {
495         if (payloadLength == 0) {
496             return true;
497         }
499         if (buffer.hasRemaining()) {
500             // Create a small window of the incoming buffer to work with.
501             // this should only show the payload itself, and not any more
502             // bytes that could belong to the start of the next frame.
503             int bytesSoFar = payload is null ? 0 : payload.position();
504             int bytesExpected = payloadLength - bytesSoFar;
505             int bytesAvailable = buffer.remaining();
506             int windowBytes = std.algorithm.min(bytesAvailable, bytesExpected);
507             int limit = buffer.limit();
508             buffer.limit(buffer.position() + windowBytes);
509             ByteBuffer window = buffer.slice();
510             buffer.limit(limit);
511             buffer.position(buffer.position() + window.remaining());
513             maskProcessor.process(window);
515             version(HUNT_HTTP_DEBUG) {
516                 tracef("%s Window(unmarked): %s", policy.getBehavior(), BufferUtils.toDetailString(window));
517             }
519             if (window.remaining() == payloadLength) {
520                 // We have the whole content, no need to copy.
521                 frame.setPayload(window);
522                 return true;
523             } else {
524                 if (payload is null) {
525                     payload = BufferUtils.allocate(payloadLength);
526                     BufferUtils.clearToFill(payload);
527                 }
528                 // Copy the payload.
529                 payload.put(window);
531                 if (payload.position() == payloadLength) {
532                     BufferUtils.flipToFlush(payload, 0);
533                     frame.setPayload(payload);
534                     return true;
535                 }
536             }
537         }
538         return false;
539     }
541     void setIncomingFramesHandler(IncomingFrames incoming) {
542         this.incomingFramesHandler = incoming;
543     }
545     override
546     string toString() {
547         StringBuilder builder = new StringBuilder();
548         builder.append("Parser@").append(toHash().to!string(16));
549         builder.append("[");
550         if (incomingFramesHandler is null) {
551             builder.append("NO_HANDLER");
552         } else {
553             builder.append(typeid(incomingFramesHandler).name);
554         }
555         builder.append(",s=").append(state.to!string());
556         builder.append(",c=").append(cursor.to!string());
557         builder.append(",len=").append(payloadLength);
558         builder.append(",f=").append(frame.toString());
559         // builder.append(",p=").append(policy);
560         builder.append("]");
561         return builder.toString();
562     }
563 }