kafka 重新分配節點_Kafka控制器-分區重分配

分區重分配指的是將分區的副本重新分配到不同的代理節點上。如果ZK節點中分區的副本的新副本集合和當前分區副本集合相同,這個分區就不需要重新分配了。

分區重分配是通過監聽ZK的 /admin/reassign_partitions 節點觸發的,Kafka也提供了相應的腳本工具進行分區重分配,使用方法如下:

./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file XXX.json --execute

其中?XXX.json是分區重分配的JSON文件,格式如下:

{

"version":1,

"partitions":[

{"topic":"product", "partition":0, "replicas":[4,5,6]},

{"topic":"product", "partition":1, "replicas":[1,2,3]},

{"topic":"product", "partition":4, "replicas":[4,5,6]}

]}

假設主題 product 的分區數只有 {P0, P1},當執行上面的腳本時。此時會發現P4的分區對于 product 主題根本就不存在,此時就會忽略掉P4的副本遷移。對于P0和P1的副本重分配,可以簡單的理解為下面的過程。

分區重分配命令接收

當使用腳本提交分區重分配時,接收命令的是?kafka.admin.ReassignPartitionsCommand#executeAssignment():

def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {

val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)

val adminZkClient = new AdminZkClient(zkClient)

val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)

// 如果當前有正在執行中的分區重分配,則終止當前提交

if (zkClient.reassignPartitionsInProgress()) {

println("There is an existing assignment running.")

reassignPartitionsCommand.maybeLimit(throttle)

} else {

printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))

if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)

println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))

/** 更新重分配數據至ZK */

if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) {

println("Successfully started reassignment of partitions.")

} else

println("Failed to reassign partitions %s".format(partitionAssignment))

}

}

提交命令時,如果分區重分配還在進行,那么本次無法提交,意味著當前只能有一個執行的分區重分配。

重分配監聽執行整體流程

當?/admin/reassign_partitions 被修改后,監聽器會觸發?PartitionReassignment 事件,其代碼執行鏈如下所示:

下面我們看一下代碼執行流程的展開。

分區重分配流程

控制器事件模型中 PartitionReassignment 事件,會觸發調用processPartitionReassignment()。此時會注冊監聽ZK節點 /admin/reassign_partitions 變化,當重分配策略更新到ZK上時,該監聽器就會被觸發,然后執行分區重分配邏輯。

case PartitionReassignment =>

processPartitionReassignment()

private def processPartitionReassignment(): Unit = {

if (!isActive) return

/** 注冊 /admin/reassign_partitions 節點變化監聽 */

if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {

val partitionReassignment = zkClient.getPartitionReassignment

partitionReassignment.foreach { case (tp, newReplicas) =>

val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, tp)

/** 記錄正在遷移的分區副本 */

controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))

}

maybeTriggerPartitionReassignment(partitionReassignment.keySet)

}

}

前置判斷

對于是否需要分區重分配,在?maybeTriggerPartitionReassignment() 中做了一些判斷取舍,其代碼實現如下:

/**

* 如有下面情況發生,則不進行分區重分配:

* 1. Topic設置了刪除標識;

* 2. 新副本與已經存在的副本相同;

* 3. 分區所有新分配的副本都不存活;

* 上面的情況發生時, 會輸出一條日志, 并從ZK移除該分區副本的重分配記錄

*/

private def maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]) {

val partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition]

topicPartitions.foreach { tp =>

if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {

/** 如果topic已經設置了刪除,不進行重分配(從需要副本遷移的集合中移除) */

partitionsToBeRemovedFromReassignment.add(tp)

} else {

val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {

throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +

s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")

}

val newReplicas = reassignedPartitionContext.newReplicas

val topic = tp.topic

val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)

if (assignedReplicas.nonEmpty) {

if (assignedReplicas == newReplicas) {

/** 新副本與已經存在的副本相同,不進行重分配 */

partitionsToBeRemovedFromReassignment.add(tp)

} else {

try {

/** 注冊ISR變化監聽 */

reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)

/** 設置正在遷移的副本不能刪除 */

topicDeletionManager.markTopicIneligibleForDeletion(Set(topic), reason = "topic reassignment in progress")

/** 執行重分配 */

onPartitionReassignment(tp, reassignedPartitionContext)

} catch {

}

}

} else {

/** 分區副本都不存活,不進行重分配 */

partitionsToBeRemovedFromReassignment.add(tp)

}

}

}

removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)

}

對于前置校驗的流程如下:

1、如果topic已經設置了刪除,不進行重分配(從需要副本遷移的集合中移除);

2、如果分區副本都不存活,不進行重分配;

3、如果新副本與已經存在的副本相同,不進行重分配;

4、注冊ISR變化監聽;

5、設置將要遷移的副本為不能刪除;

6、調用 onPartitionReassignment() 執行重分配。

執行分區重分配

分區重分配的執行是在 onPartitionReassignment() 中實現的,下面說明一下官方給出的幾個技術名詞:

RAR:新分配的副本列表;

OAR:原先的分區副本列表;

AR:當前副本列表,隨著分配過程不斷變化;

RAR-OAR:RAR與OAR的差集,即需要創建、數據遷移的新副本;

OAR-RAR:OAR與RAR的差集,即遷移后需要下線的副本。

重分配的具體代碼實現如下所示:

/**

* 當需要進行分區重分配時, 會在[/admin/reassign_partitions]下創建一個節點來觸發操作

* RAR: 重新分配的副本, OAR: 分區原副本列表, AR: 當前的分配的副本

*/

private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {

val reassignedReplicas = reassignedPartitionContext.newReplicas

if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {

/** 新分配的并沒有全在ISR中 */

/** RAR-OAR */

val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicPartition).toSet

/** RAR+OAR */

val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet

/** 1.將AR更新為OAR + RAR */

updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)

/** 2.向上面AR(OAR+RAR)中的所有副本發送LeaderAndIsr請求 */

updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition), newAndOldReplicas.toSeq)

/** 3.新分配的副本狀態更新為NewReplica(第2步中發送LeaderAndIsr時, 新副本會開始創建并且同步數據)*/

startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)

} else {

/** 4.等待所有的RAR都在ISR中 */

/** OAR - RAR */

val oldReplicas = controllerContext.partitionReplicaAssignment(topicPartition).toSet -- reassignedReplicas.toSet

/** 5.將副本狀態設置為OnlineReplica */

reassignedReplicas.foreach { replica =>

replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), OnlineReplica)

}

/** 6.將上下文中的AR設置為RAR */

/** 7.新加入的副本已經同步完成, LeaderAndIsr都更新到最新的結果 */

moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext)

/** 8-9.將舊的副本下線 */

stopOldReplicasOfReassignedPartition(topicPartition, reassignedPartitionContext, oldReplicas)

/** 10.將ZK中的AR設置為RAR */

updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)

/** 11.分區重分配完成, 從ZK /admin/reassign_partitions 節點刪除遷移報文 */

removePartitionsFromReassignedPartitions(Set(topicPartition))

/** 12.發送metadata更新請求給所有存活的broker */

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))

/** 如果topic標記了刪除, 此時喚醒刪除線程*/

topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))

}

}

上面代碼執行的過程描述如下:

1.將AR更新為OAR+RAR;

2.向上面AR(OAR+RAR)中的所有副本發送LeaderAndIsr請求;

3.新分配的副本狀態更新為NewReplica(第2步中發送LeaderAndIsr時, 新副本會開始創建并且同步數據);

4.等待所有的RAR都在ISR中;

5.將副本狀態設置為OnlineReplica;

6.將上下文中的AR設置為RAR;

7.新加入的副本已經同步完成, LeaderAndIsr都更新到最新的結果;

8-9.將舊的副本下線;

10.將ZK中的AR設置為RAR;

11.分區重分配完成, 從ZK /admin/reassign_partitions 節點刪除遷移報文;

12.發送metadata更新請求給所有存活的broker;

重分配簡單描述

通過代碼層面看起來不是很好理解,下面簡單描述一下執行過程:

1、創建新的副本,開始同步數據,等所有新副本都加入了ISR后,在RAR中進行Leader選舉;

2、下線不需要的副本(OAR-RAR),下線完成后將AR(即RAR)信息更新到ZK中;

3、發送LeaderAndIsr給存活broker。

假如初始情況下,分區副本在 {1,2,3} 三個 Broker 上;重分配之后在{4,5,6}上,此時變化過程如下圖所示:

參考:《Kafka技術內幕》、《Apache Kafka 源碼剖析》、Kafka源碼

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/385997.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/385997.shtml
英文地址,請注明出處:http://en.pswp.cn/news/385997.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

7天拿到阿里安卓崗位offer,統統給你解決!

開頭 技術的發展產生了程序員這個職位,從這些年各大互聯網公司曝光的一些員工收入水平來看,程序員的工資還是相對比較高的,可是我們在互聯網上還聽到了另外一種聲音,很多程序員想轉行,特別是大齡程序員,這…

python mysqldb 查詢不到最新記錄_python – MySQLdb是否緩存SELECT結果?

我正在循環中運行SELECT查詢.偶爾,數據庫表會更新(由另一個程序).第一個SELECT檢索正確的數據,但循環中的其他調用返回第一個值.如何檢索最新數據?到目前為止我找到的唯一解決方法是在每次迭代時重新連接到數據庫!在我的例子中,取消注釋#1#和#2#的注釋.僅…

7天拿到阿里安卓崗位offer,附高頻面試題合集

前言 眾所周知,Android是一個基于Linux實現的操作系統。但對于Linux內核來說,Android也僅僅只是一個運行在內核之上的應用程序,與其他運行在內核之上的應用程序沒有任何區別。 所以Android也需要運行環境,需要Linux內核在啟動完…

找零錢問題

最近在做華為機試體驗題,遇到一個“找零錢”的題目,如下 想起之前在牛客網上看到左程云老師講過的動態規劃問題,很像,題目如下: 有數組penny,penny中所有的值都為正數且不重復。每個值代表一種面值的貨幣&…

vga焊接線順序_焊接工藝問答,不做焊接也要收藏起來

點 機械前沿”關注置頂引領機械前沿、機械視頻,汽車、加工技術、3D打印、自動化、機器人、生產工藝、軸承、模具、機床、鈑金等行業前沿在這里等你 焊接工藝問答1.什么叫焊接條件?它有哪些內容??答:焊…

7年老Android一次操蛋的面試經歷,揮淚整理面經

看到還有很多程序員連面試流程都沒有徹底弄清楚,今天,我們以阿里為例,來聊聊互聯網大廠的面試流程和過程! 本篇主要還是聊聊社招的面試過程!阿里以及其他的互聯網大廠的技術類社招面試,通常情況是 4 個輪次…

gin context和官方context_Go Web 小技巧(一)簡化Gin接口代碼

不知道大家在使用 Gin 構建 API 服務時有沒有這樣的問題:參數綁定的環節可不可以自動處理?錯誤可不可以直接返回,不想寫空 return, 漏寫就是 bug本文通過簡單地封裝,利用 go 的接口特性,提供一個解決上述兩個問題的思路一、解決過…

7年老Android一次操蛋的面試經歷,深度好文

Java基礎 Java Object類方法HashMap原理,Hash沖突,并發集合,線程安全集合及實現原理HashMap 和 HashTable 區別HashCode 作用,如何重載hashCode方法ArrayList與LinkList區別與聯系GC機制Java反射機制,Java代理模式Jav…

Hadoop大數據應用生態圈中最主要的組件及其關系

Hadoop Common Hadoop Common是在Hadoop0.2版本之后分離出來的HDFS和MapReduce獨立子項目的內容,是Hadoop的核心部分,能為其他模塊提供一些常用工具集,如序列化機制、Hadoop抽象文件系統FileSystem、系統配置工具Configuration,并…

7年老Android一次操蛋的面試經歷,系列教學

