背景
在實際業務開發中,我們會遇到以下場景:請求數據庫,批量獲取1000條數據記錄后,處理數據
為了減少因一次批量獲取的數據太多,導致的數據庫延時增加,我們可以把一次請求拆分成多次請求,并發去處理,當所有的并發請求完成后,再繼續處理這些返回的數據
golang中的WaitGroup,就可以幫助我們實現上述的場景
快速入門
背景:開啟10個goroutine并發執行,等待所有goroutine執行完成后,當前goroutine打印執行完成
func TestWaitGroup(t *testing.T) {var wg sync.WaitGroupfor i := 0; i < 10; i++ {index := igo func() {wg.Add(1)defer wg.Done()fmt.Println(fmt.Sprintf("%+v 正在執行", index))}()}wg.Wait()fmt.Println("TestWaitGroup method done")
}
源碼分析
golang版本:1.18.2
源碼路徑:src/sync/waitgroup.go
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
// WaitGroup 等待 goroutine 集合完成
// 主 goroutine 調用 Add 設置等待的 goroutine 數量
// 然后每個 goroutine 運行并在完成時調用 Done
// 同時,Wait 可以用來阻塞,直到所有 goroutine 都完成
type WaitGroup struct {noCopy noCopy// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.// 64-bit atomic operations require 64-bit alignment, but 32-bit// compilers only guarantee that 64-bit fields are 32-bit aligned.// For this reason on 32 bit architectures we need to check in state()// if state1 is aligned or not, and dynamically "swap" the field order if// needed.// 64位值:高32位是計數器,低32位是waiter計數// 64位原子操作需要64位對齊,但32位編譯器僅保證64位字段是32位對齊的// 因此,在 32 位架構上,我們需要在 state() 中檢查 state1 是否對齊,并在需要時動態“交換”字段順序state1 uint64state2 uint32
}
noCopy:WaitGroup在首次使用后,不能被復制
state1,state2:一共占用12字節,保存了三類信息:4字節保存goroutine計數,4字節保存waiter計數,4字節保存信號量
WaitGroup對外提供了以下三個方法:
// 設置等待的goroutine數量
func (wg *WaitGroup) Add(delta int)
// goroutine執行完成
func (wg *WaitGroup) Done()
// 阻塞等待所有的goroutine都執行完成
func (wg *WaitGroup) Wait()
Add
// state returns pointers to the state and sema fields stored within wg.state*.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {// state1 is 64-bit aligned: nothing to do.return &wg.state1, &wg.state2} else {// state1 is 32-bit aligned but not 64-bit aligned: this means that// (&state1)+4 is 64-bit aligned.state := (*[3]uint32)(unsafe.Pointer(&wg.state1))return (*uint64)(unsafe.Pointer(&state[1])), &state[0]}
}// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
// Add 將 delta(可能為負)添加到 WaitGroup 計數器。
// 如果計數器變為零,則所有在 Wait 上阻塞的 goroutine 都會被釋放。
// 如果計數器變為負數,則添加panic。
// 請注意,計數器為零時發生的具有正增量的調用必須在等待之前發生。
// 具有負增量的調用或在計數器大于零時開始的具有正增量的調用可能隨時發生。
// 通常,這意味著對 Add 的調用應該在創建 goroutine 或其他要等待的事件的語句之前執行。
// 如果重用一個 WaitGroup 來等待幾個獨立的事件集,新的 Add 調用必須在所有先前的 Wait 調用返回后發生。
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyif delta < 0 {// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}// 記錄goroutine計數state := atomic.AddUint64(statep, uint64(delta)<<32)// 獲取goroutine計數v := int32(state >> 32)// 獲取waiter計數w := uint32(state)if race.Enabled && delta > 0 && v == int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(semap))}// goroutine計數小于0if v < 0 {panic("sync: negative WaitGroup counter")}// w != 0說明已經執行了Wait且還有阻塞等待的goroutine,此時不允許在執行Addif w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 存在沒有執行完成的goroutine,或者當前沒有waiter,直接返回if v > 0 || w == 0 {return}// This goroutine has set counter to 0 when waiters > 0.// Now there can't be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter == 0.// Still do a cheap sanity check to detect WaitGroup misuse.// 此時goroutine計數為0,且waiter計數大于0,不然上一步就返回了// 現在以下狀態不能同時發生:// 1. 并發調用Add和Wait// 2. 當goroutine計數為0時,Wait不會繼續增加waiter計數// 仍然做一個廉價的健全性檢查來檢測 WaitGroup 的濫用,防止以上情況發生if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.// 重置waiter計數*statep = 0// 喚醒所有的waiterfor ; w != 0; w-- {runtime_Semrelease(semap, false, 0)}
}
delta代表本次需要記錄的goroutine計數,可能為負數
64位原子操作需要64位對齊,但32位編譯器僅保證64位字段是32位對齊的
當state1是64位對齊時,state1高32位是goroutine計數,低32位是waiter計數
當state1不是64位對齊時,動態“交換”字段順序
記錄goroutine計數的變化delta
如果goroutine計數小于0,則直接panic
如果已經執行了Wait且還有阻塞等待的goroutine,此時不允許在執行Add
如果存在沒有執行完成的goroutine,或者當前沒有waiter,直接返回
當goroutine計數為0,且waiter計數大于0時,現在以下狀態不能同時發生:
并發調用Add和Wait
當goroutine計數為0時,Wait不會繼續增加waiter計數
簡單校驗通過后,重置waiter計數為0,喚醒所有阻塞等待的waiter
Done
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}
調用Add,delta = -1,代表goroutine計數-1
Wait
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyrace.Disable()}for {state := atomic.LoadUint64(statep)// 獲取goroutine計數v := int32(state >> 32)// 獲取waiter計數w := uint32(state)// goroutine計數為0,不需要等待,直接返回if v == 0 {// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.// waiter計數+1if atomic.CompareAndSwapUint64(statep, state, state+1) {if race.Enabled && w == 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(semap))}// 阻塞,等待goroutine計數為0后喚醒繼續執行runtime_Semacquire(semap)// Wait還沒有執行完成,就開始復用WaitGroupif *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}}
}
調用state(),保證字段內存對齊
如果goroutine計數為0,不需要等待,直接返回
嘗試對waiter計數+1,若失敗,則繼續下一輪重試
對waiter計數+1成功,則阻塞當前goroutine,等待goroutine計數為0后喚醒繼續執行
喚醒繼續執行后,簡單判斷是否存在Wait還沒有執行完成,就開始復用WaitGroup的情況,如果有,則panic;如果沒有,則直接返回