分布式開放消息系統 ( RocketMQ ) 的原理與實踐

分布式消息系統作為實現分布式系統可擴展、可伸縮性的關鍵組件,需要具有高吞吐量、高可用等特點。而談到消息系統的設計,就回避不了兩個問題:

  1. 消息的順序問題
  2. 消息的重復問題

RocketMQ作為阿里開源的一款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關鍵特性?其實現原理是怎樣的?

關鍵特性以及其實現原理

一、順序消息

消息有序指的是可以按照消息的發送順序來消費。例如:一筆訂單產生了 3 條消息,分別是訂單創建、訂單付款、訂單完成。消費時,要按照順序依次消費才有意義。與此同時多筆訂單之間又是可以并行消費的。首先來看如下示例:

假如生產者產生了2條消息:M1、M2,要保證這兩條消息的順序,應該怎樣做?你腦中想到的可能是這樣:

?

你可能會采用這種方式保證消息順序

?

假定M1發送到S1,M2發送到S2,如果要保證M1先于M2被消費,那么需要M1到達消費端被消費后,通知S2,然后S2再將M2發送到消費端。

這個模型存在的問題是,如果M1和M2分別發送到兩臺Server上,就不能保證M1先達到MQ集群,也不能保證M1被先消費。換個角度看,如果M2先于M1達到MQ集群,甚至M2被消費后,M1才達到消費端,這時消息也就亂序了,說明以上模型是不能保證消息的順序的。如何才能在MQ集群保證消息的順序?一種簡單的方式就是將M1、M2發送到同一個Server上:

保證消息順序,你改進后的方法

?

這樣可以保證M1先于M2到達MQServer(生產者等待M1發送成功后再發送M2),根據先達到先被消費的原則,M1會先于M2被消費,這樣就保證了消息的順序。

這個模型也僅僅是理論上可以保證消息的順序,在實際場景中可能會遇到下面的問題:

網絡延遲問題

只要將消息從一臺服務器發往另一臺服務器,就會存在網絡延遲問題。如上圖所示,如果發送M1耗時大于發送M2的耗時,那么M2就仍將被先消費,仍然不能保證消息的順序。即使M1和M2同時到達消費端,由于不清楚消費端1和消費端2的負載情況,仍然有可能出現M2先于M1被消費的情況。

那如何解決這個問題?將M1和M2發往同一個消費者,且發送M1后,需要消費端響應成功后才能發送M2。

聰明的你可能已經想到另外的問題:如果M1被發送到消費端后,消費端1沒有響應,那是繼續發送M2呢,還是重新發送M1?一般為了保證消息一定被消費,肯定會選擇重發M1到另外一個消費端2,就如下圖所示。

保證消息順序的正確姿勢

這樣的模型就嚴格保證消息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種情況,一種是M1確實沒有到達(數據在網絡傳送中丟失),另外一種消費端已經消費M1且已經發送響應消息,只是MQ Server端沒有收到。如果是第二種情況,重發M1,就會造成M1被重復消費。也就引入了我們要說的第二個問題,消息重復問題,這個后文會詳細講解。

回過頭來看消息順序問題,嚴格的順序消息非常容易理解,也可以通過文中所描述的方式來簡單處理。總結起來,要實現嚴格的順序消息,簡單且可行的辦法就是:

保證生產者 - MQServer - 消費者是一對一對一的關系

這樣的設計雖然簡單易行,但也會存在一些很嚴重的問題,比如:

  1. 并行度就會成為消息系統的瓶頸(吞吐量不夠)
  2. 更多的異常處理,比如:只要消費端出現問題,就會導致整個處理流程阻塞,我們不得不花費更多的精力來解決阻塞的問題。

但我們的最終目標是要集群的高容錯性和高吞吐量。這似乎是一對不可調和的矛盾,那么阿里是如何解決的?

世界上解決一個計算機問題最簡單的方法:“恰好”不需要解決它!—— 沈詢

有些問題,看起來很重要,但實際上我們可以通過合理的設計或者將問題分解來規避。如果硬要把時間花在解決問題本身,實際上不僅效率低下,而且也是一種浪費。從這個角度來看消息的順序問題,我們可以得出兩個結論:

  1. 不關注亂序的應用實際大量存在
  2. 隊列無序并不意味著消息無序

