k8s client-go k8s informers 實現了持續獲取集群的所有資源對象、監聽集群的資源對象變化功能,并在本地維護了全量資源對象的內存緩存,以減少對 apiserver、對 etcd 的請求壓力。Informers 在啟動的時候會首先在客戶端調用 List 接口來獲取全量的對象集合,然后通過 Watch 接口來獲取增量的對象,然后更新本地緩存。
1. k8s informer 概述
我們都知道可以使用 k8s 的 Clientset 來獲取所有的原生資源對象,那么怎么能持續的獲取集群的所有資源對象,或監聽集群的資源對象數據的變化呢?這里不需要輪詢去不斷執行 List 操作,而是調用 Watch 接口,即可監聽資源對象的變化,當資源對象發生變化,客戶端即可通過 Watch 接口收到資源對象的變化。
Watch 接口雖然可以直接使用,但一般情況下很少直接使用,因為往往由于集群中的資源較多,我們需要自己在客戶端去維護一套緩存,而這個維護成本比較大。
也是因為如此,client-go 提供了自己的實現機制,Informers 應運而生。informers 實現了持續獲取集群的所有資源對象、監聽集群的資源對象變化功能,并在本地維護了全量資源對象的內存緩存,以減少對 apiserver、對 etcd 的請求壓力。Informers 在啟動的時候會首先在客戶端調用 List 接口來獲取全量的對象集合,然后通過 Watch 接口來獲取增量的對象,然后更新本地緩存。
此外 informers 也有很強的健壯性,當長期運行的 watch 連接中斷時,informers 會嘗試拉起一個新的 watch 請求來恢復連接,在不丟失任何事件的情況下恢復事件流。另外,informers 還可以配置一個重新同步的周期參數,每間隔該周期,informers 就會重新 List 全量數據。
在 informers 的使用上,通常每個 GroupVersionResource(GVR)只實例化一個 informers,但有時候我們在一個應用中往往會在多個地方對同一種資源對象都有 informer 的需求,所以就有了共享 informer,即 SharedInformerFactory。所以可以通過使用 SharedInformerFactory 來實例化 informers,這樣本地內存緩存就只有一份,通知機制也只有一套,大大提高了效率,減少了資源浪費。
1.1 k8s informer 架構
1.2 k8s informer 包含部件
k8s client-go informer 主要包括以下部件:
- Reflector:Reflector 從 kube-apiserver 中 list&watch 資源對象,然后調用 DeltaFIFO 的 Add/Update/Delete/Replace 方法將資源對象及其變化包裝成 Delta 并將其丟到 DeltaFIFO 中;
- DeltaFIFO:DeltaFIFO 中存儲著一個 map 和一個 queue,即
map[object key]Deltas
以及 object key 的 queue,Deltas 為 Delta 的切片類型,Delta 裝有對象及對象的變化類型(Added/Updated/Deleted/Sync) ,Reflector 負責 DeltaFIFO 的輸入,Controller 負責處理 DeltaFIFO 的輸出; - Controller:Controller 從 DeltaFIFO 的 queue 中 pop 一個 object key 出來,并獲取其關聯的 Deltas 出來進行處理,遍歷 Deltas,根據對象的變化更新 Indexer 中的本地內存緩存,并通知 Processor,相關對象有變化事件發生;
- Processor:Processor 根據對象的變化事件類型,調用相應的 ResourceEventHandler 來處理對象的變化;
- Indexer:Indexer 中有 informer 維護的指定資源對象的相對于 etcd 數據的一份本地內存緩存,可通過該緩存獲取資源對象,以減少對 apiserver、對 etcd 的請求壓力;
- ResourceEventHandler:用戶根據自身處理邏輯需要,注冊自定義的的 ResourceEventHandler,當對象發生變化時,將觸發調用對應類型的 ResourceEventHandler 來做處理。
根據 informer 架構,對 k8s informer 的分析將分為以下幾部分進行,本篇為概要分析:
(1)informer概要分析;
(2)informer之初始化與啟動分析;
(3)informer之Reflector分析;
(4)informer之DeltaFIFO分析;
(5)informer之Controller&Processor分析;
(6)informer之Indexer分析;
2. informer 使用示例代碼
使用大致過程如下:
(1)構建與 kube-apiserver 通信的 config 配置;
(2)初始化與 apiserver 通信的 clientset;
(3)利用 clientset 初始化 shared informer factory 以及 pod informer;
(4)注冊 informer 的自定義 ResourceEventHandler;
(5)啟動 shared informer factory,開始 informer 的 list & watch 操作;
(6)等待 informer 從 kube-apiserver 同步資源完成,即 informer 的 list 操作獲取的對象都存入到 informer 中的 indexer 本地緩存中;
(7)創建 lister,可以從 informer 中的 indexer 本地緩存中獲取對象;
func main() {// 自定義與kube-apiserver通信的config配置master := "192.168.1.10" // apiserver urlkubeconfig := "/.kube/config"config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)if err != nil {klog.Fatalf("Failed to create config: %v", err)}// 或使用k8s serviceAccount機制與kube-apiserver通信// config, err = rest.InClusterConfig()// 初始化與apiserver通信的clientsetclientset, err := kubernetes.NewForConfig(config)if err != nil {klog.Fatalf("Failed to create client: %v", err)}// 初始化shared informer factory以及pod informerfactory := informers.NewSharedInformerFactory(clientset, 30*time.Second)podInformer := factory.Core().V1().Pods()informer := podInformer.Informer()// 注冊informer的自定義ResourceEventHandlerinformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: xxx,UpdateFunc: xxx,DeleteFunc: xxx,})// 啟動shared informer factory,開始informer的list & watch操作stopper := make(chan struct{})go factory.Start(stopper)// 等待informer從kube-apiserver同步資源完成,即informer的list操作獲取的對象都存入到informer中的indexer本地緩存中 // 或者調用factory.WaitForCacheSync(stopper)if !cache.WaitForCacheSync(stopper, informer.HasSynced) {runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))return}// 創建listerpodLister := podInformer.Lister()// 從informer中的indexer本地緩存中獲取對象podList, err := podLister.List(labels.Everything())if err != nil {fmt.Println(err)}}
以上只是對 K8s informer 做了簡單的介紹,以及簡單的寫了一下如何使用 informer 的示例代碼,后面將開始對 informer 的各個部件做進一步的源碼分析。
從圖中可以看出,k8s informer主要包括以下幾個部分:
2.1.Reflector
(1)Reflector 從 kube-apiserver 中 list 資源對象列表,然后調用 DeltaFIFO 的 Replace 方法將 object 包裝成 Sync/Deleted 類型的 Delta 丟進 DeltaFIFO 中;
(2)Reflector 從 kube-apiserver 中 watch 資源對象的變化,然后調用 DeltaFIFO 的 Add/Update/Delete 方法將 object 包裝成 Added/Updated/Deleted 類型的 Delta 丟到 DeltaFIFO 中;
2.2.DeltaFIFO
DeltaFIFO 中存儲著一個 map 和一個 queue;
(1)其中 queue 可以看成是一個先進先出隊列,一個 object 進入 DeltaFIFO 中,會判斷 queue 中是否已經存在該 object key,不存在則添加到隊尾;
(2)map 即 map[object key]Deltas
,是 object key 和 Deltas 的映射,Deltas 是 Delta 的切片類型,Delta 中存儲著 DeltaType 和 object;另外,Deltas 最末尾的兩個 Deleted 類型的 Delta 會被去重;
DeltaType 有4種,分別是 Added、Updated、Deleted、Sync
2.3.Controller
Controller 從 DeltaFIFO 的 queue 中 pop 一個 object key 出來,并從 DeltaFIFO 的 map 中獲取其對應的 Deltas 出來進行處理,遍歷 Deltas,根據 object 的變化類型更新 Indexer 本地緩存,并通知 Processor 相關對象有變化事件發生:
(1)如果 DeltaType 是 Deleted,則調用 Indexer 的 Delete 方法,將 Indexer 本地緩存中的 object 刪除,并構造 deleteNotification struct,通知 Processor 做處理;
(2)如果 DeltaType 是 Added/Updated/Sync,調用 Indexer 的 Get 方法從 Indexer 本地緩存中獲取該對象,存在則調用 Indexer 的 Update 方法來更新 Indexer 緩存中的該對象,隨后構造 updateNotification struct,通知 Processor 做處理;如果 Indexer 中不存在該對象,則調用 Indexer 的 Add 方法將該對象存入本地緩存中,并構造 addNotification struct,通知 Processor 做處理;
2.4.Processor
Processor 根據 Controller 的通知,即根據對象的變化事件類型(addNotification、updateNotification、deleteNotification),調用相應的 ResourceEventHandler(addFunc、updateFunc、deleteFunc)來處理對象的變化。
2.5.Indexer
Indexer 中有 informer 維護的指定資源對象的相對于 etcd 數據的一份本地內存緩存,可通過該緩存獲取資源對象,以減少對 apiserver、對etcd 的請求壓力。
informer 所維護的緩存依賴于 threadSafeMap 結構體中的 items 屬性,其本質上是一個用 map 構建的鍵值對,資源對象都存在 items 這個 map 中,key 為資源對象的 namespace/name 組成,value 為資源對象本身,這些構成了 informer 的本地緩存。
Indexer 除了維護了一份本地內存緩存外,還有一個很重要的功能,便是索引功能了。索引的目的就是為了快速查找,比如我們需要查找某個 node 節點上的所有 pod、查找某個命名空間下的所有 pod 等,利用到索引,可以實現快速查找。關于索引功能,則依賴于 threadSafeMap 結構體中的 indexers 與 indices 屬性。
2.6.ResourceEventHandler
用戶根據自身處理邏輯需要,注冊自定義的的 ResourceEventHandler,當對象發生變化時,將觸發調用對應類型的 ResourceEventHandler 來做處理。