【kafka系列】生產者

目錄

發送流程

1. 流程邏輯分析

階段一:主線程處理

階段二:Sender 線程異步發送

核心設計思想

2. 流程

關鍵點總結

重要參數

一、核心必填參數

二、可靠性相關參數

三、性能優化參數

四、高級配置

五、安全性配置(可選)

六、錯誤處理與監控

典型配置示例

關鍵注意事項


發送流程

  • 序列化與分區:消息通過Partitioner選擇目標分區(默認輪詢或哈希),序列化后加入RecordAccumulator緩沖區。
  • 批次合并Sender線程將同一分區的消息合并為ProducerBatch,減少網絡請求(源碼見Sender.run()方法)。
  • 發送至Broker:通過NetworkClient異步發送,Broker的LogAppendTime處理寫入請求。
  • ACK機制:根據acks配置(0/1/all)等待Broker確認,通過Metadata類更新分區元數據

1. 流程邏輯分析

Kafka 生產者發送消息的核心流程分為 主線程處理Sender 線程異步發送 兩個階段,具體步驟如下:


階段一:主線程處理
  1. 創建 ProducerRecord
    • 用戶調用 producer.send(ProducerRecord),指定 Topic、Key、Value 和可選的分區或時間戳。
  1. 選擇分區(Partition)
    • 若未指定分區,根據以下規則選擇:
      • 有 Key:對 Key 哈希取模(hash(key) % 分區數),確保相同 Key 的消息進入同一分區。
      • 無 Key:默認使用粘性分區策略(Sticky Partitioning,Kafka 2.4+),在批次填滿或超時前發送到同一分區,提升性能。
  1. 序列化(Serialize)
    • 使用配置的 key.serializervalue.serializer 對 Key 和 Value 序列化(如 StringSerializerByteArraySerializer)。
  1. 追加到緩沖區(RecordAccumulator)
    • 將消息按 Topic-Partition 分組,存入 RecordAccumulator 的批次(Batch)中。
    • 批次策略
      • batch.size:批次大小閾值(默認 16KB),達到閾值立即發送。
      • linger.ms:批次等待時間(默認 0ms),超時后發送未滿批次。

階段二:Sender 線程異步發送
  1. Sender 線程拉取批次
    • Sender 線程定期檢查緩沖區,將滿足條件的批次(已滿或超時)封裝為 ProducerRequest
  1. 構建請求并發送到 Broker
    • 根據分區的 Leader 副本所在 Broker,將請求發送到對應的節點。
    • 關鍵配置
      • acks:控制消息持久化確認級別:
        • 0:不等待確認(可能丟失數據)。
        • 1:等待 Leader 確認(默認)。
        • all:等待所有 ISR 副本確認(最高可靠性)。
      • max.in.flight.requests.per.connection:控制單個 Broker 的未確認請求數(默認 5)。
  1. 處理 Broker 響應
    • 成功:觸發用戶設置的 Callback 回調,并釋放批次內存。
    • 失敗
      • 可重試錯誤(如網絡抖動、Leader 切換):根據 retries(默認 0)和 retry.backoff.ms(默認 100ms)重試。
      • 不可重試錯誤(如消息過大):直接觸發回調并拋出異常。

核心設計思想
  • 異步批處理:通過緩沖區合并小消息,減少網絡 I/O 次數。
  • 零拷貝優化:使用 sendfile 系統調用提升網絡傳輸效率。
  • 高可靠性:通過重試機制和 acks=all 確保消息不丟失。

2. 流程


關鍵點總結

  1. 分區選擇:優先使用 Key 哈希或粘性分區策略,保證消息順序性和吞吐量。
  2. 批次優化:通過 batch.sizelinger.ms 平衡延遲與吞吐。
  3. 可靠性保障:通過 acksretries 配置確保消息持久化。
  4. 異步處理:主線程與 Sender 線程解耦,避免阻塞用戶邏輯。

重要參數

以下是 Kafka 生產者(Producer)在日常開發中的 常見配置參數 及其作用,按功能分類整理成表格:


一、核心必填參數

參數名

默認值

說明

bootstrap.servers

Kafka 集群地址列表(逗號分隔,如 host1:9092,host2:9092

)。

key.serializer

Key 的序列化類(如 org.apache.kafka.common.serialization.StringSerializer

)。

value.serializer

Value 的序列化類(同上)。


二、可靠性相關參數

參數名

默認值

說明

acks

1

消息持久化確認機制:

0:不等待確認(可能丟失數據)。 1:等待 Leader 確認(默認)。all:等待所有 ISR 副本確認(最高可靠性)。

retries

0

發送失敗后的重試次數(建議設為 Integer.MAX_VALUE

配合 delivery.timeout.ms

)。

enable.idempotence

false

是否啟用冪等性(true時保證消息不重復,需配合 acks=all

retries>0)。

max.in.flight.requests.per.connection

5

單個 Broker 的未確認請求數。若啟用冪等性,建議設為 1

以保證順序。


三、性能優化參數

參數名

默認值

說明

linger.ms

0

