本文內容來自尚硅谷B站公開教學視頻,僅做個人總結、學習、復習使用,任何對此文章的引用,應當說明源出處為尚硅谷,不得用于商業用途。
如有侵權、聯系速刪
視頻教程鏈接:【尚硅谷】Kafka3.x教程(從入門到調優,深入全面)
PS:本節內容尚硅谷的視頻講的不太友好,又查了很多資料才搞明白
文章目錄
- 數據可靠性
- 數據不重復
- 數據有序性
數據可靠性
首先數據的可靠性指的是:
- 消息不會意外丟失
- 消息不會重復傳遞
那回顧我們的數據發送流程,在確認數據發送成功的這一步,也就是ack應答這里,不同的參數對應著不同的策略,如果選擇了0和1,則存在丟數的問題,如圖:
0: 如果數據發送到某個主題的leader時,leader所在節點掛了,那么這條消息就丟失了
1: 同理,leader收到了,還沒應答時掛了,也會丟數據
-1(all): 使用-1能保證數據落配盤后才回答,保證數據不丟失
但是,如果Leader收到數據,所有Follower都開始同步數據,但有一個Follower,因為某種故障,遲遲不能與Leader進行同步,那這個問題怎么解決呢?
這就引出了ISR隊列的概念了
ISR,是一個機制,也代表著一個同步合集,是由Leader維護的一個動態的in-sync replica set(ISR),意為和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)。它包含著所有處于同步狀態的副本。當一個副本和Leader副本的差距超過一定程度時,這個副本就會被認為是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不變的
那么,既然ISR是動態的,那哪些副本會被包含在ISR中呢?
主要依據就是 副本需要保證能夠及時地接收并復制Leader副本的消息,也就是需要保證與leader副本的消息同步延遲在一定的時間范圍內(默認情況下是10秒鐘,由參數 replica.lag.time.max.ms 控制)。換而言之,因為分區與ISR機制,我們的消息一旦被Kafka 接收后,就會復制多份并很快落盤。這意味著,即使某一臺Broker節點宕機乃至硬盤損毀,也不會導致數據丟失。
我們將ISR與ACK應答結合起來使用,就形成了數據可靠條件
- 數據完全可靠條件 = ACK級別設置為-1 + 分區副本大于等于2 + ISR里應答的最小副本數量大于等于2
數據不重復
上面講解的,只能保證數據可靠,但是這又引出了一個新的問題
如果,leader在同步完成之后,向生產者回答時,掛掉了,這時候剩下的備份分區會自動選舉出一個新leader出來,但是生產者并不知道它掛掉了,只會以為是消息發送失敗了,觸發重試,又將數據發送了一遍,然后新的leader就又接受了一遍消息,然后在備份分區上再存一遍。這就導致了這條消息存在兩份,產生數據重復問題。
那么kafka是怎么保證數據不重復的呢?
其實這就是數據的冪等性問題了,冪等性就是指Producer不論向Broker發送多少次重復數據,Broker端都只會持久化一條,保證了不重復。
kafka默認啟用數據冪等性,即設置 enable.idempotence = true
在生產者發消息時,這條消息是有它自己的屬性的,其中有三個數據被拿來作為數據的主鍵,kafka會以此來判斷這條消息是否重復,若重復,則只保留一條
PID:又叫生產者編號(producerid), Producer在初始化的時候(只有初始化的時候會隨機生成PID,也就是重啟就會再次生成)會被分配一個PID
Partition:又叫分區編號,即這條消息要發往的分區的paritionid
SeqNumber:又叫序列號,發往同一Partition的消息會附帶Sequence Number(即發送數據的編號,代表著向分區發送的第幾條消息)
這樣<PID, PartitionID, SeqNumber>就相當于構成了一個主鍵。Broker端會對<PID, PartitionID, SeqNumber>做緩存,當具有相同主鍵的消息提交時,Broker只會持久化一條,這樣就保證了數據的唯一,不重復。
但是冪等性只能保證的是在單分區單會話內不重復,如果發消息時生產者掛掉了,重啟后它不知道是否發送成功了,又將這個消息再發送一遍,此時它的PID發生變化,那么這條消息就被認為是一條新的消息,導致重復存儲,這種情況怎么解決呢?
這就要引入kafka的事務機制了,事務這個東西大家都知道啥意思,不再重復解釋
我們通過事務,讓客戶端掛掉后繼續處理,而不是重新從頭來過,保證消息的僅一次發送
注意:開啟事務,必須開啟冪等性。
kafka使用事務,有5個API
// 初始化事務
void initializeTransactions ();// 開啟事務
void beginTransaction () throws ProducerFencedException;// 在事務中提交已消費的偏移量(主要用于消費者)
void sendOffsetsToTransaction (Map < TopicPartition, OffsetAndMetadata > offsets, String consumerGroupId) throws ProducerFencedException;// 提交事務
void commitTransaction () throws ProducerFencedException;// 放棄事務(類似于回滾事務的操作)
void abortTransaction () throws ProducerFencedException;
舉個例子:
package com.atguigu.kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class Test {public static void main(String[] args) {// 1. 創建 kafka 生產者的配置對象Properties properties = new Properties();// 2. 給 kafka 配置對象添加配置信息properties.put("bootstrap.servers", "hadoop102:9092");properties.put("key.serializer", StringSerializer.class.getName());properties.put("value.serializer", StringSerializer.class.getName());properties.put("transactional.id", "transaction_id_0");// 3. 創建 kafka 生產者對象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);// 初始化事務kafkaProducer.initTransactions();// 開啟事務kafkaProducer.beginTransaction();try {// 4. 調用 send 方法,發送消息// 發送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));}// int i = 1 / 0;// 提交事務kafkaProducer.commitTransaction();} catch (Exception e) {// 終止事務kafkaProducer.abortTransaction();} finally {// 5. 關閉資源kafkaProducer.close();}}
}
數據有序性
如果某主題TOPIC只有一個分區,那么它天生有序,因為分區其實就是一個有序隊列
如果是多分區的,kafka是通過滑動窗口的思想解決這個問題的
我們知道kafka發送請求時,最多緩存5個,其實在發送時,每個請求都有自己的單調遞增編號,kafka broker在接收數據時,會自動按照編號將數據排序,并且如果其中一個編號的請求失敗時,后續再次成功,數據過來后,會自動的根據編號插入到應該在的位置上