初試rabbitmq

rabbitmq的七種模式?

?Hello word

客戶端引入依賴

<!--rabbitmq 依賴客戶端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>

生產者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");//channel 實現了自動 close 接口 自動關閉 不需要顯示關閉try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的消息是否持久化 默認消息存儲在內存中* 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費* 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world!!!!";/*** 發送一個消息* 1.發送到那個交換機* 2.路由的 key 是哪個* 3.其他的參數信息* 4.發送消息的消息體*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息發送完畢");}}
}

消費者

import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息....");//推送的消息如何進行消費的接口回調DeliverCallback deliverCallback=(consumerTag, delivery)->{String message= new String(delivery.getBody());System.out.println(message);};//取消消費的一個回調接口 如在消費的時候隊列被刪除掉了CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息消費被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答* 3.消費者未成功消費的回調*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

獲取消息

?工作隊列

封裝獲取getChannel的工具類

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils {//得到一個連接的 channelpublic static Channel getChannel() throws Exception{//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("43.139.59.28");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}

接收消息工作1線程

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C1 消費者啟動等待消費......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

?接收消息工作線程2

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Worker02 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消費者取消消費接口回調邏輯");};System.out.println("C2 消費者啟動等待消費......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

發送10次消息

線程1和線程2平分消息

發布訂閱?

?RabbitMQ 消息傳遞模型的核心思想是: 生產者生產的消息從不會直接發送到隊列。實際上,通常生產者甚至都不知道這些消息傳遞傳遞到了哪些隊列中。

總共有以下類型: 直接(direct), 主題 (topic) , 標題 (headers) , 扇出 (fanout)

fanout

接收者1?

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一個臨時的隊列 隊列的名稱是隨機的* 當消費者斷開和該隊列的連接時 隊列自動刪除*/String queueName = channel.queueDeclare().getQueue();//把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("控制臺打印接收到的消息"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

接收者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一個臨時的隊列 隊列的名稱是隨機的* 當消費者斷開和該隊列的連接時 隊列自動刪除*/String queueName = channel.queueDeclare().getQueue();//把該臨時隊列綁定我們的 exchange 其中 routingkey(也稱之為 binding key)為空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息寫到文件.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");
//           File file = new File("C:\\work\\rabbitmq_info.txt");
//           FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("數據寫入文件成功"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

?發送者

import com.rabbitmq.client.Channel;import java.util.Scanner;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {/*** 聲明一個 exchange* 1.exchange 的名稱* 2.exchange 的類型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner sc = new Scanner(System.in);System.out.println("請輸入信息");while (sc.hasNext()) {String message = sc.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("生產者發出消息" + message);}}}
}

?結果

?Direct

接收者1 ,寫入錯誤日志

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;import java.io.File;public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "disk";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收綁定鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
//             File file = new File("C:\\work\\rabbitmq_info.txt");
//             FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("錯誤日志已經接收"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

?接收者2,打印控制臺信息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "console";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收綁定鍵 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

?發送者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//創建多個 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info","普通 info 信息");bindingKeyMap.put("warning","警告 warning 信息");bindingKeyMap.put("error","錯誤 error 信息");//debug 沒有消費這接收這個消息 所有就丟失了bindingKeyMap.put("debug","調試 debug 信息");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生產者發出消息:" + message);}}}
}

接收到信息

Topics ?

主題1?

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic01 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//聲明 Q1 隊列與綁定關系String queueName="Q1";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收隊列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

?主題2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsTopic02 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//聲明 Q2 隊列與綁定關系String queueName="Q2";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收隊列 :"+queueName+" 綁 定 鍵:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

發送者

import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");/*** Q1-->綁定的是* 中間帶 orange 帶 3 個單詞的字符串(*.orange.*)* Q2-->綁定的是* 最后一個單詞是 rabbit 的 3 個單詞(*.*.rabbit)* 第一個單詞是 lazy 的多個單詞(lazy.#)**/Map<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被隊列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant","被隊列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox","被隊列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox","被隊列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit","雖然滿足兩個綁定但只被隊列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何綁定不會被任何隊列接收到會被丟棄");bindingKeyMap.put("quick.orange.male.rabbit","是四個單詞不匹配任何綁定會被丟棄");bindingKeyMap.put("lazy.orange.male.rabbit","是四個單詞但匹配 Q2");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生產者發出消息" + message);}}}
}

