1 module hunt.http.codec.http.stream.Http2Session; 2 3 import hunt.http.codec.http.stream.CloseState; 4 import hunt.http.codec.http.stream.FlowControlStrategy; 5 import hunt.http.codec.http.stream.Http2Flusher; 6 import hunt.http.codec.http.stream.Http2Stream; 7 import hunt.http.codec.http.stream.Session; 8 import hunt.http.codec.http.stream.SessionSPI; 9 import hunt.http.codec.http.stream.Stream; 10 import hunt.http.codec.http.stream.StreamSPI; 11 12 import hunt.http.codec.http.decode.Parser; 13 import hunt.http.codec.http.encode.Http2Generator; 14 import hunt.http.codec.http.frame; 15 16 import hunt.container; 17 18 import hunt.datetime; 19 import hunt.lang.common; 20 import hunt.lang.exception; 21 import hunt.util.functional; 22 23 import hunt.util.concurrent.atomic; 24 import hunt.util.concurrent.CountingCallback; 25 import hunt.util.concurrent.Promise; 26 import hunt.util.concurrent.Scheduler; 27 28 import hunt.net.Session; 29 30 import hunt.logging; 31 32 // import core.exception; 33 34 import std.algorithm; 35 import std.conv; 36 import std.datetime; 37 // import std.exception; 38 import std.format; 39 import std.range; 40 import std.typecons; 41 42 alias TcpSession = hunt.net.Session.Session; 43 alias StreamSession = hunt.http.codec.http.stream.Session.Session; 44 45 46 /** 47 */ 48 abstract class Http2Session : SessionSPI, Parser.Listener { 49 50 // = new ConcurrentHashMap<>(); 51 // private AtomicInteger streamIds = new AtomicInteger(); 52 // private AtomicInteger lastStreamId = new AtomicInteger(); 53 // private AtomicInteger localStreamCount = new AtomicInteger(); 54 // private AtomicBiInteger remoteStreamCount = new AtomicBiInteger(); 55 // private AtomicInteger sendWindow = new AtomicInteger(); 56 // private AtomicInteger recvWindow = new AtomicInteger(); 57 // private AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED); 58 // private AtomicLong bytesWritten = new AtomicLong(); 59 60 private StreamSPI[int] streams; 61 private int streamIds ; 62 private int lastStreamId ; 63 private int localStreamCount ; 64 private long remoteStreamCount ; 65 private int sendWindow ; 66 private int recvWindow ; 67 private CloseState closed = CloseState.NOT_CLOSED; 68 private long bytesWritten; 69 70 private Scheduler scheduler; 71 private TcpSession endPoint; 72 private Http2Generator generator; 73 private StreamSession.Listener listener; 74 private FlowControlStrategy flowControl; 75 private Http2Flusher flusher; 76 private int maxLocalStreams; 77 private int maxRemoteStreams; 78 private long streamIdleTimeout; 79 private int initialSessionRecvWindow; 80 private bool pushEnabled; 81 private long idleTime; 82 83 alias convertToMillisecond = convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond); 84 85 this(Scheduler scheduler, TcpSession endPoint, Http2Generator generator, 86 StreamSession.Listener listener, FlowControlStrategy flowControl, 87 int initialStreamId, int streamIdleTimeout) { 88 this.scheduler = scheduler; 89 this.endPoint = endPoint; 90 this.generator = generator; 91 this.listener = listener; 92 this.flowControl = flowControl; 93 this.flusher = new Http2Flusher(this); 94 this.maxLocalStreams = -1; 95 this.maxRemoteStreams = -1; 96 this.streamIds = (initialStreamId); 97 this.streamIdleTimeout = streamIdleTimeout > 0 ? streamIdleTimeout : endPoint.getMaxIdleTimeout(); 98 this.sendWindow = (FlowControlStrategy.DEFAULT_WINDOW_SIZE); 99 this.recvWindow = (FlowControlStrategy.DEFAULT_WINDOW_SIZE); 100 this.pushEnabled = true; // SPEC: by default, push is enabled. 101 this.idleTime = convertToMillisecond(Clock.currStdTime); //Millisecond100Clock.currentTimeMillis(); 102 } 103 104 FlowControlStrategy getFlowControlStrategy() { 105 return flowControl; 106 } 107 108 int getMaxLocalStreams() { 109 return maxLocalStreams; 110 } 111 112 void setMaxLocalStreams(int maxLocalStreams) { 113 this.maxLocalStreams = maxLocalStreams; 114 } 115 116 int getMaxRemoteStreams() { 117 return maxRemoteStreams; 118 } 119 120 void setMaxRemoteStreams(int maxRemoteStreams) { 121 this.maxRemoteStreams = maxRemoteStreams; 122 } 123 124 long getStreamIdleTimeout() { 125 return streamIdleTimeout; 126 } 127 128 void setStreamIdleTimeout(long streamIdleTimeout) { 129 this.streamIdleTimeout = streamIdleTimeout; 130 } 131 132 int getInitialSessionRecvWindow() { 133 return initialSessionRecvWindow; 134 } 135 136 void setInitialSessionRecvWindow(int initialSessionRecvWindow) { 137 this.initialSessionRecvWindow = initialSessionRecvWindow; 138 } 139 140 TcpSession getEndPoint() { 141 return endPoint; 142 } 143 144 Http2Generator getGenerator() { 145 return generator; 146 } 147 148 override 149 long getBytesWritten() { 150 return bytesWritten; 151 } 152 153 override 154 void onData(DataFrame frame) { 155 version(HUNT_DEBUG) { 156 tracef("Received %s", frame.toString()); 157 } 158 int streamId = frame.getStreamId(); 159 StreamSPI stream = cast(StreamSPI) getStream(streamId); 160 161 // SPEC: the session window must be updated even if the stream is null. 162 // The flow control length includes the padding bytes. 163 int flowControlLength = frame.remaining() + frame.padding(); 164 flowControl.onDataReceived(this, stream, flowControlLength); 165 166 class DataCallback : NoopCallback 167 { 168 override 169 void succeeded() { 170 complete(); 171 } 172 173 override 174 void failed(Exception x) { 175 // Consume also in case of failures, to free the 176 // session flow control window for other streams. 177 complete(); 178 } 179 180 private void complete() { 181 // notIdle(); 182 // stream.notIdle(); 183 flowControl.onDataConsumed(this.outer, stream, flowControlLength); 184 } 185 } 186 187 if (stream !is null) { 188 if (getRecvWindow() < 0) { 189 close(cast(int)ErrorCode.FLOW_CONTROL_ERROR, "session_window_exceeded", Callback.NOOP); 190 } else { 191 stream.process(frame, new DataCallback() ); 192 } 193 } else { 194 version(HUNT_DEBUG) { 195 tracef("Ignoring %s, stream #%s not found", frame.toString(), streamId); 196 } 197 // We must enlarge the session flow control window, 198 // otherwise other requests will be stalled. 199 flowControl.onDataConsumed(this, null, flowControlLength); 200 } 201 } 202 203 override 204 abstract void onHeaders(HeadersFrame frame); 205 206 override 207 void onPriority(PriorityFrame frame) { 208 version(HUNT_DEBUG) { 209 tracef("Received %s", frame.toString()); 210 } 211 } 212 213 override 214 void onReset(ResetFrame frame) { 215 version(HUNT_DEBUG) { 216 tracef("Received %s", frame.toString()); 217 } 218 StreamSPI stream = cast(StreamSPI)getStream(frame.getStreamId()); 219 if (stream !is null) 220 stream.process(frame, new ResetCallback()); 221 else 222 notifyReset(this, frame); 223 } 224 225 override 226 void onSettings(SettingsFrame frame) { 227 // SPEC: SETTINGS frame MUST be replied. 228 onSettings(frame, true); 229 } 230 231 void onSettings(SettingsFrame frame, bool reply) { 232 version(HUNT_DEBUG) { 233 tracef("Received %s", frame.toString()); 234 } 235 if (frame.isReply()) 236 return; 237 238 // Iterate over all settings 239 //for (Map.Entry!(int, int) entry : frame.getSettings().entrySet()) 240 foreach(int key, int value; frame.getSettings()) 241 { 242 // int key = entry.getKey(); 243 // int value = entry.getValue(); 244 switch (key) { 245 case SettingsFrame.HEADER_TABLE_SIZE: { 246 version(HUNT_DEBUG) { 247 tracef("Update HPACK header table size to %s for %s", value, this.toString()); 248 } 249 generator.setHeaderTableSize(value); 250 break; 251 } 252 case SettingsFrame.ENABLE_PUSH: { 253 // SPEC: check the value is sane. 254 if (value != 0 && value != 1) { 255 onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "invalid_settings_enable_push"); 256 return; 257 } 258 pushEnabled = value == 1; 259 version(HUNT_DEBUG) { 260 tracef("%s push for %s", pushEnabled ? "Enable" : "Disable", this.toString()); 261 } 262 break; 263 } 264 case SettingsFrame.MAX_CONCURRENT_STREAMS: { 265 maxLocalStreams = value; 266 version(HUNT_DEBUG) { 267 tracef("Update max local concurrent streams to %s for %s", maxLocalStreams, this.toString()); 268 } 269 break; 270 } 271 case SettingsFrame.INITIAL_WINDOW_SIZE: { 272 version(HUNT_DEBUG) { 273 tracef("Update initial window size to %s for %s", value, this.toString()); 274 } 275 flowControl.updateInitialStreamWindow(this, value, false); 276 break; 277 } 278 case SettingsFrame.MAX_FRAME_SIZE: { 279 version(HUNT_DEBUG) { 280 tracef("Update max frame size to %s for %s", value, this.toString()); 281 } 282 // SPEC: check the max frame size is sane. 283 if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH) { 284 onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "invalid_settings_max_frame_size"); 285 return; 286 } 287 generator.setMaxFrameSize(value); 288 break; 289 } 290 case SettingsFrame.MAX_HEADER_LIST_SIZE: { 291 version(HUNT_DEBUG) { 292 tracef("Update max header list size to %s for %s", value, this.toString()); 293 } 294 generator.setMaxHeaderListSize(value); 295 break; 296 } 297 default: { 298 version(HUNT_DEBUG) { 299 tracef("Unknown setting %s:%s for %s", key, value, this.toString()); 300 } 301 break; 302 } 303 } 304 } 305 notifySettings(this, frame); 306 307 if (reply) { 308 SettingsFrame replyFrame = new SettingsFrame(Collections.emptyMap!(int, int)(), true); 309 settings(replyFrame, Callback.NOOP); 310 } 311 } 312 313 override 314 void onPing(PingFrame frame) { 315 version(HUNT_DEBUG) { 316 tracef("Received %s", frame.toString()); 317 } 318 if (frame.isReply()) { 319 info("The session %s received ping reply", endPoint.getSessionId()); 320 notifyPing(this, frame); 321 } else { 322 PingFrame reply = new PingFrame(frame.getPayload(), true); 323 control(null, Callback.NOOP, reply); 324 } 325 } 326 327 /** 328 * This method is called when receiving a GO_AWAY from the other peer. 329 * We check the close state to act appropriately: 330 * <ul> 331 * <li>NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so 332 * that the content of the queue is written, and then the connection 333 * closed. We notify the application after being terminated. 334 * See <code>Http2Session.ControlEntry#succeeded()</code></li> 335 * <li>In all other cases, we do nothing since other methods are already 336 * performing their actions.</li> 337 * </ul> 338 * 339 * @param frame the GO_AWAY frame that has been received. 340 * @see #close(int, string, Callback) 341 * @see #onShutdown() 342 * @see #onIdleTimeout() 343 */ 344 override 345 void onGoAway(GoAwayFrame frame) { 346 version(HUNT_DEBUG) { 347 tracef("Received %s", frame.toString()); 348 } 349 while (true) { 350 CloseState current = closed; 351 switch (current) { 352 case CloseState.NOT_CLOSED: { 353 if (closed == current) { 354 closed = CloseState.REMOTELY_CLOSED; 355 // We received a GO_AWAY, so try to write 356 // what's in the queue and then disconnect. 357 notifyClose(this, frame, new DisconnectCallback()); 358 return; 359 } 360 break; 361 } 362 default: { 363 version(HUNT_DEBUG) { 364 tracef("Ignored %s, already closed", frame.toString()); 365 } 366 return; 367 } 368 } 369 } 370 } 371 372 override 373 void onWindowUpdate(WindowUpdateFrame frame) { 374 version(HUNT_DEBUG) { 375 tracef("Received %s", frame.toString()); 376 } 377 int streamId = frame.getStreamId(); 378 if (streamId > 0) { 379 StreamSPI stream = cast(StreamSPI)getStream(streamId); 380 if (stream !is null) { 381 stream.process(frame, Callback.NOOP); 382 onWindowUpdate(stream, frame); 383 } 384 } else { 385 onWindowUpdate(null, frame); 386 } 387 } 388 389 override 390 void onConnectionFailure(int error, string reason) { 391 notifyFailure(this, new IOException(format("%d/%s", error, reason)), new CloseCallback(error, reason)); 392 } 393 394 override 395 void newStream(HeadersFrame frame, Promise!Stream promise, Stream.Listener listener) { 396 // Synchronization is necessary to atomically create 397 // the stream id and enqueue the frame to be sent. 398 bool queued; 399 synchronized (this) { 400 int streamId = frame.getStreamId(); 401 if (streamId <= 0) { 402 streamId = streamIds; streamIds += 2; // streamIds.getAndAdd(2); 403 PriorityFrame priority = frame.getPriority(); 404 priority = priority is null ? null : new PriorityFrame(streamId, priority.getParentStreamId(), 405 priority.getWeight(), priority.isExclusive()); 406 frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream()); 407 } 408 StreamSPI stream = createLocalStream(streamId, promise); 409 if (stream is null) 410 return; 411 stream.setListener(listener); 412 413 ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback!Stream(promise, stream)); 414 queued = flusher.append(entry); 415 } 416 // Iterate outside the synchronized block. 417 if (queued) 418 flusher.iterate(); 419 } 420 421 override 422 int priority(PriorityFrame frame, Callback callback) { 423 int streamId = frame.getStreamId(); 424 StreamSPI stream = streams[streamId]; 425 if (stream is null) { 426 streamId = streamIds; streamIds += 2; //.getAndAdd(2); 427 frame = new PriorityFrame(streamId, frame.getParentStreamId(), 428 frame.getWeight(), frame.isExclusive()); 429 } 430 control(stream, callback, frame); 431 return streamId; 432 } 433 434 override 435 void push(StreamSPI stream, Promise!Stream promise, PushPromiseFrame frame, Stream.Listener listener) { 436 // Synchronization is necessary to atomically create 437 // the stream id and enqueue the frame to be sent. 438 bool queued; 439 synchronized (this) { 440 int streamId = streamIds; streamIds += 2; //.getAndAdd(2); 441 frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData()); 442 443 StreamSPI pushStream = createLocalStream(streamId, promise); 444 if (pushStream is null) 445 return; 446 pushStream.setListener(listener); 447 448 ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback!Stream(promise, pushStream)); 449 queued = flusher.append(entry); 450 } 451 // Iterate outside the synchronized block. 452 if (queued) 453 flusher.iterate(); 454 } 455 456 override 457 void settings(SettingsFrame frame, Callback callback) { 458 control(null, callback, frame); 459 } 460 461 override 462 void ping(PingFrame frame, Callback callback) { 463 if (frame.isReply()) 464 callback.failed(new IllegalArgumentException("")); 465 else 466 control(null, callback, frame); 467 } 468 469 protected void reset(ResetFrame frame, Callback callback) { 470 control(cast(StreamSPI)getStream(frame.getStreamId()), callback, frame); 471 } 472 473 /** 474 * Invoked internally and by applications to send a GO_AWAY frame to the 475 * other peer. We check the close state to act appropriately: 476 * <ul> 477 * <li>NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the 478 * GO_AWAY has been written, it will only cause the output to be shut 479 * down (not the connection closed), so that the application can still 480 * read frames arriving from the other peer. 481 * Ideally the other peer will notice the GO_AWAY and close the connection. 482 * When that happen, we close the connection from {@link #onShutdown()}. 483 * Otherwise, the idle timeout mechanism will close the connection, see 484 * {@link #onIdleTimeout()}.</li> 485 * <li>In all other cases, we do nothing since other methods are already 486 * performing their actions.</li> 487 * </ul> 488 * 489 * @param error the error code 490 * @param reason the reason 491 * @param callback the callback to invoke when the operation is complete 492 * @see #onGoAway(GoAwayFrame) 493 * @see #onShutdown() 494 * @see #onIdleTimeout() 495 */ 496 override 497 bool close(int error, string reason, Callback callback) { 498 while (true) { 499 CloseState current = closed; 500 switch (current) { 501 case CloseState.NOT_CLOSED: { 502 if (closed == current) { 503 closed = CloseState.LOCALLY_CLOSED; 504 GoAwayFrame frame = newGoAwayFrame(error, reason); 505 control(null, callback, frame); 506 return true; 507 } 508 break; 509 } 510 default: { 511 version(HUNT_DEBUG) 512 tracef("Ignoring close %s/%s, already closed", error, reason); 513 callback.succeeded(); 514 return false; 515 } 516 } 517 } 518 } 519 520 private GoAwayFrame newGoAwayFrame(int error, string reason) { 521 byte[] payload = null; 522 if (!reason.empty) { 523 // Trim the reason to avoid attack vectors. 524 int len = min(reason.length, 32); 525 payload = cast(byte[])reason[0..len].dup; 526 } 527 return new GoAwayFrame(lastStreamId, error, payload); 528 } 529 530 override 531 bool isClosed() { 532 return closed != CloseState.NOT_CLOSED; 533 } 534 535 private void control(StreamSPI stream, Callback callback, Frame frame) { 536 frames(stream, callback, frame, Frame.EMPTY_ARRAY); 537 } 538 539 override 540 void frames(StreamSPI stream, Callback callback, Frame frame, Frame[] frames... ) { 541 // We want to generate as late as possible to allow re-prioritization; 542 // generation will happen while processing the entries. 543 544 // The callback needs to be notified only when the last frame completes. 545 546 int length = cast(int)frames.length; 547 if (length == 0) { 548 onFrame(new ControlEntry(frame, stream, callback), true); 549 } else { 550 callback = new CountingCallback(callback, 1 + length); 551 onFrame(new ControlEntry(frame, stream, callback), false); 552 for (int i = 1; i <= length; ++i) 553 onFrame(new ControlEntry(frames[i - 1], stream, callback), i == length); 554 } 555 } 556 557 override 558 void data(StreamSPI stream, Callback callback, DataFrame frame) { 559 // We want to generate as late as possible to allow re-prioritization. 560 onFrame(new DataEntry(frame, stream, callback), true); 561 } 562 563 private void onFrame(Http2Flusher.Entry entry, bool flush) { 564 version(HUNT_DEBUG) { 565 tracef("%s %s", flush ? "Sending" : "Queueing", entry.frame.toString()); 566 } 567 // Ping frames are prepended to process them as soon as possible. 568 bool queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry); 569 if (queued && flush) { 570 if (entry.stream !is null) { 571 // entry.stream.notIdle(); 572 } 573 flusher.iterate(); 574 } 575 } 576 577 protected StreamSPI createLocalStream(int streamId, Promise!Stream promise) { 578 while (true) { 579 int localCount = localStreamCount; 580 int maxCount = getMaxLocalStreams(); 581 if (maxCount >= 0 && localCount >= maxCount) { 582 promise.failed(new IllegalStateException("Max local stream count " ~ maxCount.to!string() ~ " exceeded")); 583 return null; 584 } 585 if (localStreamCount == localCount) 586 { 587 localStreamCount = localCount + 1; 588 break; 589 } 590 } 591 592 StreamSPI stream = newStream(streamId, true); 593 auto itemPtr = streamId in streams; 594 if (itemPtr is null) { 595 streams[streamId] = stream; 596 // stream.setIdleTimeout(getStreamIdleTimeout()); 597 flowControl.onStreamCreated(stream); 598 version(HUNT_DEBUG) { 599 tracef("Created local %s", stream.toString()); 600 } 601 return stream; 602 } else { 603 promise.failed(new IllegalStateException("Duplicate stream " ~ streamId.to!string())); 604 return null; 605 } 606 } 607 608 protected StreamSPI createRemoteStream(int streamId) { 609 // SPEC: exceeding max concurrent streams is treated as stream error. 610 while (true) { 611 long encoded = remoteStreamCount; 612 int remoteCount = AtomicBiInteger.getHi(encoded); 613 int remoteClosing = AtomicBiInteger.getLo(encoded); 614 int maxCount = getMaxRemoteStreams(); 615 if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) { 616 reset(new ResetFrame(streamId, cast(int)ErrorCode.REFUSED_STREAM_ERROR), Callback.NOOP); 617 return null; 618 } 619 // if (remoteStreamCount.compareAndSet(encoded, remoteCount + 1, remoteClosing)) 620 if(remoteStreamCount == encoded) 621 { 622 remoteStreamCount = AtomicBiInteger.encode(remoteCount + 1, remoteClosing); 623 break; 624 } 625 } 626 627 StreamSPI stream = newStream(streamId, false); 628 629 // SPEC: duplicate stream is treated as connection error. 630 //streams.putIfAbsent(streamId, stream) 631 auto itemPtr = streamId in streams; 632 if ( itemPtr is null) { 633 streams[streamId] = stream; 634 updateLastStreamId(streamId); 635 // stream.setIdleTimeout(getStreamIdleTimeout()); 636 flowControl.onStreamCreated(stream); 637 version(HUNT_DEBUG) { 638 tracef("Created remote %s", stream.toString()); 639 } 640 return stream; 641 } else { 642 close(cast(int)ErrorCode.PROTOCOL_ERROR, "duplicate_stream", Callback.NOOP); 643 return null; 644 } 645 } 646 647 void updateStreamCount(bool local, int deltaStreams, int deltaClosing) { 648 if (local) 649 localStreamCount += (deltaStreams); 650 else 651 { 652 remoteStreamCount = AtomicBiInteger.encode(remoteStreamCount, deltaStreams, deltaClosing); 653 // remoteStreamCount.add(deltaStreams, deltaClosing); 654 } 655 } 656 657 protected StreamSPI newStream(int streamId, bool local) { 658 return new Http2Stream(scheduler, this, streamId, local); 659 } 660 661 override 662 void removeStream(StreamSPI stream) { 663 bool removed = streams.remove(stream.getId()); 664 if (removed) { 665 onStreamClosed(stream); 666 flowControl.onStreamDestroyed(stream); 667 version(HUNT_DEBUG) { 668 tracef("Removed %s %s", stream.isLocal() ? "local" : "remote", stream); 669 } 670 } 671 } 672 673 override 674 Stream[] getStreams() { 675 return cast(Stream[])(streams.values()); 676 } 677 678 int getStreamCount() { 679 return cast(int)streams.length; 680 } 681 682 override 683 StreamSPI getStream(int streamId) { 684 return streams[streamId]; 685 } 686 687 int getSendWindow() { 688 return sendWindow; 689 } 690 691 int getRecvWindow() { 692 return recvWindow; 693 } 694 695 override 696 int updateSendWindow(int delta) { 697 int old = sendWindow; 698 sendWindow += delta; 699 return old; 700 } 701 702 override 703 int updateRecvWindow(int delta) { 704 int old = recvWindow; 705 recvWindow += delta; 706 return old; 707 } 708 709 override 710 void onWindowUpdate(StreamSPI stream, WindowUpdateFrame frame) { 711 // WindowUpdateFrames arrive concurrently with writes. 712 // Increasing (or reducing) the window size concurrently 713 // with writes requires coordination with the flusher, that 714 // decides how many frames to write depending on the available 715 // window sizes. If the window sizes vary concurrently, the 716 // flusher may take non-optimal or wrong decisions. 717 // Here, we "queue" window updates to the flusher, so it will 718 // be the only component responsible for window updates, for 719 // both increments and reductions. 720 flusher.window(stream, frame); 721 } 722 723 override 724 bool isPushEnabled() { 725 return pushEnabled; 726 } 727 728 /** 729 * A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN. 730 * This method is invoked when the TCP FIN is received, or when an exception is 731 * thrown while reading, and we check the close state to act appropriately: 732 * <ul> 733 * <li>NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close) 734 * or there was an exception while reading, and therefore we terminate.</li> 735 * <li>LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received 736 * it and closed the connection; we queue a disconnect to close the connection 737 * on the local side. 738 * The GO_AWAY just shutdown the output, so we need this step to make sure the 739 * connection is closed. See {@link #close(int, string, Callback)}.</li> 740 * <li>REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we 741 * do nothing since the handling of the GO_AWAY will take care of closing the 742 * connection. See {@link #onGoAway(GoAwayFrame)}.</li> 743 * </ul> 744 * 745 * @see #onGoAway(GoAwayFrame) 746 * @see #close(int, string, Callback) 747 * @see #onIdleTimeout() 748 */ 749 override 750 void onShutdown() { 751 version(HUNT_DEBUG) { 752 tracef("Shutting down %s", this.toString()); 753 } 754 switch (closed) { 755 case CloseState.NOT_CLOSED: { 756 // The other peer did not send a GO_AWAY, no need to be gentle. 757 version(HUNT_DEBUG) { 758 tracef("Abrupt close for %s", this.toString()); 759 } 760 abort(new ClosedChannelException("")); 761 break; 762 } 763 case CloseState.LOCALLY_CLOSED: { 764 // We have closed locally, and only shutdown 765 // the output; now queue a disconnect. 766 control(null, Callback.NOOP, new DisconnectFrame()); 767 break; 768 } 769 case CloseState.REMOTELY_CLOSED: { 770 // Nothing to do, the GO_AWAY frame we 771 // received will close the connection. 772 break; 773 } 774 default: { 775 break; 776 } 777 } 778 } 779 780 /** 781 * This method is invoked when the idle timeout triggers. We check the close state 782 * to act appropriately: 783 * <ul> 784 * <li>NOT_CLOSED: it's a real idle timeout, we just initiate a close, see 785 * {@link #close(int, string, Callback)}.</li> 786 * <li>LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the 787 * other peer did not close the connection so we never received the TCP FIN, and 788 * therefore we terminate.</li> 789 * <li>REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a 790 * disconnect, but for some reason it was not processed (for example, queue was 791 * stuck because of TCP congestion), therefore we terminate. 792 * See {@link #onGoAway(GoAwayFrame)}.</li> 793 * </ul> 794 * 795 * @return true if the session should be closed, false otherwise 796 * @see #onGoAway(GoAwayFrame) 797 * @see #close(int, string, Callback) 798 * @see #onShutdown() 799 */ 800 override 801 bool onIdleTimeout() { 802 switch (closed) { 803 case CloseState.NOT_CLOSED: { 804 long elapsed = convertToMillisecond(Clock.currStdTime) - idleTime; 805 version(HUNT_DEBUG) { 806 tracef("HTTP2 session on idle timeout. The elapsed time is %s - %s", elapsed, endPoint.getMaxIdleTimeout()); 807 } 808 return elapsed >= endPoint.getMaxIdleTimeout() && notifyIdleTimeout(this); 809 } 810 case CloseState.LOCALLY_CLOSED: 811 case CloseState.REMOTELY_CLOSED: { 812 abort(new TimeoutException("Idle timeout " ~ endPoint.getMaxIdleTimeout().to!string() ~ " ms")); 813 return false; 814 } 815 default: { 816 return false; 817 } 818 } 819 } 820 821 private void notIdle() { 822 idleTime = convertToMillisecond(Clock.currStdTime); 823 } 824 825 override 826 void onFrame(Frame frame) { 827 onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "upgrade"); 828 } 829 830 protected void onStreamOpened(StreamSPI stream) { 831 } 832 833 protected void onStreamClosed(StreamSPI stream) { 834 } 835 836 void disconnect() { 837 version(HUNT_DEBUG) { 838 tracef("Disconnecting %s", this.toString()); 839 } 840 endPoint.close(); 841 } 842 843 private void terminate(Exception cause) { 844 while (true) { 845 CloseState current = closed; 846 switch (current) { 847 case CloseState.NOT_CLOSED: 848 case CloseState.LOCALLY_CLOSED: 849 case CloseState.REMOTELY_CLOSED: { 850 if (closed == current) { 851 closed = CloseState.CLOSED; 852 flusher.terminate(cause); 853 foreach (StreamSPI stream ; streams.byValue()) 854 stream.close(); 855 streams.clear(); 856 disconnect(); 857 return; 858 } 859 break; 860 } 861 default: { 862 return; 863 } 864 } 865 } 866 } 867 868 void abort(Exception failure) { 869 notifyFailure(this, failure, new TerminateCallback(failure)); 870 } 871 872 bool isDisconnected() { 873 return !endPoint.isOpen(); 874 } 875 876 private void updateLastStreamId(int streamId) { 877 // Atomics.updateMax(lastStreamId, streamId); 878 if(streamId>lastStreamId) lastStreamId = streamId; 879 } 880 881 protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame) { 882 try { 883 return listener.onNewStream(stream, frame); 884 } catch (Exception x) { 885 info("Failure while notifying listener " ~ listener.toString(), x); 886 return null; 887 } 888 } 889 890 protected void notifySettings(StreamSession session, SettingsFrame frame) { 891 try { 892 listener.onSettings(session, frame); 893 } catch (Exception x) { 894 info("Failure while notifying listener " ~ listener.toString(), x); 895 } 896 } 897 898 protected void notifyPing(StreamSession session, PingFrame frame) { 899 try { 900 listener.onPing(session, frame); 901 } catch (Exception x) { 902 info("Failure while notifying listener " ~ listener.toString(), x); 903 } 904 } 905 906 protected void notifyReset(StreamSession session, ResetFrame frame) { 907 try { 908 listener.onReset(session, frame); 909 } catch (Exception x) { 910 info("Failure while notifying listener " ~ listener.toString(), x); 911 } 912 } 913 914 protected void notifyClose(StreamSession session, GoAwayFrame frame, Callback callback) { 915 try { 916 listener.onClose(session, frame, callback); 917 } catch (Exception x) { 918 info("Failure while notifying listener " ~ listener.toString(), x); 919 } 920 } 921 922 protected bool notifyIdleTimeout(StreamSession session) { 923 try { 924 return listener.onIdleTimeout(session); 925 } catch (Exception x) { 926 info("Failure while notifying listener " ~ listener.toString(), x); 927 return true; 928 } 929 } 930 931 protected void notifyFailure(StreamSession session, Exception failure, Callback callback) { 932 try { 933 listener.onFailure(session, failure, callback); 934 } catch (Exception x) { 935 info("Failure while notifying listener " ~ listener.toString(), x); 936 } 937 } 938 939 protected void notifyHeaders(StreamSPI stream, HeadersFrame frame) { 940 // Optional.ofNullable(stream.getListener()).ifPresent(listener -> { 941 // try { 942 // listener.onHeaders(stream, frame); 943 // } catch (Exception x) { 944 // info("Failure while notifying listener " ~ listener.toString(), x); 945 // } 946 // }); 947 auto listener = stream.getListener(); 948 if(listener !is null) 949 { 950 try { 951 listener.onHeaders(stream, frame); 952 } catch (Exception x) { 953 info("Failure while notifying listener " ~ listener.toString(), x); 954 } 955 } 956 } 957 958 override 959 string toString() { 960 return format("%s@%s{l:%s <-> r:%s,sendWindow=%s,recvWindow=%s,streams=%d,%s}", 961 typeof(this).stringof, 962 toHash(), 963 getEndPoint().getLocalAddress(), 964 getEndPoint().getRemoteAddress(), 965 sendWindow, 966 recvWindow, 967 streams.length, 968 closed); 969 } 970 971 private class ControlEntry : Http2Flusher.Entry { 972 private int bytes; 973 974 private this(Frame frame, StreamSPI stream, Callback callback) { 975 super(frame, stream, callback); 976 } 977 978 override 979 protected bool generate(Queue!ByteBuffer buffers) { 980 List!(ByteBuffer) controlFrame = generator.control(frame); 981 bytes = cast(int) BufferUtils.remaining(controlFrame); 982 buffers.addAll(controlFrame); 983 version(HUNT_DEBUG) { 984 tracef("Generated %s", frame.toString()); 985 } 986 beforeSend(); 987 return true; 988 } 989 990 /** 991 * <p>Performs actions just before writing the frame to the network.</p> 992 * <p>Some frame, when sent over the network, causes the receiver 993 * to react and send back frames that may be processed by the original 994 * sender *before* {@link #succeeded()} is called. 995 * <p>If the action to perform updates some state, this update may 996 * not be seen by the received frames and cause errors.</p> 997 * <p>For example, suppose the action updates the stream window to a 998 * larger value; the sender sends the frame; the receiver is now entitled 999 * to send back larger data; when the data is received by the original 1000 * sender, the action may have not been performed yet, causing the larger 1001 * data to be rejected, when it should have been accepted.</p> 1002 */ 1003 private void beforeSend() { 1004 switch (frame.getType()) { 1005 case FrameType.HEADERS: { 1006 HeadersFrame headersFrame = cast(HeadersFrame) frame; 1007 stream.updateClose(headersFrame.isEndStream(), CloseStateEvent.BEFORE_SEND); 1008 break; 1009 } 1010 case FrameType.SETTINGS: { 1011 SettingsFrame settingsFrame = cast(SettingsFrame) frame; 1012 Map!(int, int) settings = settingsFrame.getSettings(); 1013 if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE)) 1014 { 1015 int initialWindow = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE); 1016 flowControl.updateInitialStreamWindow(this.outer, initialWindow, true); 1017 } 1018 break; 1019 } 1020 default: { 1021 break; 1022 } 1023 } 1024 } 1025 1026 override 1027 void succeeded() { 1028 bytesWritten += (bytes); 1029 switch (frame.getType()) { 1030 case FrameType.HEADERS: { 1031 onStreamOpened(stream); 1032 HeadersFrame headersFrame = cast(HeadersFrame) frame; 1033 if (stream.updateClose(headersFrame.isEndStream(), CloseStateEvent.AFTER_SEND)) 1034 removeStream(stream); 1035 break; 1036 } 1037 case FrameType.RST_STREAM: { 1038 if (stream !is null) { 1039 stream.close(); 1040 removeStream(stream); 1041 } 1042 break; 1043 } 1044 case FrameType.PUSH_PROMISE: { 1045 // Pushed streams are implicitly remotely closed. 1046 // They are closed when sending an end-stream DATA frame. 1047 stream.updateClose(true, CloseStateEvent.RECEIVED); 1048 break; 1049 } 1050 case FrameType.GO_AWAY: { 1051 // We just sent a GO_AWAY, only shutdown the 1052 // output without closing yet, to allow reads. 1053 getEndPoint().shutdownOutput(); 1054 break; 1055 } 1056 case FrameType.WINDOW_UPDATE: { 1057 flowControl.windowUpdate(this.outer, stream, cast(WindowUpdateFrame) frame); 1058 break; 1059 } 1060 case FrameType.DISCONNECT: { 1061 terminate(new ClosedChannelException("")); 1062 break; 1063 } 1064 default: { 1065 break; 1066 } 1067 } 1068 super.succeeded(); 1069 } 1070 1071 override 1072 void failed(Exception x) { 1073 if (frame.getType() == FrameType.DISCONNECT) { 1074 terminate(new ClosedChannelException("")); 1075 } 1076 super.failed(x); 1077 } 1078 } 1079 1080 private class DataEntry :Http2Flusher.Entry { 1081 private int bytes; 1082 private int _dataRemaining; 1083 private int dataWritten; 1084 1085 private this(DataFrame frame, StreamSPI stream, Callback callback) { 1086 super(frame, stream, callback); 1087 // We don't do any padding, so the flow control length is 1088 // always the data remaining. This simplifies the handling 1089 // of data frames that cannot be completely written due to 1090 // the flow control window exhausting, since in that case 1091 // we would have to count the padding only once. 1092 _dataRemaining = frame.remaining(); 1093 } 1094 1095 override 1096 int dataRemaining() { 1097 return _dataRemaining; 1098 } 1099 1100 override 1101 protected bool generate(Queue!ByteBuffer buffers) { 1102 int remaining = dataRemaining(); 1103 1104 int sessionSendWindow = getSendWindow(); 1105 int streamSendWindow = stream.updateSendWindow(0); 1106 int window = min(streamSendWindow, sessionSendWindow); 1107 if (window <= 0 && remaining > 0) 1108 return false; 1109 1110 int length = min(remaining, window); 1111 1112 // Only one DATA frame is generated. 1113 DataFrame dataFrame = cast(DataFrame) frame; 1114 Tuple!(int, List!(ByteBuffer)) pair = generator.data(dataFrame, length); 1115 bytes = pair[0]; 1116 buffers.addAll(pair[1]); 1117 int written = bytes - Frame.HEADER_LENGTH; 1118 version(HUNT_DEBUG) { 1119 tracef("Generated %s, length/window/data=%s/%s/%s", dataFrame, written, window, remaining); 1120 } 1121 this.dataWritten = written; 1122 this._dataRemaining -= written; 1123 1124 flowControl.onDataSending(stream, written); 1125 stream.updateClose(dataFrame.isEndStream(), CloseStateEvent.BEFORE_SEND); 1126 1127 return true; 1128 } 1129 1130 override 1131 void succeeded() { 1132 bytesWritten += bytes; 1133 flowControl.onDataSent(stream, dataWritten); 1134 1135 // Do we have more to send ? 1136 DataFrame dataFrame = cast(DataFrame) frame; 1137 if (_dataRemaining == 0) { 1138 // Only now we can update the close state 1139 // and eventually remove the stream. 1140 if (stream.updateClose(dataFrame.isEndStream(), CloseStateEvent.AFTER_SEND)) 1141 removeStream(stream); 1142 super.succeeded(); 1143 } 1144 } 1145 } 1146 1147 private static class PromiseCallback(C) : NoopCallback { 1148 private Promise!C promise; 1149 private C value; 1150 1151 private this(Promise!C promise, C value) { 1152 this.promise = promise; 1153 this.value = value; 1154 } 1155 1156 override 1157 void succeeded() { 1158 promise.succeeded(value); 1159 } 1160 1161 override 1162 void failed(Exception x) { 1163 promise.failed(x); 1164 } 1165 } 1166 1167 private class ResetCallback : NoopCallback { 1168 override 1169 void succeeded() { 1170 complete(); 1171 } 1172 1173 override 1174 void failed(Exception x) { 1175 complete(); 1176 } 1177 1178 private void complete() { 1179 flusher.iterate(); 1180 } 1181 } 1182 1183 private class CloseCallback : NoopCallback { 1184 private int error; 1185 private string reason; 1186 1187 private this(int error, string reason) { 1188 this.error = error; 1189 this.reason = reason; 1190 } 1191 1192 override 1193 void succeeded() { 1194 complete(); 1195 } 1196 1197 override 1198 void failed(Exception x) { 1199 complete(); 1200 } 1201 1202 private void complete() { 1203 close(error, reason, Callback.NOOP); 1204 } 1205 } 1206 1207 private class DisconnectCallback : NoopCallback { 1208 override 1209 void succeeded() { 1210 complete(); 1211 } 1212 1213 override 1214 void failed(Exception x) { 1215 complete(); 1216 } 1217 1218 private void complete() { 1219 frames(null, Callback.NOOP, newGoAwayFrame(cast(int)ErrorCode.NO_ERROR, null), new DisconnectFrame()); 1220 } 1221 } 1222 1223 private class TerminateCallback : NoopCallback { 1224 private Exception failure; 1225 1226 private this(Exception failure) { 1227 this.failure = failure; 1228 } 1229 1230 override 1231 void succeeded() { 1232 complete(); 1233 } 1234 1235 override 1236 void failed(Exception x) { 1237 // failure.addSuppressed(x); 1238 Throwable.chainTogether(failure, x); 1239 complete(); 1240 } 1241 1242 private void complete() { 1243 terminate(failure); 1244 } 1245 } 1246 }