RabbitMQ快速學習之WorkQueues模型、三種交換機、消息轉換器(SpringBoot整合)

文章目錄

  • 前言
  • 一、WorkQueues模型
    • 消息發送
    • 消息接收
    • 能者多勞
  • 二、交換機類型
    • 1.Fanout交換機
      • 消息發送
      • 消息接收
    • 2.Direct交換機
      • 消息接收
      • 消息發送
    • 3.Topic交換機
      • 消息發送
      • 消息接收
  • 三、編程式聲明隊列和交換機
    • fanout示例
    • direct示例
    • 基于注解
  • 四、消息轉換器
  • 總結


前言

WorkQueues模型、Fanout交換機、Direct交換機、Topic交換機、基于SpringBoot注解聲明隊列和交換機、消息轉換器。


一、WorkQueues模型

  • Work queues,任務模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
  • 當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用work 模型,多個消費者共同處理消息處理,消息處理的速度就能大大提高了。

在這里插入圖片描述
在控制臺創建一個work.queue的隊列:
在這里插入圖片描述

消息發送

循環發送,模擬大量消息堆積現象。

/*** workQueue* 向隊列中不停發送消息,模擬消息堆積。*/
@Test
public void testWorkQueue() throws InterruptedException {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 發送消息,每20毫秒發送一次,相當于每秒發送50條消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

消息接收

模擬多個消費者綁定同一個隊列,我們添加2個方法,并且設置不同睡眠時間模擬不同性能讀取

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

在這里插入圖片描述

  • 消費者1很快完成了自己的25條消息
  • 消費者2卻在緩慢的處理自己的25條消息。

也就是說消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。導致1個消費者空閑,另一個消費者忙的不可開交。沒有充分利用每一個消費者的能力,最終消息處理的耗時遠遠超過了1秒。這樣顯然是有問題的。

能者多勞

在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

再次測試,發現結果如下:
在這里插入圖片描述

可以發現,由于消費者1處理速度較快,所以處理了更多的消息;消費者2處理速度較慢,只處理了6條消息。而最終總的執行耗時也在1秒左右,大大提升。
正所謂能者多勞,這樣充分利用了每一個消費者的處理能力,可以有效避免消息積壓問題。

Work模型的使用:

  • 多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理
  • 通過設置prefetch來控制消費者預取的消息數量

二、交換機類型

1.Fanout交換機

發送流程:
在這里插入圖片描述

  • 可以有多個隊列
  • 每個隊列都要綁定到Exchange(交換機)
  • 生產者發送的消息,只能發送到交換機
  • 交換機把消息發送給綁定過的所有隊列
  • 訂閱隊列的消費者都能拿到消息

在控制臺創建fanout.queue1、fanoutqueue2隊列和dragon.fanout交換機,并將隊列綁定到交換機:
在這里插入圖片描述

在這里插入圖片描述

消息發送

@Test
public void testFanoutExchange() {// 交換機名稱String exchangeName = "dragon.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

消息接收

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}

交換機的作用是什么?

  • 接收publisher發送的消息
  • 將消息按照規則路由到與之綁定的隊列
  • 不能緩存消息,路由失敗,消息丟失
  • FanoutExchange的會將消息路由到每個綁定的隊列

2.Direct交換機

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

在這里插入圖片描述
在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

在控制臺聲明兩個隊列direct.queue1和direct.queue2,然后聲明一個direct類型的交換機,綁定隊列和交換機(使用red和blue作為key,綁定direct.queue1到dragon.direct;使用red和yellow作為key,綁定direct.queue2到dragon.direct):
在這里插入圖片描述

在這里插入圖片描述

消息接收

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消費者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消費者2接收到direct.queue2的消息:【" + msg + "】");
}

消息發送

