RocketMq詳解:六、RocketMq的負載均衡機制

上一章:《SpringBoot+Aop實現RocketMq的冪等》


文章目錄

  • 1.背景
    • 1.1 什么是負載均衡
    • 1.2 負載均衡的意義
  • 2.RocketMQ消息消費
    • 2.1 消息的流轉過程
    • 2.2 Consumer消費消息的流程
  • 3.RocketMq的負載均衡策略
    • 3.1 Broker負載均衡
    • 3.2 Producer發送消息負載均衡
    • 3.3 消費端的負載均衡
      • 3.3.1 Rebalance組件
      • 3.3.2 Rebalance觸發時機
      • 3.3.3 負載均衡流程
      • 3.3.4 Queue分配算法
      • 3.3.5 負載均衡對消費的影響
      • 3.3.6 RocketMQ 5.0 消息級別負載均衡
  • 4.RocketMQ指定機器消費設計思路

1.背景

在RocketMQ中,它的負載均衡主要可以分為

  • Consumer訂閱消息的負載均衡
  • Broker端的消息分發策略
  • Producer端的發送消息的負載均衡

其中在消費者端還有一個重量級的組件:Rebalance負載均衡組件,他負責相對均勻的給消費者分配需要拉取的隊列信息。

在了解負載均衡組件之前,我們先看看什么是負載均衡以及為什么要使用負載均衡

1.1 什么是負載均衡

負載均衡在分布式服務里會頻繁的出現,其主要是用來在多個資源 (一般是服務器)中分配負載,達到最優化資源使用,避免單臺服務器過載

RocketMQ中的負載均衡,指的是如何將消息隊列(Message Queue)均勻地分配給消費者組中的各個消費者實例

通過負載均衡機制,可以避免某些消費者實例處理過多消息隊列而過載,或者某些實例沒有消息可處理,從而提高系統的整體處理能力和資源利用率

1.2 負載均衡的意義

在這里插入圖片描述

上圖是 RocketMQ 的消息儲存模型:消息是按照隊列的方式分區有序儲存的

RocketMQ 的隊列模型使得生產者、消費者和讀寫隊列都是多對多的映射關系,彼此之間都可以無限水平擴展

對比傳統的消息隊列如 RabbitMQ 是很大的優勢。尤其是在流式處理場景下有天然優勢,能夠保證同一隊列的消息被相同的消費者處理,對于批量處理、聚合處理更友好
在這里插入圖片描述

消費者消費某個 topic 的消息等同于消費這個 topic 上所有隊列的消息(上圖中 Consumer A1 消費隊列 1,Consumer A2 消費隊列 2、3)。

所以,要保證每個消費者的負載盡量均衡,也就是要給這些消費者分配相同數量的隊列,并保證在異常情況下(如客戶端宕機)隊列可以在不同消費者之間遷移

在具體了解RocketMQ的負載均衡策略之前,我們先了解一些RocketMq的整個消費邏輯,以便我們后期可以更好的理解

2.RocketMQ消息消費

2.1 消息的流轉過程

RocketMQ 支持兩種消費模式:集群消費( Clustering )和廣播消費( Broadcasting )。

集群消費:同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者實例上。
在這里插入圖片描述

廣播消費:當使用廣播消費模式時,每條消息推送給集群內所有的消費者,保證消息至少被每個消費者消費一次
在這里插入圖片描述

上面提到了兩個名詞:消費者組、消息隊列
這兩個名詞在:《RocketMQ 介紹及基本概念》這篇文章中已進行說明,下面在簡單概述一下

  • 消費者組

消費者組是 RocketMQ 中負載均衡的基本單位

一個主題(Topic)的消息隊列可以被多個消費者組訂閱,每個消費者組中的消費者實例會共享消息隊列。

如果一個主題有 8 個隊列,而一個消費者組有 4 個消費者實例,那么負載均衡機制會將這 8 個隊列均勻分配給這 4 個消費者實例。

  • 消息隊列

消息隊列是 RocketMQ 中消息存儲和消費的基本單位

