springboot整合rabbitmq

rabbitmq的七種模式?

?

?Hello word

客戶端引入依賴

<!--rabbitmq 依賴客戶端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>

生產者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");//channel 實現了自動 close 接口 自動關閉 不需要顯示關閉try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的消息是否持久化 默認消息存儲在內存中* 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費* 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world!!!!";/*** 發送一個消息* 1.發送到那個交換機* 2.路由的 key 是哪個* 3.其他的參數信息* 4.發送消息的消息體*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息發送完畢");}}
}

消費者

import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息....");//推送的消息如何進行消費的接口回調DeliverCallback deliverCallback=(consumerTag, delivery)->{String message= new String(delivery.getBody());System.out.println(message);};//取消消費的一個回調接口 如在消費的時候隊列被刪除掉了CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息消費被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答* 3.消費者未成功消費的回調*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

獲取消息

?

?

?工作隊列

封裝獲取getChannel的工具類

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一個連接的 channelpublic static Channel getChannel() throws Exception{//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}

接收消息工作1線程

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C1 消費者啟動等待消費......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

?接收消息工作線程2

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker02 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C2 消費者啟動等待消費......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

發送10次消息

線程1和線程2平分消息

?

發布訂閱?

?RabbitMQ 消息傳遞模型的核心思想是: 生產者生產的消息從不會直接發送到隊列。實際上,通常生產者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。

總共有以下類型: 直接(direct), 主題 (topic) , 標題 (headers) , 扇出 (fanout)

fanout

接收者1?

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一個臨時的隊列 隊列的名稱是隨機的* 當消費者斷開和該隊列的連接時 隊列自動刪除*/String queueName = channel.queueDeclare().getQueue();//把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("控制臺打印接收到的消息"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

接收者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一個臨時的隊列 隊列的名稱是隨機的* 當消費者斷開和該隊列的連接時 隊列自動刪除*/String queueName = channel.queueDeclare().getQueue();//把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息寫到文件.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
//           File file = new File("C:\\work\\rabbitmq_info.txt");
//           FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("數據寫入文件成功"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

?發送者

import com.rabbitmq.client.Channel;import java.util.Scanner;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {/*** 聲明一個 exchange* 1.exchange 的名稱* 2.exchange 的類型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner sc = new Scanner(System.in);System.out.println("請輸入信息");while (sc.hasNext()) {String message = sc.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("生產者發出消息" + message);}}}
}

?結果

?Direct

接收者1 ,寫入錯誤日志

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "disk";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收綁定鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
//             File file = new File("C:\\work\\rabbitmq_info.txt");
//             FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("錯誤日志已經接收"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

?接收者2,打印控制臺信息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "console";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收綁定鍵 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

?發送者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//創建多個 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info","普通 info 信息");bindingKeyMap.put("warning","警告 warning 信息");bindingKeyMap.put("error","錯誤 error 信息");//debug 沒有消費這接收這個消息 所有就丟失了bindingKeyMap.put("debug","調試 debug 信息");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生產者發出消息:" + message);}}}
}

接收到信息

Topics ?

主題1?

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic01 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//聲明 Q1 隊列與綁定關系String queueName="Q1";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收隊列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

?主題2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic02 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//聲明 Q2 隊列與綁定關系String queueName="Q2";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收隊列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

發送者

import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");/*** Q1-->綁定的是* 中間帶 orange 帶 3 個單詞的字符串(*.orange.*)* Q2-->綁定的是* 最后一個單詞是 rabbit 的 3 個單詞(*.*.rabbit)* 第一個單詞是 lazy 的多個單詞(lazy.#)**/Map<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被隊列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant","被隊列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox","被隊列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox","被隊列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個綁定但只被隊列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何綁定不會被任何隊列接收到會被丟棄");bindingKeyMap.put("quick.orange.male.rabbit","是四個單詞不匹配任何綁定會被丟棄");bindingKeyMap.put("lazy.orange.male.rabbit","是四個單詞但匹配 Q2");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生產者發出消息" + message);}}}
}

?結果

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/36006.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/36006.shtml
英文地址,請注明出處:http://en.pswp.cn/news/36006.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