@Test
public void testSendDirectExchange() {// 交換機名稱String exchangeName = "dragon.direct";// 消息String message = "紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!";// 發送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

綁定紅色的都會接收到信息:
在這里插入圖片描述

描述下Direct交換機與Fanout交換機的差異?

  • Fanout交換機將消息路由給每一個與之綁定的隊列
  • Direct交換機根據RoutingKey判斷路由給哪個隊列
  • 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似

3.Topic交換機

  • Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。
    只不過Topic類型Exchange可以讓隊列在綁定BindingKey 的時候使用通配符!
  • BindingKey 一般都是有一個或多個單詞組成,多個單詞之間以.分割,例如: item.insert
  • 通配符規則:
    • #:匹配一個或多個詞
    • *:匹配不多不少恰好1個詞
    • 舉例:
      dragon.#:能夠匹配dragon.stu.insert 或者 dragon.stu
      dragon.*:只能匹配dragon.stu

在這里插入圖片描述
假如此時publisher發送的消息使用的RoutingKey共有四種:

  • china.news 代表有中國的新聞消息;
  • china.weather 代表中國的天氣消息;
  • japan.news 則代表日本新聞
  • japan.weather 代表日本的天氣消息;

解釋:

  • topic.queue1:綁定的是china.# ,凡是以 china.開頭的routing key 都會被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:綁定的是#.news ,凡是以 .news結尾的 routing key 都會被匹配。包括:
    • china.news
    • japan.news

在這里插入圖片描述

消息發送

/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交換機名稱String exchangeName = "dragon.topic";// 消息String message = "喜報!孫悟空大戰哥斯拉,勝!";// 發送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

消息接收

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消費者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消費者2接收到topic.queue2的消息:【" + msg + "】");
}

描述下Direct交換機與Topic交換機的差異?

  • Topic交換機接收的消息RoutingKey必須是多個單詞,以 . 分割
  • Topic交換機與隊列綁定時的bindingKey可以指定通配符
  • #:代表0個或多個詞
  • *:代表1個詞

三、編程式聲明隊列和交換機

SpringAMQP提供了一個Queue類,用來創建隊列:

fanout示例

