ActiveMq使用筆記

java JMS技術

.1.?? 什么是JMS

???????? JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。

???????? JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似于JDBC(Java Database Connectivity):這里,JDBC 是可以用來訪問許多不同關系數據庫的 API,而 JMS 則提供同樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,這只是幾個例子。 JMS 使您能夠通過消息收發服務(有時稱為消息中介程序或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送消息。消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶著應用程序的數據或有效負載。根據有效負載的類型來劃分,可以將消息分為幾種類型,它們分別攜帶:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。

.2.?? JMS規范

.2.1.??? 專業技術規范

JMS(Java Messaging Service)是Java平臺上有關面向消息中間件(MOM)的技術規范,它便于消息系統中的Java應用程序進行消息交換,并且通過提供標準的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。

.2.2.??? 體系架構

JMS由以下元素組成。

JMS提供者provider:連接面向消息中間件的,JMS接口的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向消息中間件的適配器。

JMS客戶:生產或消費基于消息的Java的應用程序或對象。

JMS生產者:創建并發送消息的JMS客戶。

JMS消費者:接收消息的JMS客戶。

JMS消息:包括可以在JMS客戶之間傳遞的數據的對象

JMS隊列:一個容納那些被發送的等待閱讀的消息的區域。與隊列名字所暗示的意思不同,消息的接受順序并不一定要與消息的發送順序相同。一旦一個消息被閱讀,該消息將被從隊列中移走。

JMS主題:一種支持發送消息給多個訂閱者的機制。

.2.3.??? Java消息服務應用程序結構支持兩種模型

1、? 點對點或隊列模型

在點對點或隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,并直接將消息發送到消費者的隊列。

這種模式被概括為:

只有一個消費者將獲得消息

生產者不需要在接收者消費該消息期間處于運行狀態,接收者也同樣不需要在消息發送時處于運行狀態。

每一個成功處理的消息都由接收者簽收

2、發布者/訂閱者模型

發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。

?

這種模式被概括為:

多個消費者可以獲得消息

在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。

?

1.下載ActiveMQ

去官方網站下載:http://activemq.apache.org/

2.運行ActiveMQ

解壓縮apache-activemq-5.5.1-bin.zip,

修改配置文件activeMQ.xml,將0.0.0.0修改為localhost

默認的activeMQ.xml文件如下:

修改后:

<transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616"/><transportConnector name="ssl"     uri="ssl://localhost:61617"/><transportConnector name="stomp"   uri="stomp://localhost:61613"/><transportConnector uri="http://localhost:8081"/><transportConnector uri="udp://localhost:61618"/>
</transportConnectors>

然后雙擊apache-activemq-5.5.1\bin\activemq.bat運行ActiveMQ程序。

訪問的時候如果需要用戶名和密碼 都是admin admin...

啟動topic的相關的生產者和消費者:

生產者代碼:

ProducerTest.java

import java.util.Random;import javax.jms.JMSException;      public class ProducerTest {      /**    * @param args    */     public static void main(String[] args) throws JMSException, Exception {      ProducerTool producer = new ProducerTool(); Random random = new Random();for(int i=0;i<20;i++){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage("Hello, world!--"+i);      producer.close();}}      
}      

?ProducerTool.java

import javax.jms.Connection;      
import javax.jms.DeliveryMode;      
import javax.jms.Destination;      
import javax.jms.JMSException;      
import javax.jms.MessageProducer;      
import javax.jms.Session;      
import javax.jms.TextMessage;      import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      public class ProducerTool {        private String user = ActiveMQConnection.DEFAULT_USER;         private String password = ActiveMQConnection.DEFAULT_PASSWORD;       private String url = ActiveMQConnection.DEFAULT_BROKER_URL;       private String subject = "mytopic";      private Destination destination = null;      private Connection connection = null;      private Session session = null;      private MessageProducer producer = null;// 初始化      private void initialize() throws JMSException, Exception {      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      user, password, url);      connection = connectionFactory.createConnection();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      destination = session.createTopic(subject);      producer = session.createProducer(destination);      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      }// 發送消息      public void produceMessage(String message) throws JMSException, Exception {      initialize();      TextMessage msg = session.createTextMessage(message);      connection.start();      System.out.println("Producer:->Sending message: " + message);      producer.send(msg);      System.out.println("Producer:->Message sent complete!");      }// 關閉連接      public void close() throws JMSException {      System.out.println("Producer:->Closing connection");      if (producer != null){producer.close();      }      if (session != null){session.close();      }      if (connection != null){connection.close();      }      }      
} 

消費者代碼:

ConsumerTest.java

import javax.jms.JMSException;public class ConsumerTest implements Runnable {static Thread t1 = null;/*** @param args* @throws InterruptedException* @throws InterruptedException* @throws JMSException* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {t1 = new Thread(new ConsumerTest());t1.setDaemon(false);t1.start();/*** 如果發生異常,則重啟consumer*//*while (true) {System.out.println(t1.isAlive());if (!t1.isAlive()) {t1 = new Thread(new ConsumerTest());t1.start();System.out.println("重新啟動");}Thread.sleep(5000);}*/// 延時500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close();
    }public void run() {try {ConsumerTool consumer = new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) {    }} catch (Exception e) {}}
}

