Go語言底層(五): 深入淺出Go語言的ants協程池

在 Go 語言中,goroutine 的輕量特性使得高并發編程變得異常簡單。然而,隨著并發量的增加,頻繁創建對象和無限制啟動 goroutine 也可能帶來內存浪費、GC 壓力和資源搶占等問題。為了解決這些隱患,協程池成為常用的優化手段。用于控制并發數量、避免系統過載。本文將簡要介紹golang 中大名鼎鼎的 ants 協程池庫的實現原理。


ants包倉庫 : https://github.com/panjf2000/ants

為什么用協程池?

  • 提升性能:主要面向一類場景:大批量輕量級并發任務,任務執行成本與協程創建/銷毀成本量級接近;
  • 動態調配并發資源 : 能夠動態調整所需的協程數量以及各個模塊的并發度上限;
  • 協程生命周期控制:實時查看當前全局并發的協程數量;有一個統一的緊急入口釋放全局協程.

1. 使用方法

安裝ants

go get -u github.com/panjf2000/ants/v2

1.1 創建協程池 NewPool(size int)

用于創建一個容量為 size 的協程池。默認情況下,協程池不會自動擴容,因此超出容量限制的任務會等待空閑 worker。

import "github.com/panjf2000/ants/v2"var pool *ants.Poolfunc init() {var err errorpool, err = ants.NewPool(10) // 創建容量為10的協程池if err != nil {log.Fatalf("Failed to create goroutine pool: %v", err)}
}
  • NewPool() 返回的是一個可復用的固定容量協程池,內部通過任務隊列與 worker 協同處理。

1.2 提交任務 Submit(task func())

協程池的核心方法

// Submit submits a task to the pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error 

使用 Submit() 提交一個函數類型任務給協程池異步執行

示例 :

err := pool.Submit(func() {fmt.Println("Task executed by goroutine:", runtime.NumGoroutine())
})
if err != nil {log.Println("Failed to submit task:", err)
}
  • 每次調用 Submit() 不會阻塞主線程。

  • 如果當前運行的 goroutine 已達到上限,任務將等待空閑 worker。

1.3 釋放協程池 Release()

釋放協程池資源,釋放后協程池不再接受新的任務提交。

pool.Release()

?? 注意:一旦調用 Release(),協程池將被永久關閉,不能再次使用。再次提交任務將 panic。

1.4 查詢當前運行數 Running()

適合用于實時監控協程池負載狀態。

fmt.Printf("Running goroutines: %d\n", pool.Running())

適合用于實時監控協程池負載狀態。

1.5 池容量

獲取池容量 Cap()
返回協程池的最大容量(即最大 goroutine 數量)。可用于與 Running() 搭配分析使用率。

fmt.Printf("Pool capacity: %d\n", pool.Cap())

動態調整容量 Tune(newSize int)
在運行時動態調整協程池容量,適應系統負載變化。

pool.Tune(20) // 將容量調整為20
  • 擴容會立即生效。
  • 縮容后,多余的 worker 會在任務完成后自動回收。
  • Tune() 不會中斷正在執行的任務。

流程

請添加圖片描述

2. 底層實現

原理篇前置知識
詳細請看以往文章 : Go語言底層(三): sync 包鎖與對象池

2.1 核心數據結構

2.1.1 goWorker

type goWorker struct {pool *Pooltask chan func()recycleTime time.Time
}

goWorker 就是我們協程池里的實例 , 簡單理解為一個長時間運行而不回收的協程,用于反復處理用戶提交的異步任務

  • pool:goWorker 所屬的協程池;

  • task:goWorker 用于接收異步任務包的管道;

  • recycleTime:goWorker 回收到協程池的時間.

2.1.2 Pool

type Pool struct {capacity int32running int32lock sync.Lockerworkers workerArraystate int32cond *sync.CondworkerCache sync.Poolwaiting int32heartbeatDone int32stopHeartbeat context.CancelFuncoptions *Options
}
  • capacity:池子的容量
  • running:出于運行中的協程數量
  • lock:自制的自旋鎖,保證取 goWorker 時并發安全
  • workers:goWorker 列表,即“真正意義上的協程池”
  • state:池子狀態標識,0-打開;1-關閉
  • cond:并發協調器,用于阻塞模式下,掛起和喚醒等待資源的協程
  • waiting:標識出于等待狀態的協程數量;
  • heartbeatDone:標識回收協程是否關閉;
  • stopHeartbeat:用于關閉回收協程的控制器函數;
  • options:一些定制化的配置.
  • workerCache:存放 goWorker 的對象池,用于緩存釋放的 goworker 資源用于復用. 對象池需要區別于協程池,協程池中的
    goWorker 仍存活,進入對象池的 goWorker 邏輯意義已經銷毀;

