【Kafka】深入理解 Kafka MirrorMaker2 - 實戰篇

文章目錄

    • 一、把“家伙事兒”都備齊
    • 二、部署其實很簡單
    • 三、配置 MirrorMaker2
    • 四、修改啟動腳本
    • 五、集群啟動與驗證
    • 六、這集群“結實”嗎?聊聊它的高可用
      • 它沒有“大腦”,但活得很好
      • 極限測試:干掉兩個節點會怎樣?
    • 寫在最后

最近在跟 Kafka 死磕,想著搭一個跨機房的數據同步方案,MirrorMaker2 自然就成了首選。所以,我決定自己從頭到尾摸索一遍,把整個過程記錄下來,權當是寫給未來自己的備忘錄,也希望能給同樣在折騰的你一點點啟發。

同時,這一篇也承接 【Kafka】深入理解 Kafka MirrorMaker2 - 理論篇,如果您對 Kafka MirrorMaker2 知之甚少,可以先閱讀我的這篇博客。

一、把“家伙事兒”都備齊

開工前,先得把需要的軟件包都下載下來。我用的是 Kafka 3.8.1 版本。另外,為了監控,還需要一個 Prometheus 的 JMX Exporter Agent。(3.9.0 版本的 mirrormaker2 還有 bug,社區已經在修復)。

  • Kafka 安裝包: kafka_2.13-3.8.1.tgz
  • Prometheus JMX Exporter: jmx_prometheus_javaagent-1.0.1.jar

我的計劃是在三臺機器上部署,組成一個 MirrorMaker2 集群,這樣就算掛了一臺,服務也能繼續跑,穩!

二、部署其實很簡單

拿到安裝包后,事情就簡單了。我把 kafka_2.13-3.8.1.tgz 分別上傳到三臺服務器的 /opt/kafka 目錄下解壓。

解壓后,目錄結構大概是這樣:

/opt/kafka/kafka_2.13-3.8.1/
├── bin
├── config
├── libs
├── ... (其他文件)

接下來是關鍵一步,為了讓 Prometheus 能抓到 MirrorMaker2 的監控指標,我把下載好的 jmx_prometheus_javaagent-1.0.1.jar 扔進了 libs 目錄。三臺機器都要做同樣的操作。

# 假設 jar 包在當前目錄
mv jmx_prometheus_javaagent-1.0.1.jar /opt/kafka/kafka_2.13-3.8.1/libs/

三、配置 MirrorMaker2

軟件部署好了,但它還不知道要干嘛。接下來就得靠配置文件來告訴它。我主要修改了 config/connect-mirror-maker.properties 這個文件。

這是我的配置,里面加了一些注釋來解釋每個參數是干嘛的。

