RabbitMQ 從入門到精通 (一)

目錄

  • 1. 初識RabbitMQ
  • 2. AMQP
  • 3.RabbitMQ的極速入門
  • 4. Exchange(交換機)詳解
    • 4.1 Direct Exchange
    • 4.2 Topic Exchange
    • 4.3 Fanout Exchange
  • 5. Message 消息

1. 初識RabbitMQ

RabbitMQ 是一個開源的消息代理和隊列服務器,用來通過普通協議在完全不同的應用之間共享數據,RabbitMQ是使用 Erlang語言來編寫的,并且RabbitMQ是基于AMQP協議的

RabbitMQ的優點:

  • 開源、性能優秀、穩定性保障
  • 提供可靠性消息投遞模式(confirm)、返回模式(return)
  • 與SpringAMQP完美的整合、API豐富
  • 集群模式豐富,表達式配置,HA模式,鏡像隊列模型
  • 保證數據不丟失的前提下做到高可靠性、可用性

RabbitMQ官網

RabbitMQ的整體架構:

1348730-20190606002957380-2097750065.png

?
RabbitMQ的消息流轉:

1348730-20190606002655965-1977548174.png

?

?

2. AMQP

AMQP全稱: Advanced Message Queuing Protocol

AMQP翻譯: 高級消息隊列協議

AMQP定義: 是具有現代特征的二進制協議。是一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計

1348730-20190606002906491-408602073.png

?
?

AMQP核心概念:

  • Server:又稱Broker,接受客戶端的連接,實現AMQP實體服務
  • Connection:連接,應用程序與Broker的網絡連接
  • Channel:網絡信道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道。客戶端可建立多個Channel,每個Channel代表一個會話任務
  • Message:消息,服務器和應用程序之間傳送的數據,由Properties和Body組成。Properties可以對消息進行修飾,比如消息的優先級、延遲等高級特性;Body則是消息體的內容
  • Virtual host:虛擬地址,用于進行邏輯隔離,最上層的消息路由。同一個Virtual Host里面不能有相同名稱的Exchange或Queue
  • Exchange:交換機,接收消息,根據路由鍵轉發消息到綁定的隊列
  • Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key
  • Routing key:一個路由規則,虛擬機可用它確定如何路由一個特定消息
  • Queue:也稱為Message Queue,消息隊列,保存消息并將它們轉發給消費者

?

?

3.RabbitMQ的極速入門

后臺啟動: ./rabbitmq start &

關閉: ./rabbitmqctl stop

節點狀態: ./rabbitmqctl status

管控臺: http://ip:15672

?

?

RabbitMQ生產消費快速入門:

環境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依賴配置)

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.9.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency></dependencies>

?

public class Procuder {public static void main(String[] args) throws Exception {//1.創建一個ConnectionFactory 并進行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通過連接工廠創建連接Connection connection = connectionFactory.newConnection();//3.通過Connection 創建一個 ChannelChannel channel = connection.createChannel();/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* exchange:指定交換機 不指定 則默認 (AMQP default交換機) 通過routingkey進行匹配 * props 消息屬性* body 消息體*///4.通過Channel發送數據for(int i = 0; i < 5; i++){System.out.println("生產消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", null, msg.getBytes());}//5.記得關閉相關的連接channel.close();connection.close();}
}

?

public class Consumer {public static void main(String[] args) throws Exception{//1.創建一個ConnectionFactory 并進行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通過連接工廠創建連接Connection connection = connectionFactory.newConnection();//3.通過Connection 創建一個 ChannelChannel channel = connection.createChannel();//4. 聲明創建一個隊列String queueName = "test";/*** durable 是否持久化* exclusive 獨占的  相當于加了一把鎖*/channel.queueDeclare(queueName,true,false,false,null);//5.創建消費者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.設置channel/*** ACK: 當一條消息從生產端發到消費端,消費端接收到消息后會馬上回送一個ACK信息給broker,告訴它這條消息收到了* autoack: * true  自動簽收 當消費者一收到消息就表示消費者收到了消息,消費者收到了消息就會立即從隊列中刪除。* false 手動簽收 當消費者收到消息在合適的時候來顯示的進行確認,說我已經接收到了該消息了,RabbitMQ可以從隊列中刪除該消息了* */channel.basicConsume(queueName, true, queueingConsumer);//7.獲取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消費端:" + msg);//Envelope envelope = delivery.getEnvelope();}}
}

?

4. Exchange(交換機)詳解

Exchange: 接收消息,并根據路由鍵轉發消息所綁定的隊列

1348730-20190606003024596-916792922.png

?

交換機屬性:

  • Name: 交換機名稱
  • Type: 交換機類型 diect、topic、fanout、headers
  • Durability: 是否需要持久化,true為持久化
  • AutoDelete: 當最后一個綁定到Exchange的隊列刪除后,自動刪除該Exchange
  • Internal: 當前Exchange是否用于RabbitMQ內部使用,默認為false (百分之99的情況默認為false 除非對Erlang語言較了解,做一些擴展)
  • Arguments: 擴展參數, 用于擴展AMQP協議可自定化使用

?

4.1 Direct Exchange

所有發送到Direct Exchange的消息被轉發到RouteKey指定的Queue

注意:Direct模式可以使用RabbitMQ自帶的Exchange: default Exchange,所以不需要將Exchange進行任何綁定(binding)操作,消息傳遞時,RoutingKey必須完全匹配才會被隊列接收,否則該消息會被拋棄

1348730-20190606003044253-618019408.png

?

public class ProducerDirectExchange {public static void main(String[] args) throws Exception {//1.創建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2.創建ConnectionConnection connection = connectionFactory.newConnection();//3.創建ChannelChannel channel = connection.createChannel();//4.聲明String exchangeName = "test_direct_exchange";String routingKey = "test.direct";//5.發送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());}
}

?

public class ConsumerDirectExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//聲明String exchangeName = "test_direct_exchange";String exchangeType = "direct";String queueName = "test_direct_queue";String routingKey = "test.direct";//表示聲明了一個交換機channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示聲明了一個隊列channel.queueDeclare(queueName,false,false,false,null);//建立一個綁定關系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//參數:隊列名稱,是否自動ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循環獲取消息while(true){//獲取消息,如果沒有消息,這一步將會一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

?

4.2 Topic Exchange

所有發送到Topic Exchange的消息被轉發到所有關心RouteKey中指定Topic的Queue上

Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic

1348730-20190606003103076-1591219841.png

注意:可以使用通配符進行匹配

符號 # 匹配一個或多個詞

符號 * 匹配不多不少一個詞

例如: "log.#" 能夠匹配到 “log.info.oa”

? "log.*" 只會匹配到 "log.err"

public class ProducerTopicExchange {public static void main(String[] args) throws Exception {//1.創建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.創建ConnectionConnection connection = connectionFactory.newConnection();//3.創建ChannelChannel channel = connection.createChannel();//4.聲明String exchangeName = "test_topic_exchange";String routingKey1 = "user.save";String routingKey2 = "user.update";String routingKey3 = "user.delete.abc";//5.發送String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());}
}

?

public class ConsumerTopicExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//聲明String exchangeName = "test_topic_exchange";String exchangeType = "topic";String queueName = "test_topic_queue";String routingKey = "user.#";//表示聲明了一個交換機channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示聲明了一個隊列channel.queueDeclare(queueName,false,false,false,null);//建立一個綁定關系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//參數:隊列名稱,是否自動ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循環獲取消息while(true){//獲取消息,如果沒有消息,這一步將會一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

?

4.3 Fanout Exchange

不處理路由鍵,只需要簡單的將隊列綁定到交換機上
發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上
所以Fanout交換機轉發消息是最快的

1348730-20190606003117689-979368743.png

?

public class ProducerFanoutExchange {public static void main(String[] args) throws Exception {//1.創建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.創建ConnectionConnection connection = connectionFactory.newConnection();//3.創建ChannelChannel channel = connection.createChannel();//4.聲明String exchangeName = "test_fanout_exchange";//5.發送for(int i = 0; i < 10 ; i++){String msg = "Hello World RabbitMQ4 Direct Exchange Message";channel.basicPublish(exchangeName, "", null, msg.getBytes());}channel.close();connection.close();}
}

?

public class ConsumerFanoutExchange {public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);connectionFactory.setAutomaticRecoveryEnabled(true);connectionFactory.setNetworkRecoveryInterval(3000);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//聲明String exchangeName = "test_fanout_exchange";String exchangeType = "fanout";String queueName = "test_topic_queue";//無需指定路由key String routingKey = "";//表示聲明了一個交換機channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);//表示聲明了一個隊列channel.queueDeclare(queueName,false,false,false,null);//建立一個綁定關系channel.queueBind(queueName, exchangeName, routingKey);//durable 是否持久化消息QueueingConsumer consumer = new QueueingConsumer(channel);//參數:隊列名稱,是否自動ACK,Consumerchannel.basicConsume(queueName, true, consumer);//循環獲取消息while(true){//獲取消息,如果沒有消息,這一步將會一直阻塞Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("收到消息:" + msg);}}
}

?

5. Message 消息

服務器與應用程序之間傳遞的數據,本質上就是一段數據,由Properties和Body組成

常用屬性:delivery mode、headers (自定義屬性)

其他屬性:content_type、content_encoding、priority、expiration

消息的properties屬性用法示例:

public class Procuder {public static void main(String[] args) throws Exception {//1.創建一個ConnectionFactory 并進行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通過連接工廠創建連接Connection connection = connectionFactory.newConnection();//3.通過Connection 創建一個 ChannelChannel channel = connection.createChannel();Map<String,Object> headers = new HashMap<>();headers.put("my1", "111");headers.put("my2", "222");//10秒不消費 消息過期移除消息隊列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();//4.通過Channel發送數據for(int i = 0; i < 5; i++){System.out.println("生產消息:" + i);String msg = "Hello RabbitMQ" + i;channel.basicPublish("", "test", properties, msg.getBytes());}//5.記得關閉相關的連接channel.close();connection.close();}
}

?

public class Consumer {public static void main(String[] args) throws Exception{//1.創建一個ConnectionFactory 并進行配置ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.244.11");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setHandshakeTimeout(20000);//2.通過連接工廠創建連接Connection connection = connectionFactory.newConnection();//3.通過Connection 創建一個 ChannelChannel channel = connection.createChannel();//4. 聲明創建一個隊列String queueName = "test";channel.queueDeclare(queueName,true,false,false,null);//5.創建消費者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);//6.設置channelchannel.basicConsume(queueName, true, queueingConsumer);//7.獲取消息while(true){Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消費端:" + msg);Map<String, Object> headers = delivery.getProperties().getHeaders();System.err.println("headers value:" + headers.get("my1"));}}
}

轉載于:https://www.cnblogs.com/dwlovelife/p/10982735.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/448802.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/448802.shtml
英文地址,請注明出處:http://en.pswp.cn/news/448802.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

接收并解析消息體傳參、解析 json 參數

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1.場景&#xff1a;postman 發送了一個 post 請求&#xff0c;如下&#xff1a; 2. 解析方式為用一個 vo 對象來接收 json。把 json 中的…

OpenCL memory object 之 傳輸優化

首先我們了解一些優化時候的術語及其定義&#xff1a; 1、deferred allocation&#xff08;延遲分配&#xff09;&#xff0c; 在第一次使用memory object傳輸數據時&#xff0c;runtime才對memory object真正分配空間。 這樣減少了資源浪費&#xff0c;但第一次使用時要慢一些…

VBS使文本框的光標位于所有字符后

有時候在文本框里會顯示一部分提示信息&#xff0c;用戶在這些提示信息后面輸入文本&#xff0c;但是將焦點設置于文本框后&#xff0c;光標總是在文本框的最前面&#xff0c; 用戶輸入的時候需要按"-->"鍵將光標移到最后才能輸入&#xff0c;這樣的操作很不爽。我…

記錄ionic 最小化應用時所遇的問題

ionic3與ionic4最小化插件安裝不一樣&#xff1a; ionic3安裝方法&#xff1a; $ ionic cordova plugin add cordova-plugin-appminimize $ npm install --save ionic-native/app-minimize4 并在app.module.ts中 注入依賴&#xff1a; import { AppMinimize } from ionic-nativ…

解決 --- Docker 啟動時報錯:iptables:No chain/target/match by the name

問題&#xff1a;jenkins的docker containner啟動失敗&#xff0c;報錯&#xff1a;failed programming external connectivity … iptables: No chain/target/match by that name” docker 服務啟動的時候&#xff0c;docker服務會向iptables注冊一個鏈&#xff0c;以便讓dock…

AMD OpenCL 大學課程

AMD OpenCL大學課程是非常好的入門級OpenCL教程&#xff0c;通過看教程中的PPT&#xff0c;我們能夠很快的了解OpenCL機制以及編程方法。下載地址&#xff1a;http://developer.amd.com/zones/OpenCLZone/universities/Pages/default.aspx 教程中的英文很簡單&#xff0c;我相信…

第一篇 計算機基礎

1.什么是編程語言 python和中文、英語一樣、都是一門語言&#xff0c;只要是語言&#xff0c;其實就庫看成是一種事物與另一種事物溝通的介質。python屬于編程語言&#xff0c;編程語言是程序員與計算機之間溝通的介質&#xff1b;中文和英文則是人與人之間溝通的介質。 2.什么…

47.QT-QChart之曲線圖,餅狀圖,條形圖使用

1.使用準備 在pro中, 添加QT charts 然后在界面頭文件中添加頭文件并聲明命名空間,添加: #include <QtCharts> QT_CHARTS_USE_NAMESPACE 2.QChart之曲線圖 繪制曲線圖需要用到3個類 QSplineSeries: 用于創建有由一系列數據組成的曲線.類似的還有QPieSeries(餅圖數據). Q…

Docker 部署應用、jar 工程 docker 方式部署

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1. 把要部署的工程打成一個jar包。&#xff08;我的工程叫 gentle &#xff09; 打 jar 的方法&#xff1a;超簡單方法&#xff1a; Int…

流浪不是我的初衷 ... ...

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 或許&#xff0c;我從來就是一個習慣沉默的人 ... 或許&#xff0c;我從來就不善于傾述 ... 會有難過的時候&#xff0c;會有覺得累的…

第二階段沖刺(2)

1、整個項目預期的任務量 &#xff08;任務量 所有工作的預期時間&#xff09;和 目前已經花的時間 &#xff08;所有記錄的 ‘已經花費的時間’&#xff09;&#xff0c;還剩余的時間&#xff08;所有工作的 ‘剩余時間’&#xff09; &#xff1b; 所有工作的預期時間&#…

VS2008+OpenCL環境配置

1. 配置.cl文件支持: 1.1. 打開VS2008&#xff0c; 工具->選項->文本編輯器->文件擴展名&#xff0c;添加一個新的擴展名&#xff0c;指定編輯器為Microsoft Visual C 。這樣在OpenCL文件中就能顯示C的語法高亮了。 1.2. 配置OpenCL語法高亮 - 打開目錄~\NVIDIA Corpo…

第十二周學習進度報告

代碼時間&#xff1a;17小時左右&#xff0c; 代碼量&#xff1a;300行左右&#xff0c; 閱讀&#xff1a;一個app的誕生20頁&#xff1b;構建之法30頁 知識&#xff1a;抽象典型用戶&#xff08;具有代表性&#xff09;和場景&#xff0c;去設計相應功能。 轉載于:https://www…

我的桃花源

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 看了一個動畫片&#xff08;《貓與桃花源》&#xff09;&#xff0c;畫風和內容并不是我最偏好的... 但故事結尾的旁白和歌曲卻打動了一…

promise實例

不廢話&#xff0c;粘代碼 function ajax(method, url, data) {let request new XMLHttpRequest();return new Promise(function (resolve, reject) {request.onreadystatechange function () {if (request.readyState 4) {if (request.status 200) {resolve(request.respo…

華為路由器配置DHCP中繼

DHCP(動態主機配置協議)理論知識&#xff1a;DHCP主要用來為客戶機自動配置I P地址相關的網絡參數&#xff0c;包括IP地址、子網掩碼、默認網關、DNS服務器等。 DHCP 通信為廣播的方式&#xff0c;因此當需要 DHCP 服務器為不同廣播域&#xff08;路由或 VLAN 網段&#xff09;…

基于GPU的K-Means聚類算法

聚類是信息檢索、數據挖掘中的一類重要技術&#xff0c;是分析數據并從中發現有用信息的一種有效手段。它將數據對象分組成為多個類或簇&#xff0c;使得在同一個簇中的對象之間具有較高的相似度&#xff0c;而不同簇中的對象差別很大。作為統計學的一個分支和一種無監督的學習…

IntelliJ IDEA 工具篇之如何切換 git 分支

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1、進入項目和工程。 2、點擊右下角的git:master&#xff0c;然后選擇origin/master&#xff0c;然后選擇你要切換的分支&#xff0c;我…

IDEA---SpringBoot同一個項目多端口啟動

-Dserver.port xxxx 轉載于:https://www.cnblogs.com/tonyzt/p/10987116.html

好程序員Web前端分享無法忽視的JavaScript技巧

好程序員Web前端分享無法忽視的JavaScript技巧。在大家從事web前端的工作中&#xff0c;很容易忽視一些JavaScript的小技巧&#xff0c;今天為大家總結了一些容易被大家忽略的技巧&#xff0c;希望能夠對大家有所幫助。1、過濾唯一值Set類型是在ES6中新增的&#xff0c;它類似于…