簡介
這篇主要介紹 sync.Once、sync.WaitGroup和sync.Mutex
sync.Once
once 顧名思義 只執行一次 廢話不說 我們看源碼 英文介紹直接略過了 感興趣的建議讀一讀 獲益匪淺
其結構體如下
Once 是一個嚴格只執行一次的object
type Once struct {// 建議看下源碼的注解,done 放在結構體第一個 所以其 地址就是 結構體的地址 不用加偏移量 則生成的匯編代碼很緊湊,// 且cpu減少一次偏移量計算,執行效率高。所以對其頻繁的訪問(形成 hot path)速度更快。done uint32 m Mutex // 互斥鎖
}
其當然只有一個函數 Do 我們來看下源碼
// Do 嚴格執行一次 f
func (o *Once) Do(f func()) {// Note: Here is an incorrect implementation of Do://// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {// f()// }//// Do guarantees that when it returns, f has finished.// This implementation would not implement that guarantee:// given two simultaneous calls, the winner of the cas would// call f, and the second would return immediately, without// waiting for the first's call to f to complete.// This is why the slow path falls back to a mutex, and why// the atomic.StoreUint32 must be delayed until after f returns.// 上面英文翻譯過來大意如下:// 注意 上述 代碼里利用 CompareAndSwapUint32 來實現 是一個錯誤的示例。// 因為 Do 函數要確保 協程 返回時 f已經執行完畢了(因為如果f沒執行完畢,// 某協程就返回,這時f當中配置項 可能還沒初始化, 那么該協程 要調用 f 里// 定義的 配置項 可能 會報空指針異常),但是這種實現不能保證:例如 有兩個// 同時對Do的調用,操作cas成功的調用會執行f(其實cas 中 比較 賦值 返回 這三種操作是// 直接調用操作系統 lock 命令實現的原子操作),另一個操作cas失敗不會等第一個調用// 操作f 就直接返回。這就是為什么慢路徑操作(doShow)會使用互斥鎖,以及為什么 StoreUint32// 必須等 f 執行完畢后才能調用的原因。// 其實啰嗦了一大堆 就是要保證 所有調用 返回時 保證 f中初始化的配置文件 結構體 可用 不能報空指針異常等。// 這里是 第一波同時調用后(f已經執行完畢) ,后續調用可以直接對o.done來判斷// 后續o.doSlow還要不要走,以便不執行加鎖等影響效率的操作.// 另外done如果調用過多就是熱路徑, 會直接內聯到調用處,我的理解是 直接將&o.done 替換成一個 變量 這個變量就是1if atomic.LoadUint32(&o.done) == 0 {// Outlined slow-path to allow inlining of the fast-path.o.doSlow(f)}
}
其中 doSlow 函數如下
func (o *Once) doSlow(f func()) {o.m.Lock() // 加鎖defer o.m.Unlock() // defer 壓棧 最后執行if o.done == 0 { // 第一波同時調用 都在 鎖這邊 阻塞,則保證就一個執行 其余的直接返回就行 這就是為什么 必須要判斷兩次 o.done==0(第一次判斷 見atomic.LoadUint32處 )defer atomic.StoreUint32(&o.done, 1) // defer 壓棧 函數 f 執行完畢后 再修改 done的值f() // 執行函數}
}
sync.WaitGroup
WaitGroup結構體如下
type WaitGroup struct {noCopy noCopy// 原子類的 數據存儲 為 64位,其中高32位存 待完成的任務(協程)數,低32位存儲 在信號sema處阻塞的協程數(一般情況下是主協程,所以 waiter 一般為1)state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.sema uint32 // 信號量 阻塞了多少協程(一般是主協程) 阻塞邏輯根據這個參數控制 一般情況最大也就是1
}
其中關于 32 位和 64位等平臺 運行差異可以自行搜索查看,不在贅述
其中最重要的函數是 Add(delta int), Done(), Wait().
下面舉一個小例子
func demo(wg *sync.WaitGroup) {fmt.Println("this is demo")wg.Done() // 執行完任務后 任務數減1}func TestWaitGroup(t *testing.T) {var wg sync.WaitGroupwg.Add(5) // Add 函數 用來 添加需要 執行多少任務for i := 0; i < 5; i++ {go func() {demo(&wg)}()}wg.Wait() // 主協程阻塞 等待 任務完成
}
其中 Add(delta int)函數 講解 如下:
// Add 添加一個 delta 數量的未完成任務;delta可以為負數
func (wg *WaitGroup) Add(delta int) {if race.Enabled { // 一般為false ,測試情況下為trueif delta < 0 {// Synchronize decrements with Wait.race.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}state := wg.state.Add(uint64(delta) << 32) // state 高32為 加 deltav := int32(state >> 32) // 高32位為 待完成協程數w := uint32(state) // 阻塞等待協程數;一般為主協程阻塞 w一般為 0和1 ,有大神知道例外情況不,歡迎補充if race.Enabled && delta > 0 && v == int32(delta) {// The first increment must be synchronized with Wait.// Need to model this as a read, because there can be// several concurrent wg.counter transitions from 0.race.Read(unsafe.Pointer(&wg.sema))}if v < 0 { // 待完成任務數 <0 例如: Add(2) 但是 Done()了 3次panic("sync: negative WaitGroup counter")}if w != 0 && delta > 0 && v == int32(delta) { // Add和Wait在并發條件下被調用,不合理panic("sync: WaitGroup misuse: Add called concurrently with Wait")}if v > 0 || w == 0 { // 當待完成的任務大于0 或者 等待任務是0 (沒走到主協程調用wait),則返回return}// This goroutine has set counter to 0 when waiters > 0.// Now there can't be concurrent mutations of state:// - Adds must not happen concurrently with Wait,// - Wait does not increment waiters if it sees counter == 0.// Still do a cheap sanity check to detect WaitGroup misuse.if wg.state.Load() != state { // 誤用panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.wg.state.Store(0) // 先將 state 置為 0,走到這一步時,肯定是 v==0&&w>0,這時開始喚醒 沉睡的協程(主程序),所以為了復用wg需要初始化其參數for ; w != 0; w-- { // 根據 waiter數量 喚醒 每個在sema處阻塞的協程,下面函數執行完畢后 sema為1runtime_Semrelease(&wg.sema, false, 0) // 釋放 信號量 喚醒 沉睡的協程,這里 wg.sema采用cas自增1}
}
Done()函數源碼 如下
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() { // Done 就是 協程完畢后 非完成協程數減1wg.Add(-1)
}
Wait()函數 源碼 如下:
func (wg *WaitGroup) Wait() {if race.Enabled {race.Disable()}for { // 請注意 for循環 目前只會執行一次循環 Wait的阻塞機制不在 for 循環處(至少 通常情況下是)state := wg.state.Load()v := int32(state >> 32)w := uint32(state)if v == 0 { // 如果 非完成的協程數為0,則Wait直接返回。例如: 主程序還沒走到 Wait()代碼處,前面調用了sleep函數,// 則到Wait時可能所有協程都執行完畢了,這時 v==0// Counter is 0, no need to wait.if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// Increment waiters count.if wg.state.CompareAndSwap(state, state+1) { // 這里是 cas對state進行自增;主程序來增state的低32位也就是 waiter數,// 這里大家應該就明白了 Wait只有主程序調用所以 state低32位最大是1(其他情況請大神告訴下)if race.Enabled && w == 0 {// Wait must be synchronized with the first Add.// Need to model this is as a write to race with the read in Add.// As a consequence, can do the write only for the first waiter,// otherwise concurrent Waits will race with each other.race.Write(unsafe.Pointer(&wg.sema))}runtime_Semacquire(&wg.sema) // 在這邊阻塞(看其源碼是調用這個函數的協程阻塞,也就是主協程阻塞),這時 未完成協程大于0;其會一直阻塞直到 sema大于0(Add函數最后代碼部分),然后就對 sema進行遞減 喚醒協程(主協程);// 目前sema就兩個值 跟 state一樣 0 ,1 所以邏輯相對簡單。其源碼 見 runtime/sema.go 感興趣的可以看看if wg.state.Load() != 0 { // 查看state是否被重置了(見 Add wg.state.Store(0) ) 如果沒有 panicpanic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return // 喚醒主協程后 退出 Wait()函數}}
}
sync.Mutex
鎖 就是我鎖上 你不能用 我打開你才能用 sync.Mutex 主要采用了 自旋(runtime_doSpin(): 操作系統命令 pause)和睡眠(runtime_SemacquireMutex: 類似 linux futex阻塞) )方式來 使得 協程進行阻塞也就是上鎖。采用釋放信號量 (runtime_Semrelease)來喚醒阻塞協程(可以喚醒任意一個或者隊列第一個)或者自旋直接獲取鎖(無需信號量參與)
看似挺簡單 但其源碼 我利用業余時間大概看了一周左右吧 雖然也就二百多行 但是是我看過有限源碼里比較難理解的了,所以再向大神對齊的路上是愈來愈拉胯了看來,下面我們開始分析下源碼
我們看下Mutex鎖結構體
type Mutex struct {state int32 // 鎖的狀態 sema uint32 // 信號量
}
其實現了如下鎖接口:
type Locker interface {Lock()Unlock()
}
首先需要先認識幾個參數 如下:
const (mutexLocked = 1 << iota // 鎖上鎖標志mutexWoken // 有協程被喚醒標志mutexStarving // 當前鎖饑餓標志mutexWaiterShift = iota // state左移右移 位數 用來計算 waiters數量// Mutex fairness. //// Mutex can be in 2 modes of operations: normal and starvation.// In normal mode waiters are queued in FIFO order, but a woken up waiter// does not own the mutex and competes with new arriving goroutines over// the ownership. New arriving goroutines have an advantage -- they are// already running on CPU and there can be lots of them, so a woken up// waiter has good chances of losing. In such case it is queued at front// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,// it switches mutex to the starvation mode.//// In starvation mode ownership of the mutex is directly handed off from// the unlocking goroutine to the waiter at the front of the queue.// New arriving goroutines don't try to acquire the mutex even if it appears// to be unlocked, and don't try to spin. Instead they queue themselves at// the tail of the wait queue.//// If a waiter receives ownership of the mutex and sees that either// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,// it switches mutex back to normal operation mode.//// Normal mode has considerably better performance as a goroutine can acquire// a mutex several times in a row even if there are blocked waiters.// Starvation mode is important to prevent pathological cases of tail latency.// 以上翻譯如下:// Mutex 公平鎖// Mutex 有兩種模式 :正常模式和饑餓模式// 正常模式下獲取鎖的順序是先進先出,但是一個喚醒的等待者需要和一個新到達的協程競爭鎖。// 新到達的協程有一個優勢,它們已經在cpu上運行了而且數量很多,所以剛被喚醒的協程就失去// 了搶占鎖的機會,這時它就會排在隊列的頭部。如果一個協程超過1ms沒獲取鎖,那么鎖狀態就會// 切換為饑餓模式。// 饑餓模式下 直接將鎖從正在執行 unlock操作 的協程交給 隊列頭部排隊的協程,即使鎖未鎖// 定狀態新到達的協程也不能獲得鎖,也不進行自旋。相反他們會直接查到隊列尾部// 這是從 正常模式到饑餓模式 還得從饑餓模式切換回去呢 要滿足兩個條件// (1) 協程是隊列最后一個 (2) 它等待時間少于1ms// 正常模式性能要好很多,因為即使有阻塞的等待協程,一個協程也可以連續多次獲取鎖 ?? 這是為啥// 1msstarvationThresholdNs = 1e6
)
那說完了這些 以上這些參數跟 mutex啥關系呢 我們來看一張圖
看到了吧 直接用 位圖 前三位來表示mutex的各種狀態 后29位來表示 waiters的數量
接下來 我們來看下 mutex 實現的 Locker的兩個函數
Lock()函數
我們先來梳理下其粗粒度的流程圖:
下面代碼可以按照上圖進行梳理
// Lock() 先采用cas快速獲取鎖 如果獲取失敗 就 阻塞等待鎖釋放 ps: 阻塞其實有三種情況 1 自旋 2. 進入 等待隊列 3. 前兩種都失敗 繼續 for重試1,2兩種情況 這也會造成阻塞的效果
func (m *Mutex) Lock() {// Fast path: grab unlocked mutex.if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}// Slow path (outlined so that the fast path can be inlined)m.lockSlow()
}
其中最重要的 是 lockSlow()函數 是重點 也是難點
其代碼如下:
// 其實說白了 就是根據鎖當前的狀態和當前協程的狀態 來 更新 鎖狀態 更新 當前協程狀態 然后在一定條件下阻塞協程(pause 或者加入隊列)
func (m *Mutex) lockSlow() {var waitStartTime int64 // 當前協程等待的時間starving := false // 當前協程狀態awoke := false // 當前協程是否被喚醒iter := 0 // 當前協程自旋次數old := m.state // 當前鎖狀態for {// Don't spin in starvation mode, ownership is handed off to waiters// so we won't be able to acquire the mutex anyway.// 自旋條件:非饑餓模式、鎖鎖著、沒達到最大自旋次數 自旋就是 浪費cpu的時鐘周期 所以要 限制自旋的次數if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// Active spinning makes sense.// Try to set mutexWoken flag to inform Unlock// to not wake other blocked goroutines.// 協程喚醒條件: 當前協程非喚醒、鎖非喚醒、等待的協程數不為0 則更新鎖為喚醒狀態 更新成功后 協程變為喚醒狀態// 將鎖 置為喚醒模式 是防止 mutex解鎖時再喚醒其他協程if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}// 協程開始睡眠 底層調用的 操作系統 pauseruntime_doSpin()// 自旋數加1iter++// 重新獲取 state值 用于比較計算old = m.statecontinue}// 不能自旋時,要么cas更新 state某個標志位和waiters數量 要么 繼續 for循環 執行如下邏輯。// 其實不能自旋就兩種情況:// 1. 本協程原因 自旋達到了 閾值// 2. 別的協程原因 修改了 state 使得 old&(mutexLocked|mutexStarving) == mutexLocked 為false 咦 這不廢話嗎// 以下代碼主要開始準備計算 new 用cas來更新 statenew := old// Don't try to acquire starving mutex, new arriving goroutines must queue.// 如果是非饑餓模式 new 狀態 變為 上鎖(新來的協程 鎖狀態不是饑餓 就不用去queue里等待 可以直接嘗試獲取鎖 所以要更新 new)if old&mutexStarving == 0 {new |= mutexLocked}// 等待協程數加1條件: 當前鎖鎖著或者為饑餓狀態(于此相反的 是 非鎖定且非饑餓狀態 可以直接嘗試獲取鎖 無需增加等待記數)if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}// The current goroutine switches mutex to starvation mode.// But if the mutex is currently unlocked, don't do the switch.// Unlock expects that starving mutex has waiters, which will not// be true in this case.// new更新為饑餓狀態:當前協程 饑餓狀態(等待超過1ms) 并且 鎖鎖著if starving && old&mutexLocked != 0 {new |= mutexStarving}// 看了幾篇帖子 還是沒整明白這里 先 todo吧if awoke {// The goroutine has been woken from sleep,// so we need to reset the flag in either case.if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}// 清除 喚醒標記new &^= mutexWoken}// 開始采用cas 根據 new 修改 state cas 成功后 (有可能就只更新了 協程等待數) 進行 計算等待時間 入等待隊列 等 操作if atomic.CompareAndSwapInt32(&m.state, old, new) {// 如果原先的 狀態是 鎖已釋放 且 是非饑餓狀態,則這個協程可直接獲取鎖 且可直接 執行 Lock()后的代碼,沒必要執行下面 入隊列 等邏輯了if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// 走到這里 證明 原先鎖 未釋放 或者 是饑餓狀態 則 需要將 協程加入隊列(頭或者尾部)注意這里 我們不管 new是啥狀態 只管原先old的狀態// If we were already waiting before, queue at the front of the queue.// 如果原先等待過 則 cas成功后 直接 加入等待隊列頭 設置計算本協程等待時間queueLifo := waitStartTime != 0// 等待時間初始化 作為基準時間if waitStartTime == 0 {waitStartTime = runtime_nanotime()}// 將當前協程 加入等待隊列(已等待過直接加入等待頭部)使用sleep源語進行阻塞runtime_SemacquireMutex(&m.sema, queueLifo, 1)// 下面代碼是本協程出隊列被喚醒后 執行的// 加入等待隊列后 計算等待時間 超過閾值 修改本協程狀態為 饑餓starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs// 獲取當前鎖狀態old = m.state// 當前鎖為饑餓 則直接獲取鎖 (防止協程被餓死),否則就去自旋if old&mutexStarving != 0 {// If this goroutine was woken and mutex is in starvation mode,// ownership was handed off to us but mutex is in somewhat// inconsistent state: mutexLocked is not set and we are still// accounted as waiter. Fix that.// 協程是被喚醒的 且鎖是饑餓模式下 鎖一定是未鎖定,且是未被喚醒狀態(如果是喚醒狀態 todo)或者 隊列位空 則拋出異常if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}// 等待隊列數量減1 同時 設定鎖為鎖定狀態 delta 最終是要 加在 atomic.AddInt32 上 下面式子 可以分解為// 1. 設定鎖為鎖定狀態 atomic.AddInt32(&m.state, mutexLocked)// 2. 等待隊列數量減1 atomic.AddInt32(&m.state, - 1<<mutexWaiterShift)// 由于 其在 state 中的 二進制表示 互不影響 所以可以 合并成 int32(mutexLocked - 1<<mutexWaiterShift)delta := int32(mutexLocked - 1<<mutexWaiterShift)// 如果當前協程處于非饑餓狀態 或者本協程是最后一個 等待者 則 將鎖狀態置為正常狀態(改為正常狀態 是因為饑餓模式下 所有協程都會入隊列sleep 不會自旋等待 性能消耗大)if !starving || old>>mutexWaiterShift == 1 {// Exit starvation mode.// Critical to do it here and consider wait time.// Starvation mode is so inefficient, that two goroutines// can go lock-step infinitely once they switch mutex// to starvation mode.delta -= mutexStarving}// 因為是被喚醒的 則直接更新狀態 就行 不用cas 更新完成后直接退出 Lock() 執行 其后代碼atomic.AddInt32(&m.state, delta)break}// 本協程被喚醒 自旋次數清零 且從for循環重新開始awoke = true// 自旋次數清零iter = 0} else {// 自旋或者 cas修改鎖狀態失敗 繼續獲取 state 從 for循環開始 這時 本協程 既沒有 修改本身任何狀態 也沒有修改state任何狀態old = m.state}}if race.Enabled {race.Acquire(unsafe.Pointer(m))}
}
Unlock()
unlock就比較簡單了 我們直接看它
func (m *Mutex) Unlock() {if race.Enabled {_ = m.staterace.Release(unsafe.Pointer(m))}// Fast path: drop lock bit.// 因為 Unlock 只能一個協程執行 所以 可以直接修改 鎖狀態 鎖解鎖new := atomic.AddInt32(&m.state, -mutexLocked)// 如果 等待協程數量不為0 或者 鎖饑餓 或者 鎖為喚醒狀態 執行慢解鎖流程 否則 解鎖完畢if new != 0 {// Outlined slow path to allow inlining the fast path.// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.m.unlockSlow(new)}
}
其中 unlockSlow()函數 代碼如下
func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {fatal("sync: unlock of unlocked mutex")}// 如果鎖 非 饑餓if new&mutexStarving == 0 {old := newfor {// If there are no waiters or a goroutine has already// been woken or grabbed the lock, no need to wake anyone.// In starvation mode ownership is directly handed off from unlocking// goroutine to the next waiter. We are not part of this chain,// since we did not observe mutexStarving when we unlocked the mutex above.// So get off the way.// 如果 等待的協程為0 沒必要再去更新 state 狀態了 直接返回// 如果鎖上鎖了 表示已經有協程獲取到了鎖 不用再喚醒 且 等待協程減1了 直接返回// 如果鎖是喚醒狀態 說明已經有協程被喚醒了 (自旋的沒入隊列的協程被喚醒 這就是為什么 自旋的協程比 入隊列協程更容易獲取鎖的原因)// 如果鎖是 饑餓狀態 todo 不用更新 等待協程數量?? 為啥不執行 runtime_Semrelease(&m.sema, true, 1) ??if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}// Grab the right to wake someone.// 等待協程數-1 鎖狀態 置為以喚醒new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {// 喚醒一個協程runtime_Semrelease(&m.sema, false, 1)return}old = m.state}} else {// Starving mode: handoff mutex ownership to the next waiter, and yield// our time slice so that the next waiter can start to run immediately.// Note: mutexLocked is not set, the waiter will set it after wakeup.// But mutex is still considered locked if mutexStarving is set,// so new coming goroutines won't acquire it.// 饑餓模式下 直接喚醒隊列頭協程,注意此時state還沒加鎖狀態 喚醒的 協程會設置,也會 執行 等待隊列數減1等// 注意 在饑餓模式下 鎖仍然被認為是 鎖定的狀態 (我個人認為只是效果一樣,因為饑餓狀態 別的協程過來 會直接插到 隊列尾部 不會去獲取鎖 因為不會自旋)runtime_Semrelease(&m.sema, true, 1)}
}
還是有一些小疑問 沒解決 先放著吧 有大神知道的可以解答下 疑問點都標注在注解中了