Kafka 多線程開發消費者實例

目前,計算機的硬件條件已經大大改善,即使是在普通的筆記本電腦上,多核都已經是標配了,更不用說專業的服務器了。如果跑在強勁服務器機器上的應用程序依然是單線程架構,那實在是有點暴殄天物了。不過,Kafka Java Consumer 就是單線程的設計,你是不是感到很驚訝。所以,探究它的多線程消費方案,就顯得非常必要了。

Kafka Java Consumer 設計原理

在開始探究之前,我先簡單闡述下 Kafka Java Consumer 為什么采用單線程的設計。了解了這一點,對我們后面制定多線程方案大有裨益。

談到 Java Consumer API,最重要的當屬它的入口類 KafkaConsumer 了。我們說 KafkaConsumer 是單線程的設計,嚴格來說這是不準確的。因為,從 Kafka 0.10.1.0 版本開始,KafkaConsumer 就變為了雙線程的設計,即用戶主線程和心跳線程

所謂用戶主線程,就是你啟動 Consumer 應用程序 main 方法的那個線程,而新引入的心跳線程(Heartbeat Thread)只負責定期給對應的 Broker 機器發送心跳請求,以標識消費者應用的存活性(liveness)。引入這個心跳線程還有一個目的,那就是期望它能將心跳頻率與主線程調用 KafkaConsumer.poll 方法的頻率分開,從而解耦真實的消息處理邏輯與消費者組成員存活性管理。

不過,雖然有心跳線程,但實際的消息獲取邏輯依然是在用戶主線程中完成的。因此,在消費消息的這個層面上,我們依然可以安全地認為 KafkaConsumer 是單線程的設計。

其實,在社區推出 Java Consumer API 之前,Kafka 中存在著一組統稱為 Scala Consumer 的 API。這組 API,或者說這個 Consumer,也被稱為老版本 Consumer,目前在新版的 Kafka 代碼中已經被完全移除了。

我之所以重提舊事,是想告訴你,老版本 Consumer 是多線程的架構,每個 Consumer 實例在內部為所有訂閱的主題分區創建對應的消息獲取線程,也稱 Fetcher 線程。老版本 Consumer 同時也是阻塞式的(blocking),Consumer 實例啟動后,內部會創建很多阻塞式的消息獲取迭代器。但在很多場景下,Consumer 端是有非阻塞需求的,比如在流處理應用中執行過濾(filter)、連接(join)、分組(group by)等操作時就不能是阻塞式的。基于這個原因,社區為新版本 Consumer 設計了單線程 + 輪詢的機制。這種設計能夠較好地實現非阻塞式的消息獲取。

除此之外,單線程的設計能夠簡化 Consumer 端的設計。Consumer 獲取到消息后,處理消息的邏輯是否采用多線程,完全由你決定。這樣,你就擁有了把消息處理的多線程管理策略從 Consumer 端代碼中剝離的權利。

另外,不論使用哪種編程語言,單線程的設計都比較容易實現。相反,并不是所有的編程語言都能夠很好地支持多線程。從這一點上來說,單線程設計的 Consumer 更容易移植到其他語言上。畢竟,Kafka 社區想要打造上下游生態的話,肯定是希望出現越來越多的客戶端的。

多線程方案

了解了單線程的設計原理之后,我們來具體分析一下 KafkaConsumer 這個類的使用方法,以及如何推演出對應的多線程方案。

首先,我們要明確的是,KafkaConsumer 類不是線程安全的 (thread-safe)。所有的網絡 I/O 處理都是發生在用戶主線程中,因此,你在使用過程中必須要確保線程安全。簡單來說,就是你不能在多個線程中共享同一個 KafkaConsumer 實例,否則程序會拋出 ConcurrentModificationException 異常。

當然了,這也不是絕對的。KafkaConsumer 中有個方法是例外的,它就是wakeup(),你可以在其他線程中安全地調用KafkaConsumer.wakeup()來喚醒 Consumer。

鑒于 KafkaConsumer 不是線程安全的事實,我們能夠制定兩套多線程方案。

  1. 消費者程序啟動多個線程,每個線程維護專屬的 KafkaConsumer 實例,負責完整的消息獲取、消息處理流程。如下圖所示:

總體來說,這兩種方案都會創建多個線程,這些線程都會參與到消息的消費過程中,但各自的思路是不一樣的。

我們來打個比方。比如一個完整的消費者應用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作劃分,也就是說方案 1 會創建多個線程,每個線程完整地執行 1、2、3、4、5,以實現并行處理的目標,它不會進一步分割具體的子任務;而方案 2 則更細粒度化,它會將 1、2 分割出來,用單線程(也可以是多線程)來做,對于 3、4、5,則用另外的多個線程來做。

這兩種方案孰優孰劣呢?應該說是各有千秋。我總結了一下這兩種方案的優缺點,我們先來看看下面這張表格。


推薦閱讀

結合案例深入理解DDD聚合與聚合根

技術架構:作為開發,你真的了解系統嗎-CSDN博客

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/899511.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/899511.shtml
英文地址,請注明出處:http://en.pswp.cn/news/899511.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

zynq7000 + ucos3 + lwip202_v1_2調試過程

1 現在裸機應用上驗證lwip 跑起來可能會報錯,看下面的鏈接解決 zynq 網卡Phy setup error問題 zynq 網卡Phy setup error問題-CSDN博客 2 ping同以后,在zynq上添加ucos系統 鏈接如下: ZYNQ移植uCOSIII_zynq ucos-CSDN博客 3 移植lwip協議…

Android7 Input(二)Linux 驅動層輸入事件管理

概述 在Linux系統中,將鍵盤,鼠標,觸摸屏等這類交互設備交由Linux Input子系統進行管理,Linux Input驅動子系統由于具有良好的和用戶空間交互的接口。因此Linux Input驅動子系統,不止于只管理輸入類型的設備。也可以將其…

Java內存中的Heap(堆)的作用

Java內存中的Heap(堆)的作用 在 Java 的內存模型中,Heap(堆) 是 JVM(Java Virtual Machine)管理的運行時數據區域之一,主要用于存儲程序運行過程中動態分配的對象和數據。它是 Java…

自行車模型與汽車模型的混合策略在自動駕駛中的多維度協同優化

基于動態架構與智能調度的自動駕駛系統設計 #mermaid-svg-1yvF1EzG07ktndY6 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-1yvF1EzG07ktndY6 .error-icon{fill:#552222;}#mermaid-svg-1yvF1EzG07ktndY6 .error-tex…

mysql.8.4.4--初始化報錯--libnuma.so.1缺失

錯誤 mysqld: error while loading shared libraries: libnuma.so.1: cannot open shared object file: No such file or directory解決辦法:下載相關依賴 sudo apt update sudo apt install numactl然后重新初始化 mysqld --initialize

【區塊鏈安全 | 第三篇】主流公鏈以太坊運行機制

文章目錄 1. 以太坊賬戶類型2. 以太坊網絡架構2.1 節點類型2.2 交易流程 3. 共識機制4. Gas 機制4.1 Gas 計算方式4.2 以太坊 EIP-1559 交易機制 5. EVM(以太坊虛擬機)5.1 EVM 結構5.2 EVM 指令5.3 EVM 運行機制 6. 智能合約7. ERC 代幣標準7.1 ERC-207.…

計算機三級信息安全部分英文縮寫

eip,指令寄存器,用于存放指向下一條將執行指令的指針,即返回地址棧頂指針esp基址指針寄存器EBP,基地址數據執行保護DEP(Data Execute Prevention)技術可以設置內存堆棧區的代碼為不可執行狀態,從而防范溢出后代碼的執行…

【Goalng】第九彈-----文件操作、JSON處理

🎁個人主頁:星云愛編程 🔍所屬專欄:【Go】 🎉歡迎大家點贊👍評論📝收藏?文章 長風破浪會有時,直掛云帆濟滄海 目錄 1.文件操作 1.1文件介紹 1.2.文件流 1.3.打開和關閉文件 1…

C#高級:啟動、中止一個指定路徑的exe程序

