MQ 之 RocketMQ

前些天發現了一個巨牛的人工智能學習網站,通俗易懂,風趣幽默,忍不住分享一下給大家。點擊跳轉到教程。

RocketMQ 是出自 A 公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,并做出了自己的一些改進,消息可靠性上比 Kafka 更好,目前,RocketMQ 的文檔仍然不夠豐富?1?2,社區仍然無法與 Kafka 比肩,但 A 公司已經推出了基于 RocketMQ 的云產品?3,相信未來 RocketMQ 也會有不錯的發展。本文采用 RocketMQ 3.2.6 進行實驗,由于 RocketMQ 與 Kafka 很相似,本文很多地方對兩者做出了比較。

基本概念

RocketMQ 由于借鑒了 Kafka 的設計,包括組件的命名也很多與 Kafka 相似,下面摘抄一段《RocketMQ 原理簡介》中的介紹,可以與 Kafka 的命名比對一下,

  • Producer,消息生產者,負責產生消息,一般由業務系統負責產生消息。
  • Consumer,消息消費者,負責消費消息,一般是后臺系統負責異步消費。
  • Push Consumer,Consumer 的一種,應用通常向 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立 刻回調 Listener 接口方法。
  • Pull Consumer,Consumer 的一種,應用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制。
  • Producer Group,一類 Producer 的集合名稱,這類 Producer 通常發送一類消息,且發送邏輯一致。
  • Consumer Group,一類 Consumer 的集合名稱,這類 Consumer 通常消費一類消息,且消費邏輯一致。
  • Broker,消息中轉角色,負責存儲消息,轉發消息,一般也稱為 Server。在 JMS 規范中稱為 Provider。

《RocketMQ 原理簡介》中還介紹了一些其他的概念,例如,廣播消費和集群消費,廣播消費是 Consumer Group 中對于同一條消息每個 Consumer 都消費,集群消費是 Consumer Group 中對于同一條消息只有一個 Consumer 消費。Kafka 采用的是集群消費,不支持廣播消費(好吧,是我沒有找到)。再例如,普通順序消息和嚴格順序消息,普通順序消息在 Broker 重啟情況下不會保證消息順序性;嚴格順序消息即使在異常情況下也會保證消息的順序性。個人理解,所謂普通順序消息,應該就是 Kafka 中的 Partition 級別有序,嚴格順序消息,應該是 Topic 級別有序,但文中也提到,這樣的有序級別是要付出代價的,Broker 集群中只要有一臺機器不可用,則整個集群都不可用,降低服務可用性。使用這種模式,需要依賴同步雙寫,主備自動切換,但自動切換功能目前還未實現(我猜,自動切換僅僅是沒開源吧)。說白了,嚴格順序消息不具備生產可用性,自己玩玩還行,其應用場景主要是數據庫 binlog 同步。

關于 RocketMQ 和 Kafka 的對比,可以參考 RocketMQ Wiki 中的文章?4,看看就行,不必較真。

關于順序和分區

順序性的話題,剛才已經提到了一些,RocketMQ 的實現應該不弱于 Kafka。對于分區,RocketMQ 似乎有意弱化了這個概念,只有在 Producer 中有一個參數?defaultTopicQueueNums,分區在 RocketMQ 中有時被稱為隊列。RocketMQ 的普通順序消息模式,應該就是分區順序性,這點與 Kafka 一致。

關于高可用

RocketMQ 實現高可用的方式有多種,《RocketMQ 用戶指南》文檔中提到的有:多主模式、多主多從異步復制模式、多主多從同步復制模式。多主模式下,性能較好,但是在 Broker 宕機的時候,該 Broker 上未消費的交易不可消費;多主多從異步復制模式,與 Kafka 的副本模式比較類似,主 Broker 宕機后,會自動切換到從 Broker,消息的消費不會出現間斷;多主多從同步復制模式更進一步,采用同步刷盤的方式,避免了主 Broker 宕機帶來的消息丟失,但是,目前不支持自動切換。

雖然 RocketMQ 提供了多種高可用方式,但是目前能生產使用的就只有多主多從異步復制模式,即使在這個模式上,其實現也比 Kafka 要差。因為 RocketMQ 的機制中,主從關系是人為指定的,主 Broker 上承擔所有的消息派發,而 Kafka 的主從關系是通過選舉的方式選出來的,每個分區的主節點都是不一樣的,可以從不同的節點派發消息。Kafka 的模式是分散模式,有利于負載均衡,而且當一個 Broker 宕機的時候,只影響部分 Topic,而 RocketMQ 一旦主 Broker 宕機,會影響所有的 Topic。另外,Kafka 可以支持 Broker 間同步復制(通過設置 Broker 的?acks?參數),這樣比的話,RocketMQ 就差太多了。

