RabbitMq的基礎及springAmqp的使用

RabbitMq

官網:RabbitMQ: One broker to queue them all | RabbitMQ

什么是MQ?

mq就是消息隊列,消息隊列遵循這先入先出原則。一般用來解決應用解耦,異步消息,流量削峰等問題,實現高性能,高可用,可伸縮和最終一致性架構。

rabbitMq的四大核心

image20230424154420240.png

RabbitMq的安裝

RabbitMQ是一個開源的遵循 AMQP協議實現的基于 Erlang語言編寫,即需要先安裝部署Erlang環境再安裝RabbitMQ環境。

查看兼容關系:Erlang Version Requirements | RabbitMQ

百度云地址:

鏈接:百度網盤 請輸入提取碼 提取碼:6666

本篇文章使用版本:3.8.8,liunx7-cenOs7

#在存放位置執行以下指令rpm -ivh erlang-21.3-1.el7.x86_64.rpm#安裝socat yum install socat -y#安裝mqrpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm 

啟動

#開機自動啟動chkconfig rabbitmq-server on
#啟動服務
/sbin/service rabbitmq-serve start
#查看啟動
/sbin/service rabbitmq-serve status
#停止服務
/sbin/service rabbitmq-serve stop

image20230424154409259.png

坑:執行以上指令無效,重新執行下面指令

systemctl start rabbitmq-server.service #啟動
systemctl status rabbitmq-server.service#查看狀態

安裝可視化界面

#盡量停止服務,在安裝
#安裝可視化界面
rabbitmq-plugins enable rabbitmq_management

訪問地址:http://ip:15672/

如果訪問不了,查看防火墻是夠關閉 systemctl stop firewalld 關閉防火墻,訪問成功后走rabbitmq的基本指令

卸載MQ:

systemctl stop rabbitmq-server
yum list | grep rabbitmq
yum -y remove rabbitmq-server.noarch
yum list | grep erlang
yum -y remove erlang-*
rm -rf /usr/lib64/erlang 
rm -rf /var/lib/rabbitmq
rm -rf /usr/local/erlang
rm -rf /usr/local/rabbitmq

docker安裝

docker pull rabbitmq:3-management
#運行
docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \ ? #網頁訪問端口-p 5672:5672 \ ? ? #mq連接端口-d \rabbitmq:3-management

rabbitMq基本指令

#查看用戶
rabbitmqctl list_users
#添加用戶
rabbitmqctl add_user admin 123456
#設置角色 (超級管理員)rabbitmqctl set_user_tags admin administrator
#設置權限 
rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'

image20230424160438952.png

登錄后也可以在此界面添加用戶

image20230424160714982.png

對接java(入門)

image20230506093856370.png

