一、概念介紹:
RabbitMQ中幾個重要的概念介紹:
-
Channels:信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的 TCP 連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
-
Exchanges:交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。
- 交換機類型主要有以下幾種:
- Direct Exchange(直連交換機):這種類型的交換機根據消息的Routing Key(路由鍵)進行精確匹配,只有綁定了相同路由鍵的隊列才會收到消息。適用于點對點的消息傳遞場景。
- Fanout Exchange(扇形交換機):這種類型的交換機采用廣播模式,它會將消息發送給所有綁定到該交換機的隊列,不管消息的路由鍵是什么。適用于消息需要被多個消費者處理的場景。
- Topic Exchange(主題交換機):這種類型的交換機支持基于模式匹配的路由鍵,可以使用通配符*(匹配一個單詞)和#(匹配零個或多個單詞)進行匹配。適用于實現更復雜的消息路由邏輯。
- Headers Exchange(頭交換機):這種類型的交換機不處理路由鍵,而是根據發送的消息內容中的headers屬性進行匹配。適用于需要在消息頭中攜帶額外信息的場景。
-
Queues:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
二、引入依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
三、添加配置信息
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual # 手動提交
四、Direct Exchange(直連交換機)模式
1、新建配置文件 RabbitDirectConfig類
package com.example.direct;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 直連交換機--這種類型的交換機根據消息的Routing Key(路由鍵)進行精確匹配,* 只有綁定了相同路由鍵的隊列才會收到消息。適用于點對點的消息傳遞場景*/
@Configuration
public class RabbitDirectConfig {/*** 隊列名稱*/public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE";public static final String QUEUE_USER ="QUEUE_USER";/*** 交換機*/public static final String EXCHANGE="EXCHANGE_01";/*** 路由*/public static final String ROUTING_KEY="ROUTING_KEY_01";@Beanpublic Queue queue01() {return new Queue(QUEUE_MESSAGE, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic Queue queue02() {return new Queue(QUEUE_USER, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic DirectExchange exchange01() {return new DirectExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding demoBinding() {return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY);}@Beanpublic Binding demoBinding2() {return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY);}
}
2、添加消息生產者 Producer類
package com.example.direct;import com.example.entity.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class Producer {@ResourceRabbitTemplate rabbitTemplate;public void sendMessageByExchangeANdRoute(String message){rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message);}/*** 默認交換器,隱式地綁定到每個隊列,路由鍵等于隊列名稱。* @param message*/public void sendMessageByQueue(String message){rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message);}public void sendMessage(User user){rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user);}
}
3、添加消息消費者
package com.example.direct;import com.example.entity.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class Consumer {@RabbitListener(queues = RabbitDirectConfig.QUEUE_USER)public void onMessage(User user){System.out.println("收到的實體bean消息:"+user);}@RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE)public void onMessage2(String message){System.out.println("收到的字符串消息:"+message);}
}
4、 測試
package com.example;import com.example.entity.User;
import com.example.direct.Producer;
import com.example.fanout.FanoutProducer;
import com.example.topic.TopicProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootRabbitMqApplicationTests {@ResourceProducer producer;@Testpublic void sendMessage() throws InterruptedException {producer.sendMessageByQueue("哈哈");producer.sendMessage(new User().setAge(10).setName("wasin"));}
}
五、Topic Exchange(主題交換機)模式
1、新建RabbitTopicConfig類
package com.example.topic;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 主題交換機--這種類型的交換機支持基于模式匹配的路由鍵,* 可以使用通配符*(匹配一個單詞)和#(匹配零個或多個單詞)進行匹配。適用于實現更復雜的消息路由邏輯。*/
@Configuration
public class RabbitTopicConfig {/*** 交換機*/public static final String EXCHANGE = "EXCHANGE_TOPIC1";/*** 隊列名稱*/public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC";/*** 路由* "*" 與 "#",用于做模糊匹配。其中 "*" 用于匹配一個單詞,"#" 用于匹配多個單詞(可以是零個)* 可以匹配 aa.wasin.aa.bb wasin.aa.bb wasin.aa ....* aa.bb.wasin.cc 無法匹配*/public static final String ROUTING_KEY1 = "*.wasin.#";@Beanpublic Queue queue() {return new Queue(QUEUE_TOPIC1, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic TopicExchange exchange() {return new TopicExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1);}
}
2、新建 消息生產者和發送者
- TopicProducer類
package com.example.topic;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class TopicProducer {@ResourceRabbitTemplate rabbitTemplate;/*** @param routeKey 路由* @param message 消息*/public void sendMessageByQueue(String routeKey, String message){rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message);}}
- TopicConsumer類
package com.example.topic;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Slf4j
@Component
public class TopicConsumer {@RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1)public void onMessage2(String message){log.info("topic收到的字符串消息:{}",message);}
}
六、Fanout Exchange(扇形交換機)模式
1、 新建 RabbitFanoutConfig類
package com.example.fanout;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 扇形交換機--這種類型的交換機采用廣播模式,它會將消息發送給所有綁定到該交換機的隊列,* 不管消息的路由鍵是什么。適用于消息需要被多個消費者處理的場景。*/
@Configuration
public class RabbitFanoutConfig {/*** 交換機*/public static final String EXCHANGE = "EXCHANGE_FANOUT";/*** 隊列名稱*/public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT";/*** 隊列名稱*/public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2";@Beanpublic Queue queueFanout1() {return new Queue(QUEUE_FANOUT1, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic Queue queueFanout2() {return new Queue(QUEUE_FANOUT2, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic FanoutExchange exchangeFanout() {return new FanoutExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding bindingFanout() {return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());}@Beanpublic Binding bindingFanout2() {return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());}}
2、新建 消息生產者和發送者
- FanoutProducer類:
package com.example.fanout;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class FanoutProducer {@ResourceRabbitTemplate rabbitTemplate;/*** @param message 消息*/public void sendMessageByQueue(String message) {rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message);}}
- FanoutConsumer類
package com.example.fanout;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Slf4j
@Component
public class FanoutConsumer {/*** 手動提交* @param message* @param channel* @param tag* @throws IOException*/@RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1)public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.info("fanout1收到的字符串消息:{}",message);channel.basicAck(tag,false);}@RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2)public void onMessage2(String message){log.info("fanout2到的字符串消息:{}",message);}
}