歡迎訪問我的GitHub
這里分類和匯總了欣宸的全部原創(含配套源碼):https://github.com/zq2599/blog_demos
本篇概覽
- 本文是《client-go實戰》系列的第十二篇,又有一個精彩的知識點在本章呈現:選主(leader-election)
- 在解釋什么是選主之前,咱們先來看一個場景(有真實適用場景的技術,學起來才有動力),如下圖所示(稍后有詳細說明)
上圖所描述的業務場景是個普通的controller應用:
- 右側是人工操作,通過kubectl命令修改了service資源
- 左側的業務應用訂閱了service的變化,在收到service變更的事件后,對pod進行寫操作(例如將收到事件的時間寫入pod的label)
- 以上的業務應用就是個很普通的controller,很簡單,運行起來也沒啥問題,但是,如果這個業務應用有多個實例呢?
多實例的問題
- 所謂多個實例,就是同樣的業務應用我們運行了多個進程(例如三個),為什么多個進程?同一個應用運行多個進程不是很正常么?橫向擴容不就是多進程嘛
- 多個進程運行的時候,如果service發生變化,那么每個進程都會去修改pod的label,這不是我們想要的(只要修改一次就行了)
- 所以,如何解決這個問題呢?三個進程都是同一套代碼,都會訂閱service的變化,但是最終只修改一次pod
- 經驗豐富的您應該會想到分布式鎖,三個進程去搶分布式鎖,搶到的負責更新,沒錯,這是一個正確的解法
- 但是,分布式鎖需要引入相關組件吧,redis的setnx,或者mysql的樂觀鎖,這樣就需要維護新的組件了
- 其實這在kubernetes是個很典型的問題,畢竟pod多實例在kubernetes是常態了,所以當然也有官方的解法,頁就是本文的主題:選主(leader-election)
選主(leader-election)
- 說到這里您應該能理解選主的含義了:多個進程競爭某個key的leader,咱們可以把特定的代碼放在競爭成功后再執行,由于同一時刻只有一個進程可以競爭成功,這就相當于在不引入額外組件的情況下,只用client-go就實現了分布式鎖
- 由于選主只是個特定的小知識點,本篇就沒什么多余的理論要研究了,接下來直接開始實戰,編碼實現一個功能來說明選主的用法
- 實戰的業務需求如下
- 開發一個應用,該應用同時運行多個進程
- 當kubernetes的指定namespace下的service發生變化時,在pod的label中記錄這個service的變化時間
- 每次serivce變化,pod的label只能修改一次(盡管此時有多個進程)
- 讓我們少些套路,多一點真誠,不說廢話,直接開始動手實戰吧
源碼下載
- 如果您不想編寫代碼,也可以從GitHub上直接下載,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
名稱 | 鏈接 | 備注 |
---|---|---|
項目主頁 | https://github.com/zq2599/blog_demos | 該項目在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該項目源碼的倉庫地址,https協議 |
git倉庫地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項目源碼的倉庫地址,ssh協議 |
- 這個git項目中有多個文件夾,本篇的源碼在leader-tutorials文件夾下,如下圖黃框所示:
提前了解選主的代碼
- 接下來會開發一個完整的controller應用,以此來說明選主功能
- 如果您覺得完整應用的代碼太多,懶得看,只想了解選主部分,那就在此提前將整個工程中選主相關的代碼貼出來
- 核心代碼如下所示,先創建鎖對象,就像分布式鎖一樣,總要有個key,然后執行leaderelection.RunOrDie方法參與選主,一旦有了結果,OnNewLeader方法會被回調,這時候通過自身id和leader的id比較就知道是不是自己了,另外,當OnStartedLeading被執行的時候,就意味著當前進程就是leader,并且可以立即開始執行只有leader才能做的事情了
// startLeaderElection 選主的核心邏輯代碼
func startLeaderElection(ctx context.Context, clientset *kubernetes.Clientset, stop chan struct{}) {klog.Infof("[%s]創建選主所需的鎖對象", processIndentify)// 創建鎖對象lock := &resourcelock.LeaseLock{LeaseMeta: metav1.ObjectMeta{Name: "leader-tutorials",Namespace: NAMESPACE,},Client: clientset.CoordinationV1(),LockConfig: resourcelock.ResourceLockConfig{Identity: processIndentify,},}klog.Infof("[%s]開始選主", processIndentify)// 啟動選主操作leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{Lock: lock,ReleaseOnCancel: true,LeaseDuration: 10 * time.Second,RenewDeadline: 5 * time.Second,RetryPeriod: 2 * time.Second,Callbacks: leaderelection.LeaderCallbacks{OnStartedLeading: func(ctx context.Context) {klog.Infof("[%s]當前進程是leader,只有leader才能執行的業務邏輯立即開始", processIndentify)// 在這里寫入選主成功的代碼,// 就像搶分布式鎖一樣,當前進程選舉成功的時候,這的代碼就會被執行,// 所以,在這里填寫搶鎖成功的業務邏輯吧,本例中就是監聽service變化,然后修改pod的labelCreateAndStartController(ctx, clientset, &v1.Service{}, "services", NAMESPACE, stop)},OnStoppedLeading: func() {// 失去了leader時的邏輯klog.Infof("[%s]失去leader身份,不再是leader了", processIndentify)os.Exit(0)},OnNewLeader: func(identity string) {// 收到通知,知道最終的選舉結果if identity == processIndentify {klog.Infof("[%s]選主結果出來了,當前進程就是leader", processIndentify)// I just got the lockreturn}klog.Infof("[%s]選主結果出來了,leader是 : [%s]", processIndentify, identity)},},})
}
實戰:部署service和deployment
- 首先請準備好k8s環境,這在《client-go實戰之六:時隔兩年,刷新版本繼續實戰》里面已有詳細說明
- 然后把本次實戰所需的service和deployment部署好,- 所有要部署的內容我都集中在這個名為nginx-deployment-service.yaml腳本中了
---
apiVersion: apps/v1
kind: Deployment
metadata:namespace: client-go-tutorialsname: nginx-deploymentlabels:app: nginx-apptype: front-end
spec:replicas: 3selector:matchLabels:app: nginx-apptype: front-endtemplate:metadata:labels:app: nginx-apptype: front-end# 這是第一個業務自定義label,指定了mysql的語言類型是c語言language: c# 這是第二個業務自定義label,指定了這個pod屬于哪一類服務,nginx屬于web類business-service-type: webspec:containers:- name: nginx-containerimage: nginx:latestresources:limits:cpu: "0.5"memory: 128Mirequests:cpu: "0.1"memory: 64Mi
---
apiVersion: v1
kind: Service
metadata:namespace: client-go-tutorialsname: nginx-service
spec:type: NodePortselector:app: nginx-apptype: front-endports:- port: 80targetPort: 80nodePort: 30011
- 先執行以下命令創建namespace
kubectl create namespace client-go-tutorials
- 再執行以下命令即可完成資源的創建
kubectl apply -f nginx-deployment-service.yaml
- 來查看一下資源情況,如下圖,service和pod都創建好了,準備工作完成,可以開始編碼了
編碼:準備工程
- 執行命令名為go mod init leader-tutorials,新建module
- 確保您的goproxy是正常的
- 執行命令go get k8s.io/client-go@v0.22.8,下載client-go的指定版本
- 現在工程已經準備好了,接著就是具體的編碼
編碼:梳理
- 咱們按照開發順序開始寫代碼,如果您看過欣宸的《client-go實戰》系列,此刻對使用client-go開發簡易版controller應該很熟悉了,這里再簡單提一下開發的流程
- 將controller完整的寫出來,功能是監聽service,一旦有變化就更新pod的label
- 在主控邏輯中,根據選主結果決定是否啟動步驟1中的controller
- 下面開始寫代碼
編碼:controller
- 新建controller.go文件
- 在controller.go中增加常量和數據結構的定義
package mainimport ("context""encoding/json""fmt""time""k8s.io/klog/v2"metav1 "k8s.io/apimachinery/pkg/apis/meta/v1""k8s.io/apimachinery/pkg/fields"objectruntime "k8s.io/apimachinery/pkg/runtime""k8s.io/apimachinery/pkg/types""k8s.io/apimachinery/pkg/util/runtime""k8s.io/apimachinery/pkg/util/wait""k8s.io/client-go/kubernetes""k8s.io/client-go/tools/cache""k8s.io/client-go/util/workqueue"
)const (LABLE_SERVICE_UPDATE_TIME = "service-update-time" // 這個label用來記錄service的更新時間
)// 自定義controller數據結構,嵌入了真實的控制器
type Controller struct {ctx context.Contextclientset *kubernetes.Clientset// 本地緩存,關注的對象都會同步到這里indexer cache.Indexer// 消息隊列,用來觸發對真實對象的處理事件queue workqueue.RateLimitingInterface// 實際運行運行的控制器informer cache.Controller
}
- 然后是controller的套路代碼,主要是從隊列中不斷獲取數據并處理的邏輯
// processNextItem 不間斷從隊列中取得數據并處理
func (c *Controller) processNextItem() bool {// 注意,隊列里面不是對象,而是key,這是個阻塞隊列,會一直等待key, quit := c.queue.Get()if quit {return false}// Tell the queue that we are done with processing this key. This unblocks the key for other workers// This allows safe parallel processing because two pods with the same key are never processed in// parallel.defer c.queue.Done(key)// 注意,這里的syncToStdout應該是業務代碼,處理對象變化的事件err := c.updatePodsLabel(key.(string))// 如果前面的業務邏輯遇到了錯誤,就在此處理c.handleErr(err, key)// 外面的調用邏輯是:返回true就繼續調用processNextItem方法return true
}// runWorker 這是個無限循環,不斷地從隊列取出數據處理
func (c *Controller) runWorker() {for c.processNextItem() {}
}// handleErr 如果前面的業務邏輯執行出現錯誤,就在此集中處理錯誤,本例中主要是重試次數的控制
func (c *Controller) handleErr(err error, key interface{}) {if err == nil {// Forget about the #AddRateLimited history of the key on every successful synchronization.// This ensures that future processing of updates for this key is not delayed because of// an outdated error history.c.queue.Forget(key)return}// 如果重試次數未超過5次,就繼續重試if c.queue.NumRequeues(key) < 5 {klog.Infof("Error syncing pod %v: %v", key, err)// Re-enqueue the key rate limited. Based on the rate limiter on the// queue and the re-enqueue history, the key will be processed later again.c.queue.AddRateLimited(key)return}// 代碼走到這里,意味著有錯誤并且重試超過了5次,應該立即丟棄c.queue.Forget(key)// 這種連續五次重試還未成功的錯誤,交給全局處理邏輯runtime.HandleError(err)klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}// Run 開始常規的控制器模式(持續響應資源變化事件)
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {defer runtime.HandleCrash()// Let the workers stop when we are donedefer c.queue.ShutDown()klog.Info("Starting Pod controller")go c.informer.Run(stopCh)// Wait for all involved caches to be synced, before processing items from the queue is started// 剛開始啟動,從api-server一次性全量同步所有數據if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))return}// 支持多個線程并行從隊列中取得數據進行處理for i := 0; i < threadiness; i++ {go wait.Until(c.runWorker, time.Second, stopCh)}<-stopChklog.Info("Stopping Pod controller")
}
- 從上述代碼可見,監聽的資源發生變化時,調用的是updatePodsLabel方法,此方法的作用就是查找該namespace下的所有pod,依次用patch的方式更新pod的label
// updatePodsLabel 這是業務邏輯代碼,一旦service發生變化,就修改pod的label,將service的變化事件記錄進去
func (c *Controller) updatePodsLabel(key string) error {// 開始進入controller的業務邏輯klog.Infof("[%s]這里是controller的業務邏輯,key [%s]", processIndentify, key)// 從本地緩存中取出完整的對象_, exists, err := c.indexer.GetByKey(key)if err != nil {klog.Errorf("[%s]根據key[%s]從本地緩存獲取對象失敗 : %v", processIndentify, key, err)return err}if !exists {klog.Infof("[%s]對象不存在,key [%s],這是個刪除事件", processIndentify, key)} else {klog.Infof("[%s]對象存在,key [%s],這是個新增或修改事件", processIndentify, key)}// 代碼走到這里,表示監聽的對象發生了變化,// 按照業務設定,需要修改pod的指定label,// 準備好操作pod的接口podInterface := c.clientset.CoreV1().Pods(NAMESPACE)// 遠程取得最新的pod列表pods, err := podInterface.List(c.ctx, metav1.ListOptions{})if err != nil {klog.Errorf("[%s]遠程獲取pod列表失敗 : %v", processIndentify, err)return err}// 將service的變化時間寫入pod的指定label,這里先獲取當前時間updateTime := time.Now().Format("20060102150405")// 準備patch對象patchData := map[string]interface{}{"metadata": map[string]interface{}{"labels": map[string]interface{}{LABLE_SERVICE_UPDATE_TIME: updateTime,},},}// 轉為byte數組,稍后更新pod的時候,就用這個數組進行patch更新patchByte, _ := json.Marshal(patchData)// 遍歷所有pod,逐個更新labelfor _, pod := range pods.Items {podName := pod.Nameklog.Infof("[%s]正在更新pod [%s]", processIndentify, podName)_, err := podInterface.Patch(c.ctx, podName, types.MergePatchType, patchByte, metav1.PatchOptions{})// 失敗就返回,會導致整體重試if err != nil {klog.Infof("[%s]更新pod [%s]失敗, %v", processIndentify, podName, err)return err}klog.Infof("[%s]更新pod [%s]成功", processIndentify, podName)}return nil
}
- 到這里,controller的代碼已經寫得七七八八了,還剩創建controller對象以及運行informer的代碼,這里將它們集中封裝在一個方法中,一旦這個方法被調用,就意味著controller會被創建,然后監聽service變化再更新pod的label的邏輯就會被執行
// CreateAndStartController 為了便于外部使用,這里將controller的創建和啟動封裝在一起
func CreateAndStartController(ctx context.Context, clientset *kubernetes.Clientset, objType objectruntime.Object, resource string, namespace string, stopCh chan struct{}) {// ListWatcher用于獲取數據并監聽資源的事件podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), resource, NAMESPACE, fields.Everything())// 限速隊列,里面存的是有事件發生的對象的身份信息,而非對象本身queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 創建本地緩存并對指定類型的資源開始監聽// 注意,如果業務上有必要,其實可以將新增、修改、刪除等事件放入不同隊列,然后分別做針對性處理,// 但是,controller對應的模式,主要是讓status與spec達成一致,也就是說增刪改等事件,對應的都是查到實際情況,令其與期望情況保持一致,// 因此,多數情況下增刪改用一個隊列即可,里面放入變化的對象的身份,至于處理方式只有一種:查到實際情況,令其與期望情況保持一致indexer, informer := cache.NewIndexerInformer(podListWatcher, objType, 0, cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {key, err := cache.MetaNamespaceKeyFunc(obj)if err == nil {// 再次注意:這里放入隊列的并非對象,而是對象的身份,作用是僅僅告知消費方,該對象有變化,// 至于有什么變化,需要消費方自行判斷,然后再做針對性處理queue.Add(key)}},UpdateFunc: func(old interface{}, new interface{}) {key, err := cache.MetaNamespaceKeyFunc(new)if err == nil {queue.Add(key)}},DeleteFunc: func(obj interface{}) {key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err == nil {queue.Add(key)}},}, cache.Indexers{})controller := &Controller{ctx: ctx,clientset: clientset,informer: informer,indexer: indexer,queue: queue,}go controller.Run(1, stopCh)
}
編碼:主控程序(選主邏輯也在里面)
- 本文是講選主(leader-election)的,前面做了這么多鋪墊,主角該上場了,新建main.go文件
- 定義常量,以及全局變量
package mainimport ("context""flag""os""path/filepath""time""github.com/google/uuid"v1 "k8s.io/api/core/v1"metav1 "k8s.io/apimachinery/pkg/apis/meta/v1""k8s.io/client-go/kubernetes""k8s.io/client-go/tools/clientcmd""k8s.io/client-go/tools/leaderelection""k8s.io/client-go/tools/leaderelection/resourcelock""k8s.io/client-go/util/homedir""k8s.io/klog/v2"
)const (NAMESPACE = "client-go-tutorials"
)// 用于表明當前進程身份的全局變量,目前用的是uuid
var processIndentify string
- 先把套路的代碼寫了,就是client-go初始化的那部分,以及main方法,里面是整個程序的啟動和業務調用流程,可見選主有關的代碼都放在名為startLeaderElection的方法中
// initOrDie client有關的初始化操作
func initOrDie() *kubernetes.Clientset {klog.Infof("[%s]開始初始化kubernetes客戶端相關對象", processIndentify)var kubeconfig *stringvar master string// 試圖取到當前賬號的家目錄if home := homedir.HomeDir(); home != "" {// 如果能取到,就把家目錄下的.kube/config作為默認配置文件kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")master = ""} else {// 如果取不到,就沒有默認配置文件,必須通過kubeconfig參數來指定flag.StringVar(kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")flag.StringVar(&master, "master", "", "master url")flag.Parse()}config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)if err != nil {klog.Fatal(err)}clientset, err := kubernetes.NewForConfig(config)if err != nil {klog.Fatal(err)}klog.Infof("[%s]kubernetes客戶端相關對象創建成功", processIndentify)return clientset
}func main() {// 一次性確定當前進程身份processIndentify = uuid.New().String()// 準備一個帶cancel的context,這樣在主程序退出的時候,可以將停止的信號傳遞給業務ctx, cancel := context.WithCancel(context.Background())// 這個是用來停止controller的stop := make(chan struct{})// 主程序結束的時候,下面的操作可以將業務邏輯都停掉defer func() {close(stop)cancel()}()// 初始化clientSet配置,因為是啟動階段,所以必須初始化成功,否則進程退出clientset := initOrDie()// 在一個新的協程中執行選主邏輯,以及選主成功的后的邏輯go startLeaderElection(ctx, clientset, stop)// 這里可以繼續做其他事情klog.Infof("選主的協程已經在運行,接下來可以執行其他業務 [%s]", processIndentify)select {}
}
- 最后是選主的代碼,如下所示,先創建鎖對象,就像分布式鎖一樣,總要有個key,然后執行leaderelection.RunOrDie方法參與選主,一旦有了結果,OnNewLeader方法會被回調,這時候通過自身id和leader的id比較就知道是不是自己了,另外,當OnStartedLeading被執行的時候,就意味著當前進程就是leader,并且可以立即開始執行只有leader才能做的事情了
// startLeaderElection 選主的核心邏輯代碼
func startLeaderElection(ctx context.Context, clientset *kubernetes.Clientset, stop chan struct{}) {klog.Infof("[%s]創建選主所需的鎖對象", processIndentify)// 創建鎖對象lock := &resourcelock.LeaseLock{LeaseMeta: metav1.ObjectMeta{Name: "leader-tutorials",Namespace: NAMESPACE,},Client: clientset.CoordinationV1(),LockConfig: resourcelock.ResourceLockConfig{Identity: processIndentify,},}klog.Infof("[%s]開始選主", processIndentify)// 啟動選主操作leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{Lock: lock,ReleaseOnCancel: true,LeaseDuration: 10 * time.Second,RenewDeadline: 5 * time.Second,RetryPeriod: 2 * time.Second,Callbacks: leaderelection.LeaderCallbacks{OnStartedLeading: func(ctx context.Context) {klog.Infof("[%s]當前進程是leader,只有leader才能執行的業務邏輯立即開始", processIndentify)// 在這里寫入選主成功的代碼,// 就像搶分布式鎖一樣,當前進程選舉成功的時候,這的代碼就會被執行,// 所以,在這里填寫搶鎖成功的業務邏輯吧,本例中就是監聽service變化,然后修改pod的labelCreateAndStartController(ctx, clientset, &v1.Service{}, "services", NAMESPACE, stop)},OnStoppedLeading: func() {// 失去了leader時的邏輯klog.Infof("[%s]失去leader身份,不再是leader了", processIndentify)os.Exit(0)},OnNewLeader: func(identity string) {// 收到通知,知道最終的選舉結果if identity == processIndentify {klog.Infof("[%s]選主結果出來了,當前進程就是leader", processIndentify)// I just got the lockreturn}klog.Infof("[%s]選主結果出來了,leader是 : [%s]", processIndentify, identity)},},})
}
- 上述代碼中,請注意LeaderElectionConfig對象的幾個重要字段,例如LeaseDuration、RenewDeadline、RetryPeriod這些,是和選主時候的續租、超時、重試相關,需要按照您的實際網絡情況進行調整
- 現在代碼寫完了,可以開始驗證了
驗證
- 這里捋一下驗證的步驟
- 構建項目,生產二進制文件
- 執行此二進制文件,啟動三個進程
- 觀察日志,應該有一個進程選舉成功,另外兩個只會在日志輸出選主結果
- 修改service資源,再去觀察日志,發現leader進程會輸出日志,再檢查pod的label,發現已經修改
- 用ctrl+C命令將leader進程退出,可見另外兩個進程會有一個成為新的leader
- 再次修改service資源,新的leader會負責更新pod的label
- 接下來開始操作
- 執行命令go build,對當前工程進行編譯構建,得到二進制文件leader-tutorials
- 打開三個終端窗口,輸入同樣的命令./leader-tutorials,選主成功的進程日志如下,之前操作過的殘留,所以沒有一開始就選主成功,而是等了幾秒后才成為leader,一旦成為leader,全量同步service會觸發一次pod的更新操作
- 再去看另外兩個進程的日志,可見已經識別到leader的身份,于是就沒有執行controller的邏輯
- 現在去修改service,用命令kubectl edit service nginx-service -n client-go-tutorials編輯,我這里是給service增加了一個label,如下圖所示
- 此刻,leader進程會監聽到service變化,下圖黃色箭頭以下的內容就是處理pod的日志
- 去看另外兩個進程的日志,不會有任何變化,因為controller都沒有
- 執行以下命令查看pod的修改情況(注意pod的名字要從您自己的環境復制)
kubectl describe pod nginx-deployment-78f6b696d9-cr47w -n client-go-tutorials
- 可以看到pod的label有變化,如下圖黃色箭頭所示,這和上面的leader日志的時間是一致的
- 目前leader進程工作正常,再來試試leader進程退出后的情況,用ctrl+C終止leader進程
- 再去看另外兩個進程的日志,發現其中一個成功成為新的leader
- 驗證完成,都符合預期
- 至此,client-go的選主功能實戰就完成了,如果您在尋找kubernetes原生的分布式鎖方案,希望本篇能給您一些參考
你不孤單,欣宸原創一路相伴
- Java系列
- Spring系列
- Docker系列
- kubernetes系列
- 數據庫+中間件系列
- DevOps系列