基于Kafka的延遲隊列

實現原理

通過topic區分不同的延遲時長,每個topic對于一個延遲,比如 topic100 僅存儲延遲 100ms 的消息,topic1000 僅存儲延遲 1s 的消息,依次類推。

在這里插入圖片描述

生產消息時,消息需按延遲時長投遞到對應的topic。消費消息時,檢查消息的時間,如果未到達延遲時長,則sleep剩余的時長后再處理。這樣就簡單的實現了基于kafka的延遲隊列。死信隊列,可作為一種特殊的延遲隊列,比如延遲 3600000ms 的處理。

消費者實現

package mainimport ("context""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定義每個topic對應的延遲時間(ms)
var topicDelayConfig = map[string]time.Duration{"delay-100ms":  100 * time.Millisecond,"delay-200ms":  200 * time.Millisecond,"delay-500ms":  500 * time.Millisecond,"delay-1000ms": 1000 * time.Millisecond,
}type delayConsumerHandler struct {// 可以添加必要的依賴,如業務處理器等
}func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {logrus.Info("延遲隊列消費者初始化完成")return nil
}func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {logrus.Info("延遲隊列消費者清理完成")return nil
}// ConsumeClaim 處理分區消息,實現延遲邏輯
func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {topic := claim.Topic()delay, exists := topicDelayConfig[topic]if !exists {logrus.Errorf("topic %s 未配置延遲時間,跳過消費", topic)// 標記所有消息為已消費,避免重復處理for range claim.Messages() {sess.MarkMessage(msg, "")}return nil}// 按順序處理消息(假設消息時間有序)for msg := range claim.Messages() {// 檢查會話是否已關閉(如重平衡發生)select {case <-sess.Context().Done():logrus.Info("會話已關閉,停止消費")return nildefault:}// 計算需要延遲的時間// 消息應該被處理的時間 = 消息產生時間 + 主題延遲時間produceTime := msg.TimestampprocessTime := produceTime.Add(delay)now := time.Now()// 如果當前時間未到處理時間,計算需要休眠的時間if now.Before(processTime) {sleepDuration := processTime.Sub(now)logrus.Debugf("消息需要延遲處理,topic=%s, offset=%d, 需等待 %v (產生時間: %v, 預計處理時間: %v)",topic, msg.Offset, sleepDuration, produceTime, processTime,)// 休眠期間監聽會話關閉信號,避免阻塞重平衡select {case <-sess.Context().Done():logrus.Info("休眠期間會話關閉,停止消費")return nilcase <-time.After(sleepDuration):// 休眠完成,繼續處理}}// 延遲時間已到,處理消息h.processMessage(msg)// 標記消息為已消費sess.MarkMessage(msg, "")}return nil
}// 實際業務處理邏輯
func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {logrus.Infof("處理延遲消息,topic=%s, partition=%d, offset=%d, key=%s, value=%s, 產生時間=%v",msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,)// 這里添加實際的業務處理代碼
}// 初始化消費者示例
func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 指定Kafka版本config.Consumer.Return.Errors = trueconfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange// 確保消息的Timestamp是創建時間(需要Kafka broker配置支持)config.Consumer.Fetch.Min = 1config.Consumer.Fetch.Default = 1024 * 1024return sarama.NewConsumerGroup(brokers, groupID, config)
}func main() {brokers := []string{"localhost:9092"}groupID := "delay-queue-group"topics := []string{"delay-100ms", "delay-200ms", "delay-500ms", "delay-1000ms"}consumer, err := newDelayConsumer(brokers, groupID)if err != nil {logrus.Fatalf("創建消費者失敗: %v", err)}defer consumer.Close()handler := &delayConsumerHandler{}ctx := context.Background()// 持續消費for {if err := consumer.Consume(ctx, topics, handler); err != nil {logrus.Errorf("消費出錯: %v", err)// 簡單重試邏輯time.Sleep(5 * time.Second)}}
}

生產者實現

package mainimport ("errors""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定義允許的延遲時長(毫秒)及其對應的Topic
var allowedDelays = map[time.Duration]string{100 * time.Millisecond:  "delay-100ms",200 * time.Millisecond:  "delay-200ms",500 * time.Millisecond:  "delay-500ms",1000 * time.Millisecond: "delay-1000ms",// 可根據需要添加更多允許的延遲時長
}// DelayProducer 延遲消息生產者
type DelayProducer struct {producer sarama.SyncProducer
}// NewDelayProducer 創建延遲消息生產者實例
func NewDelayProducer(brokers []string) (*DelayProducer, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 匹配Kafka版本config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 3config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer(brokers, config)if err != nil {return nil, err}return &DelayProducer{producer: producer,}, nil
}// SendDelayMessage 發送延遲消息
// 參數:
//   - key: 消息鍵
//   - value: 消息內容
//   - delay: 延遲時長
// 返回:
//   - 消息的分區和偏移量
//   - 錯誤信息(若延遲不合法或發送失敗)
func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {// 1. 校驗延遲時長是否合法topic, ok := allowedDelays[delay]if !ok {return 0, 0, errors.New("invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms")}// 2. 創建消息,設置當前時間為消息時間戳(供消費者計算延遲)msg := &sarama.ProducerMessage{Topic:     topic,Key:       sarama.ByteEncoder(key),Value:     sarama.ByteEncoder(value),Timestamp: time.Now(), // 記錄消息發送時間,用于消費者計算處理時間}// 3. 發送消息partition, offset, err = p.producer.SendMessage(msg)if err != nil {logrus.Errorf("發送延遲消息失敗: %v, 延遲時長: %v", err, delay)return 0, 0, err}logrus.Infof("發送延遲消息成功, topic: %s, 分區: %d, 偏移量: %d, 延遲時長: %v",topic, partition, offset, delay)return partition, offset, nil
}// Close 關閉生產者
func (p *DelayProducer) Close() error {return p.producer.Close()
}// 使用示例
func main() {// 初始化生產者producer, err := NewDelayProducer([]string{"localhost:9092"})if err != nil {logrus.Fatalf("初始化生產者失敗: %v", err)}defer producer.Close()// 發送合法延遲消息_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("這是一條延遲消息"),100*time.Millisecond, // 合法延遲)if err != nil {logrus.Error("發送消息失敗:", err)}// 嘗試發送非法延遲消息(會被拒絕)_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("這是一條非法延遲消息"),300*time.Millisecond, // 不允許的延遲)if err != nil {logrus.Error("發送消息失敗:", err) // 會輸出非法延遲的錯誤}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/920797.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/920797.shtml
英文地址,請注明出處:http://en.pswp.cn/news/920797.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

LabVIEW轉速儀校準系統

LabVIEW 與機器視覺的智能校準系統以工控機為核心&#xff0c;整合標準源、智能相機等硬件&#xff0c;通過軟件實現校準流程自動化&#xff0c;支持 500-6000r/min 轉速范圍校準&#xff0c;覆蓋 5 類轉速測量儀&#xff0c;校準時間縮短約 70%&#xff0c;滿足計量院高效、精…

Synchronized 概述

1. 初識 synchronized 是 Java 中的關鍵字&#xff0c;是一種 同步鎖 &#xff0c;可重入鎖&#xff0c;悲觀鎖。它修飾的對象有以下幾種&#xff1a; 具體表現為以下3種形式。 對于普通同步方法&#xff0c;鎖是當前實例對象。 對于靜態同步方法&#xff0c;鎖是當前類的 Clas…

通過Auth.log來查看VPS服務器是否被掃描和暴力破解及解決辦法

說明&#xff1a;很多人vps可能出現過被掃的情況&#xff0c;有的還被爆破了&#xff0c;這里提供下查看方法 查看用密碼登陸成功的IP地址及次數grep "Accepted password for root" /var/log/auth.log | awk {print $11} | sort | uniq -c | sort -nr | more查看用密…

碰一碰發視頻手機版源碼開發:支持OEM

**從事開發 20 年&#xff0c;見過不少技術風口起起落落&#xff0c;最近 “碰一碰發視頻” 又成了熱門話題。不少同行或剛入行的年輕人來問我&#xff0c;手機版源碼開發該從哪下手&#xff0c;怕踩坑、怕走彎路。今天就以一個老程序員的視角&#xff0c;把碰一碰發視頻手機版…

只出現一次的數字(總結)

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄前言一、給定一個整數數組nums&#xff0c;除了某個元素只出現一次以外&#xff0c;其余元素均出現兩次。找出那個只出現一次的元素二、給你一個整數數組nums&#x…

Cesium 入門教程(十一):Camera相機功能展示

文章目錄一&#xff0c;Cesium 實際示例&#xff08;含源代碼&#xff09;1&#xff0c;vuecesium&#xff1a; 圍繞一個固定點自動左右旋轉2&#xff0c;vuecesium&#xff1a; flyto一個具體的實體位置3&#xff0c;vuecesium&#xff1a; flyto一個具體的點位置4&#xff0c…

go語言基本排序算法

package mainimport "fmt"func main() {BubbleSort()SelectSort()InsertSort()MergeSort()QuickSort()HeapSort()ShellSort() }//冒泡排序 func BubbleSort() {str : []int{9, 1, 5, 8, 3, 7, 4, 6, 2}for i : 0; i < len(str)-1; i {flag : falsefor j : len(str…

一步完成CalDAV賬戶同步,日歷服務助力釘釘日歷日程集中管理

在信息爆炸節奏飛快的今天&#xff0c;高效的管理時間已經成為我們工作和生活中的核心競爭力&#xff0c;復雜紛繁的日程安排&#xff0c;無處不在的提醒需求以及跨設備同步的困擾&#xff0c;這些問題仿佛都在呼喚著一個更智能、更便捷、更可靠的解決方案。 而華為日歷App&am…

企業內部機密視頻安全保護|如何防止企業內部機密視頻泄露?

在企業數字化進程飛速發展的今天&#xff0c;視頻內容已成為承載企業內部培訓、戰略會議、產品機密和核心技術的關鍵載體。一次意外的泄露&#xff0c;不僅可能導致知識產權流失&#xff0c;更會讓企業聲譽和市場競爭力遭受重創。面對無孔不入的安全威脅&#xff0c;企業該如何…

C# Deconstruct | 簡化元組與對象的數據提取

官方文檔&#xff1a;析構元組和其他類型 - C# | Microsoft Learn 標簽&#xff1a;Deconstruct、Tuple、record、模式匹配 PS&#xff1a;record相關內容后續還會繼續更新&#x1f504; 模式匹配可以查看我的另一篇&#x1f449;模式匹配 目錄1. 概述2. 基本用法2.1 元組解…

R 語言 ComplexUpset 包實戰:替代 Venn 圖的高級集合可視化方案

摘要 在生物信息學、數據挖掘等領域的集合分析中,傳統 Venn 圖在多維度數據展示時存在信息擁擠、可讀性差等問題。本文基于 R 語言的 ComplexUpset 包,以基因表達研究為場景,從包安裝、數據準備到可視化實現,完整演示如何制作正刊級別的集合交集圖,解決多條件下差異基因(…

?導游|基于SprinBoot+vue的在線預約導游系統

在線預約導游系統 基于SprinBootvue的在線預約導游系統 一、前言 二、系統設計 三、系統功能設計 前臺功能實現 后臺功能實現 管理員模塊實現 導游模塊實現 用戶模塊實現 四、數據庫設計 五、核心代碼 六、論文參考 七、最新計算機畢設選題推薦 八、源碼獲取&am…

SQL server 異常 出現錯誤 824

2025-08-27 01:36:37,324 ERROR c.z.i.w.DatabaseUtils [Scheduled-7] Error executeStoredProcedure SQL script: sp_RefreshDWDByDateFive警告: 在 08 27 2025 1:36AM 出現錯誤 824。請記錄該錯誤和時間&#xff0c;并與您的系統管理員聯系。 2025-08-27 01:36:37,332 ERROR …

制造業生產線連貫性動作識別系統開發

制造業生產線連貫性動作識別系統開發 第一部分&#xff1a;項目概述與理論基礎 1.1 項目背景與意義 在現代智能制造環境中&#xff0c;盡管自動化程度不斷提高&#xff0c;但人工操作仍然在復雜裝配任務中扮演著不可替代的角色。研究表明&#xff0c;人機協作被視為打破傳統人機…

什么是Jmeter? Jmeter工作原理是什么?

&#x1f345; 點擊文末小卡片&#xff0c;免費獲取軟件測試全套資料&#xff0c;資料在手&#xff0c;漲薪更快 第一篇 什么是 JMeter&#xff1f;JMeter 工作原理 1.1 什么是 JMeter Apache JMeter 是 Apache 組織開發的基于 Java 的壓力測試工具。用于對軟件做壓力測試&a…

Linux網絡基礎1(一)之計算機網絡背景

文章目錄計算機網絡背景網絡發展認識 "協議"高小琴例子方言例子計算機網絡背景 網絡發展 獨立模式: 計算機之間相互獨立; 網絡互聯: 多臺計算機連接在一起, 完成數據共享; 局域網LAN: 計算機數量更多了, 通過交換機和路由器連接在一起; 廣域網WAN: 將遠隔千里的計算…

如何在數學建模賽中實現模型創新?

模型創新性在國賽數學建模中&#xff0c;完備性是論文的基本要求&#xff0c;而創新性則是決定論文能否脫穎而出的關鍵因素。所謂創新&#xff0c;并不僅僅指提出完全新穎的數學理論&#xff0c;而是能夠在已有方法的基礎上&#xff0c;通過新的問題切入點、假設修正、模型優化…

【重磅發布】flutter_chen_updater-版本升級更新

Flutter Chen Updater 一個功能強大的Flutter應用內更新插件&#xff0c;支持Android APK自動下載、安裝和iOS跳轉App Store。 ? 特性 ? 跨平臺支持: Android APK自動更新&#xff0c;iOS跳轉App Store? 智能下載: 支持斷點續傳、文件校驗、多重備用方案? 權限管理: 自動處…

docker 1分鐘 快速搭建 redis 哨兵集群

使用 docker-compose 1 分鐘搭建好 1主2從3哨兵的 redis 哨兵集群 目錄結構 redis-sentinel-cluster ├── check_redis.sh ├── docker-compose.yml ├── redis │ └── redis.conf ├── sentinel │ └── sentinel.confdocker-compose.yml 配置 version: 3…

Git與DevOps實戰:從版本控制到自動化部署

一、版本控制1.什么是版本控制&#xff1f;版本控制用于高效追蹤和管理項目開發中的代碼、配置及文檔變更歷史&#xff0c;確保團隊成員始終使用正確版本&#xff0c;并支持版本回溯、差異比較和文件恢復。它能帶來以下優勢&#xff1a;通過歷史記錄保障數據安全與完整性&#…