創建一個maven工程:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.chen</groupId><artifactId>mq</artifactId><version>1.0-SNAPSHOT</version><properties><rabbitmq.version>5.8.0</rabbitmq.version><common.version>2.6</common.version></properties><dependencies>
<!--         https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>${rabbitmq.version}</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>${common.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
</project>

生產者:

package com.chen.rabbitmq.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 生產者*/
public class production {private static final String MQ_KEY="holle";public static void main(String[] args)throws Exception {
//        創建rabbitmq的工廠ConnectionFactory factory = new ConnectionFactory();
//    連接地址ipfactory.setHost("172.17.18.162");
//        用戶名factory.setUsername("admin");
//        密碼factory.setPassword("123456");//        創建連接Connection connection = factory.newConnection();
//        創建通道Channel channel = connection.createChannel();
//   生產隊列
//   參數一:隊列名稱
//   參數二:持久性(默認為false)
//   參數三:該隊列是否可以有多個消費者,是否消息共享
//   參數四:是否自動刪除
//   參數五:其他參數channel.queueDeclare(MQ_KEY,true,false,false,null);/*** 發送一個消費者* 1.發送到那個交換機* 2.路由的key值是哪個 本次是隊列的名稱* 3.其他參數* 4.發送消息的消息體*/channel.basicPublish("",MQ_KEY,null,"holle word".getBytes());System.out.println("消息發送成功!");}}

測試是否發送成功:

image20230506094013364.png

消費者:

package com.chen.rabbitmq.one;import com.rabbitmq.client.*;import java.io.IOException;public class Consumption {private static final String MQ_KEY="holle";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("172.17.18.162");factory.setUsername("admin");factory.setPassword("123456");
//        創建一個新的連接Connection connection = factory.newConnection();Channel channel = connection.createChannel();/*參數:1: 消費哪個隊列2.消費成功之后是否要自動應答, true 帶邊自動應答 false 手動3.消費者未成功的回調4.消費者取錄成功的回調*/channel.basicConsume(MQ_KEY, true,(DeliverCallback) (consumerTag, message) -> System.out.println(new String(message.getBody())),(CancelCallback) (consumerTag)-> System.out.println(consumerTag));}}

工作隊列:

工作隊列(又稱任務隊列)的主要思想是避免立即執行資源密集型任務,而不得不等待它完成。相反我們安排任務在之后執行。我們把任務封裝為消息并將其發送到隊列。在后臺運行的工作進程將彈出任務并最終執行作業。當有多個工作線程時,這些工作線程將一起處理這些任務。

image20230506103939702.png

1.線程輪詢

類似nginx的負載均衡(輪詢),線1一次,線2一次。

image20230506111137320.png

工具類:

package com.chen.rabbitmq.tow.utils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitUtils {public static Channel rabbitConnection() throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("172.17.18.162");factory.setUsername("admin");factory.setPassword("123456");
//        創建一個新的連接Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}
生產者:package com.chen.rabbitmq.tow.test;import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.Channel;import java.util.Scanner;public class Production {private final static String MQ_KEY="word";
//    生產者public static void production() throws Exception{Channel channel = RabbitUtils.rabbitConnection();Scanner scanner = new Scanner(System.in);//生產隊列channel.queueDeclare(MQ_KEY,true,false,false,null);while (scanner.hasNext()){String next = scanner.next();channel.basicPublish("",MQ_KEY,null,next.getBytes());System.out.println("消息發布成功-> "+next);}}public static void main(String[] args) throws Exception{production();}
}
消費者:package com.chen.rabbitmq.tow.test;import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class Consumption {private final static String MQ_KEY="word";//    消費者public static void consumption() throws Exception{
//        獲取連接隊列Channel channel = RabbitUtils.rabbitConnection();channel.basicConsume(MQ_KEY,true,(DeliverCallback)(consumerTag,message)->{System.out.println(new String(message.getBody()));},(CancelCallback)(tag)->{System.out.println(tag);System.out.println("中斷了");});}public static void main(String[] args) throws Exception{consumption();}
}

image20230506111456297.png

idea開啟兩個線程。

消息應答

1.自動應答

RabbitMQ 是一個廣泛使用的開源消息代理,它支持多種消息協議,例如 AMQP、MQTT、STOMP 等。在 RabbitMQ 中,自動應答(Automatic Acknowledgement,Auto-ack)是一種消息確認機制,用于標記消息是否已被成功接收和處理。了解自動應答的概念,對于構建可靠、高效的消息傳遞系統非常重要。

當消費者接收并處理來自 RabbitMQ 的消息時,通常會使用消息確認(acknowledgements)機制來告知 RabbitMQ 該消息已經成功處理。這樣一來,RabbitMQ 就可以確保消息不會意外丟失。然而,這種確認過程可能會導致一定的延遲和額外開銷。為了解決這個問題,RabbitMQ 提供了自動應答機制。

在自動應答模式下,消費者接收到消息后,RabbitMQ 會立即將該消息標記為已處理。這意味著消費者不需要顯式地發送確認(ack)消息給 RabbitMQ。這種機制可以降低延遲,提高消息傳遞的速度,但是也存在一定的風險。因為消息一旦被發送出去,RabbitMQ 就認為它已經成功處理,而實際上消費者可能還沒有完成對消息的處理。如果消費者在處理消息時發生故障,那么這個消息可能會丟失。

2.手動應答

方法:

Channel.basicAck (用于肯定確認)

RabbitMQ已知道該消息并且成功的處理消息,可以將其丟棄了

Channel.basicNack(用于否定確認)

Channel.basicReject (用于否定確認)

Channel.basicNack 相比少一個參數 不處理該消息了直接拒絕,可以將其丟棄了

Multiple

//源碼
public void basicAck(long deliveryTag, boolean multiple) throws IOException {this.delegate.basicAck(deliveryTag, multiple);
}

multiple 的 true 和 false 代表不同意思:

  1. true 代表批量應答 channel 上未應答的消息

    比如說 channel 上有傳送 tag 的消息 5,6,7,8 當前 tag 是 8 那么此時5-8 的這些還未應答的消息都會被確認收到消息應答

    image20230506145530499.png

    2.false 只會應答 tag=8 的消息 5,6,7 這三個消息依然不會被確認收到消息應答

    image20230506145536088.png

消息重新入隊

為了解決消息丟失問題。

image20230506150713801.png

具體代碼:

生產者:

package com.chen.rabbitmq.three;
import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class Pro {private static final String MQ_KEY="mqkey";public static void pro() throws Exception{Channel channel = RabbitUtils.rabbitConnection();channel.queueDeclare(MQ_KEY,true,false,false,null);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String next = scanner.next();channel.basicPublish("",MQ_KEY,null,scanner.next().getBytes());System.out.println("消息發布成功-> "+next);}}public static void main(String[] args) throws Exception {pro();}
}

消費者1:

package com.chen.rabbitmq.three;import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;//消費者1
public class Word1 {public static final String MQ_KEY="mqkey";public static void word() throws Exception{Channel channel = RabbitUtils.rabbitConnection();channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
//             睡眠1stry {Thread.sleep(1*1000);System.out.println("Word1接收到消息->"+new String(message.getBody()));
//                 參數一:tag標記  參數二:是否批量channel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {e.printStackTrace();}},(CancelCallback) e->{System.out.println("消息中斷"+e);} );}public static void main(String[] args) throws Exception {word();}}

消費者2:

package com.chen.rabbitmq.three;import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;//消費者1
public class Word2 {public static final String MQ_KEY="mqkey";public static void word() throws Exception{Channel channel = RabbitUtils.rabbitConnection();channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
//             睡眠10stry {Thread.sleep(10*1000);System.out.println("Word2接收到消息->"+new String(message.getBody()));
//                 參數一:tag標記  參數二:是否批量channel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {e.printStackTrace();}},(CancelCallback) e->{System.out.println("消息中斷"+e);} );}public static void main(String[] args) throws Exception{word();}}

image20230506160401052.png

經測試會發現,消費者1為第一個接收到消息,接下來當生產者在生產出一條消息,應到消費者2接收到消息,但是此時消費者2突然出現宕機,使用了應答機制,消息則會重新打到消費者1;

持久化設置

1.隊列持久化

作用:當rabbitmq宕機后,重啟隊列依然存在

//創建隊列時的第二個參數為設置持久化 
channel.queueDeclare(MQ_KEY,true,false,false,null);

image20230506161649268.png

2.消息持久化

作用:當rabbitmq宕機了重新啟動,發送的消息依然存在。

下面的方法不是絕對的能保證消息的持久化

//生產者  private static final String MQ_KEY="mqkey";public static void pro() throws Exception{Channel channel = RabbitUtils.rabbitConnection();channel.queueDeclare(MQ_KEY,true,false,false,null);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String next = scanner.next();
//MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化channel.basicPublish("",MQ_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,scanner.next().getBytes());System.out.println("消息發布成功-> "+next);}}

3.發布確認

完成以上兩步還不足以持久化,要把發布確認加上。

//默認是不開啟的
Channel channel = RabbitUtils.rabbitConnection();
channel.confirmSelect();//開啟發布確認
發布確認的策略:

1.單個確認發布

這個發布確認是同步的,需等待確認一次在發布下一次,一手交錢一手交貨原則

缺點:發布速度特別慢

//單個確認public static void one() throws Exception{Channel channel = RabbitUtils.rabbitConnection();//開啟發布確認channel.confirmSelect();String uuid = UUID.randomUUID().toString();//創建隊列channel.queueDeclare(uuid,true,false,false,null);//開始時間long begin = System.currentTimeMillis();for (Integer i = 0; i < COUNT; i++) {String message = i + "";channel.basicPublish("",uuid,null,message.getBytes());//發布確認boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息確認成功!");}}long last = System.currentTimeMillis();System.out.println("耗時:"+(last-begin));}

2.批量確認發布

發布速度相對單個發布確認要快,但是當其中一條消息出現異常,將無法查找到那個消息丟失 。

 //批量public static void batch() throws Exception{Channel channel = RabbitUtils.rabbitConnection();String uuid = UUID.randomUUID().toString();//開啟消息確認channel.confirmSelect();//創建隊列channel.queueDeclare(uuid,true,false,false,null);//這個這個變量用記錄發布值Integer messageCount=100;Integer record =0;//開始時間long begin = System.currentTimeMillis();for (Integer i = 0; i < COUNT; i++) {record++;String message=i+"";//發布消息channel.basicPublish("",uuid,null,message.getBytes());if(messageCount.equals(record)){channel.waitForConfirms();record=0;}}long last = System.currentTimeMillis();System.out.println("耗時"+(last-begin));}

3.異步確認發布(推薦使用)

異步確認雖然比上的兩個代碼復雜,但同時也解決了上面兩種方式遺留下來的問題。

image20230511104153138.png

public static void asyn() throws Exception{Channel channel = RabbitUtils.rabbitConnection();//開啟發布確認channel.confirmSelect();String uuid = UUID.randomUUID().toString();//創建隊列channel.queueDeclare(uuid,true,false,false,null);//開始時間long begin = System.currentTimeMillis();//        創建一個線程的ListMap用于記錄 ----》處理異步未確認的消息ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();//        監聽消息channel.addConfirmListener((deliveryTag, multiple)->{if(multiple){ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =map.headMap(deliveryTag);longStringConcurrentNavigableMap.clear();}else{map.remove(deliveryTag);}System.out.println("確認消息:"+deliveryTag);},(deliveryTag, multiple)->{String message = map.get(deliveryTag);System.out.println("發送失敗的數據是:"+message+"未確認消息:"+deliveryTag+"-----失敗");});for (Integer i = 0; i < COUNT; i++) {String message=""+i;channel.basicPublish("",uuid,null,message.getBytes());//獲取信道的標識,存入消息map.put(channel.getNextPublishSeqNo(),message);}long last = System.currentTimeMillis();System.out.println("耗時:"+(last-begin));}

不公平分發原則(能者多勞原則)

在上面中的所有例子都是尊尋這輪詢的規則去執行的,問題:當其中的一臺服務響應特別慢時就會影響到整體的效率。

channel.basicQos(1);

//消費者
public static void word() throws Exception{Channel channel = RabbitUtils.rabbitConnection();//設置不公平分發channel.basicQos(1);channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{try {//模擬虛擬機延遲Thread.sleep(1*1000);System.out.println("Word2接收到消息->"+new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {e.printStackTrace();}},(CancelCallback) e->{System.out.println("消息中斷"+e);} );
}

也可以用來設置預期值!

//消費者1
public static void word2() throws Exception{Channel channel = RabbitUtils.rabbitConnection();//設置預期值channel.basicQos(3);channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{try {//模擬虛擬機延遲Thread.sleep(1*1000);System.out.println("Word2接收到消息->"+new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {e.printStackTrace();}},(CancelCallback) e->{System.out.println("消息中斷"+e);} );
}
//消費者2
public static void word2() throws Exception{Channel channel = RabbitUtils.rabbitConnection();//設置預期值channel.basicQos(5);  channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{try {//模擬虛擬機延遲Thread.sleep(10*1000);System.out.println("Word2接收到消息->"+new String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {e.printStackTrace();}},(CancelCallback) e->{System.out.println("消息中斷"+e);} );
}

交換機

在RabbitMQ中,生產者發送消息不會直接將消息投遞到隊列中,而是先將消息投遞到交換機中, 在由交換機轉發到具體的隊列, 隊列再將消息以推送或者拉取方式給消費者進行消費

綁定(bindings)

與交換機產生關系,并且能有routekey控制發送消息給哪個隊列。

fanout交換機(扇形)

扇形交換機是最基本的交換機類型,它所能做的事清非常簡單廣播消息。扇形交換機會把能接收到的消息全部發送給綁定在自己身上的隊列。因為廣播不需要'思考”,所以扇形交換機處理消息的速度也是所有的交換機類型里面最快的。

//消費者
public class Word {
//    交換機名稱private static String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();
//        聲明一個交換機channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//        聲明一個隊列 臨時隊列String queue = channel.queueDeclare().getQueue();
//        綁定交換機與隊列channel.queueBind(queue,EXCHANGE_NAME,"");System.out.println("等待消息~");//消費者取消消息時回調接口channel.basicConsume(queue,true, (consumerTag,message)->{System.out.println("word1控制臺打印接收消息:"+new String(message.getBody(),"UTF-8"));},cancelCallback->{});}
}public class Word2 {
//    交換機名稱private static String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();
//        聲明一個交換機channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//        聲明一個隊列 臨時隊列String queue = channel.queueDeclare().getQueue();
//        綁定交換機與隊列channel.queueBind(queue,EXCHANGE_NAME,"");System.out.println("等待消息~");
//消費者取消消息時回調接口channel.basicConsume(queue,true, (consumerTag,message)->{System.out.println("word2控制臺打印接收消息:"+new String(message.getBody(),"UTF-8"));},cancelCallback->{});}
}
//生產者
public class send {//    交換機名稱private static String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String next = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes("UTF-8"));System.out.println("生產者發送消息:"+next);}}
}

直連交換機: Direct exchange

直連交換機的路由算法非常簡單: 將消息推送到binding key與該消息的routing key相同的隊列。

代碼幾乎類型fanout交換機,只需要指定routerkey即可。

主題交換機: Topic exchange

發送到主題交換機的 消息不能有任意的 routing key, 必須是由點號分開的一串單詞,這些單詞可以是任意的,但通常是與消息相關的一些特征。

如以下是幾個有效的routing key:

"stock.usd.nyse", "nyse.vmw", "quick.orange.rabb 代", routing key的單詞可以 有很多,最大限制是255 bytes。

Topic 交換機的 邏輯與 direct 交換機有點 相似 使用特定路由鍵發送的消息 將被發送到所有使用匹配綁定鍵綁定的隊列 ,然而 ,綁定鍵有兩個特殊的情況:

*表示匹配任意一個單詞

#表示匹配任意—個或多個單詞

image20230525104709697.png

比如上圖:

發送routerkey為:ws.orange.rabbit那么對應的就是Q1,Q2

發送routerkey為:lazy.orange.elephant那么對應的就是Q1,Q2

//消費者
public class word1 {private static final String EXCHANGE_NAME="topic_logs";private static final String QUEUE_NAME="Q1";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();
//        創建交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//       創建隊列channel.queueDeclare(QUEUE_NAME,true,true,false,null);
//        綁定隊列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
//        接收消息channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)->{System.out.println("接收到的消息:"+new String(message.getBody()));},cancelCallback->{});System.out.println("等下消息~");}
}
public class word2 {private static final String EXCHANGE_NAME="topic_logs";private static final String QUEUE_NAME="Q2";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();
//        創建交換機channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//       創建隊列channel.queueDeclare(QUEUE_NAME,true,true,false,null);
//        綁定隊列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
//        接收消息channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)->{System.out.println("接收到的消息:"+new String(message.getBody()));},cancelCallback->{});System.out.println("等下消息~");}
}
//生產者
public class send {private static final String EXCHANGE_NAME="topic_logs";public static void main(String[] args) throws Exception{Channel channel = RabbitUtils.rabbitConnection();Scanner scanner = new Scanner(System.in);while (true){System.out.println("請輸入routerkey:");String key = scanner.next();System.out.println("請輸入消息內容:");String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes());}}
}

image20230525112943356.png

image20230525112959575.png

死信隊列

顧名思義:無法被消費的消息,一般來說,producer將消息投遞broker或者直接到queue里了,consumer(消費者)從queue取出消息進行消費,但某些時間由特定原因導致queue中的某些消息無法被消費,這樣如果沒有后續的處理,就變成了死信。

應用場景:為了確保訂單業務的消息數據不丟失,需要使用到RabbitMQ的死信隊列機制,當消息被消息時發生了異常,這是就將消息存到死信中,還比如說:用戶商城下單成功,并且點擊支付后在指定時間支付時自動失效。

image20230525143950674.png

消息TTL過期時間測試:

//生產者
public class send {private static final String NORMAL_EXCHANGE="normal_exchange";public static final String NORMAL_QUEUE="normal_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();
//        設置死信時間AMQP.BasicProperties basicProperties =new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 11; i++) {String msg="info"+i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",basicProperties,msg.getBytes());}}
}
//消費者1
public class C1 {private static final String NORMAL_EXCHANGE="normal_exchange";private static final String DEAD_EXCHANGE="dead_exchange";public static final String NORMAL_QUEUE="normal_queue";public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();
//      創建c1交換機channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.TOPIC);
//       聲明普通隊列HashMap<String, Object>  map = new HashMap<>();
//        設置過期時間 10s  單位ms 這里有消費整去做控制
//        map.put("x-message-ttl",100000);
//        正常隊列設置死信交換機map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//       設置死信的routerKeymap.put("x-dead-letter-routing-key","lisi");//    創建普通隊列channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//創建死信隊列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//        綁定channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");channel.basicConsume(NORMAL_QUEUE,true,(consumerTag, message) -> {System.out.println("C1消息為:"+message.getBody());},cancelCallback->{});}
}
//消費者2
public class C2 {public static final String DEAD_QUEUE="dead_queue";private static final String DEAD_EXCHANGE="dead_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();channel.basicConsume(DEAD_QUEUE,true,(consumerTag,message)->{System.out.println("消息為:"+new String(message.getBody()));},cancelCallback->{});}
}

正常隊列長度的限制:

根據c1做修改,測試報錯先刪除原來的隊列與交換機

//設置正常隊列長度的限制
map.put("x-max-length",6);

拒接消息:

添加手動應答拒接。

public class C1 {private static final String NORMAL_EXCHANGE="normal_exchange";private static final String DEAD_EXCHANGE="dead_exchange";public static final String NORMAL_QUEUE="normal_queue";public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitUtils.rabbitConnection();
//      創建c1交換機channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.TOPIC);
//       聲明普通隊列HashMap<String, Object>  map = new HashMap<>();
//        設置過期時間 10s  單位ms
//        map.put("x-message-ttl",100000);
//        正常隊列設置死信交換機map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//       設置死信的routerKeymap.put("x-dead-letter-routing-key","lisi");
//        設置正常隊列長度的限制
//        map.put("x-max-length",6);//    創建普通隊列channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//創建死信隊列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//        綁定channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");channel.basicConsume(NORMAL_QUEUE,false,(consumerTag, message) -> {String msg = new String(message.getBody());System.out.println("C1消息為:"+msg);
//            拒接對應消息if(msg.equals("info2")){
//                deliveryTagchannel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else{channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},cancelCallback->{});}
}

SpringAMQP

官網地址:Spring AMQP

Spring AMQP 是 Spring 框架中的一個模塊,它提供了基于 AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)標準的抽象層,用于簡化在 Spring 應用程序中使用消息隊列的過程。Spring AMQP 不僅簡化了與消息代理(如 RabbitMQ)的集成,還提供了一套高度可配置的模板類來生產、消費消息,并管理AMQP基礎設施組件,如隊(Queue)、交換機(Exchange)和綁定(Binding)。

使用

      <!--AMQP依賴,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

生產者

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 38.6.217.70port: 5672username: itcastpassword: 123321virtual-host: /
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() throws InterruptedException {// 1.發送消息String message = "Hello, Spring Amqp!";rabbitTemplate.convertAndSend("simple.queue", message);}
}

?

消費者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//監聽機制
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) {System.out.println("spring接收到的消息是:" + msg);}
}

預取限制

案例:將50條消息在一秒內分類交給兩個消費者消費。

//生成者   
@Testpublic void testSendWordSimpleQueue() throws InterruptedException {// 1.發送消息String key ="simple.queue";String message = "Hello, Spring Amqp____";for (int i = 0; i < 49; i++) {rabbitTemplate.convertAndSend(key, message+i);Thread.sleep(20);}}
//消費者
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring接收到的消息是:" + msg+"___"+ LocalDateTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenFanoutQueue1(String msg) throws InterruptedException {System.err.println("FanoutQueue1接收到的消息是:" + msg+"___"+ LocalDateTime.now());Thread.sleep(200); //模擬性能}
}

通過執行結果我們可以看出listenFanoutQueue1這個監聽器執行的是奇數,而listenSimpleQueueMessage則是偶數。且時間超出了1秒。為什么呢?

因為在生產者發送到隊列中時,消費者會預取消息,在默認情況下進行平分機制,在上面代碼中我們可以看到我們使用了線程睡眠的方式模擬了性能,在平分的情況下,睡眠200的執行了25條,所以導致了超出了1s。 如何調整呢?

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 38.6.217.70port: 5672username: itcastpassword: 123321virtual-host: /listener: #設置預取simple:prefetch: 1 #每次只取一條#這段配置的作用是在使用 RabbitMQ 的時候,配置消費者監聽器的簡單模式,并設置消息預取值為 1。這意味著每次只會從隊列中取出一條消息進行處理,處理完后再去取下一條消息。這種方式可以保證消息的順序處理。

發布與訂閱

?

fanoutExchange

這種交換機需要進行綁定對應的隊列,綁定對應的隊列后,生產者將消息推送給交換機,交換機會將消息分別都發給綁定的消息隊列。

實現

//消費者配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutExchangeConfig {//    創建隊列1  fanout.queue1@Beanpublic Queue queue1(){return new Queue("fanout.queue1");}
//    創建交換機 fanoutExchange@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}
//   隊列1綁定交換機@Beanpublic Binding bindingExchange1(){return BindingBuilder.bind(queue1()).to(fanoutExchange());}//    創建隊列1  fanout.queue2@Beanpublic Queue queue2(){return new Queue("fanout.queue2");}//   隊列2綁定交換機@Beanpublic Binding bindingExchange2(){return BindingBuilder.bind(queue2()).to(fanoutExchange());}
}

?

消費者

@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1QueueMessage(String msg) throws InterruptedException {System.out.println("fanout.queue1接收到的消息是:" + msg+"___"+ LocalDateTime.now());}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2QueueMessage(String msg) throws InterruptedException {System.out.println("fanout.queue2接收到的消息是:" + msg+"___"+ LocalDateTime.now());}
}

生產者

    @Testpublic void testSendMessageFanoutQueue()  {// 1.發送消息String message = "Hello, testSendMessageFanoutQueue !";
//       交換機名稱String exchange = "fanoutExchange";rabbitTemplate.convertAndSend(exchange,"",message);}
DirectExchange

這種交換機需要指定一個key進行發送,通過可以區別發送到那個隊列,同時這些隊列也可以綁定相同的key,那么也就是實現了fanout的效果。?

實現

//消費者
@Component
public class DirectExchangeListener {//    可以通過@bena的方式進注入,這里我們采用@RabbitListenner的方式@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),//綁定的隊列exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),//綁定的交換機key = {"red", "blue"} //綁定的key))public void listenDirectQueue1(String msg) throws InterruptedException {System.out.println("listenDirectQueue1接收到的消息是:" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),//綁定的隊列exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),//綁定的交換機key = {"red", "yellow"} //綁定的key))public void listenDirectQueue2(String msg) throws InterruptedException {System.out.println("listenDirectQueue2接收到的消息是:" + msg);}
}
//生產者@Testpublic void testSendMessageDirectQueue()  {String routingKey = "yellow";// 1.發送消息String message = "Hello, testSendMessageFanoutQueue !"+"__"+routingKey;
//       交換機名稱String exchange = "direct.exchange";rabbitTemplate.convertAndSend(exchange,routingKey,message);}

?

TopicExchange

這種交換機其實和direct類型的交換機差不錯,只不過它是使用通配符的方式。

使用

//消費者
@Component
public class TopicExchangeListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),key = {"china.#"}))public void listenTopicQueue1(String msg) throws InterruptedException {System.out.println("topic.queue1接收到消息:" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),key = {"#.news"}))public void listenTopicQueue2(String msg) throws InterruptedException {System.out.println("topic.queue2接收到消息:" + msg);}
}
//生產者   
@Testpublic void testSendMessageTopicQueue()  {String routingKey = "news";// 1.發送消息String message = "Hello, testSendMessageTopicQueue !"+"__"+routingKey;
//       交換機名稱String exchange = "topic.exchange";rabbitTemplate.convertAndSend(exchange,routingKey,message);}

消息轉換器

例子:

//我們聲明一個objQueue
@Beanpublic Queue objQueue(){return new Queue("obj.queue");}//發送消息@Testpublic void testSendMessageobjQueue()  {Map<String, Object> map = new HashMap<>();map.put("name","test");map.put("age",18);rabbitTemplate.convertAndSend("obj.queue",map);}

我們重rabbitmq的ui界面中我們可以發現消息是基于JDK完成的序列化。

缺點:這樣不能很直接的看出消息的結果,并且占用大量內存,所以下面我們使用jdckson進行json序列化。

發送者

依賴:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>

配置bean

//生產者配置
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}

