本篇文章主要講鎖,主要會涉及go的sync.Mutex和sync.RWMutex。
一.鎖的概念和發展
1.1 鎖的概念
所謂的加鎖和解鎖其實就是指一個數據是否被占用了,通過Mutex內的一個狀態來表示。
例如,取 0 表示未加鎖,1 表示已加鎖;
- 上鎖:把 0 改為 1;
- 解鎖:把 1 置為 0.
- 上鎖時,假若已經是 1,則上鎖失敗,需要等鎖的主人解鎖,將狀態改為 0,才可以被其他鎖鎖上。
這就是一個鎖的基本骨架,鎖主要就是加鎖和解鎖兩個狀態。
并且這里要注意一個點,就是這兩個操作具有原子性,不可以被拆解。
1.2 由自旋等阻塞的升級過程
一個優先的工具需要具備探測并適應環境,從而采取不同對策因地制宜的能力.
針對 goroutine 加鎖時發現鎖已被搶占的這種情形,此時擺在面前的策略有如下兩種:
- 阻塞/喚醒:將當前 goroutine 阻塞掛起,直到鎖被釋放后,以回調的方式將阻塞 goroutine 重新喚醒,進行鎖爭奪;
- 自旋 + CAS:基于自旋結合 CAS 的方式,重復校驗鎖的狀態并嘗試獲取鎖,始終把主動權握在手中.
阻塞和喚醒大家肯定都知道,這里主要說一下自旋和CAS
如果看過gmp的同學對自旋肯定不陌生,所謂的自旋其實就是輪詢,什么意思?這里舉一個例子:
比如所謂的主動輪詢,其實就是指如果加鎖失敗之后,它會停歇一會,然后再次詢問,我可以加鎖嘛?如果可以,就取到這個鎖,要是不可以,就進行下一次的輪詢。
和阻塞/喚醒不同,他是需要等待通知,說這把鎖釋放了,然后后續的goroutine才可以拿到這把鎖。
CAS是什么?
CAS全稱為Compare-And-Swap,是一種原子操作,用于多線程編程中實現無鎖同步。
上述的方案各有各的優缺點,都有其對應的適用場景,接下來來看一下
鎖競爭方案 | 優勢 | 劣勢 | 適用場景 |
阻塞/喚醒 | 精準打擊,不浪費 CPU 時間片 | 需要掛起協程,進行上下文切換,操作較重 | 并發競爭激烈的場景 |
自旋+CAS | 無需阻塞協程,短期來看操作較輕 | 長時間爭而不得,會浪費 CPU 時間片 | 并發競爭強度低的場景 |
這里對這兩種鎖思想做一個介紹吧:
阻塞/喚醒:這種形式被稱為是悲觀鎖,當G獲取鎖失敗而阻塞時,會被掛起,標記為waiting的狀態,主動讓出Processor,直接讓M和G結合,而P去執行其他的G(保證不會浪費這個P)鎖被釋放之后,才會喚醒G
自旋+CAS:這種形式被稱為是樂觀鎖,主動權掌握在自己的手中(也就是不釋放processor),會不斷主動輪詢嘗試獲取這個鎖
而sync.Mutex結合了上述的兩種方案,指定了一個鎖升級的過程,讓我們來看看吧
進行了一個怎么樣的鎖升級?
其實就是設計了一個狀態的轉化,由樂觀轉換為悲觀,為什么要這樣設計呢?
先來說說具體的方法:
- 首先保持樂觀,goroutine 采用自旋 + CAS 的策略爭奪鎖;
- 嘗試持續受挫達到一定條件后,判定當前過于激烈,則由自旋轉為 阻塞/掛起模式.
這樣做的原因是可以具備探測和適應環境,因地制宜采取不同的策略,首先采用樂觀的狀態,如果幾次自旋無果,就認為現在是并發激烈的情況,就會轉化為悲觀的狀態。
1.3 饑餓模式
上一小節的升級策略主要是面向性能,而本小節引入的饑餓模式,則是對公平性問題的探討。
下面首先拎清兩個概念:
- 饑餓:顧名思義,是因為非公平機制的原因,導致 Mutex 阻塞隊列中存在 goroutine 長時間取不到鎖,從而陷入饑荒狀態;
- 饑餓模式:當 Mutex 阻塞隊列中存在處于饑餓態的 goroutine 時,會進入模式,將搶鎖流程由非公平機制轉為公平機制.
Mutex運作下的兩種模式
- 正常模式/非饑餓模式:這是 sync.Mutex 默認采用的模式. 當有 goroutine 從阻塞隊列被喚醒時,會和此時先進入搶鎖流程的 goroutine 進行鎖資源的爭奪,假如搶鎖失敗,會重新回到阻塞隊列頭部.
這里雖然有一個阻塞隊列,當鎖資源被釋放,按理說阻塞隊列的隊首的G或獲取這個鎖資源,這其實是很公平了,但是實際上他只是看似公平,因為還有沒進阻塞隊列的G,還記得什么時候進阻塞隊列嘛?對,就是當自旋結束才會進,這樣一來就很清晰了,隊首的G會和自旋的G搶占這個鎖,如果說隊首的G排了半天隊,結果被這個初出茅廬的自旋G搶了鎖資源,這還叫公平嘛?結果顯而易見,肯定是不公平的,于是為了解決這個問題,就有了饑餓模式。
(值得一提的是,此時被喚醒的老 goroutine 相比新 goroutine 是處于劣勢地位,因為新 goroutine 已經在占用 CPU 時間片,且新 goroutine 可能存在多個,從而形成多對一的人數優勢,因此形勢對老 goroutine 不利.)
- 饑餓模式:這是 sync.Mutex 為拯救陷入饑荒的老 goroutine 而啟用的特殊機制,饑餓模式下,鎖的所有權按照阻塞隊列的順序進行依次傳遞. 新 goroutine 進行流程時不得搶鎖,而是進入隊列尾部排隊.
這樣就可以避免自旋的鎖搶占鎖資源了
兩種模式的轉化
- 默認為正常模式;
- 正常模式 -> 饑餓模式:當阻塞隊列存在 goroutine 等鎖超過 1ms 而不得,則進入饑餓模式;
- 饑餓模式 -> 正常模式:當阻塞隊列已清空,或取得鎖的 goroutine 等鎖時間已低于 1ms 時,則回到正常模式.
小結:正常模式靈活機動,性能較好;饑餓模式嚴格死板,但能捍衛公平的底線. 因此,兩種模式的切換體現了 sync.Mutex 為適應環境變化,在公平與性能之間做出的調整與權衡. 回頭觀望,這一項因地制宜、隨機應變的能力正是許多優秀工具所共有的特質.
二.sync.Mutex
在這之前呢,做一個簡單的補充,在sync下,提供了一個接口,提供了一個實現屬于自己的鎖的方法哦
type Locker interface {Lock()Unlock()
}
2.1 核心數據結構
type Mutex struct {state int32sema uint32
}
- state:鎖中最核心的狀態字段,不同 bit 位分別存儲了 mutexLocked(是否上鎖)、mutexWoken(是否有 goroutine 從阻塞隊列中被喚醒)、mutexStarving(是否處于饑餓模式)的信息,具體在 2.2 節詳細展開;
- sema:用于阻塞和喚醒 goroutine 的信號量.
const (mutexLocked = 1 << iota // mutex is lockedmutexWokenmutexStarvingmutexWaiterShift = iotastarvationThresholdNs = 1e6
)
- mutexLocked = 1:state 最右側的一個 bit 位標志是否上鎖,0-未上鎖,1-已上鎖;
- mutexWoken = 2:state 右數第二個 bit 位標志是否有 goroutine 從阻塞中被喚醒,0-沒有,1-有;
- mutexStarving = 4:state 右數第三個 bit 位標志 Mutex 是否處于饑餓模式,0-非饑餓,1-饑餓;
- mutexWaiterShift = 3:右側存在 3 個 bit 位標識特殊信息,分別為上述的 mutexLocked、mutexWoken、mutexStarving;
- starvationThresholdNs = 1 ms:sync.Mutex 進入饑餓模式的等待時間閾值.
2.2 state字段
低 3 位分別標識 mutexLocked(是否上鎖)、mutexWoken(是否有協程在搶鎖)、mutexStarving(是否處于饑餓模式),高 29 位的值聚合為一個范圍為 0~2^29-1 的整數,表示在阻塞隊列中等待的協程個數.
2.3 加鎖Mutex.Lock() (了解即可)
在之前說過一個鎖要實現加鎖和解鎖的操作,接下來就來看看加鎖的操作
func (m *Mutex) Lock() {// Fast path: 嘗試直接通過 CAS 搶占鎖if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}// Slow path: 處理鎖競爭或鎖已被持有的情況m.lockSlow()
}
使用 原子操作 CompareAndSwapInt32
檢查鎖狀態:如果鎖的 state
為 0
(未鎖定),則將其設為 mutexLocked
(1),表示鎖被當前 Goroutine 持有。
否則就進入lockslow
來看下這個lockslow
func (m *Mutex) lockSlow() {var waitStartTime int64starving := falseawoke := falseiter := 0old := m.state
? waitStartTime:標識當前 goroutine 在搶鎖過程中的等待時長,單位:ns;
? starving:標識當前是否處于饑餓模式;
? awoke:標識當前是否已有協程在等鎖;
? iter:標識當前 goroutine 參與自旋的次數;
? old:臨時存儲鎖的 state 值.for {// 進入該 if 分支,說明搶鎖失敗,處于饑餓模式,但仍滿足自旋條件if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// 進入該 if 分支,說明當前鎖阻塞隊列有協程,但還未被喚醒,因此需要將 // mutexWoken 標識置為 1,避免再有其他協程被喚醒和自己搶鎖if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}runtime_doSpin()iter++old = m.statecontinue}// ......}if race.Enabled {race.Acquire(unsafe.Pointer(m))}
}
- 走進 for 循環;
- 假如滿足三個條件:I 鎖已被占用、 II 鎖為正常模式、III 滿足自旋條件(runtime_canSpin 方法),則進入自旋后處理環節;
- 在自旋后處理中,假如當前鎖有尚未喚醒的阻塞協程,則通過 CAS 操作將 state 的 mutexWoken 標識置為 1,將局部變量 awoke 置為 true;
- 調用 runtime_doSpin 告知調度器 P 當前處于自旋模式;
- 更新自旋次數 iter 和鎖狀態值 old;
- 通過 continue 語句進入下一輪嘗試.
上面的部分可以自旋的情況,當一定次數的自旋之后,會改變狀態,調整字段,然后進入悲觀狀態,我們來看看,簡單過一遍吧,結合ai的解讀
func (m *Mutex) lockSlow() {// ......for {// ......new := old// 若非饑餓模式,嘗試直接獲取鎖if old&mutexStarving == 0 {new |= mutexLocked}// 若鎖已被持有或處于饑餓模式,增加等待者數量if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}// 若鎖已被持有或處于饑餓模式,增加等待者數量if starving && old&mutexLocked != 0 {new |= mutexStarving}// 清除喚醒標志(若當前協程已被喚醒)if awoke {.if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}new &^= mutexWoken}if atomic.CompareAndSwapInt32(&m.state, old, new) {// 成功獲取鎖(僅在非饑餓模式且鎖未被持有時可能)if old&(mutexLocked|mutexStarving) == 0 {break }// 加入等待隊列(LIFO 或 FIFO,取決于是否已等待過)queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}// 阻塞等待信號量喚醒runtime_SemacquireMutex(&m.sema, queueLifo, 1)starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.stateif old&mutexStarving != 0 {// 調整狀態:減少等待者數量,可能退出饑餓模式if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {// 退出饑餓模式delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {old = m.state// CAS 失敗,重新加載狀態}}if race.Enabled {race.Acquire(unsafe.Pointer(m))}
}
2.4 Unlock (了解即可)
func (m *Mutex) Unlock() {if race.Enabled {_ = m.staterace.Release(unsafe.Pointer(m))}new := atomic.AddInt32(&m.state, -mutexLocked)if new != 0 {m.unlockSlow(new)}
}
func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {fatal("sync: unlock of unlocked mutex")}if new&mutexStarving == 0 {old := newfor {// If there are no waiters or a goroutine has already// been woken or grabbed the lock, no need to wake anyone.// In starvation mode ownership is directly handed off from unlocking// goroutine to the next waiter. We are not part of this chain,// since we did not observe mutexStarving when we unlocked the mutex above.// So get off the way.if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}// Grab the right to wake someone.new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)return}old = m.state}} else {// Starving mode: handoff mutex ownership to the next waiter, and yield// our time slice so that the next waiter can start to run immediately.// Note: mutexLocked is not set, the waiter will set it after wakeup.// But mutex is still considered locked if mutexStarving is set,// so new coming goroutines won't acquire it.runtime_Semrelease(&m.sema, true, 1)}
}
這里就不再多介紹了,可以下去自己看一下源碼,借助ai,其實只要知道大致的思想,在去編寫的時候,在看即可了。
三.sync.RWMutex
- 從邏輯上,可以把 RWMutex 理解為一把讀鎖加一把寫鎖;
- 寫鎖具有嚴格的排他性,當其被占用,其他試圖取寫鎖或者讀鎖的 goroutine 均阻塞;
- 讀鎖具有有限的共享性,當其被占用,試圖取寫鎖的 goroutine 會阻塞,試圖取讀鎖的 goroutine 可與當前 goroutine 共享讀鎖;
- 綜上可見,RWMutex 適用于讀多寫少的場景,最理想化的情況,當所有操作均使用讀鎖,則可實現去無化;最悲觀的情況,倘若所有操作均使用寫鎖,則 RWMutex 退化為普通的 Mutex
3.1 核心數據結構
type RWMutex struct {w Mutex // held if there are pending writerswriterSem uint32 // semaphore for writers to wait for completing readersreaderSem uint32 // semaphore for readers to wait for completing writersreaderCount atomic.Int32 // number of pending readersreaderWait atomic.Int32 // number of departing readers
}
- rwmutexMaxReaders:共享讀鎖的 goroutine 數量上限,值為 2^29;
- w:RWMutex 內置的一把普通互斥鎖 sync.Mutex;
- writerSem:關聯寫鎖阻塞隊列的信號量;
- readerSem:關聯讀鎖阻塞隊列的信號量;
- readerCount:正常情況下等于介入讀鎖流程的 goroutine 數量;當 goroutine 接入寫鎖流程時,該值為實際介入讀鎖流程的 goroutine 數量減 rwmutexMaxReaders.
- readerWait:記錄在當前 goroutine 獲取寫鎖前,還需要等待多少個 goroutine 釋放讀鎖.
源碼的走讀就不再寫了,后續在學分布式鎖的時候在完善。