這些狀態類是 Raft 協議行為的核心載體。它們包含轉移邏輯 和 節點在特定狀態下的所有行為和數據。
QuorumState
它是 KRaft 客戶端實現中狀態管理的核心,扮演著“狀態機上下文(Context)”和“狀態轉換協調者”的關鍵角色。
QuorumState
?是整個 Raft 狀態機的“大腦”和“協調中心”。它的核心職責是:
- 維護當前狀態: 持有一個?
volatile EpochState state
?字段,該字段引用了當前節點所處的具體狀態對象(如?FollowerState
,?LeaderState
?等)。 - 執行狀態轉換: 提供一系列?
transitionToXXX
?方法(如?transitionToCandidate
,?transitionToFollower
),這些方法負責創建新的狀態對象并替換舊的,從而完成狀態的切換。 - 保證轉換的有效性: 在執行狀態轉換前,會進行嚴格的檢查,確保轉換是合法的。例如,只有?
CandidateState
?才能轉換到?LeaderState
。 - 持久化選舉狀態: 關鍵的選舉狀態(如當前任期、投票給了誰)需要被持久化,以便節點重啟后能恢復。
QuorumState
?負責調用?QuorumStateStore
?來完成這一任務。 - 提供統一視圖: 向外層(
KafkaRaftClient
)提供一個統一的、穩定的接口來查詢當前 Raft 集群的狀態(如當前 Leader、任期、高水位等),而無需關心內部具體是哪個狀態對象在工作。
從類的注釋中可以清晰地看到它對所有可能的狀態轉換路徑進行了詳細的定義:
// ... existing code ...
/*** This class is responsible for managing the current state of this node and ensuring* only valid state transitions. Below we define the possible state transitions and* how they are triggered:** Resigned transitions to:* Unattached: After learning of a new election with a higher epoch, or expiration of the election timeout* Follower: After discovering a leader with a larger epoch** Unattached transitions to:* Unattached: After learning of a new election with a higher epoch or after giving a binding vote* Prospective: After expiration of the election timeout* Follower: After discovering a leader with an equal or larger epoch** ... (and so on for all other states)*/
public class QuorumState {
// ... existing code ...
核心字段
QuorumState
?聚合了 Raft 節點運行所需的各種上下文信息。
// ... existing code ...
public class QuorumState {private final OptionalInt localId;private final Uuid localDirectoryId;private final Time time;private final Logger log;private final QuorumStateStore store;private final KRaftControlRecordStateMachine partitionState;private final Endpoints localListeners;private final SupportedVersionRange localSupportedKRaftVersion;private final Random random;private final int electionTimeoutMs;private final int fetchTimeoutMs;
// ... existing code ...private volatile EpochState state;// ... existing code ...
}
state
: 最核心的字段,一個?volatile
?引用,指向當前的狀態對象。volatile
?保證了多線程間的可見性。localId
,?localDirectoryId
: 本地節點的唯一標識。OptionalInt
?的使用表明節點可能作為無 ID 的觀察者(Observer)運行。store
:?QuorumStateStore
?的實例,通常是?FileQuorumStateStore
,負責將選舉狀態(ElectionState
)讀寫到磁盤上的?quorum-state
?文件中。partitionState
:?KRaftControlRecordStateMachine
?的實例,它負責管理 Voter 集合(VoterSet
)的狀態。QuorumState
?需要從它這里獲取最新的 Voter 信息。electionTimeoutMs
,?fetchTimeoutMs
: 選舉超時和 Fetch 超時的配置值,用于創建狀態對象時傳入。time
,?random
,?logContext
: 時間、隨機數生成器、日志上下文等工具類。
初始化邏輯 (initialize
)
initialize
?方法是?QuorumState
?生命周期的起點,它負責在節點啟動時,根據持久化的狀態和日志狀態,決定節點應該進入哪個初始狀態。
// ... existing code ...public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateException {// 1. 從 store 讀取上次持久化的選舉狀態ElectionState election = readElectionState();final EpochState initialState;// ... 一系列復雜的 if-else if-else 判斷 ...// 2. 根據 election state 和日志狀態決定初始狀態if (localId.isPresent() && election.isLeader(localId.getAsInt())) {// 如果上次是 Leader,則初始化為 ResignedStateinitialState = new ResignedState(...);} else if (localId.isPresent() &&election.isVotedCandidate(...)) {// 如果上次是 Candidate,則初始化為 CandidateStateinitialState = new CandidateState(...);} else if (election.hasLeader()) {// 如果知道 Leader,則初始化為 FollowerState (如果能找到 Leader 的地址)// 否則初始化為 UnattachedState// ...} else {// 其他情況,初始化為 UnattachedStateinitialState = new UnattachedState(...);}// 3. 完成初始狀態的轉換durableTransitionTo(initialState);}
// ... existing code ...
這個方法的邏輯非常嚴謹,覆蓋了各種重啟場景:
- 如果重啟前是 Leader,會進入?
ResignedState
。這是一種安全機制,可以防止它在同一個任期內為其他候選人投票,并確保日志的單調性。 - 如果重啟前是 Candidate,會重新進入?
CandidateState
。 - 如果重啟前是 Follower,會嘗試重新成為 Follower,但如果找不到 Leader 的網絡地址,則會退化到?
UnattachedState
。 - 在大多數不確定的情況下,最安全的選擇是進入?
UnattachedState
,等待集群的最新消息。
狀態轉換管理
QuorumState
?提供了一系列?transitionToXXX
?方法來驅動狀態轉換。這些方法是狀態機運轉的齒輪。
持久化轉換 vs. 內存轉換
QuorumState
?定義了兩種轉換方式:
durableTransitionTo(EpochState newState)
: 持久化轉換。- 它首先調用?
store.writeElectionState(...)
?將新狀態的選舉信息(任期、投票給誰、Leader是誰)寫入磁盤。 - 然后調用?
memoryTransitionTo(newState)
?完成內存中狀態對象的切換。 - 適用場景: 用于那些會改變持久化選舉狀態的轉換。例如,進入一個新的任期、投票給一個候選人、選舉出新的 Leader。
- 它首先調用?
memoryTransitionTo(EpochState newState)
: 純內存轉換。- 它只在內存中用?
newState
?替換當前的?state
?對象。 - 適用場景: 用于那些不影響持久化選舉狀態的轉換。例如,從 Leader 轉換到?
ResignedState
,因為?ResignedState
?是一個臨時的軟狀態,重啟后總會回到?ResignedState
,所以無需持久化。
- 它只在內存中用?
// ... existing code ...private void durableTransitionTo(EpochState newState) {log.info("Attempting durable transition to {} from {}", newState, state);store.writeElectionState(newState.election(), partitionState.lastKraftVersion());memoryTransitionTo(newState);}private void memoryTransitionTo(EpochState newState) {if (state != null) {state.close();}this.state = newState;log.info("Completed transition to {}", state);}
// ... existing code ...
作為狀態上下文 (Context)
QuorumState
?封裝了內部狀態的復雜性,對外提供了一致的查詢接口。KafkaRaftClient
?不需要知道當前是?FollowerState
?還是?LeaderState
,它只需要調用?QuorumState
?的方法。
// ... existing code ...public int epoch() {return state.epoch(); // 委托給當前 state 對象}public OptionalInt leaderId() {ElectionState election = state.election(); // 委托給當前 state 對象if (election.hasLeader())return OptionalInt.of(state.election().leaderId());elsereturn OptionalInt.empty();}public boolean isLeader() {return state instanceof LeaderState; // 通過類型判斷提供狀態查詢}public boolean isFollower() {return state instanceof FollowerState;}
// ... existing code ...
這些方法將具體的行為委托給了?this.state
?對象,或者通過?instanceof
?檢查來回答關于當前狀態的問題。這正是狀態設計模式中上下文(Context)對象的典型實現。
總結
QuorumState
?是 KRaft 狀態管理的核心樞紐。它通過應用狀態設計模式,將復雜的 Raft 協議邏輯清晰地分解到各個狀態類中。
- 它作為上下文(Context),聚合了所有必要的信息,并為上層提供了統一的交互接口。
- 它作為協調者,嚴格管理著狀態之間的轉換,并通過與?
QuorumStateStore
?的交互,保證了選舉狀態的持久性和節點重啟后的一致性。 - 它的?
initialize
?方法是系統魯棒性的重要體現,確保了節點在任何情況下都能以一個安全、一致的狀態啟動。
理解了?QuorumState
?的設計,就等于掌握了 KRaft 客戶端狀態機如何組織和運轉的藍圖。
EpochState
接口
在 Kafka 的 Raft 實現中(位于?org.apache.kafka.raft
?包下),EpochState
?扮演著核心角色。Raft 協議將時間劃分為一個個連續的任期(Term),在 Kafka 的實現中這被稱為?Epoch。在任何一個給定的 Epoch 中,一個 Raft 節點都必然處于某一種狀態,例如:領導者(Leader)、跟隨者(Follower)或候選人(Candidate)。
EpochState
?接口就是為了抽象和統一這些不同狀態下的共同行為和屬性。無論是 Leader、Follower 還是 Candidate,它們都共享一些基本特征:
- 它們都屬于某一個特定的?
epoch
。 - 它們都需要處理投票請求 (
canGrantVote
)。 - 它們都需要能提供當前的選舉狀態 (
election
)。
通過定義這樣一個接口,代碼變得更加模塊化和可擴展。Raft 的核心邏輯可以面向?EpochState
?接口編程,而不用關心當前節點具體是哪一種狀態,從而簡化了狀態切換和管理的復雜性。
EpochState
?接口繼承了?java.io.Closeable
。這是一個非常重要的設計細節。
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with//... 代碼省略 ...
package org.apache.kafka.raft;import java.io.Closeable;
import java.util.Optional;public interface EpochState extends Closeable {
//... 代碼省略 ...
Closeable
?接口只有一個方法:void close() throws IOException
。一個類實現?Closeable
?意味著它的實例管理著需要被顯式關閉的資源,例如文件句柄、網絡連接或定時器等。
為什么?EpochState
?需要?Closeable
?
不同的 Raft 狀態可能需要管理不同的生命周期性資源。例如:
- Follower/Candidate/ProspectiveState: 這些狀態通常需要一個選舉定時器?(
electionTimer
)。如果在一個超時時間內沒有收到 Leader 的心跳,Follower 就會轉變為 Candidate并發起選舉。當狀態切換時(比如從 Follower 切換到 Leader),舊狀態的定時器就需要被取消或關閉,以防止不必要的選舉和資源泄漏。 - Leader: Leader 狀態可能需要管理與所有 Follower 的心跳定時器。
通過繼承?Closeable
,EpochState
?的實現類可以被?try-with-resources
?語句管理,這確保了當一個狀態對象不再被使用時,它的?close()
?方法會被自動調用,從而安全地釋放其占有的資源。
例如,在?ResignedState.java
?中,close()
?方法是空的,因為它不管理需要釋放的資源。
//... 代碼省略 ...@Overridepublic void close() {}
}
但在其他狀態(如?FollowerState
?或?CandidateState
)中,close()
?方法會有關閉定時器等重要邏輯。
方法
//... 代碼省略 ...
public interface EpochState extends Closeable {default Optional<LogOffsetMetadata> highWatermark() {return Optional.empty();}/*** Decide whether to grant a vote to a replica.** It is the responsibility of the caller to invoke* {@link QuorumState#unattachedAddVotedState(int, ReplicaKey)} if a standard vote is granted.** @param replicaKey the id and directory of the replica requesting the vote* @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver’s log* @param isPreVote whether the vote request is a PreVote (non-binding) or standard vote* @return true if it can grant the vote, false otherwise*/boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote);/*** Get the current election state, which is guaranteed to be immutable.*/ElectionState election();/*** Get the current (immutable) epoch.*/int epoch();/*** Returns the known endpoints for the leader.** If the leader is not known then {@code Endpoints.empty()} is returned.*/Endpoints leaderEndpoints();/*** User-friendly description of the state*/String name();
}
default Optional<LogOffsetMetadata> highWatermark()
- 作用: 獲取當前的高水位(High Watermark)。高水位是 Raft 協議中一個至關重要的概念,它代表了已經被集群中大多數節點確認(committed)的日志條目的最高偏移量。所有低于高水位的日志都可以被安全地應用到狀態機。
- 設計: 這是一個?
default
?方法,默認返回一個空的?Optional
。這意味著不是所有狀態都必須直接提供高水位信息。通常,只有?Leader?狀態會主動計算和維護高水位。Follower 狀態的高水位是從 Leader 的心跳消息中更新的。
boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote)
- 作用: 這是 Raft 領導者選舉的核心邏輯之一。當一個節點收到另一個節點(候選人)的投票請求時,它會調用此方法來決定是否將自己的選票投給該候選人。
- 參數:
replicaKey
: 請求投票的候選人的唯一標識。isLogUpToDate
: Raft 的一個安全機制,確保選票只會投給那些日志記錄至少和當前節點一樣新的候選人。isPreVote
: 區分“預投票”和“正式投票”。預投票是一種優化,用于在不增加?epoch
?的情況下探測一個節點是否能贏得選舉,從而避免網絡分區恢復時產生不必要的選舉干擾。
- 返回值:?
true
?表示同意投票,false
?表示拒絕。
ElectionState election()
- 作用: 獲取當前(不可變的)選舉狀態。
ElectionState
?對象封裝了關于當前選舉周期的詳細信息,例如:當前?epoch
、已知的 Leader 是誰、本節點投票給了誰、以及所有投票者的集合。 - 設計: 返回一個不可變對象是并發編程中的一種良好實踐,它保證了狀態信息在被外部讀取時不會被意外修改,增強了線程安全性。
- 作用: 獲取當前(不可變的)選舉狀態。
int epoch()
- 作用: 獲取當前狀態所屬的任期號(epoch)。
- 重要性: Epoch 在 Raft 中扮演邏輯時鐘的角色。它單調遞增,用于識別和拒絕來自舊任期的過時消息,是保證協議一致性的基礎。
Endpoints leaderEndpoints()
- 作用: 返回當前已知的 Leader 節點的網絡地址信息(IP和端口)。
- 用途: 使得 Follower 節點和客戶端能夠知道將請求發送到哪里。如果當前 Leader 未知(例如,在選舉期間),則會返回一個空的?
Endpoints
?對象。
String name()
- 作用: 返回一個用戶友好的狀態名稱字符串,例如 "Follower", "Candidate", "Leader", "Resigned" 等。
- 用途: 主要用于日志記錄、監控(Metrics)和調試,方便開發和運維人員快速了解節點當前所處的狀態。
總結
EpochState
?接口是 Kafka Raft 庫中一個設計精良的核心抽象。它通過定義一組通用的行為和屬性,成功地統一了 Raft 協議中各種復雜的狀態。
- 封裝性與模塊化: 它將不同狀態的共性行為(如投票、提供 epoch 信息)封裝起來,使得上層邏輯可以面向接口編程,降低了代碼的耦合度。
- 資源管理: 通過繼承?
Closeable
,它強制實現類考慮資源釋放問題,利用?try-with-resources
?機制保證了如定時器等資源的安全性,避免了資源泄漏。 - 不變性與線程安全: 像?
election()
?方法返回不可變對象的設計,體現了對并發環境下狀態一致性和線程安全的重視。
總而言之,EpochState
?為構建一個健壯、可維護、易于理解的 Raft 實現奠定了堅實的基礎。
ElectionState
?
// ... existing code ...
/*** Encapsulate election state stored on disk after every state change.*/
public final class ElectionState {
// ... existing code ...
正如類注釋所說,ElectionState
?的核心作用是封裝每次狀態變更后需要存儲在磁盤上的選舉狀態。
public final class ElectionState
:final
: 意味著這個類不能被繼承。這通常用于創建不可變 (Immutable) 對象。一旦?ElectionState
?對象被創建,它的內部狀態(如 epoch、leaderId 等)就不能再被修改。這在并發環境中是非常重要的,因為它天然是線程安全的,可以被自由共享而無需擔心數據被意外篡改。
Raft 協議要求節點必須持久化一些關鍵狀態,以便在節點重啟后能夠恢復,并保證協議的安全性。ElectionState
?正是這些需要持久化的核心選舉數據的在內存中的體現。
ElectionState
?封裝了 Raft 選舉的四個關鍵信息:
// ... existing code ...
public final class ElectionState {private static final int UNKNOWN_LEADER_ID = -1;private static final int NOT_VOTED = -1;private final int epoch;private final OptionalInt leaderId;private final Optional<ReplicaKey> votedKey;// This is deprecated. It is only used when writing version 0 of the quorum state fileprivate final Set<Integer> voters;
// ... existing code ...
epoch
:?int
?類型,代表 Raft 的任期號。這是 Raft 協議的邏輯時鐘,用于識別過時的請求和保證一致性。leaderId
:?OptionalInt
?類型,代表當前任期的 Leader 節點 ID。它是一個?Optional
,因為在選舉期間可能還沒有選出 Leader。votedKey
:?Optional<ReplicaKey>
?類型,代表在當前任期中,本節點投票給了哪個候選人。ReplicaKey
?不僅包含節點 ID,還包含一個目錄 ID (directoryId
),這是為了支持 JBOD (Just a Bunch Of Disks) 架構,確保在同一臺物理機上運行的多個 Kraft 節點(使用不同磁盤)能被唯一標識。voters
:?Set<Integer>
?類型,代表當前參與投票的節點集合。注釋明確指出這個字段已被棄用,僅為了兼容舊版本(version 0)的?quorum-state
?文件格式而保留。新版本中,Voter Set 的管理已經和?ElectionState
?分離。
對象的創建方式 (靜態工廠方法)
ElectionState
?的構造函數是包級私有的,外部無法直接?new
。它通過一系列靜態工廠方法來創建實例,這種方式使得代碼更具可讀性,因為方法名清晰地描述了所創建對象的狀態。
ElectionState.withVotedCandidate(int epoch, ReplicaKey votedKey, Set<Integer> voters)
- 創建一個表示已投票給某個候選人但尚未選出 Leader?的狀態。此時?
leaderId
?為空。 - 例如,當一個節點作為 Candidate 啟動并首先給自己投票時,就會處于這個狀態。
- 創建一個表示已投票給某個候選人但尚未選出 Leader?的狀態。此時?
ElectionState.withElectedLeader(int epoch, int leaderId, Optional<ReplicaKey> votedKey, Set<Integer> voters)
- 創建一個表示已成功選舉出 Leader?的狀態。
- 例如,當一個 Follower 收到 Leader 的心跳時,或一個 Candidate 贏得選舉時,就會進入這個狀態。
ElectionState.withUnknownLeader(int epoch, Set<Integer> voters)
- 創建一個表示Leader 未知且尚未投票的狀態。
- 例如,當一個 Follower 的選舉計時器超時,進入新一輪選舉的初始階段時,就會是這個狀態。
ElectionState.fromQuorumStateData(QuorumStateData data)
- 這是反序列化的入口,從一個?
QuorumStateData
?對象(從磁盤讀取的數據結構)恢復成一個?ElectionState
?內存對象。
- 這是反序列化的入口,從一個?
序列化與持久化
ElectionState
?是內存中的對象,它需要被轉換成可持久化的格式。toQuorumStateData
?方法就承擔了這個責任。
// ... existing code ...public QuorumStateData toQuorumStateData(short version) {QuorumStateData data = new QuorumStateData().setLeaderEpoch(epoch).setLeaderId(leaderIdOrSentinel()).setVotedId(votedKey.map(ReplicaKey::id).orElse(NOT_VOTED));if (version == 0) {List<QuorumStateData.Voter> dataVoters = voters.stream().map(voterId -> new QuorumStateData.Voter().setVoterId(voterId)).collect(Collectors.toList());data.setCurrentVoters(dataVoters);} else if (version == 1) {data.setVotedDirectoryId(votedKey.flatMap(ReplicaKey::directoryId).orElse(ReplicaKey.NO_DIRECTORY_ID));} else {
// ... existing code ...}return data;}
// ... existing code ...
這段代碼清晰地展示了版本兼容性處理:
- 對于?
version 0
: 它會序列化?voters
?列表,但不會序列化?votedDirectoryId
。 - 對于?
version 1
: 它不再序列化?voters
?列表,轉而序列化?votedDirectoryId
。
這種設計使得 KRaft 協議可以在不停機的情況下進行升級和演進。
關鍵方法分析
isVotedCandidate(ReplicaKey nodeKey)
// ... existing code ... public boolean isVotedCandidate(ReplicaKey nodeKey) {if (nodeKey.id() < 0) {throw new IllegalArgumentException("Invalid node key " + nodeKey);} else if (votedKey.isEmpty()) {return false;} else if (votedKey.get().id() != nodeKey.id()) {return false;} else if (votedKey.get().directoryId().isEmpty()) {// when the persisted voted directory id is not present assume that we voted for this candidate;// this happens when the kraft version is 0.return true;}return votedKey.get().directoryId().equals(nodeKey.directoryId()); } // ... existing code ...
這個方法用于檢查當前節點是否投票給了?
nodeKey
?所代表的候選人。它的邏輯同樣體現了向后兼容:如果持久化的?votedKey
?中沒有?directoryId
(這發生在從?version 0
?的狀態文件恢復時),它會默認匹配成功,只比較?id
。否則,它會同時比較?id
?和?directoryId
。leaderIdOrSentinel()
// ... existing code ... public int leaderIdOrSentinel() {return leaderId.orElse(UNKNOWN_LEADER_ID); } // ... existing code ...
這個方法提供了一種安全的獲取?
leaderId
?的方式。如果 Leader 不存在,它不會拋出異常,而是返回一個哨兵值?-1
。這在序列化到?QuorumStateData
?時非常有用,因為協議消息格式通常要求一個整數字段而不是?Optional
。
在系統中的角色
ElectionState
?并不是孤立存在的,它與?EpochState
?的實現類(如?CandidateState
,?FollowerState
?等)和?QuorumState
?緊密協作。
EpochState
?->?ElectionState
: 各種代表 Raft 節點當前狀態的?EpochState
?實現,在其?election()
?方法中,會根據自身狀態(如自己是不是 Leader,投票給了誰)構建一個對應的?ElectionState
?對象。例如,在?
CandidateState
?中:// ... existing code ... @Override public ElectionState election() {return ElectionState.withVotedCandidate(epoch,ReplicaKey.of(localId, localDirectoryId),epochElection.voterIds()); } // ... existing code ...
CandidateState
?總是認為自己是候選人,并且已經給自己投了票,所以它創建了一個?withVotedCandidate
?的?ElectionState
。QuorumState
?使用?ElectionState
:?QuorumState
?是管理 Raft 狀態切換和持久化的核心類。當 Raft 狀態發生改變時(例如,從 Follower 變成 Candidate),QuorumState
?會從新的?EpochState
?對象獲取?ElectionState
,然后調用?QuorumStateStore
?將其寫入磁盤,完成狀態的持久化。可以在很多測試用例中看到這個流程,比如:
// ... existing code ... store.writeElectionState(ElectionState.withElectedLeader(logEndEpoch, leader.id(), Optional.empty(), voters.voterIds()),kraftVersion ); // ... existing code ...
總結
ElectionState
?是 Kafka Raft 實現中一個設計精巧且至關重要的類。它是一個不可變的、代表持久化選舉狀態的值對象。
- 封裝核心狀態: 它精確地封裝了 Raft 選舉所需的最少但關鍵的信息:
epoch
,?leaderId
,?votedKey
。 - 保證協議安全: 通過持久化這些狀態,確保了節點在崩潰重啟后不會違反 Raft 的安全原則(例如,在同一個任期內投票給多個候選人)。
- 促進代碼清晰: 使用靜態工廠方法和不可變性,使得代碼更易于理解和維護,并保證了線程安全。
- 支持協議演進: 通過版本化的序列化/反序列化邏輯,支持了 KRaft 協議的平滑升級。
FollowerState
?
public class FollowerState implements EpochState
?定義了一個實現了?EpochState
?接口的類。在 Raft 協議中,一個節點絕大多數時間都處于 Follower 狀態。
Follower 的核心職責是:
- 被動地接收并復制來自 Leader 的日志條目。
- 響應 Leader 的心跳請求,以表明自己還存活。
- 如果在一個“選舉超時”周期內沒有收到 Leader 的任何消息,則認為 Leader 已失效,并將自己的狀態轉換為 Candidate,發起新一輪選舉。
FollowerState
?類就封裝了作為 Follower 時的所有狀態數據和行為邏輯。
// ... existing code ...
public class FollowerState implements EpochState {private final Logger log;private final int fetchTimeoutMs;private final int epoch;private final int leaderId;private final Endpoints leaderEndpoints;private final Optional<ReplicaKey> votedKey;private final Set<Integer> voters;// Used for tracking the expiration of both the Fetch and FetchSnapshot requestsprivate final Timer fetchTimer;// Used to track when to send another update voter requestprivate final Timer updateVoterPeriodTimer;/* Used to track if the replica has fetched successfully from the leader at least once since* the transition to follower in this epoch. If the replica has not yet fetched successfully,* it may be able to grant PreVotes.*/private boolean hasFetchedFromLeader = false;private Optional<LogOffsetMetadata> highWatermark;/* For kraft.version 0, track if the leader has received updated voter information from this* follower.*/private boolean hasUpdatedLeader = false;/* Used to track the currently fetching snapshot. When fetching snapshot regular Fetch request* are paused*/private Optional<RawSnapshotWriter> fetchingSnapshot = Optional.empty();
// ... existing code ...
epoch
,?leaderId
,?leaderEndpoints
,?votedKey
,?voters
: 這些是基本的選舉信息,定義了當前任期、公認的 Leader 以及投票狀態。fetchTimer
: 這是一個至關重要的選舉定時器。Follower 每次收到 Leader 的有效消息(如?Fetch
?或?FetchSnapshot
?請求)時,都會重置這個定時器。如果定時器超時,就意味著與 Leader "失聯"。hasFetchedFromLeader
: 一個布爾標記,用于記錄在當前任期內,是否已經成功地從 Leader 獲取過數據。這個標記在處理預投票(Pre-Vote)時有特殊作用。highWatermark
: Follower 所知的、已被集群提交的日志的最高位移。它由 Leader 在心跳消息中告知 Follower。fetchingSnapshot
: 如果 Follower 的日志落后 Leader 太多,它會通過接收快照來快速追趕。這個字段就用于管理正在接收的快照的狀態。updateVoterPeriodTimer
,?hasUpdatedLeader
: 這兩個字段主要用于兼容舊版本的 KRaft 協議,處理 Voter 集合的更新邏輯。
心跳與選舉超時 (fetchTimer
)
這是 Follower 狀態的核心機制。
hasFetchTimeoutExpired(long currentTimeMs)
: 檢查?fetchTimer
?是否已經超時。如果返回?true
,外部邏輯(QuorumState
)就會將節點狀態從 Follower 切換到 Candidate,并發起選舉。resetFetchTimeoutForSuccessfulFetch(long currentTimeMs)
: 當 Follower 成功收到 Leader 的消息后,會調用此方法。它會重置?fetchTimer
,并設置?hasFetchedFromLeader = true
。這相當于一次“心跳續約”。
// ... existing code ...public boolean hasFetchTimeoutExpired(long currentTimeMs) {fetchTimer.update(currentTimeMs);return fetchTimer.isExpired();}public void resetFetchTimeoutForSuccessfulFetch(long currentTimeMs) {fetchTimer.update(currentTimeMs);fetchTimer.reset(fetchTimeoutMs);hasFetchedFromLeader = true;}
// ... existing code ...
投票邏輯 (canGrantVote
)
Follower 的投票邏輯非常嚴格,因為它已經承認了一個 Leader。
// ... existing code ...@Overridepublic boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {if (isPreVote && !hasFetchedFromLeader && isLogUpToDate) {return true;}log.debug("Rejecting Vote request (preVote={}) from replica ({}) since we are in FollowerState with leader {} in " +"epoch {}, hasFetchedFromLeader={}, replica's log is up-to-date={}",isPreVote,replicaKey,leaderId,epoch,hasFetchedFromLeader,isLogUpToDate);return false;}
// ... existing code ...
- 通常情況: 對于任何標準的投票請求 (
isPreVote = false
),Follower 都會拒絕。 - 特殊情況 (Pre-Vote): Follower?只有在滿足以下所有條件時,才會同意一個預投票請求:
- 這是一個預投票請求 (
isPreVote = true
)。 - 在當前任期內,尚未成功從 Leader 處獲取過任何數據 (
!hasFetchedFromLeader
)。 - 請求者的日志至少和自己一樣新 (
isLogUpToDate = true
)。
- 這是一個預投票請求 (
這個設計的目的是處理一種特殊場景:一個網絡分區后剛剛恢復的節點,它可能還處于 Follower 狀態,但它的 Leader 實際上已經失效了。通過這個機制,它可以響應其他節點的預投票,從而幫助集群在不增加任期號(epoch)的情況下,確認是否可以開始一次有效的選舉。一旦它成功從 Leader 獲取數據 (hasFetchedFromLeader
?變為?true
),它就會堅定地跟隨當前 Leader,拒絕所有投票請求。
isPreVote
:什么是預投票 (Pre-Vote)?
預投票是 Raft 協議的一個重要優化。
- 問題場景: 想象一個節點因為網絡問題被隔離了。在它被隔離期間,集群的其他節點選舉出了新的 Leader,任期號(epoch)也增加了。當這個被隔離的節點網絡恢復后,它的選舉計時器會超時,然后它會立即增加自己的任期號并發起一次正式選舉。
- 帶來的麻煩: 這次選舉是注定要失敗的,因為它的日志是落后的。但它的高任期號投票請求會傳播到集群,導致當前的合法 Leader 收到更高任期號的請求后,降級為 Follower,從而引發一次不必要的集群抖動和短暫的不可用。
- 預投票的解決方案: 節點在發起正式選舉前,先發起一輪預投票。預投票不會增加任期號,它只是詢問其他節點:“如果我發起選舉,你們會投給我嗎?”。只有在獲得大多數節點的預投票同意后,它才會真正增加任期號并發起正式選舉。
這樣,上面場景中的那個恢復節點在發起預投票時,就會被其他節點拒絕,因為它無法獲得多數票,也就不會發起那次具有破壞性的正式選舉了。
!hasFetchedFromLeader
:為什么關心“是否已從 Leader 獲取過數據”?
hasFetchedFromLeader
?是一個布爾標記,它在?FollowerState
?被創建時為?false
。只有當這個 Follower 成功地從當前任期的 Leader 那里收到了心跳(Fetch
?或?FetchSnapshot
?請求)后,這個標記才會被設置為?true
。
// ... existing code ...public void resetFetchTimeoutForSuccessfulFetch(long currentTimeMs) {fetchTimer.update(currentTimeMs);fetchTimer.reset(fetchTimeoutMs);hasFetchedFromLeader = true;}
// ... existing code ...
這個標記代表了 Follower 對當前 Leader 的“信任程度”。
!hasFetchedFromLeader
?(false
): “我知道?leaderId
?是誰,但我還沒收到過它的消息。我不確定它是否真的存活并且能聯系到我。”hasFetchedFromLeader
?(true
): “我剛剛還跟 Leader 通過信,我很確定它活得好好的。”
那么hasFetchedFromLeader
在什么情況下會回到 false 呢?答案是:當 Raft 節點的狀態發生改變,導致需要創建一個新的 FollowerState 對象時。
FollowerState 實現了 EpochState 接口,它的生命周期與一個特定的 任期(Epoch) 緊密綁定。當任期發生變化時,舊的 EpochState 對象(無論是 FollowerState, CandidateState 還是 LeaderState)會被丟棄,并根據新的情況創建一個全新的 EpochState 對象。
考慮以下幾種場景:
- 選舉超時: 當前的 FollowerState 的選舉計時器超時了。節點會轉換到 CandidateState,這個 FollowerState 對象就被廢棄了。如果選舉失敗,又發現了一個新的 Leader,那么系統會創建一個新的 FollowerState 對象來跟隨這個新 Leader。在這個新創建的對象里,hasFetchedFromLeader 重新被初始化為 false。
- 發現更高任期的 Leader: Follower 收到一個來自更高任期(epoch)的 Leader 的消息。它會立即放棄當前的 FollowerState,并為這個新的、更高的任期創建一個新的 FollowerState 對象。同樣,在這個新對象里,hasFetchedFromLeader 也是 false。
isLogUpToDate
:Raft 的基本安全原則
這是 Raft 協議的基礎。一個節點只會把票投給日志記錄至少和自己一樣“新”的候選人,以確保不會丟失任何已提交的數據。
綜合分析:三個條件組合的智慧
現在我們把這三個條件組合起來看?if (isPreVote && !hasFetchedFromLeader && isLogUpToDate)
:
這個判斷覆蓋了一種非常特殊但重要的邊界情況: 一個節點剛剛進入 Follower 狀態,它知道 Leader 是誰,但在它收到來自這個 Leader 的第一次心跳之前的這個短暫窗口期,它其實并不完全確定這個 Leader 的有效性。
如果?
hasFetchedFromLeader
?是?true
: 這意味著 Follower 已經和 Leader 建立了穩定的通信。它堅信 Leader 是存活的,因此它會拒絕所有的投票/預投票請求,忠誠地跟隨當前 Leader。這可以從下面的測試用例中得到驗證,一旦調用了?resetFetchTimeoutForSuccessfulFetch
,canGrantVote
?就會一直返回?false
。// ... existing code ... @ParameterizedTest @ValueSource(booleans = {true, false}) public void testPreVoteAfterSuccessfulFetchFromLeader(boolean isLogUpToDate) {FollowerState state = newFollowerState(Set.of(1, 2, 3));state.resetFetchTimeoutForSuccessfulFetch(time.milliseconds());assertFalse(state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true)); // ... existing code ... }
如果?
hasFetchedFromLeader
?是?false
: 在這個時間窗口內,如果收到了一個預投票請求,并且對方的日志是更新的,那么 Follower 會想:“我現在的 Leader 還沒聯系過我,可能它剛當選就掛了。這個發預投票的候選人看起來條件不錯,我先同意它的‘意向投票’也無妨,這不會改變我的任期,很安全。”
總結一下:
這個邏輯的核心是在不破壞 Raft 安全性的前提下,盡可能地提高系統的活性(Liveness)和穩定性。
- 對于標準投票 (
isPreVote = false
): Follower 永遠拒絕,因為它已經認定了本任期的 Leader。 - 對于預投票 (
isPreVote = true
):- 如果已經和 Leader 穩定通信 (
hasFetchedFromLeader = true
),則拒絕預投票,以維護當前 Leader 的穩定性。 - 如果還未和 Leader 建立通信 (
hasFetchedFromLeader = false
),則同意預投票(前提是日志最新),給可能出現問題的集群(如 Leader 選出后立即宕機)一個快速恢復的機會。
- 如果已經和 Leader 穩定通信 (
這是一個非常優雅的權衡,確保了 Follower 既不會輕易地被一個過時的節點干擾,也不會在一個真正需要新選舉的場景下“固執己見”。
高水位更新 (updateHighWatermark
)
Follower 從 Leader 的消息中獲取高水位信息,并用此方法更新本地狀態。
// ... existing code ...public boolean updateHighWatermark(OptionalLong newHighWatermark) {
// ...if (highWatermark.isPresent()) {
// ...} else if (previousHighWatermark > updatedHighWatermark) {throw new IllegalArgumentException(String.format("Non-monotonic update of high watermark from %d to %d",previousHighWatermark,updatedHighWatermark));}
// ...}
// ...return true;}
// ... existing code ...
此方法強制要求高水位的更新必須是單調遞增的。如果嘗試用一個更小的值來更新高水位,會直接拋出異常。這是 Raft 協議保證數據一致性的一個基本安全要求。
快照同步 (fetchingSnapshot
)
當需要通過快照來同步數據時,FollowerState
?會管理這個過程。
// ... existing code ...public void setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {fetchingSnapshot.ifPresent(RawSnapshotWriter::close);fetchingSnapshot = newSnapshot;}@Overridepublic void close() {fetchingSnapshot.ifPresent(RawSnapshotWriter::close);}
// ... existing code ...
setFetchingSnapshot
?方法用于開始或更新一個快照接收過程,并確保舊的、未完成的快照寫入器被關閉。close()
?方法則確保當?FollowerState
?對象被銷毀時(例如,狀態切換),任何進行中的快照寫入器都能被正確關閉,以釋放文件句柄等資源。
接口實現 (EpochState
)
FollowerState
?實現了?EpochState
?接口的所有方法:
name()
: 返回固定的字符串 "Follower"。epoch()
: 返回當前任期號?epoch
。leaderEndpoints()
: 返回已知的 Leader 的網絡地址。election()
: 返回一個?ElectionState
?對象,該對象明確指出 Leader 是誰。// ... existing code ... @Override public ElectionState election() {return ElectionState.withElectedLeader(epoch, leaderId, votedKey, voters); } // ... existing code ...
highWatermark()
: 返回當前已知的高水位。canGrantVote()
: 實現了 Follower 特有的投票邏輯(如上文分析)。close()
: 實現了資源清理邏輯(如上文分析)。
總結
FollowerState
?是 KRaft 中對 Raft Follower 角色的精確建模。它不僅僅是一個簡單的數據容器,而是一個包含了復雜狀態和邏輯的能動對象。
- 核心職責: 它的核心是選舉定時器 (
fetchTimer
),驅動了 Raft 的活性(liveness)—— 確保當 Leader 失效時,集群能夠及時發起新的選舉。 - 保證安全: 它通過嚴格的投票邏輯 (
canGrantVote
) 和高水位單調性檢查 (updateHighWatermark
),保證了 Raft 協議的安全性(safety)—— 不會選出錯誤的 Leader,也不會提交未被確認的數據。 - 狀態封裝: 它良好地封裝了作為 Follower 所需的所有信息和行為,使得上層的狀態機(
QuorumState
)可以清晰地進行狀態轉換和管理。
CandidateState
它是在 KRaft (Kafka Raft) 協議中代表**候選人(Candidate)**角色的核心實現,是整個選舉過程的發起者和驅動者。
public class CandidateState implements NomineeState {
//...
}
CandidateState
?實現了?NomineeState
?接口,表明它是一個“提名”狀態,即正在爭取成為 Leader 的狀態(另一個實現是?ProspectiveState
)。
在 Raft 協議中,當一個 Follower 的選舉計時器超時,或者一個節點剛啟動時,它就會轉變為 Candidate 狀態。
Candidate 的核心職責是:
- 增加任期號(epoch)。
- 給自己投一票。
- 向集群中所有其他 Voter 發送投票請求(
VoteRequest
)。 - 等待并處理其他節點的響應,直到以下三種情況之一發生:
- 贏得選舉: 獲得超過半數節點的投票,成為 Leader。
- 選舉失敗: 收到來自更高任期的 Leader 的消息,轉變為 Follower。
- 選舉超時: 在一輪選舉中票數被瓜分,沒有任何節點獲得多數票,選舉超時后開始新一輪選舉。
CandidateState
?類就封裝了作為 Candidate 時的所有狀態數據和行為邏輯。
// ... existing code ...
public class CandidateState implements NomineeState {private final int localId;private final Uuid localDirectoryId;private final int epoch;private final EpochElection epochElection;private final Optional<LogOffsetMetadata> highWatermark;private final int electionTimeoutMs;private final Timer electionTimer;private final Logger log;
// ... existing code ...
localId
,?localDirectoryId
: 標識當前節點自身的 ID。epoch
: 當前的任期號。Candidate 狀態總是與一個新增加的任期號相關聯。epochElection
: 這是一個至關重要的輔助類,專門用于跟蹤本輪選舉的票數。它內部記錄了哪些節點投了贊成票,哪些投了反對票,并能判斷是否已獲得多數票(isVoteGranted()
)或選舉是否已失敗(isVoteRejected()
)。electionTimer
:?選舉計時器。Candidate 會在?electionTimeoutMs
?時間內等待選舉結果。如果計時器超時,本輪選舉就失敗了。highWatermark
: 候選人所知的、已被提交的日志的最高位移。這個信息會包含在投票請求中,用于讓其他節點判斷該候選人的日志是否足夠新。
構造與初始化
CandidateState
?的構造函數揭示了它被創建時的關鍵動作:
// ... existing code ...protected CandidateState(Time time,int localId,Uuid localDirectoryId,int epoch,VoterSet voters,Optional<LogOffsetMetadata> highWatermark,int electionTimeoutMs,LogContext logContext) {
// ... existing code ...this.electionTimer = time.timer(electionTimeoutMs);
// ... existing code ...this.epochElection = new EpochElection(voters.voterKeys());epochElection.recordVote(localId, true);}
// ... existing code ...
- 啟動選舉計時器:?
this.electionTimer = time.timer(electionTimeoutMs);
- 初始化計票器:?
this.epochElection = new EpochElection(voters.voterKeys());
- 給自己投票:?
epochElection.recordVote(localId, true);
這完美地復現了 Raft 協議的規定:一旦成為候選人,立即開始計時,并首先給自己投一票。
投票管理 (recordGrantedVote
,?recordRejectedVote
)
這兩個方法用于記錄從其他節點收到的投票結果。
// ... existing code ...@Overridepublic boolean recordGrantedVote(int remoteNodeId) {if (epochElection().isRejectedVoter(remoteNodeId)) {throw new IllegalArgumentException("Attempt to grant vote from node " + remoteNodeId +" which previously rejected our request");}return epochElection().recordVote(remoteNodeId, true);}@Overridepublic boolean recordRejectedVote(int remoteNodeId) {if (epochElection().isGrantedVoter(remoteNodeId)) {throw new IllegalArgumentException("Attempt to reject vote from node " + remoteNodeId +" which previously granted our request");}return epochElection().recordVote(remoteNodeId, false);}
// ... existing code ...
它們通過調用?epochElection
?來更新票數。同時,它們包含健壯性檢查,防止一個節點先投了贊成票又投反對票(或反之),確保投票的不可撤銷性。
選舉結果判斷 (epochElection
)
CandidateState
?本身不直接判斷選舉結果,而是將這個任務委托給?epochElection
?對象。外部的狀態機(如?QuorumState
)會通過調用?candidateState.epochElection().isVoteGranted()
?來檢查是否贏得了選舉。
// ... existing code ...CandidateState candidateState = candidateStateOrThrow();if (!candidateState.epochElection().isVoteGranted())throw new IllegalStateException("Cannot become leader without majority votes granted");// ... transition to LeaderState ...LeaderState<T> state = new LeaderState<>(
// ... existing code ...
如果?isVoteGranted()
?返回?true
,QuorumState
?就會將狀態轉換為?LeaderState
。
選舉超時 (hasElectionTimeoutExpired
)
這個方法檢查?electionTimer
?是否超時。如果超時,外部狀態機將把狀態從?Candidate
?轉換到?Prospective
,然后通常會經過一個隨機退避(backoff)時間后,再重新發起新一輪選舉。
響應他人投票請求 (canGrantVote
)
作為 Candidate,它已經把票投給了自己。那么它如何回應其他候選人的投票請求呢?
// ... existing code ...@Overridepublic boolean canGrantVote(ReplicaKey replicaKey,boolean isLogUpToDate,boolean isPreVote) {if (isPreVote && isLogUpToDate) {return true;}// Reject standard vote requests even if replicaId = localId, although the replica votes for// itself, this vote is implicit and not "granted".log.debug(
// ... existing code ...);return false;}
// ... existing code ...
- 對于標準投票 (
isPreVote = false
):?一律拒絕。因為在同一個任期?epoch
?內,一個節點只能投一票,而它已經投給了自己。 - 對于預投票 (
isPreVote = true
): 如果請求者的日志至少和自己一樣新 (isLogUpToDate
),則可以同意。這是因為預投票不改變任期號,也不會改變自己的投票承諾。同意預投票是一種合作行為,有助于集群更快地發現并選舉出最合適的 Leader,避免因競爭導致選舉超時。
狀態轉換與生命周期
類頂部的注釋清晰地描述了?CandidateState
?的生命周期:
- 開始: 發送投票請求,并記錄響應。
- 選舉成功: 如果獲得多數贊成票 (
epochElection.isVoteGranted()
),則轉換到?LeaderState
。 - 選舉失敗: 如果獲得多數反對票 (
epochElection.isVoteRejected()
),則轉換到?ProspectiveState
,并進入退避階段。 - 選舉超時: 如果計時器超時 (
electionTimer.isExpired()
),則立即轉換到?ProspectiveState
。
總結
CandidateState
?是 KRaft 選舉機制的核心驅動力。它是一個主動、有明確目標的臨時狀態。
- 主動性: 與被動的?
FollowerState
?不同,CandidateState
?主動發起選舉,推動集群狀態向前演進。 - 封裝性: 它良好地封裝了作為候選人所需的所有邏輯,包括給自己投票、管理選舉計時器、記錄票數和決定如何響應其他投票請求。
- 安全性: 它嚴格遵守 Raft 的投票規則(一任期一票),并通過?
canGrantVote
?的邏輯確保不會破壞選舉的安全性。 - 協作性: 通過對預投票的特殊處理,它又表現出一定的協作性,有助于提高選舉效率。
理解了?CandidateState
,就等于理解了 KRaft 集群在 Leader 失效后是如何發起自愈過程并選舉出新領導者的。
EpochElection
?
專門用于在 KRaft 選舉期間為某個特定的任期(Epoch)跟蹤和計算選票。它被?CandidateState
?和?ProspectiveState
?用來管理選舉過程。
/*** Tracks the votes cast by voters in an election held by a Nominee.*/
public class EpochElection {
//...
}
EpochElection
?的定位是一個計票器。在 Raft 協議中,當一個節點成為候選人(Candidate)并發起選舉時,它需要:
- 知道總共有哪些投票人(Voters)。
- 記錄每個投票人是投了贊成票、反對票,還是尚未投票。
- 實時判斷自己是否已經獲得了超過半數的贊成票(選舉成功)。
- 實時判斷自己是否已經收到了足夠多的反對票,以至于不可能再獲勝(選舉失敗)。
EpochElection
?類就是為了解決這些問題而設計的。它將計票的復雜邏輯從?CandidateState
?中剝離出來,使得?CandidateState
?可以更專注于自身的狀態轉換,遵循了單一職責原則。
在?
EpochElection
?這個類的上下文中,所有的“贊成”(granted)和“反對”(rejected)都是相對于發起這次選舉的那個候選人(Candidate)而言的。
讓我們把這個概念放在 Raft 協議的流程中來理解:
選舉開始: 節點 A 因為選舉超時,決定成為候選人(Candidate)。它會創建一個?
CandidateState
?對象。創建計票器: 在?
CandidateState
?的構造函數中,會立即創建一個?EpochElection
?對象。這個?EpochElection
?對象是專屬于節點 A 在當前這個任期(Epoch)的選舉的。// ... existing code ... protected CandidateState(// ... ) {// ...this.epochElection = new EpochElection(voters.voterKeys());// 候選人首先給自己投一票贊成票epochElection.recordVote(localId, true); } // ... existing code ...
發送投票請求: 節點 A 會向集群中其他所有投票人(Voters)發送?
VoteRequest
?消息,請求它們為自己投票。接收并記錄投票:
- 當節點 A 收到來自節點 B 的?
VoteResponse
,表示同意投票時,節點 A 會調用?epochElection.recordVote(nodeB_id, true)
。這里的?true
?意味著“節點 B?贊成我成為 Leader”。 - 當節點 A 收到來自節點 C 的?
VoteResponse
,表示拒絕投票時,節點 A 會調用?epochElection.recordVote(nodeC_id, false)
。這里的?false
?意味著“節點 C?反對我成為 Leader”。
- 當節點 A 收到來自節點 B 的?
判斷結果:
epochElection.isVoteGranted()
?判斷的是:“贊成我成為 Leader 的票數是否過半?”epochElection.isVoteRejected()
?判斷的是:“反對我成為 Leader 的票數是否已經多到讓我不可能獲勝了?”
核心數據結構 (VoterState
)
EpochElection
?的核心是內部私有類?VoterState
,它為每一個投票人維護一個狀態。
// ... existing code ...private static final class VoterState {private final ReplicaKey replicaKey;private State state = State.UNRECORDED;// ... existing code ...enum State {UNRECORDED,GRANTED,REJECTED}
// ... existing code ...}
}
VoterState
?封裝了每個投票人的?ReplicaKey
(ID 和目錄 ID)和投票狀態?state
。State
?是一個枚舉,有三種可能的值:UNRECORDED
: 未收到投票,這是初始狀態。GRANTED
: 已投贊成票。REJECTED
: 已投反對票。
EpochElection
?類中持有一個?Map<Integer, VoterState>
,通過投票人的 ID 快速查找其投票狀態。
// ... existing code ...
public class EpochElection {private Map<Integer, VoterState> voterStates;
// ... existing code ...
}
構造與初始化
EpochElection
?在構造時接收一個包含所有投票人(Voters)的?Set<ReplicaKey>
。
// ... existing code ...public EpochElection(Set<ReplicaKey> voters) {this.voterStates = voters.stream().collect(Collectors.toMap(ReplicaKey::id,VoterState::new));}
// ... existing code ...
它會遍歷這個集合,為每個投票人創建一個?VoterState
?對象,并以投票人 ID 為鍵,存入?voterStates
?這個 Map 中。此時,所有?VoterState
?的內部狀態都是默認的?UNRECORDED
。
記錄選票 (recordVote
)
這是更新計票結果的入口。
// ... existing code ...public boolean recordVote(int voterId, boolean isGranted) {VoterState voterState = getVoterStateOrThrow(voterId);boolean wasUnrecorded = voterState.state == VoterState.State.UNRECORDED;voterState.setState(isGranted ? VoterState.State.GRANTED : VoterState.State.REJECTED);return wasUnrecorded;}
// ... existing code ...
- 它接收?
voterId
?和一個布爾值?isGranted
(true
?代表贊成,false
?代表反對)。 - 它會更新對應?
VoterState
?的狀態。 - 返回值非常關鍵: 它返回?
true
?當且僅當這是第一次記錄該投票人的投票(即之前的狀態是?UNRECORDED
)。如果重復記錄同一個人的投票,會更新狀態但返回?false
。
recordVote 的實現允許一個投票從 GRANTED 變為 REJECTED。這看起來似乎不安全,因為 Raft 協議規定在一個任期內,一個節點只能投一次票。
這是分層設計的體現:
- EpochElection 作為一個底層的計票器,它的職責很簡單:忠實記錄收到的最新投票狀態。
- 而**保證“一個 voter 在一輪選舉中不能改變主意”**這個業務規則的責任,交給了它的調用者——CandidateState。在 CandidateState 中有檢查邏輯,如果一個已經投了贊成票的節點又發來反對票,會直接拋出異常,而不會去調用 epochElection.recordVote。
判斷選舉是否勝利 (isVoteGranted
)
這是判斷選舉是否成功的核心方法。
// ... existing code ...public boolean isVoteGranted() {return numGranted() >= majoritySize();}private long numGranted() {return votersOfState(VoterState.State.GRANTED).count();}private int majoritySize() {return voterStates.size() / 2 + 1;}
// ... existing code ...
邏輯非常清晰:
- 計算獲得贊成票(
GRANTED
)的數量?numGranted()
。 - 計算贏得選舉需要的多數票數量?
majoritySize()
,即?(總票數 / 2) + 1
。 - 如果贊成票數大于或等于多數票,返回?
true
,表示選舉成功。
判斷選舉是否失敗 (isVoteRejected
)
這是一個重要的優化,讓候選人能盡早知道選舉失敗,而無需等待超時。
// ... existing code ...public boolean isVoteRejected() {return numGranted() + numUnrecorded() < majoritySize();}private long numUnrecorded() {return votersOfState(VoterState.State.UNRECORDED).count();}
// ... existing code ...
這個邏輯稍微有些繞,但非常精妙。它的意思是:?“已獲得的贊成票數” + “尚未投票的票數” < “贏得選舉需要的多數票數”
換句話說,即使所有尚未投票的人都投贊成票,總贊成票數也無法達到多數。在這種情況下,選舉已經不可能獲勝了,可以判定為失敗。
從測試用例中可以看到這個邏輯:在一個3節點的集群中,當收到2個反對票時,isVoteRejected
?變為?true
。
// ... existing code ...// recording majority as rejectedassertTrue(epochElection.recordVote(voter1 + 1, false));
// ... existing code ...assertEquals(2, epochElection.rejectingVoters().size());
// ... existing code ...assertFalse(epochElection.isVoteGranted());assertTrue(epochElection.isVoteRejected());
// ... existing code ...
查詢投票者狀態
EpochElection
?還提供了一系列方法來查詢不同狀態的投票人集合,這對于日志記錄和調試非常有用。
grantingVoters()
: 返回所有投了贊成票的投票人 ID。rejectingVoters()
: 返回所有投了反對票的投票人 ID。unrecordedVoters()
: 返回所有尚未投票的投票人。
總結
EpochElection
?是一個設計精良的輔助類,它體現了軟件工程中的多個優秀實踐:
- 單一職責: 它只做一件事——計票,并且做得很好。這使得調用它的?
CandidateState
?代碼更簡潔、更關注于狀態機本身。 - 封裝性: 它將計票的內部實現(
VoterState
,?Map
?等)完全隱藏,只對外暴露清晰、易于理解的接口(recordVote
,?isVoteGranted
,?isVoteRejected
)。 - 邏輯清晰: 無論是判斷選舉成功還是提前判斷選舉失敗,其核心邏輯都直接且高效,完美地實現了 Raft 協議對選舉計票的要求。
- 優化:?
isVoteRejected
?方法是一個很好的例子,它通過提前判斷失敗來避免不必要的等待,提高了系統在選舉競爭激烈時的恢復速度。
通過?EpochElection
,KRaft 的選舉實現變得更加模塊化、健壯和易于理解。
LeaderState
LeaderState
?實現了?EpochState
?接口,代表一個節點在特定任期(Epoch)內作為 Leader 的狀態。一旦一個 Candidate 獲得多數選票,它就會轉換到?LeaderState
。
Leader 的核心職責是:
- 處理客戶端請求: 接收來自客戶端的寫請求,將它們作為日志條目(Log Entry)寫入自己的日志中。
- 日志復制: 將新的日志條目通過?
AppendEntries
?RPC(在 Kafka 中對應?Fetch
?響應)復制給所有的 Follower。 - 推進高水位(High Watermark): 當一個日志條目被復制到大多數節點上時,Leader 就認為該條目是“已提交”(Committed)的,并更新高水位。
- 維持活性: Leader 需要周期性地向所有 Follower 發送心跳(空的?
AppendEntries
?RPC)來證明自己的存活,并防止 Follower 因選舉超時而發起新的選舉。 - 管理集群成員變更: 安全地增加或移除集群中的節點。
LeaderState
?擁有大量字段來維護其復雜的狀態。
// ... existing code ...
public class LeaderState<T> implements EpochState {
// ... existing code ...private final VoterSet.VoterNode localVoterNode;private final int epoch;private final long epochStartOffset;private final Set<Integer> grantingVoters;private final VoterSet voterSetAtEpochStart;
// ... existing code ...private Optional<LogOffsetMetadata> highWatermark = Optional.empty();private Map<Integer, ReplicaState> voterStates = new HashMap<>();
// ... existing code ...private final Map<ReplicaKey, ReplicaState> observerStates = new HashMap<>();private final Logger log;private final BatchAccumulator<T> accumulator;// The set includes all the followers voters that FETCH or FETCH_SNAPSHOT during the current checkQuorumTimer interval.private final Set<Integer> fetchedVoters = new HashSet<>();private final Timer checkQuorumTimer;private final int checkQuorumTimeoutMs;private final Timer beginQuorumEpochTimer;
// ... existing code ...// This is volatile because resignation can be requested from an external thread.private volatile boolean resignRequested = false;
// ... existing code ...
}
epoch
,?localVoterNode
,?epochStartOffset
: Leader 的基本信息:當前任期、自己的節點信息、任期開始時的日志位移。voterStates
,?observerStates
: 這兩個 Map 是 Leader 管理集群的核心。它們為每一個 Follower(包括 Voter 和 Observer)維護一個?ReplicaState
?對象,用于跟蹤該 Follower 的日志同步進度、最后一次通信時間等信息。highWatermark
:?高水位標記。這是 Raft 安全性的基石。只有位移小于等于高水位的日志才被認為是“已提交”的,可以被應用到狀態機。accumulator
: 一個批處理累加器。Leader 將客戶端的寫請求先放入這個累加器中,然后批量寫入本地日志并發送給 Follower,以提高吞吐量。checkQuorumTimer
,?checkQuorumTimeoutMs
:?法定人數檢查計時器。這是 Leader 的“生命線”。Leader 必須在?checkQuorumTimeoutMs
?時間內收到大多數?Follower 的心跳(Fetch
?請求)。如果超時,說明 Leader 可能與集群的大多數節點失聯,必須主動下臺。fetchedVoters
: 一個集合,用于在?checkQuorumTimer
?的一個周期內,記錄哪些 Follower 已經發來過?Fetch
?請求。beginQuorumEpochTimer
: 用于向尚未確認新 Leader 的 Follower 發送?BeginQuorumEpoch
?請求。addVoterHandlerState
,?removeVoterHandlerState
: 用于處理動態成員變更(增加/移除 Voter)時的狀態。resignRequested
: 一個?volatile
?標志,允許從外部線程安全地請求 Leader 主動辭職。
構造與初始化
當一個節點從?CandidateState
?轉換到?LeaderState
?時,會調用其構造函數進行初始化。
// ... existing code ...protected LeaderState(Time time,VoterSet.VoterNode localVoterNode,int epoch,long epochStartOffset,VoterSet voterSetAtEpochStart,OptionalLong offsetOfVotersAtEpochStart,KRaftVersion kraftVersionAtEpochStart,Set<Integer> grantingVoters,BatchAccumulator<T> accumulator,int fetchTimeoutMs,LogContext logContext,KafkaRaftMetrics kafkaRaftMetrics) {
// ... existing code ...for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) {boolean hasAcknowledgedLeader = voterNode.isVoter(localVoterNode.voterKey());this.voterStates.put(voterNode.voterKey().id(),new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners()));}
// ... existing code ...// use the 1.5x of fetch timeout to tolerate some network transition time or other IO time.this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs);this.beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;this.beginQuorumEpochTimer = time.timer(0);
// ... existing code ...}
// ... existing code ...
關鍵初始化步驟:
- 初始化 Follower 狀態: 遍歷當前所有的 Voter,為它們創建?
ReplicaState
?對象并存入?voterStates
。注意,Leader 自己也被視為一個已確認的 Voter。 - 設置 Quorum 檢查超時:?
checkQuorumTimeoutMs
?通常設置為?fetchTimeoutMs
?的 1.5 倍,提供一定的網絡抖動容忍度。并啟動?checkQuorumTimer
。 - 重置高水位: 一個新 Leader 當選后,它不能繼承舊的?
highWatermark
。它必須通過與 Follower 的交互重新建立?highWatermark
,以保證其單調性。
日志復制與高水位(High Watermark)推進
Leader 通過?updateReplicaState
?方法更新 Follower 的日志同步進度。當收到 Follower 的?Fetch
?請求后,Leader 會知道該 Follower 已經復制到了哪個位移。
高水位的計算是 Leader 的核心安全職責。Leader 會在每次有 Follower 的日志位移更新時,重新計算高水位。其邏輯是:將所有 Voter(包括 Leader 自己)的已知日志位移進行排序,取中間位置的那個位移作為新的高水位。例如,在一個5節點的集群中,高水位就是所有節點位移中第?(5/2 + 1) = 3
?大的那個值。這確保了高水位標記的日志條目一定存在于多數節點上。
法定人數檢查(Quorum Check)與活性維持
這是 Leader 的“保活”機制,通過?checkQuorumTimer
?實現。
// ... existing code ...public void updateCheckQuorumForFollowingVoter(ReplicaKey replicaKey, long currentTimeMs) {updateFetchedVoters(replicaKey);// The majority number of the voters. Ex: 2 for 3 voters, 3 for 4 voters... etc.int majority = (voterStates.size() / 2) + 1;// If the leader is in the voter set, it should be implicitly counted as part of the// majority, but the leader will never be a member of the fetchedVoters.// If the leader is not in the voter set, it is not in the majority. Then, the// majority can only be composed of fetched voters.if (voterStates.containsKey(localVoterNode.voterKey().id())) {majority = majority - 1;}if (fetchedVoters.size() >= majority) {fetchedVoters.clear();checkQuorumTimer.update(currentTimeMs);checkQuorumTimer.reset(checkQuorumTimeoutMs);}}
// ... existing code ...
- 當 Leader 收到一個 Follower 的?
Fetch
?請求時,調用?updateCheckQuorumForFollowingVoter
。 - 該 Follower 的 ID 被加入?
fetchedVoters
?集合。 - 計算出達成多數需要的 Follower 數量(總數/2 + 1,再減去 Leader 自己)。
- 如果?
fetchedVoters
?的數量達到了這個多數,說明 Leader 與大多數 Follower 通信正常。此時,清空?fetchedVoters
?集合,并重置?checkQuorumTimer
。 - 如果計時器在重置前就超時了(通過?
timeUntilCheckQuorumExpires
?判斷),Leader 就會退位。
集群成員變更(Reconfiguration)
Leader 負責協調集群成員的變更。當收到增加/移除 Voter 的請求時,它會創建?AddVoterHandlerState
?或?RemoveVoterHandlerState
?來管理這個過程,這通常是一個兩階段的過程,以確保變更的安全性。
主動辭職(Resignation)
通過調用?requestResign()
?方法,可以將?resignRequested
?標志位設為?true
。Leader 在其主循環中會檢查這個標志,如果為?true
,它會嘗試將領導權平滑地轉移給一個日志最全的 Follower,然后自己退位。
狀態轉換
Leader 狀態不是永久的。在以下情況下,Leader 會退位并轉換到?FollowerState
:
- 失去法定人數(Quorum):?
checkQuorumTimer
?超時,表明 Leader 與大多數 Follower 失聯。 - 發現更高任期: 收到來自任何節點(Candidate 或其他 Leader)的、帶有更高?
epoch
?的消息。這是 Raft 協議的核心規則,確保系統中永遠只有一個合法的 Leader。 - 主動辭職:?
resignRequested
?被觸發。
總結
LeaderState
?是 KRaft 協議的“引擎”。它是一個高度復雜但職責清晰的狀態,體現了分布式共識協議的核心思想:
- 中心化協調: Leader 作為唯一的寫入口和協調者,簡化了系統的設計。
- 安全性: 通過嚴格的高水位計算和任期規則,保證了已提交日志的不可逆轉性。
- 活性(Liveness): 通過心跳和 Quorum 檢查機制,確保當 Leader 失效或網絡分區時,集群能夠及時發現并選舉出新的 Leader,保證服務的可用性。
- 可擴展性: 封裝了動態成員變更的邏輯,使得集群可以在線擴縮容。
理解了?LeaderState
?的工作原理,就掌握了 KRaft 協議最核心的運作機制。
UnattachedState
、ResignedState
?和?ProspectiveState
?
這三個在 KRaft 中非常重要的瞬時或過渡狀態。它們雖然不像?FollowerState
、CandidateState
?和?LeaderState
?那樣是 Raft 協議的經典狀態,但在 Kafka 的實現中起到了關鍵的粘合與緩沖作用。
ProspectiveState
?(預備/勘探狀態)
ProspectiveState
?是 Raft 選舉過程中的一個前置狀態,位于?FollowerState
?和?CandidateState
?之間。它的引入主要是為了解決 Raft 協議中的一個潛在問題:擾亂性選舉(Disruptive Elections)。
作用與目的:
預投票(Pre-Vote)階段: 這是?
ProspectiveState
?的核心職責。當一個節點(比如節點 A)的選舉計時器超時,它不會立即進入?CandidateState
?并增加自己的任期號(epoch)。相反,它會進入?ProspectiveState
,并以當前的任期號向其他節點發送“預投票”請求(Pre-Vote Request)。避免不必要的選舉:
- 場景: 假設節點 A 只是暫時與 Leader 網絡隔離,但 Leader 和集群的大多數節點通信正常。如果沒有?
ProspectiveState
,節點 A 會立即成為 Candidate,增加任期號,并發起一輪正式選舉。這會導致正在正常工作的 Leader 收到更高任期的投票請求而下臺,從而對整個集群造成不必要的干擾。 - 有了?
ProspectiveState
: 節點 A 發送的預投票請求會被其他節點拒絕,因為它無法證明自己的日志比其他節點更新。收到多數拒絕后,節點 A 就知道自己不適合發起選舉,會重新回到?FollowerState
?或?UnattachedState
,等待 Leader 的心跳。這樣就避免了一次無效且具有破壞性的選舉。
- 場景: 假設節點 A 只是暫時與 Leader 網絡隔離,但 Leader 和集群的大多數節點通信正常。如果沒有?
狀態轉換路徑:
- Follower/Unattached -> Prospective: 當選舉計時器超時。
- Prospective -> Candidate: 如果收到了大多數節點的預投票贊成票,證明自己有很大希望能贏得選舉,此時才會正式轉換到?
CandidateState
,增加任期號,并發起正式投票。 - Prospective -> Follower/Unattached: 如果預投票被大多數節點拒絕,或者在預投票期間收到了合法 Leader 的消息,則會放棄選舉,轉換回 Follower 或 Unattached 狀態。
// ... existing code ...
public class ProspectiveState implements NomineeState {
// ... existing code .../*** The lifetime of a prospective state is the following.** 1. Once started, it will send prevote requests and keep record of the received vote responses* 2. If it receives a message denoting a leader with a higher epoch, it will transition to follower state.* 3. If majority votes granted, it will transition to candidate state.* 4. If majority votes rejected or election times out, it will transition to unattached or follower state* depending on if it knows the leader id and endpoints or not*/
// ... existing code ...
}
這段注釋清晰地描述了?ProspectiveState
?的生命周期和作用。
UnattachedState
?(未連接/游離狀態)
UnattachedState
?是一個非常基礎的初始狀態或回退狀態。當一個節點啟動時,或者當它與集群失去聯系(比如不知道當前的 Leader 是誰)時,就會進入這個狀態。
作用與目的:
啟動時的初始狀態: 當一個 Raft 節點剛啟動時,它不知道集群的當前狀態(誰是 Leader,任期號是多少)。此時它就處于?
UnattachedState
,像一個“局外人”一樣,等待集群中其他節點的消息。失去 Leader 后的回退狀態: 如果一個 Follower 長時間沒有收到 Leader 的心跳,它的選舉計時器會超時。在進入?
ProspectiveState
?之前,它可能會先短暫進入?UnattachedState
,表示“我不知道 Leader 是誰了”。等待信息: 在?
UnattachedState
?下,節點非常被動。它主要做的事情就是等待:- 等待其他節點發來的?
VoteRequest
,然后根據規則決定是否投票。 - 等待 Leader 發來的?
BeginQuorumEpoch
?或?Fetch
?響應,一旦收到,就能知道新的 Leader 和任期,并轉換到?FollowerState
。 - 自己的選舉計時器超時,然后轉換到?
ProspectiveState
?嘗試發起選舉。
- 等待其他節點發來的?
總結:?UnattachedState
?是一個“信息不足”的狀態,節點在此狀態下無法參與日志復制,只能被動地等待或主動發起選舉來重新融入集群。
ResignedState
?(已辭職狀態)
ResignedState
?是 Leader 主動下臺后進入的一個短暫的過渡狀態。
作用與目的:
平滑的領導權交接: 當 Leader 決定辭職時(例如,管理員觸發了下線操作,或者它發現自己不再是 Voter),它不會立即變成 Follower。它會先向所有 Follower 發送一個?
EndQuorumEpoch
?請求,通知它們“我不再是 Leader 了,你們可以開始新的選舉了”。進入?
ResignedState
: 發送完?EndQuorumEpoch
?請求后,Leader 會立即進入?ResignedState
。只讀和拒絕服務: 在?
ResignedState
?下,該節點會:- 拒絕所有新的寫請求。
- 繼續響應 Follower 的?
Fetch
?請求,但不會再有新的日志條目。 - 拒絕其他候選人的投票請求,因為它知道一輪新的選舉即將開始。
等待新 Leader:?
ResignedState
?的主要目的是等待。一旦它收到了來自新選舉產生的 Leader 的消息,它就會轉換到?FollowerState
,成為新 Leader 的一個普通 Follower。
總結:?ResignedState
?是一個優雅下臺的機制。它確保了 Leader 的辭職是一個主動、可控的過程,而不是一個突然的崩潰,從而讓集群能夠更平穩、快速地完成領導權交接。
整體關系
這三個狀態可以看作是標準 Raft 狀態機(Follower, Candidate, Leader)的“輔助輪”和“緩沖帶”,使得 KRaft 的狀態轉換更加健壯和優雅:
UnattachedState
?是起點或迷失點。ProspectiveState
?是從 Follower 到 Candidate 的審慎的跳板,防止魯莽的選舉。ResignedState
?是從 Leader 到 Follower 的平滑的滑梯,確保優雅的退位。