?結果

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/41830.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/41830.shtml
英文地址,請注明出處:http://en.pswp.cn/news/41830.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

邀請函|澎峰科技邀您參加CCF HPC China2023

一年一度的全球超算盛會&#xff01; 以“算力互聯智領未來”為主題的第十九屆全國高性能計算學術年會&#xff08;CCF HPC China 2023&#xff09;將于8月24-26日&#xff08;展覽23-25日&#xff09;在青島紅島國際會議展覽中心舉辦。 九大院士領銜 打造頂級超算盛會 力邀…

《離散數學及其應用(原書第8版)》ISBN978-7-111-63687-8 第11章 11.1.3 樹的性質 節 第664頁的例9說明

《離散數學及其應用&#xff08;原書第8版&#xff09;》ISBN978-7-111-63687-8 第11章 11.1.3 樹的性質 節 第664頁的定理3的引申 定理3 帶有i個內點的m叉樹含有nmi1個頂點 見本人博文 內點定義不同的討論 如果對于一個m叉正則樹&#xff0c;即任意分支節點的兒子恰好有m個&am…

談談IP地址和子網掩碼的概念及應用

個人主頁&#xff1a;insist--個人主頁?????? 本文專欄&#xff1a;網絡基礎——帶你走進網絡世界 本專欄會持續更新網絡基礎知識&#xff0c;希望大家多多支持&#xff0c;讓我們一起探索這個神奇而廣闊的網絡世界。 目錄 一、IP地址的概念 二、IP地址的分類 1、A類 …

長勝證券:散戶可以隨大流嗎?怎么做才好?

在我國的股市里邊&#xff0c;最不缺的或許便是散戶了&#xff0c;一方面&#xff0c;散戶促進了股市的活潑&#xff0c;可一方面又特容易望風而動&#xff0c;追漲殺跌。因此&#xff0c;散戶能夠隨大流嗎&#xff1f;該怎么做才好&#xff1f;對于這些&#xff0c;長勝證券為…

IntelliJ IDEA熱部署:JRebel插件的安裝與使用

熱部署 概述JRebel 概述 熱部署&#xff0c;指修改代碼后&#xff0c;無需停止應用程序&#xff0c;即可使修改后的代碼生效&#xff0c;其有利于提高開發效率。 熱部署方式&#xff1a; 手動熱部署&#xff1a;修改代碼后&#xff0c;重新編譯項目&#xff0c;然后啟動應用程…

Springboot項目啟動后按順序加載自定義類 (demo)

1. 實現ApplicationRunner接口, 重寫run方法 import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframewor…

IDEA啟動報錯java.nio.charset.MalformedInputException: Input length=2

IDEA啟動報錯java.nio.charset.MalformedInputException: Input length2 問題解決后記 問題 原本系統運行好好得&#xff0c;一段時間沒打開&#xff0c;再次打開重啟 IDEA啟動報錯java.nio.charset.MalformedInputException: Input length2。 解決 百度了 https://blog.csd…

使用 Qt 生成 Word 和 PDF 文檔的詳細教程

系列文章目錄 文章目錄 系列文章目錄前言一、安裝 Qt二、生成 Word 文檔三、生成 PDF 文檔四、運行代碼并查看結果五、自定義文檔內容總結前言 Qt 是一個跨平臺的應用程序開發框架,除了用于創建圖形界面應用程序外,還可以用來生成 Word 和 PDF 文檔。本文將介紹如何使用 Qt …

【C語言】const修飾普通變量和指針

大家好&#xff0c;我是蘇貝&#xff0c;本篇博客帶大家了解const修飾普通變量和指針&#xff0c;如果你覺得我寫的還不錯的話&#xff0c;可以給我一個贊&#x1f44d;嗎&#xff0c;感謝?? 文章目錄 一.const修飾普通變量二.const修飾指針1.const 放在 * 左邊2.const 放在…

git commit用法

git commit 是 Git 版本控制系統中的一個命令&#xff0c;用于將更改提交到本地存儲庫。以下是 git commit 的一些常見用法和選項&#xff1a; 基本用法: git commit -m "提交信息"使用 -m 選項可以直接在命令行中添加提交信息。 提交所有更改: git commit -a -m &q…

