1 協程基礎
1.1 協程定義(Goroutine)
- 概念:Go 語言特有的輕量級線程,由 Go 運行時(runtime)管理,相比系統線程(Thread),創建和銷毀成本極低,占用內存小(初始 2KB)。協程是 Go 程序中最基本的并發執行單元。
-
創建方式:使用
go
關鍵字啟動一個協程func main() {// 匿名函數直接啟動協程go func() {fmt.Println("Hello from goroutine!")}() // 調用已定義函數啟動協程go func1()go func2()time.Sleep(time.Second) // 等待協程執行,否則主協程退出導致所有協程終止fmt.Println("主協程退出") }
1.2?協程調度模型GMP
Go 調度器采用?Goroutine-Machine-Processor (GMP)?模型,核心組件包括:
- G (Goroutine):協程的抽象,包含執行棧、程序計數器等信息。
- M (Machine):? ?對應操作系統線程,實際執行代碼的實體。
- P (Processor):邏輯處理器,持有運行隊列(Local Queue)和 G 上下文,必須綁定 M 才能執行 G。
M 是 “執行任務的實體”,是唯一能運行 Go 代碼的載體。G(任務)本身只是一段代碼邏輯,必須依賴 M(操作系統線程)才能在 CPU 上執行?
M和P是綁定關系,必須成對出現
1.2.1 協程創建
- 當調用?
go func()
?時,創建一個新的 G 對象,放入當前 P 的 Local Queue。 - 若 Local Queue 已滿(默認 256 個 G),將一半 G 轉移到全局隊列(Global Queue)。
1.2.2 協程執行
- M 從綁定的 P 的 Local Queue 獲取 G 執行。
- 若 Local Queue 為空,從 Global Queue 批量獲取 G(通常為 P 的 GOMAXPROCS/2)。
- 若 Global Queue 也為空,從其他 P 的 Local Queue?竊取(Work Stealing)?一半 G。
1.2.3 協程阻塞 / 喚醒
- 當 G 執行系統調用(如 I/O)時,M 與 P 解綁,P 可被其他 M 接管繼續執行隊列中的 G。
如果 M 因系統調用被阻塞時,P 繼續綁定 M,會導致以下問題:
- P 無法工作:P 的本地隊列中可能有大量就緒的 G,但由于 M 被阻塞,這些 G 無法執行。
- CPU 核心浪費:如果 P 對應一個 CPU 核心,該核心將處于閑置狀態,即使還有其他任務可執行。
- 因此,當 G 執行系統調用時,調度器會?解綁 M 和 P,允許 P 繼續工作,避免 CPU 資源浪費
- 系統調用返回后,G 重新加入某個 P 的隊列等待執行。
2?并發模式?
2.1 共享內存并發
多個協程通過共享變量訪問數據,需使用同步原語(如sync.Mutex
、sync.RWMutex
)保護臨界區
var (counter intmu sync.Mutex
)func increment() {mu.Lock()defer mu.Unlock()counter++
}func main() {var wg sync.WaitGroupfor i := 0; i < 1000; i++ {wg.Add(1)go func() {defer wg.Done()increment()}()}wg.Wait()fmt.Println("Counter:", counter) // 輸出1000,無競爭
}
2.2?CSP 并發(通過通道通信)
使用channel
實現協程間通信和同步,遵循 “不要通過共享內存來通信,而要通過通信來共享內存” 原則
func producer(ch chan<- int) {for i := 0; i < 5; i++ {ch <- i}close(ch)
}func consumer(ch <-chan int) {for num := range ch {fmt.Println("Received:", num)}
}func main() {ch := make(chan int)go producer(ch)consumer(ch)
}
2.3?并發任務控制
// 普通的協程創建方法:
go func() {// your code1
}()
go func() {// your code2
}()
// go on
這段 Go 代碼的執行順序如下:
-
啟動 goroutine 1:主協程創建并啟動第一個匿名函數(
// your code1
),該函數在后臺異步執行。 -
啟動 goroutine 2:主協程緊接著創建并啟動第二個匿名函數(
// your code2
),同樣在后臺異步執行。 -
主協程繼續執行:主協程不會等待這兩個 goroutine 完成,而是立即繼續執行
// go on
之后的代碼。 -
并行執行 goroutine:
// your code1
和// your code2
的執行順序取決于調度器,可能并行或交替執行,但它們的完成順序不確定。由于主協程未等待它們,若主協程提前結束(例如程序退出),這兩個 goroutine 可能被強制終止。
2.3.1?sync.WaitGroup
wg.Wait()會阻塞直到2個協程執行完后
go func() {
// func1wg.Done()
}()
go func() {
// func2wg.Done()
}()
wg.Wait()
// go on
這段 Go 代碼的執行順序如下:
-
啟動 goroutine 1:主協程創建并啟動第一個匿名函數(
func1
),該函數在后臺異步執行。 -
啟動 goroutine 2:主協程緊接著創建并啟動第二個匿名函數(
func2
),同樣在后臺異步執行。 -
主協程阻塞:主協程執行
wg.Wait()
,進入阻塞狀態,等待所有被等待的 goroutine 完成。 -
并行執行 goroutine:
func1
和func2
的執行順序取決于調度器,可能并行或交替執行,但它們的完成順序不確定。每個 goroutine 在完成任務后調用wg.Done()
通知等待組。 -
恢復主協程:當所有被等待的 goroutine(即
func1
和func2
)都調用了wg.Done()
后,wg.Wait()
返回,主協程繼續執行后續代碼(// go on
)。
? ?主協程 ????????|? ? goroutine 1? ? ??|? ? goroutine 2
---------------------------------------------------------------
wg.Add(2)? ? ? |? ? ? ? ? ? ? ? ? ? ? ? ? ? ?|
啟動func1? ? ? ?|? 開始執行func1? ?|
啟動func2 ??????|? ? ? ? ? ? ? ? ? ? ? ? ? ? | 開始執行func2
wg.Wait()阻塞 |? ????????...???????????????|?????????... ???????????????
????????????????????????| ?????執行完畢? ? ? ?|
????????????????????????| wg.Done()? ? ? ? ? |
????????????????????????|? ? ? ? ? ? ? ? ? ? ? ? ? ? |? ? ? 執行完畢
? ? ? ? ? ? ? ? ? ? ? ? |? ? ? ? ? ? ? ? ? ? ? ? ? ? | ?????wg.Done()
wg.Wait()返回? | ???????????????????????????|
繼續執行后續代碼
2.3.2?errgroup.Group
var g errgroup.Group
g.Go(func() error {// 任務1:可能返回錯誤return nil
})
g.Go(func() error {// 任務2:可能返回錯誤return errors.New("task failed")
})
if err := g.Wait(); err != nil {// 處理首個錯誤(如任務2失敗)
}
執行順序:
-
主協程啟動兩個 goroutine 并行執行
-
若其中一個 goroutine 返回非 nil 錯誤:
-
自動調用內置的
context.CancelFunc
-
向其他 goroutine 發送取消信號(通過 context)
-
g.Wait()
立即返回首個錯誤
-
-
所有 goroutine(包括未出錯的)需主動檢查 context 狀態并提前退出
2.3.3 對比
特性 | errgroup.Group | sync.WaitGroup |
---|---|---|
錯誤處理 | 自動捕獲首個非 nil 錯誤并終止所有 goroutine | 不處理錯誤 |
執行控制 | 首個錯誤發生后,其他 goroutine 會被 CancelFunc 終止 | 所有 goroutine 獨立運行至完成 |
結果聚合 | 可返回首個錯誤,用于統一錯誤處理 | 無內置錯誤傳遞機制 |
取消機制 | 支持通過 context 傳播取消信號 | 無內置取消機制 |
3.?并發panic處理
協程中發生 panic 若未被捕獲,僅會導致該協程崩潰,不會影響其他協程和主程序,但可能導致資源泄漏?
3.1 普通goroutine的panic處理
對于普通的goroutine,可以在協程函數內部使用defer和recover組合來捕獲panic。defer語句會將函數推遲到外層函數返回之前執行,而recover函數用于捕獲panic,它只能在defer修飾的函數中有效
func worker() {defer func() {if r := recover(); r != nil {fmt.Println("Recovered in worker:", r)}}()// 可能觸發panic的代碼var data map[string]intdata["key"] = 1 // 觸發panic: assignment to entry in nil map
}func main() {go worker()time.Sleep(time.Second)fmt.Println("Main continues")
}
3.2?使用 sync.WaitGroup 時的 panic 處理
sync.WaitGroup常用于等待一組goroutine完成任務。在這種場景下,每個goroutine內部仍需使用defer和recover捕獲panic,并且可以通過額外的機制將panic信息傳遞給主協程。?
import ("fmt""sync"
)type Result struct {Err errorData interface{}
}func worker(id int, wg *sync.WaitGroup, resultChan chan<- Result) {defer func() {if r := recover(); r != nil {resultChan <- Result{Err: fmt.Errorf("panic in worker %d: %v", id, r)}}}()// 模擬可能觸發panic的任務if id == 2 {panic("simulated panic")}resultChan <- Result{Data: fmt.Sprintf("Worker %d finished", id)}wg.Done()
}func main() {var wg sync.WaitGroupresultChan := make(chan Result)numWorkers := 3for i := 1; i <= numWorkers; i++ {wg.Add(1)go worker(i, &wg, resultChan)}go func() {wg.Wait()close(resultChan)}()for result := range resultChan {if result.Err != nil {fmt.Println(result.Err)} else {fmt.Println(result.Data)}}
}
????????worker函數通過defer和recover捕獲panic,并將錯誤信息封裝成Result結構體發送到resultChan通道。主協程從通道中接收結果,判斷是否存在錯誤并進行相應處理,確保即使有goroutine發生panic,也能及時獲取信息并繼續執行后續邏輯。?
3.3?使用 errgroup.Group 時的 panic 處理
????????errgroup.Group可以方便地并行執行多個任務,并在其中一個任務出錯時快速返回錯誤。然而,它只能處理函數返回的錯誤,無法自動捕獲goroutine內部的panic。因此,需要手動在每個任務函數中添加panic捕獲邏輯,并將panic轉換為錯誤返回給errgroup.Group。?
3.3.1 方法一:手動封裝panic捕獲
import ("fmt""golang.org/x/sync/errgroup"
)func safeGo(g *errgroup.Group, fn func() error) {g.Go(func() error {defer func() {if r := recover(); r != nil {return fmt.Errorf("panic occurred: %v", r)}}()return fn()})
}func main() {var g errgroup.GroupsafeGo(&g, func() error {// 可能觸發panic的任務panic("unexpected error")return nil})if err := g.Wait(); err != nil {fmt.Println("Error:", err) // 輸出 panic occurred: unexpected error}
}
3.3.2 封裝增強版errgroup
import ("fmt""golang.org/x/sync/errgroup""sync"
)type SafeGroup struct {g errgroup.Groupmu sync.Mutexpanics []interface{}
}func (sg *SafeGroup) Go(fn func() error) {sg.g.Go(func() error {defer func() {if r := recover(); r != nil {sg.mu.Lock()sg.panics = append(sg.panics, r)sg.mu.Unlock()}}()return fn()})
}func (sg *SafeGroup) Wait() error {if err := sg.g.Wait(); err != nil {return err}if len(sg.panics) > 0 {return fmt.Errorf("panics occurred: %v", sg.panics)}return nil
}func main() {var sg SafeGroupsg.Go(func() error {panic("panic in goroutine")return nil})if err := sg.Wait(); err != nil {fmt.Println("Error:", err) // 輸出: panics occurred: [panic in goroutine]}
}
????????這兩種方案都能有效地在errgroup.Group中處理panic,方案一通過簡單的函數封裝,在每個任務中添加panic捕獲;方案二則通過自定義結構體,將panic信息集中管理,在Wait方法中統一返回錯誤,方便在復雜場景下對panic進行更靈活的處理。?