RabbitMQ-消息中間件學習記錄(what-how-why)

  1. 什么是消息中間件
    簡單的來說就是消息隊列中間件,生產者發送消息到中間件,消息中間件用于
    保存消息并發送消息到消費者。

  2. 消息中間件RabbitMQ的基本組件
    1)producer -生產者
    2)customer -消費者
    3)broker (經紀人)- MQ服務器,管理消息對列、消息及相關消息。(接收并存儲生產者發送的消息,發送消息到消費者)
    4)exchange-交換機,將生產者的消息按照一定規則發送給對應的消息對列queue
    5)queue-消息對列,隊列,消息存放的容器,消息先進先出
    6)Message-消息,程序間的通信的數據

  3. 什么是消息隊列queue(生產者生產msg-queue,消費者監聽queue-消費)
    消息對列是一種分布式中的通信方式,它通過異步傳輸消息的方式,來解耦消 息的 生產者和消費者。在消息中間件中,生產者將消息發送到消息對列中,以為先進先出的方式,消費者從對列中取出消息(可以監聽對列是否有消息-@RabblitListener和@RabbitHandler)

  4. 消息中間件的作用
    主要有三個作用:分別是服務解耦、實現異步通信、流量削峰
    1). 服務解耦:(場景-用戶下訂單、庫存服務工作)
    例如訂單服務-用戶下訂單,庫存服務處理對應減庫存,才返回給用戶下單成功的消息。如果說庫存服務出現了問題,就會造成訂單丟失等問題。如果使用消息中間件(消息對列),可以把下的訂單信息—> mq就返回用戶下單這個,mq再發送給庫存服務,這樣生產者發送消息和消費者接收處理消息相互不影響,即使宕機了,消息還在中間件中。

2). 異步通信/異步調用:(用戶注冊新用戶,服務發送短信和郵件)
傳統的模式,用戶注冊系統新用戶,服務給用戶發送短信和郵件,三個操作都完成之后才返回用戶下單注冊的消息。因為短信和郵箱和注冊信息是沒有關系的服務,用戶注冊后消息發送給mq,用戶不需要等郵件和短信發送成功,mq直接返回用戶注冊成功,至此用戶注冊業務完成。至于短信和郵件交給mq發送給短信業務-去發送。

注意:
異步就是某線程發出請求,不需要等其他線程完成就接著完成操作。用戶注冊,消息發送給mq,不需要等短信服務完成,短信發布發送都與注冊無關,兩者是異步關系。異步不是并發,所有操作同時進行,異步是各過各的。