關于 RocketMQ 的介紹,網上的文章不算太多,也比較雜,《分布式開放消息系統(RocketMQ)的原理與實踐》5?6?7這篇原理介紹的不錯,推薦。

RocketMQ 的工具和編程接口

RocketMQ 的工具

相比較 Kafka 而言,RocketMQ 提供的工具要少一些,如下,

bin/mqadminbin/mqbrokerbin/mqbroker.numanode0bin/mqbroker.numanode1bin/mqbroker.numanode2bin/mqbroker.numanode3bin/mqfiltersrvbin/mqnamesrvbin/mqshutdown

除了進程啟停之外,常用的運維命令都在?mqadmin?中,詳見《RocketMQ 運維指令》文檔。我實驗中常用的一些命令如下,

sh mqnamesrv &sh mqbroker -c async-broker-a.properties &sh mqbroker -c async-broker-a-s.properties &sh mqadmin topicList -n 192.168.232.23:9876sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin clusterList -n 192.168.232.23:9876sh mqadmin deleteTopic -c DefaultCluster -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin consumerProgress -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin deleteSubGroup -c DefaultCluster -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin consumerConnection -n 192.168.232.23:9876 -g ConsumerGroupNamecc4

RocketMQ 使用了自己的 name server 來做調度(Kafka 用了 Zookeeper),使用?sh mqnamesrv?來啟動,默認監聽端口9876,sh mqnamesrv -m?可以查看所有默認參數,使用?-c xxxx.properties?參數來指定自定義配置。sh mqbroker?是用于啟動 Broker 的命令,參數比較多,詳細可以通過?sh mqbroker -m?查看默認參數,配置項細節后文再說。sh mqadmin?是運維命令入口,topicList?是列出所有 Topic;topicRoute?是列出單個 Topic 的詳細信息;clusterList?是列出集群的信息;deleteTopic?是刪除 Topic。consumerProgress?是查看消費者消費進度,deleteSubGroup?是刪除消費者的訂閱,consumerConnection?是查詢消費者訂閱的情況。

Broker 的配置是最多的,實驗中我修改到的部分如下,其他使用默認,

brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23brokerName=broker-abrokerId=0namesrvAddr=192.168.232.23:9876listenPort=10911deleteWhen=04fileReservedTime=120storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERflushDiskType=ASYNC_FLUSH

配置文件中的多數配置看例子就可以知道意思,挑幾個說一下。brokerName?和?brokerId, 同名的 Broker,ID 是0的是主節點,其他是從節點;deleteWhen,刪除文件時間點,默認凌晨4點;fileReservedTime,文件保留時間,設置為120小時;brokerRole,Broker 的角色,ASYNC_MASTER 是異步復制主節點,SYNC_MASTER 是同步雙寫主節點,SLAVE 是備節點。

其實,這些工具的寫法也基本一致,都是先做一些檢查,最后運行 Java 程序,JVM 系統上的應用應該差不多都這樣。

RocketMQ 的 Java API

RocketMQ 是用 Java 語言開發的,因此,其 Java API 相對是比較豐富的,當然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,

  1. 廣播消費,這個在之前已經提到過;
  2. 消息過濾,支持簡單的 Message Tag 過濾,也支持按 Message Header、body 過濾;
  3. 順序消費和亂序消費,之前也提到過,這里的順序消費應該指的是普通順序性,這一點與 Kafka 相同;
  4. Pull 模式消費,這個是相對 Push 模式來說的,Kafka 就是 Pull 模式消費;
  5. 事務消息,這個好像沒有開源,但是 example 代碼中有示例,總之,不推薦用;
  6. Tag,RocketMQ 在 Topic 下面又分了一層 Tag,用于表示消息類別,可以用來過濾,但是順序性還是以 Topic 來看;

單看功能的話,即使不算事務消息,也不算 Tag,RocketMQ 也遠超 Kafka,Kafka 應該只實現了 Pull 模式消費 + 順序消費這2個功能。RocketMQ 的代碼示例在 rocketmq-example 中,注意,代碼是不能直接運行的,因為所有的代碼都少了設置 name server 的部分,需要自己手動加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");

先來看一下生產者的 API,比較簡單,只有一種,如下,

