JMS
?
一、?概述與介紹
ActiveMQ 是Apache出品,最流行的、功能強大的即時通訊和集成模式的開源服務器。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現。提供客戶端支持跨語言和協議,帶有易于在充分支持JMS 1.1和1.4使用J2EE企業集成模式和許多先進的功能。
?
二、?特性
1、 多種語言和協議編寫客戶端。語言: Java、C、C++、C#、Ruby、Perl、Python、PHP。應用協議:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
2、完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
3、對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
4、通過了常見J2EE服務器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
5、支持多種傳送協議:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
6、支持通過JDBC和journal提供高速的消息持久化
7、從設計上保證了高性能的集群,客戶端-服務器,點對點
8、支持Ajax
9、支持與Axis的整合
10、可以很容易得調用內嵌JMS provider,進行測試
三、?安裝
開發環境:
System:Windows
JDK:1.7+(如果MQ版本低于5.8是需要1.7以上的)
IDE:eclipse
apache ActiveMQ 5.8
1、 下載ActiveMQ,下載地址:http://www.apache.org/dyn/closer.cgi?path=/activemq/apache-activemq/5.8.0/apache-activemq-5.8.0-bin.zip
2、 解壓apache-activemq-5.8.0.zip即可完成ActiveMQ的安裝
3、 解壓后目錄結構如下
+bin (windows下面的bat和unix/linux下面的sh) 啟動ActiveMQ的啟動服務就在這里
+conf (activeMQ配置目錄,包含最基本的activeMQ配置文件)
+data (默認是空的)
+docs (index,replease版本里面沒有文檔)
+example (幾個例子)
+lib (activeMQ使用到的lib)
+webapps (系統管理員控制臺代碼)
+webapps-demo(系統示例代碼)
-activemq-all-5.8.0.jar (ActiveMQ的binary)
-user-guide.html (部署指引)
-LICENSE.txt
-NOTICE.txt
-README.txt
其他文件就不相信介紹了,搞Java的應該都知道干什么用的。
你可以進入bin目錄,使用activemq.bat雙擊啟動(windows用戶可以選擇系統位數,如果你是linux的話,就用命令行的發送去啟動),如果一切順利,你就會看見類似下面的信息:
如果你看到這個,那么恭喜你成功了。如果你啟動看到了異常信息:
Caused by: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.SocketException: Unrecognized Windows Sockets error: 0: JVM_Bind
?
報錯1、那么我告訴你,很不幸,你的端口被占用了。接下來你大概想知道是哪個程序占用了你的端口,并kill掉該進程或服務。或者你要嘗試修改ActiveMQ的默認端口61616(ActiveMQ使用的默認端口是61616),在大多數情況下,占用61616端口的是Internet Connection Sharing (ICS) 這個Windows服務,你只需停止它就可以啟動ActiveMQ了。
報錯2、集成后運行始終報錯:ActiveMQConnectionFactory : Unsupported major.minor version 51.0
解決辦法:http://blog.csdn.net/u012891504/article/details/52302131
報錯3、啟動分64位啟動和31位啟動 如果JDK是32位的但是啟動是64位 則報錯 需要版本統一?
4、 啟動成功就可以訪問管理員界面:http://localhost:8161/admin,默認用戶名和密碼admin/admin。如果你想修改用戶名和密碼的話,在conf/jetty-realm.properties中修改即可。
常用的選項:Queues點對點
Topics發布/訂閱
其中在導航菜單中,Queues是隊列方式消息。Topics是主題方式消息。Subscribers消息訂閱監控查詢。Connections可以查看鏈接數,分別可以查看xmpp、ssl、stomp、openwire、ws和網絡鏈接。Network是網絡鏈接數監控。Send可以發送消息數據。
5、 運行demo示例,在dos控制臺輸入activemq.bat xbean:../conf/activemq-demo.xml?即可啟動demo示例。官方提供的user-guide.html中的access the web console中是提示輸入:activemq.bat console xbean:conf/activemq-demo.xml,我用這種方式不成功。
當然你還可以用絕對的文件目錄方式:activemq.bat xbean:file:D:/mq/conf/activemq-demo.xml
如果提示conf/activemq-demo.xml沒有找到,你可以嘗試改變下路徑,也就是去掉上面的“..”。通過http://localhost:8161/demo/?就可以訪問示例了。
?
四、?消息示例
1、ActiviteMQ消息有3中形式
?
JMS?公共
點對點域
發布/訂閱域
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
Queue
Topic
Session
QueueSession
TopicSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver
TopicSubscriber
?
(1)、點對點方式(point-to-point)
點對點的消息發送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder 發送消息,receive接收消息.具體點就是Sender Client發送Message Queue ,而 receiver Cliernt從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端可以在任何時刻發送信息到Queue,而不需要知道接收客戶端是不是在運行
(2)、發布/訂閱 方式(publish/subscriber Messaging)
發布/訂閱方式用于多接收客戶端的方式.作為發布訂閱的方式,可能存在多個接收客戶端,并且接收端客戶端與發送客戶端存在時間上的依賴。一個接收端只能接收他創建以后發送客戶端發送的信息。作為subscriber ,在接收消息時有兩種方法,destination的receive方法,和實現message listener 接口的onMessage 方法。
2、ActiviteMQ接收和發送消息基本流程
發送消息的基本步驟:
(1)、創建連接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory建立連接Connection,并啟動
(3)、使用連接Connection 建立會話Session
(4)、使用會話Session和管理對象Destination創建消息生產者MessageSender
(5)、使用消息生產者MessageSender發送消息
消息接收者從JMS接受消息的步驟
(1)、創建連接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory建立連接Connection,并啟動
(3)、使用連接Connection 建立會話Session
(4)、使用會話Session和管理對象Destination創建消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener將MessageListener接口綁定到MessageReceiver消息接收者必須實現了MessageListener接口,需要定義onMessage事件方法。
五、?代碼示例
創建session第二個參數詳解
Session.AUTO_ACKNOWLEDGE。 當客戶成功的從receive方法返回的時候, 或者從MessageListener.onMessage
方法成功返回的時候,會話自動確認客戶收到的消息。
Session.CLIENT_ACKNOWLEDGE。 客戶通過消息的 acknowledge 方法確認消息。需要注意的是,在這種模
式中,確認是在會話層上進行:確認一個被消費的消息將自動確認所有已被會話消 費的消息。例如,如果一
個消息消費者消費了 10 個消息,然后確認第 5 個消息,那么所有 10 個消息都被確認。
Session.DUPS_ACKNOWLEDGE。 該選擇只是會話遲鈍第確認消息的提交。如果 JMS provider 失敗,那么可
能會導致一些重復的消息。如果是重復的消息,那么 JMS provider 必須把消息頭的 JMSRedelivered 字段設置
為 true。?
1 public class JMSProducer { 2 3 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; 4 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; 5 private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL; 6 7 public static void main(String[] args) { 8 //連接工廠 9 ConnectionFactory connectionFactory; 10 //連接 11 Connection connection; 12 //接受或者發送消息線程 13 Session session; 14 //消息目的地 15 Destination destination; 16 //消息生產者 17 MessageProducer messageProducer; 18 19 //實例化連接工廠 20 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL); 21 22 try { 23 //從工廠獲取連接 24 connection = connectionFactory.createConnection(); 25 //開啟連接 26 connection.start(); 27 //創建會話 28 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 29 //點對點方式 30 //destination = session.createQueue("testQueue"); 31 //訂閱方式 32 destination = session.createTopic("topicTest"); 33 //創建生產者 34 messageProducer = session.createProducer(destination); 35 //寫個循環 發送10條消息 36 for(int i = 0;i<10;i++){ 37 //創建消息 38 TextMessage text = session.createTextMessage("我是消息00"+i); 39 //輸出 40 System.out.println("發布:我是消息00"+i); 41 //使用生產者 提供方法 發送出去 42 messageProducer.send(text); 43 } 44 //因為 是加事務的 所以一定要提交 45 session.commit(); 46 //最后一定要將連接關閉 47 connection.close(); 48 } catch (Exception e) { 49 e.printStackTrace(); 50 } 51 52 } 53 }
消費者代碼
1 public class JMSConsumer { 2 3 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; 4 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; 5 private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL; 6 7 public static void main(String[] args) { 8 //連接工廠 9 ConnectionFactory connectionFactory; 10 //連接 11 Connection connection; 12 //會話 13 Session session; 14 //消費目的地 15 Destination destination; 16 //消費者 17 MessageConsumer consumer; 18 19 20 //實例化連接工廠 21 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL); 22 23 //從工廠獲取連接 24 try { 25 connection = connectionFactory.createConnection(); 26 //開啟連接 27 connection.start(); 28 //獲得會話 注意名字一定要一樣和生產者 29 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 30 31 //獲得消息隊列 32 destination = session.createQueue("testQueue"); 33 //訂閱者 34 //destination = session.createTopic("topicTest"); 35 36 //創建消費者 37 consumer = session.createConsumer(destination); 38 //這種接受方式一般不常用 39 while(true){ 40 TextMessage message = (TextMessage) consumer.receive(100000); 41 if(message!=null){ 42 System.out.println(message.getText()); 43 }else{ 44 break; 45 } 46 47 } 48 } catch (JMSException e) { 49 // TODO Auto-generated catch block 50 e.printStackTrace(); 51 } 52 53 } 54 }
消費者監聽代碼
1 public class JMSConsumer1 { 2 3 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; 4 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; 5 private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL; 6 7 public static void main(String[] args) { 8 //連接工廠 9 ConnectionFactory connectionFactory; 10 //連接 11 Connection connection; 12 //會話 13 Session session; 14 //消費目的地 15 Destination destination; 16 //消費者 17 MessageConsumer consumer; 18 19 20 //實例化連接工廠 21 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL); 22 23 //從工廠獲取連接 24 try { 25 connection = connectionFactory.createConnection(); 26 //開啟連接 27 connection.start(); 28 //獲得會話 注意名字一定要一樣和生產者 29 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 30 //獲得消息隊列 31 destination = session.createTopic("topicTest"); 32 //創建消費者 33 consumer = session.createConsumer(destination); 34 //設置監聽 35 consumer.setMessageListener(new Listener1()); 36 } catch (JMSException e) { 37 // TODO Auto-generated catch block 38 e.printStackTrace(); 39 } 40 41 } 42 }
監聽類
1 public class Listener1 implements MessageListener { 2 3 @Override 4 public void onMessage(Message message) { 5 try { 6 System.out.println("訂閱者"+((TextMessage)message).getText()); 7 } catch (JMSException e) { 8 // TODO Auto-generated catch block 9 e.printStackTrace(); 10 } 11 } 12 13 }
點對點方式和發布/訂閱 代碼區別 只有
//點對點方式destination = session.createQueue("testQueue");//訂閱方式destination = session.createTopic("topicTest");
?