?

消費者

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>
//銷售者配置
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@RabbitListener(queues = "obj.queue")public void listenObjQueueMessage( Map<String, Object> msg) throws InterruptedException {System.out.println("obj.queue接收到的消息是:" + msg);}

后續會更新使用MQ做的具體案例:秒殺、訂單業務處理等。?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/37976.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/37976.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/37976.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

容器技術-docker2

容器化技術Docker Docker介紹 官網&#xff1a; docker.io docker.com 公司名稱&#xff1a;原名dotCloud 14年改名為docker 容器產品&#xff1a;docker 16年已經被更名為Moby docker-hub docker.io docker容器歷史 和虛擬機一樣&#xff0c;容器技術也是一種資源隔…

java基于ssm+jsp 二手手機回收平臺系統

1前臺首頁功能模塊 二手手機回收平臺系統&#xff0c;在系統首頁可以查看首頁、手機商城、新聞資訊、我的、跳轉到后臺、購物車等內容&#xff0c;如圖1所示。 圖1前臺首頁功能界面圖 用戶注冊&#xff0c;在用戶注冊頁面可以填寫賬號、密碼、姓名、手機、郵箱、照片、地址、…

深度解析RocketMq源碼-消息推送、持久化、消費全流程

1.緒論 前面的幾篇文章都剖析了broker的存儲文件。那么生產者發送一條消息到達broker過后是如何處理的&#xff0c;這條消息結果什么處理過后&#xff0c;消費者才能夠消費這條消息。接下來&#xff0c;帶我們將仔細剖析一下一條消息從生產者生產消息&#xff0c;到到達broker…

