如何防止消息丟失
- 生產者:
- 使用同步發送
- 把ack設成1或者all(非0,0可能會出現消息丟失的情況),并且設置同步的分區數>=2
- 消費者:把自動提交改成手動提交
如何防止重復消費
在防止消息丟失的方案中,如果生產者發送完消息后,因為網絡抖動,沒有收到ack,但實際上broker已經收到了。此時生產者會進行重試,于是broker就會收到多條相同的消息,而造成消費者的重復消費。
如何解決:
- 生產者關閉重試:雖不會發送相同消息,但會造成丟消息(不建議)【
同步發送消息并且開啟重試,ack設置為1或者all
】 消費者解決非冪等性消費問題
:
所謂的冪等性:多次訪問的結果是?樣的
。對于rest的請求(get(冪等)、post(非冪等)、put(冪等)、delete(冪等))
解決方案:
- 在數據庫中創建
聯合主鍵
,防止相同的主鍵創建出多條記錄 - 使用
分布式鎖
,以業務id為鎖。保證只有?條記錄能夠創建成功(setnx
)
如何做到消息的順序消費(效率不高,RocketMQ)
- 生產者:保證消息按順序發送,且消息不丟失——使用同步的發送,ack設置成非0的值。
- 消費者:
主題只能設置?個分區,消費組中只能有一個消費者
【消費者只能限制單partition順序消費,這種效率不高】
kafka的順序消費使用場景不多,因為犧牲掉了性能,但是比如rocketmq在這?塊有專門的功能已設計好。
如何解決消息積壓問題
積壓的消息越多,消費者消費越慢(尋址越來越慢),越慢積壓越多,死循環,導致整個kafka集群磁盤IO都很慢導致多個服務不可用
1.消息積壓問題的出現
消費者消費消息速度遠趕不上生產者生產消息的速度,導致kafka中有大量的數據沒有被消費。隨著沒有被消費的數據堆積越多,消費者尋址的性能會越來越差,最后導致整個kafka對外提供的服務的性能很差,從而造成其他服務也訪問速度變慢,造成服務雪崩。
2.消息積壓的解決方案
- 消費者中,使用
多線程
,充分利用機器的性能進行消費消息。 - 通過業務的架構設計,提升業務層面消費的性能。
- 創建多個消費組,
多個消費者
,部署到其他機器上,?起消費,提高消費者的消費速度 - 創建?個消費者,該消費者在kafka另建?個主題,配上多個分區,多個分區再配上多個消費者。該消費者將poll下來的消息,不進行消費,直接轉發到新建的主題上。此時,新的主題的多個分區的多個消費者就開始?起消費了。——不常用
實現延時隊列的效果(實現比較費勁,RabbitMQ)
1.應用場景
訂單創建后,超過30分鐘沒有?付,則需要取消訂單,這種場景可以通過延時隊列來實現
2.具體方案
- kafka中創建相應的主題,每個topic表示延時的間隔
- topic_5s: 延時5s執行的隊列
- topic_1m: 延時1分鐘執行的隊列
- topic_30m: 延時30分鐘執行的隊列
- 消息發送者發送消息到相應的topic,并帶上消息的發送時間
- 消費者訂閱相應的topic,消費該主題的消息(輪詢)
- 消費者消費消息時判斷消息的創建時間和當前時間是否超過30分鐘(前提是訂單沒支付)
- 如果是:去數據庫中修改訂單狀態為已取消
- 如果否:記錄當前消息的offset,并不再繼續消費之后的消息。等待1分鐘后,再次向kafka拉取該offset及之后的消息,繼續進行判斷,以此反復。