一個主題可以有多個消息隊列,這些隊列中的消息會被消費者組中的消費者實例消費。通過將消息隊列均勻地分配給各個消費者實例,RocketMQ 實現了負載均衡。

2.2 Consumer消費消息的流程

consumer消息消費過程:
在這里插入圖片描述

因為廣播模式所有的Consumer都會收到全量消息,所以RocketMQ的負載均衡只針對于Consumer集群消費的模式

3.RocketMq的負載均衡策略

3.1 Broker負載均衡

Broker是以group(消費者組)為單位提供服務。

一個group里面分master和slave,master和slave存儲的數據一樣,slave從master同步數據(同步雙寫或異步復制看配置)。

通過nameserver暴露給客戶端后,只是客戶端關心(注冊或發送)一個個的topic路由信息。

路由信息中會細化為message queue的路由信息。而message queue會分布在不同的broker group。所以對于客戶端來說,分布在不同broker group的message queue為成為一個服務集群,但客戶端會把請求分攤到不同的queue。

而由于壓力分攤到了不同的queue,不同的queue實際上分布在不同的Broker group,也就是說壓力會分攤到不同的broker進程,這樣消息的存儲和轉發均起到了負載均衡的作用。

Broker一旦需要橫向擴展,只需要增加broker group,然后把對應的topic建上,客戶端的message queue集合即會變大,這樣對于broker的負載則由更多的broker group來進行分擔。

并且由于每個group下面的topic的配置都是獨立的,也就說可以讓group1下面的那個topic的queue數量是4,其他group下的topic queue數量是2,這樣group1則得到更大的負載。

  • commit log

雖然每個topic下面有很多message queue,但是message queue本身并不存儲消息。真正的消息存儲會寫在SetCommitLog的文件ter,message queue只是存儲CommitLog中對應的位置信息,方便通過message queue找到對應存儲在CommitLog的消息。

不同的topic,message queue都是寫到相同的CommitLog 文件,也就是說CommitLog完全的順序寫

具體如下圖:
在這里插入圖片描述

3.2 Producer發送消息負載均衡

Producer端,每個實例在發消息的時候,默認會輪詢所有的message queue發送,以達到讓消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就發送到不同的broker下,如下圖:
在這里插入圖片描述

注: 另外多個隊列可以部署在一臺機器上,也可以分別部署在多臺不同的機器上

上圖所示的若干隊列可以部署在一臺機器上,也可以分別部署在不同的機器上,發送消息通過輪詢隊列的方式發送,每個隊列接收平均的消息量。通過增加機器,可以水平擴展隊列容量。另外也可以自定義方式選擇發往哪個隊列。

RocketMQ的順序消息發送的時候,就要求我們自己實現隊列選擇器,根據消息唯一標識選擇對應的隊列進行發送。

3.3 消費端的負載均衡

3.3.1 Rebalance組件

消費端的負載均衡主要依賴于Rebalance組件,將 Broker 端中多個隊列按照某種算法分配給同一個消費組中的不同消費者

Rebalance即再均衡,指的是將一個Topic下的多個Queue在同一個Consumer Group中的多個 Consumer間進行重新分配的過程,它能夠提升消息的并行消費能力

RocketMQ 5.0以前是按照隊列粒度進行負載均衡的,5.0以后提供了按消息粒度進行負載均衡。

對于4.x/3.x的版本,包括DefaultPushConsumer、DefaultPullConsumer、LitePullConsumer等,默認且僅能使用隊列粒度負載均衡策略

  • 隊列粒度負載均衡策略

隊列粒度負載均衡策略中,同一消費者組內的多個消費者將按照隊列粒度消費消息,每個隊列只能被其中一個消費者消費

隊列粒度負載均衡是在每個消費者端進行的,并不是由某個節點統一進行負載均衡之后將分配結果通知到每個消費者。

費者增加或者減少會影響消息隊列的分配消,所以Broker需要感知消費者的上下線情況

消費者在啟動時會向所有的Broker發送心跳包進行注冊,通知Broker消費者上線,下線的時候也會向Broker發送取消注冊的請求

