module hunt.http.codec.websocket.decode.Parser;

import hunt.http.codec.websocket.exception;
import hunt.http.codec.websocket.frame;
import hunt.http.codec.websocket.model;
import hunt.http.codec.websocket.decode.payload;
import hunt.http.codec.websocket.stream.WebSocketPolicy;

import hunt.logging;
import hunt.container;
import hunt.string;

import std.algorithm;
import std.conv;


/**
 * Incremental parser for WebSocket frames (RFC 6455, Section 5.2).
 * <p>
 * Bytes are fed in through {@link Parser#parse(ByteBuffer)}; a frame may be
 * spread over any number of buffers, so all progress is kept in the fields
 * below. Each completed frame is handed to the configured
 * {@link IncomingFrames} handler.
 */
class Parser {
    /** Position inside the frame currently being decoded. */
    private enum State {
        START,
        PAYLOAD_LEN,
        PAYLOAD_LEN_BYTES,
        MASK,
        MASK_BYTES,
        PAYLOAD
    }

    private WebSocketPolicy policy;

    // State specific
    private State state = State.START;
    // Number of bytes still expected in PAYLOAD_LEN_BYTES / MASK_BYTES
    private int cursor = 0;
    // Frame
    private WebSocketFrame frame;
    // true while a fragmented data message is open (last data frame had FIN unset)
    private bool priorDataFrame;
    // payload specific
    private ByteBuffer payload;
    private int payloadLength;
    // Accumulator for the 2- or 8-byte extended payload length. It must be
    // 64 bits wide: the 8-byte form shifts bytes by up to 56 bits.
    private long extendedLength;
    private PayloadProcessor maskProcessor;
    // private PayloadProcessor strictnessProcessor;

    /**
     * Is there an extension using RSV flag?
     * <p>
     * <pre>
     *   0100_0000 (0x40) = rsv1
     *   0010_0000 (0x20) = rsv2
     *   0001_0000 (0x10) = rsv3
     * </pre>
     */
    private byte flagsInUse = 0x00;

    private IncomingFrames incomingFramesHandler;

    /**
     * Creates a parser governed by the given policy.
     *
     * @param wspolicy the policy supplying the behavior (client/server) and message size limits
     */
    this(WebSocketPolicy wspolicy) {
        this.policy = wspolicy;
        maskProcessor = new DeMaskProcessor();
    }

    /**
     * Validates a decoded payload length against the buffer size limit,
     * the control frame limit and the policy's message size limits.
     *
     * @param len the payload length announced by the frame header
     * @throws MessageTooLargeException if the length cannot be represented as a non-negative int
     * @throws ProtocolException if a control frame announces an invalid length
     */
    private void assertSanePayloadLength(long len) {
        version(HUNT_DEBUG) {
            tracef("%s Payload Length: %s - %s", policy.getBehavior(),
                    len.to!string(), this.toString());
        }

        // Since we use ByteBuffer so often, having lengths over int.max is really impossible.
        // A negative value means the most significant bit of the 64-bit length was set,
        // which RFC 6455 Section 5.2 forbids; as an unsigned quantity it is far too large.
        if (len < 0 || len > int.max) {
            throw new MessageTooLargeException("[int-sane!] cannot handle payload lengths larger than "
                    ~ to!string(int.max));
        }

        switch (frame.getOpCode()) {
            case OpCode.CLOSE:
                // A close payload is either empty or starts with a 2-byte status code.
                if (len == 1) {
                    throw new ProtocolException("Invalid close frame payload length, [" ~
                            len.to!string() ~ "]");
                }
                goto case;
            // fall thru
            case OpCode.PING:
                goto case;
            case OpCode.PONG:
                if (len > ControlFrame.MAX_CONTROL_PAYLOAD) {
                    throw new ProtocolException("Invalid control frame payload length, [" ~
                            len.to!string() ~ "] cannot exceed [" ~
                            ControlFrame.MAX_CONTROL_PAYLOAD.to!string() ~ "]");
                }
                break;
            case OpCode.TEXT:
                policy.assertValidTextMessageSize(cast(int) len);
                break;
            case OpCode.BINARY:
                policy.assertValidBinaryMessageSize(cast(int) len);
                break;

            default:
                break;
        }
    }

    /**
     * Recomputes which RSV bits are acceptable from the extensions in use.
     *
     * @param exts the negotiated extensions
     */
    void configureFromExtensions(Extension[] exts) {
        // default
        flagsInUse = 0x00;

        // configure from list of extensions in use
        foreach (Extension ext ; exts) {
            if (ext.isRsv1User()) {
                flagsInUse = cast(byte) (flagsInUse | 0x40);
            }
            if (ext.isRsv2User()) {
                flagsInUse = cast(byte) (flagsInUse | 0x20);
            }
            if (ext.isRsv3User()) {
                flagsInUse = cast(byte) (flagsInUse | 0x10);
            }
        }
    }