在線字節大端序小端序轉換器

具體請前往&#xff1a;在線字節大端序小端序轉換器

操作系統期末復習真題四

一、前言&#x1f680;&#x1f680;&#x1f680; 小鄭在刷題的過程中幫大家整理了一些常見的考試題目&#xff0c;以及易于遺忘的知識點&#xff0c;希望對大家有所幫助。 二、正文?????? 1.OS的不確定性是指(ABC)。 A.程序的運行次序不確定 B.程序多次運行的時間不…

獨立開發者系列(13)——示例理解面向對象與過程

專業術語晦澀難懂&#xff0c;特別是當你沒有寫過稍微大點的系統的時候&#xff0c;你要理解這里面的區別很難。 從最簡單的早期我們學習開始&#xff0c;我們除了練習hello world掌握了入門函數之后&#xff0c;基本都再練習算法。比如水仙花數的獲取&#xff0c;冒泡排序&…

Redis的使用和原理

目錄 1.初識Redis 1.1 Redis是什么&#xff1f; 1.2 Redis的特性 1.2.1 速度快 1.2.2 基于鍵值對的數據結構服務器 1.2.3 豐富的功能 1.2.4 簡單穩定 1.2.5 持久化 1.2.6 主從復制 1.2.7 高可用和分布式 1.3 Redis的使用場景 1.3.1 緩存 1.3.2 排行榜系統 1.3.3 計數器應用 1.3…

