RabbitMQ的常見工作模式

Work queues 工作隊列模式

模式說明

通過Helloworld工程我們已經能夠構建一個簡單的消息隊列的基本項目,項目中存在幾個角色:生產 者、消費者、隊列,而對于我們真實的開發中 ,對于消息的消費者通過是有多個的。

比如在實現用戶注冊功能時,用戶注冊成功,會給響對應用戶發送郵件,同時給用戶發送手機短信,告訴用戶已成功注冊網站或者app 應用,這種功能在大部分項目開發中都比較常見 ,而對于helloworld 的應用中雖然能夠對 消息進行消費,但是有很大問題: 消息消費者只有一個,當消息量非常大時,單個消費者處理消息就會變得很慢,同時給節點頁帶來很大壓力,導致消息堆積越來越多。對于這種情況,RabbitMQ 提供了工作 隊列模式,通過工作隊列提供做個消費者,對MQ產生的消息進行消費,提高MQ消息的吞吐率,降低消息的處理時間。處理模型圖如下。

在這里插入圖片描述

實現步驟

生產者

package cn.wolfcode.java.rabbitmq._02worker;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;/*** Created by wolfcode-fanjialong*/public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.142.129");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = "tasequeue";for(int i=0;i<20;i++){channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,(message+i).getBytes("UTF-8"));}System.out.println(" [x] Sent '" + message + "'");}}
}

消費者

package cn.wolfcode.java.rabbitmq._02worker;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.TimeUnit;/*** Created by wolfcode-fanjialong*/public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.142.129");final Connection connection = factory.newConnection;final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}finally {channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });}}
簡單問題說明

從結果可以看出消息被平均分配到兩個消費方,來對消息進行處理,提高了消息處理效率,創建多個消費者來對消息進行處理。這里RabitMQ采用輪詢來對消息進行分發時保證了消息被平均分配到每個消費方 。

但是引入新的問題:真正的生產環境下,對于消息的處理基本不會像我們現在看到的這樣,每個消 費方處理的消息數量是平均分配的,比如因為網絡原因,機器cpu ,內存等硬問題,消費方處理消息時 同類消息不同機器進行處理時消耗時間也是不一樣的,比如1號消費者消費1條消息時1秒,2號消費者消費1條消息是5秒,對于1號消費者比2號消費者處理消息快,那么在分配消息時就應該讓1號消費者多收 到消息進行處理,也即是我們通常所說的”能者多勞”,同樣Rabbitmq對于這種消息分配模式提供了支持。

問題: 任務量很大,消息雖然得到了及時的消費,單位時間內消息處理速度加快,提高了吞吐量,可 是不同消費者處理消息的時間不同,導致部分消費者的資源被浪費。

解決:采用消息公平分發。

總結:工作隊列消息輪詢分發消費者收到的消息數量平均分配,單位時間內消息處理速度加快,提高了吞吐量。

工作模式隊列-消息公平分發(fair dispatch)

在案例01中對于消息分發采用的是默認輪詢分發,消息應答采用的自動應答模式,這是因為當消息進 入隊列,RabbitMQ就會分派消息。它不看消費者為應答的數目,只是盲目的將第n條消息發給第n個消費者。

為了解決這個問題,我們使用 basicQos(prefetchCount = 1) 方法,來限RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢后,有了反饋,才會進行第二次發送。執行模型圖如下:

在這里插入圖片描述

Pub/Sub 訂閱模式

模式說明

在這里插入圖片描述

在訂閱模型中,多了一個 Exchange 角色,而且過程略有變化:

P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)

C:消費者,消息的接收者,會一直等待消息到來

Queue:消息隊列,接收消息、緩存消息

Exchange:交換機(X)。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、

遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:

  • Fanout:廣播,將消息交給所有綁定到交換機的隊列

  • Direct:定向,把消息交給符合指定routing key 的隊列

  • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

實現步驟

生產者

package cn.wolfcode.java.rabbitmq._03pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** Created by wolfcode-fanjialong*/
public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "info: Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

消費者

package cn.wolfcode.java.rabbitmq._03pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** Created by wolfcode-fanjialong*/public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

Routing 路由模式

模式說明

隊列與交換機的綁定,不能是任意綁定了,而是要指定一個 RoutingKey(路由key)

消息的發送方在向 Exchange 發送消息時,也必須指定消息的 RoutingKey

Exchange 不再把消息交給每一個綁定的隊列,而是根據消息的 Routing Key 進行判斷,只有隊列的 Routingkey 與消息的 Routing key 完全一致,才會接收到消息
在這里插入圖片描述

圖解

P:生產者,向 Exchange 發送消息,發送消息時,會指定一個routing key

X:Exchange(交換機),接收生產者的消息,然后把消息遞交給與 routing key 完全匹配的隊列

C1:消費者,其所在隊列指定了需要 routing key 為 error 的消息

C2:消費者,其所在隊列指定了需要 routing key 為 info、error、warning 的消息

實現步驟

生產者