import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.MessageQueueSelector;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("192.168.232.23:9876");producer.start();for (int i = 0; i < 10; i++)try {{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID188",// key("RocketMQ "+String.format("%05d", i)).getBytes());// bodySendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, i));System.out.println(String.format("%05d", i)+sendResult);}}catch (Exception e) {e.printStackTrace();}producer.shutdown();}}

可以發現,相比 Kafka 的 API,只多了 Tag,但實際上行為有很大不同。Kafka 的生產者客戶端,有同步和異步兩種模式,但都是阻塞模式,send?方法返回發送狀態的?Future,可以通過?Future?的?get?方法阻塞獲得發送狀態。而 RocketMQ 采用的是同步非阻塞模式,發送之后立刻返回發送狀態(而不是?Future)。正常情況下,兩者使用上差別不大,但是在高可用場景中發生主備切換的時候,Kafka 的同步可以等待切換完成并重連,最后返回;而 RocketMQ 只能立刻報錯,由生產者選擇是否重發。所以,在生產者的 API 上,其實 Kafka 是要強一些的。

另外,RocketMQ 可以通過指定?MessageQueueSelector?類的實現來指定將消息發送到哪個分區去,Kafka 是通過指定生產者的?partitioner.class?參數來實現的,靈活性上 RocketMQ 略勝一籌。

再來看消費者的API,由于 RocketMQ 的功能比較多,我們先看 Pull 模式消費的API,如下,

import java.util.HashMap;import java.util.Map;import java.util.Set;import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;import com.alibaba.rocketmq.client.consumer.PullResult;import com.alibaba.rocketmq.client.consumer.store.OffsetStore;import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import com.alibaba.rocketmq.common.message.MessageQueue;public class PullConsumer {private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");for (MessageQueue mq : mqs) {System.out.println("Consume from the queue: " + mq);SINGLE_MQ: while (true) {try {long offset = consumer.fetchConsumeOffset(mq, true);PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);if (null != pullResult.getMsgFoundList()) {for (MessageExt messageExt : pullResult.getMsgFoundList()) {System.out.print(new String(messageExt.getBody()));System.out.print(pullResult);System.out.println(messageExt);}}putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:// TODObreak;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offseTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offseTable.get(mq);if (offset != null)return offset;return 0;}}

這部分的 API 其實是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分區,而 Kafka 可以自動管理(當然也可以手動管理),并且不需要指定分區(分區是在 Kafka 訂閱的時候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用?OffsetStore?接口,提供了兩種管理方式,本地文件和遠程 Broker。這部分感覺兩者差不多。

下面再看看 Push 模式順序消費,代碼如下,

import java.util.List;import java.util.concurrent.atomic.AtomicLong;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;}else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;}else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;}else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}

雖然提供了 Push 模式,RocketMQ 內部實際上還是 Pull 模式的 MQ,Push 模式的實現應該采用的是長輪詢,這點與 Kafka 一樣。使用該方式有幾個注意的地方,

  1. 接收消息的監聽類要使用?MessageListenerOrderly
  2. ConsumeFromWhere?有幾個參數,表示從頭開始消費,從尾開始消費,還是從某個 TimeStamp 開始消費;
  3. 可以控制 offset 的提交,應該就是?context.setAutoCommit(false);?的作用;

控制 offset 提交這個特性非常有用,某種程度上擴展一下,就可以當做事務來用了,看代碼?ConsumeMessageOrderlyService?的實現,其實并沒有那么復雜,在不啟用 AutoCommit 的時候,只有返回?COMMIT?才 commit offset;啟用 AutoCommit 的時候,返回?COMMITROLLBACK(這個比較扯)、SUCCESS?的時候,都 commit offset。

后來發現,commit offset 功能在 Kafka 里面也有提供,使用新的 API,調用?consumer.commitSync

再看一個 Push 模式亂序消費 + 消息過濾的例子,消費者的代碼如下,

import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}


接收消息的監聽類使用的是?MessageListenerConcurrently;這個例子與之前順序消費不同的地方在于,

  1. 回調方法中,使用的是自動 offset commit;
  2. 訂閱的時候增加了消息過濾類?MessageFilterImpl

消息過濾類?MessageFilterImpl?的代碼如下,

import com.alibaba.rocketmq.common.filter.MessageFilter;import com.alibaba.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg) {String property = msg.getUserProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if ((id % 3) == 0 && (id > 10)) {return true;}}return false;}}

