【Kafka使用方式以及原理】

在這里插入圖片描述
在這里插入圖片描述
在這里插入圖片描述

Kafka生產者發送消息的方式

Kafka生產者發送消息主要通過以下三種方式:

同步發送
生產者發送消息后,會阻塞等待Broker的響應,確認消息是否成功寫入。這種方式可靠性高,但吞吐量較低。代碼示例:

ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
RecordMetadata metadata = producer.send(record).get();

異步發送
生產者發送消息后立即返回,通過回調函數處理Broker的響應。這種方式吞吐量高,但需要自行處理失敗情況。代碼示例:

producer.send(record, (metadata, exception) -> {if (exception != null) {// 處理失敗邏輯}
});

異步發送(無回調)
生產者直接發送消息而不關心結果,適用于對可靠性要求不高的場景。吞吐量最高,但可能丟失消息。代碼示例:

producer.send(record);

Kafka生產者發送消息的特點

分區策略
生產者可以通過指定分區鍵(Key)控制消息寫入的分區。若未指定Key,則采用輪詢策略分配分區。支持自定義分區器(Partitioner)。

消息確認機制(acks)

  • acks=0:生產者不等待Broker確認,消息可能丟失。
  • acks=1:Leader副本寫入成功后即返回響應。
  • acks=all/-1:需所有ISR副本寫入成功,可靠性最高。

批量發送(Batch)
生產者會將多條消息合并為一個批次發送,減少網絡開銷。通過linger.msbatch.size參數控制批處理行為。

消息重試
網絡異常或Leader切換時,生產者會自動重試發送消息。可通過retriesretry.backoff.ms參數配置重試策略。

冪等性與事務

  • 冪等性:通過啟用enable.idempotence=true避免消息重復發送。
  • 事務:支持跨分區原子性寫入,需配置transactional.id

緩沖區機制
生產者維護一個內存緩沖區(buffer.memory),暫存待發送消息。緩沖區滿時,發送調用會被阻塞或拋出異常。

Kafka消費者的基本工作流程

Kafka消費者通過訂閱主題(Topic)或特定分區(Partition)來消費消息。消費者組(Consumer Group)機制允許并行處理消息,每個消費者組內的消費者獨立消費不同分區的數據。

消費者啟動時會向Kafka集群發送元數據請求,獲取訂閱主題的分區信息。消費者與分區建立連接后,通過輪詢(Poll)機制從分區拉取消息。消費者會定期提交偏移量(Offset)到Kafka,記錄消費進度。

消費者配置關鍵參數

  • bootstrap.servers: Kafka集群的地址列表。
  • group.id: 消費者所屬的組名稱。
  • auto.offset.reset: 當無初始偏移量時如何處理(earliestlatest)。
  • enable.auto.commit: 是否自動提交偏移量(默認true)。
  • max.poll.records: 單次Poll返回的最大消息數。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

消息消費模式

訂閱主題模式
消費者訂閱一個或多個主題,Kafka自動分配分區:

consumer.subscribe(Arrays.asList("topic1", "topic2"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}
}

手動分配分區模式
直接指定消費的分區,繞過消費者組協調:

TopicPartition partition = new TopicPartition("topic1", 0);
consumer.assign(Arrays.asList(partition));

偏移量管理

自動提交
配置enable.auto.commit=true,Kafka定期提交偏移量(默認5秒一次)。

手動同步提交
精確控制提交時機,確保消息處理完成后再提交:

consumer.commitSync();

手動異步提交
非阻塞式提交,需處理回調:

consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("Commit failed: " + offsets);}
});

消費者再平衡(Rebalance)

當消費者組內成員變化(如新增或下線消費者)時,Kafka觸發再平衡,重新分配分區。可通過ConsumerRebalanceListener接口實現自定義邏輯:

consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 分區被回收前的處理(如提交偏移量)}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 新分區分配后的處理(如恢復狀態)}
});

處理消費延遲與積壓

  • 調整max.poll.records減少單次Poll的數據量。
  • 優化消息處理邏輯,避免阻塞Poll線程。
  • 增加消費者實例數量(不超過分區數)。
  • 監控消費者延遲指標(如consumer_lag)。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87286.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87286.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87286.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【ChatTTS】ChatTTS使用體驗