ConsumerTool.java

import javax.jms.Connection;      
import javax.jms.Destination;      
import javax.jms.ExceptionListener;
import javax.jms.JMSException;      
import javax.jms.MessageConsumer;      
import javax.jms.Session;      
import javax.jms.MessageListener;      
import javax.jms.Message;      
import javax.jms.TextMessage;      import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      
/*** 消費者的模板     * @author ABC**/
public class ConsumerTool implements MessageListener,ExceptionListener {      private String user = ActiveMQConnection.DEFAULT_USER;      private String password = ActiveMQConnection.DEFAULT_PASSWORD;      private String url =ActiveMQConnection.DEFAULT_BROKER_URL;      private String subject = "mytopic";      private Destination destination = null;      private Connection connection = null;      private Session session = null;      private MessageConsumer consumer = null;  public static Boolean isconnection=false;// 初始化      private void initialize() throws JMSException, Exception {      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      user, password, url);      connection = connectionFactory.createConnection();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      destination = session.createTopic(subject);      consumer = session.createConsumer(destination);     }      // 消費消息      public void consumeMessage() throws JMSException, Exception {      initialize();      connection.start();consumer.setMessageListener(this);    //注冊一個消息監聽器,有消息就執行onMessage()方法connection.setExceptionListener(this);//注冊一個異常監聽器,有異常就執行onException()方法isconnection=true;System.out.println("Consumer:->Begin listening...");      // 開始監聽  // Message message = consumer.receive();      
    }// 關閉連接      public void close() throws JMSException {      System.out.println("Consumer:->Closing connection");      if (consumer != null)      consumer.close();      if (session != null)      session.close();      if (connection != null)      connection.close();      }// 消息處理函數      public void onMessage(Message message) {      try {      if (message instanceof TextMessage) {      TextMessage txtMsg = (TextMessage) message;      String msg = txtMsg.getText();      System.out.println("Consumer:->Received: " + msg);      } else {      System.out.println("Consumer:->Received: " + message);      }      } catch (JMSException e) {      // TODO Auto-generated catch block      
            e.printStackTrace();      }      }public void onException(JMSException arg0) {isconnection=false;//出現異常把isconnection設置成false
    }      
} 

只啟動ProducerTest.java

如果這個時候把ActiveMq 關閉再開啟....重新訪問

?

之前的主題 mytopic產生的數據就沒有了.....

ActiveMq默認是沒有做持久化的,如果是Kafka只要是發過去的消息,都會一直存在,也可以設置一個過期的時間.到了期限,那些消息也是可以清除掉.否則就會一直都在.

ActiveMq一般是用在JavaEE中的....Kafka是用在大數據領域的.

再運行生產者的模板代碼: ConsumerTest.java

生產者生產的數據:

再運行生產者的模板代碼: ConsumerTest.java

生產者生產的數據:

消費者消費到數據:

?

看WEBUI

?

其他常用的JMS實現

要使用Java消息服務,你必須要有一個JMS提供者,管理會話和隊列。既有開源的提供者也有專有的提供者。

開源的提供者包括:

Apache ActiveMQ

JBoss 社區所研發的 HornetQ

Joram