3). 流量削峰:(商品秒殺)
例如商品秒殺的時候,這時候數據庫并不能承受這么大的請求。可以把請求下訂單的信息暫存在mq中,返回給用戶下單成功,之后的操作由mq發送給對應的服務處理。緩存數據減少數據庫的壓力。

  1. 為什么需要使用消息中間件
    服務解耦、異步通信、流量削峰

  2. 消息中間件在分布式系統中使用場景(異步)
    6.1 服務解耦-訂單和庫存服務。用戶下訂單,消息發給mq,mq返回用戶下訂單成功,消費者-庫存服務接收mq消息再去調用減少庫存的消息。
    6.2 異步通信-用戶注冊新賬戶 用戶注冊和admin發送短信和郵件異步
    6.3 流量削峰-商品秒殺,先mq先存儲訂單信息,返回訂單服務下單成功,后慢慢處理。減少大并發對數據庫的影響/。

  3. RabbitMQ的五種消息模型/工作模式、
    1) simple 簡單的一對一模式,producce-queue-customer
    2) word模式,一個消息對列queue—> 多個消費者,消費者爭搶消息隊列里面消息,注意一個消息只能被一個消費者消費。
    3) fanout-廣播、訂閱者模式。交換機將消息發送給所有binding的對列,消費端可以有多個customer使用word模式消費對列的消息。
    4) topic-主體模式,生產者的消息按照不同的路由規則,模糊匹配給不同滿足條件的消息對列,消費者再去消費對列中消息
    5)routeKey,路由鍵(exchange-type-direct),按照不同的路由鍵發送到對應的queue中。

  4. 消息中間件是異步還是同步
    異步,各干各的,互不影響。(異步并不是并發-同時請求一個請求,而是互不影響個干各的,沒有約束和先后順序)。received生產者的message,send消息到消費者。二者是異步,解耦合互不影響。

  5. mq的消息確認機制confirm(MQ如何避免消息丟失?)

    1. . 對于生產者端來說,主要有兩種確認機制
      a. message到broker后,mq立馬確認confirm并返回消息告知生產者消息發送成功,如果失敗也告知生產者,并重新發送。
      b. message到MQ之后,如果消息對列沒有received成功(queue存儲msg成功),會確認并返回消息接收失敗到生產者
      a b 保證了生產者端不會丟失消息。

    2). 對于消費者來說。
    a. 消費者接收到queue的消息后,默認自動確認,queue刪除該message。
    b. 消費者接收到msg后,對數據進行邏輯處理,如果直接confirm-queue直接刪除msg,處理數據過程中可能會宕機消息丟失。
    ----設置為手動confirm確認收貨,數據處理完再收貨成功,queue再去刪除msg。也可以對數據不滿,退回到queue重新入隊,也可以直接刪除數據。
    c. 接收失敗告知queue,不會刪除數據,MQ重新發送消息-這種操作很常見
    這樣避免數據在消費者端丟失

1、2兩種方式避免了mq的消息丟失。

  1. MQ重復消費
    1)如何造成重復消費
    (1) 生產者端,傳輸到MQ-queue消息對列接收成功,MQ因為網絡問題沒有ack->producer,導致生產者又發送了一次消息到MQ。queue-customer-這樣msg就被消費了兩次。
    (2)消費者端,MQ-queue消息對列消息傳到customer。一種是消費者沒有接收成功,因為網絡問題沒有ack queue,queue重復發送,這種不會造成msg重復消費。另一種是消費者消費成功,但是因為不可控因素沒有ack queue,消息對列重復發送mgs-to-customer-重復消費。

    2)解決方法
    對于冪等性消息(查詢),消費者重復消費也沒有關系。
    對于非冪等性消息,消費者重復消費就會有影響了。
    方法:消費者在消費消息之前,獲取msg唯一id,到redis進行存儲判斷setnx(判斷是已經存在并存儲key-value)。

Boolen flag=stringRadisTemplate.opsForValue.setAbsent(id,value);
1-flag=true,key不存在,未被消費,c正常消費msg
2-false,key存在,已經被消費(兩種可能-正在消費或者完成消費-忘記告知ack-queue了),無論哪種情況都直接丟棄。

注意一個問題:如果redis顯示有消費記錄-且消費者正在消費,此時消費者執行業務宕機了,redis分布式鎖會成死鎖-解決方法在IfAbsent方法加上過期時間和單位。

一句話就是:消費之前,緩存中有消費記錄則丟棄消息,不二次消費。
redis緩存中沒有消費記錄則,重復存入緩存并消費(設計鎖過期時間)。

以下是消息中間件MQ的相關代碼和配置信息

  1. 使用MQ的步驟
    1)在pom文件中加上依賴amqp
 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency
2) 配置文件配置rabbit服務器的對應信息(spring.rabbitmq host、port,username,ps等)
spring.rabbitmq.host=rabbitmq服務器地址信息
spring.rabbitmq.port=端口號
spring.rabbitmq.username=賬戶name
spring.rabbitmq.password=密碼
spring.rabbitmq.virtual-host=/
#1. 生產者發送message, mq收到消息就確認回復到生產者
spring.rabbitmq.publisher-confirms=tr
#2. queue消息對列接收生產者的消息失敗,就確認返回消息到生產操者
spring.rabbitmq.publisher-returns=true
#3. 消費者接收queue消息對列的消息之后,手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3) 服務啟動類上面加上注解@EnableRabbit-開啟MQ
在springboot啟動類加上 @EnableRabbit-開啟MQ

