1 module hunt.http.codec.websocket.model.extension.compress.CompressExtension; 2 3 // import hunt.http.codec.websocket.frame.DataFrame; 4 // import hunt.http.WebSocketFrame; 5 // import hunt.http.WebSocketCommon; 6 // import hunt.http.codec.websocket.model.extension.AbstractExtension; 7 // import hunt.util.Common; 8 // import hunt.http.utils.concurrent.IteratingCallback; 9 // import hunt.io.BufferUtils; 10 // import hunt.logging; 11 12 13 // import java.io.ByteArrayOutputStream; 14 // import hunt.io.ByteBuffer; 15 // import java.util.ArrayDeque; 16 // import java.util.Queue; 17 // import hunt.concurrency.atomic.AtomicInteger; 18 // import java.util.zip.DataFormatException; 19 // import java.util.zip.Deflater; 20 // import java.util.zip.Inflater; 21 // import java.util.zip.ZipException; 22 23 // abstract class CompressExtension : AbstractExtension { 24 // protected enum byte[] TAIL_BYTES = new byte[]{0x00, 0x00, (byte) 0xFF, (byte) 0xFF}; 25 // protected enum ByteBuffer TAIL_BYTES_BUF = BufferUtils.toBuffer(TAIL_BYTES); 26 27 28 // /** 29 // * Never drop tail bytes 0000FFFF, from any frame type 30 // */ 31 // protected enum int TAIL_DROP_NEVER = 0; 32 // /** 33 // * Always drop tail bytes 0000FFFF, from all frame types 34 // */ 35 // protected enum int TAIL_DROP_ALWAYS = 1; 36 // /** 37 // * Only drop tail bytes 0000FFFF, from fin==true frames 38 // */ 39 // protected enum int TAIL_DROP_FIN_ONLY = 2; 40 41 // /** 42 // * Always set RSV flag, on all frame types 43 // */ 44 // protected enum int RSV_USE_ALWAYS = 0; 45 // /** 46 // * Only set RSV flag on first frame in multi-frame messages. 
47 // * <p> 48 // * Note: this automatically means no-continuation frames have the RSV bit set 49 // */ 50 // protected enum int RSV_USE_ONLY_FIRST = 1; 51 52 // /** 53 // * Inflater / Decompressed Buffer Size 54 // */ 55 // protected enum int INFLATE_BUFFER_SIZE = 8 * 1024; 56 57 // /** 58 // * Deflater / Inflater: Maximum Input Buffer Size 59 // */ 60 // protected enum int INPUT_MAX_BUFFER_SIZE = 8 * 1024; 61 62 // /** 63 // * Inflater : Output Buffer Size 64 // */ 65 // private enum int DECOMPRESS_BUF_SIZE = 8 * 1024; 66 67 // private static bool NOWRAP = true; 68 69 // private Queue<FrameEntry> entries = new ArrayDeque<>(); 70 // private IteratingCallback flusher = new Flusher(); 71 // private Deflater deflaterImpl; 72 // private Inflater inflaterImpl; 73 // protected AtomicInteger decompressCount = new AtomicInteger(0); 74 // private int tailDrop = TAIL_DROP_NEVER; 75 // private int rsvUse = RSV_USE_ALWAYS; 76 77 // protected this() { 78 // tailDrop = getTailDropMode(); 79 // rsvUse = getRsvUseMode(); 80 // start(); 81 // } 82 83 // Deflater getDeflater() { 84 // if (deflaterImpl is null) { 85 // deflaterImpl = new Deflater(Deflater.DEFAULT_COMPRESSION, NOWRAP); 86 // } 87 // return deflaterImpl; 88 // } 89 90 // Inflater getInflater() { 91 // if (inflaterImpl is null) { 92 // inflaterImpl = new Inflater(NOWRAP); 93 // } 94 // return inflaterImpl; 95 // } 96 97 // /** 98 // * Indicates use of RSV1 flag for indicating deflation is in use. 
99 // */ 100 // override 101 // bool isRsv1User() { 102 // return true; 103 // } 104 105 // /** 106 // * Return the mode of operation for dropping (or keeping) tail bytes in frames generated by compress (outgoing) 107 // * 108 // * @return either {@link #TAIL_DROP_ALWAYS}, {@link #TAIL_DROP_FIN_ONLY}, or {@link #TAIL_DROP_NEVER} 109 // */ 110 // abstract int getTailDropMode(); 111 112 // /** 113 // * Return the mode of operation for RSV flag use in frames generated by compress (outgoing) 114 // * 115 // * @return either {@link #RSV_USE_ALWAYS} or {@link #RSV_USE_ONLY_FIRST} 116 // */ 117 // abstract int getRsvUseMode(); 118 119 // protected void forwardIncoming(Frame frame, ByteAccumulator accumulator) { 120 // DataFrame newFrame = new DataFrame(frame); 121 // // Unset RSV1 since it's not compressed anymore. 122 // newFrame.setRsv1(false); 123 124 // ByteBuffer buffer = BufferUtils.allocate(accumulator.getLength()); 125 // BufferUtils.flipToFill(buffer); 126 // accumulator.transferTo(buffer); 127 // newFrame.setPayload(buffer); 128 // nextIncomingFrame(newFrame); 129 // } 130 131 // protected ByteAccumulator newByteAccumulator() { 132 // int maxSize = Math.max(getPolicy().getMaxTextMessageSize(), getPolicy().getMaxBinaryMessageBufferSize()); 133 // return new ByteAccumulator(maxSize); 134 // } 135 136 // protected void decompress(ByteAccumulator accumulator, ByteBuffer buf) throws DataFormatException { 137 // if ((buf is null) || (!buf.hasRemaining())) { 138 // return; 139 // } 140 // byte[] output = new byte[DECOMPRESS_BUF_SIZE]; 141 142 // Inflater inflater = getInflater(); 143 144 // while (buf.hasRemaining() && inflater.needsInput()) { 145 // if (!supplyInput(inflater, buf)) { 146 // tracef("Needed input, but no buffer could supply input"); 147 // return; 148 // } 149 150 // int read; 151 // while ((read = inflater.inflate(output)) >= 0) { 152 // if (read == 0) { 153 // tracef("Decompress: read 0 %s", toDetail(inflater)); 154 // break; 155 // } else { 156 // 
do something with output 157 // version(HUNT_DEBUG) { 158 // tracef("Decompressed %s bytes: %s", read, toDetail(inflater)); 159 // } 160 161 // accumulator.copyChunk(output, 0, read); 162 // } 163 // } 164 // } 165 166 // version(HUNT_DEBUG) { 167 // tracef("Decompress: exiting %s", toDetail(inflater)); 168 // } 169 // } 170 171 // override 172 // void outgoingFrame(Frame frame, Callback callback) { 173 // // We use a queue and an IteratingCallback to handle concurrency. 174 // // We must compress and write atomically, otherwise the compression 175 // // context on the other end gets confused. 176 177 // if (flusher.isFailed()) { 178 // notifyCallbackFailure(callback, new ZipException()); 179 // return; 180 // } 181 182 // FrameEntry entry = new FrameEntry(frame, callback); 183 // version(HUNT_DEBUG) 184 // tracef("Queuing %s", entry); 185 // offerEntry(entry); 186 // flusher.iterate(); 187 // } 188 189 // private void offerEntry(FrameEntry entry) { 190 // synchronized (this) { 191 // entries.offer(entry); 192 // } 193 // } 194 195 // private FrameEntry pollEntry() { 196 // synchronized (this) { 197 // return entries.poll(); 198 // } 199 // } 200 201 // protected void notifyCallbackSuccess(Callback callback) { 202 // try { 203 // if (callback !is null) 204 // callback.succeeded(); 205 // } catch (Throwable x) { 206 // version(HUNT_DEBUG) 207 // tracef("Exception while notifying success of callback " ~ callback, x); 208 // } 209 // } 210 211 // protected void notifyCallbackFailure(Callback callback, Throwable failure) { 212 // try { 213 // if (callback !is null) 214 // callback.failed(failure); 215 // } catch (Throwable x) { 216 // version(HUNT_DEBUG) 217 // tracef("Exception while notifying failure of callback " ~ callback, x); 218 // } 219 // } 220 221 // private static bool supplyInput(Inflater inflater, ByteBuffer buf) { 222 // if (buf is null || buf.remaining() <= 0) { 223 // version(HUNT_DEBUG) { 224 // tracef("No data left left to supply to Inflater"); 225 // 
} 226 // return false; 227 // } 228 229 // byte input[]; 230 // int inputOffset; 231 // int len; 232 233 // if (buf.hasArray()) { 234 // // no need to create a new byte buffer, just return this one. 235 // len = buf.remaining(); 236 // input = buf.array(); 237 // inputOffset = buf.position() + buf.arrayOffset(); 238 // buf.position(buf.position() + len); 239 // } else { 240 // // Only create and return a byte buffer that is reasonable in size 241 // len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining()); 242 // input = new byte[len]; 243 // inputOffset = 0; 244 // buf.get(input, 0, len); 245 // } 246 247 // inflater.setInput(input, inputOffset, len); 248 // version(HUNT_DEBUG) { 249 // tracef("Supplied %s input bytes: %s", input.length, toDetail(inflater)); 250 // } 251 // return true; 252 // } 253 254 // private static bool supplyInput(Deflater deflater, ByteBuffer buf) { 255 // if (buf is null || buf.remaining() <= 0) { 256 // version(HUNT_DEBUG) { 257 // tracef("No data left left to supply to Deflater"); 258 // } 259 // return false; 260 // } 261 262 // byte input[]; 263 // int inputOffset; 264 // int len; 265 266 // if (buf.hasArray()) { 267 // // no need to create a new byte buffer, just return this one. 
268 // len = buf.remaining(); 269 // input = buf.array(); 270 // inputOffset = buf.position() + buf.arrayOffset(); 271 // buf.position(buf.position() + len); 272 // } else { 273 // // Only create and return a byte buffer that is reasonable in size 274 // len = Math.min(INPUT_MAX_BUFFER_SIZE, buf.remaining()); 275 // input = new byte[len]; 276 // inputOffset = 0; 277 // buf.get(input, 0, len); 278 // } 279 280 // deflater.setInput(input, inputOffset, len); 281 // version(HUNT_DEBUG) { 282 // tracef("Supplied %s input bytes: %s", input.length, toDetail(deflater)); 283 // } 284 // return true; 285 // } 286 287 // private static string toDetail(Inflater inflater) { 288 // return string.format("Inflater[finished=%b,read=%d,written=%d,remaining=%d,in=%d,out=%d]", inflater.finished(), inflater.getBytesRead(), 289 // inflater.getBytesWritten(), inflater.peekRemaining(), inflater.getTotalIn(), inflater.getTotalOut()); 290 // } 291 292 // private static string toDetail(Deflater deflater) { 293 // return string.format("Deflater[finished=%b,read=%d,written=%d,in=%d,out=%d]", deflater.finished(), deflater.getBytesRead(), deflater.getBytesWritten(), 294 // deflater.getTotalIn(), deflater.getTotalOut()); 295 // } 296 297 // static bool endsWithTail(ByteBuffer buf) { 298 // if ((buf is null) || (buf.remaining() < TAIL_BYTES.length)) { 299 // return false; 300 // } 301 // int limit = buf.limit(); 302 // for (int i = TAIL_BYTES.length; i > 0; i--) { 303 // if (buf.get(limit - i) != TAIL_BYTES[TAIL_BYTES.length - i]) { 304 // return false; 305 // } 306 // } 307 // return true; 308 // } 309 310 // override 311 // protected void init() { 312 313 // } 314 315 // override 316 // protected void destroy() { 317 // if (deflaterImpl !is null) 318 // deflaterImpl.end(); 319 // if (inflaterImpl !is null) 320 // inflaterImpl.end(); 321 // } 322 323 // override 324 // string toString() { 325 // return typeof(this).stringof; 326 // } 327 328 // private static class FrameEntry { 329 // private final 
Frame frame; 330 // private final Callback callback; 331 332 // private FrameEntry(Frame frame, Callback callback) { 333 // this.frame = frame; 334 // this.callback = callback; 335 // } 336 337 // override 338 // string toString() { 339 // return frame.toString(); 340 // } 341 // } 342 343 // private class Flusher : IteratingCallback { 344 // private FrameEntry current; 345 // private bool finished = true; 346 347 // override 348 // protected Action process() throws Exception { 349 // if (finished) { 350 // current = pollEntry(); 351 // tracef("Processing %s", current); 352 // if (current is null) 353 // return Action.IDLE; 354 // deflate(current); 355 // } else { 356 // compress(current, false); 357 // } 358 // return Action.SCHEDULED; 359 // } 360 361 // private void deflate(FrameEntry entry) { 362 // Frame frame = entry.frame; 363 // if (OpCode.isControlFrame(frame.getOpCode())) { 364 // // Do not deflate control frames 365 // nextOutgoingFrame(frame, this); 366 // return; 367 // } 368 369 // compress(entry, true); 370 // } 371 372 // private void compress(FrameEntry entry, bool first) { 373 // // Get a chunk of the payload to avoid blowing 374 // // the heap if the payload is a huge mapped file. 
375 // Frame frame = entry.frame; 376 // ByteBuffer data = frame.getPayload(); 377 378 // if (data is null) 379 // data = BufferUtils.EMPTY_BUFFER; 380 381 // int remaining = data.remaining(); 382 // int outputLength = Math.max(256, data.remaining()); 383 // version(HUNT_DEBUG) 384 // tracef("Compressing %s: %s bytes in %s bytes chunk", entry, remaining, outputLength); 385 386 // bool needsCompress = true; 387 388 // Deflater deflater = getDeflater(); 389 390 // if (deflater.needsInput() && !supplyInput(deflater, data)) { 391 // // no input supplied 392 // needsCompress = false; 393 // } 394 395 // ByteArrayOutputStream out = new ByteArrayOutputStream(); 396 397 // byte[] output = new byte[outputLength]; 398 399 // bool fin = frame.isFin(); 400 401 // // Compress the data 402 // while (needsCompress) { 403 // int compressed = deflater.deflate(output, 0, outputLength, Deflater.SYNC_FLUSH); 404 405 // // Append the output for the eventual frame. 406 // version(HUNT_DEBUG) 407 // tracef("Wrote %s bytes to output buffer", compressed); 408 // out.write(output, 0, compressed); 409 410 // if (compressed < outputLength) { 411 // needsCompress = false; 412 // } 413 // } 414 415 // ByteBuffer payload = BufferUtils.toBuffer(out.toByteArray()); 416 417 // if (payload.remaining() > 0) { 418 // // Handle tail bytes generated by SYNC_FLUSH. 
419 // version(HUNT_DEBUG) 420 // tracef("compressed bytes[] = %s", BufferUtils.toDetailString(payload)); 421 422 // if (tailDrop == TAIL_DROP_ALWAYS) { 423 // if (endsWithTail(payload)) { 424 // payload.limit(payload.limit() - TAIL_BYTES.length); 425 // } 426 // version(HUNT_DEBUG) 427 // tracef("payload (TAIL_DROP_ALWAYS) = %s", BufferUtils.toDetailString(payload)); 428 // } else if (tailDrop == TAIL_DROP_FIN_ONLY) { 429 // if (frame.isFin() && endsWithTail(payload)) { 430 // payload.limit(payload.limit() - TAIL_BYTES.length); 431 // } 432 // version(HUNT_DEBUG) 433 // tracef("payload (TAIL_DROP_FIN_ONLY) = %s", BufferUtils.toDetailString(payload)); 434 // } 435 // } else if (fin) { 436 // // Special case: 7.2.3.6. Generating an Empty Fragment Manually 437 // // https://tools.ietf.org/html/rfc7692#section-7.2.3.6 438 // payload = BufferUtils.toBuffer(new byte[]{0x00}); 439 // } 440 441 // version(HUNT_DEBUG) { 442 // tracef("Compressed %s: input:%s -> payload:%s", entry, outputLength, payload.remaining()); 443 // } 444 445 // bool continuation = frame.getType().isContinuation() || !first; 446 // DataFrame chunk = new DataFrame(frame, continuation); 447 // if (rsvUse == RSV_USE_ONLY_FIRST) { 448 // chunk.setRsv1(!continuation); 449 // } else { 450 // // always set 451 // chunk.setRsv1(true); 452 // } 453 // chunk.setPayload(payload); 454 // chunk.setFin(fin); 455 456 // nextOutgoingFrame(chunk, this); 457 // } 458 459 // override 460 // protected void onCompleteSuccess() { 461 // // This IteratingCallback never completes. 462 // } 463 464 // override 465 // protected void onCompleteFailure(Throwable x) { 466 // // Fail all the frames in the queue. 
467 // FrameEntry entry; 468 // while ((entry = pollEntry()) !is null) 469 // notifyCallbackFailure(entry.callback, x); 470 // } 471 472 // override 473 // void succeeded() { 474 // if (finished) 475 // notifyCallbackSuccess(current.callback); 476 // super.succeeded(); 477 // } 478 479 // override 480 // void failed(Throwable x) { 481 // notifyCallbackFailure(current.callback, x); 482 // // If something went wrong, very likely the compression context 483 // // will be invalid, so we need to fail this IteratingCallback. 484 // LOG.warn("flush frame failed", x); 485 // super.failed(x); 486 // } 487 488 // } 489 // }