WaitGroup
sync.WaitGroup 用于等待一組 goroutine 返回,如:
var wg = sync.WaitGroup{}func do() {time.Sleep(time.Second)fmt.Println("done")wg.Done()
}func main() {go do()go do()wg.Add(2)wg.Wait()fmt.Println("main done")
}
概覽
如上面的例子, WaitGroup 只堆外暴露了三個方法:
// 等待的 goroutine 數加 delta
func (wg *WaitGroup) Add(delta int)
// 等待的 goroutine 數減一
func (wg *WaitGroup) Done()
// 阻塞,等待這一組 goroutine 全部退出
func (wg *WaitGroup) Wait()
type WaitGroup struct {noCopy noCopystate1 [3]uint32
}
WaitGroup 結構體中也只有兩個字段:
noCopy
: 用來保證不會被開發者錯誤拷貝state1
: 用來保存相關狀態量
另外,他還提供了一個私有的方法用來獲取狀態和信號量
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]} else {return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]}
}
statep 就是狀態量,注意這里通過 unsafe
將 3 位數組(共 96 位)強轉成了 uint64
這會導致部分數據丟失,具體來說,在64位的機器上會丟失最低 32 位,也即 state1[2]
在 32 位機器上會丟失最高 32 位,也即 state1[0]
, 這也是 64 位和 32 位機器上數組三位元素表示意義不同的原因。
強轉之后,以 64 位機器為例,數組第二位會作為 statep 的高 32 位,第一位會作為 statep 的低 32 位,也就是說,此時 statep 的結構如下:
+----------------------+-----------------------+
| | |
| Counter | Waiter |
| | |
+----------------------+-----------------------+
Add
func (wg *WaitGroup) Done() {wg.Add(-1)
}
Done 其實就是對 Add 的一個封裝。
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()// 把 delta 加到 count 中state := atomic.AddUint64(statep, uint64(delta)<<32)// 獲取 countv := int32(state >> 32)// 丟失高 32 位的 Counter, 得到 Waiterw := uint32(state)if v < 0 {panic("sync: negative WaitGroup counter")}// Waiter 不等于 0 說明現在還有 goroutine 沒有 done, 這時是不允許 Add 的// 也即在 Wait 的過程中不允許通過 Add 添加 if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 正常修改 Counter 后返回if v > 0 || w == 0 {return}// 到這說明 Counter == 0 并且 delta 不是一個正數(執行 Done,并且是最后一次 Done)// 狀態改變,說明有人在 Wait 過程中 Add 了if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 狀態置 0*statep = 0// 喚醒 Wait 中的 goroutinefor ; w != 0; w-- {runtime_Semrelease(semap, false, 0)}
}
總結一下,首先 Done 只是對 Add 的簡單封裝,在 Add 時,通過巧妙利用精度丟失和位移運算分別計算出 add 后的 Counter 和 Waiter, 前者表示已經 add 了多少 Goroutine, 后者表示還有多少個 goroutine 需要 Wait, 這里需要注意,在 Wait 的過程中是不允許 Add 新 goroutine 的;在執行 Done 時,只是簡單的將 Counter 減 1,直到 Counter == 1 時,也即最后一個 goroutine 已經執行完畢時,Done 會通知 Wait 停止阻塞,并將標志清空。
Wait
func (wg *WaitGroup) Wait() {statep, semap := wg.state()for {state := atomic.LoadUint64(statep)v := int32(state >> 32)// Counter == 0, 沒有 Add, 直接返回if v == 0 {return}// 每一次 CAS 讓 Waiter 加一,并進入阻塞,等待最后一個 Done 的 goroutine 將其喚醒if atomic.CompareAndSwapUint64(statep, state, state+1) {runtime_Semacquire(semap)if *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}return}// 如果 CAS 比較沒通過,說明在此過程中有 goroutine Done 了,需要重新去獲取最新的狀態}
}
總結
WaitGroup 用于阻塞某個 Goroutine 以等待一組 goroutine 返回,在實現上,它采用一個長度為 3 的 32 位無符號整型數組保存 Waiter, Counter, 和信號量,每次 Add 時,會將 Counder 加上 delta,而當執行 Done 或 delta 為負數時,如果 Done 的是最后一個 Goroutine, Add 會去喚醒 Wait
執行 Wait 只是將 Waiter 加一并阻塞等待 Add 的喚醒,所以其實 Waiter 的值只會是 0 或 1.