1 module hunt.http.codec.websocket.decode.Parser;
2
3 import hunt.http.codec.websocket.frame;
4 import hunt.http.codec.websocket.model;
5 import hunt.http.codec.websocket.decode.payload;
6
7 import hunt.http.Exceptions;
8 import hunt.http.WebSocketCommon;
9 import hunt.http.WebSocketConnection;
10 import hunt.http.WebSocketFrame;
11 import hunt.http.WebSocketPolicy;
12
13 import hunt.logging;
14 import hunt.collection;
15 import hunt.text.Common;
16 import hunt.util.StringBuilder;
17
18 import std.algorithm;
19 import std.conv;
20
21
22 /**
23 * Parsing of a frames in WebSocket land.
24 */
25 class Parser {
26 private enum State {
27 START,
28 PAYLOAD_LEN,
29 PAYLOAD_LEN_BYTES,
30 MASK,
31 MASK_BYTES,
32 PAYLOAD
33 }
34
35 private WebSocketPolicy policy;
36
37 // State specific
38 private State state = State.START;
39 private int cursor = 0;
40 // WebSocketFrame
41 private AbstractWebSocketFrame frame;
42 private bool priorDataFrame;
43 // payload specific
44 private ByteBuffer payload;
45 private int payloadLength;
46 private PayloadProcessor maskProcessor;
47 // private PayloadProcessor strictnessProcessor;
48
49 /**
50 * Is there an extension using RSV flag?
51 * <p>
52 * <p>
53 * <pre>
54 * 0100_0000 (0x40) = rsv1
55 * 0010_0000 (0x20) = rsv2
56 * 0001_0000 (0x10) = rsv3
57 * </pre>
58 */
59 private byte flagsInUse = 0x00;
60
61 private IncomingFrames incomingFramesHandler;
62
63 this(WebSocketPolicy wspolicy) {
64 this.policy = wspolicy;
65 maskProcessor = new DeMaskProcessor();
66 }
67
68 private void assertSanePayloadLength(long len) {
69 version(HUNT_HTTP_DEBUG) {
70 tracef("%s Payload Length: %s - %s", policy.getBehavior(),
71 len.to!string(), this.toString());
72 }
73
74 // Since we use ByteBuffer so often, having lengths over int.max is really impossible.
75 if (len > int.max) {
76 // OMG! Sanity Check! DO NOT WANT! Won't anyone think of the memory!
77 throw new MessageTooLargeException("[int-sane!] cannot handle payload lengths larger than "
78 ~ to!string(int.max));
79 }
80
81 switch (frame.getOpCode()) {
82 case OpCode.CLOSE:
83 if (len == 1) {
84 throw new ProtocolException("Invalid close frame payload length, [" ~
85 payloadLength.to!string() ~ "]");
86 }
87 goto case;
88 // fall thru
89 case OpCode.PING:
90 goto case;
91 case OpCode.PONG:
92 if (len > ControlFrame.MAX_CONTROL_PAYLOAD) {
93 throw new ProtocolException("Invalid control frame payload length, [" ~
94 payloadLength.to!string() ~ "] cannot exceed [" ~
95 ControlFrame.MAX_CONTROL_PAYLOAD.to!string() ~ "]");
96 }
97 break;
98 case OpCode.TEXT:
99 policy.assertValidTextMessageSize(cast(int) len);
100 break;
101 case OpCode.BINARY:
102 policy.assertValidBinaryMessageSize(cast(int) len);
103 break;
104
105 default:
106 break;
107 }
108 }
109
110 void configureFromExtensions(Extension[] exts) {
111 // default
112 flagsInUse = 0x00;
113
114 // configure from list of extensions in use
115 foreach (Extension ext ; exts) {
116 if (ext.isRsv1User()) {
117 flagsInUse = cast(byte) (flagsInUse | 0x40);
118 }
119 if (ext.isRsv2User()) {
120 flagsInUse = cast(byte) (flagsInUse | 0x20);
121 }
122 if (ext.isRsv3User()) {
123 flagsInUse = cast(byte) (flagsInUse | 0x10);
124 }
125 }
126 }
127
128 IncomingFrames getIncomingFramesHandler() {
129 return incomingFramesHandler;
130 }
131
132 WebSocketPolicy getPolicy() {
133 return policy;
134 }
135
136 bool isRsv1InUse() {
137 return (flagsInUse & 0x40) != 0;
138 }
139
140 bool isRsv2InUse() {
141 return (flagsInUse & 0x20) != 0;
142 }
143
144 bool isRsv3InUse() {
145 return (flagsInUse & 0x10) != 0;
146 }
147
148 protected void notifyFrame(WebSocketFrame f) {
149 version(HUNT_HTTP_DEBUG_MORE)
150 tracef("%s Notify %s", policy.getBehavior(), getIncomingFramesHandler());
151
152 if (policy.getBehavior() == WebSocketBehavior.SERVER) {
153 /* Parsing on server.
154 *
155 * Then you MUST make sure all incoming frames are masked!
156 *
157 * Technically, this test is in violation of RFC-6455, Section 5.1
158 * http://tools.ietf.org/html/rfc6455#section-5.1
159 *
160 * But we can't trust the client at this point, so hunt opts to close
161 * the connection as a Protocol error.
162 */
163 if (!f.isMasked()) {
164 throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
165 }
166 } else if (policy.getBehavior() == WebSocketBehavior.CLIENT) {
167 // Required by RFC-6455 / Section 5.1
168 if (f.isMasked()) {
169 throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
170 }
171 }
172
173 if (incomingFramesHandler is null) {
174 version(HUNT_DEBUG) warning("incomingFramesHandler is null");
175 return;
176 }
177
178 try {
179 incomingFramesHandler.incomingFrame(f);
180 } catch (WebSocketException e) {
181 throw e;
182 } catch (Exception t) {
183 throw new WebSocketException(t);
184 }
185 }
186
187 void parse(ByteBuffer buffer) {
188
189 version(HUNT_HTTP_DEBUG) {
190 byte[] bufdata = buffer.peekRemaining();
191 tracef("remaining: %d, date: %(%02X %)", buffer.remaining(), bufdata);
192 }
193
194 if (buffer.remaining() <= 0) {
195 return;
196 }
197 try {
198 // parse through all the frames in the buffer
199 while (parseFrame(buffer)) {
200 version(HUNT_DEBUG) {
201 tracef("%s Parsed WebSocketFrame: %s", policy.getBehavior(), frame);
202 // info(BufferUtils.toDetailString(frame.getPayload()));
203 }
204 notifyFrame(frame);
205 if (frame.isDataFrame()) {
206 priorDataFrame = !frame.isFin();
207 }
208 reset();
209 }
210 } catch (WebSocketException e) {
211 buffer.position(buffer.limit()); // consume remaining
212 reset();
213 // need to throw for proper close behavior in connection
214 throw e;
215 } catch (Exception t) {
216 buffer.position(buffer.limit()); // consume remaining
217 reset();
218 // need to throw for proper close behavior in connection
219 throw new WebSocketException(t);
220 }
221 }
222
223 private void reset() {
224 if (frame !is null)
225 frame.reset();
226 frame = null;
227 payload = null;
228 }
229
230 /**
231 * Parse the base framing protocol buffer.
232 * <p>
233 * Note the first byte (fin,rsv1,rsv2,rsv3,opcode) are parsed by the {@link Parser#parse(ByteBuffer)} method
234 * <p>
235 * Not overridable
236 *
237 * @param buffer the buffer to parse from.
238 * @return true if done parsing base framing protocol and ready for parsing of the payload. false if incomplete parsing of base framing protocol.
239 */
240 private bool parseFrame(ByteBuffer buffer) {
241 version(HUNT_DEBUG) {
242 tracef("%s Parsing %s bytes", policy.getBehavior(), buffer.remaining());
243 }
244 while (buffer.hasRemaining()) {
245 switch (state) {
246 case State.START: {
247 // peek at byte
248 byte b = buffer.get();
249 bool fin = ((b & 0x80) != 0);
250
251 byte opcode = cast(byte) (b & 0x0F);
252
253 if (!OpCode.isKnown(opcode)) {
254 throw new ProtocolException("Unknown opcode: " ~ opcode);
255 }
256
257 version(HUNT_DEBUG)
258 tracef("%s OpCode %s, fin=%s rsv=%s%s%s",
259 policy.getBehavior(),
260 OpCode.name(opcode),
261 fin,
262 (((b & 0x40) != 0) ? '1' : '.'),
263 (((b & 0x20) != 0) ? '1' : '.'),
264 (((b & 0x10) != 0) ? '1' : '.'));
265
266 // base framing flags
267 switch (opcode) {
268 case OpCode.TEXT:
269 frame = new TextFrame();
270 // data validation
271 if (priorDataFrame) {
272 throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~
273 " frame, was expecting CONTINUATION");
274 }
275 break;
276 case OpCode.BINARY:
277 frame = new BinaryFrame();
278 // data validation
279 if (priorDataFrame) {
280 throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~
281 " frame, was expecting CONTINUATION");
282 }
283 break;
284 case OpCode.CONTINUATION:
285 frame = new ContinuationFrame();
286 // continuation validation
287 if (!priorDataFrame) {
288 throw new ProtocolException("CONTINUATION frame without prior !FIN");
289 }
290 // Be careful to use the original opcode
291 break;
292 case OpCode.CLOSE:
293 frame = new CloseFrame();
294 // control frame validation
295 if (!fin) {
296 throw new ProtocolException("Fragmented Close WebSocketFrame [" ~
297 OpCode.name(opcode) ~ "]");
298 }
299 break;
300 case OpCode.PING:
301 frame = new PingFrame();
302 // control frame validation
303 if (!fin) {
304 throw new ProtocolException("Fragmented Ping WebSocketFrame [" ~
305 OpCode.name(opcode) ~ "]");
306 }
307 break;
308 case OpCode.PONG:
309 frame = new PongFrame();
310 // control frame validation
311 if (!fin) {
312 throw new ProtocolException("Fragmented Pong WebSocketFrame [" ~
313 OpCode.name(opcode) ~ "]");
314 }
315 break;
316
317 default: break;
318 }
319
320 frame.setFin(fin);
321
322 // Are any flags set?
323 if ((b & 0x70) != 0) {
324 /*
325 * RFC 6455 Section 5.2
326 *
327 * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the
328 * negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
329 */
330 if ((b & 0x40) != 0) {
331 if (isRsv1InUse())
332 frame.setRsv1(true);
333 else {
334 string err = "RSV1 not allowed to be set";
335 version(HUNT_DEBUG) {
336 tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
337 }
338 throw new ProtocolException(err);
339 }
340 }
341 if ((b & 0x20) != 0) {
342 if (isRsv2InUse())
343 frame.setRsv2(true);
344 else {
345 string err = "RSV2 not allowed to be set";
346 version(HUNT_DEBUG) {
347 tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
348 }
349 throw new ProtocolException(err);
350 }
351 }
352 if ((b & 0x10) != 0) {
353 if (isRsv3InUse())
354 frame.setRsv3(true);
355 else {
356 string err = "RSV3 not allowed to be set";
357 version(HUNT_DEBUG) {
358 tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
359 }
360 throw new ProtocolException(err);
361 }
362 }
363 }
364
365 state = State.PAYLOAD_LEN;
366 break;
367 }
368
369 case State.PAYLOAD_LEN: {
370 byte b = buffer.get();
371 frame.setMasked((b & 0x80) != 0);
372 payloadLength = cast(byte) (0x7F & b);
373
374 if (payloadLength == 127) // 0x7F
375 {
376 // length 8 bytes (extended payload length)
377 payloadLength = 0;
378 state = State.PAYLOAD_LEN_BYTES;
379 cursor = 8;
380 break; // continue onto next state
381 } else if (payloadLength == 126) // 0x7E
382 {
383 // length 2 bytes (extended payload length)
384 payloadLength = 0;
385 state = State.PAYLOAD_LEN_BYTES;
386 cursor = 2;
387 break; // continue onto next state
388 }
389
390 assertSanePayloadLength(payloadLength);
391 if (frame.isMasked()) {
392 state = State.MASK;
393 } else {
394 // special case for empty payloads (no more bytes left in buffer)
395 if (payloadLength == 0) {
396 state = State.START;
397 return true;
398 }
399
400 maskProcessor.reset(frame);
401 state = State.PAYLOAD;
402 }
403
404 break;
405 }
406
407 case State.PAYLOAD_LEN_BYTES: {
408 byte b = buffer.get();
409 --cursor;
410 payloadLength |= (b & 0xFF) << (8 * cursor);
411 if (cursor == 0) {
412 assertSanePayloadLength(payloadLength);
413 if (frame.isMasked()) {
414 state = State.MASK;
415 } else {
416 // special case for empty payloads (no more bytes left in buffer)
417 if (payloadLength == 0) {
418 state = State.START;
419 return true;
420 }
421
422 maskProcessor.reset(frame);
423 state = State.PAYLOAD;
424 }
425 }
426 break;
427 }
428
429 case State.MASK: {
430 byte[] m = new byte[4];
431 frame.setMask(m);
432 if (buffer.remaining() >= 4) {
433 buffer.get(m, 0, 4);
434 // special case for empty payloads (no more bytes left in buffer)
435 if (payloadLength == 0) {
436 state = State.START;
437 return true;
438 }
439
440 maskProcessor.reset(frame);
441 state = State.PAYLOAD;
442 } else {
443 state = State.MASK_BYTES;
444 cursor = 4;
445 }
446 break;
447 }
448
449 case State.MASK_BYTES: {
450 byte b = buffer.get();
451 frame.getMask()[4 - cursor] = b;
452 --cursor;
453 if (cursor == 0) {
454 // special case for empty payloads (no more bytes left in buffer)
455 if (payloadLength == 0) {
456 state = State.START;
457 return true;
458 }
459
460 maskProcessor.reset(frame);
461 state = State.PAYLOAD;
462 }
463 break;
464 }
465
466 case State.PAYLOAD: {
467 frame.assertValid();
468 if (parsePayload(buffer)) {
469 // special check for close
470 if (frame.getOpCode() == OpCode.CLOSE) {
471 // TODO: yuck. Don't create an object to do validation checks!
472 new CloseInfo(frame);
473 }
474 state = State.START;
475 // we have a frame!
476 return true;
477 }
478 break;
479 }
480
481 default: break;
482 }
483 }
484
485 return false;
486 }
487
488 /**
489 * Implementation specific parsing of a payload
490 *
491 * @param buffer the payload buffer
492 * @return true if payload is done reading, false if incomplete
493 */
494 private bool parsePayload(ByteBuffer buffer) {
495 if (payloadLength == 0) {
496 return true;
497 }
498
499 if (buffer.hasRemaining()) {
500 // Create a small window of the incoming buffer to work with.
501 // this should only show the payload itself, and not any more
502 // bytes that could belong to the start of the next frame.
503 int bytesSoFar = payload is null ? 0 : payload.position();
504 int bytesExpected = payloadLength - bytesSoFar;
505 int bytesAvailable = buffer.remaining();
506 int windowBytes = std.algorithm.min(bytesAvailable, bytesExpected);
507 int limit = buffer.limit();
508 buffer.limit(buffer.position() + windowBytes);
509 ByteBuffer window = buffer.slice();
510 buffer.limit(limit);
511 buffer.position(buffer.position() + window.remaining());
512
513 maskProcessor.process(window);
514
515 version(HUNT_HTTP_DEBUG) {
516 tracef("%s Window(unmarked): %s", policy.getBehavior(), BufferUtils.toDetailString(window));
517 }
518
519 if (window.remaining() == payloadLength) {
520 // We have the whole content, no need to copy.
521 frame.setPayload(window);
522 return true;
523 } else {
524 if (payload is null) {
525 payload = BufferUtils.allocate(payloadLength);
526 BufferUtils.clearToFill(payload);
527 }
528 // Copy the payload.
529 payload.put(window);
530
531 if (payload.position() == payloadLength) {
532 BufferUtils.flipToFlush(payload, 0);
533 frame.setPayload(payload);
534 return true;
535 }
536 }
537 }
538 return false;
539 }
540
541 void setIncomingFramesHandler(IncomingFrames incoming) {
542 this.incomingFramesHandler = incoming;
543 }
544
545 override
546 string toString() {
547 StringBuilder builder = new StringBuilder();
548 builder.append("Parser@").append(toHash().to!string(16));
549 builder.append("[");
550 if (incomingFramesHandler is null) {
551 builder.append("NO_HANDLER");
552 } else {
553 builder.append(typeid(incomingFramesHandler).name);
554 }
555 builder.append(",s=").append(state.to!string());
556 builder.append(",c=").append(cursor.to!string());
557 builder.append(",len=").append(payloadLength);
558 builder.append(",f=").append(frame.toString());
559 // builder.append(",p=").append(policy);
560 builder.append("]");
561 return builder.toString();
562 }
563 }