JMS(Java消息服務)與消息隊列ActiveMQ基本使用(一)

最近的項目中用到了mq,之前自己一直在碼農一樣的照葫蘆畫瓢。最近幾天研究了下,把自己所有看下來的文檔和了解總結一下。

一. 認識JMS

1.概述

對于JMS,百度百科,是這樣介紹的:JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。

簡短來說,JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似于JDBC(Java Database Connectivity),提供了應用程序之間異步通信的功能。

JMS1.0是jsr 194里規定的規范(關于jsr規范,請點擊)。目前最新的規范是JSR 343,JMS2.0。

好了,說了這么多,其實只是在說,JMS只是sun公司為了統一廠商的接口規范,而定義出的一組api接口。

2. JMS體系結構

描述如下:

  • JMS提供者(JMS的實現者,比如activemq jbossmq等)
  • JMS客戶(使用提供者發送消息的程序或對象,例如在12306中,負責發送一條購票消息到處理隊列中,用來解決購票高峰問題,那么,發送消息到隊列的程序和從隊列獲取消息的程序都叫做客戶)
  • JMS生產者,JMS消費者(生產者及負責創建并發送消息的客戶,消費者是負責接收并處理消息的客戶)
  • JMS消息(在JMS客戶之間傳遞數據的對象)
  • JMS隊列(一個容納那些被發送的等待閱讀的消息的區域)
  • JMS主題(一種支持發送消息給多個訂閱者的機制)

3. JMS對象模型

  • 連接工廠(connectionfactory)客戶端使用JNDI查找連接工廠,然后利用連接工廠創建一個JMS連接。
  • JMS連接 表示JMS客戶端和服務器端之間的一個活動的連接,是由客戶端通過調用連接工廠的方法建立的。
  • JMS會話 session 標識JMS客戶端和服務端的會話狀態。會話建立在JMS連接上,標識客戶與服務器之間的一個會話進程。
  • JMS目的 Destinatio 又稱為消息隊列,是實際的消息源
  • 生產者和消費者
  • 消息類型,分為隊列類型(優先先進先出)以及訂閱類型

二. ActiveMQ

1. ActiveMQ的安裝

  1. 從官網下載安裝包,http://activemq.apache.org/download.html
  2. 賦予運行權限 chmod +x,windows可以忽略此步
  3. 運行 ./active start | stop

啟動后,activeMQ會占用兩個端口,一個是負責接收發送消息的tcp端口:61616,一個是基于web負責用戶界面化管理的端口:8161。這兩個端口可以在conf下面的xml中找到。http服務器使用了jettry。
這里有個問題是啟動mq后,很長時間管理界面才可以顯示出來。

2. 用Java訪問ActiveMQ

先附上Bean代碼:

public class MqBean implements Serializable{ private Integer age; private String name; public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }

2.1 隊列消息的發送:

public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一個參數是是否是事務型消息,設置為true,第二個參數無效 //第二個參數是 //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端發送和接收消息不需要做額外的工作。異常也會確認消息,應該是在執行之前確認的 //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Message的acknowledge方法。jms服務器才會刪除消息。可以在失敗的 //時候不確認消息,不確認的話不會移出隊列,一直存在,下次啟動繼續接受。接收消息的連接不斷開,其他的消費者也不會接受(正常情況下隊列模式不存在其他消費者) //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。在需要考慮資源使用時,這種模式非常有效。 //待測試 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("test-queue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //優先級不能影響先進先出。。。那這個用處究竟是什么呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ bean.setName("小黃"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (JMSException e) { e.printStackTrace(); } }

注:在上面的代碼中,確認模式有三種,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直沒明白有什么區別。因為無法測試。不過大概也明白了一些。其實主要是MQ處理消息的流程決定的:

  1. 消息從生成方客戶端傳送到消息服務器。
  2. 消息服務器讀取消息。
  3. 消息被放置到持久性存儲器當中(出于可靠性的考慮)。
  4. 消息服務器確認收到消息(出于可靠性的考慮)。
  5. 消息服務器確定消息的路由。
  6. 消息服務器寫出消息。
  7. 消息從消息服務器傳送到使用方客戶端。
  8. 使用方客戶端確認收到消息(出于可靠性的考慮)。
  9. 消息服務器處理客戶端確認(出于可靠性的考慮)。
  10. 消息服務器確定已經處理客戶端確認。

這些步驟是連續的,所以任何步驟都可能成為消息從生成方客戶端到使用方客戶端的傳送過程的瓶頸。這些步驟中的大多數都取決于消息傳送系統的物理特征:網絡帶寬、計算機處理速度和消息服務器體系結構等等。但是,有一些步驟還取決于消息傳送應用程序的特征和該應用程序要求的可靠性級別。
其實就是基于可靠性還是性能的選擇.

