深入源碼分析kubernetes informer機制(四)DeltaFIFO


[閱讀指南]
這是該系列第四篇
基于kubernetes 1.27 stage版本
為了方便閱讀,后續所有代碼均省略了錯誤處理及與關注邏輯無關的部分。


文章目錄

  • client-go中的存儲結構
  • DeltaFIFO
    • delta
    • 索引 key
    • queue push操作
      • delta push 去重
    • queue pop操作
  • 總結

client-go中的存儲結構

如下圖,clinet-go中定義了存儲類型接口store,用來提供存儲對象的基本能力。

queue繼承了store接口,并提供了隊列的能力,隊列中可以保存需要增刪改的存儲對象的key,它會取出隊頭元素,調用PopProcessFunc處理。

queue的實現有兩個:FIFOdeltaFIFO

deltaFIFO的不同點在于,deltaFIFO隊列中,key對應的不是對象本身,而是對象的delta。

另外deltaFIFO除了通過add、update、delete添加元素,還有兩種特殊的方式:replaced和sync。replaced一般發生在資源版本更新時,而sync由resync定時發起。

DeltaFIFO

下面是deltaFIFO數據結構的定義

type DeltaFIFO struct {// 并發讀寫鎖lock sync.RWMutexcond sync.Cond// `items` maps a key to a Deltas.// 資源對象的key與對應的delta數組,每個數組至少都會有一個deltaitems map[string]Deltas// 按照FIFO隊列順序存儲key,用來給pop()消費。// 該數組不會有重復值,并且所有元素都一定在items中queue []string// 生成key值的函數,默認是 MetaNamespaceKeyFunckeyFunc KeyFunc// 本地緩存中已知的所有資源對象的keyknownObjects KeyListerGetter......
}

delta

如前面所說,deltaFIFO中key映射的不是對象本身,是delta數組。

根據Delta數據結構的定義,delta包含了一個資源對象的變更類型及變更的內容。這里的Object不一定是完整的資源數據,大部分場景下只會有變更的部分信息。

type Delta struct {Type   DeltaTypeObject interface{}
}type DeltaType string
const (Added   DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"Replaced DeltaType = "Replaced"Sync DeltaType = "Sync"
)

舉個栗子,本地已經有了一個pod對象,

&Pod{Name:      "mypod",Namespace: "default",Labels:    map[string]string{"app": "web", "version": "0.0.1"},
}

此時mypod的 lable從web變成了app-server,reflector就會創建一個這樣的delta對象放入FIFO隊列中。

&Delta{Type:   "Updated",Object: &Pod{Name:      "mypod",Namespace: "default",Labels:    map[string]string{"app": "app-server"},},
}

索引 key

deltaFIFO隊列中,存儲的是delta的key值,通過key值可以在items map中獲取到對應的delta對象。

這個key值在初始化FIFO時通過KeyFunction進行定義,使用者沒有指定時,都會使用自帶的命名函數 MetaNamespaceKeyFunc 進行命名,命名規則是

  • namespace不為空,key為/
  • namespace為空,key為

這里的name是在yaml資源配置中的matadata.name,比如上面的mypod。在同一個資源下,name在所有api version都一定是唯一的。

func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil}meta, err := meta.Accessor(obj)if len(meta.GetNamespace()) > 0 {return meta.GetNamespace() + "/" + meta.GetName(), nil}return meta.GetName(), nil
}

queue push操作

watcher監控的資源變更時,會調用deltaFIFO中Added、Updated、Deleted、Replaced、Sync方法,最終它們都會通過queueActionLocked 方法往deltaFIFO隊列中加入對應類型的delta對象。

queueActionLocked 也就是deltaFIFO的入隊操作。

和一般的入隊不同的是,新加入的delta不是直接加入到隊尾,隊列queue數組中保存的是delta的key。所以入隊的操作是這樣的

  1. 獲取delta對應的key值(還記得keyfunc嗎,又是它)
  2. 如果delta所屬的資源key已經在隊列中,直接將delta添加到key對應到deltas數組末尾。更新已存在的資源delta并不會影響他的key在隊列中的位置。
  3. 如果delta所屬的資源key不在隊列中,就將key添加到隊列末尾,并在items中關聯key和delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)// 自定義的轉換函數。可以在delta事件被處理之前完成一些預處理// 常見的用法是用來過濾一些處理程序不關注的資源對象、以及處理數據格式等if f.transformer != nil {obj, err = f.transformer(obj)}// 將新的delta放入資源key對應的delta數組末尾// 如果原本的key不存在,就是創建了一個新的數組,并將新的delta放入其中oldDeltas := f.items[id]newDeltas := append(oldDeltas, Delta{actionType, obj})// 對delta數組中的delta去重newDeltas = dedupDeltas(newDeltas)// 判斷key是否已經在隊列中,并且更新key對應的delta數組if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}f.items[id] = newDeltasf.cond.Broadcast()}return nil
}