# 定義兩個集群的別名,后面都用這個別名來引用
clusters = clusterA, clusterB# --- 源集群 (clusterA) 和目標集群 (clusterB) 的基本信息 ---
# 這是 MirrorMaker2 內部消費者組的 ID,隨便起個有意義的名字就行
clusterA.group.id = clusterA-clusterB-clusterAconsumer
clusterB.group.id = clusterA-clusterB-clusterBconsumer# MirrorMaker2 實例的名字
name = clusterA-clusterB
# 定義誰是源,誰是目標
source.cluster.alias = clusterA
target.cluster.alias = clusterB# 最大并發任務數,可以根據 Topic 分區數和機器性能來調整
tasks.max = 8
# ? tasks.max 詳解:
# 這個參數定義了該連接器可以創建的最大并發任務數,但實際任務數可能更少
# 影響因素:
# 1. 源Topic分區總數:實際任務數不會超過所有匹配topic的分區總和(這是最主要的限制因素)
# 注意:這里指的是所有匹配 prod.*.global 正則表達式的topic的分區數總和,不是單個topic的最大分區數
# 2. 集群節點數:任務會分配到不同節點執行
# 3. 系統資源:CPU核心數、內存大小、網絡帶寬
# 4. 連接器類型:不同連接器有不同的并行化能力
# 設置原則:建議先根據源topic分區數設置,然后根據實際性能表現調整
# 我這里設置8是因為:預估匹配的所有topic總共有30-50個分區,8個任務可以合理分配負載# --- 集群連接信息 ---
# 兩個集群的 broker 地址
clusterA.bootstrap.servers = brokerA-1:9092, brokerA-2:9092, brokerA-3:9092
clusterB.bootstrap.servers = brokerB-1:9092, brokerB-2:9092, brokerB-3:9092# --- 核心:定義同步規則 ---
# 啟用從 clusterA 到 clusterB 的同步
clusterA->clusterB.enabled = true# 正則表達式,匹配需要同步的 topic,我這里同步所有以 .global 結尾的 prod 前綴的 topic
clusterA->clusterB.topics = prod.*.global$
# 正則表達式,排除掉一些不需要的 topic,比如 MirrorMaker 內部的心跳 topic(實際測試中,發現如果不加這段配置,heartbears topic會循環鏡像)
clusterA->clusterB.topics.exclude = ^(clusterA|clusterB).*heartbeats$# 正則表達式,匹配需要同步的消費者組,這樣消費位點也能同步過去
clusterA->clusterB.groups = ^produser_consumer.*# 我這里只做單向同步,所以把反向的關掉
clusterB->clusterA.enabled = false# --- 內部 Topic 和可靠性配置 ---
# 在目標集群創建的 topic 的副本數,生產環境必須大于1,我設為3
clusterA->clusterB.replication.factor = 3
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3# Connect 框架內部也需要一些 topic 來存配置、位點、狀態,副本數同樣設為3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3
config.storage.topic = mm2-config.from-clusterA-to-clusterB.internal
offset.storage.topic = mm2-offset.from-clusterA-to-clusterB.internal
status.storage.topic = mm2-status.from-clusterA-to-clusterB.internal# --- 其他高級同步選項 ---
# 啟用 ACL 同步、位點同步等
clusterA->clusterB.sync.topic.acls.enabled = true
clusterA->clusterB.emit.checkpoints.enabled = true
clusterA->clusterB.emit.offset-syncs.enabled = true
clusterA->clusterB.sync.group.offsets.enabled = true
# 定期刷新 topic 和 group 列表的時間間隔
clusterA->clusterB.refresh.topics.interval.seconds = 300
clusterA->clusterB.refresh.groups.interval.seconds = 300
clusterA->clusterB.sync.topic.acls.interval.seconds = 300# --- 安全配置 ---
# 如果你的 Kafka 集群啟用了 SASL/PLAIN 認證,就需要加上這些
clusterA.sasl.mechanism=PLAIN
clusterA.security.protocol=SASL_PLAINTEXT
clusterA.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your_user" password="your_password";clusterB.sasl.mechanism=PLAIN
clusterB.security.protocol=SASL_PLAINTEXT
clusterB.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your_user" password="your_password";

除了這個,JMX Exporter 也需要一個簡單的配置文件 config/jmx_exporter.yml,我這里就用了個最基礎的配置,讓它把所有 MBean 都暴露出來。

---
# 這個文件可以留空,或者只寫一個空的 yaml 對象 {}
# 也可以使用官方提供的 JMX MBeans

四、修改啟動腳本

萬事俱備,只欠東風。這個“東風”就是啟動腳本。我沒有直接用官方的 connect-mirror-maker.sh,而是復制了一份,然后做了點修改,讓它加載 JMX Exporter。

這是我的 bin/connect-mirror-maker.sh 腳本:

#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# ... (省略官方的版權信息) ...if [ $# -lt 1 ];
thenecho "USAGE: $0 [-daemon] mm2.properties"exit 1
fibase_dir=$(dirname $0)# 設置日志目錄
if [ "x$LOG_DIR" = "x" ]; thenexport LOG_DIR="/var/log/kafka"
fi# 設置 Log4j 配置文件
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; thenexport KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi# 設置 JVM 堆大小,這個根據服務器內存來定
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx14336m -Xms14336m"
fi# !!! 最核心的修改在這里 !!!
# 通過 javaagent 參數加載 JMX Exporter,并指定端口 7070 和配置文件
export KAFKA_OPTS="$KAFKA_OPTS -javaagent:$base_dir/../libs/jmx_prometheus_javaagent-1.0.1.jar=7070:$base_dir/../config/jmx_exporter.yml"EXTRA_ARGS=${EXTRA_ARGS-'-name mirrorMaker'}COMMAND=$1
case $COMMAND in-daemon)EXTRA_ARGS="-daemon "$EXTRA_ARGSshift;;*);;
esac# 執行官方的啟動類
exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.mirror.MirrorMaker "$@"

