module hunt.http.codec.http.stream.Http2Stream;

import hunt.http.codec.http.stream.CloseState;
import hunt.http.codec.http.stream.Http2Session;
import hunt.http.codec.http.stream.SessionSPI;
import hunt.http.codec.http.stream.Stream;
import hunt.http.codec.http.stream.StreamSPI;

import hunt.http.codec.http.frame;
// import hunt.http.utils.concurrent.IdleTimeout;
import hunt.concurrency.Promise;
// import hunt.http.utils.concurrent.Scheduler;

import hunt.concurrency.Scheduler;
import hunt.util.Common;
import hunt.Exceptions;

import hunt.logging;
import std.format;

import core.atomic : atomicOp, atomicLoad;

alias Listener = hunt.http.codec.http.stream.Stream.Stream.Listener;

/**
 * A single HTTP/2 stream that belongs to a `SessionSPI`.
 *
 * The stream forwards outgoing frames to its session, dispatches incoming
 * frames to the registered `Listener`, tracks its close state and keeps the
 * send/receive flow-control window counters.
 *
 * The commented-out declarations kept below show that the close state and the
 * attribute map were atomic/concurrent in the code this class was ported from.
 * Here only the two window counters are accessed atomically; `closeState` and
 * `attributes` are plain fields.
 * NOTE(review): confirm that a stream is only ever touched by one thread at a
 * time, otherwise the close-state transitions below can race.
 */
class Http2Stream : StreamSPI { // IdleTimeout,

    // private AtomicReference<ConcurrentMap<string, Object>> attributes = new AtomicReference<>();
    // private AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
    // private AtomicInteger sendWindow = new AtomicInteger();
    // private AtomicInteger recvWindow = new AtomicInteger();

    private Object[string] attributes;
    private CloseState closeState;
    private shared int sendWindow;
    private shared int recvWindow;

    private SessionSPI session;
    private int streamId;
    private bool local;
    private Listener listener;
    private bool localReset;
    private bool remoteReset;

    /**
     * Creates a stream in the `NOT_CLOSED` state.
     *
     * Params:
     *   scheduler = currently unused (the idle-timeout base class is disabled)
     *   session   = the session this stream sends its frames through
     *   streamId  = the stream identifier returned by `getId`
     *   local     = the value returned by `isLocal`
     */
    this(Scheduler scheduler, SessionSPI session, int streamId, bool local) {
        // super(scheduler);
        closeState = CloseState.NOT_CLOSED;
        this.session = session;
        this.streamId = streamId;
        this.local = local;
    }

    /// Returns: the stream identifier given at construction.
    // override
    int getId() {
        return streamId;
    }

    /// Returns: the `local` flag given at construction.
    // override
    bool isLocal() {
        return local;
    }

    /// Returns: the session this stream belongs to.
    // override
    SessionSPI getSession() {
        return session;
    }

    /// Sends a HEADERS frame through the session; `callback` is handed to the session.
    // override
    void headers(HeadersFrame frame, Callback callback) {
        session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
    }

    /// Delegates a PUSH_PROMISE to the session, passing along `promise` and `listener`.
    // override
    void push(PushPromiseFrame frame, Promise!Stream promise, Listener listener) {
        session.push(this, promise, frame, listener);
    }

    /// Sends a DATA frame through the session; `callback` is handed to the session.
    // override
    void data(DataFrame frame, Callback callback) {
        session.data(this, callback, frame);
    }

    /**
     * Sends a RST_STREAM frame unless the stream was already reset (locally
     * or remotely), in which case nothing is sent.
     *
     * NOTE(review): when the stream is already reset, `callback` is neither
     * succeeded nor failed.
     */
    // override
    void reset(ResetFrame frame, Callback callback) {
        if (isReset())
            return;
        localReset = true;
        session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
    }

    /**
     * Looks up an attribute.
     *
     * Returns: the stored value, or `null` when `key` is not present.
     */
    // override
    Object getAttribute(string key) {
        // Indexing a missing key with attributes[key] throws a RangeError;
        // use `in` so an absent attribute simply yields null.
        auto existing = key in attributes;
        return existing is null ? null : *existing;
    }

    /// Stores `value` under `key`, replacing any previous value.
    // override
    void setAttribute(string key, Object value) {
        attributes[key] = value;
    }

