Java操作RabbitMQ

文章目錄

  • Spring集成RabbitMQ
    • 1. AMQP&SpringAMQP
    • 2. SpringBoot集成RabbitMQ
    • 3. 模型
      • work模型
    • 4.交換機
      • Fanout交換機
      • Direct交換機
      • Topic交換機
    • 5.聲明式隊列和交換機
      • 基于API聲明
      • 基于注解聲明
    • 6.消息轉換器


Spring集成RabbitMQ

1. AMQP&SpringAMQP

  • AMQP(高級消息隊列協議):Advanced Message Queuing Protocol,是用于在應用程序之間傳遞業務消息的開放標準。該協議與語言和平臺無關,更符合微服務中獨立性的要求。是一種面向消息通信的協議,就像HTTP協議是一種瀏覽器向服務器發消息的協議。
  • SpringAMQP:Spring AMOP是基于AMQP協議定義的一套API規范,提供了模板來發送和接收消息。包含兩部分,其中spring-amqp是基礎抽象spring-rabbit是底層的默認實現。也就是說SpringAMQP只是一種思想,而spring-rabbit是其具體實現

2. SpringBoot集成RabbitMQ

在Maven依賴中引入amqp的起步依賴即可

<!--AMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在Spring配置文件中配置

spring:rabbitmq:host: 127.0.0.1port: 5672# 虛擬主機virtual-host: /hhyusername: hhypassword: hhy

RabbitTemplate是Spring封裝好的操作RabbitMQ的工具類

生產者

@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {String queueName = "hhy.q1";String msg = "hello, mq!666";rabbitTemplate.convertAndSend(queueName, msg);}

消費者

@Component
public class MqListener {@RabbitListener(queues = "hhy.q1")public void listenSimpleQueue(String msg){System.out.println("hhy.q1的消息:【" + msg +"】");}}

3. 模型

work模型

假設消息生產者生產消息的速度非常的快,消息消費者消費消息的速度趕不上生產的速度,就會導致MQ隊列中的消息越來越多,從而導致消息堆積問題,如何處理消息堆積問題?

  1. 讓多個消費者綁定一個隊列,加快消息處理速度
  2. 還可以在代碼層面使用異步操作,比說線程池

綁定多個消費者,每個消費者的處理能力也可能不一致,而Spring默認將消息以輪詢的方式發送給多個消費者,處理能力慢的消費者還是會影響處理速度,此時就可以通過添加配置prefetch讓消費者只獲取一條消息處理完成后再獲取,進一步避免消息堆積問題

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

在這里插入圖片描述

work模型就是多個消費者綁定一個隊列

@Component
public class MqListener {@RabbitListener(queues = "work.q")public void workListen1(String msg){System.out.println("消費者1:work.q的消息:【" + msg +"】");}@RabbitListener(queues = "work.q")public void workListen2(String msg){System.err.println("消費者2:work.q的消息:【" + msg +"】");}
}

4.交換機

上訴實例代碼中并沒有使用交換機,生產者是直接將消息發送到隊列中,實際這種方式是不合理的,假設多個服務都需要訂閱同一條消息這種方式就無法滿足需求了,那么就要引入交換機。

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

交換機的類型有四種:

  • Fanout:廣播,將消息交給所有綁定到交換機的隊列。我們最早在控制臺使用的正是Fanout交換機
  • Direct:訂閱,基于RoutingKey(路由key)發送給訂閱了消息的隊列
  • Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符
  • Headers:頭匹配,基于MQ的消息頭匹配,用的較少。

Fanout交換機

Fanout交換機其實就是廣播,將生產者發布的消息廣播給綁定的自身的所有消息隊列。發送消息流程:

  • 可以有多個隊列
  • 每個隊列都要綁定到Exchange(交換機)
  • 生產者發送的消息,只能發送到交換機
  • 交換機把消息發送給綁定過的所有隊列
  • 訂閱隊列的消費者都能拿到消息

在這里插入圖片描述

根據上訴圖編寫代碼

// 消費者1消費隊列1
@RabbitListener(queues = "fanout.q1")
public void fanoutListen1(String msg){System.out.println("消費者1:fanout.q1的消息:【" + msg +"】");
}
// 消費者2消費隊列2
@RabbitListener(queues = "fanout.q2")
public void fanoutListen2(String msg){System.out.println("消費者1:fanout.q2的消息:【" + msg +"】");
}

生產者向Fanout類型交換機發送消息,前提需要創建Fanout類型的交換機

@Test
void testSendFanout() {// 交換機名稱String exchangeName = "amq.fanout";String msg = "hello, fanout!";rabbitTemplate.convertAndSend(exchangeName, null, msg);
}

Direct交換機

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

在這里插入圖片描述

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey
  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

在這里插入圖片描述