ChatTTS 使用體驗&#xff1a;初始使用真的十分驚艷。可以嘗試官網調用試一試。部署的好處是&#xff0c;遇到好聽的音色可以把參數自動存儲在本地。 苦惱&#xff1a;相同參數生成的音色不一致&#xff0c;需要多次調整&#xff0c;但最終效果非常滿意。 ? GitHub Star數變化…

華為云Flexus+DeepSeek征文| 基于華為云Dify-LLM高可用平臺開發運維故障處理智能體

華為云FlexusDeepSeek征文&#xff5c; 基于華為云Dify-LLM高可用平臺開發運維故障處理智能體 1. 概述2. 創建工作流2.1. 創建開始節點2.2. 創建搜索節點2.3. 創建LLM大模型節點2.4. 創建結束節點 3. 測試工作流4. 應用發布5. 總結 1. 概述 Dify是一款開源的LLM應用開發平臺&am…

vue中scss下載方式與引入方式

1. scss下載 npm install sass-loader --save-devnpm install node-sass --save-dev 2. 在style標簽里面加入lang“scss” 測試下&#xff01;

Day04_C語言IO進程線程

01.思維導圖 02.創建一個分支線程&#xff0c;在主線程中拷貝文件的前一部分&#xff0c;主線程拷貝文件的后一部分 #include <25051head.h> void* callback(void *arg) {off_t size*(off_t*)arg;//打開一個文件讀//打開一個文件寫int fd_r2open("./my.txt",O_…

金牛區數字文創夢工廠:國際數字影像產業園的先行服務

在金牛區數字文創夢工廠的實踐中&#xff0c;先行服務作為創新引擎&#xff0c;為企業提供預啟動階段的全方位支持。其核心理念是通過前置化咨詢和資源整合&#xff0c;降低試錯成本&#xff0c;賦能產業升級。 先行服務的三大核心優勢 通過主動介入項目啟動前環節&#xff0…

使用RSA對網址url欄加密以及二維碼的網址內容加密

JSEncrypt 庫 &#xff1a; - 引入了 jsencrypt/bin/jsencrypt.min - 需要在項目中安裝 jsencrypt 包 npm install jsencrypt import JSEncrypt from jsencrypt/bin/jsencrypt.min// 密鑰對生成 http://web.chacuo.net/netrsakeypairconst publicKey MFwAAQconst privateKe…

如何用 Kafka Manager 實現 Kafka 集群全面監控

1. 前言:為什么需要 Kafka 集群監控? Apache Kafka 是現代大數據架構中不可或缺的組件,廣泛用于日志收集、流處理、消息隊列等場景。隨著 Kafka 集群規模的增長和業務復雜度的提升,對 Kafka 的實時監控變得尤為重要。 1.1 Kafka 在大數據架構中的核心地位 Kafka 被廣泛應…

MyBatis架構原理解析:核心對象與執行流程深度剖析

一、開篇&#xff1a;理解MyBatis的核心價值 在當今Java持久層框架生態中&#xff0c;MyBatis憑借其靈活的SQL控制能力和簡潔的ORM實現成為企業級應用的首選。與JPA的全自動ORM不同&#xff0c;MyBatis采用半自動化映射理念&#xff0c;在保持SQL靈活性的同時&#xff0c;通過…

移遠通信攜手高通:以全棧車載解決方案,共繪智能出行新藍圖

6月26日至27日&#xff0c;2025高通汽車技術與合作峰會于蘇州盛大舉辦。本次峰會以 “我們一起&#xff0c;行穩智遠” 為主題&#xff0c;全方位呈現智能汽車全棧技術、全產業鏈生態與全場景體驗。作為高通長期穩定的戰略合作伙伴&#xff0c;移遠通信攜全棧車載智能解決方案深…

拿來就能用的python 課程 1

拿來就能用的python 課程 引言 python是很多人入門計算機語言的首選。 但是繁文縟節&#xff0c;很多人從怎么裝python開始學起&#xff0c;然后python計算&#xff0c;然后什么是函數&#xff0c;然后什么是類&#xff0c;然后就因為太難放棄了。&#xff08;說的是不是你&a…