    /**
     * Removes an attribute.
     *
     * Returns: the value that was stored under `key`, or `null` when the key
     *          was not present.
     */
    // override
    Object removeAttribute(string key) {
        auto existing = key in attributes;
        if (existing is null)
            return null;
        // Copy the value out before removing the entry it points into.
        Object removed = *existing;
        attributes.remove(key);
        return removed;
    }

    /// Returns: `true` once a reset was sent by `reset` or received via `process`.
    // override
    bool isReset() {
        return localReset || remoteReset;
    }

    /// Returns: `true` when the close state is `CLOSED`.
    // override
    bool isClosed() {
        return closeState == CloseState.CLOSED;
    }

    /// Returns: `true` when the close state is exactly `REMOTELY_CLOSED`.
    // override
    bool isRemotelyClosed() {
        return closeState == CloseState.REMOTELY_CLOSED;
    }

    /// Returns: `true` when the close state is exactly `LOCALLY_CLOSED`.
    bool isLocallyClosed() {
        return closeState == CloseState.LOCALLY_CLOSED;
    }

    /// Returns: `true` as long as the stream is not `CLOSED`.
    // override
    bool isOpen() {
        return !isClosed();
    }

    /**
     * Idle-timeout hook: notifies the listener and, when the listener agrees
     * (or there is no listener), resets the stream with `CANCEL_STREAM_ERROR`.
     */
    // override
    protected void onIdleExpired(TimeoutException timeout) {
        version(HUNT_DEBUG) {
            tracef("Idle timeout %sms expired on %s", 0, this.toString()); // getIdleTimeout()
        }

        // Notify the application.
        if (notifyIdleTimeout(this, timeout)) {
            // Tell the other peer that we timed out.
            reset(new ResetFrame(getId(), cast(int)ErrorCode.CANCEL_STREAM_ERROR), Callback.NOOP);
        }
    }

    // private ConcurrentMap<string, Object> attributes() {
    //     ConcurrentMap<string, Object> map = attributes;
    //     if (map == null) {
    //         map = new ConcurrentHashMap<>();
    //         if (!attributes.compareAndSet(null, map)) {
    //             map = attributes;
    //         }
    //     }
    //     return map;
    // }

    /// Returns: the listener currently registered, possibly `null`.
    // override
    Listener getListener() {
        return listener;
    }

    /// Registers the listener that receives data, reset and idle-timeout events.
    // override
    void setListener(Listener listener) {
        this.listener = listener;
    }

    /**
     * Dispatches an incoming frame to the matching handler based on its type.
     *
     * Throws: `UnsupportedOperationException` for any frame type other than
     *         HEADERS, DATA, RST_STREAM, PUSH_PROMISE and WINDOW_UPDATE.
     */
    // override
    void process(Frame frame, Callback callback) {
        // notIdle();
        switch (frame.getType()) {
            case FrameType.HEADERS: {
                onHeaders(cast(HeadersFrame) frame, callback);
                break;
            }
            case FrameType.DATA: {
                onData(cast(DataFrame) frame, callback);
                break;
            }
            case FrameType.RST_STREAM: {
                onReset(cast(ResetFrame) frame, callback);
                break;
            }
            case FrameType.PUSH_PROMISE: {
                onPush(cast(PushPromiseFrame) frame, callback);
                break;
            }
            case FrameType.WINDOW_UPDATE: {
                onWindowUpdate(cast(WindowUpdateFrame) frame, callback);
                break;
            }
            default: {
                throw new UnsupportedOperationException("");
            }
        }
    }

    /// Handles received HEADERS: updates the close state on end-stream and succeeds the callback.
    private void onHeaders(HeadersFrame frame, Callback callback) {
        if (updateClose(frame.isEndStream(), CloseStateEvent.RECEIVED))
            session.removeStream(this);
        callback.succeeded();
    }

