【rabbitmq基礎】

RabbitMq基礎

  • 1.概念
  • 2.數據隔離
  • 3.使用控制臺向mq傳遞消息
    • 1.創建兩個隊列-“測試隊列”,“測試隊列2”
    • 2.創建一個交換機-"測試交換機"
    • 3.測試發送消息
      • 3.1讓交換機和隊列進行綁定
      • 3.2發送消息
      • 3.3查看消息
  • 4.創建虛擬主機
  • 5.java使用rabbitmq
    • 5.1 發送消息
    • 5.2 消費消息
  • 6.任務模型work queues
  • 7.交換機
    • 7.1.為什么使用交換機
    • 7.2.交換機模型
      • 7.2.1交換機模型Fanout(廣播)
        • 7.2.1.1改造java代碼
      • 7.2.2交換機模型Direct(訂閱)
        • 7.2.2.1
      • 7.2.3交換機模型Topic()
    • 7.3.隊列和交換機的申明
  • 8.消息轉換器

1.概念

  • 消息發送者(publisher):生產消息
  • 交換機(exchange):負責路由消息,把消息路由給隊列,可以路由給一個隊列,也可以路由給多個隊列,這取決于交換機的類型
  • 隊列(queue):隊列,存儲消息
  • 消息消費者(coumsmser):消費消息
  • 虛擬主機(virtual-host):虛擬主機,數據隔離作用
    在這里插入圖片描述

2.數據隔離

在實際工作中,公司一般是在一個指定的服務器上去搭建mq,或者多個機器上去搭建集群模式,那一個公司肯定不止一個項目組,多個項目組的情況下,不可能每個項目都搞一套自己的mq,費時費力不說,維護還麻煩,所以mq就有數據隔離,多個項目組用一個環境的mq,數據不一樣而已
在這里插入圖片描述

3.使用控制臺向mq傳遞消息

1.創建兩個隊列-“測試隊列”,“測試隊列2”

在這里插入圖片描述

2.創建一個交換機-“測試交換機”

在這里插入圖片描述

3.測試發送消息

3.1讓交換機和隊列進行綁定

在這里插入圖片描述
綁定成功之后在指定的"測試隊列"中也可以看到他和交換機的綁定關系
在這里插入圖片描述

3.2發送消息

在這里插入圖片描述

在這里插入圖片描述

3.3查看消息

在這里插入圖片描述
當然你也可以使用這個交換機同時綁定創建的兩個隊列

4.創建虛擬主機

在這里插入圖片描述
在這里插入圖片描述

5.java使用rabbitmq

5.1 發送消息

接著之前的在common里面引入依賴(沒看之前的文章的直接就創建一個單體的springboot項目引入這個依賴就行)
在這里插入圖片描述

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在用戶工程作為消息投遞方,訂單工程作為消費者,不通過交換機投遞消息,并且消費
在這里插入圖片描述

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

userController

 @AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/sendMassage")@ApiOperation(value = "不通過交換機發送消息")public void sendMassage( String queueName ,String msg ){rabbitTemplate.convertAndSend(queueName,msg);}

接口測試
在這里插入圖片描述
查看消息
在這里插入圖片描述
在這里插入圖片描述

5.2 消費消息

order工程加配置

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

創建orderListen

@Component
public class orderListen {@RabbitListener(queues = "測試隊列2")public void listenOrder(String msg){System.out.println("我已經接收到訂單消息:"+msg);}
}

在這里插入圖片描述

6.任務模型work queues

