[netty5: ChunkedInput ChunkedWriteHandler]-源碼分析

ChunkedInput

ChunkedInput<B> 是 Netty 中用于按塊讀取不定長數據流的接口,常配合 ChunkedWriteHandler 實現流式寫入,支持如文件、流、HTTP 和 WebSocket 等多種數據源。

實現類簡要說明
ChunkedFile用于將常規文件按塊傳輸(使用傳統阻塞 IO)。
ChunkedNioFile用于將 FileChannel 形式的文件通過 NIO 按塊傳輸。
ChunkedNioStreamReadableByteChannel 數據源作為塊狀輸入,適用于 NIO 輸入流。
ChunkedStreamInputStream(阻塞流)按塊讀取并傳輸。
Http2DataChunkedInput專為 HTTP/2 數據幀的按塊輸入設計,用于發送 DATA 幀。
HttpChunkedInputHttpContent 對象(如文件塊)封裝為支持 trailer 的塊狀 HTTP 輸出。
WebSocketChunkedInput用于 WebSocket 數據幀分塊傳輸,支持大幀拆分成多個 WebSocket 幀。
public interface ChunkedInput<B> extends AutoCloseable {// 是否已經讀取到輸入流末尾boolean isEndOfInput() throws Exception;// 從數據流中讀取下一段數據塊(chunk)B readChunk(BufferAllocator allocator) throws Exception;// 返回整個輸入源的長度(如果已知)long length();// 返回目前已經“傳輸”的字節數long progress();
}

ChunkedWriteHandler

ChunkedWriteHandler 是 Netty 中用于分塊寫入大數據流(如文件、視頻流等)的處理器,核心職責是將大數據拆成小塊逐步異步寫入,避免一次性占用大量內存,提高傳輸效率和系統穩定性。主要特點和功能包括:

  1. 分塊寫入:支持 ChunkedInput 類型的數據流,按塊讀取并寫入,適合無法一次性全部加載到內存的大數據。
  2. 異步處理:內部維護一個待寫隊列(PendingWrite),通過事件驅動機制逐塊寫出數據,保證非阻塞的高效傳輸。
  3. 資源管理:在寫入完成或異常關閉時,會自動關閉數據流,釋放資源,防止內存泄漏。
  4. 錯誤處理:遇到寫入失敗或通道關閉時,能正確通知每個待寫任務失敗,并清理隊列。
  5. 流控制:自動管理寫請求,避免寫入過快導致擁塞,通過事件循環調度寫操作。