delta push 去重

上一節提到,delta進行push操作時,會對加入的delta進行去重。去重邏輯目前只針對兩個delete類型的delta有效:當delta數組中倒數第一個和第二個delta都是delete類型時,將會去掉其中一個

func dedupDeltas(deltas Deltas) Deltas {n := len(deltas)if n < 2 {return deltas}a := &deltas[n-1]b := &deltas[n-2]if out := isDup(a, b); out != nil {deltas[n-2] = *outreturn deltas[:n-1]}return deltas
}// 判斷a、b兩個delta是否重復
// 目前暫時只有兩個delete類型的delta會被判定為重復。
func isDup(a, b *Delta) *Delta {if out := isDeletionDup(a, b); out != nil {return out}return nil
}// 判定兩個delta是否都是deleted類型
func isDeletionDup(a, b *Delta) *Delta {if b.Type != Deleted || a.Type != Deleted {return nil}if _, ok := b.Object.(DeletedFinalStateUnknown); ok {return a}return b
}

舉個小小的例子來回顧一下delta push操作。假設queue中有3個pod對象,對應了不同的變更事件,如下所示。

此時watcher監聽到資源發生變化:

  1. pod2收到了updated事件
  2. pod1收到了deleted事件
  3. pod3收到了deleted事件

于是,三個delta入隊成功后的隊列圖如下

pod1已有一個deleted事件,再次收到deleted后,經過dedupDeltas去重,最終只保留一個deleted。

pod3雖然有兩個deleted事件,但是他們并不是連續的事件,不會被去重

queue pop操作

deltaFIFO出隊的操作和普通的隊列出隊類似,從隊頭取出一個資源對象key,并刪除items中key對應的deltas數組。

pop出隊時,會調用傳參PopProcessFunc對出隊元素進行處理。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) == 0 {// 隊列為空時阻塞if f.closed {return nil, ErrFIFOClosed}f.cond.Wait()}// 取出隊首的資源對象keyid := f.queue[0]f.queue = f.queue[1:]// 獲取key對應的deltas數組item, ok := f.items[id]// 執行pop處理函數,處理delta事件,如果處理失敗了,資源對象會被重新加入到隊列中。// 但是如果隊列中存在相同的對象,資源對象會被丟棄。err := process(item, isInInitialList)if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}return item, err}
}

這里一開始有個小疑問,如果資源的delta處理失敗了,并且隊列中又出現了同樣的資源key,這部分delta數據不就丟失了嗎?

但是仔細看出隊入隊公用一個鎖,pop處理對象時不會有新的對象入隊,所以理論上不會出現在addIfNotPresent時,key是persent的情況。而deltaFIFO入隊的邏輯,也不會存在一個隊列有兩個相同的key的情況,所以不會有丟失的問題,addIfNotPresent應該只是加多一層保障。如果理解有問題,歡迎大佬們指正。

回顧一下pop的調用方processLoop,調用pop時傳入PopProcessFunc(c.config.Process))。

系列第一篇介紹informer時提到過,c.config.Process最終調用的是processDeltas函數,它包含了數據同步到存儲,以及調用注冊的用戶函數兩個操作。

func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {...}}
}// 數據處理函數
func processDeltas(handler ResourceEventHandler,clientState Store,deltas Deltas,isInInitialList bool,
) error {// from oldest to newestfor _, d := range deltas {obj := d.Object// 區分事件類型進行處理switch d.Type {case Sync, Replaced, Added, Updated:// 同步存儲數據if old, exists, err := clientState.Get(obj); err == nil && exists {if err := clientState.Update(obj); err != nil {return err}// 回調用戶函數handler.OnUpdate(old, obj)} else {// 同步存儲數據if err := clientState.Add(obj); err != nil {return err}// 回調用戶函數handler.OnAdd(obj, isInInitialList)}case Deleted:// 同步存儲數據if err := clientState.Delete(obj); err != nil {return err}// 回調用戶函數handler.OnDelete(obj)}}return nil
}

