sync.Map
不安全的 map
go 中原生的 map 不是并發安全的,多個 goroutine 并發地去操作一個 map 會拋出一個 panic
package main
import "fmt"
func main() {m := map[string]int {"1": 1, "2": 2,}// 并發寫for i := 0; i < 100; i ++ {go func(i int) {m[fmt.Sprintf("%d", i)] = i}(i)}// 讀for i := 0; i < 100; i ++ {fmt.Println(i, m[fmt.Sprintf("%d", i)])}
}PS E:\test\gol\main> go run .\01.go
fatal error: concurrent map writes
fatal error: concurrent map writes
解決的辦法是互斥地去讀寫,如:
type SafeMap struct {data map[interface{}]interface{}sync.RWMutex
}func (sm *SafeMap) Set(key interface{}, val interface{}) {sm.Lock()defer sm.Unlock()sm.data[key] = val
}func (sm *SafeMap) Get(key interface{}) (val interface{}){sm.Lock()defer sm.Unlock()val, ok := sm.data[key]if !ok {val = ""}return
}
而另一個常用的辦法就是使用 sync
包提供的 Map
.
sync.Map 概覽
sync.Map
包的核心是 Map
結構體,其向外暴露了四個方法:
// 從 Map 中取出一個 value
func (m *Map) Load(key interface{}) (value interface{}, ok bool)// 向 Map 中 存入一個 KV 對
func (m *Map) Store(key, value interface{})// 如果 Map 中存在 key,覆蓋并返回 (舊值, true), 否則返回 (新值, false)
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)// 從 Map 中刪除一個 KV 對
func (m *Map) Delete(key interface{})// 對 Map 中的所有 KV 執行 f, 直到 f 返回 false
func (m *Map) Range(f func(key, value interface{}) bool)
源碼分析
數據結構和設計思想
通過上面直接對所有讀寫操作加鎖的方式類似于Java中的 HashTable
, 效率并不高,所以參考 ConcurrentHashMap
, orcaman 提出了 concurrent_map
通過對內部
map
進行分片,降低鎖粒度,從而達到最少的鎖等待時間(鎖沖突).
但這樣只是降低了鎖粒度,sync.Map
的思路是盡可能使用原子操作而不是鎖,因為原子操作直接由硬件支持,在多核 CPU 環境下有更好的拓展性和性能。
如何對 map
使用原子操作呢?,之所以出現不安全的現象,是由于多個 goroutine 對同一個公有變量(map)操作引起的,如果我們將這個map
存儲在 atomic.Value
中,讀的時候使用 Load
原子地獲取到 map
, 再返回 map[key]
不就可以避免讀時鎖競爭了嗎?
type SafeMap struct {read atomic.Value
}type readOnly struct {m map[interface{}]interface{}
}func (m *SafeMap) Load(key interface{}) interface{}{read := m.read.Load().(readOnly)return read.m[key]
}
類似于上面地偽代碼,將 map
包裝成 readOnly
后,使用 Value
存儲,在需要 Load
的時候,原子地取出 readOnly
, 由于 read
變量不是公有的,所以在拿出 readOnly
后,再從其中查找 key
對應的 value
就不存在線程安全的問題了。
這樣看起來很完美,但問題在于僅僅使用 Value
無法安全的存儲鍵值對:
func (m *SafeMap) Store(k, v interface{}) {read := m.read.Load().(readOnly)read.m[key] = vm.read.Store(rea)
}
上面三條語句操作的其實是同一個 map
,可能出現在 store 之前已經有別人 store 的情況,不對這三條語句加鎖可能導致覆蓋別人的數據,所以其并不是安全的,要想實現安全存儲,必須加鎖:
type SafeMap struct {mu sync.Mutexread atomic.Value
}func (m *SafeMap) Store(k, v interface{}) {m.mu.Lock()read := m.read.Load().(readOnly)read.m[key] = vm.read.Store(rea)m.mu.UnLock()
}
但這就退化到了最初的情況,每次 Store
都需要競爭鎖,為了提高Store
的效率,sync.Map
使用了一個冗余的字段 dirty
, 如果是往 Map
中插入新值,就加鎖插入到 dirty
中, 如果是要修改已經存在的 key 對應的 value ,就可以直接修改 read
,當達到某種條件時,會把 dirty
轉換為 read
, 這樣設計能夠盡可能避免使用 Mutex
而改用性能和拓展性更好的 原子操作來實現安全并發。
Map struct
type Map struct {mu sync.Mutexread atomic.Valuedirty map[interface{}]*entrymisses int
}
mu
: 用于對 dirty 操作時保障并發安全的鎖read
: 與上面偽代碼中的read
相同,存儲一個只讀的量readOnly
, 對它的操作是原子的,所以對Map
的操作會優先在read
上嘗試。dirty
: 這里存儲的是最新的 KV 對,一個新的鍵值對會被存儲在這,等時機成熟,dirty
會被轉換為read
, 然后該字段會被置為空,由于dirty
中的數據總是比read
中的更新,所以在查詢修改等操作中,read
中如果找不到還需要回到dirty
中找。misses
: 控制什么時候dirty
轉換為read
, 每次從read
中沒找到回到dirty
中查詢都會導致misses
自增一,等misses > len(dirty)
時, 就會觸發轉換。
readOnly
type readOnly struct {// m 和 dirty 中的 value 是同一塊內存m map[interface{}]*entry// 如果 dirty 和 read 中的數據不一致時,amended 為 trueamended bool
}
readOnly
同樣類似于上面偽代碼中的 readOnly
, Map.read
中存放的就是它,其中 m
便是車存儲鍵值對的地方,由于 read
中的數據可能滯后于 dirty
, 所以需要使用 amended
來標識, read 中沒有讀到且 amended == true
時,要回 dirty
中查詢。
entry
type entry struct {p unsafe.Pointer // *interface{}
}
從上面可以看到,readOnly
和 dirty
中存儲的 Value 都是 entry
的指針,這樣做的好處在于:
dirty
和readOnly.m
中同一個key
指向的其實是同一個value
, 這樣冗余的就只有 key 和 一個指向值的指針了,可以減少空間浪費。- 修改值時可以直接修改指針指向,這樣對
read
和dirty
都能生效
Load
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {read, _ := m.read.Load().(readOnly)// 嘗試從 read 中獲取e, ok := read.m[key]// 如果 read 中沒找到并且 read 和 dirty 不一致,需要從 dirty 中找if !ok && read.amended {m.mu.Lock()// double-checking, 避免在加鎖過程中 dirty 被提升為 readread, _ = m.read.Load().(readOnly)e, ok = read.m[key]// 雙重檢查沒有得到,去 dirty 中找if !ok && read.amended {e, ok = m.dirty[key]// 修改 misses,嘗試提升 dirtym.missLocked()}m.mu.Unlock()}if !ok {return nil, false}return e.load()
}
Load
的邏輯很簡單,就是先從 read
中找,找不到就去 dirty
中找,并執行 missLocked()
修改 misses
判斷是否需要提升 dirty 到 read. 唯一需要注意的是這里的 double-checking
:
由于可能存在一個 goroutine 在執行完 if !ok && read.amended
但還沒有加鎖完成時,另一個 goroutine 將 dirty 提升成了 read 的情況,所以在加鎖之后還需要再從 read 中檢查一遍,這與 Java 安全單例中的雙重檢查是一樣的,雙重檢查會在 Map
中多次使用到。
從 read 或 dirty 中得到 key 對應的 value 后,并不是最終的結果,而是一個指向 entry 的指針,我們需要根據其指向的 entry 中的 p
拿到真實的 value:
func (e *entry) load() (value interface{}, ok bool) {p := atomic.LoadPointer(&e.p)if p == nil || p == expunged {return nil, false}return *(*interface{})(p), true
}
entry.p
有三種可能的值:
- nil
- expunged
- 其他具體的值
前兩種的出現是由于 Map
的延時刪除策略,到刪除時再說,所以在這個,如果 p
等于前兩種值,就說明 key
不存在或已經被刪除,所以返回 nil, false
missLocked
的邏輯也很簡單,每當調用,就將 misses
自增 1 ,當 m.misses >= len(m.dirty)
時,會進行提升,提升的過程也很簡單,提升結束后,會對 dirty
和 misses
初始化。
func (m *Map) missLocked() {m.misses++if m.misses < len(m.dirty) {return}// 將 dirty 提升為 readm.read.Store(readOnly{m: m.dirty})// 重置相關字段m.dirty = nilm.misses = 0
}
Delete
func (m *Map) Delete(key interface{}) {read, _ := m.read.Load().(readOnly)e, ok := read.m[key]if !ok && read.amended {m.mu.Lock()read, _ = m.read.Load().(readOnly)e, ok = read.m[key]if !ok && read.amended {// read 中沒有,從 dirty 中刪除delete(m.dirty, key)}m.mu.Unlock()}if ok {e.delete()}
}
Delete
的邏輯類似于 Load()
,通過雙重檢查判斷鍵值對是否在 read
中,不在的話直接從 dirty
中刪除,否則調用 entry
的 delete
方法從read
中刪除。
func (e *entry) delete() (hadValue bool) {for {p := atomic.LoadPointer(&e.p)// 不存在或被刪除if p == nil || p == expunged {return false}// CAS 將 enter.p 指向 nilif atomic.CompareAndSwapPointer(&e.p, p, nil) {return true}}
}
在 enter.delete()
中,并沒有真的刪除 value, 只是通過 CAS 把 enter.p
標記為了 nil,但這時這個鍵值對并沒有被從 read
中刪除,僅僅是吧它的值指向了 nil, 在之后的 Store
操作中,這個鍵可能還會被復用到,否則,直到下一次 dirty
升級這個鍵值才會被真正刪除,這就是延時刪除。
Store
func (m *Map) Store(key, value interface{}) {read, _ := m.read.Load().(readOnly)// kv 在 read 中能找到,更新 read key 對應的 entryif e, ok := read.m[key]; ok && e.tryStore(&value) {return}m.mu.Lock()read, _ = m.read.Load().(readOnly)if e, ok := read.m[key]; ok {if e.unexpungeLocked() {m.dirty[key] = e}e.storeLocked(&value)} else if e, ok := m.dirty[key]; ok {e.storeLocked(&value)} else {if !read.amended {m.dirtyLocked()m.read.Store(readOnly{m: read.m, amended: true})}m.dirty[key] = newEntry(value)}m.mu.Unlock()
}
更新值
更新值對應有兩種情況:
-
鍵值對在
read
中能找到,這時直接通過tryStore
修改enter.p
。read, _ := m.read.Load().(readOnly)// kv 在 read 中能找到,更新 read key 對應的 entryif e, ok := read.m[key]; ok && e.tryStore(&value) {return}
func (e *entry) tryStore(i *interface{}) bool {for {p := atomic.LoadPointer(&e.p)// 被刪除if p == expunged {return false}// 比較 e.p 與 p, 相等賦新值,否則自旋比較if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {return true}} }
tryStore
中使用 CAS 實現輕量級鎖實現了并發安全的更新操作。 -
在
read
中找不到,在dirty
中:在持鎖狀態下通過storeLocked
修改dirty
中entry.p
// m.mu.Lock() else if e, ok := m.dirty[key]; ok {e.storeLocked(&value) }
func (e *entry) storeLocked(i *interface{}) {atomic.StorePointer(&e.p, unsafe.Pointer(i)) }
插入新值
新值會被直接加鎖寫入到 dirty
中.
else {if !read.amended {m.dirtyLocked()m.read.Store(readOnly{m: read.m, amended: true})}m.dirty[key] = newEntry(value)
}
需要注意的是,如果 read.amended == false
時,即 dirty
中沒有新數據時,會執行 if 塊中的那兩條語句,這在兩種情況下會發生:
-
第一次往
Map
中插入數據時,amended == false
,dirty
是一個空 map , 這時dirtyLocked
會直接返回什么也不做,然后第二條語句會給read
分配一個空map
, 并標記dirty
中有新數據。 -
dirty 剛被提升為了 read, 這時
amended == false
,dirty == nil
,dirtyLocked
會將read
中沒有被刪除的字段復制到dirty
中, 當下一次提升 dirty 時,那些被標記的鍵值對才會被真正刪除。func (m *Map) dirtyLocked() {// 對應情況 1if m.dirty != nil {return}// 情況 2read, _ := m.read.Load().(readOnly)m.dirty = make(map[interface{}]*entry, len(read.m))for k, e := range read.m {// 沒有被刪除,復制到 dirty 中if !e.tryExpungeLocked() {m.dirty[k] = e}} }
tryExpungeLocked
用來判斷entry
是否被刪除,當entry.p == nil
時,說明這個 value 被標記為刪除,這時會把它重新標記為expunged
返回 true, 否則返回 false這里的并發安全同樣使用 CAS 輕量級鎖實現
func (e *entry) tryExpungeLocked() (isExpunged bool) {p := atomic.LoadPointer(&e.p)for p == nil {if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {return true}p = atomic.LoadPointer(&e.p)}return p == expunged }
修改已刪除的值
從上面知道,當對已經存在于 read
中的鍵值對執行刪除操作時,而是會把其暫時標記為 nil
, 等 dirty 升級為 read 后再插入新值時會把 read 中標記為 nil
的值標記為 expunged
, 而其他的值會被重新復制到 dirty 中,當這時插入剛被刪除的鍵后,就會直接把之前標記為 expunged
的鍵的值賦為新值,如:
sMap := Map{}sMap.Store(1, 2)
sMap.Store(2, 3)
sMap.Store(5, 5)
fmt.Println("[*] ", len(sMap.dirty)) // 3
sMap.Load(10)
sMap.Load(10)
sMap.Load(10) // 到這會執行 dirty 的提升
sMap.Load(10)
fmt.Println("[*] ", len(sMap.dirty)) // 0, 提升后 dirty == nil
sMap.Delete(1) // 此時 1 在 read 中,刪除會將其標記為 nil
sMap.Store(4, 4) // 觸發復制,
sMap.Store(1, 5) // 不會把 1 當作一個新值插入,而是直接存儲在剛刪除的 1 的位置
fmt.Println("[*] ", len(sMap.dirty)) // 4, 新值會先存儲在 dirty 中,同時會修改 read 中對應的 value
上面的代碼是我將 Map 源碼整體復制出來后測試的,Map 中的所有字段都是私有的,直接訪問不到
這種情況對應源碼中加鎖后的第一次判斷:
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {if e.unexpungeLocked() {m.dirty[key] = e}e.storeLocked(&value)
}
func (e *entry) unexpungeLocked() (wasExpunged bool) {return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
加鎖后就老朋友 double-checking ,然后如果 key 在 read 中時,會調用 storeLocked()
將 value 的指針存儲在 e.p
中,并且當value 被標記為 expunged
時(通過 e.unexpungeLocked()
判斷),意味著該鍵值對在之前已經被刪除,但由于它還是新加入的,所以必須存放在 dirty
中,否則下一次提升 dirty 就會丟失這個值.
這與第一種更新值的不同點在于更新值只會從 read 中更新,不會去操作 dirty, 這是因為在更新值時,dirty 與 read 是一致的,或則 dirty 比 read 更新,這是允許的,但在從 read 中復制值到 dirty 中時,我們不能將已標記的鍵值對也復制過去,這會導致這些鍵值無法被刪除,所以如果在插入已刪除的鍵值時還和更新值時一樣只改 read就會導致 read 比 dirty 新,這是不允許的。
LoadOrStore
LoadOrStore()
的作用是如果 key 存在,就 Load
, 否則就 Store
, 其邏輯與 Load 和 Store 基本一致,
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {// 命中 readread, _ := m.read.Load().(readOnly)if e, ok := read.m[key]; ok {actual, loaded, ok := e.tryLoadOrStore(value)if ok {return actual, loaded}}// 未命中read 或 `expunged`m.mu.Lock()// ...m.mu.Unlock()return actual, loaded
}
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {p := atomic.LoadPointer(&e.p)if p == expunged {return nil, false, false}if p != nil {return *(*interface{})(p), true, true}// p == nilic := ifor {// 賦新值if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {return i, false, true}// 已經被別的協程修改,重新判斷p = atomic.LoadPointer(&e.p)if p == expunged {return nil, false, false}if p != nil {return *(*interface{})(p), true, true}}
}
如果 key 在 read 中, 會進入 tryLoadOrStore
:
e.p == expunged
時, 說明 Key 已經被標記刪除,這時為了同時更新 dirty, 會延時到加鎖后執行。e.p != nil
時, 說明 Key Value 存在, 直接返回 Valuee.p == nil
時,說明鍵值對已經被刪除,但還沒有進行 dirty 的提升,會通過 CAS 賦新值(沒有提升,也就不需要像第一種情況一樣考慮 dirty),如果 CAS 沒有通過,說明已經有其他協程修改了這個鍵值,再次判斷其是nil
或expunged
read 沒有命中或 entry.p == expunged
時,需要加鎖對 dirty 進行操作,流程與 Store
完全一樣,不再贅述。
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {// Avoid locking if it's a clean hit.read, _ := m.read.Load().(readOnly)if e, ok := read.m[key]; ok {actual, loaded, ok := e.tryLoadOrStore(value)if ok {return actual, loaded}}m.mu.Lock()read, _ = m.read.Load().(readOnly)if e, ok := read.m[key]; ok {if e.unexpungeLocked() {m.dirty[key] = e}actual, loaded, _ = e.tryLoadOrStore(value)} else if e, ok := m.dirty[key]; ok {actual, loaded, _ = e.tryLoadOrStore(value)m.missLocked()} else {if !read.amended {// We're adding the first new key to the dirty map.// Make sure it is allocated and mark the read-only map as incomplete.m.dirtyLocked()m.read.Store(readOnly{m: read.m, amended: true})}m.dirty[key] = newEntry(value)actual, loaded = value, false}m.mu.Unlock()return actual, loaded
}
Range
我們可以使用安全的 for-range
對一個原生的 map 進行隨機遍歷,但 Map
使用不了這種簡單的方法,好在其提供了 Map.Range
,可以通過回調的方式隨機遍歷其中的鍵值。
Range
接受一個回調函數,在調用時,Range 會把當前遍歷到的鍵值對傳給這個給回調 f
, 當 f
返回 false 時,遍歷結束。
Range
的源碼很簡單,為了保證遍歷完整進行,在真正遍歷之前,他會通過 double-checking
提升 dirty.
func (m *Map) Range(f func(key, value interface{}) bool) {read, _ := m.read.Load().(readOnly)if read.amended {m.mu.Lock()read, _ = m.read.Load().(readOnly)if read.amended {read = readOnly{m: m.dirty}m.read.Store(read)m.dirty = nilm.misses = 0}m.mu.Unlock()}for k, e := range read.m {v, ok := e.load()if !ok {continue}if !f(k, v) {break}}
}
總結
原生的 map
并不是并發安全的,在并發環境下使用原生 map 會直接導致一個 panic,為此,Go 官方從 1.7 之后添加了 sync.Map
,用于支持并發環境下的鍵值對存取操作。
實現并發安全的兩個思路分別是 原子操作 和 加鎖, 原子操作由于是直接面向硬件的一組不可分割的指令,所以效率要比加鎖高很多,因此 Map 的基本思路就是盡可能多的使用原子操作,直到迫不得已才去使用鎖機制,Map 的做法是將數據冗余存儲了兩個數據結構中,read 是一個只讀的 sync.Value
類型的結構,其上存儲的數據可以通過 Value.Load()
和 Value.Store()
安全存取,另外,新的數據會被存儲在 dirty
中, 等實際成熟, dirty 會被升級為 read.所有的讀和修改操作都會優先在 read
上進行,以此盡量避免使用鎖。
Map 的優勢主要集中于下面兩個場景:
(1) when the entry for a given key is only ever written once but read many times, as in caches that only grow,
(2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys.
即:
- 一次寫,多次讀
- 多個 goroutine 操作的鍵不相交時
關于源碼
源碼中的一些核心思想:
- 空間換時間
- 緩存思想
- double-checking
- 延遲刪除
關于 dirty 的提升
Map 中維持了一個 int 類型的 misses
每當 Map 未命中 read 時,會將該值自增 1, 當該值大于 dirty 的長度后,dirty 就會被提升為 read,提升之后,dirty 和 misses 會被重置,等下一次插入新值時,會將 read 中未刪除的數據復制到 dirty 中。
除此之外,執行 Range
時,也會先進行一次提升。
關于延遲刪除
當執行 Delete
時,如果 read 沒有擊中, 就會直接從 dirty 中刪除,否則如果鍵值在 read 中,會先將其 Value 的指針(enter.p)標記為 nil, 等下一次執行復制時,這些被標記為 nil 的鍵值會被重新標記為 expunged,即 enter.p 有三種可能的值:
- nil: 表示 鍵值已經被刪除,但這一版的 read 還沒有被復制到 dirty 中,所以 dirty 此時為 nil, 遇到要重新插入這個key時,可以直接修改 read,之后進行復制時,這個最新的值會被同步回 dirty。
- expunged: 表示該鍵值已經被刪除并且經歷了復制, dirty 不為 nil, 這時需要同時修改 read 和 dirty, 避免 read 的數據比 dirty 中的數據新,導致下一次提升時丟失新數據。
!= nil
: 表示存儲的是具體的 value 的指針。
被刪除的數據直到下一次提升時才會被真正刪除