詳解RabbitMQ工作模式之工作隊列模式

目錄

工作隊列模式

概念

特點

應用場景

工作原理

注意事項

代碼案例

引入依賴

常量類

編寫生產者代碼

編寫消費者1代碼

編寫消費者2代碼

先運行生產者,后運行消費者

先運行消費者,后運行生產者


工作隊列模式

概念

在工作隊列模式中,一個生產者(producer)將任務發布到隊列中,多個消費者(consumer)從隊列中獲取任務并執行。這種模式的主要目標是提高任務的并行處理能力,從而提高系統的吞吐量和效率。

特點

可以有多個消費者,但一條消息只能被一個消費者獲取。
消費者在處理完某條消息后,才會收到下一條消息。
RabbitMQ采用輪詢(Round-Robin)或公平分發(Fair Dispatch)的方式將消息發送給消費者。?

應用場景

1.任務分發:將任務分發給多個工作者(消費者),以便并行處理。這對于需要高吞吐量和任務處理效率的應用程序非常有用。例如,圖像處理、視頻編碼、數據轉換等應用可以使用工作隊列模式來并行處理大量任務。
2.負載均衡:當有多個消費者時,工作隊列模式可以用來實現負載均衡。任務將均勻分布給可用的消費者,以確保每個消費者都有工作可做,而且不會超負荷。
3.后臺任務處理:在Web應用程序中,后臺任務處理是一個常見的需求。工作隊列模式可用于處理與Web請求無關的長時間運行任務,而不會影響用戶體驗。例如,發送電子郵件、生成報告、備份數據等后臺任務可以使用工作隊列來處理。

工作原理

1.生產者發送任務:生產者將任務封裝為消息,并將其發送到RabbitMQ隊列中。
2.RabbitMQ分發任務:RabbitMQ根據配置的分發策略(如輪詢或公平分發)將任務分發給消費者。
3.消費者處理任務:消費者從隊列中獲取任務并執行。在處理完任務后,消費者會向RabbitMQ發送確認消息,表示任務已完成。
4.RabbitMQ確認任務完成:在收到消費者的確認消息后,RabbitMQ會將該任務從隊列中移除。

注意事項

1.消息確認:為了確保消息不會丟失,消費者在處理完任務后需要向RabbitMQ發送確認消息。如果消費者在處理任務時失敗或崩潰,RabbitMQ會將該任務重新分發給其他消費者。
2.負載均衡:RabbitMQ默認采用輪詢方式將消息分發給消費者。如果需要更復雜的負載均衡策略,可以考慮使用其他分發策略或自定義交換機類型。
3.錯誤處理:在生產者和消費者中都需要添加適當的錯誤處理邏輯,以處理可能出現的異常情況,如連接失敗、消息發送失敗等。

代碼案例
引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version>
</dependency>
常量類
public class Constants {public static final String HOST = "47.98.109.138";public static final int PORT = 5672;public static final String USER_NAME = "study";public static final String PASSWORD = "study";public static final String VIRTUAL_HOST = "aaa";//工作隊列模式public static final String WORK_QUEUE = "work.queue";
}
編寫生產者代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列   使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 發送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("消息發送成功~");//6. 資源釋放channel.close();connection.close();}
}
編寫消費者1代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列   使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//6. 資源釋放
//        channel.close();
//        connection.close();}
}
編寫消費者2代碼
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前開放端口號connectionFactory.setUsername(Constants.USER_NAME);//賬號connectionFactory.setPassword(Constants.PASSWORD);  //密碼connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虛擬主機Connection connection = connectionFactory.newConnection();//2. 開啟信道Channel channel = connection.createChannel();//3. 聲明隊列   使用內置的交換機//如果隊列不存在, 則創建, 如果隊列存在, 則不創建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消費消息DefaultConsumer consumer = new DefaultConsumer(channel){//從隊列中收到消息, 就會執行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//TODOSystem.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//        //6. 資源釋放
//        channel.close();
//        connection.close();}
}
先運行生產者,后運行消費者

查看管理界面

我們此時會看到,先啟動的消費者會消費掉隊列中所有的消息。

先運行消費者,后運行生產者

此時我們能看到,兩個消費者都能夠消費消息。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/78970.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/78970.shtml
英文地址,請注明出處:http://en.pswp.cn/web/78970.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數據結構-非線性結構-二叉樹

