上一章:《SpringBoot+Aop實現RocketMq的冪等》
文章目錄
- 1.背景
- 1.1 什么是負載均衡
- 1.2 負載均衡的意義
- 2.RocketMQ消息消費
- 2.1 消息的流轉過程
- 2.2 Consumer消費消息的流程
- 3.RocketMq的負載均衡策略
- 3.1 Broker負載均衡
- 3.2 Producer發送消息負載均衡
- 3.3 消費端的負載均衡
- 3.3.1 Rebalance組件
- 3.3.2 Rebalance觸發時機
- 3.3.3 負載均衡流程
- 3.3.4 Queue分配算法
- 3.3.5 負載均衡對消費的影響
- 3.3.6 RocketMQ 5.0 消息級別負載均衡
- 4.RocketMQ指定機器消費設計思路
1.背景
在RocketMQ中,它的負載均衡主要可以分為
- Consumer訂閱消息的負載均衡
- Broker端的消息分發策略
- Producer端的發送消息的負載均衡
其中在消費者端還有一個重量級的組件:Rebalance負載均衡組件,他負責相對均勻的給消費者分配需要拉取的隊列信息。
在了解負載均衡組件之前,我們先看看什么是負載均衡以及為什么要使用負載均衡
1.1 什么是負載均衡
負載均衡在分布式服務里會頻繁的出現,其主要是用來在多個資源 (一般是服務器)中分配負載,達到最優化資源使用,避免單臺服務器過載。
RocketMQ中的負載均衡,指的是如何將消息隊列(Message Queue)均勻地分配給消費者組中的各個消費者實例。
通過負載均衡機制,可以避免某些消費者實例處理過多消息隊列而過載,或者某些實例沒有消息可處理,從而提高系統的整體處理能力和資源利用率。
1.2 負載均衡的意義
上圖是 RocketMQ 的消息儲存模型:消息是按照隊列的方式分區有序儲存的。
RocketMQ 的隊列模型使得生產者、消費者和讀寫隊列都是多對多的映射關系,彼此之間都可以無限水平擴展。
對比傳統的消息隊列如 RabbitMQ 是很大的優勢。尤其是在流式處理場景下有天然優勢,能夠保證同一隊列的消息被相同的消費者處理,對于批量處理、聚合處理更友好。
消費者消費某個 topic 的消息等同于消費這個 topic 上所有隊列的消息(上圖中 Consumer A1 消費隊列 1,Consumer A2 消費隊列 2、3)。
所以,要保證每個消費者的負載盡量均衡,也就是要給這些消費者分配相同數量的隊列,并保證在異常情況下(如客戶端宕機)隊列可以在不同消費者之間遷移。
在具體了解RocketMQ的負載均衡策略之前,我們先了解一些RocketMq的整個消費邏輯,以便我們后期可以更好的理解
2.RocketMQ消息消費
2.1 消息的流轉過程
RocketMQ 支持兩種消費模式:集群消費( Clustering )和廣播消費( Broadcasting )。
集群消費:同一 Topic 下的一條消息只會被同一消費組中的一個消費者消費。也就是說,消息被負載均衡到了同一個消費組的多個消費者實例上。
廣播消費:當使用廣播消費模式時,每條消息推送給集群內所有的消費者,保證消息至少被每個消費者消費一次。
上面提到了兩個名詞:消費者組、消息隊列
這兩個名詞在:《RocketMQ 介紹及基本概念》這篇文章中已進行說明,下面在簡單概述一下
- 消費者組
消費者組是 RocketMQ 中負載均衡的基本單位。
一個主題(Topic)的消息隊列可以被多個消費者組訂閱,每個消費者組中的消費者實例會共享消息隊列。
如果一個主題有 8 個隊列,而一個消費者組有 4 個消費者實例,那么負載均衡機制會將這 8 個隊列均勻分配給這 4 個消費者實例。
- 消息隊列
消息隊列是 RocketMQ 中消息存儲和消費的基本單位。
一個主題可以有多個消息隊列,這些隊列中的消息會被消費者組中的消費者實例消費。通過將消息隊列均勻地分配給各個消費者實例,RocketMQ 實現了負載均衡。
2.2 Consumer消費消息的流程
consumer消息消費過程:
因為廣播模式所有的Consumer都會收到全量消息,所以RocketMQ的負載均衡只針對于Consumer集群消費的模式。
3.RocketMq的負載均衡策略
3.1 Broker負載均衡
Broker是以group(消費者組)為單位提供服務。
一個group里面分master和slave,master和slave存儲的數據一樣,slave從master同步數據(同步雙寫或異步復制看配置)。
通過nameserver暴露給客戶端后,只是客戶端關心(注冊或發送)一個個的topic路由信息。
路由信息中會細化為message queue的路由信息。而message queue會分布在不同的broker group。所以對于客戶端來說,分布在不同broker group的message queue為成為一個服務集群,但客戶端會把請求分攤到不同的queue。
而由于壓力分攤到了不同的queue,不同的queue實際上分布在不同的Broker group,也就是說壓力會分攤到不同的broker進程,這樣消息的存儲和轉發均起到了負載均衡的作用。
Broker一旦需要橫向擴展,只需要增加broker group,然后把對應的topic建上,客戶端的message queue集合即會變大,這樣對于broker的負載則由更多的broker group來進行分擔。
并且由于每個group下面的topic的配置都是獨立的,也就說可以讓group1下面的那個topic的queue數量是4,其他group下的topic queue數量是2,這樣group1則得到更大的負載。
- commit log
雖然每個topic下面有很多message queue,但是message queue本身并不存儲消息。真正的消息存儲會寫在SetCommitLog的文件ter,message queue只是存儲CommitLog中對應的位置信息,方便通過message queue找到對應存儲在CommitLog的消息。
不同的topic,message queue都是寫到相同的CommitLog 文件,也就是說CommitLog完全的順序寫。
具體如下圖:
3.2 Producer發送消息負載均衡
Producer端,每個實例在發消息的時候,默認會輪詢所有的message queue發送,以達到讓消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就發送到不同的broker下,如下圖:
注: 另外多個隊列可以部署在一臺機器上,也可以分別部署在多臺不同的機器上
上圖所示的若干隊列可以部署在一臺機器上,也可以分別部署在不同的機器上,發送消息通過輪詢隊列的方式發送,每個隊列接收平均的消息量。通過增加機器,可以水平擴展隊列容量。另外也可以自定義方式選擇發往哪個隊列。
RocketMQ的順序消息發送的時候,就要求我們自己實現隊列選擇器,根據消息唯一標識選擇對應的隊列進行發送。
3.3 消費端的負載均衡
3.3.1 Rebalance組件
消費端的負載均衡主要依賴于Rebalance組件,將 Broker 端中多個隊列按照某種算法分配給同一個消費組中的不同消費者。
Rebalance即再均衡,指的是將一個Topic下的多個Queue在同一個Consumer Group中的多個 Consumer間進行重新分配的過程,它能夠提升消息的并行消費能力。
RocketMQ 5.0以前是按照隊列粒度進行負載均衡的,5.0以后提供了按消息粒度進行負載均衡。
對于4.x/3.x的版本,包括DefaultPushConsumer、DefaultPullConsumer、LitePullConsumer等,默認且僅能使用隊列粒度負載均衡策略。
- 隊列粒度負載均衡策略
隊列粒度負載均衡策略中,同一消費者組內的多個消費者將按照隊列粒度消費消息,每個隊列只能被其中一個消費者消費。
隊列粒度負載均衡是在每個消費者端進行的,并不是由某個節點統一進行負載均衡之后將分配結果通知到每個消費者。
費者增加或者減少會影響消息隊列的分配消,所以Broker需要感知消費者的上下線情況。
消費者在啟動時會向所有的Broker發送心跳包進行注冊,通知Broker消費者上線,下線的時候也會向Broker發送取消注冊的請求。
Broker會維護消費者信息的注冊信息,在消費者發生變更時會通知消費者進行負載均衡。
由于負載均衡是每個客戶端獨立進行計算,那么何時觸發呢?
3.3.2 Rebalance觸發時機
- 消費端啟動時,立即進行負載均衡
消費者在啟動時會進行一次負載均衡,為自己分配消息隊列。
- 消費端定時任務每隔 20 秒觸發負載均衡
消費者本身也會定時執行負載均衡,默認是20s執行一次。
- 消費者上下線,Broker 端通知消費者觸發負載均衡
如果有消費者向Broker發送UNREGISTER_CLIENT取消注冊請求,并且開啟了允許通知變更,會觸發變更事件。
變更事件同上,Broker會通知該消費者組下的所有消費者進行一次負載均衡。
比如我們動態添加了Consumer進行消費,那么此時肯定是要重新分配一下,也就是觸發Rebalance再均衡。
例如,一個Topic下5個隊列,在只有1個消費者的情況下,這個消費者將負責消費這5個隊列的消息。如果此時其中一個消費者分配2個隊列,給另一個分配3個隊列,從而提升消息我們增加一個消費者,那么就可以給的并行消費能力。 如下圖:
- 消費者所訂閱Topic的隊列數量發生變化
比如我們動態調整了Topic對應的隊列數量,那么此時肯定是要重新分配一下,也就是觸發Rebalance再均衡。
例如一個Topic下5個隊列,有2個消費者的情況下,那么就可以給其中一個消費者分配2個隊列,給另一個分配3個隊列,假設我們調整到Topic下有7個隊列,還是2個消費者的情況下,那么就可以給其中一消費者分配4個隊列,給另一個分配3個隊列;從而提升消息的并行消費能力。如下圖:
像Consumer Group擴容或縮容、Consumer與NameServer間發生網絡異常、Consumer發生宕機等都會導致消費者組中消費者的數量發生變化。
需要注意的是,由于一個隊列最多分配給一個消費者,因此當某個消費者組下的消費者實例數量大于隊列的數量時,多余的消費者實例將分配不到任何隊列,等于是多余的消費者什么都不做,白白浪費。
3.3.3 負載均衡流程
1、發送心跳
消費者啟動后,"它就會通過定時任務不斷地向 RocketMQ 集群中的所有 Broker 實例發送心跳包消息消費分組名稱、訂閱關系集合、消息通信模式和客戶端實例編號等信息
Broker 端在收到消費者的心跳消息后,會將它維護在 ConsumerManager 的本地緩存變量 consumerTable,同時并將封裝后的客戶端網絡通道信息保存在本地緩存變量 channelinfoTable 中,為之后做消費端的負載均衡提供可以依據的元數據信息。
2、啟動負載均衡服務
負載均衡核心代碼:
負載均衡服務執行邏輯在doRebalance函數,里面會對每個消費者組執行負載均衡操作。
/* group */
private ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();public void doRebalance() {//每個消費者組都有負載均衡for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}}}
}
由于每個消費者組可能會消費很多topic,每個topic都有自己的不同隊列,最終是按topic的維度進行負載均衡。
public boolean doRebalance(boolean isOrder) {boolean balanced = true;Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {Iterator var4 = subTable.entrySet().iterator();while(var4.hasNext()) {Map.Entry<String, SubscriptionData> entry = (Map.Entry)var4.next();String topic = (String)entry.getKey();try {if (!this.clientRebalance(topic) && this.tryQueryAssignment(topic)) {balanced = this.getRebalanceResultFromBroker(topic, isOrder);} else {balanced = this.rebalanceByTopic(topic, isOrder);}} catch (Throwable var8) {Throwable e = var8;if (!topic.startsWith("%RETRY%")) {log.warn("rebalance Exception", e);balanced = false;}}}}this.truncateMessageQueueNotMyTopic();return balanced;}
里面最核心的便是:
這段代碼的邏輯如下:
1.clientRebalance(topic):首先調用clientRebalance方法,嘗試對指定主題(topic)進行客戶端再平衡。如果這個方法返回false,意味著客戶端再平衡沒有成功或不需要進行。
2.tryQueryAssignment(topic):接著調用tryQueryAssignment方法,嘗試從代理服務器(broker)查詢最新的訂閱分配信息。這個方法通常會在客戶端再平衡失敗或者不需要進行時被調用,目的是檢查是否可以從代理服務器獲取新的分配信息。
3.如果上述兩個條件都滿足(即clientRebalance返回false且tryQueryAssignment成功),則會調用getRebalanceResultFromBroker(topic, isOrder)方法,從代理服務器獲取再平衡的結果,并將結果賦值給balanced變量。這通常意味著需要根據來自代理服務器的信息來更新本地的消費隊列分配。
4.否則,如果上述兩個條件不同時滿足,則調用rebalanceByTopic(topic, isOrder)方法,通過其他方式(可能是基于當前已有的分配信息)來進行再平衡,并將結果賦值給balanced變量。
總結來說,這段代碼是在判斷是否應該從代理服務器獲取最新的分配信息來完成再平衡,還是基于現有的分配信息自行處理再平衡。
選擇哪條路徑取決于clientRebalance和tryQueryAssignment方法的執行結果。這種設計允許RocketMQ靈活地應對不同的網絡狀況和系統狀態,以確保消息能夠高效、公平地分發給各個消費者。
最終最終負載均衡邏輯處理的實現在:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic。
private boolean rebalanceByTopic(String topic, boolean isOrder) {boolean balanced = true;Set mqSet;switch (this.messageModel) {//廣播模式case BROADCASTING:mqSet = (Set)this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});}balanced = mqSet.equals(this.getWorkingMessageQueue(topic));} else {this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);}break;//集群模式case CLUSTERING:mqSet = (Set)this.topicSubscribeInfoTable.get(topic);List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);if (null == mqSet && !topic.startsWith("%RETRY%")) {this.messageQueueChanged(topic, Collections.emptySet(), Collections.emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);} catch (Throwable var11) {Throwable e = var11;log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);return false;}Set<MessageQueue> allocateResultSet = new HashSet();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("client rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});this.messageQueueChanged(topic, mqSet, allocateResultSet);}balanced = allocateResultSet.equals(this.getWorkingMessageQueue(topic));}}return balanced;}
我們一起來看一下這段代碼:
負載均衡服務會根據消費模式為”廣播模式”還是“集群模式”做不同的邏輯處理,這里主要來看下集群模式下的主要處理流程:
(1) 獲取該主題下的消息消費隊列集合;
(2) 查詢 Broker 端獲取該消費組下消費者 Id 列表;
(3) 先對 Topic 下的消息消費隊列、消費者 Id 排序,然后用消息隊列分配策略算法(默認為:消息隊列的平均分配算法),計算出待拉取的消息隊列;
這里的平均分配算法,類似于分頁的算法,將所有 MessageQueue 排好序類似于記錄,將所有消費端排好序類似頁數,并求出每一頁需要包含的平均 size 和每個頁面記錄的范圍 range ,最后遍歷整個 range 而計算出當前消費端應該分配到的記錄。
(4) 分配到的消息隊列集合與 processQueueTable 做一個過濾比對操作
消費者實例內 ,processQueueTable 對象存儲著當前負載均衡的隊列 ,以及該隊列的消費快照。
標紅的部分表示與分配到的消息隊列集合互不包含,則需要將這些紅色隊列 Dropped 屬性為 true , 然后從 processQueueTable 對象中移除。
綠色的部分表示與分配到的消息隊列集合的交集,processQueueTable 對象中已經存在該隊列。
黃色的部分表示這些隊列需要添加到 processQueueTable 對象中,創建這些隊列的消費快照。最后創建拉取消息請求列表,并將請求分發到消息拉取服務,進入拉取消息環節。
3.3.4 Queue分配算法
一個Topic中的Queue只能由Consumer Group中的一個Consumer進行消費,而一個Consumer可以同時消費多個Queue中的消息。
那么Queue與Consumer間的配對關系是如何確定的,即Queue要分配給哪個Consumer進行消費,也是有算法策略的。
負載均衡策略頂層接口:
/*** Strategy Algorithm for message allocating between consumers*/
public interface AllocateMessageQueueStrategy {/*** Allocating by consumer id* 給消費者id分配消費隊列*/List<MessageQueue> allocate(final String consumerGroup, //消費者組final String currentCID, //當前消費者idfinal List<MessageQueue> mqAll, //所有的隊列final List<String> cidAll //所有的消費者);}
他默認共有7種負載均衡策略實現:
常見的有四種策略,分別是:平均分配策略、環形平均策略、一致性hash策略、同機房策略。
這些策略是通過在創建Consumer時的構造器傳進去的。
(1)、平均分配策略 (默認)
該算法是根據
$[avg = QueueCount/ ConsumerCount] $的計算結果進行分配的,如果能夠整除,則按順序將avg個Queue逐個分配,如果不能整除,則將多余出的Queue按照Consumer順序逐個分配
(2)、環形分配策略
環形平均算法是指,根據消費者的順序,依次由Queue隊列組成的環形圖逐個分配,該方法不需要提前計算,如下圖:
(3)、一致性哈希分配策略
該算法會將consumer的hash值作為Node節點存放到hash環上,然后將queue的hash值也放到hash環 上,通過順時針方向,距離queue最近的那個consumer就是該queue要分配的consumer。
一致性哈希算法可以有效減少由于消費者組擴容或縮容所帶來的大量的Rebalance,所以它適合用在Consume數量變化較頻繁的場景,如下圖:
但是一致性哈希算法也存在不足,就是分配效率較低,容易導致分配不均的情況。即每個消費者消費的隊列數,有可能相差很大,這樣就會造成個別消費者壓力過大。
我們可以引入虛擬桶,讓queue在hash環中盡可能分配均勻。
在負載均衡的分配策略中,一致性哈希算法數見不鮮,感興趣的同學可以移步至:《負載均衡的常見幾種算法》
(4)、機房分配策略
該算法會根據queue的部署機房位置和consumer的位置,過濾出當前consumer相同機房的queue。
然后按照平均分配策略或環形平均策略對同機房queue進行分配。如果沒有同機房queue,則按照平均分配策略或環形平均策略對所有queue進行分配。如下圖:
上面我們講了那么多好處,但是沒有什么事情都是能夠十分完美的兼容所有,下面我們來辯證的討論一下負載均衡對消費的影響
3.3.5 負載均衡對消費的影響
Rebalance的在提升消費能力的同時,也帶來一些問題
a、消費暫停: 在只有一個Consumer時,其負責消費所有隊列;在新增了一個Consumer后會觸發 Rebalance的發生。此時原Consumer就需要暫停部分隊列的消費,等到這些隊列分配給新的Consumer后,這些暫停消費的隊列才能繼續被消費。
b、消費重復: Consumer在消費新分配給自己的隊列時,必須接著之前Consumer 提交的消費進度的offset 繼續消費。然而默認情況下,offset是異步提交的,這個異步性導致提交到Broker的offset與Consumer實際消費的消息并不一致。這個不一致的差值就是可能會重復消費的消息。
C、消費突刺: 由于Rebalance可能導致重復消費,如果需要重復消費的消息過多,或者因為Rebalance暫停時間過長從而導致積壓了部分消息。那么有可能會導致在Rebalance結束之后瞬間需要消費很多消息
上圖是真實的一個線上case,這兩個時間點在進行應用發布,根據我們上文的分析某個消費者下線后同組的其他消費者感知這一變化需要一定時間,導致有秒級的消費延遲產生。在發布結束后消費者快速處理堆積的消息,可以發現消費速度有一個明顯的上漲。
這個例子展示了下線時由于負載均衡帶來了短暫的消息處理延遲,新的消費者會從服務端獲取消費位點繼續之前的消費進度。如果消費者異常宕機或者沒有調用 shutdown 優雅下線,沒有上傳自己的最新消費位點,會使得新分配的消費者重復消費。
當某個客戶端觸發負載均衡時,就會出現:
- 對于新分配的隊列可能會重復消費,這也是官方要求消費要做好冪等的原因;
- 對于不再負責的隊列會短時間消費停止,如果原本的消費 TPS 很高或者正好出現生產高峰就會造成消費毛刺。
為了避免這些影響,則需要我們在使用時注意:
- 1.避免頻繁上下線,為了避免負載均衡的影響應該盡量減少客戶端的上下線,同時做好消費冪等;
- 2.同時在有應用重啟或下線前要調用 shutdown 方法,這樣服務端在收到客戶端的下線請求后會通知客戶端及時觸發負載均衡,減少消費延遲;
- 3.選擇合適的負載均衡策略;
- 需要根據業務需要靈活選擇負載均衡策略:
- 需要保證客戶端的負載盡可能的均衡:選擇默認的平均分配策略;
- 需要降低應用重啟帶來的消費延遲:選擇一致性哈希的分配策略。
- 4.RocketMQ 的負載均衡是每個客戶端獨立進行計算,所以務必要保證每個客戶端的負載均衡算法和訂閱語句一致:
- 負載均衡策略不一致會導致多個客戶端分配到相同隊列或有客戶端分不到隊列;
- 訂閱語句不一致會導致有消息未能消費。
3.3.6 RocketMQ 5.0 消息級別負載均衡
為了徹底解決客戶端負載均衡導致的重復消費和消費延遲問題,RocketMQ 5.0 提出了消息級別的負載均衡機制。
同一個隊列的消息可以由多個消費者消費,服務端會確保消息不重不漏的被客戶端消費到:
消息粒度的負載均衡機制,是基于內部的單條消息確認語義實現的。
消費者獲取某條消息后,服務端會將該消息加鎖,保證這條消息對其他消費者不可見,直到該消息消費成功或消費超時。
因此,即使多個消費者同時消費同一隊列的消息,服務端也可保證消息不會被多個消費者重復消費。
在 4.x 的客戶端中,順序消費的實現強依賴于隊列的分配。
RocketMQ 5.0 在消息維度的負載均衡的基礎上也實現了順序消費的語意:不同消費者處理同一個消息組內的消息時,會嚴格按照先后順序鎖定消息狀態,確保同一消息組的消息串行消費。
如上圖所述,隊列 Queue1 中有 4 條順序消息,這 4 條消息屬于同一消息組 G1,存儲順序由 M1 到 M4。
在消費過程中,前面的消息 M1、M2 被 消費者Consumer A1 處理時,只要消費狀態沒有提交,消費者 A2 是無法并行消費后續的 M3、M4 消息的,必須等前面的消息提交消費狀態后才能消費后面的消息。
4.RocketMQ指定機器消費設計思路
日常測試環境當中會存在多臺consumer進行消費,但實際開發當中某臺consumer新上了功能后希望消息只由該機器進行消費進行邏輯覆蓋,這個時候consumerGroup的集群模式就會給我們造成困擾,因為消費負載均衡的原因不確定消息具體由哪臺consumer進行消費。當然我們可以通過介入consumer的負載均衡機制來實現指定機器消費。
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {List<MessageQueue> result = new ArrayList<MessageQueue>();// 通過改寫這部分邏輯,增加判斷是否是指定IP的機器,如果不是直接返回空列表表示該機器不負責消費if (!cidAll.contains(currentCID)) {return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}
}