ChunkedInput
ChunkedInput<B>
是 Netty 中用于按塊讀取不定長數據流的接口,常配合 ChunkedWriteHandler
實現流式寫入,支持如文件、流、HTTP 和 WebSocket 等多種數據源。
實現類 | 簡要說明 |
---|---|
ChunkedFile | 用于將常規文件按塊傳輸(使用傳統阻塞 IO)。 |
ChunkedNioFile | 用于將 FileChannel 形式的文件通過 NIO 按塊傳輸。 |
ChunkedNioStream | 將 ReadableByteChannel 數據源作為塊狀輸入,適用于 NIO 輸入流。 |
ChunkedStream | 將 InputStream (阻塞流)按塊讀取并傳輸。 |
Http2DataChunkedInput | 專為 HTTP/2 數據幀的按塊輸入設計,用于發送 DATA 幀。 |
HttpChunkedInput | 將 HttpContent 對象(如文件塊)封裝為支持 trailer 的塊狀 HTTP 輸出。 |
WebSocketChunkedInput | 用于 WebSocket 數據幀分塊傳輸,支持大幀拆分成多個 WebSocket 幀。 |
public interface ChunkedInput<B> extends AutoCloseable {// 是否已經讀取到輸入流末尾boolean isEndOfInput() throws Exception;// 從數據流中讀取下一段數據塊(chunk)B readChunk(BufferAllocator allocator) throws Exception;// 返回整個輸入源的長度(如果已知)long length();// 返回目前已經“傳輸”的字節數long progress();
}
ChunkedWriteHandler
ChunkedWriteHandler 是 Netty 中用于分塊寫入大數據流(如文件、視頻流等)的處理器,核心職責是將大數據拆成小塊逐步異步寫入,避免一次性占用大量內存,提高傳輸效率和系統穩定性。主要特點和功能包括:
- 分塊寫入:支持 ChunkedInput 類型的數據流,按塊讀取并寫入,適合無法一次性全部加載到內存的大數據。
- 異步處理:內部維護一個待寫隊列(PendingWrite),通過事件驅動機制逐塊寫出數據,保證非阻塞的高效傳輸。
- 資源管理:在寫入完成或異常關閉時,會自動關閉數據流,釋放資源,防止內存泄漏。
- 錯誤處理:遇到寫入失敗或通道關閉時,能正確通知每個待寫任務失敗,并清理隊列。
- 流控制:自動管理寫請求,避免寫入過快導致擁塞,通過事件循環調度寫操作。
public class ChunkedWriteHandler implements ChannelHandler {private static final Logger logger = LoggerFactory.getLogger(ChunkedWriteHandler.class);private Queue<PendingWrite> queue;private volatile ChannelHandlerContext ctx;public ChunkedWriteHandler() {}private void allocateQueue() {if (queue == null) {queue = new ArrayDeque<>();}}private boolean queueIsEmpty() {return queue == null || queue.isEmpty();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {this.ctx = ctx;}public void resumeTransfer() {final ChannelHandlerContext ctx = this.ctx;if (ctx == null) {return;}if (ctx.executor().inEventLoop()) {resumeTransfer0(ctx);} else {ctx.executor().execute(() -> resumeTransfer0(ctx));}}private void resumeTransfer0(ChannelHandlerContext ctx) {try {doFlush(ctx);} catch (Exception e) {logger.warn("Unexpected exception while sending chunks.", e);}}// 如果當前有待寫隊列(queue)不為空,或者寫入的消息是 ChunkedInput(分塊數據流),則將寫操作封裝為 PendingWrite 并加入隊列,返回對應的 Future(異步寫結果)。// 否則直接調用下游的 ctx.write(msg)。@Overridepublic Future<Void> write(ChannelHandlerContext ctx, Object msg) {if (!queueIsEmpty() || msg instanceof ChunkedInput) {allocateQueue();Promise<Void> promise = ctx.newPromise();queue.add(new PendingWrite(msg, promise));return promise.asFuture();} else {return ctx.write(msg);}}@Overridepublic void flush(ChannelHandlerContext ctx) {doFlush(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {doFlush(ctx);ctx.fireChannelInactive();}// 實現基于通道寫緩沖區狀態的流控,防止寫過快導致內存溢出@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isWritable()) {doFlush(ctx);}ctx.fireChannelWritabilityChanged();}// 從 queue 中依次取出 PendingWrite,若是 ChunkedInput 則判斷是否寫完,關閉資源,并根據情況調用 fail() 或 success()。private void discard(Throwable cause) {if (queueIsEmpty()) {return;}for (;;) {PendingWrite currentWrite = queue.poll();if (currentWrite == null) {break;}Object message = currentWrite.msg;if (message instanceof ChunkedInput) {ChunkedInput<?> in = (ChunkedInput<?>) message;boolean endOfInput;try {endOfInput = in.isEndOfInput();closeInput(in);} catch (Exception e) {closeInput(in);currentWrite.fail(e);logger.warn("ChunkedInput failed", e);continue;}if (!endOfInput) {if (cause == null) {cause = new ClosedChannelException();}currentWrite.fail(cause);} else {currentWrite.success();}} else {if (cause == null) {cause = new ClosedChannelException();}currentWrite.fail(cause);}}}private void doFlush(final ChannelHandlerContext ctx) {final Channel channel = ctx.channel();// 如果通道不活躍(比如已關閉),調用 discard(null) 清空隊列,釋放資源,// 隨后調用 ctx.flush() 以確保之前寫過但未刷新的數據也被處理,最后直接返回。if (!channel.isActive()) {discard(null);ctx.flush();return;}// 如果待寫隊列為空,直接調用 flush(),然后返回。if (queueIsEmpty()) {ctx.flush();return;}// 標記是否最終需要調用 flush()。boolean requiresFlush = true;// 獲取緩沖區分配器,用于分配內存。BufferAllocator allocator = ctx.bufferAllocator();// 只要通道可寫(寫緩沖區未滿),就嘗試寫數據。防止寫入過快導致擁塞。while (channel.isWritable()) {// 從隊列頭獲取當前待寫項(不移除)final PendingWrite currentWrite = queue.peek();if (currentWrite == null) {break;}// 如果當前待寫的 promise 已經完成(可能之前寫失敗了),直接移除該項并繼續處理下一個。if (currentWrite.promise.isDone()) {queue.remove();continue;}final Object pendingMessage = currentWrite.msg;// 判斷當前待寫消息是否是 ChunkedInput 類型(分塊輸入流)。if (pendingMessage instanceof ChunkedInput) {final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;boolean endOfInput;boolean suspend;Object message = null;try {// 從 ChunkedInput 中讀取下一塊數據,分配內存。message = chunks.readChunk(allocator);// 判斷是否讀取到輸入末尾。endOfInput = chunks.isEndOfInput();// suspend 標記如果當前塊是 null 且未到末尾,說明暫時沒有數據可寫,需要掛起等待更多數據。suspend = message == null && !endOfInput;} catch (final Throwable t) {// 如果讀取過程異常,清理資源,調用失敗回調,并跳出循環。queue.remove();if (message != null) {Resource.dispose(message);}closeInput(chunks);currentWrite.fail(t);break;}// 如果需要掛起等待數據,則退出寫入循環。if (suspend) {break;}// 如果塊數據為空(null),則分配一個空緩沖區寫入,防止寫空消息時出現問題。if (message == null) {message = allocator.allocate(0);}// 如果已到輸入末尾,移除隊列當前項。if (endOfInput) {queue.remove();}// 寫入當前塊并立即刷新。Future<Void> f = ctx.writeAndFlush(message);// 如果已經到末尾,寫完成后調用 handleEndOfInputFuture 處理關閉輸入流、通知完成等。if (endOfInput) {if (f.isDone()) {handleEndOfInputFuture(f, chunks, currentWrite);} else {f.addListener(future -> handleEndOfInputFuture(future, chunks, currentWrite));}} else {// 如果未到末尾,調用 handleFuture 處理寫入完成后的邏輯,比如繼續寫或暫停寫。final boolean resume = !channel.isWritable();if (f.isDone()) {handleFuture(channel, f, chunks, currentWrite, resume);} else {f.addListener(future -> handleFuture(channel, future, chunks, currentWrite, resume));}}// 由于已經調用了 writeAndFlush,此時不需要額外再調用 flush()。requiresFlush = false;} else {// 非 ChunkedInput 處理// 對于非分塊輸入,直接從隊列移除,調用 write() 異步寫入,并將結果與當前 promise 關聯。// 標記需要最后調用 flush()。queue.remove();ctx.write(pendingMessage).cascadeTo(currentWrite.promise);requiresFlush = true;}// 每寫完一個任務后檢測通道狀態,若關閉,則調用 discard 清理剩余隊列,退出循環。if (!channel.isActive()) {discard(new ClosedChannelException());break;}}// 如果循環中沒主動調用 flush(),則最后統一調用。if (requiresFlush) {ctx.flush();}}// 在最后一個 chunk 寫完后調用: 處理 ChunkedInput 寫入完成后的清理邏輯private static void handleEndOfInputFuture(Future<?> future, ChunkedInput<?> input, PendingWrite currentWrite) {closeInput(input);if (future.isFailed()) {currentWrite.fail(future.cause());} else {currentWrite.success();}}// 處理 非末尾 chunk 寫入完成后的回調邏輯, 根據是否寫入成功、是否需要繼續寫,來決定是否恢復 chunk 的發送private void handleFuture(Channel channel, Future<?> future, ChunkedInput<?> input,PendingWrite currentWrite, boolean resume) {if (future.isFailed()) {closeInput(input);currentWrite.fail(future.cause());} else {if (resume && channel.isWritable()) {resumeTransfer();}}}// 關閉分塊輸入流,捕獲并記錄異常。private static void closeInput(ChunkedInput<?> chunks) {try {chunks.close();} catch (Throwable t) {logger.warn("Failed to close a ChunkedInput.", t);}}}
ChunkedWriteHandler.PendingWrite
PendingWrite 是一個寫任務的封裝器,綁定了待寫入數據和寫完成通知,是實現異步分塊寫入的基礎數據結構。
private static final class PendingWrite {final Object msg;final Promise<Void> promise;PendingWrite(Object msg, Promise<Void> promise) {this.msg = msg;this.promise = promise;}void fail(Throwable cause) {promise.tryFailure(cause);if (Resource.isAccessible(msg, false)) {SilentDispose.dispose(msg, logger);}}void success() {if (promise.isDone()) {return;}promise.trySuccess(null);}
}