常見模式之四:工作池/協程池模式
定義
顧名思義,就是有固定數量的工人(協程),去執行批量的任務
使用場景
-
適用于需要限制并發執行任務數量的情況
-
創建一個固定大小的 goroutine 池,將任務分發給池中的 goroutine 并等待它們完成,使用帶緩沖的通道來接收任務,以避免阻塞主線程
示例
有生產需求,建議使用大佬寫的?ants庫 ,以下是模擬協程池的簡單示例
假設,我們有固定數量(2個)工人執行批量(4個)任務
package mainimport ("fmt""sync"
)type Task struct {TaskFunc func() interface{}
}type WorkerPool struct {Size intWg *sync.WaitGroupTasks chan TaskResults chan Result
}type Result struct {ID intRes interface{}
}func NewWorkerPool(workerNum, taskBufSize int) *WorkerPool {return &WorkerPool{Size: workerNum,Wg: &sync.WaitGroup{},Tasks: make(chan Task, taskBufSize),Results: make(chan Result, taskBufSize),}
}func (w *WorkerPool) AddTask(task Task) {w.Tasks <- task
}func (w *WorkerPool) Run() {for i := 1; i <= w.Size; i++ {w.Wg.Add(1)go func(id int) {defer w.Wg.Done()w.Work(id, w.Tasks, w.Results)}(i)}
}func (w *WorkerPool) Work(Id int, tasks chan Task, results chan Result) {for task := range tasks {results <- Result{ID: Id,Res: task.TaskFunc(),}}
}func main() {pool := NewWorkerPool(3, 10)pool.Run()pool.AddTask(Task{TaskFunc: func() interface{} {return 2 * 3}})pool.AddTask(Task{TaskFunc: func() interface{} {return 4 * 5}})pool.AddTask(Task{TaskFunc: func() interface{} {return 6 * 7}})pool.AddTask(Task{TaskFunc: func() interface{} {return 8 * 9}})close(pool.Tasks)go func() {pool.Wg.Wait()close(pool.Results)}()for v := range pool.Results {fmt.Println(v.ID, v.Res)}
}