深入源碼分析kubernetes informer機制(二)Reflector


[閱讀指南]
這是該系列第二篇
基于kubernetes 1.27 stage版本
為了方便閱讀,后續所有代碼均省略了錯誤處理及與關注邏輯無關的部分。


文章目錄

  • Reflector是什么
  • 整體結構
  • 工作流程
    • list拉取數據
    • 緩存resync操作
    • watch監聽操作
  • 總結

Reflector是什么

reflector在informer中就像是一個對外的窗口,它與api-server建立連接,監聽和獲取來自api-server的資源變化信息,并把這些信息放進deltaFIFO中,交給下一個環節處理。

整體結構

與api-server進行交互,通過list獲取指定的全量資源,watch監聽指定的資源變化事件,并將這些事件放入delta FIFO隊列中。
結構與交互如下圖

// 省略了部分字段,只留下我們關注的
type Reflector struct {// name identifies this reflector. By default it will be a file:line if possible.name string// reflector對象需要監控的資源類型,比如上一節workqueue中的&v1.Pod{}expectedType reflect.Type// deltaFIFO 隊列存儲對象store Store// 實現list/watchlisterWatcher ListerWatcher// 上次更新的資源版本號,用來判斷當前的node的資源狀況lastSyncResourceVersion string......
}

工作流程

reflecter主函數比較簡單,循環同步運行ListAndWatch直到收到stop信號。