總結

還是用上一節的例子,小結回顧一下整體的流程

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/38961.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/38961.shtml
英文地址,請注明出處:http://en.pswp.cn/news/38961.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

設計模式

本文主要介紹設計模式的主要設計原則和常用設計模式。 一、UML畫圖 1.類圖 2.時序圖 二、設計模式原則 1.單一職責原則 就是一個方法、一個類只做一件事&#xff1b; 2.開閉原則 就是軟件的設計應該對拓展開放&#xff0c;對修改關閉&#xff0c;這在java中體現最明顯的就…

什么是A股交易接口_(股票交易c接口)開發原理

A股交易接口是指用于與國內的證券交易所&#xff08;上海證券交易所和深圳證券交易所&#xff09;進行股票買賣交易的電子接口或軟件系統。A股交易接口是金融機構、券商以及個人投資者的必備掌握操作技能之一&#xff0c;它提供了實時的股票行情、交易下單、撤單、查詢賬戶信息…

基于Hadoop的表級監管

現狀 大數據平臺中,采用hadoop的方式存儲數據,hdfs本質上是文件系統,而文件系統對數據的監管能力有限,但是數據安全領域問題日漸凸顯,現目前,大數據平臺一般以分層結構進行授權,但是對于一線開發人員而言,是能夠接觸到整個大數據平臺中的所有表的,那么如何實現這樣一…

yum install/update排除特定/某些包方式

1 什么是 yum&#xff1f; yum 代表 “Yellowdog Updater, Modified”。Yum 是用于 rpm 系統的自動更新程序和包安裝/卸載器。 它在安裝包時自動解決依賴關系。 2 什么是 rpm&#xff1f; rpm 代表 “Red Hat Package Manager”&#xff0c;它是一款用于 Red Hat 系統的功能…

PB:庫管理函數

庫管理函數 1、LibraryCreate() 功 能:創建一個空的PowerBuilder應用庫,并可根據需要在創建應用庫的同時添加庫注解。 語 法:LibraryCreate ( libraryname{, comments } ) 參 數:libraryname:string類型,指定要創建應用庫的名稱,可以帶上路徑,不帶路徑時在當前目…

Docker本地鏡像發布到阿里云

1. 本地鏡像發布到阿里云 2. 鏡像的生成方法 OPTIONS說明&#xff1a; -a :提交的鏡像作者&#xff1b; -m :提交時的說明文字&#xff1b; 本次案例centosubuntu兩個&#xff0c;當堂講解一個&#xff0c;家庭作業一個&#xff0c;請大家務必動手&#xff0c;親自實操。 docke…

Gradio部署應用到服務器不能正常訪問

用Gradio部署一個基于ChatGLM-6B的應用&#xff0c;發布到團隊的服務器上&#xff08;局域網&#xff0c;公網不能訪問&#xff09;&#xff0c;我將gradio應用發布到服務器的9001端口 import gradio as gr with gr.Blocks() as demo:......demo.queue().launch(server_port90…

ad+硬件每日學習十個知識點(34)23.8.14 (DCDC詳細設計,續流二極管的選擇,COMP引腳的環路設計)

文章目錄 1.二極管的rrm電壓和rms電壓有什么不同2.DCDC續流二極管的選擇3.充電電容4.COMP引腳的環路設計5.DCDC設計總結6.多路并聯7.相位匹配8.工作模式9.低溫輸出偏離10.電源負載與效率11.降壓升壓模塊 1.二極管的rrm電壓和rms電壓有什么不同 答&#xff1a; 二極管的 RRM &a…

redis主從復制、哨兵服務、持久化、數據類型

Top NSD DBA DAY10 案例1&#xff1a;配置主從復制案例2&#xff1a;配置帶驗證的主從復制案例3&#xff1a;哨兵服務案例4&#xff1a;使用RDB文件恢復數據案例5&#xff1a;AOF案例6&#xff1a;字符類型案例7&#xff1a;列表類型案例8&#xff1a;散列類型案例9&#xff…

Linux交叉編譯opencv并移植ARM端

