文章目錄
- Go語言并發編程
- **簡單介紹**
- goroutine channel 實現并發和并行
- for循環開啟多個協程
- Channel管道
- goroutine 結合 channel 管道
- **goroutine 結合 channel打印素數**
- 單向管道
- Select多路復用
- Goroutine Recover解決協程中出現的Panic
- Go中的并發安全和互斥鎖
Go語言并發編程
簡單介紹
Go語言并發編程是通過goroutine(輕量級線程)和channel(通道)實現的高效并發模型,其核心特點是"不要通過共享內存來通信,而要通過通信來共享內存"。主要優勢包括:1) 輕量級(goroutine開銷極小,可輕松創建數萬個);2) 內置CSP模型(通過channel安全傳遞數據,避免鎖競爭);3) 簡單易用(go關鍵字即可啟動并發,相比線程/回調更簡潔)。作用體現在提升CPU/IO密集型任務效率(如網絡服務、并行計算),通過GPM調度器智能利用多核,同時保持代碼可讀性,典型應用如高并發服務器、爬蟲、數據處理等。
goroutine channel 實現并發和并行
為什么要使用goroutine呢
需求:要統計1-10000000的數字中那些是素數,并打印這些素數?
素數:就是除了1和它本身不能被其他數整除的數
實現方法:
- 傳統方法,通過一個for循環判斷各個數是不是素數
- 使用并發或者并行的方式,將統計素數的任務分配給多個goroutine去完成,這個時候就用到了goroutine
- goroutine 結合 channel
進程、線程以及并行、并發
進程
進程(Process)就是程序在操作系統中的一次執行過程,是系統進行資源分配和調度的基本單位,進程是一個動態概念,是程序在執行過程中分配和管理資源的基本單位,每一個進程都有一個自己的地址空間。一個進程至少有5種基本狀態,它們是:初始態,執行態,等待狀態,就緒狀態,終止狀態。
通俗的講進程就是一個正在執行的程序。
線程
線程是進程的一個執行實例,是程序執行的最小單元,它是比進程更小的能獨立運行的基本單位
一個進程可以創建多個線程,同一個進程中多個線程可以并發執行 ,一個線程要運行的話,至少有一個進程
并發和并行
并發:多個線程同時競爭一個位置,競爭到的才可以執行,每一個時間段只有一個線程在執行。
并行:多個線程可以同時執行,每一個時間段,可以有多個線程同時執行。
通俗的講多線程程序在單核CPU上面運行就是并發,多線程程序在多核CUP上運行就是并行,如果線程數大于CPU核數,則多線程程序在多個CPU上面運行既有并行又有并發
Golang中協程(goroutine)以及主線程
golang中的主線程:(可以理解為線程/也可以理解為進程),在一個Golang程序的主線程上可以起多個協程。Golang中多協程可以實現并行或者并發。
協程:可以理解為用戶級線程,這是對內核透明的,也就是系統并不知道有協程的存在,是完全由用戶自己的程序進行調度的。Golang的一大特色就是從語言層面原生持協程,在函數或者方法前面加go關鍵字就可創建一個協程。可以說Golang中的協程就是goroutine。
Golang中的多協程有點類似于Java中的多線程
多協程和多線程
多協程和多線程:Golang中每個goroutine(協程)默認占用內存遠比Java、C的線程少。
OS線程(操作系統線程)一般都有固定的棧內存(通常為2MB左右),一個goroutine(協程)占用內存非常小,只有2KB左右,多協程goroutine切換調度開銷方面遠比線程要少。
這也是為什么越來越多的大公司使用Golang的原因之一。
goroutine的使用以及sync.WaitGroup
并行執行需求
在主線程(可以理解成進程)中,開啟一個goroutine,該協程每隔50毫秒秒輸出“你好golang"
在主線程中也每隔50毫秒輸出“你好golang",輸出10次后,退出程序,要求主線程和goroutine同時執行。
這是時候,我們就可以開啟協程來了,通過 go關鍵字開啟
package mainimport ("fmt""time"
)// 協程需要運行的方法
func test() {for i := 0; i < 5; i++ {fmt.Println("test 你好Golang")time.Sleep(time.Millisecond * 100)}
}func main() {// 通過go關鍵字,就可以直接開啟一個協程go test()// 這是主進程執行的for i := 0; i < 5; i++ {fmt.Println("main 你好Goland")time.Sleep(time.Millisecond * 100)}
}
上述的代碼其實還有問題的,也就是說當主進程執行完畢后,不管協程有沒有執行完成,都會退出
輸出結果:
PS D:\Microsoft VS Code\GOproject\src\go_code\chapter3\goroutine> go run .\goroutine.go
main 你好Goland
test 你好Golang
test 你好Golang
main 你好Goland
main 你好Goland
test 你好Golang
test 你好Golang
main 你好Goland
main 你好Goland
test 你好Golang
這時使用我們就需要用到 sync.WaitGroup等待協程
首先我們需要創建一個協程計數器:
// 定義一個協程計數器
var wg sync.WaitGroup
然后當我們開啟協程的時候,我們要讓計數器加1
// 開啟協程,協程計數器加1
wg.Add(1)
go tes
代碼修改:
package mainimport ("fmt""sync"
)// 定義一個協程計數器,初始化
var wg sync.WaitGroup// 協程需要運行的方法
func test() {// 這是主進程執行的for i := 0; i < 100; i++ {fmt.Println("test 你好Golang", i)// time.Sleep(time.Millisecond * 100)}// 協程計數器減1wg.Done()
}func test2() {//這是主進程執行的for i := 0; i < 100; i++ {fmt.Println("test2 你好Golang", i)}// 協程計數器減1wg.Done()
}func main() {// 通過go關鍵字,就可以直接開啟一個協程wg.Add(1)go test()//協程計數器加1wg.Add(1)go test2()//并發執行//與主Goroutinue并行運行//這是主進程執行的,主 Goroutine 執行循環for i := 0; i < 100; i++ {fmt.Println("main 你好Golang")}// 等待所有的協程執行完畢wg.Wait()fmt.Println("主線程退出")
}
使用 sync.WaitGroup
作為協程同步工具,用于等待所有 Goroutine 執行完畢。
兩個 Goroutine 函數:
test()
:循環 100 次,每次打印并休眠 100ms。test2()
:循環 100 次,僅打印。
主函數 main()
:
- 啟動兩個 Goroutine(
test
和test2
)。 - 主 Goroutine 執行自己的循環 100 次。
- 最后通過
wg.Wait()
等待所有子 Goroutine 完成。
測試輸出:
//主 Goroutine 的循環不會等待子 Goroutine,直接開始執行。輸出 main 你好Golang 的速度最快(無任何阻塞)。//并發輸出順序:
//test 和 test2 的輸出會交替出現:
//test 和 test2 各自循環 100 次,均無阻塞。
//它們的輸出 test 你好Golang X 和 test2 你好Golang X 會隨機交錯//主 Goroutine 完成自己的循環后,執行 wg.Wait(),阻塞等待 test 和 test2 完成。
//此時 test 和 test2 可能仍在執行,也可能已結束(取決于調度)。//test 和 test2 在完成各自的 100 次循環后,調用 wg.Done(),將計數器分別減 1。
//當計數器歸零時,wg.Wait() 解除阻塞。//最終輸出 主線程退出,程序終止。
流程示例:
啟動階段:
主 Goroutine 調用 wg.Add(1) 兩次,啟動 test 和 test2。
兩個子 Goroutine 開始并發執行。
并發執行階段:
主 Goroutine:快速執行 100 次 fmt.Println("main 你好Golang")(無阻塞,幾乎瞬間完成)。
子 Goroutine test:執行 100 次 fmt.Println("test 你好Golang", i)。
子 Goroutine test2:執行 100 次 fmt.Println("test2 你好Golang", i)。
輸出順序隨機,例如:
main 你好Golang
test2 你好Golang 0
test 你好Golang 0
main 你好Golang
test2 你好Golang 1
...
同步等待階段:
主 Goroutine 完成自己的循環后,調用 wg.Wait(),進入阻塞狀態。
子 Goroutine test 和 test2 繼續執行,直到完成各自的循環并調用 wg.Done()。
當計數器歸零時,主 Goroutine 解除阻塞,輸出 主線程退出。
練習例子1:
// 生產者-消費者模型
// 假設一個程序需要生成數據并處理數據,可以用兩個協程分工合作:
package mainimport ("fmt""time"
)func producer(ch chan<- int) {for i := 0; i < 5; i++ {ch <- i // 發送數據到通道fmt.Printf("生產了數據:%d\n", i)time.Sleep(time.Second) // 模擬耗時}close(ch) // 生產完畢,關閉通道
}func consumer(ch <-chan int) {for num := range ch { // 從通道接收數據fmt.Printf("消費了數據: %d\n", num)}
}func main() {ch := make(chan int)go producer(ch) // 啟動生產者協程consumer(ch) // 消費者在主協程運行
}// 生產者每秒生成一個數字,通過通道發送給消費者。
// 消費者實時接收并處理數據。
// 通道保證了生產者和消費者的同步,避免數據混亂
輸出結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
消費了數據: 0
生產了數據:0
生產了數據:1
消費了數據: 1
消費了數據: 2
生產了數據:2
生產了數據:3
消費了數據: 3
生產了數據:4
消費了數據: 4
練習例子2:
// 例子2:多任務并行下載
// 用多個協程同時下載多個文件:
package mainimport ("fmt""sync""time"
)func download(url string, wg *sync.WaitGroup) {defer wg.Done() // 協程結束時通知WaitGroupfmt.Printf("開始下載: %s\n", url)time.Sleep(2 * time.Second) //模擬耗時fmt.Printf("下載完成: %s\n", url)
}// Add(n):計數器增加 n(每個任務啟動前+1)
// Done():計數器減1(每個協程結束時調用)
// Wait():阻塞主協程,直到計數器歸零。func main() {//初始化 sync.WaitGroup(計數器初始為0)var wg sync.WaitGroup//定義下載數據urls := []string{"file1.txt", "file2.png", "file3.jpg"}for _, url := range urls {wg.Add(1)go download(url, &wg) //啟動多個下載協程}wg.Wait() //等待所有協程完成fmt.Println("所有文件下載完成")
}// 每個文件下載任務由一個協程獨立執行。
// sync.WaitGroup 確保主協程等待所有下載完成后再結束。
// 協程的并行執行顯著縮短了總耗時
輸出結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
開始下載: file3.jpg
開始下載: file2.png
開始下載: file1.txt
下載完成: file1.txt
下載完成: file2.png
下載完成: file3.jpg
所有文件下載完成[Done] exited with code=0 in 8.174 seconds
設置Go并行運行的時候占用的cpu數量
Go運行時的調度器使用GOMAXPROCS參數來確定需要使用多少個OS線程來同時執行Go代碼。默認值是機器上的CPU核心數。例如在一個8核心的機器上,調度器會把Go代碼同時調度到8個oS線程上。
Go 語言中可以通過runtime.GOMAXPROCS()函數設置當前程序并發時占用的CPU邏輯核心數。
Go1.5版本之前,默認使用的是單核心執行。Go1.5版本之后,默認使用全部的CPU邏輯核心數。
package mainimport ("fmt""runtime"
)func main() {// 獲取cpu個數npmCpu := runtime.NumCPU()fmt.Println("cup的個數:", npmCpu)// 設置允許使用的CPU數量runtime.GOMAXPROCS(runtime.NumCPU() - 1)
}
測試輸出:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
cup的個數: 16[Done] exited with code=0 in 1.312 seconds
for循環開啟多個協程
類似于Java里面開啟多個線程,同時執行
// 啟動10個協程并發執行,每個協程打印10次自己的編號(0到9)。
// 主線程等待所有協程結束后退出。
package mainimport ("fmt""sync"
)// 初始化全局變量
var vg sync.WaitGroupfunc test(num int) {for i := 0; i < 10; i++ {fmt.Printf("協程 (%v) 打印的第 %v 條數據 \n", num, i)}vg.Done()
}func main() {for i := 0; i < 10; i++ {vg.Add(1)go test(i) //啟動協程}vg.Wait()fmt.Println("主線程退出")
}
輸出結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
協程 (2) 打印的第 0 條數據
協程 (2) 打印的第 1 條數據
協程 (2) 打印的第 2 條數據
協程 (2) 打印的第 3 條數據
協程 (2) 打印的第 4 條數據
協程 (2) 打印的第 5 條數據
協程 (2) 打印的第 6 條數據
協程 (2) 打印的第 7 條數據
協程 (2) 打印的第 8 條數據
協程 (2) 打印的第 9 條數據
協程 (8) 打印的第 0 條數據
協程 (1) 打印的第 0 條數據
協程 (1) 打印的第 1 條數據
協程 (1) 打印的第 2 條數據
協程 (1) 打印的第 3 條數據
協程 (1) 打印的第 4 條數據
協程 (1) 打印的第 5 條數據
協程 (1) 打印的第 6 條數據
協程 (1) 打印的第 7 條數據
協程 (1) 打印的第 8 條數據
協程 (1) 打印的第 9 條數據
協程 (7) 打印的第 0 條數據
協程 (5) 打印的第 0 條數據
協程 (5) 打印的第 1 條數據
協程 (5) 打印的第 2 條數據
協程 (5) 打印的第 3 條數據
協程 (0) 打印的第 0 條數據
協程 (0) 打印的第 1 條數據
協程 (0) 打印的第 2 條數據
協程 (0) 打印的第 3 條數據
協程 (0) 打印的第 4 條數據
協程 (0) 打印的第 5 條數據
協程 (0) 打印的第 6 條數據
協程 (0) 打印的第 7 條數據
協程 (0) 打印的第 8 條數據
協程 (0) 打印的第 9 條數據
協程 (4) 打印的第 0 條數據
協程 (4) 打印的第 1 條數據
協程 (4) 打印的第 2 條數據
協程 (4) 打印的第 3 條數據
協程 (4) 打印的第 4 條數據
協程 (4) 打印的第 5 條數據
協程 (4) 打印的第 6 條數據
協程 (4) 打印的第 7 條數據
協程 (4) 打印的第 8 條數據
協程 (4) 打印的第 9 條數據
協程 (6) 打印的第 0 條數據
協程 (6) 打印的第 1 條數據
協程 (6) 打印的第 2 條數據
協程 (6) 打印的第 3 條數據
協程 (6) 打印的第 4 條數據
協程 (6) 打印的第 5 條數據
協程 (6) 打印的第 6 條數據
協程 (6) 打印的第 7 條數據
協程 (6) 打印的第 8 條數據
協程 (6) 打印的第 9 條數據
協程 (7) 打印的第 1 條數據
協程 (7) 打印的第 2 條數據
協程 (7) 打印的第 3 條數據
協程 (7) 打印的第 4 條數據
協程 (9) 打印的第 0 條數據
協程 (9) 打印的第 1 條數據
協程 (9) 打印的第 2 條數據
協程 (9) 打印的第 3 條數據
協程 (9) 打印的第 4 條數據
協程 (9) 打印的第 5 條數據
協程 (9) 打印的第 6 條數據
協程 (9) 打印的第 7 條數據
協程 (9) 打印的第 8 條數據
協程 (9) 打印的第 9 條數據
協程 (3) 打印的第 0 條數據
協程 (3) 打印的第 1 條數據
協程 (3) 打印的第 2 條數據
協程 (3) 打印的第 3 條數據
協程 (3) 打印的第 4 條數據
協程 (3) 打印的第 5 條數據
協程 (3) 打印的第 6 條數據
協程 (3) 打印的第 7 條數據
協程 (3) 打印的第 8 條數據
協程 (3) 打印的第 9 條數據
協程 (7) 打印的第 5 條數據
協程 (7) 打印的第 6 條數據
協程 (8) 打印的第 1 條數據
協程 (8) 打印的第 2 條數據
協程 (8) 打印的第 3 條數據
協程 (8) 打印的第 4 條數據
協程 (8) 打印的第 5 條數據
協程 (8) 打印的第 6 條數據
協程 (8) 打印的第 7 條數據
協程 (8) 打印的第 8 條數據
協程 (8) 打印的第 9 條數據
協程 (5) 打印的第 4 條數據
協程 (5) 打印的第 5 條數據
協程 (5) 打印的第 6 條數據
協程 (5) 打印的第 7 條數據
協程 (5) 打印的第 8 條數據
協程 (5) 打印的第 9 條數據
協程 (7) 打印的第 7 條數據
協程 (7) 打印的第 8 條數據
協程 (7) 打印的第 9 條數據
主線程退出[Done] exited with code=0 in 1.205 seconds
Channel管道
管道是Golang在語言級別上提供的goroutine間的通訊方式,我們可以使用channel在多個goroutine之間傳遞消息。如果說goroutine是Go程序并發的執行體,channel就是它們之間的連接。channel是可以讓一個goroutine發送特定值到另一個goroutine的通信機制。
Golang的并發模型是CSP(Communicating Sequential Processes),提倡通過通信共享內存而不是通過共享內存而實現通信。
Go語言中的管道(channel)是一種特殊的類型。管道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規則,保證收發數據的順序。每一個管道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。
channel類型
channel是一種類型,一種引用類型。聲明管道類型的格式如下:
// 聲明一個傳遞整型的管道
var ch1 chan int
// 聲明一個傳遞布爾類型的管道
var ch2 chan bool
// 聲明一個傳遞int切片的管道
var ch3 chan []int
創建channel
聲明管道后,需要使用make函數初始化之后才能使用
make(chan 元素類型, 容量)
舉例如下:
// 創建一個能存儲10個int類型的數據管道
ch1 = make(chan int, 10)
// 創建一個能存儲4個bool類型的數據管道
ch2 = make(chan bool, 4)
// 創建一個能存儲3個[]int切片類型的管道
ch3 = make(chan []int, 3)
channel操作
管道有發送,接收和關閉的三個功能
發送和接收 都使用 <- 符號
現在我們先使用以下語句
定義一個管道:
ch := make(chan int, 3)
發送
將數據放到管道內,將一個值發送到管道內
// 把10發送到ch中
ch <- 10
取操作
x := <- ch
關閉管道
通過調用內置的close函數來關閉管道
close(ch)
完整例子
package mainimport ("fmt""time"
)func main() {//創建管道,容量為2ch := make(chan int, 2)//啟動子協程持續讀取go func() {for num := range ch {fmt.Printf("讀取到數據:%d (剩余長度: %d)", num, len(ch))time.Sleep(time.Second)}}()//主協程寫入數據ch <- 1fmt.Println("寫入1,當前長度為:", len(ch))ch <- 2fmt.Println("寫入2,當前長度為:", len(ch))//主協程等待3秒后關閉管道time.Sleep(3 * time.Second)close(ch)for num := range ch { // 嘗試讀取所有剩余數據fmt.Println("關閉后讀取:", num)}fmt.Println("主協程退出")
}
輸出結果:
寫入1,當前長度為: 1 // 主協程順序輸出
讀取到數據:1 (剩余長度: 0) // 子協程第一次讀取,立即搶占執行
寫入2,當前長度為: 1 // 主協程繼續寫入,此時子協程還未讀2,所以長度暫時為1
讀取到數據:2 (剩余長度: 0) // 子協程在下一輪循環讀取2
主協程退出 // 管道關閉后無殘留數據,循環不執行
疑問解答
1. 為什么 寫入2
時的長度是1?
- 主協程寫入1
- 管道長度變為
1
。 fmt.Println
立即打印這個值。- 子協程被調度,讀取數據1,長度變為
0
。
- 管道長度變為
- 主協程寫入2
- 管道長度再次變為
1
。 fmt.Println
打印時的長度是基于寫入后的當前狀態(后續被子協程讀取才會減為0)。
- 管道長度再次變為
2. 為什么關閉管道后沒有輸出?
- 子協程在
time.Sleep
之前已經讀取所有數據- 寫入1和2后,主協程休眠3秒。
- 子協程期間完成了兩次讀取(兩次
time.Sleep
總和為2秒),剩余1秒還在休眠。 - 主協程休眠結束后關閉管道,此時管道已空。
for num := range ch
直接跳過,不輸出任何內容。
for range從管道循環取值
當向管道中發送完數據時,我們可以通過close函數來關閉管道,當管道被關閉時,再往該管道發送值會引發panic,從該管道取值的操作會去完管道中的值,再然后取到的值一直都是對應類型的零值。那如何判斷一個管道是否被關閉的呢?
未關閉管道——導致死鎖
package mainimport ("fmt"
)func main() {//定義管道,容量為10ch := make(chan int, 10)// 存入10個值(管道緩沖填滿)for i := 0; i < 10; i++ {ch <- i}// 嘗試從管道中讀取數據(無其他協程寫入)for value := range ch {// 問題所在:未關閉管道,循環會持續等待新數據fmt.Println(value)}// 所有數據讀完后,主協程阻塞在此處,觸發死鎖
}
測試結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
0
1
2
3
4
5
6
7
8
9
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]:
main.main()d:/Microsoft VS Code/GOproject/src/go_code/goroutine/test1.go:12 +0xb9
exit status 2[Done] exited with code=1 in 1.355 seconds
原因:管道中有10個值,for-range
會循環讀取完所有值后繼續等待新的數據。由于管道未關閉,且沒有其他協程向管道寫數據,主協程會永久阻塞,觸發死鎖。
正確關閉管道——避免死鎖
package mainimport ("fmt"
)func main() {//定義管道,容量為10ch := make(chan int, 10)// 存入10個值(管道緩沖填滿)for i := 0; i < 10; i++ {ch <- i}// 關鍵操作:告知接收方管道已不再寫入close(ch)for value := range ch {// 讀取現存數據后自動退出循環fmt.Println(value)}fmt.Println("管道已關閉且數據讀取完畢,程序正常退出")
}
輸出結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
0
1
2
3
4
5
6
7
8
9
管道已關閉且數據讀取完畢,程序正常退出[Done] exited with code=0 in 1.308 seconds
原因:管道關閉后,for-range
會遍歷所有已存儲的數據,并在讀取完最后一個數據后自動退出循環,不會陷入無限等待。
核心總結
操作 | 是否關閉管道 | 結果 | 原理 |
---|---|---|---|
for-range 讀取 | 關閉 | ? 正常退出 | 關閉后,讀完已有的數據即退出循環,不會等待新數據。 |
for-range 讀取 | 未關閉 | ? 死鎖 | 持續等待新數據,但無其他協程寫入,主協程永久阻塞。 |
goroutine 結合 channel 管道
需求1:定義兩個方法,一個方法給管道里面寫數據,一個給管道里面讀取數據。要求同步進行。
- 開啟一個fn1的的協程給向管道inChan中寫入10條數據
- 開啟一個fn2的協程讀取inChan中寫入的數據
- 注意:fn1和fn2同時操作一個管道
- 主線程必須等待操作完成后才可以退出
管道是安全的,是一邊寫入,一邊讀取,當讀取比較快的時候,會等待寫入
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroup // 同步原語:等待組func write(ch chan int) {// 生產者:循環向通道寫入10個數據for i := 0; i < 10; i++ {fmt.Println("寫入:", i)ch <- i// 控制寫入速度time.Sleep(time.Millisecond * 10)}close(ch)// 完成一個任務-計數器減1wg.Done()
}func read(ch chan int) {// 通道關閉后自動退出循環for v := range ch {fmt.Println("讀取:", v)time.Sleep(time.Millisecond * 10)}wg.Done()
}func main() {// 初始化一個容量為10的緩沖通道ch := make(chan int, 10)// 啟動寫協程(生產者)wg.Add(1)go write(ch)// 啟動讀協程(消費者)wg.Add(1)go read(ch)// 等待所有協程完成(計數器歸零)wg.Wait()fmt.Println("主線程執行完畢")
}
該程序實現了一個生產者-消費者模型,使用帶緩沖的通道(channel)、協程(goroutine)和等待組(sync.WaitGroup)來實現并發控制。
輸出結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
寫入: 0
讀取: 0
寫入: 1
讀取: 1
寫入: 2
讀取: 2
寫入: 3
讀取: 3
寫入: 4
讀取: 4
寫入: 5
讀取: 5
寫入: 6
讀取: 6
寫入: 7
讀取: 7
寫入: 8
讀取: 8
寫入: 9
讀取: 9
主線程執行完畢[Done] exited with code=0 in 6.345 seconds
goroutine 結合 channel打印素數
package mainimport ("fmt""math""sync"
)var wg sync.WaitGroup// 向intChan中放入 1~ 120000個數
func putNum(inChan chan int) {for i := 2; i < 12000; i++ { // 從2開始,0和1不是素數inChan <- i}close(inChan)wg.Done()
}// 從intChan取出數據,并判斷是否為素數,
// 如果是的話,就把得到的素數放到primeChan中
func primeNum(inChan chan int, primeChan chan int, exitChan chan bool) {for value := range inChan {var flag = true// 不是素數的情況if value <= 1 {flag = false} else {for i := 2; i <= int(math.Sqrt(float64(value))); i++ {if value%i == 0 { // 修正這里flag = falsebreak}}}if flag {// 是素數的話primeChan <- value}}exitChan <- truewg.Done()
}// 打印素數
func printPrime(primeChan chan int) {for value := range primeChan {fmt.Println(value)}wg.Done()
}func main() {// 寫入數字intChan := make(chan int, 1000)// 存放素數primeChan := make(chan int, 1000)// 存放 primeChan退出狀態exitChan := make(chan bool, 10) // 改為10,與primeNum協程數量一致// 開啟寫的協程wg.Add(1)go putNum(intChan)// 開啟計算素數的協程for i := 0; i < 10; i++ {wg.Add(1)go primeNum(intChan, primeChan, exitChan)}// 開啟打印協程wg.Add(1)go printPrime(primeChan)// 等待所有primeNum協程完成wg.Add(1)go func() {for i := 0; i < 10; i++ { // 改為10,與primeNum協程數量一致<-exitChan}close(primeChan)wg.Done()}()wg.Wait()fmt.Println("主線程執行完畢")
}
輸出結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
2
3
5
7
11
13
17
19
23
29
31
37
41
43
47
...
...
11941
11953
11959
11117
11969
11971
11987
11981
主線程執行完畢[Done] exited with code=0 in 1.263 seconds
代碼詳細講解:
通道定義
go
復制
intChan := make(chan int, 1000) // 緩沖數字(生產協程 -> 計算協程)
primeChan := make(chan int, 1000) // 緩沖素數結果(計算協程 -> 打印協程)
exitChan := make(chan bool, 10) // 記錄計算協程完成狀態(協助關閉primeChan)
核心函數
函數名 | 作用 | 協程類型 |
---|---|---|
putNum | 向 intChan 中寫入2~11999的數字 | 生產協程 |
primeNum | 從 intChan 讀取數字并判斷是否為素數,結果寫入 primeChan | 計算協程 |
printPrime | 從 primeChan 讀取并打印所有素數 | 打印協程 |
匿名函數 | 監控所有計算協程的完成狀態,隨后關閉 primeChan | 協調協程 |
協程啟動順序
生產協程 (putNum
):首先啟動,填充 intChan
。
10個計算協程 (primeNum
):并發啟動,消費 intChan
。
打印協程 (printPrime
):持續消費 primeChan
,實時輸出素數。
協調協程:等待所有 primeNum
協程完成后關閉 primeChan
。
wg.Add(1)
go putNum(intChan) // 啟動生產協程for i := 0; i < 10; i++ { // 啟動10個計算協程wg.Add(1)go primeNum(...)
}wg.Add(1)
go printPrime(primeChan) // 啟動打印協程wg.Add(1)
go func() { ... }() // 啟動協調協程
同步機制
-
sync.WaitGroup
:總計數器為
1(putNum) + 10(primeNum) + 1(printPrime) + 1(協調協程) = 13
- 每個協程結束后調用
wg.Done()
,主線程通過wg.Wait()
等待所有協程退出。
- 每個協程結束后調用
-
exitChan
的設計- 每個計算協程 (
primeNum
) 結束后發送一個true
到exitChan
。 - 協調協程讀取10次
exitChan
后觸發primeChan
關閉,終止printPrime
協程。
- 每個計算協程 (
func primeNum(...) {for value := range inChan {flag := trueif value <= 1 { flag = false } else {// 計算范圍優化:只需檢查到 sqrt(value)sqrtVal := int(math.Sqrt(float64(value)))for i := 2; i <= sqrtVal; i++ {if value%i == 0 {flag = falsebreak}}}if flag {primeChan <- value // 素數進入結果通道}}
}
單向管道
有時候我們會將管道作為參數在多個任務函數間傳遞,很多時候我們在不同的任務函數中,使用管道都會對其進行限制,比如限制管道在函數中只能發送或者只能接受
默認的管道是 可讀可寫
單向管道的作用:
chan<- int
:表示只能向該管道寫入數據(如producer
函數)。<-chan int
:表示只能從該管道讀取數據(如consumer
函數)。- Go編譯器會檢查單向管道的違規操作(如嘗試在只寫管道中讀取數據會直接報錯)
練習一下:
package mainimport ("fmt""time"
)// 生產函數
func producer(writeChan chan<- int) {for i := 0; i < 5; i++ {fmt.Printf("生產者發送:%d\n", i)writeChan <- itime.Sleep(time.Second)}close(writeChan)
}// 消費函數
func consumer(readChan <-chan int) {for num := range readChan {fmt.Printf("消費者收到: %d\n", num)}
}func main() {//創建一個雙向管道,容量為3ch := make(chan int, 3)// 啟動生產者協程(將雙向管道轉為只寫管道傳入)go producer(ch)consumer(ch)// 主協程作為消費者(將雙向管道轉為只讀管道傳入)fmt.Println("程序結束")
}
測試輸出:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
生產者發送:0
消費者收到: 0
生產者發送:1
消費者收到: 1
生產者發送:2
消費者收到: 2
生產者發送:3
消費者收到: 3
生產者發送:4
消費者收到: 4
程序結束[Done] exited with code=0 in 11.354 seconds
Select多路復用
在某些場景下我們需要同時從多個通道接收數據。這個時候就可以用到golang中給我們提供的select多路復用。
通常情況通道在接收數據時,如果沒有數據可以接收將會發生阻塞。
package mainimport ("fmt""math/rand""time"
)func main() {// 創建通道:數據通道(無緩沖)、超時通道(3秒后觸發)dataChan := make(chan int)// timeoutChan 在3秒后會收到一個時間對象timeoutChan := time.After(3 * time.Second)//啟動生產者協程:數據通道(無緩沖)、超時通道(3秒后超時)go func() {// 生成隨機延遲時間(0~4秒區間)sleepTime := time.Duration(rand.Intn(5)) * time.Second// 模擬耗時操作(如網絡請求、IO操作)time.Sleep(sleepTime)// 向數據通道發送計算結果dataChan <- 42}()fmt.Println("等待數據或超時(最多3秒)...")//select多路復用機制,優先級:同時就緒時隨機選擇select {case data := <-dataChan: // 接收到數據時的處理fmt.Printf("收到數據: %d\n", data)// 可以在該分支補充其他業務邏輯,如數據處理case <-timeoutChan:fmt.Println("超時,未收到數據!")// 可在此設置重試/寫日志/返回錯誤狀態等}fmt.Println("程序結束")
}
多路復用機制:
select
同時監聽多個case
操作(如dataChan
和timeoutChan
)。- 當 任一通道就緒(有數據可讀或可寫)時,對應
case
執行。 - 如果 多個通道同時就緒,Go 會隨機選擇一個執行。
測試輸出:
//測試1
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
等待數據或超時(最多3秒)...
超時,未收到數據!
程序結束[Done] exited with code=0 in 9.378 seconds//測試2
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
等待數據或超時(最多3秒)...
收到數據: 42
程序結束[Done] exited with code=0 in 8.286 seconds
tip:使用select來獲取數據的時候,不需要關閉chan,不然會出現問題
Goroutine Recover解決協程中出現的Panic
Go語言中,如果一個協程發生未捕獲的 panic
,會導致 整個程序崩潰退出(即使其他協程正常)。如果不處理這一問題,在處理關鍵任務的系統中,單點故障可能引發級聯崩潰。
核心機制
Go語言通過兩個層級處理協程外的錯誤隔離:
協程間隔離:默認情況下,協程之間的Panic相互獨立
全局進程級防護:任意未恢復的Panic都會導致整個程序終止
無Recover的危害案例
假設以下業務場景:
- 服務器處理用戶請求 → 分發到10個協程處理
- 金融數處理 → 其中一個協程觸發除零錯誤(division by zero)
- 系統后果 → 所有請求處理中斷,服務完全崩潰
災難性后果:單點破壞全局穩定,LOGO投資人血本無歸
Recover機制生效原理
必須滿足 三個核心條件:
條件項 | 作用描述 | 典型錯誤應用場景 |
---|---|---|
位于defer函數內部 | Go要求Recover只能在延遲函數中觸發 | 直接在主流程調用recover() 無效 |
正確捕獲層級 | 只能在發生Panic的函數棧內 | 跨協程直接調用Recover無效 |
主動觸發機制 | 需通過邏輯主動觸犯Panic事件 | – |
通過defer結合recover函數,可以在發生異常時進行恢復操作,并獲取異常信息進行處理。這有助于程序的健壯性和錯誤處理?。
package mainimport ("fmt""math/rand""strings""sync""time"
)// 情況:需要并發處理10個文件,某個文件處理可能引發Panic
// 目標:單個文件處理失敗不能影響其他文件,需記錄錯誤信息// 文件處理函數(隨機觸發Panic)
func ProcessFile(filename string, wg *sync.WaitGroup) {defer wg.Done() // 確保資源釋放//核心:每個協程必須包含自己的recover機制//通過 defer 和 recover 實現協程級別的錯誤恢復機制defer func() {if err := recover(); err != nil {fmt.Printf("[Error]文件 %s 處理失敗: %v\n", filename, err)}}()//如果當前協程中發生任何未捕獲的 panic(如空指針、索引越界等錯誤)//這段代碼會 攔截異常 并打印錯誤信息,而非直接退出整個程序。// 模擬文件處理時隨機出現異常if rand.Intn(10) < 3 { // 30%概率觸發異常panic("文件格式解析失敗:非標準文件結構")}//正常處理邏輯fmt.Printf("正在處理文件: %s\n", filename)time.Sleep(500 * time.Millisecond)fmt.Printf("完成處理文件:%s\n", filename)
}// 打印分割線的工具函數
func PrintDivider() {fmt.Println("\n" + strings.Repeat("=", 50) + "\n")
}func main() {rand.Intn(1000)var wg sync.WaitGroupfiles := []string{"用戶數據.xlsx","機密文檔.zip","圖片1.jpg","損壞文件.bin","備份.tar.gz",}PrintDivider()fmt.Println("開始并發文件處理流程(安全模式)")// 批量啟動文件處理協程for _, f := range files {wg.Add(1)go ProcessFile(f, &wg)}wg.Wait()PrintDivider()fmt.Println("所有文件處理流程結束(包含成功和失敗的任務)")
}
輸出結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"==================================================開始并發文件處理流程(安全模式)
正在處理文件: 用戶數據.xlsx
正在處理文件: 備份.tar.gz
正在處理文件: 圖片1.jpg
[Error]文件 機密文檔.zip 處理失敗: 文件格式解析失敗:非標準文件結構
正在處理文件: 損壞文件.bin
完成處理文件:損壞文件.bin
完成處理文件:圖片1.jpg
完成處理文件:用戶數據.xlsx
完成處理文件:備份.tar.gz==================================================所有文件處理流程結束(包含成功和失敗的任務)[Done] exited with code=0 in 7.048 seconds
┌────────────────┐│ 協程開始運行 │└───┬───────────┬┘│ 業務代碼 │▼ ║ 發生錯誤(panic) ║ ║ ▼ 觸發流程中斷 正常結束╚═════════?┌─────────┐│ │ 正常結束 │┌─────▼─────┐ └─────────┘│ 執行defer鏈 │┌───────┤嘗試調用recover├───┐▼ └─────┬───┬───┘ ▼
無錯誤 捕獲到錯誤 → 記錄并恢復執行▼ 顯示錯誤后步驟繼續正常結束
通過在協程內部實現 panic
恢復,確保了程序的健壯性和業務的連續性,是現代高并發系統設計的重要實踐。
Go中的并發安全和互斥鎖
如下面一段代碼,我們在并發環境下進行操作,就會出現并發訪問的問題
var count = 0
var wg sync.WaitGroupfunc test() {count++fmt.Println("the count is : ", count)time.Sleep(time.Millisecond)wg.Done()
}
func main() {for i := 0; i < 20; i++ {wg.Add(1)go test()}time.Sleep(time.Second * 10)
}
輸出結果:
[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
the count is : 1
the count is : 7
the count is : 2
the count is : 3
the count is : 4
the count is : 6
the count is : 9
the count is : 10
the count is : 11
the count is : 17
the count is : 12
the count is : 13
the count is : 8
the count is : 14
the count is : 15
the count is : 5
the count is : 18
the count is : 19
the count is : 20
the count is : 16[Done] exited with code=0 in 11.013 seconds
最終結果不穩定。
互斥鎖
互斥鎖是傳統并發編程中對共享資源進行訪問控制的主要手段,它由標準庫sync中的Mutex結構體類型表示。sync.Mutex類型只有兩個公開的指針方法,Lock和Unlock。Lock鎖定當前的共享資源,Unlock 進行解鎖
// 定義一個鎖
var mutex sync.Mutex
// 加鎖
mutex.Lock()
// 解鎖
mutex.Unlock()
使用互斥鎖:
package mainimport ("fmt""sync""time"
)// 共享資源
var count = 0// 同步控制組合
var (wg sync.WaitGroup // 等待組用于等待所有goroutine完成mutex sync.Mutex // 互斥鎖用于保護count的原子操作
)func safeIncrement() {// 確保無論是否panic都會標記完成defer wg.Done()// Step 1. 加鎖保護臨界區mutex.Lock()// 延遲解鎖確保即使在panic情況下也能解鎖defer mutex.Unlock()// Step 2. 安全操作資源temp := counttemp++ // 模擬操作耗時(這里只是示例,實際應直接count++)time.Sleep(1 * time.Millisecond) // 故意放大并發問題效果count = temp// Step 3. 安全讀取資源(仍處于鎖保護中)fmt.Printf("安全更新后的值:%d\n", count)// 注意:所有對共享資源count的操作都在鎖的保護范圍內
}func main() {//啟動20個并發操作for i := 0; i < 20; i++ {wg.Add(1)go safeIncrement()}// 正確等待方式(替換原來的Sleep猜測等待)wg.Wait()// 驗證最終結果fmt.Println("\n最終結果:", count) // 現在一定會輸出20
}
關鍵點
互斥鎖的作用:確保同一時間只有一個goroutine能訪問count
,防止數據競爭
等待組的作用:確保主goroutine等待所有工作goroutine完成
defer的使用:確保鎖一定會被釋放,即使發生panic
測試結果:
main()函數
啟動20個并發goroutine調用safeIncrement()
使用wg.Wait()等待所有goroutine完成
打印最終結果[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
安全更新后的值:1
安全更新后的值:2
安全更新后的值:3
安全更新后的值:4
安全更新后的值:5
安全更新后的值:6
安全更新后的值:7
安全更新后的值:8
安全更新后的值:9
安全更新后的值:10
安全更新后的值:11
安全更新后的值:12
安全更新后的值:13
安全更新后的值:14
安全更新后的值:15
安全更新后的值:16
安全更新后的值:17
安全更新后的值:18
安全更新后的值:19
安全更新后的值:20最終結果: 20[Done] exited with code=0 in 1.003 seconds
safeIncrement()
函數
- 使用
defer wg.Done()
確保函數退出時通知等待組 - 獲取互斥鎖
mutex.Lock()
并defer mutex.Unlock()
確保鎖會被釋放 - 安全地讀取、修改并寫回
count
值 - 打印當前計數值
main()
函數
- 啟動20個并發goroutine調用
safeIncrement()
- 使用
wg.Wait()
等待所有goroutine完成 - 打印最終結果
輸出流程
初始化階段:
count
初始化為0- 創建20個goroutine,每個都調用
safeIncrement()
并發執行階段:
- 每個goroutine嘗試獲取互斥鎖
- 只有一個goroutine能成功獲取鎖,其他被阻塞
- 獲取鎖的goroutine:
- 讀取
count
值到臨時變量 - 增加臨時變量值
- 短暫睡眠(1ms)
- 將新值寫回
count
- 打印當前值
- 釋放鎖
- 讀取
- 下一個goroutine獲取鎖并重復上述過程
完成階段:
- 所有goroutine完成后,
wg.Wait()
解除阻塞 - 打印最終結果
20
讀寫互斥鎖
互斥鎖的本質是當一個goroutine訪問的時候,其他goroutine都不能訪問。這樣在資源同步,避免競爭的同時也降低了程序的并發性能。程序由原來的并行執行變成了串行執行。
其實,當我們對一個不會變化的數據只做“讀”操作的話,是不存在資源競爭的問題的。因為數據是不變的,不管怎么讀取,多少goroutine同時讀取,都是可以的。
所以問題不是出在“讀”上,主要是修改,也就是“寫”。修改的數據要同步,這樣其他goroutine才可以感知到。所以真正的互斥應該是讀取和修改、修改和修改之間,讀和讀是沒有互斥操作的必要的。
因此,衍生出另外一種鎖,叫做讀寫鎖。
讀寫鎖可以讓多個讀操作并發,同時讀取,但是對于寫操作是完全互斥的。也就是說,當一個goroutine進行寫操作的時候,其他goroutine既不能進行讀操作,也不能進行寫操作。
會輸出20
}
**關鍵點****互斥鎖的作用**:確保同一時間只有一個goroutine能訪問`count`,防止數據競爭**等待組的作用**:確保主goroutine等待所有工作goroutine完成**defer的使用**:確保鎖一定會被釋放,即使發生panic測試結果:~~~go
main()函數
啟動20個并發goroutine調用safeIncrement()
使用wg.Wait()等待所有goroutine完成
打印最終結果[Running] go run "d:\Microsoft VS Code\GOproject\src\go_code\goroutine\test1.go"
安全更新后的值:1
安全更新后的值:2
安全更新后的值:3
安全更新后的值:4
安全更新后的值:5
安全更新后的值:6
安全更新后的值:7
安全更新后的值:8
安全更新后的值:9
安全更新后的值:10
安全更新后的值:11
安全更新后的值:12
安全更新后的值:13
安全更新后的值:14
安全更新后的值:15
安全更新后的值:16
安全更新后的值:17
安全更新后的值:18
安全更新后的值:19
安全更新后的值:20最終結果: 20[Done] exited with code=0 in 1.003 seconds
safeIncrement()
函數
- 使用
defer wg.Done()
確保函數退出時通知等待組 - 獲取互斥鎖
mutex.Lock()
并defer mutex.Unlock()
確保鎖會被釋放 - 安全地讀取、修改并寫回
count
值 - 打印當前計數值
main()
函數
- 啟動20個并發goroutine調用
safeIncrement()
- 使用
wg.Wait()
等待所有goroutine完成 - 打印最終結果
輸出流程
初始化階段:
count
初始化為0- 創建20個goroutine,每個都調用
safeIncrement()
并發執行階段:
- 每個goroutine嘗試獲取互斥鎖
- 只有一個goroutine能成功獲取鎖,其他被阻塞
- 獲取鎖的goroutine:
- 讀取
count
值到臨時變量 - 增加臨時變量值
- 短暫睡眠(1ms)
- 將新值寫回
count
- 打印當前值
- 釋放鎖
- 讀取
- 下一個goroutine獲取鎖并重復上述過程
完成階段:
- 所有goroutine完成后,
wg.Wait()
解除阻塞 - 打印最終結果
20