RabbitMq使用與整合

MQ基本概念

MQ概述

MQ全稱 Message Queue([kju?])(消息隊列),是在消息的傳輸過程中保存消息的容器。多用于分布式系統之間進行通信。

(隊列是一種容器,用于存放數據的都是容器,存放消息的就是消息隊列)

分布式系統的調用:

方式一:直接調用

方式二:間接調用

A將數據存放到中間一個系統,通過中間的系統發送到B

中間系統可以成為中間件MQ

MQ是用于存放消息的中間件

被調用者叫生產者 調用者是消費者

MQ的優勢和劣勢

優勢

應用解耦:提高系統容錯性和可維護性。

異步提速:提升用戶體驗和系統吞吐量。

削峰填谷:提高系統穩定性。

應用解耦

系統的耦合性越高,容錯性就越低,可維護性就越低。

使用 MQ 使得應用間解耦,提升容錯性和可維護性

異步提速

提升用戶體驗和系統吞吐量(單位時間內處理請求的數目)。

削峰填谷(削峰)

使用了 MQ 之后,限制消費消息的速度為1000,這樣一來,高峰期產生的數據勢必會被積壓在 MQ 中,高峰就被“削”掉了,但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持在1000,直到消費完積壓的消息,這就叫做“填谷”。

使用MQ后,可以提高系統穩定性。

劣勢

系統可用性降低

系統引入的外部依賴越多,系統穩定性越差。一旦 MQ 宕機,就會對業務造成影響。如何保證MQ的高可用?

系統復雜度提高

MQ 的加入大大增加了系統的復雜度,以前系統間是同步的遠程調用,現在是通過 MQ 進行異步調用。如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?

一致性問題

A 系統處理完業務,通過 MQ 給B、C、D三個系統發消息數據,如果 B 系統、C 系統處理成功,D 系統處理失敗。如何保證消息數據處理的一致性?

既然 MQ 有優勢也有劣勢,那么使用 MQ 需要滿足什么條件呢?

消費者-->生產者

1.生產者不需要從消費者處獲得反饋。引入消息隊列之前的直接調用,其接口的返回值應該為空,這才讓明明下層的動作還沒做,上層卻當成動作做完了繼續往后走,即所謂異步成為了可能。

2.容許短暫的不一致性。

3.確實是用了有效果。即解耦、提速、削峰這些方面的收益,超過加入MQ,管理MQ這些成本。

RabbitMQ基本介紹

2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布。RabbitMQ 采用 Erlang 語言開發。Erlang語言由Ericson設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛。

RabbitMQ 基礎架構

Broker 中間者 服務

procedure 和consumer都是客戶端

客戶端通過鏈接和服務端進行通信 所以需要建立起來連接 然后進行通信a

使用channel(管道)節省資源

一個rabbitmq里面有很多的虛擬機 相當于mysql里面有很多數據庫,數據庫里面有很多表,都是獨立的。

每個虛擬機里面有很多的exchange和queue 獨立分區的作用

?RabbitMQ 中的相關概念

Broker:接收和分發消息的應用,RabbitMQ Server就是 Message Broker。

Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等。

Connection:publisher/consumer 和 broker 之間的 TCP 連接。

Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統建立 TCP connection 的開銷。

Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發消息到queue 中去。常用的類型有:

????????direct (point-to-point)

????????topic (publish-subscribe)

????????fanout (multicast)

Queue:消息最終被送到這里等待 consumer 取走

Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發依據

RabbitMQ的6 種工作模式

RabbitMQ 提供了 6 種工作模式:

簡單模式、work queues、Publish/Subscribe 發布與訂閱模式、Routing 路由模式、Topics 主題模式、RPC 遠程調用模式(遠程調用,不太算 MQ;暫不作介紹)。

官網對應模式介紹:RabbitMQ Tutorials — RabbitMQ

RabbitMQ的安裝和配置

安裝依賴環境