通過key進行綁定,如下圖也就是說生產者發送消息時指定key為test兩個消費者內的隊列都能收到,key為java時只有dirct.q1隊列能收到,key為cpp時只有dirct.q2隊列能收到

在這里插入圖片描述

消費者代碼

@RabbitListener(queues = "direct.q1")
public void fanoutDirect1(String msg){System.out.println("消費者1:direct.q1的消息:【" + msg +"】");
}
@RabbitListener(queues = "direct.q2")
public void fanoutDirect2(String msg){System.out.println("消費者2:direct.q2的消息:【" + msg +"】");
}

生產者代碼

生產者在指定消息時指定不同的key來發送消息

@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "所有隊列都能收到該消息";rabbitTemplate.convertAndSend(exchangeName, "test", msg);
}
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "只有隊列direct.q1能收到消息";rabbitTemplate.convertAndSend(exchangeName, "java", msg);
}
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "只有隊列direct.q2能收到消息";rabbitTemplate.convertAndSend(exchangeName, "cpp", msg);
}

Topic交換機

Topic類型的ExchangeDirect相比,都是可以根據RoutingKey把消息路由到不同的隊列。
只不過Topic類型Exchange可以讓隊列在綁定BindingKey 的時候使用通配符!也就是說Topic交換機是非常靈活的,Bindingkey支持模糊匹配。

BindingKey 一般都是有一個或多個單詞組成,多個單詞之間以.分割,例如: china.hunan

通配符規則:

  • #:匹配一個或多個詞
  • *:匹配不多不少恰好1個詞

假設有多個隊列綁定的Bindingkey分別為:

  • china.hunan.chenzhou.weather:湖南郴州的天氣
  • china.hunan.chenzhou.news:湖南郴州的新聞
  • china.zhejiang.hangzhou.weather:浙江杭州的天氣
  • japan.tokyo.news:日本東京的新聞

那么使用通配符:

  • china.hunan.#:表示接受湖南的所有新聞和天氣消息
  • #.news:表示接受所有新聞消息
  • china.hunan.*.news:表示接受湖南省各個市區的新聞

建立綁定關系:

在這里插入圖片描述

在這里插入圖片描述

代碼實例

// 消費者
@RabbitListener(queues = "topic.q1")
public void topicListen1(String msg){System.out.println("消費者1:topic.q1的消息:【" + msg +"】");
}@RabbitListener(queues = "topic.q2")
public void topicListen2(String msg){System.out.println("消費者2:topic.q2的消息:【" + msg +"】");
}

生產者代碼

這一條消息topic.q1topic.q2兩個隊列都能收到消息,因為它們和交換機綁定的關系的時候指定的KEY:

  • #.news:接受所有地方的新聞
  • china.hunan.#:接受湖南的新聞和天氣

@Test
void testSendTopic() {// 交換機名稱String exchangeName = "hhy.topic";String key = "china.hunan.chenzhou.news";String msg = "這是一條湖南郴州的新聞!";rabbitTemplate.convertAndSend(exchangeName, key, msg);
}

下面這條消息只有topic.q2能收到,因為topic.q2和交換機綁定時指定的KEY為china.hunan.#,接受湖南的所有天氣和新聞消息

@Test
void testSendTopic() {// 交換機名稱String exchangeName = "hhy.topic";String key = "china.hunan.chenzhou.weather";String msg = "郴州今天多云轉晴";rabbitTemplate.convertAndSend(exchangeName, key, msg);
}

小結:

描述下Direct交換機與Topic交換機的差異?

  • Topic交換機接收的消息RoutingKey必須是多個單詞,以 **.** 分割
  • Topic交換機與隊列綁定時的bindingKey可以指定通配符
  • #:代表0個或多個詞
  • *:代表1個詞

5.聲明式隊列和交換機

通過RabbitMQ提供的管理頁面創建隊列和交換機比較麻煩,SpringAMQP提供了對應API方便開發者來創建隊列和交換機。

基于API聲明

通過Spring提供的API創建fanout交換機和隊列并建立綁定關系

@Configuration
public class FanoutConfiguration {/*** 聲明式創建fanout交換機* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hhy.fanout");}/*** 聲明式創建隊列* @return*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 聲明式創建綁定關系* @param fanoutQueue1* @param fanoutExchange* @return*/@Beanpublic Binding fanoutBinding3(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}}

但如果使用這種方式創建Direct交換機就會非常麻煩,因為如果要綁定時要指定多個Key就會出現很多冗余代碼,每綁定一個不同的Key就需要多寫一份代碼

@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("test.direct");}@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}

基于注解聲明

基于@Bean的方式聲明隊列和交換機的方式比價麻煩,代碼有點冗余,Spring還為我們提供基于注解的方式來聲明。