【計算機網絡】HTTPS——更安全的HTTP通信(個人筆記)

學習日期&#xff1a;2024.6.26 內容摘要&#xff1a;HTTPS存在的意義、特點和工作方式 HTTP的缺點——易竊聽、偽裝、篡改 在Web及網絡基礎中&#xff0c;我們已經知道了網頁是怎么打開的&#xff0c;HTTP確實是一個相當優秀和方便的協議&#xff0c;但HTTP也有很多不足&…

【操作系統期末速成】 EP04 | 學習筆記(基于五道口一只鴨)

文章目錄 一、前言&#x1f680;&#x1f680;&#x1f680;二、正文&#xff1a;??????2.1 考點七&#xff1a;進程通信2.2 考點八&#xff1a;線程的概念2.3 考點九&#xff1a;處理機調度的概念及原則2.4 考點十&#xff1a;調度方式與調度算法 一、前言&#x1f680;…

排序(冒泡排序、選擇排序、插入排序、希爾排序)-->深度剖析(一)

歡迎來到我的Blog&#xff0c;點擊關注哦&#x1f495; 前言 排序是一種基本的數據處理操作&#xff0c;它涉及將一系列項目重新排列&#xff0c;以便按照指定的標準&#xff08;通常是數值大小&#xff09;進行排序。在C語言中&#xff0c;排序算法是用來對元素進行排序的一系…