在線安裝依賴環境:

yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

安裝Erlang

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

安裝RabbitMQ

#安裝依賴的包

rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm

#安裝rabbitmq

rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm

rpm -ivh?erlang-22.0.7-1.el7.x86_64.rpm?socat-1.7.3.2-2.el7.x86_64.rpm?rabbitmq-server-3.7.18-1.el7.noarch.rpm

啟動RabbitMQ

systemctl start rabbitmq-server # 啟動服務

systemctl stop rabbitmq-server # 停止服務

systemctl restart rabbitmq-server # 重啟服務

systemctl status rabbitmq-server #?查看狀態

開啟管理界面及配置

# 開啟管理界面

rabbitmq-plugins enable rabbitmq_management

# 修改默認配置信息

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app

# 比如修改密碼、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest

修改之后重啟一下rabbitmq

入門實例

1.添加虛擬主機

2.添加用戶

3.重新設置權限

點擊虛擬主機設置權限

4.idea連接

項目結構搭建

mq 導入依賴

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version>
</dependency>

生產者

public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setHost("192.168.229.16");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/test");
//        建立連接Connection connection = connectionFactory.newConnection();
//        管道Channel channel = connection.createChannel();
//        創建隊列/** String queue 隊列的名稱* boolean durable 持久化* boolean exclusive 是否獨占* boolean autoDelete 是否自動刪除* Map<String,Object> arguments 參數* */channel.queueDeclare("test01",false,false,false,null);
//        發布消息channel.basicPublish("","test01",null,"第一次發送".getBytes());}

消費者

{public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setHost("192.168.229.16");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/test");
//        建立連接Connection connection = connectionFactory.newConnection();
//        管道Channel channel = connection.createChannel();
//        消費信息Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println(s);}};channel.basicConsume("test01",true,consumer);}

同時消費后消息組內為 0

RabbitMQ工作模式

Work queues工作隊列模式

Work Queues與入門程序的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。

應用場景對于 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

代碼

Work Queues與入門程序的簡單模式的代碼是幾乎一樣的;可以完全復制,并復制多一個消費者進行多個消費者同時消費消息的測試。

1.復制一個消費者

2.先運行起兩個消費者

3.在生產者中多發布幾條消息

for (int i = 0; i < 10; i++) {channel.basicPublish("","test01",null,("第"+i+"次發送").getBytes());
}

4.兩個消費者會采用輪詢的方式拿到消息

在一個隊列中如果有多個消費者,那么消費者之間對于同一個消息的關系是競爭的關系

訂閱模式類型

而在訂閱模型中,多了一個exchange角色,而且過程略有變化:

生產者發消息給交換機,交換機將消息路由分發給隊列,消費者監聽隊列接收信息

Exchange:交換機。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:

Fanout:廣播,將消息交給所有綁定到交換機的隊列

Direct:定向,把消息交給符合指定routing key 的隊列

Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

代碼實現

生產者更改代碼邏輯為:

//        創建交換機channel.exchangeDeclare("Exchange", BuiltinExchangeType.FANOUT,false);
//        創建隊列channel.queueDeclare("test01Ex",false,false,false,null);channel.queueDeclare("test02Ex",false,false,false,null);
//        隊列綁定交換機channel.queueBind("test01Ex","Exchange","");channel.queueBind("test02Ex","Exchange","");
//        發布消息for (int i = 0; i < 10; i++) {channel.basicPublish("Exchange","",null,("第"+i+"次發送").getBytes());}

消費者分別從兩個隊列獲取消息

Routing路由模式(Direct:定向)

隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)

消息的發送方在 向Exchange發送消息時,也必須指定消息的RoutingKey

Exchange不再把消息交給每一個綁定的隊列,而是根據消息的RoutingKey進行判斷,只有隊列的RoutingKey與消息的 RoutingKey完全一致,才會接收到消息

 channel.queueBind(queuename,exchangename,"error"); // errorchannel.queueBind(queuename2,exchangename,"error");// error info channel.queueBind(queuename2,exchangename,"info");// 發送消息//String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(exchangename,"info",null,"hello".getBytes());