    /**
     * Handles received DATA.
     *
     * The frame is rejected (callback failed) when the receive window is
     * negative, when the stream is remotely closed, or when it was reset;
     * otherwise the close state is updated and the listener is notified.
     */
    private void onData(DataFrame frame, Callback callback) {
        if (getRecvWindow() < 0) {
            // It's a bad client, it does not deserve to be
            // treated gently by just resetting the stream.
            session.close(ErrorCode.FLOW_CONTROL_ERROR, "stream_window_exceeded", Callback.NOOP);
            callback.failed(new IOException("stream_window_exceeded"));
            return;
        }

        // SPEC: remotely closed streams must be replied with a reset.
        if (isRemotelyClosed()) {
            reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR), Callback.NOOP);
            callback.failed(new EOFException("stream_closed"));
            return;
        }

        if (isReset()) {
            // Just drop the frame.
            callback.failed(new IOException("stream_reset"));
            return;
        }

        if (updateClose(frame.isEndStream(), CloseStateEvent.RECEIVED))
            session.removeStream(this);
        notifyData(this, frame, callback);
    }

    /// Handles received RST_STREAM: marks the remote reset, closes and removes the stream, notifies the listener.
    private void onReset(ResetFrame frame, Callback callback) {
        remoteReset = true;
        close();
        session.removeStream(this);
        notifyReset(this, frame, callback);
    }

    /// Handles received PUSH_PROMISE by applying the AFTER_SEND close transition.
    private void onPush(PushPromiseFrame frame, Callback callback) {
        // Pushed streams are implicitly locally closed.
        // They are closed when receiving an end-stream DATA frame.
        updateClose(true, CloseStateEvent.AFTER_SEND);
        callback.succeeded();
    }

    /// Handles received WINDOW_UPDATE; nothing is done here beyond succeeding the callback.
    private void onWindowUpdate(WindowUpdateFrame frame, Callback callback) {
        callback.succeeded();
    }

    /**
     * Advances the close state for the given event.
     *
     * Params:
     *   update = when `false` the state is left untouched
     *   event  = which side of the exchange triggered the update
     *
     * Returns: `true` only when this call closed the stream (via `close`);
     *          `false` otherwise, including when `update` is `false`.
     */
    override
    bool updateClose(bool update, CloseStateEvent event) {
        version(HUNT_DEBUG) {
            tracef("Update close for %s update=%s event=%s", this, update, event);
        }

        if (!update)
            return false;

        switch (event) {
            case CloseStateEvent.RECEIVED:
                return updateCloseAfterReceived();
            case CloseStateEvent.BEFORE_SEND:
                return updateCloseBeforeSend();
            case CloseStateEvent.AFTER_SEND:
                return updateCloseAfterSend();
            default:
                return false;
        }
    }

    /**
     * Transition applied when an end-stream frame was received:
     * NOT_CLOSED -> REMOTELY_CLOSED, LOCALLY_CLOSING -> CLOSING (and the
     * session's closing count is incremented), LOCALLY_CLOSED -> closed.
     *
     * The loop and the `closeState == current` checks stand in for the
     * compare-and-set of the original code; with a plain field the check
     * succeeds on the first pass.
     *
     * Returns: `true` only when the stream was closed by this call.
     */
    private bool updateCloseAfterReceived() {
        while (true) {
            CloseState current = closeState;
            switch (current) {
                case CloseState.NOT_CLOSED: {
                    if (closeState == current)
                    {
                        closeState = CloseState.REMOTELY_CLOSED;
                        return false;
                    }
                    break;
                }
                case CloseState.LOCALLY_CLOSING: {
                    // if (closeState.compareAndSet(current, CloseState.CLOSING)) {
                    if (closeState == current)
                    {
                        closeState = CloseState.CLOSING;
                        updateStreamCount(0, 1);
                        return false;
                    }
                    break;
                }
                case CloseState.LOCALLY_CLOSED: {
                    close();
                    return true;
                }
                default: {
                    return false;
                }
            }
        }
    }

    /**
     * Transition applied before an end-stream frame is sent:
     * NOT_CLOSED -> LOCALLY_CLOSING, REMOTELY_CLOSED -> CLOSING (and the
     * session's closing count is incremented). Other states are unchanged.
     *
     * Returns: always `false`; this transition never closes the stream.
     */
    private bool updateCloseBeforeSend() {
        while (true) {
            CloseState current = closeState;
            switch (current) {
                case CloseState.NOT_CLOSED: {
                    // if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSING))
                    if (closeState == current)
                    {
                        closeState = CloseState.LOCALLY_CLOSING;
                        return false;
                    }
                    break;
                }
                case CloseState.REMOTELY_CLOSED: {
                    // if (closeState.compareAndSet(current, CloseState.CLOSING)) {
                    if (closeState == current)
                    {
                        closeState = CloseState.CLOSING;
                        updateStreamCount(0, 1);
                        return false;
                    }
                    break;
                }
                default: {
                    return false;
                }
            }
        }
    }

    /**
     * Transition applied after an end-stream frame was sent:
     * NOT_CLOSED / LOCALLY_CLOSING -> LOCALLY_CLOSED,
     * REMOTELY_CLOSED / CLOSING -> closed.
     *
     * Returns: `true` only when the stream was closed by this call.
     */
    private bool updateCloseAfterSend() {
        while (true) {
            CloseState current = closeState;
            switch (current) {
                case CloseState.NOT_CLOSED:
                case CloseState.LOCALLY_CLOSING: {
                    // if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSED))
                    if (closeState == current)
                    {
                        // The send has completed, so the local side is now fully
                        // closed. Staying in LOCALLY_CLOSING would make
                        // LOCALLY_CLOSED unreachable, and a later remote
                        // end-stream would then never close the stream.
                        closeState = CloseState.LOCALLY_CLOSED;
                        return false;
                    }
                    break;
                }
                case CloseState.REMOTELY_CLOSED:
                case CloseState.CLOSING: {
                    close();
                    return true;
                }
                default: {
                    return false;
                }
            }
        }
    }

    /// Returns: the current send window, read atomically.
    int getSendWindow() {
        return atomicLoad(sendWindow);
    }

    /// Returns: the current receive window, read atomically.
    int getRecvWindow() {
        return atomicLoad(recvWindow);
    }

    /**
     * Atomically adds `delta` to the send window.
     *
     * Returns: the window value before the addition.
     */
    override
    int updateSendWindow(int delta) {
        // atomicOp yields the updated value; subtracting delta recovers the
        // previous one from the same atomic step (a separate load followed by
        // an add could observe a value another thread has since changed).
        return atomicOp!"+="(sendWindow, delta) - delta;
        // return sendWindow.getAndAdd(delta);
    }

    /**
     * Atomically adds `delta` to the receive window.
     *
     * Returns: the window value before the addition.
     */
    override
    int updateRecvWindow(int delta) {
        return atomicOp!"+="(recvWindow, delta) - delta;
    }

    /**
     * Moves the stream to `CLOSED`. On the first call that actually changes
     * the state, the session's stream count is decremented, and its closing
     * count too when the stream was in `CLOSING`. Further calls do nothing.
     */
    override
    void close() {
        CloseState oldState = closeState;
        closeState = CloseState.CLOSED;
        if (oldState != CloseState.CLOSED) {
            int deltaClosing = oldState == CloseState.CLOSING ? -1 : 0;
            updateStreamCount(-1, deltaClosing);
            // onClose();
        }
    }

    /// Forwards stream/closing count deltas to the session.
    private void updateStreamCount(int deltaStream, int deltaClosing) {
        // NOTE(review): assumes the session is always an Http2Session; the cast
        // yields null for any other SessionSPI implementation.
        (cast(Http2Session) session).updateStreamCount(isLocal(), deltaStream, deltaClosing);
    }

    /**
     * Passes a DATA frame to the listener; exceptions thrown by the listener
     * are logged and not propagated.
     *
     * NOTE(review): when no listener is registered the callback is neither
     * succeeded nor failed — confirm callers do not wait on it.
     */
    private void notifyData(Stream stream, DataFrame frame, Callback callback) {
        Listener listener = this.listener;
        if (listener is null)
            return;
        try {
            listener.onData(stream, frame, callback);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    /**
     * Passes a RST_STREAM frame to the listener; exceptions thrown by the
     * listener are logged and not propagated.
     *
     * NOTE(review): when no listener is registered the callback is neither
     * succeeded nor failed — confirm callers do not wait on it.
     */
    private void notifyReset(Stream stream, ResetFrame frame, Callback callback) {
        Listener listener = this.listener;
        if (listener is null)
            return;
        try {
            listener.onReset(stream, frame, callback);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
        }
    }

    /**
     * Asks the listener how to react to an idle timeout.
     *
     * Returns: the listener's answer; `true` when there is no listener or the
     *          listener throws (the exception is logged).
     */
    private bool notifyIdleTimeout(Stream stream, Exception failure) {
        Listener listener = this.listener;
        if (listener is null)
            return true;
        try {
            return listener.onIdleTimeout(stream, failure);
        } catch (Exception x) {
            info("Failure while notifying listener " ~ listener.toString(), x);
            return true;
        }
    }

    /// Returns: a diagnostic string with the class name, hash, id, both windows, reset flag and close state.
    override
    string toString() {
        // Read the shared counters through their atomic getters, and format the
        // reset flag with %s: in D, %b is a binary-integer spec and prints 1/0.
        return format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%s,%s}", typeof(this).stringof,
                toHash(), getId(), getSendWindow(), getRecvWindow(), isReset(), closeState);
    }
}