Kafka 原生不支持延遲隊列功能。而RabbitMQ、RocketMQ及Redis等其他消息隊列原生支持延遲隊列。
RabbitMQ | RocketMQ | Redis | |
實現方式 | 通過插件實現,消息進入延遲隊列后根據配置時間過濾轉發。 | 原生支持,發送消息時設置延遲級別,定時任務處理到期消息。 | 通過sorted set實現,消息按延遲時間存儲。 |
特點 | 優點:易于部署使用,支持消息重試和順序處理。 缺點:性能較低,不適合高并發場景。 | 優點:高性能、支持分布式和消息持久化。 缺點:不支持動態添加/刪除隊列。 | 優點:性能高,支持高并發。 缺點:消息未經持久化,存在丟失風險。 |
適用場景 | 中小型任務調度和消息通知。 | 大規模數據處理,高性能要求場景。 | 輕量級任務調度和短期延遲任務。 |
表 RabbitMQ、RocketMQ及Redis實現延遲方案對比
延遲隊列的其他實現方式:
1)? 數據庫 + 定時任務。
實現:將消息存儲到數據庫并記錄目標執行時間,定時任務輪詢數據庫,將到期的消息取出并消費。
優點:延遲時間精準控制,可靠性高。
缺點:依賴數據庫,性能受限。
適用場景:延遲時間不固定,消息量不大。
2) Redis ZSet方案
實現:利用Redis有序集合(ZSet),以消息的執行時間作為score,消費者定時輪詢到期消息。
優點:基于內存,性能高。
缺點:消息可能丟失。
1 Kafka + 時間輪 + 數據庫實現延遲隊列
實現:創建一個延遲隊列Topic,將需要延遲的消息發送到這里。該Topic由一個專門的消費者處理。通過時間輪將讀取的延遲隊列消息存儲并在消息的期望執行時間將消息再發送到目標Topic。
圖 Kafka + 時間輪實現延遲隊列示例圖
1.1?時間輪可靠性保證
方案的關鍵在于時間輪,時間輪是在內存中的數據結構,除了需要保證消息的準時性,還需要可靠性,即當項目重啟后,原先在時間輪中的消息不能丟失。
當消息進入時間輪時,同步將這個消息存儲到數據庫中,狀態為待完成,當這條消息被處理后,標注為已完成。
當系統重啟時,從數據庫中獲取待處理的任務,并把它們放入到時間輪中。
???????1.2?角色職責及數據結構
表 Kafka + 時間輪 + 數據庫實現延遲隊列
延遲消息:指定執行日期的消息,包含字段:id、執行日期、目標topic、狀態(待執行、執行完成、執行失敗)、目標參數。
延遲消息生產者:業務代碼中延遲任務的產出源頭。負責將延遲消息發送給Kafka的延遲隊列Topic。
延遲隊列Topic消費者:消費Kafka 延遲隊列Topic的消息,將待執行的消息放入到時間輪中,同時將其持久化到數據庫中。
時間輪:負責延遲消息的調度,將要執行的消息發送到Kafka的目標Topic中。
目標Topic消費者:消費Kafka目標Topic的消息,并根據消息的id,來更新其在數據庫中的狀態。