目前,計算機的硬件條件已經大大改善,即使是在普通的筆記本電腦上,多核都已經是標配了,更不用說專業的服務器了。如果跑在強勁服務器機器上的應用程序依然是單線程架構,那實在是有點暴殄天物了。不過,Kafka Java Consumer 就是單線程的設計,你是不是感到很驚訝。所以,探究它的多線程消費方案,就顯得非常必要了。
Kafka Java Consumer 設計原理
在開始探究之前,我先簡單闡述下 Kafka Java Consumer 為什么采用單線程的設計。了解了這一點,對我們后面制定多線程方案大有裨益。
談到 Java Consumer API,最重要的當屬它的入口類 KafkaConsumer 了。我們說 KafkaConsumer 是單線程的設計,嚴格來說這是不準確的。因為,從 Kafka 0.10.1.0 版本開始,KafkaConsumer 就變為了雙線程的設計,即用戶主線程和心跳線程。
所謂用戶主線程,就是你啟動 Consumer 應用程序 main 方法的那個線程,而新引入的心跳線程(Heartbeat Thread)只負責定期給對應的 Broker 機器發送心跳請求,以標識消費者應用的存活性(liveness)。引入這個心跳線程還有一個目的,那就是期望它能將心跳頻率與主線程調用 KafkaConsumer.poll 方法的頻率分開,從而解耦真實的消息處理邏輯與消費者組成員存活性管理。
不過,雖然有心跳線程,但實際的消息獲取邏輯依然是在用戶主線程中完成的。因此,在消費消息的這個層面上,我們依然可以安全地認為 KafkaConsumer 是單線程的設計。
其實,在社區推出 Java Consumer API 之前,Kafka 中存在著一組統稱為 Scala Consumer 的 API。這組 API,或者說這個 Consumer,也被稱為老版本 Consumer,目前在新版的 Kafka 代碼中已經被完全移除了。
我之所以重提舊事,是想告訴你,老版本 Consumer 是多線程的架構,每個 Consumer 實例在內部為所有訂閱的主題分區創建對應的消息獲取線程,也稱 Fetcher 線程。老版本 Consumer 同時也是阻塞式的(blocking),Consumer 實例啟動后,內部會創建很多阻塞式的消息獲取迭代器。但在很多場景下,Consumer 端是有非阻塞需求的,比如在流處理應用中執行過濾(filter)、連接(join)、分組(group by)等操作時就不能是阻塞式的。基于這個原因,社區為新版本 Consumer 設計了單線程 + 輪詢的機制。這種設計能夠較好地實現非阻塞式的消息獲取。
除此之外,單線程的設計能夠簡化 Consumer 端的設計。Consumer 獲取到消息后,處理消息的邏輯是否采用多線程,完全由你決定。這樣,你就擁有了把消息處理的多線程管理策略從 Consumer 端代碼中剝離的權利。
另外,不論使用哪種編程語言,單線程的設計都比較容易實現。相反,并不是所有的編程語言都能夠很好地支持多線程。從這一點上來說,單線程設計的 Consumer 更容易移植到其他語言上。畢竟,Kafka 社區想要打造上下游生態的話,肯定是希望出現越來越多的客戶端的。
多線程方案
了解了單線程的設計原理之后,我們來具體分析一下 KafkaConsumer 這個類的使用方法,以及如何推演出對應的多線程方案。
首先,我們要明確的是,KafkaConsumer 類不是線程安全的 (thread-safe)。所有的網絡 I/O 處理都是發生在用戶主線程中,因此,你在使用過程中必須要確保線程安全。簡單來說,就是你不能在多個線程中共享同一個 KafkaConsumer 實例,否則程序會拋出 ConcurrentModificationException 異常。
當然了,這也不是絕對的。KafkaConsumer 中有個方法是例外的,它就是wakeup(),你可以在其他線程中安全地調用KafkaConsumer.wakeup()來喚醒 Consumer。
鑒于 KafkaConsumer 不是線程安全的事實,我們能夠制定兩套多線程方案。
- 消費者程序啟動多個線程,每個線程維護專屬的 KafkaConsumer 實例,負責完整的消息獲取、消息處理流程。如下圖所示:
總體來說,這兩種方案都會創建多個線程,這些線程都會參與到消息的消費過程中,但各自的思路是不一樣的。
我們來打個比方。比如一個完整的消費者應用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作劃分,也就是說方案 1 會創建多個線程,每個線程完整地執行 1、2、3、4、5,以實現并行處理的目標,它不會進一步分割具體的子任務;而方案 2 則更細粒度化,它會將 1、2 分割出來,用單線程(也可以是多線程)來做,對于 3、4、5,則用另外的多個線程來做。
這兩種方案孰優孰劣呢?應該說是各有千秋。我總結了一下這兩種方案的優缺點,我們先來看看下面這張表格。
推薦閱讀
結合案例深入理解DDD聚合與聚合根
技術架構:作為開發,你真的了解系統嗎-CSDN博客