    IncomingFrames getIncomingFramesHandler() {
        return incomingFramesHandler;
    }

    WebSocketPolicy getPolicy() {
        return policy;
    }

    bool isRsv1InUse() {
        return (flagsInUse & 0x40) != 0;
    }

    bool isRsv2InUse() {
        return (flagsInUse & 0x20) != 0;
    }

    bool isRsv3InUse() {
        return (flagsInUse & 0x10) != 0;
    }

    /**
     * Checks the masking rule for the configured behavior and forwards the
     * frame to the incoming frames handler, if one is set.
     *
     * @param f the completed frame
     * @throws ProtocolException if the masking rule of RFC 6455 Section 5.1 is violated
     * @throws WebSocketException if the handler fails; other exceptions are wrapped
     */
    protected void notifyFrame(Frame f) {
        version(HUNT_DEBUG)
            tracef("%s Notify %s", policy.getBehavior(), getIncomingFramesHandler());

        if (policy.getBehavior() == WebSocketBehavior.SERVER) {
            /* Parsing on server.
             *
             * Then you MUST make sure all incoming frames are masked!
             *
             * Technically, this test is in violation of RFC-6455, Section 5.1
             * http://tools.ietf.org/html/rfc6455#section-5.1
             *
             * But we can't trust the client at this point, so hunt opts to close
             * the connection as a Protocol error.
             */
            if (!f.isMasked()) {
                throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
            }
        } else if (policy.getBehavior() == WebSocketBehavior.CLIENT) {
            // Required by RFC-6455 / Section 5.1
            if (f.isMasked()) {
                throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
            }
        }

        if (incomingFramesHandler is null) {
            version(HUNT_DEBUG) warning("incomingFramesHandler is null");
            return;
        }

        try {
            incomingFramesHandler.incomingFrame(f);
        } catch (WebSocketException e) {
            throw e;
        } catch (Exception t) {
            throw new WebSocketException(t);
        }
    }

    /**
     * Parses every complete frame contained in the buffer and notifies the
     * handler for each one. A trailing partial frame is remembered and
     * continued on the next call.
     * <p>
     * On failure the rest of the buffer is consumed and the parser is reset
     * before the exception is rethrown.
     *
     * @param buffer the bytes to parse
     * @throws WebSocketException on any parsing or handler failure
     */
    void parse(ByteBuffer buffer) {
        if (buffer.remaining() <= 0) {
            return;
        }
        try {
            // parse through all the frames in the buffer
            while (parseFrame(buffer)) {
                version(HUNT_DEBUG) {
                    tracef("%s Parsed Frame: %s", policy.getBehavior(), frame);
                }
                notifyFrame(frame);
                if (frame.isDataFrame()) {
                    priorDataFrame = !frame.isFin();
                }
                reset();
            }
        } catch (WebSocketException e) {
            buffer.position(buffer.limit()); // consume remaining
            reset();
            // need to throw for proper close behavior in connection
            throw e;
        } catch (Exception t) {
            buffer.position(buffer.limit()); // consume remaining
            reset();
            // need to throw for proper close behavior in connection
            throw new WebSocketException(t);
        }
    }

    /**
     * Drops the current frame and returns to the start-of-frame state.
     */
    private void reset() {
        if (frame !is null)
            frame.reset();
        frame = null;
        payload = null;
        // After a completed frame the state is already START. After a failure it
        // may be mid-frame; resuming there would dereference the null frame.
        state = State.START;
        cursor = 0;
    }