Broker會維護消費者信息的注冊信息,在消費者發生變更時會通知消費者進行負載均衡

由于負載均衡是每個客戶端獨立進行計算,那么何時觸發呢?

3.3.2 Rebalance觸發時機

  • 消費端啟動時,立即進行負載均衡

消費者在啟動時會進行一次負載均衡,為自己分配消息隊列。

  • 消費端定時任務每隔 20 秒觸發負載均衡

消費者本身也會定時執行負載均衡,默認是20s執行一次。
在這里插入圖片描述

  • 消費者上下線,Broker 端通知消費者觸發負載均衡

如果有消費者向Broker發送UNREGISTER_CLIENT取消注冊請求,并且開啟了允許通知變更,會觸發變更事件。

變更事件同上,Broker會通知該消費者組下的所有消費者進行一次負載均衡。

比如我們動態添加了Consumer進行消費,那么此時肯定是要重新分配一下,也就是觸發Rebalance再均衡。

例如,一個Topic下5個隊列,在只有1個消費者的情況下,這個消費者將負責消費這5個隊列的消息。如果此時其中一個消費者分配2個隊列,給另一個分配3個隊列,從而提升消息我們增加一個消費者,那么就可以給的并行消費能力。 如下圖:
在這里插入圖片描述

  • 消費者所訂閱Topic的隊列數量發生變化

比如我們動態調整了Topic對應的隊列數量,那么此時肯定是要重新分配一下,也就是觸發Rebalance再均衡。

例如一個Topic下5個隊列,有2個消費者的情況下,那么就可以給其中一個消費者分配2個隊列,給另一個分配3個隊列,假設我們調整到Topic下有7個隊列,還是2個消費者的情況下,那么就可以給其中一消費者分配4個隊列,給另一個分配3個隊列;從而提升消息的并行消費能力。如下圖:
在這里插入圖片描述
像Consumer Group擴容或縮容、Consumer與NameServer間發生網絡異常、Consumer發生宕機等都會導致消費者組中消費者的數量發生變化。

需要注意的是,由于一個隊列最多分配給一個消費者,因此當某個消費者組下的消費者實例數量大于隊列的數量時,多余的消費者實例將分配不到任何隊列,等于是多余的消費者什么都不做,白白浪費

3.3.3 負載均衡流程

1、發送心跳
消費者啟動后,"它就會通過定時任務不斷地向 RocketMQ 集群中的所有 Broker 實例發送心跳包消息消費分組名稱、訂閱關系集合、消息通信模式和客戶端實例編號等信息

Broker 端在收到消費者的心跳消息后,會將它維護在 ConsumerManager 的本地緩存變量 consumerTable,同時并將封裝后的客戶端網絡通道信息保存在本地緩存變量 channelinfoTable 中,為之后做消費端的負載均衡提供可以依據的元數據信息。

2、啟動負載均衡服務
負載均衡核心代碼:
負載均衡服務執行邏輯在doRebalance函數,里面會對每個消費者組執行負載均衡操作。

/* group */
private ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();public void doRebalance() {//每個消費者組都有負載均衡for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}
}

由于每個消費者組可能會消費很多topic,每個topic都有自己的不同隊列,最終是按topic的維度進行負載均衡。

 public boolean doRebalance(boolean isOrder) {boolean balanced = true;Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {Iterator var4 = subTable.entrySet().iterator();while(var4.hasNext()) {Map.Entry<String, SubscriptionData> entry = (Map.Entry)var4.next();String topic = (String)entry.getKey();try {if (!this.clientRebalance(topic) && this.tryQueryAssignment(topic)) {balanced = this.getRebalanceResultFromBroker(topic, isOrder);} else {balanced = this.rebalanceByTopic(topic, isOrder);}} catch (Throwable var8) {Throwable e = var8;if (!topic.startsWith("%RETRY%")) {log.warn("rebalance Exception", e);balanced = false;}}}}this.truncateMessageQueueNotMyTopic();return balanced;}

里面最核心的便是:
在這里插入圖片描述

