1 module hunt.http.codec.websocket.model.extension.fragment.FragmentExtension;
2 
3 import hunt.http.codec.websocket.frame.DataFrame;
4 import hunt.http.WebSocketFrame;
5 import hunt.http.codec.websocket.model.ExtensionConfig;
6 import hunt.http.WebSocketCommon;
7 import hunt.http.codec.websocket.model.extension.AbstractExtension;
8 import hunt.util.Common;
9 // import hunt.http.utils.concurrent.IteratingCallback;
10 import hunt.logging;
11 import hunt.collection;
12 
13 
14 /**
15  * Fragment Extension
16  */
17 // class FragmentExtension : AbstractExtension {
18 
19 
20 //     private Queue!(FrameEntry) entries;
21 //     private IteratingCallback flusher;
22 //     private int maxLength;
23 
24 //     this() {
25 //         entries = new ArrayDeque!(FrameEntry)();
26 //         flusher = new Flusher();
27 //         start();
28 //     }
29 
30 //     override
31 //     string getName() {
32 //         return "fragment";
33 //     }
34 
35 //     override
36 //     void incomingFrame(Frame frame) {
37 //         nextIncomingFrame(frame);
38 //     }
39 
40 //     override
41 //     void outgoingFrame(Frame frame, Callback callback) {
42 //         ByteBuffer payload = frame.getPayload();
43 //         int length = payload !is null ? payload.remaining() : 0;
44 //         if (OpCode.isControlFrame(frame.getOpCode()) || maxLength <= 0 || length <= maxLength) {
45 //             nextOutgoingFrame(frame, callback);
46 //             return;
47 //         }
48 
49 //         FrameEntry entry = new FrameEntry(frame, callback);
50 //         version(HUNT_DEBUG)
51 //             tracef("Queuing %s", entry);
52 //         offerEntry(entry);
53 //         flusher.iterate();
54 //     }
55 
56 //     override
57 //     void setConfig(ExtensionConfig config) {
58 //         super.setConfig(config);
59 //         maxLength = config.getParameter("maxLength", -1);
60 //     }
61 
62 //     private void offerEntry(FrameEntry entry) {
63 //         synchronized (this) {
64 //             entries.offer(entry);
65 //         }
66 //     }
67 
68 //     private FrameEntry pollEntry() {
69 //         synchronized (this) {
70 //             return entries.poll();
71 //         }
72 //     }
73 
74 //     override
75 //     protected void init() {
76 
77 //     }
78 
79 //     override
80 //     protected void destroy() {
81 
82 //     }
83 
84 //     private static class FrameEntry {
85 //         private final Frame frame;
86 //         private final Callback callback;
87 
88 //         private this(Frame frame, Callback callback) {
89 //             this.frame = frame;
90 //             this.callback = callback;
91 //         }
92 
93 //         override
94 //         string toString() {
95 //             return frame.toString();
96 //         }
97 //     }
98 
99 //     private class Flusher : IteratingCallback {
100 //         private FrameEntry current;
101 //         private bool finished = true;
102 
103 //         override
104 //         protected Action process() {
105 //             if (finished) {
106 //                 current = pollEntry();
107 //                 tracef("Processing %s", current);
108 //                 if (current is null)
109 //                     return Action.IDLE;
110 //                 fragment(current, true);
111 //             } else {
112 //                 fragment(current, false);
113 //             }
114 //             return Action.SCHEDULED;
115 //         }
116 
117 //         private void fragment(FrameEntry entry, bool first) {
118 //             Frame frame = entry.frame;
119 //             ByteBuffer payload = frame.getPayload();
120 //             int remaining = payload.remaining();
121 //             int length = Math.min(remaining, maxLength);
122 //             finished = length == remaining;
123 
124 //             bool continuation = frame.getType().isContinuation() || !first;
125 //             DataFrame fragment = new DataFrame(frame, continuation);
126 //             bool fin = frame.isFin() && finished;
127 //             fragment.setFin(fin);
128 
129 //             int limit = payload.limit();
130 //             int newLimit = payload.position() + length;
131 //             payload.limit(newLimit);
132 //             ByteBuffer payloadFragment = payload.slice();
133 //             payload.limit(limit);
134 //             fragment.setPayload(payloadFragment);
135 //             version(HUNT_DEBUG)
136 //                 tracef("Fragmented %s->%s", frame, fragment);
137 //             payload.position(newLimit);
138 
139 //             nextOutgoingFrame(fragment, this);
140 //         }
141 
142 //         override
143 //         protected void onCompleteSuccess() {
144 //             // This IteratingCallback never completes.
145 //         }
146 
147 //         override
148 //         protected void onCompleteFailure(Throwable x) {
149 //             // This IteratingCallback never fails.
150 //             // The callback are those provided by WriteCallback (implemented
151 //             // below) and even in case of writeFailed() we call succeeded().
152 //         }
153 
154 //         override
155 //         void succeeded() {
156 //             // Notify first then call succeeded(), otherwise
157 //             // write callbacks may be invoked out of order.
158 //             notifyCallbackSuccess(current.callback);
159 //             super.succeeded();
160 //         }
161 
162 //         override
163 //         void failed(Throwable x) {
164 //             // Notify first, the call succeeded() to drain the queue.
165 //             // We don't want to call failed(x) because that will put
166 //             // this flusher into a final state that cannot be exited,
167 //             // and the failure of a frame may not mean that the whole
168 //             // connection is now invalid.
169 //             notifyCallbackFailure(current.callback, x);
170 //             succeeded();
171 //         }
172 
173 //         private void notifyCallbackSuccess(Callback callback) {
174 //             try {
175 //                 if (callback !is null)
176 //                     callback.succeeded();
177 //             } catch (Throwable x) {
178 //                 version(HUNT_DEBUG)
179 //                     tracef("Exception while notifying success of callback " ~ callback, x);
180 //             }
181 //         }
182 
183 //         private void notifyCallbackFailure(Callback callback, Throwable failure) {
184 //             try {
185 //                 if (callback !is null)
186 //                     callback.failed(failure);
187 //             } catch (Throwable x) {
188 //                 version(HUNT_DEBUG)
189 //                     tracef("Exception while notifying failure of callback " ~ callback, x);
190 //             }
191 //         }
192 //     }
193 // }