得物面試:Kafka消息0丟失,如何實現?

得物面試:Kafka消息0丟失,如何實現?

尼恩說在前面

在40歲老架構師 尼恩的讀者交流群(50+)中,最近有小伙伴拿到了一線互聯網企業如得物、阿里、滴滴、極兔、有贊、希音、百度、網易、美團的面試資格,遇到很多很重要的面試題:

Kafka消息0丟失,如何實現?

? kafka如何保證消息不丟失?

最近有小伙伴在面試得物,又遇到了相關的面試題。小伙伴懵了,因為沒有遇到過,所以支支吾吾的說了幾句,面試官不滿意,面試掛了。

所以,尼恩給大家做一下系統化、體系化的梳理,使得大家內力猛增,可以充分展示一下大家雄厚的 “技術肌肉”,讓面試官愛到 “不能自已、口水直流”,然后實現”offer直提”。

當然,這道面試題,以及參考答案,也會收入咱們的 《尼恩Java面試寶典PDF》V140版本,供后面的小伙伴參考,提升大家的 3高 架構、設計、開發水平。

《尼恩 架構筆記》《尼恩高并發三部曲》《尼恩Java面試寶典》的PDF,請到文末公號【技術自由圈】獲取

在這里插入圖片描述

消息的發送流程

一條消息從生產到被消費,將會經歷三個階段:

  • 生產階段,Producer 新建消息,而后經過網絡將消息投遞給 MQ Broker
  • 存儲階段,消息將會存儲在 Broker 端磁盤中
  • 消息階段, Consumer 將會從 Broker 拉取消息

以上任一階段, 都可能會丟失消息,只要這三個階段0丟失,就能夠完全解決消息丟失的問題。

生產階段如何實現0丟失方式

從架構視角來說, kafka 生產者之所以會丟消息,和Producer 的高吞吐架構有關。

Producer 的高吞吐架構

Producer 的高吞吐架構: 異步發生+ 批量發送。

Kafka的Producer發送消息采用的是異步發送的方式。
在消息發送的過程中,涉及到了兩個線程和一個隊列:

  • 業務線程 和 Sender線程
  • 以及一個消息累積器 : RecordAccumulator。

在這里插入圖片描述

Kafka Producer SDK會創建了一個消息累積器 RecordAccumulator,里邊使用 雙端隊列 緩存消息, 業務線程 將消息加入到 RecordAccumulator ,業務線程就返回了。

這就是業務發送的妙處, 注意, 業務線程就返回了,但是底層的發送工作,還沒開始。

誰來負責底層發送呢? Sender線程。

Sender線程不斷從 RecordAccumulator 中拉取消息,負責發送到Kafka broker。

在這里插入圖片描述

這回我們明白了, 原來機關在這里:

  • kafka在發送消息時,是由底層的SEND線程進行消息的批量發送,不是由業務代碼線程執行發送的。

  • 業務代碼線程執行完send方法后,就返回了。

    很多小伙伴 沒有寫個kafka發消息的代碼,下面有一個demo,大家一看就明白了:

package org.example;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Producer<String, String> producer = getProducer();// Kafka 主題名稱String topic = "mytopic";// 發送消息String message = "Hello, Kafka!";ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);producer.send(record);// 關閉 Kafka 生產者producer.close();}private static Producer<String, String> getProducer() {Properties properties = getProperties();// 創建 Kafka 生產者Producer<String, String> producer = new KafkaProducer<>(properties);return producer;}private static Properties getProperties() {// Kafka 服務器地址和端口String bootstrapServers = "localhost:9092";// Kafka 生產者配置Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return properties;}
}

消息到底發送給broker側沒有了?通過send方法其實是無法知道的。

上面的代碼,創建消息之后,就開始發送消息了

 ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
producer.send(record);

其實,如果要想知道發現的結果,可以通過send方法的 Future實例去實現:

Future<RecordMetadata> future = producer.send(record);

Producer#send方法是一個異步方法,即它會立即返回一個Future對象,而不會等待消息發送完成。

可以使用Future對象來異步處理發送結果,例如等待發送完成或注冊回調函數來處理結果。具體的辦法是,給Future去注冊回調函數處理結果,這樣可以實現非阻塞的方式處理發送完成的回調。

Future去注冊回調函數處理結果,下面是一個示例代碼:

future.addCallback(new ListenableFutureCallback<RecordMetadata>() {@Overridepublic void onSuccess(RecordMetadata metadata) {System.out.println("消息發送成功,分區:" + metadata.partition() + ",偏移量:" + metadata.offset());}@Overridepublic void onFailure(Throwable ex) {System.err.println("消息發送失敗:" + ex.getMessage());}
});

還沒有其他的方法, 獲取發送的處理結果呢?

其實,Producer#send方法有兩個重載版本, 具體如下:

Future<RecordMetadata> send(ProducerRecord<K, V> record)
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

除了上面尼恩給大家介紹的是一個參數的版本, 實際上還有一個兩個參數的版本:

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

通過這個版本, 大家可以注冊回調函數的方式,完成發送結果的處理。通過回調函數版本,更好的實現非阻塞的方式處理發送完成的回調。