FPGA 690T NVME高速存儲設計

高速存儲設計會有各種需求的考慮&#xff0c;那么對應的方案也不完全相同&#xff0c;這篇文章出一期純FPGA實現的高速存儲方案。用純fpga實現高速存儲板卡有易國產化&#xff0c;功耗低和體積小等特點&#xff0c;缺點就是靈活性不是很強&#xff0c;實現標準ext4和nfs文件系統…

計算機的錯誤計算(十六)

摘要 計算機的錯誤計算&#xff08;十五&#xff09;中歷史事件給我們的啟示或警示。 計算機的錯誤計算&#xff08;十五&#xff09;介紹了歷史上發生的一些事件。從這些事件我們可以得到一些啟示或警示。 若不是油氣平臺的沉沒&#xff0c;設計者會得出精度低了嗎&#x…

信息盲盒系統設計

信息盲盒系統是一種結合了隨機性和趣味性的信息傳遞和接收方式&#xff0c;類似于實體盲盒的概念&#xff0c;但在數字領域應用。這種系統通常用于增加用戶參與度、提升用戶體驗或作為營銷策略的一部分。設計一個信息盲盒系統需要考慮以下幾個關鍵要素&#xff1a; 1. 定義目標…

數據倉庫建模基礎理論-01-為什么需要數據建模?