簡單的說就是多個消費者綁定一個隊列

  1. 創建一個隊列work.queue
  2. 生產者(用戶服務)向隊列(work.queue)中發送消息,每秒鐘100條記錄
  3. 創建兩個消費者(訂單服務)監聽隊列,一個消費者一秒鐘消費20條,一個消費者一秒鐘消費30條記錄
    生產者代碼
 @GetMapping("/sendWorkQueueMassage")@ApiOperation(value = "發送到任務模型")public void sendWorkQueueMassage() throws InterruptedException {String queueName="work.queue";for (int i = 1; i <=100 ; i++) {String msg="msg_"+i;rabbitTemplate.convertAndSend(queueName,msg);//休眠20毫秒Thread.sleep(20);}}

消費者代碼

 @RabbitListener(queues = "work.queue")public void listenWorkQueueOrder(String msg) throws InterruptedException {System.out.println("消費者1已經接收到訂單消息:"+msg);// Thread.sleep(30);}@RabbitListener(queues = "work.queue")public void listenWorkQueueOrder2(String msg) throws InterruptedException {System.err.println("消費者2已經接收到訂單消息:"+msg);//  Thread.sleep(40);}

結果:
1.隊列在被多個消費者綁定的時候,隊列會把消息輪詢分配給每一個消費者
2.消息被消費方消費之后就消失了

在這里插入圖片描述
產生的問題:
問題1.資源浪費:現實生活中,每個服務器的負載能力都是不一樣的,假如B服務器一秒鐘只能處理2個請求,A服務器一秒鐘能處理20個,那在輪詢消費的時候,假設時間過去0.3秒,B服務器還沒消費完一個消息,按照A服務器的性能,他0.3秒都可以處理好幾個了,他應該在0.05秒的時候就處理完畢一個了,但是由于輪詢他只能處理一個,這個時候A就要等著B消費完,這樣就很浪費A的服務器資源。
2.消息積壓,以上代碼,生產方發送消息到隊列,休眠時間為20毫秒,消費者1消費一個消息要30毫秒,B需要40毫秒,時間長了。生產者發的消息消費者就消費不過來

問題1處理方案:
增加配置

spring:rabbitmq:listener:direct:prefetch: 1  #保證同一時刻最多投遞一條消息給消費者

結果,因為消費者1的消費能力比消費者2要快,所有可以看到他沒有等著
在這里插入圖片描述
問題2處理方案:
很明顯能看到,兩個消費者的消費能力跟不上生產者的生產速度,所有只能再增加多個消費者,直到消費者的消費能力快過生產者的生產能力

7.交換機

7.1.為什么使用交換機

我們上面的代碼,是生產者直接連接隊列,然后消費者消費,實際業務中,你在網購平臺買東西,購買成功你的訂單微服務得知道,積分微服務得知道,購物車微服務得知道,如果按照不用交換機去做,那消息一旦被訂單服務消費了,這條消息在隊列認為就消費完畢了,直接就會刪除,造成的結果就是積分微服務就不知道了。那咋搞,所以就可以用到交換機

7.2.交換機模型

7.2.1交換機模型Fanout(廣播)

把消息放到交換機,然后交換機廣播給多個隊列(積分隊列,購物車隊列,訂單隊列),然后相應得微服務去跟相應得隊列綁定,這種方式叫做廣播
在這里插入圖片描述

7.2.1.1改造java代碼

在這里插入圖片描述

  1. 使用之前創建的交換機,測試交換機,并且綁定"測試隊列","測試隊列2"兩個隊列
    在這里插入圖片描述
  2. 編寫兩個消費者方法,分別監聽兩個隊列
    創建積分微服務
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:direct:prefetch: 1  #保證同一時刻最多投遞一條消息給消費者
@Component
public class PointsFanoutListen {@RabbitListener(queues = "測試隊列2")public void listenPoints(String msg){System.out.println("積分服務已經接收到消息:"+msg);}
}

訂單微服務微服務中監聽另外一個隊列

@Component
public class OrderFanoutListen {@RabbitListener(queues = "測試隊列")public void listenOrder(String msg){System.out.println("訂單服務已經接收到消息:"+msg);}}
  1. 編寫生產者方法,向交換機發送消息
 @GetMapping("/sendFanoutMassage")@ApiOperation(value = "發送消息到廣播交換機")public void sendFanoutMassage() throws InterruptedException {String exchangeName="測試交換機";String msg="用戶成功下單了";rabbitTemplate.convertAndSend(exchangeName,null,msg);}

測試
本地調用接口: loclahost:8001/user/sendFanoutMassage
在這里插入圖片描述
啟動兩個消費者

在這里插入圖片描述
在這里插入圖片描述

7.2.2交換機模型Direct(訂閱)

實際業務中,我可能不需要把消息發送給每個隊列,比如。我訂單交易失敗,我的積分微服務就不需要接收到這種,積分微服務只有在交易成功才做積分減少或者增加的操作,那就是我只訂閱交易成功的訂單消息

7.2.2.1
  1. 創建交換機
    在這里插入圖片描述

  2. 創建隊列
    在這里插入圖片描述

  3. 交換機跟隊列綁定
    在這里插入圖片描述

  4. 創建消費者
    消費者1:訂單服務監聽隊列1

@Component
public class PointsDirectListen {@RabbitListener(queues = "driect.queue2")public void listenPoints(String msg){System.out.println("積分服務已經接收到用戶成功下單消息:"+msg);}
}

消費者2:積分服務監聽隊列2

@Component
public class PointsDirectListen {@RabbitListener(queues = "driect.queue2")public void listenPoints(String msg){System.out.println("積分服務已經接收到用戶成功下單消息:"+msg);}
}

創建生產者用戶服務

@GetMapping("/sendDirectMassage")@ApiOperation(value = "發送消息到訂閱交換機")public void sendDirectMassage() throws InterruptedException {String exchangeName="work.dirice";String msg="用戶成功下單了";rabbitTemplate.convertAndSend(exchangeName,"red",msg);}@GetMapping("/sendDirectMassageFaild")@ApiOperation(value = "發送消息到訂閱交換機")public void sendDirectMassageFaild() throws InterruptedException {String exchangeName="work.dirice";String msg="用戶下單失敗了";rabbitTemplate.convertAndSend(exchangeName,"blue",msg);}@GetMapping("/sendDirectMassageWait")@ApiOperation(value = "發送消息到訂閱交換機")public void sendDirectMassageWait() throws InterruptedException {String exchangeName="work.dirice";String msg="用戶下單但是還未付款";rabbitTemplate.convertAndSend(exchangeName,"yellow",msg);}

分別調用三個接口.結果如下
sendDirectMassage接口兩個消費者都能接收到
sendDirectMassageFaild只有消費者1能接收到
sendDirectMassageWait只有消費者2能接收到

7.2.3交換機模型Topic()

在這里插入圖片描述
編寫案例
在這里插入圖片描述
創建綁定關系
在這里插入圖片描述
在這里插入圖片描述

7.3.隊列和交換機的申明

在之前我們都是手動在控制臺去創建隊列或者交換機,但是在真實企業中,不可能手動在控制臺去創建,而且這樣創建的,一旦中間件出問題了,所有的隊列和交換機就沒了,一般是用代碼處理。


/*** 注解的方式創建隊列* 一般在消費方創建* 1.創建一個名字叫annotate.work的且類型為TOPIC的交換機* 2.交換機綁定的隊列為annotate.queue,該隊列持久化* 3.交換機綁定的key為"red","yellow"* @param msg* @throws InterruptedException*/@Componentpublic class orderListen {@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="annotate.queue",declare = "true"), //隊列名稱叫annotate.queue,且需要持久化exchange = @Exchange(name = "annotate.work",type = ExchangeTypes.TOPIC),//交換機名稱和類型key={"red","yellow"} //路由key))public void listenWorkAnnotateQueueOrder2(String msg) throws InterruptedException {System.err.println("注解方式生成的隊列收到消息:"+msg);Thread.sleep(50);}}

項目啟動之后就能直接創建相應的隊列和交換機

8.消息轉換器

1.創建一個隊列,名字叫object.queue
在這里插入圖片描述

2.創建生產者往這個隊列發送一個消息,消息的類型為map或者java對象


import com.threesum.OderApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.HashMap;@SpringBootTest(classes = OderApplication.class)
public class orderTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendObjectMsg(){HashMap<String, Object> msg = new HashMap<>();msg.put("name","aa");msg.put("age",21);rabbitTemplate.convertAndSend("object.queue",msg);}
}

3.觀察隊列中的消息
在這里插入圖片描述
結論:會發現變成了一堆亂碼(因為默認采用的是java的jdk序列化)

4.采用java序列化方式處理問題
4.1引入依賴

 <dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId></dependency>

4.2發送方和消費方都使用java序列化

package com.threesum.config.rabbit;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class JacksonDada {@Beanpublic MessageConverter JacksonJsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}

4.3再次獲取,就轉換正常了
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/75626.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/75626.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/75626.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

加固計算機廠家 | 工業加固筆記本電腦廠家

北京魯成偉業科技發展有限公司&#xff08;以下簡稱“魯成偉業”&#xff09;成立于2005年&#xff0c;是集研發、生產、銷售與服務于一體的高新技術企業&#xff0c;專注于加固計算機、工業加固筆記本電腦及特種計算機的研發與制造。憑借20年的技術積累與行業深耕&#xff0c;…

鏈路聚合配置命令

技術信息 加入捆綁組&#xff0c;加大鏈路間帶寬等 配置命令 華三 靜態聚合 將接口加入聚合口后再進行配置 //創建靜態鏈路聚合口1&#xff0c;不啟用lacp[SWB]interface Bridge-Aggregation 1 [SWB-Bridge-Aggregation1]port link-type trunk [SWB-Bridge-Aggregation…

ekf-imu --- 四元數乘法符號 ? 的含義

? 表示四元數的乘法運算&#xff1a; 用于組合兩個四元數代表的旋轉。四元數乘法是非交換的&#xff08;即順序不同結果不同&#xff09;&#xff0c;其定義如下&#xff1a; 若兩個四元數分別為&#xff1a; qq0q1iq2jq3k, pp0p1ip2jp3k, 則它們的乘積為&#xff1a;4*1 …

論文閱讀Diffusion Autoencoders: Toward a Meaningful and Decodable Representation

原文框架圖&#xff1a; 官方代碼&#xff1a; https://github.com/phizaz/diffae/blob/master/interpolate.ipynb 主要想記錄一下模型的推理過程 &#xff1a; %load_ext autoreload %autoreload 2 from templates import * device cuda:1 conf ffhq256_autoenc() # pri…

OpenVLA-OFT——微調VLA的三大關鍵設計:并行解碼、動作分塊、連續動作表示以及L1回歸目標

前言 25年3.26日&#xff0c;這是一個值得紀念的日子&#xff0c;這一天&#xff0c;我司「七月在線」的定位正式升級為了&#xff1a;具身智能的場景落地與定制開發商 &#xff0c;后續則從定制開發 逐步過渡到 標準產品化 比如25年q2起&#xff0c;在定制開發之外&#xff0…

【論文閱讀】Dynamic Adversarial Patch for Evading Object Detection Models

一、介紹 這篇文章主要是針對目標檢測框架的攻擊&#xff0c;不同于現有的攻擊方法&#xff0c;該論文主要的側重點是考慮視角的變化問題&#xff0c;通過在車上布置多個顯示器&#xff0c;利用視角動態選擇哪一個顯示器播放攻擊內容&#xff0c;通過這種方法達到隱蔽與攻擊的…

多模態技術概述(一)

1.1 多模態技術簡介 1.1.1 什么是多模態 多模態(Multimodal)涉及多種不同類型數據或信號的處理和融合&#xff0c;每種數據類型或信號被稱為一種模態。常見的模態包括文本、圖像、音頻、視頻等。多模態技術旨在同時利用這些不同模態的數據&#xff0c;以實現更全面、更準確的理…

nginx2

Nginx反向代理(七層代理)、Nginx的TCP/UDP調度器(四層代理)、 一、Nginx反向代理(七層代理) 步驟&#xff1a; ? 部署后端web服務器集群 ? 配置Nginx代理服務器 ? 配置upstream集群池 ? 調節集群池權重比 <img src"/home/student/Deskt…

調用kimi api

官網支持python&#xff0c;curl和node.js 因為服務器剛好有php環境&#xff0c;所以先用curl調個普通的語音溝通api <?php // 定義 API Key 和請求地址 define(MOONSHOT_API_KEY, sk-PXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXgk1); define(MOONSHOT_API_URL, https://…

關于 UPDATE 語句 和 SELECT ... FOR UPDATE 的對比分析,包括語法、功能、鎖機制、使用場景及示例代碼

以下是關于 UPDATE 語句 和 SELECT ... FOR UPDATE 的對比分析&#xff0c;包括語法、功能、鎖機制、使用場景及示例代碼&#xff1a; 1. UPDATE 語句 功能 直接修改數據&#xff1a;立即更新表中的數據&#xff0c;并提交修改。無顯式鎖&#xff1a;雖然會自動加鎖&#xff…

在航電系統中提高可靠性的嵌入式軟件設計

1.總線余度設計 數據傳輸采用雙余度總線設計&#xff0c;CANFD為主&#xff0c;RS485為備。發送方將相同的數據分別通過雙總線來發送&#xff0c;接收方優先處理主線數據。由于總線上數據頻率固定&#xff0c;可設置定時器監控主總線的數據&#xff0c;當定時器超時后&#xff…

第十五屆藍橋杯大賽軟件賽省賽Python 大學 C 組:5.回文數組

題目1 回文數組 小藍在無聊時隨機生成了一個長度為 n 的整數數組&#xff0c;數組中的第 i 個數為 ai&#xff0c;他覺得隨機生成的數組不太美觀&#xff0c;想把它變成回文數組&#xff0c;也是就對于任意 i∈[1,n] 滿足 a i a n ? i 1 a_ia_{n?i}1 ai?an?i?1。 小藍…

netty中的WorkerGroup使用詳解

Netty中WorkerGroup的深度解析 WorkerGroup是Netty線程模型中的從Reactor線程組&#xff0c;負責處理已建立連接的I/O讀寫、編解碼及業務邏輯執行。其設計基于主從多Reactor模型&#xff0c;與BossGroup分工協作&#xff0c;共同實現高并發網絡通信的高效處理。 一、WorkerGro…

模運算核心性質與算法應用:從數學原理到編程實踐

目錄 &#x1f680;前言&#x1f31f;數學性質&#xff1a;模運算的理論基石&#x1f4af;基本定義&#xff1a;余數的本質&#x1f4af;四則運算規則&#xff1a;保持同余性的關鍵 &#x1f99c;編程實踐&#xff1a;模運算的工程化技巧&#x1f4af;避免數值溢出&#xff1a;…

#Git 變基(Rebase)案例

適合學習理解的 Git 變基&#xff08;Rebase&#xff09;案例 為了幫助你更好地理解 Git 變基&#xff08;Rebase&#xff09;的操作和效果&#xff0c;下面通過一個簡單的案例來演示變基的過程和影響。 案例背景 假設我們有一個 Git 倉庫&#xff0c;包含兩個分支&#xff1…

泰博云平臺solr接口存在SSRF漏洞

免責聲明&#xff1a;本號提供的網絡安全信息僅供參考&#xff0c;不構成專業建議。作者不對任何由于使用本文信息而導致的直接或間接損害承擔責任。如涉及侵權&#xff0c;請及時與我聯系&#xff0c;我將盡快處理并刪除相關內容。 漏洞描述 SSRF漏洞是一種在未能獲取服務器…

MyBatis 動態SQL 詳解!

目錄 一、 什么是動態 SQL&#xff1f;二、 為什么需要動態 SQL&#xff1f;三、 MyBatis 動態 SQL 標簽四、 標簽詳解及示例1、 if 標簽2、 choose、when、otherwise 標簽3、 where 標簽4、 set 標簽5、 foreach 標簽6、 sql、include 標簽 五、 總結 &#x1f31f;我的其他文…

阿里云服務器遭遇DDoS攻擊有爭議?

近年來&#xff0c;阿里云服務器頻繁遭遇DDoS攻擊的事件引發廣泛爭議。一方面&#xff0c;用戶質疑其防御能力不足&#xff0c;導致服務中斷甚至被迫進入“黑洞”&#xff08;清洗攻擊流量的隔離機制&#xff09;&#xff0c;輕則中斷半小時&#xff0c;重則長達24小時&#xf…

如何在Springboot的Mapper中輕松添加新的SQL語句呀?

在如今的軟件開發界&#xff0c;Spring Boot可是非常受歡迎的框架哦&#xff0c;尤其是在微服務和RESTful API的構建上&#xff0c;真的是讓人愛不釋手&#xff01;今天&#xff0c;我們就來聊聊如何為Spring Boot項目中的Mapper添加新的SQL語句吧&#xff01;說起來&#xff0…

Qt 中 findChild和findChildren綁定自定義控件

在 Qt 中&#xff0c;findChild 和 findChildren 是兩個非常實用的方法&#xff0c;用于在對象樹中查找特定類型的子對象。這兩個方法是 QObject 類的成員函數&#xff0c;因此所有繼承自 QObject 的類都可以使用它們。當您需要查找并綁定自定義控件時&#xff0c;可以按照以下…