module hunt.http.codec.websocket.decode.Parser;

import hunt.http.codec.websocket.frame;
import hunt.http.codec.websocket.model;
import hunt.http.codec.websocket.decode.payload;

import hunt.http.Exceptions;
import hunt.http.WebSocketCommon;
import hunt.http.WebSocketConnection;
import hunt.http.WebSocketFrame;
import hunt.http.WebSocketPolicy;

import hunt.logging;
import hunt.collection;
import hunt.text.Common;
import hunt.util.StringBuilder;

import std.algorithm;
import std.conv;


/**
 * Incremental parser for WebSocket frames (RFC 6455 base framing protocol).
 * <p>
 * Bytes are fed in through {@link Parser#parse(ByteBuffer)}; a frame may be
 * split across any number of calls. Every completed frame is handed to the
 * configured {@link IncomingFrames} handler.
 */
class Parser {
    private enum State {
        START,
        PAYLOAD_LEN,
        PAYLOAD_LEN_BYTES,
        MASK,
        MASK_BYTES,
        PAYLOAD
    }

    private WebSocketPolicy policy;

    // State specific
    private State state = State.START;
    private int cursor = 0;
    // WebSocketFrame
    private AbstractWebSocketFrame frame;
    private bool priorDataFrame;
    // payload specific
    private ByteBuffer payload;
    private int payloadLength;
    // Accumulator for the 2- or 8-byte extended payload length. It has to be
    // 64 bits wide: shifting a 32-bit int by up to 56 bits would corrupt the
    // value and hide oversized lengths from the sanity check.
    private long extendedPayloadLength;
    private PayloadProcessor maskProcessor;
    // private PayloadProcessor strictnessProcessor;

    /**
     * Is there an extension using RSV flag?
     * <p>
     * <p>
     * <pre>
     *   0100_0000 (0x40) = rsv1
     *   0010_0000 (0x20) = rsv2
     *   0001_0000 (0x10) = rsv3
     * </pre>
     */
    private byte flagsInUse = 0x00;

    private IncomingFrames incomingFramesHandler;

    /**
     * Create a parser bound to the given policy.
     *
     * @param wspolicy the policy consulted for behavior (client/server) and message size limits
     */
    this(WebSocketPolicy wspolicy) {
        this.policy = wspolicy;
        maskProcessor = new DeMaskProcessor();
    }

    /**
     * Validate a decoded payload length against the protocol and policy limits
     * for the opcode of the frame currently being parsed.
     *
     * @param len the decoded payload length (full 64-bit value for extended lengths)
     * @throws ProtocolException if the length is negative, is 1 for a CLOSE frame,
     *         or exceeds the control frame maximum
     * @throws MessageTooLargeException if the length exceeds int.max
     */
    private void assertSanePayloadLength(long len) {
        version(HUNT_HTTP_DEBUG) {
            tracef("%s Payload Length: %s - %s", policy.getBehavior(),
                len.to!string(), this.toString());
        }

        // A 64-bit length with the most significant bit set decodes as negative.
        if (len < 0) {
            throw new ProtocolException("Invalid payload length, [" ~
                len.to!string() ~ "] (most significant bit MUST be 0)");
        }

        // Since we use ByteBuffer so often, having lengths over int.max is really impossible.
        if (len > int.max) {
            // OMG! Sanity Check! DO NOT WANT! Won't anyone think of the memory!
            throw new MessageTooLargeException("[int-sane!] cannot handle payload lengths larger than "
                ~ to!string(int.max));
        }

        switch (frame.getOpCode()) {
            case OpCode.CLOSE:
                // A close payload is either empty or starts with a 2-byte status code.
                if (len == 1) {
                    throw new ProtocolException("Invalid close frame payload length, [" ~
                        len.to!string() ~ "]");
                }
                goto case;
                // fall thru
            case OpCode.PING:
                goto case;
            case OpCode.PONG:
                if (len > ControlFrame.MAX_CONTROL_PAYLOAD) {
                    throw new ProtocolException("Invalid control frame payload length, [" ~
                        len.to!string() ~ "] cannot exceed [" ~
                        ControlFrame.MAX_CONTROL_PAYLOAD.to!string() ~ "]");
                }
                break;
            case OpCode.TEXT:
                policy.assertValidTextMessageSize(cast(int) len);
                break;
            case OpCode.BINARY:
                policy.assertValidBinaryMessageSize(cast(int) len);
                break;

            default:
                break;
        }
    }

    /**
     * Recompute which RSV bits are permitted from the extensions in use.
     *
     * @param exts the extensions to inspect; any previous configuration is discarded
     */
    void configureFromExtensions(Extension[] exts) {
        // default
        flagsInUse = 0x00;

        // configure from list of extensions in use
        foreach (Extension ext ; exts) {
            if (ext.isRsv1User()) {
                flagsInUse = cast(byte) (flagsInUse | 0x40);
            }
            if (ext.isRsv2User()) {
                flagsInUse = cast(byte) (flagsInUse | 0x20);
            }
            if (ext.isRsv3User()) {
                flagsInUse = cast(byte) (flagsInUse | 0x10);
            }
        }
    }

