分區重分配指的是將分區的副本重新分配到不同的代理節點上。如果ZK節點中分區的副本的新副本集合和當前分區副本集合相同,這個分區就不需要重新分配了。
分區重分配是通過監聽ZK的 /admin/reassign_partitions 節點觸發的,Kafka也提供了相應的腳本工具進行分區重分配,使用方法如下:
./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file XXX.json --execute
其中?XXX.json是分區重分配的JSON文件,格式如下:
{
"version":1,
"partitions":[
{"topic":"product", "partition":0, "replicas":[4,5,6]},
{"topic":"product", "partition":1, "replicas":[1,2,3]},
{"topic":"product", "partition":4, "replicas":[4,5,6]}
]}
假設主題 product 的分區數只有 {P0, P1},當執行上面的腳本時。此時會發現P4的分區對于 product 主題根本就不存在,此時就會忽略掉P4的副本遷移。對于P0和P1的副本重分配,可以簡單的理解為下面的過程。
分區重分配命令接收
當使用腳本提交分區重分配時,接收命令的是?kafka.admin.ReassignPartitionsCommand#executeAssignment():
def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
val adminZkClient = new AdminZkClient(zkClient)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)
// 如果當前有正在執行中的分區重分配,則終止當前提交
if (zkClient.reassignPartitionsInProgress()) {
println("There is an existing assignment running.")
reassignPartitionsCommand.maybeLimit(throttle)
} else {
printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))
if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
/** 更新重分配數據至ZK */
if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {
println("Successfully started reassignment of partitions.")
} else
println("Failed to reassign partitions %s".format(partitionAssignment))
}
}
提交命令時,如果分區重分配還在進行,那么本次無法提交,意味著當前只能有一個執行的分區重分配。
重分配監聽執行整體流程
當?/admin/reassign_partitions 被修改后,監聽器會觸發?PartitionReassignment 事件,其代碼執行鏈如下所示:
下面我們看一下代碼執行流程的展開。
分區重分配流程
控制器事件模型中 PartitionReassignment 事件,會觸發調用processPartitionReassignment()。此時會注冊監聽ZK節點 /admin/reassign_partitions 變化,當重分配策略更新到ZK上時,該監聽器就會被觸發,然后執行分區重分配邏輯。
case PartitionReassignment =>
processPartitionReassignment()
private def processPartitionReassignment(): Unit = {
if (!isActive) return
/** 注冊 /admin/reassign_partitions 節點變化監聽 */
if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
val partitionReassignment = zkClient.getPartitionReassignment
partitionReassignment.foreach { case (tp, newReplicas) =>
val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, tp)
/** 記錄正在遷移的分區副本 */
controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
}
maybeTriggerPartitionReassignment(partitionReassignment.keySet)
}
}
前置判斷
對于是否需要分區重分配,在?maybeTriggerPartitionReassignment() 中做了一些判斷取舍,其代碼實現如下:
/**
* 如有下面情況發生,則不進行分區重分配:
* 1. Topic設置了刪除標識;
* 2. 新副本與已經存在的副本相同;
* 3. 分區所有新分配的副本都不存活;
* 上面的情況發生時, 會輸出一條日志, 并從ZK移除該分區副本的重分配記錄
*/
private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {
val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]
topicPartitions.foreach { tp =>
if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
/** 如果topic已經設置了刪除,不進行重分配(從需要副本遷移的集合中移除) */
partitionsToBeRemovedFromReassignment.add(tp)
} else {
val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +
s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")
}
val newReplicas = reassignedPartitionContext.newReplicas
val topic = tp.topic
val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
if (assignedReplicas.nonEmpty) {
if (assignedReplicas == newReplicas) {
/** 新副本與已經存在的副本相同,不進行重分配 */
partitionsToBeRemovedFromReassignment.add(tp)
} else {
try {
/** 注冊ISR變化監聽 */
reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
/** 設置正在遷移的副本不能刪除 */
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic), reason = "topic reassignment in progress")
/** 執行重分配 */
onPartitionReassignment(tp, reassignedPartitionContext)
} catch {
}
}
} else {
/** 分區副本都不存活,不進行重分配 */
partitionsToBeRemovedFromReassignment.add(tp)
}
}
}
removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
}
對于前置校驗的流程如下:
1、如果topic已經設置了刪除,不進行重分配(從需要副本遷移的集合中移除);
2、如果分區副本都不存活,不進行重分配;
3、如果新副本與已經存在的副本相同,不進行重分配;
4、注冊ISR變化監聽;
5、設置將要遷移的副本為不能刪除;
6、調用 onPartitionReassignment() 執行重分配。
執行分區重分配
分區重分配的執行是在 onPartitionReassignment() 中實現的,下面說明一下官方給出的幾個技術名詞:
RAR:新分配的副本列表;
OAR:原先的分區副本列表;
AR:當前副本列表,隨著分配過程不斷變化;
RAR-OAR:RAR與OAR的差集,即需要創建、數據遷移的新副本;
OAR-RAR:OAR與RAR的差集,即遷移后需要下線的副本。
重分配的具體代碼實現如下所示:
/**
* 當需要進行分區重分配時, 會在[/admin/reassign_partitions]下創建一個節點來觸發操作
* RAR: 重新分配的副本, OAR: 分區原副本列表, AR: 當前的分配的副本
*/
private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {
/** 新分配的并沒有全在ISR中 */
/** RAR-OAR */
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet
/** RAR+OAR */
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet
/** 1.將AR更新為OAR + RAR */
updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)
/** 2.向上面AR(OAR+RAR)中的所有副本發送LeaderAndIsr請求 */
updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition), newAndOldReplicas.toSeq)
/** 3.新分配的副本狀態更新為NewReplica(第2步中發送LeaderAndIsr時, 新副本會開始創建并且同步數據)*/
startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
} else {
/** 4.等待所有的RAR都在ISR中 */
/** OAR - RAR */
val oldReplicas = controllerContext.partitionReplicaAssignment(topicPartition).toSet -- reassignedReplicas.toSet
/** 5.將副本狀態設置為OnlineReplica */
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), OnlineReplica)
}
/** 6.將上下文中的AR設置為RAR */
/** 7.新加入的副本已經同步完成, LeaderAndIsr都更新到最新的結果 */
moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext)
/** 8-9.將舊的副本下線 */
stopOldReplicasOfReassignedPartition(topicPartition, reassignedPartitionContext, oldReplicas)
/** 10.將ZK中的AR設置為RAR */
updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
/** 11.分區重分配完成, 從ZK /admin/reassign_partitions 節點刪除遷移報文 */
removePartitionsFromReassignedPartitions(Set(topicPartition))
/** 12.發送metadata更新請求給所有存活的broker */
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
/** 如果topic標記了刪除, 此時喚醒刪除線程*/
topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
}
}
上面代碼執行的過程描述如下:
1.將AR更新為OAR+RAR;
2.向上面AR(OAR+RAR)中的所有副本發送LeaderAndIsr請求;
3.新分配的副本狀態更新為NewReplica(第2步中發送LeaderAndIsr時, 新副本會開始創建并且同步數據);
4.等待所有的RAR都在ISR中;
5.將副本狀態設置為OnlineReplica;
6.將上下文中的AR設置為RAR;
7.新加入的副本已經同步完成, LeaderAndIsr都更新到最新的結果;
8-9.將舊的副本下線;
10.將ZK中的AR設置為RAR;
11.分區重分配完成, 從ZK /admin/reassign_partitions 節點刪除遷移報文;
12.發送metadata更新請求給所有存活的broker;
重分配簡單描述
通過代碼層面看起來不是很好理解,下面簡單描述一下執行過程:
1、創建新的副本,開始同步數據,等所有新副本都加入了ISR后,在RAR中進行Leader選舉;
2、下線不需要的副本(OAR-RAR),下線完成后將AR(即RAR)信息更新到ZK中;
3、發送LeaderAndIsr給存活broker。
假如初始情況下,分區副本在 {1,2,3} 三個 Broker 上;重分配之后在{4,5,6}上,此時變化過程如下圖所示:
參考:《Kafka技術內幕》、《Apache Kafka 源碼剖析》、Kafka源碼