MQ基本概念
MQ概述
MQ全稱 Message Queue([kju?])(消息隊列),是在消息的傳輸過程中保存消息的容器。多用于分布式系統之間進行通信。
(隊列是一種容器,用于存放數據的都是容器,存放消息的就是消息隊列)
分布式系統的調用:
方式一:直接調用
方式二:間接調用
A將數據存放到中間一個系統,通過中間的系統發送到B
中間系統可以成為中間件MQ
MQ是用于存放消息的中間件
被調用者叫生產者 調用者是消費者
MQ的優勢和劣勢
優勢
應用解耦:提高系統容錯性和可維護性。
異步提速:提升用戶體驗和系統吞吐量。
削峰填谷:提高系統穩定性。
應用解耦
系統的耦合性越高,容錯性就越低,可維護性就越低。
使用 MQ 使得應用間解耦,提升容錯性和可維護性
異步提速
提升用戶體驗和系統吞吐量(單位時間內處理請求的數目)。
削峰填谷(削峰)
使用了 MQ 之后,限制消費消息的速度為1000,這樣一來,高峰期產生的數據勢必會被積壓在 MQ 中,高峰就被“削”掉了,但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持在1000,直到消費完積壓的消息,這就叫做“填谷”。
使用MQ后,可以提高系統穩定性。
劣勢
系統可用性降低
系統引入的外部依賴越多,系統穩定性越差。一旦 MQ 宕機,就會對業務造成影響。如何保證MQ的高可用?
系統復雜度提高
MQ 的加入大大增加了系統的復雜度,以前系統間是同步的遠程調用,現在是通過 MQ 進行異步調用。如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
一致性問題
A 系統處理完業務,通過 MQ 給B、C、D三個系統發消息數據,如果 B 系統、C 系統處理成功,D 系統處理失敗。如何保證消息數據處理的一致性?
既然 MQ 有優勢也有劣勢,那么使用 MQ 需要滿足什么條件呢?
消費者-->生產者
1.生產者不需要從消費者處獲得反饋。引入消息隊列之前的直接調用,其接口的返回值應該為空,這才讓明明下層的動作還沒做,上層卻當成動作做完了繼續往后走,即所謂異步成為了可能。
2.容許短暫的不一致性。
3.確實是用了有效果。即解耦、提速、削峰這些方面的收益,超過加入MQ,管理MQ這些成本。
RabbitMQ基本介紹
2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布。RabbitMQ 采用 Erlang 語言開發。Erlang語言由Ericson設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛。
RabbitMQ 基礎架構
Broker 中間者 服務
procedure 和consumer都是客戶端
客戶端通過鏈接和服務端進行通信 所以需要建立起來連接 然后進行通信a
使用channel(管道)節省資源
一個rabbitmq里面有很多的虛擬機 相當于mysql里面有很多數據庫,數據庫里面有很多表,都是獨立的。
每個虛擬機里面有很多的exchange和queue 獨立分區的作用
?RabbitMQ 中的相關概念
Broker:接收和分發消息的應用,RabbitMQ Server就是 Message Broker。
Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等。
Connection:publisher/consumer 和 broker 之間的 TCP 連接。
Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統建立 TCP connection 的開銷。
Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發消息到queue 中去。常用的類型有:
????????direct (point-to-point)
????????topic (publish-subscribe)
????????fanout (multicast)
Queue:消息最終被送到這里等待 consumer 取走
Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發依據
RabbitMQ的6 種工作模式
RabbitMQ 提供了 6 種工作模式:
簡單模式、work queues、Publish/Subscribe 發布與訂閱模式、Routing 路由模式、Topics 主題模式、RPC 遠程調用模式(遠程調用,不太算 MQ;暫不作介紹)。
官網對應模式介紹:RabbitMQ Tutorials — RabbitMQ
RabbitMQ的安裝和配置
安裝依賴環境
在線安裝依賴環境:
yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
安裝Erlang
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
安裝RabbitMQ
#安裝依賴的包
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
#安裝rabbitmq
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
rpm -ivh?erlang-22.0.7-1.el7.x86_64.rpm?socat-1.7.3.2-2.el7.x86_64.rpm?rabbitmq-server-3.7.18-1.el7.noarch.rpm
啟動RabbitMQ
systemctl start rabbitmq-server # 啟動服務
systemctl stop rabbitmq-server # 停止服務
systemctl restart rabbitmq-server # 重啟服務
systemctl status rabbitmq-server #?查看狀態
開啟管理界面及配置
# 開啟管理界面
rabbitmq-plugins enable rabbitmq_management
# 修改默認配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app
# 比如修改密碼、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
修改之后重啟一下rabbitmq
入門實例
1.添加虛擬主機
2.添加用戶
3.重新設置權限
點擊虛擬主機設置權限
4.idea連接
項目結構搭建
mq 導入依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version>
</dependency>
生產者
public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setHost("192.168.229.16");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/test");
// 建立連接Connection connection = connectionFactory.newConnection();
// 管道Channel channel = connection.createChannel();
// 創建隊列/** String queue 隊列的名稱* boolean durable 持久化* boolean exclusive 是否獨占* boolean autoDelete 是否自動刪除* Map<String,Object> arguments 參數* */channel.queueDeclare("test01",false,false,false,null);
// 發布消息channel.basicPublish("","test01",null,"第一次發送".getBytes());}
消費者
{public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setHost("192.168.229.16");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/test");
// 建立連接Connection connection = connectionFactory.newConnection();
// 管道Channel channel = connection.createChannel();
// 消費信息Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println(s);}};channel.basicConsume("test01",true,consumer);}
同時消費后消息組內為 0
RabbitMQ工作模式
Work queues工作隊列模式
Work Queues
與入門程序的簡單模式
相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。
應用場景
:對于 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
代碼
Work Queues
與入門程序的簡單模式
的代碼是幾乎一樣的;可以完全復制,并復制多一個消費者進行多個消費者同時消費消息的測試。
1.復制一個消費者
2.先運行起兩個消費者
3.在生產者中多發布幾條消息
for (int i = 0; i < 10; i++) {channel.basicPublish("","test01",null,("第"+i+"次發送").getBytes()); }
4.兩個消費者會采用輪詢的方式拿到消息
在一個隊列中如果有多個消費者,那么消費者之間對于同一個消息的關系是競爭的關系
訂閱模式類型
而在訂閱模型中,多了一個exchange角色,而且過程略有變化:
生產者發消息給交換機,交換機將消息路由分發給隊列,消費者監聽隊列接收信息
Exchange:交換機。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機的隊列
Direct:定向,把消息交給符合指定routing key 的隊列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力
,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
代碼實現
生產者更改代碼邏輯為:
// 創建交換機channel.exchangeDeclare("Exchange", BuiltinExchangeType.FANOUT,false);
// 創建隊列channel.queueDeclare("test01Ex",false,false,false,null);channel.queueDeclare("test02Ex",false,false,false,null);
// 隊列綁定交換機channel.queueBind("test01Ex","Exchange","");channel.queueBind("test02Ex","Exchange","");
// 發布消息for (int i = 0; i < 10; i++) {channel.basicPublish("Exchange","",null,("第"+i+"次發送").getBytes());}
消費者分別從兩個隊列獲取消息
Routing路由模式(Direct:定向)
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
消息的發送方在 向Exchange發送消息時,也必須指定消息的RoutingKey
Exchange不再把消息交給每一個綁定的隊列,而是根據消息的RoutingKey進行判斷,只有隊列的RoutingKey與消息的 RoutingKey完全一致,才會接收到消息
channel.queueBind(queuename,exchangename,"error"); // errorchannel.queueBind(queuename2,exchangename,"error");// error info channel.queueBind(queuename2,exchangename,"info");// 發送消息//String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(exchangename,"info",null,"hello".getBytes());
Topics通配符模式
Topic與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符!
RoutingKey一般都是有一個或多個單詞組成,多個單詞之間以”.”分割
#:匹配一個或多個詞?
*:匹配不多不少恰好1個詞???test.* test.insert
channel.queueBind(queuename,exchangename,"order.*");channel.queueBind(queuename,exchangename,"*.error");channel.queueBind(queuename2,exchangename,"*.*");//發送消息//String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(exchangename,"goods.info",null,"hello".getBytes());
模式總結
RabbitMQ工作模式:
簡單模式 HelloWorld
一個生產者、一個消費者,不需要設置交換機(使用默認的交換機)
工作隊列模式 Work Queue
一個生產者、多個消費者(競爭關系),不需要設置交換機(使用默認的交換機)
發布訂閱模式 Publish/subscribe
需要設置類型為fanout的交換機,并且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消息發送到綁定的隊列
路由模式 Routing
需要設置類型為direct的交換機,交換機和隊列進行綁定,并且指定routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列
通配符模式 Topic
需要設置類型為topic的交換機,交換機和隊列進行綁定,并且指定通配符方式的routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列
SpringBoot整合Mq
在Spring項目中,可以使用Spring-Rabbit去操作RabbitMQ
尤其是在spring boot項目中只需要引入對應的amqp啟動器依賴即可,方便的使用RabbitTemplate發送消息,使用注解接收消息。
創建工程結構
添加依賴(sys-mq)
<parent><groupId>com.example</groupId><artifactId>mqdemo02</artifactId><version>0.0.1-SNAPSHOT</version></parent><artifactId>sys-mq</artifactId><packaging>pom</packaging><name>sys-mq</name><url>http://maven.apache.org</url><modules><module>mq-product</module><module>mq-consumer</module></modules><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency>
<!-- rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
sys-mq里面兩個子模塊的 application.yml
server
? ? ?port: 記得修改
spring:rabbitmq:username: rootpassword: roothost: 192.168.229.16virtual-host: /testport: 5672
server:port: 8086
生產者
@Configuration
public class RabbitMqConfig {
// 設置交換機的名稱和隊列的名字public static final String EXCHANGE_NAME="exchange_topic-test";public static final String QUEUE_NAME="queue_topic-test";public static final String QUEUE_NAME2="queue_topic-test2";
// 創建交換機 將交換機作為bean注入到spring中@Bean("topicExchange")public Exchange topicExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(false).build();}
// 隊列@Bean("topicQueue")public Queue topicQueue(){return QueueBuilder.durable(QUEUE_NAME).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(QUEUE_NAME2).build();}
// 將隊列與交換機進行綁定@Beanpublic Binding exchangeWithQueue(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.test").noargs();}@Beanpublic Binding exchangeWithQueue2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();}
}
@SpringBootTest
public class MyTest {public static final String EXCHANGE_NAME="exchange_topic-test";@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMsg() {rabbitTemplate.convertAndSend(EXCHANGE_NAME,"success.test","測試整合");}
}
消費者
@Component
public class MyListener {
// 監聽隊列的消息@RabbitListener(queues = "queue_topic-test")public void listenQueue(Message message) {byte[] body = message.getBody();System.out.println(new String(body));}
}