參考的代碼如下:

   producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error sending message: " + exception.getMessage());} else {System.out.println("Message sent successfully! Topic: " + metadata.topic() +", Partition: " + metadata.partition() + ", Offset: " + metadata.offset());}}});

通過這個callback回調函數版本的send方法,如果Producer底層的send線程發送給broker端成功/(失敗的)了,都可以回調callback函數,通知上層業務應用。

一般來說,大家在callback函數里,根據回調函數的參數,就能知道消息是否發送成功了,如果發送失敗了,那么我們還可以在callback函數里重試。

Producer(生產者)保證消息不丟失的方法:

如果要保證 Producer(生產者)0 丟失, Producer 端的策略是啥呢?

尼恩從40歲老架構師的視角,給大家出個狠招, 包括下面的三板斧:

  • Producer一板斧:設置最高可靠的、最為嚴格的發送確認機制

  • Producer二板斧:設置嚴格的消息重試機制,比如增加重試次數

  • Producer三板斧:本地消息表+定時掃描

在這里插入圖片描述

Producer一板斧:設置最高可靠的、最為嚴格的發送確認機制

Producer可以使用Kafka的acks參數來配置發送確認機制。這個acks參數用來指定分區中必須要有多少個副本收到這條消息,之后生產者才會認為這條消息是成功寫入的。通過設置合適的acks參數值,Producer可以在消息發送后等待Broker的確認。

確認機制提供了不同級別的可靠性保證,包括:

  • acks=0:Producer在發送消息后不會等待Broker的確認,這可能導致消息丟失風險。
  • acks=1:Producer在發送消息后等待Broker的確認,確保至少將消息寫入到Leader副本中。
  • acks=all或acks=-1:Producer在發送消息后等待Broker的確認,確保將消息寫入到所有ISR(In-Sync Replicas)副本中。這提供了最高的可靠性保證。

尼恩提示:由于acks 是生產者客戶端中一個非常重要的參數,它涉及消息的可靠性和吞吐量之間的權衡,所以非常重要。

下面對于acks 參數的3種類型的值(都是字符串類型), 更加詳細的介紹一下。

  • acks = 1。默認值即為1。生產者發送消息之后,只要分區的 leader 副本成功寫入消息,那么它就會收到來自服務端的成功響應。如果消息無法寫入 leader 副本,比如在 leader 副本崩潰、重新選舉新的 leader 副本的過程中,那么生產者就會收到一個錯誤的響應,為了避免消息丟失,生產者可以選擇重發消息。如果消息寫入 leader 副本并返回成功響應給生產者,且在被其他 follower 副本拉取之前 leader 副本崩潰,那么此時消息還是會丟失,因為新選舉的 leader 副本中并沒有這條對應的消息。acks 設置為1,是消息可靠性和吞吐量之間的折中方案。
  • acks = 0。生產者發送消息之后不需要等待任何服務端的響應。如果在消息從發送到寫入 Kafka 的過程中出現某些異常,導致 Kafka 并沒有收到這條消息,那么生產者也無從得知,消息也就丟失了。在其他配置環境相同的情況下,acks 設置為0可以達到最大的吞吐量。
  • acks = -1 或 acks = all。生產者在消息發送之后,需要等待 ISR 中的所有副本都成功寫入消息之后才能夠收到來自服務端的成功響應。在其他配置環境相同的情況下,acks 設置為 -1(all) 可以達到最強的可靠性。但這并不意味著消息就一定可靠,因為ISR中可能只有 leader 副本,這樣就退化成了 acks=1 的情況。要獲得更高的消息可靠性需要配合 min.insync.replicas 等參數的聯動,消息可靠性分析的具體內容可以參考《圖解Kafka之核心原理》。

注意 acks 參數配置的值是一個字符串類型,而不是整數類型。舉個例子,將 acks 參數設置為0,需要采用下面這兩種形式:

properties.put("acks", "0");
# 或者
properties.put(ProducerConfig.ACKS_CONFIG, "0");

設置最高可靠的發送確認機制,通過設置acks參數來控制消息的確認方式。

acks參數可以設置為"all"或者 -1,表示要求所有副本都確認消息,這樣可以最大程度地保證消息的可靠性。

properties.put(ProducerConfig.ACKS_CONFIG, "all");
# 或者
properties.put(ProducerConfig.ACKS_CONFIG, "-1");

Producer二板斧:設置嚴格的消息重試機制,包括增加重試次數

很多消息,因為臨時的網絡問題或Broker故障而丟失。通過消息重試機制,可以保證消息不會因為臨時的網絡問題或Broker故障而丟失。

消息在從生產者發出到成功寫入服務器之前可能發生一些臨時性的異常,比如網絡抖動、leader 副本的選舉等,這種異常往往是可以自行恢復的,生產者可以通過配置 retries 大于0的值,以此通過內部重試來恢復而不是一味地將異常拋給生產者的應用程序。

所以,如果發送失敗,Producer可以重新發送消息,直到成功或達到最大重試次數。

