kafka學習筆記--如何保證生產者數據可靠、不重復、有序

本文內容來自尚硅谷B站公開教學視頻,僅做個人總結、學習、復習使用,任何對此文章的引用,應當說明源出處為尚硅谷,不得用于商業用途。
如有侵權、聯系速刪
視頻教程鏈接:【尚硅谷】Kafka3.x教程(從入門到調優,深入全面)


PS:本節內容尚硅谷的視頻講的不太友好,又查了很多資料才搞明白

文章目錄

  • 數據可靠性
  • 數據不重復
  • 數據有序性

數據可靠性

首先數據的可靠性指的是:

  • 消息不會意外丟失
  • 消息不會重復傳遞

那回顧我們的數據發送流程,在確認數據發送成功的這一步,也就是ack應答這里,不同的參數對應著不同的策略,如果選擇了0和1,則存在丟數的問題,如圖:
0: 如果數據發送到某個主題的leader時,leader所在節點掛了,那么這條消息就丟失了
在這里插入圖片描述
1: 同理,leader收到了,還沒應答時掛了,也會丟數據
在這里插入圖片描述
-1(all): 使用-1能保證數據落配盤后才回答,保證數據不丟失
在這里插入圖片描述
但是,如果Leader收到數據,所有Follower都開始同步數據,但有一個Follower,因為某種故障,遲遲不能與Leader進行同步,那這個問題怎么解決呢?

這就引出了ISR隊列的概念了

ISR,是一個機制,也代表著一個同步合集,是由Leader維護的一個動態的in-sync replica set(ISR),意為和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。它包含著所有處于同步狀態的副本。當一個副本和Leader副本的差距超過一定程度時,這個副本就會被認為是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不變的


那么,既然ISR是動態的,那哪些副本會被包含在ISR中呢?


主要依據就是 副本需要保證能夠及時地接收并復制Leader副本的消息,也就是需要保證與leader副本的消息同步延遲在一定的時間范圍內(默認情況下是10秒鐘,由參數 replica.lag.time.max.ms 控制)。

換而言之,因為分區與ISR機制,我們的消息一旦被Kafka 接收后,就會復制多份并很快落盤。這意味著,即使某一臺Broker節點宕機乃至硬盤損毀,也不會導致數據丟失。

我們將ISR與ACK應答結合起來使用,就形成了數據可靠條件

  • 數據完全可靠條件 = ACK級別設置為-1 + 分區副本大于等于2 + ISR里應答的最小副本數量大于等于2

數據不重復

上面講解的,只能保證數據可靠,但是這又引出了一個新的問題
如果,leader在同步完成之后,向生產者回答時,掛掉了,這時候剩下的備份分區會自動選舉出一個新leader出來,但是生產者并不知道它掛掉了,只會以為是消息發送失敗了,觸發重試,又將數據發送了一遍,然后新的leader就又接受了一遍消息,然后在備份分區上再存一遍。這就導致了這條消息存在兩份,產生數據重復問題。
在這里插入圖片描述
那么kafka是怎么保證數據不重復的呢?
其實這就是數據的冪等性問題了,冪等性就是指Producer不論向Broker發送多少次重復數據,Broker端都只會持久化一條,保證了不重復。

kafka默認啟用數據冪等性,即設置 enable.idempotence = true

在生產者發消息時,這條消息是有它自己的屬性的,其中有三個數據被拿來作為數據的主鍵,kafka會以此來判斷這條消息是否重復,若重復,則只保留一條

PID:又叫生產者編號(producerid), Producer在初始化的時候(只有初始化的時候會隨機生成PID,也就是重啟就會再次生成)會被分配一個PID

Partition:又叫分區編號,即這條消息要發往的分區的paritionid

SeqNumber:又叫序列號,發往同一Partition的消息會附帶Sequence Number(即發送數據的編號,代表著向分區發送的第幾條消息)

這樣<PID, PartitionID, SeqNumber>就相當于構成了一個主鍵。Broker端會對<PID, PartitionID, SeqNumber>做緩存,當具有相同主鍵的消息提交時,Broker只會持久化一條,這樣就保證了數據的唯一,不重復。

但是冪等性只能保證的是在單分區單會話內不重復,如果發消息時生產者掛掉了,重啟后它不知道是否發送成功了,又將這個消息再發送一遍,此時它的PID發生變化,那么這條消息就被認為是一條新的消息,導致重復存儲,這種情況怎么解決呢?

這就要引入kafka的事務機制了,事務這個東西大家都知道啥意思,不再重復解釋

我們通過事務,讓客戶端掛掉后繼續處理,而不是重新從頭來過,保證消息的僅一次發送

注意:開啟事務,必須開啟冪等性。

請添加圖片描述

kafka使用事務,有5個API