一、啟動一個exe class Program {static void Main(string[] args){string exePath "D:\測試\Test.exe";// 修改為你要運行的exe路徑StartProcess(exePath);}private static bool StartProcess(string exePath){// 創建一個 ProcessStartInfo 對象來配置進程啟動參…

猜猜我用的是哪個大模型?我的世界游戲界面簡單的模擬效果

我的羅里吧嗦的,根據小朋友的要求,邊聽邊寫邊輸入的提示詞: 請生成一段完整的在網頁中用html5和javascript代碼模擬“我的世界”中游戲場景的互動畫面,要求提供若干人物選項可以選擇,請自行選擇需要使用哪些庫或框架來…

AI知識補全(八):多模態大模型是什么?

名人說:人生如逆旅,我亦是行人。 ——蘇軾《臨江仙送錢穆父》 創作者:Code_流蘇(CSDN)(一個喜歡古詩詞和編程的Coder😊) 上一篇:AI知識補全(七):AI Agent 智能…

更新docker 容器時,提前換后端jar 包,為什么會存在異常

我們現場更新時,通常都是提前將后端jar 包替換了,然后到了更新的時間,只需要更新相關的前端文件和修改各種配置,就行了。 但是最近一次更新操作中,忽然發現,提前更新后端包,會存在依賴丟失問題…

LoRA 模型微調框架核心原理及實現步驟

LoRA(Low-Rank Adaptation)模型微調框架通過低秩矩陣分解原理,實現了對大型預訓練模型的高效微調。其核心原理是:在凍結預訓練模型權重的基礎上,向特定層注入可訓練的低秩矩陣,以極少量參數(通常…

XHR.readyState詳解

XHR.readyState詳解 引言 XHR.readyState是XMLHttpRequest對象的一個屬性,它反映了當前請求的狀態。在Ajax編程中,正確理解和使用XHR.readyState對于調試和確保異步請求的正確執行至關重要。本文將詳細介紹XHR.readyState的屬性值、含義以及在Ajax請求中的具體應用。 XHR.…

MySQL8.4 InnoDB Cluster高可用集群使用指南

簡介 高可用方案 Orchestrator: 可視化 Web 界面管理 MySQL 拓撲結構,并且兼容多種復制架構(異步、半同步、GTID),提供自動和手動的故障轉移。但是8.0.21后 MySQL 更新了主從復制相關命令,Orchestrator無…

擴散模型總結

目錄 定義與原理 發展歷程 正向擴散過程 反向擴散過程 噪聲預測網絡 離散時間模型 連續時間模型 條件擴散模型 生成質量 訓練穩定性 采樣靈活性 圖像生成 音頻合成 文本生成 計算效率 模型復雜度 定義與原理 擴散模型是一種新型的生成模型,其核心原理源于熱力…

【Java】Java核心知識點與相應面試技巧(七)——類與對象(二)

Java 類與對象篇 1.上期面試題解析: 上文鏈接:https://blog.csdn.net/weixin_73492487/article/details/146607026 創建對象時的內存分配過程? ① 加載類 ② 堆內存分配空間 ③ 默認初始化 ④ 顯式初始化 ⑤ 構造器執行 this和super能否同時…

筆記:遇見未來——6G協同創新技術研討會

https://www.cww.net.cn/article?id564308 研討會由中國移動研究院首席科學家易芝玲博士主持。來自清華大學-中國移動聯合研究院、北京郵電大學-中國移動研究院聯合創新中心、東南大學-中國移動研究院聯合創新中心、中關村泛聯移動通信技術創新應用研究院等合作載體的知名教授…

Python Cookbook-4.14 反轉字典

任務 給定一個字典,此字典將不同的鍵映射到不同的值。而你想創建一個反轉的字典,將各個值反映射到鍵。 解決方案 可以創建一個函數,此函數傳遞一個列表推導作為dict的參數以創建需要的字典。 def invert_dict(d):return dict([(v,k) for …

深度學習在測距模型中的應用

一、單目視覺測距和雙目視覺測距簡介 1、單目視覺測距 模型:深度估計(Depth Estimation) 原理:通過深度學習模型(如MonoDepth2、MiDaS)或傳統的計算機視覺方法(如單目相機結合物體大小推斷&am…