消息重試機制涉及到兩個參數:

  • retries
  • retry.backoff.ms

下面有個例子(這里重試配置了 10 次,重試10 次之后沒有答復,就會拋出異常,并且,下面的例子每一次重試直接的時間間隔是1秒):

在這里插入圖片描述

retries 參數用來配置生產者重試的次數,默認值為0,也就是說,默認情況下,在發生異常的時候不進行任何重試動作。

retries 參數設置之后,如果重試達到設定的retries 次數,那么生產者就會放棄重試并返回異常。

當然,retries 重試還和另一個參數 retry.backoff.ms 有關,這個參數的默認值為100,它用來設定兩次重試之間的時間間隔,默認為100ms,避免無效的頻繁重試。

在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的異常恢復時間,這樣可以設定總的重試時間大于這個異常恢復時間,以此來避免生產者過早地放棄重試。

Producer三板斧:本地消息表+定時掃描

40歲老架構師尼恩慎重提示:前面兩板斧,并不能保證100%的0丟失。

為啥呢?因為 broker端是異步落盤機制,異步落盤待會詳細分析。總之,異步落盤就是broker 消息沒有落盤,就返回結果的。

雖然第一板斧設置了最為嚴格的確認機制,在這里,尼恩提醒大家一個極端情況: 哪怕全部的broker 返回了確認結果, 消息也不一定落盤和被投遞出去,如果broker 集體斷電,還是丟了。

所以說:僅僅依靠回調函數的、設置最高可靠的確認機制,設置最重的重試策略,還是不能保證消息一定被consumer消費的。

另外,callback函數也是不可靠的。比如,剛好遇到在執行回調函數時,jvm OOM/ jvm假死的情況;那么回調函數是不能被執行的。

如何實現極端嚴格的場景下消息0丟失?尼恩告訴大家,可以和本地消息表事務機制類似,采用 本地消息表+定時掃描 的架構方案。

大概流程如下圖

在這里插入圖片描述

1、設計一個本地消息表,可以存儲在DB里,或者其它存儲引擎里,用戶保存消息的消費狀態

2、Producer 發送消息之前,首先保證消息的發生狀態,并且初始化為待發送;

3、如果消費者(如庫存服務)完成的消費,則通過RPC,調用Producer 去更新一下消息狀態;

4、Producer 利用定時任務掃描 過期的消息(比如10分鐘到期),再次進行發送。

在這里尼恩想說的是: 本地消息表+定時掃描 的架構方案 ,是業務層通過額外的機制來保證消息數據發送的完整性,是一種很重的方案。 這個方案的兩個特點:

  • CP 不是 AP,性能低

  • 需要 做好冪等性設計

CP 不是 AP的 需要權衡,請參見全網最好的架構設計個黃金法則,尼恩的 專門文章具體如下:

一張圖總結架構設計的40個黃金法則

全網最好的冪等性 方案,請參見尼恩的 專門文章, 具體如下:

最系統的冪等性方案:一鎖二判三更新

在這里插入圖片描述

Broker端保證消息不丟失的方法:

首先,尼恩想說正常情況下,只要 Broker 在正常運行,就不會出現丟失消息的問題。但是如果 Broker 出現了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。

如果確保萬無一失,實現Broker端保證消息不丟失,有兩板斧:

  • Broker端第一板斧:設置嚴格的副本同步機制
  • Broker端第二板斧:設置嚴格的消息刷盤機制

在這里插入圖片描述

Broker端第一板斧:設置嚴格的副本同步機制

kafka應對此種情況,建議是通過多副本機制來解決的,核心思想也挺簡單的:如果數據保存在一臺機器上你覺得可靠性不夠,那么我就把相同的數據保存到多臺機器上,某臺機器宕機了可以由其它機器提供相同的服務和數據。

要想達到上面效果,有三個關鍵參數需要配置

  • 第1:在broker端 配置 min.insync.replicas參數設置至少為2
    此參數代表了 上面的“大多數”副本。為2表示除了寫入leader分區外,還需要寫入到一個follower 分區副本里,broker端才會應答給生產端消息寫入成功。此參數設置需要搭配第一個參數使用。

  • 第2:在broker端配置 replicator.factor參數至少3
    此參數表示:topic每個分區的副本數。如果配置為2,表示每個分區只有2個副本,在加上第二個參數消息寫入時至少寫入2個分區副本,則整個寫入邏輯就表示集群中topic的分區副本不能有一個宕機。如果配置為3,則topic的每個分區副本數為3,再加上第二個參數min.insync.replicas為2,即每次,只需要寫入2個分區副本即可,另外一個宕機也不影響,在保證了消息不丟的情況下,也能提高分區的可用性;只是有點費空間,畢竟多保存了一份相同的數據到另外一臺機器上。

  • 第3:unclean.leader.election.enable

    此參數表示:沒有和leader分區保持數據同步的副本分區是否也能參與leader分區的選舉,建議設置為false,不允許。如果允許,這這些落后的副本分區競選為leader分區后,則之前leader分區已保存的最新數據就有丟失的風險。注意在0.11版本之前默認為TRUE。