Topics通配符模式

Topic與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符

RoutingKey一般都是有一個或多個單詞組成,多個單詞之間以”.”分割

#:匹配一個或多個詞?

*:匹配不多不少恰好1個詞???test.* test.insert

channel.queueBind(queuename,exchangename,"order.*");channel.queueBind(queuename,exchangename,"*.error");channel.queueBind(queuename2,exchangename,"*.*");//發送消息//String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(exchangename,"goods.info",null,"hello".getBytes());

模式總結

RabbitMQ工作模式:

簡單模式 HelloWorld

一個生產者、一個消費者,不需要設置交換機(使用默認的交換機)

工作隊列模式 Work Queue

一個生產者、多個消費者(競爭關系),不需要設置交換機(使用默認的交換機)

發布訂閱模式 Publish/subscribe

需要設置類型為fanout的交換機,并且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消息發送到綁定的隊列

路由模式 Routing

需要設置類型為direct的交換機,交換機和隊列進行綁定,并且指定routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列

通配符模式 Topic

需要設置類型為topic的交換機,交換機和隊列進行綁定,并且指定通配符方式的routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列

SpringBoot整合Mq

在Spring項目中,可以使用Spring-Rabbit去操作RabbitMQ

尤其是在spring boot項目中只需要引入對應的amqp啟動器依賴即可,方便的使用RabbitTemplate發送消息,使用注解接收消息。

創建工程結構