這段代碼的邏輯如下:
1.clientRebalance(topic):首先調用clientRebalance方法,嘗試對指定主題(topic)進行客戶端再平衡。如果這個方法返回false,意味著客戶端再平衡沒有成功或不需要進行。

2.tryQueryAssignment(topic):接著調用tryQueryAssignment方法,嘗試從代理服務器(broker)查詢最新的訂閱分配信息。這個方法通常會在客戶端再平衡失敗或者不需要進行時被調用,目的是檢查是否可以從代理服務器獲取新的分配信息。

3.如果上述兩個條件都滿足(即clientRebalance返回false且tryQueryAssignment成功),則會調用getRebalanceResultFromBroker(topic, isOrder)方法,從代理服務器獲取再平衡的結果,并將結果賦值給balanced變量。這通常意味著需要根據來自代理服務器的信息來更新本地的消費隊列分配。

4.否則,如果上述兩個條件不同時滿足,則調用rebalanceByTopic(topic, isOrder)方法,通過其他方式(可能是基于當前已有的分配信息)來進行再平衡,并將結果賦值給balanced變量。

總結來說,這段代碼是在判斷是否應該從代理服務器獲取最新的分配信息來完成再平衡,還是基于現有的分配信息自行處理再平衡。

選擇哪條路徑取決于clientRebalancetryQueryAssignment方法的執行結果。這種設計允許RocketMQ靈活地應對不同的網絡狀況和系統狀態,以確保消息能夠高效、公平地分發給各個消費者。