所以,通過如下配置來保證Broker消息可靠性:

  • default.replication.factor:設置為大于等于3,保證一個partition中至少有兩個Replica,并且replication.factor > min.insync.replicas
  • min.insync.replicas:設置為大于等于2,保證ISR中至少有兩個Replica
  • unclean.leader.election.enable=false,那么就意味著非ISR中的副本不能夠參與選舉,避免臟Leader。

在這里插入圖片描述

Kafka的ISR機制可自動動態調整同步復制的Replica,將慢(可能是暫時的慢)Follower踢出ISR,將同步趕上的Follower拉回ISR,避免最慢的Follower拖慢整體速度,最大限度地兼顧了可靠性和可用性。

Broker端第二板斧:設置嚴格的消息刷盤機制

無論是kafka、Rocketmq、還是Mysql,為了提升底層IO的寫入性能,都會用到操作系統的 Page Cache 技術。注意,這里的 Page Cache 是操作系統提供的緩存機制。具體請參考尼恩的架構視頻《葵花寶典》。

我們的kafka、Rocketmq、Mysql程序 在讀寫磁盤文件時,其實操作的都是內存,然后由操作系統決定什么時候將 Page Cache 里的數據真正刷入磁盤。如果 Page Cache 內存中數據還未刷入磁盤,而我們的服務器宕機了,這個時候還是會丟消息的。

刷盤的方式有同步刷盤和異步刷盤兩種。

  • 同步刷盤指的是:生產者消息發過來時,只有持久化到磁盤,RocketMQ、kafka的存儲端Broker才返回一個成功的ACK響應,這就是同步刷盤。它保證消息不丟失,但是影響了性能。

  • 異步刷盤指的是:消息寫入PageCache緩存,就返回一個成功的ACK響應,不管消息有沒有落盤,就返回一個成功的ACK響應。這樣提高了MQ的性能,但是如果這時候機器斷電了,就會丟失消息。

同步刷盤和異步刷盤的區別如下:

  • 同步刷盤:當數據寫如到內存中之后立刻刷盤(同步),在保證刷盤成功的前提下響應client。
  • 異步刷盤:數據寫入內存后,直接響應client。異步將內存中的數據持久化到磁盤上。

同步刷盤和異步輸盤的優劣:

  • 同步刷盤保證了數據的可靠性,保證數據不會丟失。
  • 同步刷盤效率較低,因為client獲取響應需要等待刷盤時間,為了提升效率,通常采用批量輸盤的方式,每次刷盤將會flush內存中的所有數據。(若底層的存儲為mmap,則每次刷盤將刷新所有的dirty頁)
  • 異步刷盤不能保證數據的可靠性.
  • 異步刷盤可以提高系統的吞吐量.
  • 常見的異步刷盤方式有兩種,分別是定時刷盤和觸發式刷盤。定時刷盤可設置為如每1s刷新一次內存.觸發刷盤為當內存中數據到達一定的值,會觸發異步刷盤程序進行刷盤。

Broker端第二板斧:設置嚴格的消息刷盤機制,設置為Kafka同步刷盤。

如何設置Kafka同步刷盤?

網上有一種說法,kafka不支持同步刷盤,這種說法,實際上是錯的。

為啥了?可以通過參數的配置變成同步刷盤

log.flush.interval.messages  //page cache里邊多少條消息刷盤1次 ,默認值 LONG.MAX_VALUE
log.flush.interval.ms  //隔多長時間刷盤1次,默認值 LONG.MAX_VALUE
log.flush.scheduler.interval.ms //周期性的刷盤,缺省3000,即3s。

源碼里邊,有這些參數的注釋:

public static final String FLUSH_MESSAGES_INTERVAL_CONFIG = "flush.messages";
public static final String FLUSH_MESSAGES_INTERVAL_DOC = "This setting allows specifying an interval at " +"which we will force an fsync of data written to the log  多少消息刷盤. For example if this was set to 1 we would fsync after every message  設置為1 就是1條消息就刷盤,也就是同步刷盤模式; if it were 5 we would fsync after every five messages. " +"In general we recommend you not set this and use replication for durability and allow the " +"operating system's background flush capabilities as it is more efficient. This setting can " +"be overridden on a per-topic basis (see <a href=\"#topicconfigs\">the per-topic configuration section</a>).";public static final String FLUSH_MS_CONFIG = "flush.ms";
public static final String FLUSH_MS_DOC = "This setting allows specifying a time interval at which we will " +"force an fsync of data written to the log. For example if this was set to 1000 " +"we would fsync after 1000 ms had passed. In general we recommend you not set " +"this and use replication for durability and allow the operating system's background " +"flush capabilities as it is more efficient.";

如果要同步刷盤,可以使用下面的配置:

# 當達到下面的消息數量時,會將數據flush到日志文件中。默認10000
#log.flush.interval.messages=10000
# 當達到下面的時間(ms)時,執行一次強制的flush操作。interval.ms和interval.messages無論哪個達到,都會flush。默認3000ms
#log.flush.interval.ms=1000
# 檢查是否需要將日志flush的時間間隔
log.flush.scheduler.interval.ms = 3000