// 初始化事務
void initializeTransactions ();// 開啟事務
void beginTransaction () throws ProducerFencedException;// 在事務中提交已消費的偏移量(主要用于消費者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets, String consumerGroupId) throws ProducerFencedException;// 提交事務
void commitTransaction () throws ProducerFencedException;// 放棄事務(類似于回滾事務的操作)
void abortTransaction () throws ProducerFencedException;

舉個例子:

package com.atguigu.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Test {public static void main(String[] args) {// 1. 創建 kafka 生產者的配置對象Properties properties = new Properties();// 2. 給 kafka 配置對象添加配置信息properties.put("bootstrap.servers", "hadoop102:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());properties.put("transactional.id", "transaction_id_0");// 3. 創建 kafka 生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 初始化事務kafkaProducer.initTransactions();// 開啟事務kafkaProducer.beginTransaction();try {// 4. 調用 send 方法,發送消息// 發送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));}// int i = 1 / 0;// 提交事務kafkaProducer.commitTransaction();} catch (Exception e) {// 終止事務kafkaProducer.abortTransaction();} finally {// 5. 關閉資源kafkaProducer.close();}}
}

數據有序性

如果某主題TOPIC只有一個分區,那么它天生有序,因為分區其實就是一個有序隊列

如果是多分區的,kafka是通過滑動窗口的思想解決這個問題的

我們知道kafka發送請求時,最多緩存5個,其實在發送時,每個請求都有自己的單調遞增編號,kafka broker在接收數據時,會自動按照編號將數據排序,并且如果其中一個編號的請求失敗時,后續再次成功,數據過來后,會自動的根據編號插入到應該在的位置上
請添加圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/211848.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/211848.shtml
英文地址,請注明出處:http://en.pswp.cn/news/211848.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

名字的漂亮度

給出一個字符串&#xff0c;該字符串僅由小寫字母組成&#xff0c;定義這個字符串的“漂亮度”是其所有字母“漂亮度”的總和。 每個字母都有一個“漂亮度”&#xff0c;范圍在1到26之間。沒有任何兩個不同字母擁有相同的“漂亮度”。字母忽略大小寫。給出多個字符串&#xff0…

從零開發短視頻電商 Low Level Client(推薦)連接OpenSearch進行CRUD

文章目錄 依賴初始化客戶端發起請求請求參數請求頭設置超時時間設置線程數設置用戶名密碼結果解析節點選擇器配置嗅探器整體示例問題參考 OpenSearch開發環境安裝Docker和Docker-Compose兩種方式 依賴 <dependency><groupId>org.elasticsearch.client</groupId…

【腳本】圖片-音視頻-壓縮文件處理

音視頻處理 一&#xff0c;圖片操作1&#xff0c;轉換圖片格式2&#xff0c;多張圖片合成視頻 二&#xff0c;音頻操作1&#xff0c;轉換音頻格式2&#xff0c;分割音頻為多段3&#xff0c;合成多段音頻 三&#xff0c;視頻操作1&#xff0c;轉換視頻格式2&#xff0c;提取視頻…

【Go自學版】01-基礎

// 變量 var a, b, c 8, 2.3, "hello" var d float64; e : 6var A []int; var B [10]int; C : [10]int{1, 2, 3, 4} for i : 0; i < len(B); i {} for _, value : range C {} D make([]int, 3) // len 4, cap 10, 擴容方式 cap*2 E : make([]int, 4, 10) E …

掌握PyTorch數據預處理(一):讓模型表現更上一層樓!!!

引言 在PyTorch中&#xff0c;數據預處理是模型訓練過程中不可或缺的一環。通過精心優化數據&#xff0c;我們能夠確保模型在訓練時能夠更高效地學習&#xff0c;從而在實際應用中達到更好的性能。今天&#xff0c;我們將深入探討一些常用的PyTorch數據預處理技巧&#xff0c;…

C++如何通過調用ffmpeg接口對H264文件進行編碼和解碼

C可以通過調用FFmpeg的API來對H264文件進行編碼和解碼。下面是一個簡單的例子。 首先需要在代碼中包含FFmpeg的頭文件&#xff1a; extern "C" { #include <libavcodec/avcodec.h> #include <libavformat/avformat.h> #include <libswscale/swscale…

Linux系統編程:進程間通信總結

管道 在Linux中&#xff0c;管道是一種進程間通信方式&#xff0c;它允許一個進程&#xff08;寫入端&#xff09;將其輸出直接連接到另一個進程&#xff08;讀取端&#xff09;的輸入。從本質上說&#xff0c;管道也是一種文件&#xff0c;但它又和一般的文件有所不同。 具體…

Docker部署開源分布式任務調度平臺DolphinScheduler并實現遠程訪問辦公

文章目錄 前言1. 安裝部署DolphinScheduler1.1 啟動服務 2. 登錄DolphinScheduler界面3. 安裝內網穿透工具4. 配置Dolphin Scheduler公網地址5. 固定DolphinScheduler公網地址 前言 本篇教程和大家分享一下DolphinScheduler的安裝部署及如何實現公網遠程訪問&#xff0c;結合內…

前端知識筆記(二十七)———CSS核心功能手冊:從熟悉到精通

參考HTML代碼 <!DOCTYPE html> <html lang"zh-CN"><head><meta charset"utf-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wi…

12.9_黑馬數據結構與算法筆記Java

目錄 057 多路遞歸 e03 楊輝三角2 thinking&#xff1a;二維數組的動態初始化&#xff1f; 057 多路遞歸 e03 楊輝三角3 058 鏈表 e01 反轉單向鏈表1 058 鏈表 e01 反轉單向鏈表2 058 鏈表 e01 反轉單向鏈表3 遞歸 058 鏈表 e01 反轉單向鏈表4 為什么是returnn1呢&…

【Cisco Packet Tracer】路由器 NAT實驗

NAT的實現方式有三種&#xff0c;即靜態轉換Static Nat、動態轉換Dynamic Nat和端口多路復用OverLoad。 靜態轉換是指內部本地地址一對一轉換成內部全局地址&#xff0c;相當內部本地的每一臺PC都綁定了一個全局地址。一般用于在內網中對外提供服務的服務器。 [3] 動態轉換是指…

C++ 迭代器

迭代器 迭代器類似于指針類型&#xff0c;也提供了對對象的間接訪問。 就迭代器而言&#xff0c;其對象是容器中的元素或 string 對象中的字符。 獲取迭代器 容器的迭代器類型 使用作用域運算符來說明我們希望使用的類型成員&#xff1b;例&#xff1a;string::iterator it…

探秘MSSQL存儲過程:功能、用法及實戰案例

在現代軟件開發中&#xff0c;高效地操作數據庫是至關重要的。而MSSQL&#xff08;Microsoft SQL Server&#xff09;作為一款強大的關系型數據庫管理系統&#xff0c;為我們提供了豐富的功能和工具來處理數據。其中&#xff0c;MSSQL存儲過程是一項強大而又常用的功能&#xf…

改進YOLOv8注意力系列一:結合ACmix、Biformer、BAM注意力機制

???改進YOLOv8注意力系列一:結合ACmix、Biformer、BAM注意力機制 代碼ACmixBiFormerBAMBlock加入方法各種yaml加入結構本文提供了改進 YOLOv8注意力系列包含不同的注意力機制以及多種加入方式,在本文中具有完整的代碼和包含多種更有效加入YOLOv8中的yaml結構,讀者可以獲…

C++ 的關鍵字(保留字)介紹

一.C中部分關鍵字的用法 1. auto 關鍵字auto是C11引入的&#xff0c;它可以用于變量聲明和函數返回類型的推導。當你不關心變量的具體類型時&#xff0c;可以使用auto來讓編譯器根據初始化表達式推導出變量的類型。這樣可以簡化代碼&#xff0c;提高可讀性。 1.在for循環中遍…

Mysql索引一篇就夠了

索引 定義 索引是對數據庫表中一列或者多列的值進行排序的結構。 目的 數據庫索引好比一本書的目錄&#xff0c;提高查詢效率。但是為表設置索引要付出相應的代價&#xff1a; 增加了數據庫的存儲空間 在插入和修改時需花費更多的時間&#xff08;因為索引也要隨之變動&#…

一、C#筆記

1.注釋 /*多行注釋*/class HelloWorld{ void Hello(){Console.WriteLine("Hello!");//單行注釋}} 2.理解語句 2.1方法、語法、語義 2.2使用標識符 標識符語法規則&#xff1a; 只能使用字母&#xff08;大寫和小寫&#xff09;、數字和下劃…

C++相關閑碎記錄(5)

1、容器提供的類型 2、Array Array大小固定&#xff0c;只允許替換元素的值&#xff0c;不能增加或者移除元素改變大小。Array是一種有序集合&#xff0c;支持隨機訪問。 std::array<int, 4> x; //elements of x have undefined value std::array<int, 5> x {…

滲透測試——七、網站漏洞——命令注入和跨站請求偽造(CSRF)

滲透測試 一、命令注入二、跨站請求偽造(CSRF)三、命令注入頁面之注人測試四、CSRF頁面之請求偽造測試 一、命令注入 命令注入(命令執行) 漏洞是指在網頁代碼中有時需要調用一些執行系統命令的函數例如 system()、exec()、shell_exec()、eval()、passthru()&#xff0c;代碼未…

基于ssm在線云音樂系統的設計與實現論文

摘 要 隨著移動互聯網時代的發展&#xff0c;網絡的使用越來越普及&#xff0c;用戶在獲取和存儲信息方面也會有激動人心的時刻。音樂也將慢慢融入人們的生活中。影響和改變我們的生活。隨著當今各種流行音樂的流行&#xff0c;人們在日常生活中經常會用到的就是在線云音樂系統…