STM32 LoRa源碼解讀

目錄結構&#xff1a; SX1278 |-- include | |-- fifo.h | |-- lora.h | |-- platform.h | |-- radio.h | |-- spi.h | |-- sx1276.h | |-- sx1276Fsk.h | |-- sx1276FskMisc.h | |-- sx1276Hal.h | |-- sx1276LoRa.h | -- sx1276LoRaMisc.h – src |-- fifo.c |-- lora.c |-- …

【解析postman工具的使用---基礎篇】

postman前端請求詳解 主界面1.常見類型的接口請求1.1 查詢參數的接口請求1.1.1 什么是查詢參數?1.1.2 postman如何請求 1.2 ?表單類型的接口請求1.2.1 復習下http請求1.2.2? 什么是表單 1.3 上傳文件的表單請求1.4? json類型的接口請求 2. 響應接口數據分析2.1 postman的響…

紅帽RHCA考試內容解析

紅帽RHCA考試內容解析&#xff1a;最新的RHCA有3大方向體系&#xff0c;考試內容分別為&#xff1a; 一、Platform 平臺技術 RH442&#xff1a;性能調優 使用紅帽企業Linux和紅帽網絡提供的工具來學習Linux的性能調優和規劃的技巧及方法&#xff0c;學習系統架構&#xff0c;…

什么是DNS欺騙及如何進行DNS欺騙

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 前言一、什么是 DNS 欺騙&#xff1f;二、開始1.配置2.Ettercap啟動3.操作 總結 前言 我已經離開了一段時間&#xff0c;我現在回來了&#xff0c;我終于在做一個教…

【AI】p54-p58導航網絡、藍圖和AI樹實現AI隨機移動和跟隨移動、靠近玩家揮拳、AI跟隨樣條線移動思路

p54-p58導航網絡、藍圖和AI樹實現AI隨機移動和跟隨移動、靠近玩家揮拳、AI跟隨樣條線移動思路 p54導航網格p55藍圖實現AI隨機移動和跟隨移動AI Move To&#xff08;AI進行移動&#xff09;Get Random Pointln Navigable Radius&#xff08;獲取可導航半徑內的隨機點&#xff09…

Java基礎十 - 設計模式