CP 不是 AP,CP性能很低,請參見全網最好的架構設計個黃金法則,尼恩的 專門文章具體如下:

一張圖總結架構設計的40個黃金法則

設置了同步刷盤后,在掉電的情況下,數據是不丟失了,但是,kafka 吞吐量降低到了 500tps 以下。

在這里插入圖片描述

而異步刷盤, 吞吐量 的單節點在 10000W以上,差異巨大的。

注意,同步刷盤,性能 足足相差20倍。

在這里插入圖片描述

注意: 尼恩沒有做對比測試,上面的數據,來自于互聯網一個小伙伴的對比測試。 有興趣的小伙伴,可以自己試試。

40歲老架構尼 恩提示:還是那句老化,CP 不是 AP的 需要權衡,請參見全網最好的架構設計40個黃金法則,尼恩的 專門文章具體如下:

一張圖總結架構設計的40個黃金法則

Consumer(消費者)保證消息不丟失的方法:

如果要保證 Consumer(消費者)0 丟失, Consumer 端的策略是啥呢?

這個比較簡單,只要一招就夠:消費成功之后,手動ACK提交消費位移(位點)。

這一招分為兩步:

  • 設置enable.auto.commit 為 false

  • commitSync() 和 commitAsync() 組合使用進行手動提交

在這里插入圖片描述

什么是Consumer 位移

Consumer 程序有個“位移”(/位點)的概念,表示的是這個 Consumer 當前消費到的 Topic Partion分區的位置。

下面這張圖來自于官網,它清晰地展示了 Consumer 端的位移數據。

圖片

enable.auto.commit=false 關閉自動提交位移,消息處理完成之后再提交offset

consumer端需要為每個它要讀取的分區保存消費進度,即分區中當前消費消息的位置,該位置稱為位移(offset)。每個Consumer Group獨立維護offset,互不干擾,不存在線程安全問題。kafka中的consumer group中使用一個map來保存其訂閱的topic所屬分區的offset:

在這里插入圖片描述

實際上,這里的位移值通常是下一條待消費的消息的位置,因為位移是從0開始的,所以位移為N的消息其實是第N+1條消息。在consumer中有如下位置信息:

在這里插入圖片描述

  • 上次提交位移:consumer最后一次提交的offset值;

  • 當前位置:consumer已經讀取,但尚未提交時的位置;

  • 水位:也稱為高水位,代表consumer是否可讀。對于處于水位以下(水位左側)的所有消息,consumer是可以讀取的,水位以上(水位又側)的消息consumer不讀取;

  • 日志最新位移:也稱日志終端位移,表示了某個分區副本當前保存消息對應的最大位移值。

consumer需要定期向Kafka提交自己的位置信息,這一過程稱為位移提交(offset commit)。

consumer提交的對象,叫做 coordinator。

consumer會在所有的broker中選擇一個broker作為consumer group的coordinator,coordinator用于實現組成員管理、消費分配方案制定以及位移提交等。

如何選擇coordinator,依據就是kafka的內置topic(_consumer_offsets)。內置_consumer_offsets 的topic與普通topic一樣,配置多個分區,每個分區有多個副本,它存在的唯一目的就是保存consumer提交的位移。

當消費者組首次啟動的時候,由于沒有初始的位移信息,coordinator需要為其確定初始位移值,這就是consumer參數 auto.offset.reset 的作用,通常情況下,consumer要么從最開始位移開始讀取。

當cosumer運行一段時間之后,就需要提交自己的位移信息,如果consumer奔潰或者被關閉,它負責的分區就會被分配給其他consumer,因此一定要在其他consumer讀取這些分區前,就做好位移提交,否則會出現重復消費。

consumer提交位移的主要機制,也是發消息實現的。具體來說,是通過向所屬的coordinator發送位移提交請求消息來實現的。

每個位移提交請求都會向_consumer_offsets對應分區寫入一條消息,消息的key是group.id,topic和分區的元組,value是位移值。

如果consumer為同一個group的同一個topic分區提交了多次位移,那么只有最新的那次提交的位移值是有效的,其余幾次提交的位移值都已經過期,Kafka通過壓實(compact)策略來處理這種消息使用模式,

consumer提交位移,有兩大模式:

  1. 自動提交位移:Consumer可以選擇啟用自動提交位移的功能。當Consumer成功處理一批消息后,它會自動提交當前位移,標記為已消費。這樣即使Consumer發生故障,它可以使用已提交的位移來恢復并繼續消費之前未處理的消息。
  2. 手動提交位移:Consumer還可以選擇手動提交位移的方式。在消費一批消息后,Consumer可以顯式地提交位移,以確保處理的消息被正確記錄。這樣可以避免重復消費和位移丟失的問題。

Consumer 端有個參數 enable.auto.commit(默認值就是 true),把它設置為 true , 就是自動提交位移的。

自動提交的參考代碼如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserialprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeseriKafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while(true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
}

和自動提交配合的參數,還有一個 auto.commit.interval.ms。它的默認值是 5 秒,表明 Kafka 每 5 秒會為你自動提交一次位移。

