activemq 發兩條只收到一條_淺談ActiveMQ與使用

更多大數據架構、實戰經驗,歡迎關注【大數據每日嗶嗶】,期待與你一起成長!

本文將介紹一下 ActiveMQ 的安裝、原理和簡單實戰。

一、什么是消息中間件

消息中間件顧名思義實現的就是在兩個系統或兩個客戶端之間進行消息傳送

5ce8ad8e45d23d2669fd178982e81226.png

二、什么是ActiveMQ

ActiveMQ是一種開源的基于JMS(Java Message Servie)規范的一種消息中間件的實現,ActiveMQ的設計目標是提供標準的,面向消息的,能夠跨越多語言和多系統的應用集成消息通信中間件。

三、什么時候需要用ActiveMQ

ActiveMQ常被應用與系統業務的解耦,異步消息的推送,增加系統并發量,提高用戶體驗。例如以我在工作中的使用,在比較耗時且異步的遠程開鎖操作時

49e3953af0015d4f12e166f23bc7e213.png

四、如何使用ActiveMQ

1.AcitveMQ的數據傳送流程

ec66838c8568d2088f637b62256bfd03.png

2.ActiveMQ的兩種消息傳遞類型

(1)點對點傳輸,即一個生產者對應一個消費者,生產者向broke推送數據,數據存儲在broke的一個隊列中,當消費者接受該條隊列里的數據。

(2)基于發布/訂閱模式的傳輸,即根據訂閱話題來接收相應數據,一個生產者可向多個消費者推送數據,與MQTT協議的實現是類似的,對MQTT協議有興趣的可跳轉到https://www.cnblogs.com/xiguadadage/p/11216463.html

兩種消息傳遞類型的不同,點對點傳輸消費者可以接收到在連接之前生產者所推送的數據,而基于發布/訂閱模式的傳輸方式消費者只能接收到連接之后生產者推送的數據。

3.ActiveMQ的安裝與啟動

(1)官網下載對應服務器版本

4cbfe622234ddb3c25add8846f07d5e7.png

(2)解壓后進入apache-activemq-5.15.9/bin目錄

(3)執行./activemq start啟動ActiveMQ

18af0e80684c892d33203add367ba13f.png

(4)瀏覽器輸入ActiveMQ啟動的服務器ip:8161便可進入web界面,點擊Manage ActiveMQ broker可以查看消息推送的狀態,默認賬號密碼為admin,admin

a31fb2c9935d7d814e74400488c7091f.png

(5)啟動錯誤分析

進入/root/apache-activemq-5.15.9/data目錄查看activemq.log文件,根據錯誤提示信息修改,例如端口號被占用等。

4.ActiveMQ的代碼測試

(1)構建maven項目,引入依賴

org.apache.activemq    activemq-all    5.9.0

(2)生產者類

