RabbitMQ
簡介:RabbitMQ 是一種開源的消息隊列中間件,你可以把它想象成一個高效的“郵局”。它專門負責在不同應用程序之間傳遞消息,讓系統各部分能松耦合地協作
優勢:
-
異步處理:比如用戶注冊后,主程序將發送驗證郵件的任務扔進隊列就立刻返回,郵件服務后續慢慢處理,避免用戶等待。
-
削峰填谷:突然的流量高峰(如秒殺活動)會被隊列緩沖,避免服務器被壓垮。
-
智能路由:通過交換機(Exchange)的四種路由策略(直連/主題/廣播/頭匹配),實現精準投遞,比如將VIP用戶的訂單定向到專屬客服隊列。
-
故障恢復:支持消息持久化和確認機制,即使服務器宕機,消息也不會丟失。
同步VS異步(以實際開發為例子進行說明):
-
同步業務功能的耦合度高,異步耦合度低,可以達到解耦的效果
-
同步業務流程響應的時間長,異步響應的時間短
-
同步模式會導致并發壓力向后進行傳遞,異步可以削峰限流
-
同步模式下系統結構彈性不足,異步模式下系統彈性強,可擴展性強
注意:在實際開發中并不是說異步模式就完全優與同步模式,在一定的場景下使用異步模式是優化系統的架構,但是在一些其它的業務場景下需要同步來保證流程的完整性。所以說異步還是同步要跟據具體業務進行選擇。
底層實現:
-
AMQP(Advanced Message Queuing Protocol):AMQP 是 跨語言的通用消息協議,適合異構系統間的復雜通信。
-
JMS(Java Message Service):JMS是 Java 專屬的 API 標準,適合統一 Java 生態的消息處理。
主流的MQ產品對比
對比項 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
開發語言 | Erlang | Java | Java | Scala/Java |
維護方 | Rabbit(公司) | Apache(社區) | 阿里(公司) | Apache(社區) |
核心機制 | 基于 AMQP 協議的生產者-消費者模型 | 基于 JMS 的消息傳遞模型 | 分布式消息隊列(Topic + Tag 分類) | 分布式流處理平臺(發布-訂閱模型) |
協議支持 | AMQP、STOMP、MQTT、HTTP 插件 | AMQP、STOMP、OpenWire、REST、MQTT | 自定義協議(支持 TCP/HTTP) | 自定義協議(社區封裝 HTTP 支持) |
客戶端語言 | 官方:Erlang、Java、Ruby;社區:多語言 | Java、C/C++、.NET、Python、PHP | 官方:Java;社區:C++(不成熟) | 官方:Java;社區:Python、Go、Rust 等 |
可用性 | 鏡像隊列、仲裁隊列(Quorum Queue) | 主從復制 | 主從復制 | 分區(Partition) + 副本(Replica) |
單機吞吐量 | 約 10 萬/秒 | 約 5 萬/秒 | 10 萬+/秒(阿里雙十一驗證) | 百萬級/秒 |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內 |
消息確認 | 完整 ACK/NACK 機制 | 支持 JMS ACK 模式 | 基于數據庫持久化的消息表 | 基于副本同步和 ISR 機制 |
功能特性 | ? 低延遲、高并發、管理界面豐富 | ? 老牌穩定、支持 JMS 規范 | ? 高吞吐、阿里生態集成、事務消息 | ? 高吞吐、流處理、大數據場景專用 |
適用場景 | 復雜路由、實時業務(如支付訂單) | 傳統企業級系統(Java 生態) | 電商高并發場景(如秒殺、訂單) | 日志采集、實時分析、流式計算 |
原生RabbitMQAPI調用:
//=========================================發送消息的代碼示例=================================
public class Producer { ?public static void main(String[] args) throws Exception { ?// 創建連接工廠 ?ConnectionFactory connectionFactory = new ConnectionFactory(); ?// 設置主機地址 ?connectionFactory.setHost("192.168.200.100"); ?// 設置連接端口號:默認為 5672connectionFactory.setPort(5672);// 虛擬主機名稱:默認為 /connectionFactory.setVirtualHost("/");// 設置連接用戶名;默認為guest ?connectionFactory.setUsername("guest");// 設置連接密碼;默認為guest ?connectionFactory.setPassword("123456");// 創建連接 ?Connection connection = connectionFactory.newConnection(); ?// 創建頻道 ?Channel channel = connection.createChannel(); ?// 聲明(創建)隊列 ?// queue ? ? 參數1:隊列名稱 ?// durable ? 參數2:是否定義持久化隊列,當 MQ 重啟之后還在 ?// exclusive 參數3:是否獨占本次連接。若獨占,只能有一個消費者監聽這個隊列且 Connection 關閉時刪除這個隊列 ?// autoDelete 參數4:是否在不使用的時候自動刪除隊列,也就是在沒有Consumer時自動刪除 ?// arguments 參數5:隊列其它參數 ?channel.queueDeclare("simple_queue", true, false, false, null); ?// 要發送的信息 ?String message = "你好;小兔子!"; ?// 參數1:交換機名稱,如果沒有指定則使用默認Default Exchange ?// 參數2:路由key,簡單模式可以傳遞隊列名稱 ?// 參數3:配置信息 ?// 參數4:消息內容 ?channel.basicPublish("", "simple_queue", null, message.getBytes()); ?System.out.println("已發送消息:" + message); ?// 關閉資源 ?channel.close(); ?connection.close(); ?} ?}
//=========================================接收消息的代碼示例=================================
public class Consumer { ?public static void main(String[] args) throws Exception { ?// 1.創建連接工廠 ?ConnectionFactory factory = new ConnectionFactory(); ?// 2. 設置參數 ?factory.setHost("192.168.200.100"); ?factory.setPort(5672); ?factory.setVirtualHost("/"); ?factory.setUsername("guest");factory.setPassword("123456"); ?// 3. 創建連接 Connection ? ? ? ?Connection connection = factory.newConnection(); ?// 4. 創建Channel ?Channel channel = connection.createChannel(); ?// 5. 創建隊列 ?// 如果沒有一個名字叫simple_queue的隊列,則會創建該隊列,如果有則不會創建 ?// 參數1. queue:隊列名稱 ?// 參數2. durable:是否持久化。如果持久化,則當MQ重啟之后還在 ?// 參數3. exclusive:是否獨占。 ?// 參數4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉 ?// 參數5. arguments:其它參數。 ?channel.queueDeclare("simple_queue",true,false,false,null); ?// 接收消息 ?DefaultConsumer consumer = new DefaultConsumer(channel){ ?// 回調方法,當收到消息后,會自動執行該方法 ?// 參數1. consumerTag:標識 ?// 參數2. envelope:獲取一些信息,交換機,路由key... ?// 參數3. properties:配置信息 ?// 參數4. body:數據 ?@Override ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ?System.out.println("consumerTag:"+consumerTag); ?System.out.println("Exchange:"+envelope.getExchange()); ?System.out.println("RoutingKey:"+envelope.getRoutingKey()); ?System.out.println("properties:"+properties); ?System.out.println("body:"+new String(body)); ?} ?}; ?// 參數1. queue:隊列名稱 ?// 參數2. autoAck:是否自動確認,類似咱們發短信,發送成功會收到一個確認消息 ?// 參數3. callback:回調對象 ?// 消費者類似一個監聽程序,主要是用來監聽消息 ?channel.basicConsume("simple_queue",true,consumer); ?} ?}
封裝RabbitMQ工具類:
public class ConnectionUtil { ?//跟據自己服務的具體需求進行相關ip+端口的配置(動態變化)public static final String HOST_ADDRESS = "192.168.200.100"; ?public static Connection getConnection() throws Exception { ?// 定義連接工廠 ?ConnectionFactory factory = new ConnectionFactory(); ?// 設置服務地址 ?factory.setHost(HOST_ADDRESS); ?// 端口 ?factory.setPort(5672); ?//設置賬號信息,用戶名、密碼、vhost ?factory.setVirtualHost("/"); ?factory.setUsername("guest"); ?factory.setPassword("123456"); ?// 通過工程獲取連接 ?Connection connection = factory.newConnection(); ?return connection; ?} ?public static void main(String[] args) throws Exception { ?Connection con = ConnectionUtil.getConnection(); ?// amqp://guest@192.168.200.100:5672/ ?System.out.println(con); ?con.close(); ?} ?}
RabbitMQ體系結構:
-
生產者(Producer):發送消息到 RabbitMQ 的應用程序。
-
消費者(Consumer):從隊列中接收并處理消息的應用程序。
-
交換機(Exchange):接收生產者消息,根據類型和路由規則將消息分發到隊列。
-
四大類型:
-
類型 | 路由規則 | 典型場景 |
---|---|---|
Direct | 精確匹配 Routing Key (如 order.pay ) | 一對一精準投遞(如支付成功通知) |
Topic | 通配符匹配(如 order.* 或 *.pay ) | 多服務訂閱同一類消息(如日志分類) |
Fanout | 廣播到所有綁定隊列(無視 Routing Key ) | 群發通知(如系統公告) |
Headers | 通過消息頭(Headers)鍵值對匹配 | 復雜條件路由(需靈活匹配時) |
-
隊列(Queue):定義交換機與隊列之間的映射關系,指定路由規則
-
信道(Channel):復用 TCP 連接的輕量級虛擬鏈路,減少資源消耗。
-
虛擬主機(Virtual Host):邏輯隔離的“消息域”,不同 vhost 間資源(交換機、隊列)互不干擾。
總結:RabbitMQ 通過 生產者-交換機-隊列-消費者 模型實現異步通信,核心在于靈活的路由規則(交換機類型)和可靠性保障(持久化、確認機制)。
基礎篇
工作模式
-
簡單模式:最簡單的消息隊列模型,包含一個生產者、一個隊列和一個消費者。生產者直接將消息發送到隊列,消費者從隊列中接收消息。
-
工作隊列模式(Work Queues):使用默認的交換機,一個隊列對應多個消費者,消息按輪詢(Round-Robin)或公平分發(Fair Dispatch)分配給消費者,避免單個消費者過載。
-
發布/訂閱模式(Publish/Subscribe):使用 扇形交換機(Fanout Exchange),生產者將消息發送到交換機,交換機將消息廣播到所有綁定的隊列,每個消費者獨立接收一份消息副本。
-
路由模式(Routing):使用 直接交換機(Direct Exchange),生產者指定消息的 路由鍵(Routing Key),交換機根據路由鍵將消息精確匹配到綁定的隊列。
-
主題模式(Topics):使用 主題交換機(Topic Exchange),路由鍵支持通配符匹配(
*
匹配一個詞,#
匹配多個詞)。例如路由鍵stock.usd.nyse
可被*.nyse
或stock.#
訂閱。 -
遠程過程調用(RPC):通過消息隊列實現遠程調用。客戶端發送請求消息時附帶回調隊列和唯一ID,服務端處理完成后將響應發送到回調隊列,客戶端通過ID匹配響應。
-
發布者確認(Publisher Confirms):生產者發送消息后,RabbitMQ會異步返回確認(ACK)或未確認(NACK),確保消息成功到達交換機或隊列。
工作隊列模式(Work Queues)
-
并行處理能力
-
多消費者競爭消費:一個隊列可綁定多個消費者,消息被并發處理,消息只會被其中的一個消費者拿到。
-
橫向擴展:通過增加消費者數量,輕松應對高并發或大流量場景。
-
負載均衡機制
-
輪詢分發(Round-Robin):默認策略,均攤消息到所有消費者,簡單但可能因消費者性能差異導致負載不均。
-
公平分發(Fair Dispatch):通過
prefetch_count
限制消費者同時處理的消息數,確保“能者多勞”,避免慢消費者堆積任務。
-
消息可靠性保障
-
ACK確認機制:消費者處理完成后需手動發送確認(ACK),若處理失敗或消費者宕機,消息自動重新入隊,確保任務不丟失。
-
持久化支持:隊列和消息均可設置為持久化(
durable=true
),防止RabbitMQ服務重啟后數據丟失。
//================================生產端代碼循環發送10次消息================================ ?
public class Producer { ?public static final String QUEUE_NAME = "work_queue"; ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?channel.queueDeclare(QUEUE_NAME,true,false,false,null); ?for (int i = 1; i <= 10; i++) { ?String body = i+"hello rabbitmq~~~"; ?channel.basicPublish("",QUEUE_NAME,null,body.getBytes()); ?} ?channel.close(); ?connection.close(); ?} ?}
//================================消費端代碼競爭消息================================public class Consumer1/2 { ?static final String QUEUE_NAME = "work_queue"; ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?channel.queueDeclare(QUEUE_NAME,true,false,false,null); ?Consumer consumer = new DefaultConsumer(channel){ ?@Override ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ?System.out.println("Consumer1 body:"+new String(body)); ?} ?}; ?channel.basicConsume(QUEUE_NAME,true,consumer); ?} ?}
發布/訂閱模式(Publish/Subscribe)
-
消息廣播機制
-
扇形交換機(Fanout Exchange)驅動:生產者將消息發送到交換機,交換機會將消息無條件廣播到所有與其綁定的隊列,每個隊列的消費者都能收到一份消息副本。
-
一對多分發:一條消息可被多個消費者同時接收,適用于需要廣泛觸達的場景(如系統通知、日志收集)。
//====================================生產者代碼====================================
public class Producer { ?public static void main(String[] args) throws Exception { ?// 1、獲取連接 ?Connection connection = ConnectionUtil.getConnection(); ?// 2、創建頻道 ?Channel channel = connection.createChannel(); ?// 參數1. exchange:交換機名稱 ?// 參數2. type:交換機類型 ?// ? ? DIRECT("direct"):定向 ?// ? ? FANOUT("fanout"):扇形(廣播),發送消息到每一個與之綁定隊列。 ?// ? ? TOPIC("topic"):通配符的方式 ?// ? ? HEADERS("headers"):參數匹配 ?// 參數3. durable:是否持久化 ?// 參數4. autoDelete:自動刪除 ?// 參數5. internal:內部使用。一般false ?// 參數6. arguments:其它參數 ?String exchangeName = "test_fanout"; ?// 3、創建交換機 ?channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); ?// 4、創建隊列 ?String queue1Name = "test_fanout_queue1"; ?String queue2Name = "test_fanout_queue2"; ?channel.queueDeclare(queue1Name,true,false,false,null); ?channel.queueDeclare(queue2Name,true,false,false,null); ?// 5、綁定隊列和交換機 ?// 參數1. queue:隊列名稱 ?// 參數2. exchange:交換機名稱 ?// 參數3. routingKey:路由鍵,綁定規則 ?// ? ? 如果交換機的類型為fanout,routingKey設置為"" ?channel.queueBind(queue1Name,exchangeName,""); ?channel.queueBind(queue2Name,exchangeName,""); ?String body = "日志信息:張三調用了findAll方法...日志級別:info..."; ?// 6、發送消息 ?channel.basicPublish(exchangeName,"",null,body.getBytes()); ?// 7、釋放資源 ?channel.close(); ?connection.close(); ?} ?}
//====================================消費者1代碼===================================
public class Consumer1 { ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?String queue1Name = "test_fanout_queue1"; ?channel.queueDeclare(queue1Name,true,false,false,null); ?Consumer consumer = new DefaultConsumer(channel){ ?@Override ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ?System.out.println("body:"+new String(body)); ?System.out.println("隊列 1 消費者 1 將日志信息打印到控制臺....."); ?} ?}; ?channel.basicConsume(queue1Name,true,consumer); ?} ?}
?
//====================================消費者2代碼===================================
public class Consumer2 { ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?String queue2Name = "test_fanout_queue2"; ?channel.queueDeclare(queue2Name,true,false,false,null); ?Consumer consumer = new DefaultConsumer(channel){ ?@Override ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ?System.out.println("body:"+new String(body)); ?System.out.println("隊列 2 消費者 2 將日志信息打印到控制臺....."); ?} ?}; ?channel.basicConsume(queue2Name,true,consumer); ?} ?}
路由模式(Routing)
-
基于路由鍵的精確分發
-
直接交換機(Direct Exchange)驅動:生產者發送消息時需指定路由鍵(Routing Key),交換機會將消息精確匹配到綁定相同路由鍵的隊列。
-
條件性路由:僅當隊列綁定的路由鍵與消息的路由鍵完全一致時,消息才會被投遞,實現按條件分發。
-
靈活的消息過濾
-
多隊列綁定不同路由鍵:可為同一交換機綁定多個隊列,每個隊列聲明不同的路由鍵(例如
error
、info
、warning
),實現消息分類處理。 -
生產者可控性:生產者通過指定路由鍵決定消息的目標隊列,無需消費者干預。
//================================生產者代碼======================================== ?
public class Producer { ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?String exchangeName = "test_direct"; ?// 創建交換機 ?channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null); ?// 創建隊列 ?String queue1Name = "test_direct_queue1"; ?String queue2Name = "test_direct_queue2"; ?// 聲明(創建)隊列 ?channel.queueDeclare(queue1Name,true,false,false,null); ?channel.queueDeclare(queue2Name,true,false,false,null); ?// 隊列綁定交換機 ?// 隊列1綁定error ?channel.queueBind(queue1Name,exchangeName,"error"); ?// 隊列2綁定info error warning ?channel.queueBind(queue2Name,exchangeName,"info"); ?channel.queueBind(queue2Name,exchangeName,"error"); ?channel.queueBind(queue2Name,exchangeName,"warning"); ?String message = "日志信息:張三調用了delete方法.錯誤了,日志級別warning"; ?// 發送消息 ?channel.basicPublish(exchangeName,"warning",null,message.getBytes()); ?System.out.println(message); ?// 釋放資源 ?channel.close(); ?connection.close(); ?} ?}
//===============================消費者1代碼========================================
public class Consumer1 { ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?String queue1Name = "test_direct_queue1"; ?channel.queueDeclare(queue1Name,true,false,false,null); ?Consumer consumer = new DefaultConsumer(channel){ ?@Override ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ?System.out.println("body:"+new String(body)); ?System.out.println("Consumer1 將日志信息打印到控制臺....."); ?} ?}; ?channel.basicConsume(queue1Name,true,consumer); ?} ?}
?
//===============================消費者2代碼========================================
public class Consumer2 { ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?String queue2Name = "test_direct_queue2"; ?channel.queueDeclare(queue2Name,true,false,false,null); ?Consumer consumer = new DefaultConsumer(channel){ ?@Override ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ?System.out.println("body:"+new String(body)); ?System.out.println("Consumer2 將日志信息存儲到數據庫....."); ?} ?}; ?channel.basicConsume(queue2Name,true,consumer); ?} ?}
主題模式(Topics)
-
基于通配符的靈活路由
-
主題交換機(Topic Exchange)驅動:生產者發送消息時指定帶層級的路由鍵(Routing Key,如
order.europe.paid
),消費者通過綁定鍵(Binding Key)使用通配符(*
匹配一個詞,#
匹配零或多個詞)訂閱消息。-
示例:綁定鍵
*.europe.*
可匹配order.europe.paid
或shipment.europe.delayed
。 -
綁定鍵
stock.#
可匹配stock.usd.nyse
或stock.eur.london.close
。
-
-
多維度匹配:支持復雜的分層路由邏輯,適用于需要按多條件分類的場景。
-
高度動態的消息過濾
-
靈活訂閱:消費者可動態定義綁定鍵的通配規則,按需訂閱特定模式的消息,無需修改生產者邏輯。
-
精準與模糊匹配結合:既能精確匹配固定路由鍵,也能通過通配符覆蓋一類消息。
//================================生產者代碼========================================
public class Producer { ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?String exchangeName = "test_topic"; ?channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); ?String queue1Name = "test_topic_queue1"; ?String queue2Name = "test_topic_queue2"; ?channel.queueDeclare(queue1Name,true,false,false,null); ?channel.queueDeclare(queue2Name,true,false,false,null); ?// 綁定隊列和交換機 ?// 參數1. queue:隊列名稱 ?// 參數2. exchange:交換機名稱 ?// 參數3. routingKey:路由鍵,綁定規則 ?// ? ? 如果交換機的類型為fanout ,routingKey設置為"" ?// routing key 常用格式:系統的名稱.日志的級別。 ?// 需求: 所有error級別的日志存入數據庫,所有order系統的日志存入數據庫 ?channel.queueBind(queue1Name,exchangeName,"#.error"); ?channel.queueBind(queue1Name,exchangeName,"order.*"); ?channel.queueBind(queue2Name,exchangeName,"*.*"); ?// 分別發送消息到隊列:order.info、goods.info、goods.error ?String body = "[所在系統:order][日志級別:info][日志內容:訂單生成,保存成功]"; ?channel.basicPublish(exchangeName,"order.info",null,body.getBytes()); ?body = "[所在系統:goods][日志級別:info][日志內容:商品發布成功]"; ?channel.basicPublish(exchangeName,"goods.info",null,body.getBytes()); ?body = "[所在系統:goods][日志級別:error][日志內容:商品發布失敗]"; ?channel.basicPublish(exchangeName,"goods.error",null,body.getBytes()); ?channel.close(); ?connection.close(); ?} ?}
//================================消費者1代碼=======================================
public class Consumer1 { ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?String QUEUE_NAME = "test_topic_queue1"; ?channel.queueDeclare(QUEUE_NAME,true,false,false,null); ?Consumer consumer = new DefaultConsumer(channel){ ?@Override ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ?System.out.println("body:"+new String(body)); ?} ?}; ?channel.basicConsume(QUEUE_NAME,true,consumer); ?} ?}
?
//================================消費者2代碼=======================================
public class Consumer2 { ?public static void main(String[] args) throws Exception { ?Connection connection = ConnectionUtil.getConnection(); ?Channel channel = connection.createChannel(); ?String QUEUE_NAME = "test_topic_queue2"; ?channel.queueDeclare(QUEUE_NAME,true,false,false,null); ?Consumer consumer = new DefaultConsumer(channel){ ?@Override ?public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ?System.out.println("body:"+new String(body)); ?} ?}; ?channel.basicConsume(QUEUE_NAME,true,consumer); ?} ?}