    /**
     * Rejects a frame that starts a new data message while a fragmented one is still open.
     */
    private void assertNoOpenMessage(byte opcode) {
        if (priorDataFrame) {
            throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~
                    " frame, was expecting CONTINUATION");
        }
    }

    /**
     * Rejects a control frame that does not have the FIN bit set.
     */
    private void assertNotFragmented(bool fin, string label, byte opcode) {
        if (!fin) {
            throw new ProtocolException("Fragmented " ~ label ~ " Frame [" ~
                    OpCode.name(opcode) ~ "]");
        }
    }

    /**
     * Fails the parse because a reserved bit is set that no extension in use claims.
     */
    private void rejectReservedBit(string bitName, ByteBuffer buffer) {
        string err = bitName ~ " not allowed to be set";
        version(HUNT_DEBUG) {
            tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
        }
        throw new ProtocolException(err);
    }

    /**
     * Creates the frame object matching the opcode and applies the
     * fragmentation rules. The frame is stored before validating so that
     * {@link Parser#reset()} can release it if validation throws.
     */
    private void createFrame(byte opcode, bool fin) {
        switch (opcode) {
            case OpCode.TEXT:
                frame = new TextFrame();
                assertNoOpenMessage(opcode);
                break;
            case OpCode.BINARY:
                frame = new BinaryFrame();
                assertNoOpenMessage(opcode);
                break;
            case OpCode.CONTINUATION:
                frame = new ContinuationFrame();
                // continuation validation
                if (!priorDataFrame) {
                    throw new ProtocolException("CONTINUATION frame without prior !FIN");
                }
                // Be careful to use the original opcode
                break;
            case OpCode.CLOSE:
                frame = new CloseFrame();
                assertNotFragmented(fin, "Close", opcode);
                break;
            case OpCode.PING:
                frame = new PingFrame();
                assertNotFragmented(fin, "Ping", opcode);
                break;
            case OpCode.PONG:
                frame = new PongFrame();
                assertNotFragmented(fin, "Pong", opcode);
                break;

            default: break;
        }
    }

    /**
     * Decodes the first header byte (fin, rsv1, rsv2, rsv3, opcode) and
     * creates the frame for it.
     *
     * @param buffer the buffer positioned at the first byte of a frame
     * @throws ProtocolException on an unknown opcode, a fragmentation violation or an unclaimed RSV bit
     */
    private void parseFirstByte(ByteBuffer buffer) {
        byte b = buffer.get();
        bool fin = ((b & 0x80) != 0);

        byte opcode = cast(byte) (b & 0x0F);

        if (!OpCode.isKnown(opcode)) {
            // Convert explicitly: appending the raw byte would insert it as a
            // character instead of its decimal text.
            throw new ProtocolException("Unknown opcode: " ~ opcode.to!string());
        }

        version(HUNT_DEBUG)
            tracef("%s OpCode %s, fin=%s rsv=%s%s%s",
                    policy.getBehavior(),
                    OpCode.name(opcode),
                    fin,
                    (((b & 0x40) != 0) ? '1' : '.'),
                    (((b & 0x20) != 0) ? '1' : '.'),
                    (((b & 0x10) != 0) ? '1' : '.'));

        createFrame(opcode, fin);
        frame.setFin(fin);

        // Are any flags set?
        if ((b & 0x70) != 0) {
            /*
             * RFC 6455 Section 5.2
             *
             * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values.
             * If a nonzero value is received and none of the negotiated extensions defines the meaning
             * of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
             */
            if ((b & 0x40) != 0) {
                if (isRsv1InUse())
                    frame.setRsv1(true);
                else
                    rejectReservedBit("RSV1", buffer);
            }
            if ((b & 0x20) != 0) {
                if (isRsv2InUse())
                    frame.setRsv2(true);
                else
                    rejectReservedBit("RSV2", buffer);
            }
            if ((b & 0x10) != 0) {
                if (isRsv3InUse())
                    frame.setRsv3(true);
                else
                    rejectReservedBit("RSV3", buffer);
            }
        }
    }

    /**
     * Selects the next state once the header (length and, if present, mask) is complete.
     *
     * @return true if the frame is already complete because its payload is empty
     */
    private bool onHeaderComplete() {
        // special case for empty payloads (no more bytes left in buffer)
        if (payloadLength == 0) {
            state = State.START;
            return true;
        }

        maskProcessor.reset(frame);
        state = State.PAYLOAD;
        return false;
    }

    /**
     * Selects the next state once the payload length is fully decoded.
     *
     * @return true if the frame is already complete (unmasked with an empty payload)
     */
    private bool onPayloadLengthKnown() {
        if (frame.isMasked()) {
            state = State.MASK;
            return false;
        }
        return onHeaderComplete();
    }

    /**
     * Advances the frame state machine over the bytes in the buffer.
     * <p>
     * The whole frame is handled here: first byte, payload length, mask and payload.
     * <p>
     * Not overridable
     *
     * @param buffer the buffer to parse from.
     * @return true if a complete frame is available in {@code frame};
     *         false if the buffer ran out before the frame was complete.
     */
    private bool parseFrame(ByteBuffer buffer) {
        version(HUNT_DEBUG) {
            tracef("%s Parsing %s bytes", policy.getBehavior(), buffer.remaining());
        }
        while (buffer.hasRemaining()) {
            switch (state) {
                case State.START: {
                    parseFirstByte(buffer);
                    state = State.PAYLOAD_LEN;
                    break;
                }

                case State.PAYLOAD_LEN: {
                    byte b = buffer.get();
                    frame.setMasked((b & 0x80) != 0);
                    payloadLength = cast(byte) (0x7F & b);

                    if (payloadLength == 127) // 0x7F
                    {
                        // length 8 bytes (extended payload length)
                        payloadLength = 0;
                        extendedLength = 0;
                        state = State.PAYLOAD_LEN_BYTES;
                        cursor = 8;
                        break; // continue onto next state
                    } else if (payloadLength == 126) // 0x7E
                    {
                        // length 2 bytes (extended payload length)
                        payloadLength = 0;
                        extendedLength = 0;
                        state = State.PAYLOAD_LEN_BYTES;
                        cursor = 2;
                        break; // continue onto next state
                    }

                    assertSanePayloadLength(payloadLength);
                    if (onPayloadLengthKnown()) {
                        return true;
                    }
                    break;
                }

                case State.PAYLOAD_LEN_BYTES: {
                    byte b = buffer.get();
                    --cursor;
                    // Widen before shifting: the count reaches 56 for the 8-byte form,
                    // which is not a valid shift for a 32-bit operand.
                    extendedLength |= (cast(long) (b & 0xFF)) << (8 * cursor);
                    if (cursor == 0) {
                        assertSanePayloadLength(extendedLength);
                        // Safe to narrow: validated to lie within 0 .. int.max.
                        payloadLength = cast(int) extendedLength;
                        if (onPayloadLengthKnown()) {
                            return true;
                        }
                    }
                    break;
                }

                case State.MASK: {
                    byte[] m = new byte[4];
                    frame.setMask(m);
                    if (buffer.remaining() >= 4) {
                        buffer.get(m, 0, 4);
                        if (onHeaderComplete()) {
                            return true;
                        }
                    } else {
                        // Not all mask bytes are here yet; collect them one at a time.
                        state = State.MASK_BYTES;
                        cursor = 4;
                    }
                    break;
                }

                case State.MASK_BYTES: {
                    byte b = buffer.get();
                    frame.getMask()[4 - cursor] = b;
                    --cursor;
                    if (cursor == 0) {
                        if (onHeaderComplete()) {
                            return true;
                        }
                    }
                    break;
                }

                case State.PAYLOAD: {
                    frame.assertValid();
                    if (parsePayload(buffer)) {
                        // special check for close
                        if (frame.getOpCode() == OpCode.CLOSE) {
                            // TODO: yuck. Don't create an object to do validation checks!
                            new CloseInfo(frame);
                        }
                        state = State.START;
                        // we have a frame!
                        return true;
                    }
                    break;
                }

                default: break;
            }
        }

        return false;
    }

    /**
     * Consumes payload bytes for the current frame, running them through the mask processor.
     * <p>
     * If the whole payload is present in one buffer the frame references a
     * slice of that buffer; otherwise the pieces are copied into an
     * aggregate buffer until {@code payloadLength} bytes were collected.
     *
     * @param buffer the payload buffer
     * @return true if payload is done reading, false if incomplete
     */
    private bool parsePayload(ByteBuffer buffer) {
        if (payloadLength == 0) {
            return true;
        }

        if (buffer.hasRemaining()) {
            // Create a small window of the incoming buffer to work with.
            // this should only show the payload itself, and not any more
            // bytes that could belong to the start of the next frame.
            int bytesSoFar = payload is null ? 0 : payload.position();
            int bytesExpected = payloadLength - bytesSoFar;
            int bytesAvailable = buffer.remaining();
            int windowBytes = std.algorithm.min(bytesAvailable, bytesExpected);
            int limit = buffer.limit();
            buffer.limit(buffer.position() + windowBytes);
            ByteBuffer window = buffer.slice();
            buffer.limit(limit);
            // Mark the window's bytes as consumed in the source buffer.
            buffer.position(buffer.position() + window.remaining());

            version(HUNT_DEBUG) {
                tracef("%s Window: %s", policy.getBehavior(), BufferUtils.toDetailString(window));
            }

            maskProcessor.process(window);

            if (window.remaining() == payloadLength) {
                // We have the whole content, no need to copy.
                frame.setPayload(window);
                return true;
            } else {
                if (payload is null) {
                    payload = BufferUtils.allocate(payloadLength);
                    BufferUtils.clearToFill(payload);
                }
                // Copy the payload.
                payload.put(window);

                if (payload.position() == payloadLength) {
                    BufferUtils.flipToFlush(payload, 0);
                    frame.setPayload(payload);
                    return true;
                }
            }
        }
        return false;
    }

    void setIncomingFramesHandler(IncomingFrames incoming) {
        this.incomingFramesHandler = incoming;
    }

    /**
     * Describes the parser: handler type, state, cursor, payload length and current frame.
     */
    override
    string toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Parser@").append(toHash().to!string(16));
        builder.append("[");
        if (incomingFramesHandler is null) {
            builder.append("NO_HANDLER");
        } else {
            builder.append(typeid(incomingFramesHandler).name);
        }
        builder.append(",s=").append(state.to!string());
        builder.append(",c=").append(cursor.to!string());
        builder.append(",len=").append(payloadLength);
        // No frame exists between frames; do not dereference null.
        builder.append(",f=").append(frame is null ? "null" : frame.toString());
        // builder.append(",p=").append(policy);
        builder.append("]");
        return builder.toString();
    }
}