目錄
發送流程
1. 流程邏輯分析
階段一:主線程處理
階段二:Sender 線程異步發送
核心設計思想
2. 流程
關鍵點總結
重要參數
一、核心必填參數
二、可靠性相關參數
三、性能優化參數
四、高級配置
五、安全性配置(可選)
六、錯誤處理與監控
典型配置示例
關鍵注意事項
發送流程
- 序列化與分區:消息通過
Partitioner
選擇目標分區(默認輪詢或哈希),序列化后加入RecordAccumulator
緩沖區。- 批次合并:
Sender線程
將同一分區的消息合并為ProducerBatch
,減少網絡請求(源碼見Sender.run()
方法)。- 發送至Broker:通過
NetworkClient
異步發送,Broker的LogAppendTime
處理寫入請求。- ACK機制:根據
acks
配置(0/1/all)等待Broker確認,通過Metadata
類更新分區元數據
1. 流程邏輯分析
Kafka 生產者發送消息的核心流程分為 主線程處理 和 Sender 線程異步發送 兩個階段,具體步驟如下:
階段一:主線程處理
- 創建 ProducerRecord
-
- 用戶調用
producer.send(ProducerRecord)
,指定 Topic、Key、Value 和可選的分區或時間戳。
- 用戶調用
- 選擇分區(Partition)
-
- 若未指定分區,根據以下規則選擇:
-
-
- 有 Key:對 Key 哈希取模(
hash(key) % 分區數
),確保相同 Key 的消息進入同一分區。 - 無 Key:默認使用粘性分區策略(Sticky Partitioning,Kafka 2.4+),在批次填滿或超時前發送到同一分區,提升性能。
- 有 Key:對 Key 哈希取模(
-
- 序列化(Serialize)
-
- 使用配置的
key.serializer
和value.serializer
對 Key 和 Value 序列化(如StringSerializer
、ByteArraySerializer
)。
- 使用配置的
- 追加到緩沖區(RecordAccumulator)
-
- 將消息按 Topic-Partition 分組,存入
RecordAccumulator
的批次(Batch)中。 - 批次策略:
- 將消息按 Topic-Partition 分組,存入
-
-
batch.size
:批次大小閾值(默認 16KB),達到閾值立即發送。linger.ms
:批次等待時間(默認 0ms),超時后發送未滿批次。
-
階段二:Sender 線程異步發送
- Sender 線程拉取批次
-
- Sender 線程定期檢查緩沖區,將滿足條件的批次(已滿或超時)封裝為
ProducerRequest
。
- Sender 線程定期檢查緩沖區,將滿足條件的批次(已滿或超時)封裝為
- 構建請求并發送到 Broker
-
- 根據分區的 Leader 副本所在 Broker,將請求發送到對應的節點。
- 關鍵配置:
-
-
acks
:控制消息持久化確認級別:
-
-
-
-
0
:不等待確認(可能丟失數據)。1
:等待 Leader 確認(默認)。all
:等待所有 ISR 副本確認(最高可靠性)。
-
-
-
-
max.in.flight.requests.per.connection
:控制單個 Broker 的未確認請求數(默認 5)。
-
- 處理 Broker 響應
-
- 成功:觸發用戶設置的
Callback
回調,并釋放批次內存。 - 失敗:
- 成功:觸發用戶設置的
-
-
- 可重試錯誤(如網絡抖動、Leader 切換):根據
retries
(默認 0)和retry.backoff.ms
(默認 100ms)重試。 - 不可重試錯誤(如消息過大):直接觸發回調并拋出異常。
- 可重試錯誤(如網絡抖動、Leader 切換):根據
-
核心設計思想
- 異步批處理:通過緩沖區合并小消息,減少網絡 I/O 次數。
- 零拷貝優化:使用
sendfile
系統調用提升網絡傳輸效率。 - 高可靠性:通過重試機制和
acks=all
確保消息不丟失。
2. 流程
關鍵點總結
- 分區選擇:優先使用 Key 哈希或粘性分區策略,保證消息順序性和吞吐量。
- 批次優化:通過
batch.size
和linger.ms
平衡延遲與吞吐。 - 可靠性保障:通過
acks
和retries
配置確保消息持久化。 - 異步處理:主線程與 Sender 線程解耦,避免阻塞用戶邏輯。
重要參數
以下是 Kafka 生產者(Producer)在日常開發中的 常見配置參數 及其作用,按功能分類整理成表格:
一、核心必填參數
參數名 | 默認值 | 說明 |
| 無 | Kafka 集群地址列表(逗號分隔,如 )。 |
| 無 | Key 的序列化類(如 )。 |
| 無 | Value 的序列化類(同上)。 |
二、可靠性相關參數
參數名 | 默認值 | 說明 |
|
| 消息持久化確認機制:
|
|
| 發送失敗后的重試次數(建議設為 配合 )。 |
|
| 是否啟用冪等性( 和 |
|
| 單個 Broker 的未確認請求數。若啟用冪等性,建議設為 以保證順序。 |
三、性能優化參數
參數名 | 默認值 | 說明 |
|
| 消息在緩沖區等待時間(毫秒),增大可提升吞吐量(但增加延遲)。 |
|
(16KB) | 單個批次的大小閾值,達到閾值后立即發送。 |
|
(32MB) | 生產者緩沖區的總內存大小。 |
|
| 消息壓縮算法( 、 、 、 ),減少網絡帶寬占用。 |
四、高級配置
參數名 | 默認值 | 說明 |
|
(30秒) | 生產者等待 Broker 響應的超時時間。 |
|
(60秒) | 生產者緩沖區滿或元數據不可用時的阻塞時間(超時拋異常)。 |
| 默認輪詢/哈希策略 | 自定義分區策略(實現 接口)。 |
五、安全性配置(可選)
參數名 | 默認值 | 說明 |
|
| 安全協議(如 、 )。 |
| 無 | SSL 證書路徑(客戶端認證時需配置)。 |
| 無 | SASL 認證機制(如 、 )。 |
六、錯誤處理與監控
參數名 | 默認值 | 說明 |
| 無 | 生產者攔截器(實現 接口),用于監控或修改消息。 |
|
(30秒) | 性能指標采樣窗口時間。 |
典型配置示例
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 10);
props.put("linger.ms", 20);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
props.put("enable.idempotence", "true");
關鍵注意事項
- 可靠性 vs 性能:
-
acks=all
和enable.idempotence=true
提高可靠性,但可能降低吞吐量。- 增大
batch.size
和linger.ms
可提升吞吐量,但增加延遲。
- 冪等性限制:
-
- 需 Kafka 0.11+ 版本支持,且
max.in.flight.requests=1
(或 Kafka 2.0+ 允許5
)。
- 需 Kafka 0.11+ 版本支持,且
- 監控與調優:
-
- 通過
metrics
和攔截器監控生產者性能,動態調整參數
- 通過