2.2 隊列消息的接收:

public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 構造從工廠得到連接對象 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連接 //這個最好還是有事務 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }

注:對于隊列來說,比較簡單的優化策略,應該就是隊列分載了。由于每個消費者都是單線程的,所以可以設置多個消費者來提高速度。
大家可以復制個消費者自己測試下,在消費者中添加sleep測試下效果。

2.3 訂閱消息的發送

public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createTopic("test-topic"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //優先級不能影響先進先出。。。那這個用處究竟是什么呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ Thread.sleep(1000); bean.setName("小黃"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (Exception e) { e.printStackTrace(); } }

2.4 訂閱消息的接收

public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 構造從工廠得到連接對象 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連接 //這個最好還是有事務 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }

以上的消息發送后,如果沒有接收到,可以登錄自己的MQ管理頁面:http://192.168.3.159:8161/admin/?,默認帳號密碼都是admin,查看隊列中的消息

enter image description here

Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 可以理解為是消費這消費掉的數量

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/539225.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/539225.shtml
英文地址,請注明出處:http://en.pswp.cn/news/539225.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python單詞反轉_python文本 字符串逐字符反轉以及逐單詞反轉

python文本 字符串逐字符反轉以及逐單詞反轉 場景&#xff1a; 字符串逐字符反轉以及逐單詞反轉 首先來看字符串逐字符反轉&#xff0c;由于python提供了非常有用的切片&#xff0c;所以只需要一句就可以搞定了 >>> aabc edf degd >>> a[::-1] dged fde cba …

hive復合數據類型之struct

概述 STRUCT&#xff1a;STRUCT可以包含不同數據類型的元素。這些元素可以通過”點語法”的方式來得到所需要的元素&#xff0c;比如user是一個STRUCT類型&#xff0c;那么可以通過user.address得到這個用戶的地址。 操作實例 1、創建表 create table student_test(id int,in…

pycharm 運行celery_Celery全面學習筆記

來源介紹Celery 是 Distributed Task Queue&#xff0c;分布式任務隊列。分布式決定了可以有多個 worker 的存在&#xff0c;隊列表示其是異步操作。Celery 核心模塊Celery有一下5個核心角色Task就是任務&#xff0c;有異步任務和定時任務Broker中間人&#xff0c;接收生產者發…

hive復合數據類型之array

概述 ARRAY&#xff1a;ARRAY類型是由一系列相同數據類型的元素組成&#xff0c;這些元素可以通過下標來訪問。比如有一個ARRAY類型的變量fruits&#xff0c;它是由[apple,orange,mango]組成&#xff0c;那么我們可以通過fruits[1]來訪問元素orange&#xff0c;因為ARRAY類型的…

Exploit開發系列教程-Mona 2 SEH

P3nro5e 2015/07/10 10:580x00 Mona 2 前言 & 準備Mona 2是一種非常有用的插件&#xff0c;它由Corelan Team開發。起初是為Immunity Debugger寫的&#xff0c;現在它適用于WinDbg調試器。你將需要為WinDbg x86 和 WinDbg x64安裝一些工具&#xff1a;安裝Python 2.7 (從這…

python集合的元素可以是_Python集合的元素中,為什么不可以是包含嵌套列表的元組?...

你有一個誤解&#xff0c;hash算法針對的是元素的內容&#xff0c;并不是針對指針&#xff0c;所以指針不變不等于可hash。 如果你想深究細節的話&#xff0c;可以看tuple的源碼&#xff1a; static Py_hash_t tuplehash(PyTupleObject *v) { Py_uhash_t x; /* Unsigned for de…

python lib庫_python_lib基礎庫

1&#xff1a;argv傳遞給python腳本的命令行參數列表&#xff0c;argv[0]是腳本的名字(他是平臺獨立的&#xff0c;不管他是一個路徑全名或不是)&#xff0c;如果使用了-c參數選項&#xff0c;argv[0]會被設置為字符串-c&#xff0c;如果沒有腳本名傳遞給python解釋器&#xff…

hive復合數據類型之map

概述 MAP&#xff1a;MAP包含key->value鍵值對&#xff0c;可以通過key來訪問元素。比如”userlist”是一個map類型&#xff0c;其中username是key&#xff0c;password是value&#xff1b;那么我們可以通過userlist[username]來得到這個用戶對應的password&#xff1b; 操…

Beego框架使用

為什么80%的碼農都做不了架構師&#xff1f;>>> Beego Web項目目錄結構 new 命令是新建一個 Web 項目&#xff0c;我們在命令行下執行 bee new <項目名> 就可以創建一個新的項目。但是注意該命令必須在 $GOPATH/src 下執行。最后會在 $GOPATH/src 相應目錄下…

oracle下lag和lead分析函數

Lag和Lead分析函數可以在同一次查詢中取出同一字段的前N行的數據(Lag)和后N行的數據(Lead)作為獨立的列。 這種操作可以代替表的自聯接&#xff0c;并且LAG和LEAD有更高的效率。 語法&#xff1a; [sql] view plaincopy /*語法*/ lag(exp_str,offset,defval) over() Lead(…

802d簡明調試手冊_SINUMERIK-828D簡明調試手冊.pdf

SINUMERIK 828D / 828D BASIC簡明調試手冊SINUMERIKAnswers for industry. SIEMENSABC01.2012 ASINUMERIK 828D / 828D BASIC V04.04SP01123PLC 45NC 67PLC 891011121314151617PLC 18i1 11.1 11.1.1 NC 31.1.2 31.2

jtessboxeditorfx 界面顯示不出來_macOS 使用 XQuartz 支持 X11 實現 Linux 圖形化界面顯示...

更多奇技淫巧歡迎訂閱博客&#xff1a;https://fuckcloudnative.io前言在 Windows 中相信大家已經很熟悉使用 Xmanager(Xshell), MobaXterm, SecureCRT 通過 X11 實現 Linux 圖形化界面顯示&#xff0c;我的需求是在 macOS 下使用 iTerm2 作為 Terminal 實現 X11 圖形化界面顯示…

EntityFramework Core 2.0 Explicitly Compiled Query(顯式編譯查詢)

前言 EntityFramework Core 2.0引入了顯式編譯查詢&#xff0c;在查詢數據時預先編譯好LINQ查詢便于在請求數據時能夠立即響應。顯式編譯查詢提供了高可用場景&#xff0c;通過使用顯式編譯的查詢可以提高查詢性能。EF Core已經使用查詢表達式的散列來表示自動編譯和緩存查詢&a…

Oracle Minus關鍵字 不包含 取差集

Oracle Minus關鍵字   SQL中的MINUS關鍵字   SQL中有一個MINUS關鍵字&#xff0c;它運用在兩個SQL語句上&#xff0c;它先找出第一條SQL語句所產生的結果&#xff0c;然后看這些結果有沒有在第二個SQL語句的結果 中。如果有的話&#xff0c;那這一筆記錄就被去除&#xff0…

python掃描器甄別操作系統類型_20189317 《網絡攻防技術》 第三周作業

一.教材內容總結1.網絡踩點&#xff1a;web搜索與挖掘、DNS和IP查詢、網絡拓撲偵察(1)網絡踩點目標確定(2)技術手段&#xff1a;web信息搜索與挖掘、DNS和IP查詢、網絡拓撲偵察(3)web信息搜索與挖掘&#xff1a;基本搜索與挖掘技巧、高級搜索與挖掘技巧、編程實現google搜索、元…

python 網頁重定向_小試牛刀:python爬蟲爬取springer開放電子書.

首先聲明,本文旨在記錄反思,并沒有資源,代碼也不具有借鑒意義(水平實在不行.某天,水群的時候發現群友發了一個文件,里面是疫情時期springer開放的免費電子書名單,同時還附有下載鏈接,總共有400多本,這要是一個一個下載不得累死個人,只下載自己感興趣的書也是一個好主意,但是,我…

直面桌面云帶來的現狀優勢

在桌面云解決方案里&#xff0c;首先&#xff0c;所有的數據以及運算都在服務器端進行&#xff0c;客戶端只是顯示其變化的影像而已&#xff0c;所以在不需要擔心客戶端來非法竊取資料&#xff0c;我們在電影里面看到的商業間諜拿著 U 盤瘋狂的拷貝公司商業機密的情況再也不會出…

ORA-28001: the password has expired解決方法

Oracle提示錯誤消息ORA-28001: the password has expired&#xff0c;是由于Oracle11G的新特性所致&#xff0c; Oracle11G創建用戶時缺省密碼過期限制是180天&#xff08;即6個月&#xff09;&#xff0c; 如果超過180天用戶密碼未做修改則該用戶無法登錄。 Oracle公司是為了數…

.net 導出excel_Qt編寫的項目作品18-數據導出到Excel及Pdf和打印數據

一、功能特點原創導出數據機制&#xff0c;不依賴任何office組件或者操作系統等第三方庫&#xff0c;尤其是支持嵌入式linux。10萬行數據9個字段只需要2秒鐘完成。只需要四個步驟即可開始急速導出大量數據到Excel。同時提供直接寫入數據接口和多線程寫入數據接口&#xff0c;不…

hive數據庫定義

默認數據庫"default" 可以顯式切換數據庫&#xff1a;hive> use 數據庫名; 創建 hive>CREATE DATABASE [IF NOT EXISTS] mydb [LOCATION] /....... [COMMENT] ....; 實例 hive (default)> create database test_db comment test database; OK Ti…