添加依賴(sys-mq

<parent><groupId>com.example</groupId><artifactId>mqdemo02</artifactId><version>0.0.1-SNAPSHOT</version></parent><artifactId>sys-mq</artifactId><packaging>pom</packaging><name>sys-mq</name><url>http://maven.apache.org</url><modules><module>mq-product</module><module>mq-consumer</module></modules><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency>
<!--        rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

sys-mq里面兩個子模塊的 application.yml

server

? ? ?port: 記得修改

spring:rabbitmq:username: rootpassword: roothost: 192.168.229.16virtual-host: /testport: 5672
server:port: 8086

生產者

@Configuration
public class RabbitMqConfig {
//    設置交換機的名稱和隊列的名字public static final String EXCHANGE_NAME="exchange_topic-test";public static final String QUEUE_NAME="queue_topic-test";public static final String QUEUE_NAME2="queue_topic-test2";
//    創建交換機 將交換機作為bean注入到spring中@Bean("topicExchange")public Exchange topicExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(false).build();}
//    隊列@Bean("topicQueue")public Queue topicQueue(){return QueueBuilder.durable(QUEUE_NAME).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(QUEUE_NAME2).build();}
//    將隊列與交換機進行綁定@Beanpublic Binding exchangeWithQueue(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.test").noargs();}@Beanpublic Binding exchangeWithQueue2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();}
}
@SpringBootTest
public class MyTest {public static final String EXCHANGE_NAME="exchange_topic-test";@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMsg() {rabbitTemplate.convertAndSend(EXCHANGE_NAME,"success.test","測試整合");}
}

消費者

@Component
public class MyListener {
//    監聽隊列的消息@RabbitListener(queues = "queue_topic-test")public void listenQueue(Message message) {byte[] body = message.getBody();System.out.println(new String(body));}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/167933.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/167933.shtml
英文地址,請注明出處:http://en.pswp.cn/news/167933.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

優秀的時間追蹤軟件Timemator for Mac輕松管理時間!

在現代社會&#xff0c;時間管理成為了我們工作和生活中的一大挑戰。如果你經常感到時間不夠用&#xff0c;無法高效地完成任務&#xff0c;那么Timemator for Mac將成為你的得力助手。 Timemator for Mac是一款出色的時間追蹤軟件&#xff0c;它可以幫助你精確記錄和管理你的…

Linux的基本指令 ( 一 )

目錄 前言 Linux基本指令 快速認識五個指令 ls指令 補充內容 pwd指令 補充內容 cd指令 補充內容 重新認識指令 指令的本質 which指令 alias指令 最后 一個文件的三種時間 tree指令及安裝 tree指令 前言 關于Linux操作系統的桌面&#xff0c;在學校教學中我們…

實用高效 無人機光伏巡檢系統助力電站可持續發展

近年來&#xff0c;我國光伏發電行業規模日益壯大&#xff0c;全球領先地位愈發鞏固。為解決光伏電站運維中的難題&#xff0c;浙江某光伏電站與復亞智能達成戰略合作&#xff0c;共同推出全自動無人機光伏巡檢系統&#xff0c;旨在提高發電效率、降低運維成本&#xff0c;最大…

Spark---SparkCore(一)

一、術語與寬窄依賴 1、術語解釋 1、Master(standalone):資源管理的主節點&#xff08;進程&#xff09; 2、Cluster Manager:在集群上獲取資源的外部服務(例如&#xff1a;standalone,Mesos,Yarn) 3、Worker Node(standalone):資源管理的從節點(進程)或者說管理本機資源的…

用Python寫一個瀏覽器集群框架

更多Python學習內容&#xff1a;ipengtao.com 在分布式爬蟲和大規模數據采集的場景中&#xff0c;使用瀏覽器集群是一種有效的方式&#xff0c;可以提高數據采集的速度和效率。本文將介紹如何用Python編寫一個簡單但強大的瀏覽器集群框架&#xff0c;以應對需要使用多個瀏覽器實…

WebGL/threeJS面試題掃描與總結

什么是 WebGL&#xff1f;什么是 Three.js&#xff1f;請解釋three.js中的WebGL和Canvas的區別&#xff1f; WebGL(全寫Web Graphics Library)是一種3D繪圖協議&#xff0c;這種繪圖技術標準允許把JavaScript和OpenGL ES 2.0結合在一起&#xff0c;通過增加OpenGL ES 2.0的一個…

分庫分表、分布式數據庫、MPP

分庫分表、分布式數據庫、MPP的區別嗎&#xff1f; 一、MySQL分庫分表和MySQL分布式集群在性能方面各有優劣&#xff0c;具體取決于應用場景和需求。 MySQL分庫分表&#xff1a; 在分庫分表的場景下&#xff0c;可以將負載分散到多個數據庫實例上&#xff0c;從而提高整體性能…

【模糊測試】課堂筆記

模糊測試 模糊測試過程通常是自動化的。這個過程經典地分為以下幾個階段。 準備&#xff1a;這是第一階段&#xff0c;重點是 SUT 輸入和輸出格式的識別和規范。基于此&#xff0c;規范可以減少生成初始無效模糊數據的可能性并創建有效且精確的輸入。Fuzz Data Generation&am…

思科模擬器操作命令

模式 思科模擬器常見的模式有 用戶模式 能夠操作的命令比較少 特權模式特權模式下面可以操作的比較多 全局模式 接口模式 用戶模式進入特權模式: 命令enable 特權模式進行全局模式命令: configure terminal 退出命令 exit命令&#xff1a;返回上一層&#xff0c;即一步一步…

RocketMQ 消息中間件 知識點匯總

目錄 RocketMQ1、什么是RocketMQ?常用術語:2、為什么需要消息隊列3、什么是異步處理4、什么是服務解耦5、什么是流量控制6、消息隊列兩種模型隊列模型:發布/訂閱模型:總結:7、怎么保證消息不丟失8、如何處理消息被重復消費**出現消息重復的情況:****解決方法:**9、如何保…

流量分析-PhishingEmail_WriteUp

一、題目問題 問題1&#xff1a;黑客的email名稱 問題2&#xff1a;黑客向幾人發送了釣魚郵件 問題3&#xff1a;黑客傳輸的木馬文件名 問題4&#xff1a;下載并運行了木馬文件的人的email名稱和ip地址&#xff0c;用“-”連接 問題5&#xff1a;黑客用于反彈shell的主機i…

什么葡萄酒會適用這種雙重潷析方法呢?

潷析有兩個主要目的&#xff0c;一種是去除陳年或未經過濾的葡萄酒中的沉淀物。雖然沉淀物不會對你造成任何傷害&#xff0c;但當喝葡萄酒滿嘴都是葡萄沉淀物時是一件很糟糕的事。其次&#xff0c;傾析葡萄酒是可以讓葡萄酒“呼吸”與氧氣接觸的&#xff0c;氧氣可以軟化單寧&a…

二維數值型數組例題

1、單位矩陣初始化 題目描述 對用作單位矩陣的數組初始化。單位矩陣在主對角線上的值為1&#xff0c;而其他的地方的值為0&#xff0c;并且主對角線上的行、列下標是一樣的。 輸入要求 輸入一個整數n表示矩陣的行數 輸出要求 輸出n*n的單位矩陣。數據之間以空格間隔&…

LeetCode Hot100 102.二叉樹的層序遍歷

題目&#xff1a; 給你二叉樹的根節點 root &#xff0c;返回其節點值的 層序遍歷 。 &#xff08;即逐層地&#xff0c;從左到右訪問所有節點&#xff09;。 方法&#xff1a;迭代 class Solution {public List<List<Integer>> levelOrder(TreeNode root) {if …

C語言——輸入一個4位正整數,輸出其逆數。

#define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int i,j 0;int a1,a2,a3,a4;printf("輸入一個4位正整數&#xff1a;\n");scanf("%d",&i);a1 i/1000; a2 i/100%10; a3 i/10%10; a4 i%10; printf("千位a1%d,百位a…

【JavaFx】利用JavaFx寫一個登錄頁面

以下是一個基本的JavaFX登錄頁面示例: import javafx.application.Application; import javafx.geometry.Insets; import javafx.geometry.Pos; import javafx.scene.Scene; import javafx.scene.control.Button; import javafx.scene.control.Label; import javafx.scene.co…

mysql的alter怎么使用?

在MySQL中&#xff0c;ALTER語句用于修改數據庫的表結構。下面是一些ALTER語句的示例用法&#xff1a; 1. 添加列&#xff1a; ALTER TABLE 表名 ADD 列名 數據類型; 2. 修改列的數據類型&#xff1a; ALTER TABLE 表名 MODIFY 列名 新數據類型; 3. 修…

新人工作方法論:高效率的工作

引言&#xff1a; 轉眼間入職半載&#xff0c;在工作期間曾迷茫、困惑&#xff0c;深深的感受到職場身份的轉變帶來的痛苦。痛苦的原因不僅僅包括學生時代自己悶頭做事的思維習慣與團隊合作需求的差異性&#xff0c;也包括缺乏體系的工作方法。 自己在網絡上查了一些方法論&a…

80C51單片機----數據傳送類指令

目錄 一.一般傳送指令&#xff0c;即mov指令 1.16位傳送&#xff08;僅1條&#xff09; 2.8位傳送 &#xff08;1&#xff09;目的字節為A&#xff08;累加器&#xff09; &#xff08;2&#xff09;目的字節為Rn(工作寄存器) &#xff08;3&#xff09;目的字節為direct…

超分辨率重建

意義 客觀世界的場景含有豐富多彩的信息&#xff0c;但是由于受到硬件設備的成像條件和成像方式的限制&#xff0c;難以獲得原始場景中的所有信息。而且&#xff0c;硬件設備分辨率的限制會不可避免地使圖像丟失某些高頻細節信息。在當今信息迅猛發展的時代&#xff0c;在衛星…