1.核心概念與架構
1.1.消費者與消費者組
Kafka消費者是訂閱主題(Topic)并拉取消息的客戶端實例,其核心邏輯通過KafkaConsumer
類實現。消費者組(Consumer Group)是由多個邏輯關聯的消費者組成的集合。
- 核心規則
-
同一分區獨占性: 一個分區(
Partition
)只能被同一消費者組內的一個消費者消費,但不同消費者組可同時消費同一分區,實現廣播模式。 -
負載均衡與容錯: 通過動態分區分配(
Rebalance
)實現消費者增減時的負載均衡,支持水平擴展和容錯恢復。
- 消息投遞模式
-
點對點模式: 所有消費者屬于同一組,消息被均勻分配給組內消費者,每條消息僅被消費一次。
-
發布/訂閱模式: 消費者屬于不同組,消息廣播到所有組,每個組獨立消費全量數據。
消費者與消費者組的這種模式可以讓整體的消費能力具備橫向伸縮性,可以增加或減少消費者的個數來提高或降低整體的消費能力。注:消費者個數不能大于分區數。
1.2.核心類與組件
類名 | 作用 |
---|---|
KafkaConsumer | 消費者入口類,封裝所有消費邏輯(線程不安全,需單線程操作) |
ConsumerNetworkClient | 網絡通信層,管理與 Broker 的 TCP 連接和請求發送/接收 |
Fetcher | 消息拉取核心邏輯,處理消息批次、反序列化、異常重試 |
SubscriptionState | 維護訂閱狀態(主題、分區分配、消費偏移量等) |
ConsumerCoordinator | 消費者協調器,負責消費者組管理、心跳發送、分區再平衡(Rebalance) |
Deserializer | 反序列化器接口,將字節數組轉換為 Java 對象 |
2.基礎消費者示例
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class BasicKafkaConsumer {public static void main(String[] args) {// 1. 配置消費者屬性Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // 消費者組IDprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 當無提交offset時,從最早的消息開始props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自動提交偏移量props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 每秒自動提交// 2. 創建消費者實例try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {// 3. 訂閱主題(支持多個主題和正則表達式)consumer.subscribe(Collections.singletonList("test-topic"));// 4. 持續拉取消息while (true) {// poll()參數為等待時間(毫秒),返回消息集合ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理消息System.out.printf("Received message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());}}}}
}
3.源碼及核心功能解析
3.1.訂閱主題和分區
3.1.1. 訂閱主題
在上面#2中的代碼實例中的注釋2中,使用subscribe()
方法訂閱了一個主題。消費者通過 subscribe()
方法訂閱一個或多個主題,Kafka會自動將主題的分區分配給消費者組內的各個消費者。這是最常見的消費方式,適用于動態分區分配場景。
核心特點:
- 自動負載均衡: 消費者組內的消費者會分攤主題的分區(例如:3個分區被2個消費者分配為2+1)。
- 支持分區再平衡(Rebalance): 當消費者加入或離開組時,Kafka會自動重新分配分區。
- 依賴消費者組(Group ID): 通過
group.id
標識消費者組,同一組的消費者共享分區。
代碼實例:
// 訂閱單個主題
consumer.subscribe(Collections.singletonList("test-topic"));// 訂閱多個主題
consumer.subscribe(Arrays.asList("topic1", "topic2"));// 使用正則表達式訂閱(匹配所有以 "logs-" 開頭的主題)
consumer.subscribe(Pattern.compile("logs-.*"));
適用場景:
- 需要動態擴展消費者數量(水平擴展)。
- 分區數量固定或動態變化時(如新增分區)。
- 消費者需要自動故障恢復(如某個消費者宕機,其他消費者接管分區)。
分區分配策略:
通過 partition.assignment.strategy
配置:
-
RangeAssignor(默認): 按范圍分配分區(可能導致不均衡)。
-
RoundRobinAssignor: 輪詢分配分區(更均衡)。
-
StickyAssignor: 盡量保持分配粘性,減少Rebalance時的分區變動。
3.1.2.直接訂閱分區
消費者通過 assign()
方法直接指定要消費的分區,繞過消費者組管理,需手動管理分區偏移量。
核心特點:
- 無消費者組協調: 不依賴 group.id,消費者獨立運行。
- 完全手動控制: 需自行指定分區和起始偏移量。
- 不支持自動再平衡: 分區增減或消費者故障時需手動處理。
代碼示例:
// 分配指定主題的特定分區
TopicPartition partition0 = new TopicPartition("test-topic", 0);
TopicPartition partition1 = new TopicPartition("test-topic", 1);
consumer.assign(Arrays.asList(partition0, partition1));// 指定從某個offset開始消費
consumer.seek(partition0, 100); // 從offset=100開始
consumer.seekToBeginning(Collections.singleton(partition1)); // 從最早開始
consumer.seekToEnd(Collections.singleton(partition1)); // 從最新開始
適用場景:
- 需要精確控制消費特定分區(如按業務邏輯分區)。
- 消費者組管理不適用時(如單消費者消費所有分區)。
- 測試或調試場景下手動控制消費位置。
3.1.3.取消訂閱
適用KafkaConsumer
中的unsubscribe()
方法來取消主題和分區的訂閱。
代碼實例:
consumer.unsubscribe()
如果將
subscribe(Collection)
或assign(Collection)
中的集合參數設置為空集合,那么作用與unsubscribe()
等同。
3.2.消息拉取
poll()
方法是消息消費的核心入口,其內部實現涉及 網絡通信、消息批量拉取 、分區狀態管理 、反序列化 、位移提交 等多個關鍵步驟。以下從源碼層面逐層解析 poll()
方法的完整流程:
3.2.1.poll()方法源碼解析
// KafkaConsumer.poll() 方法定義
public ConsumerRecords<K, V> poll(Duration timeout) {// 1. 檢查線程安全性(確保單線程調用)acquireAndEnsureOpen();try {// 2. 記錄開始時間(用于超時控制)long startMs = time.milliseconds();long remainingMs = timeout.toMillis();// 3. 主循環:處理消息拉取、心跳、Rebalance 等do {// 3.1 發送心跳(維持消費者組活性)coordinator.poll(timeRemaining(remainingMs, startMs));// 3.2 拉取消息(核心邏輯在 Fetcher 中)Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {// 3.3 返回拉取到的消息(退出循環)return new ConsumerRecords<>(records);}// 3.4 計算剩余等待時間remainingMs = timeRemaining(remainingMs, startMs);} while (remainingMs > 0);return ConsumerRecords.empty();} finally {release();}
}
3.2.2.核心子流程解析
-
發送心跳(
coordinator.poll()
)
消費者通過ConsumerCoordinator
維持與GroupCoordinator
的心跳,確保消費者組活性。關鍵源碼路徑:
ConsumerCoordinator.poll()
→pollHeartbeat()
→sendHeartbeatRequest()
// ConsumerCoordinator.poll() 核心邏輯 public void poll(long timeoutMs) {// 1. 處理未完成的異步請求(如心跳、JoinGroup 等)client.poll(timeoutMs, time.milliseconds(), new PollCondition() {@Overridepublic boolean shouldBlock() {return !coordinatorUnknown(); // 是否等待響應}});// 2. 檢查是否需要觸發 Rebalanceif (needRejoin()) {joinGroupIfNeeded(); // 觸發 Rebalance}// 3. 周期性發送心跳pollHeartbeat(time.milliseconds()); }
心跳機制:
-
心跳間隔: 由
heartbeat.interval.ms
控制(默認 3 秒)。 -
會話超時: 由
session.timeout.ms
控制(默認 10 秒)。若超時未心跳,消費者被踢出組。
-
-
消息拉取(
fetcher.fetchedRecords()
)
Fetcher 負責從 Broker 拉取消息并解析。其內部維護一個completedFetches
隊列緩存已拉取但未處理的消息批次。源碼路徑:
Fetcher.fetchedRecords()
→parseCompletedFetches()
→parseRecord()
→deserializeRecord()
// Fetcher.fetchedRecords() 核心邏輯 public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {Map<TopicPartition, List<ConsumerRecord<K, V>>> drainedRecords = new HashMap<>();int recordsRemaining = maxPollRecords; // 單次 poll 最大記錄數(默認 500)// 1. 遍歷已完成的拉取請求(completedFetches)while (recordsRemaining > 0 && !completedFetches.isEmpty()) {CompletedFetch completedFetch = completedFetches.peek();// 2. 解析消息批次(MemoryRecords → RecordBatch)MemoryRecords records = completedFetch.records();for (RecordBatch batch : records.batches()) {// 3. 遍歷批次內的每條消息for (Record record : batch) {// 4. 反序列化消息(調用 Deserializer)K key = keyDeserializer.deserialize(record.topic(), record.key());V value = valueDeserializer.deserialize(record.topic(), record.value());// 5. 構建 ConsumerRecordConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(record.topic(),record.partition(),record.offset(),record.timestamp(),TimestampType.forCode(record.timestampType()),record.serializedKeySize(),record.serializedValueSize(),key,value,record.headers(),record.leaderEpoch());// 6. 按分區緩存消息drainedRecords.computeIfAbsent(partition, p -> new ArrayList<>()).add(consumerRecord);recordsRemaining--;}}completedFetches.poll();}return drainedRecords; }
關鍵設計:
- 批量拉取: Kafka 按批次(
RecordBatch
)拉取消息,減少網絡開銷。 - 零拷貝優化:
MemoryRecords
直接操作ByteBuffer
,避免內存復制。 - 反序列化延遲: 反序列化在消息實際被消費時進行(而非拉取時)。
- 批量拉取: Kafka 按批次(
-
位移提交(
maybeAutoCommitOffsetsAsync()
)
若啟用自動提交(enable.auto.commit
=true
),消費者會在poll()
后異步提交位移。源碼路徑:
KafkaConsumer.maybeAutoCommitOffsetsAsync()
→commitOffsetsAsync()
// KafkaConsumer 自動提交邏輯 private void maybeAutoCommitOffsetsAsync() {if (autoCommitEnabled) {// 1. 計算距離上次提交的時間間隔long now = time.milliseconds();if (now - lastAutoCommitTime >= autoCommitIntervalMs) {// 2. 提交所有分區的位移commitOffsetsAsync(subscriptions.allConsumed());lastAutoCommitTime = now;}} }// 異步提交位移 private void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {log.error("Async auto-commit failed", exception);}}}); }
自動提交配置:
-
提交間隔: 由
auto.commit.interval.ms
控制(默認 5 秒)。 -
提交內容: 提交所有已消費分區的 offset + 1(確保至少一次語義)。
-
3.2.3.網絡通信
所有網絡請求(拉取消息、心跳、位移提交)由 ConsumerNetworkClient
管理,其核心機制為 異步請求-響應模型。
-
請求發送
// ConsumerNetworkClient.send() 方法 public RequestFuture<ClientResponse> send(Node node,AbstractRequest.Builder<?> requestBuilder,long timeoutMs ) {// 1. 構建請求對象long now = time.milliseconds();ClientRequest clientRequest = client.newClientRequest(node.idString(),requestBuilder,now,true,timeoutMs,null);// 2. 將請求加入隊列(非阻塞)unsent.put(node, clientRequest);return clientRequest.future(); }
-
響應處理
// ConsumerNetworkClient.poll() 方法 public void poll(long timeout, long now, PollCondition pollCondition) {// 1. 發送所有未完成的請求sendAllRequests();// 2. 輪詢網絡通道(Selector),接收響應client.poll(timeout, now, new PollCondition() {@Overridepublic boolean shouldBlock() {return pollCondition.shouldBlock() || hasPendingRequests();}});// 3. 處理已完成的響應handleCompletedSends();handleCompletedReceives();handleDisconnections();handleConnections();handleTimedOutRequests(); }
關鍵機制:
-
請求隊列:
unsent
緩存待發送的請求。 -
響應回調: 每個請求關聯一個
RequestFuture
,在響應到達時觸發回調。
-
3.3.反序列化
3.3.1.核心接口
Kafka 的反序列化通過 org.apache.kafka.common.serialization.Deserializer
接口實現,所有反序列化器必須實現該接口。
接口定義:
public interface Deserializer<T> extends Closeable {// 配置反序列化器(從消費者配置中讀取參數)void configure(Map<String, ?> configs, boolean isKey);// 核心反序列化方法T deserialize(String topic, byte[] data);// 反序列化方法(帶Headers)default T deserialize(String topic, Headers headers, byte[] data) {return deserialize(topic, data);}// 關閉資源@Overridevoid close();
}
3.3.2.源碼級執行流程
-
消費者初始化階段
當創建 KafkaConsumer 時,會通過 ConsumerConfig 加載配置,并初始化key.deserializer
和value.deserializer
。關鍵源碼路徑:
KafkaConsumer
#initialize()
→ConsumerConfig
#getConfiguredInstance()
// 源碼片段:ConsumerConfig.java public static <T> T getConfiguredInstance(String key, Class<T> t) {String className = getString(key);try {Class<?> c = Class.forName(className, true, Utils.getContextOrKafkaClassLoader());return Utils.newInstance(c, t); // 反射創建實例} catch (ClassNotFoundException e) {// 處理異常...} }// KafkaConsumer 構造函數中初始化反序列化器 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);// 調用 configure() 方法 Map<String, Object> configs = config.originals(); this.keyDeserializer.configure(configs, true); // 標記為 key 反序列化器 this.valueDeserializer.configure(configs, false); // 標記為 value 反序列化器
-
消息拉取與反序列化
當調用poll()
方法拉取消息后,Kafka 會遍歷每條消息,使用反序列化器將字節數組轉換為 Java 對象。關鍵源碼路徑:
KafkaConsumer
#poll()
→Fetcher
#parseRecord()
→ConsumerRecord
#toRecord()
// 源碼片段:ConsumerRecord.java public static <K, V> ConsumerRecord<K, V> toRecord(/* 參數省略 */) {// 反序列化 KeyK key = keyDeserializer != null ? keyDeserializer.deserialize(topic, headers, keyBytes) : null;// 反序列化 ValueV value = valueDeserializer != null ? valueDeserializer.deserialize(topic, headers, valueBytes) : null;return new ConsumerRecord<>(topic, partition, offset, timestamp, timestampType,key, value, headers, leaderEpoch); }
-
線程安全與生命周期
-
線程安全: 每個 KafkaConsumer 實例持有獨立的反序列化器實例,因此反序列化器無需考慮線程安全。
-
生命周期: 反序列化器的
close()
方法在消費者關閉時被調用。
-
3.3.3.自定義反序列化器實現
- 實現 Deserializer 接口
假設需要反序列化 JSON 數據:public class JsonDeserializer<T> implements Deserializer<T> {private ObjectMapper objectMapper = new ObjectMapper();private Class<T> targetType;@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// 從配置中獲取目標類型(如 User.class)String configKey = isKey ? "key.type" : "value.type";String typeName = (String) configs.get(configKey);try {this.targetType = (Class<T>) Class.forName(typeName);} catch (ClassNotFoundException e) {throw new KafkaException("Class not found: " + typeName, e);}}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null) return null;try {return objectMapper.readValue(data, targetType);} catch (IOException e) {throw new SerializationException("Error deserializing JSON", e);}}@Overridepublic void close() {// 清理資源(可選)} }
- 消費者配置
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName()); props.put("value.type", "com.example.User"); // 自定義參數傳遞
3.4.消費者攔截器
Kafka 支持通過 ConsumerInterceptor 攔截消息處理流程:
public interface ConsumerInterceptor<K, V> extends Configurable {// 在消息返回給用戶前攔截ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);// 在位移提交前攔截void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
}
- KafkaConsumer會在poll()方法返回之前調用攔截器的onConsume()方法來對消息進行定制化處理。
- KafkaConsumer會在提交完消費者位移之后調用攔截器的onCommit(),可以使用和這個方法來記錄跟蹤所提交的位移信息。
消費者也有連接鏈的概念,跟生產者一樣也是按照
inerceptor.classes
參數配置連接鏈的執行順序。
配置方式:
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.MyConsumerInterceptor");