請添加圖片描述

2.1.3 workerArray

type workerArray interface {len() intisEmpty() boolinsert(worker *goWorker) errordetach() *goWorkerretrieveExpiry(duration time.Duration) []*goWorkerreset()
}

該 interface 主要定義了作為數據集合的幾個通用 api,以及用于回收過期 goWorker 的 api.

  • insert 插入一個 goWorker
  • detach 取出一個 goWorker
  • retrieveExpiry 獲取池中空閑時間超過 duration 的 已經過期的 goWorker 集合 ,其中 goWorker 的回收時間與入棧先后順序相關,因此可以借助 binarySearch 方法基于二分法快速獲取到目標集合.

2.2 核心方法的實現

2.2.1 NewPool 創建協程池

func NewPool(size int, options ...Option) (*Pool, error) {// 讀取用戶配置,做一些前置校驗,默認值賦值等前處理動作...opts := loadOptions(options...)// 構造好 Pool 數據結構;p := &Pool{capacity: int32(size),lock:     internal.NewSpinLock(),options:  opts,}// 構造對象池p.workerCache.New = func() interface{} {return &goWorker{pool: p,task: make(chan func(), workerChanCap),}}// 構造好 goWorker 對象池 workerCache,聲明好工廠函數;p.workers = newWorkerArray(stackType, 0)//  golang 標準庫提供的并發協調器,用于實現指定條件下阻塞和喚醒協程的操作.p.cond = sync.NewCond(p.lock)// 異步啟動 goWorker 過期銷毀協程.var ctx context.Contextctx, p.stopHeartbeat = context.WithCancel(context.Background())go p.purgePeriodically(ctx)return p, nil
}

2.2.2 pool.Submit 提交任務

func (p *Pool) Submit(task func()) error {// 從 Pool 中取出一個可用的 goWorker;var w *goWorkerif w = p.retrieveWorker(); w == nil {return ErrPoolOverload}// 將用戶提交的任務包添加到 goWorker 的 channel 中.w.task <- taskreturn nil
}

取出goWorker 的實現:

func (p *Pool) retrieveWorker() (w *goWorker) {// 聲明了一個構造 goWorker 的函數 spawnWorker 用于兜底,從對象池 workerCache 中獲取 goWorker;spawnWorker := func() {w = p.workerCache.Get().(*goWorker)w.run()}p.lock.Lock()// 嘗試從池中取出一個空閑的 goWorker;w = p.workers.detach()if w != nil { p.lock.Unlock()// 倘若池子容量未超過上限, 從對象池中取出一個 goWorker } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {p.lock.Unlock()spawnWorker()} else { // 倘若池子容量超限,且池子為非阻塞模式,直接拋回錯誤;if p.options.Nonblocking {p.lock.Unlock()return}// 倘若池子容量超限,且池子為阻塞模式,則基于并發協調器 cond 掛起等待有空閑 worker;retry:// 若阻塞任務已達最大限制,也直接返回;if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {p.lock.Unlock()return}// 增加等待數并使用 cond 條件變量掛起當前協程;p.addWaiting(1)p.cond.Wait() // block and wait for an available workerp.addWaiting(-1)// 被喚醒后(可能是因為 scavenger 清理協程),判斷是否還有運行中的 worker;var nw intif nw = p.Running(); nw == 0 { // awakened by the scavengerp.lock.Unlock()spawnWorker()return}// 再次嘗試重新獲取一個空閑 worker;if w = p.workers.detach(); w == nil {if nw < p.Cap() {p.lock.Unlock()spawnWorker()return}goto retry}// 獲取到了可用 worker,解鎖并返回;p.lock.Unlock()}return
}

2.2.3 goWorker 運行

