[閱讀指南]
這是該系列第四篇
基于kubernetes 1.27 stage版本
為了方便閱讀,后續所有代碼均省略了錯誤處理及與關注邏輯無關的部分。
文章目錄
- client-go中的存儲結構
- DeltaFIFO
- delta
- 索引 key
- queue push操作
- delta push 去重
- queue pop操作
- 總結
client-go中的存儲結構
如下圖,clinet-go中定義了存儲類型接口store,用來提供存儲對象的基本能力。
queue繼承了store接口,并提供了隊列的能力,隊列中可以保存需要增刪改的存儲對象的key,它會取出隊頭元素,調用PopProcessFunc處理。
queue的實現有兩個:FIFO
和deltaFIFO
。
deltaFIFO的不同點在于,deltaFIFO隊列中,key對應的不是對象本身,而是對象的delta。
另外deltaFIFO除了通過add、update、delete添加元素,還有兩種特殊的方式:replaced和sync。replaced一般發生在資源版本更新時,而sync由resync定時發起。
DeltaFIFO
下面是deltaFIFO數據結構的定義
type DeltaFIFO struct {// 并發讀寫鎖lock sync.RWMutexcond sync.Cond// `items` maps a key to a Deltas.// 資源對象的key與對應的delta數組,每個數組至少都會有一個deltaitems map[string]Deltas// 按照FIFO隊列順序存儲key,用來給pop()消費。// 該數組不會有重復值,并且所有元素都一定在items中queue []string// 生成key值的函數,默認是 MetaNamespaceKeyFunckeyFunc KeyFunc// 本地緩存中已知的所有資源對象的keyknownObjects KeyListerGetter......
}
delta
如前面所說,deltaFIFO中key映射的不是對象本身,是delta數組。
根據Delta數據結構的定義,delta包含了一個資源對象的變更類型及變更的內容。這里的Object不一定是完整的資源數據,大部分場景下只會有變更的部分信息。
type Delta struct {Type DeltaTypeObject interface{}
}type DeltaType string
const (Added DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"Replaced DeltaType = "Replaced"Sync DeltaType = "Sync"
)
舉個栗子,本地已經有了一個pod對象,
&Pod{Name: "mypod",Namespace: "default",Labels: map[string]string{"app": "web", "version": "0.0.1"},
}
此時mypod的 lable從web變成了app-server,reflector就會創建一個這樣的delta對象放入FIFO隊列中。
&Delta{Type: "Updated",Object: &Pod{Name: "mypod",Namespace: "default",Labels: map[string]string{"app": "app-server"},},
}
索引 key
deltaFIFO隊列中,存儲的是delta的key值,通過key值可以在items map中獲取到對應的delta對象。
這個key值在初始化FIFO時通過KeyFunction進行定義,使用者沒有指定時,都會使用自帶的命名函數 MetaNamespaceKeyFunc
進行命名,命名規則是
- namespace不為空,key為/
- namespace為空,key為
這里的name是在yaml資源配置中的matadata.name,比如上面的mypod。在同一個資源下,name在所有api version都一定是唯一的。
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil}meta, err := meta.Accessor(obj)if len(meta.GetNamespace()) > 0 {return meta.GetNamespace() + "/" + meta.GetName(), nil}return meta.GetName(), nil
}
queue push操作
watcher監控的資源變更時,會調用deltaFIFO中Added、Updated、Deleted、Replaced、Sync方法,最終它們都會通過queueActionLocked 方法往deltaFIFO隊列中加入對應類型的delta對象。
queueActionLocked 也就是deltaFIFO的入隊操作。
和一般的入隊不同的是,新加入的delta不是直接加入到隊尾,隊列queue數組中保存的是delta的key。所以入隊的操作是這樣的
- 獲取delta對應的key值(還記得keyfunc嗎,又是它)
- 如果delta所屬的資源key已經在隊列中,直接將delta添加到key對應到deltas數組末尾。更新已存在的資源delta并不會影響他的key在隊列中的位置。
- 如果delta所屬的資源key不在隊列中,就將key添加到隊列末尾,并在items中關聯key和delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)// 自定義的轉換函數。可以在delta事件被處理之前完成一些預處理// 常見的用法是用來過濾一些處理程序不關注的資源對象、以及處理數據格式等if f.transformer != nil {obj, err = f.transformer(obj)}// 將新的delta放入資源key對應的delta數組末尾// 如果原本的key不存在,就是創建了一個新的數組,并將新的delta放入其中oldDeltas := f.items[id]newDeltas := append(oldDeltas, Delta{actionType, obj})// 對delta數組中的delta去重newDeltas = dedupDeltas(newDeltas)// 判斷key是否已經在隊列中,并且更新key對應的delta數組if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}f.items[id] = newDeltasf.cond.Broadcast()}return nil
}
delta push 去重
上一節提到,delta進行push操作時,會對加入的delta進行去重。去重邏輯目前只針對兩個delete類型的delta有效:當delta數組中倒數第一個和第二個delta都是delete類型時,將會去掉其中一個
。
func dedupDeltas(deltas Deltas) Deltas {n := len(deltas)if n < 2 {return deltas}a := &deltas[n-1]b := &deltas[n-2]if out := isDup(a, b); out != nil {deltas[n-2] = *outreturn deltas[:n-1]}return deltas
}// 判斷a、b兩個delta是否重復
// 目前暫時只有兩個delete類型的delta會被判定為重復。
func isDup(a, b *Delta) *Delta {if out := isDeletionDup(a, b); out != nil {return out}return nil
}// 判定兩個delta是否都是deleted類型
func isDeletionDup(a, b *Delta) *Delta {if b.Type != Deleted || a.Type != Deleted {return nil}if _, ok := b.Object.(DeletedFinalStateUnknown); ok {return a}return b
}
舉個小小的例子來回顧一下delta push操作。假設queue中有3個pod對象,對應了不同的變更事件,如下所示。
此時watcher監聽到資源發生變化:
- pod2收到了updated事件
- pod1收到了deleted事件
- pod3收到了deleted事件
于是,三個delta入隊成功后的隊列圖如下
pod1已有一個deleted事件,再次收到deleted后,經過dedupDeltas去重,最終只保留一個deleted。
pod3雖然有兩個deleted事件,但是他們并不是連續的事件,不會被去重
queue pop操作
deltaFIFO出隊的操作和普通的隊列出隊類似,從隊頭取出一個資源對象key,并刪除items中key對應的deltas數組。
pop出隊時,會調用傳參PopProcessFunc對出隊元素進行處理。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) == 0 {// 隊列為空時阻塞if f.closed {return nil, ErrFIFOClosed}f.cond.Wait()}// 取出隊首的資源對象keyid := f.queue[0]f.queue = f.queue[1:]// 獲取key對應的deltas數組item, ok := f.items[id]// 執行pop處理函數,處理delta事件,如果處理失敗了,資源對象會被重新加入到隊列中。// 但是如果隊列中存在相同的對象,資源對象會被丟棄。err := process(item, isInInitialList)if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}return item, err}
}
這里一開始有個小疑問,如果資源的delta處理失敗了,并且隊列中又出現了同樣的資源key,這部分delta數據不就丟失了嗎?
但是仔細看出隊入隊公用一個鎖,pop處理對象時不會有新的對象入隊,所以理論上不會出現在addIfNotPresent時,key是persent的情況。而deltaFIFO入隊的邏輯,也不會存在一個隊列有兩個相同的key的情況,所以不會有丟失的問題,addIfNotPresent應該只是加多一層保障。如果理解有問題,歡迎大佬們指正。
回顧一下pop的調用方processLoop
,調用pop時傳入PopProcessFunc(c.config.Process))。
系列第一篇介紹informer時提到過,c.config.Process最終調用的是processDeltas函數,它包含了數據同步到存儲,以及調用注冊的用戶函數兩個操作。
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {...}}
}// 數據處理函數
func processDeltas(handler ResourceEventHandler,clientState Store,deltas Deltas,isInInitialList bool,
) error {// from oldest to newestfor _, d := range deltas {obj := d.Object// 區分事件類型進行處理switch d.Type {case Sync, Replaced, Added, Updated:// 同步存儲數據if old, exists, err := clientState.Get(obj); err == nil && exists {if err := clientState.Update(obj); err != nil {return err}// 回調用戶函數handler.OnUpdate(old, obj)} else {// 同步存儲數據if err := clientState.Add(obj); err != nil {return err}// 回調用戶函數handler.OnAdd(obj, isInInitialList)}case Deleted:// 同步存儲數據if err := clientState.Delete(obj); err != nil {return err}// 回調用戶函數handler.OnDelete(obj)}}return nil
}
總結
還是用上一節的例子,小結回顧一下整體的流程