Spring整合JMS——基于ActiveMQ實現(一)
1.1???? JMS簡介
?????? JMS的全稱是Java Message Service,即Java消息服務。它主要用于在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成一消息,并進行發送,對應的消費者在接收到對應的消息后去完成對應的業務邏輯。對于消息的傳遞有兩種類型,一種是點對點的,即一個生產者和一個消費者一一對應;另一種是發布/訂閱模式,即一個生產者產生消息并進行發送后,可以由多個消費者進行接收。
1.2???? Spring整合JMS
?????? 對JMS做了一個簡要介紹之后,接下來就講一下Spring整合JMS的具體過程。JMS只是一個標準,真正在使用它的時候我們需要有它的具體實現,這里我們就使用Apache的activeMQ來作為它的實現。所使用的依賴利用Maven來進行管理,具體依賴如下:
?
Xml代碼?
?
- <dependencies>??
- ????????<dependency>??
- ????????????<groupId>junit</groupId>??
- ????????????<artifactId>junit</artifactId>??
- ????????????<version>4.10</version>??
- ????????????<scope>test</scope>??
- ????????</dependency>??
- ????????<dependency>??
- ????????????<groupId>org.springframework</groupId>??
- ????????????<artifactId>spring-context</artifactId>??
- ????????????<version>${spring-version}</version>??
- ????????</dependency>??
- ????????<dependency>??
- ????????????<groupId>org.springframework</groupId>??
- ????????????<artifactId>spring-jms</artifactId>??
- ????????????<version>${spring-version}</version>??
- ????????</dependency>??
- ????????<dependency>??
- ????????????<groupId>org.springframework</groupId>??
- ????????????<artifactId>spring-test</artifactId>??
- ????????????<version>${spring-version}</version>??
- ????????</dependency>??
- ????????<dependency>??
- ????????????<groupId>javax.annotation</groupId>??
- ????????????<artifactId>jsr250-api</artifactId>??
- ????????????<version>1.0</version>??
- ????????</dependency>??
- ????????<dependency>??
- ????????????<groupId>org.apache.activemq</groupId>??
- ????????????<artifactId>activemq-core</artifactId>??
- ????????????<version>5.7.0</version>??
- ????????</dependency>??
- </dependencies>??



