本地緩存是一個項目中很常見的組件。在很多人的眼中就是一個簡單的key-value的map存儲即可實現,但實際上,設計一個本地緩存需要考慮的問題遠比你想象的多,比如說,本地緩存是將數據存儲在內存,若數據量激增突破了內存限制怎么辦?如何高效地管理本地緩存,以最低的時間復雜度添加和刪除緩存都是值得思考的問題
本文以第一視角帶你走進go-zero框架,從源碼的角度解析本地緩存的設計與實現
1. 數據結構
我們先來看本地緩存的數據結構
// 本地緩存對象
type LocalCache struct {name stringlock sync.Mutexdata map[string]any // 字典,用于索引數據expire time.Duration // 緩存過期時間lruCache LRU // LRU緩存,用于限制內存大小barrier singleflight.Group // 單飛模式,防止重復請求unstableExpiry Unstable // 引入過期時間的抖動,避免緩存同時過期timingWheel *TimingWheel // 時間輪,用于管理緩存的過期stats *cacheStat // 用于統計緩存命中率
}
一大堆東西眼花繚亂,不過不用擔心,后面我們會逐個解析。現在你只需要關注name、lock、data、expire這幾個成員。
顧名思義,name就是對應緩存的名稱;data是存儲緩存的map;expire是緩存過期時間;lock是緩存使用中用到的鎖,很好理解,當高并發的情況下,必然涉及到競爭,加個鎖來保證安全。
好了,現在你可以很輕易的實現本地緩存的增刪改查,無非就是操作map,操作時上個鎖罷了。不過,這只是個模型的雛形,接下來才是重點。
2. 內存溢出問題
首先,我們來考慮第一個問題:如果設置了大量的緩存數據導致內存溢出該怎么辦?
比較直觀的一個想法就是 給緩存設置固定的大小,當數據量超過緩存容量時就淘汰掉某個歷史數據。
沒錯,這就是緩存淘汰機制,常見的策略有很多,本文使用的是LRU。
2.1. LRU
所謂LRU指的是淘汰最近最少使用的數據
什么意思呢,假設緩存容量為3,而此時你依次插入了1、2、3三條數據,接下來要插入4了,那么將把1淘汰掉換成4,因為1是最久未使用的數據
再比如,還是依次插入了1、2、3三條數據,這時查詢了1,那么1就變成了最近最多使用的數據了,接下來再要淘汰數據就會淘汰2了
2.2. LRU實現
關于LRU的實現原理其實很簡單,一個雙向鏈表和一個map再加一個容量參數就可解決
- 雙向鏈表:管理數據的先后順序,鏈表頭的數據表示最近最常使用的數據,當插入和查詢數據時都將相應的數據插入或移動到鏈表頭;而刪除數據時從鏈表尾刪除
- map:用于存儲數據,維護映射關系,保證數據查詢時可以以O(1)的時間復雜度查到
- 容量參數:限制LRU的最大容量
2.3. 框架實現
接下來我們看框架源碼
上述數據結構中有一個LRU類型的LruCache,這就是其接口了
package localcacheimport "container/list"// LRU緩存
type LRU interface {Add(key string)Remove(key string)
}// 占位用,表示不起用LRU模型
type emptyLRU struct{}func (e emptyLRU) Add(key string) {}func (e emptyLRU) Remove(key string) {}// 關鍵LRU緩存
type KeyLRU struct {limit int // 最大容量evicts *list.List // 雙向鏈表elements map[string]*list.Element // key-value映射onEvict func(key string) // 回調函數
}func NewKeyLRU(limit int, onEvict func(key string)) *KeyLRU {return &KeyLRU{limit: limit,evicts: list.New(),elements: make(map[string]*list.Element),onEvict: onEvict,}
}func (k *KeyLRU) Add(key string) {// 1. 若key已存在,則將其移動到鏈表頭部if elem, ok := k.elements[key]; ok {k.evicts.MoveToFront(elem)return}// 2. 若key不存在,則將其添加到鏈表頭部,若加入后超過最大容量,則將鏈表尾的元素刪除elem := k.evicts.PushFront(key)k.elements[key] = elemif k.evicts.Len() > k.limit {k.removeOldest()}
}func (k *KeyLRU) removeOldest() {elem := k.evicts.Back()if elem != nil {k.removeElement(elem)}
}func (k *KeyLRU) removeElement(elem *list.Element) {k.evicts.Remove(elem)key := elem.Value.(string)delete(k.elements, key)k.onEvict(key)
}func (k *KeyLRU) Remove(key string) {if elem, ok := k.elements[key]; ok {k.removeElement(elem)}
}
首先LRU的接口中定義了Add和Remove兩個方法
為啥沒有Get呢?因為LRU我們這里用他就是用來管理緩存溢出問題的,并不是用來保存和查詢數據的,所以無需Get
接下來我們定義了兩個接口實現的結構體:
- emptyLRU:空LRU,表示不起用LRU模型
- KeyLRU:關鍵LRU,這個才是真正核心的LRU模型
好,我們重點看一下KeyLRU的實現,包含了4個參數,除了我們上述提及的雙向鏈表、map和容量參數外,還有一個回調函數
這個回調函數是干嘛的呢?
實際上,在LRU淘汰數據時,我們整個本地緩存模型還需要執行其他操作(這些后面會講到),由于這些操作涉及到了其他模塊,所以干脆搞一個回調函數,讓本地緩存模型把這些操作寫在回調函數里統一管理。
2.3.1. 構造函數
沒啥可說的,幾個構建操作
2.3.2. 插入操作
插入的邏輯如下:
- 判斷key是否已存在,若已存在則直接將該數據挪到雙向鏈表的頭部
- 若不存在,則先將key從頭部插進去,map也將之保存,然后看此時的雙向鏈表數據量是否已超過容量參數,如果超過了,就將鏈表尾的數據刪了,map中也把這條數據刪了,然后調用回調函數執行其他模塊的操作
2.3.3. 刪除操作
了解了插入操作,我們再看刪除操作就簡單了
如果map中存在要刪除的key,那么就按上面的刪除尾結點同樣的操作把這條數據刪了就ok
OK,現在你已經有了LRU來保證你的內存不會溢出,只要在執行本地緩存插入和刪除操作時對LRU進行相應的操作就可以了!
3. 緩存過期問題
每一條緩存數據都可能存在一個過期時間,當過期時間到達時我們需要將該數據刪除,但是這并不好實現。
假設我們在map中插入一條數據后為其起一個定時器,等到過期時間一到就刪除他。這么做看似合理,但如果數據量很大,且同一時間大量的緩存未過期就意味著需要大量的協程去起定時器。這樣既會造成較大的內存壓力也不方便管理。
于是我們很自然地想到是否可以構建一個高效的定時任務管理模型來統一處理緩存過期問題
這個模型需要滿足以下幾個條件:
- 以較低的時間復雜度進行定時任務的插入和刪除
- 支持插入大規模的任務
- 高并發場景下避免頻繁的鎖競爭
- 內存占用不宜過高
- 易于實現和擴展
能夠滿足上述條件的常見模型叫做“時間輪”
3.1. 時間輪模型
我們從圖出發,由圖可見,時間輪就是一個隨時間轉動的大轉盤,還有一個固定位置的指針。而轉盤上有著一個一個的槽位(圖中0、1、2…),槽位之間有著固定的時間間隔,定時任務就分布在這些槽上。當轉盤轉到了哪個槽,那么這個槽上的任務就開始執行。
精彩的來了,每個槽上都是一個雙向鏈表,同一槽位的任務就拴在這個鏈表上,當轉盤轉到這個槽位時,就從前向后遍歷這個鏈表,依次執行可執行的任務。
這個時候您可能要問了
為什么時間輪一定要設計成一個環形?
很簡單,因為節約空間,當設計成環形時意味著槽位的數量是固定的,那么雙向鏈表的數量也就是固定的。
但是這樣又帶來了一個新的問題:
如果槽位的數量是固定的,那么它能表示的時間范圍不就固定了?
比方說,槽位之間的間隔是1s,一共有5個槽位,那時間輪不就只能表示0-5s內的任務了嗎?
這個問題也很好解決,只需要引入“圈”的概念就可以了。還是上面的例子,一圈就是5s,那么對于6s后執行的任務它的槽位就是1圈1s的位置,3s的任務就是0圈3s。
由此我們只需要按照 過期時間%(槽位時間間隔*槽位數) 就可以得到需要插入的槽位位置。
當然,這個公式并不準確,因為它是按0槽位作為起點的,但實際我們應該以當前指針指向的槽位(代表當前時間)為起點,在此基礎上加上你的過期時間,于是公式變成了:
插入的槽位 = (當前槽位 + 過期時間/槽位時間間隔)%槽位數
圈數 = (過期時間/槽位時間間隔 - 1)/槽位數
值得一提的是,對于雙向鏈表中的每一條任務我們都需要維護其圈數,如果轉盤轉過來了,但發現你的圈數大于0,說明你至少還要再等待一輪才能執行。
時間輪為啥高效?
最后我們再來看看時間輪的時間復雜度和空間復雜度問題
- 首先時間輪的槽位固定,決定了它并不怎么占用空間,您可能會說槽位上不是有雙向鏈表嗎,怎么就不占用空間了。但細細想來,當任務逐漸被執行,雙向鏈表一直在流動著插入刪除,但槽位卻是固定占用一定的空間,即便某個槽位上沒有數據,也占著一定的空間。所以這才是問題的關鍵。
- 再來看時間復雜度問題。
- 由于槽位固定,本質上就是個定長數組,所以每個槽位的執行就是根據當前指針找到對應槽位而已,O(1)時間復雜度。
- 而執行就更簡單了,遍歷雙向鏈表,從前到后執行。執行完立馬刪了,O(1)時間復雜度
- 再看插入和刪除。根據我們上面提到的插入的槽位和圈數的計算公式,可能輕松找到待插入的槽位索引,然后插到鏈表尾就完事了,O(1)時間復雜度。而刪除也很容易,我們只需要給待刪除的任務打一下標記,當要執行的時候發現存在這個標記就直接把它刪了,也是O(1)時間復雜度
查找問題
可能您也發現了,無論是插入還是刪除任務都離不開查找任務是否存在的操作,但如果按照時間輪現有模型去查找豈不是要遍歷每個槽位和雙向鏈表?
于是我們很容易地想到在時間輪之外我們還需要維護一個map去方便查找,不過這個map一定是并發安全的。
3.2. 框架實現
了解了時間輪的設計思想,接下來看框架的具體實現
對應上述數據結構中的TimingWheel
package localcacheimport ("container/list""errors""fmt""time"
)var (ErrArgument = errors.New("incorrect task argument")ErrClosed = errors.New("TimingWheel is closed already")
)// 時間輪,用于管理和調度本地緩存過期任務
type TimingWheel struct {interval time.Duration // 每個slot的時間間隔ticker Ticker // 定時器,用于驅動時間輪的移動slots []*list.List // 時間輪的槽,每個槽位存儲一個任務鏈表timers *SafeMap // 線程安全的映射,用于跟蹤和管理所有定時任務tickedPos int // 當前時間輪指針的位置numSlots int // 時間輪的總槽數execute Execute // 執行任務函數// 接收不同類型任務操作的通道setChannel chan timingEntrymoveChannel chan baseEntryremoveChannel chan anystopChannel chan PlaceholderType
}// 定時任務基本信息
type baseEntry struct {delay time.Durationkey any
}// 表示一個定時任務
type timingEntry struct {baseEntryvalue anycircle int // 剩余圈數diff intremoved bool // 是否被移除
}// 表示任務在時間輪中的位置和狀態
type positionEntry struct {pos int // 槽位置item *timingEntry // 任務
}type timingTask struct {key anyvalue any
}// 定義一個執行任務的方法
type Execute func(key, value any)func NewTimingWheel(interval time.Duration, numSlots int, execute Execute) (*TimingWheel, error) {if interval <= 0 || numSlots <= 0 || execute == nil {return nil, fmt.Errorf("interval: %v, slots: %v, execute: %p",interval, numSlots, execute)}return NewTimingWheelWithTicker(interval, numSlots, execute, NewTicker(interval))
}func NewTimingWheelWithTicker(interval time.Duration, numSlots int, execute Execute, ticker Ticker) (*TimingWheel, error) {tw := &TimingWheel{interval: interval,ticker: ticker,slots: make([]*list.List, numSlots),timers: NewSafeMap(),execute: execute,numSlots: numSlots,setChannel: make(chan timingEntry),moveChannel: make(chan baseEntry),removeChannel: make(chan any),stopChannel: make(chan PlaceholderType),}tw.initSlots()go tw.run()return tw, nil
}func (tw *TimingWheel) initSlots() {for i := 0; i < tw.numSlots; i++ {tw.slots[i] = list.New()}
}// 借助select機制實際執行時間輪各任務
func (tw *TimingWheel) run() {for {select {case <-tw.ticker.Chan():tw.onTicker()case task := <-tw.setChannel:tw.setTask(&task)case key := <-tw.removeChannel:tw.removeTask(key)case task := <-tw.moveChannel:tw.moveTask(task)case <-tw.stopChannel:tw.ticker.Stop()return}}
}func (tw *TimingWheel) SetTimer(key, value any, delay time.Duration) error {if delay < 0 || key == nil {return ErrArgument}select {case tw.setChannel <- timingEntry{baseEntry: baseEntry{delay: delay,key: key,},value: value,}:return nilcase <-tw.stopChannel:return ErrClosed}
}// 移動任務到指定delay的位置
func (tw *TimingWheel) MoveTimer(key any, delay time.Duration) error {if delay <= 0 || key == nil {return ErrArgument}select {case tw.moveChannel <- baseEntry{delay: delay,key: key,}:return nilcase <-tw.stopChannel:return ErrClosed}
}func (tw *TimingWheel) RemoveTimer(key any) error {if key == nil {return ErrArgument}select {case tw.removeChannel <- key:return nilcase <-tw.stopChannel:return ErrClosed}
}// ================================== 實際執行任務方法 ==================================// 執行時間輪任務
func (tw *TimingWheel) onTicker() {// 找到執行槽位,掛在其任務鏈表上執行tw.tickedPos = (tw.tickedPos + 1) % tw.numSlotsl := tw.slots[tw.tickedPos]tw.scanAndRunTasks(l)
}func (tw *TimingWheel) scanAndRunTasks(l *list.List) {var tasks []timingTaskfor e := l.Front(); e != nil; {task := e.Value.(*timingEntry)if task.removed {next := e.Next()l.Remove(e)e = nextcontinue} else if task.circle > 0 {task.circle--e = e.Next()continue} else if task.diff > 0 {next := e.Next()l.Remove(e)pos := (tw.tickedPos + task.diff) % tw.numSlotstw.slots[pos].PushBack(task)tw.setTimerPosition(pos, task)task.diff = 0e = nextcontinue}tasks = append(tasks, timingTask{key: task.key,value: task.value,})next := e.Next()l.Remove(e)tw.timers.Del(task.key)e = next}tw.runTasks(tasks)
}func (tw *TimingWheel) runTasks(tasks []timingTask) {if len(tasks) == 0 {return}go func() {for i := range tasks {RunSafe(func() {tw.execute(tasks[i].key, tasks[i].value)})}}()
}func (tw *TimingWheel) setTask(task *timingEntry) {if task.delay < tw.interval {task.delay = tw.interval}if val, ok := tw.timers.Get(task.key); ok {// 任務已存在,更新任務的值并移動到指定delay的位置entry := val.(*positionEntry)entry.item.value = task.valuetw.moveTask(task.baseEntry)} else {pos, circle := tw.getPositionAndCircle(task.delay)task.circle = circletw.slots[pos].PushBack(task)tw.setTimerPosition(pos, task)}
}func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos, circle int) {steps := int(d / tw.interval)pos = (tw.tickedPos + steps) % tw.numSlotscircle = (steps - 1) / tw.numSlotsreturn
}func (tw *TimingWheel) setTimerPosition(pos int, task *timingEntry) {if val, ok := tw.timers.Get(task.key); ok {timer := val.(*positionEntry)timer.item = tasktimer.pos = pos} else {tw.timers.Set(task.key, &positionEntry{pos: pos,item: task,})}
}func (tw *TimingWheel) moveTask(task baseEntry) {val, ok := tw.timers.Get(task.key)if !ok {return}timer := val.(*positionEntry)if task.delay < tw.interval {GoSafe(func() {tw.execute(timer.item.key, timer.item.value)})return}pos, circle := tw.getPositionAndCircle(task.delay)if pos >= timer.pos {timer.item.circle = circletimer.item.diff = pos - timer.pos} else if circle > 0 {circle--timer.item.circle = circletimer.item.diff = tw.numSlots + pos - timer.pos} else {timer.item.removed = truenewItem := &timingEntry{baseEntry: task,value: timer.item.value,}tw.slots[pos].PushBack(newItem)tw.setTimerPosition(pos, newItem)}
}func (tw *TimingWheel) removeTask(key any) {val, ok := tw.timers.Get(key)if !ok {return}timer := val.(*positionEntry)timer.item.removed = truetw.timers.Del(key)
}
我們一點點來剖析,先看數據結構,包含了:
- interval:slot時間間隔
- ticker:定時器,驅動時間輪轉動
- slots:槽位,每個槽位是一個雙向鏈表
- timers:并發安全的map
- tickedPos:時間輪指針
- numSlots:總槽位數
- execute:任務執行函數
有了上述原理的基礎,這里的大部分參數應該都好理解。需要額外說明的是為什么有execute這個東西。
其實和LRU中的回調函數一樣,都是把具體的操作放在本地緩存模型中統一管理。
接下來注意到setChannel、moveChannel、removeChannel、
stopChannel,他們分別表示設置、移動、刪除、停止操作的通道。為什么有這幾個東西呢?這就涉及到時間輪的事件處理機制。
不理解沒關系,我們看下代碼
3.2.1. 事件處理機制
從構造函數入手,我們發現最后通過起了一個協程執行run()
func NewTimingWheelWithTicker(interval time.Duration, numSlots int, execute Execute, ticker Ticker) (*TimingWheel, error) {......go tw.run()......
}// 借助select機制實際執行時間輪各任務
func (tw *TimingWheel) run() {for {select {case <-tw.ticker.Chan():tw.onTicker()case task := <-tw.setChannel:tw.setTask(&task)case key := <-tw.removeChannel:tw.removeTask(key)case task := <-tw.moveChannel:tw.moveTask(task)case <-tw.stopChannel:tw.ticker.Stop()return}}
}
可以看到,run方法在監聽各個操作通道,當通道有數據被取出時就調用相應的執行函數去執行。同時也啟動定時器轉動時間輪。
為什么這么設計呢?
- 這就涉及到時間輪事件驅動的理念,通過channel可以將插入、刪除這些操作都看做一個個事件,逐一處理。
- 所有核心邏輯都整合在一個run方法中去執行,減少了代碼的耦合性
- 這種方式天然就具備了線程安全,如果沒有它,那么每個操作勢必要上鎖解鎖
- 再往深了想,這種方式進一步擴展,你可以為channel添加緩沖,可以很方便的擴展更多的事件
接下來我們從run方法出發,看各個事件是咋處理的
3.2.2. 時間輪轉動
首先,我們先說構造函數里的定時器
return NewTimingWheelWithTicker(interval, numSlots, execute, NewTicker(interval))
明白了吧,就是起了一個時間間隔為槽位時間間隔的Ticker
我們再看核心的onTicker方法
// 執行時間輪任務
func (tw *TimingWheel) onTicker() {// 找到執行槽位,掛在其任務鏈表上執行tw.tickedPos = (tw.tickedPos + 1) % tw.numSlotsl := tw.slots[tw.tickedPos]tw.scanAndRunTasks(l)
}
很好理解,每次Ticker觸發的時候,時間輪轉動一個槽位,考慮到是個環,所以本次觸發的槽位就是:
執行槽位 = (上次的槽位 + 1)%總槽位數
找到執行槽位后,再看scanAndRunTasks方法
func (tw *TimingWheel) scanAndRunTasks(l *list.List) {var tasks []timingTaskfor e := l.Front(); e != nil; {task := e.Value.(*timingEntry)if task.removed {next := e.Next()l.Remove(e)e = nextcontinue} else if task.circle > 0 {task.circle--e = e.Next()continue} else if task.diff > 0 {next := e.Next()l.Remove(e)pos := (tw.tickedPos + task.diff) % tw.numSlotstw.slots[pos].PushBack(task)tw.setTimerPosition(pos, task)task.diff = 0e = nextcontinue}tasks = append(tasks, timingTask{key: task.key,value: task.value,})next := e.Next()l.Remove(e)tw.timers.Del(task.key)e = next}tw.runTasks(tasks)
}
可以看到,本質就是從前向后遍歷雙向鏈表
- 先看是不是被標記刪除了,如果是直接干掉它
- 再看圈數是不是大于1,如果是說明還沒到它,跳過
- 再看是否需要延遲執行,如果是就把他扔到該去的槽里(這個后面還會講到)
- 最后把需要執行的任務統一放到runTasks中執行execute函數
那么execute函數到底是個啥?
好的,我提前滿足你的好奇心,我們來看本地緩存構造函數
func NewLocalCache(expire time.Duration, opts ...CacheOption) (*LocalCache, error) {......timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v any) {// 緩存過期,直接刪除key, ok := k.(string)if !ok {return}cache.Del(key)})if err != nil {return nil, err}cache.timingWheel = timingWheelreturn cache, nil
}
明白了吧,其實就是直接把緩存刪掉,當然這里面會存在很多操作,我們后面再說
3.2.3. 插入任務
func (tw *TimingWheel) setTask(task *timingEntry) {if task.delay < tw.interval {task.delay = tw.interval}if val, ok := tw.timers.Get(task.key); ok {// 任務已存在,更新任務的值并移動到指定delay的位置entry := val.(*positionEntry)entry.item.value = task.valuetw.moveTask(task.baseEntry)} else {pos, circle := tw.getPositionAndCircle(task.delay)task.circle = circletw.slots[pos].PushBack(task)tw.setTimerPosition(pos, task)}
}
邏輯就是先看任務是否存在,存在就更新其過期時間,也就是移到其他槽位去,不存在就找到其槽位尾插進去。
getPositionAndCircle就是根據上文提到的計算公式計算待插入的槽位和圈數
3.2.4. 移動任務
func (tw *TimingWheel) moveTask(task baseEntry) {val, ok := tw.timers.Get(task.key)if !ok {return}timer := val.(*positionEntry)if task.delay < tw.interval {GoSafe(func() {tw.execute(timer.item.key, timer.item.value)})return}pos, circle := tw.getPositionAndCircle(task.delay)if pos >= timer.pos {timer.item.circle = circletimer.item.diff = pos - timer.pos} else if circle > 0 {circle--timer.item.circle = circletimer.item.diff = tw.numSlots + pos - timer.pos} else {timer.item.removed = truenewItem := &timingEntry{baseEntry: task,value: timer.item.value,}tw.slots[pos].PushBack(newItem)tw.setTimerPosition(pos, newItem)}
}
大致的邏輯是:
- 先查到任務
- 如果任務的過期時間比槽位時間間隔還短,那就沒必要再移動了,直接執行就完事了
- 計算需要移動到的槽位和圈數,將需要移動的diff記錄下來,在時間輪轉動的時候移走(呼應前文)
3.2.5. 刪除任務
func (tw *TimingWheel) removeTask(key any) {val, ok := tw.timers.Get(key)if !ok {return}timer := val.(*positionEntry)timer.item.removed = truetw.timers.Del(key)
}
這里就是打個標記
3.2.6. 并發安全的map
最后我們來說說模型中并發安全的map,也就是SafeMap
這是框架自己定義的,我就不細說了,有興趣的自己去看源碼。
簡單來說就是設計了兩個map,一新一舊,記錄各自刪了多少數據。如果一個刪超標了,就用另一個覆蓋它,然后自己重置(清空)
終于說完時間輪了!
4. 緩存雪崩問題
所謂緩存雪崩是指緩存的數據在某個時刻大面積過期,從而導致系統資源的集中消耗或性能瓶頸。
那么框架式怎么解決的呢?
引入了隨機偏移量抖動。簡單來說就是在輸入的過期時間基礎上按一定的偏移量隨機偏移,避免大量緩存的過期時間一致,導致集中過期。
4.1. 實現
對應數據結構中的unstableExpiry,我們來看其結構Unstable
package localcacheimport ("math/rand""sync""time"
)// 過期時間抖動
type Unstable struct {deviation float64 // 抖動閾值r *rand.Randlock *sync.Mutex
}func NewUnstable(deviation float64) Unstable {if deviation < 0 {deviation = 0}if deviation > 1 {deviation = 1}return Unstable{deviation: deviation,r: rand.New(rand.NewSource(time.Now().UnixNano())),lock: new(sync.Mutex),}
}// 抖動時間
// 生成一個在[1-u.deviation,1+u.deviation]之間的隨機因子
func (u Unstable) AroundDuration(base time.Duration) time.Duration {u.lock.Lock()val := time.Duration((1 + u.deviation - 2*u.deviation*u.r.Float64()) * float64(base))u.lock.Unlock()return val
}// 抖動整數
// 生成一個在[1-u.deviation,1+u.deviation]之間的隨機因子
func (u Unstable) AroundInt(base int64) int64 {u.lock.Lock()val := int64((1 + u.deviation - 2*u.deviation*u.r.Float64()) * float64(base))u.lock.Unlock()return val
}
邏輯還是比較清晰的,就是經過計算使最終的過期時間保持在
[(1-deviation)*原過期時間, (1+deviation)*原過期時間]
5. 緩存擊穿問題
所謂緩存擊穿是指在某一時間有大量請求打過來,而恰巧此時一些熱點數據被集中清除,那么在緩存+DB的架構中請求就全部穿過了緩存落到DB上,造成壓力驟增。
因此我們希望當緩存失效時只有一個請求去加載數據,其他請求等待
Go語言中singleflight(單飛模式)可以完美滿足我們的需求
singleflight 是 golang.org/x/sync/singleflight 包提供的一個功能模塊,其內部使用了一個映射(map)來跟蹤正在進行的請求,當一個新的請求到來時,
singleflight會檢查是否已經有相同鍵的請求正在進行:
- 如果存在:新的請求會等待已在進行中的請求完成,并共享其結果
- 如果不存在:singleflight會執行該請求,并將結果緩存起來供后續相同鍵的請求使用
其使用也非常簡單,下面是一個示例:
package mainimport ("fmt""sync""time""golang.org/x/sync/singleflight"
)// 模擬從數據庫加載數據的函數
func loadDataFromDB(key string) (string, error) {// 模擬延遲time.Sleep(2 * time.Second)return fmt.Sprintf("data for %s", key), nil
}func main() {var wg sync.WaitGroupsfGroup := singleflight.Group{}// 模擬多個并發請求同一個keykey := "hot_key"for i := 0; i < 5; i++ {wg.Add(1)go func(id int) {defer wg.Done()// 使用 Do 方法確保只有一個請求會執行 loadDataFromDBv, err, shared := sfGroup.Do(key, func() (interface{}, error) {return loadDataFromDB(key)})if err != nil {fmt.Printf("Goroutine %d: error loading data: %v\n", id, err)return}fmt.Printf("Goroutine %d: got data: %s (shared: %v)\n", id, v, shared)}(i)}wg.Wait()
}
6. 緩存命中率統計
這個其實很好處理,每次查詢緩存的時候做一個計數。緩存命中了就給命中數+1,未命中就給未命中數+1,每隔一段時間就用 命中數/(命中數+未命中數) 計算出命中率,打印到日志即可。
來看代碼實現
package localcacheimport ("sync/atomic""time""e.coding.net/xverse-git/public/go_common/logger"
)const statInterval = time.Minute// 緩存命中率統計模塊
type cacheStat struct {name string // 緩存名稱,標識統計信息hit uint64 // 緩存命中次數miss uint64 // 緩存未命中次數sizeCallback func() int // 回調函數,用于動態獲取緩存的大小
}func newCacheStat(name string, sizeCallback func() int) *cacheStat {st := &cacheStat{name: name,sizeCallback: sizeCallback,}go st.statLoop()return st
}// 開啟定時任務進行統計
func (cs *cacheStat) statLoop() {ticker := time.NewTicker(statInterval)defer ticker.Stop()for range ticker.C {hit := atomic.SwapUint64(&cs.hit, 0)miss := atomic.SwapUint64(&cs.miss, 0)total := hit + missif total == 0 {continue}percent := 100 * float32(hit) / float32(total)logger.Infof("cache(%s) - qpm: %d, hit_ratio: %.1f%%, elements: %d, hit: %d, miss: %d",cs.name, total, percent, cs.sizeCallback(), hit, miss)}
}func (cs *cacheStat) IncrementHit() {atomic.AddUint64(&cs.hit, 1)
}func (cs *cacheStat) IncrementMiss() {atomic.AddUint64(&cs.miss, 1)
}
很直觀,也很好理解,就不去贅述了。額外提一嘴的是命中數和未命中數的+1操作由于是并發情況下的變更,因此是原子操作,所以通過 atomic.AddUint64 的方式。
7. 本地緩存各操作
本地緩存的各個設計要點和模塊終于講完了,現在我們可以回過頭來看其各操作的實現了。
7.1. 構造函數
type CacheOption func(*LocalCache)func NewLocalCache(expire time.Duration, opts ...CacheOption) (*LocalCache, error) {cache := &LocalCache{data: make(map[string]any),expire: expire,lruCache: emptyLRU{},barrier: singleflight.Group{},unstableExpiry: NewUnstable(expiryDeviation),}for _, opt := range opts {opt(cache)}if len(cache.name) == 0 {cache.name = defaultCacheName}cache.stats = newCacheStat(cache.name, cache.size)timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v any) {// 緩存過期,直接刪除key, ok := k.(string)if !ok {return}cache.Del(key)})if err != nil {return nil, err}cache.timingWheel = timingWheelreturn cache, nil
}func WithName(name string) CacheOption {return func(cache *LocalCache) {cache.name = name}
}// 設置最大容量
func WithLimit(limit int) CacheOption {return func(cache *LocalCache) {if limit > 0 {cache.lruCache = NewKeyLRU(limit, cache.onEvict)}}
}func (lc *LocalCache) onEvict(key string) {delete(lc.data, key)lc.timingWheel.RemoveTimer(key)
}
這里主要是對各模塊的一些初始化操作,包括定義時間輪的回調函數等。值得一提的是這里使用了函數式編程,使用opts將參數注入構造函數中。
這其實是一種很常見的默認值注入方法。當方法的部分參數不一定注入實參的時候,由于Go不像Python那樣擁有默認值機制,所以往往采用這種函數式編程的方式注入。
在使用時代碼如下:
cache, err := NewLocalCache(time.Second*2, WithName("any"))
當限制緩存數量,即設置WithLimit時才使用KeyLRU去控制。LRU的回調函數是在刪除緩存時觸發,對應onEvict方法,內容也很簡單,刪除map對應的key和時間輪刪除
7.2. 查詢緩存
func (lc *LocalCache) Get(key string) (any, bool) {value, ok := lc.doGet(key)if ok {lc.stats.IncrementHit()} else {lc.stats.IncrementMiss()}return value, ok
}func (lc *LocalCache) doGet(key string) (any, bool) {lc.lock.Lock()defer lc.lock.Unlock()value, ok := lc.data[key]if ok {lc.lruCache.Add(key)}return value, ok
}
這里沒什么好說的,包含了命中率的計數和LRU的添加(移動到鏈表頭結點)
7.3. 添加緩存
func (lc *LocalCache) Set(key string, value any) {lc.SetWithExpire(key, value, lc.expire)
}//nolint:errcheck // 不做檢查
func (lc *LocalCache) SetWithExpire(key string, value any, expire time.Duration) {lc.lock.Lock()_, ok := lc.data[key]lc.data[key] = valuelc.lruCache.Add(key)lc.lock.Unlock()expiry := lc.unstableExpiry.AroundDuration(expire) // 過期時間抖動處理if ok {lc.timingWheel.MoveTimer(key, expiry)} else {lc.timingWheel.SetTimer(key, value, expiry)}
}
當添加緩存時先進行LRU的添加,數據的保存。然后對過期時間進行抖動處理,然后將任務添加進時間輪,由時間輪轉到到過期時間的槽位時調用回調函數刪除該緩存。
7.4. 刪除緩存
func (lc *LocalCache) Del(key string) {lc.lock.Lock()delete(lc.data, key)lc.lruCache.Remove(key)lc.lock.Unlock()lc.timingWheel.RemoveTimer(key)
}
這里涉及map的刪除、LRU的刪除和時間輪的刪除。時間輪執行時調用的回調函數里就是這個方法。
7.5. 獲取緩存
func (lc *LocalCache) Take(key string, fetch func() (any, error)) (any, error) {if val, ok := lc.doGet(key); ok {lc.stats.IncrementHit()return val, nil}var fresh boolval, err, _ := lc.barrier.Do(key, func() (any, error) {if val, ok := lc.doGet(key); ok {return val, nil}v, e := fetch()if e != nil {return nil, e}fresh = truelc.Set(key, v)return v, nil})if err != nil {return nil, err}if fresh {lc.stats.IncrementMiss()return val, nil}lc.stats.IncrementHit()return val, nil
}
這個方法相對特殊,它允許當緩存不存在時根據用戶注入的fetch函數的邏輯去獲取并設置緩存。
當然,這里的設置是需要借助singleflight邏輯的。同樣,也涉及命中率的計數。
示例類似這樣:
cache.Take("first", func() (any, error) {time.Sleep(time.Millisecond * 100)return "first element", nil
})
OK,至此本地緩存全部講完。完結,撒花!