Coridan的MantaRay

The OpenJMS Group的OpenJMS

專有的提供者包括:

BEA的BEA WebLogic Server JMS

TIBCO Software的EMS

GigaSpaces Technologies的GigaSpaces

Softwired 2006的iBus

IONA Technologies的IONA JMS

SeeBeyond的IQManager(2005年8月被Sun Microsystems并購)

webMethods的JMS+ -

my-channels的Nirvana

Sonic Software的SonicMQ

SwiftMQ的SwiftMQ

IBM的WebSphere MQ

?========================================================

附關于ActiveMq處理queue的模板代碼:

ProducerTest.java

import java.util.Random;import javax.jms.JMSException;      public class ProducerTest {      /**    * @param args    * @throws Exception * @throws JMSException */     public static void main(String[] args) throws JMSException, Exception{      ProducerTool producer = new ProducerTool();Random random = new Random();for(int i=0;i<20;i++){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage("Hello, world!--"+i);      producer.close();}}      
}

ProducerTool.java

import javax.jms.Connection;      
import javax.jms.DeliveryMode;      
import javax.jms.Destination;      
import javax.jms.JMSException;      
import javax.jms.MessageProducer;      
import javax.jms.Session;      
import javax.jms.TextMessage;      import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      public class ProducerTool {        private String user = ActiveMQConnection.DEFAULT_USER;         private String password = ActiveMQConnection.DEFAULT_PASSWORD;       private String url = ActiveMQConnection.DEFAULT_BROKER_URL;       private String subject = "myqueue";      private Destination destination = null;      private Connection connection = null;      private Session session = null;      private MessageProducer producer = null;// 初始化   private void initialize() throws JMSException, Exception {      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      user, password, url);      connection = connectionFactory.createConnection();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      destination = session.createQueue(subject);      producer = session.createProducer(destination);      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      }// 發送消息 public void produceMessage(String message) throws JMSException, Exception {      initialize();      TextMessage msg = session.createTextMessage(message);      connection.start();      System.out.println("Producer:->Sending message: " + message);      producer.send(msg);      System.out.println("Producer:->Message sent complete!");      }// 關閉連接      public void close() throws JMSException {      System.out.println("Producer:->Closing connection");      if (producer != null){producer.close();      }      if (session != null){session.close();      }      if (connection != null){connection.close();      }      }      
}    

CustomerTest.java

public class ConsumerTest implements Runnable {static Thread t1 = null;public static void main(String[] args) throws InterruptedException {t1 = new Thread(new ConsumerTest());t1.start();
//        while (true) {
//            System.out.println(t1.isAlive());
//            if (!t1.isAlive()) {
//                t1 = new Thread(new ConsumerTest());
//                t1.start();
//                System.out.println("重新啟動");
//            }
//            Thread.sleep(5000);
//        }// 延時500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close();
    }public void run() {try {ConsumerTool consumer = new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) {    //System.out.println(123);
            }} catch (Exception e) {}}
}

CustomerTool.java

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class ConsumerTool implements MessageListener,ExceptionListener {private String user = ActiveMQConnection.DEFAULT_USER;private String password = ActiveMQConnection.DEFAULT_PASSWORD;private String url = ActiveMQConnection.DEFAULT_BROKER_URL;private String subject = "myqueue";private Destination destination = null;private Connection connection = null;private Session session = null;private MessageConsumer consumer = null;private ActiveMQConnectionFactory connectionFactory=null;public static Boolean isconnection=false;// 初始化private void initialize() throws JMSException {connectionFactory= new ActiveMQConnectionFactory(user, password, url);connection = connectionFactory.createConnection();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);destination = session.createQueue(subject);consumer = session.createConsumer(destination);}// 消費消息public void consumeMessage() throws JMSException {initialize();connection.start();consumer.setMessageListener(this);connection.setExceptionListener(this);System.out.println("Consumer:->Begin listening...");isconnection=true;// 開始監聽Message message = consumer.receive();System.out.println(message.getJMSMessageID());}// 關閉連接public void close() throws JMSException {System.out.println("Consumer:->Closing connection");if (consumer != null){consumer.close();}if (session != null){session.close();}if (connection != null){connection.close();}}// 消息處理函數public void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message;String msg = txtMsg.getText();System.out.println("Consumer:->Received: " + msg);} else {System.out.println("Consumer:->Received: " + message);}} catch (JMSException e) {e.printStackTrace();}}public void onException(JMSException arg0){isconnection=false;}
}

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/456369.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/456369.shtml
英文地址,請注明出處:http://en.pswp.cn/news/456369.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