最重要的就是 export KAFKA_OPTS 那一行,它在啟動 MirrorMaker2 的 JVM 進程時,動態地掛載了 JMX Exporter Agent,這樣 Prometheus 就能通過 7070 端口來抓取監控數據了。

五、集群啟動與驗證

所有準備工作都完成了,我在三臺機器上都執行了同樣的啟動命令:

# 確保腳本有執行權限
chmod +x /opt/kafka/kafka_2.13-3.8.1/bin/connect-mirror-maker.sh# 后臺啟動 MirrorMaker2
/opt/kafka/kafka_2.13-3.8.1/bin/connect-mirror-maker.sh /opt/kafka/kafka_2.13-3.8.1/config/connect-mirror-maker.properties

啟動后,我做了幾件事來確認它是不是真的在工作:

  1. 看日志tail -f /var/log/kafka/connect.log,看看有沒有報錯。

  2. 查 Topic:去目標集群 clusterB 上看看,是不是出現了 clusterA.prod.some-topic.global 這樣的 Topic。

  3. 查 Metrics:在任意一臺 MirrorMaker2 的機器上,用 curl 訪問 JMX Exporter 的端口。

    curl http://localhost:7070
    

    如果能看到一大堆以 kafka_ 開頭的監控指標,那就說明 JMX Exporter 也正常工作了。接下來就可以在 Prometheus 里配置一個 Job,讓它來抓取這三臺機器的 7070 端口,實現監控和告警。

六、這集群“結實”嗎?聊聊它的高可用

搭完三節點的集群后,我心里冒出一個問題:這玩意兒到底有多可靠?萬一哪天運維手滑,搞掛了一兩臺機器,我的數據同步任務會歇菜嗎?帶著這個疑問,我深入研究了一下它的工作模式。

它沒有“大腦”,但活得很好

我最開始有個誤解,以為 MirrorMaker2 集群內部也得像 ZooKeeper 那樣,搞個投票選舉,選個“老大”出來發號施令。結果發現,它根本沒有這種“大腦”或者“法定人數(Quorum)”的設定

它的聰明之處在于,把自己完全“托管”給了 Kafka。它就是個“甩手掌柜”,把所有臟活累活都丟給了 Kafka Connect 框架和 Kafka 集群本身:

  1. “我們是一個團隊”:我的三個 MirrorMaker2 節點在啟動后,會向 Kafka 報到,說:“我們都是 clusterA-clusterB 這個組的成員”。于是 Kafka 的“小組長”(Group Coordinator)就把它們當成一個消費者團隊來管理。
  2. “工作日志”都記在 Kafka:所有的工作狀態,比如“我同步到哪里了?”(Offsets)、“團隊的規章制度是什么?”(Configs),全都記錄在 Kafka 的內部 Topic 里。只要 Kafka 集群本身是高可用的(這也是為什么我們把內部 Topic 的副本數設為3),這些工作日志就不會丟。

所以,MirrorMaker2 集群的可靠性,本質上依賴于你背后 Kafka 集群的可靠性。它自己是個“無狀態”的執行者,非常靈活。

極限測試:干掉兩個節點會怎樣?

理論歸理論,實踐出真知。如果我的三節點集群真的掛了兩個,會發生什么?

答案是:只要還有一個節點在喘氣,活兒就不會停!