消息在緩沖區等待時間(毫秒),增大可提升吞吐量(但增加延遲)。

batch.size

16384

(16KB)

單個批次的大小閾值,達到閾值后立即發送。

buffer.memory

33554432

(32MB)

生產者緩沖區的總內存大小。

compression.type

none

消息壓縮算法(gzip

snappy

lz4

zstd

),減少網絡帶寬占用。


四、高級配置

參數名

默認值

說明

request.timeout.ms

30000

(30秒)

生產者等待 Broker 響應的超時時間。

max.block.ms

60000

(60秒)

生產者緩沖區滿或元數據不可用時的阻塞時間(超時拋異常)。

partitioner.class

默認輪詢/哈希策略

自定義分區策略(實現 Partitioner

接口)。


五、安全性配置(可選)

參數名

默認值

說明

security.protocol

PLAINTEXT

安全協議(如 SSL

SASL_SSL

)。

ssl.keystore.location

SSL 證書路徑(客戶端認證時需配置)。

sasl.mechanism

SASL 認證機制(如 PLAIN

SCRAM-SHA-256

)。


六、錯誤處理與監控

參數名

默認值

說明

interceptor.classes

生產者攔截器(實現 ProducerInterceptor

接口),用于監控或修改消息。

metrics.sample.window.ms

30000

(30秒)

性能指標采樣窗口時間。


典型配置示例

Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 10);
props.put("linger.ms", 20);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
props.put("enable.idempotence", "true");

關鍵注意事項

  1. 可靠性 vs 性能
    • acks=allenable.idempotence=true 提高可靠性,但可能降低吞吐量。
    • 增大 batch.sizelinger.ms 可提升吞吐量,但增加延遲。
  1. 冪等性限制
    • 需 Kafka 0.11+ 版本支持,且 max.in.flight.requests=1(或 Kafka 2.0+ 允許 5)。
  1. 監控與調優
    • 通過 metrics 和攔截器監控生產者性能,動態調整參數

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/70108.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/70108.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/70108.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker 入門與實戰:從安裝到容器管理的完整指南

🚀 Docker 入門與實戰:從安裝到容器管理的完整指南 🌟 📖 簡介 在現代軟件開發中,容器化技術已經成為不可或缺的一部分。而 Docker 作為容器化領域的領頭羊,以其輕量級、高效和跨平臺的特性,深…

MySQL 插入替換語句(replace into statement)

我們日常使用 insert into 語句向表中插入數據時,一定遇到過主鍵或唯一索引沖突的情況,MySQL的反應是報錯并停止執行后續的語句,而replace into語句可以實現強制插入。 文章目錄 一、replace into 語句簡介1.1 基本用法1.2 使用set語句 二、注…

基于SpringBoot+Vue的智慧校園管理系統設計和實現(源碼+文檔+部署講解)

🎬 秋野醬:《個人主頁》 🔥 個人專欄:《Java專欄》《Python專欄》 ??心若有所向往,何懼道阻且長 文章目錄 .🚀 技術架構技術棧全景 🎯 功能模塊功能矩陣表📊 數據庫設計核心ER關系圖 💻 核心…

【Three.js】JS 3D library(一個月進化史)

#春節過完了,該繼續投入學習了~ 作為一個平面開發者,想要增進更多的技能,掌握web3D開發# Day 1 了解熟悉Three.js,著重基礎理論 學習資源: 前端可視化從0-1 Day 2 寫一個簡易demo 搭建環境-->安裝包-->創建…

moveable 一個可實現前端海報編輯器的 js 庫

目錄 緣由-胡扯本文實驗環境通用流程1.基礎移動1.1 基礎代碼1.1.1 data-* 解釋 1.2 操作元素創建1.3 css 修飾1.4 cdn 引入1.5 js 實現元素可移動1.6 圖片拖拽2.縮放3.旋轉4.裁剪 懶得改文案了,海報編輯器換方案了,如果后面用別的再更。 緣由-胡扯 導火…

Apollo 9.0 速度動態規劃決策算法 – path time heuristic optimizer

文章目錄 1. 動態規劃2. 采樣3. 代價函數3.1 障礙物代價3.2 距離終點代價3.3 速度代價3.4 加速度代價3.5 jerk代價 4. 回溯 這一章將來講解速度決策算法,也就是SPEED_HEURISTIC_OPTIMIZER task里面的內容。Apollo 9.0使用動態規劃算法進行速度決策,從類名…

【Day41 LeetCode】單調棧問題

一、單調棧問題 單調棧問題通常是在一維數組中尋找任一個元素的右邊或者左邊第一個比自己大或者小的元素的位置。 1、每日溫度 739 這題的目的是對于當天,找到未來溫度升高的那一天,也就是當前元素的右邊第一個比自己大的元素。所以我們需要維護一個單…

Cherno C++ P55 宏

這篇文章我們講一下C當中的宏。其實接觸過大型項目的朋友可能都被詭異的宏折磨過。 宏是在預處理當中,通過文本替換的方式來實現一些操作,這樣可以不用反復的輸入代碼,幫助我們實現自動化。至于預處理的過程,其實就是文本編輯&am…

