前言
上一篇文章總結了kafka為什么快,下面來總結一下,kafka高頻的常見的問題。內容有點多,全部看完需要有一定的耐心。
kafka如何保證消息不丟失
Producer端
要保證消息不丟失,第一點要做的就是要保證消息從producer端發送到了kafka的broker中,并且broker把消息保存了下來。
由于在發送消息的過程中有可能會發生網絡故障,broker故障等原因導致消息發送失敗,因此在producer端有兩種方式來避免消息丟失。
接收發送消息回執
我們在使用kafka發送消息的時候,通常是使用producer.send(msg)
方法,但是這個方法其實是一種異步發送,調用此方法發送消息的時候,雖然會立即返回,但是并不代表消息真的發送成功了。
1、所以可以使用同步發送消息,producer.send(msg).get()
此方法會執行同步發生消息,并等待結果返回。
2、也可以使用帶回調函數的異步方法,producer.send(msg,callback)
,用回調函數來監聽消息的發送結果,如果發送失敗了,可以在回調函數里面進行重試。
producer參數配置
producer也提供了一些配置參數來避免消息丟失。
// 此配置表示,Leader和Follower全部成功接收消息后才確認收到消息,
// 可以最大限度保證消息不丟失,但是吞吐量會下降
acks = -1
// producer 發送消息失敗后,自動重試次數
retries = 3
// 發送消息失敗后的重試時間間隔
retry.backoff.ms = 300
Broker端
當消息發送到broker后,broker需要保證此消息不會丟失,我們都知道,kafka是會將消息持久化到磁盤中的。
但是kafka為了保持性能采用了,頁緩存+異步刷盤的形式將消息持久化到磁盤的。也就是批量定時將消息持久化到磁盤。
但是頁緩存如果還沒來的及將消息刷到磁盤,broker就掛了,還是會有消息丟失的風險,因此kafka又提供了partition的ISR(同步副本機制),即每一個patrtition都會有一個唯一的Leader和一到多個Follower,Leader專門處理一些事務類型的請求,Follower負責同步Leader的數據。當leader掛了后,會重新從Follower中選舉出新的Leader,保證消息能夠最終持久化。
另外,在producer中的配置參數acks
,配置不同的值,broker也是會做不同的處理的。
acks=0:表示Producer請求立即返回,不需要等待Leader的任何確認。這種方案有最高的吞吐率,但是不保證消息是否真的發送成功。
acks =-1: 表示分區Leader必須等待消息被成功寫入到所有的ISR副本(同步副本)中才認為Producer請求成功。這種方案提供最高的消息持久性保證,但是理論上吞吐率也是最差的。
acks=1: 表示Leader副本必須應答此Producer請求并寫入消息到本地日志,之后Producer請求被認為成功。如果此時Leader副本應答請求之后掛掉了,消息會丟失。這個方案,提供了不錯的持久性保證和吞吐。
producer中還有一些參數的配置也是會起到避免消息丟失的作用
//表示分區副本的個數,replication.factor>1,當Leader掛了,follower會被選舉為leader繼續提供服務
replication.factor=2//表示 ISR 最少的副本數量,通常設置 replication.factormin.insync.replicas>1,這樣才有可用的follower副本執行替換,保證消息不丟失
replication.factormin.insync.replicas=2//是否可以把非ISR集合中的副本選舉為leader
min.inunclean.leader.election.enable= false
Consumer端
Consumer端,只要保證消息接收到不胡亂的提交offset就行,kafka本身也是會記錄每個pratition的偏移量,但是為了業務的可靠性,也可以自己存儲一份offset,防止由于業務代碼的問題導致消息沒有處理就提交的offset,有自己存儲才這一份offset就可以對偏移量進行一個回撥。
為了避免消息丟失,建議使用手動提交偏移量的方式,防止消息的業務邏輯未處理完,提交偏移量后消費者掛了的問題。
enable.auto.commit=false
kafka如何保證消息的順序傳輸
我們知道,kafka的消息實際是存在某個topic的partition中的,一個topic有多個partition分區,同一個partition中的消息是有序的,跨partition的消息是無序的。
這個是怎么實現的呢?
因為我們在【Kafka為什么吞吐量大,速度快?】這篇文章里面總結了,kafka寫入磁盤時是順序寫的,并且會被分配一個唯一的offset,所以同一個partition保存的數據都是有序的。而在讀取消息時,消費者會從該分區最早的offset開始,依次讀取消息,保證了消息順序消費。
具體實現順序發送消息有兩種方式:
1、在使用kafka時,對需要保證順序消費的topic,只創建一個partition,這樣消息就都會順序的存儲到這一個partition中,也就能保證順序消費了。
2、當一個topic有多個partition時,對需要保證順序的消息,都發到指定的partition即可,這樣也能保證順序消費。
注:需要注意一點,雖然發送時保證了順序,也都寫到了同一個partition中,但在消費端,也要保證順序消費,即單線程處理消息。
目前kafka4.0,可以允許一個consumer group下的多個消費者同時消費同一個partition了。
其借助新推出的共享組(Shared Group)特性來達成這一功能,且支持逐條消息確認,從而讓消費模式更具靈活性,還能助力提升吞吐量。以往版本默認僅允許一個消費者組內單個消費者消費一個特定分區,當消費者多于分區時,多余消費者會閑置,共享組則可避免出現該類資源浪費情況。
將消息發到指定partition中也有幾種方式。
1、發送消息,組裝producerRecord時,指定partition
// 創建Kafka生產者
Producer<String, String> producer = new KafkaProducer<>(getProperties());
// 指定要發送消息的topic
String topic ="jimer_topic";
// 發送的消息內容
String message =“Hello World!";
// 指定發送消息的分區
int partition =0;// 創建包含分區信息的ProducerRecord
ProducerRecordProducerRecord<String,String> record = new ProducerRecord<>(topic, partition, message);
// 發送消息
producer.send(record);
//關閉Kafka生產者
producer.close();
2、指定消息的key,保證相同key的消息發送到同一個partition
// 創建Kafka生產者
Producer<String, String> producer = new KafkaProducer<>(getProperties());
// 指定要發送消息的topic
String topic ="jimer_topic";
// 發送的消息內容
String message =“Hello World";
// 指定發送消息的key
String msg_key = "order_msg";// 創建包含消息key的producerRecord
ProducerRecordProducerRecord<String,String> record =
new ProducerRecord<>(topic, null,msg_key, message);
// 發送消息
producer.send(record);
//關閉Kafka生產者
producer.close();
3、自定義Partitioner
除了上面兩種方式外,還可以自定義指定分區的方式。通過實現Partitioner這個接口,具體實現partition方法,就可以了。具體使用的時候,在初始化Producer時,指定具體的partition實現類即可。
例如:
public class MyPartitioner implatents Partitioner{@Override
public void configure(Map<String,?> configs){// 可以在這里處理和獲取分區器的配置參數
}
@0verride
public int partition(String topic, Object key, byte[] keyBytes,
Object value,byte[] valueBytes,Cluster cluster){int partition = int ss = new Random().nextInt(2);// 返回分區編號return partition;
}
@0verride
public void close(){// 可以在這里進行一些清理操作
}
使用時
Properties propsProducer = new Properties();propsProducer.put("acks", "all"); // 全部ISR列表中的副本接收成功后返回propsProducer.put("retries", 3);//失敗時重試次數propsProducer.put("partitioner.class", "com.jimoer.MyPartitioner"); // 指定自定義分區器類
// 創建Kafka生產者
Producer<String, String> producer = new KafkaProducer<>(propsProducer);
kafka如何保證消息不重復消費
什么情況下會導致消息被重復消費呢?
1、生產者,生產者可能重復推送了一條消息到kafka,例如:某接口未做冪等處理,接口中會發送kafka消息。
2、kafka,在消費者消費完消息后,提交offset時,kafka突然掛了,導致kafka認為此消息還未消費,又重新推送了該條消息,導致了重復消費消息。
3、消費者,在消費者消費完消息后,提交offset時,Consumer突然宕機掛掉,這個時候,kafka未接收到已處理的offset值,當Consumer恢復后,會重新消費此部分消息。
4、還有一種情況,Kafka 存在 Partition Balance 機制,會將多個 Partition 均衡分配給多個消費者。若 Consumer 在默認 5 分鐘內未處理完一批消息,會觸發 Rebalance 機制,導致 offset 自動提交失敗,重新 Rebalance 后,消費者會從之前未提交的 offset 位置開始消費,從而造成消息重復消費。
那么我們該如何防止消息被重復消費呢
其實上面的1、2、3、4這些情況都可以用冪等機制來防止消息被重復消費。為消息生成 一個唯一標識,并保存到 mysql 或 redis 中,處理消息前先到 mysql 或 redis 中判斷該消息是否已被消費過。
但是第4種情況,前提是要先優化消費端處理性能,避免觸發 Rebalance,例如:采用異步方式處理消息、縮短單個消息消費時長、調整消息處理超時時間、減少一次性從 Broker 拉取的數據條數等。
kafka什么情況下會發生reblanace(重平衡)
Kafka 的重平衡(Rebalance)是指消費者組(Consumer Group)內的消費者與分區(Partition)之間的分配關系發生重新調整的過程。
主要有以下幾種情況會觸發:
1、消費者組成員數量發生變化。((新消費者的加入或者退出)
2、訂閱主題(Topic)數量發生變化。
3、訂閱主題的分區(Partition)數發生變化。
還有某些異常情況也會觸發Rebalance:
1、消費端處理消息超時,上面我們說到過,若消費者未在設定時間內處理完消息,消費者組會認為當前消費者有問題了,會觸發Rebalance,重新分配消息。又或者當前消費者掛了,也是一樣會觸發Rebalance。
2、組協調器(Group Coordinator)是 Kafka 負責管理消費者組的 Broker 節點。如果它崩潰或者發生故障。Kafka 需要重新選舉新的 Group Coordinator ,并進行重平衡。
當Kafka 集群要觸發重平衡機制時,大致的步驟如下:
1.暫停消費:在重平衡開始之前,Kafka 會暫停所有消費者的拉取操作,以確保不會出現重平衡期間的消息丟失或重復消費。
2.計算分區分配方案:Kafka 集群會根據當前消費者組的消費者數量和主題分區數量,計算出每個消費者應該分配的分區列表,以實現分區的負載均衡。
3.通知消費者:一旦分區分配方案確定,Kafka 集群會將分配方案發送給每個消費者,告訴它們需要消費的分區列表,并請求它們重新加入消費者組。
4.重新分配分區:在消費者重新加入消費者組后,Kafka 集群會將分區分配方案應用到實際的分區分配中,重新分配主題分區給各個消費者。
5.恢復消費:最后,Kafka 會恢復所有消費者的拉取操作,允許它們消費分配給自己的分區。