    IncomingFrames getIncomingFramesHandler() {
        return incomingFramesHandler;
    }

    WebSocketPolicy getPolicy() {
        return policy;
    }

    bool isRsv1InUse() {
        return (flagsInUse & 0x40) != 0;
    }

    bool isRsv2InUse() {
        return (flagsInUse & 0x20) != 0;
    }

    bool isRsv3InUse() {
        return (flagsInUse & 0x10) != 0;
    }

    /**
     * Check the masking rule for the configured behavior, then deliver the
     * frame to the incoming frames handler (if one is set).
     *
     * @param f the completed frame
     * @throws ProtocolException if the masking rule for this endpoint is violated
     * @throws WebSocketException if the handler throws; other exceptions are wrapped
     */
    protected void notifyFrame(WebSocketFrame f) {
        version(HUNT_HTTP_DEBUG_MORE)
            tracef("%s Notify %s", policy.getBehavior(), getIncomingFramesHandler());

        if (policy.getBehavior() == WebSocketBehavior.SERVER) {
            /* Parsing on server.
             *
             * Then you MUST make sure all incoming frames are masked!
             *
             * Technically, this test is in violation of RFC-6455, Section 5.1
             * http://tools.ietf.org/html/rfc6455#section-5.1
             *
             * But we can't trust the client at this point, so hunt opts to close
             * the connection as a Protocol error.
             */
            if (!f.isMasked()) {
                throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
            }
        } else if (policy.getBehavior() == WebSocketBehavior.CLIENT) {
            // Required by RFC-6455 / Section 5.1
            if (f.isMasked()) {
                throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
            }
        }

        if (incomingFramesHandler is null) {
            version(HUNT_DEBUG) warning("incomingFramesHandler is null");
            return;
        }

        try {
            incomingFramesHandler.incomingFrame(f);
        } catch (WebSocketException e) {
            throw e;
        } catch (Exception t) {
            throw new WebSocketException(t);
        }
    }

    /**
     * Parse every complete frame available in the buffer and notify the handler
     * for each one. An incomplete trailing frame is kept as parser state and
     * continued on the next call.
     * <p>
     * On any failure the rest of the buffer is consumed, the parser is reset,
     * and the failure is rethrown as a WebSocketException.
     *
     * @param buffer the incoming bytes
     */
    void parse(ByteBuffer buffer) {

        version(HUNT_HTTP_DEBUG) {
            byte[] bufdata = buffer.peekRemaining();
            tracef("remaining: %d, date: %(%02X %)", buffer.remaining(), bufdata);
        }

        if (buffer.remaining() <= 0) {
            return;
        }
        try {
            // parse through all the frames in the buffer
            while (parseFrame(buffer)) {
                version(HUNT_DEBUG) {
                    tracef("%s Parsed WebSocketFrame: %s", policy.getBehavior(), frame);
                    // info(BufferUtils.toDetailString(frame.getPayload()));
                }
                notifyFrame(frame);
                if (frame.isDataFrame()) {
                    // A non-final data frame means a CONTINUATION must follow.
                    priorDataFrame = !frame.isFin();
                }
                reset();
            }
        } catch (WebSocketException e) {
            buffer.position(buffer.limit()); // consume remaining
            reset();
            // need to throw for proper close behavior in connection
            throw e;
        } catch (Exception t) {
            buffer.position(buffer.limit()); // consume remaining
            reset();
            // need to throw for proper close behavior in connection
            throw new WebSocketException(t);
        }
    }

    /**
     * Drop the current frame and partial payload and rewind the state machine
     * to the start of a frame.
     */
    private void reset() {
        if (frame !is null)
            frame.reset();
        frame = null;
        payload = null;
        // Rewind the state machine too: after a mid-frame failure the next
        // parse() call would otherwise resume in a state that dereferences
        // the (now null) frame.
        state = State.START;
        cursor = 0;
    }

    /**
     * Transition taken once the header (length and, if present, mask) is complete.
     *
     * @return true if the frame is already complete because its payload is empty,
     *         false if payload bytes still have to be read
     */
    private bool beginPayload() {
        // special case for empty payloads (no more bytes left in buffer)
        if (payloadLength == 0) {
            state = State.START;
            return true;
        }

        maskProcessor.reset(frame);
        state = State.PAYLOAD;
        return false;
    }

