寫在文章開頭
本篇文章算是對go語言
系列的一個收尾,通過go語言實現一個實現一個簡單的有界協程池。
Hi,我是 sharkChili ,是個不斷在硬核技術上作死的 java coder ,是 CSDN的博客專家 ,也是開源項目 Java Guide 的維護者之一,熟悉 Java 也會一點 Go ,偶爾也會在 C源碼 邊緣徘徊。寫過很多有意思的技術博客,也還在研究并輸出技術的路上,希望我的文章對你有幫助,非常歡迎你關注我的公眾號: 寫代碼的SharkChili 。
因為近期收到很多讀者的私信,所以也專門創建了一個交流群,感興趣的讀者可以通過上方的公眾號獲取筆者的聯系方式完成好友添加,點擊備注 “加群” 即可和筆者和筆者的朋友們進行深入交流。
詳解go語言協程池的實現
整體交互流程設計
我們希望創建一個協程池,該協程池大小由用戶決定,主協程不斷生產任務并投遞到channel
中,協程池收到任務后,如果發現沒有對應處理的協程worker
則創建一個協程并處理傳入的任務,反之這些任務就會有序得等待協程有序調度執行:
定義worker
基于上圖我們給出worker
的接口定義,按照我們的實現每一個任務都是一個worker
,協程池的協程可以從channel
中得到對應的Worker
并執行其Task
方法:
type Worker interface {Task()
}
聲明協程池
基于worker
我們封裝一個worker
池,也就是本文提到的協程池,可以看到該Pool
有一個worker
的通道用于存放主協程投遞進來的任務,而wg
則用于控制協程的生命周期,這一點我們會在后續的工作代碼中詳盡說明:
type Pool struct {//記錄主協程投遞的任務work chan Worker//控制工作協程的生命周期wg sync.WaitGroup
}
創建協程池
有了協程池的定義之后,我們就可以編寫協程池的,可以看到我們可以通過入參決定channel
和協程的大小,通過傳入maxGoroutines
設置wg
的大小,當協程都沒有任務執行時,才會調用wg
的Done
方法,確保所有任務執行完成后,主協程才能退出:
func New(maxGoroutines int) *Pool {//創建指定協程數的channelp := Pool{work: make(chan Worker, maxGoroutines),}//基于協程數創建倒計時門閂p.wg.Add(maxGoroutines)//創建maxGoroutines個協程獲取channel的任務執行for i := 0; i < maxGoroutines; i++ {go func() {for w := range p.work {w.DoTask()}//任務執行完成且channel關閉之后,按下倒計時門閂p.wg.Done()}()}//返回pool的指針return &p
}
投遞任務
當我們需要投遞任務時,就可以將自實現的worker
投遞到channle
中:
func (p *Pool) Run(w Worker) {//將任務w投遞到channel中p.work <- w
}
關閉協程池
最后我們給出關于協程池關閉的實現,其邏輯比較簡單:
- 關閉
channel
不再接受新任務。 - 調用
waitGroup
的Wait
方法等待所有協程執行完再返回。
func (p *Pool) ShutDown() {close(p.work)p.wg.Wait()
}
測試代碼
最后我們給出本文的測試代碼,使用示例比較簡單:
- 定義一個姓名切片,作為測試數據。
- 創建一個名為
namePrinter
的結構體,內部包含name
屬性,該結構體會繼承Worker
實現打印姓名的Task
方法。 - 創建一個
channel
和協程大小都為2的Pool
。 - 通過多協程循環遍歷
name
切片并將其封裝成namePrinter投遞到chanel中。 - 協程池的協程消費這些打印姓名的任務。
- 調用
shutDown
方法等待協程池內部協程工作完成后退出主協程。
// 創建一個測試用的姓名切片
var names = []string{"user.go-1","user.go-2","user.go-3","user.go-4","user.go-5",
}// 實現worker接口 打印姓名
type namePrinter struct {name string
}func (n *namePrinter) Task() {fmt.Println(n.name)time.Sleep(time.Second)
}func main() {//創建還有兩個協程的poolp := work.New(2)//創建main協程的倒計時門閂var wg sync.WaitGroupwg.Add(100 * len(names))//多協程投遞任務到poolfor i := 0; i < 100; i++ {for _, name := range names {np := namePrinter{name: name,}go func() {p.Run(&np)wg.Done()}()}}//等待任務投遞完成wg.Wait()fmt.Println("執行結束,關閉pool")p.ShutDown()}
小結
自此,本文基于go語言的并發技術實現了一個簡單的協程池,希望對你有所幫助。而go語言系列也到此告一段落。
我是 sharkchili ,CSDN Java 領域博客專家,開源項目—JavaGuide contributor,我想寫一些有意思的東西,希望對你有幫助,如果你想實時收到我寫的硬核的文章也歡迎你關注我的公眾號: 寫代碼的SharkChili 。
因為近期收到很多讀者的私信,所以也專門創建了一個交流群,感興趣的讀者可以通過上方的公眾號獲取筆者的聯系方式完成好友添加,點擊備注 “加群” 即可和筆者和筆者的朋友們進行深入交流。