public class ChunkedWriteHandler implements ChannelHandler {private static final Logger logger = LoggerFactory.getLogger(ChunkedWriteHandler.class);private Queue<PendingWrite> queue;private volatile ChannelHandlerContext ctx;public ChunkedWriteHandler() {}private void allocateQueue() {if (queue == null) {queue = new ArrayDeque<>();}}private boolean queueIsEmpty() {return queue == null || queue.isEmpty();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {this.ctx = ctx;}public void resumeTransfer() {final ChannelHandlerContext ctx = this.ctx;if (ctx == null) {return;}if (ctx.executor().inEventLoop()) {resumeTransfer0(ctx);} else {ctx.executor().execute(() -> resumeTransfer0(ctx));}}private void resumeTransfer0(ChannelHandlerContext ctx) {try {doFlush(ctx);} catch (Exception e) {logger.warn("Unexpected exception while sending chunks.", e);}}// 如果當前有待寫隊列(queue)不為空,或者寫入的消息是 ChunkedInput(分塊數據流),則將寫操作封裝為 PendingWrite 并加入隊列,返回對應的 Future(異步寫結果)。// 否則直接調用下游的 ctx.write(msg)。@Overridepublic Future<Void> write(ChannelHandlerContext ctx, Object msg) {if (!queueIsEmpty() || msg instanceof ChunkedInput) {allocateQueue();Promise<Void> promise = ctx.newPromise();queue.add(new PendingWrite(msg, promise));return promise.asFuture();} else {return ctx.write(msg);}}@Overridepublic void flush(ChannelHandlerContext ctx) {doFlush(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {doFlush(ctx);ctx.fireChannelInactive();}// 實現基于通道寫緩沖區狀態的流控,防止寫過快導致內存溢出@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isWritable()) {doFlush(ctx);}ctx.fireChannelWritabilityChanged();}// 從 queue 中依次取出 PendingWrite,若是 ChunkedInput 則判斷是否寫完,關閉資源,并根據情況調用 fail() 或 success()。private void discard(Throwable cause) {if (queueIsEmpty()) {return;}for (;;) {PendingWrite currentWrite = queue.poll();if (currentWrite == null) {break;}Object message = currentWrite.msg;if (message instanceof ChunkedInput) {ChunkedInput<?> in = (ChunkedInput<?>) message;boolean endOfInput;try {endOfInput = in.isEndOfInput();closeInput(in);} catch (Exception e) {closeInput(in);currentWrite.fail(e);logger.warn("ChunkedInput failed", e);continue;}if (!endOfInput) {if (cause == null) {cause = new ClosedChannelException();}currentWrite.fail(cause);} else {currentWrite.success();}} else {if (cause == null) {cause = new ClosedChannelException();}currentWrite.fail(cause);}}}private void doFlush(final ChannelHandlerContext ctx) {final Channel channel = ctx.channel();// 如果通道不活躍(比如已關閉),調用 discard(null) 清空隊列,釋放資源,// 隨后調用 ctx.flush() 以確保之前寫過但未刷新的數據也被處理,最后直接返回。if (!channel.isActive()) {discard(null);ctx.flush();return;}// 如果待寫隊列為空,直接調用 flush(),然后返回。if (queueIsEmpty()) {ctx.flush();return;}// 標記是否最終需要調用 flush()。boolean requiresFlush = true;// 獲取緩沖區分配器,用于分配內存。BufferAllocator allocator = ctx.bufferAllocator();// 只要通道可寫(寫緩沖區未滿),就嘗試寫數據。防止寫入過快導致擁塞。while (channel.isWritable()) {// 從隊列頭獲取當前待寫項(不移除)final PendingWrite currentWrite = queue.peek();if (currentWrite == null) {break;}// 如果當前待寫的 promise 已經完成(可能之前寫失敗了),直接移除該項并繼續處理下一個。if (currentWrite.promise.isDone()) {queue.remove();continue;}final Object pendingMessage = currentWrite.msg;// 判斷當前待寫消息是否是 ChunkedInput 類型(分塊輸入流)。if (pendingMessage instanceof ChunkedInput) {final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;boolean endOfInput;boolean suspend;Object message = null;try {// 從 ChunkedInput 中讀取下一塊數據,分配內存。message = chunks.readChunk(allocator);// 判斷是否讀取到輸入末尾。endOfInput = chunks.isEndOfInput();// suspend 標記如果當前塊是 null 且未到末尾,說明暫時沒有數據可寫,需要掛起等待更多數據。suspend = message == null && !endOfInput;} catch (final Throwable t) {// 如果讀取過程異常,清理資源,調用失敗回調,并跳出循環。queue.remove();if (message != null) {Resource.dispose(message);}closeInput(chunks);currentWrite.fail(t);break;}// 如果需要掛起等待數據,則退出寫入循環。if (suspend) {break;}// 如果塊數據為空(null),則分配一個空緩沖區寫入,防止寫空消息時出現問題。if (message == null) {message = allocator.allocate(0);}// 如果已到輸入末尾,移除隊列當前項。if (endOfInput) {queue.remove();}// 寫入當前塊并立即刷新。Future<Void> f = ctx.writeAndFlush(message);// 如果已經到末尾,寫完成后調用 handleEndOfInputFuture 處理關閉輸入流、通知完成等。if (endOfInput) {if (f.isDone()) {handleEndOfInputFuture(f, chunks, currentWrite);} else {f.addListener(future -> handleEndOfInputFuture(future, chunks, currentWrite));}} else {// 如果未到末尾,調用 handleFuture 處理寫入完成后的邏輯,比如繼續寫或暫停寫。final boolean resume = !channel.isWritable();if (f.isDone()) {handleFuture(channel, f, chunks, currentWrite, resume);} else {f.addListener(future -> handleFuture(channel, future, chunks, currentWrite, resume));}}// 由于已經調用了 writeAndFlush,此時不需要額外再調用 flush()。requiresFlush = false;} else {// 非 ChunkedInput 處理// 對于非分塊輸入,直接從隊列移除,調用 write() 異步寫入,并將結果與當前 promise 關聯。// 標記需要最后調用 flush()。queue.remove();ctx.write(pendingMessage).cascadeTo(currentWrite.promise);requiresFlush = true;}// 每寫完一個任務后檢測通道狀態,若關閉,則調用 discard 清理剩余隊列,退出循環。if (!channel.isActive()) {discard(new ClosedChannelException());break;}}// 如果循環中沒主動調用 flush(),則最后統一調用。if (requiresFlush) {ctx.flush();}}// 在最后一個 chunk 寫完后調用: 處理 ChunkedInput 寫入完成后的清理邏輯private static void handleEndOfInputFuture(Future<?> future, ChunkedInput<?> input, PendingWrite currentWrite) {closeInput(input);if (future.isFailed()) {currentWrite.fail(future.cause());} else {currentWrite.success();}}// 處理 非末尾 chunk 寫入完成后的回調邏輯, 根據是否寫入成功、是否需要繼續寫,來決定是否恢復 chunk 的發送private void handleFuture(Channel channel, Future<?> future, ChunkedInput<?> input,PendingWrite currentWrite, boolean resume) {if (future.isFailed()) {closeInput(input);currentWrite.fail(future.cause());} else {if (resume && channel.isWritable()) {resumeTransfer();}}}// 關閉分塊輸入流,捕獲并記錄異常。private static void closeInput(ChunkedInput<?> chunks) {try {chunks.close();} catch (Throwable t) {logger.warn("Failed to close a ChunkedInput.", t);}}}

ChunkedWriteHandler.PendingWrite

PendingWrite 是一個寫任務的封裝器,綁定了待寫入數據和寫完成通知,是實現異步分塊寫入的基礎數據結構。

private static final class PendingWrite {final Object msg;final Promise<Void> promise;PendingWrite(Object msg, Promise<Void> promise) {this.msg = msg;this.promise = promise;}void fail(Throwable cause) {promise.tryFailure(cause);if (Resource.isAccessible(msg, false)) {SilentDispose.dispose(msg, logger);}}void success() {if (promise.isDone()) {return;}promise.trySuccess(null);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/87804.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/87804.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/87804.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

QT 第十二講 --- 控件篇 LineEdit,TextEdit與ComboBox

前言&#xff1a;歡迎進入 QT 控件世界的第十二講&#xff01;在上一講《QT 第十一講 --- 控件篇 LCDnumber&#xff0c;ProgressBar與CalenderWidget》中&#xff0c;我們探索了用于信息展示和狀態反饋的控件&#xff1a;精準的數字顯示器 LCD Number、直觀的進度指示器 Progr…

VSCode遇到的一些小毛病(自動保存、運行后光標不再處于編輯區)

1. 右鍵點擊Run Code沒有觸發自動保存 1. 打開 VS Code 設置&#xff08;Ctrl ,&#xff09; 2. 搜索&#xff1a;code runner save 3. 勾選你需要的 2. 運行后光標仍然處于編輯區&#xff08;容易誤輸入&#xff09; 1. 打開 VS Code 設置&#xff08;Ctrl ,&#xff09; 2.…

Maixcam的使用2

1.單文件和項目&#xff08;多個 py 文件項目/模塊化&#xff09;# 在編寫代碼時&#xff0c;一般兩種模式&#xff0c;執行單個文件&#xff0c;或者執行一個完成項目&#xff08;包含多個 py 文件或者其它資源文件&#xff09;。 單文件模式&#xff1a;MaixVision 創建或者…

征信系統架構思想:打造商業信任基石_東方仙盟—仙盟創夢IDE

一、建設必要性在復雜的商業環境中&#xff0c;企業面臨多元交易對象與業務場景&#xff0c;準確評估合作方信用狀況及潛在價值的難度顯著增加。傳統經驗判斷和簡單背景調查存在局限性&#xff0c;難以滿足現代商業決策需求&#xff0c;因此構建科學的征信體系具有現實必要性。…

網安-XSS-pikachu

介紹 XSS&#xff0c;即跨站腳本攻擊&#xff0c;是指攻擊者利用Web服務器中的代碼漏洞&#xff0c;在頁面中嵌入客戶端腳本&#xff08;通常是一段由JavaScript編寫的惡意代碼&#xff09;&#xff0c;當信任此Web服務器的用戶訪問 Web站點中含有惡意腳本代碼的頁面&#xff…

算法入門——字典樹(C++實現詳解)

字典樹&#xff08;Trie&#xff09;是處理字符串匹配的高效數據結構&#xff0c;廣泛應用于搜索提示、拼寫檢查等場景。本文將帶你從零掌握字典樹的原理與實現&#xff01; 一、什么是字典樹&#xff1f; 字典樹&#xff08;Trie&#xff09;是一種樹形數據結構&#xff0c;…

SpringBoot整合SpringCache緩存

SpringBoot整合SpringCache使用緩存 文章目錄SpringBoot整合SpringCache使用緩存1.介紹2.SpringBoot整合1.導入xml依賴2.配置yml3.使用EnableCaching啟用SpringCache4.Cacheable5.CachePut6.CacheEvict7. Caching8.CacheConfig3.其他屬性配置1.keyGenerator 屬性2. cacheManage…

WPF學習筆記(20)Button與控件模板

Button與控件模板一、 Button默認控件模板詳解二、自定義按鈕模板一、 Button默認控件模板詳解 WPF 中的大多數控件都有默認的控件模板。 這些模板定義了控件的默認外觀和行為&#xff0c;包括控件的布局、背景、前景、邊框、內容等。 官方文檔&#xff1a;https://learn.mic…

藍天居士自傳(1)

藍天居士何許人&#xff1f; 藍天居士是我的筆名&#xff0c;也可以說是號。就好像李白號青蓮居士、歐陽修號六一居士一樣。筆者本名彭昊 —— 一個有不少重名重姓者的名字。 筆者小的時候上語文課&#xff0c;無論是小學、初中抑或是高中&#xff0c;都會有魯迅&#xff08;…

短劇系統開發定制全流程解析:從需求分析到上線的專業指南

一、短劇行業數字化趨勢與系統開發必要性在短視頻內容爆發式增長的時代背景下&#xff0c;短劇作為一種新興的內容形式正在迅速崛起。數據顯示&#xff0c;2023年中國短劇市場規模已突破300億元&#xff0c;用戶規模達到4.5億&#xff0c;年增長率超過200%。這一迅猛發展的市場…

getBoundingClientRect() 詳解:精準獲取元素位置和尺寸

getBoundingClientRect() 是 JavaScript 中一個強大的 DOM API&#xff0c;用于獲取元素在視口中的精確位置和尺寸信息。它返回一個 DOMRect 對象&#xff0c;包含元素的坐標、寬度和高度等關鍵幾何信息。 基本用法 const element document.getElementById(myElement); cons…

EXCEL 基礎技巧

來源&#xff1a;WPS 官網 初步了解WPS表格-WPS學堂https://www.wps.cn/learning/course/detail/id/635.html 1、格式刷 1.1使用格式刷隔行填充顏色。 首先設置部分表格顏色&#xff0c;選中此區域&#xff0c;雙擊點擊格式刷&#xff0c;然后選中其他表格區域。 這樣就可以…

【RK3568 編譯rtl8723DU驅動】

RK3568 編譯rtl8723DU驅動 編譯源碼1.解壓rtl8723du2.修改Makefile 驗證1.加載模塊2.開啟wifi 在驅動開發中&#xff0c;驅動的編譯與集成是實現設備功能的關鍵環節。本文聚焦于基于 RK3568 處理器平臺編譯 RTL8723DU WiFi/BT 二合一模塊驅動的完整流程&#xff0c;涵蓋源碼編譯…

基于Simulink的二關節機器人獨立PD控制仿真

文章目錄 理論模型仿真窗口控制函數目標函數仿真 本文是劉金琨. 機器人控制系統的設計與MATLAB仿真的學習筆記。 理論模型 對于二關節機器人系統&#xff0c;其動力學模型為 D ( q ) q C ( q , q ˙ ) q ˙ r D(q)\ddot qC(q,\dot q)\dot q r D(q)q?C(q,q˙?)q˙?r 式…

【技術架構解析】國產化雙復旦微FPGA+飛騰D2000核心板架構

本文就一款基于飛騰D2000核心板與兩片高性能FPGA的國產化開發主板進行技術解析&#xff0c;包括系統架構、主要硬件模塊、關鍵接口及軟件環境&#xff0c;重點闡述各子系統間的數據路徑與協同工作方式&#xff0c;旨在為行業內同類產品設計與應用提供參考。 隨著國產化要求的加…

Python 數據分析:計算,分組統計1,df.groupby()。聽故事學知識點怎么這么容易?

目錄1 示例代碼2 歡迎糾錯3 論文寫作/Python 學習智能體1 示例代碼 直接上代碼。 def grpby1():xls "book.xls"df pd.DataFrame(pd.read_excel(xls, engine"xlrd"))print(df)"""序號 分類 銷量0 1 文學 51 2 計算機…

【解決“此擴展可能損壞”】Edge瀏覽器(chrome系列通殺))擴展損壞?一招保留數據快速修復

引言 如果你想保留你的數據&#xff0c;敲重點&#xff1a;不要點擊修復&#xff0c;不要修復&#xff0c;不要修復 在使用 Microsoft Edge 瀏覽器時&#xff0c;您可能會遇到擴展程序顯示“此擴展程序可能已損壞”的提示&#xff0c;且啟用按鈕無法點擊。這一問題讓許多用戶感…

AI專業化應用加速落地,安全治理挑戰同步凸顯

7月2日&#xff0c;2025全球數字經濟大會在北京國家會議中心開幕。本屆大會以“建設數字友好城市”為主題&#xff0c;聚焦數字技術對城市發展的影響。開幕式上&#xff0c;一首完全由AI生成的MV成為焦點——從歌詞、譜曲、演唱到視頻制作全流程AI生成&#xff0c;展現人工智能…

Python統一調用多家大模型API指南

隨著大模型技術的快速發展&#xff0c;市場上出現了越來越多的LLM服務提供商&#xff0c;包括OpenAI、Anthropic、Google、百度、阿里云等。作為開發者&#xff0c;我們經常需要在不同的模型之間切換&#xff0c;或者同時使用多個模型來滿足不同的業務需求。本文將詳細介紹如何…

【ESP32】1.編譯、燒錄、創建工程

標題打開一個Hello world工程并燒錄 點擊環境搭建鏈接 遇到的問題&#xff1a; 1.ESP32在VSCODE中燒錄代碼時&#xff0c;跳出窗口&#xff0c;OPenOCD is not running ,do you want to launch it? 可能是OCD沒安裝&#xff0c;重新安裝 ESP-IDF試一下&#xff0c;在終端命令窗…