package cn.wolfcode.java.rabbitmq._04rounting;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionF![在這里插入圖片描述](https://img-blog.csdnimg.cn/direct/6bffe712644346098952dba200523856.png)
actory;
/*** Created by wolfcode-fanjialong*/public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String severity = "info";String message = "directMsg";channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");}}
}

消費者

package cn.wolfcode.java.rabbitmq._04rounting;import com.rabbitmq.client.*;/*** Created by wolfcode-fanjialong*/public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "error");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}}

Topic 模式

模式介紹

Topic 類型與 Direct 相比,都是可以根據 RoutingKey 把消息路由到不同的隊列。只不過 Topic 類型

Exchange 可以讓隊列在綁定 Routing key 的時候使用通配符

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

通配符規則:# 匹配一個或多個詞,* 匹配不多不少恰好1個詞,例如:item.# 能夠匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

在這里插入圖片描述

圖解:

紅色 Queue:綁定的是 usa.# ,因此凡是以 usa. 開頭的 routing key 都會被匹配到

黃色 Queue:綁定的是 #.news ,因此凡是以 .news 結尾的 routing key 都會被匹配

實現步驟

生產者

package cn.wolfcode.java.rabbitmq._05topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** Created by wolfcode-fanjialong*/public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");String routingKey = "order1.save";String message = "topicMsg";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");}}}

消費者

package cn.wolfcode.java.rabbitmq._05topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;/*** Created by wolfcode-fanjialong*/public class ReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "order.*");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/712745.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/712745.shtml
英文地址,請注明出處:http://en.pswp.cn/news/712745.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【OpenCV】繪制桌面鎖屏時鐘

OpenCV 是一個開源的計算機視覺&#xff08;Computer Vision&#xff09;與機器學習軟件庫&#xff0c;提供了多種圖像處理算法與接口&#xff0c;用于解決計算機視覺相關問題。OpenCV 支持多平臺與多語言&#xff0c;本文主要記錄如何使用 OpenCV-Python 繪制桌面鎖屏時鐘。 目…

一、環境配置

一、下載Ubuntu18.04版本鏡像 我的電腦配置比較低(08年奧運限定版哦)&#xff0c;使用的是虛擬機VMware進行安裝Ubuntu18.04版&#xff0c;跟書上使用的一樣 Ubuntu 18.04鏡像 別下載錯了哈 二、VMware下安裝Ubuntu18.04操作系統 之前寫過相關的博文&#xff0c;詳細配置可…

軟件測試——性能測試

目錄 一、性能測試簡介 二、性能測試指標 三、性能測試的流程 四、Jmeter簡介 五、Jmeter常見測試框架 六、Jmeter錄制腳本 七、腳本增強 八、Jmeter腳本參數化 九、斷言 十、關聯 十一、JDBC請求 十二、分布式測試 十三、性能測試報告 一、性能測試簡介…

API接口技術開發拼多多接口鏈接采集實時銷量、SKU信息、券后價格、優惠券信息API接入步驟及演示示例

要實現拼多多接口鏈接采集實時銷量、SKU信息、券后價格、優惠券信息API接入步驟及演示示例&#xff0c;首先需要了解拼多多開放平臺的相關API接口。以下是一個簡單的Java示例&#xff0c;使用requests庫調用拼多多開放平臺的API接口&#xff1a; 注冊并登錄開放平臺&#xff0c…

【LeetCode】一周中的第幾天+ 一年中的第幾天

2023-12-30 文章目錄 一周中的第幾天方法一&#xff1a;模擬思路步驟 方法二&#xff1a;調用庫函數方法三&#xff1a;調用庫函數 [1154. 一年中的第幾天](https://leetcode.cn/problems/day-of-the-year/)方法一&#xff1a;直接計算思路&#xff1a; 方法二&#xff1a;調用…

react Hook useEffect的基本使用及注解

一、概述 useEffect是React提供的一個Hook&#xff0c;用于在函數組件中執行副作用操作。副作用操作通常包括數據獲取、訂閱事件、手動操作DOM等。useEffect可以讓我們在函數組件中模擬類組件的生命周期方法&#xff08;如componentDidMount、componentDidUpdate、componentWi…

vue3+ts實現圖片預覽功能

首先&#xff0c;需要創建一個Vue組件&#xff0c;用于顯示圖片和預覽圖。 <template><div><img :src"imageSrc" mouseover"showPreview" mouseout"hidePreview" /><div v-if"previewVisible" class"previ…

【計算機考研】學碩還是專碩?

這里面沒有什么更好的選擇&#xff0c;大家都知道專碩錄取人數多&#xff0c;學碩錄取人數上&#xff0c;博弈一直存在 單就考研的難度上來說&#xff0c;專碩的難度確實要比學說低很多的&#xff0c;同樣的專業課的情況下&#xff0c;專碩的考研數學考的是數學二和英語二&…

rk3568 gamc0 控制器寄存器配置不了導致連接不上phy

