[閱讀指南]
這是該系列第二篇
基于kubernetes 1.27 stage版本
為了方便閱讀,后續所有代碼均省略了錯誤處理及與關注邏輯無關的部分。
文章目錄
- Reflector是什么
- 整體結構
- 工作流程
- list拉取數據
- 緩存resync操作
- watch監聽操作
- 總結
Reflector是什么
reflector在informer中就像是一個對外的窗口,它與api-server建立連接,監聽和獲取來自api-server的資源變化信息,并把這些信息放進deltaFIFO中,交給下一個環節處理。
整體結構
與api-server進行交互,通過list獲取指定的全量資源,watch監聽指定的資源變化事件,并將這些事件放入delta FIFO隊列中。
結構與交互如下圖
// 省略了部分字段,只留下我們關注的
type Reflector struct {// name identifies this reflector. By default it will be a file:line if possible.name string// reflector對象需要監控的資源類型,比如上一節workqueue中的&v1.Pod{}expectedType reflect.Type// deltaFIFO 隊列存儲對象store Store// 實現list/watchlisterWatcher ListerWatcher// 上次更新的資源版本號,用來判斷當前的node的資源狀況lastSyncResourceVersion string......
}
工作流程
reflecter主函數比較簡單,循環同步運行ListAndWatch直到收到stop信號。
func (r *Reflector) Run(stopCh <-chan struct{}) {wait.BackoffUntil(func() {if err := r.ListAndWatch(stopCh); err != nil {r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)
}
ListAndWatch主要做了這幾件事:
- 通過stream或者chunk方式拉取全量list數據
- 開啟一個協程進行緩存resync操作。
- 循環執行watch監聽操作
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {...fallbackToList := !r.UseWatchList// stream式同步if r.UseWatchList {w, err = r.watchList(stopCh)...if err != nil {...fallbackToList = truew = nil}}// chunk式同步if fallbackToList {err = r.list(stopCh)if err != nil {return err}}...go r.startResync(stopCh, cancelCh, resyncerrc)return r.watch(w, stopCh, resyncerrc)
}
接下來咱一步步來看。
list拉取數據
ListAndWatch拉取全量數據時,出現了兩種數據拉取的方式,list /watch
和stream list /watch
。
stream list是 kubernetes 1.27 引入的新方案,通過 ENABLE_CLIENT_GO_WATCH_LIST_ALPHA 變量可以啟用stream list,默認會使用原有的list/watch。后續會單獨開一篇介紹stream list方案,詳情可以通過KEP-3157了解
前者在初始化時list拉取全量數據,通過watch更新增量變化。
后者可以通過watch 請求的方式獲取list數據,從而減輕大規模集群初始化list數據時的資源消耗。
在建立watch連接時,攜帶如下兩個參數即可告知服務器使用streaming list進行一致性讀取。
sendInitialEvents=true
resourceVersionMatch=NotOlderThan
常規的list流程借用這個博主畫的時序圖來看下。
緩存resync操作
resync負責定期將本地的緩存重新加入deltaFIFO隊列,確保本地緩存與controller的數據一致性。
國內太多博客沒了解清楚就介紹這一部分是與api-server交互,進行relist。實際上resync完全沒有涉及到服務端的部分,他就是一個本地緩存的同步機制。與服務端的交互使用list/watch已經完全可以確保資源一致性了,基本不怎么需要進行relist操作,并且對于節點非常多的大集群來說,list非常消耗資源,何況是定期relist呢。
關于resync機制的介紹,不在這里展開,詳細看下一篇筆記。
watch監聽操作
watch的實現非常巧妙,它利用了http的chunk編碼傳輸機制建立長連接,來實現動態的數據監聽,可以了解分塊傳輸編碼。
同樣借用一張時序圖來看下watch的流程
reflector通過Watcher監聽api-server端的數據delta事件,并將這些事件放入deltaFIFO中統一處理。
// 在這里向服務端發起watch請求,并接收和處理資源變更事件
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {...for {...// w == nil表示使用常規的list/watch方式,streaming 方式會創建特殊的watcherif w == nil {timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))options := metav1.ListOptions{// 上次同步的資源版本,也就是本地的資源版本。以此來獲取增量的數據ResourceVersion: r.LastSyncResourceVersion(),// watch 超時時間,長時間沒有接受任務事件的watcher會被關掉,避免長時間掛起。TimeoutSeconds: &timeoutSeconds,// watch書簽,避免watch重啟時請求api-server導致的消耗。AllowWatchBookmarks: true,}// 創建一個watch對象,監聽api-server的資源變更事件,將接收到的事件丟進resultChan中w, err = r.listerWatcher.Watch(options)...}// 將resultChan中的取出放入FIFO 隊列err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)// 失敗重試邏輯...}
}
建立連接的邏輯在這一行
w, err = r.listerWatcher.Watch(options)
還是用上一篇workqueue來看看這個Watch實例的實現。
從Watch函數一路往上追溯,可以看到先是與server建立了http連接,再通過watch標記建立了watch連接,創建stream watcher對象,并拉起一個協程去處理監聽到的事件信息。
- 此后所有監聽的delta事件都會經過receive協程進入到resultChan中。
// reflector調用的watch函數
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {return lw.WatchFunc(options)
}// watchFunc函數的定義
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {...watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {options.Watch = true // 向服務端請求chunk連接optionsModifier(&options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Watch(context.TODO()) // 這里調用了getter的watch函數// getter是controller初始化時建立的http客戶端: clientset.CoreV1().RESTClient()}return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {... url := r.URL().String()for {req, err := r.newHTTPRequest(ctx)resp, err := client.Do(req)if err == nil && resp.StatusCode == http.StatusOK {return r.newStreamWatcher(resp)}...}
}func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {sw := &StreamWatcher{source: d,reporter: r,result: make(chan Event),done: make(chan struct{}),}go sw.receive() // 處理消息事件的協程return sw
}// 解析接收到的事件,并放到resultChan中等待后續處理。
func (sw *StreamWatcher) receive() {for {// 解析數據action, obj, err := sw.source.Decode()select {case <-sw.done:return// 將事件發送到resultChancase sw.result <- Event{Type: action,Object: obj,}:}}
}
- 進入resultChan的事件,由watchHandler取出再分類添加到FIFO隊列中。
func watchHandler(start time.Time,w watch.Interface, // watch實例store Store, // 存儲對象 比如delta FIFO queue...
) error {...
loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return err// 從ResultChan中取出變更事件,并放進隊列中,比如delta FIFO隊列中case event, ok := <-w.ResultChan():// 省略了一些資源過濾和錯誤處理...// 解析監聽到的事件數據meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))continue}// 解析資源事件的版本resourceVersion := meta.GetResourceVersion()switch event.Type {case watch.Added: err := store.Add(event.Object) // 往隊列添加add delta事件... // err handlecase watch.Modified: err := store.Update(event.Object) // 往隊列添加update delta事件... // err handlecase watch.Deleted: err := store.Delete(event.Object) // 往隊列添加delete delta事件,在此之前會判斷事件對應的資源對象是否存在... // err handlecase watch.Bookmark:...default:... // err handle}// 更新resourceVersion版本號,下一輪watch就不會再收到重復的更新事件setLastSyncResourceVersion(resourceVersion)if rvu, ok := store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(resourceVersion)}...}}...return nil
}
總結
用一個圖來回顧下reflector各個模塊的關系~