在分布式系統中,etcd 的一致性與高效性得益于其強大的 Raft 協議模塊。而 processInternalRaftRequestOnce
是 etcd 服務器處理內部 Raft 請求的核心方法之一。本文將從源碼角度解析這個方法的邏輯流程,幫助讀者更好地理解 etcd 的內部實現。
方法源碼
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {ai := s.getAppliedIndex()ci := s.getCommittedIndex()if ci > ai+maxGapBetweenApplyAndCommitIndex {return nil, ErrTooManyRequests}r.Header = &pb.RequestHeader{ID: s.reqIDGen.Next(),}// check authinfo if it is not InternalAuthenticateRequestif r.Authenticate == nil {authInfo, err := s.AuthInfoFromCtx(ctx)if err != nil {return nil, err}if authInfo != nil {r.Header.Username = authInfo.Usernamer.Header.AuthRevision = authInfo.Revision}}data, err := r.Marshal()if err != nil {return nil, err}if len(data) > int(s.Cfg.MaxRequestBytes) {return nil, ErrRequestTooLarge}id := r.IDif id == 0 {id = r.Header.ID}ch := s.w.Register(id)cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())defer cancel()start := time.Now()err = s.r.Propose(cctx, data)if err != nil {proposalsFailed.Inc()s.w.Trigger(id, nil) // GC waitreturn nil, err}proposalsPending.Inc()defer proposalsPending.Dec()select {case x := <-ch:return x.(*applyResult), nilcase <-cctx.Done():proposalsFailed.Inc()s.w.Trigger(id, nil) // GC waitreturn nil, s.parseProposeCtxErr(cctx.Err(), start)case <-s.done:return nil, ErrStopped}
}
方法解析
1. 校驗狀態與索引
ai := s.getAppliedIndex()
ci := s.getCommittedIndex()
if ci > ai+maxGapBetweenApplyAndCommitIndex {return nil, ErrTooManyRequests
}
getAppliedIndex
和 getCommittedIndex
分別獲取當前節點的已應用索引和已提交索引。如果兩者的差值過大,說明節點存在過多未應用的日志條目,可能導致性能問題,因此直接返回錯誤。
maxGapBetweenApplyAndCommitIndex
:定義了允許的最大索引差距。- 防止機制:避免提交速度過快導致內存積壓。
2. 生成請求頭
r.Header = &pb.RequestHeader{ID: s.reqIDGen.Next(),
}
每個請求分配一個唯一的 ID
,以便后續跟蹤和處理。
3. 身份驗證檢查
if r.Authenticate == nil {authInfo, err := s.AuthInfoFromCtx(ctx)if err != nil {return nil, err}if authInfo != nil {r.Header.Username = authInfo.Usernamer.Header.AuthRevision = authInfo.Revision}
}
- 目的:除認證請求外,其他請求需要驗證用戶身份。
- 邏輯:
- 調用
AuthInfoFromCtx
從上下文中提取用戶身份。 - 將身份信息寫入請求頭,供后續處理。
- 調用
4. 請求大小檢查
if len(data) > int(s.Cfg.MaxRequestBytes) {return nil, ErrRequestTooLarge
}
- 目的:防止超大請求導致內存或網絡問題。
- 機制:檢查請求序列化后的大小是否超過配置的最大限制。
5. 注冊請求等待通道
id := r.ID
if id == 0 {id = r.Header.ID
}
ch := s.w.Register(id)
- 注冊通道:使用請求
ID
在s.w
(wait 組件)中注冊一個等待通道,用于異步獲取結果。
6. 發起 Raft 提案
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()start := time.Now()
err = s.r.Propose(cctx, data)
- 發起提案:調用
s.r.Propose
將請求數據交給 Raft 模塊進行分布式一致性處理。 - 超時控制:通過
Context.WithTimeout
設置提案的最大執行時間,避免長期阻塞。 - 錯誤處理:如果提案失敗,增加失敗計數,并觸發通道清理。
7. 等待提案結果
select {
case x := <-ch:return x.(*applyResult), nil
case <-cctx.Done():proposalsFailed.Inc()s.w.Trigger(id, nil) // GC waitreturn nil, s.parseProposeCtxErr(cctx.Err(), start)
case <-s.done:return nil, ErrStopped
}
- 等待邏輯:
- 通道
ch
:正常返回應用結果。 - 上下文超時:處理超時錯誤,并清理等待通道。
- 服務關閉:直接返回停止錯誤。
- 通道
- 觸發機制:使用
Trigger
清理通道,避免資源泄露。
8. 性能指標統計
proposalsPending.Inc()
:增加當前掛起的提案計數。proposalsFailed.Inc()
:統計失敗提案次數。
關鍵邏輯總結
processInternalRaftRequestOnce
方法的核心邏輯可分為以下幾個階段:
- 預檢查:檢查索引狀態、請求大小和用戶認證。
- 請求處理:序列化請求并將其提交到 Raft 模塊。
- 結果等待:通過通道或超時控制獲取提案的處理結果。
流程圖
zz總結
processInternalRaftRequestOnce
是 etcd 服務端處理內部 Raft 請求的重要方法,它結合了請求校驗、身份認證、Raft 提案以及結果返回的完整邏輯鏈條。理解其實現,可以幫助我們深入掌握 etcd 的核心一致性協議和服務端處理流程。