openssh-server

默認地&#xff0c;Ubuntu桌面版不帶SSH服務器 1 檢查服務是否存在 ls /usr/sbin/sshd2 安裝服務 apt install openssh-server3 關閉防火墻 ufw disable 4 啟動服務 service ssh start

html虛擬滾動,解決dom渲染過多卡頓的問題

<!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8" /><title>極簡虛擬滾動</title><style>.container {width: 300px;height: 300px;border: 1px solid #ccc;overflow: auto;position: relative;}.pl…

華銳互動:全方位定制化 VR 內容制作服務流程剖析?

華銳互動始終堅持以客戶為中心&#xff0c;為客戶提供全方位、定制化的 VR 內容制作服務。從項目的最初階段開始&#xff0c;華銳互動就會深入了解客戶的需求和目標&#xff0c;與客戶進行充分的溝通和交流&#xff0c;挖掘項目背后的故事和文化內涵&#xff0c;然后根據客戶的…

50天50個小項目 (Vue3 + Tailwindcss V4) ? | DragNDrop(拖拽占用組件)

&#x1f4c5; 我們繼續 50 個小項目挑戰&#xff01;—— DragNDrop組件 倉庫地址&#xff1a;https://github.com/SunACong/50-vue-projects 項目預覽地址&#xff1a;https://50-vue-projects.vercel.app/ 使用 Vue 3 的 Composition API 和 <script setup> 語法結合…

springboot應用即使使用了連接池,MySQL數據庫仍然有大量sleep狀態的連接

springboot應用即使使用了連接池&#xff0c;MySQL數據庫仍然有大量sleep狀態的連接 問題背景概念理解MySQL配置參數wait_timeout概念Hikari配置參數&#xff08;項目使用hikari作為數據庫連接池&#xff09; 實踐出真知總結和解決思路 問題背景 近期客戶生產環境報&#xff1…

windows下安裝和使用git

本文為windows下git的下載安裝和使用。 git下載和安裝 參考&#xff1a; windows安裝git&#xff08;全網最詳細&#xff0c;保姆教程&#xff09;-CSDN博客 【學了就忘】Git介紹 — 4.Git的安裝 - 簡書 先解決下載時的一些疑惑&#xff1a; 選擇哪個架構&#xff1f; 電腦ARM6…

借助工具給外語視頻加雙語字幕的實用指南?

給外語視頻配上雙語字幕&#xff0c;能讓不同語言背景的觀眾更輕松理解內容&#xff0c;也能讓視頻在傳播時更受歡迎。現在有不少智能工具能幫我們高效完成這項工作&#xff0c;比如 ViiTor AI 平臺&#xff0c;它在處理雙語字幕方面有不少實用功能&#xff0c;下面就結合其功能…

Claude 4 與 Gemini 2.5 Pro:開發者深度比較

Claude 4 與 Gemini 2.5 Pro&#xff1a;開發者深度比較 在使用相同的編碼挑戰對Claude Sonnet 4和Gemini 2.5 Pro Preview進行廣泛的正面測試后&#xff0c;我發現了每個開發人員都應該了解的顯著性能差異。我的發現揭示了執行速度、成本效率以及最重要的&#xff0c;精確執行…

怎么進入9870端口

在實驗時想進入9870端口查看safe狀態 但是輸入localhost:9870后顯示&#xff1a; 首先使用jps確認hadoop狀態&#xff1a; 從 jps 的輸出來看&#xff0c;Hadoop 的核心服務&#xff08;NameNode、DataNode、ResourceManager、NodeManager 等&#xff09;都已經正常運行&…

Windows、Linux、macOS 三大系統安裝 Git 的常見坑點及解決方案,附帶 具體操作示例,幫助新手快速避坑

以下是 Windows、Linux、macOS 三大系統安裝 Git 的常見坑點及解決方案,附帶 具體操作示例,幫助新手快速避坑。 一、Windows 系統安裝 Git 1. 安裝路徑含空格或中文 坑點:默認路徑 C:\Program Files\Git 可能導致某些腳本報錯。 解決:自定義路徑(如 D:\DevTools\Git)。…