Kafka如何保證「消息不丟失」,「順序傳輸」,「不重復消費」,以及為什么會發生重平衡(reblanace)

前言

上一篇文章總結了kafka為什么快,下面來總結一下,kafka高頻的常見的問題。內容有點多,全部看完需要有一定的耐心。

kafka如何保證消息不丟失

Producer端

要保證消息不丟失,第一點要做的就是要保證消息從producer端發送到了kafka的broker中,并且broker把消息保存了下來。
由于在發送消息的過程中有可能會發生網絡故障,broker故障等原因導致消息發送失敗,因此在producer端有兩種方式來避免消息丟失。

接收發送消息回執

我們在使用kafka發送消息的時候,通常是使用producer.send(msg)方法,但是這個方法其實是一種異步發送,調用此方法發送消息的時候,雖然會立即返回,但是并不代表消息真的發送成功了。
1、所以可以使用同步發送消息,producer.send(msg).get()此方法會執行同步發生消息,并等待結果返回
2、也可以使用帶回調函數的異步方法,producer.send(msg,callback),用回調函數來監聽消息的發送結果,如果發送失敗了,可以在回調函數里面進行重試。

producer參數配置

producer也提供了一些配置參數來避免消息丟失。

// 此配置表示,Leader和Follower全部成功接收消息后才確認收到消息,
// 可以最大限度保證消息不丟失,但是吞吐量會下降
acks = -1 
// producer 發送消息失敗后,自動重試次數
retries = 3
// 發送消息失敗后的重試時間間隔
retry.backoff.ms = 300
Broker端

當消息發送到broker后,broker需要保證此消息不會丟失,我們都知道,kafka是會將消息持久化到磁盤中的。
但是kafka為了保持性能采用了,頁緩存+異步刷盤的形式將消息持久化到磁盤的。也就是批量定時將消息持久化到磁盤。
但是頁緩存如果還沒來的及將消息刷到磁盤,broker就掛了,還是會有消息丟失的風險,因此kafka又提供了partition的ISR(同步副本機制),即每一個patrtition都會有一個唯一的Leader和一到多個Follower,Leader專門處理一些事務類型的請求,Follower負責同步Leader的數據。當leader掛了后,會重新從Follower中選舉出新的Leader,保證消息能夠最終持久化。
另外,在producer中的配置參數acks,配置不同的值,broker也是會做不同的處理的。

acks=0:表示Producer請求立即返回,不需要等待Leader的任何確認。這種方案有最高的吞吐率,但是不保證消息是否真的發送成功。
acks =-1: 表示分區Leader必須等待消息被成功寫入到所有的ISR副本(同步副本)中才認為Producer請求成功。這種方案提供最高的消息持久性保證,但是理論上吞吐率也是最差的。
acks=1: 表示Leader副本必須應答此Producer請求并寫入消息到本地日志,之后Producer請求被認為成功。如果此時Leader副本應答請求之后掛掉了,消息會丟失。這個方案,提供了不錯的持久性保證和吞吐。

producer中還有一些參數的配置也是會起到避免消息丟失的作用

//表示分區副本的個數,replication.factor>1,當Leader掛了,follower會被選舉為leader繼續提供服務
replication.factor=2//表示 ISR 最少的副本數量,通常設置 replication.factormin.insync.replicas>1,這樣才有可用的follower副本執行替換,保證消息不丟失
replication.factormin.insync.replicas=2//是否可以把非ISR集合中的副本選舉為leader
min.inunclean.leader.election.enable= false
Consumer端

Consumer端,只要保證消息接收到不胡亂的提交offset就行,kafka本身也是會記錄每個pratition的偏移量,但是為了業務的可靠性,也可以自己存儲一份offset,防止由于業務代碼的問題導致消息沒有處理就提交的offset,有自己存儲才這一份offset就可以對偏移量進行一個回撥。

為了避免消息丟失,建議使用手動提交偏移量的方式,防止消息的業務邏輯未處理完,提交偏移量后消費者掛了的問題。