    /**
     * Run the framing state machine over the buffer until one frame is complete
     * or the buffer is exhausted.
     * <p>
     * Not overridable
     *
     * @param buffer the buffer to parse from.
     * @return true if a whole frame (header and payload) has been parsed and is
     *         available in the current frame field; false if more bytes are needed.
     */
    private bool parseFrame(ByteBuffer buffer) {
        version(HUNT_DEBUG) {
            tracef("%s Parsing %s bytes", policy.getBehavior(), buffer.remaining());
        }
        while (buffer.hasRemaining()) {
            switch (state) {
                case State.START: {
                    // first header byte: fin, rsv1-3, opcode (consumed, not peeked)
                    byte b = buffer.get();
                    bool fin = ((b & 0x80) != 0);

                    byte opcode = cast(byte) (b & 0x0F);

                    if (!OpCode.isKnown(opcode)) {
                        throw new ProtocolException("Unknown opcode: " ~ opcode.to!string());
                    }

                    version(HUNT_DEBUG)
                        tracef("%s OpCode %s, fin=%s rsv=%s%s%s",
                                policy.getBehavior(),
                                OpCode.name(opcode),
                                fin,
                                (((b & 0x40) != 0) ? '1' : '.'),
                                (((b & 0x20) != 0) ? '1' : '.'),
                                (((b & 0x10) != 0) ? '1' : '.'));

                    // base framing flags
                    switch (opcode) {
                        case OpCode.TEXT:
                            frame = new TextFrame();
                            // data validation
                            if (priorDataFrame) {
                                throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~
                                    " frame, was expecting CONTINUATION");
                            }
                            break;
                        case OpCode.BINARY:
                            frame = new BinaryFrame();
                            // data validation
                            if (priorDataFrame) {
                                throw new ProtocolException("Unexpected " ~ OpCode.name(opcode) ~
                                    " frame, was expecting CONTINUATION");
                            }
                            break;
                        case OpCode.CONTINUATION:
                            frame = new ContinuationFrame();
                            // continuation validation
                            if (!priorDataFrame) {
                                throw new ProtocolException("CONTINUATION frame without prior !FIN");
                            }
                            // Be careful to use the original opcode
                            break;
                        case OpCode.CLOSE:
                            frame = new CloseFrame();
                            // control frame validation
                            if (!fin) {
                                throw new ProtocolException("Fragmented Close WebSocketFrame [" ~
                                    OpCode.name(opcode) ~ "]");
                            }
                            break;
                        case OpCode.PING:
                            frame = new PingFrame();
                            // control frame validation
                            if (!fin) {
                                throw new ProtocolException("Fragmented Ping WebSocketFrame [" ~
                                    OpCode.name(opcode) ~ "]");
                            }
                            break;
                        case OpCode.PONG:
                            frame = new PongFrame();
                            // control frame validation
                            if (!fin) {
                                throw new ProtocolException("Fragmented Pong WebSocketFrame [" ~
                                    OpCode.name(opcode) ~ "]");
                            }
                            break;

                        default:
                            // No frame object was created; fail cleanly rather
                            // than dereferencing a null frame below.
                            throw new ProtocolException("Unknown opcode: " ~ opcode.to!string());
                    }

                    frame.setFin(fin);

                    // Are any flags set?
                    if ((b & 0x70) != 0) {
                        /*
                         * RFC 6455 Section 5.2
                         *
                         * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the
                         * negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
                         */
                        if ((b & 0x40) != 0) {
                            if (isRsv1InUse())
                                frame.setRsv1(true);
                            else {
                                string err = "RSV1 not allowed to be set";
                                version(HUNT_DEBUG) {
                                    tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
                                }
                                throw new ProtocolException(err);
                            }
                        }
                        if ((b & 0x20) != 0) {
                            if (isRsv2InUse())
                                frame.setRsv2(true);
                            else {
                                string err = "RSV2 not allowed to be set";
                                version(HUNT_DEBUG) {
                                    tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
                                }
                                throw new ProtocolException(err);
                            }
                        }
                        if ((b & 0x10) != 0) {
                            if (isRsv3InUse())
                                frame.setRsv3(true);
                            else {
                                string err = "RSV3 not allowed to be set";
                                version(HUNT_DEBUG) {
                                    tracef(err ~ ": Remaining buffer: %s", BufferUtils.toDetailString(buffer));
                                }
                                throw new ProtocolException(err);
                            }
                        }
                    }

                    state = State.PAYLOAD_LEN;
                    break;
                }

                case State.PAYLOAD_LEN: {
                    byte b = buffer.get();
                    frame.setMasked((b & 0x80) != 0);
                    payloadLength = cast(byte) (0x7F & b);

                    if (payloadLength == 127) // 0x7F
                    {
                        // length 8 bytes (extended payload length)
                        payloadLength = 0;
                        extendedPayloadLength = 0;
                        state = State.PAYLOAD_LEN_BYTES;
                        cursor = 8;
                        break; // continue onto next state
                    } else if (payloadLength == 126) // 0x7E
                    {
                        // length 2 bytes (extended payload length)
                        payloadLength = 0;
                        extendedPayloadLength = 0;
                        state = State.PAYLOAD_LEN_BYTES;
                        cursor = 2;
                        break; // continue onto next state
                    }

                    assertSanePayloadLength(payloadLength);
                    if (frame.isMasked()) {
                        state = State.MASK;
                    } else if (beginPayload()) {
                        return true;
                    }

                    break;
                }

                case State.PAYLOAD_LEN_BYTES: {
                    byte b = buffer.get();
                    --cursor;
                    // Network byte order: cursor counts down, so the first byte
                    // read lands in the most significant position. Widen to long
                    // before shifting so shifts of 32..56 bits are well defined.
                    extendedPayloadLength |= (cast(long) (b & 0xFF)) << (8 * cursor);
                    if (cursor == 0) {
                        // Validate the full 64-bit value before narrowing to int.
                        assertSanePayloadLength(extendedPayloadLength);
                        payloadLength = cast(int) extendedPayloadLength;
                        if (frame.isMasked()) {
                            state = State.MASK;
                        } else if (beginPayload()) {
                            return true;
                        }
                    }
                    break;
                }

                case State.MASK: {
                    byte[] m = new byte[4];
                    frame.setMask(m);
                    if (buffer.remaining() >= 4) {
                        // fast path: whole masking key is available
                        buffer.get(m, 0, 4);
                        if (beginPayload()) {
                            return true;
                        }
                    } else {
                        // masking key is split across buffers; read it byte by byte
                        state = State.MASK_BYTES;
                        cursor = 4;
                    }
                    break;
                }

                case State.MASK_BYTES: {
                    byte b = buffer.get();
                    frame.getMask()[4 - cursor] = b;
                    --cursor;
                    if (cursor == 0) {
                        if (beginPayload()) {
                            return true;
                        }
                    }
                    break;
                }

                case State.PAYLOAD: {
                    frame.assertValid();
                    if (parsePayload(buffer)) {
                        // special check for close
                        if (frame.getOpCode() == OpCode.CLOSE) {
                            // TODO: yuck. Don't create an object to do validation checks!
                            new CloseInfo(frame);
                        }
                        state = State.START;
                        // we have a frame!
                        return true;
                    }
                    break;
                }

                default: break;
            }
        }