@Configuration
public class FanoutConfiguration {/*** 聲明交換機* @return Fanout類型交換機*/@Beanpublic FanoutExchange fanoutExchange(){
//    還可以是  return  ExchangeBuilder.fanoutExchange("dragon.fanout").build();return new FanoutExchange("dragon.fanout");}/*** 第1個隊列*/@Beanpublic Queue fanoutQueue1(){
//   還可以是  return  QueueBuilder.durable("fanout.queue1").build();return new Queue("fanout.queue1");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2個隊列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
//綁定隊列和交換機的另一方法
//    @Bean
//    public Binding bindingQueue2(){
//        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
//    }
}

direct示例

@Configuration
public class DirectConfiguration {/*** 聲明交換機* @return Direct類型交換機*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("dragon.direct").build();}/*** 第1個隊列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2個隊列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

基于注解

上面的代碼還是很多的,基于注解的方式也能夠代替上面的繁雜配置,下面演示direct和topic交換機,隊列的代碼

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消費者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "dragon.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消費者2接收到direct.queue2的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消費者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消費者2接收到topic.queue2的消息:【" + msg + "】");
}

四、消息轉換器

隨便創建一個隊列,然后發送Map對象,你會發現消息格式很不友好

@Test
public void testSendMap() throws InterruptedException {// 準備消息Map<String,Object> msg = new HashMap<>();msg.put("name", "柳巖");msg.put("age", 21);// 發送消息rabbitTemplate.convertAndSend("object.queue", msg);
}

控制臺查看:
在這里插入圖片描述

  • Spring的消息發送代碼接收的消息體是一個Object,而在數據傳輸時,它會把你發送的消息序列化為字節發送給MQ,接收消息的時候,還會把字節反序列化為Java對象。
  • 只不過,默認情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:
    • 數據體積過大
      有安全漏洞
      可讀性差
  • 根據上面測試,顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。

j解決:
引入依賴:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果項目中引入了spring-boot-starter-web依賴,則無需再次引入Jackson依賴。其轉換器配置。
添加配置:

@Bean
public MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

總結

以上就是所有講解。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/163692.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/163692.shtml
英文地址,請注明出處:http://en.pswp.cn/news/163692.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C plus plus

環境配置 vscodewindows vscode c 環境配置(終極版)_vscode配置c/c環境_BangBang的博客-CSDN博客VsCode安裝和配置C環境詳細全流程_vscode安裝c-CSDN博客MinGW、MinGW-w64 與TDM-GCC 應該如何選擇&#xff1f; - 知乎、VsCode安裝和配置C環境詳細全流程_vscode安裝c-CSDN博客 …

?LeetCode解法匯總5-正則表達式匹配?

目錄鏈接&#xff1a; 力扣編程題-解法匯總_分享記錄-CSDN博客 GitHub同步刷題項目&#xff1a; https://github.com/September26/java-algorithms 原題鏈接&#xff1a;力扣&#xff08;LeetCode&#xff09;官網 - 全球極客摯愛的技術成長平臺 描述&#xff1a; 「HTML 實…

Educoder中Hive綜合應用案例——用戶學歷查詢

第1關:查詢每一個用戶從出生到現在的總天數 ---------- 禁止修改 ----------drop database if exists mydb cascade; ---------- 禁止修改 -------------------- begin ---------- ---創建mydb數據庫 create database mydb;---使用mydb數據庫 use mydb;---創建表user create …

電腦找不到xinput1_3.dll怎么修復,快速處理dll問題的5個方法分享

在使用電腦的過程中&#xff0c;我們常常會遇到一些常見的問題&#xff0c;其中之一就是“電腦缺少xinput1_3.dll”。這個問題可能會影響到我們對電腦的使用體驗&#xff0c;甚至導致某些軟件無法正常運行。在我遇到這個問題并解決之后&#xff0c;我深刻地體會到了解決問題的重…

迅鐳激光板材切割自動化生產線中標高端機械裝備龍頭豪邁集團!

近年來&#xff0c;中國制造業逐步由低端制造業向高端制造業邁進、由勞動密集型向技術密集型轉變&#xff0c;智能制造帶動了制造業生產環節的自動化、信息化、數字化、智能化的迭代升級。 位于山東省的高端機械裝備龍頭——豪邁集團&#xff0c;緊跟國家發展戰略&#xff0c;加…

【Spring集成MyBatis】MyBatis的Dao層實現(基于配置,非注解開發)

文章目錄 1. MyBatis的dao層實現(傳統方式)——需要寫接口及其實現類2. MyBatis的代理開發方式——僅需寫接口 1. MyBatis的dao層實現(傳統方式)——需要寫接口及其實現類 傳統方式就是在項目下邊建立dao包&#xff0c;里面包含接口及其實現類&#xff0c;文件結構如下&#x…

交直流一體化電源系統測試步驟詳解

交直流一體化電源擁有高度適應性&#xff0c;可以用于不同的電力需求領域。但是為了確保其質量和性能&#xff0c;需要對交直流一體化電源進行各項測試以保證正常工作。本文納米軟件將介紹交直流一體化電源的測試方法&#xff0c;以及如何用交直流一體化電源測試系統進行測試。…

Java,數據結構與集合源碼,關于Map接口的實現類(HashMap、LinkedHashMap)

HashMap中的元素的特點&#xff1a; HashMap中的所有key之間是不可重復的、無序的。所有的key構成一個Set集合。 HashMap中的所有的value彼此之間是可重復的、無序的。所有的value構成一個Collection集合。 HashMap中的一對key-value&#xff0c;就構成了一個entry。Map中的ent…

python常用第三方模塊---openyxl

openyxl模塊&#xff1a;用于處理excel文件的木塊&#xff0c;可以Excel中的數據進行寫入和讀取 函數或類的名稱功能描述load_workbook(filename)打開已經存在的工作簿&#xff0c;結果為工作簿對象 workbook.sheetnames 工作簿對象的sheetnames屬性&#xff0c;用戶獲取所有工…

深入理解 pytest Fixture 方法及其應用!

當涉及到編寫自動化測試時&#xff0c;測試框架和工具的選擇對于測試用例的設計和執行非常重要。在Python 中&#xff0c;pytest是一種廣泛使用的測試框架&#xff0c;它提供了豐富的功能和靈活的擴展性。其中一個很有用的功 能是fixture方法&#xff0c;它允許我們初始化測試環…

《實現領域驅動設計》筆記——上下文映射圖

一個項目的上下文映射圖可以用方式來表示。比較容易的一種是畫一個簡單的框圖表示兩個或多個限界上下文之間的映射關系。該框圖表示了不同的限界上下文在解決方案空間中是如何通過集成相互關聯的。另一種更詳細的方式是通過限界上下文集成的源代碼實現來表示。 上下文映射圖為什…

微軟WHQL認證

windows驅動開發要擺脫在測試模式下的開發&#xff0c;需要通過WHQL認證。 1&#xff1a;申請EV代碼簽名證書。EV代碼簽名證書在后續注冊Windows硬件開發中心帳戶&#xff0c;以及提交WHQL認證前為驅動程序進行數字簽名等流程中都需要用到&#xff0c;所以申請EV代碼簽名證書是…

唯一索引和普通索引的使用上要注意什么

考慮下面一種情況&#xff1a; select name from CUser where id_card xxxxxxxyyyyyyzzzzz;你可能會將id_card作為主鍵了&#xff0c;但最好別這么做。你想想這么長一串的字符串做主鍵&#xff0c;查詢時候效率其實是比較低的&#xff0c;其實是建議選擇其他的作為主鍵。 那么…

BUUCTF [SWPU2019]我有一只馬里奧 1

BUUCTF:https://buuoj.cn/challenges 題目描述&#xff1a; 得到的 flag 請包上 flag{} 提交。 密文&#xff1a; 下載附件&#xff0c;得到一個.exe文件。 解題思路&#xff1a; 1、雙擊.exe文件運行&#xff0c;得到一個1.txt文本。打開&#xff0c;如下圖。 2、提示我們…

Mysql中正則表達式Regexp常見用法

Mysql中正則表達式Regexp常見用法_regexp不包含-CSDN博客

List轉string 逗號分隔

List轉string 逗號分隔 1、將list轉化為逗號分割的字符串 String str String.join(",", list); String str StringUtils.json(list.toArray(), ",");   2、將逗號分隔的字符串轉換為List List<String> list Arrays.asList(str.split("…

當老師應該選文科還是理科

教育不斷發展和改革&#xff0c;教師職業的選擇也越來越受到關注。許多人在選擇專業時都會考慮成為一名教師&#xff0c;但對于選擇文科還是理科卻感到困惑。本文將探討當老師應該選文科還是理科。 文科注重的是人文素養和社會科學方面的知識&#xff0c;而理科則注重自然科學和…

如何做一個簡單的深度集成學習框架

使用同一個框架&#xff0c;獨立在一個數據集上面&#xff0c;分別訓練多次&#xff0c;每個單獨模型訓練超參數可以一樣&#xff0c;也可以不一樣&#xff0c;最后若干個訓練好的獨立模型在測試集數據上面做最后集中決策。 實例代碼如下&#xff1a; class MyEnsemble(nn.Modu…

問題 上位機程序重啟

/ 1、上位機程序重啟&#xff0c; 讀線程被殺死&#xff0c;mcu usb busy&#xff0c;數據在fifo發不出去 下次線程重啟后&#xff0c;在fifo里的數據首先被發送出去。 //

紅旗Asianux Server Linux V8 安裝萬里數據庫(GreatSQL)

紅旗Asianux Server Linux V8 安裝萬里數據庫&#xff08;GreatSQL&#xff09; 紅旗Asianux介紹&#xff1a; 紅旗Asianux Server Linux 8.0是為云時代重新設計的操作系統&#xff0c;為云時代的到來引入了大量新功能&#xff0c;包括用于配置管理、快速遷移框架、編程語言和…