前些天發現了一個巨牛的人工智能學習網站,通俗易懂,風趣幽默,忍不住分享一下給大家。點擊跳轉到教程。
RocketMQ 是出自 A 公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,并做出了自己的一些改進,消息可靠性上比 Kafka 更好,目前,RocketMQ 的文檔仍然不夠豐富?1?2,社區仍然無法與 Kafka 比肩,但 A 公司已經推出了基于 RocketMQ 的云產品?3,相信未來 RocketMQ 也會有不錯的發展。本文采用 RocketMQ 3.2.6 進行實驗,由于 RocketMQ 與 Kafka 很相似,本文很多地方對兩者做出了比較。
基本概念
RocketMQ 由于借鑒了 Kafka 的設計,包括組件的命名也很多與 Kafka 相似,下面摘抄一段《RocketMQ 原理簡介》中的介紹,可以與 Kafka 的命名比對一下,
- Producer,消息生產者,負責產生消息,一般由業務系統負責產生消息。
- Consumer,消息消費者,負責消費消息,一般是后臺系統負責異步消費。
- Push Consumer,Consumer 的一種,應用通常向 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立 刻回調 Listener 接口方法。
- Pull Consumer,Consumer 的一種,應用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制。
- Producer Group,一類 Producer 的集合名稱,這類 Producer 通常發送一類消息,且發送邏輯一致。
- Consumer Group,一類 Consumer 的集合名稱,這類 Consumer 通常消費一類消息,且消費邏輯一致。
- Broker,消息中轉角色,負責存儲消息,轉發消息,一般也稱為 Server。在 JMS 規范中稱為 Provider。
《RocketMQ 原理簡介》中還介紹了一些其他的概念,例如,廣播消費和集群消費,廣播消費是 Consumer Group 中對于同一條消息每個 Consumer 都消費,集群消費是 Consumer Group 中對于同一條消息只有一個 Consumer 消費。Kafka 采用的是集群消費,不支持廣播消費(好吧,是我沒有找到)。再例如,普通順序消息和嚴格順序消息,普通順序消息在 Broker 重啟情況下不會保證消息順序性;嚴格順序消息即使在異常情況下也會保證消息的順序性。個人理解,所謂普通順序消息,應該就是 Kafka 中的 Partition 級別有序,嚴格順序消息,應該是 Topic 級別有序,但文中也提到,這樣的有序級別是要付出代價的,Broker 集群中只要有一臺機器不可用,則整個集群都不可用,降低服務可用性。使用這種模式,需要依賴同步雙寫,主備自動切換,但自動切換功能目前還未實現(我猜,自動切換僅僅是沒開源吧)。說白了,嚴格順序消息不具備生產可用性,自己玩玩還行,其應用場景主要是數據庫 binlog 同步。
關于 RocketMQ 和 Kafka 的對比,可以參考 RocketMQ Wiki 中的文章?4,看看就行,不必較真。
關于順序和分區
順序性的話題,剛才已經提到了一些,RocketMQ 的實現應該不弱于 Kafka。對于分區,RocketMQ 似乎有意弱化了這個概念,只有在 Producer 中有一個參數?defaultTopicQueueNums
,分區在 RocketMQ 中有時被稱為隊列。RocketMQ 的普通順序消息模式,應該就是分區順序性,這點與 Kafka 一致。
關于高可用
RocketMQ 實現高可用的方式有多種,《RocketMQ 用戶指南》文檔中提到的有:多主模式、多主多從異步復制模式、多主多從同步復制模式。多主模式下,性能較好,但是在 Broker 宕機的時候,該 Broker 上未消費的交易不可消費;多主多從異步復制模式,與 Kafka 的副本模式比較類似,主 Broker 宕機后,會自動切換到從 Broker,消息的消費不會出現間斷;多主多從同步復制模式更進一步,采用同步刷盤的方式,避免了主 Broker 宕機帶來的消息丟失,但是,目前不支持自動切換。
雖然 RocketMQ 提供了多種高可用方式,但是目前能生產使用的就只有多主多從異步復制模式,即使在這個模式上,其實現也比 Kafka 要差。因為 RocketMQ 的機制中,主從關系是人為指定的,主 Broker 上承擔所有的消息派發,而 Kafka 的主從關系是通過選舉的方式選出來的,每個分區的主節點都是不一樣的,可以從不同的節點派發消息。Kafka 的模式是分散模式,有利于負載均衡,而且當一個 Broker 宕機的時候,只影響部分 Topic,而 RocketMQ 一旦主 Broker 宕機,會影響所有的 Topic。另外,Kafka 可以支持 Broker 間同步復制(通過設置 Broker 的?acks
?參數),這樣比的話,RocketMQ 就差太多了。
關于 RocketMQ 的介紹,網上的文章不算太多,也比較雜,《分布式開放消息系統(RocketMQ)的原理與實踐》5?6?7這篇原理介紹的不錯,推薦。
RocketMQ 的工具和編程接口
RocketMQ 的工具
相比較 Kafka 而言,RocketMQ 提供的工具要少一些,如下,
bin/mqadminbin/mqbrokerbin/mqbroker.numanode0bin/mqbroker.numanode1bin/mqbroker.numanode2bin/mqbroker.numanode3bin/mqfiltersrvbin/mqnamesrvbin/mqshutdown
除了進程啟停之外,常用的運維命令都在?mqadmin
?中,詳見《RocketMQ 運維指令》文檔。我實驗中常用的一些命令如下,
sh mqnamesrv &sh mqbroker -c async-broker-a.properties &sh mqbroker -c async-broker-a-s.properties &sh mqadmin topicList -n 192.168.232.23:9876sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin clusterList -n 192.168.232.23:9876sh mqadmin deleteTopic -c DefaultCluster -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin consumerProgress -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin deleteSubGroup -c DefaultCluster -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin consumerConnection -n 192.168.232.23:9876 -g ConsumerGroupNamecc4
RocketMQ 使用了自己的 name server 來做調度(Kafka 用了 Zookeeper),使用?sh mqnamesrv
?來啟動,默認監聽端口9876,sh mqnamesrv -m
?可以查看所有默認參數,使用?-c xxxx.properties
?參數來指定自定義配置。sh mqbroker
?是用于啟動 Broker 的命令,參數比較多,詳細可以通過?sh mqbroker -m
?查看默認參數,配置項細節后文再說。sh mqadmin
?是運維命令入口,topicList
?是列出所有 Topic;topicRoute
?是列出單個 Topic 的詳細信息;clusterList
?是列出集群的信息;deleteTopic
?是刪除 Topic。consumerProgress
?是查看消費者消費進度,deleteSubGroup
?是刪除消費者的訂閱,consumerConnection
?是查詢消費者訂閱的情況。
Broker 的配置是最多的,實驗中我修改到的部分如下,其他使用默認,
brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23brokerName=broker-abrokerId=0namesrvAddr=192.168.232.23:9876listenPort=10911deleteWhen=04fileReservedTime=120storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERflushDiskType=ASYNC_FLUSH
配置文件中的多數配置看例子就可以知道意思,挑幾個說一下。brokerName
?和?brokerId
, 同名的 Broker,ID 是0的是主節點,其他是從節點;deleteWhen
,刪除文件時間點,默認凌晨4點;fileReservedTime
,文件保留時間,設置為120小時;brokerRole
,Broker 的角色,ASYNC_MASTER 是異步復制主節點,SYNC_MASTER 是同步雙寫主節點,SLAVE 是備節點。
其實,這些工具的寫法也基本一致,都是先做一些檢查,最后運行 Java 程序,JVM 系統上的應用應該差不多都這樣。
RocketMQ 的 Java API
RocketMQ 是用 Java 語言開發的,因此,其 Java API 相對是比較豐富的,當然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,
- 廣播消費,這個在之前已經提到過;
- 消息過濾,支持簡單的 Message Tag 過濾,也支持按 Message Header、body 過濾;
- 順序消費和亂序消費,之前也提到過,這里的順序消費應該指的是普通順序性,這一點與 Kafka 相同;
- Pull 模式消費,這個是相對 Push 模式來說的,Kafka 就是 Pull 模式消費;
- 事務消息,這個好像沒有開源,但是 example 代碼中有示例,總之,不推薦用;
- Tag,RocketMQ 在 Topic 下面又分了一層 Tag,用于表示消息類別,可以用來過濾,但是順序性還是以 Topic 來看;
單看功能的話,即使不算事務消息,也不算 Tag,RocketMQ 也遠超 Kafka,Kafka 應該只實現了 Pull 模式消費 + 順序消費這2個功能。RocketMQ 的代碼示例在 rocketmq-example 中,注意,代碼是不能直接運行的,因為所有的代碼都少了設置 name server 的部分,需要自己手動加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");
。
先來看一下生產者的 API,比較簡單,只有一種,如下,
import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.MessageQueueSelector;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");producer.setNamesrvAddr("192.168.232.23:9876");producer.start();for (int i = 0; i < 10; i++)try {{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID188",// key("RocketMQ "+String.format("%05d", i)).getBytes());// bodySendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, i));System.out.println(String.format("%05d", i)+sendResult);}}catch (Exception e) {e.printStackTrace();}producer.shutdown();}}
可以發現,相比 Kafka 的 API,只多了 Tag,但實際上行為有很大不同。Kafka 的生產者客戶端,有同步和異步兩種模式,但都是阻塞模式,send
?方法返回發送狀態的?Future
,可以通過?Future
?的?get
?方法阻塞獲得發送狀態。而 RocketMQ 采用的是同步非阻塞模式,發送之后立刻返回發送狀態(而不是?Future
)。正常情況下,兩者使用上差別不大,但是在高可用場景中發生主備切換的時候,Kafka 的同步可以等待切換完成并重連,最后返回;而 RocketMQ 只能立刻報錯,由生產者選擇是否重發。所以,在生產者的 API 上,其實 Kafka 是要強一些的。
另外,RocketMQ 可以通過指定?MessageQueueSelector
?類的實現來指定將消息發送到哪個分區去,Kafka 是通過指定生產者的?partitioner.class
?參數來實現的,靈活性上 RocketMQ 略勝一籌。
再來看消費者的API,由于 RocketMQ 的功能比較多,我們先看 Pull 模式消費的API,如下,
import java.util.HashMap;import java.util.Map;import java.util.Set;import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;import com.alibaba.rocketmq.client.consumer.PullResult;import com.alibaba.rocketmq.client.consumer.store.OffsetStore;import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import com.alibaba.rocketmq.common.message.MessageQueue;public class PullConsumer {private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");for (MessageQueue mq : mqs) {System.out.println("Consume from the queue: " + mq);SINGLE_MQ: while (true) {try {long offset = consumer.fetchConsumeOffset(mq, true);PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);if (null != pullResult.getMsgFoundList()) {for (MessageExt messageExt : pullResult.getMsgFoundList()) {System.out.print(new String(messageExt.getBody()));System.out.print(pullResult);System.out.println(messageExt);}}putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:// TODObreak;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offseTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offseTable.get(mq);if (offset != null)return offset;return 0;}}
這部分的 API 其實是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分區,而 Kafka 可以自動管理(當然也可以手動管理),并且不需要指定分區(分區是在 Kafka 訂閱的時候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用?OffsetStore
?接口,提供了兩種管理方式,本地文件和遠程 Broker。這部分感覺兩者差不多。
下面再看看 Push 模式順序消費,代碼如下,
import java.util.List;import java.util.concurrent.atomic.AtomicLong;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest1", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;}else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;}else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;}else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}
雖然提供了 Push 模式,RocketMQ 內部實際上還是 Pull 模式的 MQ,Push 模式的實現應該采用的是長輪詢,這點與 Kafka 一樣。使用該方式有幾個注意的地方,
- 接收消息的監聽類要使用?
MessageListenerOrderly
; ConsumeFromWhere
?有幾個參數,表示從頭開始消費,從尾開始消費,還是從某個 TimeStamp 開始消費;- 可以控制 offset 的提交,應該就是?
context.setAutoCommit(false);
?的作用;
控制 offset 提交這個特性非常有用,某種程度上擴展一下,就可以當做事務來用了,看代碼?ConsumeMessageOrderlyService
?的實現,其實并沒有那么復雜,在不啟用 AutoCommit 的時候,只有返回?COMMIT
?才 commit offset;啟用 AutoCommit 的時候,返回?COMMIT
、ROLLBACK
(這個比較扯)、SUCCESS
?的時候,都 commit offset。
后來發現,commit offset 功能在 Kafka 里面也有提供,使用新的 API,調用?
consumer.commitSync
。
再看一個 Push 模式亂序消費 + 消息過濾的例子,消費者的代碼如下,
import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");consumer.setNamesrvAddr("192.168.232.23:9876");consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}
接收消息的監聽類使用的是?MessageListenerConcurrently
;這個例子與之前順序消費不同的地方在于,
- 回調方法中,使用的是自動 offset commit;
- 訂閱的時候增加了消息過濾類?
MessageFilterImpl
;
消息過濾類?MessageFilterImpl
?的代碼如下,
import com.alibaba.rocketmq.common.filter.MessageFilter;import com.alibaba.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg) {String property = msg.getUserProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if ((id % 3) == 0 && (id > 10)) {return true;}}return false;}}
RocketMQ 執行過濾是在 Broker 端,Broker 所在的機器會啟動多個 FilterServer 過濾進程;Consumer 啟動后,會向 FilterServer 上傳一個過濾的 Java 類;Consumer 從 FilterServer 拉消息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到消息后,按照 Consumer 上傳的 Java 過濾程序做過濾,過濾完成后返回給 Consumer。這種過濾方法可以節省網絡流量,但是增加了 Broker 的負擔。可惜我沒有實驗出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個很方便的功能。
還有一種廣播消費模式,比較簡單,可以去看代碼,不再列出。
總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。
RocketMQ 的主備模式
按之前所說,只有 RocketMQ 的多主多從異步復制是可以生產使用的,因此只在這個場景下測試。另外,消息采用 Push 順序模式消費。
假設集群采用2主2備的模式,需要啟動4個 Broker,配置文件如下,
brokerName=broker-abrokerId=0listenPort=10911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRole=ASYNC_MASTERbrokerName=broker-abrokerId=1listenPort=10921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlogbrokerRole=SLAVEbrokerName=broker-bbrokerId=0listenPort=20911storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-asyncstorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlogbrokerRole=ASYNC_MASTERbrokerRole=ASYNC_MASTERbrokerName=broker-bbrokerId=1listenPort=20921storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slavestorePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlogbrokerRole=SLAVE
另外,每個機構共通的配置項如下,
brokerClusterName=DefaultClusterbrokerIP1=192.168.232.23namesrvAddr=192.168.232.23:9876deleteWhen=04fileReservedTime=120flushDiskType=ASYNC_FLUSH
其他設置均采用默認。啟動 NameServer 和所有 Broker,并試運行一下 Producer,然后看一下 TestTopic1 當前的情況,
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}
可見,TestTopic1 在2個 Broker 上,且每個 Broker 備機也在運行。下面開始主備切換的實驗,分別啟動 Consumer 和 Producer 進程,消息采用 Pull 順序模式消費。在消息發送接收過程中,使用?kill -9
?停掉?broker-a
?的主進程,模擬突然宕機。此時,TestTopic1 的狀態如下,
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{"brokerDatas":[{"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"},"brokerName":"broker-b"},{"brokerAddrs":{1:"192.168.232.23:10921"},"brokerName":"broker-a"}],"filterServerTable":{},"queueDatas":[{"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4},{"brokerName":"broker-b","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4}]}
此時,RocketMQ 已經恢復。
再來看看 Producer 和 Consumer 的日志,先看 Producer 的,如下,
......00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]......01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]
日志中顯示,在發送完00583條消息之后,開始發生異常?connect to <192.168.232.23:10911> failed
,原因應該是?broker-a
?的主節點被 kill 掉。之后,從00596條消息開始,RocketMQ 又恢復正常,原因是?broker-b
?已經開始提供服務,承擔了所有的工作。然后,又重新啟動了?broker-a
?主節點,由于該節點的加入,從01392條消息開始,broker-a
?又開始恢復工作。實驗中可以驗證,RocketMQ 所謂的多主多備模式,實際上,備機被弱化到無以復加,在主節點宕機的時候,備機無法接替主機的工作,而只是將尚未發送的數據發送出去,由剩下的主節點接替工作。也就是說,N 主 N 備的 RocketMQ 集群中,總共有 2N 臺機器,實際工作的只有 N 臺,如果有一臺掛了,就只有 N-1 臺工作了,機器的利用率太低了。
再來看一下 Consumer 的日志,如下,
RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=697, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00567PullResult [pullStatus=FOUND, nextBeginOffset=698, minOffset=0, maxOffset=698, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=697, sysFlag=0, bornTimestamp=1469175033005, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021533, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078A54, commitLogOffset=494164, bodyCRC=2054744282, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=698, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00575PullResult [pullStatus=FOUND, nextBeginOffset=699, minOffset=0, maxOffset=699, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=698, sysFlag=0, bornTimestamp=1469175033286, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021814, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078CB0, commitLogOffset=494768, bodyCRC=225294519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=699, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00583PullResult [pullStatus=FOUND, nextBeginOffset=700, minOffset=0, maxOffset=700, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=699, sysFlag=0, bornTimestamp=1469175033586, bornHost=/192.168.234.98:51987, storeTimestamp=1469175022113, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000078F0C, commitLogOffset=495372, bodyCRC=1670775117, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=700, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00591PullResult [pullStatus=FOUND, nextBeginOffset=701, minOffset=0, maxOffset=701, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=700, sysFlag=0, bornTimestamp=1469175037890, bornHost=/192.168.234.98:51987, storeTimestamp=1469175026418, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079168, commitLogOffset=495976, bodyCRC=344150304, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=701, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00599PullResult [pullStatus=FOUND, nextBeginOffset=702, minOffset=0, maxOffset=702, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=701, sysFlag=0, bornTimestamp=1469175042200, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030734, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000793C4, commitLogOffset=496580, bodyCRC=442030354, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=702, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00603PullResult [pullStatus=FOUND, nextBeginOffset=703, minOffset=0, maxOffset=703, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=702, sysFlag=0, bornTimestamp=1469175042345, bornHost=/192.168.234.98:51987, storeTimestamp=1469175030872, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079620, commitLogOffset=497184, bodyCRC=688469276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=703, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00607PullResult [pullStatus=FOUND, nextBeginOffset=704, minOffset=0, maxOffset=704, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=703, sysFlag=0, bornTimestamp=1469175042481, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031008, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007987C, commitLogOffset=497788, bodyCRC=778367237, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=704, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00611PullResult [pullStatus=FOUND, nextBeginOffset=705, minOffset=0, maxOffset=705, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=704, sysFlag=0, bornTimestamp=1469175042615, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031143, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079AD8, commitLogOffset=498392, bodyCRC=1578919281, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=705, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00615PullResult [pullStatus=FOUND, nextBeginOffset=706, minOffset=0, maxOffset=706, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=705, sysFlag=0, bornTimestamp=1469175042753, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031280, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079D34, commitLogOffset=498996, bodyCRC=1500619112, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=706, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00619PullResult [pullStatus=FOUND, nextBeginOffset=707, minOffset=0, maxOffset=707, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=706, sysFlag=0, bornTimestamp=1469175042887, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031414, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000079F90, commitLogOffset=499600, bodyCRC=1355279683, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=707, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00623PullResult [pullStatus=FOUND, nextBeginOffset=708, minOffset=0, maxOffset=708, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=707, sysFlag=0, bornTimestamp=1469175043021, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031548, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A1EC, commitLogOffset=500204, bodyCRC=457136030, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=708, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00627PullResult [pullStatus=FOUND, nextBeginOffset=709, minOffset=0, maxOffset=709, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=708, sysFlag=0, bornTimestamp=1469175043154, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031681, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A448, commitLogOffset=500808, bodyCRC=475173767, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=709, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00631PullResult [pullStatus=FOUND, nextBeginOffset=710, minOffset=0, maxOffset=710, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=709, sysFlag=0, bornTimestamp=1469175043299, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031826, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A6A4, commitLogOffset=501412, bodyCRC=1814693875, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=710, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00635PullResult [pullStatus=FOUND, nextBeginOffset=711, minOffset=0, maxOffset=711, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=710, sysFlag=0, bornTimestamp=1469175043435, bornHost=/192.168.234.98:51987, storeTimestamp=1469175031962, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007A900, commitLogOffset=502016, bodyCRC=1799865322, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=711, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468572196808, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191827, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011C60, commitLogOffset=72800, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196876, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011EB0, commitLogOffset=73392, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196903, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191928, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012100, commitLogOffset=73984, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00001PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=3, sysFlag=0, bornTimestamp=1468572718149, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713175, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001222B, commitLogOffset=74283, bodyCRC=1133127810, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00005PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718178, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713210, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012487, commitLogOffset=74887, bodyCRC=1156050075, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......[queueId=1, storeSize=151, queueOffset=22, sysFlag=0, bornTimestamp=1469170324786, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313333, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D3AA, commitLogOffset=1102762, bodyCRC=1707898805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00477PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=23, sysFlag=0, bornTimestamp=1469170325237, bornHost=/192.168.234.98:49814, storeTimestamp=1469170313771, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D606, commitLogOffset=1103366, bodyCRC=1654764460, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00481PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=24, sysFlag=0, bornTimestamp=1469170325652, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314163, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010D862, commitLogOffset=1103970, bodyCRC=207227478, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00485PullResult [pullStatus=FOUND, nextBeginOffset=62, minOffset=0, maxOffset=205, msgFoundList=31]MessageExt [queueId=1, storeSize=151, queueOffset=25, sysFlag=0, bornTimestamp=1469170326066, bornHost=/192.168.234.98:49814, storeTimestamp=1469170314595, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000010DABE, commitLogOffset=1104574, bodyCRC=188206671, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=205, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]......RocketMQ 01370PullResult [pullStatus=FOUND, nextBeginOffset=895, minOffset=0, maxOffset=895, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=894, sysFlag=0, bornTimestamp=1469175070573, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059101, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095A89, commitLogOffset=613001, bodyCRC=1094080495, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=895, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01374PullResult [pullStatus=FOUND, nextBeginOffset=896, minOffset=0, maxOffset=896, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=895, sysFlag=0, bornTimestamp=1469175070712, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059251, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095CE5, commitLogOffset=613605, bodyCRC=1180406774, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=896, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01378PullResult [pullStatus=FOUND, nextBeginOffset=897, minOffset=0, maxOffset=897, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=896, sysFlag=0, bornTimestamp=1469175070899, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059427, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000095F41, commitLogOffset=614209, bodyCRC=1340989405, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=897, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01382PullResult [pullStatus=FOUND, nextBeginOffset=898, minOffset=0, maxOffset=898, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=897, sysFlag=0, bornTimestamp=1469175071054, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059582, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000009619D, commitLogOffset=614813, bodyCRC=681585164, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=898, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01386PullResult [pullStatus=FOUND, nextBeginOffset=899, minOffset=0, maxOffset=899, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=898, sysFlag=0, bornTimestamp=1469175071203, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059731, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000963F9, commitLogOffset=615417, bodyCRC=802024981, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=899, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01390PullResult [pullStatus=FOUND, nextBeginOffset=900, minOffset=0, maxOffset=900, msgFoundList=1]MessageExt [queueId=2, storeSize=151, queueOffset=899, sysFlag=0, bornTimestamp=1469175071338, bornHost=/192.168.234.98:51987, storeTimestamp=1469175059866, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF0000000000096655, commitLogOffset=616021, bodyCRC=1605728865, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=900, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=0, sysFlag=0, bornTimestamp=1468571752640, bornHost=/192.168.234.98:56433, storeTimestamp=1468571747895, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011B38, commitLogOffset=72504, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=1, sysFlag=0, bornTimestamp=1468572196772, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191803, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011BCC, commitLogOffset=72652, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=2, sysFlag=0, bornTimestamp=1468572196865, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191886, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000011E1C, commitLogOffset=73244, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]Hello MetaQPullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=148, queueOffset=3, sysFlag=0, bornTimestamp=1468572196899, bornHost=/192.168.234.98:56837, storeTimestamp=1468572191917, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000001206C, commitLogOffset=73836, bodyCRC=1751783629, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=11]]RocketMQ 00000PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=4, sysFlag=0, bornTimestamp=1468572718127, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713166, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000012194, commitLogOffset=74132, bodyCRC=881661972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]RocketMQ 00004PullResult [pullStatus=FOUND, nextBeginOffset=31, minOffset=0, maxOffset=209, msgFoundList=31]MessageExt [queueId=0, storeSize=151, queueOffset=5, sysFlag=0, bornTimestamp=1468572718170, bornHost=/192.168.234.98:57165, storeTimestamp=1468572713197, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000000123F0, commitLogOffset=74736, bodyCRC=870374413, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=209, KEYS=OrderID188, WAIT=true, TAGS=TagA}, body=14]]......RocketMQ 00560PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=140, sysFlag=0, bornTimestamp=1469175032756, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021285, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126922, commitLogOffset=1206562, bodyCRC=1679588729, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00568PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=141, sysFlag=0, bornTimestamp=1469175033043, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021570, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126B7E, commitLogOffset=1207166, bodyCRC=1791489355, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 00576PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=142, sysFlag=0, bornTimestamp=1469175033320, bornHost=/192.168.234.98:51986, storeTimestamp=1469175021848, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000126DDA, commitLogOffset=1207770, bodyCRC=342157581, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01392PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=143, sysFlag=0, bornTimestamp=1469175071411, bornHost=/192.168.234.98:52034, storeTimestamp=1469175059951, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127036, commitLogOffset=1208374, bodyCRC=834345805, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01400PullResult [pullStatus=FOUND, nextBeginOffset=210, minOffset=0, maxOffset=210, msgFoundList=24]MessageExt [queueId=0, storeSize=151, queueOffset=144, sysFlag=0, bornTimestamp=1469175071746, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060289, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F0000000000127292, commitLogOffset=1208978, bodyCRC=188274605, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=210, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
可以看到,Consumer 在?broker-a
?宕機時間的附近,也出現了異常,connect to <192.168.232.23:10911> failed
。雖然還能保持分區上的順序性,但是已經某種程度上出現了一些紊亂,例如,將我在實驗之前的數據給取了出來(Hello MetaQ
的消息)。可是,我在實驗前,明明做過刪除這個 Topic 的動作,看來 RocketMQ 所謂的刪除,并未刪除 Topic 的數據。之后,broker-a
?主機重啟之后,又恢復正常。
RocketMQ Pull模式消費需要手動管理 offset 和指定分區,這個在調用的時候不覺得,實際運行的時候才會發現每次總是消費一個分區,消費完之后,才開始消費下一個分區,而下一個分區可能已經堆積了很多消息了,手動做消息分配又比較費事。或許,Push 順序模式消費才是更好的選擇。
另外還有幾個比較異常的情況,實驗中有幾次出現了?CODE: 17 DESC: topic[TopicTest1] not exist, apply first please!
?這樣的錯誤,實際上,這時候我只是關掉了 Producer;還有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1
?明明文檔中說可以用來新增 Topic,而實際上不行。
補充一下:之后,我又使用 Push 順序模式消費重做了上述實驗,結論差不多。只是因為有多線程的原因,日志看起來偶爾有錯位,這個問題不大,可以解決。而且,在關閉重啟 Broker 的附近,往往伴隨著多次的消息重發,不過,RocketMQ 也不保證消息只收到一次就是了。消息重復的問題,Kafka 要比 RocketMQ 顯得不那么嚴重一些。Push 順序模式消費不需要指定 offset,不需要指定分區,第二次啟動可以自動從前一次的 offset 后開始消費。功能上這個與 Kafka 的 Consumer 更類似,雖然 RocketMQ 采用的是異步模式。
RocketMQ 最佳實踐
實際上,RocketMQ 自己就有一份《RocketMQ 最佳實踐》的文檔,里面提到了一些系統設計的問題,例如消費者要冪等,一個應用對應一個 Topic,如此等等。這些經驗不僅僅是對 RocketMQ 有用,對 Kafka 也頗有借鑒意義。
后記
這里談談我對選擇 RocketMQ 還是 Kafka 的個人建議。以上已經做了多處 RocketMQ 和 Kafka 的對比,我個人覺得,Kafka 是一個不斷發展中的系統,開源社區比 RocketMQ 要大,也要更活躍一些;另外,Kafka 最新版本已經有了同步復制,消息可靠性更有保障;還有,Kafka 的分區機制,幾乎實現了自動負載均衡,這絕對是個殺手級特性;RocketMQ 雖然提供了很多易用的功能,遠超出 Kafka,但這些功能并不一定都能用得上,而且多數可以繞過。相比之下,Kafka 的基本功能更加吸引我,再處理故障恢復的時候,細節上要勝過 RocketMQ。當然,如果是 A 公司內部,或者所在公司使用了 A 公司的云產品,那么 RocketMQ 的企業級特性更多一些,或許我會選擇 RocketMQ。
?
轉自:http://valleylord.github.io/post/201607-mq-rocketmq/