        return false;
    }

    /**
     * Implementation specific parsing of a payload
     *
     * @param buffer the payload buffer
     * @return true if payload is done reading, false if incomplete
     */
    private bool parsePayload(ByteBuffer buffer) {
        if (payloadLength == 0) {
            return true;
        }

        if (buffer.hasRemaining()) {
            // Create a small window of the incoming buffer to work with.
            // this should only show the payload itself, and not any more
            // bytes that could belong to the start of the next frame.
            int bytesSoFar = payload is null ? 0 : payload.position();
            int bytesExpected = payloadLength - bytesSoFar;
            int bytesAvailable = buffer.remaining();
            int windowBytes = std.algorithm.min(bytesAvailable, bytesExpected);
            int limit = buffer.limit();
            buffer.limit(buffer.position() + windowBytes);
            ByteBuffer window = buffer.slice();
            buffer.limit(limit);
            buffer.position(buffer.position() + window.remaining());

            maskProcessor.process(window);

            version(HUNT_HTTP_DEBUG) {
                tracef("%s Window(unmarked): %s", policy.getBehavior(), BufferUtils.toDetailString(window));
            }

            if (window.remaining() == payloadLength) {
                // We have the whole content, no need to copy.
                frame.setPayload(window);
                return true;
            } else {
                if (payload is null) {
                    payload = BufferUtils.allocate(payloadLength);
                    BufferUtils.clearToFill(payload);
                }
                // Copy the payload.
                payload.put(window);

                if (payload.position() == payloadLength) {
                    BufferUtils.flipToFlush(payload, 0);
                    frame.setPayload(payload);
                    return true;
                }
            }
        }
        return false;
    }

    void setIncomingFramesHandler(IncomingFrames incoming) {
        this.incomingFramesHandler = incoming;
    }

    override
    string toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Parser@").append(toHash().to!string(16));
        builder.append("[");
        if (incomingFramesHandler is null) {
            builder.append("NO_HANDLER");
        } else {
            builder.append(typeid(incomingFramesHandler).name);
        }
        builder.append(",s=").append(state.to!string());
        builder.append(",c=").append(cursor.to!string());
        builder.append(",len=").append(payloadLength);
        // frame is null between frames (after reset()), so guard the dereference.
        builder.append(",f=").append(frame is null ? "null" : frame.toString());
        // builder.append(",p=").append(policy);
        builder.append("]");
        return builder.toString();
    }
}