一、map 是什么
map 是 Go 中用于存儲 key-value 關系數據的數據結構,類似 C++ 中的 map,Python 中的 dict。Go 中 map 的使用很簡單,但是對于初學者,經常會犯兩個錯誤:沒有初始化,并發讀寫。
1、未初始化的 map 都是 nil,直接賦值會報 panic。map 作為結構體成員的時候,很容易忘記對它的初始化。
2、并發讀寫是我們使用 map 中很常見的一個錯誤。多個協程并發讀寫同一個 key 的時候,會出現沖突,導致 panic。
Go 內置的 map 類型并沒有對并發場景場景進行優化,但是并發場景又很常見,如何實現線程安全(并發安全)的 map就很重要了?
二、三種線程安全的 map
1、加讀寫鎖(RWMutex)
這是最容易想到的一種方式。常見的 map 的操作有增刪改查和遍歷,這里面查和遍歷是讀操作,增刪改是寫操作,因此對查和遍歷需要加讀鎖,對增刪改需要加寫鎖。
以 map[int]int 為例,借助 RWMutex,具體的實現方式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | type ?RWMap? struct ?{? // 一個讀寫鎖保護的線程安全的map ???? sync.RWMutex? // 讀寫鎖保護下面的map字段 ???? m? map [int]int } // 新建一個RWMap func ?NewRWMap(n int) *RWMap { ???? return ?&RWMap{ ???????? m: make( map [int]int, n), ???? } } func ?(m *RWMap) Get(k int) (int, bool) {? //從map中讀取一個值 ???? m.RLock() ???? defer ?m.RUnlock() ???? v, existed := m.m[k]? // 在鎖的保護下從map中讀取 ???? return ?v, existed } func ?(m *RWMap) Set(k int, v int) {? // 設置一個鍵值對 ???? m.Lock()?????????????? // 鎖保護 ???? defer ?m.Unlock() ???? m.m[k] = v } func ?(m *RWMap) Delete(k int) {? //刪除一個鍵 ???? m.Lock()??????????????????? // 鎖保護 ???? defer ?m.Unlock() ???? delete(m.m, k) } func ?(m *RWMap) Len() int {? // map的長度 ???? m.RLock()??? // 鎖保護 ???? defer ?m.RUnlock() ???? return ?len(m.m) } func ?(m *RWMap) Each(f? func (k, v int) bool) {? // 遍歷map ???? m.RLock()????????????? //遍歷期間一直持有讀鎖 ???? defer ?m.RUnlock() ???? for ?k, v :=? range ?m.m { ???????? if ?!f(k, v) { ???????????? return ???????? } ???? } } |
2、分片加鎖
通過讀寫鎖 RWMutex 實現的線程安全的 map,功能上已經完全滿足了需要,但是面對高并發的場景,僅僅功能滿足可不行,性能也得跟上。鎖是性能下降的萬惡之源之一。所以并發編程的原則就是盡可能減少鎖的使用。當鎖不得不用的時候,可以減小鎖的粒度和持有的時間。
在第一種方法中,加鎖的對象是整個 map,協程 A 對 map 中的 key 進行修改操作,會導致其它協程無法對其它 key 進行讀寫操作。一種解決思路是將這個 map 分成 n 塊,每個塊之間的讀寫操作都互不干擾,從而降低沖突的可能性。
Go 比較知名的分片 map 的實現是 orcaman/concurrent-map,它的定義如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | var ?SHARD_COUNT = 32 ?? ? // 分成SHARD_COUNT個分片的map type ?ConcurrentMap []*ConcurrentMapShared ?? ? // 通過RWMutex保護的線程安全的分片,包含一個map type ?ConcurrentMapShared? struct ?{ ???? items???????? map [string] interface {} ???? sync.RWMutex? // Read Write mutex, guards access to internal map. } ?? ? // 創建并發map func ?New() ConcurrentMap { ???? m := make(ConcurrentMap, SHARD_COUNT) ???? for ?i := 0; i < SHARD_COUNT; i++ { ???????? m[i] = &ConcurrentMapShared{items: make( map [string] interface {})} ???? } ???? return ?m } ?? ? // 根據key計算分片索引 func ?(m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { ???? return ?m[uint(fnv32(key))%uint(SHARD_COUNT)] } |
ConcurrentMap 其實就是一個切片,切片的每個元素都是第一種方法中攜帶了讀寫鎖的 map。
這里面 GetShard 方法就是用來計算每一個 key 應該分配到哪個分片上。
再來看一下 Set 和 Get 操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | func ?(m ConcurrentMap) Set(key string, value? interface {}) { ???? // 根據key計算出對應的分片 ???? shard := m.GetShard(key) ???? shard.Lock()? //對這個分片加鎖,執行業務操作 ???? shard.items[key] = value ???? shard.Unlock() } func ?(m ConcurrentMap) Get(key string) ( interface {}, bool) { ???? // 根據key計算出對應的分片 ???? shard := m.GetShard(key) ???? shard.RLock() ???? // 從這個分片讀取key的值 ???? val, ok := shard.items[key] ???? shard.RUnlock() ???? return ?val, ok } |
Get 和 Set 方法類似,都是根據 key 用 GetShard 計算出分片索引,找到對應的 map 塊,執行讀寫操作。
3、sync 中的 map
分片加鎖的思路是將大塊的數據切分成小塊的數據,從而減少沖突導致鎖阻塞的可能性。如果在一些特殊的場景下,將讀寫數據分開,是不是能在進一步提升性能呢?
在內置的 sync 包中(Go 1.9+)也有一個線程安全的 map,通過將讀寫分離的方式實現了某些特定場景下的性能提升。
其實在生產環境中,sync.map 用的很少,官方文檔推薦的兩種使用場景是:
a) when the entry for a given key is only ever written once but read many times, as in caches that only grow.
b) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys.
兩種場景都比較苛刻,要么是一寫多讀,要么是各個協程操作的 key 集合沒有交集(或者交集很少)。所以官方建議先對自己的場景做性能測評,如果確實能顯著提高性能,再使用 sync.map。
sync.map 的整體思路就是用兩個數據結構(只讀的 read 和可寫的 dirty)盡量將讀寫操作分開,來減少鎖對性能的影響。
下面詳細看下 sync.map 的定義和增刪改查實現。
sync.map 數據結構定義
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | type ?Map? struct ?{ ???? mu Mutex ???? // 基本上你可以把它看成一個安全的只讀的map ???? // 它包含的元素其實也是通過原子操作更新的,但是已刪除的entry就需要加鎖操作了 ???? read atomic.Value? // readOnly ???? // 包含需要加鎖才能訪問的元素 ???? // 包括所有在read字段中但未被expunged(刪除)的元素以及新加的元素 ???? dirty? map [ interface {}]*entry ???? // 記錄從read中讀取miss的次數,一旦miss數和dirty長度一樣了,就會把dirty提升為read,并把dirty置空 ???? misses int } type ?readOnly? struct ?{ ???? m??????? map [ interface {}]*entry ???? amended bool? // 當dirty中包含read沒有的數據時為true,比如新增一條數據 } // expunged是用來標識此項已經刪掉的指針 // 當map中的一個項目被刪除了,只是把它的值標記為expunged,以后才有機會真正刪除此項 var ?expunged = unsafe.Pointer(new( interface {})) // entry代表一個值 type ?entry? struct ?{ ???? p unsafe.Pointer? // *interface{} } |
Map 的定義中,read 字段通過 atomic.Values 存儲被高頻讀的 readOnly 類型的數據。dirty 存儲
Store 方法
Store 方法用來設置一個鍵值對,或者更新一個鍵值對。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | func ?(m *Map) Store(key, value? interface {}) { ???? read, _ := m.read.Load().(readOnly) ???? // 如果read字段包含這個項,說明是更新,cas更新項目的值即可 ???? if ?e, ok := read.m[key]; ok && e.tryStore(&value) { ???????? return ???? } ???? // read中不存在,或者cas更新失敗,就需要加鎖訪問dirty了 ???? m.mu.Lock() ???? read, _ = m.read.Load().(readOnly) ???? if ?e, ok := read.m[key]; ok {? // 雙檢查,看看read是否已經存在了 ???????? if ?e.unexpungeLocked() { ???????????? // 此項目先前已經被刪除了,需要添加到 dirty 中 ???????????? m.dirty[key] = e ???????? } ???????? e.storeLocked(&value)? // 更新 ???? }? else ?if ?e, ok := m.dirty[key]; ok {? // 如果dirty中有此項 ???????? e.storeLocked(&value)? // 直接更新 ???? }? else ?{? // 否則就是一個新的key ???????? if ?!read.amended {? //如果dirty為nil ???????????? // 需要創建dirty對象,并且標記read的amended為true, ???????????? // 說明有元素它不包含而dirty包含 ???????????? m.dirtyLocked() ???????????? m.read.Store(readOnly{m: read.m, amended: true}) ???????? } ???????? m.dirty[key] = newEntry(value)? //將新值增加到dirty對象中 ???? } ???? m.mu.Unlock() } // tryStore利用 cas 操作來更新value。 // 更新之前會判斷這個鍵值對有沒有被打上刪除的標記 func ?(e *entry) tryStore(i * interface {}) bool { ???? for ?{ ???????? p := atomic.LoadPointer(&e.p) ???????? if ?p == expunged { ???????????? return ?false ???????? } ???????? if ?atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { ???????????? return ?true ???????? } ???? } } // 將值設置成 nil,表示沒有被刪除 func ?(e *entry) unexpungeLocked() (wasExpunged bool) { ???? return ?atomic.CompareAndSwapPointer(&e.p, expunged, nil) } // 通過復制 read 生成 dirty func ?(m *Map) dirtyLocked() { ???? if ?m.dirty != nil { ???????? return ???? } ???? read, _ := m.read.Load().(readOnly) ???? m.dirty = make( map [ interface {}]*entry, len(read.m)) ???? for ?k, e :=? range ?read.m { ???????? if ?!e.tryExpungeLocked() { ???????????? m.dirty[k] = e ???????? } ???? } } // 標記刪除 func ?(e *entry) tryExpungeLocked() (isExpunged bool) { ???? p := atomic.LoadPointer(&e.p) ???? for ?p == nil { ???????? if ?atomic.CompareAndSwapPointer(&e.p, nil, expunged) { ???????????? return ?true ???????? } ???????? p = atomic.LoadPointer(&e.p) ???? } ???? return ?p == expunged } |
第2-6行,通過 cas 進行鍵值對更新,更新成功直接返回。
第8-28行,通過互斥鎖加鎖來處理處理新增鍵值對和更新失敗的場景(鍵值對被標記刪除)。
第11行,再次檢查 read 中是否已經存在要 Store 的 key(雙檢查是因為之前檢查的時候沒有加鎖,中途可能有協程修改了 read)。
如果該鍵值對之前被標記刪除,先將這個鍵值對寫到 dirty 中,同時更新 read。
如果 dirty 中已經有這一項了,直接更新 read。
如果是一個新的 key。dirty 為空的情況下通過復制 read 創建 dirty,不為空的情況下直接更新 dirty。
Load 方法
Load 方法比較簡單,先是從 read 中讀數據,讀不到,再通過互斥鎖鎖從 dirty 中讀數據。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | func ?(m *Map) Load(key? interface {}) (value? interface {}, ok bool) { ???? // 首先從read處理 ???? read, _ := m.read.Load().(readOnly) ???? e, ok := read.m[key] ???? if ?!ok && read.amended {? // 如果不存在并且dirty不為nil(有新的元素) ???????? m.mu.Lock() ???????? // 雙檢查,看看read中現在是否存在此key ???????? read, _ = m.read.Load().(readOnly) ???????? e, ok = read.m[key] ???????? if ?!ok && read.amended { //依然不存在,并且dirty不為nil ???????????? e, ok = m.dirty[key] // 從dirty中讀取 ???????????? // 不管dirty中存不存在,miss數都加1 ???????????? m.missLocked() ???????? } ???????? m.mu.Unlock() ???? } ???? if ?!ok { ???????? return ?nil, false ???? } ???? return ?e.load()? //返回讀取的對象,e既可能是從read中獲得的,也可能是從dirty中獲得的 } func ?(m *Map) missLocked() { ???? m.misses++? // misses計數加一 ???? if ?m.misses < len(m.dirty) {? // 如果沒達到閾值(dirty字段的長度),返回 ???????? return ???? } ???? m.read.Store(readOnly{m: m.dirty})? //把dirty字段的內存提升為read字段 ???? m.dirty = nil? // 清空dirty ???? m.misses = 0?? // misses數重置為0 } |
這里需要注意的是,如果出現多次從 read 中讀不到數據,得到 dirty 中讀取的情況,就直接把 dirty 升級成 read,以提高 read 效率。
Delete 方法
下面是 Go1.13 中 Delete 的實現方式,如果 key 在 read 中,就將值置成 nil;如果在 dirty 中,直接刪除 key。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | func ?(m *Map) Delete(key? interface {}) { ???? read, _ := m.read.Load().(readOnly) ???? e, ok := read.m[key] ???? if ?!ok && read.amended { ???????? m.mu.Lock() ???????? read, _ = m.read.Load().(readOnly) ???????? e, ok = read.m[key] ???????? if ?!ok && read.amended {? // 說明可能在 ???????????? delete(m.dirty, key) ???????? } ???????? m.mu.Unlock() ???? } ???? if ?ok { ???????? e.delete() ???? } } func ?(e *entry) delete() (hadValue bool) { ???? for ?{ ???????? p := atomic.LoadPointer(&e.p) ???????? if ?p == nil || p == expunged { ???????????? return ?false ???????? } ???????? if ?atomic.CompareAndSwapPointer(&e.p, p, nil) { ???????????? return ?true ???????? } ???? } } |
補充說明一下,delete() 執行完之后,e.p 變成 nil,下次 Store 的時候,執行到 dirtyLocked() 這一步的時候,會被標記成 enpunged。因此在 read 中 nil 和 enpunged 都表示刪除狀態。
sync.map 總結
上面對源碼粗略的梳理了一遍,最后在總結一下 sync.map 的實現思路:
-
讀寫分離。讀(更新)相關的操作盡量通過不加鎖的 read 實現,寫(新增)相關的操作通過 dirty 加鎖實現。
-
動態調整。新寫入的 key 都只存在 dirty 中,如果 dirty 中的 key 被多次讀取,dirty 就會上升成不需要加鎖的 read。
-
延遲刪除。Delete 只是把被刪除的 key 標記成 nil,新增 key-value 的時候,標記成 enpunged;dirty 上升成 read 的時候,標記刪除的 key 被批量移出 map。這樣的好處是 dirty 變成 read 之前,這些 key 都會命中 read,而 read 不需要加鎖,無論是讀還是更新,性能都很高。
總結了 sync.map 的設計思路后,我們就能理解官方文檔推薦的 sync.map 的兩種應用場景了。
三、總結
Go 內置的 map 使用起來很方便,但是在并發頻繁的 Go 程序中很容易出現并發讀寫沖突導致的問題。本文介紹了三種常見的線程安全 map 的實現方式,分別是讀寫鎖、分片鎖和 sync.map。
較常使用的是前兩種,而在特定的場景下,sync.map 的性能會有更優的表現。