1. 概述
進入 K8s 的世界,會發現幾乎所有對象都被抽象為了資源(Resource),包括 K8s Core Resources(Pod, Service, Namespace 等)、CRD、APIService 擴展的資源類型。同時 K8s 底層將這些資源統一抽象為了 RESTful 的存儲(Storage),一方面服務端按目錄形式(/registry/xxx) 存放在 ETCD 中,另一方面也為客戶端提供了 RESTful API 接口,便于對資源的操作(get/post/put/patch/delete 等)。
K8s Watch API 就是為資源提供的一種持續監聽其變化的機制,當資源有任何變化的時候,都可以實時、順序、可靠的傳遞給客戶端,使得用戶可以針對目標資源進行靈活應用與操作。
那 K8s Watch 機制是怎么實現的呢?底層具體依賴了哪些技術?
本文將從 HTTP 協議、APIServer 啟動、ETCD Watch 封裝、服務端 Watch 實現、客戶端 Watch 實現等方面,對 K8s Watch 實現機制進行了解析。
大致流程如下
本文及后續相關文章都基于 K8s v1.23
2. 從 HTTP 說起
2.1 Content-Length
如下圖所示,HTTP 發送請求 Request 或服務端 Response,會在 HTTP header 中攜帶 Content-Length,以表明此次傳輸的總數據長度。如果 Content-Length 長度與實際傳輸長度不一致,則會發生異常(大于實際值會超時, 小于實際值會截斷并可能導致后續的數據解析混亂)。
curl baidu.com -v> GET / HTTP/1.1
> User-Agent: curl/7.29.0
> Host: baidu.com
> Accept: */*< HTTP/1.1 200 OK
< Date: Thu, 17 Mar 2022 04:15:25 GMT
< Server: Apache
< Last-Modified: Tue, 12 Jan 2010 13:48:00 GMT
< ETag: "51-47cf7e6ee8400"
< Accept-Ranges: bytes
< Content-Length: 81
< Cache-Control: max-age=86400
< Expires: Fri, 18 Mar 2022 04:15:25 GMT
< Connection: Keep-Alive
< Content-Type: text/html<html>
<meta http-equiv="refresh" content="0;url=http://www.baidu.com/">
</html>
如果服務端提前不知道要傳輸數據的總長度,怎么辦?
2.2 Chunked Transfer Encoding
HTTP 從 1.1 開始增加了分塊傳輸編碼(Chunked Transfer Encoding),將數據分解成一系列數據塊,并以一個或多個塊發送,這樣服務器可以發送數據而不需要預先知道發送內容的總大小。數據塊長度以十六進制的形式表示,后面緊跟著 \r\n,之后是分塊數據本身,后面也是 \r\n,終止塊則是一個長度為 0 的分塊。
> GET /test HTTP/1.1
> Host: baidu.com
> Accept-Encoding: gzip< HTTP/1.1 200 OK
< Server: Apache
< Date: Sun, 03 May 2015 17:25:23 GMT
< Content-Type: text/html
< Transfer-Encoding: chunked
< Connection: keep-alive
< Content-Encoding: gzip4\r\n (bytes to send)
Wiki\r\n (data)
6\r\n (bytes to send)
pedia \r\n (data)
E\r\n (bytes to send)
in \r\n
\r\n
chunks.\r\n (data)
0\r\n (final byte - 0)
\r\n (end message)
為了實現以流(Streaming)的方式 Watch 服務端資源變更,HTTP1.1 Server 端會在 Header 里告訴 Client 要變更 Transfer-Encoding 為 chunked,之后進行分塊傳輸,直到 Server 端發送了大小為 0 的數據。
2.3 HTTP/2
HTTP/2 并沒有使用 Chunked Transfer Encoding 進行流式傳輸,而是引入了以 Frame(幀) 為單位來進行傳輸,其數據完全改變了原來的編解碼方式,整個方式類似很多 RPC協議。Frame 由二進制編碼,幀頭固定位置的字節描述 Body 長度,就可以讀取 Body 體,直到 Flags 遇到 END_STREAM。這種方式天然支持服務端在 Stream 上發送數據,不需要通知客戶端做什么改變。
+-----------------------------------------------+
| Body Length (24) | ----Frame Header
+---------------+---------------+---------------+
| Type (8) | Flags (8) |
+-+-------------+---------------+-------------------+
|R| Stream Identifier (31) |
+=+=================================================+
| Frame Payload (0...) ... ----Frame Data
+---------------------------------------------------+
K8s 為了充分利用 HTTP/2 在 Server-Push、Multiplexing 上的高性能 Stream 特性,在實現 RESTful Watch 時,提供了 HTTP1.1/HTTP2 的協議協商(ALPN, Application-Layer Protocol Negotiation) 機制,在服務端優先選中 HTTP2,協商過程如下:
curl https://{kube-apiserver}/api/v1/watch/namespaces/default/pods/mysql-0 -v* ALPN, offering h2
* ALPN, offering http/1.1
* SSL verify...
* ALPN, server accepted to use h2
* Using HTTP2, server supports multiplexing
* Connection state changed (HTTP/2 confirmed)
* Copying HTTP/2 data in stream buffer to connection buffer after upgrade: len=0
* Using Stream ID: 1 (easy handle 0x7f2b921a6a90)
> GET /api/v1/watch/namespaces/default/pods/mysql-0 HTTP/2
> Host: 9.165.12.1
> user-agent: curl/7.79.1
> accept: */*
> authorization: Bearer xxx
>
* TLSv1.3 (IN), TLS handshake, Newsession Ticket (4):
* Connection state changed (MAX_CONCURRENT_STREAMS == 250)!< HTTP/2 200
< cache-control: no-cache, private
< content-type: application/json
< date: Thu, 17 Mar 2022 04:46:36 GMT{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":xxx}}
3. APIServer 啟動
APIServer 啟動采用 Cobra 命令行,解析相關 flags 參數,經過 Complete(填充默認值)->Validate(校驗) 邏輯后,通過 Run 啟動服務。啟動入口如下:
// kubernetes/cmd/kube-apiserver/app/server.go
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {s := options.NewServerRunOptions()cmd := &cobra.Command{Use: "kube-apiserver",...RunE: func(cmd *cobra.Command, args []string) error {...// set default optionscompletedOptions, err := Complete(s)if err != nil {return err}// validate optionsif errs := completedOptions.Validate(); len(errs) != 0 {return utilerrors.NewAggregate(errs)}return Run(completedOptions, genericapiserver.SetupSignalHandler())},}...return cmd
}
在 Run 函數中,按序分別初始化 APIServer 鏈(APIExtensionsServer、KubeAPIServer、AggregatorServer),分別服務于 CRD(用戶自定義資源)、K8s API(內置資源)、API Service(API 擴展資源) 對應的資源請求。相關代碼如下:
// kubernetes/cmd/kube-apiserver/app/server.go
// 創建 APIServer 鏈(APIExtensionsServer、KubeAPIServer、AggregatorServer),分別服務 CRD、K8s API、API Service
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {// 創建 APIServer 通用配置kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)if err != nil {return nil, err}...// 第一:創建 APIExtensionsServerapiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))if err != nil {return nil, err}// 第二:創建 KubeAPIServerkubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)if err != nil {return nil, err}...// 第三:創建 AggregatorServeraggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)if err != nil {// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routinesreturn nil, err}return aggregatorServer, nil
}
之后,經過非阻塞(NonBlockingRun) 方式啟動 SecureServingInfo.Serve,并配置 HTTP2(默認開啟) 相關傳輸選項,最后啟動 Serve 監聽客戶端請求。
4. ETCD 資源封裝
ETCD 實現 Watch 機制,經歷了從 ETCD2 到 ETCD3 實現方式的轉變。ETCD2 通過長輪詢 Long-Polling 的方式監聽資源事件的變更;ETCD3 則通過基于 HTTP2 的 gRPC 實現 Watch stream,性能得到了很大的提升。
Polling(輪詢):由于 http1.x 沒有服務端 push 的機制,為了 Watch 服務端的數據變化,最簡單的辦法當然是客戶端去 pull:客戶端每隔定長時間去服務端拉數據同步,無論服務端有沒有數據變化。但是必然存在通知不及時和大量無效的輪詢的問題。
Long-Polling(長輪詢):就是在這個 Polling 的基礎上的優化,當客戶端發起 Long-Polling 時,如果服務端沒有相關數據,會 hold 住請求,直到服務端有數據要發或者超時才會返回。
在上一步配置 APIServerConfig 時,封裝了底層存儲用的 ETCD。以 kubeAPIServerConfig 為例,說明 K8s 內置資源是如何封裝 ETCD 底層存儲的。
首先,通過 buildGenericConfig 實例化 RESTOptionsGetter,用于封裝 RESTStorage。之后通過 InstallLegacyAPI -> NewLegacyRESTStorage 實例化 K8s 內置資源的 RESTStorage,包括 podStorage、nsStorage、pvStorage、serviceStorage 等,用于 APIServer 在處理客戶端資源請求時,調用的后端資源存儲。
InstallLegacyAPI 源碼如下:
// kubernetes/pkg/controlplane/instance.go
// 注冊 K8s 的內置資源,并封裝到對應的 RESTStorage(如 podStorage/pvStorage)
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {...legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)if err != nil {return fmt.Errorf("error building core storage: %v", err)}if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 { // if all core storage is disabled, return.return nil}controllerName := "bootstrap-controller"coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())if err != nil {return fmt.Errorf("error creating bootstrap controller: %v", err)}m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)...return nil
}
在實例化 ETCD 底層存儲中,通過開關 EnableWatchCache 來控制是否啟用 Watch 緩存。如果啟用了,則會先走 StorageWithCacher 邏輯,然后才走 UndecoratedStorage 真正調用底層 ETCD3 存儲。
K8s 當前只支持 ETCD3,不再支持 ETCD2。K8s 充分信任 ETCD3 的 Watch 機制,保證資源狀態與 ETCD 底層存儲的一致性。
整個調用過程如下:
K8s 各類資源(CRD/Core/Aggregator) 都統一以 RESTful 風格暴露 HTTP 請求接口,并支持多種類型的編解碼格式,如 json/yaml/protobuf。
5. 客戶端 Watch 實現
經過上面的步驟,APIServer 服務端已準備好 K8s 各類資源的 RESTStorage(底層封裝了 ETCD3),此時客戶端可通過 RESTful HTTP 接口向 APIServer 發出資源請求,包括 GET/POST/PATCH/WATCH/DELETE 等操作。
客戶端 Watch 包括:
(1). kubectl get xxx -w,獲取某類資源、并持續監聽資源變化;
(2). client-go 中 Reflector ListAndWatch APIServer 各類資源,點此查看;
我們以 kubectl get pod -w 為例,說明客戶端是如何實現資源的 Watch 操作。
首先,kubectl 也是通過 Cobra 命令行解析參數(--watch,或 --watch-only),然后調用 Run 調用 cli-runtime 包下面的 Watch 接口,之后通過 RESTClient.Watch 向 APIServer 發起 Watch 請求,獲得一個流式 watch.Interface,然后不斷從其中 ResultChan 獲取 watch.Event。之后,根據客戶端發送的編解碼類型(json/yaml/protobuf),從 stream 中按幀(Frame) 讀取并解碼(Decode) 數據,輸出顯示到命令行終端。
客戶端通過 RESTClient 發起 Watch 請求,代碼如下:
// kubernetes/staging/src/k8s.io/cli-runtime/pkg/resource/helper.go
func (m *Helper) Watch(namespace, apiVersion string, options *metav1.ListOptions) (watch.Interface, error) {options.Watch = truereturn m.RESTClient.Get().NamespaceIfScoped(namespace, m.NamespaceScoped).Resource(m.Resource).VersionedParams(options, metav1.ParameterCodec).Watch(context.TODO())
}
客戶端 Watch 實現過程小結如下:
6. 服務端 Watch 實現
服務端 APIServer 啟動后,一直在持續監聽著各類資源的變更事件。在接收到某類資源的 Watch 請求后,調用 RESTStorage 的 Watch 接口,通過開關 EnableWatchCache 來控制是否啟用 Watch 緩存,最終通過 etcd3.Watch 封裝實現了 ETCD 底層的 Event 變更事件。
RESTStorage 就是在 APIServer 啟動時候,提前注冊、封裝的 ETCD 資源存儲。
etcd3.watcher 通過兩個 channel(incomingEventChan、resultChan,默認容量都為 100) 實現 ETCD 底層事件到 watch.Event 的轉換,然后通過 serveWatch 流式監聽返回的 watch.Interface,不斷從 resultChan 中取出變更事件。之后,根據客戶端發送的編解碼類型(json/yaml/protobuf),編碼(Encode) 數據,按幀(Frame) 組裝后發送到 stream 中給客戶端。
服務端通過 serveWatch 流式監聽返回的 watch.Interface,代碼如下:
// kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {return func(w http.ResponseWriter, req *http.Request) {...if opts.Watch || forceWatch {...watcher, err := rw.Watch(ctx, &opts)if err != nil {scope.err(err, w, req)return}requestInfo, _ := request.RequestInfoFrom(ctx)metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {serveWatch(watcher, scope, outputMediaType, req, w, timeout)})return}...}
}
K8s 在 v1.11 之后將 WATCH/WATCHLIST 類型的 action.Verb 廢棄了,統一都交由 LIST -> restfulListResource 處理。
服務端 Watch 實現過程小結如下:
APIServer 除了支持 HTTP2,也支持 WebSocket 通信。當客戶端請求包含了 Upgrade: websocket,Connection: Upgrade 時,則服務端會通過 WebSocket 與客戶端進行數據傳輸。
值得注意的是,底層 ETCD 事件通過 transform 函數轉換為 watch.Event,包括以下幾種類型(Type):
7. 小結
本文通過分析 K8s 中 APIServer 啟動、ETCD watch 封裝、服務端 Watch 實現、客戶端 Watch 實現等核心流程,對 K8s Watch 實現機制進行了解析。通過源碼、圖文方式說明了相關流程邏輯,以期更好的理解 K8s Watch 實現細節。
K8s 底層完全信任 ETCD(ListAndWatch),將各類資源統一抽象為了 RESTful 的存儲(Storage),通過 Watch 機制獲取各類資源的變更事件,然后通過 Informer 機制分發給下游監聽的 ResourceEventHandler,最終由 Controller 實現資源的業務邏輯處理。隨著 ETCD3 在 HTTP/2 基礎上不斷優化完善,K8s 將提供更高效、更穩定的編排能力。