市面常見消息隊列中間件對比
技術名稱 | 吞吐量 /IO/并發 | 時效性(類似延遲)消息到達時間 | 可用性 | 可靠性 | 優勢 | 應用場景 |
---|---|---|---|---|---|---|
activemq | 萬級 | 高 | 高 | 高 | 簡單易學 | 中小型企業、項目 |
rabbitmq | 萬級 | 極高(微秒) | 高 | 極高 | 生態好(基本什么語言都支持)、時效性高、易學 | 適合絕大數的分布式應用 |
kafka | 10萬 QBS | 高(毫秒) | 極高 | 極高 | 吞吐量大、可靠性、可用性、強大的數據流處理能力 | 適合大規模處理數據的場景、比如構建日志手機系統、實時數據傳輸、事件流收集傳輸 |
rocketmq | 10萬 QBS | 高ms | 極高 | 極高 | 吞吐量大、可靠性、可用性、可擴展性 | 適用于金融等可靠性要求較高的場景、適合大規模的消息處理。金融、電商、大規模社交 |
pulsar | 10萬 QBS | 高ms | 高 | 極高 | 可靠性、可用性很高、新興(技術架構先進) | 適合大規模、高并發的分布式系統(云原生)適合實時分析、事件流處理、物聯網數據處理。 |
RabbitMQ?
RabbitMQ 是基于 AMQP 高級消息隊列協議的。
?實際使用可根據官方文檔的 demo 。
官方文檔:RabbitMQ Tutorials | RabbitMQ
模型
生產者:通俗就是發消息的人,比如在外賣軟件上點餐的人
消費者:通俗就是處理消息的任務,比如外賣軟件上的商家,需要根據顧客的要求制作餐
交換機:負責把消息轉發到對應的隊列中,比如有外賣的時候,系統給附近的外面小哥派單
隊列:存放消息的地方,等待消費者消費,比如商家肯定不是只做一份餐,做好的餐放在一個指定的位置等待外賣小哥來取餐
路由:轉發,就是怎么把消息從一個地方轉到另一個地方,通常加在交換機和隊列之間,比如系統指定某個范圍的外賣小哥接這單
安裝
1. 首先安裝 RabbitMQ,直接官網下載即可,如果下載速度慢,可以換個網絡,或者找找有沒有國內鏡像。(當初我下載的時候找了半天的鏡像都是版本比較老的,結果想著掛一晚上下載,結果官網突然就快了,白折騰了。)
官方網站:Installing on Windows | RabbitMQ
國內鏡像:Index of rabbitmq-server-local/v3.12.7
一路 next ,傻瓜式安裝即可
安裝之后檢查服務中是否已經運行了。
2. 安裝監控面板
在 RabbitMQ 安裝目錄下的 sbin 目錄下的CMD 輸入下面的命令
rabbitmq-plugins.bat enable rabbitmq_management
?安裝成功:
默認端口號是 5672,webUI 是 15672
在瀏覽器輸入地址打開管理界面:http://localhost:15672
默認賬號密碼是 guest
注意:1. 安裝目錄不能是中文,不能有空格等非法字符,否則頁面打不開
????????? ?2. 如果想要在遠程服務器訪問 RabbitMQ 管理面板,需要創建管理員賬號,比如在寶塔面板使用時寶塔面板提供的 admin賬號,地址就是寶塔面板的 IP?
創建賬號:access-control | RabbitMQ
入門
依賴引入
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>
單消費者和生產者
一對一的關系
1. 生產者代碼
public class SingleProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//創建連接ConnectionFactory factory = new ConnectionFactory();//設置了本地連接,如果修改了用戶名和密碼,需要設置/*factory.setPassword();factory.setUsername();*/factory.setHost("localhost");//建立連接、創建頻道//頻道,類似客戶端,用于調用serverConnection connection = factory.newConnection();Channel channel = connection.createChannel();//創建隊列,隊列持久化,第二份參數設置為 truechannel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";//發送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}
channel 頻道:理解為操作消息隊列的 Client,通過 channel 收發消息,提供了和消息對了 server 建立通信的傳輸方法
channel.queueDeclare 方法參數:
queue:這是一個字符串參數,代表要聲明的隊列的名稱。如果隊列不存在,則會自動創建一個新的隊列。
durable:這是一個布爾值參數,表示隊列是否持久化。如果設置為true,則隊列會在服務器重啟后仍然存在;如果設置為false,則隊列在服務器重啟后會被刪除。默認值為false。
exclusive:這也是一個布爾值參數,表示隊列是否為獨占模式。如果設置為true,則只有當前連接可以訪問該隊列;如果設置為false,則其他連接也可以訪問該隊列。默認值為false。
autoDelete:這是另一個布爾值參數,表示隊列是否自動刪除。如果設置為true,則當最后一個消費者取消訂閱時,隊列將被刪除;如果設置為false,則隊列將一直存在,直到手動刪除或服務器重啟。默認值為false。
arguments:這是一個可選參數,用于設置隊列的其他屬性,比如消息的最大長度、最大優先級等。
channel.basicPublish 參數:
exchange:這是一個字符串參數,代表交換機的名稱。如果不需要使用特定的交換機,可以傳遞一個空字符串("")。交換機是RabbitMQ中用于接收生產者發送的消息并根據綁定規則路由到隊列的組件。
routingKey:這也是一個字符串參數,它指定了發布消息的隊列。無論通道綁定到哪個隊列,最終發布的消息都會包含這個指定的路由鍵。路由鍵是用來確定消息應該發送到哪個隊列的重要信息。
message:這是要發布的消息本身,通常是字節數組的形式。
properties:這是一個可選參數,用于設置消息的屬性,比如消息的優先級、過期時間等。
在使用channel.basicPublish
時,需要注意以下幾點:
exchange和routingKey不能為空:在AMQImpl類中的實現要求這兩個參數都不能為null,否則會拋出異常。
交換機類型:根據不同的需求,可以選擇不同類型的交換機,如fanout、direct或topic。每種類型的交換機都有其特定的路由規則。
非命名隊列:在某些情況下,比如日志系統,可以使用非命名隊列,這樣消費者可以接收到所有相關的日志消息,而不是特定的部分。
2. 消費者代碼
public class SingleConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明隊列,同一個消息隊列參數必須一致channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定義了如何處理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//接收、消費消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}
?channel.basicConsume 參數:
- queue:這是一個字符串參數,代表要消費的隊列的名稱。如果隊列不存在,則會拋出異常。
- onMessage:這是一個回調函數,當有新的消息到達時會被調用。該函數需要接收兩個參數:一個表示消息內容的Delivery對象和一個表示通道的Channel對象。
- consumerTag:這是一個可選參數,用于標識消費者。如果沒有指定,則會自動生成一個唯一的標識符。
- autoAck:這是一個布爾值參數,表示是否自動確認消息。如果設置為true,則在消息被處理后會自動發送確認信息;如果設置為false,則需要手動發送確認信息。默認值為false。
- arguments:這是一個可選參數,用于設置消費者的其他屬性,比如消息的最大長度、最大優先級等。
在使用channel.basicConsume
時,需要注意以下幾點:
- 隊列名稱:隊列名稱應該是唯一的,否則會拋出異常。
- 消息處理:在
onMessage
回調函數中,需要對消息進行處理,并根據需要發送確認信息。 - 消費者標識符:可以通過設置
consumerTag
來標識消費者,以便在后續操作中進行識別和管理。 - 消費者屬性:可以通過設置消費者的其他屬性來控制消費者的行為,比如設置消息的最大長度、最大優先級等。
多消費者
多個消費者,比如一個工廠生產商品,一個商店賣不完,分給多個商店一起賣
生產者代碼和上面一樣
public class MultiProducer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//創建連接ConnectionFactory factory = new ConnectionFactory();//設置本地連接factory.setHost("localhost");//創建隊列,創建頻道,類似客戶端try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//隊列持久化channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);//設置消息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){//輸入消息String message = scanner.nextLine();//發送消息channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}
控制處理任務的積壓數,最多同時處理任務數
channel.basicQos(1); //最多處理1個
消息確認機制
ack 確認、nack 消息失敗、reject 拒絕
當消息拿走之后會有一個確認機制,保證消息成功被消費。當消費者接收消息會給一個反饋,確認消息的狀態,成功消息才會被移除。
支持配置 autoack ,建議修改為 false,根據實際情況手動確認。
//手動確認
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
//手動拒絕
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
?消費者代碼
public class MultiConsumer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//創建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");for (int i = 0; i < 2; i++) {final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();//隊列持久化,參數要一致channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//控制處理任務的積壓數,最多同時處理任務數channel.basicQos(1);//定義了如何處理消息int finalI = i;DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {//處理工作的邏輯System.out.println(" [x] Received '" +"消費者:" + finalI + " 消息:"+ message + "'");//睡一定時間,模擬機器處理能力有限Thread.sleep(20000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//接收消息,消費消息,開啟消息監聽channel.basicConsume(TASK_QUEUE_NAME, false , deliverCallback, consumerTag -> {});}}
}