原理
通過用一個goroutine以及堆來存儲要待調度的延遲任務,當達到調度時間后,將其添加到協程池中去執行。
主要是使用了chan、Mutex、atomic及ants協程池來實現。
用途
主要是用于高并發及大量定時任務要處理的情況,如果使用Go協程來實現每次延遲任務的調度,那么數量極大的goroutine將會占用內存,導致性能下降,使用協程池實現延遲任務的調度,會改善該情況。
如在物聯網設備中,當連接數量達到幾十萬時,如果使用goroutine來處理心跳或者活躍檢測,頻繁的創建銷毀goroutine會影響性能。
特色
在常見的cron等開源框架中使用的是數組存儲待調度的任務,每次循環時都要排序,并且要刪除某個任務則時間復雜度是O(n)。
本文通過使用堆及雙重Map優化存儲待調度的任務,使得添加任務時間復雜度為O(log n),獲取任務時間復雜度為O(1),刪除時間復雜度為O(1)。
調度器并不會真正的刪除取消任務,當取消任務達到執行時間時,會直接continue,是為了提高刪除效率,如果要刪除取消任務,那么刪除的時間復雜度為O(log n),當有極大量任務時,會占用一些內存,通過空間換時間來提高刪除效率,下文也提供了刪除取消任務的實現,根據不同的場景使用不同的定時任務。
API
創建
NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) //創建協程數是1的延遲任務調度器
s, _ := NewSchedule(1)
創建一個延遲調度任務器,workerNum是協程數量,options是ants協程池的配置,除了WithMaxBlockingTasks不能配置,別的都可以,具體參考:https://github.com/panjf2000/ants
調度一次
func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (TaskId, error) //1秒后打印一次時間
taskId, _ := s.ScheduleOne(func() {fmt.Println(time.Now())
}, time.Second)
重復調度
func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次時間
taskId, _ := s.Schedule(func() {fmt.Println(time.Now())
}, time.Second)
取消調度
func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次時間
taskId, _ := s.Schedule(func() {fmt.Println(time.Now())
}, time.Second)
//休眠3秒后,取消調度
time.Sleep(3 * time.Second)
s.CancelTask(taskId)
停止調度
func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) //每隔一秒打印一次時間
taskId, _ := s.Schedule(func() {fmt.Println(time.Now())
}, time.Second)
//休眠3秒后,停用延遲任務調度器
time.Sleep(3 * time.Second)
s.Shutdown()
代碼
package scheduleimport ("container/heap""errors""github.com/panjf2000/ants/v2""math""sync/atomic""time"
)var (// ErrScheduleShutdown 延遲任務調度器已關閉錯誤ErrScheduleShutdown = errors.New("schedule: schedule is already in shutdown")
)const invalidTaskId = 0type TaskId uint32
type OriginalTaskId uint32// Schedule 延遲調度的結構體,提供延遲調度任務的全部方法
// 通過NewSchedule方法創建Schedule,通過Schedule、ScheduleOne方法添加延遲調度任務,通過CancelTask方法取消任務,通過Shutdown停止延遲任務
type Schedule struct {//任務堆,按時間排序taskHeap taskHeap//可執行的任務Map,key是當前的任務id,value是任務的第一次原始id,用于優化取消任務時需要遍歷堆去刪除executeTaskIdMap map[TaskId]OriginalTaskId//任務id的Map,key是任務的第一次原始id,value是當前的任務id,用于優化取消任務時需要遍歷堆去刪除originalTaskIdMap map[OriginalTaskId]TaskId//調度器是否運行中running atomic.Bool//下一個任務idnextTaskId atomic.Uint32//任務運行池pool *ants.Pool//添加任務ChanaddTaskChan chan *Task//刪除任務ChanstopTaskChan chan struct{}//取消任務ChancancelTaskChan chan OriginalTaskId
}// NewSchedule 構建一個Schedule
// workerNum 工作的協程數量,options ants協程池的配置,除了WithMaxBlockingTasks不能配置,別的都可以,具體參考:https://github.com/panjf2000/ants
func NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) {//延遲任務的最大任務數量必須不限制options = append(options, ants.WithMaxBlockingTasks(0))//創建一個協程池pool, err := ants.NewPool(workerNum)if err != nil {return nil, err}//創建一個延遲調度結構體s := &Schedule{taskHeap: make(taskHeap, 0),executeTaskIdMap: make(map[TaskId]OriginalTaskId),originalTaskIdMap: make(map[OriginalTaskId]TaskId),running: atomic.Bool{},nextTaskId: atomic.Uint32{},pool: pool,addTaskChan: make(chan *Task),stopTaskChan: make(chan struct{}),cancelTaskChan: make(chan OriginalTaskId),}//啟動調度 會開啟一個協程去將即將要調度的任務添加到協程池中運行s.start()return s, nil
}// ScheduleOne 添加延遲調度任務,只調度一次
// job 執行的方法 duration 周期間隔,如果是負數立馬執行,如果是負數立馬且只執行一次
func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (uint32, error) {return s.doSchedule(job, duration, true)
}// Schedule 添加延遲調度任務,重復調度
// job 執行的方法 duration 周期間隔,如果是負數立馬且只執行一次
func (s *Schedule) Schedule(job func(), duration time.Duration) (uint32, error) {return s.doSchedule(job, duration, false)
}// doSchedule 添加延遲調度任務的具體實現
func (s *Schedule) doSchedule(job func(), duration time.Duration, onlyOne bool) (uint32, error) {if s.running.Load() {//如果是負數 只執行一次if duration <= 0 {onlyOne = true}nextTaskId := s.getNextTaskId()task := new(Task)task.job = jobtask.executeTime = time.Now().Add(duration)task.onlyOne = onlyOnetask.duration = durationtask.originalId = OriginalTaskId(nextTaskId)task.id = TaskId(nextTaskId)s.addTaskChan <- taskreturn uint32(task.originalId), nil} else {return invalidTaskId, ErrScheduleShutdown}
}// CancelTask 取消延遲調度任務
// taskId 任務id
func (s *Schedule) CancelTask(taskId uint32) {if s.running.Load() {if taskId != invalidTaskId {s.cancelTaskChan <- OriginalTaskId(taskId)}}
}// Shutdown 結束延遲任務調度
func (s *Schedule) Shutdown() {//通過cas設值if s.running.CompareAndSwap(true, false) {s.stopTaskChan <- struct{}{}}
}// IsShutdown 延遲任務調度是否關閉
func (s *Schedule) IsShutdown() bool {return !s.running.Load()
}// start 啟動延遲任務調度
func (s *Schedule) start() {s.running.Store(true)go func() {for {now := time.Now()var timer *time.Timer//如果沒有任務提交,睡眠等待任務if s.taskHeap.Len() == 0 {timer = time.NewTimer(math.MaxUint16 * time.Hour)} else {//查看第一個要執行的任務是否是被取消的task := s.taskHeap.Peek()_, ok := s.executeTaskIdMap[task.id]if !ok {//是被取消的任務,移除后continueheap.Pop(&s.taskHeap)continue} else {//設置執行間隔timer = time.NewTimer(task.executeTime.Sub(now))}}select {case <-timer.C://到達第一個任務執行時間task := heap.Pop(&s.taskHeap).(*Task)//提交到線程池執行,返回的error不需要處理,因為任務池是無限大_ = s.pool.Submit(task.job)//單次執行則刪除,多次執行,則更新if task.onlyOne {s.removeTask(task.originalId, task.id)} else {s.updateTask(task)}case originalTaskId := <-s.cancelTaskChan:timer.Stop()//如果取消的任務id在待執行任務列表中,則刪除任務if taskId, ok := s.originalTaskIdMap[originalTaskId]; ok {s.removeTask(originalTaskId, taskId)}case task := <-s.addTaskChan:timer.Stop()//添加任務s.addTask(task)case <-s.stopTaskChan:timer.Stop()//關閉資源s.close()return}}}()
}// updateTask 更新延遲調度任務
func (s *Schedule) updateTask(executedTask *Task) {//拷貝 并設置新的執行時間和IDtask := *executedTasktask.executeTime = time.Now().Add(task.duration)nextTaskId := s.getNextTaskId()task.id = TaskId(nextTaskId)//把已執行的任務刪除s.removeTask(invalidTaskId, executedTask.id)//添加新的任務s.addTask(&task)
}// removeTask 移除任務
func (s *Schedule) removeTask(originalTaskId OriginalTaskId, taskId TaskId) {//如果原始的任務ID不為空,則為使用者取消的,從任務Map中也刪除if originalTaskId != invalidTaskId {delete(s.originalTaskIdMap, originalTaskId)}delete(s.executeTaskIdMap, taskId)
}// addTask 添加任務
func (s *Schedule) addTask(task *Task) {s.originalTaskIdMap[task.originalId] = task.ids.executeTaskIdMap[task.id] = task.originalIdheap.Push(&s.taskHeap, task)
}// getNextTaskId 獲取下一個任務id
func (s *Schedule) getNextTaskId() uint32 {taskId := s.nextTaskId.Add(1)if taskId == invalidTaskId {taskId = s.nextTaskId.Add(1)}return taskId
}// close 關閉Schedule資源和協程池的資源
func (s *Schedule) close() {//關閉所有資源并設置為 nil help gcs.taskHeap = nils.executeTaskIdMap = nils.originalTaskIdMap = nils.pool.Release()s.pool = nilclose(s.addTaskChan)close(s.cancelTaskChan)close(s.stopTaskChan)s.addTaskChan = nils.cancelTaskChan = nils.stopTaskChan = nil
}// Task 調度任務結構體,是一個調度任務的實體信息
type Task struct {// 原始id,用于Schedule本身的刪除使用,用兩層Map的方式優化數組刪除的O(n)時間復雜度originalId OriginalTaskId// 任務idid TaskId// 執行的時間,每次執行完,如果重復調度就重新計算executeTime time.Time// 周期間隔duration time.Duration// 執行的任務job func()// 是否只執行一次onlyOne bool
}// 任務的堆,使用隊只需要在添加的時候進行排序,堆頂是最先要執行的任務
type taskHeap []*Task// 下面都是堆接口的實現func (t *taskHeap) Len() int {return len(*t)
}
func (t *taskHeap) Less(i, j int) bool {return (*t)[i].executeTime.Before((*t)[j].executeTime)
}func (t *taskHeap) Swap(i, j int) {(*t)[i], (*t)[j] = (*t)[j], (*t)[i]
}func (t *taskHeap) Push(x interface{}) {*t = append(*t, x.(*Task))
}func (t *taskHeap) Pop() interface{} {old := *tn := len(old)x := old[n-1]old[n-1] = nil*t = old[:n-1]return x
}// Peek 查看堆頂元素,非堆接口的實現
func (t *taskHeap) Peek() *Task {return (*t)[0]
}
代碼加上詳細的中文注解,大約300行。
github地址:
https://github.com/xzc-coder/go-schedule
另一個版本的實現,刪除時間復雜度為:O(log n),相對上文中的實現,占用的內存會少,但是刪除效率會變低。
package scheduleimport ("container/heap""errors""github.com/panjf2000/ants/v2""math""sync/atomic""time"
)var (// ErrScheduleShutdown 延遲任務調度器已關閉錯誤ErrScheduleShutdown = errors.New("schedule: schedule is already in shutdown")
)const invalidTaskId = 0type TaskId uint32// Schedule 延遲調度的結構體,提供延遲調度任務的全部方法
// 通過NewSchedule方法創建Schedule,通過Schedule、ScheduleOne方法添加延遲調度任務,通過CancelTask方法取消任務,通過Shutdown停止延遲任務
type Schedule struct {//任務堆,按時間排序taskHeap taskHeaptaskMap map[TaskId]*Task//調度器是否運行中running atomic.Bool//下一個任務idnextTaskId atomic.Uint32//任務運行池pool *ants.Pool//添加任務ChanaddTaskChan chan *Task//刪除任務ChanstopTaskChan chan struct{}//取消任務ChancancelTaskChan chan TaskId
}// NewSchedule 構建一個Schedule
// workerNum 工作的協程數量,options ants協程池的配置,除了WithMaxBlockingTasks不能配置,別的都可以,具體參考:https://github.com/panjf2000/ants
func NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) {//延遲任務的最大任務數量必須不限制options = append(options, ants.WithMaxBlockingTasks(0))//創建一個協程池pool, err := ants.NewPool(workerNum)if err != nil {return nil, err}//創建一個延遲調度結構體s := &Schedule{taskHeap: make(taskHeap, 0),taskMap: make(map[TaskId]*Task),running: atomic.Bool{},nextTaskId: atomic.Uint32{},pool: pool,addTaskChan: make(chan *Task),stopTaskChan: make(chan struct{}),cancelTaskChan: make(chan TaskId),}//啟動調度 會開啟一個協程去將即將要調度的任務添加到協程池中運行s.start()return s, nil
}// ScheduleOne 添加延遲調度任務,只調度一次
// job 執行的方法 duration 周期間隔,如果是負數立馬執行,如果是負數立馬且只執行一次
func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (uint32, error) {return s.doSchedule(job, duration, true)
}// Schedule 添加延遲調度任務,重復調度
// job 執行的方法 duration 周期間隔,如果是負數立馬且只執行一次
func (s *Schedule) Schedule(job func(), duration time.Duration) (uint32, error) {return s.doSchedule(job, duration, false)
}// doSchedule 添加延遲調度任務的具體實現
func (s *Schedule) doSchedule(job func(), duration time.Duration, onlyOne bool) (uint32, error) {if s.running.Load() {//如果是負數 只執行一次if duration <= 0 {onlyOne = true}nextTaskId := s.getNextTaskId()task := new(Task)task.job = jobtask.executeTime = time.Now().Add(duration)task.onlyOne = onlyOnetask.duration = durationtask.id = TaskId(nextTaskId)task.index = 0s.addTaskChan <- taskreturn uint32(task.id), nil} else {return invalidTaskId, ErrScheduleShutdown}
}// CancelTask 取消延遲調度任務
// taskId 任務id
func (s *Schedule) CancelTask(taskId uint32) {if s.running.Load() {if taskId != invalidTaskId {s.cancelTaskChan <- TaskId(taskId)}}
}// Shutdown 結束延遲任務調度
func (s *Schedule) Shutdown() {//通過cas設值if s.running.CompareAndSwap(true, false) {s.stopTaskChan <- struct{}{}}
}// IsShutdown 延遲任務調度是否關閉
func (s *Schedule) IsShutdown() bool {return !s.running.Load()
}// start 啟動延遲任務調度
func (s *Schedule) start() {s.running.Store(true)go func() {for {now := time.Now()var timer *time.Timer//如果沒有任務提交,睡眠等待任務if s.taskHeap.Len() == 0 {timer = time.NewTimer(math.MaxUint16 * time.Hour)} else {task := s.taskHeap.Peek()//設置執行間隔timer = time.NewTimer(task.executeTime.Sub(now))}select {case <-timer.C://到達第一個任務執行時間task := heap.Pop(&s.taskHeap).(*Task)//提交到線程池執行,返回的error不需要處理,因為任務池是無限大_ = s.pool.Submit(task.job)//單次執行則刪除,多次執行,則更新if task.onlyOne {s.removeTask(false, task)} else {s.updateTask(task)}case taskId := <-s.cancelTaskChan:timer.Stop()//如果取消的任務id在待執行任務列表中,則刪除任務if task, ok := s.taskMap[taskId]; ok {s.removeTask(true, task)}case task := <-s.addTaskChan:timer.Stop()//添加任務s.addTask(task)case <-s.stopTaskChan:timer.Stop()//關閉資源s.close()return}}}()
}// updateTask 更新延遲調度任務
func (s *Schedule) updateTask(executedTask *Task) {//拷貝 并設置新的執行時間和IDtask := *executedTasktask.executeTime = time.Now().Add(task.duration)//把已執行的任務刪除s.removeTask(false, executedTask)//添加新的任務s.addTask(&task)
}// removeTask 移除任務
func (s *Schedule) removeTask(removeHeap bool, task *Task) {//從Map和堆中delete(s.taskMap, task.id)if removeHeap {heap.Remove(&s.taskHeap, task.index)}
}// addTask 添加任務
func (s *Schedule) addTask(task *Task) {heap.Push(&s.taskHeap, task)s.taskMap[task.id] = task
}// getNextTaskId 獲取下一個任務id
func (s *Schedule) getNextTaskId() uint32 {taskId := s.nextTaskId.Add(1)if taskId == invalidTaskId {taskId = s.nextTaskId.Add(1)}return taskId
}// close 關閉Schedule資源和協程池的資源
func (s *Schedule) close() {//關閉所有資源并設置為 nil help gcs.taskHeap = nils.taskMap = nils.pool.Release()s.pool = nilclose(s.addTaskChan)close(s.cancelTaskChan)close(s.stopTaskChan)s.addTaskChan = nils.cancelTaskChan = nils.stopTaskChan = nil
}// Task 調度任務結構體,是一個調度任務的實體信息
type Task struct {// 任務idid TaskId// 執行的時間,每次執行完,如果重復調度就重新計算executeTime time.Time// 周期間隔duration time.Duration// 執行的任務job func()// 是否只執行一次onlyOne bool//所在堆數組的下標位置index int
}// 任務的堆,使用隊只需要在添加的時候進行排序,堆頂是最先要執行的任務
type taskHeap []*Task// 下面都是堆接口的實現func (t *taskHeap) Len() int {return len(*t)
}
func (t *taskHeap) Less(i, j int) bool {return (*t)[i].executeTime.Before((*t)[j].executeTime)
}func (t *taskHeap) Swap(i, j int) {(*t)[i], (*t)[j] = (*t)[j], (*t)[i](*t)[i].index = i(*t)[j].index = j
}func (t *taskHeap) Push(x interface{}) {*t = append(*t, x.(*Task))
}func (t *taskHeap) Pop() interface{} {old := *tn := len(old)x := old[n-1]old[n-1] = nil*t = old[:n-1]return x
}// Peek 查看堆頂元素,非堆接口的實現
func (t *taskHeap) Peek() *Task {return (*t)[0]
}