文章目錄
- 1. 為什么要用協程?
- 1.1 進程與線程
- 1.2 協程
- 1.3 線程和協程的區別
- 線程
- 協程
- 1.4 Go 協程(goroutines)和協程(coroutines)
- 2.Go 協程基本內容
- 2.1 channel
- 2.2 select
- 2.3 future 模式
- 3. 實踐示例
- 3.1 并發處理多個網絡請求
- 3.2 工作池(Worker Pool)
- 3.3 協程使用最佳實踐
- 4. 并發安全
- 4.1 happens-before 原則
- 4.2 并發安全類型
- 5. GMP 模型
- 5.1 GMP
- 5.2 協程數量
- 6. 協程池 ants
- 6.1 特性
- 6.2 運行流程
- 參考文獻
不要通過共享內存來通信,而通過通信來共享內存。
1. 為什么要用協程?
線程先出現還是協程先出現?
1.1 進程與線程
進程是對運行時程序的封裝,是系統進行資源調度和分配的的基本單位,實現了操作系統的并發。
線程是進程的子任務,作為CPU調度和分配的基本單元,它們既能確保程序實時響應,又實現了進程內部的并發執行。同時,線程也是操作系統能夠識別的最小執行和調度單位。
用戶態線程和內核態線程是操作系統層面的概念,而CPU線程數屬于硬件層面的邏輯概念。CPU線程數指的是單個物理核心通過超線程技術虛擬出的邏輯核心數量,每個物理核心最多支持兩個線程,因此系統顯示的線程總數不會超過物理核心數的兩倍。
1.2 協程
協程是早于線程出現的。協程哪怕沒有操作系統干預也可以實現,畢竟任何編程語言自身就能夠實現這個結構。早期的多任務大多來自于此。
協程是非搶占式多任務,線程是搶占式多任務。
協程需要編寫代碼者主動讓出控制權,而線程可以無需規劃讓出控制權的時間點。
協程實現在用戶態,線程實現在內核態。
以函數為例:
void func() {print("a")print("b")print("c")
}def A():co = func() # 得到該協程print("in function A") # do something
void func() {print("a")yield #java中的yield是請求釋放cpu資源print("b")yieldprint("c")
}def A():co = func() # 得到該協程next(co) # 調用協程print("in function A") # do somethingnext(co) # 再次調用該協程
線程也可以被暫停,操作系統保存線程運行狀態然后去調度其它線程,此后該線程再次被分配CPU時還可以繼續運行,就像沒有被暫停過一樣。只不過線程的調度是操作系統實現的,這些對程序員都不可見,而協程是在用戶態實現的,對程序員可見。這就是為什么有的人說可以把協程理解為用戶態線程的原因。
1.3 線程和協程的區別
線程
- 共享變量(解決了通訊麻煩的問題,但是對于變量的訪問需要加鎖)
- 調度由操作系統完成
- 一個進程可以有多個線程,每個線程會共享父進程的資源(創建線程開銷占用比進程小很多,可創建的數量也會很多)
- 通訊除了可使用進程間通訊的方式,還可以通過共享內存的方式進行通信(通過共享內存通信比通過內核要快很多)
- 線程的使用會給系統帶來上下文切換的額外負擔。
協程
- 調度完全由用戶控制
- 一個線程(進程)可以有多個協程
- 每個線程(進程)循環按照指定的任務清單順序完成不同的任務(當任務被堵塞時,執行下一個任務;當恢復時,再回來執行這個任務;任務間切換只需要保存任務的上下文,沒有內核的開銷,可以不加鎖的訪問全局變量)
- 協程需要保證是非堵塞的且沒有相互依賴
- 協程基本上不能同步通訊,多采用異步的消息通訊,效率比較高
1.4 Go 協程(goroutines)和協程(coroutines)
應用程序在運行時會創建進程,這些進程獨立運行但共享內存空間。通過多線程技術,程序能夠同時處理多個請求,從而顯著提升性能。不過,多線程訪問共享內存時容易出現競態條件問題。Go語言通過輕量級的協程(goroutines)和通信機制(channels)來實現并發,避免了直接的內存共享,有效降低了系統復雜度。
協程在多線程間靈活調度,實現高效的并發處理,同時管理棧內存,適合處理大量任務。Go 的并發模型支持確定性并發,通過go關鍵字啟動協程,其棧動態伸縮,無需開發者干預。
在其他語言中,比如 C#,Lua 或者 Python 都有協程的概念。這個名字表明它和 Go協程有些相似,不過有兩點不同:
- Go 協程意味著并行(或者可以以并行的方式部署),協程一般來說不是這樣的
- Go 協程通過通道來通信;協程通過讓出和恢復操作來通信。
性能對比(協程 vs 線程):
特性 | Goroutine | OS Thread |
---|---|---|
創建開銷 | ~2KB 內存,微秒級 | ~1MB 內存,毫秒級 |
切換成本 | 用戶態調度,納秒級 | 內核態切換,微秒級 |
最大數量 | 輕松支持 10 萬+ | 通常限制在數千 |
協程由 Go 運行時調度(GMP 模型),在用戶態實現高效并發,適合 I/O 密集型任務。計算密集型任務需結合 runtime.GOMAXPROCS
控制線程數。
2.Go 協程基本內容
2.1 channel
- 在任何給定時間,一個數據被設計為只有一個協程可以對其訪問,所以不會發生數據競爭。 數據的所有權(可以讀寫數據的能力)也因此被傳遞。
- 通道的發送和接收都是原子操作。
- 發送操作(協程或者函數中的),在接收者準備好之前是阻塞的;如果通道中沒有數據,接收者就阻塞。使用帶緩沖通道可以實現異步非阻塞。
var send_only chan<- int // 只接收通道,無法關閉
var recv_only <-chan int // 只發送通道func main() {ch := make(chan string)buf := 100channelWithBuffer := make(chan string, buf) #帶緩沖通道go sendMsg(ch)go receiveMsg(ch)time.Sleep(1e9)
}func sendMsg(ch chan string) {ch <- "hello"ch <- "world"
}func receiveMsg(ch chan string) {var msg stringfor {msg = <-chfmt.Println(msg)}
}
- 如果在程序結束之前,向通道寫值的協程未完成工作,則這個協程不會被垃圾回收。
一個通道被其發送數據協程隊列和接收數據協程隊列中的所有協程引用著。因此,如果一個通道的這兩個隊列只要有一個不為空(可達),則此通道肯定不會被垃圾回收。另一方面,如果一個協程處于一個通道的某個協程隊列之中,則此協程也肯定不會被垃圾回收,即使此通道僅被此協程所引用。一個協程只有在退出后才能被垃圾回收。
- 通道在創建后(通常使用make函數)會持有一定量的內存。只有在以下兩種情況下,該內存才會被釋放:
- 通道關閉并且沒有其他引用(包括發送和接收操作)。
- 通道變得不可達。
如何優雅的關閉通道可參考這里。
2.2 select
select 是 Go 語言中處理多通道操作的核心控制結構,專為并發編程設計。它允許 goroutine 同時等待多個通道操作,類似于 switch 語句,但專門用于通道(channel)。
select {case u := <-ch1:...case v := <-ch2:......default: // no value ready to be received...
}
select 做的就是:選擇處理列出的多個通信情況中的一個。
- 如果都阻塞了,會等待直到其中一個可以處理。
- 如果多個可以處理,隨機選擇一個。
- 如果沒有通道操作可以處理并且寫了 default 語句,它就會執行:default 永遠是可運行的。
- 如果沒有 default,select 就會一直阻塞。
2.3 future 模式
Future 模式是一種并發設計模式,它允許你啟動一個異步任務并立即返回一個"占位符"(Future對象),你可以在稍后需要結果時從這個對象中獲取計算結果。
Go 語言中沒有內置的 Future 類型,但可以通過 goroutine 和 channel 輕松實現。
func InverseProduct(a Matrix, b Matrix) {a_inv_future := InverseFuture(a) // start as a goroutineb_inv_future := InverseFuture(b) // start as a goroutinea_inv := <-a_inv_futureb_inv := <-b_inv_futurereturn Product(a_inv, b_inv)
}func InverseFuture(a Matrix) chan Matrix {future := make(chan Matrix)go func() {future <- Inverse(a)}()return future
} public static Future<Matrix> inverseFuture(Matrix matrix) { ExecutorService executor = Executors.newSingleThreadExecutor(); Callable<Matrix> task = () -> Matrix.Inverse(matrix); Future<Matrix> future = executor.submit(task); // 注意:通常不關閉executor,因為這里只是單個任務 // 但如果你有一個共享的executor,你可能需要管理它的生命周期 return future; } public static Matrix inverseProduct(Matrix a, Matrix b) throws InterruptedException, ExecutionException { Future<Matrix> aInvFuture = inverseFuture(a); Future<Matrix> bInvFuture = inverseFuture(b); Matrix aInv = aInvFuture.get(); // 等待獲取a的逆 Matrix bInv = bInvFuture.get(); // 等待獲取b的逆 return Matrix.Product(aInv, bInv); }
3. 實踐示例
3.1 并發處理多個網絡請求
需求:同時請求多個 API 接口,匯總結果后繼續處理。
package mainimport ("fmt""io/ioutil""net/http""sync""time"
)func main() {urls := []string{"https://jsonplaceholder.typicode.com/posts/1","https://jsonplaceholder.typicode.com/posts/2","https://jsonplaceholder.typicode.com/posts/3",}// 使用 WaitGroup 等待所有協程完成var wg sync.WaitGroupresults := make(chan string, len(urls)) // 緩沖通道存儲結果start := time.Now()// 為每個 URL 啟動一個協程for _, url := range urls {wg.Add(1) // 計數器 +1go func(u string) {defer wg.Done() // 協程結束時計數器 -1resp, err := http.Get(u)if err != nil {results <- fmt.Sprintf("Error fetching %s: %v", u, err)return}defer resp.Body.Close()body, _ := ioutil.ReadAll(resp.Body)results <- fmt.Sprintf("Response from %s: %d bytes", u, len(body))}(url) // 注意:顯式傳遞 url 避免閉包陷阱}// 等待所有協程完成go func() {wg.Wait() // 阻塞直到計數器歸零close(results) // 關閉通道,通知主協程}()// 從通道讀取結果for res := range results {fmt.Println(res)}fmt.Printf("Total time: %v\n", time.Since(start))
}
關鍵知識點解析:
-
協程創建
go func() { ... }
啟動協程,輕量級(初始僅 2KB 棧)。 -
同步控制
sync.WaitGroup
等待協程組完成(Add(), Done(), Wait())。 -
通道 chan
協程間通信(此處用緩沖通道避免阻塞)。 -
閉包陷阱
循環中啟動協程時,通過參數傳遞當前值(url),避免共享變量問題。 -
資源釋放
defer resp.Body.Close()
確保 HTTP 響應體關閉。
3.2 工作池(Worker Pool)
需求:限制并發數,避免資源耗盡。
func worker(id int, jobs <-chan int, results chan<- int) {for j := range jobs {fmt.Printf("Worker %d processing job %d\n", id, j)time.Sleep(time.Second) // 模擬耗時任務results <- j * 2}
}func main() {const numJobs = 10const numWorkers = 3jobs := make(chan int, numJobs)results := make(chan int, numJobs)// 啟動固定數量的工作協程for w := 1; w <= numWorkers; w++ {go worker(w, jobs, results)}// 發送任務for j := 1; j <= numJobs; j++ {jobs <- j}close(jobs) // 關閉通道,通知 worker 退出// 收集結果for r := 1; r <= numJobs; r++ {<-results}
}
3.3 協程使用最佳實踐
- 控制并發量
用帶緩沖的通道或信號量(sem := make(chan struct{}, maxConcurrency))限制協程數量。
- 避免泄漏
確保協程能正常退出(如通過 context 取消)。
使用 defer 釋放資源(文件句柄、網絡連接)。
- 錯誤處理
在協程內部捕獲 panic:
go func() {defer func() {if r := recover(); r != nil {log.Println("Recovered in goroutine:", r)}}()// 業務代碼...
}()
- 優先用通道通信
遵循 Go 哲學:“不要通過共享內存來通信,而要通過通信來共享內存”。
4. 并發安全
4.1 happens-before 原則
與 goroutine 有關的 happens-before 保證場景有:
- goroutine的創建happens before其執行
- goroutine的完成不保證happens-before任何代碼
4.2 并發安全類型
安全 | 不安全 |
---|---|
字節、布爾、整型、浮點型、字符型、atomic.Value(樂觀鎖)、指針、函數 | string、struct、復數型、數組、切片、映射、通道、接口 |
5. GMP 模型
5.1 GMP
Goroutine:是對 Go 中代碼片段的封裝,其實是一種輕量級的用戶線程。
Machine:一個 machine 對應一個內核線程,相當于內核線程在 Go 進程中的映射。
Processor:一個 prcessor 表示執行 Go 代碼片段的所必需的上下文環境,可以理解為用戶代碼邏輯的處理器。
每一個 M 都會以一個內核線程綁定,M 和 P 之間也是一對一的關系,而 P 和 G 的關系則是一對多。在運行過程中,M 和 內核線程之間對應關系的不會變化,在 M 的生命周期內,它只會與一個內核線程綁定,而 M 和 P 以及 P 和 G 之間的關系都是動態可變的。
M 和 P 必須組合使用才能為 G 提供有效的運行環境。多個可執行的 G 會按順序排成隊列,掛載在某個P上等待調度執行。
M 的創建一般是因為沒有足夠的 M 來和 P 組合以為 G 提供運行環境,在很多時候 M 的數量可能會比 P 要多。在單個 Go 進程中,P 的最大數量決定了程序的并發規模,且 P 的最大數量是由程序決定的。可以通過修改環境變量 GOMAXPROCS 和 調用函數 runtime#GOMAXPROCS
來設定 P 的最大值。
M 和 P 會適時的組合和斷開,保證 P 中的待執行 G 隊列能夠得到及時運行。比如說上圖中的 G0 此時因為網絡 I/O 而阻塞了 M,那么 P 就會攜帶剩余的 G 投入到其他 M 的懷抱中。這個新的 M1 可能是新創建的,也可能是從調度器空閑 M 列表中獲取的,取決于此時的調度器空閑 M 列表中是否存在 M,從而避免 M 的過多創建。
當 M 對應的內核線程被喚醒時,M 將會嘗試為 G0 捕獲一個 P 上下文,可能是從調度器的空閑 P 列表中獲取,如果獲取不成功,M 會被 G0 放入到調度器的可執行 G 隊列中,等待其他 P 的查找。為了保證 G 的均衡執行,非空閑的 P 會運行完自身的可執行 G 隊列中,會周期性從調度器的可執行 G 隊列中獲取代執行的 G,甚至從其他的 P 的可執行 G 隊列中掠奪 G。
5.2 協程數量
CPU 密集型: 如果是CPU密集型應用,并且持續的時間很長,這時CPU就會優先達到瓶頸。因此,應當限制goroutine的數量,以避免過多的上下文切換。
IO密集型: 如果是IO密集型應用,則可以開啟大量的goroutine,理論上內存會首先成為瓶頸(比如程序執行空操作)。因為 IO 操作相對較慢,goroutine在等待IO時會被阻塞,減少了CPU的使用。
6. 協程池 ants
ants 是一個高性能的 Golang 協程池庫,通過復用協程(goroutine)顯著減少頻繁創建銷毀的開銷,特別適合高并發場景。
6.1 特性
- 自動管理和回收大量 goroutine。
- 定期清除過期的 goroutine。
- 豐富的API:提交任務,獲取運行 goroutine 的數量,動態調整池的容量,釋放池,重新啟動池
- 優雅地處理死機以防止程序崩潰。
- 高效的內存使用,甚至比 Golang 中的無限 goroutine 實現了更高的性能。
- 非阻塞機制。
6.2 運行流程
- Pool :Ants 協程池核心結構。
- WorkerArray:Pool池中的worker隊列,存放所有的 Worker。
- goWorker:運行任務的實際執行者,它啟動一個 goroutine 來接受任務并執行函數調用。
- sync.Pool:golang 標準庫下并發安全的對象池,緩存申請用于之后的重用,以減輕 GC 的壓力。
- spinLock:基于CAS機制和指數退避算法實現的一種自旋鎖。
參考文獻
panjf2000/ants
如何優雅地關閉 channel?