gopool
gopool
是字節跳動開源節流的gopkg
包中協程池的一個實現。
關鍵結構
協程池:
type pool struct {// The name of the poolname string// capacity of the pool, the maximum number of goroutines that are actually working// 協程池的最大容量cap int32// Configuration informationconfig *Config// linked list of tasks// 任務鏈表taskHead *tasktaskTail *tasktaskLock sync.MutextaskCount int32// Record the number of running workers// 運行中的協程數workerCount int32// This method will be called when the worker panic// 出現 panic 時調用、 panicHandler func(context.Context, interface{})
}
任務:
type task struct {ctx context.Contextf func()next *task
}
worker:
type worker struct {pool *pool
}
源碼分析
先說一下 gopool
的工作流程:
- 通過 Go 或者 CtxGo 方法調用
- 從 taskPool 中取出一個 t
- 如果當前的積壓task達到閾值且worker(工作協程)的數量未達到上限,則新建一個worker。
pool.cap
最大工作協程與實際運行的最大協程可能會存在誤差。因為新建worker這塊不是原子操作:
if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {// 工作協程加1p.incWorkerCount()w := workerPool.Get().(*worker)w.pool = pw.run()}
worker 的最大數量不會超過pool.cap
。worker run 流程比較簡單:
- 循環的從 pool 中取出 task 執行
為了方便查看源碼,我把相關代碼都粘到了下面的,詳細流程如下:
var workerPool sync.Poolvar taskPool sync.Pool// 初始化 taskPool
func init() {taskPool.New = newTask
}func (p *pool) Go(f func()) {p.CtxGo(context.Background(), f)
}func (p *pool) CtxGo(ctx context.Context, f func()) {// 從 taskPool 中取 task,避免頻繁創建銷毀t := taskPool.Get().(*task)t.ctx = ctx// 賦值執行函數t.f = f// 將 t 添加到任務鏈表里,加鎖保證并發安全p.taskLock.Lock()if p.taskHead == nil {p.taskHead = tp.taskTail = t} else {p.taskTail.next = tp.taskTail = t}p.taskLock.Unlock()// 任務鏈表數量原子加 1atomic.AddInt32(&p.taskCount, 1)// The following two conditions are met:// 1. the number of tasks is greater than the threshold.// 2. The current number of workers is less than the upper limit p.cap.// or there are currently no workers.// 滿足以下兩個條件:// 1.任務數大于等于設置的閾值(默認為1)// 2.當前的協程數低于上限,或者目前沒有工人if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {// 工作協程加1p.incWorkerCount()w := workerPool.Get().(*worker)w.pool = pw.run()}
}func (w *worker) run() {go func() {for {var t *taskw.pool.taskLock.Lock()if w.pool.taskHead != nil {// 取出任務t = w.pool.taskHeadw.pool.taskHead = w.pool.taskHead.nextatomic.AddInt32(&w.pool.taskCount, -1)}// 沒有任務則結束if t == nil {// if there's no task to do, exitw.close()w.pool.taskLock.Unlock()w.Recycle()return}w.pool.taskLock.Unlock()func() {defer func() {if r := recover(); r != nil {if w.pool.panicHandler != nil {w.pool.panicHandler(t.ctx, r)} else {msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())logger.CtxErrorf(t.ctx, msg)}}}()// 執行t.f()}()t.Recycle()}}()
}func (t *task) Recycle() {t.zero()taskPool.Put(t)
}