導入Anaconda中的第三方庫運行時報錯:ImportError: Missing required dependencies ['pandas']

今天碰到一個大坑&#xff0c;花了大半天才搞明白問題出在哪來。事情的經過是這樣的&#xff1a;博主下午手賤把已將裝好anaconda2給卸載了&#xff08;同時裝了2和3&#xff09;&#xff0c;然后再次安裝anconda2后&#xff0c;卻發現配置完pycharm的解釋器后&#xff0c;代碼…

BZOJ2005 NOI2010 能量采集 歐拉函數

題意&#xff1a;求$\sum\limits_{i 1}^N {\sum\limits_{j 1}^M {f(i,j)} } $&#xff0c;其中f(i,j)(0,0)與(i,j)連線上點的數量 題解&#xff1a; 如果一個點(x,y)在(0,0)與(x,y)的連線上&#xff0c;則有gcd(x,y)gcd(x,y)。因此f(i,j)(gcd(i,j)gcd(i,j))且i<i,j<j的…

通過__tablename__ = 'xxx' #定義表名

from datetime import datetimefrom exts import dbclass User(db.Model):__tablename__ user1 #定義表名id db.Column(db.Integer,primary_keyTrue,autoincrementTrue)username db.Column(db.String(10), nullableTrue)password db.Column(db.String(256), nullableTrue)p…

python子類繼承父類特性,pycharm上面已經提示繼承了,為什么會報沒有該特性的錯誤?

因為在子類里覆蓋了父類的__init__ 如果需要調用父類用super class A(object):def __init__(self):self.a 1def fun(self):print self.aclass B(A):def __init__(self):self.b 2super(B, self).__init__()def fun(self):print self.aprint self.bB().fun()

Hadoop偽分布安裝詳解(一)

注&#xff1a;以下截圖針對Ubuntu操作系統&#xff0c;對Centos步驟類似。請讀者選擇不同鏡像即可。 第一部分&#xff1a;VMware WorkStation10 安裝 1.安裝好VMware10虛擬機軟件并下載好Ubuntu16.04 LTS 64位版的鏡像包 2.打開VMware10虛擬機軟件&#xff0c;選擇“創建新的…

C++_const常成員作用

介紹 常成員是什么 1.常成員關鍵詞為&#xff1a;const 2.常成員有&#xff1a;常成員變量、常成員函數、常成員對象 常成員有什么用 1.常成員變量&#xff1a;用于在程序中定義不可修改內部成員變量的函數 2.常成員函數&#xff1a;只能夠訪問成員變量&#xff0c;不可以修改成…

Python開發中收集的一些常用功能Demo

文章目錄目錄&#xff1a;前言&#xff1a;1、Python判斷文件是否存在的幾種方法&#xff1a;1.1、使用os模塊1.2、使用Try語句&#xff08;比較嚴謹的寫法&#xff09;1.3、使用pathlib模塊2、Python中寫入List到文本中并換行的方法3、Python按行讀取文件的幾種簡單實現方法3.…

Unlicensed ARC session – terminating!

問題描述 近日&#xff0c;發現ArcGIS10.4中存在很多bug&#xff0c;而且費了好多時間去測試它&#xff0c;最終決定改用10.1。在降級程序時遇到許可問題。 重裝ArcGIS10.1后&#xff0c;打開工程&#xff0c;所有引用都自動映射&#xff0c;沒報任何錯誤&#xff0c;清理重新生…

SQLAlchemy - Column詳解

SQLAlchemy - Column詳解 Column常用參數&#xff1a; default&#xff1a;默認值 nullable&#xff1a;是否可有 primary_key&#xff1a;是否為主鍵 unique&#xff1a;是否唯一 autoincrement&#xff1a;是否自動增長 onupdate&#xff1a;更新的時候執行的函數 name&…

Linux命令三劍客:grep、sed、awk總結