所以從業務層面來保證消息的順序而不僅僅是依賴于消息系統,是不是我們應該尋求的一種更合理的方式?

最后我們從源碼角度分析RocketMQ怎么實現發送順序消息。

RocketMQ通過輪詢所有隊列的方式來確定消息被發送到哪一個隊列(負載均衡策略)。比如下面的示例中,訂單號相同的消息會被先后發送到同一個隊列中:

// RocketMQ通過MessageQueueSelector中實現的算法來確定消息發送到哪一個隊列上
// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash
// 當然你可以根據業務實現自己的MessageQueueSelector來決定消息按照何種策略發送到消息隊列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}
}, orderId);

在獲取到路由信息以后,會根據MessageQueueSelector實現的算法來選擇一個隊列,同一個OrderId獲取到的肯定是同一個隊列。

private SendResult send()  {// 獲取topic路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;// 根據我們的算法,選擇一個發送隊列// 這里的arg = orderIdmq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);}}
}

二、消息重復

上面在解決消息順序問題時,引入了一個新的問題,就是消息重復。那么RocketMQ是怎樣解決消息重復的問題呢?還是“恰好”不解決。

造成消息重復的根本原因是:網絡不可達。只要通過網絡交換數據,就無法避免這個問題。所以解決這個問題的辦法就是繞過這個問題。那么問題就變成了:如果消費端收到兩條一樣的消息,應該怎樣處理?

  1. 消費端處理消息的業務邏輯保持冪等性
  2. 保證每條消息都有唯一編號且保證消息處理成功與去重表的日志同時出現

第1條很好理解,只要保持冪等性,不管來多少條重復消息,最后處理的結果都一樣。第2條原理就是利用一張日志表來記錄已經處理成功的消息的ID,如果新到的消息ID已經在日志表中,那么就不再處理這條消息。

第1條解決方案,很明顯應該在消費端實現,不屬于消息系統要實現的功能。第2條可以消息系統實現,也可以業務端實現。正常情況下出現重復消息的概率其實很小,如果由消息系統來實現的話,肯定會對消息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的原因。

RocketMQ不保證消息不重復,如果你的業務需要保證嚴格的不重復消息,需要你自己在業務端去重。

三、事務消息

RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。首先討論一下什么是事務消息以及支持事務消息的必要性。我們以一個轉帳的場景為例來說明這個問題:Bob向Smith轉賬100塊。

在單機環境下,執行事務的情況,大概是下面這個樣子:

單機環境下轉賬事務示意圖

當用戶增長到一定程度,Bob和Smith的賬戶及余額信息已經不在同一臺服務器上了,那么上面的流程就變成了這樣:

集群環境下轉賬事務示意圖

這時候你會發現,同樣是一個轉賬的業務,在集群環境下,耗時居然成倍的增長,這顯然是不能夠接受的。那如何來規避這個問題?

大事務 = 小事務 + 異步

將大事務拆分成多個小事務異步執行。這樣基本上能夠將跨機事務的執行效率優化到與單機一致。轉賬的事務就可以分解成如下兩個小事務:

?

小事務+異步消息

?

圖中執行本地事務(Bob賬戶扣款)和發送異步消息應該保證同時成功或者同時失敗,也就是扣款成功了,發送消息一定要成功,如果扣款失敗了,就不能再發送消息。那問題是:我們是先扣款還是先發送消息呢?

首先看下先發送消息的情況,大致的示意圖如下:

事務消息:先發送消息

存在的問題是:如果消息發送成功,但是扣款失敗,消費端就會消費此消息,進而向Smith賬戶加錢。

先發消息不行,那就先扣款吧,大致的示意圖如下:

事務消息-先扣款

存在的問題跟上面類似:如果扣款成功,發送消息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。

可能大家會有很多的方法來解決這個問題,比如:直接將發消息放到Bob扣款的事務中去,如果發送失敗,拋出異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。

這里需要說明一下:如果使用Spring來管理事物的話,大可以將發送消息的邏輯放到本地事物中去,發送消息失敗拋出異常,Spring捕捉到異常后就會回滾此事物,以此來保證本地事物與發送消息的原子性。

RocketMQ支持事務消息,下面來看看RocketMQ是怎樣來實現的。

RocketMQ實現發送事務消息

RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問消息,并修改消息的狀態。