4) 業務使用消息中間件存儲消息的時候
(1) 創建交換機(注意有不同類型的交換機 direct-fanout-topic)

        public void createExchange() {
//        1. 創建direct類型的exchange     交換機的名字-hello.java.exchangeDirectExchange directExchange = new DirectExchange("hello.java.exchange", true, false);
//       2. 聲明交換機amqpAdmin.declareExchange(directExchange);log.info("exchange創建成功1111", "hello.java.exchange");}

(2)創建消息隊列queue

public void createQueue() {
//       1. 創建隊列-queue  隊列名稱-hello-java-queueQueue queue = new Queue("hello.java.queue", true, false, false);
//       2. 聲明mq隊列amqpAdmin.declareQueue(queue);log.info("queue創建成功1111", "hello.java.queue");}

(3)交換機和消息隊列直接關系綁定

    public void bindEQ() {
//        1. 創建綁定對象( "hello.java.queue"--消息對列, "hello.java.exchange"--交換機,"hello.java"-綁定關系的route-key)Binding binding = new Binding("hello.java.queue",Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);//       2. 聲明綁定關系(這個關系實際也是一個對象)amqpAdmin.declareBinding(binding);log.info("Binding創建成功1111", "hello.java.binding");}

(4)使用MQ的操作工具類 RabbitTemplate-操作發送消息
對象注入

  @AutowiredRabbitTemplate rabbitTemplate;

生產者發送消息,需要攜帶消息-mgs和發送給哪個queue的route-key。注意發送消息需要一個唯一id,后面防止重復發送需要此id判斷

    public void sendMessageStr() throws InterruptedException {String msg = "測試數據測試數";
//        發送10條message到exchange中
//        new CorrelationData(UUID.randomUUID().toString()  發送的消息的唯一id mq可以接收并處理rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java" , msg+ "11111111111111", new CorrelationData(UUID.randomUUID().toString()));rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", msg + "222222222222", new CorrelationData(UUID.randomUUID().toString()));log.info("交換機消息發送成功----------->");}
   (4)消費者監聽消息對列消息,消費消息使用@RabbitListener監聽消息對列,使用RabbitHandler接收對應類型的消息。前者放在類上面,后者放到監聽方法上面。
queues是消息對列名稱的集合
@RabbitListener(queues = {"hello.java.queue"})

使用@RabbitHandler監聽不同類型的消息

// 消息是TestEntity2 類型,會自動匹配到對應方法接收
@RabbitHandler
public void receiveOfSecond(TestEntity2 testEntity2) throws InterruptedException {System.out.println("receiveOfSecond-監聽接受queue的數據是----->" + testEntity2);
}@RabbitHandler
public void receiveOfFirst(TestEntity testEntity) throws InterruptedException {System.out.println("receiveOfFirst-監聽接受queue的數據是----->" + testEntity);
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/43228.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/43228.shtml
英文地址,請注明出處:http://en.pswp.cn/news/43228.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Java 動態數據統計圖】動態數據統計思路案例(動態,排序,數組)四(116)

需求&#xff1a;&#xff1a;前端根據后端的返回數據&#xff1a;畫統計圖&#xff1b; 1.動態獲取地域數據以及數據中的平均值&#xff0c;按照平均值降序排序&#xff1b; 說明&#xff1a; X軸是動態的&#xff0c;有對應區域數據則展示&#xff1b; X軸 區域數據降序排序…

LabVIEW調用DLL傳遞結構體參數

LabVIEW 中調用動態庫接口時&#xff0c;如果是值傳遞的結構體&#xff0c;可以根據字段拆解為多個參數&#xff1b;如果參數為結構體指針&#xff0c;可用簇&#xff08;Cluster&#xff09;來匹配&#xff0c;其內存連續相當于單字節對齊。 1.值傳遞 接口定義&#xff1a; …

【FAQ】調用視頻匯聚平臺EasyCVR的iframe地址,視頻無法播放的原因排查

有用戶反饋&#xff0c;在調用iframe地址后嵌入用戶自己的前端頁面&#xff0c;視頻無法播放并且要求登錄。 安防監控視頻匯聚平臺EasyCVR基于云邊端一體化架構&#xff0c;具有強大的數據接入、處理及分發能力&#xff0c;可提供視頻監控直播、云端錄像、視頻云存儲、視頻集中…

視頻集中存儲EasyCVR視頻匯聚平臺定制項目增加AI智能算法

安防視頻集中存儲EasyCVR視頻匯聚平臺&#xff0c;可支持海量視頻的輕量化接入與匯聚管理。平臺能提供視頻存儲磁盤陣列、視頻監控直播、視頻輪播、視頻錄像、云存儲、回放與檢索、智能告警、服務器集群、語音對講、云臺控制、電子地圖、平臺級聯、H.265自動轉碼等功能。為了便…

【Unity每日一記】Physics.Raycast 相關_Unity中的“X光射線”

&#x1f468;?&#x1f4bb;個人主頁&#xff1a;元宇宙-秩沅 &#x1f468;?&#x1f4bb; hallo 歡迎 點贊&#x1f44d; 收藏? 留言&#x1f4dd; 加關注?! &#x1f468;?&#x1f4bb; 本文由 秩沅 原創 &#x1f468;?&#x1f4bb; 收錄于專欄&#xff1a;uni…

05_bitmaphyperloglogGEO

Bitmap&hyperloglog&GEO 面試問 記錄對集合中的數據進行統計在移動應用中&#xff0c;需要統計每天的新增用戶數和第2天的留存用戶數&#xff1b;在電商網站的商品評論中&#xff0c;需要統計評論列表中的最新評論&#xff1a;在簽到打卡中&#xff0c;需要統計一個月內…

Python “貪吃蛇”游戲,在不斷改進中學習pygame編程

目錄 前言 改進過程一 增加提示信息 原版幫助摘要 pygame.draw pygame.font class Rect class Surface 改進過程二 增加顯示得分 改進過程三 增加背景景樂 增加提示音效 音樂切換 靜音切換 mixer.music.play 注意事項 原版幫助摘要 pygame.mixer pygame.mix…

kvm和vmware有什么區別?如何選擇?

一、kvm和vmware的區別 VMware vSphere 平臺 VMware 可以提供 ESXi 虛擬機監控程序和 vSphere 虛擬化平臺。VMware ESXi 是一個能夠直接安裝到物理服務器上的裸機虛擬機監控程序&#xff0c;可以幫你整合硬件。你可以用 VMware 的虛擬化技術來創建和部署虛擬機&#xff08;VM…

HTML詳解連載(7)

HTML詳解連載&#xff08;7&#xff09; 專欄鏈接 [link](http://t.csdn.cn/xF0H3)下面進行專欄介紹 開始嘍結構偽類選擇器作用 :nth-child&#xff08;公式&#xff09;作用舉例 偽元素選擇器作用注意&#xff1a; PxCoook作用盒子模型-重要組成部分 盒子模型-邊框線屬性名屬性…

excel中定位條件,excel中有哪些數據類型、excel常見錯誤值、查找與替換

一、如何定位條件 操作步驟&#xff1a;開始 - 查找和選擇 - 定位條件&#xff08;ctrl G 或 F5&#xff09; 注&#xff1a;如果F5不可用&#xff0c;可能是這個快捷鍵被占用了 案例&#xff1a;使用定位條件選擇取余中空單元格&#xff0c;填入100&#xff0c;按組合鍵ct…

【LeetCode75】第三十三題 二叉樹的最大深度

目錄 題目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代碼&#xff1a; 題目&#xff1a; 示例&#xff1a; 分析&#xff1a; 從這一題開始&#xff0c;LeetCode75進入到了二叉樹章節。 這邊建議不熟悉二叉樹的小伙伴可以先去做做力扣的前序遍歷&#xff0c;中序遍…

使用git rebase 之后的如何恢復到原始狀態

我們常常喜歡使用git rebase去切換分支提交代碼,操作流程就是: 先切換分支:比如當前是master 我們修改了一堆代碼產生一個commit id :5555555567777 那么我們常常比較懶就直接切換了:git checkout dev 然后呢?使用命令git rebase 5555555567777,想把這筆修改提交到d…

iPhone上的個人熱點丟失了怎么辦?如何修復iPhone上不見的個人熱點?

個人熱點功能可將我們的iPhone手機轉變為 Wi-Fi 熱點&#xff0c;有了Wi-Fi 熱點后就可以與附近的其他設備共享其互聯網連接。 一般情況下&#xff0c;個人熱點打開就可以使用&#xff0c;但也有部分用戶在升級系統或越獄后發現 iPhone 的個人熱點消失了。 iPhone上的個人熱點…

antd5源碼調試環境搭建(window系統)

將antd源碼克隆至本地 $ git clone gitgithub.com:ant-design/ant-design.git $ cd ant-design $ npm install $ npm start前提安裝python3、安裝node版本18版本 不然后續安裝依賴會報python3相關的錯誤。 項目需要使用git 初始化 不然會報husky相關的錯誤 git init重新安…

【論文解讀】Hybrid-SORT: Weak Cues Matter for Online Multi-Object Tracking

因為Hybrid-SORT的baseline是基于OCSORT進行改進的&#xff0c;在這之前建議先了解byteTrack和【】的相關知識 1.介紹 1.1 基本框架 多目標跟蹤(MOT)將問題分為兩個子任務。第一個任務是檢測每個幀中的對象。第二個任務是將它們在不同的框架中聯系起來。關聯任務主要通過顯式…

RabbitMq-發布確認高級(避坑指南版)

在初學rabbitMq的時候&#xff0c;伙伴們肯定已經接觸到了“發布確認”的概念&#xff0c;但是到了后期學習中&#xff0c;會接觸到“springboot”中使用“發布確認”高級的概念。后者主要是解決什么問題呢&#xff1f;或者是什么樣的場景引出這樣的概念呢&#xff1f; 在生產環…

day45 ● 70. 爬樓梯 (進階)● 322. 零錢兌換 ● 279.完全平方數

70. 爬樓梯 class Solution {public int climbStairs(int n) {if(n <2) return n;int[] dp new int [n];dp[0] 1;dp[1] 2;for(int i 2; i< n;i){dp[i] dp[i-1] dp[i-2];}return dp[n-1];} } 322. 零錢兌換 class Solution {public int coinChange(int[] coins, in…

為什么kafka 需要 subscribe 的 group.id?我們是否需要使用 commitSync 手動提交偏移量?

目錄 一、為什么需要帶有 subscribe 的 group.id二、我們需要使用commitSync手動提交偏移量嗎&#xff1f;三、如果我想手動提交偏移量&#xff0c;該怎么做&#xff1f; 一、為什么需要帶有 subscribe 的 group.id 消費概念&#xff1a; Kafka 使用消費者組的概念來實現主題的…

vscode | linux | c++ intelliense 被棄用解決方案

每日一句&#xff0c;vscode用的爽是爽&#xff0c;主要是可配置太強了。如果也很會研究&#xff0c;可以直接去咸魚接單了 廢話少說&#xff0c;直接整。 用著用著說是c intelliense被棄用&#xff0c;很多輔助功能無法使用&#xff0c;像查看定義、查看引用、函數跳轉、智能提…

基于Rust的QuickLZ壓縮算法的詳細實現與分析

1. 引言 QuickLZ是一種被廣泛應用的高效壓縮算法。在許多應用中&#xff0c;快速的數據壓縮和解壓縮是非常關鍵的&#xff0c;特別是在網絡傳輸和存儲空間有限的場景中。為了滿足現代軟件開發的需求&#xff0c;我們將使用Rust語言來實現這一算法。Rust是一種專為系統級編程而…