channel
文章目錄
- channel
- 簡介
- 基本概念
- 類型表示法
- 值表示法
- 操作的特性
- 初始化通道
- 接收元素值
- Happens before
- 發送值
- 例1
- 核心組件
- 關鍵執行順序
- 輸出示例(可能順序)
- 設計要點
- 例2
- 例3
- 關閉通道
- 長度與容量
- 單向通道
- 主要用途
- 增強代碼表達性和安全性(最重要的用途)
- 實現更嚴格的接口
- 使用示例
- 示例 1:經典的生產者-消費者模型
- 示例 2:函數返回一個只讀通道
- 重要注意事項
- 總結
- 配合 `for` 語句與 `select` 語句
- 核心思想
- `for range` (最常用、最推薦)
- 顯式檢查 (較少用)
- 基礎語法
- 核心應用模式
- 模式 1:多路復用 (Multiplexing)
- 模式 2:超時控制 (Timeout)
- 模式 3:非阻塞操作 (Non-blocking)
- For 配合 Select 一起使用
- 經典結構:帶退出機制的永久循環
- 更現代的結構:使用 `context.Context`
- 總結與最佳實踐表格
- 黃金法則
- 配合 `time` 包使用
- 基本用法
- 1. 定時器 (Timer)
- 2. 打點器 (Ticker)
- 實際應用場景
- 1. 超時控制
- 2. 定期執行任務
- 3. 限制操作頻率
- 4. 帶有超時的等待組
- 注意事項
- 4. 帶有超時的等待組
- 注意事項
簡介
這里會詳細介紹 Go 的并發編程理念:
以通信作為手段來共享內存, 而不是通過共享內存來通信
這一理念的最直接最重要的體現也就是 channel
.
Go 鼓勵用與眾不同的方法來共享值, 這個方法就是用一個通道(信道)類型在不同 goroutine 之間傳遞值. Go 的 channel
就像是一個類型安全的通用型管道.
channel 提供一種機制, 使得即可以同步兩個并發執行函數, 又可以讓兩個函數通過相互傳遞特定類型值來通信. 也就是說提供了兩個功能:
- 并發 goroutine 同步
- 并發 goroutine 通信
當然有些場景下, 使用共享變量和傳統同步方法更加方便, 但是作為高級用法, 使用 channel
可以讓我們編寫更加清晰正確的程序.
基本概念
Go 中, channel
既指通道類型, 也指代可以傳遞某種類型的值的通道.
通道即某一個通道類型的值, 是該類型的一個實例.
類型表示法
通道是一個引用類型, 這和切片以及字典這兩個類型是一致的. 一個泛化的通道類型聲明應當如此:
chan T
其中,
- 關鍵字
chan
是代表通道類型的關鍵字, T
是表示了該通道類型的元素類型. 限制了可以經由此類通道傳遞的元素值的類型.
可以聲明這樣一個別名類型:
type IntChan chan int
該別名類型代表了元素類型為 int
的通道類型.
又比如可以直接聲明一個 chan int
類型的變量:
var intChan chan int
初始化之后, intChan
變量就可用來傳遞 int
類型的值.
以上就是最簡單的通道類型生命方式, 這樣聲明的通道類型是雙向的. 也就是既可以向它發送值, 也可從他接收值.
此外還可聲明單向的通道類型, 需要用到接收操作符 <-
, 下面就是一個只能用于發送值的通道類型的泛化表示:
chan<- T
只能向此類通道發送值而不能從其中接收值. 接受操作符 <-
生動表示了元素值的流向. 可以把這樣的單項通道類型簡稱為發送通道類型. 同樣的也可以聲明只能從中接收值的通道類型:
<-chan T
這類單向通道類型可以被簡稱為: 接收通道類型.
值表示法
因為 channel 是引用類型, 所以通道類型的變量在被初始化之前, 其值一定是 nil
.
注意: 通道的語義決定了他和其他類型不同, 通道類型的變量一定是用來傳遞值的, 而不是用來存儲值的, 所以通道類型沒有對應的值表示法. 其值有即時性, 無法用字面量來準確表達.
操作的特性
通道是在多個 goroutine
間傳遞數據和同步的重要手段, 對于通道的操作, 其本身也是同步的.
在同一時刻, 只能有一個 goroutine 向一個通道發送值, 同時也只能有一個 goroutine 從它那接受值.
通道相當于一個 FIFO 的消息隊列, 其中的各個值都是嚴格按照發送到其中的先后順序排列, 最早被發送到通道的值會最先被接收.
通道中的值都有原子性, 不可以被分割.
通道中的每一個值都只能被某一個 goroutine 接收, 已經被接受的值會立刻從通道中刪除.
初始化通道
所有引用類型的值都要使用 make
內建函數初始化, channel
也是這樣的:
make(chan int, 10)
將初始化一個最多能緩沖 10 個 int
類型值的 channel (這是一個帶有緩沖區的 channel), 一個帶有緩沖的 channel, 其緩沖容量總是固定不變的.
此處也可以省略第二個參數, 此時創建的就是一個無緩沖區的 channel:
make(chan int)
發送給該 channel 的值應當被立刻取走, 否則發送方的 goroutine 將會阻塞, 直到有接收方接受了該值.
接收元素值
接收運算符 <- 既可以用來作為通道類型聲明的一部分, 也可用于通道操作(發送或者接收元素值).
假設有這樣的一個通道類型的變量:
strChan := make(chan string, 3)
make
函數調用后, 返回一個已經被初始化的通道值作為結果.
因此該賦值語句時的變量 strChan
成為一個雙向通道, 該通道的元素類型為 string
, 容量為 3.
如果要從中接收元素值, 那么這樣寫:
elem := <-strChan
語義很簡單: 將 strChan
中的一個值賦值給變量 elem
.
該操作將使當前 goroutine 被迫進入 Gwaiting 狀態, 直到 strChan
之中有新的值可取時才會被喚醒.
也可以用以下的雙返回值:
elem, ok := <-strChan
這里同樣是一個阻塞行為.
如果在進行接收操作之前或者過程中該通道被關閉了, 那么該操作將會立即結束, 變量 elem
會被賦予該通道的元素類型的零值(0, nil等). 這對應著一個特殊情況, 如果我們接收到的值本身就是零值而不是由于關閉通道而產生的異常零值要怎么辦, 此時 ok
就有作用了.
ok
是一個 bool
類型變量, 當接收操作因通道關閉而結束時, 該值為 false
, 否則就是 true
.
可以把在符號 =
或者 :=
右側出現的, 僅能是接收表達式的賦值語句稱為接收語句.
在其中的接收操作符 <-
右邊的不僅僅可以是代表通道的標識符, 也可是任意的表達式.
只要該表達式的結果類型是一個通道類型即可, 將這樣的表達式稱作通道表達式.
最后要注意: 嘗試從一個未被初始化的通道值(也就是一個值為 nil
的通道)接收值, 會造成當前 goroutine 的永久阻塞.
Happens before
為了能夠從通道接收元素值,我們先向它發送元素值.
理所當然,一個元素值在被接收方從通道中取出之前,必須先存在于該通道內.
更加正式地講,對于一個緩沖通道,有如下規則:
-
發送操作會使通道復制被發送的元素.
如果因通道的緩沖空間已滿而無法立即復制,則阻塞進行發送操作的 goroutine.
復制的目的地址有兩種:
- 當通道已空且有接收方在等待元素值時,它會是最早等待的那個接收方持有的內存地址(channel就像是中轉了一下)
- 否則是通道持有的緩沖中的內存地址。
-
接收操作會使通道給出一個已發給它的元素值的副本,若因通道的緩沖空間已空而無法立即給出,則阻塞進行接收操作的goroutine。一般情況下, 接收方會從通道持有的緩沖中得到元素值。
-
對于同一個元素值來說,把它 發送給某個通道 的操作,總是會在 從該通道接收它 這一操作之前完成。
換句話說,在通道完全復制一個元素值之前,任何 goroutine 都不可能從它那里接收到這個元素值的副本。
發送值
發送語句由三要素組成:
- 通道表達式
- 接收操作符
<-
- 代表元素值的表達式(以下簡稱為元素表達式)
其中, 元素表達式 的結果類型一定要和 通道表達式 的結果類型中的元素類型之間存在可賦值關系. 也就是, 前者的值一定是可以賦給類型為后者的變量.
對于接收表達式 <-
兩邊的表達式的求值總是先于發送操作執行, 對兩個表達式的求值完成前, 發送操作一定會被阻塞.
比如想要向通道 strChan
發送一個值 "a"
, 要這樣做:
先初始化一個通道,
strChan := make(chan string, 3)
然后:
strChan <- "a"
<-
左側是將要接納元素值的通道, 右邊則是想要發送給該通道的值.
此表達式被求值后, 通道 strChan
就緩沖了值 “a”, 然后再往里邊發兩個值:
strChan <- "b"
strChan <- "c"
現在 通道 strChan
緩沖了3個元素值, 達到了最大容量. 此后某個 goroutine 再向其中發送元素值時, 該 goroutine 就會被阻塞, 只有從該通道中接受一個元素值后, 這個 goroutine 才會被喚醒并且完成發送操作.
例1
看一段代碼:
package mainimport ("fmt""time"
)var strChan = make(chan string, 3)func main() {syncChan1 := make(chan struct{}, 1)syncChan2 := make(chan struct{}, 2)// 用于演示接收操作go func() {<-syncChan1 // 等待同步信號(阻塞直到收到 "c")fmt.Println("Received a sync signal and wait a second ... [receiver]")time.Sleep(time.Second) // 故意等待 1 秒// 循環接收數據直到通道關閉for {if elem, ok := <-strChan; ok {fmt.Println("Received:", elem, "[receiver]")} else {break // 通道關閉時退出}}fmt.Println("Stopped. [receiver]")syncChan2 <- struct{}{} // 發送完成信號}()// 用于演示發送操作go func() {// 發送數據 "a", "b", "c", "d"for _, elem := range []string{"a", "b", "c", "d"} {strChan <- elemfmt.Println("Sent:", elem, "[sender]")// 關鍵同步點:發送 "c" 后觸發接收者啟動if elem == "c" {syncChan1 <- struct{}{} // 發送同步信號fmt.Println("Sent a sync signal. [sender]")}}// 完成發送后等待 2 秒fmt.Println("Wait 2 seconds... [sender]")time.Sleep(time.Second * 2)close(strChan) // 關閉數據通道syncChan2 <- struct{}{} // 發送完成信號}()<-syncChan2<-syncChan2
}
這段 Go 代碼演示 goroutine 間的同步與通信,使用了帶緩沖的 channel 和同步信號 channel.
核心組件
strChan
:緩沖大小為 3 的字符串 channel,用于數據傳輸syncChan1
:緩沖大小為 1 的同步信號 channel(控制接收啟動時機)syncChan2
:緩沖大小為 2 的同步信號 channel(等待兩個 goroutine 結束)
關鍵執行順序
-
初始發送階段:
- 發送者快速發送 “a”, “b”, “c”(填滿 3 緩沖)
- 發送 “c” 后觸發
syncChan1
信號 - 發送 “d” 時阻塞(因緩沖已滿)
-
接收啟動階段:
- 接收者收到
syncChan1
信號 - 等待 1 秒后開始消費數據
- 接收 “a” 后釋放緩沖空間
- 接收者收到
-
完成階段:
- 發送者解除阻塞,發送 “d”
- 發送者等待 2 秒后關閉
strChan
- 接收者消費剩余數據 (“b”, “c”, “d”) 后退出
輸出示例(可能順序)
Sent: a [sender]
Sent: b [sender]
Sent: c [sender]
Sent a sync signal. [sender] // 發送者在此阻塞
Received a sync signal... [receiver]
// (1秒延遲)
Received: a [receiver] // 釋放緩沖
Sent: d [sender] // 發送者解除阻塞
Wait 2 seconds... [sender] // 發送者開始等待
Received: b [receiver]
Received: c [receiver]
Received: d [receiver] // 接收者消費完畢
Stopped. [receiver] // 接收者退出
// (主 goroutine 收到兩個完成信號后退出)
設計要點
- 緩沖控制:緩沖大小為 3 使得發送 “d” 時被阻塞
- 精確同步:
syncChan1
確保接收者在特定時點啟動(收到 “c” 后) - 關閉通道:
close(strChan)
通知接收者數據結束 - 雙信號確認:
syncChan2
保證主 goroutine 等待所有任務完成
此代碼演示了如何通過 channel 實現:
- 數據傳輸 (
strChan
) - 啟動時機控制 (
syncChan1
) - 任務完成同步 (
syncChan2
) - 通道關閉通知機制
例2
對于通道的復制行為還需要再解釋解釋. 發送方 向 通道發送的值會被復制, 接收方接受的總是該值的副本而不是該值的本身,
這意味著對于一個值對象來說, 就是做了普通的一次拷貝而已,
但是對于引用類型來說(比如切片, 字典等), 就是復制了一份引用, 這也意味著修改了復制之后得到的引用就修改了收發兩方持有的值.
package mainimport ("fmt""time"
)var mapChan = make(chan map[string]int, 1)func main() {syncChan := make(chan struct{}, 2)// 發go func() {countMap := make(map[string]int)for i := 0; i < 5; i++ {mapChan <- countMaptime.Sleep(time.Millisecond)fmt.Printf("The count map: %v. [sender]\n", countMap)}close(mapChan)syncChan <- struct{}{}}()// 收go func() {for {if elem, ok := <-mapChan; ok {elem["count"]++} else {break}}fmt.Println("Stopped. [receiver]")syncChan <- struct{}{}}()<-syncChan<-syncChan
}
輸出如下:
The count map: map[count:{count: 1}]. [sender]
The count map: map[count:{count: 2}]. [sender]
The count map: map[count:{count: 3}]. [sender]
The count map: map[count:{count: 4}]. [sender]
The count map: map[count:{count: 5}]. [sender]
Stopped. [receiver]
這里收的一方就對得到的值做了自增操作, 這里也能看到, 原值同樣被更改了.
例3
package mainimport ("fmt""time"
)type Counter struct {count int
}var mapChan = make(chan map[string]Counter, 1)func main() {syncChan := make(chan struct{}, 2)// 發go func() {countMap := map[string]Counter{"count": {},}for i := 0; i < 5; i++ {mapChan <- countMaptime.Sleep(time.Millisecond)fmt.Printf("The count map: %v. [sender]\n", countMap)}close(mapChan)syncChan <- struct{}{}}()// 收go func() {for {if elem, ok := <-mapChan; ok {counter := elem["count"]counter.count++} else {break}}fmt.Println("Stopped. [receiver]")syncChan <- struct{}{}}()<-syncChan<-syncChan
}
這里將輸出:
The count map: map[count:{0}]. [sender]
The count map: map[count:{0}]. [sender]
The count map: map[count:{0}]. [sender]
The count map: map[count:{0}]. [sender]
The count map: map[count:{0}]. [sender]
Stopped. [receiver]
原因是 go 中的結構體都是值類型而非引用類型, 如果要修改原值的話就要傳入指針
比如將變量 mapChan
和 countMap
修改為:
var mapChan = make(chan map[string]*Counter, 1)
countMap := map[string]*Counter{"count": {},
}
為了觀察結構體內部的值狀態及其變化, 為 Counter
類型增加一個方法:
func (counter *Counter) String() string {return fmt.Sprintf("{count: %d}", counter.count)
}
然后輸出將是如此:
The count map: map[count:{count: 1}]. [sender]
The count map: map[count:{count: 2}]. [sender]
The count map: map[count:{count: 3}]. [sender]
The count map: map[count:{count: 4}]. [sender]
The count map: map[count:{count: 5}]. [sender]
Stopped. [receiver]
關閉通道
調用 close
函數可以關閉一個通道, 但是調用前要注意: 如果向已關閉的通道發送元素, 此操作會引發 panic
, 所以在關閉通道之前應當確保安全(后面會說用 for
語句和 select
語句確保安全.
這里要說明: 無論如何都不能在接收端關閉通道, 因為接收端在邏輯上一般是不發判斷發送端是否還會向通道發元素值. 另一個方面來說, 從發送端關閉通道一般不會產生什么影響, 就算是關閉之后通道里面還有值, 也是可以通過接收表達式取出的, 然后根據該表達式第二個結果值判斷通道是否已關閉并且沒有元素值可以取.
然后看一個示例:
package mainimport "fmt"func main() {dataChan := make(chan int, 5)syncChan1 := make(chan struct{}, 1)syncChan2 := make(chan struct{}, 2)// 發go func() {for i := 0; i < 5; i++ {dataChan <- ifmt.Printf("[sender] Sent: %d\n", i)}close(dataChan)syncChan1 <- struct{}{}fmt.Println("[sender] Done.")syncChan2 <- struct{}{}}()// 收go func() {<-syncChan1for {if elem, ok := <-dataChan; ok {fmt.Printf("[receiver] Received: %d \n", elem)} else {break}}fmt.Println("[receiver] Done.")syncChan2 <- struct{}{}}()<-syncChan2<-syncChan2
}
這里通過一個通道(dataChan1
)阻塞了收方面, 強制發送完所有元素并且關閉通道之后再執行接收操作. 雖然通道在這里已關閉了, 但是對于接受操作卻沒有影響, 接收方仍然可以再接受完所有元素值之后結束工作.
最后又兩個注意點:
- 同一通道只能關閉一次, 關閉一個已關閉的通道會引發
panic
- 調用 close 函數時, 需要把想要關閉的通道的變量作為參數傳入, 如果該變量值為
nil
, 將引發panic
.
為了幫助決策,可以遵循以下原則:
場景 | 是否需要關閉? | 說明 |
---|---|---|
你是發送方,且不再發送任何值,接收方正使用 for range | 必須關閉 | 這是關閉 channel 最主要的原因。 |
你是發送方,需要通知多個接收者“結束了” | 應該關閉 | 關閉 channel 是一種廣播機制,所有接收的 for range 都會收到。 |
Channel 用于單向同步信號(如 done <- struct{}{} ) | 不需要關閉 | 接收方只接收一次,不關心后續狀態。 |
Channel 是全局的、永久的(如任務隊列) | 不需要關閉 | 它的設計就是永不停止。但消費者邏輯要匹配(用 for-select 而非 for range )。 |
你不確定 | 傾向于關閉 | 除非你有明確理由不關閉,否則關閉一個 channel 通常比不關閉更安全。但切記:只能關閉一次,且不能關閉已關閉的 channel。 |
記住最后的黃金法則:永遠不要關閉一個接收方還在等待讀取的 channel,并且只能由發送方來關閉(或者一個非常明確知道沒有其他發送者的角色)。關閉一個 channel 的意圖應該是向接收方發送信息,而不是為了回收資源。
長度與容量
內建函數 len
和 cap
同樣可以用在通道上, 作用分別是獲取通道中當前元素值的數量(長度)以及獲取通道可容納元素值的最大數量(容量). 通道容量實在初始化時以確定的, 并且之后不能改變, 通道長度則會隨實際情況變化.
可以通過容量判斷通道是否帶緩沖. 如果容量為 0, 那么一定是非緩沖通道, 否則是緩沖通道.
單向通道
這是一個非常重要的概念,主要用于在函數或方法間傳遞通道時,施加明確的權限限制,從而增強代碼的類型安全性和可讀性。
顧名思義,單向通道就是只能用于發送或只能用于接收的通道。它是雙向通道的一種變體,其類型由 chan<-
(只寫)和 <-chan
(只讀)表示。
- 只寫通道:
chan<- T
- 你只能向這個通道發送數據(
ch <- value
)。 - 你不能從這個通道接收數據(如果嘗試
<-ch
會引發編譯錯誤)。
- 你只能向這個通道發送數據(
- 只讀通道:
<-chan T
- 你只能從這個通道接收數據(
value := <-ch
)。 - 你不能向這個通道發送數據(如果嘗試
ch <- value
會引發編譯錯誤)。
- 你只能從這個通道接收數據(
一個普通的雙向通道 chan T
可以被隱式轉換為任何一種單向通道,但反過來不行。
主要用途
你可能會問,既然有雙向通道,為什么要限制自己呢?其主要目的是為了在接口層面強制約定和保證代碼安全。
增強代碼表達性和安全性(最重要的用途)
當一個函數或方法的參數是一個通道時,使用單向通道可以清晰地表達這個函數的意圖。
- 對于函數參數:它明確規定了函數對這個通道的操作權限。
func producer(ch chan<- int)
: 我(producer
函數)承諾只會在ch
里寫數據,絕不會嘗試讀。這相當于一個“合同”。func consumer(ch <-chan int)
: 我(consumer
函數)承諾只會從ch
里讀數據,絕不會嘗試寫。
這樣做的好處是:
- 自文檔化:任何人看到函數簽名,立刻就知道這個通道該怎么用。
- 編譯時檢查:編譯器會幫你抓住所有違反這個約定的操作。如果你不小心在
consumer
函數里寫了ch <- data
,代碼將無法通過編譯。這是一種強大的、在編譯階段就能發現錯誤的機制。 - 防止誤操作:避免在復雜的并發程序中,錯誤地關閉了不該關閉的通道,或者向一個本應只讀的通道發送數據,導致難以調試的 panic。
實現更嚴格的接口
在設計庫或者模塊時,你可以暴露只讀或只寫通道給外部使用者,從而隱藏內部實現細節,防止外部代碼錯誤地干擾你的內部通信邏輯。
使用示例
示例 1:經典的生產者-消費者模型
這是最典型的使用場景。
package mainimport ("fmt""time"
)// 生產者函數:接收一個只寫通道
// 它只能向這個通道發送數據
func producer(ch chan<- int) {for i := 0; i < 5; i++ {fmt.Printf("生產者發送: %d\n", i)ch <- i // 這是允許的time.Sleep(time.Second)}close(ch) // 關閉通道也是允許的(通常由發送方關閉)// 注意:從一個只寫通道接收數據(如 <-ch)會導致編譯錯誤
}// 消費者函數:接收一個只讀通道
// 它只能從這個通道接收數據
func consumer(ch <-chan int) {// 循環從通道中讀取數據,直到通道被關閉for num := range ch {fmt.Printf("消費者收到: %d\n", num)}// 向一個只讀通道發送數據(如 ch <- 99)會導致編譯錯誤// 關閉一個只讀通道(如 close(ch))也會導致編譯錯誤
}func main() {// 1. 創建一個普通的雙向通道ch := make(chan int)// 2. 啟動生產者和消費者goroutine// 在傳參時,Go語言會自動將雙向通道 ch 轉換為所需的單向通道類型go producer(ch) // ch 被當作 chan<- int 使用consumer(ch) // ch 被當作 <-chan int 使用fmt.Println("程序結束")
}
關鍵點:
main
函數里創建的是雙向通道chan int
。- 在將
ch
傳遞給producer
時,它被隱式轉換為了chan<- int
(只寫)。 - 在將
ch
傳遞給consumer
時,它被隱式轉換為了<-chan int
(只讀)。 - 這種轉換是安全的,并且是 Go 語言類型系統所允許的。
示例 2:函數返回一個只讀通道
你可以設計一個函數,它返回一個只讀通道,調用者只能從這個通道消費數據,無法向其發送數據,這很好地封裝了內部邏輯。
// 創建一個計數器,返回一個只讀通道,每秒發送一個遞增的數字
func startCounter() <-chan int {ch := make(chan int)go func() {defer close(ch)for i := 0; ; i++ {ch <- itime.Sleep(time.Second)}}()return ch // 返回的 chan int 被隱式轉換為 <-chan int
}func main() {countCh := startCounter()// 我們只能從 countCh 讀for i := 0; i < 3; i++ {fmt.Println(<-countCh)}// countCh <- 100 // 錯誤:不能向只讀通道發送// close(countCh) // 錯誤:不能關閉只讀通道
}
重要注意事項
- 轉換是單向的:你可以將
chan T
轉換為chan<- T
或<-chan T
,但不能將chan<- T
或<-chan T
轉換回chan T
。這是一條“單行道”,目的是為了保證安全。 - 通道操作權限:
- 關閉操作:只有發送方可以關閉通道。因此,你可以在一個
chan<- T
上調用close()
,但不能在一個<-chan T
上調用close()
,這會導致編譯錯誤。 - 長度和容量:你可以使用
len()
和cap()
來查詢只讀和只寫通道,因為這個操作不涉及數據的發送和接收。
- 關閉操作:只有發送方可以關閉通道。因此,你可以在一個
總結
通道類型 | 操作權限 | 典型用途 |
---|---|---|
chan T | 雙向(可讀可寫) | 在單個 goroutine 內部或多個 goroutine 間自由通信 |
chan<- T | 只寫(發送) | 作為函數參數,限制函數只能向通道發送數據 |
<-chan T | 只讀(接收) | 作為函數參數或返回值,限制函數只能從通道接收數據 |
最佳實踐:在函數或方法的簽名中,盡可能地使用單向通道。這是一種“按權限設計”的思路,它能極大地提高并發代碼的清晰度、安全性和可維護性,是編寫高質量 Go 并發程序的標志之一。
配合 for
語句與 select
語句
好的,這是一份為你整理的關于 Go 語言中 channel
與 for
和 select
語句配合使用的綜合筆記。它涵蓋了核心概念、各種模式、最佳實踐和注意事項,非常適合用于學習和復習。
核心思想
Channel 是 Goroutine 之間的通信管道,而 for
和 select
是消費和管理這些管道的主要控制流語句。它們的組合構成了 Go 并發編程的基石。
for
:用于持續地從 channel 中接收數據。select
:用于同時監聽多個 channel 的操作(發送或接收)。for
+select
:用于構建長期運行的服務,該服務需要多路處理各種事件(如數據、信號、超時)。
for range
(最常用、最推薦)
行為:自動從 channel 接收值,直到 channel 被關閉且 drained(排空)。
循環結束條件:channel 被關閉。
關鍵:發送方負責關閉 channel,以向接收方廣播“沒有更多數據”的信號。
ch := make(chan int)// 生產者 Goroutine
go func() {for i := 0; i < 3; i++ {ch <- i}close(ch) // 重要!由發送方關閉
}()// 消費者:使用 for range
for value := range ch {fmt.Println(value) // 打印 0, 1, 2
}
// 循環在 ch 關閉后自動退出
fmt.Println("Channel closed, loop exited.")
最佳實踐:在簡單消費者場景下優先使用此模式。
顯式檢查 (較少用)
行為:使用 , ok
語法手動檢查 channel 狀態。
循環結束條件:ok == false
(channel 已關閉且空)。
for {value, ok := <-chif !ok {break // channel 已關閉且空,退出循環}fmt.Println(value)
}
select
用于監聽多個 channel 操作,每個 case
是一個通信操作。
跟在每個 case
后面的之呢個是針對某個通道的發送語句或者接收語句。
在 select
關鍵字右側沒有像是 switch
語句那樣的 switch
表達式, 而是直接跟上左花括號。
基礎語法
select {
case v := <-chan1:fmt.Printf("Received %v from chan1\n", v)
case chan2 <- data:fmt.Println("Sent data to chan2")
case <-chan3:fmt.Println("Received something from chan3 (value ignored)")
default:fmt.Println("No communication ready, do something else")
}
開始執行 select
語句時, 所有在 case
右側的發送語句或者接收語句中的通道表達式和元素表達式都會先求值(求值順序是從左到右, 自上而下), 無論他們所在的 case
是否可能被選擇都是這樣.
在執行 select
語句的時候,運行時系統會自上而下地判斷每個 case
中的發送或接收操作是否可以立即進行。
這里的“立即進行”,指的是當前 goroutine 不會因此操作而被阻塞。
這個判斷還需要依據通道的具體特性(緩沖或非緩沖)以及那一時刻的具體情況來進行。
只要發現有一個 case
上的判斷是肯定的,該 case
就會被選中。
package mainimport "fmt"var intChan1 chan int
var intChan2 chan int
var channels = []chan int{intChan1, intChan2}var numbers = []int{1, 2, 3, 4, 5}func main() {select {case getChan(0) <- getNumber(0):fmt.Println("1th case is selected.")case getChan(1) <- getNumber(1):fmt.Println("The 2nd case is selected.")default:fmt.Println("Default case!")}
}func getNumber(i int) int {fmt.Printf("numbers[%d]\n", i)return numbers[i]
}func getChan(i int) chan int {fmt.Printf("channels[%d]\n", i)return channels[i]
}
輸出:
channels[0]
numbers[0]
channels[1]
numbers[1]
Default case!
select
會阻塞直到某個 case
就緒,并在多個 case
就緒時偽隨機公平地選擇一個執行。
如果沒有任何一個 case
符合選擇條件, 而且沒有 default
case, 那么當前 goroutine 將保持阻塞, 直到至少有一個 case
中的發送或者接受操作可以立即進行為止.
核心應用模式
模式 1:多路復用 (Multiplexing)
監聽多個 channel,處理最先到達的事件。
dataChan := make(chan string)
stopChan := make(chan struct{}) // 用于信號的 channelfor {select {case data := <-dataChan:handleData(data)case <-stopChan:// 收到停止信號,清理并退出fmt.Println("Stopping...")return}
}
模式 2:超時控制 (Timeout)
防止 Goroutine 無限期阻塞。使用 time.After
或 context
。
select {
case result := <-longRunningOperationChan:fmt.Println("Success:", result)
case <-time.After(2 * time.Second):fmt.Println("Error: Operation timed out after 2 seconds")
}
注意:在長周期循環中使用 time.After
會創建大量 Timer,可能導致資源泄漏。應使用 time.NewTimer
并在循環外創建和重置。
timer := time.NewTimer(2 * time.Second)
defer timer.Stop() // 確保釋放資源for {timer.Reset(2 * time.Second) // 每次循環重置select {case result := <-operationChan:handle(result)case <-timer.C:handleTimeout()}
}
模式 3:非阻塞操作 (Non-blocking)
使用 default
分支嘗試立即進行通信,若無法完成則執行其他任務。
select {
case ch <- task: // 嘗試發送fmt.Println("Task sent")
default:fmt.Println("Channel is busy, skipping task or adding to a buffer")// 例如,實現一個簡單的負載下降策略
}
For 配合 Select 一起使用
這是構建復雜并發服務(如 worker pools、網絡服務器、事件循環)的核心模式。
經典結構:帶退出機制的永久循環
func worker(inputChan <-chan *Task, stopChan <-chan struct{}) {for { // 永久循環select {case task := <-inputChan: // 1. 處理主要工作process(task)case <-stopChan: // 2. 響應退出信號fmt.Println("Worker shutting down...")cleanup()return // 退出函數,從而結束 Goroutinecase <-time.After(30 * time.Second): // 3. 處理超時/空閑狀態fmt.Println("Worker is idle")}}
}
更現代的結構:使用 context.Context
context
包提供了更強大、更標準的取消和超時機制。
func worker(ctx context.Context, inputChan <-chan *Task) {for {select {case task := <-inputChan:process(task)case <-ctx.Done(): // 監聽 Context 的取消/超時信號err := ctx.Err()fmt.Printf("Worker stopping due to: %v\n", err)cleanup()return}}
}// 在主函數中
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() // 確保資源釋放go worker(ctx, taskChan)
總結與最佳實踐表格
模式 | 語法 | 用途 | 結束條件 |
---|---|---|---|
for range ch | for v := range ch { } | 簡單消費者,處理所有數據 | ch 被關閉 |
for + select | for { select { case ... } } | 復雜消費者,多路事件處理 | return 或 break (通常由信號觸發) |
select + time.After | case <-time.After(d) | 單次操作超時控制 | 超時或操作完成 |
select + default | default: ... | 非阻塞通信嘗試 | 立即執行 |
黃金法則
- 關閉原則:永遠只由發送方關閉 channel。關閉一個已關閉的 channel 會引發 panic。
- 循環退出:
for range
依賴 channel 關閉來退出。for-select
循環通常依賴一個專門的信號 channel(如stopChan
或ctx.Done()
)來觸發退出。 - nil Channel:對一個
nil
channel 的操作會永遠阻塞。你可以利用這一點在select
中動態“禁用”某個case
(將其設置為nil
)。 - 資源管理:
- 使用
defer
關閉 channel(如果是發送方)。 - 避免在長循環中頻繁使用
time.After()
,改用time.Timer
。 - 使用
context.Context
來傳播取消信號,這是處理超時和取消的現代標準方式。
- 使用
- 預防泄漏:確保 Goroutine 總有辦法退出(通過 channel 關閉或信號),否則會導致 Goroutine 泄漏。
配合 time
包使用
在 Go 語言中,time 包與 channel 的配合使用非常強大,主要用于實現超時控制、定時任務和周期性操作等場景。
基本用法
1. 定時器 (Timer)
定時器用于在未來的某個時間點執行一次操作。
package mainimport ("fmt""time"
)func main() {// 創建一個 2 秒的定時器timer := time.NewTimer(2 * time.Second)// 等待定時器觸發<-timer.Cfmt.Println("定時器觸發")// 停止定時器(如果還需要使用,可以使用 Reset)// timer.Stop()
}
2. 打點器 (Ticker)
打點器用于每隔一段時間重復執行操作。
func main() {// 創建一個每秒觸發一次的打點器ticker := time.NewTicker(1 * time.Second)// 創建一個 5 秒后觸發的定時器用于停止打點器stopTimer := time.NewTimer(5 * time.Second)for {select {case <-ticker.C:fmt.Println("打點器觸發")case <-stopTimer.C:fmt.Println("停止打點器")ticker.Stop()return}}
}
實際應用場景
1. 超時控制
func main() {// 創建一個用于模擬長時間操作的 channelresultChan := make(chan string)// 模擬一個耗時操作go func() {time.Sleep(3 * time.Second)resultChan <- "操作完成"}()// 設置超時時間為 2 秒select {case res := <-resultChan:fmt.Println(res)case <-time.After(2 * time.Second):fmt.Println("操作超時")}
}
2. 定期執行任務
func main() {ticker := time.NewTicker(2 * time.Second)done := make(chan bool)go func() {for {select {case <-done:returncase t := <-ticker.C:fmt.Println("定期任務執行于", t.Format("15:04:05"))}}}()// 運行 10 秒后停止time.Sleep(10 * time.Second)ticker.Stop()done <- truefmt.Println("定時任務停止")
}
3. 限制操作頻率
func main() {requests := make(chan int, 5)for i := 1; i <= 5; i++ {requests <- i}close(requests)// 限制為每 1 秒處理一個請求limiter := time.Tick(1 * time.Second)for req := range requests {<-limiterfmt.Println("處理請求", req, time.Now().Format("15:04:05"))}
}
4. 帶有超時的等待組
func main() {var wg sync.WaitGroupwg.Add(1)done := make(chan bool)go func() {time.Sleep(3 * time.Second) // 模擬耗時任務wg.Done()done <- true}()// 設置 2 秒超時select {case <-done:fmt.Println("任務完成")case <-time.After(2 * time.Second):fmt.Println("任務超時")}
}
注意事項
- 記得調用
Stop()
方法來釋放定時器/打點器資源,避免內存泄漏 - 使用
time.After()
在長時間運行的循環中可能會創建大量定時器,應考慮使用time.NewTimer()
并重用 - 定時器/打點器觸發后,channel 會接收到一個時間值,但通常我們只關心觸發事件本身
這些模式使得 Go 程序能夠優雅地處理時間相關的操作,特別是在并發環境中非常有用。
(“定期任務執行于”, t.Format(“15:04:05”))
}
}
}()
// 運行 10 秒后停止
time.Sleep(10 * time.Second)
ticker.Stop()
done <- true
fmt.Println("定時任務停止")
}
#### 3. 限制操作頻率```go
func main() {requests := make(chan int, 5)for i := 1; i <= 5; i++ {requests <- i}close(requests)// 限制為每 1 秒處理一個請求limiter := time.Tick(1 * time.Second)for req := range requests {<-limiterfmt.Println("處理請求", req, time.Now().Format("15:04:05"))}
}
4. 帶有超時的等待組
func main() {var wg sync.WaitGroupwg.Add(1)done := make(chan bool)go func() {time.Sleep(3 * time.Second) // 模擬耗時任務wg.Done()done <- true}()// 設置 2 秒超時select {case <-done:fmt.Println("任務完成")case <-time.After(2 * time.Second):fmt.Println("任務超時")}
}
注意事項
- 記得調用
Stop()
方法來釋放定時器/打點器資源,避免內存泄漏 - 使用
time.After()
在長時間運行的循環中可能會創建大量定時器,應考慮使用time.NewTimer()
并重用 - 定時器/打點器觸發后,channel 會接收到一個時間值,但通常我們只關心觸發事件本身
這些模式使得 Go 程序能夠優雅地處理時間相關的操作,特別是在并發環境中非常有用。