整個過程就像這樣:

  1. Kafka 小組長發現有人曠工:Kafka 的 Group Coordinator 很快會發現:“咦,團隊里有兩個人沒心跳了!”
  2. 緊急召開重組會議 (Rebalance):小組長立刻組織剩下的人(也就是那個幸存的節點)開會,說:“情況有變,工作得重新分配。”
  3. 幸存者扛起所有:所有之前分配給那兩個失敗節點的工作任務,現在都會被打包,一股腦兒全部分配給這最后一個幸存的節點。
  4. 翻開工作日志,繼續干:這個“幸運兒”會先去內部 Topic 里翻看最新的工作日志,找到之前大家同步到的精確位置,然后接著干活,保證數據不重不漏。

一句話總結:

這種設計確保了服務的高可用性,哪怕是“傷亡慘重”到只剩一個節點,也能保證業務連續。當然,代價就是剩下的那個兄弟會壓力山大,數據同步可能會因為性能跟不上而產生延遲(Lag)。所以,生產環境中還是得盡快把掛掉的節點救活,讓團隊恢復完整的戰斗力。

寫在最后

整個過程下來,感覺 MirrorMaker2 還是挺強大的,配置項雖然多,但都很靈活。最大的收獲是把監控也一并搞定了,這樣服務上線后心里才有底。從下載軟件包到最后看到 Grafana 監控圖上跳動的曲線,整個過程充滿了解決問題的樂趣。希望這份記錄能幫到有緣人。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/89769.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/89769.shtml
英文地址,請注明出處:http://en.pswp.cn/web/89769.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

借助AI學習開源代碼git0.7之四update-cache

借助AI學習開源代碼git0.7之四update-cache update-cache.c 主要負責對索引(index),也即緩存(cache),進行增、刪、改操作。現在的高層命令 git add 的部分核心功能就是由這個代碼實現的。 核心功能 該程序的…

【48】MFC入門到精通——MFC 文件讀寫總結 CFile、CStdioFile、CFileDialog

文章目錄1 打開文件1.2 打開文件模式總結2 常用函數2.1 寫文件2.2 讀文件2.3 獲取文件長度3. 文件打開讀寫實力3.1 寫文件 覆蓋寫3.2 文尾追加寫3.3 換行寫4 文件對話框 CFileDialog4.2 文件對話框實例5 CStdioFile 類 讀寫CStingMFC提供了一個文件操作的基類CFile,…

Leetcode 124. 二叉樹中的最大路徑和