func (w *goWorker) run() {w.pool.addRunning(1)go func() {defer func() {w.pool.addRunning(-1)w.pool.workerCache.Put(w)if p := recover(); p != nil {// panic 后處理}w.pool.cond.Signal()}()for f := range w.task {if f == nil {return}f()if ok := w.pool.revertWorker(w); !ok {return}}}()
  • 循環 + 阻塞等待,直到獲取到用戶提交的異步任務包 task 并執行;
  • 執行完成 task 后,會將自己交還給協程池;
  • 倘若回歸協程池失敗,或者用戶提交了一個空的任務包,則該 goWorker 會被銷毀,銷毀方式是將自身放回協程池的對象池 workerCache. 并且會調用協調器 cond 喚醒一個阻塞等待的協程.

參考文章 : 小徐的編程世界

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/84937.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/84937.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/84937.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

React Native【實戰范例】網格導航 FlatList

import React from "react"; import {FlatList,Image,SafeAreaView,StyleSheet,Text,View, } from "react-native"; interface GridItem {id: string;title: string;imageUrl: string; } // 網格布局數據 const gridData Array.from({ length: 30 }, (_, …

KJY0047-J1階段測試

KJY0047 - J1階段測試題解 題目1&#xff1a;SYAP0001. 闖關 解題思路&#xff1a; 暴力思路&#xff1a;每次碰到奇數都使用一次 f o r for for 循環將后續的數值 1 1 1, 時間復雜度 O ( n 2 ) O(n^2) O(n2) 優化思路&#xff1a;可以用一個計數器 c n t cnt cnt 來存…

鍵盤按鍵枚舉 Key 說明文檔

鍵盤按鍵枚舉 Key 說明文檔 該文檔介紹了 Key 枚舉中定義的鍵盤按鍵常量及其對應編號&#xff0c;適用于標準 105 鍵的美式鍵盤布局。常用于瀏覽器或桌面端的鍵盤事件監聽、游戲開發、快捷鍵映射等場景。 electron-jest ?? 功能鍵&#xff08;Function Keys&#xff09; …

函數調用過程中的棧幀變化

int add(int a, int b) {int c a b;return c; }int main() {int result add(1, 2);return 0; }生成匯編代碼&#xff1a;g -S Cplus.cpp -o Cplus.s .file "Cplus.cpp".text.globl _Z3addii.def _Z3addii; .scl 2; .type 32; .endef.seh_proc _Z3addii _Z3addii:p…

【Java面試筆記:實戰】41、Java面試核心考點!AQS原理及應用生態全解析

引言:AQS在Java并發體系中的核心地位 AQS(AbstractQueuedSynchronizer)作為Java并發包的底層基石,是理解ReentrantLock、Semaphore等同步工具的關鍵。 在Java架構師面試中,AQS的原理與應用是高頻考點,掌握其核心機制對理解JUC包和構建高并發系統至關重要。 本文將從原…

碩士課題常用命令

ros常用命令&#xff1a; 1.環境變量刷新 source devel/setup.bash2.ROS_INFO的信息在終端顯示為亂碼或者問號&#xff0c;則在main函數中加入&#xff1a; setlocale(LC_ALL, "");3.刷新bashrc文件 source ~/.bashrcPX4 roslaunch px4 mavros_posix_sitl.launc…

2.6 激光雷達消息格式

新建終端&#xff0c;執行命令 roslaunch wpr_simulation wpb_simple.launch 在新建終端&#xff0c;執行命令 roslaunch wpr_simulation wpb_rviz.launch 顯示/Scan話題消息&#xff0c;后面的參數是noarr無數組&#xff0c;防止刷屏 rostopic echo /scan --noarr 參考官…

常見的網絡協議有哪些

1.應用層 1.1 HTTP/HTTPS 前端與服務器通信的基礎協議&#xff0c;用于傳輸 HTML、CSS、JS、圖片等資源。 1.2WebSocket&#xff08;如社交聊天、股票實時報價、視頻會議、在線教育等&#xff09; WebSocket協議建立在TCP協議之上&#xff0c;實現了瀏覽器與服務器之間的實時…

Prometheus + Grafana 監控 RabbitMQ 實踐指南

文章目錄 Prometheus Grafana 監控 RabbitMQ 實踐教程一、前言二、環境搭建2.1 環境準備2.2 安裝 Prometheus2.3 安裝 Grafana 三、集成 RabbitMQ Exporter3.1 下載 RabbitMQ Exporter3.2 解壓文件3.3 配置環境變量3.4 啟動 RabbitMQ Exporter3.6 驗證 Exporter 狀態 四、Prom…

Babylon.js場景加載器(Scene Loader)使用指南

在3D開發中&#xff0c;Babylon.js的場景加載器(Scene Loader)是加載各種3D模型格式的核心工具。本文將詳細介紹如何高效使用Scene Loader加載多種格式的3D模型文件。 一、基本概念與支持格式 要加載特定類型的文件&#xff0c;Babylon.js需要先注冊對應的文件類型插件。目前…

編程學習網站大全(C++/OpenCV/QT方向)—— 資源導航與深度評測

工欲善其事&#xff0c;必先利其器 本文系統整理了C、OpenCV、QT三大方向的優質學習網站&#xff0c;結合技術特點與平臺優勢&#xff0c;助你精準選擇學習資源&#xff0c;少走彎路&#xff01; 一、C 學習網站精選 &#x1f4da; 1. cppreference.com 權威性最高&#xff1a…

逆向入門(5)程序逆向篇-AD_CM#2

打開程序 常規注冊界面&#xff0c;打開OD&#xff0c;隨便找找就看到關鍵字了 沒有殼邏輯也挺簡單的 獲取輸入框&#xff0c;用5比較輸入內容的長度&#xff0c;小于則跳轉提示密碼長度不夠 否則就進入下一個流程&#xff0c;去獲取序列號&#xff0c;其實可以直接將jnz換…

OD 算法題 B卷【路燈照明II】

文章目錄 路燈照明II 路燈照明II 在一條筆直的公路上安裝了N個路燈&#xff0c;從位置0開始安裝&#xff0c;間距固定為100米&#xff1b;每個路燈都有自己的照明半徑&#xff0c;計算第一個路燈和最后一個路燈之間&#xff0c;無法照明的區間長度和&#xff1b; 輸入描述: 第…

JUC核心解析系列(四)——同步工具類 (Synchronizers)深度解析

在多線程開發中&#xff0c;死鎖、資源競爭、線程協調等問題如同暗礁&#xff0c;稍有不慎就會導致程序崩潰。而JUC同步工具類正是解決這些問題的瑞士軍刀&#xff01; 一、同步工具類核心價值&#xff1a;線程協作的藝術 在高并發系統中&#xff0c;線程協作是保證數據一致性…

板凳-------Mysql cookbook學習 (十--6)

第7章&#xff1a;排序查詢結果 7.0 引言 mysql> use cookbook Database changed mysql> select * from driver_log; ---------------------------------- | rec_id | name | trav_date | miles | ---------------------------------- | 1 | Ben | 2014-07-30 …

從入門到精通:C# 中 AutoMapper 的深度解析與實戰應用

在 C# 開發領域&#xff0c;尤其是企業級應用開發過程中&#xff0c;不同層次和模塊之間的數據傳遞與對象轉換是常見需求。例如&#xff0c;從數據庫讀取的實體類&#xff0c;在傳遞到前端時&#xff0c;往往需要轉換為更簡潔、安全的數據傳輸對象&#xff08;DTO&#xff09; …

【熱更新知識】學習一 Lua語法學習

1、注釋 1.1 單行注釋 --注釋內容 --單行注釋 print打印函數 1.2 多行注釋&#xff0c;三種方式 --[[注釋內容]] --[[注釋內容]]-- --[[注釋內容--]] --[[ 多行 注釋 ]]--[[ 第二種多行注釋 1 2 ]]----[[ 第三種 多行 注釋 --]] 2、簡單變量 2.1 聲明變量&#xff0c…

React 第三方狀態管理庫的比較與選擇

在現代前端開發中,狀態管理是一個重要的環節。選擇合適的狀態管理庫可以極大地提高項目的可維護性和開發效率。本文將對幾種流行的狀態管理庫進行比較,包括Valtio、XState、MobX、Recoil和Zustand,幫助開發者在實際項目中做出明智的選擇。 1. Valtio 1.1. 設計理念 Valti…

《Kafka 在實時消息系統中的高可用架構設計》

Kafka 在實時消息系統中的高可用架構設計 引言 在當今互聯網社交應用中&#xff0c;實時消息系統已成為核心基礎設施。以中性互聯網公司為例&#xff0c;其每天需要處理數十億條消息&#xff0c;涵蓋一對一聊天、群組互動、直播彈幕等多種場景。特別是在大型直播活動中&#…

SKUA-GOCAD入門教程-第八節 線的創建與編輯3

8.1.4根據面對象創建曲線 (1)從曲面生成曲線 從曲面邊界生成曲線您可以從選定的曲面邊界創建一條單段曲線。 1、選擇 Curve commands > New > Borders > One 打開從曲面的一條邊界創建曲線對話框。 圖1 在“Name名稱”框中,輸入要創建的曲線的名稱。