文章目錄
- Overview
- Totals
- Message Rates
- Queued Messages
- Nodes
- Churn Statistics
- Ports and Contexts
- Export Definitions
- Import Definitions
- Connections
- Channels
- Exchanges
- Queues
- Admin
- 他們之間的關聯
在上一篇文章中我們講到了如何在Windows中安裝Rabbitmq, 小白也能搞定!手把手教你在 Windows 上安裝 RabbitMQ我們先安裝了Erlang環境,之后又安裝了Rabbitmq,最后我們通過安裝Rabbitmq的web管理端成功驗證了,但是相信大家進入Rabbitmq的管理頁面都和作者一樣傻眼了,這么多的頁簽,這都什么意思啊???
所以這次我們來結合代碼和頁面對應的功能來入門一下Rabbitmq的基礎名詞和他們的概念。
在說Rabbitmq之前,我們需要了解一下什么是消息隊列?
消息隊列(Message Queue, MQ)是一種軟件設計模式,也是一種中間件技術,它允許應用程序通過網絡進行異步通信。在消息隊列中,消息被發送到隊列中,接收端(消費者)可以從隊列中取出這些消息進行處理。消息隊列的設計目的是為了實現異步處理、解耦服務以及提升系統的可擴展性和健壯性。
消息隊列(Message Queue, MQ)是一種應用程序間的通信方法,它允許多個組件之間通過異步方式傳遞數據。消息隊列的主要目的是解耦系統組件,提高系統的可擴展性和健壯性。在消息隊列中,生產者發送消息,消費者接收消息,中間件(通常是消息隊列服務)負責存儲和轉發消息。
那他在我們工作中應該怎么用呢?
假設我們正在開發一個電子商務網站,當用戶提交訂單時,前端向訂單服務發送一個請求,訂單服務創建訂單并將相關信息(例如訂單號、商品信息等)放入消息隊列。然后,訂單服務立即返回給前端,告訴用戶訂單已提交。此時,后臺的訂單處理服務從消息隊列中讀取新訂單的消息并開始處理訂單,如檢查庫存、扣減庫存、計算運費等。
這種方式的好處是,訂單服務可以立即響應用戶,而無需等待耗時的操作完成,從而提高了用戶體驗。同時,如果訂單處理服務暫時不可用或者處理速度較慢,也不會影響到用戶的即時體驗。
好了,這下簡潔明了的介紹了一下MQ,那么我們來認識一下Rabbitmq。
RabbitMQ 是一個在分布式系統中實現消息傳遞的應用程序,它基于 AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)標準。RabbitMQ 由 LShift 和 NetBM 公司在 2007 年開發,并于 2010 年被 Pivotal 軟件公司收購。它是用 Erlang 語言編寫的,這使得它非常擅長處理大量的并發連接。市面上有很多的MQ產品,例如kafka,RocketMQ等,而Rabbitmq也是MQ系列產品中的一個產品。
認識Rabbitmq基礎模式:
在最新的版本中Rabbitmq有七種模式
-
“Hello World!”:
- 生產者發送一條消息到隊列,消費者接收并處理這條消息。
-
Work Queues:
- 多個消費者競爭從隊列中取出并處理任務消息。
-
Publish/Subscribe (Pub/Sub):
- 生產者發送消息到交換器,所有訂閱該交換器的消費者都會接收到消息。
-
Routing:
- 生產者發送帶有 routing key 的消息,只有綁定對應 routing key 的隊列中的消費者才能接收到消息。這類似于精確匹配(等于)routing key
-
Topics:
- 生產者發送消息,消費者根據訂閱的 routing key 模式接收消息。這類似于使用通配符進行模式匹配(類似于 SQL 中的
LIKE
)
- 生產者發送消息,消費者根據訂閱的 routing key 模式接收消息。這類似于使用通配符進行模式匹配(類似于 SQL 中的
-
RPC (Remote Procedure Call):
- 客戶端發送請求消息,服務端處理后返回響應。
-
Publisher Confirms:
- 生產者發送消息后等待 RabbitMQ 的確認,以確保消息已被成功接收。
好了,說了這么多,可以介紹一下web管理端了
RabbitMQ 的 Web 管理界面提供了豐富的工具來監控和管理 RabbitMQ 服務器的狀態和配置。
Overview
- 簡介:提供了 RabbitMQ 服務器的整體概覽,包括節點狀態、內存使用情況、隊列數量、連接數、通道數等關鍵指標。
- 用途:幫助管理員快速了解 RabbitMQ 的運行狀態和整體健康狀況。
Totals
如果我們使用sdk發送一條消息的話:
@Testpublic void testSend() throws Exception {//創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//設置主機connectionFactory.setHost("127.0.0.1");//設置端口connectionFactory.setPort(5672);//設置連接的虛擬主機connectionFactory.setVirtualHost("/local");//設置用戶名和密碼connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//獲取連接對象Connection connection = connectionFactory.newConnection();//獲取連接通道Channel channel = connection.createChannel();//通道綁定消息隊列channel.queueDeclare("test-send", false, false, false, null);//發部消息channel.basicPublish("", "test-send", null, "hello rabbitmq".getBytes());channel.close();connection.close();}
我們就會在Totals下的Message rates和Queued messages的儀表盤中看到一條曲線。
Message Rates
消息的發送速率(publish rate)和消費速率(deliver rate),幫助管理員了解當前系統的消息吞吐量,并評估系統的性能和負載情況。
-
Publish
每秒發布的消息數量。這反映了生產者向 RabbitMQ 發送消息的速度。
-
Publisher Confirm
每秒確認的消息數量。這反映了生產者收到的消息確認速度,確保消息已被 RabbitMQ 成功接收。 -
Deliver (manual ack)
每秒手動確認交付給消費者的非自動確認消息數量。這反映了消費者從 RabbitMQ 接收并需要手動確認的消息速度
-
Deliver (auto ack)
每秒自動確認交付給消費者的自動確認消息數量。這反映了消費者從 RabbitMQ 接收并自動確認的消息速度。
-
Consumer Ack
每秒由消費者確認的消息數量。這反映了消費者處理并確認消息的速度。
-
Redelivered
每秒重新交付給消費者的未確認消息數量。這反映了由于消費者未確認而導致重新發送的消息數量。
-
Get (manual ack)
每秒通過
get
命令手動確認的消息數量。這反映了通過get
API 方法獲取并需要手動確認的消息速度。 -
Get (auto ack)
每秒通過
get
命令自動確認的消息數量。這反映了通過get
API 方法獲取并自動確認的消息速度。 -
Get (empty)
每秒通過
get
命令嘗試獲取消息但隊列為空的次數。這反映了嘗試從空隊列中獲取消息的頻率。 -
Unroutable (return)
每秒因無法路由而被返回給生產者的消息數量。這反映了無法被正確路由的消息數量。
-
Unroutable (drop)
每秒因無法路由而被丟棄的消息數量。這反映了無法被正確路由并被丟棄的消息數量。
Queued Messages
當前隊列中等待處理的消息總數,幫助管理員了解隊列中積壓的消息量,并判斷系統是否存在消息積壓問題及積壓程度。
- Ready:當前隊列中有幾條消息準備好被消費者消費。
- Unacked:當前消息被消費者獲取后還未確認的數量。
- Total:當前隊列中總共有幾條消息。
這個頁面下后面的配置項就不做過多介紹,包括:
Nodes
- 顯示當前 RabbitMQ 集群中所有節點的狀態信息,幫助監控每個節點的運行狀況。
Churn Statistics
- 提供集群中節點加入和離開的統計信息,反映集群的穩定性。
Ports and Contexts
- 展示 RabbitMQ 服務器使用的端口和上下文信息,幫助管理和配置網絡設置。
Export Definitions
- 允許導出當前配置到文件,方便備份或遷移配置。
Import Definitions
- 允許從文件導入配置,方便恢復或遷移配置到另一個環境。
Connections
- 簡介:顯示當前所有活動的連接及其詳細信息,如連接 ID、用戶名、來源 IP 地址、狀態等。
- 用途:便于監控和管理客戶端與 RabbitMQ 服務器之間的連接,識別潛在的問題連接。
我們把上面發送消息的代碼改造一下,讓他在關閉之前線程睡一下,我們就可以發現有一個Connection連接:
@Testpublic void testSend() throws Exception {//創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//設置主機connectionFactory.setHost("127.0.0.1");//設置端口connectionFactory.setPort(5672);//設置連接的虛擬主機connectionFactory.setVirtualHost("/local");//設置用戶名和密碼connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//獲取連接對象Connection connection = connectionFactory.newConnection();//獲取連接通道Channel channel = connection.createChannel();//通道綁定消息隊列channel.queueDeclare("test-send", false, false, false, null);//發部消息channel.basicPublish("", "test-send", null, "hello rabbitmq".getBytes());Thread.sleep(100000);channel.close();connection.close();}
所以這里主要的代碼就是這里:
//獲取連接對象Connection connection = connectionFactory.newConnection();
通過這個我們就可以在管理端看到對應的連接了
name的tab點擊進來后就可以看到我們連接的具體信息了:
包含了一個連接的詳細信息,包括連接名稱、數據傳輸速率、連接細節、通道列表和客戶端屬性
Channels
- 簡介:展示當前所有活動的通道及其相關信息,如通道 ID、所屬連接、狀態等。
- 用途:用于監控和管理客戶端與 RabbitMQ 之間的通信通道,確保消息傳遞的正常運作。
而這里就是因為上面的這行代碼:
//獲取連接通道Channel channel = connection.createChannel();
點擊Channel的tab標簽也可以看到詳細信息
包含了某個特定通道的詳細信息,包括消息速率、通道狀態、未確認的消息數、消費者數量和高級運行時指標。
在 RabbitMQ 中,連接(Connection)和通道(Channel)之間存在著密切的關系。連接是客戶端與 RabbitMQ 服務器之間的物理通信路徑,而通道是在連接之上建立的邏輯信道,用于執行各種操作,如發布消息、訂閱隊列等。
一個連接可以同時打開多個通道,每個通道都有自己的生命周期,獨立于其他通道。多個通道共享同一個連接的資源,如心跳檢測和認證信息。不同通道之間的操作相互隔離,不會互相影響。
Exchanges
- 簡介:列出所有已定義的交換器及其配置,包括類型、持久性、自動刪除等屬性。
- 用途:管理消息的路由規則,查看和配置交換器,確保消息能夠正確地路由到目標隊列。
交換機(Exchange)是一個消息分發中心,它接收生產者發布的消息并根據預設的規則將其轉發到適當的隊列。
可以看到rabbitmq中默認有七個交換機,他們的作用分別是:
(AMQP default)
: 默認交換機,用于 AMQP 協議的基本功能。amq.direct
: 直接交換機,用于簡單的一對一消息傳遞。amq.fanout
: 扇出交換機,用于廣播消息給所有綁定的隊列。amq.headers
: 基于消息頭進行路由的交換機。amq.match
: 匹配所有條件的交換機。amq.rabbitmq.trace
: 用于發送 RabbitMQ 內部跟蹤消息的交換機。amq.topic
: 主題交換機,支持模式匹配。
這些交換機在 RabbitMQ 中已經預置好,用戶可以直接使用,也可以根據需要創建自己的交換機。
具體什么是交換機,我們說我隊列再講。
Queues
- 簡介:顯示所有已聲明的隊列及其狀態信息,如消息數量、未確認消息數、消費者數量等。
- 用途:監控隊列的狀態,確保消息能夠正確地存儲和被消費者處理。
而我們打開右邊的 +/-
即可把 Consumer count
打開查看 每個隊列對應的消費者,方便我們排查問題
隊列,這是MQ最核心的一個機制了,而這也是我們的這一行代碼創建的:
//通道綁定消息隊列channel.queueDeclare("test-send", false, false, false, null);
聲明隊列
queue – 隊列的名稱
durable – 如果我們聲明一個持久隊列,則為 true(隊列將在服務器重啟后繼續存在)
exclusive – 如果我們聲明一個獨占隊列(僅限于此連接),則為 true。
autoDelete – 如果我們聲明一個 autodelete 隊列,則為 true(服務器將在不再使用時將其刪除)
arguments – 隊列的其他屬性(構造參數)
而點進name標簽我們可以看到下面的信息:
而這個界面也是我們開發過程中最頻繁打開的頁面了,這里可以看到隊列里面的總的消息,沒確認的消息,以及隊列的情況,等等,例如:
- Features: 隊列的特性,如是否持久化、是否有消費者等。
- State: 隊列的狀態,這里是 “idle”,表示沒有活動的消費者。
- Messages: 隊列中的消息總數、準備就緒的消息數、未確認的消息數、內存中的消息數、持久化的消息數以及分頁外的消息數。
- Consumer utilisation: 消費者利用率,目前為 0%。
- Message body bytes: 消息體的字節大小。
- Process memory: 隊列占用的進程內存大小。
以及Consumers:哪些消費者連接到了這個隊列。例如我們使用sdk監聽這個隊列:
package com.masiyi.springbootrabbitmq;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;public class Consumer {public static void main(String[] args) throws Exception {//創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//設置主機connectionFactory.setHost("127.0.0.1");//設置端口connectionFactory.setPort(5672);//設置連接的虛擬主機connectionFactory.setVirtualHost("/local");//設置用戶名和密碼connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//獲取連接對象Connection connection = connectionFactory.newConnection();//獲取連接通道Channel channel = connection.createChannel();//通道綁定消息隊列channel.queueDeclare("test-send", false, false, false, null);channel.basicConsume("test-send", true, new DefaultConsumer(channel) {/*** @param body 接受到的消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});//如果想一直監聽就不要關閉通道
// channel.close();
// connection.close();}
}
可以看到我們里面的6條消息瞬間就被消費掉了
而我們的頁面也會變成這樣:
我們可以看到消費者具體的情況
Admin
- 簡介:提供了一系列高級管理功能,如用戶管理、權限配置、節點配置等。
- 用途:用于配置 RabbitMQ 的高級設置,包括用戶賬號、權限分配、集群配置等,確保 RabbitMQ 的安全性和可配置性。
這塊主要顯示的是跟賬號有關的內容,點擊Name的頁簽,可以看到guest用戶具體的信息,可以改權限,設置密碼,刪除用戶啥的
而在下面的添加用戶功能則可以添加新的用戶
但是大家有沒有注意到右上角有一個Virtual host選項,虛擬主機(Virtual Host)是一種邏輯隔離機制,它可以讓你在一個 RabbitMQ 實例中同時運行多個獨立的應用程序或服務。虛擬主機允許你將相關聯的交換機、隊列和綁定組合在一起,形成一個邏輯單元,從而實現多租戶環境。每個虛擬主機有自己的權限設置,因此不同虛擬主機內的實體彼此隔離,避免相互影響。
而我們在連接到 RabbitMQ 服務器的客戶端時,必須指定要使用的虛擬主機。例如我們的代碼在這里就會指定虛擬機
//設置連接的虛擬主機connectionFactory.setVirtualHost("/local");
他們之間的關聯
在頁面上我們都介紹完他們的作用了,那么我們看到的東西:Connection、Channel、Virtual Host、Exchange 和 Queue 他們之間的作用和關系是什么樣的呢?
-
Connection:連接是指客戶端(發送或接收消息的應用程序)與 RabbitMQ 服務器之間的物理連接。通常,一個應用程序會創建一個或多個連接來與 RabbitMQ 進行交互。連接是 TCP 連接,負責在客戶端和服務器之間傳輸數據。
-
Channel:通道是在連接之上建立的邏輯信道,用于執行實際的操作,如發布消息、訂閱隊列等。通道是輕量級的,比連接更高效,而且可以復用連接。大多數 RabbitMQ 操作都是通過通道完成的。
-
Virtual Host:虛擬主機是一種邏輯隔離,它將一組交換機、隊列和綁定組成一個集合,使得多個應用程序可以在同一臺 RabbitMQ 服務器上運行而不會互相干擾。每個連接都需要指定一個虛擬主機,這樣它就能訪問相應的交換機和隊列。
-
Exchange:交換機是消息路由的核心組件。它接收來自生產者的消息,并根據預先定義的路由規則將消息轉發到一個或多個隊列。交換機有多種類型,如 direct、fanout、topic 和 headers,每種類型都對應著不同的路由策略。
-
Queue:隊列是消息的容器,用來存儲消息直到消費者將其取走。隊列是無序的,一次只有一個消費者可以讀取消息。隊列可以是持久性的,即使 RabbitMQ 服務器重啟也能保留消息。
emem。。。我相信大家已經聽蒙了,下面給大家舉一個例子:如果大家熟悉 MySQL 數據庫,那么可以將 RabbitMQ 中的 Connection、Channel、Virtual Host、Exchange 和 Queue 類比作以下數據庫概念:
- Connection:類似于數據庫連接(Database Connection),它是客戶端與數據庫服務器之間的物理連接,用于發送 SQL 查詢和接收響應。
- Channel:類似于數據庫事務(Transaction),它是一組操作的邏輯單位,在一個事務中執行的所有操作要么全部成功,要么全部失敗。在 RabbitMQ 中,通道也是輕量級的,用于執行一系列操作,如發送和接收消息。
- Virtual Host:類似于數據庫中的數據庫(Database),它是一個邏輯隔離的集合,包含了多個表(Queue)和視圖(Exchange)。每個數據庫都有自己的用戶權限和設置。
- Exchange:類似于數據庫中的視圖(View),它根據預定義的規則展示數據,但并不存儲數據本身。在 RabbitMQ 中,交換機接收消息并根據路由規則將消息轉發到一個或多個隊列。
- Queue:類似于數據庫中的表(Table),它存儲了數據記錄。隊列存儲消息,等待消費者來讀取消息。
至此,我們知道了rabbitmq的web管理端的相關頁面的操作,也結合代碼講了rabbitmq的核心名詞,相信大家對rabbitmq已經有個概念了。