/** * @Description 生產者 * @Date 2019/7/20 * @Created by yqh */public class MyProducer {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 創建連接工廠        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 創建連接        Connection connection = activeMQConnectionFactory.createConnection();        // 打開連接        connection.start();        // 創建會話        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 創建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據        Destination destination = session.createQueue("myQueue");        // 創建一個生產者        MessageProducer producer = session.createProducer(destination);        // 向隊列推送10個文本消息數據        for (int i = 1 ; i <= 10 ; i++){            // 創建文本消息            TextMessage message = session.createTextMessage("第" + i + "個文本消息");            //發送消息            producer.send(message);            //在本地打印消息            System.out.println("已發送的消息:" + message.getText());        }        //關閉連接        connection.close();    }}

運行結果:

已發送的消息:第1個文本消息已發送的消息:第2個文本消息已發送的消息:第3個文本消息已發送的消息:第4個文本消息已發送的消息:第5個文本消息已發送的消息:第6個文本消息已發送的消息:第7個文本消息已發送的消息:第8個文本消息已發送的消息:第9個文本消息已發送的消息:第10個文本消息

測試查看web后臺顯示,有10條消息在隊列中等待消費

741f805875676838d69010b9abfc15d1.png

(3)消費者類

/** * @Description 消費者類 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumer {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 創建連接工廠        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 創建連接        Connection connection = activeMQConnectionFactory.createConnection();        // 打開連接        connection.start();        // 創建會話        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 創建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據        Destination destination = session.createQueue("myQueue");        // 創建消費者        MessageConsumer consumer = session.createConsumer(destination);        // 創建消費的監聽        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println("消費的消息:" + textMessage.getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }}

測試結果:

消費的消息:第1個文本消息消費的消息:第2個文本消息消費的消息:第3個文本消息消費的消息:第4個文本消息消費的消息:第5個文本消息消費的消息:第6個文本消息消費的消息:第7個文本消息消費的消息:第8個文本消息消費的消息:第9個文本消息消費的消息:第10個文本消息

web后臺顯示有一個消費者處于連接狀態,且已消費了10個message,而該條隊列已沒有message待消費了

70affd879daf184562b635fee86edd35.png

(4)當我們運行兩個消費者類,消息又是怎么被消費的呢?是兩個消費者都能收到生產者生產的message,還是只有其中一個消費者能消費呢?

我們先運行兩個消費者,在運行一個生產者對目標隊列生產10個message,會發現有以下情況

// Consumer1控制臺消費的消息:第1個文本消息消費的消息:第3個文本消息消費的消息:第5個文本消息消費的消息:第7個文本消息消費的消息:第9個文本消息
// Consumer2控制臺消費的消息:第2個文本消息消費的消息:第4個文本消息消費的消息:第6個文本消息消費的消息:第8個文本消息消費的消息:第10個文本消息

即隊列中的數據會平均的分給每一個消費者消費,且每一條數據只能被消費一次

(5)以上是基于隊列點對點的傳輸類型,以下是基于發布/訂閱模式傳輸的類型測試

/** * @Description 基于發布/訂閱模式傳輸類型的生產者測試 * @Date 2019/7/20 0020 * @Created by yqh */public class MyProducerForTopic {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 創建連接工廠        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 創建連接        Connection connection = activeMQConnectionFactory.createConnection();        // 打開連接        connection.start();        // 創建會話        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 創建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據        Destination destination = session.createTopic("topicTest");        // 創建一個生產者        MessageProducer producer = session.createProducer(destination);        // 向隊列推送10個文本消息數據        for (int i = 1 ; i <= 10 ; i++){            // 創建文本消息            TextMessage message = session.createTextMessage("第" + i + "個文本消息");            //發送消息            producer.send(message);            //在本地打印消息            System.out.println("已發送的消息:" + message.getText());        }        //關閉連接        connection.close();    }}
/** * @Description 基于發布/訂閱模式傳輸類型的消費者測試 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumerForTopic {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 創建連接工廠        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 創建連接        Connection connection = activeMQConnectionFactory.createConnection();        // 打開連接        connection.start();        // 創建會話        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 創建隊列目標,并標識隊列名稱,消費者根據隊列名稱接收數據        Destination destination = session.createTopic("topicTest");        // 創建消費者        MessageConsumer consumer = session.createConsumer(destination);        // 創建消費的監聽        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println("消費的消息:" + textMessage.getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }}

現在如果我們先啟動生產者,再啟動消費者,會發現消費者是無法接收到之前生產者之前所生產的數據,只有消費者先啟動,再讓生產者消費才可以正常接收數據,這也是發布/訂閱的主題模式與點對點的隊列模式的一個明顯區別。

而如果啟動兩個消費者,那么每一個消費者都能完整的接收到生產者生產的數據,即每一條數據都被消費了兩次,這是發布/訂閱的主題模式與點對點的隊列模式的另一個明顯區別。

更多大數據架構、實戰經驗,歡迎關注【大數據每日嗶嗶】,期待與你一起成長!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/276727.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/276727.shtml
英文地址,請注明出處:http://en.pswp.cn/news/276727.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

php發送get、post請求的幾種方法

方法1: 用file_get_contents 以get方式獲取內容 <?php $urlhttp://www.domain.com/; $html file_get_contents($url); echo $html; ?>方法2: 用fopen打開url, 以get方式獲取內容<?php $fp fopen($url, r); stream_get_meta_data($fp); while(!feof($fp)) { $res…

ZZ:深入理解new

new的過程當我們使用關鍵字new在堆上動態創建一個對象時&#xff0c;它實際上做了三件事&#xff1a;獲得一塊內存空間、調用構造函數、返回正確的指針。當然&#xff0c;如果我們創建的是簡單類型的變量&#xff0c;那么第二步會被省略。假如我們定義了如下一個類A&#xff1a…

mysql數據庫的優缺點

優點1. 通常存儲過程 標題有助于提高應用程序的性能。因為當你創建他的時候就已經編譯了&#xff0c;只不過是按需編譯的。2.存儲過程有助于減少應用程序和數據庫服務器之間的流量&#xff0c;因為應用程序不必發送多個冗長的SQL語句&#xff0c;而只能發送存儲過程的名稱和參數…

大數據小白系列——HDFS(1)

【注1&#xff1a;結尾有大福利&#xff01;】 【注2&#xff1a;想寫一個大數據小白系列&#xff0c;介紹大數據生態系統中的主要成員&#xff0c;理解其原理&#xff0c;明白其用途&#xff0c;萬一有用呢&#xff0c;對不對。】 大數據是什么&#xff1f;拋開那些高大上但籠…

PHP檢測遠端文件是否存在

簡單解釋一下上面的代碼。get_headers的作用就是訪問一個遠程地址&#xff0c;把服務器發送的HTTP頭以數組形式返回。而$header[0]則是服務器返回的狀態碼&#xff08;如果不出意外的話狀態碼應該都是第一個返回的&#xff09;。 要確定一個文件在遠端服務器上存在&#xff0c…

C#中使用DTS來導入數據及相關問題

向Sql 中導入Excel數據時&#xff0c;使用MS SQL的DTS功能 可以很方便的導入&#xff0c;同時引用Dll文件&#xff0c;可以在程序中對導入過程進行控制。 創建DTS包的過程如下&#xff1a; &#xff11;。在&#xff33;&#xff31;&#xff2c;企業管理器中&#xff0c;工具菜…

html select選擇事件_一道搜狗面試題:IO多路復用中select、poll、epoll之間的區別...

(1)select>時間復雜度O(n)它僅僅知道了&#xff0c;有I/O事件發生了&#xff0c;卻并不知道是哪那幾個流(可能有一個&#xff0c;多個&#xff0c;甚至全部)&#xff0c;我們只能無差別輪詢所有流&#xff0c;找出能讀出數據&#xff0c;或者寫入數據的流&#xff0c;對他們…

【MySQL】redo log --- 刷入磁盤過程

1、redo log基本概念 redo log的相關概念這里就不再過多闡述&#xff0c;網上有非常多的好的資料&#xff0c;可以看下縹緲大神的文章&#xff1a;https://www.cnblogs.com/cuisi/p/6525077.html&#xff0c;個人感覺介紹的非常詳細。 2、數據更改過程簡述 MySQL 在更新數據的時…

WPF DataGrid根據內容設置行顏色

轉&#xff1a; https://code.4noobz.net/wpf-change-color-of-a-row-in-a-datagrid-depending-on-the-value/ 轉載于:https://www.cnblogs.com/Mindy-hym/p/11475024.html

QQ web api

QQ的很多功能和信息可以通過web方式獲得&#xff5e;以下鏈接&#xff0c;星號應換成你要查詢的QQ號一、Activities Previewhttp://labs.qq.com/ie8/preview/?uin******二、QQ空間訪問次數查詢&#xff08;需權限&#xff09;http://g.qzone.qq.com/fcg-bin/cgi_emotion_list.…

delphi tclientsocket接收不到返回數據_RS—485中教你主站發送報文結構、從站返回報文結構?系列11...

作者&#xff1a;馬樂1.主站發送報文結構大家可以看到我之前寫的文章中的程序都是沒有什么具體功能的&#xff0c;都是兩個站點之間互相傳遞數據&#xff0c;這些數據我們只是看看是否可以正常接收發送&#xff0c;數據本身是沒有任何含義的。很明顯在實際使用過程中我們是不會…

MybatisPlus 通用枚舉無法正確取值

正常使用mybatisplus <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.0.4</version></dependency> 使用后發現項目中原先對枚舉值的轉換存在異常&#xff1a; ER…

零基礎學習 Python 之條件語句

寫在之前 我們寫程序&#xff0c;就好比學生時代寫作文一樣&#xff0c;由 “字” 到 “詞” 到 “句” 最后到 “文章” 。此前我們學會了一些詞語&#xff08;對象類型&#xff09;&#xff0c;我們接下來就是學如何造句&#xff0c;而在編程語言里&#xff0c;句子被叫做語句…

python input 文件名_Python播放音頻與錄音

這一講主要介紹些音頻基本處理方式&#xff0c;為接下來的語音識別打基礎。三種播放音頻的方式使用 python 播放音頻有以下幾種方式&#xff1a;os.system()os.system(file) 調用系統應用來打開文件&#xff0c;file 可為圖片或者音頻文件。缺點&#xff1a;要打開具體的應用&a…

jQuery選擇器和方法的等價關系

層級選擇器 1、ancestor descendant &#xff08;后代選擇器&#xff09; 在給定的祖先元素下匹配所有的后代元素 $(“form input”) $(.div span)選取<div>里的所有的<span>元素 2、parent child &#xff08;子選擇器&#xff09;在給定的父元素下匹配所有…

ActionScript 3.0 Step By Step系列(四):來自面向對象開發之前的吶喊:“學會寫可重用的代碼”...

增強代碼的可重用能力&#xff0c;從創建可重用的代碼開始&#xff0c;可重用的代碼則是通過從現有代碼中重構加以封裝,使其成為功能單一的可復用代碼塊。這句話籠統點說便是“封裝”或“抽象”。 在實際的編程開發中&#xff0c;要實現代碼重用&#xff0c;而不是每次都去Copy…

express利用nodemailer發送郵件(163郵箱)

Nodemailer 是一個簡單易用的Node.js郵件發送組件 首先安裝這個組件 npm install nodemailer --save安裝之后&#xff0c;可以在某個get請求下&#xff0c;發送郵件&#xff0c;具體路由代碼&#xff1a; const express require("express"); const nodemailer requ…

使用 Solid 私有化存儲 IPFS 文件哈希值

背景 星際文件系統 IPFS&#xff08;InterPlanetary File System&#xff09;是一個面向全球的、點對點的分布式文件系統&#xff0c;目標是為了補充&#xff08;甚至是取代&#xff09;目前統治互聯網的超文本傳輸協議&#xff08;HTTP&#xff09;&#xff0c;將所有具有相同…

使用window.postMessage實現跨域通信

JavaScript由于同源策略的限制,跨域通信一直是棘手的問題。當然解決方案也有很多&#xff1a; document.domainiframe的設置&#xff0c;應用于主域相同而子域不同&#xff1b;利用iframe和location.hash&#xff0c;數據直接暴露在了url中&#xff0c;數據容量和類型都有限Fla…

appium啟動app失敗_Appium-Desktop Capability 配置及啟動App演示

Appium-Desktop Capability配置介紹desired capability的功能是配置Appium會話。為什么要配置capability&#xff0c;目的就是為了告訴Appium服務器您想要自動化的平臺和應用程序。Desired Capabilities是一組設置的鍵值對的集合&#xff0c;其中鍵對應設置的名稱&#xff0c;而…