網址
- https://github.com/cfanbo/delay-queue-redis
代碼結構很簡單,簡單代表著自由度很高,使用過程中出現問題也很好修改。
我很喜歡這樣的代碼,至少我看的懂,該有的都有。
//package main
//
//import (
// "context"
// "fmt"
// "log"
// "time"
//
// queue "github.com/cfanbo/delay-queue-redis"
// "github.com/go-redis/redis/v8"
//)
//
//var redisClient *redis.Client
//
//type Msg struct {
// MsgId int `json:"msg_id"`
// MsgBody string `json:"body"`
// UserId int `json:"uid"`
//}
//
//func handerFunc(msg queue.Message) {
// fmt.Println("消費一條消息:=========")
// fmt.Printf("%#v\n", msg)
//
// // 轉map
// m := msg.Body.(map[string]interface{})
// fmt.Println(m["msg_id"], m["body"], m["uid"])
//}
//
//func main() {
// ctx, cancel := context.WithCancel(context.Background())
// redisClient = redis.NewClient(&redis.Options{
// Addr: "localhost:6379",
// Password: "", // no password set
// DB: 0, // use default DB
// })
//
// _, err := redisClient.Ping(ctx).Result()
// if err != nil {
// log.Fatal(err)
// }
//
// // 創建延時隊列
// q := queue.NewQueue(ctx, redisClient, queue.WithTopic("test-topic"), queue.WithHandler(handerFunc))
// q.Start()
//
// // 創建消息實體對象
// ticker := time.NewTicker(time.Second * 1)
// go func(ticker *time.Ticker) {
// defer ticker.Stop()
//
// for {
// select {
// case <-ticker.C:
// message := Msg{100, "abc", 43}
// msg := queue.NewMessage("", time.Now().Add(time.Second*8), message)
//
// // 發布
// _, err = q.Publish(msg)
// if err != nil {
// log.Fatal(err)
// }
// fmt.Println("發布成功一條消息")
// }
// }
//
// }(ticker)
//
// // 手動延時10秒后退出
// time.Sleep(time.Second * 10)
// cancel()
//}