文章目錄前言一、grep命令語法實例grep結合pattern正則二、sed命令語法案例三、awk命令語法實例前言 最近看到了幾篇關于linux命令grep、sed、awk的文章&#xff0c;這里總結下&#xff0c;方便后面使用。 一、grep grep命令&#xff08;grep的全稱&#xff1a;Global searc…

python 機器學習資料

!(7 Steps to Mastering Machine Learning With Python) [http://www.kdnuggets.com/2015/11/seven-steps-machine-learning-python.html] 轉載于:https://www.cnblogs.com/zk47/p/6448506.html

Flask-SQLAlchemy 中如何不區分大小寫查詢?

例如下面的 User 模型&#xff0c;在數據庫中查詢時并不會區分大小寫 class User(db.Model):__tablename__ usersid db.Column(db.Integer, primary_keyTrue)username db.Column(db.String(64), uniqueTrue, indexTrue)password_hash db.Column(db.String(128)) 這時&…

Git常用指令及功能總結

文章目錄前言&#xff1a;1、常用的git指令2、常用git功能及操作2.1、下載代碼&#xff1a;2.2、當前分支和master保持一致2.3、修改代碼后提交代碼到指定分支2.4、版本回退&#xff08;時空穿梭機&#xff09;2.5、概念工作區和暫存區2.6、添加遠程庫2.7、分支管理2.8、標簽管…

MacOS下MySQL配置

先去官網下載一個 MySQL for mac http://www.cnblogs.com/xiaobo-Linux/ 命令行運行終端&#xff0c;運行下面兩條命令&#xff1a; 12alias mysql/usr/local/mysql/bin/mysqlalias mysqladmin/usr/local/mysql/bin/mysqladmin方便終端直接輸入mysql命令&#xff0c;而不是必須…

HashMap為什么在多線程下會讓cpu100%

首先HashMap并不是sun公司多線程提供的集合&#xff0c;很多時候我們的程序是一個主線程&#xff0c;用了hashmap并沒有什么問題&#xff0c;但是在多線程下會出現問題。 hashmap是一個哈希表&#xff0c;存儲的數據結構也可以是一個線性數組&#xff0c;我們的存儲的數據都在e…

flask中關于endpoint端點、url_map映射、view_func視圖函數,view_functions、及視圖函數名是否何以相同的問題?

視圖函數中關于url_map視圖的映射&#xff1a;應該是[ url->methonds->endpoint ] 而整個請求的過程&#xff0c;是先通過url地址映射到端點endpoint&#xff0c;然后通過endpoint找到試圖函數view_func&#xff08;擴展:在Flask類里邊有一個view_funtions的屬性&…

SparkSQL-從0到1認識Catalyst

文章目錄前言正文預備知識&#xff0d;Tree&RuleCatalyst工作流程ParserAnalyzerOptimizerSparkSQL執行計劃前言 這篇文章是轉載一位大神的文章&#xff0c;為什么要轉載的&#xff0c;實在是因為寫的太經典了&#xff0c;所以忍不住希望能有更多的人可以看到。后續還會轉…

為什么程序員一定要加班?

摘要&#xff1a; 一提到程序員&#xff0c;大多數人的印象大概就是死宅、無趣、沒有私人生活&#xff0c;除了上班寫寫寫代碼&#xff0c;加班寫代碼更是標配。似乎在深夜頂著雞窩頭&#xff0c;目光呆滯&#xff0c;面無表情敲鍵盤的場景才是一個程序員的真實寫照。 當然&…

javascript 反斜杠\

通常&#xff0c;我們在動態給定一個div的innerHTML時&#xff0c;通常是樣做的&#xff1a; <div id"demo1" /> <SCRIPT> var demo document.getElementById("demo1"); var str "<h1>" "<a hrefjavascript:; ο…

SQLAlchemy 中的 Session、sessionmaker、scoped_session

SQLAlchemy 中的 Session、sessionmaker、scoped_session 目錄 一、關于 Session 1. Session是緩存嗎&#xff1f;2. Session作用&#xff1a;3. Session生命周期&#xff1a;4. Session什么時候創建&#xff0c;提交&#xff0c;關閉&#xff1f;4. 獲取一個Session&#xf…