1、鎖的概念引入
首先,為什么需要鎖?
在并發編程中,多個線程或進程可能同時訪問和修改同一個共享資源(例如變量、數據結構、文件)等,若不引入合適的同步機制,會引發以下問題:
-
數據競爭:多個線程同時修改一個資源,最終的結果跟線程的執行順序有關,結果是不可預測的。
-
數據不一致:一個線程在修改資源,而另一個線程讀取了未修改完的數據,從而導致讀取了錯誤的數據。
-
資源競爭:多線程競爭同一個資源,浪費系統的性能。
因此,我們需要一把鎖,來保證同一時間只有一個人能寫數據,確保共享資源在并發訪問下的正確性和一致性。
在這里,引入兩種常見的并發控制處理機制,即樂觀鎖與悲觀鎖:
-
樂觀鎖:假定在并發操作中,資源的搶占并不是很激烈,數據被修改的可能性不是很大,那這時候就不需要對共享資源區進行加鎖再操作,而是先修改了數據,最終來判斷數據有沒有被修改,沒有被修改則提交修改指,否則重試。
-
悲觀鎖:與樂觀鎖相反,它假設場景的資源競爭激烈,對共享資源區的訪問必須要求持有鎖。
針對不同的場景需要采取因地制宜的策略,比較樂觀鎖與悲觀所,它們的優缺點顯而易見:
2、Sync.Mutex
Go對單機鎖的實現,考慮了實際環境中協程對資源競爭程度的變化,制定了一套鎖升級的過程。具體方案如下:
-
首先采取樂觀的態度,Goroutine會保持自旋態,通過CAS操作嘗試獲取鎖。
-
當多次獲取失敗,將會由樂觀態度轉入悲觀態度,判定當前并發資源競爭程度劇烈,進入阻塞態等待被喚醒。
從樂觀轉向悲觀的判定規則如下,滿足其中之一即發生轉變:
-
Goroutine自旋嘗試次數超過4次
-
當前P的執行隊列中存在等待被執行的G(避免自旋影響GMP調度性能)
-
CPU是單核的(其他Goroutine執行不了,自旋無意義)
除此之外,為了防止被阻塞的協程等待過長時間也沒有獲取到鎖,導致用戶的整體體驗下降,引入了饑餓的概念:
-
饑餓態:若Goroutine被阻塞等待的時間>1ms,則這個協程被視為處于饑餓狀態
-
饑餓模式:表示當前鎖是否處于特定的模式,在該模式下,鎖的交接是公平的,按順序交給等待最久的協程。
饑餓模式與正常模式的轉變規則如下:
-
普通模式->饑餓模式:存在阻塞的協程,阻塞時間超過1ms
-
饑餓模式->普通模式:阻塞隊列清空,亦或者獲得鎖的協程的等待時間小于1ms,則恢復
接下來步入源碼,觀看具體的實現。
2.1、數據結構
位于包sync/mutex.go
中,對鎖的定義如下:
type Mutex struct {state int32sema uint32
}
-
state
:標識目前鎖的狀態信息,包括了是否處于饑餓模式、是否存在喚醒的阻塞協程、是否上鎖、以及處于等待鎖的協程個數有多少。 -
seme
:用于阻塞和喚醒協程的信號量。
將state
看作一個二進制字符串,它存儲信息的規則如下:
-
第一位標識是否處于上鎖,0表示否,1表示上鎖(mutexLocked)
-
第二位標識是否存在喚醒的阻塞協程(mutexWoken)
-
第三位標識是否處于饑餓模式(mutexStarving)
-
從第四位開始,記錄了處于阻塞態的協程個數
const (mutexLocked = 1 << iota // mutex is lockedmutexWokenmutexStarvingmutexWaiterShift = iotastarvationThresholdNs = 1e6 //饑餓閾值
)
2.2、獲取鎖Lock()
func (m *Mutex) Lock() {if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {return}m.lockSlow()
}
嘗試直接通過CAS操作直接獲取鎖,若成功則返回,否則說明鎖被獲取,步入LockSlow
。
2.3、LockSlow()
源碼較長,進行拆分講解:
var waitStartTime int64starving := falseawoke := falseiter := 0old := m.state
(1)定義了基本的常量,含義如下:
-
waitStartTime
:記錄當前協程等待的時間,只有被阻塞才會使用 -
awoke
:標識當前協程是否被Unlock喚醒 -
iter
:記錄當前協程自旋嘗試次數 -
old
:記錄舊的鎖的狀態信息
for {//處于上鎖狀態,并且不處于饑餓狀態中,并且當前的協程允許繼續自旋下去if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}runtime_doSpin()iter++old = m.statecontinue}//...}
(2)進入嘗試獲取鎖的循環中,兩個if表示:
-
若鎖處于上鎖狀態,并且不處于饑餓狀態中,并且當前的協程允許繼續自旋下去(非單核CPU、自旋次數<=4、調度器P的本地隊列不存在等待執行的G),則步入:若當前協程并非從等待隊列喚醒、并且不存在被喚醒的等待協程、并且存在位于阻塞的協程、則嘗試設置mutexWoken標識為1,若成功:標識當前的協程為被喚醒的協程。(雖然并非實際從阻塞中喚醒)告訴P,當前的協程處于自旋態更新
iter
計數器,與old
記錄的當前鎖的狀態信息,進行下一次重試循環
這里存在的唯一疑惑為,為什么要將awoke標識為true?
首先,因為當前鎖并非處于饑餓模式,因此當前的搶占鎖的模式是不公平的,若當前鎖的阻塞隊列還沒有被喚醒的協程,那就要求不要喚醒了,嘗試讓當前正在嘗試的協程獲取到鎖,避免喚醒協程進行資源競爭。
for {//...new := oldif old&mutexStarving == 0 {new |= mutexLocked}if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}if starving && old&mutexLocked != 0 {new |= mutexStarving}if awoke {new &^= mutexWoken}//...
}
(3)進行狀態更新:
當協程從步驟2走出來時,只能說明它位于以下兩個狀態之一:
-
旋不動了,或者鎖進入饑餓模式了,鎖要讓給別人了,總之是獲取不到鎖了(悲觀)。
-
鎖被釋放了。
不論如何,都需要進行一些狀態的更新,為接下來的打算做準備。
用new存儲一個鎖即將要進入的新狀態信息,更新規則:
-
若鎖不處于饑餓模式:說明鎖可能被釋放了,也可能是自旋次數過多,不管接下來是否能拿到鎖,鎖都會被某一個協程獲取,因此置
mutexLocked
為1。 -
若鎖可能處于饑餓狀態,或者鎖沒有被釋放:那說明自己是搶不到鎖了,即將進入阻塞態,阻塞協程計數器+1。
-
若當前的協程是被喚醒的,并且已經處在饑餓態中而且鎖仍然鎖著:鎖進入絕對公平的饑餓模式。
-
若當前協程是被喚醒的:清除
mutexWoken
標識位,因為接下來可能需要有協程被喚醒(饑餓模式)。
雖然更新的有點多,但是可以歸納為:
-
若鎖釋放了,那就標識一下接下來鎖要被獲取即可。
-
若鎖沒有釋放,并給當前協程等待了很久,那鎖就進入饑餓狀態,接下來需要有阻塞協程被喚醒。
(4)嘗試更新信息:
if atomic.CompareAndSwapInt32(&m.state, old, new) {//...} else {old = m.state}
接下來嘗試將new更新進state,若更新失敗,說明當前有另一個協程介入了,為了防止數據的一致性丟失,要全部重來一次。
(5)狀態更新成功,具體判斷是要沉睡還是獲取鎖成功:
步入步驟4的if主支中,此時有兩個狀態:
if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}//...} else {//...}
因為當前狀態,可能是鎖釋放了,檢查鎖更新前是否已經被釋放了并且不是饑餓模式,若是那說明獲取鎖成功了,函數結束了。
if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// If we were already waiting before, queue at the front of the queue.queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}runtime_SemacquireMutex(&m.sema, queueLifo, 2)//....} else {//...}
否則,說明當前協程要進入阻塞態了,記錄一下開始阻塞的時間,用于醒來是判斷是否饑餓。然后進入阻塞沉睡中。
(6)若步驟5進入阻塞,則被喚醒后:
if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// If we were already waiting before, queue at the front of the queue.queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}runtime_SemacquireMutex(&m.sema, queueLifo, 2)//喚醒starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.state//若鎖處于饑餓模式if old&mutexStarving != 0 {//鎖的異常處理if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}//將要更新的信號量delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0//....} else {//...}
從阻塞中喚醒,首先計算一些協程的阻塞時間,以及當前的最新鎖狀態。
若鎖處于饑餓模式:那么當前協程將直接獲取鎖,當前協程是因為饑餓模式被喚醒的,不存在其他協程搶占鎖。于是更新信號量,將記錄阻塞協程數-1,將鎖的上鎖態置1。若當前從饑餓模式喚醒的協程,等待時間已經不到1ms了或者是最后一個等待的協程,那么將將鎖從饑餓模式轉化為正常模式。至此,獲取成功,退出函數。
否則,只是普通的隨機喚醒,于是開始嘗試進行搶占,回到步驟1。
2.4、釋放鎖Unlock()
func (m *Mutex) Unlock() {//直接釋放鎖new := atomic.AddInt32(&m.state, -mutexLocked)if new != 0 {m.unlockSlow(new)}
}
通過原子操作,直接將鎖的mutexLocked
標識置為0。若置0后,鎖的狀態不為0,那就說明存在需要獲取鎖的協程,步入unlockSlow
。
2.5、unlockSlow()
func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {fatal("sync: unlock of unlocked mutex")}if new&mutexStarving == 0 {old := newfor {if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 2)return}old = m.state}} else {runtime_Semrelease(&m.sema, true, 2)}
}
(1)首先進行了異常狀態處理,若釋放了一個已經釋放了到鎖,那么直接fatal,程序終止。
if (new+mutexLocked)&mutexLocked == 0 {fatal("sync: unlock of unlocked mutex")}
(2)若鎖不處于饑餓狀態:
-
若此時的等待協程數量為0,或者鎖被上鎖了、含有被喚醒的協程、鎖處于饑餓模式:都說明有新的協程介入了流程,已經完成了交接,可以直接退出
-
喚醒一個處于阻塞態的協程。
否則,處于饑餓狀態,喚醒等待最久的協程。
3、Sync.RWMutex
對于共享資源區的操作,可以劃分為讀與寫兩大類。假設在一個場景中,對共享資源區繼續讀的操作遠大于寫的操作,如果每個協程的讀操作都需要獲取互斥鎖,這帶來的性能損耗是非常大的。
RWMutex
是一個可以運用在讀操作>寫操作中的提高性能的鎖,可以將它視為由一個讀鎖與一個寫鎖構成。其運作規則具體如下:
-
讀鎖允許多個讀協程同時讀取共享資源區,若有協程需要修改資源區的數據,那么它需要被阻塞。
-
寫鎖具有嚴格的排他性,當共享資源區被上了寫鎖時,任何其他goroutine都不得訪問。
可見在最壞的情況下,所有的協程都是需要寫操作時,讀寫鎖會退化成普通的Mutex。
3.1、數據結構
type RWMutex struct {w Mutex // held if there are pending writerswriterSem uint32 // semaphore for writers to wait for completing readersreaderSem uint32 // semaphore for readers to wait for completing writersreaderCount atomic.Int32 // number of pending readersreaderWait atomic.Int32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30 //最大的讀協程數量
-
w
:一個互斥的寫鎖 -
writerSem
:關聯被阻塞的寫協程的信號量 -
readerSem
:關聯被阻塞的讀協程的信號量 -
readerCount
:正常情況下,記錄正在讀取的協程數量;但若當前是寫協程正在持有鎖,那么實際記錄讀協程的數量為readerCount - rwmutexMaxReader
-
readerWait
:記錄釋放下一個寫協程,還需要等待讀協程完成的數量
3.2、讀鎖流程RLock()
func (rw *RWMutex) RLock() {if rw.readerCount.Add(1) < 0 {// A writer is pending, wait for it.runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)}
}
對readerCount
+1,表示新加入一個讀協程。若結果<0,說明當前鎖正在被寫協程占據,令當前的讀協程阻塞。
3.3、讀釋放鎖流程RUnlock()
func (rw *RWMutex) RUnlock() {if r := rw.readerCount.Add(-1); r < 0 {// Outlined slow-path to allow the fast-path to be inlinedrw.rUnlockSlow(r)}
}
對readerCount
-1,表示減少一個讀協程。若結果<0,說明當前鎖正在被寫協程占據,步入runlockslow。
3.4、rUnlockSlow()
func (rw *RWMutex) rUnlockSlow(r int32) {if r+1 == 0 || r+1 == -rwmutexMaxReaders {race.Enable()fatal("sync: RUnlock of unlocked RWMutex")}if rw.readerWait.Add(-1) == 0 {// The last reader unblocks the writer.runtime_Semrelease(&rw.writerSem, false, 1)}
}
首先進行錯誤處理,若發現當前協程為占用過讀鎖,或者讀流程的協程數量上限,系統出現異常,fatal。
否則,對readerWait
-1,若結果為0,說明當前協程是最后一個介入讀鎖流程的協程,此時需要釋放一個寫鎖。
3.5、寫鎖流程Lock()
func (rw *RWMutex) Lock() {// First, resolve competition with other writers.rw.w.Lock()// Announce to readers there is a pending writer.r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders// Wait for active readers.if r != 0 && rw.readerWait.Add(r) != 0 {runtime_SemacquireRWMutex(&rw.writerSem, false, 0)}
}
首先嘗試獲取寫鎖,若獲取成功,需要將readerCount
-最大讀協程數,表示現在鎖被讀協程占據。
r表示處于讀流程的協程數量,若r不為0,那么就將readerWait
加上r,等這些讀協程都讀取完畢,再去寫。將這個寫協程阻塞。(讀寫鎖并非讀、寫公平,讀協程優先。)
3.6、寫釋放鎖流程Unlock()
func (rw *RWMutex) Unlock() {// Announce to readers there is no active writer.r := rw.readerCount.Add(rwmutexMaxReaders)if r >= rwmutexMaxReaders {race.Enable()fatal("sync: Unlock of unlocked RWMutex")}// Unblock blocked readers, if any.for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)}// Allow other writers to proceed.rw.w.Unlock()
}
重新將readerCount
置為正常指,表示釋放了寫鎖。若讀協程超過最大上限,則異常。
然后喚醒所有阻塞的讀協程。(讀協程優先)
解鎖。
文章轉載自:MelonTe
原文鏈接:golang單機鎖實現 - MelonTe - 博客園
體驗地址:引邁 - JNPF快速開發平臺_低代碼開發平臺_零代碼開發平臺_流程設計器_表單引擎_工作流引擎_軟件架構