Channel
設計原理
不要通過共享內存的方式進行通信,而是應該通過通信的方式共享內存。
在主流編程語言中,多個線程傳遞數據的方式一般都是共享內存。
Go 可以使用共享內存加互斥鎖進行通信,同時也提供了一種不同的并發模型,即通信順序進程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分別對應 CSP 中的實體和傳遞信息的媒介,Goroutine 之間會通過 Channel 傳遞數據。
上圖中的兩個 Goroutine,一個會向 Channel 中發送數據,另一個會從 Channel 中接收數據,它們兩者能夠獨立運行并不存在直接關聯,但是能通過 Channel 間接完成通信。
接收數據
兩個 Goroutine,一個會向 Channel 中發送數據,另一個會從 Channel 中接收數據,它們兩者能夠獨立運行并不存在直接關聯,但是能通過 Channel 間接完成通信。這是一個 生產者 - 消費者 模型,負責接收數據的 goroutine 從 channel 讀取一個消息進行消費,channel 起到一個臨界區/緩沖區的作用。
// chanrecv 函數接收 channel c 的元素并將其寫入 ep 所指向的內存地址。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在沒有數據可接收的情況下,返回 (false, false)
// 否則,如果 c 處于關閉狀態,將 ep 指向的地址清零,返回 (true, false)
// 否則,用返回值填充 ep 指向的內存地址。返回 (true, true)
// 如果 ep 非空,則應該指向堆或者函數調用者的棧func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 省略 debug 內容 …………// 如果是一個 nil 的 channelif c == nil {// 如果不阻塞,直接返回 (false, false)if !block {return}// 否則,接收一個 nil 的 channel,goroutine 掛起gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)// 不會執行到這里throw("unreachable")}// 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回// 當我們觀察到 channel 沒準備好接收:// 1. 非緩沖型,等待發送列隊 sendq 里沒有 goroutine 在等待// 2. 緩沖型,但 buf 里沒有元素// 之后,又觀察到 closed == 0,即 channel 未關閉。// 因為 channel 不可能被重復打開,所以前一個觀測的時候 channel 也是未關閉的,// 因此在這種情況下可以直接宣布接收失敗,返回 (false, false)if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {return}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 加鎖lock(&c.lock)// channel 已關閉,并且循環數組 buf 里沒有元素// 這里可以處理非緩沖型關閉 和 緩沖型關閉但 buf 無元素的情況// 也就是說即使是關閉狀態,但在緩沖型的 channel,// buf 里有元素的情況下還能接收到元素if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(unsafe.Pointer(c))}// 解鎖unlock(&c.lock)if ep != nil {// 從一個已關閉的 channel 執行接收操作,且未忽略返回值// 那么接收的值將是一個該類型的零值// typedmemclr 根據類型清理相應地址的內存typedmemclr(c.elemtype, ep)}// 從一個已關閉的 channel 接收,selected 會返回truereturn true, false}// 等待發送隊列里有 goroutine 存在,說明 buf 是滿的// 1. 非緩沖型的 channel。直接進行內存拷貝(從 sender goroutine -> receiver goroutine)// 2. 緩沖型的 channel,但 buf 滿了。接收到循環數組頭部的元素,并將發送者的元素放到循環數組尾部if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}// 緩沖型,buf 里有元素,可以正常接收if c.qcount > 0 {// 直接從循環數組里找到要接收的元素qp := chanbuf(c, c.recvx)// …………// 沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 valif ep != nil {typedmemmove(c.elemtype, ep, qp)}// 清理掉循環數組里相應位置的值typedmemclr(c.elemtype, qp)// 接收游標向前移動c.recvx++// 接收游標歸零if c.recvx == c.dataqsiz {c.recvx = 0}// buf 數組里的元素個數減 1c.qcount--// 解鎖unlock(&c.lock)return true, true}if !block {// 非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值unlock(&c.lock)return false, false}// 構造一個 sudog 并設置相應參數gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nil// 進入 channel 的等待接收隊列c.recvq.enqueue(mysg)// 將當前 goroutine 掛起goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)// 被喚醒了,接著從這里繼續執行一些掃尾工作if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nil// 釋放當前 gorountine 的 sudogreleaseSudog(mysg)return true, !closed
}func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {// 如果是非緩沖型的 channelif c.dataqsiz == 0 {if raceenabled {racesync(c, sg)}// 未忽略接收的數據 不是 "<- ch",而是 "val <- ch",ep 指向 valif ep != nil {// 直接拷貝數據,從 sender goroutine -> receiver goroutinerecvDirect(c.elemtype, sg, ep)}} else {// 緩沖型的 channel,但 buf 已滿。// 1. 循環數組 buf 隊首的元素拷貝到接收數據的地址// 2. 將 sender 的數據入隊。qp := chanbuf(c, c.recvx)// …………// 將 recvx 處的數據拷貝給接收者if ep != nil {typedmemmove(c.elemtype, ep, qp)}// sender data -> buftypedmemmove(c.elemtype, qp, sg.elem)// 更新索引c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.sendx = c.recvx}sg.elem = nilgp := sg.g// 解鎖unlockf()gp.param = unsafe.Pointer(sg)if sg.releasetime != 0 {sg.releasetime = cputicks()}// 將當前處理器的 runnext 設置成發送數據的 Goroutine,在調度器下一次調度時將阻塞的發送方喚醒。goready(gp, skip+1)
}func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {// dst is on our stack or the heap, src is on another stack.src := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)memmove(dst, src, t.size)
}
從 channel 接收消息 的 核心函數是 chanrecv
跟 send 流程差不多:
特殊情況:
- 如果 channel 為空,那么會直接調用 runtime.gopark 掛起當前 goroutine。
- 如果 channel 已經關閉并且緩沖區沒有任何數據,runtime.chanrecv 會直接返回零值。
正常情況: - 如果 channel 的 sendq 隊列中存在掛起的 goroutine,會將 recvx 索引所在的數據拷貝到接收變量所在的內存空間上并將 sendq 隊列中 goroutine 的數據拷貝到緩沖區。
- 如果 channel 的緩沖區中包含數據,那么直接讀取 recvx 索引對應的數據。
- 在默認情況下會掛起當前的 goroutine,將 runtime.sudog 結構加入 recvq 隊列并陷入休眠等待調度器的喚醒。
從 channel 接收數據時,會觸發 goroutine 調度的兩個時機: - 當 channel 為空時。
- 當緩沖區中不存在數據并且也不存在數據的發送者時。