func (r *Reflector) Run(stopCh <-chan struct{}) {wait.BackoffUntil(func() {if err := r.ListAndWatch(stopCh); err != nil {r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)
}

ListAndWatch主要做了這幾件事:

  1. 通過stream或者chunk方式拉取全量list數據
  2. 開啟一個協程進行緩存resync操作。
  3. 循環執行watch監聽操作
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {...fallbackToList := !r.UseWatchList// stream式同步if r.UseWatchList {w, err = r.watchList(stopCh)...if err != nil {...fallbackToList = truew = nil}}// chunk式同步if fallbackToList {err = r.list(stopCh)if err != nil {return err}}...go r.startResync(stopCh, cancelCh, resyncerrc)return r.watch(w, stopCh, resyncerrc)
}

接下來咱一步步來看。

list拉取數據

ListAndWatch拉取全量數據時,出現了兩種數據拉取的方式,list /watchstream list /watch

stream list是 kubernetes 1.27 引入的新方案,通過 ENABLE_CLIENT_GO_WATCH_LIST_ALPHA 變量可以啟用stream list,默認會使用原有的list/watch。后續會單獨開一篇介紹stream list方案,詳情可以通過KEP-3157了解

前者在初始化時list拉取全量數據,通過watch更新增量變化。
后者可以通過watch 請求的方式獲取list數據,從而減輕大規模集群初始化list數據時的資源消耗。
在建立watch連接時,攜帶如下兩個參數即可告知服務器使用streaming list進行一致性讀取。
sendInitialEvents=true
resourceVersionMatch=NotOlderThan

常規的list流程借用這個博主畫的時序圖來看下。
在這里插入圖片描述

緩存resync操作

resync負責定期將本地的緩存重新加入deltaFIFO隊列,確保本地緩存與controller的數據一致性。

國內太多博客沒了解清楚就介紹這一部分是與api-server交互,進行relist。實際上resync完全沒有涉及到服務端的部分,他就是一個本地緩存的同步機制。與服務端的交互使用list/watch已經完全可以確保資源一致性了,基本不怎么需要進行relist操作,并且對于節點非常多的大集群來說,list非常消耗資源,何況是定期relist呢。

關于resync機制的介紹,不在這里展開,詳細看下一篇筆記。

watch監聽操作

watch的實現非常巧妙,它利用了http的chunk編碼傳輸機制建立長連接,來實現動態的數據監聽,可以了解分塊傳輸編碼。
同樣借用一張時序圖來看下watch的流程
在這里插入圖片描述

reflector通過Watcher監聽api-server端的數據delta事件,并將這些事件放入deltaFIFO中統一處理。

// 在這里向服務端發起watch請求,并接收和處理資源變更事件
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {...for {...// w == nil表示使用常規的list/watch方式,streaming 方式會創建特殊的watcherif w == nil {timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))options := metav1.ListOptions{// 上次同步的資源版本,也就是本地的資源版本。以此來獲取增量的數據ResourceVersion: r.LastSyncResourceVersion(),// watch 超時時間,長時間沒有接受任務事件的watcher會被關掉,避免長時間掛起。TimeoutSeconds: &timeoutSeconds,// watch書簽,避免watch重啟時請求api-server導致的消耗。AllowWatchBookmarks: true,}// 創建一個watch對象,監聽api-server的資源變更事件,將接收到的事件丟進resultChan中w, err = r.listerWatcher.Watch(options)...}// 將resultChan中的取出放入FIFO 隊列err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)// 失敗重試邏輯...}
}

建立連接的邏輯在這一行
w, err = r.listerWatcher.Watch(options)

還是用上一篇workqueue來看看這個Watch實例的實現。
從Watch函數一路往上追溯,可以看到先是與server建立了http連接,再通過watch標記建立了watch連接,創建stream watcher對象,并拉起一個協程去處理監聽到的事件信息。

  • 此后所有監聽的delta事件都會經過receive協程進入到resultChan中。
// reflector調用的watch函數
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {return lw.WatchFunc(options)
}// watchFunc函數的定義
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {...watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {options.Watch = true // 向服務端請求chunk連接optionsModifier(&options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Watch(context.TODO()) // 這里調用了getter的watch函數// getter是controller初始化時建立的http客戶端: clientset.CoreV1().RESTClient()}return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {...	url := r.URL().String()for {req, err := r.newHTTPRequest(ctx)resp, err := client.Do(req)if err == nil && resp.StatusCode == http.StatusOK {return r.newStreamWatcher(resp)}...}
}func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {sw := &StreamWatcher{source:   d,reporter: r,result: make(chan Event),done: make(chan struct{}),}go sw.receive() // 處理消息事件的協程return sw
}// 解析接收到的事件,并放到resultChan中等待后續處理。
func (sw *StreamWatcher) receive() {for {// 解析數據action, obj, err := sw.source.Decode()select {case <-sw.done:return// 將事件發送到resultChancase sw.result <- Event{Type:   action,Object: obj,}:}}
}
  • 進入resultChan的事件,由watchHandler取出再分類添加到FIFO隊列中。
func watchHandler(start time.Time,w watch.Interface,	// watch實例store Store, // 存儲對象 比如delta FIFO queue...
) error {...
loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return err// 從ResultChan中取出變更事件,并放進隊列中,比如delta FIFO隊列中case event, ok := <-w.ResultChan():// 省略了一些資源過濾和錯誤處理...// 解析監聽到的事件數據meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))continue}// 解析資源事件的版本resourceVersion := meta.GetResourceVersion()switch event.Type {case watch.Added: err := store.Add(event.Object) // 往隊列添加add delta事件... // err handlecase watch.Modified: err := store.Update(event.Object) // 往隊列添加update delta事件... // err handlecase watch.Deleted:	err := store.Delete(event.Object) // 往隊列添加delete delta事件,在此之前會判斷事件對應的資源對象是否存在... // err handlecase watch.Bookmark:...default:... // err handle}// 更新resourceVersion版本號,下一輪watch就不會再收到重復的更新事件setLastSyncResourceVersion(resourceVersion)if rvu, ok := store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(resourceVersion)}...}}...return nil
}

總結

用一個圖來回顧下reflector各個模塊的關系~
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/39743.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/39743.shtml
英文地址,請注明出處:http://en.pswp.cn/news/39743.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

RocketMQ雙主雙從同步集群部署

&#x1f388; 作者&#xff1a;互聯網-小啊宇 &#x1f388; 簡介&#xff1a; CSDN 運維領域創作者、阿里云專家博主。目前從事 Kubernetes運維相關工作&#xff0c;擅長Linux系統運維、開源監控軟件維護、Kubernetes容器技術、CI/CD持續集成、自動化運維、開源軟件部署維護…

學習筆記十九:Pod常見的狀態和重啟策略

Pod常見的狀態和重啟策略 常見的pod狀態第一階段&#xff1a;第二階段&#xff1a;擴展&#xff1a; pod重啟策略測試Always重啟策略正常停止容器內的tomcat服務非正常停止容器里的tomcat服務 測試never重啟策略正常停止容器里的tomcat服務非正常停止容器里的tomcat服務 測試On…

Mac安裝opencv后無法導入cv2的解決方法

前提條件&#xff1a;以下兩個插件安裝成功 pip install opencv-python pip install --user opencv-contrib-python 注&#xff1a;直接用pip install opencv-contrib-python如果報錯&#xff0c;就加上“–user" 第一步&#xff1a; 設置–添加python解釋器 第二步&am…

go語言惡意代碼檢測系統--對接前端可視化與算法檢測部分

Malware Detect System 1 產品介紹 惡意代碼檢測系統。 2 產品描述 2.1 產品功能 功能點詳細描述注冊賬號未注冊用戶注冊成為產品用戶&#xff0c;從而具備享有產品各項服務的資格登錄賬號用戶登錄產品&#xff0c;獲得產品提供的各項服務上傳惡意樣本用戶可以將上傳自己的…

uniapp微信小程序消息訂閱快速上手

一、微信公眾平臺小程序開通消息訂閱并設置模板 這邊的模板id和詳細內容后續前后端需要使用 二、uniapp前端 需要是一個button觸發 js&#xff1a; wx.getSetting({success(res){console.log(res)if(res.authSetting[scope.subscribeMessage]){// 業務邏輯}else{uni.request…

智安網絡|深入比較:Sass系統與源碼系統的差異及選擇指南

隨著前端開發的快速發展&#xff0c;開發人員需要使用更高效和靈活的工具來處理樣式表。在這個領域&#xff0c;Sass系統和源碼系統是兩個備受關注的選項。 Sass系統 Sass&#xff08;Syntactically Awesome Style Sheets&#xff09;是一種CSS預處理器&#xff0c;它擴展了CS…

CSS常見單位匯總

像素&#xff08;px&#xff09;&#xff1a; 絕對單位&#xff0c;以屏幕上的實際像素為基準&#xff0c;最常用于具體的尺寸和位置表示。 百分比&#xff08;%&#xff09;&#xff1a; 相對單位&#xff0c;基于父元素的屬性計算大小&#xff0c;如寬度、高度、邊距等。 自適…

@Param詳解

文章目錄 背景什么是ParamParam的使用方法使用方法&#xff1a;遇到的問題及因Param解決了什么問題使用與不使用對比 Param是如何進行映射的總結 背景 最近在開發過程中&#xff0c;在寫mapper接口是在參數前加了Param注解&#xff0c;但是在運行的時候就會報錯&#xff0c;說…

關于游戲盾

游戲盾&#xff08;Game Shield&#xff09;是一種針對游戲行業特點的網絡安全解決方案&#xff0c;主要針對游戲平臺面臨的各種網絡攻擊和安全威脅。以下是一些原因&#xff0c;說明為什么游戲平臺需要加游戲盾&#xff1a; 1. DDoS攻擊&#xff1a;游戲平臺通常容易受到分布式…

深入理解多態:面向對象編程中的靈活性與擴展性

文章目錄 代碼學習-多態什么是多態&#xff1f;多態在代碼中的體現多態的優勢 代碼學習-多態 什么是多態&#xff1f; 多態是面向對象編程中的重要概念之一&#xff0c;它指的是為不同的數據類型的實體提供統一的接口。簡而言之&#xff0c;就是同一個命令在不同的對象上會產…

更多openEuler鏡像加入AWS Marketplace!

自2023年7月openEuler 22.03 LTS SP1正式登陸AWS Marketplace后&#xff0c;openEuler社區一直持續于在AWS上提供更多版本。 目前&#xff0c;openEuler22.03 LTS SP1 ,SP2兩個版本及 x86 arm64兩種架構的四個鏡像均可通過AWS對外提供&#xff0c;且在亞太及歐洲15個Region開放…

wkhtmltopdf 與 .Net Core

wkhtmltopdf 是使用webkit引擎轉化為pdf的開源小插件. 其有.NET CORE版本的組件,DinkToPdf,但該控件對跨平臺支持有限。 故打算在Linux上安裝相關插件直接調用. 準備工作 虛擬機&#xff1a;Linux version 3.10.0-1160.el7.x86_64 wkhtmltox開發包&#xff1a;wkhtmltox_0.12…

Caused by: java.lang.ClassNotFoundException: net.sf.cglib.proxy.MethodProxy

1. 異常信息 2023-08-16 14:17:14.817 INFO 14304 [ restartedMain] io.seata.config.ConfigurationFactory : load Configuration:FileConfiguration$$EnhancerByCGLIB$$862af1eb 2023-08-16 14:17:15.006 ERROR 14304 [ restartedMain] g.springframework.boot.Sprin…

大數據Flink(六十):Flink 數據流和分層 API介紹

文章目錄 Flink 數據流和分層 API介紹 一、??????????????Flink 數據流

ZooKeeper的應用場景(命名服務、分布式協調通知)

3 命名服務 命名服務(NameService)也是分布式系統中比較常見的一類場景&#xff0c;在《Java網絡高級編程》一書中提到&#xff0c;命名服務是分布式系統最基本的公共服務之一。在分布式系統中&#xff0c;被命名的實體通常可以是集群中的機器、提供的服務地址或遠程對象等一這…

iOS申請證書(.p12)和描述文件(.mobileprovision)

打包app時&#xff0c;經常會用到ios證書&#xff0c;但很多人都苦于沒有蘋果電腦&#xff0c;即使有蘋果電腦的&#xff0c;也會覺得蘋果電腦操作也很麻煩&#xff0c;這里記錄一下&#xff0c;用香蕉云編&#xff0c;申請證書及描述文件的過程。 香蕉云編的地址&#xff1a;…

【C語言】每日一題(多數元素)

多數元素&#xff0c;鏈接奉上 方法 1.摩爾投票2.合理但錯誤的方法2.1暴力循環2.2排序求出中間元素中間元素 1.摩爾投票 先來簡單的介紹摩爾投票&#xff1a; 摩爾投票是一種用來解決絕對眾數問題的算法。 什么是絕對眾數呢&#xff1f; 在一個集合中&#xff0c;如果一個元素…

[國產MCU]-BL602開發實例-SPI與WS2812B驅動

SPI與WS2812B驅動 文章目錄 SPI與WS2812B驅動1、BL602的SPI介紹2、SPI驅動API介紹3、WS2812B介紹4、WS2812B的SPI驅動實現串行外設接口(Serial Peripheral Interface Bus,SPI)是一種用于短程通信的同步串行通信接口規范,設備之間使用全雙工模式通信,是一個主機和一個或多個…

每天一練:SpringBoot連接mq

目錄 每天一練:Springboot連接rabbitmq 每天一練:Springboot連接rabbitmq 目錄一、部署Rabbitmq&#xff1f;二、增加maven依賴三、連接RabbitMq四、發布和訂閱消息總結 一、部署Rabbitmq&#xff1f; 這里rabbitmq采用docker安裝部署。 拉取docker鏡像 [root192 ~]# docker…

【ChatGLM】ChatGLM-6B模型Win+4GB顯卡本地部署筆記

ChatGLM-6B是清華大學知識工程和數據挖掘小組發布的一個類似ChatGPT的開源對話機器人&#xff0c;由于該模型是經過約1T標識符的中英文訓練&#xff0c;且大部分都是中文&#xff0c;因此十分適合國內使用。 預期環境 本機電腦備注&#xff1a; Win10專業版 32G內存256固態系統…