使用注解的方式聲明Direct模式的交換機和隊列,通過注解聲明這種創建方式更簡單清爽,一個注解直接創建交換機并且綁定隊列。并且對應消費者直接就可以監聽隊列接收消息

@Component
public class MqListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenSimpleQueue1(String msg){System.out.println("消費者1:收到了simple.queue的消息:【" + msg +"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenSimpleQueue2(String msg){System.out.println("消費者2:收到了simple.queue的消息:【" + msg +"】");}
}

6.消息轉換器

前面我們生產者發送的消息都是一些字符串,當我們發送的消息是一個對象的時候就會出現問題。

@Test
void testSendObject() {String exchangeName = "test.direct";Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);rabbitTemplate.convertAndSend(exchangeName, "red", msg);
}

如下圖RabbitMQ中的消息隊列中存儲的消息,數據類型是通過JDK自帶的序列化后的數據

在這里插入圖片描述

而JDK自帶的序列化,存在以下問題:

  • 消息體積大
  • 毫無可讀性
  • 有安全漏洞,利用Java字節碼反序列化能被替換惡意代碼

所以使用JDK自帶的序列化方式并不合適,那么我可以使用JSON的序列化方式來解決這個問題。

使用jackson就行,引入jackson依賴

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>

將消息轉換器交給Spring管理

@Bean
public MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();
}

在這里插入圖片描述


本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/899165.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/899165.shtml
英文地址,請注明出處:http://en.pswp.cn/news/899165.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Kotlin泛型: 協變|逆變|不變

引言 無論java 通配符上限還是下限&#xff0c;都多少存在缺陷&#xff0c;要么存不安全&#xff0c;要么取不安全。而kotlin就解決這個問題。讓out 純輸出&#xff0c; 讓in純輸入。 java這塊知識&#xff1a; java泛型的協變、逆變和不變-CSDN博客 協變 生產者out T 協變…

【Excel使用技巧】某列保留固定字段或內容

目錄 ? 方法一&#xff1a;使用 Excel 公式提取 body 部分 &#x1f50d; 解釋&#xff1a; ? 方法二&#xff1a;批量處理整列數據 &#x1f6a8; 注意事項 &#x1f6a8; 處理效果 我想保留Excel某一列的固定內容&#xff0c;比如原內容是&#xff1a; thread entry i…

C# System.Text.Encoding 使用詳解

總目錄 前言 在C#編程中&#xff0c;處理字符串和字節數組之間的轉換是一個常見的任務。System.Text.Encoding類及其派生類提供了豐富的功能&#xff0c;幫助開發者實現不同字符編碼之間的轉換。本文將詳細講解System.Text.Encoding類的使用方法&#xff0c;包括常用編碼的介紹…

Pre-flash和Main flash

在相機拍照過程中&#xff0c;Pre-flash&#xff08;預閃光&#xff09; 和 Main flash&#xff08;主閃光&#xff09; 是常見的兩種閃光燈使用模式&#xff0c;通常用于提高低光環境下的拍攝質量&#xff0c;尤其在自動曝光&#xff08;AE&#xff09;和自動對焦&#xff08;…

Kafka 4.0 發布:KRaft 替代 Zookeeper、新一代重平衡協議、點對點消息模型、移除舊協議 API

KRaft 全面替代 ZooKeeper Apache Kafka 4.0 是一個重要的里程碑&#xff0c;標志著第一個完全無需 Apache ZooKeeper 運行的主要版本。 通過默認運行在 KRaft 模式下&#xff0c;Kafka 簡化了部署和管理&#xff0c;消除了維護單獨 ZooKeeper 集群的復雜性。 這一變化顯著降…

SFT實驗報告

大模型微調實驗報告* 實驗目標 梳理大模型微調方法&#xff0c;評估各種基座和微調方法的實驗效果。 基礎模型 \1.Llama \2.Qwen \3.Chatglm4 \4. 微調策略 LoRA系列 低秩適配&#xff08;LoRA&#xff09;的核心思想是凍結原始參數&#xff0c;通過低秩分解引入可訓…

LLM - R1 強化學習 DRPO 策略優化 DAPO 與 Dr. GRPO 算法 教程

歡迎關注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/146533892 在強化學習算法中&#xff0c;DAPO (Decoupled Clip and Dynamic Sampling Policy Optimization)&#xff0c;通過解耦裁剪和動態采樣策…

美攝科技智能汽車視頻延遲攝影解決方案,開啟智能出行新視界

在智能汽車時代&#xff0c;車載影像技術正以前所未有的速度發展&#xff0c;成為提升駕乘體驗和滿足用戶多樣化需求的關鍵因素。美攝科技憑借其卓越的技術實力和創新精神&#xff0c;推出了智能汽車視頻延遲攝影解決方案&#xff0c;為智能汽車行業帶來了一場視覺盛宴。 一、…