Linux交叉編譯opencv并移植ARM端 - 知乎 一、安裝交叉編譯器 目標平臺為arm7l&#xff0c;此為32位ARM架構&#xff0c;要安裝合適的編譯器 sudo apt install arm-linux-gnueabihf-gcc sudo apt install arm-linux-gnueabihf-g注意&#xff1a;64位ARM架構的編譯器與32位ARM架…

【MyBatis】查詢數據庫

目錄 一、什么是MyBatis 二、MyBatis框架的搭建 1、搭建MyBatis框架 2、設置MyBaits項目的配置 三、使用MyBatis完成數據庫的操作 1、MyBatis程序中sql語句的即時執行和預編譯 1.1、即時執行&#xff08;${}&#xff09; 1.2、預編譯&#xff08;#{}&#xff09; 1.3、即…

tomcat設置PermSize

最近tomcat老是報錯,查看了日志出現PermGen 內存不夠用,重啟tomcat后查詢使用情況 通過啟動參數發現沒有設置 PermGen,繼續通過jmap查看 jmap -heap 21179 發現99%已使用,而且默認是30.5M,太小了,這里設置成256M 1. 創建setenv.sh文件 在/usr/local/tomcat/bin目錄下創建一個…

解鎖編程的新契機:深入探討Kotlin Symbol Processor (KSP)的編寫

解鎖編程的新契機&#xff1a;深入探討Kotlin Symbol Processor (KSP)的編寫 1. 引言 隨著軟件開發領域的不斷發展&#xff0c;新的工具和技術不斷涌現&#xff0c;以滿足開發者在構建高效、可維護和創新性的代碼方面的需求。Kotlin Symbol Processor&#xff08;KSP&#xf…

從零開始,快速打造租車服務小程序的分享

隨著移動互聯網的發展&#xff0c;小程序成為了企業推廣和服務的重要手段之一。租車服務行業也不例外&#xff0c;通過打造一款租車服務小程序&#xff0c;企業可以更好地與用戶進行互動和交流&#xff0c;提供更方便快捷的租車服務。本文將介紹如何利用第三方制作平臺/工具快速…

PHP實現在線年齡計算器

1. 輸入日期查詢年齡 2. php laravel框架實現 代碼 /*** 在線年齡計算器*/public function ageDateCal(){// 輸入的生日時間$birthday $this->request(birthday);// 當前時間$currentDate date(Y-m-d);// 計算周歲$age date_diff(date_create($birthday), date_create($…

Eleastisearch5.2.2利用鏡像遷移構建實例后ES非健康狀態

正常遷移完成后啟動服務&#xff0c;查看ES非健康狀態 此時觀察ES集群狀態&#xff1a;curl -XGET -u elastic:xxx localhost:9200/_cluster/health?pretty 注意到"active_shards_percent_as_number" : 88.8888 該項的值不產生變化;集群狀態"status" : “…

8-1 統計字符

本題要求編寫程序&#xff0c;輸入10個字符&#xff0c;統計其中英文字母、空格或回車、數字字符和其他字符的個數。 輸入格式: 輸入為10個字符。最后一個回車表示輸入結束&#xff0c;不算在內。 輸出格式: 在一行內按照 letter 英文字母個數, blank 空格或回車個數, d…

升級指定版本Node.js或npm

一. 下載指定node.js版本Node.js 二. 升級node.js版本 打開電腦cmd 輸入 npm install node18.17.1 -g 三. 升級npm版本 打開電腦cmd 輸入 npm install npm8.1.2 -g

SQL注入之Oracle注入

SQL注入之Oracle注入 7.1 SQL注入之Oracle環境搭建 前言 Oracle Database&#xff0c;又名Oracle RDBMS&#xff0c;或簡稱Oracle。是甲骨文公司的一款關系數據庫管理系統。它是在數據庫領域一直處于領先地位的產品。可以說Oracle數據庫系統是世界上流行的關系數據庫管理系統…

在WordPress站點中展示閱讀量等流量分析數據(超詳細實現)

這篇文章也可以在我的博客中查看 關于本文 專業的流量統計系統能夠相對真實地反應網站的訪問情況。 這些數據可以在后臺很好地進行分析統計&#xff0c;但有時我們希望在網站前端展示一些數據 最常見的情景就是&#xff1a;展示頁面的瀏覽量 這簡單的操作當然也可以通過簡單…