RocketMQ 執行過濾是在 Broker 端,Broker 所在的機器會啟動多個 FilterServer 過濾進程;Consumer 啟動后,會向 FilterServer 上傳一個過濾的 Java 類;Consumer 從 FilterServer 拉消息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到消息后,按照 Consumer 上傳的 Java 過濾程序做過濾,過濾完成后返回給 Consumer。這種過濾方法可以節省網絡流量,但是增加了 Broker 的負擔。可惜我沒有實驗出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個很方便的功能。

還有一種廣播消費模式,比較簡單,可以去看代碼,不再列出。

總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。

RocketMQ 的主備模式

按之前所說,只有 RocketMQ 的多主多從異步復制是可以生產使用的,因此只在這個場景下測試。另外,消息采用 Push 順序模式消費。

假設集群采用2主2備的模式,需要啟動4個 Broker,配置文件如下,

brokerName=broker-abrokerId=0listenPort=10911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERbrokerName=broker-abrokerId=1listenPort=10921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlogbrokerRole=SLAVEbrokerName=broker-bbrokerId=0listenPort=20911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlogbrokerRole=ASYNC_MASTERbrokerRole=ASYNC_MASTERbrokerName=broker-bbrokerId=1listenPort=20921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlogbrokerRole=SLAVE

另外,每個機構共通的配置項如下,

brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23namesrvAddr=192.168.232.23:9876deleteWhen=04fileReservedTime=120flushDiskType=ASYNC_FLUSH

其他設置均采用默認。啟動 NameServer 和所有 Broker,并試運行一下 Producer,然后看一下 TestTopic1 當前的情況,

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}

可見,TestTopic1 在2個 Broker 上,且每個 Broker 備機也在運行。下面開始主備切換的實驗,分別啟動 Consumer 和 Producer 進程,消息采用 Pull 順序模式消費。在消息發送接收過程中,使用?kill -9?停掉?broker-a?的主進程,模擬突然宕機。此時,TestTopic1 的狀態如下,

$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}

此時,RocketMQ 已經恢復。

再來看看 Producer 和 Consumer 的日志,先看 Producer 的,如下,

......00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]......01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]

日志中顯示,在發送完00583條消息之后,開始發生異常?connect to <192.168.232.23:10911> failed,原因應該是?broker-a?的主節點被 kill 掉。之后,從00596條消息開始,RocketMQ 又恢復正常,原因是?broker-b?已經開始提供服務,承擔了所有的工作。然后,又重新啟動了?broker-a?主節點,由于該節點的加入,從01392條消息開始,broker-a?又開始恢復工作。實驗中可以驗證,RocketMQ 所謂的多主多備模式,實際上,備機被弱化到無以復加,在主節點宕機的時候,備機無法接替主機的工作,而只是將尚未發送的數據發送出去,由剩下的主節點接替工作。也就是說,N 主 N 備的 RocketMQ 集群中,總共有 2N 臺機器,實際工作的只有 N 臺,如果有一臺掛了,就只有 N-1 臺工作了,機器的利用率太低了。

再來看一下 Consumer 的日志,如下,

RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=697, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00567PullResult [pullStatus=FOUND, nextBeginOffset=698, minOffset=0, maxOffset=698, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=697, sysFlag=0, bornTimestamp=1469175033005, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021533, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078A54, commitLogOffset=494164, bodyCRC=2054744282, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=698, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00575PullResult [pullStatus=FOUND, nextBeginOffset=699, minOffset=0, maxOffset=699, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=698, sysFlag=0, bornTimestamp=1469175033286, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021814, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078CB0, commitLogOffset=494768, bodyCRC=225294519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=699, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00583PullResult [pullStatus=FOUND, nextBeginOffset=700, minOffset=0, maxOffset=700, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=699, sysFlag=0, bornTimestamp=1469175033586, bornHost=/192.168.234.98:51987, storeTimestamp=1469175022113, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078F0C, commitLogOffset=495372, bodyCRC=1670775117, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=700, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00591PullResult [pullStatus=FOUND, nextBeginOffset=701, minOffset=0, maxOffset=701, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=700, sysFlag=0, bornTimestamp=1469175037890, bornHost=/192.168.234.98:51987, storeTimestamp=1469175026418, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079168, commitLogOffset=495976, bodyCRC=344150304, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=701, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00599PullResult [pullStatus=FOUND, nextBeginOffset=702, minOffset=0, maxOffset=702, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=701, sysFlag=0, bornTimestamp=1469175042200, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030734, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000793C4, commitLogOffset=496580, bodyCRC=442030354, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=702, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00603PullResult [pullStatus=FOUND, nextBeginOffset=703, minOffset=0, maxOffset=703, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=702, sysFlag=0, bornTimestamp=1469175042345, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030872, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079620, commitLogOffset=497184, bodyCRC=688469276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=703, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00607PullResult [pullStatus=FOUND, nextBeginOffset=704, minOffset=0, maxOffset=704, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=703, sysFlag=0, bornTimestamp=1469175042481, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031008, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007987C, commitLogOffset=497788, bodyCRC=778367237, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=704, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00611PullResult [pullStatus=FOUND, nextBeginOffset=705, minOffset=0, maxOffset=705, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=704, sysFlag=0, bornTimestamp=1469175042615, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031143, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079AD8, commitLogOffset=498392, bodyCRC=1578919281, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=705, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00615PullResult [pullStatus=FOUND, nextBeginOffset=706, minOffset=0, maxOffset=706, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=705, sysFlag=0, bornTimestamp=1469175042753, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031280, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079D34, commitLogOffset=498996, bodyCRC=1500619112, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=706, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00619PullResult [pullStatus=FOUND, nextBeginOffset=707, minOffset=0, maxOffset=707, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=706, sysFlag=0, bornTimestamp=1469175042887, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031414, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079F90, commitLogOffset=499600, bodyCRC=1355279683, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=707, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00623PullResult [pullStatus=FOUND, nextBeginOffset=708, minOffset=0, maxOffset=708, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=707, sysFlag=0, bornTimestamp=1469175043021, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031548, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A1EC, commitLogOffset=500204, bodyCRC=457136030, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=708, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00627PullResult [pullStatus=FOUND, nextBeginOffset=709, minOffset=0, maxOffset=709, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=708, sysFlag=0, bornTimestamp=1469175043154, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031681, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A448, commitLogOffset=500808, bodyCRC=475173767, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=709, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00631PullResult [pullStatus=FOUND, nextBeginOffset=710, minOffset=0, maxOffset=710, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=709, sysFlag=0, bornTimestamp=1469175043299, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031826, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A6A4, commitLogOffset=501412, bodyCRC=1814693875, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=710, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00635PullResult [pullStatus=FOUND, nextBeginOffset=711, minOffset=0, maxOffset=711, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=710, sysFlag=0, bornTimestamp=1469175043435, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031962, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A900, commitLogOffset=502016, bodyCRC=1799865322, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=711, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468572196808, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191827, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011C60, commitLogOffset=72800, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196876, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011EB0, commitLogOffset=73392, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196903, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191928, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012100, commitLogOffset=73984, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00001PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=3, sysFlag=0, bornTimestamp=1468572718149, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713175, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001222B, commitLogOffset=74283, bodyCRC=1133127810, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00005PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718178, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713210, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012487, commitLogOffset=74887, bodyCRC=1156050075, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......[queueId=1, storeSize=151, queueOffset=22, sysFlag=0, bornTimestamp=1469170324786, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313333, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D3AA, commitLogOffset=1102762, bodyCRC=1707898805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00477PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=23, sysFlag=0, bornTimestamp=1469170325237, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313771, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D606, commitLogOffset=1103366, bodyCRC=1654764460, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00481PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=24, sysFlag=0, bornTimestamp=1469170325652, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314163, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D862, commitLogOffset=1103970, bodyCRC=207227478, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00485PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=25, sysFlag=0, bornTimestamp=1469170326066, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314595, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010DABE, commitLogOffset=1104574, bodyCRC=188206671, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]......RocketMQ 01370PullResult [pullStatus=FOUND, nextBeginOffset=895, minOffset=0, maxOffset=895, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=894, sysFlag=0, bornTimestamp=1469175070573, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059101, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095A89, commitLogOffset=613001, bodyCRC=1094080495, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=895, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01374PullResult [pullStatus=FOUND, nextBeginOffset=896, minOffset=0, maxOffset=896, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=895, sysFlag=0, bornTimestamp=1469175070712, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059251, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095CE5, commitLogOffset=613605, bodyCRC=1180406774, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=896, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01378PullResult [pullStatus=FOUND, nextBeginOffset=897, minOffset=0, maxOffset=897, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=896, sysFlag=0, bornTimestamp=1469175070899, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059427, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095F41, commitLogOffset=614209, bodyCRC=1340989405, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=897, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01382PullResult [pullStatus=FOUND, nextBeginOffset=898, minOffset=0, maxOffset=898, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=897, sysFlag=0, bornTimestamp=1469175071054, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059582, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000009619D, commitLogOffset=614813, bodyCRC=681585164, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=898, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01386PullResult [pullStatus=FOUND, nextBeginOffset=899, minOffset=0, maxOffset=899, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=898, sysFlag=0, bornTimestamp=1469175071203, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059731, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000963F9, commitLogOffset=615417, bodyCRC=802024981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=899, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01390PullResult [pullStatus=FOUND, nextBeginOffset=900, minOffset=0, maxOffset=900, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=899, sysFlag=0, bornTimestamp=1469175071338, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059866, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000096655, commitLogOffset=616021, bodyCRC=1605728865, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=900, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468571752640, bornHost=/192.168.234.98:56433, storeTimestamp=1468571747895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011B38, commitLogOffset=72504, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196772, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191803, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011BCC, commitLogOffset=72652, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196865, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191886, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011E1C, commitLogOffset=73244, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=3, sysFlag=0, bornTimestamp=1468572196899, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191917, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001206C, commitLogOffset=73836, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00000PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718127, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713166, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012194, commitLogOffset=74132, bodyCRC=881661972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00004PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=5, sysFlag=0, bornTimestamp=1468572718170, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713197, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000000123F0, commitLogOffset=74736, bodyCRC=870374413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......RocketMQ 00560PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=140, sysFlag=0, bornTimestamp=1469175032756, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021285, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126922, commitLogOffset=1206562, bodyCRC=1679588729, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00568PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=141, sysFlag=0, bornTimestamp=1469175033043, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021570, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126B7E, commitLogOffset=1207166, bodyCRC=1791489355, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00576PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=142, sysFlag=0, bornTimestamp=1469175033320, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021848, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126DDA, commitLogOffset=1207770, bodyCRC=342157581, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01392PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=143, sysFlag=0, bornTimestamp=1469175071411, bornHost=/192.168.234.98:52034, storeTimestamp=1469175059951, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127036, commitLogOffset=1208374, bodyCRC=834345805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01400PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=144, sysFlag=0, bornTimestamp=1469175071746, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060289, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127292, commitLogOffset=1208978, bodyCRC=188274605, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

可以看到,Consumer 在?broker-a?宕機時間的附近,也出現了異常,connect to <192.168.232.23:10911> failed。雖然還能保持分區上的順序性,但是已經某種程度上出現了一些紊亂,例如,將我在實驗之前的數據給取了出來(Hello MetaQ的消息)。可是,我在實驗前,明明做過刪除這個 Topic 的動作,看來 RocketMQ 所謂的刪除,并未刪除 Topic 的數據。之后,broker-a?主機重啟之后,又恢復正常。

RocketMQ Pull模式消費需要手動管理 offset 和指定分區,這個在調用的時候不覺得,實際運行的時候才會發現每次總是消費一個分區,消費完之后,才開始消費下一個分區,而下一個分區可能已經堆積了很多消息了,手動做消息分配又比較費事。或許,Push 順序模式消費才是更好的選擇。

另外還有幾個比較異常的情況,實驗中有幾次出現了?CODE: 17 DESC: topic[TopicTest1] not exist, apply first please!?這樣的錯誤,實際上,這時候我只是關掉了 Producer;還有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1?明明文檔中說可以用來新增 Topic,而實際上不行。

補充一下:之后,我又使用 Push 順序模式消費重做了上述實驗,結論差不多。只是因為有多線程的原因,日志看起來偶爾有錯位,這個問題不大,可以解決。而且,在關閉重啟 Broker 的附近,往往伴隨著多次的消息重發,不過,RocketMQ 也不保證消息只收到一次就是了。消息重復的問題,Kafka 要比 RocketMQ 顯得不那么嚴重一些。Push 順序模式消費不需要指定 offset,不需要指定分區,第二次啟動可以自動從前一次的 offset 后開始消費。功能上這個與 Kafka 的 Consumer 更類似,雖然 RocketMQ 采用的是異步模式。

RocketMQ 最佳實踐

實際上,RocketMQ 自己就有一份《RocketMQ 最佳實踐》的文檔,里面提到了一些系統設計的問題,例如消費者要冪等,一個應用對應一個 Topic,如此等等。這些經驗不僅僅是對 RocketMQ 有用,對 Kafka 也頗有借鑒意義。

后記

這里談談我對選擇 RocketMQ 還是 Kafka 的個人建議。以上已經做了多處 RocketMQ 和 Kafka 的對比,我個人覺得,Kafka 是一個不斷發展中的系統,開源社區比 RocketMQ 要大,也要更活躍一些;另外,Kafka 最新版本已經有了同步復制,消息可靠性更有保障;還有,Kafka 的分區機制,幾乎實現了自動負載均衡,這絕對是個殺手級特性;RocketMQ 雖然提供了很多易用的功能,遠超出 Kafka,但這些功能并不一定都能用得上,而且多數可以繞過。相比之下,Kafka 的基本功能更加吸引我,再處理故障恢復的時候,細節上要勝過 RocketMQ。當然,如果是 A 公司內部,或者所在公司使用了 A 公司的云產品,那么 RocketMQ 的企業級特性更多一些,或許我會選擇 RocketMQ。

?

轉自:http://valleylord.github.io/post/201607-mq-rocketmq/

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/448602.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/448602.shtml
英文地址,請注明出處:http://en.pswp.cn/news/448602.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

好久沒敲代碼了(強行補上今天的博客。。。)

流水賬日記&#xff08;哈哈&#xff09; 今天沒課&#xff0c;早上好好的睡了個懶覺&#xff08;雖然還是很困- -&#xff09;&#xff1b; 哥幾個把昨天買的排骨洗了做了個排骨湯&#xff0c;雖然不如家里做的好吃&#xff0c;但對此時的我們來說已經是美味了&#xff0c;晚…

Ubuntu下使用AMD APP編寫OpenCL程序

對于Ubuntu或其近親&#xff08;Lubuntu、Kubuntu、Mint等&#xff09;編寫OpenCL程序也不會太難。由于本例用的是AMD APP SDK&#xff0c;因此需要AMD的GPU以及相關驅動。首先&#xff0c;去AMD官網下載GPU驅動——AMD Catalyst。如果你用的是APU并且還有一塊獨立顯卡的話&…

jdk的安裝與配置

Linux一、安裝JDK 從sun網站上直接下載JDK&#xff1a;http://java.sun.com/j2se/1.4.2/download.html提供了兩個下載j2re-1_4_2_10-linux-i586.bin 13.75 MB, j2re-1_4_2_10-linux-i586-rpm.bin 13.27 MB&#xff1a;1、RPM in self-extracting file (j2re-1_4_2_10-linux…

李洋瘋狂C語言之n個人報數,報到3的退出,最后留在場上的是原來的第幾位(約瑟夫環)

今天老師布置了個題目&#xff0c;約瑟夫環&#xff0c;俗稱猴子選大王。n個人報數&#xff0c;報到3的退出&#xff0c;最后留在場上的時原來的第幾位 #include <stdio.h>int main() {int i, n, q, p 0; //計數 i ,人數 n ,報數 p ,場上人數 qprintf ("input…

搭建Vue腳手架(vue-cli)并創建一個項目

1、 安裝nodejs環境 官網下載&#xff1a;https://nodejs.org/en/download/ 一直默認就行&#xff0c;路徑可以改變但要記得到 安裝完成后cmd&#xff0c;輸入node -v ,npm -v 如果能看到node和npm的版本號了&#xff0c;說明已經安裝成功 2、安裝vue-cli 有npm和cnpm兩種方式…

NPM 使用介紹

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 NPM是隨同NodeJS一起安裝的包管理工具&#xff0c;能解決NodeJS代碼部署上的很多問題&#xff0c;常見的使用場景有以下幾種&#xff1a…

人生致命的八個經典問題

問題一&#xff1a;如果你家附近有一家餐廳&#xff0c;東西又貴又難吃&#xff0c;桌上還爬著蟑螂&#xff0c;你會因為它很近很方便&#xff0c;就一而再、再而三地光臨嗎&#xff1f; 回答&#xff1a;你一定會說&#xff0c;這是什么爛問題&#xff0c;誰那么笨&#xff0c…

RabbitMQ學習總結(5)——發布和訂閱實例詳解

2019獨角獸企業重金招聘Python工程師標準>>> 一、Publish/Subscribe&#xff08;發布/訂閱&#xff09;&#xff08;using the Java Client&#xff09; 在前面的教程中,我們創建了一個work Queue&#xff08;工作隊列&#xff09;。工作隊列背后的假設是每個任務是…

iOS有哪些數據類型/基本數據類型?

簡述 本文主要探究使用OC作為iOS開發語言時&#xff0c;我們能使用哪些數據類型。 一切類型始于C。 C語言的類型 基本數據類型&#xff1a; 基本數據類型&#xff08;fundamental data types&#xff09;也叫原始數據類型&#xff08;primitive data types&#xff09; 整型、字…

李洋瘋狂C語言之將”you are come from shanghai ”倒置為”shanghai from come are you”,將句子中的單詞位置倒置,而不改變單詞內部結構

題目: 編寫一個C函數,將”you are come from shanghai ”倒置為”shanghai from come are you”,及將句子中的單詞位置倒置,而不改變單詞內部結構 #include <stdio.h> #include <string.h> void change(char *p1, char *p2); //函數聲明 int main() {char str[] …

馬桶怎么清洗才干凈無異味?

方法/步驟 在馬桶水箱中一定要放上潔廁寶&#xff1a; 潔廁寶里面含有多種去除馬桶中雜質以及異味的功能&#xff0c;另外它還帶有香香的味道&#xff0c;我們一按沖馬桶的按鈕&#xff0c;放出來的總是藍色的水&#xff0c;十分的美觀和好看&#xff0c;但是這并不是花瓶般的作…

白話解說:阻塞和非阻塞,同步和異步

阻塞和非阻塞&#xff0c;同步和異步是node.js里經常遇到的詞匯&#xff0c;舉例說明&#xff1a; 我要看足球比賽&#xff0c;但是媽媽叫我燒水&#xff0c;電視機在客廳&#xff0c;燒水要在廚房。家里有2個水壺&#xff0c;一個是普通的水壺&#xff0c;另一個是水開了會叫的…

蘇嵌點滴(一)

來蘇嵌也有12天了&#xff0c;也漸漸開始習慣這樣的生活&#xff0c;每天睜眼到閉眼&#xff0c;全都是代碼。每天都得學習很多新的知識&#xff0c;C語言學到現在也學得差不多了&#xff0c;還有明天一天課。 指針、數組這些C語言中的重點&#xff0c;還是需要一點時間消化的…

Mysql學習總結(8)——MySql基本查詢、連接查詢、子查詢、正則表達查詢講解...

2019獨角獸企業重金招聘Python工程師標準>>> 查詢數據指從數據庫中獲取所需要的數據。查詢數據是數據庫操作中最常用&#xff0c;也是最重要的操作。用戶可以根據自己對數據的需求&#xff0c;使用不同的查詢方式。通過不同的查詢方式&#xff0c;可以獲得不同的數據…

安裝OpenCL和AMD驅動程序

我們將安裝AMD OpenCL軟件開發工具包&#xff08;SDK&#xff09;和AMD驅動程序。 userubuntu:~$ mkdir AMD-APP-SDK-v2.5-lnx64 userubuntu:~$ cd AMD-APP-SDK-v2.5-lnx64/ userubuntu:~$ wgethttp://developer.amd.com/Downloads/AMD-APP-SDK-v2.5-lnx64.tgz userubuntu:~$ t…

Node.js -- Stream 使用小例 ( 流運用 :讀取、寫入、寫出、拷貝)

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 Stream 是一個抽象接口&#xff0c;Node 中有很多對象實現了這個接口。例如&#xff0c;對http 服務器發起請求的request 對象就是一個 …

李洋瘋狂C語言之有關“you are come from shanghai”逆序(二)

今天將指針和函數全部學完了&#xff0c;之前這題的做法&#xff0c;現在看來有點繁瑣&#xff0c;于是乎做了一些修改&#xff0c;下面是新的代碼 //you are from shanghai逆序#include <stdio.h> #include <string.h> //下面要用到strlenvoid reverse(c…

sync - 清空文件系統緩沖區

總覽 (SYNOPSIS) sync [OPTION] 描述 (DESCRIPTION) 強迫把更改的塊寫入磁盤&#xff0c; 并更新超級塊。 --help顯示幫助然后終止。--version顯示版本信息然后終止。 轉載于:https://www.cnblogs.com/fanweisheng/p/11101219.html

學會用好 Visual Studio Code

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 Visual Studio Code是個牛逼的編輯器&#xff0c;啟動非常快&#xff0c;完全可以用來代替其他文本文件編輯工具。又可以用來做開發&…

蘇嵌點滴(二)

今天把指針和函數講完了&#xff0c;這些都還能接受&#xff0c;之后老師和我們講了遞歸&#xff0c;有點難度。 晚上電腦還出了點狀況&#xff0c;一個晚自習全用來重裝系統和學習軟件套裝X_X&#xff0c;調試完已經接近下課&#xff0c;遞歸還沒來得及看。 放學后&#xff…