[250325] Claude AI 現已支持網絡搜索功能!| ReactOS 0.4.15 發布!

目錄 Claude AI 現已支持網絡搜索功能&#xff01;ReactOS 0.4.15 發布&#xff01; Claude AI 現已支持網絡搜索功能&#xff01; 近日&#xff0c;Anthropic 公司宣布&#xff0c;其 AI 助手 Claude 現在可以進行網絡搜索&#xff0c;為用戶提供更及時、更相關的回復。這項新…

代碼規范之Variable Names變量名

代碼規范之Variable Names變量名 golang中 官方文檔&#xff1a;https://go.dev/wiki/CodeReviewComments#variable-names Variable names in Go should be short rather than long. This is especially true for local variables with limited scope. Prefer c to lineCoun…

Mybatis_plus

前言 Mybatis_plus 是在 mybatis 的基礎上進行了增強&#xff0c;在 MyBatis 的基礎上只做增強不做改變&#xff0c;為簡化開發、提高效率而生。本文章只做簡單的使用介紹&#xff0c;更加詳細的內容大家可以參考官網。 下面是mybatis_plus 官網地址&#xff1a; mybatis_plu…

深圳問頂安全科技有限公司asktopsec是做什么的?

深圳問頂安全科技有限公司&#xff0c;是一家專業的AI與應用安全公司。 全球領先的AI、Android、IOS應用安全解決方案提供商&#xff0c;官網&#xff1a;https://asktopsec.com 問頂安全主要為企業提供AI和應用安全服務 移動應用安全檢測、移動應用安全加固、AI智能體安全、AI…

鴻蒙OS 5 架構設計探秘:從分層設計到多端部署

文章目錄 鴻蒙OS架構設計探秘&#xff1a;從分層設計到多端部署一、鴻蒙的分層架構設計二、模塊化設計的精髓三、智慧分發設計&#xff1a;資源的動態調度四、一次開發&#xff0c;多端部署的實踐總結與思考 鴻蒙OS架構設計探秘&#xff1a;從分層設計到多端部署 最近兩年來&a…

idea 沒有 add framework support(添加框架支持)選項

在 IntelliJ IDEA 2023 中&#xff0c;若需通過設置手動添加 “添加框架支持” 菜單項&#xff0c;可按照以下步驟操作&#xff1a; 手動添加 “添加框架支持” 菜單項 打開設置 點擊頂部菜單欄的 File&#xff08;文件&#xff09; -> Settings&#xff08;設置&#xff09…

計算機網絡--傳輸層(2)

傳輸層核心機制深度解析 一、可靠傳輸實現機制 1. 校驗和機制 技術原理&#xff1a; 使用16位二進制反碼求和算法&#xff0c;計算范圍包括TCP偽首部&#xff08;12字節&#xff09;、TCP首部&#xff08;20字節&#xff09;和數據部分接收端重新計算校驗和&#xff0c;若與…

再探帶權并查集

典型例題 Acwing 權值 故名思義&#xff0c;在帶權并查集中&#xff0c;我們需要讓每個節點攜帶一個**“權值”**。 那么這個權值應該是什么呢&#xff1f;其實答案就在并查集當中。 由于在并查集當中我們可以在 O ( 1 ) O(1) O(1) 時間內找到一個節點的根節點&#xff0c;那…

Vala編成語言教程-構造函數和析構函數

構造函數 Vala支持兩種略有不同的構造方案&#xff1a;我們將重點討論Java/C#風格的構造方案&#xff0c;另一種是GObject風格的構造方案。 Vala不支持構造函數重載的原因與方法重載不被允許的原因相同&#xff0c;這意味著一個類不能有多個同名構造函數。但這并不構成問題&…

本地部署Stable Diffusion生成爆火的AI圖片

直接上代碼 Mapping("/send") Post public Object send(Body String promptBody) { JSONObject postSend new JSONObject(); System.out.println(promptBody); JSONObject body JSONObject.parseObject(promptBody); List<S…

python爬蟲WASM

WASM 一.WASM簡介 1.1 WASM定義 ? WebAssembly(簡稱wasm)是一個虛擬指令集體系架構(virtual ISA),整體架構包括核心的ISA定義、二進制編碼、程序語義的定義與執行,以及面向不同的嵌入環境(如Web)的應用編程接口(WebAssembly API)。是一種運行在現代網絡瀏覽器中的…

Docker鏡像遷移方案

Docker鏡像遷移方案 文章目錄 Docker鏡像遷移方案一&#xff1a;背景二&#xff1a;操作方式三&#xff1a;異常原因參考&#xff1a; 一&#xff1a;背景 比如機器上已經有先有的容器&#xff0c;但是docker pull的時候是失敗的二&#xff1a;操作方式 1、停止正在運行的容器…