一、單例 1. 創建 餓漢式 package basic;public class ESingleton {// 先私有化靜態實例private static ESingleton eSingleton new ESingleton();// 私有化構造方法&#xff0c;防止外部實例化private ESingleton(){};// 提供全局訪問方法public static ESingleton geteSi…

時序預測 | MATLAB實現基于LSTM長短期記憶神經網絡的時間序列預測-遞歸預測未來(多指標評價)

時序預測 | MATLAB實現基于LSTM長短期記憶神經網絡的時間序列預測-遞歸預測未來(多指標評價) 目錄 時序預測 | MATLAB實現基于LSTM長短期記憶神經網絡的時間序列預測-遞歸預測未來(多指標評價)預測結果基本介紹程序設計參考資料 預測結果 基本介紹 Matlab實現LSTM長短期記憶神經…

識別和應對內存抖動

關于作者&#xff1a;CSDN內容合伙人、技術專家&#xff0c; 從零開始做日活千萬級APP。 專注于分享各領域原創系列文章 &#xff0c;擅長java后端、移動開發、人工智能等&#xff0c;希望大家多多支持。 目錄 一、導讀二、概覽三、案例分析3.1 使用memory-profiler3.2 使用 cp…

磁粉制動器離合器收放卷應用介紹

張力控制系統的開環閉環應用介紹,請查看下面文章鏈接: PLC張力控制(開環閉環算法分析)_張力控制plc程序實例_RXXW_Dor的博客-CSDN博客里工業控制張力控制無處不在,也衍生出很多張力控制專用控制器,磁粉制動器等,本篇博客主要討論PLC的張力控制相關應用和算法,關于繞線…

什么是 fullgc

GC GC 全稱為garbage collection,中文含義為垃圾回收&#xff0c;在jvm中的含義為回收無用內存空間 Young space 中文名為年輕代或者新生代&#xff0c;為JVM 堆的一部分&#xff0c;由分代GC概念劃分而來&#xff0c;保存生命周期較短的對象 Tenured space 中文名為老年代…

APP外包開發的iOS開發語言

學習iOS開發需要掌握Swift編程語言和相關的開發工具、框架和技術。而學習iOS開發需要時間和耐心&#xff0c;尤其是對于初學者。通過堅持不懈的努力&#xff0c;您可以逐步掌握iOS開發技能&#xff0c;構建出功能豐富、優質的移動應用。今天和大家分享學習iOS開發的一些建議方法…

【數據結構系列】鏈表

&#x1f49d;&#x1f49d;&#x1f49d;歡迎來到我的博客&#xff0c;很高興能夠在這里和您見面&#xff01;希望您在這里可以感受到一份輕松愉快的氛圍&#xff0c;不僅可以獲得有趣的內容和知識&#xff0c;也可以暢所欲言、分享您的想法和見解。 推薦:kuan 的首頁,持續學…

解決hbase節點已下線,但在status中顯示為dead問題

工作中需要下線4臺hbase小節點&#xff0c;下線完成后使用status 命令查看,有一臺為dead狀態: 使用status detailed 查看&#xff0c;發現“hd-03"這臺節點是dead。 檢查各節點配置文件無誤&#xff0c;并使用 /opt/hbase/bin/hbase-daemon.sh restart master 重啟兩個…

less基本使用

1 less中的變量 //對值進行聲明 link-color: #ccc//定義變量名稱 .{sleName} {}bg: background-color; //定義屬性名稱 .container {{bg}: red; }2 繼承&#xff08;復用重復樣式&#xff09; //繼承必須位于選擇器最后 //繼承選擇器名不能為變量 .a:hover:extend(.b) {}.a {…

走出迷宮(多組輸入bfs)

鏈接&#xff1a;登錄—專業IT筆試面試備考平臺_牛客網 來源&#xff1a;牛客網 題目描述 小明現在在玩一個游戲&#xff0c;游戲來到了教學關卡&#xff0c;迷宮是一個N*M的矩陣。 小明的起點在地圖中用“S”來表示&#xff0c;終點用“E”來表示&#xff0c;障礙物用“#…

淺談人工智能技術與物聯網結合帶來的好處

物聯網是指通過互聯網和各種技術將設備進行連接&#xff0c;實時采集數據、交互信息的網絡&#xff0c;對設備實現智能化自動化感知、識別和控制&#xff0c;給人們帶來便利。 人工智能是計算機科學的一個分支&#xff0c;旨在研究和開發能夠模擬人類智能的技術和方法。人工智能…

Redis: 詳解、使用教程和示例

Redis: 詳解、使用教程和示例 什么是 Redis&#xff1f; Redis&#xff08;Remote Dictionary Server&#xff09;是一個開源的、內存數據存儲系統&#xff0c;它可以用作數據庫、緩存和消息中間件。它支持多種數據結構&#xff0c;如字符串、哈希表、列表、集合、有序集合等…

Hadoop組件

前言 Hadoop 是一個能夠對大量數據進行分布式處理的軟件框架。具有可靠、高效、可伸縮的特點。 HDFS&#xff08;hadoop分布式文件系統&#xff09; 是hadoop體系中數據存儲管理的基礎。他是一個高度容錯的系統&#xff0c;能檢測和應對硬件故障。

后院失火、持續虧損!Mobileye半年報「不回避」競爭壓力

"客戶在2023年上半年非常謹慎&#xff0c;導致增長率低于正常水平&#xff0c;但我們已經看到下半年回暖趨勢&#xff0c;預計下半年交付將比去年同期增長16%&#xff0c;遠高于上半年。"這是Mobileye在近日公司半年報發布會上的預判。 公開數據顯示&#xff0c;今年…

Python 實現Selenium錄屏的一種方法(圖片整合成動態圖)

由于UI層自動化的不穩定性&#xff0c;經常會遇到執行中斷或用例失敗的問題&#xff0c;以下是一些常見的措施。 1.詳細的日志 2.定位出錯時截圖 3.Pytest的緩存機制(可以記錄成功了哪些失敗了哪些) 4.自動重試機制(如pytest-rerunfailures) 5.用例錄像 用例錄像是最直觀的一…