細心的你可能又發現問題了,如果確認消息發送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,如果發現了Prepared消息,它會向消息發送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

那我們來看下RocketMQ源碼,是如何處理事務消息的。客戶端發送事務消息的部分(完整代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

// =============================發送事務消息的一系列準備工作========================================
// 未決事務,MQ服務器回查客戶端
// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構造事務消息的生產者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設置事務決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務的處理邏輯,相當于示例中檢查Bob賬戶并扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構造MSG,省略構造參數
Message msg = new Message(......);
// 發送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

接著查看sendMessageInTransaction方法的源碼,總共分為3個階段:發送Prepared消息、執行本地事務、發送確認消息。

//  ================================事務消息的發送過程=============================================
public TransactionSendResult sendMessageInTransaction(.....)  {// 邏輯代碼,非實際代碼// 1.發送消息sendResult = this.send(msg);// sendResult.getSendStatus() == SEND_OK// 2.如果消息發送成功,處理與消息關聯的本地事務單元LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);// 3.結束事務this.endTransaction(sendResult, localTransactionState, localException);
}

endTransaction方法會將請求發往broker(mq server)去更新事務消息的最終狀態:

  1. 根據sendResult找到Prepared消息sendResult包含事務消息的ID
  2. 根據localTransaction更新消息的最終狀態

如果endTransaction方法執行失敗,數據沒有發送到broker,導致事務消息的 狀態更新失敗,broker會有回查線程定時(默認1分鐘)掃描每個存儲事務狀態的表格文件,如果是已經提交或者回滾的消息直接跳過,如果是prepared狀態則會向Producer發起CheckTransaction請求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調請求,而checkTransactionState會調用我們的事務設置的決斷方法來決定是回滾事務還是繼續執行,最后調用endTransactionOnewaybroker來更新消息的最終狀態。

再回到轉賬的例子,如果Bob的賬戶的余額已經減少,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題,解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程中有可能會出現消息重復的問題,按照前面的思路解決即可。

消費事務消息

這樣基本上可以解決消費端超時問題,但是如果消費失敗怎么辦?阿里提供給我們的解決方法是:人工解決。大家可以考慮一下,按照事務的流程,因為某種原因Smith加款失敗,那么需要回滾整個流程。如果消息系統要實現這個回滾流程的話,系統復雜度將大大提升,且很容易出現Bug,估計出現Bug的概率會比消費失敗的概率大很多。這也是RocketMQ目前暫時沒有解決這個問題的原因,在設計實現消息系統時,我們需要衡量是否值得花這么大的代價來解決這樣一個出現概率非常小的問題,這也是大家在解決疑難問題時需要多多思考的地方。

20160321補充:在3.2.6版本中移除了事務消息的實現,所以此版本不支持事務消息,具體情況請參考rocketmq的issues(已失效):
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156

四、Producer如何發送消息

Producer輪詢某topic下的所有隊列的方式來實現發送方的負載均衡,如下圖所示:

producer發送消息負載均衡


首先分析一下RocketMQ的客戶端發送消息的源碼:

?

// 構造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整個應用生命周期內,只需要初始化1次
producer.start();
// 構造Message
Message msg = new Message("TopicTest1",// topic"TagA",// tag:給消息打標簽,用于區分一類消息,可為null"OrderID188",// key:自定義Key,可以用于去重,可為null("Hello MetaQ").getBytes());// body:消息內容
// 發送消息并返回結果
SendResult sendResult = producer.send(msg);
// 清理資源,關閉網絡連接,注銷自己
producer.shutdown();

在整個應用生命周期內,生產者需要調用一次start方法來初始化,初始化主要完成的任務有:

  1. 如果沒有指定namesrv地址,將會自動尋址
  2. 啟動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清理已經掛掉的broker、向所有broker發送心跳...
  3. 啟動負載均衡的服務

初始化完成后,開始發送消息,發送消息的主要代碼如下:

private SendResult sendDefaultImpl(Message msg,......) {// 檢查Producer的狀態是否是RUNNINGthis.makeSureStateOK();// 檢查msg是否合法:是否為null、topic,body是否為空、body是否超長Validators.checkMessage(msg, this.defaultMQProducer);// 獲取topic路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 從路由信息中選擇一個消息隊列MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);// 將消息發送到該隊列上去sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}

代碼中需要關注的兩個方法tryToFindTopicPublishInfoselectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務獲取路由信息并更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲取topic路由信息,如果沒有獲取到,則會自己去namesrv獲取路由信息。selectOneMessageQueue方法通過輪詢的方式,返回一個隊列,以達到負載均衡的目的。

如果Producer發送消息失敗,會自動重試,重試的策略:

  1. 重試次數 < retryTimesWhenSendFailed(可配置)
  2. 總的耗時(包含重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
  3. 同時滿足上面兩個條件后,Producer會選擇另外一個隊列發送消息

五、消息存儲

RocketMQ的消息存儲是由consume queuecommit log配合完成的。

1、Consume Queue

consume queue是消息的邏輯隊列,相當于字典的目錄,用來指定消息在物理文件commit log上的位置。

我們可以在配置中指定consumequeuecommitlog存儲的目錄
每個topic下的每個queue都有一個對應的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件組織,如圖所示:

Consume Queue文件組織示意圖

  1. 根據topicqueueId來組織文件,圖中TopicA有兩個隊列0,1,那么TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另一個ConsumeQueue。
  2. 按照消費端的GroupName來分組重試隊列,如果消費端消費失敗,消息將被發往重試隊列中,比如圖中的%RETRY%ConsumerGroupA
  3. 按照消費端的GroupName來分組死信隊列,如果消費端消費失敗,并重試指定次數后,仍然失敗,則發往死信隊列,比如圖中的%DLQ%ConsumerGroupA

死信隊列(Dead Letter Queue)一般用于存放由于某種原因無法傳遞的消息,比如處理失敗或者已經過期的消息。

Consume Queue中存儲單元是一個20字節定長的二進制數據,順序寫順序讀,如下圖所示:

consumequeue文件存儲單元格式

  1. CommitLog Offset是指這條消息在Commit Log文件中的實際偏移量
  2. Size存儲中消息的大小
  3. Message Tag HashCode存儲消息的Tag的哈希值:主要用于訂閱時消息過濾(訂閱時如果指定了Tag,會根據HashCode來快速查找到訂閱的消息)

2、Commit Log

CommitLog:消息存放的物理文件,每臺broker上的commitlog被本機所有的queue共享,不做任何區分。
文件的默認位置如下,仍然可通過配置文件修改:

${user.home} \store\${commitlog}\${fileName}

CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構如下表所示,按照編號順序以及編號對應的內容依次存儲。

?

Commit Log存儲單元結構圖

3、消息存儲實現

消息存儲實現,比較復雜,也值得大家深入了解,后面會單獨成文來分析(目前正在收集素材),這小節只以代碼說明一下具體的流程。

// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();// Here settings are stored timestamp, in order to ensure an orderly globalmsg.setStoreTimestamp(beginLockTimestamp);// MapedFile:操作物理文件在內存中的映射以及將內存數據持久化到物理文件中MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();// 將Message追加到文件commitlogresult = mapedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// Create a new file, re-write the messagemapedFile = this.mapedFileQueue.getLastMapedFile();result = mapedFile.appendMessage(msg, this.appendMessageCallback);break;DispatchRequest dispatchRequest = new DispatchRequest(topic,// 1queueId,// 2result.getWroteOffset(),// 3result.getWroteBytes(),// 4tagsCode,// 5msg.getStoreTimestamp(),// 6result.getLogicsOffset(),// 7msg.getKeys(),// 8/*** Transaction*/msg.getSysFlag(),// 9msg.getPreparedTransactionOffset());// 10// 1.分發消息位置到ConsumeQueue// 2.分發到IndexService建立索引this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}

4、消息的索引文件

如果一個消息包含key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:

?

消息索引

?

索引文件主要用于根據key來查詢消息的,流程主要是:

  1. 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是一個索引文件里面包含的最大槽的數目,例如圖中所示 slotNum=5000000)
  2. 根據 slotValue(slot 位置對應的值)查找到索引項列表的最后一項(倒序排列,slotValue 總是指向最新的一個索引項)
  3. 遍歷索引項列表返回查詢時間范圍內的結果集(默認一次最大返回的 32 條記錄)

六、消息訂閱

RocketMQ消息訂閱有兩種模式,一種是Push模式,即MQServer主動向消費端推送;另外一種是Pull模式,即消費端在需要時,主動到MQServer拉取。但在具體實現時,Push和Pull模式都是采用消費端主動拉取的方式。

首先看下消費端的負載均衡:

?

消費端負載均衡

?

消費端會通過RebalanceService線程,10秒鐘做一次基于topic下的所有隊列負載:

  1. 遍歷Consumer下的所有topic,然后根據topic訂閱所有的消息
  2. 獲取同一topic和Consumer Group下的所有Consumer
  3. 然后根據具體的分配策略來分配消費隊列,分配的策略包含:平均分配、消費端配置等

如同上圖所示:如果有 5 個隊列,2 個 consumer,那么第一個 Consumer 消費 3 個隊列,第二 consumer 消費 2 個隊列。這里采用的就是平均分配策略,它類似于分頁的過程,TOPIC下面的所有queue就是記錄,Consumer的個數就相當于總的頁數,那么每頁有多少條記錄,就類似于某個Consumer會消費哪些隊列。

通過這樣的策略來達到大體上的平均消費,這樣的設計也可以很方面的水平擴展Consumer來提高消費能力。

消費端的Push模式是通過長輪詢的模式來實現的,就如同下圖:

Push模式示意圖

?

Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求后,如果有消息就立即返回數據,Consumer端收到返回的消息后,再回調消費者設置的Listener方法。如果broker在收到Pull請求時,消息隊列里沒有數據,broker端會阻塞請求直到有數據傳遞或超時才返回。

當然,Consumer端是通過一個線程將阻塞隊列LinkedBlockingQueue<PullRequest>中的PullRequest發送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest時,如果發現沒有消息,就會把PullRequest扔到ConcurrentHashMap中緩存起來。broker在啟動時,會啟動一個線程不停的從ConcurrentHashMap取出PullRequest檢查,直到有數據返回。

七、RocketMQ的其他特性

前面的6個特性都是基本上都是點到為止,想要深入了解,還需要大家多多查看源碼,多多在實際中運用。當然除了已經提到的特性外,RocketMQ還支持:

  1. 定時消息
  2. 消息的刷盤策略
  3. 主動同步策略:同步雙寫、異步復制
  4. 海量消息堆積能力
  5. 高效通信
  6. .......

其中涉及到的很多設計思路和解決方法都值得我們深入研究:

  1. 消息的存儲設計:既要滿足海量消息的堆積能力,又要滿足極快的查詢效率,還要保證寫入的效率。
  2. 高效的通信組件設計:高吞吐量,毫秒級的消息投遞能力都離不開高效的通信。
  3. .......

RocketMQ最佳實踐

一、Producer最佳實踐

1、一個應用盡可能用一個 Topic,消息子類型用 tags 來標識,tags 可以由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才可以利用 tags 在 broker 做消息過濾。
2、每個消息在業務層面的唯一標識碼,要設置到 keys 字段,方便將來定位消息丟失問題。由于是哈希索引,請務必保證 key 盡可能唯一,這樣可以避免潛在的哈希沖突。
3、消息發送成功或者失敗,要打印消息日志,務必要打印 sendresult 和 key 字段。
4、對于消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。
5、某些應用如果不關注消息是否發送成功,請直接使用sendOneWay方法發送消息。

二、Consumer最佳實踐

1、消費過程要做到冪等(即消費端去重)
2、盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。
3、優化每條消息消費過程

三、其他配置

線上應該關閉autoCreateTopicEnable,即在配置文件中將其設置為false

RocketMQ在發送消息時,會首先獲取路由信息。如果是新的消息,由于MQServer上面還沒有創建對應的Topic,這個時候,如果上面的配置打開的話,會返回默認TOPIC的(RocketMQ會在每臺broker上面創建名為TBW102的TOPIC)路由信息,然后Producer會選擇一臺Broker發送消息,選中的broker在存儲消息時,發現消息的topic還沒有創建,就會自動創建topic。后果就是:以后所有該TOPIC的消息,都將發送到這臺broker上,達不到負載均衡的目的。

所以基于目前RocketMQ的設計,建議關閉自動創建TOPIC的功能,然后根據消息量的大小,手動創建TOPIC。

RocketMQ設計相關

RocketMQ的設計假定:

每臺PC機器都可能宕機不可服務
任意集群都有可能處理能力不足
最壞的情況一定會發生
內網環境需要低延遲來提供最佳用戶體驗

RocketMQ的關鍵設計:

分布式集群化
強數據安全
海量數據堆積
毫秒級投遞延遲(推拉模式)

這是RocketMQ在設計時的假定前提以及需要到達的效果。我想這些假定適用于所有的系統設計。隨著我們系統的服務的增多,每位開發者都要注意自己的程序是否存在單點故障,如果掛了應該怎么恢復、能不能很好的水平擴展、對外的接口是否足夠高效、自己管理的數據是否足夠安全...... 多多規范自己的設計,才能開發出高效健壯的程序。

參考資料

  1. RocketMQ用戶指南
  2. RocketMQ原理簡介
  3. RocketMQ最佳實踐
  4. 阿里分布式開放消息服務(ONS)原理與實踐2
  5. 阿里分布式開放消息服務(ONS)原理與實踐3
  6. RocketMQ原理解析


轉自:https://www.jianshu.com/p/453c6e7ff81c
來源:簡書

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/448894.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/448894.shtml
英文地址,請注明出處:http://en.pswp.cn/news/448894.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數據結構02-鏈表

說明&#xff1a;由于該數據結構是由java并且是原生實現&#xff0c;所以與C有一些出入&#xff0c;不過原理是相同的 1.鏈表的定義 為了表示線性表元素a與a1的邏輯關系&#xff0c;存儲數據時&#xff0c;除了存儲元素本身的信息之外&#xff0c;還存儲了直接后繼元素的位置信…

第四章 面向對象

第四章 面向對象 1. 基本格式 定義&#xff1a;當函數(業務功能)比較多&#xff0c;可以使用面向對象來進行歸類&#xff0c;如果有一個凡事使用的公共值&#xff0c;也可以放到對象中 #格式&關鍵字 class 類名:def __inti__(self,x)self.x xdef 方法名(self,name):print(…

洛谷P2347 砝碼稱重 某一年noip提高組原題

可以轉化為01背包求方案數的問題&#xff0c;dp數組f[][]表示第幾個砝碼能稱出的重量,可壓縮至一維 轉移方程為f(i,j)f(i-1,j-w[i]) 當前我們可以稱出的重量必定是由之前的砝碼重量轉移過來的 #include<bits/stdc.h> using namespace std; const int N550; const int max…

解決:-bash: unzip: command not found (Linux 中 unZip/Zip 的安裝及使用)

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 Linux系統沒有自帶的壓縮解壓工具&#xff1b;需要我們自己安裝&#xff1b; 當用到zip或者unzip如果沒有安裝就會出現 unzip: Command…

云計算時代IT專業人員需具備的10項技能

摘要&#xff1a;IT專業人員需要不斷的學習&#xff0c;才能確保自己的工作能力跟上時代的步伐。云時代IT專業人員不僅需要具備一定的專業技能&#xff0c;比如快速運用自身知識快速在互聯網上構建應用程序&#xff0c;還必須具備商業、金融、業務需求分析等等。 【編者按】談…

java自定義注解學習筆記

注解學習筆記之自定義注解 Target&#xff08;{1,2,3,4,5,6,7}&#xff09; 1.ElementType.CONSTRUCTOR:用于描述構造器2.ElementType.FIELD:用于描述域3.ElementType.LOCAL_VARIABLE:用于描述局部變量4.ElementType.METHOD:用于描述方法5.ElementType.PACKAGE:用于描述包6.Ele…

[xsy3132]數表

題意&#xff1a;一個$n\times m$的數表&#xff0c;數值$\in[0,4)$&#xff0c;你可以任意次選擇一行或一列$1,\text{mod }4$&#xff0c;要最小化所有數的和 因為$n\leq10$&#xff0c;所以數表可以看成$m$個$n$位$4$進制數$a_{1\cdots m}$&#xff0c;以下使用不進位加法 定…

linux 下載、安裝 maven

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1. 創建maven的文件夾并下載maven的tar包到此文件夾中 //進入一個目錄 cd /usr/local//創建一個文件夾 mkdir maven//下載maven的tar包…

ELK4之進階學習

1.精確查找和模糊查找(term和match的區別) match經過分析(analyer)的, term是不經過分詞,直接去倒排索引中查找精確的值. 2.建議器的簡介(最左前綴或者自帶的做) (1)直接用現成的 (2)不只是糾錯,還有建議等等. (3)優點:用戶體驗,服務器減少請求(減少壓力,太耗電了,熱量太大) (4…

女人必知 教你認清6種隱性壞男人

周圍不乏有女朋友喜歡歷數往事、追憶曾擦肩而過的男人&#xff0c;有的說如果不是自己太苛求提早要見他家人引起反感&#xff0c;早就和心愛的人儷影雙雙甜蜜快樂了&#xff0c;還有的說暗戀的男生那一夜向他表露情感、她萬分感動、可男生最后提出上床她拒絕了、因而錯失了一段…

c# 編程學習(二)

2019獨角獸企業重金招聘Python工程師標準>>> 標識符是對程序中的各個元素進行標識的名稱。 ? 只能使用字母(大寫和小寫)、數字和下劃線 ? 標識符必須以字母或下劃線開頭 變量是容納值的存儲位置。可將變量想象成容納臨時信息的容器 命名變量的建議&#xff1a; …

linux 中的 nohup 命令(設置后臺進程): nohup: ignoring input and appending output to ‘nohup.out’

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 一、Linux 下使用 nohup Unix/Linux下一般比如想讓某個程序在后臺運行&#xff0c;很多都是使用 & 在程序結尾來讓程序自動運行。 …

PowerDesigner表結構和字段大小寫轉換

原文&#xff1a;https://www.cnblogs.com/zhzhang/p/3946609.html 【轉】PowerDesigner表結構和字段大小寫轉換 【轉自】http://blog.csdn.net/xysh1991/article/details/8016192 使用方法&#xff1a;進入PowerDesigner&#xff0c;打開一個PDM&#xff0c;在菜單欄找到&…

解決:Could not find or load main class org.apache.rocketmq.example.quickstart.Producer

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1.情景描述 &#xff1a;我只是想安裝運行 rocketmq&#xff0c;執行命令&#xff1a; sh bin/tools.sh org.apache.rocketmq.example.…

深入理解C++ 虛函數表

目錄 深入理解C 虛函數表虛函數表概述單繼承下的虛函數表派生類未覆蓋基類虛函數派生類覆蓋基類虛函數多繼承下的虛函數表無虛函數覆蓋派生類覆蓋基類虛函數鉆石型虛繼承總結幾個原則安全性問題深入理解C 虛函數表 ? C中的虛函數的作用主要是實現了多態的機制。關于多態&#…

react-native-baidu-map使用及注意問題

使用組件&#xff1a; react-native-baidu-map 獲取百度地圖API_KEY 地址&#xff1a;lbsyun.baidu.com&#xff0c;在控制臺成功創建應用后&#xff0c;就可以看到應用的api key了 安裝 yarn add react-native-baidu-map 復制代碼原生部分 Android配置 react-native link reac…

簡單掃清身體垃圾

“我們的身體在被‘設計’之初&#xff0c;就擁有了自主掃除體內垃圾的功能。只不過&#xff0c;這需要我們按照正確的方法去激發它 。”美國暢銷書作者喬斯卡曼和朱莉佩萊斯&#xff0c;在她們去年合著的《自我清潔》一書中強調了養成良好生活習慣可為身體排毒的重要性。 近日…

linux (阿里云 CentOS7) 中安裝配置 RocketMQ

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 JDK1.8的安裝&#xff1a; 1.檢查系統的JDK版本 根目錄下操作&#xff1a;cd java -version 2.檢測JDK安裝包 rpm -qa | grep ja…

Bootstrap簡介

1.使用準備 1.1 Bootstrap的下載 http://www.bootcss.com&#xff0c;下載用于生產環境的Bootstrap即可。 1.2 Bootstrap包含的內容 ● 全局CSS&#xff1a;基本的 HTML 元素均可以通過 class 設置樣式并得到增強效果&#xff1b;還有先進的柵格系統。 ● 組件&#xff1a;無數…

用TortoiseGit時的實用git命令

生成并獲取 sshkey&#xff1a; ssh-keygen -t rsa -C "xxxxxxxxxx.com" cat ~/.ssh/id_rsa.pub 克隆倉庫&#xff1a; git clone xxxxxx/xxx.git 重命名文件&#xff1a; mv file_name new_file_name git目錄區分大小寫&#xff1a; git config core.ignorecase fal…