一、什么是數據模型&#xff1f; 數據模型是數據庫的基礎結構&#xff0c;用于描述和組織數據的方式。 它不僅是數據庫的底層結構&#xff0c;還是一個概念性工具&#xff0c;幫助理解數據的含義和關系。 數據模型包括數據本身、數據之間的關系、數據的語義&#xff08;含義和…

C++ | Leetcode C++題解之第206題反轉鏈表

題目&#xff1a; 題解&#xff1a; class Solution { public:ListNode* reverseList(ListNode* head) {if (!head || !head->next) {return head;}ListNode* newHead reverseList(head->next);head->next->next head;head->next nullptr;return newHead;} …

在Ubuntu 16.04上安裝和配置GitLab的方法

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到網站。 簡介 GitLab CE&#xff08;Community Edition&#xff09;是一個開源應用程序&#xff0c;主要用于托管 Git 倉庫&#xff0c;并提供額…

AI在創造還是毀掉音樂之論文

AI在創造還是毀掉音樂&#xff1f; 簡介&#xff1a;最近一個月&#xff0c;輪番上線的音樂大模型&#xff0c;一舉將素人生產音樂的門檻降到了最低&#xff0c;并掀起了音樂圈會不會被AI徹底顛覆的討論。短暫的興奮后&#xff0c;AI產品的版權歸屬于誰&#xff0c;創意產業要…