web第三次作業

彈窗案例 1.首頁代碼 <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>綜合案例</title><st…

深入解析LVS命令參數及DR模式下的ARP抑制原理

深入解析LVS命令參數及DR模式下的ARP抑制原理 一、LVS簡介 Linux Virtual Server (LVS) 是基于Linux內核的高性能負載均衡解決方案&#xff0c;支持NAT、DR&#xff08;Direct Routing&#xff09;和TUN&#xff08;IP Tunneling&#xff09;三種模式。其中&#xff0c;ipvsad…

阿里云一鍵部署DeepSeek-V3、DeepSeek-R1模型

目錄 支持的模型列表 模型部署 模型調用 WebUI使用 在線調試 API調用 關于成本 FAQ 點擊部署后服務長時間等待 服務部署成功后&#xff0c;調用API返回404 請求太長導致EAS網關超時 部署完成后&#xff0c;如何在EAS的在線調試頁面調試 模型部署之后沒有“聯網搜索…

Win10環境借助DockerDesktop部署大數據時序數據庫Apache Druid

Win10環境借助DockerDesktop部署最新版大數據時序數據庫Apache Druid32.0.0 前言 大數據分析中&#xff0c;有一種常見的場景&#xff0c;那就是時序數據&#xff0c;簡言之&#xff0c;數據一旦產生絕對不會修改&#xff0c;隨著時間流逝&#xff0c;每個時間點都會有個新的…

【第13章:自監督學習與少樣本學習—13.1 自監督學習最新進展與實現方法】

凌晨三點的實驗室,博士生小王盯著屏幕里正在"自娛自樂"的神經網絡——這個沒有吃過一張標注圖片的模型,正在通過旋轉、拼圖、填色等游戲任務,悄悄掌握著理解世界的秘訣。這種魔法般的修煉方式,正是當今AI領域最炙手可熱的技術:自監督學習。 一、打破數據枷鎖:自…

數據庫報錯1045-Access denied for user ‘root‘@‘localhost‘ (using password: YES)解決方式

MySQL 報錯 1045 表示用戶root從localhost連接時被拒絕訪問&#xff0c;通常是因為密碼錯誤、權限問題或配置問題。以下是解決該問題的常見方法&#xff1a; 方法一&#xff1a;檢查用戶名和密碼 ? 確認用戶名和密碼是否正確&#xff1a; 確保輸入的用戶名和密碼完全正確&am…

八大排序——簡單選擇排序

目錄 1.1基本操作&#xff1a; 1.2動態圖&#xff1a; 1.3代碼&#xff1a; 代碼解釋 1. main 方法 2. selectSort 方法 示例運行過程 初始數組 每輪排序后的數組 最終排序結果 代碼總結 1.1基本操作&#xff1a; 選擇排序&#xff08;select sorting&#xff09;也…

與傳統光伏相比 城電科技的光伏太陽花有什么優勢?

相比于傳統光伏&#xff0c;城電科技的光伏太陽花有以下優勢&#xff1a; 一、發電效率方面 智能追蹤技術&#xff1a;光伏太陽花通過內置的智能追蹤系統&#xff0c;采用全球定位跟蹤算法&#xff0c;能夠實時調整花瓣&#xff08;即光伏板&#xff09;的角度&#xff0c;確…

FPGA的星辰大海

編者按 時下風頭正盛的DeepSeek,正值喜好宏大敘事的米國大統領二次上崗就業,OpenAI、軟銀、甲骨文等宣布投資高達5000億美元“星際之門”之際,對比尤為強烈。 某種程度上,,是低成本創新理念的直接落地。 包括來自開源社區的諸多贊譽是,并非體現技術有多“超越”,而是…

Elasticsearch:15 年來致力于索引一切,找到重要內容

作者&#xff1a;來自 Elastic Shay Banon 及 Philipp Krenn Elasticsearch 剛剛 15 歲了&#xff01;回顧過去 15 年的索引和搜索&#xff0c;并展望未來 15 年的相關內容。 Elasticsearch 剛剛成立 15 周年。一切始于 2010 年 2 月的一篇公告博客文章&#xff08;帶有標志性的…

嵌入式軟件、系統、RTOS(高軟23)

系列文章目錄 4.2嵌入式軟件、系統、RTOS 文章目錄 系列文章目錄前言一、嵌入式軟件二、嵌入式系統三、嵌入式系統分類四、真題總結 前言 本節講明嵌入式相關知識&#xff0c;包括軟件、系統。 一、嵌入式軟件 二、嵌入式系統 三、嵌入式系統分類 四、真題 總結 就是高軟筆記…

數據結構 day02

3. 線性表 3.1. 順序表 3.1.3. 順序表編程實現 操作&#xff1a;增刪改查 .h 文件 #ifndef __SEQLIST_H__ #define __SEQLIST_H__ #define N 10 typedef struct seqlist {int data[N];int last; //代表數組中最后一個有效元素的下標 } seqlist_t;//1.創建一個空的順序表 seq…