1 module hunt.http.codec.http.stream.Http2Session; 2 3 import hunt.http.codec.http.stream.CloseState; 4 import hunt.http.codec.http.stream.FlowControlStrategy; 5 import hunt.http.codec.http.stream.Http2Flusher; 6 import hunt.http.codec.http.stream.Http2Stream; 7 import hunt.http.codec.http.stream.Session; 8 import hunt.http.codec.http.stream.SessionSPI; 9 import hunt.http.codec.http.stream.Stream; 10 import hunt.http.codec.http.stream.StreamSPI; 11 12 import hunt.http.codec.http.decode.Parser; 13 import hunt.http.codec.http.encode.Http2Generator; 14 import hunt.http.codec.http.frame; 15 16 import hunt.collection; 17 18 import hunt.util.DateTime; 19 import hunt.util.Common; 20 import hunt.Exceptions; 21 import hunt.util.Common; 22 23 import hunt.concurrency.atomic; 24 import hunt.concurrency.CountingCallback; 25 import hunt.concurrency.Promise; 26 import hunt.concurrency.Scheduler; 27 import core.atomic : atomicOp, atomicLoad; 28 import hunt.net.Connection; 29 30 import hunt.logging; 31 32 // import core.exception; 33 34 import core.time; 35 import std.algorithm; 36 import std.conv; 37 import std.datetime; 38 import std.format; 39 import std.range; 40 import std.typecons; 41 42 alias StreamSession = hunt.http.codec.http.stream.Session.Session; 43 44 45 /** 46 */ 47 abstract class Http2Session : SessionSPI, Parser.Listener { 48 49 // = new ConcurrentHashMap<>(); 50 // private AtomicInteger streamIds = new AtomicInteger(); 51 // private AtomicInteger lastStreamId = new AtomicInteger(); 52 // private AtomicInteger localStreamCount = new AtomicInteger(); 53 // private AtomicBiInteger remoteStreamCount = new AtomicBiInteger(); 54 // private AtomicInteger sendWindow = new AtomicInteger(); 55 // private AtomicInteger recvWindow = new AtomicInteger(); 56 // private AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED); 57 // private AtomicLong bytesWritten = new AtomicLong(); 58 59 private StreamSPI[int] streams; 60 private int streamIds ; 61 
private int lastStreamId ; // highest remote stream id seen; reported in outgoing GO_AWAY frames
    private int localStreamCount ; // number of locally created streams, checked against maxLocalStreams
    // Packed pair handled through AtomicBiInteger: hi = remote stream count, lo = remote closing count.
    private long remoteStreamCount ;
    // Session-level flow control windows; read/updated with core.atomic.
    private shared int sendWindow ;
    private shared int recvWindow ;
    private CloseState closed = CloseState.NOT_CLOSED;
    private long bytesWritten; // accumulated by the flusher entries when a write succeeds

    private Scheduler scheduler;
    private Connection endPoint;
    private Http2Generator generator;
    private StreamSession.Listener listener;
    private FlowControlStrategy flowControl;
    private Http2Flusher flusher;
    private int maxLocalStreams; // negative means "no limit" (see createLocalStream)
    private int maxRemoteStreams; // negative means "no limit" (see createRemoteStream)
    private long streamIdleTimeout; // milliseconds
    private int initialSessionRecvWindow;
    private bool pushEnabled;
    private MonoTime idleTime; // reference point used by onIdleTimeout()

    alias convertToMillisecond = convert!(TimeUnit.HectoNanosecond, TimeUnit.Millisecond);

    /**
     * Creates a session bound to the given connection.
     *
     * Params:
     *   scheduler         = scheduler handed to the streams created by this session
     *   endPoint          = the underlying connection
     *   generator         = the frame generator used to serialize outgoing frames
     *   listener          = the session listener that receives session-level events
     *   flowControl       = the flow control strategy
     *   initialStreamId   = the first id handed out to locally created streams
     *   streamIdleTimeout = stream idle timeout in milliseconds; when not positive,
     *                       the connection's max idle timeout is used instead
     */
    this(Scheduler scheduler, Connection endPoint, Http2Generator generator,
            StreamSession.Listener listener, FlowControlStrategy flowControl,
            int initialStreamId, int streamIdleTimeout) {
        this.scheduler = scheduler;
        this.endPoint = endPoint;
        this.generator = generator;
        this.listener = listener;
        this.flowControl = flowControl;
        this.flusher = new Http2Flusher(this);
        // -1 disables the concurrent stream limits until they are configured.
        this.maxLocalStreams = -1;
        this.maxRemoteStreams = -1;
        this.streamIds = (initialStreamId);
        this.streamIdleTimeout = streamIdleTimeout > 0 ? streamIdleTimeout : endPoint.getMaxIdleTimeout().total!(TimeUnit.Millisecond)();
        this.sendWindow = (FlowControlStrategy.DEFAULT_WINDOW_SIZE);
        this.recvWindow = (FlowControlStrategy.DEFAULT_WINDOW_SIZE);
        this.pushEnabled = true; // SPEC: by default, push is enabled.
        this.idleTime = MonoTime.currTime(); // convertToMillisecond(Clock.currStdTime); //Millisecond100Clock.currentTimeMillis();
    }

    /// Returns the flow control strategy passed to the constructor.
    FlowControlStrategy getFlowControlStrategy() {
        return flowControl;
    }

    /// Returns the maximum number of concurrent local streams (negative: unlimited).
    int getMaxLocalStreams() {
        return maxLocalStreams;
    }

    /// Sets the maximum number of concurrent local streams.
    void setMaxLocalStreams(int maxLocalStreams) {
        this.maxLocalStreams = maxLocalStreams;
    }

    /// Returns the maximum number of concurrent remote streams (negative: unlimited).
    int getMaxRemoteStreams() {
        return maxRemoteStreams;
    }

    /// Sets the maximum number of concurrent remote streams.
    void setMaxRemoteStreams(int maxRemoteStreams) {
        this.maxRemoteStreams = maxRemoteStreams;
    }

    /// Returns the stream idle timeout, in milliseconds.
    long getStreamIdleTimeout() {
        return streamIdleTimeout;
    }

    /// Sets the stream idle timeout, in milliseconds.
    void setStreamIdleTimeout(long streamIdleTimeout) {
        this.streamIdleTimeout = streamIdleTimeout;
    }

    /// Returns the configured initial session receive window.
    int getInitialSessionRecvWindow() {
        return initialSessionRecvWindow;
    }

    /// Sets the initial session receive window.
    void setInitialSessionRecvWindow(int initialSessionRecvWindow) {
        this.initialSessionRecvWindow = initialSessionRecvWindow;
    }

    /// Returns the underlying connection.
    Connection getEndPoint() {
        return endPoint;
    }

    /// Returns the frame generator.
    Http2Generator getGenerator() {
        return generator;
    }

    /// Returns the number of bytes written so far, as accumulated by the flusher entries.
    override
    long getBytesWritten() {
        return bytesWritten;
    }

    /**
     * Handles a received DATA frame.
     *
     * The session receive window is always charged for the frame (payload plus
     * padding). The frame is then either handed to its stream, or, when the
     * stream is unknown or the session is closed, consumed immediately so the
     * session window is given back.
     */
    override
    void onData(DataFrame frame) {
        version(HUNT_DEBUG) {
            tracef("Received %s", frame.toString());
        }
        int streamId = frame.getStreamId();
        StreamSPI stream = cast(StreamSPI) getStream(streamId);

        // SPEC: the session window must be updated even if the stream is null.
        // The flow control length includes the padding bytes.
        int flowControlLength = frame.remaining() + frame.padding();
        flowControl.onDataReceived(this, stream, flowControlLength);

        // Local callback that reports the data as consumed once the stream is
        // done with the frame, regardless of the outcome.
        class DataCallback : NoopCallback
        {
            override
            void succeeded() {
                complete();
            }

            override
            void failed(Exception x) {
                // Consume also in case of failures, to free the
                // session flow control window for other streams.
                complete();
            }

            private void complete() {
                // notIdle();
                // stream.notIdle();
                flowControl.onDataConsumed(this.outer, stream, flowControlLength);
            }
        }

        if (stream !is null && !isClosed) {
            if (getRecvWindow() < 0) {
                // The peer sent more than the session window allowed.
                close(cast(int)ErrorCode.FLOW_CONTROL_ERROR, "session_window_exceeded", Callback.NOOP);
            } else {
                stream.process(frame, new DataCallback() );
            }
        } else {
            // NOTE(review): this trace is not guarded by a version block, unlike
            // the other traces in this method - confirm whether that is intended.
            tracef("Ignoring %s, stream #%s not found", frame.toString(), streamId);
            // We must enlarge the session flow control window,
            // otherwise other requests will be stalled.
            flowControl.onDataConsumed(this, null, flowControlLength);
        }
    }

    /// Handles a received HEADERS frame; implemented by subclasses.
    override
    abstract void onHeaders(HeadersFrame frame);

    /// Handles a received PRIORITY frame; it is only traced.
    override
    void onPriority(PriorityFrame frame) {
        version(HUNT_DEBUG) {
            tracef("Received %s", frame.toString());
        }
    }

    /**
     * Handles a received RST_STREAM frame: it is forwarded to the stream when
     * the stream is known, otherwise the session listener is notified.
     */
    override
    void onReset(ResetFrame frame) {
        version(HUNT_DEBUG) {
            tracef("Received %s", frame.toString());
        }
        StreamSPI stream = cast(StreamSPI)getStream(frame.getStreamId());
        if (stream !is null)
            stream.process(frame, new ResetCallback());
        else
            notifyReset(this, frame);
    }

    /// Handles a received SETTINGS frame and replies to it.
    override
    void onSettings(SettingsFrame frame) {
        // SPEC: SETTINGS frame MUST be replied.
        onSettings(frame, true);
    }

    /**
     * Applies the settings carried by a received SETTINGS frame.
     *
     * Reply frames are ignored. An invalid ENABLE_PUSH or MAX_FRAME_SIZE value
     * triggers a connection failure and stops processing: the listener is not
     * notified and no reply is sent in that case.
     *
     * Params:
     *   frame = the received SETTINGS frame
     *   reply = whether an empty SETTINGS reply must be queued after applying
     */
    void onSettings(SettingsFrame frame, bool reply) {
        version(HUNT_HTTP_DEBUG) {
            tracef("Received %s", frame.toString());
        }
        if (frame.isReply())
            return;

        // Iterate over all settings
        //for (Map.Entry!(int, int) entry : frame.getSettings().entrySet())
        foreach(int key, int value; frame.getSettings())
        {
            // int key = entry.getKey();
            // int value = entry.getValue();
            switch (key) {
                case SettingsFrame.HEADER_TABLE_SIZE: {
                    version(HUNT_DEBUG) {
                        tracef("Update HPACK header table size to %s for %s", value, this.toString());
                    }
                    generator.setHeaderTableSize(value);
                    break;
                }
                case SettingsFrame.ENABLE_PUSH: {
                    // SPEC: check the value is sane.
                    if (value != 0 && value != 1) {
                        onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "invalid_settings_enable_push");
                        return;
                    }
                    pushEnabled = value == 1;
                    version(HUNT_DEBUG) {
                        tracef("%s push for %s", pushEnabled ? "Enable" : "Disable", this.toString());
                    }
                    break;
                }
                case SettingsFrame.MAX_CONCURRENT_STREAMS: {
                    // The peer's limit bounds the streams that we may open.
                    maxLocalStreams = value;
                    version(HUNT_DEBUG) {
                        tracef("Update max local concurrent streams to %s for %s", maxLocalStreams, this.toString());
                    }
                    break;
                }
                case SettingsFrame.INITIAL_WINDOW_SIZE: {
                    version(HUNT_DEBUG) {
                        tracef("Update initial window size to %s for %s", value, this.toString());
                    }
                    flowControl.updateInitialStreamWindow(this, value, false);
                    break;
                }
                case SettingsFrame.MAX_FRAME_SIZE: {
                    version(HUNT_DEBUG) {
                        tracef("Update max frame size to %s for %s", value, this.toString());
                    }
                    // SPEC: check the max frame size is sane.
                    if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH) {
                        onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "invalid_settings_max_frame_size");
                        return;
                    }
                    generator.setMaxFrameSize(value);
                    break;
                }
                case SettingsFrame.MAX_HEADER_LIST_SIZE: {
                    version(HUNT_DEBUG) {
                        tracef("Update max header list size to %s for %s", value, this.toString());
                    }
                    generator.setMaxHeaderListSize(value);
                    break;
                }
                default: {
                    // Unknown settings are ignored.
                    version(HUNT_DEBUG) {
                        tracef("Unknown setting %s:%s for %s", key, value, this.toString());
                    }
                    break;
                }
            }
        }
        notifySettings(this, frame);

        if (reply) {
            SettingsFrame replyFrame = new SettingsFrame(Collections.emptyMap!(int, int)(), true);
            settings(replyFrame, Callback.NOOP);
        }
    }

    /**
     * Handles a received PING frame: a reply is reported to the listener, a
     * request is answered with a reply carrying the same payload.
     */
    override
    void onPing(PingFrame frame) {
        // version(HUNT_HTTP_DEBUG) {
        // tracef("Received %s", frame.toString());
        // }
        if (frame.isReply()) {
            // version(HUNT_HTTP_DEBUG)
            {
                infof("The session %s received ping reply", endPoint.getId());
            }
            notifyPing(this, frame);
        } else {
            PingFrame reply = new PingFrame(frame.getPayload(), true);
            control(null, Callback.NOOP, reply);
        }
    }

    /**
     * This method is called when receiving a GO_AWAY from the other peer.
     * We check the close state to act appropriately:
     * <ul>
     * <li>NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so
     * that the content of the queue is written, and then the connection
     * closed. We notify the application after being terminated.
     * See <code>Http2Session.ControlEntry#succeeded()</code></li>
     * <li>In all other cases, we do nothing since other methods are already
     * performing their actions.</li>
     * </ul>
     *
     * @param frame the GO_AWAY frame that has been received.
* @see #close(int, string, Callback)
     * @see #onShutdown()
     * @see #onIdleTimeout()
     */
    override
    void onGoAway(GoAwayFrame frame) {
        version(HUNT_DEBUG) {
            tracef("Received %s", frame.toString());
        }
        // Shape inherited from a compare-and-set loop; with a plain field the
        // first pass always either transitions or returns.
        while (true) {
            CloseState current = closed;
            switch (current) {
                case CloseState.NOT_CLOSED: {
                    if (closed == current) {
                        closed = CloseState.REMOTELY_CLOSED;
                        // We received a GO_AWAY, so try to write
                        // what's in the queue and then disconnect.
                        notifyClose(this, frame, new DisconnectCallback());
                        return;
                    }
                    break;
                }
                default: {
                    version(HUNT_DEBUG) {
                        tracef("Ignored %s, already closed", frame.toString());
                    }
                    return;
                }
            }
        }
    }

    /**
     * Handles a received WINDOW_UPDATE frame.
     *
     * A positive stream id targets a stream: the frame is processed by the
     * stream (when known) and then queued to the flusher. Any other stream id
     * is treated as a session-level update.
     */
    override
    void onWindowUpdate(WindowUpdateFrame frame) {
        version(HUNT_DEBUG) {
            tracef("Received %s ^^^^ ", frame.toString());
        }
        int streamId = frame.getStreamId();
        if (streamId > 0) {
            StreamSPI stream = cast(StreamSPI)getStream(streamId);
            if (stream !is null) {
                stream.process(frame, Callback.NOOP);
                onWindowUpdate(stream, frame);
            }
        } else {
            onWindowUpdate(null, frame);
        }
    }

    /**
     * Reports a connection-level failure to the listener; the supplied
     * CloseCallback carries the error code and reason.
     */
    override
    void onConnectionFailure(int error, string reason) {
        notifyFailure(this, new IOException(format("%d/%s", error, reason)), new CloseCallback(error, reason));
    }

    /**
     * Creates a new local stream and queues its HEADERS frame.
     *
     * When the frame carries no stream id (id <= 0), the next local id is
     * allocated and the frame (and its priority, if any) is rebuilt with it.
     * The promise is failed by createLocalStream() when the stream cannot be
     * created.
     *
     * Params:
     *   frame    = the HEADERS frame opening the stream
     *   promise  = completed with the new stream, or failed
     *   listener = the listener attached to the new stream
     */
    override
    void newStream(HeadersFrame frame, Promise!Stream promise, Stream.Listener listener) {
        // Synchronization is necessary to atomically create
        // the stream id and enqueue the frame to be sent.
        bool queued;
        synchronized (this) {
            int streamId = frame.getStreamId();
            if (streamId <= 0) {
                streamId = streamIds; streamIds += 2; // streamIds.getAndAdd(2);
                PriorityFrame priority = frame.getPriority();
                priority = priority is null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
                        priority.getWeight(), priority.isExclusive());
                frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
            }
            StreamSPI stream = createLocalStream(streamId, promise);
            if (stream is null)
                return;
            stream.setListener(listener);

            ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback!Stream(promise, stream));
            queued = flusher.append(entry);
        }
        // Iterate outside the synchronized block.
        if (queued)
            flusher.iterate();
    }

    /**
     * Queues a PRIORITY frame.
     *
     * When the frame's stream id does not map to a known stream, a new id is
     * allocated from the local id sequence and the frame is rebuilt with it.
     *
     * Params:
     *   frame    = the PRIORITY frame to send
     *   callback = notified when the frame has been written or has failed
     * Returns: the stream id the frame was actually sent for
     */
    override
    int priority(PriorityFrame frame, Callback callback) {
        int streamId = frame.getStreamId();
        // Use a non-throwing lookup: indexing the associative array with
        // `streams[streamId]` raises a RangeError for a missing key, which
        // made the "unknown stream" branch below unreachable.
        StreamSPI stream = streams.get(streamId, null);
        if (stream is null) {
            streamId = streamIds; streamIds += 2; //.getAndAdd(2);
            frame = new PriorityFrame(streamId, frame.getParentStreamId(),
                    frame.getWeight(), frame.isExclusive());
        }
        control(stream, callback, frame);
        return streamId;
    }

    /**
     * Creates a pushed stream and queues its PUSH_PROMISE frame.
     *
     * Params:
     *   stream   = the stream the push is associated with (not used here; the
     *              associated id is taken from the frame)
     *   promise  = completed with the pushed stream, or failed
     *   frame    = the PUSH_PROMISE frame; rebuilt with the promised stream id
     *   listener = the listener attached to the pushed stream
     */
    override
    void push(StreamSPI stream, Promise!Stream promise, PushPromiseFrame frame, Stream.Listener listener) {
        // Synchronization is necessary to atomically create
        // the stream id and enqueue the frame to be sent.
        bool queued;
        synchronized (this) {
            int streamId = streamIds; streamIds += 2; //.getAndAdd(2);
            frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());

            StreamSPI pushStream = createLocalStream(streamId, promise);
            if (pushStream is null)
                return;
            pushStream.setListener(listener);

            ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback!Stream(promise, pushStream));
            queued = flusher.append(entry);
        }
        // Iterate outside the synchronized block.
        if (queued)
            flusher.iterate();
    }

    /// Queues a SETTINGS frame as a session-level control frame.
    override
    void settings(SettingsFrame frame, Callback callback) {
        control(null, callback, frame);
    }

    /**
     * Queues a PING request. Reply frames are rejected: the callback is
     * failed with an IllegalArgumentException and nothing is sent.
     */
    override
    void ping(PingFrame frame, Callback callback) {
        if (frame.isReply())
            callback.failed(new IllegalArgumentException(""));
        else
            control(null, callback, frame);
    }

    /// Queues a RST_STREAM frame for the stream it names (which may be unknown).
    protected void reset(ResetFrame frame, Callback callback) {
        control(cast(StreamSPI)getStream(frame.getStreamId()), callback, frame);
    }

    /**
     * Invoked internally and by applications to send a GO_AWAY frame to the
     * other peer. We check the close state to act appropriately:
     * <ul>
     * <li>NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the
     * GO_AWAY has been written, it will only cause the output to be shut
     * down (not the connection closed), so that the application can still
     * read frames arriving from the other peer.
     * Ideally the other peer will notice the GO_AWAY and close the connection.
     * When that happens, we close the connection from {@link #onShutdown()}.
* Otherwise, the idle timeout mechanism will close the connection, see
     * {@link #onIdleTimeout()}.</li>
     * <li>In all other cases, we do nothing since other methods are already
     * performing their actions.</li>
     * </ul>
     *
     * @param error the error code
     * @param reason the reason
     * @param callback the callback to invoke when the operation is complete
     * @return true if a GO_AWAY was queued by this call; false if the session
     * was already closing or closed, in which case the callback is succeeded
     * immediately
     * @see #onGoAway(GoAwayFrame)
     * @see #onShutdown()
     * @see #onIdleTimeout()
     */
    override
    bool close(int error, string reason, Callback callback) {
        while (true) {
            CloseState current = closed;
            switch (current) {
                case CloseState.NOT_CLOSED: {
                    if (closed == current) {
                        closed = CloseState.LOCALLY_CLOSED;
                        GoAwayFrame frame = newGoAwayFrame(error, reason);
                        control(null, callback, frame);
                        return true;
                    }
                    break;
                }
                default: {
                    version(HUNT_DEBUG)
                    tracef("Ignoring close %s/%s, already closed", error, reason);
                    callback.succeeded();
                    return false;
                }
            }
        }
    }

    /**
     * Builds the GO_AWAY frame for the given error, carrying the last remote
     * stream id seen and at most the first 32 bytes of the reason.
     */
    private GoAwayFrame newGoAwayFrame(int error, string reason) {
        byte[] payload = null;
        if (!reason.empty) {
            // Trim the reason to avoid attack vectors.
            int len = min(reason.length, 32);
            payload = cast(byte[])reason[0..len].dup;
        }
        return new GoAwayFrame(lastStreamId, error, payload);
    }

    /// Returns true as soon as the session has left the NOT_CLOSED state.
    override
    bool isClosed() {
        return closed != CloseState.NOT_CLOSED;
    }

    /// Queues a single control frame and flushes.
    private void control(StreamSPI stream, Callback callback, Frame frame) {
        frames(stream, callback, frame, Frame.EMPTY_ARRAY);
    }

    /**
     * Queues one or more frames for the given stream.
     *
     * Params:
     *   stream   = the stream the frames belong to, or null for session frames
     *   callback = completed once, after the last frame completes
     *   frame    = the first frame
     *   frames   = additional frames, queued in order after the first one
     */
    override
    void frames(StreamSPI stream, Callback callback, Frame frame, Frame[] frames... ) {
        // We want to generate as late as possible to allow re-prioritization;
        // generation will happen while processing the entries.

        // The callback needs to be notified only when the last frame completes.

        int length = cast(int)frames.length;
        if (length == 0) {
            // version(HUNT_HTTP_DEBUG) tracef("length == 0") ;
            onFrame(new ControlEntry(frame, stream, callback), true);
        } else {
            callback = new CountingCallback(callback, 1 + length);
            onFrame(new ControlEntry(frame, stream, callback), false);
            // Only the last entry triggers a flush.
            for (int i = 1; i <= length; ++i)
                onFrame(new ControlEntry(frames[i - 1], stream, callback), i == length);
        }
    }

    /**
     * Queues a DATA frame for the given stream.
     *
     * When the session is already closed the frame is not queued and the
     * callback is failed, so that the caller is always notified of the outcome.
     */
    override
    void data(StreamSPI stream, Callback callback, DataFrame frame) {
        if (isClosed()) {
            // Previously the frame was dropped silently and the callback was
            // never completed, leaving the writer waiting forever.
            callback.failed(new ClosedChannelException(""));
            return;
        }
        // We want to generate as late as possible to allow re-prioritization.
        onFrame(new DataEntry(frame, stream, callback), true);
    }

    /**
     * Hands an entry to the flusher and, when requested and the entry was
     * accepted, triggers a flush.
     */
    private void onFrame(Http2Flusher.Entry entry, bool flush) {
        version(HUNT_HTTP_DEBUG) {
            tracef("%s %s", flush ? "Sending" : "Queueing", entry.frame.toString());
        }
        // Ping frames are prepended to process them as soon as possible.
        bool queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
        if (queued && flush) {
            if (entry.stream !is null) {
                //entry.stream.notIdle();
            }
            flusher.iterate();
        }
    }

    /**
     * Reserves a local stream slot and registers a new local stream.
     *
     * Params:
     *   streamId = the id of the stream to create
     *   promise  = failed with an IllegalStateException when the local stream
     *              limit is reached or the id is already in use
     * Returns: the new stream, or null when the promise has been failed
     */
    protected StreamSPI createLocalStream(int streamId, Promise!Stream promise) {
        while (true) {
            int localCount = localStreamCount;
            int maxCount = getMaxLocalStreams();
            if (maxCount >= 0 && localCount >= maxCount) {
                promise.failed(new IllegalStateException("Max local stream count " ~ maxCount.to!string() ~ " exceeded"));
                return null;
            }
            if (localStreamCount == localCount)
            {
                localStreamCount = localCount + 1;
                break;
            }
        }

        StreamSPI stream = newStream(streamId, true);
        auto itemPtr = streamId in streams;
        if (itemPtr is null) {
            streams[streamId] = stream;
            // stream.setIdleTimeout(getStreamIdleTimeout());
            flowControl.onStreamCreated(stream);
            version(HUNT_DEBUG) {
                tracef("Created local %s", stream.toString());
            }
            return stream;
        } else {
            // Give back the slot reserved above: no stream was registered, so
            // keeping the increment would permanently consume one of the
            // allowed concurrent local streams.
            localStreamCount--;
            promise.failed(new IllegalStateException("Duplicate stream " ~ streamId.to!string()));
            return null;
        }
    }

    /**
     * Reserves a remote stream slot and registers a new remote stream.
     *
     * A stream exceeding the remote limit is refused with a RST_STREAM; a
     * duplicate stream id closes the session with a PROTOCOL_ERROR.
     *
     * Returns: the new stream, or null when it was refused
     */
    protected StreamSPI createRemoteStream(int streamId) {
        // SPEC: exceeding max concurrent streams is treated as stream error.
        while (true) {
            long encoded = remoteStreamCount;
            int remoteCount = AtomicBiInteger.getHi(encoded);
            int remoteClosing = AtomicBiInteger.getLo(encoded);
            int maxCount = getMaxRemoteStreams();
            if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) {
                reset(new ResetFrame(streamId, cast(int)ErrorCode.REFUSED_STREAM_ERROR), Callback.NOOP);
                return null;
            }
            // if (remoteStreamCount.compareAndSet(encoded, remoteCount + 1, remoteClosing))
            if(remoteStreamCount == encoded)
            {
                remoteStreamCount = AtomicBiInteger.encode(remoteCount + 1, remoteClosing);
                break;
            }
        }

        StreamSPI stream = newStream(streamId, false);

        // SPEC: duplicate stream is treated as connection error.
        //streams.putIfAbsent(streamId, stream)
        auto itemPtr = streamId in streams;
        if ( itemPtr is null) {
            streams[streamId] = stream;
            updateLastStreamId(streamId);
            // stream.setIdleTimeout(getStreamIdleTimeout());
            flowControl.onStreamCreated(stream);
            version(HUNT_DEBUG) {
                tracef("Created remote %s", stream.toString());
            }
            return stream;
        } else {
            close(cast(int)ErrorCode.PROTOCOL_ERROR, "duplicate_stream", Callback.NOOP);
            return null;
        }
    }

    /**
     * Adjusts the local or remote stream counters.
     *
     * NOTE(review): the remote branch relies on a three-argument
     * AtomicBiInteger.encode that is presumably an "add deltas" operation
     * (the replaced call was remoteStreamCount.add) - confirm against
     * hunt.concurrency.atomic.
     */
    void updateStreamCount(bool local, int deltaStreams, int deltaClosing) {
        if (local)
            localStreamCount += (deltaStreams);
        else
        {
            remoteStreamCount = AtomicBiInteger.encode(remoteStreamCount, deltaStreams, deltaClosing);
            // remoteStreamCount.add(deltaStreams, deltaClosing);
        }
    }

    /// Factory for stream instances; subclasses may override it.
    protected StreamSPI newStream(int streamId, bool local) {
        return new Http2Stream(scheduler, this, streamId, local);
    }

    /**
     * Unregisters a stream. The close hook and the flow control strategy are
     * only notified when the stream was actually registered.
     */
    override
    void removeStream(StreamSPI stream) {
        bool removed = streams.remove(stream.getId());
        if (removed) {
            onStreamClosed(stream);
            flowControl.onStreamDestroyed(stream);
            version(HUNT_DEBUG) {
                // Format through toString() like every other trace in this class.
                tracef("Removed %s %s", stream.isLocal() ? "local" : "remote", stream.toString());
            }
        }
    }

    /// Returns a snapshot of the currently registered streams.
    override
    Stream[] getStreams() {
        return cast(Stream[])(streams.values());
    }

    /// Returns the number of currently registered streams.
    int getStreamCount() {
        return cast(int)streams.length;
    }

    /// Returns the stream with the given id, or null when it is not registered.
    override
    StreamSPI getStream(int streamId)
    {
        return streams.get(streamId, null);
    }

    /// Returns the current session send window.
    int getSendWindow() {
        return sendWindow.atomicLoad!();
    }

    /// Returns the current session receive window.
    int getRecvWindow() {
        return recvWindow.atomicLoad!();
    }

    /**
     * Atomically adds delta to the session send window.
     * Returns: the window value after the update
     */
    override
    int updateSendWindow(int delta) {
        //int old = sendWindow.atomicLoad!();
        return sendWindow.atomicOp!"+="(delta);
        //return old;
    }

    /**
     * Atomically adds delta to the session receive window.
     * Returns: the window value after the update
     */
    override
    int updateRecvWindow(int delta) {
        //int old = recvWindow.atomicLoad!();
        return recvWindow.atomicOp!"+="(delta);
        //return old;
    }

    /// Queues a window update for the given stream (null for the session) to the flusher.
    override
    void onWindowUpdate(StreamSPI stream, WindowUpdateFrame frame) {
        // WindowUpdateFrames arrive concurrently with writes.
        // Increasing (or reducing) the window size concurrently
        // with writes requires coordination with the flusher, that
        // decides how many frames to write depending on the available
        // window sizes. If the window sizes vary concurrently, the
        // flusher may take non-optimal or wrong decisions.
        // Here, we "queue" window updates to the flusher, so it will
        // be the only component responsible for window updates, for
        // both increments and reductions.
        flusher.window(stream, frame);
    }

    /// Returns whether the peer allows us to push streams.
    override
    bool isPushEnabled() {
        return pushEnabled;
    }

    /**
     * A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN.
* This method is invoked when the TCP FIN is received, or when an exception is
     * thrown while reading. What happens next depends on the close state:
     * <ul>
     * <li>NOT_CLOSED: the remote peer did not send a GO_AWAY (abrupt close)
     * or reading failed with an exception, so the session is aborted.</li>
     * <li>LOCALLY_CLOSED: our GO_AWAY reached the remote peer, which closed
     * the connection in response; a disconnect is queued so that the local
     * side of the connection is closed as well.
     * Writing the GO_AWAY only shut down the output, hence this extra step.
     * See {@link #close(int, string, Callback)}.</li>
     * <li>REMOTELY_CLOSED: the GO_AWAY was received first and the TCP FIN
     * afterwards; nothing is done here because the GO_AWAY handling already
     * takes care of closing the connection.
     * See {@link #onGoAway(GoAwayFrame)}.</li>
     * </ul>
     *
     * @see #onGoAway(GoAwayFrame)
     * @see #close(int, string, Callback)
     * @see #onIdleTimeout()
     */
    override
    void onShutdown() {
        version(HUNT_DEBUG) {
            tracef("Shutting down %s", this.toString());
        }

        // Read the state once, exactly like the original switch did.
        CloseState state = closed;

        if (state == CloseState.NOT_CLOSED) {
            // No GO_AWAY from the other peer: there is no need to be gentle.
            version(HUNT_DEBUG) {
                tracef("Abrupt close for %s", this.toString());
            }
            abort(new ClosedChannelException(""));
            return;
        }

        if (state == CloseState.LOCALLY_CLOSED) {
            // Only the output was shut down when we closed locally,
            // so a disconnect has to be queued now.
            control(null, Callback.NOOP, new DisconnectFrame());
            return;
        }

        // REMOTELY_CLOSED: the received GO_AWAY will close the connection.
        // Any other state: nothing to do either.
    }

    /**
     * This method is invoked when the idle timeout triggers.
* We check the close state
     * to act appropriately:
     * <ul>
     * <li>NOT_CLOSED: it's a real idle timeout, we just initiate a close, see
     * {@link #close(int, string, Callback)}.</li>
     * <li>LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the
     * other peer did not close the connection so we never received the TCP FIN, and
     * therefore we terminate.</li>
     * <li>REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a
     * disconnect, but for some reason it was not processed (for example, queue was
     * stuck because of TCP congestion), therefore we terminate.
     * See {@link #onGoAway(GoAwayFrame)}.</li>
     * </ul>
     *
     * @return true if the session should be closed, false otherwise
     * @see #onGoAway(GoAwayFrame)
     * @see #close(int, string, Callback)
     * @see #onShutdown()
     */
    override
    bool onIdleTimeout() {
        switch (closed) {
            case CloseState.NOT_CLOSED: {
                // long elapsed = convertToMillisecond(Clock.currStdTime) - idleTime;
                // NOTE(review): every call to notIdle() visible in this file is
                // commented out, so idleTime appears to stay at its construction
                // value - confirm whether activity is meant to reset it.
                Duration elapsed = MonoTime.currTime() - idleTime;
                version(HUNT_HTTP_DEBUG) {
                    tracef("HTTP2 session on idle timeout. The elapsed time is %d - %d",
                        elapsed.total!(TimeUnit.Millisecond),
                        endPoint.getMaxIdleTimeout().total!(TimeUnit.Millisecond));
                }
                return elapsed >= endPoint.getMaxIdleTimeout() && notifyIdleTimeout(this);
            }
            case CloseState.LOCALLY_CLOSED:
            case CloseState.REMOTELY_CLOSED: {
                // Report the timeout as a plain millisecond count: converting the
                // Duration itself to a string already appends its own units, which
                // produced messages such as "Idle timeout 30 secs ms".
                long timeoutMs = endPoint.getMaxIdleTimeout().total!(TimeUnit.Millisecond)();
                abort(new TimeoutException("Idle timeout " ~ timeoutMs.to!string() ~ " ms"));
                return false;
            }
            default: {
                return false;
            }
        }
    }

    /// Resets the reference point used to measure idleness.
    private void notIdle() {
        idleTime = MonoTime.currTime; // convertToMillisecond(Clock.currStdTime);
    }

    /// Generic frames are not expected here: they are reported as a protocol error.
    override
    void onFrame(Frame frame) {
        onConnectionFailure(cast(int)ErrorCode.PROTOCOL_ERROR, "upgrade");
    }

    /// Hook invoked after the HEADERS frame of a stream has been written; no-op by default.
    protected void onStreamOpened(StreamSPI stream) {
    }

    /// Hook invoked when a registered stream has been removed; no-op by default.
    protected void onStreamClosed(StreamSPI stream) {
    }

    /// Closes the underlying connection.
    void disconnect() {
        version(HUNT_DEBUG) {
            tracef("Disconnecting %s", this.toString());
        }
        endPoint.close();
    }

    /**
     * Moves the session to CLOSED (unless it already is), terminates the
     * flusher with the given cause, closes and forgets every stream, and
     * closes the connection.
     */
    private void terminate(Exception cause) {
        while (true) {
            CloseState current = closed;
            switch (current) {
                case CloseState.NOT_CLOSED:
                case CloseState.LOCALLY_CLOSED:
                case CloseState.REMOTELY_CLOSED: {
                    if (closed == current) {
                        closed = CloseState.CLOSED;
                        flusher.terminate(cause);
                        // Iterate over a snapshot: should closing a stream end up
                        // calling removeStream(), the map would otherwise be
                        // modified while it is being iterated.
                        foreach (StreamSPI stream ; streams.values)
                            stream.close();
                        streams.clear();
                        disconnect();
                        return;
                    }
                    break;
                }
                default: {
                    return;
                }
            }
        }
    }

    /// Reports the failure to the listener, handing it a TerminateCallback for the same failure.
    void abort(Exception failure) {
        notifyFailure(this, failure, new TerminateCallback(failure));
    }

    /// Returns true when the underlying connection is no longer connected.
    bool isDisconnected() {
        return !endPoint.isConnected();
    }

    /// Records the highest remote stream id seen so far.
    private void updateLastStreamId(int streamId) {
        // Atomics.updateMax(lastStreamId, streamId);
        if(streamId>lastStreamId) lastStreamId = streamId;
    }

    // The notify* helpers below forward an event to the listener and log
    // (rather than propagate) any exception the listener throws.

    /// Returns the stream listener supplied by the session listener, or null on failure.
    protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame) {
        try {
            return listener.onNewStream(stream, frame);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
            return null;
        }
    }

    protected void notifySettings(StreamSession session, SettingsFrame frame) {
        try {
            listener.onSettings(session, frame);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    protected void notifyPing(StreamSession session, PingFrame frame) {
        try {
            listener.onPing(session, frame);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    protected void notifyReset(StreamSession session, ResetFrame frame) {
        try {
            listener.onReset(session, frame);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    protected void notifyClose(StreamSession session, GoAwayFrame frame, Callback callback) {
        try {
            listener.onClose(session, frame, callback);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    /// Returns the listener's decision; a failing listener is treated as "close" (true).
    protected bool notifyIdleTimeout(StreamSession session) {
        try {
            return listener.onIdleTimeout(session);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
            return true;
        }
    }

    protected void notifyFailure(StreamSession session, Exception failure, Callback callback) {
        try {
            listener.onFailure(session, failure, callback);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    /// Forwards received headers to the stream's own listener, when it has one.
    protected void notifyHeaders(StreamSPI stream, HeadersFrame frame) {
        // Optional.ofNullable(stream.getListener()).ifPresent(listener -> {
        // try {
        // listener.onHeaders(stream, frame);
        // } catch (Exception x) {
        // info("Failure while notifying listener " ~ listener.toString(), x);
        // }
        // });
        // Note: this local deliberately shadows the session-level listener field.
        auto listener = stream.getListener();
        if(listener !is null)
        {
            try {
                listener.onHeaders(stream, frame);
            } catch (Exception x) {
                info("Failure while notifying listener " ~ listener.toString(), x);
            }
        }
    }

    /// Describes the session: addresses, windows, stream count and close state.
    override
    string toString() {
        return format("%s@%s{l:%s <-> r:%s,sendWindow=%s,recvWindow=%s,streams=%d,%s}",
                typeof(this).stringof,
                toHash(),
                getEndPoint().getLocalAddress(),
                getEndPoint().getRemoteAddress(),
                sendWindow,
                recvWindow,
                streams.length,
                closed);
    }

    /**
     * Flusher entry for every frame type other than DATA.
     */
    private class ControlEntry : Http2Flusher.Entry {
        private int bytes; // size of the generated frame, added to bytesWritten on success

        private this(Frame frame, StreamSPI stream, Callback callback) {
            super(frame, stream, callback);
        }

        /// Serializes the frame into buffers and runs the pre-send actions.
        override
        protected bool generate(Queue!ByteBuffer buffers) {
            List!(ByteBuffer) controlFrame = generator.control(frame);
            bytes = cast(int) BufferUtils.remaining(controlFrame.toArray());
            buffers.addAll(controlFrame);
            version(HUNT_HTTP_DEBUG) {
                tracef("Generated %s", frame.toString());
            }
            beforeSend();
            return true;
        }

        /**
         * <p>Performs actions just before writing the frame to the network.</p>
         * <p>Some frame, when sent over the network, causes the receiver
         * to react and send back frames that may be processed by the original
         * sender *before* {@link #succeeded()} is called.
* <p>If the action to perform updates some state, this update may
         * not be seen by the received frames and cause errors.</p>
         * <p>For example, suppose the action updates the stream window to a
         * larger value; the sender sends the frame; the receiver is now entitled
         * to send back larger data; when the data is received by the original
         * sender, the action may have not been performed yet, causing the larger
         * data to be rejected, when it should have been accepted.</p>
         */
        private void beforeSend() {
            switch (frame.getType()) {
                case FrameType.HEADERS: {
                    HeadersFrame headersFrame = cast(HeadersFrame) frame;
                    stream.updateClose(headersFrame.isEndStream(), CloseStateEvent.BEFORE_SEND);
                    break;
                }
                case FrameType.SETTINGS: {
                    // Apply our own INITIAL_WINDOW_SIZE before the peer can act on it.
                    SettingsFrame settingsFrame = cast(SettingsFrame) frame;
                    Map!(int, int) settings = settingsFrame.getSettings();
                    if (settings.containsKey(SettingsFrame.INITIAL_WINDOW_SIZE))
                    {
                        int initialWindow = settings.get(SettingsFrame.INITIAL_WINDOW_SIZE);
                        flowControl.updateInitialStreamWindow(this.outer, initialWindow, true);
                    }
                    break;
                }
                default: {
                    break;
                }
            }
        }

        /**
         * Invoked when the frame has been written: accounts the written bytes,
         * performs the frame-type specific follow-up, then completes the entry.
         */
        override
        void succeeded() {
            bytesWritten += (bytes);
            switch (frame.getType()) {
                case FrameType.HEADERS: {
                    onStreamOpened(stream);
                    HeadersFrame headersFrame = cast(HeadersFrame) frame;
                    // Remove the stream when updateClose() reports it as closed.
                    if (stream.updateClose(headersFrame.isEndStream(), CloseStateEvent.AFTER_SEND))
                        removeStream(stream);
                    break;
                }
                case FrameType.RST_STREAM: {
                    // The stream may be unknown (see reset()), hence the null check.
                    if (stream !is null) {
                        stream.close();
                        removeStream(stream);
                    }
                    break;
                }
                case FrameType.PUSH_PROMISE: {
                    // Pushed streams are implicitly remotely closed.
                    // They are closed when sending an end-stream DATA frame.
                    stream.updateClose(true, CloseStateEvent.RECEIVED);
                    break;
                }
                case FrameType.GO_AWAY: {
                    // We just sent a GO_AWAY, only shutdown the
                    // output without closing yet, to allow reads.
                    getEndPoint().shutdownOutput();
                    break;
                }
                case FrameType.WINDOW_UPDATE: {
                    flowControl.windowUpdate(this.outer, stream, cast(WindowUpdateFrame) frame);
                    break;
                }
                case FrameType.DISCONNECT: {
                    terminate(new ClosedChannelException(""));
                    break;
                }
                default: {
                    break;
                }
            }
            super.succeeded();
        }

        /**
         * Invoked when writing the frame failed. A failed DISCONNECT still
         * terminates the session before the failure is propagated.
         */
        override
        void failed(Exception x) {
            if (frame.getType() == FrameType.DISCONNECT) {
                terminate(new ClosedChannelException(""));
            }
            super.failed(x);
        }
    }

    /**
     * Flusher entry for DATA frames. A frame may be written in several
     * rounds; the entry tracks how much data is still to be generated.
     */
    private class DataEntry :Http2Flusher.Entry {
        private int bytes; // bytes produced by the last generate() call, frame header included
        private int _dataRemaining; // payload bytes not yet generated
        private int dataWritten; // payload bytes produced by the last generate() call

        private this(DataFrame frame, StreamSPI stream, Callback callback) {
            super(frame, stream, callback);
            // We don't do any padding, so the flow control length is
            // always the data remaining. This simplifies the handling
            // of data frames that cannot be completely written due to
            // the flow control window exhausting, since in that case
            // we would have to count the padding only once.
            _dataRemaining = frame.remaining();
        }

        /// Returns the number of payload bytes that still have to be generated.
        override
        int dataRemaining() {
            return _dataRemaining;
        }

        /**
         * Generates one DATA frame carrying at most min(remaining, window) bytes.
         *
         * NOTE(review): when the session or stream send window is not positive,
         * the code below enlarges it locally (by 65535, resp. by the session
         * window) instead of waiting for a WINDOW_UPDATE. This looks like it
         * sends beyond the window advertised by the peer (RFC 7540, section
         * 6.9) - confirm whether this is an intentional workaround.
         */
        override
        protected bool generate(Queue!ByteBuffer buffers) {
            int remaining = dataRemaining();

            int sessionSendWindow = getSendWindow();
            if (sessionSendWindow <= 0)
            {
                // Adds 65535 to the real window; the local copy is set to 65535
                // regardless of the previous (possibly negative) value.
                updateSendWindow(65535);
                sessionSendWindow = 65535;
            }
            // A zero delta is used to read the stream window.
            int streamSendWindow = stream.updateSendWindow(0);
            if (streamSendWindow <= 0)
            {
                stream.updateSendWindow(sessionSendWindow);
                streamSendWindow = sessionSendWindow;
            }
            int window = min(streamSendWindow, sessionSendWindow);
            //tracef("&&&&&&&&&&&&&&&&& %d &&&& %d" ,sessionSendWindow , streamSendWindow) ;

            // After the adjustments above both local windows are positive, so
            // this branch is not expected to be taken.
            if (window <= 0 && remaining > 0)
            {
                tracef("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %d &&&& %d" ,sessionSendWindow , streamSendWindow) ;
                return false;
            }


            int length = min(remaining, window);

            // Only one DATA frame is generated.
            DataFrame dataFrame = cast(DataFrame) frame;
            // pair[0] is stored as the generated byte count, pair[1] holds the buffers.
            Tuple!(int, List!(ByteBuffer)) pair = generator.data(dataFrame, length);
            bytes = pair[0];
            buffers.addAll(pair[1]);
            // The payload size is the generated size minus the frame header.
            int written = bytes - Frame.HEADER_LENGTH;
            version(HUNT_DEBUG) {
                tracef("Generated %s, length/window/data=%s/%s/%s", dataFrame, written, window, remaining);
            }
            this.dataWritten = written;
            //tracef("&&&&&&&&^^^^^ %d ^^^^^^ %d ^^^^^^%d", written, _dataRemaining , bytes);
            this._dataRemaining -= written;


            flowControl.onDataSending(stream, written);
            stream.updateClose(dataFrame.isEndStream(), CloseStateEvent.BEFORE_SEND);

            return true;
        }

        /// Invoked when the generated DATA frame has been written.
        override
        void succeeded() {
            bytesWritten += bytes;
            flowControl.onDataSent(stream, dataWritten);

            // Do we have more to send ?
1162 DataFrame dataFrame = cast(DataFrame) frame; 1163 if (_dataRemaining == 0) { 1164 // Only now we can update the close state 1165 // and eventually remove the stream. 1166 if (stream.updateClose(dataFrame.isEndStream(), CloseStateEvent.AFTER_SEND)) 1167 removeStream(stream); 1168 super.succeeded(); 1169 } 1170 } 1171 } 1172 1173 private static class PromiseCallback(C) : NoopCallback { 1174 private Promise!C promise; 1175 private C value; 1176 1177 private this(Promise!C promise, C value) { 1178 this.promise = promise; 1179 this.value = value; 1180 } 1181 1182 override 1183 void succeeded() { 1184 promise.succeeded(value); 1185 } 1186 1187 override 1188 void failed(Exception x) { 1189 promise.failed(x); 1190 } 1191 } 1192 1193 private class ResetCallback : NoopCallback { 1194 override 1195 void succeeded() { 1196 complete(); 1197 } 1198 1199 override 1200 void failed(Exception x) { 1201 complete(); 1202 } 1203 1204 private void complete() { 1205 flusher.iterate(); 1206 } 1207 } 1208 1209 private class CloseCallback : NoopCallback { 1210 private int error; 1211 private string reason; 1212 1213 private this(int error, string reason) { 1214 this.error = error; 1215 this.reason = reason; 1216 } 1217 1218 override 1219 void succeeded() { 1220 complete(); 1221 } 1222 1223 override 1224 void failed(Exception x) { 1225 complete(); 1226 } 1227 1228 private void complete() { 1229 close(error, reason, Callback.NOOP); 1230 } 1231 } 1232 1233 private class DisconnectCallback : NoopCallback { 1234 override 1235 void succeeded() { 1236 complete(); 1237 } 1238 1239 override 1240 void failed(Exception x) { 1241 complete(); 1242 } 1243 1244 private void complete() { 1245 frames(null, Callback.NOOP, newGoAwayFrame(cast(int)ErrorCode.NO_ERROR, null), new DisconnectFrame()); 1246 } 1247 } 1248 1249 private class TerminateCallback : NoopCallback { 1250 private Exception failure; 1251 1252 private this(Exception failure) { 1253 this.failure = failure; 1254 } 1255 1256 override 1257 
void succeeded() { 1258 complete(); 1259 } 1260 1261 override 1262 void failed(Exception x) { 1263 // failure.addSuppressed(x); 1264 Throwable.chainTogether(failure, x); 1265 complete(); 1266 } 1267 1268 private void complete() { 1269 terminate(failure); 1270 } 1271 } 1272 }