大家好,我是君哥。
使用消息隊列時,我們經常會遇到一個可能對業務產生影響的問題,消息重復。在訂單、扣款、對賬等對冪等有要求的場景,消息重復的問題必須解決。
那怎樣應對重復消息呢?今天來聊一聊這個話題。
1.三個語義
正確使用消息隊列,我們會考慮到消息防丟失、防重復,我們介紹 3 個語義:
-
At Least Once:在消息隊列中,指消息不丟失,一條消息最少被消費一次,但是可能會有重復消費。
-
Exactly Once:在消息隊列中,消息被精準消費一次,不丟失,也不會重復;
-
At Most Once:在消息隊列中,消息不會被重復消費,但是可能會有消息丟失
不同的消息場景,需要的語義不同。比如?Exactly Once?最難實現,一般需要引入事務消息。
不同使用場景,對語義的要求也不一樣。比如日志收集類的場景,At Most Once?就可以滿足,而支付類的場景則要求?Exactly Once。
2.消息重復
什么情況下會導致消息重復呢?
生產者發送消息后,Broker?保存成功,但是沒有成功給生產者返回?ACK,生產者以為消息發送失敗,重試,再次給?Broker?發送。Broker?保存了重復消息,導致?Consumer?多次消費。
消費者消費消息后,給?Broker?返回?ACK?失敗,導致?Broker?沒有修改偏移量,同一條消息再次發送給消費者,或者被消費者拉取到。
3.生產者防重
有的消息中間件是支持生產者冪等的。比如 Kafka 從 0.11.0?版本開始引入了冪等?Producer,可以使用下面代碼開啟冪等?Producer:
Properties props =?new?Properties();
//省略其他代碼
//配置冪等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,?true);?
//創建生產者實例
KafkaProducer<String, String> producer =?new?KafkaProducer<>(props);
Kafka?實現生產者冪等的原理是在生產者引入了?Producer ID(PID)和 Sequence Number?這兩個參數。
-
PID:Producer?擁有的?ID,唯一標識一個?Producer。
-
Sequence Number:自增的數值,唯一標識同一個?Producer?發送到指定分區的消息?ID。
有了這兩個參數,Broker?單分區就可以唯一標識一個生產者發送的唯一一條消息<PID,SequenceNumber>。Broker 收到消息時,如果檢查到消息的<PID,SequenceNumber>已經存在,就不會再保留這條消息。
但冪等?Producer?只能在單分區下生效,多分區情況下是不生效的。因為多個分區之間并不能相互訪問對方的<PID,SequenceNumber>。
4.Broker 防重
Broker?如果可以防重,那對于生產者和消費者來說,節省了大量的工作。下面我們看下?Pulsar?是怎樣防重的。
Broker?通過參數?BrokerDeduplicationEnabled?開啟防重功能。對于?Producer?發送的重復消息,Broker?返回響應?-1:-1。
Producer?發送消息時,會帶一個?sequenceId?字段,Broker?會按照?ProducerName?維度記錄當前生產者最大的?sequenceId(highestSequenceId)。Broker?收到消息時,首先會判斷消息中的?sequenceId?是否大于自己保存的當前生產者的?highestSequenceId,如果是則保存消息并更新?highestSequenceId,否則丟棄消息,并且給?Producer?返回?-1:-1。
下面是三個極端情況:
-
Producer?斷開連接:這種情況下,跟?Broker?重新建立連接后,本地保存的?sequenceId?還在,只要使用?sequenceId?遞增后發送消息即可;
-
Producer?宕機:Producer?重啟后,緩存的?sequenceId?肯定不存在了,這時跟?Broker?重新建立連接后,Broker?會根據?ProducerName?找出?highestSequenceId?發給?Producer,Producer?使用這個?sequenceId?來發送消息;
-
Producer 和 Broker?都宕機:Broker?重啟后,可以從宕機前保存的快照中恢復各?Producer?對應的?highestSequenceId?發送給各?Producer。但這個?highestSequenceId?不一定準確,因為?Broker?宕機瞬間很有可能最新的?sequenceId?沒有來得及保存快照。
需要注意的是,跟?Kafka?的冪等?Producer?類似,Pulsar 的 Broker?冪等也只能保證?Topic/Partition?級別。
5.消費者防重
從上面的分析可以看出,靠生產者防重和?Broker?防重,只能在?Topic/Partition?級別生效,這通常并不能滿足我們的需求。而為了避免消費者重復消費對業務造成影響,消息防重還是必要的。這就要求我們做最后一道防線,在消費端進行防重或冪等處理。
消費端做防重,就不再考慮消息中間件層面的配置(比如?sequenceId),而是從消息體進行下手。
生產者發送消息時,給消息體賦值一個全局唯一的?ID,消費者處理消息時,根據全局唯一?ID?做防重。
比如消費端的邏輯是保存一條訂單消息,那把唯一?ID?保存到數據庫并且加一個唯一索引,這樣根據唯一索引就可以做消息去重。
不過使用唯一索引也有缺點:
-
如果使用?MySQL?數據庫,不能使用?Change Buffer;
-
非插入的場景(比如更新庫存)不能去重。
對于唯一索引的缺點,我們可以引入?Redis?對唯一?ID?做保存,利用?setNx?判斷消息是否已經處理過。如下圖:
if?(jedis.setnx(ID,?"1") ==?1) {//處理業務,返回 ACK
}else?{//直接返回 返回 ACK
}
6.總結
使用消息隊列,在一些場景下是需要防重的。主流消息隊列提供了一些防重的能力,但并不是完全可靠的。在對重復消息敏感的場景下,最好是在消費端處理消息時,從業務層面進行消息防重。