vivo Pulsar 萬億級消息處理實踐(3)-KoP指標異常修復

作者:vivo 互聯網大數據團隊- Chen Jianbo

本文是《vivo Pulsar萬億級消息處理實踐》系列文章第3篇。

Pulsar是Apache基金會的開源分布式流處理平臺和消息中間件,它實現了Kafka的協議,可以讓使用Kafka API的應用直接遷移至Pulsar,這使得Pulsar在Kafka生態系統中更加容易被接受和使用。KoP提供了從Kafka到Pulsar的無縫轉換,用戶可以使用Kafka API操作Pulsar集群,保留了Kafka的廣泛用戶基礎和豐富生態系統。它使得Pulsar可以更好地與Kafka進行整合,提供更好的消息傳輸性能、更強的兼容性及可擴展性。vivo在使用Pulsar KoP的過程中遇到過一些問題,本篇主要分享一個分區消費指標缺失的問題。

系列文章:

  1. vivo Pulsar萬億級消息處理實踐(1)-數據發送原理解析和性能調優

  2. vivo Pulsar萬億級消息處理實踐(2)-從0到1建設Pulsar指標監控鏈路

文章太長?1分鐘看圖抓住核心觀點👇

圖片

一、問題背景

在一次版本灰度升級中,我們發現某個使用KoP的業務topic的消費速率出現了顯著下降,具體情況如下圖所示:

什么原因導致正常的升級重啟服務器會出現這個問題呢?直接查看上報采集的數據報文:

kop_server_MESSAGE_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 3
kop_server_BYTES_OUT{group="",partition="0",tenant="kop",topic="persistent://kop-tenant/kop-ns/service-raw-stats"} 188

我們看到,KoP消費指標kop_server_MESSAGE

_OUT、kop_server_BYTES_OUT是有上報的,但指標數據里的group標簽變成了空串(缺少消費組名稱),分區的消費指標就無法展示了。是什么原因導致了消費組名稱缺失?

二、問題分析

1、找到問題代碼

我們去找下這個消費組名稱是在哪里獲取的,是否邏輯存在什么問題。根據druid中的kop_subscription對應的消費指標kop_server_

MESSAGE_OUT、kop_server_BYTES_OUT,找到相關代碼如下:

