java JMS技術
.1.?? 什么是JMS
???????? JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。
???????? JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似于JDBC(Java Database Connectivity):這里,JDBC 是可以用來訪問許多不同關系數據庫的 API,而 JMS 則提供同樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您能夠通過消息收發服務(有時稱為消息中介程序或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶著應用程序的數據或有效負載。根據有效負載的類型來劃分,可以將消息分為幾種類型,它們分別攜帶:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。
.2.?? JMS規范
.2.1.??? 專業技術規范
JMS(Java Messaging Service)是Java平臺上有關面向消息中間件(MOM)的技術規范,它便于消息系統中的Java應用程序進行消息交換,并且通過提供標準的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。
.2.2.??? 體系架構
JMS由以下元素組成。
JMS提供者provider:連接面向消息中間件的,JMS接口的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向消息中間件的適配器。
JMS客戶:生產或消費基于消息的Java的應用程序或對象。
JMS生產者:創建并發送消息的JMS客戶。
JMS消費者:接收消息的JMS客戶。
JMS消息:包括可以在JMS客戶之間傳遞的數據的對象
JMS隊列:一個容納那些被發送的等待閱讀的消息的區域。與隊列名字所暗示的意思不同,消息的接受順序并不一定要與消息的發送順序相同。一旦一個消息被閱讀,該消息將被從隊列中移走。
JMS主題:一種支持發送消息給多個訂閱者的機制。
.2.3.??? Java消息服務應用程序結構支持兩種模型
1、? 點對點或隊列模型
在點對點或隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,并直接將消息發送到消費者的隊列。
這種模式被概括為:
只有一個消費者將獲得消息
生產者不需要在接收者消費該消息期間處于運行狀態,接收者也同樣不需要在消息發送時處于運行狀態。
每一個成功處理的消息都由接收者簽收
2、發布者/訂閱者模型
發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。
?
這種模式被概括為:
多個消費者可以獲得消息
在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。
?
1.下載ActiveMQ
去官方網站下載:http://activemq.apache.org/
2.運行ActiveMQ
解壓縮apache-activemq-5.5.1-bin.zip,
修改配置文件activeMQ.xml,將0.0.0.0修改為localhost
默認的activeMQ.xml文件如下:
修改后:
<transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616"/><transportConnector name="ssl" uri="ssl://localhost:61617"/><transportConnector name="stomp" uri="stomp://localhost:61613"/><transportConnector uri="http://localhost:8081"/><transportConnector uri="udp://localhost:61618"/> </transportConnectors>
然后雙擊apache-activemq-5.5.1\bin\activemq.bat運行ActiveMQ程序。
訪問的時候如果需要用戶名和密碼 都是admin admin...
啟動topic的相關的生產者和消費者:
生產者代碼:
ProducerTest.java
import java.util.Random;import javax.jms.JMSException; public class ProducerTest { /** * @param args */ public static void main(String[] args) throws JMSException, Exception { ProducerTool producer = new ProducerTool(); Random random = new Random();for(int i=0;i<20;i++){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage("Hello, world!--"+i); producer.close();}} }
?ProducerTool.java
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "mytopic"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); }// 發送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); }// 關閉連接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null){producer.close(); } if (session != null){session.close(); } if (connection != null){connection.close(); } } }
消費者代碼:
ConsumerTest.java
import javax.jms.JMSException;public class ConsumerTest implements Runnable {static Thread t1 = null;/*** @param args* @throws InterruptedException* @throws InterruptedException* @throws JMSException* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {t1 = new Thread(new ConsumerTest());t1.setDaemon(false);t1.start();/*** 如果發生異常,則重啟consumer*//*while (true) {System.out.println(t1.isAlive());if (!t1.isAlive()) {t1 = new Thread(new ConsumerTest());t1.start();System.out.println("重新啟動");}Thread.sleep(5000);}*/// 延時500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close(); }public void run() {try {ConsumerTool consumer = new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) { }} catch (Exception e) {}} }
ConsumerTool.java
import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /*** 消費者的模板 * @author ABC**/ public class ConsumerTool implements MessageListener,ExceptionListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url =ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "mytopic"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; public static Boolean isconnection=false;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic(subject); consumer = session.createConsumer(destination); } // 消費消息 public void consumeMessage() throws JMSException, Exception { initialize(); connection.start();consumer.setMessageListener(this); //注冊一個消息監聽器,有消息就執行onMessage()方法connection.setExceptionListener(this);//注冊一個異常監聽器,有異常就執行onException()方法isconnection=true;System.out.println("Consumer:->Begin listening..."); // 開始監聽 // Message message = consumer.receive(); }// 關閉連接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); }// 消息處理函數 public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer:->Received: " + msg); } else { System.out.println("Consumer:->Received: " + message); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }public void onException(JMSException arg0) {isconnection=false;//出現異常把isconnection設置成false } }
只啟動ProducerTest.java
如果這個時候把ActiveMq 關閉再開啟....重新訪問
?
之前的主題 mytopic產生的數據就沒有了.....
ActiveMq默認是沒有做持久化的,如果是Kafka只要是發過去的消息,都會一直存在,也可以設置一個過期的時間.到了期限,那些消息也是可以清除掉.否則就會一直都在.
ActiveMq一般是用在JavaEE中的....Kafka是用在大數據領域的.
再運行生產者的模板代碼: ConsumerTest.java
生產者生產的數據:
再運行生產者的模板代碼: ConsumerTest.java
生產者生產的數據:
消費者消費到數據:
?
看WEBUI
?
其他常用的JMS實現
要使用Java消息服務,你必須要有一個JMS提供者,管理會話和隊列。既有開源的提供者也有專有的提供者。
開源的提供者包括:
Apache ActiveMQ
JBoss 社區所研發的 HornetQ
Joram
Coridan的MantaRay
The OpenJMS Group的OpenJMS
專有的提供者包括:
BEA的BEA WebLogic Server JMS
TIBCO Software的EMS
GigaSpaces Technologies的GigaSpaces
Softwired 2006的iBus
IONA Technologies的IONA JMS
SeeBeyond的IQManager(2005年8月被Sun Microsystems并購)
webMethods的JMS+ -
my-channels的Nirvana
Sonic Software的SonicMQ
SwiftMQ的SwiftMQ
IBM的WebSphere MQ
?========================================================
附關于ActiveMq處理queue的模板代碼:
ProducerTest.java
import java.util.Random;import javax.jms.JMSException; public class ProducerTest { /** * @param args * @throws Exception * @throws JMSException */ public static void main(String[] args) throws JMSException, Exception{ ProducerTool producer = new ProducerTool();Random random = new Random();for(int i=0;i<20;i++){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage("Hello, world!--"+i); producer.close();}} }
ProducerTool.java
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "myqueue"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null;// 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); }// 發送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sending message: " + message); producer.send(msg); System.out.println("Producer:->Message sent complete!"); }// 關閉連接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null){producer.close(); } if (session != null){session.close(); } if (connection != null){connection.close(); } } }
CustomerTest.java
public class ConsumerTest implements Runnable {static Thread t1 = null;public static void main(String[] args) throws InterruptedException {t1 = new Thread(new ConsumerTest());t1.start(); // while (true) { // System.out.println(t1.isAlive()); // if (!t1.isAlive()) { // t1 = new Thread(new ConsumerTest()); // t1.start(); // System.out.println("重新啟動"); // } // Thread.sleep(5000); // }// 延時500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close(); }public void run() {try {ConsumerTool consumer = new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) { //System.out.println(123); }} catch (Exception e) {}} }
CustomerTool.java
import javax.jms.Connection; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class ConsumerTool implements MessageListener,ExceptionListener {private String user = ActiveMQConnection.DEFAULT_USER;private String password = ActiveMQConnection.DEFAULT_PASSWORD;private String url = ActiveMQConnection.DEFAULT_BROKER_URL;private String subject = "myqueue";private Destination destination = null;private Connection connection = null;private Session session = null;private MessageConsumer consumer = null;private ActiveMQConnectionFactory connectionFactory=null;public static Boolean isconnection=false;// 初始化private void initialize() throws JMSException {connectionFactory= new ActiveMQConnectionFactory(user, password, url);connection = connectionFactory.createConnection();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);destination = session.createQueue(subject);consumer = session.createConsumer(destination);}// 消費消息public void consumeMessage() throws JMSException {initialize();connection.start();consumer.setMessageListener(this);connection.setExceptionListener(this);System.out.println("Consumer:->Begin listening...");isconnection=true;// 開始監聽Message message = consumer.receive();System.out.println(message.getJMSMessageID());}// 關閉連接public void close() throws JMSException {System.out.println("Consumer:->Closing connection");if (consumer != null){consumer.close();}if (session != null){session.close();}if (connection != null){connection.close();}}// 消息處理函數public void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message;String msg = txtMsg.getText();System.out.println("Consumer:->Received: " + msg);} else {System.out.println("Consumer:->Received: " + message);}} catch (JMSException e) {e.printStackTrace();}}public void onException(JMSException arg0){isconnection=false;} }
?