遞歸/*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(nullptr), right(nullptr) {}* TreeNode…

MTSC2025參會感悟:手工測試用例的智能化生成

目錄 一、測試用例生成的時代困境與 AI 機遇 1.1 傳統手工測試用例的固有痛點 1.2 AI 時代的測試新挑戰 1.3 智能化轉型的機遇窗口 二、智能用例生成的核心特性與產品功能 2.1 核心特性解析 2.2 四大核心產品功能 功能一:基于 PRD 理解的一鍵生成用例 功能二…

后臺管理系統登錄模塊(雙token的實現思路)

最近在寫后臺管理,這里分享一下我的登錄模塊的實現,我是使用reacttypescript實現的,主要是登錄的邏輯和雙token的處理方式,請求接口的二次封裝aixos1.首先我們需要渲染登錄界面的窗口,這個很簡單就不詳細講解了&#x…

第十四講 | AVL樹實現

AVL樹實現一、AVL的概念二、AVL樹的實現1、AVL樹的結構2、AVL樹的插入(1)、AVL樹插入一個值的大概過程(2)、平衡因子更新更新原則更新停止條件插入結點及更新平衡因子的代碼實現3、旋轉(1)、旋轉的原則&…

《P3398 倉鼠找 sugar》

題目描述小倉鼠的和他的基(mei)友(zi)sugar 住在地下洞穴中,每個節點的編號為 1~n。地下洞穴是一個樹形結構。這一天小倉鼠打算從從他的臥室(a)到餐廳(b),而…

錘子助手插件功能六:啟用攔截消息撤回

錘子助手插件功能六:啟用攔截消息撤回錘子助手插件功能六:啟用攔截消息撤回🛡? 插件簡介 攔截撤回消息,信息不再消失🔧 功能說明?? 使用風險與注意事項🎯 適合人群?? 結語錘子助手插件功能六&#xf…

深度解析:基于EasyX的C++黑白棋AI實現 | 算法核心+圖形化實戰

摘要 本文詳解C黑白棋AI實現,使用EasyX圖形庫打造完整人機對戰系統。涵蓋: 遞歸搜索算法(動態規劃優化) 棋盤狀態評估函數設計 圖形界面與音效集成 勝負判定與用戶交互 附完整可運行代碼資源文件,提供AI難度調節方案…

樹同構(Tree Isomorphism)

樹同構(Tree Isomorphism)?? 是圖論中的一個經典問題,主要研究兩棵樹在結構上是否“相同”或“等價”,即是否存在一種節點的一一對應關系,使得兩棵樹的結構完全一致(不考慮節點的具體標簽或位置&#xff…

分享如何在保證畫質的前提下縮小視頻體積實用方案

大文件在通過互聯網分享或上傳時會遇到很多限制,比如電子郵件附件大小限制、社交媒體平臺的文件大小要求等。壓縮后的視頻文件更小,更容易上傳到網絡、發送給他人或共享在社交平臺上。它是一款無需安裝的視頻壓縮工具,解壓后直接運行&#xf…

SpringBoot 統一功能處理(攔截器、@ControllerAdvice、Spring AOP)

文章目錄攔截器快速入門攔截器詳解攔截路徑攔截器執行流程全局控制器增強機制(ControllerAdvice)統一數據返回格式(ControllerAdvice ResponseBodyAdvice)??全局異常處理機制??(ControllerAdvice ExceptionHandler)全局數據…

建筑墻壁損傷缺陷分割數據集labelme格式7820張20類別

數據集格式:labelme格式(不包含mask文件,僅僅包含jpg圖片和對應的json文件)圖片數量(jpg文件個數):7820標注數量(json文件個數):7820標注類別數:20標注類別名稱:["Graffiti","Bearing","Wets…

圖書管理軟件iOS(iPhone)

圖書管理軟件iOS(iPhone)開發進度表2025/07/19圖書管理軟件開發開始一:圖書管理軟件開發iOS(iPhone)

MySQL配置性能優化

技術文章大綱:MySQL配置性能優化賽 引言 介紹MySQL性能優化的重要性,特別是在高并發、大數據場景下的挑戰。概述MySQL配置優化的核心方向(如內存、查詢、索引等)。引出比賽目標:通過配置調整提升MySQL性能指標&#xf…

uniapp微信小程序 實現swiper與按鈕實現上下聯動

1. 需求:頁面頂部展示n個小圖標。當選中某個圖標時,下方視圖會相應切換;反之,當滑動下方視圖時,頂部選中的圖標也會同步更新。 2. 思路: 上方scroll-view 區域渲染圖標,并且可左右滑動&#xff…

44.sentinel授權規則

授權規則是對請求者的身份做一個判斷,有沒有權限來訪問。 需求:一般網關負責請求的轉發到微服務,可以做身份判斷。但是如果具體某個微服務的訪問地址直接透露給了外部,不是經過網關訪問過來的。那這種就沒有經過網關也就無法進行身份判斷了。這時候就需要sentinel的授權規…

[硬件電路-55]:絕緣柵雙極型晶體管(IGBT)的原理與應用

一、IGBT的原理:MOSFET與BJT的復合創新IGBT(Insulated Gate Bipolar Transistor)是一種復合全控型電壓驅動式功率半導體器件,其核心設計融合了MOSFET(金屬氧化物半導體場效應晶體管)的高輸入阻抗&#xff0…

取消office word中的段落箭頭標記

對于一個習慣用WPS的人來說,office word中的段落箭頭讓人非常難受,所以想要取消該功能點擊文件-更多-選項然后在顯示界面,找到段落標記,取消勾選即可最終效果

Win11 上使用 Qume 搭建銀河麒麟V10 arm版虛擬機

安裝全程需要下載3個文件,可在提前根據文章1.1、2.1、2.2網址下載。 1 QEMU軟件簡介與安裝流程 QEMU(Quick Emulator)是一個開源軟件,可以模擬不同的計算機硬件行為(如模擬arm架構),并可以創建…