enable.auto.commit=false

kafka如何保證消息的順序傳輸

我們知道,kafka的消息實際是存在某個topic的partition中的,一個topic有多個partition分區,同一個partition中的消息是有序的,跨partition的消息是無序的。
這個是怎么實現的呢?
因為我們在【Kafka為什么吞吐量大,速度快?】這篇文章里面總結了,kafka寫入磁盤時是順序寫的,并且會被分配一個唯一的offset,所以同一個partition保存的數據都是有序的。而在讀取消息時,消費者會從該分區最早的offset開始,依次讀取消息,保證了消息順序消費。

具體實現順序發送消息有兩種方式:
1、在使用kafka時,對需要保證順序消費的topic,只創建一個partition,這樣消息就都會順序的存儲到這一個partition中,也就能保證順序消費了。
2、當一個topic有多個partition時,對需要保證順序的消息,都發到指定的partition即可,這樣也能保證順序消費。

注:需要注意一點,雖然發送時保證了順序,也都寫到了同一個partition中,但在消費端,也要保證順序消費,即單線程處理消息。

目前kafka4.0,可以允許一個consumer group下的多個消費者同時消費同一個partition了。
其借助新推出的共享組(Shared Group)特性來達成這一功能,且支持逐條消息確認,從而讓消費模式更具靈活性,還能助力提升吞吐量。以往版本默認僅允許一個消費者組內單個消費者消費一個特定分區,當消費者多于分區時,多余消費者會閑置,共享組則可避免出現該類資源浪費情況。

將消息發到指定partition中也有幾種方式。
1、發送消息,組裝producerRecord時,指定partition

// 創建Kafka生產者
Producer<String, String> producer = new KafkaProducer<>(getProperties());
// 指定要發送消息的topic
String topic ="jimer_topic";
// 發送的消息內容
String message =Hello World!";
// 指定發送消息的分區
int partition =0;// 創建包含分區信息的ProducerRecord
ProducerRecordProducerRecord<String,String> record = new ProducerRecord<>(topic, partition, message);
// 發送消息
producer.send(record);
//關閉Kafka生產者
producer.close();

2、指定消息的key,保證相同key的消息發送到同一個partition

// 創建Kafka生產者
Producer<String, String> producer = new KafkaProducer<>(getProperties());
// 指定要發送消息的topic
String topic ="jimer_topic";
// 發送的消息內容
String message =Hello World";
// 指定發送消息的key
String msg_key = "order_msg";// 創建包含消息key的producerRecord
ProducerRecordProducerRecord<String,String> record = 
new ProducerRecord<>(topic, null,msg_key, message);
// 發送消息
producer.send(record);
//關閉Kafka生產者
producer.close();

3、自定義Partitioner
除了上面兩種方式外,還可以自定義指定分區的方式。通過實現Partitioner這個接口,具體實現partition方法,就可以了。具體使用的時候,在初始化Producer時,指定具體的partition實現類即可。
例如:

public class MyPartitioner implatents Partitioner{@Override
public void configure(Map<String,?> configs){// 可以在這里處理和獲取分區器的配置參數
}
@0verride
public int partition(String topic, Object key, byte[] keyBytes, 
Object value,byte[] valueBytes,Cluster cluster){int partition =  int ss = new Random().nextInt(2);// 返回分區編號return partition;
}
@0verride
public void close(){// 可以在這里進行一些清理操作
}

使用時

Properties propsProducer = new Properties();propsProducer.put("acks", "all"); // 全部ISR列表中的副本接收成功后返回propsProducer.put("retries", 3);//失敗時重試次數propsProducer.put("partitioner.class", "com.jimoer.MyPartitioner"); // 指定自定義分區器類
// 創建Kafka生產者
Producer<String, String> producer = new KafkaProducer<>(propsProducer);

kafka如何保證消息不重復消費

什么情況下會導致消息被重復消費呢?

1、生產者,生產者可能重復推送了一條消息到kafka,例如:某接口未做冪等處理,接口中會發送kafka消息。
2、kafka,在消費者消費完消息后,提交offset時,kafka突然掛了,導致kafka認為此消息還未消費,又重新推送了該條消息,導致了重復消費消息。
3、消費者,在消費者消費完消息后,提交offset時,Consumer突然宕機掛掉,這個時候,kafka未接收到已處理的offset值,當Consumer恢復后,會重新消費此部分消息。
4、還有一種情況,Kafka 存在 Partition Balance 機制,會將多個 Partition 均衡分配給多個消費者。若 Consumer 在默認 5 分鐘內未處理完一批消息,會觸發 Rebalance 機制,導致 offset 自動提交失敗,重新 Rebalance 后,消費者會從之前未提交的 offset 位置開始消費,從而造成消息重復消費。

那么我們該如何防止消息被重復消費呢

其實上面的1、2、3、4這些情況都可以用冪等機制來防止消息被重復消費。為消息生成 一個唯一標識,并保存到 mysql 或 redis 中,處理消息前先到 mysql 或 redis 中判斷該消息是否已被消費過。

但是第4種情況,前提是要先優化消費端處理性能,避免觸發 Rebalance,例如:采用異步方式處理消息、縮短單個消息消費時長、調整消息處理超時時間、減少一次性從 Broker 拉取的數據條數等。

kafka什么情況下會發生reblanace(重平衡)

Kafka 的重平衡(Rebalance)是指消費者組(Consumer Group)內的消費者與分區(Partition)之間的分配關系發生重新調整的過程
主要有以下幾種情況會觸發:
1、消費者組成員數量發生變化。((新消費者的加入或者退出)
2、訂閱主題(Topic)數量發生變化。
3、訂閱主題的分區(Partition)數發生變化。

還有某些異常情況也會觸發Rebalance:
1、消費端處理消息超時,上面我們說到過,若消費者未在設定時間內處理完消息,消費者組會認為當前消費者有問題了,會觸發Rebalance,重新分配消息。又或者當前消費者掛了,也是一樣會觸發Rebalance。
2、組協調器(Group Coordinator)是 Kafka 負責管理消費者組的 Broker 節點。如果它崩潰或者發生故障。Kafka 需要重新選舉新的 Group Coordinator ,并進行重平衡。

當Kafka 集群要觸發重平衡機制時,大致的步驟如下:
1.暫停消費:在重平衡開始之前,Kafka 會暫停所有消費者的拉取操作,以確保不會出現重平衡期間的消息丟失或重復消費。
2.計算分區分配方案:Kafka 集群會根據當前消費者組的消費者數量和主題分區數量,計算出每個消費者應該分配的分區列表,以實現分區的負載均衡。
3.通知消費者:一旦分區分配方案確定,Kafka 集群會將分配方案發送給每個消費者,告訴它們需要消費的分區列表,并請求它們重新加入消費者組。
4.重新分配分區:在消費者重新加入消費者組后,Kafka 集群會將分區分配方案應用到實際的分區分配中,重新分配主題分區給各個消費者。
5.恢復消費:最后,Kafka 會恢復所有消費者的拉取操作,允許它們消費分配給自己的分區。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/94466.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/94466.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/94466.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

原子操作匯編實現:原理、流程與代碼解析

&#x1f52c; 原子操作匯編實現&#xff1a;原理、流程與代碼解析 引用&#xff1a;VC/C Intel x86 內聯匯編實現 “Interlocked” 原子變量各種操作 &#x1f31f; 引言&#xff1a;原子操作的重要性 在多線程編程中&#xff0c;原子操作是確保數據一致性的關鍵機制。本文…

【WRF理論第十九期】內陸湖泊、水體的處理方式

目錄 WRF 模型中湖泊模擬概述 湖泊模型(Lake Model)集成 新增湖泊數據支持(如 WUDAPT + MODIS) LAKE_DEPTH Noah-MP + 湖泊模型聯合使用 namelist.input 配置說明 WRF 代碼更新 參考 論壇-WRF 湖泊模型(WRF-Lake model)與 SST 更新 WRF 模型中湖泊模擬概述 湖泊模型(La…

【滲透測試】SQLmap實戰:一鍵獲取MySQL數據庫權限

注&#xff1a;所有技術僅用于合法安全測試與防御研究&#xff0c;未經授權的攻擊行為屬違法犯罪&#xff0c;將承擔法律責任。一、SQLmap常規用法注意存放路徑&#xff1a;C:\Users\neo\AppData\Local\sqlmap\output1、列出詳細過程和數據庫列表sqlmap -u http://192.168.61.2…

LeetCode 第464場周賽 第三天

1. 3658 奇數和與偶數和的最大公約數&#xff08;歐幾里得&#xff09; 鏈接&#xff1a;題目鏈接 題解&#xff1a; 題解時間復雜度O(logmin(a, b))&#xff1a; 獲得前n個奇、偶數的總和&#xff0c;由于數列為等差數列&#xff0c;等差數列和公式&#xff1a;(a1 an) * n …

IntelliJ IDEA 集成 ApiFox 操作與注解規范指南

一、IDEA裝入Apifox 1.安裝Apifox Helper 說明:在 IntelliJ IDEA 中安裝 ApiFox Helper 插件。 2.打開Apifox 說明:點擊 設置,在菜單中選擇 API訪問令牌。在彈出的窗口中輸入任意名稱,并選擇令牌的有效期(為了方便,我這里選擇了 無期限)。生成令牌后,由于 令牌只能復…

C++---雙指針

在C編程中&#xff0c;雙指針算法是一種高效的解題思路&#xff0c;其核心是通過設置兩個指針&#xff08;或索引&#xff09;遍歷數據結構&#xff08;如數組、鏈表、字符串等&#xff09;&#xff0c;利用指針的移動規則減少無效操作&#xff0c;從而將時間復雜度從暴力解法的…

【LLM】GLM-4.5模型架構和原理

note 文章目錄note一、GLM-4.5模型二、Slime RL強化學習訓練架構Reference一、GLM-4.5模型 大模型進展&#xff0c;GLM-4.5技術報告,https://arxiv.org/pdf/2508.06471&#xff0c;https://github.com/zai-org/GLM-4.5&#xff0c;包括GLM-4.5&#xff08;355B總參數&#xff…

LLM 中增量解碼與模型推理解讀

在【LLM】LLM 中 token 簡介與 bert 實操解讀一文中對 LLM 基礎定義進行了介紹&#xff0c;本文會對 LLM 中增量解碼與模型推理進行解讀。 一、LLM 中增量解碼定義 增量解碼&#xff08;Incremental Decoding&#xff09;是指在自回歸文本生成過程中&#xff0c;模型每次只計…

1.Spring Boot:超越配置地獄,重塑Java開發體驗

目錄 一、Spring框架&#xff1a;偉大的基石 歷史背景與挑戰 Spring的革命性貢獻 新的挑戰&#xff1a;配置地獄 二、Spring Boot&#xff1a;約定大于配置的革命 四大核心特性 1. 快速創建獨立應用 2. 自動配置&#xff1a;智能化的魔法 3. 起步依賴&#xff1a;依賴管…

assert使用方法

assert 是 Python 中用來進行 調試 和 驗證 的一個關鍵字&#xff0c;它用于測試一個 條件表達式 是否為真。如果條件為假&#xff0c;assert 會拋出一個 AssertionError 異常&#xff0c;通常帶有錯誤信息。語法&#xff1a;assert condition, "Error message"condi…

【實習總結】快速上手Git:關鍵命令整理

目錄 git的四大工作區域 git首次配置 克隆遠程倉庫 提交代碼到遠程倉庫 查看文件狀態&#xff08;可選&#xff09; 添加文件到暫存區 將暫存區的內容提交到本地倉庫 將本地的提交上傳到遠程倉庫 拉取并合并代碼 第一種方式 第二種方式 分支管理 查看與創建分支 …

02-開發環境搭建與工具鏈

第2課&#xff1a;開發環境搭建與工具鏈 &#x1f4da; 課程目標 掌握DevEco Studio的下載、安裝和配置熟悉HMS Core&#xff08;華為移動服務&#xff09;的使用了解鴻蒙模擬器與真機調試環境掌握必備開發工具的使用 &#x1f6e0;? DevEco Studio環境搭建 2.1 下載與安裝…

刪掉一個元素以后全為1的最長子數組-滑動窗口

1493. 刪掉一個元素以后全為 1 的最長子數組 - 力扣&#xff08;LeetCode&#xff09; Solution #include<iostream> #include<vector> using namespace std;class Solution { public://滑動窗口//動態維護一個窗口&#xff0c;窗口內只能有1個0&#xff0c;記錄窗…

【計算機網絡 | 第8篇】編碼與調制

文章目錄通信系統中的編碼與調制&#xff1a;從信道基礎到信號傳輸技術一、信道與通信電路&#x1f342;二、三種基本通信方式&#x1f4d6;1. 單向通信&#xff08;單工通信&#xff09;2. 雙向交替通信&#xff08;半雙工通信&#xff09;3. 雙向同時通信&#xff08;全雙工通…

當AI遇上終端:Gemini CLI的技術魔法與架構奧秘

"代碼不僅僅是指令的集合&#xff0c;更是思想的載體。當AI與終端相遇&#xff0c;會碰撞出怎樣的火花&#xff1f;" 在這個AI技術日新月異的時代&#xff0c;Google推出的Gemini CLI無疑是一顆璀璨的明星。它不僅僅是一個命令行工具&#xff0c;更是一個將人工智能無…

ViLU: Learning Vision-Language Uncertainties for Failure Prediction

研究方向&#xff1a;Image Captioning1. 論文介紹本文提出ViLU&#xff08;Vision-Language Uncertainties&#xff09;&#xff0c;一個用于學習視覺語言不確定性量化&#xff08;UQ&#xff09;和檢測視覺語言模型故障的事后框架。使用VLMs進行量化&#xff08;UQ&#xff0…

數據集筆記:百度地圖高德地圖坐標互轉

1 為什么會有高德坐標系和百度坐標系&#xff1f;根據《測繪法》和國家保密法規&#xff0c;在中國大陸范圍內的地理坐標數據必須做加密處理&#xff0c;不允許直接使用 WGS84&#xff08;openstreetmap&#xff09;所以出現了GCJ-02 和 BD-09高德、騰訊、谷歌中國都遵循 GCJ-0…

SkyWalking高效線程上下文管理機制:確保調用鏈中traceId來自同一個請求

SkyWalking Agent 能確保獲取到“正確”的 traceId,其核心在于它建立并維護了一套高效的線程上下文管理機制。這套機制確保了即使在復雜的多線程、異步環境下,也能將正確的上下文(包含 traceId)與當前正在執行的代碼邏輯關聯起來。 其工作原理可以概括為下圖所示的流程: …

Kafka-Eagle安裝

目錄Eagle環境安裝Mysql環境準備Kafka環境準備Eagle安裝Kafka-Eagle框架可以監控Kafka集群的整體運行情況&#xff0c;在生產環境中經常使用 Eagle環境安裝 Mysql環境準備 Eagle的安裝依賴于Mysql&#xff0c;Mysql主要用來存儲可視化展示的數據 將mysql文件夾及里面所有內…

Matlab系列(005) 一 歸一化

目錄1、前言2、什么是歸一化&#xff1f;3、為什么要進行歸一化4、歸一化方法詳解與Matlab實現5、總結1、前言 ? ??歸一化技術是數據預處理的核心環節&#xff0c;本文將深度解析主流歸一化方法&#xff0c;提供可復現Matlab代碼&#xff0c;并探討其在各領域中的應用場景。…