private void handleEntries(final List<Entry> entries,final TopicPartition topicPartition,final FetchRequest.PartitionData partitionData,final KafkaTopicConsumerManager tcm,final ManagedCursor cursor,final AtomicLong cursorOffset,final boolean readCommitted) {
....// 處理消費數據時,獲取消費組名稱CompletableFuture<String> groupNameFuture = requestHandler.getCurrentConnectedGroup().computeIfAbsent(clientHost, clientHost -> {CompletableFuture<String> future = new CompletableFuture<>();String groupIdPath = GroupIdUtils.groupIdPathFormat(clientHost, header.clientId());requestHandler.getMetadataStore().get(requestHandler.getGroupIdStoredPath() + groupIdPath).thenAccept(getResultOpt -> {if (getResultOpt.isPresent()) {GetResult getResult = getResultOpt.get();future.complete(new String(getResult.getValue() == null? new byte[0] : getResult.getValue(), StandardCharsets.UTF_8));} else {// 從zk節點 /client_group_id/xxx 獲取不到消費組,消費組就是空的future.complete("");}}).exceptionally(ex -> {future.completeExceptionally(ex);return null;});returnfuture;});// this part is heavyweight, and we should not execute in the ManagedLedger Ordered executor threadgroupNameFuture.whenCompleteAsync((groupName, ex) -> {if (ex != null) {log.error("Get groupId failed.", ex);groupName = "";}
.....// 獲得消費組名稱后,記錄消費組對應的消費指標decodeResult.updateConsumerStats(topicPartition,entries.size(),groupName,statsLogger);

代碼的邏輯是,從requestHandler的currentConnectedGroup(map)中通過host獲取groupName,不存在則通過MetadataStore(帶緩存的zk存儲對象)獲取,如果zk緩存也沒有,再發起zk讀請求(路徑為/client_group_id/host-clientId)。讀取到消費組名稱后,用它來更新消費組指標。從復現的集群確定走的是這個分支,即是從metadataStore(帶緩存的zk客戶端)獲取不到對應zk節點/client_group_id/xxx。

2、查找可能導致zk節點/client_group_id/xxx節點獲取不到的原因

有兩種可能性:一是沒寫進去,二是寫進去但是被刪除了。

    @Overrideprotected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,CompletableFuture<AbstractResponse> resultFuture) {
...// Store group name to metadata store for current client, use to collect consumer metrics.storeGroupId(groupId, groupIdPath).whenComplete((stat, ex) -> {if (ex != null) {// /client_group_id/xxx節點寫入失敗log.warn("Store groupId failed, the groupId might already stored.", ex);}findBroker(TopicName.get(pulsarTopicName)).whenComplete((node, throwable) -> {....});});
...

從代碼看到,clientId與groupId的關聯關系是通過handleFindCoordinatorRequest(FindCoordinator)寫進去的,而且只有這個方法入口。由于沒有找到warn日志,排除了第一種沒寫進去的可能性。看看刪除的邏輯:

protected void close(){if (isActive.getAndSet(false)) {...currentConnectedClientId.forEach(clientId -> {String path = groupIdStoredPath + GroupIdUtils.groupIdPathFormat(clientHost, clientId);// 刪除zk上的 /client_group_id/xxx 節點metadataStore.delete(path, Optional.empty()).whenComplete((__, ex) -> {if (ex != null) {if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {if (log.isDebugEnabled()) {log.debug("The groupId store path doesn't exist. Path: [{}]", path);}return;}log.error("Delete groupId failed. Path: [{}]", path, ex);return;}if (log.isDebugEnabled()) {log.debug("Delete groupId success. Path: [{}]", path);}});});}
}

刪除是在requsetHandler.close方法中執行,也就是說連接斷開就會觸發zk節點刪除。

但有幾個疑問:

  • /client_group_id/xxx 到底是干嘛用的?消費指標為什么要依賴它

  • 為什么要在handleFindCoordinatorRequest寫入?

  • 節點/client_group_id/xxx為什么要刪除,而且是在連接斷開時刪除,刪除時機是否有問題?

首先回答第1個問題,通過閱讀代碼可以知道,/client_group_id/xxx 這個zk節點是用于在不同broker實例間交換數據用的(相當redis cache),用于臨時存放IP+clientId與groupId的映射關系。由于fetch接口(拉取數據)的request沒有groupId的,只能依賴加入Group過程中的元數據,在fetch消費時才能知道當前拉數據的consumer是哪個消費組的。

3、復現

若要解決問題,最好能夠穩定地復現出問題,這樣才能確定問題的根本原因,并且確認修復是否完成。

因為節點是在requsetHandle.close方法中執行刪除,broker節點關閉會觸發連接關閉,進而觸發刪除。假設:客戶端通過brokerA發起FindCoordinator請求,寫入zk節點/client_group

_id/xxx,同時請求返回brokerB作為Coordinator,后續與brokerB進行joinGroup、syncGroup等交互確定消費關系,客戶端在brokerA、brokerB、brokerC都有分區消費。這時重啟brokerA,分區均衡到BrokerC上,但此時/client_group_id/xxx因關閉broker而斷開連接被刪除,consumer消費剛轉移到topic1-partition-1的分區就無法獲取到groupId。

按照假設,有3個broker,開啟生產和消費,通過在FindCoordinator返回前獲取node.leader()的返回節點BrokerB,關閉brokerA后,brokerC出現斷點復現,再關閉brokerC,brokerA也會復現(假設分區在brokerA與brokerC之間轉移)。

圖片

復現要幾個條件:

  1. broker數量要足夠多(不小于3個)

  2. broker內部有zk緩存metadataCache默認為5分鐘,可以把時間調小為1毫秒,相當于沒有cache

  3. findCoordinator返回的必須是其他broker的IP

  4. 重啟的必須是接收到findCoordinator請求那臺broker,而不是真正的coordinator,這時會從zk刪除節點

  5. 分區轉移到其他broker,這時新的broker會重新讀取zk節點數據

到此,我們基本上清楚了問題原因:連接關閉導致zk節點被刪除了,別的broker節點需要時就讀取不到了。那怎么解決?

三、問題解決

方案一

既然知道把消費者與FindCoordinator的連接進行綁定不合適的,那么是否應該把FindCoordinator寫入zk節點換成由JoinGroup寫入,斷連即刪除。

圖片

consumer統一由Coordinator管理,由于FindCoordinator接口不一定是Coordinator處理的,如果換成由Coordinator處理的JoinGroup接口是否就可以了,這樣consumer斷開與Coordinator的連接就應該刪除數據。但實現驗證時卻發現,客戶端在斷連后也不會再重連,所以沒法重新寫入zk,不符合預期。

方案二

還是由FindCoordinator寫入zk節點,但刪除改為GroupCoordinator監聽consumer斷開觸發。

因為consumer統一由Coordinator管理,它能監聽到consumer加入或者離開。GroupCoordinator的removeMemberAndUpdateGroup方法是coordinator對consumer成員管理。

private void removeMemberAndUpdateGroup(GroupMetadata group,MemberMetadata member) {group.remove(member.memberId());switch (group.currentState()) {case Dead:case Empty:return;case Stable:case CompletingRebalance:maybePrepareRebalance(group);break;case PreparingRebalance:joinPurgatory.checkAndComplete(new GroupKey(group.groupId()));break;default:break;}// 刪除 /client_group_id/xxx 節點deleteClientIdGroupMapping(group, member.clientHost(), member.clientId());
}

調用入口有兩個,其中handleLeaveGroup是主動離開,onExpireHeartbeat是超時被動離開,客戶端正常退出或者宕機都可以調用removeMemberAndUpdateGroup方法觸發刪除。

public CompletableFuture<Errors> handleLeaveGroup(String groupId,String memberId
) {return validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).map(error ->CompletableFuture.completedFuture(error)).orElseGet(() -> {return groupManager.getGroup(groupId).map(group -> {return group.inLock(() -> {if (group.is(Dead) || !group.has(memberId)) {return CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID);} else {...// 觸發刪除消費者consumerremoveMemberAndUpdateGroup(group, member);return CompletableFuture.completedFuture(Errors.NONE);}});})....});
}void onExpireHeartbeat(GroupMetadata group,MemberMetadata member,long heartbeatDeadline) {group.inLock(() -> {if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {log.info("Member {} in group {} has failed, removing it from the group",member.memberId(), group.groupId());// 觸發刪除消費者consumerremoveMemberAndUpdateGroup(group, member);}return null;});
}

但這個方案有個問題是,日志運維關閉broker也會觸發一個onExpireHeartbeat事件刪除zk節點,與此同時客戶端發現Coordinator斷開了會馬上觸發FindCoordinator寫入新的zk節點,但如果刪除晚于寫入的話,會導致誤刪除新寫入的節點。我們干脆在關閉broker時,使用ShutdownHook加上shuttingdown狀態防止關閉broker時刪除zk節點,只有客戶端斷開時才刪除。

這個方案修改上線半個月后,還是出現了一個客戶端的消費指標無法上報的情況。后來定位發現,如果客戶端因FullGC出現卡頓情況,客戶端可能會先于broker觸發超時,也就是先超時的客戶端新寫入的數據被后監聽到超時的broker誤刪除了。因為寫入與刪除并不是由同一個節點處理,所以無法在進程級別做并發控制,而且也無法判斷哪次刪除對應哪次的寫入,所以用zk也是很難實現并發控制。

方案三

其實這并不是新的方案,只是在方案二基礎上優化:數據一致性檢查。

既然我們很難控制好寫入與刪除的先后順序,我們可以做數據一致性檢查,類似于交易系統里的對賬。因為GroupCoordinator是負責管理consumer成員的,維護著consumer的實時狀態,就算zk節點被誤刪除,我們也可以從consumer成員信息中恢復,重新寫入zk節點。

private void checkZkGroupMapping(){  for (GroupMetadata group : groupManager.currentGroups()) {  for (MemberMetadata memberMetadata : group.allMemberMetadata()) {  String clientPath = GroupIdUtils.groupIdPathFormat(memberMetadata.clientHost(), memberMetadata.clientId());  String zkGroupClientPath = kafkaConfig.getGroupIdZooKeeperPath() + clientPath;  // 查找zk中是否存在節點metadataStore.get(zkGroupClientPath).thenAccept(resultOpt -> {  if (!resultOpt.isPresent()) {  // 不存在則進行補償修復metadataStore.put(zkGroupClientPath, memberMetadata.groupId().getBytes(UTF\_8), Optional.empty())  .thenAccept(stat -> {  log.info("repaired clientId and group mapping: {}({})",  zkGroupClientPath, memberMetadata.groupId());  })  .exceptionally(ex -> {  log.warn("repaired clientId and group mapping failed: {}({})",  zkGroupClientPath, memberMetadata.groupId());  return null;  });  }  }).exceptionally(ex -> {  log.warn("repaired clientId and group mapping failed: {} ", zkGroupClientPath, ex);  return null;  });  }  }  
}

經過方案三的優化上線,即使是歷史存在問題的消費組,個別分區消費流量指標缺少group字段的問題也得到了修復。具體效果如下圖所示:

圖片

四、總結

經過多個版本的優化和線上驗證,最終通過方案三比較完美的解決了這個消費指標問題。在分布式系統中,并發問題往往難以模擬和復現,我們也在嘗試多個版本后才找到有效的解決方案。如果您在這方面有更好的經驗或想法,歡迎提出,我們共同探討和交流。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913969.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913969.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913969.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Marin說PCB之Allegro高亮BOM器件技巧詳解

一&#xff0c;首先在原理圖輸出BOM的時候&#xff0c;只需要勾選器件的位號這個選項即可&#xff0c;具體操作如下所示&#xff1a;二&#xff0c;輸出BOM完成后&#xff0c;打開表格選擇我們器件的位號那列即可&#xff0c;然后復制到我們的TEXT文本中。三&#xff0c;接著就…

數據結構與算法——從遞歸入手一維動態規劃【2】

前言&#xff1a; 記錄一下對左程云系列算法課程--算法講解066【必備】的剩余習題的學習。本文主要簡單記錄個人學習心得和提供C版本代碼。如需要題目的細致講解&#xff0c;請前往原視頻。 涉及內容&#xff1a; 動態規劃、三指針、 參考視頻&#xff1a; 左程云--算法講…

【理念●體系】Windows AI 開發環境搭建實錄:六層架構的逐步實現與路徑治理指南

【理念●體系】從零打造 Windows WSL Docker Anaconda PyCharm 的 AI 全鏈路開發體系-CSDN博客 Windows AI 開發環境搭建實錄&#xff1a;六層架構的逐步實現與路徑治理指南 ——理念落地篇&#xff0c;從路徑規劃到系統治理&#xff0c;打造結構化可復現的 AI 開發環境 AI…

5G標準學習筆記15 --CSI-RS測量

5G標準學習筆記15 --CSI-RS測量 前言 前面講了&#xff0c;在5GNR中&#xff0c;CSI-RS 是支持信道狀態評估、波束管理和無線資源管理&#xff08;RRM&#xff09;的關鍵參考信號。下面孬孬基于3GPP TS 38.331中的內容&#xff0c;詳細定義了基于 CSI-RS 的測量程序&#xff0c…

第P28:阿爾茨海默病診斷(優化特征選擇版)

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習記錄博客&#x1f356; 原作者&#xff1a;K同學啊 一、進階說明 針對于特征對模型結果的影響我們做了特征分析 特征選擇 1. SelectFromModel 工作原理&#xff1a;基于模型的特征選擇方法&#xff0c;使用…

AI的歐幾里得要素時刻:從語言模型到可計算思維

引言 人工智能正在經歷一個關鍵的轉折點。就像歐幾里得的《幾何原本》為數學奠定了公理化基礎一樣&#xff0c;AI也正在尋找自己的"要素時刻"——一個能夠將當前的語言模型能力轉化為真正可計算、可驗證思考的轉變。 最近發表的論文《AI’s Euclid’s Elements Momen…

番外-linux系統運行.net framework 4.0的項目

基礎環境&#xff1a;linux系統&#xff0c;.net framework 4.0&#xff0c;npgsql 2.2.5.0 &#xff08;版本不同&#xff0c;構建可能失敗&#xff09; 方法背景&#xff1a;linux不支持運行.net framework 4.0&#xff0c;高版本mono不支持npgsql 2.x 主要使用&#xff1a…

國內AI訓練都有哪些企業?:技術深耕與場景實踐

國內AI訓練都有哪些企業&#xff1f;當人工智能從實驗室走向產業一線&#xff0c;AI 訓練就像為智能系統 “施肥澆水” 的關鍵環節&#xff0c;讓技術根系在各行業土壤里扎得更深。國內一批 AI 訓練企業正各展所長&#xff0c;有的專攻技術優化&#xff0c;有的深耕場景應用。它…

微算法科技基于格密碼的量子加密技術,融入LSQb算法的信息隱藏與傳輸過程中,實現抗量子攻擊策略強化

隨著量子計算技術的發展&#xff0c;傳統加密算法面臨被量子計算機破解的風險&#xff0c;LSQb 算法也需考慮應對未來可能的量子攻擊。微算法科技基于格密碼的量子加密技術&#xff0c;融入LSQb算法的信息隱藏與傳輸過程中&#xff0c;實現抗量子攻擊策略強化。格密碼在面對量子…

xAI發布Grok4+代碼神器Grok4 Code,教你如何在國內升級訂閱SuperGrok并使用到Grok4教程

就在今天&#xff0c;馬斯克旗下xAI發布了其最新的旗艦AI模型Grok4&#xff0c;并同步推出專為開發者打造的編程利器 Grok 4 Code&#xff0c;還推出了一項全新的AI訂閱計劃——每月300美元的SuperGrokHeavy。 那最新發布的Grok4以及有哪些特性呢&#xff1f;以及如何才能使用…

Rust 變量遮蔽(Variable Shadowing)

在 Rust 中&#xff0c;變量遮蔽&#xff08;Variable Shadowing&#xff09; 是一種在同一作用域內重新聲明同名變量的特性。它允許你創建一個新變量覆蓋之前的同名變量&#xff0c;新變量與舊變量類型可以不同&#xff0c;且舊變量會被完全隱藏。核心特點允許同名變量重復聲明…

【VScode | 快捷鍵】全局搜索快捷鍵(ctrl+shift+f)失效原因及解決方法

&#x1f601;博客主頁&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客內容&#x1f911;&#xff1a;&#x1f36d;嵌入式開發、Linux、C語言、C、數據結構、音視頻&#x1f36d; &#x1f60e;金句分享&#x1f60e;&a…

Windows 與 Linux 內核安全及 Metasploit/LinEnum 在滲透測試中的綜合應用

目錄 &#x1f6e0;? 1. 內核安全如何助力滲透測試與黑客行業 1.1 內核安全的戰略價值 1.2 結合 Metasploit 與 LinEnum 的作用 &#x1f50d; 2. Metasploit 信息收集模塊及其在內核安全中的應用 2.1 Windows 信息收集模塊 2.2 Linux 信息收集模塊 2.3 使用步驟 Wind…

京東攜手HarmonyOS SDK首發家電AR高精擺放功能

在電商行業的演進中&#xff0c;商品的呈現方式不斷升級&#xff1a;從文字、圖片到視頻&#xff0c;再到如今逐漸興起的3D與AR技術。作為XR應用探索的先行者&#xff0c;京東正站在這場體驗革新的最前沿&#xff0c;不斷突破商品展示的邊界&#xff0c;致力于通過創新技術讓消…

瞄準Win10難民,蘋果正推出塑料外殼、手機CPU的MacBook

最近有消息稱&#xff0c;蘋果正在研發一款定位“低價”的MacBook&#xff0c;售價可能低于800美元&#xff08;約合人民幣5800元&#xff09;&#xff0c;采用的是A18 Pro芯片&#xff0c;也就是未來iPhone 16 Pro同款的“手機芯片”&#xff0c;而不是現有的M系列。這款產品預…

原子級 macOS 信息竊取程序升級:新增后門實現持久化控制

臭名昭著的 Atomic macOS Stealer&#xff08;AMOS&#xff0c;原子級 macOS 竊取程序&#xff09;惡意軟件近期完成危險升級&#xff0c;全球 Mac 用戶面臨更嚴峻威脅。這款與俄羅斯有關聯的竊密程序首次植入后門模塊&#xff0c;使攻擊者能維持對受感染系統的持久訪問、執行遠…

Shader面試題100道之(81-100)

Shader面試題&#xff08;第81-100題&#xff09; 以下是第81到第100道Shader相關的面試題及答案&#xff1a; 81. Unity中如何實現屏幕空間的熱扭曲效果&#xff08;Heat Distortion&#xff09;&#xff1f; 熱扭曲效果可以通過GrabPass抓取當前屏幕圖像&#xff0c;然后在片…

C#洗牌算法

洗牌算法是一種將序列&#xff08;如數組、列表&#xff09;元素隨機打亂的經典算法&#xff0c;核心目標是讓每個元素在打亂后出現在任意位置的概率均等。在 C# 中&#xff0c;常用的洗牌算法有Fisher-Yates 洗牌算法&#xff08;也稱 Knuth 洗牌算法&#xff09;&#xff0c;…

Python PDFplumber詳解:從入門到精通的PDF處理指南

一、PDFplumber核心優勢解析 在數字化辦公場景中&#xff0c;PDF文檔處理是數據分析師和開發者的必備技能。相較于PyPDF2、pdfminer等傳統庫&#xff0c;PDFplumber憑借其三大核心優勢脫穎而出&#xff1a; 精準表格提取&#xff1a;采用流式布局分析算法&#xff0c;支持復雜表…

Flutter 與 Android 的互通幾種方式

Flutter 與 Android 的互通主要通過以下幾種方式實現&#xff0c;每種方式適用于不同的場景&#xff1a;1. 平臺通道&#xff08;Platform Channels&#xff09; Flutter 與原生 Android 代碼通信的核心方式&#xff0c;支持雙向調用。 類型&#xff1a; MethodChannel&#xf…