<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>javax.annotation</groupId> <artifactId>jsr250-api</artifactId> <version>1.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency></dependencies>
?
1.2.1??activeMQ準備
?????? 既然是使用的apache的activeMQ作為JMS的實現,那么首先我們應該到apache官網上下載activeMQ(http://activemq.apache.org/download.html),進行解壓后運行其bin目錄下面的activemq.bat文件啟動activeMQ。
1.2.2配置ConnectionFactory
?????? ConnectionFactory是用于產生到JMS服務器的鏈接的,Spring為我們提供了多個ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory對于建立JMS服務器鏈接的請求會一直返回同一個鏈接,并且會忽略Connection的close方法調用。CachingConnectionFactory繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了緩存功能,它可以緩存Session、MessageProducer和MessageConsumer。這里我們使用SingleConnectionFactory來作為示例。
Xml代碼?
?
- <bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory"/>??



<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
?
?????? 這樣就定義好產生JMS服務器鏈接的ConnectionFactory了嗎?答案是非也。Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正產生到JMS服務器鏈接的ConnectionFactory還得是由JMS服務廠商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我們這里使用的是ActiveMQ實現的JMS,所以在我們這里真正的可以產生Connection的就應該是由ActiveMQ提供的ConnectionFactory。所以定義一個ConnectionFactory的完整代碼應該如下所示:
Xml代碼?
?
- <!--?真正可以產生Connection的ConnectionFactory,由對應的?JMS服務廠商提供-->??
- <bean?id="targetConnectionFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">??
- ????<property?name="brokerURL"?value="tcp://localhost:61616"/>??
- </bean>??
- ??
- <!--?Spring用于管理真正的ConnectionFactory的ConnectionFactory?-->??
- <bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory">??
- ????<!--?目標ConnectionFactory對應真實的可以產生JMS?Connection的ConnectionFactory?-->??
- ????<property?name="targetConnectionFactory"?ref="targetConnectionFactory"/>??
- </bean>??



<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean>
??
1.2.3配置生產者
配置好ConnectionFactory之后我們就需要配置生產者。生產者負責產生消息并發送到JMS服務器,這通常對應的是我們的一個業務邏輯服務實現類。但是我們的服務實現類是怎么進行消息的發送的呢?這通常是利用Spring為我們提供的JmsTemplate類來實現的,所以配置生產者其實最核心的就是配置進行消息發送的JmsTemplate。對于消息發送者而言,它在發送消息的時候要知道自己該往哪里發,為此,我們在定義JmsTemplate的時候需要往里面注入一個Spring提供的ConnectionFactory對象。
Xml代碼?
?
- <!--?Spring提供的JMS工具類,它可以進行消息發送、接收等?-->??
- <bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">??
- ????<!--?這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象?-->??
- ????<property?name="connectionFactory"?ref="connectionFactory"/>??
- </bean>??



<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean>
?
?????? 在真正利用JmsTemplate進行消息發送的時候,我們需要知道消息發送的目的地,即destination。在Jms中有一個用來表示目的地的Destination接口,它里面沒有任何方法定義,只是用來做一個標識而已。當我們在使用JmsTemplate進行消息發送時沒有指定destination的時候將使用默認的Destination。默認Destination可以通過在定義jmsTemplate bean對象時通過屬性defaultDestination或defaultDestinationName來進行注入,defaultDestinationName對應的就是一個普通字符串。在ActiveMQ中實現了兩種類型的Destination,一個是點對點的ActiveMQQueue,另一個就是支持訂閱/發布模式的ActiveMQTopic。在定義這兩種類型的Destination時我們都可以通過一個name屬性來進行構造,如:
?
?
?
?
Xml代碼?
?
- <!--這個是隊列目的地,點對點的-->??
- <bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">??
- ????<constructor-arg>??
- ????????<value>queue</value>??
- ????</constructor-arg>??
- </bean>??
- <!--這個是主題目的地,一對多的-->??
- <bean?id="topicDestination"?class="org.apache.activemq.command.ActiveMQTopic">??
- ????<constructor-arg?value="topic"/>??
- </bean>??



<!--這個是隊列目的地,點對點的--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic"/> </bean>
?
?
?
?????? 假設我們定義了一個ProducerService,里面有一個向Destination發送純文本消息的方法sendMessage,那么我們的代碼就大概是這個樣子:
?
?
?
?
Java代碼?
?
- package?com.tiantian.springintejms.service.impl; ??
- ? ??
- import?javax.annotation.Resource; ??
- import?javax.jms.Destination; ??
- import?javax.jms.JMSException; ??
- import?javax.jms.Message; ??
- import?javax.jms.Session; ??
- ? ??
- import?org.springframework.jms.core.JmsTemplate; ??
- import?org.springframework.jms.core.MessageCreator; ??
- import?org.springframework.stereotype.Component; ??
- ? ??
- import?com.tiantian.springintejms.service.ProducerService; ??
- ? ??
- @Component??
- public?class?ProducerServiceImpl?implements?ProducerService?{ ??
- ? ??
- ????private?JmsTemplate?jmsTemplate; ??
- ???? ??
- ????public?void?sendMessage(Destination?destination,?final?String?message)?{ ??
- ????????System.out.println("---------------生產者發送消息-----------------"); ??
- ????????System.out.println("---------------生產者發了一個消息:"?+?message); ??
- ????????jmsTemplate.send(destination,?new?MessageCreator()?{ ??
- ????????????public?Message?createMessage(Session?session)?throws?JMSException?{ ??
- ????????????????return?session.createTextMessage(message); ??
- ????????????} ??
- ????????}); ??
- ????}? ??
- ??
- ????public?JmsTemplate?getJmsTemplate()?{ ??
- ????????returnjmsTemplate; ??
- ????}? ??
- ??
- ????@Resource??
- ????public?void?setJmsTemplate(JmsTemplate?jmsTemplate)?{ ??
- ????????this.jmsTemplate?=?jmsTemplate; ??
- ????} ??
- ? ??
- }??



package com.tiantian.springintejms.service.impl; import javax.annotation.Resource;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session; import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component; import com.tiantian.springintejms.service.ProducerService; @Componentpublic class ProducerServiceImpl implements ProducerService { private JmsTemplate jmsTemplate; public void sendMessage(Destination destination, final String message) { System.out.println("---------------生產者發送消息-----------------"); System.out.println("---------------生產者發了一個消息:" + message); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } public JmsTemplate getJmsTemplate() { returnjmsTemplate; } @Resource public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
?
?
?
?????? 我們可以看到在sendMessage方法體里面我們是通過jmsTemplate來發送消息到對應的Destination的。到此,我們生成一個簡單的文本消息并把它發送到指定目的地Destination的生產者就配置好了。
1.2.4配置消費者
生產者往指定目的地Destination發送消息后,接下來就是消費者對指定目的地的消息進行消費了。那么消費者是如何知道有生產者發送消息到指定目的地Destination了呢?這是通過Spring為我們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,并把接收到的信息分發給真正的MessageListener進行處理。每個消費者對應每個目的地都需要有對應的MessageListenerContainer。對于消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪里去監聽,也就是說它還需要知道去監聽哪個JMS服務器,這是通過在配置MessageConnectionFactory的時候往里面注入一個ConnectionFactory來實現的。所以我們在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪里監聽的ConnectionFactory;一個是表示監聽什么的Destination;一個是接收到消息以后進行消息處理的MessageListener。Spring一共為我們提供了兩種類型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。
SimpleMessageListenerContainer會在一開始的時候就創建一個會話session和消費者Consumer,并且會使用標準的JMS MessageConsumer.setMessageListener()方法注冊監聽器讓JMS提供者調用監聽器的回調函數。它不會動態的適應運行時需要和參與外部的事務管理。兼容性方面,它非常接近于獨立的JMS規范,但一般不兼容Java EE的JMS限制。
大多數情況下我們還是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應運行時需要,并且能夠參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和兼容Java EE環境。
定義處理消息的MessageListener
?????? 要定義處理消息的MessageListener我們只需要實現JMS規范中的MessageListener接口就可以了。MessageListener接口中只有一個方法onMessage方法,當接收到消息的時候會自動調用該方法。
?
?
?
?
Java代碼?
?
- package?com.tiantian.springintejms.listener; ??
- ? ??
- import?javax.jms.JMSException; ??
- import?javax.jms.Message; ??
- import?javax.jms.MessageListener; ??
- import?javax.jms.TextMessage; ??
- ? ??
- public?class?ConsumerMessageListener?implements?MessageListener?{ ??
- ? ??
- ????public?void?onMessage(Message?message)?{ ??
- ????????//這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage??
- ????????TextMessage?textMsg?=?(TextMessage)?message; ??
- ????????System.out.println("接收到一個純文本消息。"); ??
- ????????try?{ ??
- ????????????System.out.println("消息內容是:"?+?textMsg.getText()); ??
- ????????}?catch?(JMSException?e)?{ ??
- ????????????e.printStackTrace(); ??
- ????????} ??
- ????} ??
- ? ??
- }??



package com.tiantian.springintejms.listener; import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage; public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { //這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage TextMessage textMsg = (TextMessage) message; System.out.println("接收到一個純文本消息。"); try { System.out.println("消息內容是:" + textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
?
??
?
?????? 有了MessageListener之后我們就可以在Spring的配置文件中配置一個消息監聽容器了。
Xml代碼?
?
- <!--這個是隊列目的地-->??
- <bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">??
- ????<constructor-arg>??
- ????????<value>queue</value>??
- ????</constructor-arg>??
- </bean>??
- <!--?消息監聽器?-->??
- <bean?id="consumerMessageListener"?class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>???? ??
- ??
- <!--?消息監聽容器?-->??
- <bean?id="jmsContainer"????????class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
- ????<property?name="connectionFactory"?ref="connectionFactory"?/>??
- ????<property?name="destination"?ref="queueDestination"?/>??
- ????<property?name="messageListener"?ref="consumerMessageListener"?/>??
- </bean>??



<!--這個是隊列目的地--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!-- 消息監聽器 --> <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/> <!-- 消息監聽容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean>
?
?
?????? 我們可以看到我們定義了一個名叫queue的ActiveMQQueue目的地,我們的監聽器就是監聽了發送到這個目的地的消息。
?????? 至此我們的生成者和消費者都配置完成了,這也就意味著我們的整合已經完成了。這個時候完整的Spring的配置文件應該是這樣的:
Xml代碼?
?
- <?xml?version="1.0"?encoding="UTF-8"?>??
- <beans?xmlns="http://www.springframework.org/schema/beans"??
- ????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"?xmlns:context="http://www.springframework.org/schema/context"??
- ????xmlns:jms="http://www.springframework.org/schema/jms"??
- ????xsi:schemaLocation="http://www.springframework.org/schema/beans ??
- ?????http://www.springframework.org/schema/beans/spring-beans-3.0.xsd ??
- ?????http://www.springframework.org/schema/context ??
- ?????http://www.springframework.org/schema/context/spring-context-3.0.xsd ??
- ????http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans-3.0.xsd ??
- ????http://www.springframework.org/schema/jms?http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">??
- ? ??
- ????<context:component-scan?base-package="com.tiantian"?/>??
- ? ??
- ????<!--?Spring提供的JMS工具類,它可以進行消息發送、接收等?-->??
- ????<bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">??
- ????????<!--?這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象?-->??
- ????????<property?name="connectionFactory"?ref="connectionFactory"/>??
- ????</bean>??
- ???? ??
- ????<!--?真正可以產生Connection的ConnectionFactory,由對應的?JMS服務廠商提供-->??
- ????<bean?id="targetConnectionFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">??
- ????????<property?name="brokerURL"?value="tcp://localhost:61616"/>??
- ????</bean>??
- ???? ??
- ????<!--?Spring用于管理真正的ConnectionFactory的ConnectionFactory?-->??
- ????<bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory">??
- ????????<!--?目標ConnectionFactory對應真實的可以產生JMS?Connection的ConnectionFactory?-->??
- ????????<property?name="targetConnectionFactory"?ref="targetConnectionFactory"/>??
- ????</bean>??
- ???? ??
- ????<!--這個是隊列目的地-->??
- ????<bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">??
- ????????<constructor-arg>??
- ????????????<value>queue</value>??
- ????????</constructor-arg>??
- ????</bean>??
- ????<!--?消息監聽器?-->??
- ????<bean?id="consumerMessageListener"?class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>??
- ????<!--?消息監聽容器?-->??
- ????<bean?id="jmsContainer"??
- ????????class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
- ????????<property?name="connectionFactory"?ref="connectionFactory"?/>??
- ????????<property?name="destination"?ref="queueDestination"?/>??
- ????????<property?name="messageListener"?ref="consumerMessageListener"?/>??
- ????</bean>??
- </beans>??



<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <context:component-scan base-package="com.tiantian" /> <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--這個是隊列目的地--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!-- 消息監聽器 --> <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/> <!-- 消息監聽容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean></beans>
?
?
?????? 接著我們來測試一下,看看我們的整合是否真的成功了,測試代碼如下:
?
?
?
?
Java代碼?
?
- package?com.tiantian.springintejms.test; ??
- ? ??
- import?javax.jms.Destination; ??
- ? ??
- import?org.junit.Test; ??
- import?org.junit.runner.RunWith; ??
- import?org.springframework.beans.factory.annotation.Autowired; ??
- import?org.springframework.beans.factory.annotation.Qualifier; ??
- import?org.springframework.test.context.ContextConfiguration; ??
- import?org.springframework.test.context.junit4.SpringJUnit4ClassRunner; ??
- import?com.tiantian.springintejms.service.ProducerService; ??
- ? ??
- @RunWith(SpringJUnit4ClassRunner.class) ??
- @ContextConfiguration("/applicationContext.xml") ??
- public?class?ProducerConsumerTest?{ ??
- ? ??
- ????@Autowired??
- ????private?ProducerService?producerService; ??
- ????@Autowired??
- ????@Qualifier("queueDestination") ??
- ????private?Destination?destination; ??
- ???? ??
- ????@Test??
- ????public?void?testSend()?{ ??
- ????????for?(int?i=0;?i<2;?i++)?{ ??
- ????????????producerService.sendMessage(destination,?"你好,生產者!這是消息:"?+?(i+1)); ??
- ????????} ??
- ????} ??
- ???? ??
- }??



package com.tiantian.springintejms.test; import javax.jms.Destination; import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;import com.tiantian.springintejms.service.ProducerService; @RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest { @Autowired private ProducerService producerService; @Autowired @Qualifier("queueDestination") private Destination destination; @Test public void testSend() { for (int i=0; i<2; i++) { producerService.sendMessage(destination, "你好,生產者!這是消息:" + (i+1)); } } }
?
?
?
?????? 在上面的測試代碼中我們利用生產者發送了兩個消息,正常來說,消費者應該可以接收到這兩個消息。運行測試代碼后控制臺輸出如下:
?

?
?????? 看,控制臺已經進行了正確的輸出,這說明我們的整合確實是已經成功了。
?
Spring整合JMS(二)——消息監聽器
?
3.1???? 消息監聽器MessageListener
?????? 在Spring整合JMS的應用中我們在定義消息監聽器的時候一共可以定義三種類型的消息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分別來介紹一下這幾種類型的區別。
3.1.1??MessageListener
MessageListener是最原始的消息監聽器,它是JMS規范中定義的一個接口。其中定義了一個用于處理接收到的消息的onMessage方法,該方法只接收一個Message參數。我們前面在講配置消費者的時候用的消息監聽器就是MessageListener,代碼如下:
?



- import?javax.jms.JMSException; ??
- import?javax.jms.Message; ??
- import?javax.jms.MessageListener; ??
- import?javax.jms.TextMessage; ??
- ? ??
- public?class?ConsumerMessageListener?implements?MessageListener?{ ??
- ? ??
- ????public?void?onMessage(Message?message)?{ ??
- ????????//這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage??
- ????????TextMessage?textMsg?=?(TextMessage)?message; ??
- ????????System.out.println("接收到一個純文本消息。"); ??
- ????????try?{ ??
- ????????????System.out.println("消息內容是:"?+?textMsg.getText()); ??
- ????????}?catch?(JMSException?e)?{ ??
- ????????????e.printStackTrace(); ??
- ????????} ??
- ????} ??
- ? ??
- }??
import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage; public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { //這里我們知道生產者發送的就是一個純文本消息,所以這里可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessage TextMessage textMsg = (TextMessage) message; System.out.println("接收到一個純文本消息。"); try { System.out.println("消息內容是:" + textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
??
3.1.2??SessionAwareMessageListener
SessionAwareMessageListener是Spring為我們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收消息的,假如我們在使用MessageListener處理接收到的消息時我們需要發送一個消息通知對方我們已經收到這個消息了,那么這個時候我們就需要在代碼里面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到消息后發送一個回復的消息,它同樣為我們提供了一個處理接收到的消息的onMessage方法,但是這個方法可以同時接收兩個參數,一個是表示當前接收到的消息Message,另一個就是可以用來發送消息的Session對象。先來看一段代碼:



- package?com.tiantian.springintejms.listener; ??
- ? ??
- import?javax.jms.Destination; ??
- import?javax.jms.JMSException; ??
- import?javax.jms.Message; ??
- import?javax.jms.MessageProducer; ??
- import?javax.jms.Session; ??
- import?javax.jms.TextMessage; ??
- ? ??
- import?org.springframework.jms.listener.SessionAwareMessageListener; ??
- ? ??
- public?class?ConsumerSessionAwareMessageListener?implements??
- ????????SessionAwareMessageListener<TextMessage>?{ ??
- ? ??
- ????private?Destination?destination; ??
- ???? ??
- ????public?void?onMessage(TextMessage?message,?Session?session)?throws?JMSException?{ ??
- ????????System.out.println("收到一條消息"); ??
- ????????System.out.println("消息內容是:"?+?message.getText()); ??
- ????????MessageProducer?producer?=?session.createProducer(destination); ??
- ????????Message?textMessage?=?session.createTextMessage("ConsumerSessionAwareMessageListener。。。"); ??
- ????????producer.send(textMessage); ??
- ????} ??
- ? ??
- ????public?Destination?getDestination()?{ ??
- ????????returndestination; ??
- ????} ??
- ? ??
- ????public?void?setDestination(Destination?destination)?{ ??
- ????????this.destination?=?destination; ??
- ????} ??
- ? ??
- }??
package com.tiantian.springintejms.listener; import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage; import org.springframework.jms.listener.SessionAwareMessageListener; public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<TextMessage> { private Destination destination; public void onMessage(TextMessage message, Session session) throws JMSException { System.out.println("收到一條消息"); System.out.println("消息內容是:" + message.getText()); MessageProducer producer = session.createProducer(destination); Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。"); producer.send(textMessage); } public Destination getDestination() { returndestination; } public void setDestination(Destination destination) { this.destination = destination; } }
?
?????? 在上面代碼中我們定義了一個SessionAwareMessageListener,在這個Listener中我們在接收到了一個消息之后,利用對應的Session創建了一個到destination的生產者和對應的消息,然后利用創建好的生產者發送對應的消息。
?????? 接著我們在Spring的配置文件中配置該消息監聽器將處理來自一個叫sessionAwareQueue的目的地的消息,并且往該MessageListener中通過set方法注入其屬性destination的值為queueDestination。這樣當我們的SessionAwareMessageListener接收到消息之后就會往queueDestination發送一個消息。



- <?xml?version="1.0"?encoding="UTF-8"?>??
- <beans?xmlns="http://www.springframework.org/schema/beans"??
- ????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"?xmlns:context="http://www.springframework.org/schema/context"??
- ????xmlns:jms="http://www.springframework.org/schema/jms"??
- ????xsi:schemaLocation="http://www.springframework.org/schema/beans ??
- ?????http://www.springframework.org/schema/beans/spring-beans-3.0.xsd ??
- ?????http://www.springframework.org/schema/context ??
- ?????http://www.springframework.org/schema/context/spring-context-3.0.xsd ??
- ????http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans-3.0.xsd ??
- ????http://www.springframework.org/schema/jms?http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">??
- ? ??
- ????<context:component-scan?base-package="com.tiantian"?/>? ??
- ????<!--?Spring提供的JMS工具類,它可以進行消息發送、接收等?-->??
- ????<bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">??
- ????????<!--?這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象?-->??
- ????????<property?name="connectionFactory"?ref="connectionFactory"/>??
- ????</bean>??
- ???? ??
- ????<!--?真正可以產生Connection的ConnectionFactory,由對應的?JMS服務廠商提供-->??
- ????<bean?id="targetConnectionFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">??
- ????????<property?name="brokerURL"?value="tcp://localhost:61616"/>??
- ????</bean>??
- ???? ??
- ????<!--?Spring用于管理真正的ConnectionFactory的ConnectionFactory?-->??
- ????<bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory">??
- ????????<!--?目標ConnectionFactory對應真實的可以產生JMS?Connection的ConnectionFactory?-->??
- ????????<property?name="targetConnectionFactory"?ref="targetConnectionFactory"/>??
- ????</bean>??
- ???? ??
- ????<!--這個是隊列目的地-->??
- ????<bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">??
- ????????<constructor-arg>??
- ????????????<value>queue</value>??
- ????????</constructor-arg>??
- ????</bean>??
- ????<!--這個是sessionAwareQueue目的地-->??
- ????<bean?id="sessionAwareQueue"?class="org.apache.activemq.command.ActiveMQQueue">??
- ????????<constructor-arg>??
- ????????????<value>sessionAwareQueue</value>??
- ????????</constructor-arg>??
- ????</bean>??
- ????<!--?消息監聽器?-->??
- ????<bean?id="consumerMessageListener"?class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>??
- ????<!--?可以獲取session的MessageListener?-->??
- ????<bean?id="consumerSessionAwareMessageListener"?class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener">??
- ????????<property?name="destination"?ref="queueDestination"/>??
- ????</bean>??
- ????<!--?消息監聽容器?-->??
- ????<bean?id="jmsContainer"????????class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
- ????????<property?name="connectionFactory"?ref="connectionFactory"?/>??
- ????????<property?name="destination"?ref="queueDestination"?/>??
- ????????<property?name="messageListener"?ref="consumerMessageListener"?/>??
- ????</bean>??
- ???? ??
- ????<bean?id="sessionAwareListenerContainer"??
- ????????class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
- ????????<property?name="connectionFactory"?ref="connectionFactory"?/>??
- ????????<property?name="destination"?ref="sessionAwareQueue"?/>??
- ????????<property?name="messageListener"?ref="consumerSessionAwareMessageListener"?/>??
- ????</bean>??
- </beans>??
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <context:component-scan base-package="com.tiantian" /> <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--這個是隊列目的地--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!--這個是sessionAwareQueue目的地--> <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>sessionAwareQueue</value> </constructor-arg> </bean> <!-- 消息監聽器 --> <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/> <!-- 可以獲取session的MessageListener --> <bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener"> <property name="destination" ref="queueDestination"/> </bean> <!-- 消息監聽容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> </bean> <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="sessionAwareQueue" /> <property name="messageListener" ref="consumerSessionAwareMessageListener" /> </bean></beans>
?
?
?????? 接著我們來做一個測試,測試代碼如下:



- @RunWith(SpringJUnit4ClassRunner.class) ??
- @ContextConfiguration("/applicationContext.xml") ??
- public?class?ProducerConsumerTest?{ ??
- ? ??
- ????@Autowired??
- ????private?ProducerService?producerService; ??
- ????@Autowired??
- ????@Qualifier("sessionAwareQueue") ??
- ????private?Destination?sessionAwareQueue; ??
- ???? ??
- ????@Test??
- ????public?void?testSessionAwareMessageListener()?{ ??
- ????????producerService.sendMessage(sessionAwareQueue,?"測試SessionAwareMessageListener"); ??
- ????} ??
- ???? ??
- }??
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest { @Autowired private ProducerService producerService; @Autowired @Qualifier("sessionAwareQueue") private Destination sessionAwareQueue; @Test public void testSessionAwareMessageListener() { producerService.sendMessage(sessionAwareQueue, "測試SessionAwareMessageListener"); } }
?
?????? 在上述測試代碼中,我們通過前面定義好的生產者往我們定義好的SessionAwareMessageListener監聽的sessionAwareQueue發送了一個消息。程序運行之后控制臺輸出如下:
?
?
?????? 這說明我們已經成功的往sessionAwareQueue發送了一條純文本消息,消息會被ConsumerSessionAwareMessageListener的onMessage方法進行處理,在onMessage方法中ConsumerSessionAwareMessageListener就是簡單的把接收到的純文本信息的內容打印出來了,之后再往queueDestination發送了一個純文本消息,消息內容是“ConsumerSessionAwareMessageListener…”,該消息隨后就被ConsumerMessageListener處理了,根據我們的定義,在ConsumerMessageListener中也只是簡單的打印了一下接收到的消息內容。
3.1.3??MessageListenerAdapter
MessageListenerAdapter類實現了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是將接收到的消息進行類型轉換,然后通過反射的形式把它交給一個普通的Java類進行處理。
?????? MessageListenerAdapter會把接收到的消息做如下轉換:
?????? TextMessage轉換為String對象;
?????? BytesMessage轉換為byte數組;
?????? MapMessage轉換為Map對象;
?????? ObjectMessage轉換為對應的Serializable對象。
?????? 既然前面說了MessageListenerAdapter會把接收到的消息做一個類型轉換,然后利用反射把它交給真正的目標處理器——一個普通的Java類進行處理(如果真正的目標處理器是一個MessageListener或者是一個SessionAwareMessageListener,那么Spring將直接使用接收到的Message對象作為參數調用它們的onMessage方法,而不會再利用反射去進行調用),那么我們在定義一個MessageListenerAdapter的時候就需要為它指定這樣一個目標類。這個目標類我們可以通過MessageListenerAdapter的構造方法參數指定,如:



- <!--?消息監聽適配器?-->??
- ????<bean?id="messageListenerAdapter"?class="org.springframework.jms.listener.adapter.MessageListenerAdapter">??
- ????????<constructor-arg>??
- ????????????<bean?class="com.tiantian.springintejms.listener.ConsumerListener"/>??
- ????????</constructor-arg>??
- ????</bean>??
<!-- 消息監聽適配器 --> <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="com.tiantian.springintejms.listener.ConsumerListener"/> </constructor-arg> </bean>
?
?
?????? 也可以通過它的delegate屬性來指定,如:



- <!--?消息監聽適配器?-->??
- ????<bean?id="messageListenerAdapter"?class="org.springframework.jms.listener.adapter.MessageListenerAdapter">??
- ????????<property?name="delegate">??
- ????????????<bean?class="com.tiantian.springintejms.listener.ConsumerListener"/>??
- ????????</property>??
- ????????<property?name="defaultListenerMethod"?value="receiveMessage"/>??
- ????</bean>??
<!-- 消息監聽適配器 --> <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate"> <bean class="com.tiantian.springintejms.listener.ConsumerListener"/> </property> <property name="defaultListenerMethod" value="receiveMessage"/> </bean>
?
?
?????? 前面說了如果我們指定的這個目標處理器是一個MessageListener或者是一個SessionAwareMessageListener的時候Spring將直接利用接收到的Message對象作為方法參數調用它們的onMessage方法。但是如果指定的目標處理器是一個普通的Java類時Spring將利用Message進行了類型轉換之后的對象作為參數通過反射去調用真正的目標處理器的處理方法,那么Spring是如何知道該調用哪個方法呢?這是通過MessageListenerAdapter的defaultListenerMethod屬性來決定的,當我們沒有指定該屬性時,Spring會默認調用目標處理器的handleMessage方法。
?????? 接下來我們來看一個示例,假設我們有一個普通的Java類ConsumerListener,其對應有兩個方法,handleMessage和receiveMessage,其代碼如下:



- package?com.tiantian.springintejms.listener; ??
- ? ??
- public?class?ConsumerListener?{ ??
- ? ??
- ????public?void?handleMessage(String?message)?{ ??
- ????????System.out.println("ConsumerListener通過handleMessage接收到一個純文本消息,消息內容是:"?+?message); ??
- ????} ??
- ???? ??
- ????public?void?receiveMessage(String?message)?{ ??
- ????????System.out.println("ConsumerListener通過receiveMessage接收到一個純文本消息,消息內容是:"?+?message); ??
- ????} ??
- ???? ??
- }??
package com.tiantian.springintejms.listener; public class ConsumerListener { public void handleMessage(String message) { System.out.println("ConsumerListener通過handleMessage接收到一個純文本消息,消息內容是:" + message); } public void receiveMessage(String message) { System.out.println("ConsumerListener通過receiveMessage接收到一個純文本消息,消息內容是:" + message); } }
?
?????? 假設我們要把它作為一個消息監聽器來監聽發送到adapterQueue的消息,這個時候我們就可以定義一個對應的MessageListenerAdapter來把它當做一個MessageListener使用。



- <!--?消息監聽適配器?-->??
- <bean?id="messageListenerAdapter"?class="org.springframework.jms.listener.adapter.MessageListenerAdapter">??
- ????<property?name="delegate">??
- ????????<bean?class="com.tiantian.springintejms.listener.ConsumerListener"/>??
- ????</property>??
- ????<property?name="defaultListenerMethod"?value="receiveMessage"/>??
- </bean>??
<!-- 消息監聽適配器 --> <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate"> <bean class="com.tiantian.springintejms.listener.ConsumerListener"/> </property> <property name="defaultListenerMethod" value="receiveMessage"/> </bean>
?
?????? 當然,有了MessageListener之后我們還需要配置其對應的MessageListenerContainer,這里配置如下:



- <!--?消息監聽適配器對應的監聽容器?-->??
- <bean?id="messageListenerAdapterContainer"?class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
- ????<property?name="connectionFactory"?ref="connectionFactory"/>??
- ????<property?name="destination"?ref="adapterQueue"/>??
- ????<property?name="messageListener"?ref="messageListenerAdapter"/><!--?使用MessageListenerAdapter來作為消息監聽器?-->??
- </bean>??
<!-- 消息監聽適配器對應的監聽容器 --> <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="adapterQueue"/> <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為消息監聽器 --> </bean>
?
?????? 在上面的MessageListenerAdapter中我們指定了其defaultListenerMethod屬性的值為receiveMessage,所以當MessageListenerAdapter接收到消息之后會自動的調用我們指定的ConsumerListener的receiveMessage方法。
?????? 針對于上述代碼我們定義測試代碼如下:



- package?com.tiantian.springintejms.test; ??
- ? ??
- import?javax.jms.Destination; ??
- ? ??
- import?org.junit.Test; ??
- import?org.junit.runner.RunWith; ??
- import?org.springframework.beans.factory.annotation.Autowired; ??
- import?org.springframework.beans.factory.annotation.Qualifier; ??
- import?org.springframework.test.context.ContextConfiguration; ??
- import?org.springframework.test.context.junit4.SpringJUnit4ClassRunner; ??
- ? ??
- import?com.tiantian.springintejms.service.ProducerService; ??
- ? ??
- @RunWith(SpringJUnit4ClassRunner.class) ??
- @ContextConfiguration("/applicationContext.xml") ??
- public?class?ProducerConsumerTest?{ ??
- ??
- ????@Autowired??
- ????@Qualifier("adapterQueue") ??
- ????private?Destination?adapterQueue; ??
- ??
- ????@Test??
- ????public?void?testMessageListenerAdapter()?{ ??
- ????????producerService.sendMessage(adapterQueue,?"測試MessageListenerAdapter"); ??
- ????} ??
- ???? ??
- }??
package com.tiantian.springintejms.test; import javax.jms.Destination; import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.tiantian.springintejms.service.ProducerService; @RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest { @Autowired @Qualifier("adapterQueue") private Destination adapterQueue; @Test public void testMessageListenerAdapter() { producerService.sendMessage(adapterQueue, "測試MessageListenerAdapter"); } }
?
?????? 這時候我們會看到控制臺輸出如下:
????????如果我們指定MessageListenerAdapter的defaultListenerMethod屬性,那么在運行上述代碼時控制臺會輸出如下結果:
????????MessageListenerAdapter除了會自動的把一個普通Java類當做MessageListener來處理接收到的消息之外,其另外一個主要的功能是可以自動的發送返回消息。
???? 當我們用于處理接收到的消息的方法的返回值不為空的時候,Spring會自動將它封裝為一個JMS Message,然后自動進行回復。那么這個時候這個回復消息將發送到哪里呢?這主要有兩種方式可以指定。
???????第一,可以通過發送的Message的setJMSReplyTo方法指定該消息對應的回復消息的目的地。這里我們把我們的生產者發送消息的代碼做一下修改,在發送消息之前先指定該消息對應的回復目的地為一個叫responseQueue的隊列目的地,具體代碼如下所示:



- package?com.tiantian.springintejms.service.impl; ??
- ? ??
- import?javax.jms.Destination; ??
- import?javax.jms.JMSException; ??
- import?javax.jms.Message; ??
- import?javax.jms.Session; ??
- import?javax.jms.TextMessage; ??
- ? ??
- import?org.springframework.beans.factory.annotation.Autowired; ??
- import?org.springframework.beans.factory.annotation.Qualifier; ??
- import?org.springframework.jms.core.JmsTemplate; ??
- import?org.springframework.jms.core.MessageCreator; ??
- import?org.springframework.stereotype.Component; ??
- ? ??
- import?com.tiantian.springintejms.service.ProducerService; ??
- ? ??
- @Component??
- public?class?ProducerServiceImpl?implements?ProducerService?{? ??
- ??
- ????@Autowired??
- ????private?JmsTemplate?jmsTemplate; ??
- ??
- ????@Autowired??
- ????@Qualifier("responseQueue") ??
- ????private?Destination?responseDestination; ??
- ???? ??
- ????public?void?sendMessage(Destination?destination,?final?String?message)?{ ??
- ????????System.out.println("---------------生產者發送消息-----------------"); ??
- ????????System.out.println("---------------生產者發了一個消息:"?+?message); ??
- ????????jmsTemplate.send(destination,?new?MessageCreator()?{ ??
- ????????????public?Message?createMessage(Session?session)?throws?JMSException?{ ??
- ????????????????TextMessage?textMessage?=?session.createTextMessage(message); ??
- ????????????????textMessage.setJMSReplyTo(responseDestination); ??
- ????????????????return?textMessage; ??
- ????????????} ??
- ????????}); ??
- ????} ??
- ? ??
- }??
package com.tiantian.springintejms.service.impl; import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import javax.jms.TextMessage; import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component; import com.tiantian.springintejms.service.ProducerService; @Componentpublic class ProducerServiceImpl implements ProducerService { @Autowired private JmsTemplate jmsTemplate; @Autowired @Qualifier("responseQueue") private Destination responseDestination; public void sendMessage(Destination destination, final String message) { System.out.println("---------------生產者發送消息-----------------"); System.out.println("---------------生產者發了一個消息:" + message); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); textMessage.setJMSReplyTo(responseDestination); return textMessage; } }); } }
?
?????? 接著定義一個叫responseQueue的隊列目的地及其對應的消息監聽器和監聽容器。



- <!--?用于測試消息回復的?-->??
- <bean?id="responseQueue"?class="org.apache.activemq.command.ActiveMQQueue">??
- ????<constructor-arg>??
- ????????<value>responseQueue</value>??
- ????</constructor-arg>??
- </bean>??
- ??
- <!--?responseQueue對應的監聽器?-->??
- <bean?id="responseQueueListener"?class="com.tiantian.springintejms.listener.ResponseQueueListener"/>??
- ??
- <!--?responseQueue對應的監聽容器?-->??
- <bean?id="responseQueueMessageListenerContainer"?class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
- ????<property?name="connectionFactory"?ref="connectionFactory"/>??
- ????<property?name="destination"?ref="responseQueue"/>??
- ????<property?name="messageListener"?ref="responseQueueListener"/>??
- </bean>??
<!-- 用于測試消息回復的 --> <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>responseQueue</value> </constructor-arg> </bean> <!-- responseQueue對應的監聽器 --> <bean id="responseQueueListener" class="com.tiantian.springintejms.listener.ResponseQueueListener"/> <!-- responseQueue對應的監聽容器 --> <bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="responseQueue"/> <property name="messageListener" ref="responseQueueListener"/> </bean>
??
?????? ResponseQueueListener的定義如下所示:



- public?class?ResponseQueueListener?implements?MessageListener?{ ??
- ? ??
- ????public?void?onMessage(Message?message)?{ ??
- ????????if?(message?instanceof?TextMessage)?{ ??
- ????????????TextMessage?textMessage?=?(TextMessage)?message; ??
- ????????????try?{ ??
- ????????????????System.out.println("接收到發送到responseQueue的一個文本消息,內容是:"?+?textMessage.getText()); ??
- ????????????}?catch?(JMSException?e)?{ ??
- ????????????????e.printStackTrace(); ??
- ????????????} ??
- ????????} ??
- ????} ??
- ? ??
- }??
public class ResponseQueueListener implements MessageListener { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收到發送到responseQueue的一個文本消息,內容是:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
?
?????? 接著我們運行我們的測試代碼,利用生產者往我們定義好的MessageListenerAdapter負責處理的adapterQueue目的地發送一個消息。測試代碼如下所示:



- @RunWith(SpringJUnit4ClassRunner.class) ??
- @ContextConfiguration("/applicationContext.xml") ??
- public?class?ProducerConsumerTest?{ ??
- ? ??
- ????@Autowired??
- ????private?ProducerService?producerService; ??
- ??
- ????@Qualifier("adapterQueue") ??
- ????@Autowired??
- ????private?Destination?adapterQueue;??? ??
- ??
- ????@Test??
- ????public?void?testMessageListenerAdapter()?{ ??
- ????????producerService.sendMessage(adapterQueue,?"測試MessageListenerAdapter"); ??
- ????} ??
- ???? ??
- }??
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/applicationContext.xml")public class ProducerConsumerTest { @Autowired private ProducerService producerService; @Qualifier("adapterQueue") @Autowired private Destination adapterQueue; @Test public void testMessageListenerAdapter() { producerService.sendMessage(adapterQueue, "測試MessageListenerAdapter"); } }
?
?????? 運行上述測試代碼之后,控制臺輸出如下:
?
????????這說明我們的生產者發送消息被MessageListenerAdapter處理之后,MessageListenerAdapter確實把監聽器的返回內容封裝成一個Message往原Message通過setJMSReplyTo方法指定的回復目的地發送了一個消息。對于MessageListenerAdapter對應的監聽器處理方法返回的是一個null值或者返回類型是void的情況,MessageListenerAdapter是不會自動進行消息的回復的,有興趣的網友可以自己測試一下。
?????? 第二,通過MessageListenerAdapter的defaultResponseDestination屬性來指定。這里我們也來做一個測試,首先維持生產者發送消息的代碼不變,即發送消息前不通過Message的setJMSReplyTo方法指定消息的回復目的地;接著我們在定義MessageListenerAdapter的時候通過其defaultResponseDestination屬性指定其默認的回復目的地是“defaultResponseQueue”,并定義defaultResponseQueue對應的消息監聽器和消息監聽容器。



- <!--?消息監聽適配器?-->??
- <bean?id="messageListenerAdapter"?class="org.springframework.jms.listener.adapter.MessageListenerAdapter">??
- ????<!--?<constructor-arg>??
- ????????<bean?class="com.tiantian.springintejms.listener.ConsumerListener"/>??
- ????</constructor-arg>?-->??
- ????<property?name="delegate">??
- ????????<bean?class="com.tiantian.springintejms.listener.ConsumerListener"/>??
- ????</property>??
- ????<property?name="defaultListenerMethod"?value="receiveMessage"/>??
- ????<property?name="defaultResponseDestination"?ref="defaultResponseQueue"/>??
- </bean>??
- ??
- <!--?消息監聽適配器對應的監聽容器?-->??
- <bean?id="messageListenerAdapterContainer"?class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
- ????<property?name="connectionFactory"?ref="connectionFactory"/>??
- ????<property?name="destination"?ref="adapterQueue"/>??
- ????<property?name="messageListener"?ref="messageListenerAdapter"/><!--?使用MessageListenerAdapter來作為消息監聽器?-->??
- </bean>??
- ??
- !--?默認的消息回復隊列?-->??
- <bean?id="defaultResponseQueue"?class="org.apache.activemq.command.ActiveMQQueue">??
- ????<constructor-arg>??
- ????????<value>defaultResponseQueue</value>??
- ????</constructor-arg>??
- </bean>??
- ??
- <!--?defaultResponseQueue對應的監聽器?-->??
- <bean?id="defaultResponseQueueListener"?class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/>??
- ??
- <!--?defaultResponseQueue對應的監聽容器?-->??
- <bean?id="defaultResponseQueueMessageListenerContainer"?class="org.springframework.jms.listener.DefaultMessageListenerContainer">??
- ????<property?name="connectionFactory"?ref="connectionFactory"/>??
- ????<property?name="destination"?ref="defaultResponseQueue"/>??
- ????<property?name="messageListener"?ref="defaultResponseQueueListener"/>??
- </bean>??
<!-- 消息監聽適配器 --> <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <!-- <constructor-arg> <bean class="com.tiantian.springintejms.listener.ConsumerListener"/> </constructor-arg> --> <property name="delegate"> <bean class="com.tiantian.springintejms.listener.ConsumerListener"/> </property> <property name="defaultListenerMethod" value="receiveMessage"/> <property name="defaultResponseDestination" ref="defaultResponseQueue"/> </bean> <!-- 消息監聽適配器對應的監聽容器 --> <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="adapterQueue"/> <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為消息監聽器 --> </bean><!-- 默認的消息回復隊列 --> <bean id="defaultResponseQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>defaultResponseQueue</value> </constructor-arg> </bean> <!-- defaultResponseQueue對應的監聽器 --> <bean id="defaultResponseQueueListener" class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/> <!-- defaultResponseQueue對應的監聽容器 --> <bean id="defaultResponseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="defaultResponseQueue"/> <property name="messageListener" ref="defaultResponseQueueListener"/> </bean>
?
?????? DefaultResponseQueueListener的代碼如下所示:



- package?com.tiantian.springintejms.listener; ??
- ? ??
- import?javax.jms.JMSException; ??
- import?javax.jms.Message; ??
- import?javax.jms.MessageListener; ??
- import?javax.jms.TextMessage; ??
- ? ??
- public?class?DefaultResponseQueueListener?implements?MessageListener?{ ??
- ? ??
- ????public?void?onMessage(Message?message)?{ ??
- ????????if?(message?instanceof?TextMessage)?{ ??
- ????????????TextMessage?textMessage?=?(TextMessage)?message; ??
- ????????????try?{ ??
- ????????????????System.out.println("DefaultResponseQueueListener接收到發送到defaultResponseQueue的一個文本消息,內容是:"?+?textMessage.getText()); ??
- ????????????}?catch?(JMSException?e)?{ ??
- ????????????????e.printStackTrace(); ??
- ????????????} ??
- ????????} ??
- ????} ??
- ? ??
- }??
package com.tiantian.springintejms.listener; import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage; public class DefaultResponseQueueListener implements MessageListener { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("DefaultResponseQueueListener接收到發送到defaultResponseQueue的一個文本消息,內容是:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
?
?????? 這時候運行如下測試代碼:



- @Test??
- public?void?testMessageListenerAdapter()?{ ??
- ????producerService.sendMessage(adapterQueue,?"測試MessageListenerAdapter"); ??
- }??
@Test public void testMessageListenerAdapter() { producerService.sendMessage(adapterQueue, "測試MessageListenerAdapter"); }
?
?????? 控制臺將輸出如下內容:
?
????????這說明MessageListenerAdapter會自動把真正的消息處理器返回的非空內容封裝成一個Message發送回復消息到通過defaultResponseDestination屬性指定的默認消息回復目的地。
?????? 既然我們可以通過兩種方式來指定MessageListenerAdapter自動發送回復消息的目的地,那么當我們兩種方式都指定了而且它們的目的地還不一樣的時候會怎么發送呢?是兩個都發還是只發其中的一個呢?關于這部分的測試我這里就不贅述了,有興趣的網友可以自己進行。這里我可以直接的告訴大家,當兩種方式都指定了消息的回復目的地的時候使用發送消息的setJMSReplyTo方法指定的目的地將具有較高的優先級,MessageListenerAdapter將只往該方法指定的消息回復目的地發送回復消息。