文章目錄
- 一、把“家伙事兒”都備齊
- 二、部署其實很簡單
- 三、配置 MirrorMaker2
- 四、修改啟動腳本
- 五、集群啟動與驗證
- 六、這集群“結實”嗎?聊聊它的高可用
- 它沒有“大腦”,但活得很好
- 極限測試:干掉兩個節點會怎樣?
- 寫在最后
最近在跟 Kafka 死磕,想著搭一個跨機房的數據同步方案,MirrorMaker2 自然就成了首選。所以,我決定自己從頭到尾摸索一遍,把整個過程記錄下來,權當是寫給未來自己的備忘錄,也希望能給同樣在折騰的你一點點啟發。
同時,這一篇也承接 【Kafka】深入理解 Kafka MirrorMaker2 - 理論篇,如果您對 Kafka MirrorMaker2 知之甚少,可以先閱讀我的這篇博客。
一、把“家伙事兒”都備齊
開工前,先得把需要的軟件包都下載下來。我用的是 Kafka 3.8.1 版本。另外,為了監控,還需要一個 Prometheus 的 JMX Exporter Agent。(3.9.0 版本的 mirrormaker2 還有 bug,社區已經在修復)。
- Kafka 安裝包: kafka_2.13-3.8.1.tgz
- Prometheus JMX Exporter: jmx_prometheus_javaagent-1.0.1.jar
我的計劃是在三臺機器上部署,組成一個 MirrorMaker2 集群,這樣就算掛了一臺,服務也能繼續跑,穩!
二、部署其實很簡單
拿到安裝包后,事情就簡單了。我把 kafka_2.13-3.8.1.tgz
分別上傳到三臺服務器的 /opt/kafka
目錄下解壓。
解壓后,目錄結構大概是這樣:
/opt/kafka/kafka_2.13-3.8.1/
├── bin
├── config
├── libs
├── ... (其他文件)
接下來是關鍵一步,為了讓 Prometheus 能抓到 MirrorMaker2 的監控指標,我把下載好的 jmx_prometheus_javaagent-1.0.1.jar
扔進了 libs
目錄。三臺機器都要做同樣的操作。
# 假設 jar 包在當前目錄
mv jmx_prometheus_javaagent-1.0.1.jar /opt/kafka/kafka_2.13-3.8.1/libs/
三、配置 MirrorMaker2
軟件部署好了,但它還不知道要干嘛。接下來就得靠配置文件來告訴它。我主要修改了 config/connect-mirror-maker.properties
這個文件。
這是我的配置,里面加了一些注釋來解釋每個參數是干嘛的。
# 定義兩個集群的別名,后面都用這個別名來引用
clusters = clusterA, clusterB# --- 源集群 (clusterA) 和目標集群 (clusterB) 的基本信息 ---
# 這是 MirrorMaker2 內部消費者組的 ID,隨便起個有意義的名字就行
clusterA.group.id = clusterA-clusterB-clusterAconsumer
clusterB.group.id = clusterA-clusterB-clusterBconsumer# MirrorMaker2 實例的名字
name = clusterA-clusterB
# 定義誰是源,誰是目標
source.cluster.alias = clusterA
target.cluster.alias = clusterB# 最大并發任務數,可以根據 Topic 分區數和機器性能來調整
tasks.max = 8
# ? tasks.max 詳解:
# 這個參數定義了該連接器可以創建的最大并發任務數,但實際任務數可能更少
# 影響因素:
# 1. 源Topic分區總數:實際任務數不會超過所有匹配topic的分區總和(這是最主要的限制因素)
# 注意:這里指的是所有匹配 prod.*.global 正則表達式的topic的分區數總和,不是單個topic的最大分區數
# 2. 集群節點數:任務會分配到不同節點執行
# 3. 系統資源:CPU核心數、內存大小、網絡帶寬
# 4. 連接器類型:不同連接器有不同的并行化能力
# 設置原則:建議先根據源topic分區數設置,然后根據實際性能表現調整
# 我這里設置8是因為:預估匹配的所有topic總共有30-50個分區,8個任務可以合理分配負載# --- 集群連接信息 ---
# 兩個集群的 broker 地址
clusterA.bootstrap.servers = brokerA-1:9092, brokerA-2:9092, brokerA-3:9092
clusterB.bootstrap.servers = brokerB-1:9092, brokerB-2:9092, brokerB-3:9092# --- 核心:定義同步規則 ---
# 啟用從 clusterA 到 clusterB 的同步
clusterA->clusterB.enabled = true# 正則表達式,匹配需要同步的 topic,我這里同步所有以 .global 結尾的 prod 前綴的 topic
clusterA->clusterB.topics = prod.*.global$
# 正則表達式,排除掉一些不需要的 topic,比如 MirrorMaker 內部的心跳 topic(實際測試中,發現如果不加這段配置,heartbears topic會循環鏡像)
clusterA->clusterB.topics.exclude = ^(clusterA|clusterB).*heartbeats$# 正則表達式,匹配需要同步的消費者組,這樣消費位點也能同步過去
clusterA->clusterB.groups = ^produser_consumer.*# 我這里只做單向同步,所以把反向的關掉
clusterB->clusterA.enabled = false# --- 內部 Topic 和可靠性配置 ---
# 在目標集群創建的 topic 的副本數,生產環境必須大于1,我設為3
clusterA->clusterB.replication.factor = 3
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3# Connect 框架內部也需要一些 topic 來存配置、位點、狀態,副本數同樣設為3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3
config.storage.topic = mm2-config.from-clusterA-to-clusterB.internal
offset.storage.topic = mm2-offset.from-clusterA-to-clusterB.internal
status.storage.topic = mm2-status.from-clusterA-to-clusterB.internal# --- 其他高級同步選項 ---
# 啟用 ACL 同步、位點同步等
clusterA->clusterB.sync.topic.acls.enabled = true
clusterA->clusterB.emit.checkpoints.enabled = true
clusterA->clusterB.emit.offset-syncs.enabled = true
clusterA->clusterB.sync.group.offsets.enabled = true
# 定期刷新 topic 和 group 列表的時間間隔
clusterA->clusterB.refresh.topics.interval.seconds = 300
clusterA->clusterB.refresh.groups.interval.seconds = 300
clusterA->clusterB.sync.topic.acls.interval.seconds = 300# --- 安全配置 ---
# 如果你的 Kafka 集群啟用了 SASL/PLAIN 認證,就需要加上這些
clusterA.sasl.mechanism=PLAIN
clusterA.security.protocol=SASL_PLAINTEXT
clusterA.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your_user" password="your_password";clusterB.sasl.mechanism=PLAIN
clusterB.security.protocol=SASL_PLAINTEXT
clusterB.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your_user" password="your_password";
除了這個,JMX Exporter 也需要一個簡單的配置文件 config/jmx_exporter.yml
,我這里就用了個最基礎的配置,讓它把所有 MBean 都暴露出來。
---
# 這個文件可以留空,或者只寫一個空的 yaml 對象 {}
# 也可以使用官方提供的 JMX MBeans
四、修改啟動腳本
萬事俱備,只欠東風。這個“東風”就是啟動腳本。我沒有直接用官方的 connect-mirror-maker.sh
,而是復制了一份,然后做了點修改,讓它加載 JMX Exporter。
這是我的 bin/connect-mirror-maker.sh
腳本:
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# ... (省略官方的版權信息) ...if [ $# -lt 1 ];
thenecho "USAGE: $0 [-daemon] mm2.properties"exit 1
fibase_dir=$(dirname $0)# 設置日志目錄
if [ "x$LOG_DIR" = "x" ]; thenexport LOG_DIR="/var/log/kafka"
fi# 設置 Log4j 配置文件
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; thenexport KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi# 設置 JVM 堆大小,這個根據服務器內存來定
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx14336m -Xms14336m"
fi# !!! 最核心的修改在這里 !!!
# 通過 javaagent 參數加載 JMX Exporter,并指定端口 7070 和配置文件
export KAFKA_OPTS="$KAFKA_OPTS -javaagent:$base_dir/../libs/jmx_prometheus_javaagent-1.0.1.jar=7070:$base_dir/../config/jmx_exporter.yml"EXTRA_ARGS=${EXTRA_ARGS-'-name mirrorMaker'}COMMAND=$1
case $COMMAND in-daemon)EXTRA_ARGS="-daemon "$EXTRA_ARGSshift;;*);;
esac# 執行官方的啟動類
exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.mirror.MirrorMaker "$@"
最重要的就是 export KAFKA_OPTS
那一行,它在啟動 MirrorMaker2 的 JVM 進程時,動態地掛載了 JMX Exporter Agent,這樣 Prometheus 就能通過 7070
端口來抓取監控數據了。
五、集群啟動與驗證
所有準備工作都完成了,我在三臺機器上都執行了同樣的啟動命令:
# 確保腳本有執行權限
chmod +x /opt/kafka/kafka_2.13-3.8.1/bin/connect-mirror-maker.sh# 后臺啟動 MirrorMaker2
/opt/kafka/kafka_2.13-3.8.1/bin/connect-mirror-maker.sh /opt/kafka/kafka_2.13-3.8.1/config/connect-mirror-maker.properties
啟動后,我做了幾件事來確認它是不是真的在工作:
-
看日志:
tail -f /var/log/kafka/connect.log
,看看有沒有報錯。 -
查 Topic:去目標集群
clusterB
上看看,是不是出現了clusterA.prod.some-topic.global
這樣的 Topic。 -
查 Metrics:在任意一臺 MirrorMaker2 的機器上,用
curl
訪問 JMX Exporter 的端口。curl http://localhost:7070
如果能看到一大堆以
kafka_
開頭的監控指標,那就說明 JMX Exporter 也正常工作了。接下來就可以在 Prometheus 里配置一個 Job,讓它來抓取這三臺機器的7070
端口,實現監控和告警。
六、這集群“結實”嗎?聊聊它的高可用
搭完三節點的集群后,我心里冒出一個問題:這玩意兒到底有多可靠?萬一哪天運維手滑,搞掛了一兩臺機器,我的數據同步任務會歇菜嗎?帶著這個疑問,我深入研究了一下它的工作模式。
它沒有“大腦”,但活得很好
我最開始有個誤解,以為 MirrorMaker2 集群內部也得像 ZooKeeper 那樣,搞個投票選舉,選個“老大”出來發號施令。結果發現,它根本沒有這種“大腦”或者“法定人數(Quorum)”的設定。
它的聰明之處在于,把自己完全“托管”給了 Kafka。它就是個“甩手掌柜”,把所有臟活累活都丟給了 Kafka Connect 框架和 Kafka 集群本身:
- “我們是一個團隊”:我的三個 MirrorMaker2 節點在啟動后,會向 Kafka 報到,說:“我們都是
clusterA-clusterB
這個組的成員”。于是 Kafka 的“小組長”(Group Coordinator)就把它們當成一個消費者團隊來管理。 - “工作日志”都記在 Kafka:所有的工作狀態,比如“我同步到哪里了?”(Offsets)、“團隊的規章制度是什么?”(Configs),全都記錄在 Kafka 的內部 Topic 里。只要 Kafka 集群本身是高可用的(這也是為什么我們把內部 Topic 的副本數設為3),這些工作日志就不會丟。
所以,MirrorMaker2 集群的可靠性,本質上依賴于你背后 Kafka 集群的可靠性。它自己是個“無狀態”的執行者,非常靈活。
極限測試:干掉兩個節點會怎樣?
理論歸理論,實踐出真知。如果我的三節點集群真的掛了兩個,會發生什么?
答案是:只要還有一個節點在喘氣,活兒就不會停!
整個過程就像這樣:
- Kafka 小組長發現有人曠工:Kafka 的 Group Coordinator 很快會發現:“咦,團隊里有兩個人沒心跳了!”
- 緊急召開重組會議 (Rebalance):小組長立刻組織剩下的人(也就是那個幸存的節點)開會,說:“情況有變,工作得重新分配。”
- 幸存者扛起所有:所有之前分配給那兩個失敗節點的工作任務,現在都會被打包,一股腦兒全部分配給這最后一個幸存的節點。
- 翻開工作日志,繼續干:這個“幸運兒”會先去內部 Topic 里翻看最新的工作日志,找到之前大家同步到的精確位置,然后接著干活,保證數據不重不漏。
一句話總結:
這種設計確保了服務的高可用性,哪怕是“傷亡慘重”到只剩一個節點,也能保證業務連續。當然,代價就是剩下的那個兄弟會壓力山大,數據同步可能會因為性能跟不上而產生延遲(Lag)。所以,生產環境中還是得盡快把掛掉的節點救活,讓團隊恢復完整的戰斗力。
寫在最后
整個過程下來,感覺 MirrorMaker2 還是挺強大的,配置項雖然多,但都很靈活。最大的收獲是把監控也一并搞定了,這樣服務上線后心里才有底。從下載軟件包到最后看到 Grafana 監控圖上跳動的曲線,整個過程充滿了解決問題的樂趣。希望這份記錄能幫到有緣人。