golang實現延遲隊列
1 延遲隊列:郵件提醒、訂單自動取消
延遲隊列:處理需要在未來某個特定時間執行的任務。這些任務被添加到隊列中,并且指定了一個執行時間,只有達到指定的時間點時才能從隊列中取出并執行。
應用場景:
- 郵件提醒
- 訂單自動取消(超過多少時間未支付,就取消訂單)
- 對超時任務的處理等
由于任務的執行是在未來的某個時間點,因此這些任務不會立即執行,而是存儲在隊列中,直到它的預定執行時間才會被執行。
2 實現
2.1 simple簡單版:go自帶的time包實現
思路:
- 定義Task結構體,包含
- ExecuteTime time.Time
- Job func()
- 定義DelayQueue
- TaskQueue []Task
- func AddTask
- func RemoveTask
- ExecuteTask
這種方案存在的問題:
Go程序重啟時,存儲在slice中的延遲處理任務將全部丟失
完整代碼:
package mainimport ("fmt""time"
)/*
基于go實現延遲隊列
*/
type Task struct {ExecuteTime time.TimeJob func()
}type DelayQueue struct {Tasks []*Task
}func (d *DelayQueue) AddTask(t *Task) {d.Tasks = append(d.Tasks, t)
}func (d *DelayQueue) RemoveTask() {//FIFO: remove the first task to enqueued.Tasks = d.Tasks[1:]
}func (d *DelayQueue) ExecuteTask() {for len(d.Tasks) > 0 {//dequeue a taskcurrentTask := d.Tasks[0]if time.Now().Before(currentTask.ExecuteTime) {//if the task execution time is not up, waittime.Sleep(currentTask.ExecuteTime.Sub(time.Now()))}//execute the taskcurrentTask.Job()//remove task who has been executedd.RemoveTask()}}func main() {fmt.Println("start delayQueue")delayQueue := &DelayQueue{}firstTask := &Task{ExecuteTime: time.Now().Add(time.Second * 1),Job: func() {fmt.Println("executed task 1 after delay")},}delayQueue.AddTask(firstTask)secondTask := &Task{ExecuteTime: time.Now().Add(time.Second * 7),Job: func() {fmt.Println("executed task 2 after delay")},}delayQueue.AddTask(secondTask)delayQueue.ExecuteTask()fmt.Println("all tasks have been done!!!")
}
效果:
2.2 complex持久版:go+redis
為了防止Go重啟后存儲到delayQueue的數據丟失,我們可以將任務持久化到redis中。
思路:
- 初始化redis連接
- 延遲隊列采用redis的zset(有序集合)實現
前置準備:
# 安裝docker
yum install -y yum-utils
yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis
完整代碼:
package mainimport ("fmt""github.com/go-redis/redis"log "github.com/ziyifast/log""time"
)/*
基于redis zset實現延遲隊列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"func initClient() (err error) {redisdb = redis.NewClient(&redis.Options{Addr: "localhost:6379",Password: "", // not set passwordDB: 0, //use default db})_, err = redisdb.Ping().Result()if err != nil {log.Errorf("%v", err)return err}return nil
}func main() {err := initClient()if err != nil {log.Errorf("init redis client err: %v", err)return}addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())//執行隊列中的任務getAndExecuteTask()
}// executeTime為unix時間戳,作為zset中的score。允許redis按照task應該執行時間來進行排序
func addTaskToQueue(task string, executeTime int64) {err := redisdb.ZAdd(DelayQueueKey, redis.Z{Score: float64(executeTime),Member: task,}).Err()if err != nil {panic(err)}
}// 從redis中取一個task并執行
func getAndExecuteTask() {for {tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{Min: "-inf",Max: fmt.Sprintf("%d", time.Now().Unix()),Offset: 0,Count: 1,}).Result()if err != nil {time.Sleep(time.Second * 1)continue}//處理任務for _, task := range tasks {fmt.Println("Execute task: ", task)//執行完任務之后用 ZREM 移除該任務redisdb.ZRem(DelayQueueKey, task)}time.Sleep(time.Second * 1)}
}
效果:
redis一直從延遲隊列中取數據,如果處理完一批則睡眠1s
- 具體根據大家的業務調整,此處主要介紹思路