高并發場景,一定是多線程異步消費消息,自動提交就不管有沒有消費成功, 位點都提交了,所以為了保證0丟失,消費者 Consumer 程序不要開啟自動提交位移,而是要應用程序手動提交位移。

開啟手動提交位移的方法就是設置enable.auto.commit 為 false。

但是,僅僅設置它為 false 還不夠,這個配置只是告訴Kafka Consumer 不要自動提交位移而已,應用程序還需要調用相應的 API 手動提交位移。

手動提交位移的 API,一個最簡單的是 同步提交位移,KafkaConsumer#commitSync()。該方法會提交KafkaConsumer#poll() 返回的最新位移。

下面這段代碼展示了 commitSync() 的使用方法:

下面是手動提交位移的例子:

while(true) {ConsumerRecords<String, String>records=consumer.poll(Duration.ofSeconds(1));process(records);//處理消息try {consumer.commitSync();}catch (CommitFailedException e) {handle(e);//處理提交失敗異常}
}

可見,調用 consumer.commitSync() 方法的時機,是在你處理完了 poll() 方法返回的所有消息之后。

KafkaConsumer#commitSync() 它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。如果提交過程中出現異常,該方法會將異常信息拋出。

除了同步提交,Kafka 社區為手動提交位移提供了另一個異步 API 方法:KafkaConsumer#commitAsync()。

異步提交的優勢:調用 commitAsync() 之后,它會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。

按照40歲老架構師 尼 恩經驗: 異步往往配合了回調,Kafka 提供了回調函數(callback),回調用于處理提交之后的邏輯,比如記錄日志或處理異常等。

下面這段代碼展示了調用 commitAsync() 的方法:

while(true) {ConsumerRecords<String, String>records=consumer.poll(Duration.ofSeconds(1));process(records);//處理消息consumer.commitAsync((offsets, exception) -> {if(exception != null)handle(exception);});

如何又能保證 提交的高性能,又能重復利用 commitSync 的自動重試來規避那些瞬時錯誤(比如網絡的瞬時抖動,Broker 端 GC 等)呢?

答案是: commitSync() 和 commitAsync() 組合使用。

它展示的是如何將兩個 API 方法commitSync() 和 commitAsync() 組合使用進行手動提交。

try{while(true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records);//處理消息commitAysnc();//使用異步提交規避阻塞}
}
catch(Exception e) {handle(e);//處理異常
}
finally{try{consumer.commitSync();//最后一次提交使用同步阻塞式提交}finally{consumer.close();

Kafak 0丟失的最佳實踐

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。

    記住,一定要使用帶有回調通知的 send 方法。

  2. 設置 acks = all 設置最高可靠的、最為嚴格的發送確認機制。acks 設置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。這是最高等級的“已提交”定義。

  3. 設置 retries 為一個較大的值如10,設置嚴格的消息重試機制,包括增加重試次數。當出現網絡的瞬時抖動時,消息發送可能會失敗,retries 較大,能夠自動重試消息發送,避免消息丟失。

  4. Broker 端設置 unclean.leader.election.enable = false。它控制的是哪些 Broker 有資格競選分區的 Leader。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。故一般都要將該參數設置成 false,即不允許這種情況的發生。

  5. Broker 端設置 replication.factor >= 3。這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余。

  6. Broker 端設置 min.insync.replicas > 1。控制的是消息至少要被寫入到多少個副本才算是“已提交”。設置成大于 1 可以提升消息持久性。在實際環境中千萬不要使用默認值 1。

  7. Broker 端設置 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個副本掛機,整個分區就無法正常工作了。我們不僅要改善消息的持久性,防止數據丟失,還要在不降低可用性的基礎上完成。推薦設置成 replication.factor = min.insync.replicas + 1。

  8. Consumer 端 確保消息消費完成再提交。Consumer 端有個參數 enable.auto.commit 設置成 false,并采用將兩個 API 方法commitSync() 和 commitAsync() 組合使用進行手動提交位移的方式。這對于單 Consumer 多線程處理的場景而言是至關重要的。

  9. 業務維度的的0丟失架構, 采用 本地消息表+定時掃描 架構方案,實現業務維度的 0丟失,100%可靠性。

如上,就是尼恩為大家梳理的,史上最牛掰的 答案, 全網最為爆表的方案。按照尼恩的套取去回到, 面試官一定驚到掉下巴。 offer直接奉上。此答案大家可以收藏一起,有時間看看。

在這里插入圖片描述

說在最后:有問題找老架構取經

以上的內容,如果大家能對答如流,如數家珍,基本上 面試官會被你 震驚到、吸引到。

最終,讓面試官愛到 “不能自已、口水直流”。offer, 也就來了。

在面試之前,建議大家系統化的刷一波 5000頁《尼恩Java面試寶典PDF》,里邊有大量的大廠真題、面試難題、架構難題。很多小伙伴刷完后, 吊打面試官, 大廠橫著走。

在刷題過程中,如果有啥問題,大家可以來 找 40歲老架構師尼恩交流。

另外,如果沒有面試機會,可以找尼恩來改簡歷、做幫扶。

遇到職業難題,找老架構取經, 可以省去太多的折騰,省去太多的彎路。

尼恩指導了大量的小伙伴上岸,前段時間,剛指導一個40歲+被裁小伙伴,拿到了一個年薪100W的offer。

狠狠卷,實現 “offer自由” 很容易的, 前段時間一個武漢的跟著尼恩卷了2年的小伙伴, 在極度嚴寒/痛苦被裁的環境下, offer拿到手軟, 實現真正的 “offer自由” 。

尼恩技術圣經系列PDF

  • 《NIO圣經:一次穿透NIO、Selector、Epoll底層原理》
  • 《Docker圣經:大白話說Docker底層原理,6W字實現Docker自由》
  • 《K8S學習圣經:大白話說K8S底層原理,14W字實現K8S自由》
  • 《SpringCloud Alibaba 學習圣經,10萬字實現SpringCloud 自由》
  • 《大數據HBase學習圣經:一本書實現HBase學習自由》
  • 《大數據Flink學習圣經:一本書實現大數據Flink自由》
  • 《響應式圣經:10W字,實現Spring響應式編程自由》
  • 《Go學習圣經:Go語言實現高并發CRUD業務開發》

……完整版尼恩技術圣經PDF集群,請找尼恩領取

《尼恩 架構筆記》《尼恩高并發三部曲》《尼恩Java面試寶典》PDF,請到下面公號【技術自由圈】取↓↓↓

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/696861.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/696861.shtml
英文地址,請注明出處:http://en.pswp.cn/news/696861.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

新版Java面試專題視頻教程——多線程篇②

新版Java面試專題視頻教程——多線程篇② 0. 問題匯總0.1 線程的基礎知識0.2 線程中并發安全0.3 線程池0.4 使用場景 1.線程的基礎知識2.線程中并發鎖3.線程池3.1 說一下線程池的核心參數&#xff08;線程池的執行原理知道嘛&#xff09;3.2 線程池中有哪些常見的阻塞隊列Array…

高級語言期末2014級A卷

1.編寫函數 int delarr(int a[] ,int n)&#xff0c;刪除有n個元素的正整型數組a中所有素數&#xff0c;要求&#xff1a; 1&#xff09;數組a中剩余元素保持原來次序&#xff1b; 2&#xff09;將處理后的數組輸出&#xff1b; 3&#xff09;函數值返回剩余元素個數&#xff1…

MySQL索引面試題(高頻)

文章目錄 前言什么時候需要&#xff08;不需要&#xff09;)使用索引&#xff1f;有哪些優化索引的方法前綴索引優化索引覆蓋優化索引失效場景 總結 前言 今天來講一講 MySQL 索引的高頻面試題。主要是針對前一篇文章 MySQL索引入門&#xff08;一文搞定&#xff09;進行查漏補…

虛擬機的內存結構

一、摘要 熟悉 Java 語言特性的同學都知道&#xff0c;相比 C、C 等編程語言&#xff0c;Java 無需通過手動方式回收內存&#xff0c;內存中所有的對象都可以交給 Java 虛擬機來幫助自動回收&#xff1b;而像 C、C 等編程語言&#xff0c;需要開發者通過代碼手動釋放內存資源&…

MedicalGPT 訓練醫療大模型,實現了包括增量預訓練、有監督微調、RLHF(獎勵建模、強化學習訓練)和DPO(直接偏好優化)

MedicalGPT 訓練醫療大模型&#xff0c;實現了包括增量預訓練、有監督微調、RLHF(獎勵建模、強化學習訓練)和DPO(直接偏好優化)。 MedicalGPT: Training Your Own Medical GPT Model with ChatGPT Training Pipeline. 訓練醫療大模型&#xff0c;實現了包括增量預訓練、有監督微…

Linux第63步_為新創建的虛擬機添加必要的目錄和安裝支持linux系統移植的軟件

1、創建必要的目錄 1)、創建“/home/zgq/linux/”目錄 打開終端&#xff0c;進入“/home/zgq/”目錄 輸入“mkdir linux回車”&#xff0c;創建“/home/zgq/linux/”目錄 輸入“ls回車”&#xff0c;列舉“/home/zgq/”目錄的所有文件和文件夾 創建好“/home/zgq/linux/”…

EIS(防抖):meshflow算法 C++實現

視頻防抖的應用 對視頻防抖的需求在許多領域都有。 這在消費者和專業攝像中是極其重要的。因此&#xff0c;存在許多不同的機械、光學和算法解決方案。即使在靜態圖像拍攝中&#xff0c;防抖技術也可以幫助拍攝長時間曝光的手持照片。 在內窺鏡和結腸鏡等醫療診斷應用中&…

Go 中的 init 如何用?它的常見應用場景有哪些呢?

嗨&#xff0c;大家好&#xff01;我是波羅學。本文是系列文章 Go 技巧第十六篇&#xff0c;系列文章查看&#xff1a;Go 語言技巧。 Go 中有一個特別的 init() 函數&#xff0c;它主要用于包的初始化。init() 函數在包被引入后會被自動執行。如果在 main 包中&#xff0c;它也…

QT基本組件

四、基本組件 Designer 設計師&#xff08;重點&#xff09; Qt包含了一個Designer程序&#xff0c;用于通過可視化界面設計開發界面&#xff0c;保存文件格式為.ui&#xff08;界面文件&#xff09;。界面文件內部使用xml語法的標簽式語言。 在Qt Creator中創建文件時&#xf…

滾雪球學Java(67):深入理解 TreeMap:Java 中的有序鍵值映射表

咦咦咦&#xff0c;各位小可愛&#xff0c;我是你們的好伙伴——bug菌&#xff0c;今天又來給大家普及Java SE相關知識點了&#xff0c;別躲起來啊&#xff0c;聽我講干貨還不快點贊&#xff0c;贊多了我就有動力講得更嗨啦&#xff01;所以呀&#xff0c;養成先點贊后閱讀的好…

機器人內部傳感器閱讀筆記及心得-位置傳感器-旋轉變壓器、激光干涉式編碼器

旋轉變壓器 旋轉變壓器是一種輸出電壓隨轉角變化的檢測裝置&#xff0c;是用來檢測角位移的&#xff0c;其基本結構與交流繞線式異步電動機相似&#xff0c;由定子和轉子組成。 旋轉變壓器的原理如圖1所示&#xff0c;定子相當于變壓器的一次側&#xff0c;有兩組在空間位置上…

MyBatis-Plus 優雅實現數據加密存儲

文章目錄 前言一、數據庫字段加解密實現1. 定義加密類型枚舉2. 定義AES密鑰和偏移量3. 配置定義使用的加密類型4. 加密解密接口5. 解密解密異常類6. 加密解密實現類6.1 AES加密解密實現類6.2 Base64加密解密實現類 7. 實現數據庫的字段保存加密與查詢解密處理類8. MybatisPlus配…

使用python進行量化交易

yfinance yfinance國內不能使用&#xff0c;可以使用tushare、akshare代替 import yfinance as yf# 輸入股票代碼 stock_symbol AAPL # 替換為你想要查詢的股票代碼# 獲取股票數據 data yf.download(stock_symbol)# 打印實時數據 print(data)pip install akshare import …

Selenium安裝與配置

文章目錄 一、selenium安裝1. Python環境準備&#xff1a;2. 安裝Selenium&#xff1a;3. 瀏覽器驅動安裝&#xff1a;4. 驗證安裝&#xff1a; 二、常見問題1. Selenium版本與瀏覽器驅動程序不兼容&#xff1a;2. 瀏覽器驅動程序路徑未正確設置&#xff1a; Selenium是一個用于…

2024年1月手機市場行業分析:蘋果手機份額驟降,國產高端手機成功逆襲!

小米Ultra發布。 一方面&#xff0c;我們有望看到國產手機再一次超越自己的決心&#xff0c;繼續創新追逐高端&#xff1b;另一方面&#xff0c;我們也不得不正視目前手機市場所面臨的危機狀態。 2024年1月的線上手機市場遠不如去年。根據鯨參謀數據顯示&#xff0c;今年1月京…

Qt(C++)面試題 | 精選25項常問

面試是每個求職者都必須經歷的一關,而QT面試更是需要面試者有深厚的編程基礎和豐富的實戰經驗。下面我們為大家整理了25道QT面試題,希望能夠幫助大家在求職路上獲得成功。 ?Qt 中常用的五大模塊是哪些? Qt 中常用的五大模塊包括: QtCore:提供了 Qt 的核心功能,例如基本的…

Java面試題之分布式/微服務篇

經濟依舊不景氣啊&#xff0c;如此大環境下Java還是這么卷&#xff0c;又是一年一次的金三銀四。 兄弟們&#xff0c;你準備好了嗎&#xff1f;沖沖沖&#xff01;歐里給&#xff01; 分布式/微服務相關面試題解 題一&#xff1a;CAP理論&#xff0c;BASE理論題二&#xff1a;…

深度神經網絡

包括&#xff1a;深度前饋神經網絡、深度卷積神經網絡、深度循環神經網絡 深度神經網絡全面概述&#xff1a;從基本概念到實際模型和硬件基礎-騰訊云開發者社區-騰訊云

MQL語言實現JSON協議庫

文章目錄 一、MQL語言實現JSON協議的意義二、定義JSON數據枚舉類型簡單數據類型復雜數據類型枚舉數據類型定義類變量清理與賦值方法構造與析構方法重載運算符添加與設置方法序列化與反序列方法 一、MQL語言實現JSON協議的意義 數據交互&#xff1a;JSON是一種輕量級的數據交換格…

【2024軟件測試面試必會技能】Postman(1): postman的介紹和安裝

Postman的介紹 Postman 是一款谷歌開發的接口測試工具,使API的調試與測試更加便捷。 它提供功能強大的 Web API & HTTP 請求調試。它能夠發送任何類型的HTTP 請求 (GET, HEAD, POST, PUT..)&#xff0c;附帶任何數量的參數 headers。 postman是一款支持http協議的接口調試…