第 5 篇:RabbitMQ 消息隊列與閘機通信設計
RabbitMQ 是一款開源的消息隊列中間件(Message Queue,MQ),基于 Erlang 語言開發,遵循 AMQP(Advanced Message Queuing Protocol,高級消息隊列協議) 標準,主要用于在分布式系統中實現消息的可靠傳遞,核心價值是解耦系統組件、削峰填谷、異步通信等。
RabbitMQ
一、核心定位:為什么需要 RabbitMQ?
RabbitMQ 作為“中間件”,通過存儲轉發消息的方式,讓發送方(生產者)和接收方(消費者)“解耦”:生產者只需要把消息發給 RabbitMQ,不必關心誰來接收;消費者從 RabbitMQ 取消息,不必關心誰發來的。雙方通過消息間接通信,提高系統靈活性和穩定性。
二、核心概念與架構
RabbitMQ 的工作流程基于“生產者-交換機-隊列-消費者”的鏈路,核心組件包括:
- 生產者(Producer):發送消息的應用(如訂單系統生成“新訂單”消息)。
- 交換機(Exchange):接收生產者的消息,根據“路由規則”將消息轉發到對應的隊列。交換機是 RabbitMQ 靈活路由的核心,有 4 種類型:
- 直連交換機(Direct Exchange):消息的“路由鍵”(Routing Key)必須與隊列綁定的“綁定鍵”完全匹配才轉發(如“order.pay”只能匹配“order.pay”)。
- 主題交換機(Topic Exchange):支持通配符匹配(
*
匹配一個單詞,#
匹配多個單詞),適合按“主題”分類消息(如“order.*”可匹配“order.pay”“order.cancel”)。 - 扇形交換機(Fanout Exchange):忽略路由鍵,將消息廣播到所有綁定的隊列(適合“一對多”通知,如“系統通知”需要所有模塊接收)。
- Headers 交換機:不依賴路由鍵,根據消息頭(Headers)的鍵值對匹配(較少用)。
- 隊列(Queue):存儲消息的緩沖區,消息最終會進入隊列等待消費者取走。隊列是“持久化”的(可配置),即使 RabbitMQ 重啟,未消費的消息也能保留。
- 綁定(Binding):連接交換機和隊列的“規則”,包含“綁定鍵”(Binding Key),用于交換機判斷消息該轉發到哪個隊列。
- 消費者(Consumer):從隊列中獲取并處理消息的應用(如支付系統接收“新訂單”消息并處理支付)。
三、工作流程(核心邏輯)
- 生產者發送消息時,會指定消息的“路由鍵”(Routing Key)和目標交換機。
- 交換機根據自身類型和“綁定規則”(綁定鍵),將消息轉發到匹配的隊列。
- 隊列存儲消息,等待消費者連接并獲取。
- 消費者從隊列中取走消息并處理,處理完成后向 RabbitMQ 發送“確認信號(Ack)”,RabbitMQ 收到后刪除隊列中的該消息(避免重復消費)。
一、引入消息隊列的作用
閘機控制涉及設備通信,采用同步調用易受網絡波動影響。引入 RabbitMQ 可實現:
-
系統解耦:閘機控制服務與設備通信模塊分離,便于獨立擴展。
-
異步處理:開閘、關閘命令異步發送,不阻塞主線程。
-
可靠性保障:消息持久化,避免命令丟失。
二、RabbitMQ 核心組件設計
- 交換機(Exchange):
-
gate.control.exchange
:處理閘機控制命令(開閘 / 關閘)。 -
access.record.exchange
:處理通行記錄存儲。
- 隊列(Queue):
-
gate.command.queue
:綁定閘機控制交換機,接收命令。 -
record.save.queue
:綁定記錄交換機,異步保存通行信息。
- 消息模型:
// 閘機命令消息public class GateCommand{public string GateCode { get; set; } // 目標閘機編碼public int CommandType { get; set; } // 1-開閘/0-關閘public DateTime Timestamp { get; set; } = DateTime.Now;}
三、RabbitMQ 集成實現
- 安裝依賴:
dotnet add package RabbitMQ.Client
- 封裝消息服務:
public class RabbitMQService : IDisposable{private readonly IConnection _connection;private readonly IModel_channel;public RabbitMQService(string connectionString){var factory = new ConnectionFactory { Uri = new Uri(connectionString) };_connection = factory.CreateConnection();_channel = _connection.CreateModel();// 聲明交換機和隊列_channel.ExchangeDeclare("gate.control.exchange", ExchangeType.Direct, durable: true);_channel.QueueDeclare("gate.command.queue", durable: true, exclusive: false, autoDelete: false);_channel.QueueBind("gate.command.queue", "gate.control.exchange", "gate.command");}// 發送閘機命令public void PublishGateCommand(GateCommand command){var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(command));_channel.BasicPublish(exchange: "gate.control.exchange",routingKey: "gate.command",basicProperties: null,body: body);}// 消費命令(閘機端實現)public void ConsumeGateCommands(Action<GateCommand> handler){var consumer = new EventingBasicConsumer(_channel);consumer.Received += (model, ea) => {var body = ea.Body.ToArray();var command = JsonConvert.DeserializeObject\<GateCommand>(Encoding.UTF8.GetString(body));handler(command);_channel.BasicAck(ea.DeliveryTag, multiple: false);};_channel.BasicConsume(queue: "gate.command.queue", autoAck: false, consumer: consumer);}public void Dispose(){_channel?.Close();\connection?.Close();}}
四、閘機命令處理流程
-
驗證通過后,系統向 RabbitMQ 發送開閘命令(
CommandType=1
)。 -
閘機端消費者接收命令,執行物理開閘操作。
-
開閘成功后,觸發票證狀態更新回調。
-
延遲 5 秒后,系統發送關閘命令(
CommandType=0
),完成一次控制流程。