概述 /** * 術語 * 根節點&#xff08;root node&#xff09;&#xff1a;位于二叉樹頂層的節點&#xff0c;沒有父節點。 * 葉節點&#xff08;leaf node&#xff09;&#xff1a;沒有子節點的節點&#xff0c;其兩個指針均指向 None 。 * 邊&#xff08;edge&#xff09;&…

芯片筆記 - 手冊參數注釋

芯片手冊參數注釋 基礎參數外圍設備USB OTG&#xff08;On-The-Go&#xff09;以太網存儲卡&#xff08;SD&#xff09;SDIO 3.0(Secure Digital Input/Output)GPIO&#xff08;General Purpose Input/Output 通用輸入/輸出接口&#xff09;ADC&#xff08;Analog to Digital C…

力扣94. 二叉樹的中序遍歷

94. 二叉樹的中序遍歷 給定一個二叉樹的根節點 root &#xff0c;返回 它的 中序 遍歷 。 示例 1&#xff1a; 輸入&#xff1a;root [1,null,2,3] 輸出&#xff1a;[1,3,2]示例 2&#xff1a; 輸入&#xff1a;root [] 輸出&#xff1a;[]示例 3&#xff1a; 輸入&#…

深度學習:AI為老年癡呆患者點亮希望之光

引言 隨著全球人口老齡化進程的加速&#xff0c;老年癡呆癥已成為嚴重威脅老年人健康和生活質量的公共衛生問題。據世界衛生組織統計&#xff0c;全球每 3 秒鐘就有 1 人被診斷為癡呆&#xff0c;預計到 2050 年&#xff0c;全球癡呆患者人數將從目前的約 5000 萬激增至 1.52 億…

拋物線法(二次插值法)

拋物線法簡介 拋物線法&#xff08;Quadratic Interpolation Method&#xff09;是一種用于一維單峰函數極值搜索的經典優化方法。該方法通過在區間內選取三個不同的點&#xff0c;擬合一條二次拋物線&#xff0c;并求取這條拋物線的極值點作為新的迭代點&#xff0c;從而逐步…

FreeRTOS如何檢測內存泄漏

在嵌入式系統中&#xff0c;內存資源通常非常有限&#xff0c;內存泄漏可能導致系統性能下降甚至崩潰。內存泄漏是指程序分配的內存未被正確釋放&#xff0c;逐漸耗盡可用內存。 FreeRTOS作為一種輕量級實時操作系統&#xff08;RTOS&#xff09;&#xff0c;廣泛應用于資源受限…

Mockoon 使用教程

文章目錄 一、簡介二、模擬接口1、Get2、Post 一、簡介 1、Mockoon 可以快速模擬API&#xff0c;無需遠程部署&#xff0c;無需帳戶&#xff0c;免費&#xff0c;跨平臺且開源&#xff0c;適合離線環境。 2、支持get、post、put、delete等所有格式。 二、模擬接口 1、Get 左…

如何進行APP安全加固

進行APP安全加固的關鍵在于代碼混淆、加密敏感數據、權限管理、漏洞掃描與修復。其中&#xff0c;代碼混淆能有效阻止逆向工程與篡改攻擊&#xff0c;提升應用的安全防護能力。通過混淆代碼&#xff0c;攻擊者難以輕易理解源代碼邏輯&#xff0c;從而降低被破解或攻擊的風險。 …

【C++】手搓一個STL風格的string容器

C string類的解析式高效實現 GitHub地址 有夢想的電信狗 1. 引言&#xff1a;字符串處理的復雜性 ? 在C標準庫中&#xff0c;string類作為最常用的容器之一&#xff0c;其內部實現復雜度遠超表面認知。本文將通過一個簡易仿照STL的string類的完整實現&#xff0c;揭示其設…

辰鰻科技朱越洋:緊扣時代契機,全力投身能源轉型戰略賽道

國家能源局于4月28日出臺的《關于促進能源領域民營經濟發展若干舉措的通知》&#xff08;以下簡稱《通知》&#xff09;&#xff0c;是繼2月民營企業座談會后深化能源領域市場化改革的關鍵政策&#xff0c;標志著民營經濟在“雙碳”目標引領下正式進入能源轉型的核心賽道。 自…

Vue實現不同網站之間的Cookie共享功能

前言 最近有小伙伴在聊天室中提到這么一個需求&#xff0c;就是說希望用戶在博客首頁中登錄了之后&#xff0c;可以跳轉到管理系統去發布文章。這個需求的話就涉及到了不同網站之間cookie共享的功能&#xff0c;那么咱們就來試著解決一下這個功能。 實現方式 1. 后端做中轉 …

在一臺服務器上通過 Nginx 配置實現不同子域名訪問靜態文件和后端服務

一、域名解析配置 要實現通過不同子域名訪問靜態文件和后端服務&#xff0c;首先需要進行域名解析。在域名注冊商或 DNS 服務商處&#xff0c;為你的兩個子域名 blog.xxx.com 和 api.xxx.com 配置 A 記錄或 CNAME 記錄。將它們的 A 記錄都指向你服務器的 IP 地址。例如&#x…

Opencv進階操作:圖像拼接

文章目錄 前言一、圖像拼接的原理1. 特征提取與匹配2. 圖像配準3. 圖像變換與投影4. 圖像融合5. 優化與后處理 二、圖像拼接的簡單實現&#xff08;案例實現&#xff09;1.引入庫2.定義cv_show()函數3.創建特征檢測函數detectAndDescribe()4.讀取拼接圖片5.計算圖片特征點及描述…

LLM 論文精讀(三)Demystifying Long Chain-of-Thought Reasoning in LLMs

這是一篇2025年發表在arxiv中的LLM領域論文&#xff0c;主要描述了長思維鏈 Long Chain-of-Thought 對LLM的影響&#xff0c;以及其可能的生成機制。通過大量的消融實驗證明了以下幾點&#xff1a; 與shot CoT 相比&#xff0c;long CoT 的 SFT 可以擴展到更高的性能上限&…

計算機網絡常識:緩存、長短連接 網絡初探、URL、客戶端與服務端、域名操作 tcp 三次握手 四次揮手

緩存&#xff1a; 緩存是對cpu&#xff0c;內存的一個節約&#xff1a;節約的是網絡帶寬資源 節約服務器的性能 資源的每次下載和請求都會造成服務器的一個壓力 減少網絡對資源拉取的延遲 這個就是瀏覽器緩存的一個好處 表示這個html頁面的返回是不要緩存的 忽略緩存 需要每次…

《構建社交應用用戶激勵引擎:React Native與Flutter實戰解析》

React Native憑借其與JavaScript和React的緊密聯系&#xff0c;為開發者提供了一個熟悉且靈活的開發環境。在構建用戶等級體系時&#xff0c;它能夠充分利用現有的前端開發知識和工具。通過將用戶在社交應用中的各種行為進行量化&#xff0c;比如發布動態的數量、點贊評論的次數…

接口自動化測試框架詳解(pytest+allure+aiohttp+ 用例自動生成)

&#x1f345; 點擊文末小卡片&#xff0c;免費獲取軟件測試全套資料&#xff0c;資料在手&#xff0c;漲薪更快 近期準備優先做接口測試的覆蓋&#xff0c;為此需要開發一個測試框架&#xff0c;經過思考&#xff0c;這次依然想做點兒不一樣的東西。 接口測試是比較講究效…

Linux-----文件系統

文件大家都知道&#xff0c;前面的我的博客課程也為大家解釋了關于文件的打開等&#xff0c;今天我們要談論的是 文件在沒被打開的時候在磁盤中的位置和找到它的方式。 畫圖為大家展示&#xff1a; 方便理解 我們從下面幾個方面入手&#xff1a; 1. 看看物理磁盤 2. 了解一…

C++ set替換vector進行優化

文章目錄 demo代碼解釋&#xff1a; 底層原理1. 二叉搜索樹基礎2. 紅黑樹的特性3. std::set 基于紅黑樹的實現優勢4. 插入操作5. 刪除操作6. 查找操作 demo #include <iostream> #include <set>int main() {// 創建一個存儲整數的std::setstd::set<int> myS…

如何巧妙解決 Too many connections 報錯?

1. 背景 在日常的 MySQL 運維中&#xff0c;難免會出現參數設置不合理&#xff0c;導致 MySQL 在使用過程中出現各種各樣的問題。 今天&#xff0c;我們就來講解一下 MySQL 運維中一種常見的問題&#xff1a;最大連接數設置不合理&#xff0c;一旦到了業務高峰期就會出現連接…