設計模式-簡單工廠模式

簡單工廠模式又稱為靜態工廠模式&#xff0c;其實就是根據傳入參數創建對應具體類的實例并返回實例對象&#xff0c;這些類通常繼承至同一個父類&#xff0c;該模式專門定義了一個類來負責創建其他類的實例。 using System.Collections; using System.Collections.Generic; us…

Nacos - 安裝指南(Windows系統)

一、下載安裝包 Nacos現在雖然已經出到二點幾的版本&#xff0c;但二點幾版本還處在測試階段&#xff0c;我們選擇下載成熟的 1.4.6 版本 下載地址&#xff1a;Nacos 1.4.6 GitHub的Release下載頁 拉到頁面最底部&#xff0c;可以看到下載按鈕&#xff0c;windows版本使用naco…

htmlCSS-----彈性布局

目錄 前言 什么是彈性布局 樣式 學習概要 容器和項目 彈性布局的排列方式 1.橫向排列&#xff08;默認樣式&#xff09; 2.父元素容器的屬性&#xff08;*5&#xff09; &#xff08;1&#xff09;主軸 代碼示例&#xff1a; &#xff08;2&#xff09;交叉軸 3.子元素…

正則表達式試煉

序 我希望在這里列出我很多想寫的正則表達式&#xff0c;很多我想寫&#xff0c;但是不知道怎么寫的。分享點滴案例。未來這個文章會越來越長 前言 互聯網時代&#xff0c;除了文本還有更好的學習方式&#xff0c;下面是幾個不錯的練習網站&#xff0c;如果你想系統地學習&a…

[Flutter]有的時候調用setState(() {})報錯?

先看FlutterSDK的原生類State中有一個變量mounted。 abstract class State<T extends StatefulWidget> with Diagnosticable {/// mounted的作用是&#xff0c;此State對象當前是否在樹中。/// 在創建State對象之后&#xff0c;在調用initState之前&#xff0c;框架通過…

【Linux】【驅動】應用層和驅動層傳輸數據

【Linux】【驅動】應用層和驅動層傳輸數據 緒論1.如果我在應用層使用系統0 對設備節點進行打開&#xff0c;關閉&#xff0c;讀寫等操作會發生什么呢? 2 我們的應用層和內核層是不能直接進行數據傳輸的3 驅動部分的代碼4 應用代碼5 編譯以及運行代碼 緒論 Linux一切皆文件! 文…

如何使用CSS實現一個下拉菜單?

聚沙成塔每天進步一點點 ? 專欄簡介? 使用CSS實現下拉菜單? HTML 結構? CSS 樣式? 寫在最后 ? 專欄簡介 前端入門之旅&#xff1a;探索Web開發的奇妙世界 記得點擊上方或者右側鏈接訂閱本專欄哦 幾何帶你啟航前端之旅 歡迎來到前端入門之旅&#xff01;這個專欄是為那些…

學習筆記」左偏樹

dist 的性質 對于一棵二叉樹&#xff0c;我們定義左孩子或右孩子為空的節點為外節點&#xff0c;定義外節點的 distdist 為 11&#xff0c;空節點的 distdist 為 00&#xff0c;不是外節點也不是空節點的 distdist 為其到子樹中最近的外節點的距離加一。 一棵根的 distdist 為…

中間件(下)

1、中間件與性能優化的關系&#xff1a; 中間件與性能優化之間存在密切的關系&#xff0c;特別是在構建復雜的分布式系統、處理高并發、實現異步通信等情況下。中間件可以在性能優化方面發揮重要作用&#xff0c;但同時&#xff0c;不當的中間件選擇和配置也可能導致性能問題。…

【卡碼網】31. 字符串的最大價值 <貪心>

【卡碼網】31. 字符串的最大價值 給定一個字符串 S S S&#xff08;S.lenth < 5000&#xff09;&#xff0c;只包含 0 和 1 兩個數字&#xff0c;下標從 1 開始&#xff0c;設第 i i i 位的價值為 v a l i val_i vali?&#xff0c;則 v a l i val_i vali?的定義如下&a…