系統啟動 GMAC 的驅動開機 log 上出現打印&#xff1a;No PHY found 或者 Cannot attach to PHY。查閱rk 官方gmac 配置指導手冊出現改問題的原因可能如下&#xff1a; 但反復檢查硬件和軟件發現都沒問題。看內核啟動日志發現gamc0 在啟動過程中讀取gmac0 的版本id是讀取不到&a…

P2440 木材加工

題目背景 要保護環境 題目描述 木材廠有 &#xfffd;n 根原木&#xff0c;現在想把這些木頭切割成 &#xfffd;k 段長度均為 &#xfffd;l 的小段木頭&#xff08;木頭有可能有剩余&#xff09;。 當然&#xff0c;我們希望得到的小段木頭越長越好&#xff0c;請求出 &a…

表格自定義音頻播放組件

1.在使用的地方調用 <Audiosvue ref"audio" class"audio-box" :audioFile"item.cont"></Audiosvue> 2.引入組件 import Audiosvue from "/components/audio"; 3.組件 <template><div><div style"d…

優化Vue項目中 WebStorm:2023.3 對 CSS 和 HTML 的默認注釋

前言 WebStorm是一種基于JetBrains IntelliJ平臺的集成開發環境&#xff08;IDE&#xff09;&#xff0c;專門用于Web開發。它支持JavaScript、HTML和CSS等語言&#xff0c;并提供了豐富的功能和工具&#xff0c;以提高開發人員的效率。但是&#xff0c;在開發Vue項目中發現&a…

(科目三)計算機操作系統

1、操作系統的基本概念及功能 1.1概念 一、操作系統的概念 操作系統是運行在裸機之上的&#xff0c;控制和管理計算機硬件和軟件資源、合理組織計算機工作流程一級方便用戶操作的程序集合。 操作系統由一套分層次的控制程序組成&#xff0c;是計算機硬件的一級擴充是軟件系…

【爬蟲逆向實戰 逆向滑塊 Python+Node】今天逆向的網站有點嘿嘿,還是僅供學習,別瞎搞

逆向日期&#xff1a;2024.03.01 使用工具&#xff1a;Node.js、Python 加密方法&#xff1a;AES標準算法 文章全程已做去敏處理&#xff01;&#xff01;&#xff01; 【需要做的可聯系我】 AES解密處理&#xff08;直接解密即可&#xff09;&#xff08;crypto-js.js 標準算…

Java底層自學大綱_互聯網安全設計篇

互聯網安全設計專題_自學大綱所屬類別學習主題建議課時&#xff08;h&#xff09; A 互聯網安全架構平臺設計001 如何防御XSS攻擊與防止抓包篡改數據2.5 A 互聯網安全架構平臺設計002 spring security實現動態權限控制2.5 A 互聯網安全架構平臺設計003 spring security整合j…

基于RISC-V架構的通信DSP的設計以及在5G RedCap基帶中的應用(六)-結論與展望

6 結論與展望 6.1 研究成果總結 基于RISC-V架構的向量指令集和通訊擴展指令集在5G Redcap基帶處理器中的應用&#xff0c;可以有效提升處理器在處理大量數據時的性能以及滿足特定應用需求的無線通訊能力。隨著5G技術的發展&#xff0c;對于滿足特定應用需求的行業&#xff0c;…

CentOS7 Mysql 忘記密碼或臨時密碼進不去時怎么跳過密碼進去然后再更改密碼

CentOS7 Mysql 忘記密碼或臨時密碼進不去時怎么跳過密碼進去然后再更改密碼 1、進文件 vi /etc/my.cnf2、加skip-grant-tables設置跳過密碼 在[mysqld]下面加 skip-grant-tables3、mysql -u root -p直接回車無密碼進去mysql mysql -u root -p3、先更新&#xff0c;不執行這…

實現unity場景切換

本文實現兩個按鍵實現場景1和場景2之間的切換 ①首先在unity 3D中創建兩個場景&#xff0c;分別為Scene1和Scene2 ②在Scene1中創建一個Button&#xff0c;修改txt內容為“To Scene2”&#xff0c;并在Buttons下創建一個空物體&#xff0c;用于掛載腳本。 腳本Trans Scene.…

操作系統x面試|進程與線程

1. 線程進程的區別 進程可以稱為是資源分配的最小單元&#xff0c;而線程可以稱為是處理器分配的最小單元。 資源包括內存空間。同時進程是一段代碼的執行過程&#xff0c;這段代碼需要多少的內存在代碼確定時已經確定下來了。 處理器就是執行單元&#xff0c;一個進程可以拆解…

程序員的金三銀四求職寶典

程序員的金三銀四求職寶典 金三銀四&#xff0c;即三月和四月&#xff0c;是程序員求職的黃金時期。在這個關鍵時期&#xff0c;求職者們需要做好充分的準備&#xff0c;以便在面試中脫穎而出。以下是一些建議&#xff0c;幫助程序員在金三銀四期間更好地準備求職&#xff1a;…