公司的需求 不同的公司,不同的需求現在的市場上,公司很多,大致上可以歸納為兩個大類:大公司和小公司,他們招聘時對人才的需求也不一樣。 小公司 小公司他們一般急需的是能夠投入工作的人才,因為公司規模…

丁香園 武漢 神童_杭州、武漢、成都哪個城市更適合程序員發展

很多朋友討論起房價和職業發展機會,都會提到這三個城市,有的人認為目前杭州房價太貴了,生活成本高,華中的武漢和西部崛起的成都都在鼓勵高新技術發展并且有了一定成果,在選擇職業發展和定居城市之間該如何取舍呢&#…

Windows 7 64位系統上搭建Hadoop偽分布式環境(很詳細)

在開始配置前,我們先了解Hadoop的三種運行模式。 Hadoop的三種運行模式 獨立(或本地)模式:無需運行任何守護進程,所有程序都在同一個JVM上執行。在獨立模式下測試和調試MapReduce程序很方便,因此該模式在…

7年老Android一次操蛋的面試經歷,講的太透徹了

由于涉及到的面試題較多導致篇幅較長,我根據這些面試題所涉及到的常問范圍總結了并做出了一份學習進階路線圖???????及面試題答案免費分享給大家,文末有免費領取方式! View面試專題 View的滑動方式View的事件分發機制View的加載流程…

處理效應模型stata實例_stata︱政策處理效應模型sata基本命令匯總

本文來源經管之家論壇,由壇友cuifengbao歸納 Use ,文件名.dta,clear Ssc installpamatch2,replace 一、首先做一元回歸 reg 結果變量 處理變量,r 二、直接引入協變量,再做多元回歸 reg 結果變量 處理變量 協變量1 協變量2 協變量3……,r 三、接下來進行傾向得分匹配 1.將數…

80后程序員月薪30K+感慨中年危機,面試必問!

說說程序猿行業 現在社會上給IT行業貼上了幾個標簽:高薪、高危、高大上、禿頂(哈哈)。這些標簽我相比大家都比較清楚,至于為什么是這些標簽呢?而且這些標簽是真實還是假象呢? 高薪 作為IT行業來說&#…

華為照片在哪個文件夾_原來華為手機還能這樣清理垃圾,怪不得你的手機可以多用5年...

對于目前市場上的智能手機來說,大家的手機功能都是差不多的,除了一些外觀上的差別之外,最大的區別就是手機的內存,但是很多朋友卻表示手機內存很大,但是沒用多久,手機就會出現卡頓或者是運行速度變慢的現象…

996頁阿里Android面試真題解析火爆全網,全網首發!

在安卓系統中: 當系統內存不足時,Android系統將根據進程的優先級選擇殺死一 些不太重要的進程,優先級低的先殺死。進程優先級從高到低如下。 前臺進程 處于正在與用戶交互的activity與前臺activity綁定的service調用了startForeground&…

python不適合大型項目_在大型項目上,Python 是個爛語言嗎? |

【洪強寧的回答(89票)】:太多硬傷和臆想,懶得批。只說“代碼超過 10w 以后你就別想用 python 開發了”這一句,2012年4月豆瓣主站項目代碼行數就近50萬行了,可我們還在用 python 開發。【劉鑫的回答(42票)】:我寫過幾年Python,也寫…

996頁阿里Android面試真題解析火爆全網,分享面經!

導語 學歷永遠是橫在我們進人大廠的一道門檻,好像無論怎么努力,總能被那些985,211 按在地上摩擦! 不僅要被“他們”看不起,在HR挑選簡歷,學歷這塊就直接被刷下去了,連證明自己的機會也沒有,學…

access ole 對象 最大長度_Redis 數據結構和對象系統,有這 12 張圖就夠了!

作者 | 程序員歷小冰責編 | 林瑟Redis 是一個開源的 key-value 存儲系統,它使用六種底層數據結構構建了包含字符串對象、列表對象、哈希對象、集合對象和有序集合對象的對象系統。 今天我們就通過 12 張圖來全面了解一下它的數據結構和對象系統的實現原理。01數據結…