1 module hunt.http.codec.http.stream.Http2Stream; 2 3 import hunt.http.codec.http.stream.CloseState; 4 import hunt.http.codec.http.stream.Http2Session; 5 import hunt.http.codec.http.stream.SessionSPI; 6 import hunt.http.codec.http.stream.Stream; 7 import hunt.http.codec.http.stream.StreamSPI; 8 9 import hunt.http.codec.http.frame; 10 // import hunt.http.utils.concurrent.IdleTimeout; 11 import hunt.util.concurrent.Promise; 12 // import hunt.http.utils.concurrent.Scheduler; 13 14 import hunt.util.concurrent.Scheduler; 15 import hunt.util.functional; 16 import hunt.lang.exception; 17 18 import hunt.logging; 19 import std.format; 20 21 alias Listener = hunt.http.codec.http.stream.Stream.Stream.Listener; 22 23 /** 24 */ 25 class Http2Stream : StreamSPI { // IdleTimeout, 26 27 // private AtomicReference<ConcurrentMap<string, Object>> attributes = new AtomicReference<>(); 28 // private AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED); 29 // private AtomicInteger sendWindow = new AtomicInteger(); 30 // private AtomicInteger recvWindow = new AtomicInteger(); 31 32 private Object[string] attributes; 33 private CloseState closeState; 34 private int sendWindow; 35 private int recvWindow; 36 37 private SessionSPI session; 38 private int streamId; 39 private bool local; 40 private Listener listener; 41 private bool localReset; 42 private bool remoteReset; 43 44 this(Scheduler scheduler, SessionSPI session, int streamId, bool local) { 45 // super(scheduler); 46 closeState = CloseState.NOT_CLOSED; 47 this.session = session; 48 this.streamId = streamId; 49 this.local = local; 50 } 51 52 // override 53 int getId() { 54 return streamId; 55 } 56 57 // override 58 bool isLocal() { 59 return local; 60 } 61 62 // override 63 SessionSPI getSession() { 64 return session; 65 } 66 67 // override 68 void headers(HeadersFrame frame, Callback callback) { 69 session.frames(this, callback, frame, Frame.EMPTY_ARRAY); 70 } 71 72 // override 73 void push(PushPromiseFrame frame, Promise!Stream promise, Listener listener) { 74 session.push(this, promise, frame, listener); 75 } 76 77 // override 78 void data(DataFrame frame, Callback callback) { 79 session.data(this, callback, frame); 80 } 81 82 // override 83 void reset(ResetFrame frame, Callback callback) { 84 if (isReset()) 85 return; 86 localReset = true; 87 session.frames(this, callback, frame, Frame.EMPTY_ARRAY); 88 } 89 90 // override 91 Object getAttribute(string key) { 92 return attributes[key]; 93 } 94 95 // override 96 void setAttribute(string key, Object value) { 97 attributes[key] = value; 98 } 99 100 // override 101 Object removeAttribute(string key) { 102 auto r = attributes[key]; 103 attributes.remove(key); 104 return r; 105 } 106 107 // override 108 bool isReset() { 109 return localReset || remoteReset; 110 } 111 112 // override 113 bool isClosed() { 114 return closeState == CloseState.CLOSED; 115 } 116 117 // override 118 bool isRemotelyClosed() { 119 return closeState == CloseState.REMOTELY_CLOSED; 120 } 121 122 bool isLocallyClosed() { 123 return closeState == CloseState.LOCALLY_CLOSED; 124 } 125 126 // override 127 bool isOpen() { 128 return !isClosed(); 129 } 130 131 // override 132 protected void onIdleExpired(TimeoutException timeout) { 133 version(HUNT_DEBUG) { 134 tracef("Idle timeout %sms expired on %s", 0, this.toString()); // getIdleTimeout() 135 } 136 137 // Notify the application. 138 if (notifyIdleTimeout(this, timeout)) { 139 // Tell the other peer that we timed out. 140 reset(new ResetFrame(getId(), cast(int)ErrorCode.CANCEL_STREAM_ERROR), Callback.NOOP); 141 } 142 } 143 144 // private ConcurrentMap<string, Object> attributes() { 145 // ConcurrentMap<string, Object> map = attributes; 146 // if (map == null) { 147 // map = new ConcurrentHashMap<>(); 148 // if (!attributes.compareAndSet(null, map)) { 149 // map = attributes; 150 // } 151 // } 152 // return map; 153 // } 154 155 // override 156 Listener getListener() { 157 return listener; 158 } 159 160 // override 161 void setListener(Listener listener) { 162 this.listener = listener; 163 } 164 165 // override 166 void process(Frame frame, Callback callback) { 167 // notIdle(); 168 switch (frame.getType()) { 169 case FrameType.HEADERS: { 170 onHeaders(cast(HeadersFrame) frame, callback); 171 break; 172 } 173 case FrameType.DATA: { 174 onData(cast(DataFrame) frame, callback); 175 break; 176 } 177 case FrameType.RST_STREAM: { 178 onReset(cast(ResetFrame) frame, callback); 179 break; 180 } 181 case FrameType.PUSH_PROMISE: { 182 onPush(cast(PushPromiseFrame) frame, callback); 183 break; 184 } 185 case FrameType.WINDOW_UPDATE: { 186 onWindowUpdate(cast(WindowUpdateFrame) frame, callback); 187 break; 188 } 189 default: { 190 throw new UnsupportedOperationException(""); 191 } 192 } 193 } 194 195 private void onHeaders(HeadersFrame frame, Callback callback) { 196 if (updateClose(frame.isEndStream(), CloseStateEvent.RECEIVED)) 197 session.removeStream(this); 198 callback.succeeded(); 199 } 200 201 private void onData(DataFrame frame, Callback callback) { 202 if (getRecvWindow() < 0) { 203 // It's a bad client, it does not deserve to be 204 // treated gently by just resetting the stream. 205 session.close(ErrorCode.FLOW_CONTROL_ERROR, "stream_window_exceeded", Callback.NOOP); 206 callback.failed(new IOException("stream_window_exceeded")); 207 return; 208 } 209 210 // SPEC: remotely closed streams must be replied with a reset. 211 if (isRemotelyClosed()) { 212 reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR), Callback.NOOP); 213 callback.failed(new EOFException("stream_closed")); 214 return; 215 } 216 217 if (isReset()) { 218 // Just drop the frame. 219 callback.failed(new IOException("stream_reset")); 220 return; 221 } 222 223 if (updateClose(frame.isEndStream(), CloseStateEvent.RECEIVED)) 224 session.removeStream(this); 225 notifyData(this, frame, callback); 226 } 227 228 private void onReset(ResetFrame frame, Callback callback) { 229 remoteReset = true; 230 close(); 231 session.removeStream(this); 232 notifyReset(this, frame, callback); 233 } 234 235 private void onPush(PushPromiseFrame frame, Callback callback) { 236 // Pushed streams are implicitly locally closed. 237 // They are closed when receiving an end-stream DATA frame. 238 updateClose(true, CloseStateEvent.AFTER_SEND); 239 callback.succeeded(); 240 } 241 242 private void onWindowUpdate(WindowUpdateFrame frame, Callback callback) { 243 callback.succeeded(); 244 } 245 246 override 247 bool updateClose(bool update, CloseStateEvent event) { 248 version(HUNT_DEBUG) { 249 tracef("Update close for %s update=%s event=%s", this, update, event); 250 } 251 252 if (!update) 253 return false; 254 255 switch (event) { 256 case CloseStateEvent.RECEIVED: 257 return updateCloseAfterReceived(); 258 case CloseStateEvent.BEFORE_SEND: 259 return updateCloseBeforeSend(); 260 case CloseStateEvent.AFTER_SEND: 261 return updateCloseAfterSend(); 262 default: 263 return false; 264 } 265 } 266 267 private bool updateCloseAfterReceived() { 268 while (true) { 269 CloseState current = closeState; 270 switch (current) { 271 case CloseState.NOT_CLOSED: { 272 if (closeState == current) 273 { 274 closeState = CloseState.REMOTELY_CLOSED; 275 return false; 276 } 277 break; 278 } 279 case CloseState.LOCALLY_CLOSING: { 280 // if (closeState.compareAndSet(current, CloseState.CLOSING)) { 281 if (closeState == current) 282 { 283 closeState = CloseState.CLOSING; 284 updateStreamCount(0, 1); 285 return false; 286 } 287 break; 288 } 289 case CloseState.LOCALLY_CLOSED: { 290 close(); 291 return true; 292 } 293 default: { 294 return false; 295 } 296 } 297 } 298 } 299 300 private bool updateCloseBeforeSend() { 301 while (true) { 302 CloseState current = closeState; 303 switch (current) { 304 case CloseState.NOT_CLOSED: { 305 // if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSING)) 306 if (closeState == current) 307 { 308 closeState = CloseState.LOCALLY_CLOSING; 309 return false; 310 } 311 break; 312 } 313 case CloseState.REMOTELY_CLOSED: { 314 // if (closeState.compareAndSet(current, CloseState.CLOSING)) { 315 if (closeState == current) 316 { 317 closeState = CloseState.CLOSING; 318 updateStreamCount(0, 1); 319 return false; 320 } 321 break; 322 } 323 default: { 324 return false; 325 } 326 } 327 } 328 } 329 330 private bool updateCloseAfterSend() { 331 while (true) { 332 CloseState current = closeState; 333 switch (current) { 334 case CloseState.NOT_CLOSED: 335 case CloseState.LOCALLY_CLOSING: { 336 // if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSED)) 337 if (closeState == current) 338 { 339 closeState = CloseState.LOCALLY_CLOSING; 340 return false; 341 } 342 break; 343 } 344 case CloseState.REMOTELY_CLOSED: 345 case CloseState.CLOSING: { 346 close(); 347 return true; 348 } 349 default: { 350 return false; 351 } 352 } 353 } 354 } 355 356 int getSendWindow() { 357 return sendWindow; 358 } 359 360 int getRecvWindow() { 361 return recvWindow; 362 } 363 364 override 365 int updateSendWindow(int delta) { 366 int r = sendWindow; sendWindow += delta; 367 return r; 368 // return sendWindow.getAndAdd(delta); 369 } 370 371 override 372 int updateRecvWindow(int delta) { 373 int r = recvWindow; recvWindow += delta; 374 return r; 375 } 376 377 override 378 void close() { 379 CloseState oldState = closeState; 380 closeState = CloseState.CLOSED; 381 if (oldState != CloseState.CLOSED) { 382 int deltaClosing = oldState == CloseState.CLOSING ? -1 : 0; 383 updateStreamCount(-1, deltaClosing); 384 // onClose(); 385 } 386 } 387 388 private void updateStreamCount(int deltaStream, int deltaClosing) { 389 (cast(Http2Session) session).updateStreamCount(isLocal(), deltaStream, deltaClosing); 390 } 391 392 private void notifyData(Stream stream, DataFrame frame, Callback callback) { 393 Listener listener = this.listener; 394 if (listener is null) 395 return; 396 try { 397 listener.onData(stream, frame, callback); 398 } catch (Exception x) { 399 info("Failure while notifying listener " ~ listener.toString(), x); 400 } 401 } 402 403 private void notifyReset(Stream stream, ResetFrame frame, Callback callback) { 404 Listener listener = this.listener; 405 if (listener is null) 406 return; 407 try { 408 listener.onReset(stream, frame, callback); 409 } catch (Exception x) { 410 info("Failure while notifying listener " ~ listener.toString(), x); 411 } 412 } 413 414 private bool notifyIdleTimeout(Stream stream, Exception failure) { 415 Listener listener = this.listener; 416 if (listener is null) 417 return true; 418 try { 419 return listener.onIdleTimeout(stream, failure); 420 } catch (Exception x) { 421 info("Failure while notifying listener " ~ listener.toString(), x); 422 return true; 423 } 424 } 425 426 override 427 string toString() { 428 return format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b,%s}", typeof(this).stringof, 429 toHash(), getId(), sendWindow, recvWindow, isReset(), closeState); 430 } 431 }