1 module hunt.http.codec.websocket.model.extension.fragment.FragmentExtension; 2 3 import hunt.http.codec.websocket.frame.DataFrame; 4 import hunt.http.WebSocketFrame; 5 import hunt.http.codec.websocket.model.ExtensionConfig; 6 import hunt.http.WebSocketCommon; 7 import hunt.http.codec.websocket.model.extension.AbstractExtension; 8 import hunt.util.Common; 9 // import hunt.http.utils.concurrent.IteratingCallback; 10 import hunt.logging; 11 import hunt.collection; 12 13 14 /** 15 * Fragment Extension 16 */ 17 // class FragmentExtension : AbstractExtension { 18 19 20 // private Queue!(FrameEntry) entries; 21 // private IteratingCallback flusher; 22 // private int maxLength; 23 24 // this() { 25 // entries = new ArrayDeque!(FrameEntry)(); 26 // flusher = new Flusher(); 27 // start(); 28 // } 29 30 // override 31 // string getName() { 32 // return "fragment"; 33 // } 34 35 // override 36 // void incomingFrame(Frame frame) { 37 // nextIncomingFrame(frame); 38 // } 39 40 // override 41 // void outgoingFrame(Frame frame, Callback callback) { 42 // ByteBuffer payload = frame.getPayload(); 43 // int length = payload !is null ? 
payload.remaining() : 0; 44 // if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength) { 45 // nextOutgoingFrame(frame, callback); 46 // return; 47 // } 48 49 // FrameEntry entry = new FrameEntry(frame, callback); 50 // version(HUNT_DEBUG) 51 // tracef("Queuing %s", entry); 52 // offerEntry(entry); 53 // flusher.iterate(); 54 // } 55 56 // override 57 // void setConfig(ExtensionConfig config) { 58 // super.setConfig(config); 59 // maxLength = config.getParameter("maxLength", -1); 60 // } 61 62 // private void offerEntry(FrameEntry entry) { 63 // synchronized (this) { 64 // entries.offer(entry); 65 // } 66 // } 67 68 // private FrameEntry pollEntry() { 69 // synchronized (this) { 70 // return entries.poll(); 71 // } 72 // } 73 74 // override 75 // protected void init() { 76 77 // } 78 79 // override 80 // protected void destroy() { 81 82 // } 83 84 // private static class FrameEntry { 85 // private final Frame frame; 86 // private final Callback callback; 87 88 // private this(Frame frame, Callback callback) { 89 // this.frame = frame; 90 // this.callback = callback; 91 // } 92 93 // override 94 // string toString() { 95 // return frame.toString(); 96 // } 97 // } 98 99 // private class Flusher : IteratingCallback { 100 // private FrameEntry current; 101 // private bool finished = true; 102 103 // override 104 // protected Action process() { 105 // if (finished) { 106 // current = pollEntry(); 107 // tracef("Processing %s", current); 108 // if (current is null) 109 // return Action.IDLE; 110 // fragment(current, true); 111 // } else { 112 // fragment(current, false); 113 // } 114 // return Action.SCHEDULED; 115 // } 116 117 // private void fragment(FrameEntry entry, bool first) { 118 // Frame frame = entry.frame; 119 // ByteBuffer payload = frame.getPayload(); 120 // int remaining = payload.remaining(); 121 // int length = Math.min(remaining, maxLength); 122 // finished = length == remaining; 123 124 // bool continuation = 
frame.getType().isContinuation() || !first; 125 // DataFrame fragment = new DataFrame(frame, continuation); 126 // bool fin = frame.isFin() && finished; 127 // fragment.setFin(fin); 128 129 // int limit = payload.limit(); 130 // int newLimit = payload.position() + length; 131 // payload.limit(newLimit); 132 // ByteBuffer payloadFragment = payload.slice(); 133 // payload.limit(limit); 134 // fragment.setPayload(payloadFragment); 135 // version(HUNT_DEBUG) 136 // tracef("Fragmented %s->%s", frame, fragment); 137 // payload.position(newLimit); 138 139 // nextOutgoingFrame(fragment, this); 140 // } 141 142 // override 143 // protected void onCompleteSuccess() { 144 // // This IteratingCallback never completes. 145 // } 146 147 // override 148 // protected void onCompleteFailure(Throwable x) { 149 // // This IteratingCallback never fails. 150 // // The callbacks are those provided by WriteCallback (implemented 151 // // below) and even in case of writeFailed() we call succeeded(). 152 // } 153 154 // override 155 // void succeeded() { 156 // // Notify first then call succeeded(), otherwise 157 // // write callbacks may be invoked out of order. 158 // notifyCallbackSuccess(current.callback); 159 // super.succeeded(); 160 // } 161 162 // override 163 // void failed(Throwable x) { 164 // // Notify first, then call succeeded() to drain the queue. 165 // // We don't want to call failed(x) because that will put 166 // // this flusher into a final state that cannot be exited, 167 // // and the failure of a frame may not mean that the whole 168 // // connection is now invalid. 
169 // notifyCallbackFailure(current.callback, x); 170 // succeeded(); 171 // } 172 173 // private void notifyCallbackSuccess(Callback callback) { 174 // try { 175 // if (callback !is null) 176 // callback.succeeded(); 177 // } catch (Throwable x) { 178 // version(HUNT_DEBUG) 179 // tracef("Exception while notifying success of callback " ~ callback, x); 180 // } 181 // } 182 183 // private void notifyCallbackFailure(Callback callback, Throwable failure) { 184 // try { 185 // if (callback !is null) 186 // callback.failed(failure); 187 // } catch (Throwable x) { 188 // version(HUNT_DEBUG) 189 // tracef("Exception while notifying failure of callback " ~ callback, x); 190 // } 191 // } 192 // } 193 // }