一秒記單詞:音通義通,一秒牢記

一秒記單詞&#xff0c;從小學到高中&#xff0c;一秒牢記 一、小學生記單詞&#xff0c;快速突破 1.1 好的開始&#xff0c;是成功的一半 sun n.太陽 【通】尚 moon n.月亮 【通】母恩 mother n.母親&#xff0c;媽 【通】媽汁 sea n.海&#xff0c;大海 【通】細 sand …

【MySQL基礎篇】SQL指令:DQL及DCL

1、DQL DQL - 介紹 DQL英文全稱是Data Query Language(數據查詢語言)&#xff0c;數據查詢語言&#xff0c;用來查詢數據表中的記錄。&#xff08;在MySQL中應用是最為廣泛的&#xff09; 查詢關鍵字&#xff1a;SELECT DQL - 語法 SELECT 字段列表 FROM 表名列表 WHER…

【人工智能學習之圖像操作(六)】

【人工智能學習之圖像操作&#xff08;六&#xff09;】 Hough變換直線檢測圓檢測 圖像分割 Hough變換 在圖像處理中&#xff0c;霍夫變換用來檢測任意能夠用數學公式表達的形狀&#xff0c;即使這個形狀被破壞或者有點扭曲 直線檢測 import cv2 import numpy as np image …