本文會嘗試解釋 go runtime 中 channel 和 select 的具體實現,部分內容來自 gophercon2017。Go版本為1.8.3
channel
第一部分講述一下 channel 的用法。channel 可以看做一個隊列,用于多個goroutine之間的通信,例如下面的例子,一個goroutine發送msg,另一個msg接受消息。channel 分為帶緩沖和不帶緩沖,差別不是很大,具體請自行google。看一個簡單的例子,了解一下channel的使用。
package mainimport "fmt"func main() {// Create a new channel with `make(chan val-type)`.// Channels are typed by the values they convey.messages := make(chan string)// Send a value into a channel using the `channel <-`// syntax. Here we send `"ping"` to the `messages`// channel we made above, from a new goroutine.go func() { messages <- "ping" }()// The `<-channel` syntax receives a value from the// channel. Here we'll receive the `"ping"` message// we sent above and print it out.msg := <-messagesfmt.Println(msg)
}
channel的功能點:
- 隊列
- 阻塞
- 當一端阻塞,可以被另一個端喚醒
我們圍繞這3點功能展開,講講具體的實現。
channel結構
注釋標注了幾個重要的變量,從功能上大致可以分為兩個功能單元,一個是 ring buffer,用于存數據; 一個是存放 goroutine 的隊列。
type hchan struct {qcount uint // 當前隊列中的元素個數dataqsiz uint // 緩沖隊列的固定大小buf unsafe.Pointer // 緩沖數組elemsize uint16closed uint32elemtype *_type // element typesendx uint // 下一次發送的 indexrecvx uint // 下一次接收的 indexrecvq waitq // 接受者隊列sendq waitq // 發送者隊列// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex
}
Ring Buffer
主要是以下變量組成的功能, 一個 buf 存儲實際數據,兩個指針分別代表發送,接收的索引位置,配合 size, count 在數組大小范圍內來回滑動。
qcount uint // 當前隊列中的元素個數
dataqsiz uint // 緩沖隊列的固定大小
buf unsafe.Pointer // 緩沖數組
sendx uint // 下一次發送的 index
recvx uint // 下一次接收的 index
舉個例子,假設我們初始化了一個帶緩沖的channel, ch := make(chan int, 3)
, 那么它初始狀態的值為:
qcount = 0
dataqsiz = 3
buf = [3]int{0, 0, 0} // 表示長度為3的數組
sendx = 0
recvx = 0
第一步,向 channel 里 send 一個值, ch <- 1
, 因為現在緩沖還沒滿,所以操作后狀態如下:
qcount = 1
dataqsiz = 3
buf = [3]int{1, 0, 0} // 表示長度為3的數組
sendx = 1
recvx = 0
快進兩部,連續向 channel 里 send 兩個值 (2, 3),狀態如下:
qcount = 3
dataqsiz = 3
buf = [3]int{1, 2, 3} // 表示長度為3的數組
sendx = 0 // 下一個發送的 index 回到了0
recvx = 0
從 channel 中 receive 一個值, <- ch
, 狀態如下:
qcount = 2
dataqsiz = 3
buf = [3]int{1, 2, 3} // 表示長度為3的數組
sendx = 0 // 下一個發送的 index 回到了0
recvx = 1 // 下一個接收的 index
阻塞
我們看下,如果 receive channel 時,channel 的 buffer中沒有數據是怎么處理的。邏輯在 chanrecv
這個方法中,它的大致流程如下,僅保留了阻塞操作的代碼。
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 檢查 channdel 是否為 nil// 當不阻塞時,檢查buffer大小,當前大小,檢查chennel是否關閉,看看是否能直接返回// 檢查發送端是否有等待的goroutine,下部分會提到// 當前buffer中有數據,則嘗試取出。// 如果非阻塞,直接返回// 沒有sender等待,buffer中沒有數據,則阻塞等待。gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nilc.recvq.enqueue(mysg)//關鍵操作:設置 goroutine 狀態為 waiting, 把 G 和 M 分離goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)// someone woke us up// 被喚醒,清理 sudogif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed
}
這里的操作就是 創建一個 當前 goroutine 的 sudog, 然后把這個 sudog 放入 channel 的接受者等待隊列;設置當前 G 的狀態,和 M分離,到這里當前G就阻塞了,代碼不會執行下去。
當被喚醒后,執行sudog的清理操作。這里接受buffer中的值的指針是 ep
這個變量,被喚醒后好像沒有向 ep
中賦值的操作。這個我們下部分會講。
sudog
還剩最后一個疑問,當一個goroutine因為channel阻塞,另一個goroutine是如何喚醒它的。
channel 中有兩個 waitq
類型的變量, 看下結構發現,就是sudog的鏈表,關鍵是 sudog。sudog中包含了goroutine的引用,注意一下 elem
這個變量,注釋說可能會指向stack。
type waitq struct {first *sudoglast *sudog
}type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on// this.g *gselectdone *uint32 // CAS to 1 to win select race (may point to stack)next *sudogprev *sudogelem unsafe.Pointer // data element (may point to stack)// The following fields are never accessed concurrently.// waitlink is only accessed by g.acquiretime int64releasetime int64ticket uint32waitlink *sudog // g.waiting listc *hchan // channel
}
講阻塞部分的時候,我們看到goroutine被調度之前,有一個 enqueue
操作,這時,當前G的sudog已經被存入recvq
中,我們看下發送者這時的操作。
這里的操作是,sender發送的值 直接被拷貝到 sudog.elem 了。然后喚醒 sudog.g ,這樣對面的receiver goroutine 就被喚醒了。具體請下面的注釋。
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 檢查工作// 如果能從 chennel 的 recvq 彈出 sudog, 那么直接sendif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) })return true}// buffer有空余空間,返回; 阻塞操作
}func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {// 處理 index// 關鍵if sg.elem != nil {// 這里是根據 elemtype.size 復制內存sendDirect(c.elemtype, sg, ep)sg.elem = nil}// 一些處理// 重新設置 goroutine 的狀態,喚醒它goready(gp, 4)
}func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {// src is on our stack, dst is a slot on another stack.// Once we read sg.elem out of sg, it will no longer// be updated if the destination's stack gets copied (shrunk).// So make sure that no preemption points can happen between read & use.dst := sg.elemtypeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)memmove(dst, src, t.size)
}// memmove copies n bytes from "from" to "to".
// in memmove_*.s
//go:noescape
func memmove(to, from unsafe.Pointer, n uintptr)
select
在看 chanrecv()
方法 時,發現了一個 block 參數,代表操作是否阻塞。一般情況下,channel 都是阻塞的(不考慮buffer),那什么時候非阻塞呢?
第一個想到的就是 select, 在寫了default case的時候,其他的channel是非阻塞的。
還有一個可能不常用,就是 channel 的反射 value, 可以是非阻塞的,這個方法是public的,我們先看下簡單的。
func (v Value) TryRecv() (x Value, ok bool)
func (v Value) TrySend(x Value) bool
select 就復雜一點點,首先在源碼中發現一段注釋:
// compiler implements
//
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
}// compiler implements
//
// select {
// case v = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbrecv(&v, c) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {selected, _ = chanrecv(t, c, elem, false)return
}
如果是一個 case + default 的模式,那么編譯器就調用以上方法來實現。
如果是多個 case + default 的模式呢?select 在runtime到底是如何執行的?寫個簡單的select編譯一下。
package mainfunc main() {var ch chan intselect {case <-ch:case ch <- 1:default:}
}
go tool compile -S -l -N test.go > test.s
結果中找一下關鍵字,例如:
0x008c 00140 (test.go:5) CALL runtime.newselect(SB)
0x00ad 00173 (test.go:6) CALL runtime.selectrecv(SB)
0x00ec 00236 (test.go:7) CALL runtime.selectsend(SB)
0x0107 00263 (test.go:8) CALL runtime.selectdefault(SB)
0x0122 00290 (test.go:5) CALL runtime.selectgo(SB)
這里 selectgo
是實際運行的方法,找一下,注意注釋。先檢查channel是否能操作,如果不能操作,就走 default 邏輯。
loop:// pass 1 - look for something already waitingvar dfl *scasevar cas *scasefor i := 0; i < int(sel.ncase); i++ {cas = &scases[pollorder[i]]c = cas.cswitch cas.kind {// 接受數據case caseRecv:sg = c.sendq.dequeue()// 如果有 sender 在等待if sg != nil {goto recv}// 當前buffer中有數據if c.qcount > 0 {goto bufrecv}// 關閉的channelif c.closed != 0 {goto rclose}case caseSend:if raceenabled {racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)}// 關閉if c.closed != 0 {goto sclose}// 有 receiver 正在等待sg = c.recvq.dequeue()if sg != nil {goto send}// 有空間接受if c.qcount < c.dataqsiz {goto bufsend}// 走defaultcase caseDefault:dfl = cas}}if dfl != nil {selunlock(scases, lockorder)cas = dflgoto retc}