MQ基本概念
1. MQ概述
MQ全稱 Message Queue([kju?])(消息隊列),是在消息的傳輸過程中保存消息的容器。多用于分布式系統之間進行通信。
(隊列是一種容器,用于存放數據的都是容器,存放消息的就是消息隊列)
分布式系統的調用:
方式一:直接調用
order
product
account
方式二:間接調用
A將數據存放到中間一個系統,通過中間的系統發送到B
中間系統可以成為中間件MQ
生產者-》中間件《--消費者
MQ是用于存放消息的中間件
被調用者叫生產者 調用者是消費者(微服務中說過)
2. MQ的優勢和劣勢
1 優勢
應用解耦:提高系統容錯性和可維護性。
異步提速:提升用戶體驗和系統吞吐量。
削峰填谷:提高系統穩定性。
應用解耦
系統的耦合性越高,容錯性就越低,可維護性就越低。
例:訂單系統 的時候 依賴于庫存系統 支付系統 物流系統 當庫存系統發生異常,就有可能導致訂單系統發生異常 下單失敗
追加系統 x 就只能修改訂單系統更改代碼 導致維護性比較低
使用 MQ 使得應用間解耦,提升容錯性和可維護性
庫存系統宕機訂單系統影響不大,因為消息已經發送到mq了當庫存系統恢復的時候就可以正常使用了。
追加系統的時候跟訂單系統無關
已將數據發送到MQ了,直接從MQ中拿就行了,無需更改訂單中的代碼,可維護性提高
異步提速
一個下單操作耗時:20 + 300 + 300 + 300 = 920ms
用戶點擊完下單按鈕后,需要等待920ms才能得到下單響應,太慢!
用戶點擊完下單按鈕后,只需等待25ms就能得到下單響應 (20 + 5 = 25ms)。
提升用戶體驗和系統吞吐量(單位時間內處理請求的數目)。
以前920ms處理一個請求,現在25ms處理一個請求,系統的吞吐量(單位時間內訪問量)增加
削峰填谷(削峰)
使用了 MQ 之后,限制消費消息的速度為1000,這樣一來,高峰期產生的數據勢必會被積壓在 MQ 中,高峰就被“削”掉了,但是因為消息積壓,在高峰期過后的一段時間內,消費消息的速度還是會維持在1000,直到消費完積壓的消息,這就叫做“填谷”。
使用MQ后,可以提高系統穩定性。
2 劣勢
系統可用性降低
系統引入的外部依賴越多,系統穩定性越差。一旦 MQ 宕機,就會對業務造成影響。如何保證MQ的高可用?
系統復雜度提高
MQ 的加入大大增加了系統的復雜度,以前系統間是同步的遠程調用,現在是通過 MQ 進行異步調用。如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
一致性問題
A 系統處理完業務,通過 MQ 給B、C、D三個系統發消息數據,如果 B 系統、C 系統處理成功,D 系統處理失敗。如何保證消息數據處理的一致性?
既然 MQ 有優勢也有劣勢,那么使用 MQ 需要滿足什么條件呢?
消費者--》生產者
- 生產者不需要從消費者處獲得反饋。引入消息隊列之前的直接調用,其接口的返回值應該為空,這才讓明明下層的動作還沒做,上層卻當成動作做完了繼續往后走,即所謂異步成為了可能。
訂單->庫存
- 容許短暫的不一致性。
- 確實是用了有效果。即解耦、提速、削峰這些方面的收益,超過加入MQ,管理MQ這些成本。
3. 常見的MQ產品
目前業界有很多的 MQ 產品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充當消息隊列的案例,而這些消息隊列產品,各有側重,在實際選型時,需要結合自身需求及 MQ 產品特征,綜合考慮。
RabbitMQ基本介紹
AMQP,即 Advanced Message Queuing Protocol(英[?pr??t?k?l])(高級消息隊列協議),是一個網絡協議,是應用層協議的一個開放標準,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制。2006年,AMQP 規范發布。類比HTTP。
消息隊列中間件
exchange 交換機 分發消息 分發到不同的容器 queue 通過路由來處理
queue 容器
routes 路由
生產者 發布消息到exchange exchange 通過不同的路由規則發布/路由 給不同的queue 進行存儲 cunsumer通過隊列去監聽拿到消息進行消費
2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布。RabbitMQ 采用 Erlang 語言開發。Erlang 語言由 Ericson 設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛。
1. RabbitMQ 基礎架構
Broker 中間者 服務
procedure 和consumer都是客戶端
客戶端通過鏈接和服務端進行通信 所以需要建立起來連接 然后進行通信a
使用channel(管道)節省資源
一個rabbitmq里面有很多的虛擬機 相當于mysql里面有很多數據庫,數據庫里面有很多表,都是獨立的。
每個虛擬機里面有很多的exchange和queue 獨立分區的作用
2. RabbitMQ 中的相關概念
Broker:接收和分發消息的應用,RabbitMQ Server就是 Message Broker。
Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等。
Connection:publisher/consumer 和 broker 之間的 TCP 連接。
Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統建立 TCP connection 的開銷。
Exchange:message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發消息到queue 中去。常用的類型有:
direct (point-to-point)
topic (publish-subscribe)
fanout (multicast)
Queue:消息最終被送到這里等待 consumer 取走
Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發依據
3. RabbitMQ的6 種工作模式
RabbitMQ 提供了 6 種工作模式:
簡單模式、work queues、Publish/Subscribe 發布與訂閱模式、Routing 路由模式、Topics 主題模式、RPC 遠程調用模式(遠程調用,不太算 MQ;暫不作介紹)。==
官網對應模式介紹:RabbitMQ Tutorials — RabbitMQ
4. AMQP 和 JMS
MQ是消息通信的模型;實現MQ的大致有兩種主流方式:AMQP、JMS。
AMQP
AMQP是一種協議,更準確的說是一種binary wire-level protocol(鏈接協議)。這是其和JMS的本質差別,AMQP不從API層進行限定,而是直接定義網絡交換的數據格式。
JMS
JMS即Java消息服務(Java Message Service)應用程序接口,
是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。
(規定了消息客戶端的一套api的東西,rabbitmq沒有遵循規則)
JMS 是 JavaEE 規范中的一種,類比JDBC。
AMQP與 JMS 區別
JMS是定義了統一的接口,來對消息操作進行統一;AMQP是通過規定協議來統一數據交互的格式。
JMS限定了必須使用Java語言;AMQP只是協議,不規定實現方式,因此是跨語言的。
JMS規定了兩種消息模式;而AMQP的消息模式更加豐富
再談市場上常見的消息隊列
ActiveMQ:基于JMS
ZeroMQ:基于C語言開發
RabbitMQ:基于AMQP協議,erlang語言開發,穩定性好
RocketMQ:基于JMS,阿里巴巴產品
Kafka:類似MQ的產品;分布式消息系統,高吞吐量。
RabbitMQ的安裝和配置
安裝依賴環境
在線安裝依賴環境:
yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
安裝Erlang
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
安裝RabbitMQ
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
安裝rabbitmq
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
啟動RabbitMQ
systemctl start rabbitmq-server # 啟動服務
systemctl stop rabbitmq-server # 停止服務
systemctl restart rabbitmq-server # 重啟服務
systemctl status rabbitmq-server #查看狀態
配置RabbitMQ
開啟管理界面
rabbitmq-plugins enable rabbitmq_management
修改默認配置信息
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app?
修改后重啟
打開客戶端
192.168.221.37:15672
使用guest/guest登錄之后出現如下即為安裝成功
操作RabbitMQ
1)添加虛擬主機
2)添加用戶
3)給用戶分配虛擬主機
先清除
再分配
查看結果
分配成功
重新使用新用戶登錄
4)在MQ中添加發布消息
發送成功后
消費信息
搭建工程
添加jar包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
添加消費者
public class MyTest {@Testpublic void aaa() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//連接mqconnectionFactory.setUsername("賬號");connectionFactory.setPassword("密碼");connectionFactory.setHost("IP地址");connectionFactory.setPort(端口號);connectionFactory.setVirtualHost("/***");//建立連接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("mq:"+s);}};channel.basicConsume("test",true,consumer);} }
結果
添加生產者
public class MyTest {@Testpublic void bbb() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//連接mqconnectionFactory.setUsername("賬號"); connectionFactory.setPassword("密碼");connectionFactory.setHost("IP地址");connectionFactory.setPort(端口號); connectionFactory.setVirtualHost("/***");//建立連接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();/*** String queue, 隊列的名稱* boolean durable, 持久化* boolean exclusive, 是否獨占* boolean autoDelete, 受否自動刪除* Map<String, Object> arguments 參數*/channel.queueDeclare("test",false,false,false,null);// 創建隊列channel.basicPublish("","test",null,"hello mq1".getBytes());}}
發消息
RabbitMQ工作模式
官網對應模式介紹:RabbitMQ Tutorials — RabbitMQ
Work queues工作隊列模式
?模式說明
Work Queues
與入門程序的簡單模式
相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。
應用場景
:對于 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
創建兩個消費者
使用生產者發布消息
結果
訂閱模式
添加交換機
創建隊列
交換機綁定隊列
在交換機中發布消息
關聯隊列都會發布
修改消費者監聽不同隊列 測試
創建兩個消費者
創建交換機生產者
public class MyTestEx {@Testpublic void bbb() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//連接mqconnectionFactory.setUsername("賬號");connectionFactory.setPassword("密碼");connectionFactory.setHost("IP地址");connectionFactory.setPort(端口號);connectionFactory.setVirtualHost("/***");//建立連接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//創建交換機channel.exchangeDeclare("myex1", BuiltinExchangeType.FANOUT,false);//創建隊列/*** String queue, 隊列的名稱* boolean durable, 持久化* boolean exclusive, 是否獨占* boolean autoDelete, 受否自動刪除* Map<String, Object> arguments 參數*/channel.queueBind("testmyex1",false,false,false,null);channel.queueBind("testmyex2",false,false,false,null);//綁定交換機channel.exchangeBind("testmyex1","myex1","");channel.exchangeBind("testmyex2","myex1","");channel.basicPublish("myex1","",null,"testex".getBytes());} }