在 Go 語言中,goroutine 的輕量特性使得高并發編程變得異常簡單。然而,隨著并發量的增加,頻繁創建對象和無限制啟動 goroutine 也可能帶來內存浪費、GC 壓力和資源搶占等問題。為了解決這些隱患,協程池成為常用的優化手段。用于控制并發數量、避免系統過載。本文將簡要介紹golang 中大名鼎鼎的 ants 協程池庫的實現原理。
ants包倉庫 : https://github.com/panjf2000/ants
為什么用協程池?
- 提升性能:主要面向一類場景:大批量輕量級并發任務,任務執行成本與協程創建/銷毀成本量級接近;
- 動態調配并發資源 : 能夠動態調整所需的協程數量以及各個模塊的并發度上限;
- 協程生命周期控制:實時查看當前全局并發的協程數量;有一個統一的緊急入口釋放全局協程.
1. 使用方法
安裝ants
庫
go get -u github.com/panjf2000/ants/v2
1.1 創建協程池 NewPool(size int)
用于創建一個容量為 size 的協程池。默認情況下,協程池不會自動擴容,因此超出容量限制的任務會等待空閑 worker。
import "github.com/panjf2000/ants/v2"var pool *ants.Poolfunc init() {var err errorpool, err = ants.NewPool(10) // 創建容量為10的協程池if err != nil {log.Fatalf("Failed to create goroutine pool: %v", err)}
}
- NewPool() 返回的是一個可復用的固定容量協程池,內部通過任務隊列與 worker 協同處理。
1.2 提交任務 Submit(task func())
協程池的核心方法
// Submit submits a task to the pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error
使用 Submit() 提交一個函數類型任務給協程池異步執行
示例 :
err := pool.Submit(func() {fmt.Println("Task executed by goroutine:", runtime.NumGoroutine())
})
if err != nil {log.Println("Failed to submit task:", err)
}
-
每次調用 Submit() 不會阻塞主線程。
-
如果當前運行的 goroutine 已達到上限,任務將等待空閑 worker。
1.3 釋放協程池 Release()
釋放協程池資源,釋放后協程池不再接受新的任務提交。
pool.Release()
?? 注意:一旦調用 Release(),協程池將被永久關閉,不能再次使用。再次提交任務將 panic。
1.4 查詢當前運行數 Running()
適合用于實時監控協程池負載狀態。
fmt.Printf("Running goroutines: %d\n", pool.Running())
適合用于實時監控協程池負載狀態。
1.5 池容量
獲取池容量 Cap()
返回協程池的最大容量(即最大 goroutine 數量)。可用于與 Running() 搭配分析使用率。
fmt.Printf("Pool capacity: %d\n", pool.Cap())
動態調整容量 Tune(newSize int)
在運行時動態調整協程池容量,適應系統負載變化。
pool.Tune(20) // 將容量調整為20
- 擴容會立即生效。
- 縮容后,多余的 worker 會在任務完成后自動回收。
- Tune() 不會中斷正在執行的任務。
流程
2. 底層實現
原理篇前置知識
詳細請看以往文章 : Go語言底層(三): sync 包鎖與對象池
2.1 核心數據結構
2.1.1 goWorker
type goWorker struct {pool *Pooltask chan func()recycleTime time.Time
}
goWorker 就是我們協程池里的實例 , 簡單理解為一個長時間運行而不回收的協程,用于反復處理用戶提交的異步任務
-
pool:goWorker 所屬的協程池;
-
task:goWorker 用于接收異步任務包的管道;
-
recycleTime:goWorker 回收到協程池的時間.
2.1.2 Pool
type Pool struct {capacity int32running int32lock sync.Lockerworkers workerArraystate int32cond *sync.CondworkerCache sync.Poolwaiting int32heartbeatDone int32stopHeartbeat context.CancelFuncoptions *Options
}
- capacity:池子的容量
- running:出于運行中的協程數量
- lock:自制的自旋鎖,保證取 goWorker 時并發安全
- workers:goWorker 列表,即“真正意義上的協程池”
- state:池子狀態標識,0-打開;1-關閉
- cond:并發協調器,用于阻塞模式下,掛起和喚醒等待資源的協程
- waiting:標識出于等待狀態的協程數量;
- heartbeatDone:標識回收協程是否關閉;
- stopHeartbeat:用于關閉回收協程的控制器函數;
- options:一些定制化的配置.
- workerCache:存放 goWorker 的對象池,用于緩存釋放的 goworker 資源用于復用. 對象池需要區別于協程池,協程池中的
goWorker 仍存活,進入對象池的 goWorker 邏輯意義已經銷毀;
2.1.3 workerArray
type workerArray interface {len() intisEmpty() boolinsert(worker *goWorker) errordetach() *goWorkerretrieveExpiry(duration time.Duration) []*goWorkerreset()
}
該 interface 主要定義了作為數據集合的幾個通用 api,以及用于回收過期 goWorker 的 api.
- insert 插入一個
goWorker
- detach 取出一個
goWorker
- retrieveExpiry 獲取池中空閑時間超過 duration 的 已經過期的
goWorker
集合 ,其中goWorker
的回收時間與入棧先后順序相關,因此可以借助binarySearch
方法基于二分法快速獲取到目標集合.
2.2 核心方法的實現
2.2.1 NewPool 創建協程池
func NewPool(size int, options ...Option) (*Pool, error) {// 讀取用戶配置,做一些前置校驗,默認值賦值等前處理動作...opts := loadOptions(options...)// 構造好 Pool 數據結構;p := &Pool{capacity: int32(size),lock: internal.NewSpinLock(),options: opts,}// 構造對象池p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}// 構造好 goWorker 對象池 workerCache,聲明好工廠函數;p.workers = newWorkerArray(stackType, 0)// golang 標準庫提供的并發協調器,用于實現指定條件下阻塞和喚醒協程的操作.p.cond = sync.NewCond(p.lock)// 異步啟動 goWorker 過期銷毀協程.var ctx context.Contextctx, p.stopHeartbeat = context.WithCancel(context.Background())go p.purgePeriodically(ctx)return p, nil
}
2.2.2 pool.Submit 提交任務
func (p *Pool) Submit(task func()) error {// 從 Pool 中取出一個可用的 goWorker;var w *goWorkerif w = p.retrieveWorker(); w == nil {return ErrPoolOverload}// 將用戶提交的任務包添加到 goWorker 的 channel 中.w.task <- taskreturn nil
}
取出goWorker
的實現:
func (p *Pool) retrieveWorker() (w *goWorker) {// 聲明了一個構造 goWorker 的函數 spawnWorker 用于兜底,從對象池 workerCache 中獲取 goWorker;spawnWorker := func() {w = p.workerCache.Get().(*goWorker)w.run()}p.lock.Lock()// 嘗試從池中取出一個空閑的 goWorker;w = p.workers.detach()if w != nil { p.lock.Unlock()// 倘若池子容量未超過上限, 從對象池中取出一個 goWorker } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()spawnWorker()} else { // 倘若池子容量超限,且池子為非阻塞模式,直接拋回錯誤;if p.options.Nonblocking {p.lock.Unlock()return}// 倘若池子容量超限,且池子為阻塞模式,則基于并發協調器 cond 掛起等待有空閑 worker;retry:// 若阻塞任務已達最大限制,也直接返回;if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {p.lock.Unlock()return}// 增加等待數并使用 cond 條件變量掛起當前協程;p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)// 被喚醒后(可能是因為 scavenger 清理協程),判斷是否還有運行中的 worker;var nw intif nw = p.Running(); nw == 0 { // awakened by the scavengerp.lock.Unlock()spawnWorker()return}// 再次嘗試重新獲取一個空閑 worker;if w = p.workers.detach(); w == nil {if nw < p.Cap() {p.lock.Unlock()spawnWorker()return}goto retry}// 獲取到了可用 worker,解鎖并返回;p.lock.Unlock()}return
}
2.2.3 goWorker 運行
func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {w.pool.addRunning(-1)w.pool.workerCache.Put(w)if p := recover(); p != nil {// panic 后處理}w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
- 循環 + 阻塞等待,直到獲取到用戶提交的異步任務包 task 并執行;
- 執行完成 task 后,會將自己交還給協程池;
- 倘若回歸協程池失敗,或者用戶提交了一個空的任務包,則該 goWorker 會被銷毀,銷毀方式是將自身放回協程池的對象池 workerCache. 并且會調用協調器 cond 喚醒一個阻塞等待的協程.
參考文章 : 小徐的編程世界