前言
RabbitMQ 是在 AMQP(Advanced Message Queuing Protocol) 協議標準基礎上完整的,可復用的企業消息系統。它遵循 Mozilla Public License 開源協議,采用 Erlang 實現的工業級的消息隊列(MQ)服務器,建立在 Erlang OTP 平臺上(因為采用 Erlang 開發,所以 RabbitMQ 穩定性和可靠性比較高)
其他主流 MQ 產品
- ActiveMQ:Apache 出品,最流行的,能力強勁的開源消息總線,基于 JMS(Java Message Service)規范
- RocketMQ:阿里低延遲、高并發、高可用、高可靠的分布式消息中間件,基于 JMS,目前由 Apache 基金會維護
- Kafka:分布式,分區的,多副本的,多訂閱者的消息發布訂閱系統(分布式 MQ 系統),可以用于搜索日志,監控日志,訪問日志等
本文為 RabbitMQ 入門教程,主要將會講解 RabbitMQ 安裝配置(Windows),相關概念,及項目中具體應用
安裝
Erlang
官網下載鏈接:Downloads - Erlang/OTP
RabbitMQ 服務器必須首先安裝 Erlang 運行環境,同時安裝時需要注意 RabbityMQ 所依賴的 Erlang 版本,我們可以查看下方官方版本對應信息
版本對應:RabbitMQ Erlang Version Requirements — RabbitMQ
本次使用版本 Erlang OTP 25.3(點擊跳轉下載鏈接)
雙擊執行 exe 安裝程序,除了安裝路徑其他都按照默認即可
然后配置環境變量
ERLANG_HOME = D:\Erlang\Erlang\Erlang OTP
并且添加 /bin 目錄到 Path 環境變量中,即添加 %ERLANG_HOME%\bin
到 Path 中
安裝配置之后,打開 CMD,輸入 erl 然后回車鍵,會彈出版本信息,表示 Erlang 安裝成功
RabbitMQ
官方下載頁面:RabbitMQ Changelog — RabbitMQ
下載鏈接: RabbitMQ 3.12.0
安裝 exe 文件,執行安裝包,同樣除了安裝路徑外其他保持默認
配置環境變量
RABBITMQ_SERVER = D:\RabbitMQ\RabbitMQ\rabbitmq_server-3.12.0
然后添加 %RABBITMQ_SERVER%\sbin
到 Path 環境變量中
查看所有插件
rabbitmq-plugins list
注:如果出現問題請參考最后一章 徹底卸載
之后我們需要安裝 rabbitmq_management 插件,可以使用可視化的方式查看 RabbitMQ 服務器實例的狀態,以及操控 RabbitMQ 服務器
# 安裝插件
rabbitmq-plugins enable rabbitmq_management
訪問管理界面: http://localhost:15672/ (賬號密碼:guest / guest)
前期安裝配置完畢,下面可以配合官方入門文檔學習
官方文檔:RabbitMQ Tutorials — RabbitMQ
消息隊列
定義
消息指的是兩個應用間傳遞的數據。數據的類型有很多種形式,可能只包含文本字符串,也可能包含嵌入對象。
“消息隊列(Message Queue)”是在消息的傳輸過程中保存消息的容器。在消息隊列中,通常有生產者和消費者兩個角色。生產者只負責發送數據到消息隊列,誰從消息隊列中取出數據處理,他不管。消費者只負責從消息隊列中取出數據處理,他不管這是誰發送的數據
作用
解耦。如圖所示。假設有系統 B、C、D 都需要系統 A 的數據,于是系統 A 調用三個方法發送數據到 B、C、D。這時,系統 D 不需要了,那就需要在系統 A 把相關的代碼刪掉。假設這時有個新的系統 E 需要數據,這時系統 A 又要增加調用系統 E 的代碼。為了降低這種強耦合,就可以使用 MQ,系統 A 只需要把數據發送到 MQ,其他系統如果需要數據,則從 MQ 中獲取即可
異步。如圖所示。一個客戶端請求發送進來,系統 A 會調用系統 B、C、D 三個系統,同步請求的話,響應時間就是系統 A、B、C、D 的總和,也就是 800ms。如果使用 MQ,系統 A 發送數據到 MQ,然后就可以返回響應給客戶端,不需要再等待系統 B、C、D 的響應,可以大大地提高性能。對于一些非必要的業務,比如發送短信,發送郵件等等,就可以采用 MQ
削峰。如圖所示。這其實是 MQ 一個很重要的應用。假設系統 A 在某一段時間請求數暴增,有 5000 個請求發送過來,系統 A 這時就會發送 5000 條 SQL 進入 MySQL 進行執行,MySQL 對于如此龐大的請求當然處理不過來,MySQL 就會崩潰,導致系統癱瘓。如果使用 MQ,系統 A 不再是直接發送 SQL 到數據庫,而是把數據發送到 MQ,MQ 短時間積壓數據是可以接受的,然后由消費者每次拉取 2000 條進行處理,防止在請求峰值時期大量的請求直接發送到 MySQL 導致系統崩潰
特點
可靠性:通過支持消息持久化,支持事務,支持消費和傳輸的 ack 等來確保可靠性
路由機制:支持主流的訂閱消費模式,如廣播,訂閱,headers 匹配等
擴展性:多個 RabbitMQ 節點可以組成一個集群,也可以根據實際業務情況動態地擴展集群中節點
高可用性:隊列可以在集群中的機器上設置鏡像,使得在部分節點出現問題的情況下隊仍然可用
多種協議:RabbitMQ 除了原生支持 AMQP 協議,還支持 STOMP,MQTT 等多種消息中間件協議
多語言客戶端:RabbitMQ 幾乎支持所有常用語言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等
管理界面:RabbitMQ 提供了易用的用戶界面,使得用戶可以監控和管理消息、集群中的節點等
插件機制:RabbitMQ 提供了許多插件,以實現從多方面進行擴展,當然也可以編寫自己的插件
應用
本章將會集成 rabbitmq 到 SpringBoot 中,并使用 rabbitmq-provider (生產者)和 rabbitmq-consumer(消費者) 兩個項目進行具體講解, 也可以在父項目中創建這兩個模塊(本文采用父子模塊方式)
所有代碼示例已經上傳到 GitHub 倉庫
倉庫地址:ReturnTmp/rabbitmq-demo: rabbitmq 實例代碼 (github.com)
生產者
配置
創建子模塊 rabbitmq-provider
依賴配置(也可以 IDEA 初始化模塊直接勾選)
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
application.yml
server: port: 8021
spring: application: name: rabbitmq-provider rabbitmq: host: 127.0.0.1 port: 5672 username: root password: 111111 virtual-host: RootHost
其中虛擬 host 配置項不是必須的,需要自行創建 vhost,如果未自行創建,默認為 virtual-host: /
注:vhost 可以理解為虛擬 broker,即 mini-RabbitMQ?server,其內部均含有獨立的 queue、bind、exchange 等,最重要的是擁有獨立的權限系統,可以做到 vhost 范圍內的用戶控制。當然,從 RabbitMQ 全局角度,vhost 可以作為不同權限隔離的手段
可以按照如下步驟創建 vhost
然后創建用戶(管理員)
然后我們需要為用戶分配權限,指定使用我們剛剛創建的 vhost
代碼
創建直連交換機配置類
注:RabbitMQ 共有四種交換機,分別為:直連交換機,扇形交換機,主題交換機,首部交換機。這里使用直連交換機演示,其他讀者可以自行嘗試
@Configuration
public class DirectRabbitConfig {//隊列 起名:TestDirectQueue@Beanpublic Queue TestDirectQueue() {// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效// exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高于durable// autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。// return new Queue("TestDirectQueue",true,true,false);//一般設置一下隊列的持久化就好,其余兩個就是默認falsereturn new Queue("TestDirectQueue", true);}//Direct交換機 起名:TestDirectExchange@BeanDirectExchange TestDirectExchange() {// return new DirectExchange("TestDirectExchange",true,true);return new DirectExchange("TestDirectExchange", true, false);}//綁定 將隊列和交換機綁定, 并設置用于匹配鍵:TestDirectRouting@BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}}
然后寫簡單的接口進行消息推送(可以視情況寫為定時任務)
@RestController
public class SendMessageController {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法@GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);//將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchangerabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);return "ok";}
}
啟動項目,調用接口: http://localhost:8021/sendDirectMessage
查看 RabbitMQ 管理頁面查看是否推送成功
消費者
配置
創建子模塊 rabbitmq-consumer
依賴配置
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
application.yml
server: port: 8022
spring: application: name: rabbitmq-consumer rabbitmq: host: 127.0.0.1 port: 5672 username: root password: 111111 virtual-host: RootHost
代碼
創建消息接收監聽類
@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("DirectReceiver receive message: " + testMessage.toString());}
}
之后啟動項目,查看消費者接收情況
序列化
發送接收消息可能出現 Failed to convert message
問題,可以通過使用 JSON 序列化傳輸信息方式解決
生產者
@Configuration
public class RabbitMQConfig implements InitializingBean {/*** 自動注入RabbitTemplate模板*/@Resourceprivate RabbitTemplate rabbitTemplate;/*** 發送消息JSON序列化*/@Overridepublic void afterPropertiesSet() {//使用JSON序列化rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());}
}
消費者
@Configuration
public class RabbitMQConfig {@Beanpublic MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {return new Jackson2JsonMessageConverter(objectMapper);}
}
徹底卸載
我們安裝中可能出現各種問題,一般情況下是 RabbitMQ 和 Erlang 版本不對應,需要完全卸載 RabbitMQ 和 Erlang,可以按照如下步驟卸載
注:博主首次安裝使用的是 Erlang 20.3 Rabbit 3.7.15 ,之后似乎小版本不對應,出現問題,需要重新卸載安裝
(1)打開 Windows 控制面板,雙擊“程序和功能”。
(2)在當前安裝的程序列表中,右鍵單擊 RabbitMQ Server,然后單擊“卸載”。
(3)在當前安裝的程序列表中,右鍵單擊“Erlang OTP”,然后單擊“卸載”。
(4)打開 Windows 任務管理器。
(5)在任務管理器中,查找進程 epmd.exe。 如果此進程仍在運行,請右鍵單擊該進程,然后單擊“結束進程”。
(6)刪除 RabbitMQ 和 Erlang 的所有安裝目錄。
(7)刪除文件 C:\Windows\System32\config\systemprofile.erlang.cookie
(如果存在)。
(8)轉到用戶文件夾:C:\Users\[username]
,然后刪除文件.erlang.cookie。
(9)同樣在 User 文件夾中,轉到 AppData \ Roaming \ RabbitMQ
。刪除 RabbitMQ 文件夾。
(10)刪除注冊表 HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv
的子項。
(11)打開運行 cmd->sc delete RabbitMQ。
(12)打開運行->regedit 找到 RabbitMQ 節點,刪掉即可(如果存在)
參考鏈接
- Windows 下安裝 RabbitMQ 服務器及基本配置 - 藍之風 - 博客園 (cnblogs.com)
- RabbitMQ Windows 安裝、配置、使用 - 小白教程-阿里云開發者社區 (aliyun.com)
- Windows 如何完全卸載 RabbitMQ 和 Erlang 刪除注冊表
- windows 下 Erlang 與 RabbitMQ 重新安裝時,由于卸載不干凈導致各類錯誤
- 超詳細的 RabbitMQ 入門,看這篇就夠了!-阿里云開發者社區 (aliyun.com)
- RabbitMQ 整合 Spring Boot,實現 Hello World
- Springboot 整合 RabbitMq ,用心看完這一篇就夠了
- RabbitMq 核心知識點小結 - 知乎 (zhihu.com)
- RabbitMQ消費消息坑:failed to convert serialized Message content - jiuchengi
本文由博客一文多發平臺 OpenWrite 發布!