實現原理
通過topic區分不同的延遲時長,每個topic對于一個延遲,比如 topic100 僅存儲延遲 100ms 的消息,topic1000 僅存儲延遲 1s 的消息,依次類推。
生產消息時,消息需按延遲時長投遞到對應的topic。消費消息時,檢查消息的時間,如果未到達延遲時長,則sleep剩余的時長后再處理。這樣就簡單的實現了基于kafka的延遲隊列。死信隊列,可作為一種特殊的延遲隊列,比如延遲 3600000ms 的處理。
消費者實現
package mainimport ("context""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定義每個topic對應的延遲時間(ms)
var topicDelayConfig = map[string]time.Duration{"delay-100ms": 100 * time.Millisecond,"delay-200ms": 200 * time.Millisecond,"delay-500ms": 500 * time.Millisecond,"delay-1000ms": 1000 * time.Millisecond,
}type delayConsumerHandler struct {// 可以添加必要的依賴,如業務處理器等
}func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {logrus.Info("延遲隊列消費者初始化完成")return nil
}func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {logrus.Info("延遲隊列消費者清理完成")return nil
}// ConsumeClaim 處理分區消息,實現延遲邏輯
func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {topic := claim.Topic()delay, exists := topicDelayConfig[topic]if !exists {logrus.Errorf("topic %s 未配置延遲時間,跳過消費", topic)// 標記所有消息為已消費,避免重復處理for range claim.Messages() {sess.MarkMessage(msg, "")}return nil}// 按順序處理消息(假設消息時間有序)for msg := range claim.Messages() {// 檢查會話是否已關閉(如重平衡發生)select {case <-sess.Context().Done():logrus.Info("會話已關閉,停止消費")return nildefault:}// 計算需要延遲的時間// 消息應該被處理的時間 = 消息產生時間 + 主題延遲時間produceTime := msg.TimestampprocessTime := produceTime.Add(delay)now := time.Now()// 如果當前時間未到處理時間,計算需要休眠的時間if now.Before(processTime) {sleepDuration := processTime.Sub(now)logrus.Debugf("消息需要延遲處理,topic=%s, offset=%d, 需等待 %v (產生時間: %v, 預計處理時間: %v)",topic, msg.Offset, sleepDuration, produceTime, processTime,)// 休眠期間監聽會話關閉信號,避免阻塞重平衡select {case <-sess.Context().Done():logrus.Info("休眠期間會話關閉,停止消費")return nilcase <-time.After(sleepDuration):// 休眠完成,繼續處理}}// 延遲時間已到,處理消息h.processMessage(msg)// 標記消息為已消費sess.MarkMessage(msg, "")}return nil
}// 實際業務處理邏輯
func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {logrus.Infof("處理延遲消息,topic=%s, partition=%d, offset=%d, key=%s, value=%s, 產生時間=%v",msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,)// 這里添加實際的業務處理代碼
}// 初始化消費者示例
func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 指定Kafka版本config.Consumer.Return.Errors = trueconfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange// 確保消息的Timestamp是創建時間(需要Kafka broker配置支持)config.Consumer.Fetch.Min = 1config.Consumer.Fetch.Default = 1024 * 1024return sarama.NewConsumerGroup(brokers, groupID, config)
}func main() {brokers := []string{"localhost:9092"}groupID := "delay-queue-group"topics := []string{"delay-100ms", "delay-200ms", "delay-500ms", "delay-1000ms"}consumer, err := newDelayConsumer(brokers, groupID)if err != nil {logrus.Fatalf("創建消費者失敗: %v", err)}defer consumer.Close()handler := &delayConsumerHandler{}ctx := context.Background()// 持續消費for {if err := consumer.Consume(ctx, topics, handler); err != nil {logrus.Errorf("消費出錯: %v", err)// 簡單重試邏輯time.Sleep(5 * time.Second)}}
}
生產者實現
package mainimport ("errors""time""github.com/IBM/sarama""github.com/sirupsen/logrus"
)// 定義允許的延遲時長(毫秒)及其對應的Topic
var allowedDelays = map[time.Duration]string{100 * time.Millisecond: "delay-100ms",200 * time.Millisecond: "delay-200ms",500 * time.Millisecond: "delay-500ms",1000 * time.Millisecond: "delay-1000ms",// 可根據需要添加更多允許的延遲時長
}// DelayProducer 延遲消息生產者
type DelayProducer struct {producer sarama.SyncProducer
}// NewDelayProducer 創建延遲消息生產者實例
func NewDelayProducer(brokers []string) (*DelayProducer, error) {config := sarama.NewConfig()config.Version = sarama.V2_8_1_0 // 匹配Kafka版本config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 3config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer(brokers, config)if err != nil {return nil, err}return &DelayProducer{producer: producer,}, nil
}// SendDelayMessage 發送延遲消息
// 參數:
// - key: 消息鍵
// - value: 消息內容
// - delay: 延遲時長
// 返回:
// - 消息的分區和偏移量
// - 錯誤信息(若延遲不合法或發送失敗)
func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {// 1. 校驗延遲時長是否合法topic, ok := allowedDelays[delay]if !ok {return 0, 0, errors.New("invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms")}// 2. 創建消息,設置當前時間為消息時間戳(供消費者計算延遲)msg := &sarama.ProducerMessage{Topic: topic,Key: sarama.ByteEncoder(key),Value: sarama.ByteEncoder(value),Timestamp: time.Now(), // 記錄消息發送時間,用于消費者計算處理時間}// 3. 發送消息partition, offset, err = p.producer.SendMessage(msg)if err != nil {logrus.Errorf("發送延遲消息失敗: %v, 延遲時長: %v", err, delay)return 0, 0, err}logrus.Infof("發送延遲消息成功, topic: %s, 分區: %d, 偏移量: %d, 延遲時長: %v",topic, partition, offset, delay)return partition, offset, nil
}// Close 關閉生產者
func (p *DelayProducer) Close() error {return p.producer.Close()
}// 使用示例
func main() {// 初始化生產者producer, err := NewDelayProducer([]string{"localhost:9092"})if err != nil {logrus.Fatalf("初始化生產者失敗: %v", err)}defer producer.Close()// 發送合法延遲消息_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("這是一條延遲消息"),100*time.Millisecond, // 合法延遲)if err != nil {logrus.Error("發送消息失敗:", err)}// 嘗試發送非法延遲消息(會被拒絕)_, _, err = producer.SendDelayMessage([]byte("test-key"),[]byte("這是一條非法延遲消息"),300*time.Millisecond, // 不允許的延遲)if err != nil {logrus.Error("發送消息失敗:", err) // 會輸出非法延遲的錯誤}
}