概述
介紹
JMS(Java Message Service)即 Java 消息服務應用程序接口,是一個 Java 平臺中關于面向消息中間件(MOM)的 API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java 消息服務是一個與具體平臺無關的 API,絕大多數 MOM 提供商都對JMS 提供支持。
簡短來說,JMS 是一種與廠商無關的 API,是 sun 公司為了統一廠商的接口規范,而定義出的一組api接口,用來訪問消息收發系統消息。它類似于 JDBC(Java Database Connectivity),提供了應用程序之間異步通信的功能。
JMS 體系結構
- JMS 提供者(JMS 的實現者,比如 activemq、jbossmq、tonglinkmq 等)
- JMS 客戶(使用提供者發送消息的程序或對象,例如在 12306 中,負責發送一條購票消息到處理隊列中,用來解決購票高峰問題,那么,發送消息到隊列的程序和從隊列獲取消息的程序都叫做客戶)
- JMS 生產者(producer、sender):負責創建并發送消息的客戶
- JMS 消費者(customer、listener):負責接收并處理消息的客戶
- JMS 消息(message):在 JMS 客戶之間傳遞數據的對象
- JMS 隊列(queue):一個容納那些被發送的等待閱讀的消息的區域
- JMS 主題(topic):一種支持發送消息給多個訂閱者的機制
JMS 對象模型
- 連接工廠(connectionFactory)客戶端使用 JNDI 查找連接工廠,然后利用連接工廠創建一個 JMS 連接
- JMS 連接:表示 JMS 客戶端和服務器端之間的一個活動的連接,是由客戶端通過調用連接工廠的方法建立的
- JMS 會話:session 標識 JMS 客戶端和服務端的會話狀態。會話建立在 JMS 連接上,標識客戶與服務器之間的一個會話進程。
- JMS 目的(Destination): 又稱為消息隊列,是實際的消息源
- 生產者和消費者
- 消息類型:分為隊列類型(優先先進先出)以及訂閱類型
消息監聽器
MessageListener
MessageListener 是最原始的消息監聽器,它是 JMS 規范中定義的一個接口。其中定義了一個用于處理接收到的消息的 onMessage() 方法,該方法只接收一個 Message 參數。
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;public class ConsumerMessageListener implements MessageListener {public void onMessage(Message message) {// 若生產者發送的是一個純文本消息,可以直接進行強制轉換,或者直接把onMessage方法的參數改成Message的子類TextMessageTextMessage textMsg = (TextMessage) message;System.out.println("接收到一個純文本消息。");try {System.out.println("消息內容是:" + textMsg.getText());} catch (JMSException e) {e.printStackTrace();}}
}
SessionAwareMessageListener
SessionAwareMessageListener 是 Spring 提供的,它不是標準的 JMS MessageListener。
MessageListener 的設計只是純粹用來接收消息的,假如在使用 MessageListener 處理接收到的消息時需要發送一個消息通知對方已經收到這個消息了,那么這個時候就需要在代碼里面去重新獲取一個 Connection 或 Session。而 SessionAwareMessageListener 的設計就是為了方便在接收到消息后發送一個回復的消息,它同樣提供了一個處理接收到的消息的 onMessage() 方法,但是這個方法可以同時接收兩個參數,一個是表示當前接收到的消息Message,另一個就是可以用來發送消息的 Session 對象。
使用 SessionAwareMessageListener 監聽器,可以在監聽并消費了消息后,不用重新獲取一個 Connection 或 Session,而是直接向原 Connection 或 Session 的某一個隊列發送消息。
代碼示例:
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener {private Destination destination;public void onMessage(TextMessage message, Session session) throws JMSException {System.out.println("收到一條消息");System.out.println("消息內容是:" + message.getText());MessageProducer producer = session.createProducer(destination);Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");producer.send(textMessage);}public Destination getDestination() {returndestination;}public void setDestination(Destination destination) {this.destination = destination;}
}
說明:定義了一個 SessionAwareMessageListener,在這個 Listener 中在接收到了一個消息之后,利用對應的 Session 創建了一個到 destination 的生產者和對應的消息,然后利用創建好的生產者發送對應的消息。
MessageListenerAdapter
MessageListenerAdapter 類實現了 MessageListener 接口和 SessionAwareMessageListener 接口,它的主要作用是將接收到的消息進行類型轉換,然后通過反射的形式把它交給一個普通的 Java 類進行處理。
-
MessageListenerAdapter 會把接收到的消息做如下轉換:
- TextMessage 轉換為 String 對象
- BytesMessage 轉換為 byte 數組
- MapMessage 轉換為 Map 對象
- ObjectMessage 轉換為對應的 Serializable 對象
代碼示例:
// 目標處理器類 public class ConsumerListener { public void handleMessage(String message) { System.out.println("ConsumerListener通過handleMessage接收到一個純文本消息,消息內容是:" + message); } public void receiveMessage(String message) { System.out.println("ConsumerListener通過receiveMessage接收到一個純文本消息,消息內容是:" + message); } }
<!-- 消息監聽適配器 --> <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate"> <bean class="com.tiantian.springintejms.listener.ConsumerListener"/> </property> <property name="defaultListenerMethod" value="receiveMessage"/> </bean> <!-- 消息監聽適配器對應的監聽容器 --> <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="adapterQueue"/> <!-- 使用MessageListenerAdapter來作為消息監聽器 --> <property name="messageListener" ref="messageListenerAdapter"/> </bean>
注意:
-
MessageListenerAdapter 會把接收到的消息做一個類型轉換,然后利用反射把它交給真正的目標處理器:一個普通的 Java 類(ConsumerListener)進行處理。
如果真正的目標處理器是一個 MessageListener 或者是一個 SessionAwareMessageListener,那么 Spring 將直接使用接收到的Message 對象作為參數調用它們的 onMessage 方法,而不會再利用反射去進行調用。
故在定義一個 MessageListenerAdapter 的時候就需要為它指定這樣一個目標類。這個目標類可以通過 MessageListenerAdapter 的構造方法參數指定,也可以通過它的 delegate 屬性來指定。
-
MessageListenerAdapter 另外一個主要的功能是可以通過 MessageListenerAdapter 注入的 handleMessage 方法自動的發送返回消息。
當用于處理接收到的消息的方法(默認是 handleMessage)的返回值不為空(null或者void)的時候,Spring 會自動將它封裝為一個 JMS Message,然后自動進行回復。這個回復消息將發送到的地址主要有兩種方式可以指定:
- 可以通過發送的 Message 的 setJMSReplyTo 方法指定該消息對應的回復消息的目的地
- 通過 MessageListenerAdapter 的 defaultResponseDestination 屬性來指定
基本使用
依賴
<!-- jms -->
<dependency><groupId>javax.jms</groupId><artifactId>javax.jms-api</artifactId>
</dependency>
<!-- spring jms -->
<dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId>
</dependency><!-- tonglinkMq jms api -->
<dependency><groupId>com.tongtech.tlq</groupId><artifactId>TongJMS-without-atomikos</artifactId><version>8.1.0-SNAPSHOT</version>
</dependency>
SpringBoot 集成 jms
jms 配置類
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.tongtech.tmqi.ConnectionFactory;@EnableJms // 聲明對 JMS 注解的支持
@Configuration
public class TestCreator {private String host;private Integer port;private String queueManager;private String channel;private String username;private String password;private int ccsid;private String queueName;private long receiveTimeout;// 配置連接工廠(tonglinkMq)@Beanpublic ConnectionFactory connectionFactory() throws JMSException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setProperty("tmqiAddressList", "tlq://127.0.0.1:10024");connectionFactory.setProperty("tmqiDefaultUsername", "admin");connectionFactory.setProperty("tmqiDefaultPassword", "123456");return connectionFactory;}// 配置緩存連接工廠 不配置該類則每次與MQ交互都需要重新創建連接,大幅降低速度。@Bean@Primarypublic CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);cachingConnectionFactory.setSessionCacheSize(500);cachingConnectionFactory.setReconnectOnException(true);return cachingConnectionFactory;}// 配置DefaultJmsListenerContainerFactory, 用@JmsListener注解來監聽隊列消息時,尤其存在多個監聽的時候,通過實例化配置DefaultJmsListenerContainerFactory來控制消息分發@Bean(name = "jmsQueueListenerCF")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(cachingConnectionFactory);// 設置連接數。如果對消息消費有順序要求,這里建議設置為"1-1"// 注:使用同一個監聽工廠類監聽多個隊列時,連接數需大于等于監聽隊列數factory.setConcurrency("3-10"); // 下限-上限// 重連間隔時間factory.setRecoveryInterval(1000L);// factory.setPubSubDomain(true); // 支持發布訂閱功能(topic)// factory.setConcurrency("1"); // topic 模式,并發必須設置為1,不然一條消息可能會被消費多次return factory;}// 配置JMS模板,實例化jmsTemplate后,可以在方法中通過@autowired的方式注入模板,用方法調用發送/接收消息// 注:如果只是接收消息,可以不配置此步@Beanpublic JmsTemplate jmsQueueTemplate(CachingConnectionFactory cachingConnectionFactory) {JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);jmsTemplate.setReceiveTimeout(receiveTimeout); // 設置超時時間// jmsTemplate.setPubSubDomain(true); // 開啟發布訂閱功能(topic)return jmsTemplate;}
}
發送消息
public class jmsUtil {@Autowiredprivate JmsTemplate jmsQueueTemplate;/*** 發送原始消息 Message*/public void send(){jmsQueueTemplate.send("queue1", new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage("我是原始消息");}});}/*** 發送消息自動轉換成原始消息* 注:關于消息轉換,還可以通過實現MessageConverter接口來自定義轉換內容*/public void convertAndSend(){jmsQueueTemplate.convertAndSend("queue1", "我是自動轉換的消息");}
}
監聽接收消息
采用注解 @JmsListener
來設置監聽方法
@Slf4j
@Component
// 此處繼承MessageListenerAdapter非必需。但若只使用@JmsListener注解監聽,可能會出現監聽消息獲取不及時或者獲取不到消息的情況,加上繼承MessageListenerAdapter后便不會出現
public class MdxpMessageListener extends MessageListenerAdapter {/*** 消息隊列監聽器* destination 隊列地址,此處使用靜態變量,支持配置化詳見下文* containerFactory 監聽器容器工廠(包含配置源), 若存在2個以上的監聽容器工廠,需進行指定*/@Override@JmsListener(destination = "TEST_QUEUE",containerFactory = "jmsQueueListenerCF")public void onMessage(Message message) {// JmsListener收到消息后,會自動封裝成自己特有的數據格式,需要自行根據消息類型解析原始消息String msgText = ""; double d = 0; try { if (msg instanceof TextMessage) { msgText = ((TextMessage) msg).getText(); } else if (msg instanceof StreamMessage) { msgText = ((StreamMessage) msg).readString(); d = ((StreamMessage) msg).readDouble(); } else if (msg instanceof BytesMessage) { byte[] block = new byte[1024]; ((BytesMessage) msg).readBytes(block); msgText = String.valueOf(block); } else if (msg instanceof MapMessage) { msgText = ((MapMessage) msg).getString("name"); }log.info("接收消息={}", msgText);} catch (JMSException e) { log.error("消息接收異常!", e);}}@JmsListener(destination = "TEST_QUEUE2",containerFactory = "jmsQueueListenerCF")// @Payload是消費者接受生產者發送的隊列消息,將隊列中的json字符串變成對象的注解,注意填充類需要實現序列化接口public void messageListener2(@payload User user){log.info("message={}", user)}
}
@JmsListener 注解 destination 支持配置化
注入配置讀取類
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 隊列名稱配置* 這里切記要@Data,或手動set和get*/
@Component
@Data
public class QueueNameConfig {@Value("${ibmmq.queue-test}")private String testQueue;}
隊列監聽類
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.TextMessage;/*** MQ消費者*/
@Component
@Slf4j
public class ReceiveMessage extends MessageListenerAdapter {/*** destination:監聽的隊列名稱,使用SpEL表達式寫入* containerFactory:監聽的工廠類,為配置類中所配置的名字*/@Override@JmsListener(destination = "#{@queueNameConfig.testQueue}", containerFactory = "jmsListenerContainerFactory")public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message; //轉換成文本消息try {String text = textMessage.getText();log.info("接收信息:{}", text);} catch (Exception e) {e.printStackTrace();}}
}
javax 原生 jms
public class jmstest {public static void main(String[] args) throws Exception { // 配置工廠ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setProperty("tmqiAddressList", "tlq://127.0.0.1:10024");connectionFactory.setProperty("tmqiDefaultUsername", "admin");connectionFactory.setProperty("tmqiDefaultPassword", "123456");// 獲取連接和會話Connection mqConn = connectionFactory.createConnection(); // 創建會話。CLIENT_ACKNOWLEDGE:手動應答,AUTO_ACKNOWLEDGE:自動應答Session mqSession = mqConn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); // 創建隊列Queue queuemq = Session.createQueue(queueName);// 獲取消費者MessageConsumer consumer = mqSession.createConsumer(mqSession.createQueue(queueName)); // 設置監聽器consumer.setMessageListener(new MessageListener() { public void onMessage(Message msg) { // JmsListener收到消息后,會自動封裝成自己特有的數據格式,需要自行根據消息類型解析原始消息String msgText = ""; double d = 0; try { if (msg instanceof TextMessage) {msgText = ((TextMessage) msg).getText(); } else if (msg instanceof StreamMessage) { msgText = ((StreamMessage) msg).readString(); d = ((StreamMessage) msg).readDouble(); } else if (msg instanceof BytesMessage) { byte[] block = new byte[1024]; ((BytesMessage) msg).readBytes(block); msgText = String.valueOf(block); } else if (msg instanceof MapMessage) { msgText = ((MapMessage) msg).getString("name"); }log.info("接收消息={}", msgText);// 手動應答textMessage.acknowledge();} catch (JMSException e) { log.error("消息接收異常!", e);}}}); // 啟動連接mqConn.start(); }// 獲取生產者MessageProducer producer = mqSession.createProducer(mqSession.createQueue(queueName)); // topic(廣播)模式// Topic topic = Session.createTopic(queueName);// MessageProducer producer = mqSession.createProducer(topic); producer.setDeliveryMode(DeliveryMOde.NON_PERSISTENT);producer.send(mqSession.createTexttMessage("這是一條消息"));// 關閉資源producer.close();// 斷開連接connection.close();
}