etcd 是一個開源的分布式鍵值存儲系統,專注于提供高可用性、強一致性的數據存儲與訪問,廣泛應用于分布式系統的服務發現、配置管理和協調任務。以下是其核心特性和應用場景的詳細介紹。
接下來就看看Etcd如何實現服務注冊,以及如何通過Raft算法保證服務的強一致性和高可用性,還有只針對存儲數據的watch監聽回調功能。
本文的etcd的版本是3.5.20,除此之外,學習歸學習,如果真的在項目中用到了etcd,一定要準備好相關的知識。
常見的etcd相關問題如下:
- 為什么要使用 etcd,怎么用的?
- Raft 算法
- etcd 是如何保證強一致性的呢
- etcd分布式鎖實現的基礎機制是怎樣的
- 能說一說用 etcd 時它處理請求的流程是怎樣的嗎
一.Etcd的基礎知識
1.1 什么是Etcd?
Etcd是一個開源的分布式鍵值存儲系統。
它的設計目的就是提供高可用性和強一致性的分布式數據管理服務,也就是CAP原理中的CP原理
讓我們看看官網的介紹:
- 簡單:etcd 的安裝簡單,且為用戶提供了 HTTP API,用戶使用起來也很簡單
- 存儲:etcd 的基本功能,數據分層存儲在文件目錄中,類似于我們日常使用的文件系統
- Watch 機制:Watch 指定的鍵、前綴目錄的更改,并對更改時間進行通知
- 安全通信:SSL 證書驗證
- 高性能:etcd 單實例可以支持 2k/s 讀操作,官方也有提供基準測試腳本
- 一致可靠:基于 Raft 共識算法,實現分布式系統數據的高可用性、一致性
1.2 Etcd的應用場景有哪些?
Etcd的使用場景很多,比如:
- 作為配置中心
- 服務注冊和發現
- 分布式鎖
- Watch通知
等等還有很多,在后續會介紹它的幾個常用的功能。
除此之外,還要知道Etcd還和Kubernetes深度集成,使用Etcd作為默認的元數據存儲組件
1.3 核心概念學習
對于后續我們要學習的內容,這里做一個簡單的說明:
- key-value存儲
- Lease(租約),TTL
- Watch(監聽機制)
- Revision/ModRevision(MVCC)
除此之外,etcd還自帶一些命令以及事務操作哦,會簡單的介紹一些相關內容
二.核心功能實現
在有了上述基礎的了解,就開始進一步學習一些比較重要的核心功能吧
2.1 鍵值存儲
ETCD的核心功能就是存儲鍵值對,支持一些基礎的命令。
主要是通過etcdctl命令行工具來操作鍵值的,如下:
PUT
:插入或更新鍵值。GET
:獲取鍵值。DELETE
:刪除鍵值。WATCH
:監聽鍵值變化。
# 寫入鍵值
etcdctl put /config/app1/log_level "debug"# 讀取鍵值
etcdctl get /config/app1/log_level
# 輸出:/config/app1/log_level debug# 監聽鍵值變化(另開一個終端執行)
etcdctl watch /config/app1/log_level
2.2 服務注冊和發現
在分布式環境中,業務服務多實例部署,這個時候就會涉及到服務之間的調用,就不能簡單使用編碼的方式指定實例信息。
服務的注冊和發現就是解決如何找到分布式集群中的一個服務(進程),并與之建立聯系。
接下來就來講講這個服務注冊和發現的過程
在這里說明一下:本質上鍵值對,租約和事務都是為這個操作服務的。
看一下服務注冊的過程(帶租約)
- 一開始啟動服務,連接etcd
- 先向etcd發送一個請求,申請一個Lease(租約,TTL=10s)
- 在利用這個租約注冊服務信息(eg:key = /services/{服務名}/{實例id})
- 后續在啟動KeepAlive續租機制
- 若服務崩潰或者斷線,則需要Lease失效,注冊信息自動刪除
服務發現的過程
- 客戶端通過查詢設置好的key,獲取所有value
- 將這些kv保存
- 啟動Watch,監聽內容。做好變更
接下來,就來看一個go語言的實例
下面是我寫的一個簡單的案例,也是有瑕疵的,主要就是方便記憶和嘗試
package etcimport ("UserServer/api/internal/config""context""fmt"clientv3 "go.etcd.io/etcd/client/v3""log""time"
)// 該服務發現是一個簡單的服務發現,在發現服務之后就會關閉,結束協程
// 并不是動態監聽地址的變化,不過一般也不需要,我只寫了一個發現的邏輯
// 首先進行第一次的監聽,如果沒有發現就需要開啟協程進行一個異步監聽
// 如果監聽到就給discover發送關閉消息,select接受到之后就會退出結束這個
// 服務發現的異步協程,釋放資源
// 如果沒有監聽到,會進入超時,也會把這個協程釋放var (discoverDone = make(chan struct{}) // 服務發現完成通知通道
)func Close() {close(discoverDone)
}// DiscoverEtcdService 阻塞式服務發現
func DiscoverEtcdService(etcd *config.Etcd) {cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"},DialTimeout: 5 * time.Second,})if err != nil {log.Fatal("ETCD連接失敗:", err)}defer cli.Close()// 對應注冊的服務名key := "/services/" + etcd.Name// 來個上下文ctx, cancel := context.WithCancel(context.Background())// 首次查詢resp, err := cli.Get(ctx, key)if !handleInitialDiscovery(resp, etcd, cancel) {log.Printf("服務 %s 未注冊,啟動監聽...", etcd.Name)}// 啟動異步監聽go startServiceWatch(cli, key, etcd)// 阻塞等待直到發現服務地址select {case <-ctx.Done(): // 正常發現//這個阻塞的是監聽,而下面那個是則是阻塞的整個服務log.Printf("成功發現服務地址: %s", etcd.Addr)select {case <-discoverDone:fmt.Println("關閉服務")}}
}// 處理初次服務發現
func handleInitialDiscovery(resp *clientv3.GetResponse, etcd *config.Etcd, cancel context.CancelFunc) bool {if len(resp.Kvs) > 0 {// 取第一個健康實例(實際生產環境應做健康檢查)etcd.Addr = string(resp.Kvs[0].Value)cancel()return true}return false
}// 啟動服務監聽
func startServiceWatch(cli *clientv3.Client, key string, etcd *config.Etcd) {// 1. 創建 Watcher,監聽指定 keywatchChan := cli.Watch(context.TODO(), key)// 2. 持續接收事件流for watchResp := range watchChan {// 3. 遍歷每個事件for _, event := range watchResp.Events {// 4. 處理事件processWatchEvent(event, etcd)}}
}// 處理watch事件(返回是否完成發現)
func processWatchEvent(event *clientv3.Event, etcd *config.Etcd) {switch event.Type {case clientv3.EventTypePut:// 只取第一個發現的地址(根據需求可改為列表)newAddr := string(event.Kv.Value)if newAddr == "127.0.0.1:50051" {log.Printf("監聽到服務地址: %s", newAddr)} else {log.Printf("監聽服務地址發生改變: %s", newAddr)}etcd.Addr = newAddr//close(discoverDone) // 通知主流程繼續執行case clientv3.EventTypeDelete:// 生產環境需要處理節點下線邏輯log.Printf("服務實例下線: %s", string(event.Kv.Key))}
}
package etcimport ("UserServer/rpc/internal/config""context""fmt"clientv3 "go.etcd.io/etcd/client/v3""log""os""os/signal""sync/atomic""syscall""time"
)// 寫一個服務注冊,rpc 服務端口為127.0.0.1:50051var (shutdownFlag int32 // 原子標記位用于優雅關閉控制
)func RegisterEtcdService(etcd config.Etcd) {// 注冊etcd服務etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"},DialTimeout: 5 * time.Second,})if err != nil {log.Fatal(err)}defer etcdCli.Close() // 確保關閉連接// etcd的鍵key := "/services/" + etcd.NameleaseID, err := registerWithRetry(etcdCli, key, etcd.Address, 3)if err != nil {log.Fatal("初始注冊失敗:", err)}// 心跳協程(帶重試)// 信號處理 , 用于捕獲操作系統發送的信號sigCh := make(chan os.Signal, 1)// 將指定的信號(SIGINT 和 SIGTERM)綁定到通道 sigChsignal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)// 主循環for {select {case <-sigCh:atomic.StoreInt32(&shutdownFlag, 1)log.Println("接收到關閉信號,開始清理...")revokeAndDelete(etcdCli, key, leaseID)returncase <-time.After(5 * time.Second):// 定期檢查租約狀態(生產環境可選)}}}// registerWithRetry 注冊邏輯(自帶重試)
// 參數介紹:參數是etcd的服務端+服務注冊的key + 服務地址 + 重試次數
func registerWithRetry(cli *clientv3.Client, key, addr string, maxRetry int) (clientv3.LeaseID, error) {retryInterval := 1 * time.Secondfor i := 0; i < maxRetry; i++ {// 申請新租約leaseResp, err := cli.Grant(context.Background(), 15)if err != nil {log.Printf("租約申請失敗(%d/%d): %v", i+1, maxRetry, err)time.Sleep(retryInterval)retryInterval *= 2 // 指數退避continue}// 注冊服務if _, err = cli.Put(context.Background(), key, addr, clientv3.WithLease(leaseResp.ID)); err != nil {log.Printf("服務注冊失敗(%d/%d): %v", i+1, maxRetry, err)time.Sleep(retryInterval)retryInterval *= 2continue}fmt.Println("注冊成功", leaseResp.ID)// 啟動心跳協程go startKeepAlive(cli, leaseResp.ID, key, addr)//測試服務監聽//time.Sleep(10 * time.Second)//NewLeaseResp, err := cli.Grant(context.Background(), 15)//if err != nil {// log.Printf("租約申請失敗(%d/%d): %v", i+1, maxRetry, err)// time.Sleep(retryInterval)// retryInterval *= 2 // 指數退避// continue//}//if _, err = cli.Put(context.Background(), key, "127.0.0.1:50052", clientv3.WithLease(NewLeaseResp.ID)); err != nil {// log.Printf("服務注冊失敗(%d/%d): %v", i+1, maxRetry, err)// time.Sleep(retryInterval)// retryInterval *= 2// continue//}//fmt.Println(NewLeaseResp.ID)return leaseResp.ID, nil}return 0, fmt.Errorf("超過最大重試次數")
}// 保持心跳
func startKeepAlive(cli *clientv3.Client, leaseID clientv3.LeaseID, key, addr string) {retryCount := 0maxRetry := 3keepAliveCh, err := cli.KeepAlive(context.Background(), leaseID)if err != nil {log.Printf("心跳初始化失敗: %v", err)return}for {select {case kaResp, ok := <-keepAliveCh:if atomic.LoadInt32(&shutdownFlag) == 1 {return}if !ok {log.Println("心跳通道異常關閉")if retryCount >= maxRetry {log.Fatal("連續心跳失敗,終止服務")}// 嘗試重新注冊newLeaseID, err := registerWithRetry(cli, key, addr, maxRetry)if err != nil {retryCount++time.Sleep(time.Duration(retryCount) * time.Second)continue}log.Printf("新租約 %x 注冊成功,終止舊心跳協程", newLeaseID)return}retryCount = 0log.Printf("租約 %x 心跳成功, TTL: %d", kaResp.ID, kaResp.TTL)}}
}// 清理注冊信息
func revokeAndDelete(cli *clientv3.Client, key string, leaseID clientv3.LeaseID) {// 1. 撤銷租約if _, err := cli.Revoke(context.Background(), leaseID); err != nil {log.Printf("租約撤銷失敗: %v", err)} else {log.Println("租約已撤銷")}// 2. 刪除鍵if _, err := cli.Delete(context.Background(), key); err != nil {log.Printf("鍵刪除失敗: %v", err)} else {log.Println("服務鍵已刪除")}
}
2.3 基于Etcd的分布式鎖
分布式鎖的實現除了使用redis之外,使用Etcd也是可以實現的。
由于Etcd是基于Raft算法,實現分布式集群的一致性,存儲到etcd集群中的值必然是一致的,因此基于etcd非常容易實現分布式鎖。
這里說一下它實現的基本原理:
首先是加鎖
- 創建一個唯一的key,使用put + withLease + if-not-exists
- 申請租約,分配一個TTL
- 之后定期KeepAlive,防止失效
解鎖
解鎖相對就比較簡單了,直接刪除就可以。
除此之外,還可以寫成監聽回調性(使用etcd的Watch機制)
來看一個簡單的案例
func Lock(cli *clientv3.Client, key, value string, ttl int64) (clientv3.LeaseID, int64, error) {// 1. 創建租約leaseResp, err := cli.Grant(context.TODO(), ttl)if err != nil {return 0, 0, err}leaseID := leaseResp.ID// 2. 事務加鎖(key 不存在就寫入)txn := cli.Txn(context.TODO())txnResp, err := txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0),).Then(clientv3.OpPut(key, value, clientv3.WithLease(leaseID)),).Commit()if err != nil {return 0, 0, err}if !txnResp.Succeeded {return 0, 0, fmt.Errorf("lock failed: key already exists")}// 3. 從 Put 返回中獲取 CreateRevision(用于解鎖校驗)putResp := txnResp.Responses[0].GetResponsePut()createRev := putResp.Header.Revisionreturn leaseID, createRev, nil
}
//返回的 createRev 你可以保存到結構體中,在解鎖時使用。
func Unlock(cli *clientv3.Client, key string, createRev int64) error {txn := cli.Txn(context.TODO())txnResp, err := txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", createRev),).Then(clientv3.OpDelete(key),).Commit()if err != nil {return err}if !txnResp.Succeeded {return fmt.Errorf("unlock failed: not lock owner")}return nil
}
在后續過程中會詳細介紹這個過程,可以先簡單過一下。
除此之外,ETCD也提供了官方的分布式鎖
go get go.etcd.io/etcd/client/v3/concurrency
它的優點就是自動續期,阻塞排隊,順序節點,無需手動處理租約,還可以避免死鎖,可靠性高。
import ("go.etcd.io/etcd/client/v3""go.etcd.io/etcd/client/v3/concurrency"
)func main() {cli, _ := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"},DialTimeout: 5 * time.Second,})defer cli.Close()// 創建 Session(帶租約)session, _ := concurrency.NewSession(cli)defer session.Close()// 創建一個分布式鎖對象mutex := concurrency.NewMutex(session, "/my-lock/")// 加鎖(阻塞直到成功)fmt.Println("Acquiring lock...")mutex.Lock(context.TODO())fmt.Println("Got lock!")// 模擬業務處理time.Sleep(3 * time.Second)// 解鎖mutex.Unlock(context.TODO())fmt.Println("Released lock.")
}
2.4 Revision
這里要介紹一下:Etcd是一個多版本并發控制的一個強一致性的KV存儲,她會為每次寫入操作都分配一個單調遞增的Revision,每個key都會帶有兩個重要字段如下:
字段 | 含義 |
CreateRevision | 該 key 第一次被創建時的 revision |
ModRevision | 該 key 最近一次被修改時的 revision |
Version | 該 key 被修改了幾次(包括創建) |
如果這個key不存在就表示它的createRevision=0
還記得之前的簡單的分布式鎖案例嗎,我們回過頭來看看
func Lock(cli *clientv3.Client, key, value string, ttl int64) (clientv3.LeaseID, int64, error) {// 1. 創建租約leaseResp, err := cli.Grant(context.TODO(), ttl)if err != nil {return 0, 0, err}leaseID := leaseResp.ID// 2. 事務加鎖(key 不存在就寫入)txn := cli.Txn(context.TODO())txnResp, err := txn.If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0),).Then(clientv3.OpPut(key, value, clientv3.WithLease(leaseID)),).Commit()if err != nil {return 0, 0, err}if !txnResp.Succeeded {return 0, 0, fmt.Errorf("lock failed: key already exists")}// 3. 從 Put 返回中獲取 CreateRevision(用于解鎖校驗)putResp := txnResp.Responses[0].GetResponsePut()createRev := putResp.Header.Revisionreturn leaseID, createRev, nil
}
//返回的 createRev 你可以保存到結構體中,在解鎖時使用。
Compare(CreateRevision(key), "=", 0) 這個函數就是一個比較函數,返回bool類型
這個操作就相當于:如果這個key不存在,才可以加鎖。
加鎖完成之后,拿到它的createRev,以確保這把鎖的主人,可以寫一個結構體,讓他得到這些信息,這樣的話,解鎖的時候也可以方便一些。
這里可能有同學會好奇,為啥不用ModRev呢?
這是因為
- 如果你續租時也可能更新 key 的值(比如寫入心跳信息或更新時間戳),那么
ModRevision
會發生變化。 - 這時候不能用
ModRevision
來判斷鎖的歸屬權,否則續租一次就“失去了身份”。 - 但如果你只是用租約續租,而不更新 key 的值(只續租 Lease 本身),那么
ModRevision
不會變,此時它也可以用來判斷鎖歸屬。
三.ETCD集群架構與高可用設計
3.1 Etcd集群的核心架構
Etcd通常采用多節點集群架構,每個節點通過Raft共識算法實現數據同步。集群中的節點分為一下角色:
- Leader:負責接收客戶端請求、管理日志復制和提交操作。
- Follower:被動接收 Leader 的日志并同步數據。
- Candidate:在 Leader 失效時參與選舉的臨時角色。
一般情況下,集群的最小規模:
為保證高可用性,ETCD集群至少需要3個節點。
四.Raft共識算法
Raft是一種分布式共識算法,廣泛被應用于分布式系統中的一致性維護場景,比如:etcd等
該章節會大致說一下Raft的一些核心內容。
先說說Raft是解決什么問題的----答案顯而易見就是解決分布式場景下數據不一致的問題。
接下來看看他是如何解決的吧
4.1 原理概述
Raft這些分布式共識算法就是用來多個節點之間達成共識的,其可以解決一定的一致性問題
遵循Raft算法的分布式集群中每個節點扮演三種角色,在之前也提到過:
- leader:領導者,其負責和客戶端通信,接收來自客戶端的命令并將其轉發給follower
- follower:跟隨者,其一絲不茍的執行來自leader的命令
- candidate:候選者,當follower長時間沒收到 leader的消息就會揭竿而起成為候選者,搶奪成為leader的資格
從上面的描述我們可以看到節點的角色不是固定的,其會在三個角色中轉換。
假設說現在有三個節點ABC,一開始都處于follower的狀態。
注:在Raft算法中,所有節點會被分配不同的超時時間,時間限定在150ms~300ms之間。
為什么這么設置?
是因為如果設置相同的超時時間就會導致所有節點同時過期會導致遲遲選不出leader,看到后面就會明白。
這里150ms過去之后,A會發現怎么leader沒跟我聯絡聯絡感情,是不是leader已經寄了?王侯將相寧有種乎!于是A成為候選人給自己投了一票并開創自己的時代時期 1,并給其他還沒過期的follower發送信息請求它們支持自己當leader。
節點B和C在收到來自A的消息之后,又沒有收到其他要求稱王者的信息,于是就選擇支持A節點,加入A的時代并刷新自己的剩余時間。
之后 A 得到了超過一半的節點支持,成為leader,并定時給B和C聯絡聯絡感情(心跳信息)目的是防止有節點因為長時間收不到開始反叛成candidate。
之后整個分布式集群就可以和客戶端開始通信了,客戶端會發送消息給leader,之后leader會保證集群的一致性并且當整個集群中的一半節點都完成客戶端發送的命令之后才會真正的返回給客戶端,表示完成此次命令。
上述的描述只是一個raft算法的一個概述,只是冰山一角,我們還缺少億點點細節:
- 選舉時的特殊情況
- 日志復制
在這里說一下選舉時會遇見的一些特殊情況
- 新加入節點
- leader掉線
- 多個follower同時過期等
4.2 日志復制
當我們的集群完成選舉之后,Leader負責接收客戶端寫請求,然后轉化為log復制命令,發送并通知其它節點完成日志復制請求。每個日志復制請求包括狀態機命令,任期號,同時還有前一個日志的任期號和日志索引。
- 狀態機命令表示客戶端請求的數據操作指令。
- 任期號表示 leader 的當前任期,任期也就是上圖中的時期。
說一下流程:
- 客戶端請求發送到 Leader。
- Leader 將請求封裝為日志條目,追加到本地日志。
- Leader 并行向所有 Follower 發送 AppendEntries(攜帶日志)。
- 若多數 Follower 成功寫入,則日志被“提交”(commit)。
- Leader 向客戶端返回結果,并將日志應用到狀態機。
Follower 持久化日志后返回成功,失敗則 Leader 重試。
看一下Follower收到日志復制命令需要執行的處理流程:
- follower 會使用前一個日志的任期號和日志索引來對比自己的數據:
- 如果相同,接收復制請求,回復 ok;
- 否則回拒絕復制當前日志,回復 error;
- leader 收到拒絕復制的回復后,繼續發送節點日志復制請求,不過這次會帶上更前面的一個日志任期號和索引;
- 如此循環往復,直到找到一個共同的任期號&日志索引。此時 follower 從這個索引值開始復制,最終和 leader 節點日志保持一致;
這里為什么不直接用任期號來判斷的原因:任期號只是單調遞增但不唯一標識日志,他只只是表示時期,但不是表示日志