最終最終負載均衡邏輯處理的實現在:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic。

    private boolean rebalanceByTopic(String topic, boolean isOrder) {boolean balanced = true;Set mqSet;switch (this.messageModel) {//廣播模式case BROADCASTING:mqSet = (Set)this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});}balanced = mqSet.equals(this.getWorkingMessageQueue(topic));} else {this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);}break;//集群模式case CLUSTERING:mqSet = (Set)this.topicSubscribeInfoTable.get(topic);List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);if (null == mqSet && !topic.startsWith("%RETRY%")) {this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);} catch (Throwable var11) {Throwable e = var11;log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);return false;}Set<MessageQueue> allocateResultSet = new HashSet();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});this.messageQueueChanged(topic, mqSet, allocateResultSet);}balanced = allocateResultSet.equals(this.getWorkingMessageQueue(topic));}}return balanced;}

我們一起來看一下這段代碼:
負載均衡服務會根據消費模式為”廣播模式”還是“集群模式”做不同的邏輯處理,這里主要來看下集群模式下的主要處理流程:
在這里插入圖片描述

(1) 獲取該主題下的消息消費隊列集合;

(2) 查詢 Broker 端獲取該消費組下消費者 Id 列表;

(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法),計算出待拉取的消息隊列;
在這里插入圖片描述

這里的平均分配算法,類似于分頁的算法,將所有 MessageQueue 排好序類似于記錄,將所有消費端排好序類似頁數,并求出每一頁需要包含的平均 size 和每個頁面記錄的范圍 range ,最后遍歷整個 range 而計算出當前消費端應該分配到的記錄。

(4) 分配到的消息隊列集合與 processQueueTable 做一個過濾比對操作
在這里插入圖片描述

消費者實例內 ,processQueueTable 對象存儲著當前負載均衡的隊列 ,以及該隊列的消費快照。

標紅的部分表示與分配到的消息隊列集合互不包含,則需要將這些紅色隊列 Dropped 屬性為 true , 然后從 processQueueTable 對象中移除。

綠色的部分表示與分配到的消息隊列集合的交集,processQueueTable 對象中已經存在該隊列。

黃色的部分表示這些隊列需要添加到 processQueueTable 對象中,創建這些隊列的消費快照。最后創建拉取消息請求列表,并將請求分發到消息拉取服務,進入拉取消息環節。

3.3.4 Queue分配算法

一個Topic中的Queue只能由Consumer Group中的一個Consumer進行消費,而一個Consumer可以同時消費多個Queue中的消息。

那么Queue與Consumer間的配對關系是如何確定的,即Queue要分配給哪個Consumer進行消費,也是有算法策略的。

負載均衡策略頂層接口:

/*** Strategy Algorithm for message allocating between consumers*/
public interface AllocateMessageQueueStrategy {/*** Allocating by consumer id* 給消費者id分配消費隊列*/List<MessageQueue> allocate(final String consumerGroup, //消費者組final String currentCID, //當前消費者idfinal List<MessageQueue> mqAll, //所有的隊列final List<String> cidAll //所有的消費者);}

他默認共有7種負載均衡策略實現:
在這里插入圖片描述

常見的有四種策略,分別是:平均分配策略環形平均策略一致性hash策略同機房策略

這些策略是通過在創建Consumer時的構造器傳進去的。

(1)、平均分配策略 (默認)
該算法是根據
$[avg = QueueCount/ ConsumerCount] $的計算結果進行分配的,如果能夠整除,則按順序將avg個Queue逐個分配,如果不能整除,則將多余出的Queue按照Consumer順序逐個分配
在這里插入圖片描述

(2)、環形分配策略
環形平均算法是指,根據消費者的順序,依次由Queue隊列組成的環形圖逐個分配,該方法不需要提前計算,如下圖:
在這里插入圖片描述

(3)、一致性哈希分配策略

該算法會將consumer的hash值作為Node節點存放到hash環上,然后將queue的hash值也放到hash環 上,通過順時針方向,距離queue最近的那個consumer就是該queue要分配的consumer。
在這里插入圖片描述

一致性哈希算法可以有效減少由于消費者組擴容或縮容所帶來的大量的Rebalance,所以它適合用在Consume數量變化較頻繁的場景,如下圖:
在這里插入圖片描述

但是一致性哈希算法也存在不足,就是分配效率較低,容易導致分配不均的情況。即每個消費者消費的隊列數,有可能相差很大,這樣就會造成個別消費者壓力過大。

我們可以引入虛擬桶,讓queue在hash環中盡可能分配均勻

在負載均衡的分配策略中,一致性哈希算法數見不鮮,感興趣的同學可以移步至:《負載均衡的常見幾種算法》
(4)、機房分配策略

該算法會根據queue的部署機房位置consumer的位置,過濾出當前consumer相同機房的queue。

然后按照平均分配策略或環形平均策略對同機房queue進行分配。如果沒有同機房queue,則按照平均分配策略或環形平均策略對所有queue進行分配。如下圖:
在這里插入圖片描述

上面我們講了那么多好處,但是沒有什么事情都是能夠十分完美的兼容所有,下面我們來辯證的討論一下負載均衡對消費的影響

3.3.5 負載均衡對消費的影響

Rebalance的在提升消費能力的同時,也帶來一些問題

a、消費暫停: 在只有一個Consumer時,其負責消費所有隊列;在新增了一個Consumer后會觸發 Rebalance的發生。此時原Consumer就需要暫停部分隊列的消費,等到這些隊列分配給新的Consumer后,這些暫停消費的隊列才能繼續被消費。

b、消費重復: Consumer在消費新分配給自己的隊列時,必須接著之前Consumer 提交的消費進度的offset 繼續消費。然而默認情況下,offset是異步提交的,這個異步性導致提交到Broker的offset與Consumer實際消費的消息并不一致。這個不一致的差值就是可能會重復消費的消息。

C、消費突刺: 由于Rebalance可能導致重復消費,如果需要重復消費的消息過多,或者因為Rebalance暫停時間過長從而導致積壓了部分消息。那么有可能會導致在Rebalance結束之后瞬間需要消費很多消息
在這里插入圖片描述

上圖是真實的一個線上case,這兩個時間點在進行應用發布,根據我們上文的分析某個消費者下線后同組的其他消費者感知這一變化需要一定時間,導致有秒級的消費延遲產生。在發布結束后消費者快速處理堆積的消息,可以發現消費速度有一個明顯的上漲。

這個例子展示了下線時由于負載均衡帶來了短暫的消息處理延遲,新的消費者會從服務端獲取消費位點繼續之前的消費進度。如果消費者異常宕機或者沒有調用 shutdown 優雅下線,沒有上傳自己的最新消費位點,會使得新分配的消費者重復消費。

當某個客戶端觸發負載均衡時,就會出現:

  • 對于新分配的隊列可能會重復消費,這也是官方要求消費要做好冪等的原因
  • 對于不再負責的隊列會短時間消費停止,如果原本的消費 TPS 很高或者正好出現生產高峰就會造成消費毛刺。

為了避免這些影響,則需要我們在使用時注意:

  • 1.避免頻繁上下線,為了避免負載均衡的影響應該盡量減少客戶端的上下線,同時做好消費冪等;
  • 2.同時在有應用重啟或下線前要調用 shutdown 方法,這樣服務端在收到客戶端的下線請求后會通知客戶端及時觸發負載均衡,減少消費延遲;
  • 3.選擇合適的負載均衡策略;
    • 需要根據業務需要靈活選擇負載均衡策略:
    • 需要保證客戶端的負載盡可能的均衡:選擇默認的平均分配策略;
    • 需要降低應用重啟帶來的消費延遲:選擇一致性哈希的分配策略。
  • 4.RocketMQ 的負載均衡是每個客戶端獨立進行計算,所以務必要保證每個客戶端的負載均衡算法和訂閱語句一致:
    • 負載均衡策略不一致會導致多個客戶端分配到相同隊列或有客戶端分不到隊列;
    • 訂閱語句不一致會導致有消息未能消費。

3.3.6 RocketMQ 5.0 消息級別負載均衡

為了徹底解決客戶端負載均衡導致的重復消費和消費延遲問題,RocketMQ 5.0 提出了消息級別的負載均衡機制

同一個隊列的消息可以由多個消費者消費,服務端會確保消息不重不漏的被客戶端消費到:

消息粒度的負載均衡機制,是基于內部的單條消息確認語義實現的

消費者獲取某條消息后,服務端會將該消息加鎖,保證這條消息對其他消費者不可見,直到該消息消費成功或消費超時

因此,即使多個消費者同時消費同一隊列的消息,服務端也可保證消息不會被多個消費者重復消費

在 4.x 的客戶端中,順序消費的實現強依賴于隊列的分配

RocketMQ 5.0 在消息維度的負載均衡的基礎上也實現了順序消費的語意:不同消費者處理同一個消息組內的消息時,會嚴格按照先后順序鎖定消息狀態,確保同一消息組的消息串行消費
在這里插入圖片描述

如上圖所述,隊列 Queue1 中有 4 條順序消息,這 4 條消息屬于同一消息組 G1,存儲順序由 M1 到 M4。

在消費過程中,前面的消息 M1、M2 被 消費者Consumer A1 處理時,只要消費狀態沒有提交,消費者 A2 是無法并行消費后續的 M3、M4 消息的,必須等前面的消息提交消費狀態后才能消費后面的消息。

4.RocketMQ指定機器消費設計思路

日常測試環境當中會存在多臺consumer進行消費,但實際開發當中某臺consumer新上了功能后希望消息只由該機器進行消費進行邏輯覆蓋,這個時候consumerGroup的集群模式就會給我們造成困擾,因為消費負載均衡的原因不確定消息具體由哪臺consumer進行消費。當然我們可以通過介入consumer的負載均衡機制來實現指定機器消費。

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 通過改寫這部分邏輯,增加判斷是否是指定IP的機器,如果不是直接返回空列表表示該機器不負責消費if (!cidAll.contains(currentCID)) {return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/62175.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/62175.shtml
英文地址,請注明出處:http://en.pswp.cn/web/62175.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

yocto的xxx.bb文件在什么時候會拷貝文件到build目錄

在 Yocto 中&#xff0c;.bb 文件用于描述如何構建和安裝一個軟件包&#xff0c;而文件在構建過程中的拷貝操作通常會在某些特定的步驟中進行。具體來說&#xff0c;文件會在以下幾個階段被拷貝到 build 目錄&#xff08;或者更準確地說&#xff0c;拷貝到目標目錄 ${D}&#x…

主打極致性價比,AMD RX 8600/8800顯卡定了

*以下內容僅為網絡爆料及傳聞&#xff0c;一切以官方消息為準。 這誰能想到&#xff0c;率先掏出下一代桌面獨立顯卡的不是老大哥 NVIDIA&#xff0c;也不是 AMD&#xff0c;反而是三家中存在感最弱的 Intel&#xff01; 就在 12 月 3 日&#xff0c;Intel 正式發布了自家第二…

數組哪些方法會觸發Vue監聽,哪些不會觸發監聽

發現寶藏 前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。【寶藏入口】。 在 Vue 中&#xff0c;數組的變化是通過 響應式 系統來監聽的。Vue 使用 getter 和 setter 來追蹤數組的變化&#xff0c;并在數…

npm, yarn, pnpm之間的區別

前言 在現代化的開發中&#xff0c;一個人可能同時開發多個項目&#xff0c;安裝的項目越來越多&#xff0c;所隨之安裝的依賴包也越來越臃腫&#xff0c;而且有時候所安裝的速度也很慢&#xff0c;甚至會安裝失敗。 因此我們就需要去了解一下&#xff0c;我們的包管理器&#…

工業檢測基礎-工業相機選型及應用場景

以下是一些常見的工業檢測相機種類、檢測原理、應用場景及選型依據&#xff1a; 2D相機 檢測原理&#xff1a;基于二維圖像捕獲&#xff0c;通過分析圖像的明暗、紋理、顏色等信息來檢測物體的特征和缺陷.應用場景&#xff1a;廣泛應用于平面工件的外觀檢測&#xff0c;如檢測…

C語言連接數據庫

文章目錄 一、初始化數據庫二、創建數據庫連接三、執行增刪改查語句1、增刪改2、查 四、執行增刪改查語句 接下來我簡單的介紹一下怎么用C語言連接數據庫。 初始化數據庫創建數據庫連接執行增刪改查語句關閉數據庫連接 一、初始化數據庫 // 數據庫初始化 MYSQL mysql; MYSQL* r…

優化LabVIEW數據運算效率的方法

在LabVIEW中進行大量數據運算時&#xff0c;提升計算效率并減少時間占用是開發過程中常遇到的挑戰。為此&#xff0c;可以從多個角度著手優化&#xff0c;包括合理選擇數據結構與算法、并行處理、多線程技術、硬件加速、內存管理和界面優化等。通過采用這些策略&#xff0c;可以…

開源模型應用落地-安全合規篇-用戶輸入價值觀判斷(四)

一、前言 在深度合規功能中,對用戶輸入內容的價值觀判斷具有重要意義。這一功能不僅僅是對信息合法性和合規性的簡單審核,更是對信息背后隱含的倫理道德和社會責任的深刻洞察。通過對價值觀的判斷,系統能夠識別可能引發不當影響或沖突的內容,從而為用戶提供更安全、更和諧的…

計算機的錯誤計算(一百七十六)

摘要 利用某一大語言模型計算 的值&#xff0c;輸出為 0 . 例1. 在某一大語言模型下&#xff0c;計算 的值。其中sin中值取弧度。結果保留16位有效數字。 直接貼圖吧&#xff1a; 點評&#xff1a; &#xff08;1&#xff09;以上為一個大模型給的答案。從其回答可知&…

數據結構與算法——1204—遞歸分治法

1、斐波那契數列優化 使用滾動變量&#xff0c;保存當前計算結果和前兩項值 (1)RAB (2)更新計算對象&#xff0c;AB&#xff0c;BR #include<iostream> using namespace std;int fun(int n) {if (n 0)return 0;if (n 1 || n 2)return 1;int num11;int num21;int su…

openstack內部rpc消息通信源碼分析

我們知道openstack內部消息隊列基于AMQP協議&#xff0c;默認使用的rabbitmq 消息隊列。談到rabbitmq&#xff0c;大家或許并不陌生&#xff0c;但或許會對oslo message有些陌生。openstack內部并不是直接使用rabbitmq&#xff0c;而是使用了oslo.message 。oslo.message 后端的…

Python 3 和 MongoDB 的集成使用

Python 3 和 MongoDB 的集成使用 MongoDB 是一個流行的 NoSQL 數據庫&#xff0c;以其靈活的數據模型和強大的查詢功能而聞名。Python 3 作為一種廣泛使用的編程語言&#xff0c;與 MongoDB 的集成變得日益重要。本文將介紹如何在 Python 3 環境中集成和使用 MongoDB&#xff…

Postman自定義腳本Pre-request-script以及Test

這兩個都是我們進行自定義script腳本的地方&#xff0c;分別是在請求執行的前后運行。 我們舉兩個可能經常運用到的場景。 (一)請求A先執行&#xff0c;請求B使用請求A響應結果作為參數。如果我們不用自定義腳本&#xff0c;可能得先執行請求A&#xff0c;然后手動復制響應結果…

構建高效OTA旅游平臺的技術指南

1. 引言 在信息技術高速發展的今天&#xff0c;互聯網深刻地改變了人們的旅行方式。傳統的旅行社模式逐漸被在線旅游平臺所取代&#xff0c;OTA&#xff08;Online Travel Agency&#xff0c;在線旅行社&#xff09;旅游平臺應運而生&#xff0c;成為人們獲取旅游信息、預訂旅…

總結的一些MySql面試題

目錄 一&#xff1a;基礎篇 二&#xff1a;索引原理和SQL優化 三&#xff1a;事務原理 四&#xff1a;緩存策略 一&#xff1a;基礎篇 1&#xff1a;定義&#xff1a;按照數據結構來組織、存儲和管理數據的倉庫&#xff1b;是一個長期存儲在計算機內的、有組織的、可共享 的…

116. UE5 GAS RPG 實現擊殺掉落戰利品功能

這一篇&#xff0c;我們實現敵人被擊敗后&#xff0c;掉落戰利品的功能。首先&#xff0c;我們將創建一個新的結構體&#xff0c;用于定義掉落體的內容&#xff0c;方便我們設置掉落物。然后&#xff0c;我們實現敵人死亡時的掉落函數&#xff0c;并在藍圖里實現對應的邏輯&…

Excel技巧:如何批量調整excel表格中的圖片?

插入到excel表格中的圖片大小不一&#xff0c;如何做到每張圖片都完美的與單元格大小相同&#xff1f;并且能夠根據單元格來改變大小&#xff1f;今天分享&#xff0c;excel表格里的圖片如何批量調整大小。 方法如下&#xff1a; 點擊表格中的一個圖片&#xff0c;然后按住Ct…

智能合約

06-智能合約 0 啥是智能合約&#xff1f; 定義 智能合約&#xff0c;又稱加密合約&#xff0c;在一定條件下可直接控制數字貨幣或資產在各方之間轉移的一種計算機程序。 角色 區塊鏈網絡可視為一個分布式存儲服務&#xff0c;因為它存儲了所有交易和智能合約的狀態 智能合約還…

智慧油客:從初識、再識OceanBase,到全棧上線

今天&#xff0c;我們邀請了智慧油客的研發總監黃普友&#xff0c;為我們講述智慧油客與 OceanBase 初識、熟悉和結緣的故事。 智慧油客自2016年誕生以來&#xff0c;秉持新零售的思維&#xff0c;成功從過去二十年間以“以銷售產品為中心”的傳統思維模式&#xff0c;轉向“以…

【深度學習】手機SIM卡托缺陷檢測【附鏈接】

一、手機SIM卡托用途 SIM卡托是用于固定和保護SIM卡的部件&#xff0c;通過連接SIM卡與手機主板的方式&#xff0c;允許設備訪問移動網絡&#xff0c;用戶可以通過SIM卡進行通話、發送短信和使用數據服務。 二、手機SIM卡托不良影響 SIM卡接觸不良&#xff0c;造成信號中斷&…