Reposted from: https://blog.csdn.net/zz775854904/article/details/81092892
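MQ stands for Message Queue. A message queue is a method of application-to-application communication. Applications communicate by writing messages (application-specific data) to queues and reading them back, without needing a dedicated connection between them. Messaging means that programs communicate by sending data in messages rather than by calling each other directly; direct calls are typical of techniques such as remote procedure calls. Queuing means that applications communicate through queues. Using a queue removes the requirement that the sending and receiving applications run at the same time.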
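To make the decoupling concrete, here is a minimal sketch that uses an in-memory `BlockingQueue` as a stand-in for a broker. The class and method names (`QueueDecouplingSketch`, `send`, `receive`) are made up for illustration; a real broker such as RabbitMQ adds networking, persistence and routing on top of this idea.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: an in-memory queue standing in for a message broker.
final class QueueDecouplingSketch {

    // The sender only writes to the queue; it never calls the receiver directly.
    static void send(BlockingQueue<String> queue, String payload) {
        queue.add(payload);
    }

    // The receiver reads whenever it happens to run; returns null if nothing is queued.
    static String receive(BlockingQueue<String> queue) {
        return queue.poll();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The receiver is not running yet, but sending still succeeds.
        send(queue, "order-created");
        send(queue, "order-paid");

        // Later, the receiver drains the messages in FIFO order.
        System.out.println(receive(queue));
        System.out.println(receive(queue));
    }
}
```

The sender and the receiver share only the queue, which is why neither has to be running when the other one is.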
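On to the main topic!
Before starting development you need to download RabbitMQ, and before installing RabbitMQ you need to install Erlang, because RabbitMQ is written in Erlang.
Once installation is done, create a Maven project and start configuring it.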
- <!-- Spring version -->
- <spring.version>3.2.8.RELEASE</spring.version>
- <!-- Add the Spring dependencies -->
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-core</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-webmvc</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context-support</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-aop</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-aspects</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-tx</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-jdbc</artifactId>
- <version>${spring.version}</version>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-web</artifactId>
- <version>${spring.version}</version>
- </dependency>
Since this is a Spring integration, the Spring dependencies above are required. The RabbitMQ dependency comes next.
- <!-- RabbitMQ dependency -->
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.3.5.RELEASE</version>
- </dependency>
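With the dependencies in place, we need to define the message producer and the message consumers.
There are several exchange types; I only tested two of them here, direct and topic, by defining two exchanges, one of each type.
First, define the message producer. The template is linked to the connection-factory in the configuration and injected into the code.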
- package com.chris.producer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.io.IOException;
- /**
- * Created by wuxing on 2016/9/21.
- */
- @Service
- public class MessageProducer {
- private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
- @Resource(name="amqpTemplate")
- private AmqpTemplate amqpTemplate;
- @Resource(name="amqpTemplate2")
- private AmqpTemplate amqpTemplate2;
- public void sendMessage(Object message) throws IOException {
- logger.info("to send message:{}", message);
- amqpTemplate.convertAndSend("queueTestKey", message);
- amqpTemplate.convertAndSend("queueTestChris", message);
- amqpTemplate2.convertAndSend("wuxing.xxxx.wsdwd", message);
- }
- }
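Next we define the message consumers. I defined three of them; each listens on its own queue and receives the messages that match it.
The first consumer receives direct messages. Its exchange is exchangeTest and its routing key is queueTestKey.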
- package com.chris.consumer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageListener;
- /**
- * Created by wuxing on 2016/9/21.
- */
- public class MessageConsumer implements MessageListener {
- private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
- @Override
- public void onMessage(Message message) {
- logger.info("consumer receive message------->:{}", message);
- }
- }
The second consumer also receives direct messages (to test that one exchange can route messages to more than one queue). Its exchange is exchangeTest and its routing key is queueTestChris.
- package com.chris.consumer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageListener;
- /**
- * Created by wuxing on 2016/9/21.
- */
- public class ChrisConsumer implements MessageListener {
- private Logger logger = LoggerFactory.getLogger(ChrisConsumer.class);
- @Override
- public void onMessage(Message message) {
- logger.info("chris receive message------->:{}", message);
- }
- }
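The two direct consumers rely on the exchange delivering a message only to queues whose binding key equals the routing key exactly. The following is a rough in-memory sketch of that rule, not Spring AMQP or RabbitMQ code; `DirectExchangeSketch`, `bind` and `route` are hypothetical names, while the queue names and keys are the ones used in this post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: exact-match routing as performed by a direct exchange.
final class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive a message published with this routing key.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchangeTest = new DirectExchangeSketch();
        exchangeTest.bind("queueTest", "queueTestKey");
        exchangeTest.bind("queueChris", "queueTestChris");

        System.out.println(exchangeTest.route("queueTestKey"));   // only queueTest
        System.out.println(exchangeTest.route("queueTestChris")); // only queueChris
        System.out.println(exchangeTest.route("somethingElse"));  // no queue matches
    }
}
```

One exchange can serve several queues this way: each message goes only to the queue bound with its exact key, and a key with no binding reaches no queue.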
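The third consumer receives topic messages. Its exchange is exchangeTest2 and its pattern is wuxing.*. In a topic binding, * matches exactly one word and # matches zero or more words. In my tests, though, both patterns seemed to match one or more words, and I am not sure why. One possible cause (an assumption, not verified here) is that a binding created in an earlier run, such as wuxing.#, was still present on the broker, because changing the configuration does not remove bindings that already exist.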
- package com.chris.consumer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageListener;
- /**
- * Created by wuxing on 2016/9/21.
- */
- public class WuxingConsumer implements MessageListener {
- private Logger logger = LoggerFactory.getLogger(WuxingConsumer.class);
- @Override
- public void onMessage(Message message) {
- logger.info("wuxing receive message------->:{}", message);
- }
- }
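Since the behaviour of `*` and `#` caused confusion above, here is a small self-contained sketch of the topic matching rule: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. It is a plain-Java approximation written for this post, not the broker's actual implementation, and `TopicPatternSketch` is a hypothetical name.

```java
// Illustration only: word-by-word matching of a topic binding pattern.
final class TopicPatternSketch {

    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: the key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing (move on) or absorbs one more word (stay on '#')
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        // '*' accepts any single word; anything else must be equal literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("wuxing.*", "wuxing.xxxx.wsdwd")); // false: two words after wuxing
        System.out.println(matches("wuxing.#", "wuxing.xxxx.wsdwd")); // true
        System.out.println(matches("wuxing.*", "wuxing.xxxx"));       // true
    }
}
```

Under this rule the routing key wuxing.xxxx.wsdwd used by the producer is matched by wuxing.# but not by wuxing.*, so a delivery to queueWuxing suggests that a wuxing.# binding exists on the broker.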
Now for the key part: the configuration file that integrates RabbitMQ with Spring.
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
- <!-- Configure the connection-factory with the parameters for connecting to the RabbitMQ server -->
- <rabbit:connection-factory id="connectionFactory"
- username="guest" password="guest" host="localhost" port="5672" />
- <!-- Define the rabbit template used to send and receive data -->
- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
- exchange="exchangeTest"/>
- <!-- With the admin below, the exchanges and queues used by this producer are created automatically on the RabbitMQ server -->
- <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/>
- <!-- Define the queue -->
- <rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>
- <!-- Define the direct exchange and bind queueTest -->
- <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
- <rabbit:bindings>
- <rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:direct-exchange>
- <!-- Message receiver -->
- <bean id="messageReceiver" class="com.chris.consumer.MessageConsumer"></bean>
- <!-- Queue listener (observer pattern): when a message arrives, the listener registered on the corresponding queue is notified -->
- <rabbit:listener-container connection-factory="connectionFactory">
- <rabbit:listener queues="queueTest" ref="messageReceiver"/>
- </rabbit:listener-container>
- <!-- Define the queue -->
- <rabbit:queue name="queueChris" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin"/>
- <!-- Define the direct exchange and bind queueChris -->
- <rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false" declared-by="connectAdmin">
- <rabbit:bindings>
- <rabbit:binding queue="queueChris" key="queueTestChris"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:direct-exchange>
- <!-- Message receiver -->
- <bean id="receiverChris" class="com.chris.consumer.ChrisConsumer"></bean>
- <!-- Queue listener (observer pattern): when a message arrives, the listener registered on the corresponding queue is notified -->
- <rabbit:listener-container connection-factory="connectionFactory">
- <rabbit:listener queues="queueChris" ref="receiverChris"/>
- </rabbit:listener-container>
- <!-- Separator -->
- <!-- Configure the connection-factory with the parameters for connecting to the RabbitMQ server -->
- <rabbit:connection-factory id="connectionFactory2"
- username="guest" password="guest" host="localhost" port="5672"/>
- <!-- Define the rabbit template used to send and receive data -->
- <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory2"
- exchange="exchangeTest2"/>
- <!-- With the admin below, the exchanges and queues used by this producer are created automatically on the RabbitMQ server -->
- <rabbit:admin id="connectAdmin2" connection-factory="connectionFactory2"/>
- <!-- Define the queue -->
- <rabbit:queue name="queueWuxing" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin2"/>
- <!-- Define the topic exchange and bind queueWuxing -->
- <rabbit:topic-exchange name="exchangeTest2" durable="true" auto-delete="false" declared-by="connectAdmin2">
- <rabbit:bindings>
- <rabbit:binding queue="queueWuxing" pattern="wuxing.*"></rabbit:binding>
- </rabbit:bindings>
- </rabbit:topic-exchange>
- <!-- Message receiver -->
- <bean id="recieverWuxing" class="com.chris.consumer.WuxingConsumer"></bean>
- <!-- Queue listener (observer pattern): when a message arrives, the listener registered on the corresponding queue is notified -->
- <rabbit:listener-container connection-factory="connectionFactory2" >
- <rabbit:listener queues="queueWuxing" ref="recieverWuxing"/>
- </rabbit:listener-container>
- </beans>
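There was one problem here that took me a long time to figure out: how to define two exchanges. It kept failing at first, until I found an article from abroad that solved it.
When defining two exchanges, you need to use declared-by.
That attribute is only available once the following schema declaration is referenced: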
- http://www.springframework.org/schema/rabbit
- http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
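Here is a rough explanation of the configuration in this file.
connection-factory connects to the RabbitMQ server.
template connects to the factory and specifies the exchange; a routing key can also be set directly on it.
admin acts as an administrator that manages exchanges and queues.
queue and topic-exchange define the queue and the exchange respectively. declared-by specifies the admin here, which in turn links them to the corresponding factory.
listener-container is used for the consumers' listening (the rabbit configuration can in fact point to a specific method of a class, but I could not get that to work and am still experimenting).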
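To give a feel for what a listener container does, the sketch below runs a worker thread that takes messages off a queue and hands each one to a callback, much like `onMessage` in the consumers above. It is a simplified stand-in built only on the JDK; `ListenerContainerSketch` and its methods are hypothetical names, and the real Spring AMQP container also handles connections, acknowledgements and recovery.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustration only: a worker thread that pushes queued messages to a listener.
final class ListenerContainerSketch {

    private final BlockingQueue<String> queue;
    private final Consumer<String> listener;
    private final Thread worker;
    private volatile boolean running = true;

    ListenerContainerSketch(BlockingQueue<String> queue, Consumer<String> listener) {
        this.queue = queue;
        this.listener = listener;
        this.worker = new Thread(this::loop, "listener-sketch");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    private void loop() {
        try {
            // Keep going until stop() was called and the queue has been drained.
            while (running || !queue.isEmpty()) {
                String message = queue.poll(50, TimeUnit.MILLISECONDS);
                if (message != null) {
                    listener.accept(message); // the equivalent of onMessage(...)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stops polling once the remaining messages are delivered and waits for the worker to finish.
    void stop() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queueTest = new LinkedBlockingQueue<>();
        ListenerContainerSketch container = new ListenerContainerSketch(
                queueTest, message -> System.out.println("consumer receive message: " + message));
        queueTest.add("Hello, I am amq sender num :100");
        queueTest.add("Hello, I am amq sender num :99");
        container.stop();
    }
}
```

The listener never polls the queue itself; the container owns the thread and the loop, which is why the consumer classes in this post contain nothing but an `onMessage` method.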
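There is one more issue to watch out for.
Once an exchange has been declared with one type, the same exchange name cannot be switched to another type in the configuration. As long as the existing exchange remains on the broker, you keep getting an error along the lines of: received 'direct' but current is 'topic'.
I lost quite some time tracking this one down as well.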
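The type conflict can be pictured as the broker remembering the type under which each exchange name was first declared. The sketch below models only that bookkeeping in memory; `ExchangeRegistrySketch` is a hypothetical name, and the error text merely imitates the message quoted above.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a registry that rejects redeclaring a name with a different type.
final class ExchangeRegistrySketch {

    // exchange name -> type it was first declared with
    private final Map<String, String> typesByName = new HashMap<>();

    void declare(String name, String type) {
        String current = typesByName.putIfAbsent(name, type);
        if (current != null && !current.equals(type)) {
            throw new IllegalStateException(
                    "exchange '" + name + "': received '" + type + "' but current is '" + current + "'");
        }
    }

    // Removing the exchange frees the name for a declaration with another type.
    void delete(String name) {
        typesByName.remove(name);
    }

    public static void main(String[] args) {
        ExchangeRegistrySketch broker = new ExchangeRegistrySketch();
        broker.declare("exchangeTest2", "topic");
        broker.declare("exchangeTest2", "topic"); // same type again: accepted

        try {
            broker.declare("exchangeTest2", "direct"); // different type: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        broker.delete("exchangeTest2");
        broker.declare("exchangeTest2", "direct"); // accepted after deletion
    }
}
```

In practice this means either deleting the old exchange on the broker or choosing a new exchange name before changing its type in the configuration.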
Next, here is the basic Spring configuration.
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
- <import resource="classpath*:rabbitmq.xml" />
- <!-- Scan the given packages for classes carrying stereotype annotations such as @Controller and @Service, and register them as Spring beans -->
- <context:component-scan base-package="com.chris.consumer, com.chris.producer" />
- <!-- Enable annotation support -->
- <context:annotation-config />
- <!-- Enable @Configurable support (relies on spring-aspects) -->
- <context:spring-configured />
- </beans>
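Then comes the unit test class. It counts down from 100 to 1, sending one message per second, so that you can watch the consumers receive messages on the console.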
- package com.chris;
- import com.chris.producer.MessageProducer;
- import org.junit.Before;
- import org.junit.Test;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.support.ClassPathXmlApplicationContext;
- /**
- * Created by wuxing on 2016/9/21.
- */
- public class MessageTest {
- private Logger logger = LoggerFactory.getLogger(MessageTest.class);
- private ApplicationContext context = null;
- @Before
- public void setUp() throws Exception {
- context = new ClassPathXmlApplicationContext("application.xml");
- }
- @Test
- public void should_send_a_amq_message() throws Exception {
- MessageProducer messageProducer = (MessageProducer) context.getBean("messageProducer");
- int a = 100;
- while (a > 0) {
- messageProducer.sendMessage("Hello, I am amq sender num :" + a--);
- try {
- // Pause briefly so the consumers have time to fetch and print the message
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
The console output looks like this (only the key lines are shown; the rest of the configured log output is omitted).
- 2016-09-22 16:15:00,330 [main] INFO [com.chris.producer.MessageProducer] - to send message:Hello, I am amq sender num :100
- 2016-09-22 16:15:00,348 [main] DEBUG [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
- 2016-09-22 16:15:00,348 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)
- 2016-09-22 16:15:00,349 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest], routingKey = [queueTestKey]
- 2016-09-22 16:15:00,357 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)
- 2016-09-22 16:15:00,358 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest], routingKey = [queueTestChris]
- 2016-09-22 16:15:00,368 [main] DEBUG [org.springframework.amqp.rabbit.connection.CachingConnectionFactory] - Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,2)
- 2016-09-22 16:15:00,369 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
- 2016-09-22 16:15:00,369 [main] DEBUG [org.springframework.amqp.rabbit.core.RabbitTemplate] - Publishing message on exchange [exchangeTest2], routingKey = [wuxing.xxxx.wsdwd]
- 2016-09-22 16:15:00,370 [pool-1-thread-6] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-hyW85GZHk-AHLLFJUmNLDQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
- 2016-09-22 16:15:00,372 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestKey, deliveryTag=1, messageCount=0])
- 2016-09-22 16:15:00,373 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.MessageConsumer] - consumer receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestKey, deliveryTag=1, messageCount=0])
- 2016-09-22 16:15:00,374 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Retrieving delivery for Consumer: tags=[[amq.ctag-hyW85GZHk-AHLLFJUmNLDQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
- 2016-09-22 16:15:00,379 [pool-2-thread-4] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-T-c1red0T_HHyCFfpXLYIQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
- 2016-09-22 16:15:00,381 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest2, receivedRoutingKey=wuxing.xxxx.wsdwd, deliveryTag=1, messageCount=0])
- 2016-09-22 16:15:00,382 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.WuxingConsumer] - wuxing receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest2, receivedRoutingKey=wuxing.xxxx.wsdwd, deliveryTag=1, messageCount=0])
- 2016-09-22 16:15:00,383 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Retrieving delivery for Consumer: tags=[[amq.ctag-T-c1red0T_HHyCFfpXLYIQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
- 2016-09-22 16:15:00,396 [pool-1-thread-5] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Storing delivery for Consumer: tags=[[amq.ctag-h5ERpaWrnqmkNhbfM7S8Ww]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0
- 2016-09-22 16:15:00,397 [SimpleAsyncTaskExecutor-1] DEBUG [org.springframework.amqp.rabbit.listener.BlockingQueueConsumer] - Received message: (Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestChris, deliveryTag=1, messageCount=0])
- 2016-09-22 16:15:00,398 [SimpleAsyncTaskExecutor-1] INFO [com.chris.consumer.ChrisConsumer] - chris receive message------->:(Body:'Hello, I am amq sender num :100'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchangeTest, receivedRoutingKey=queueTestChris, deliveryTag=1, messageCount=0])
We can see that the producer sends one message, which is published three times, once for each exchange and routing key:
1. on exchange [exchangeTest], routingKey = [queueTestKey]
2. on exchange [exchangeTest], routingKey = [queueTestChris]
3. on exchange [exchangeTest2], routingKey = [wuxing.xxxx.wsdwd]
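The three consumers then each receive their own message. That concludes the whole test.
Anyone interested in the project can grab its source code and play with it. The source code is here.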