文章目錄
- 一.搭建SpringBoot環境
- 二.配置
- 1.配置application.yml
- 2.定義RabbitConfig類
- 三.生產端
- 四.消費端
一.搭建SpringBoot環境
我們選擇基于Spring-Rabbit去操作RabbitMQ
使用spring-boot-starter-amqp會自動添加spring-rabbit依賴,如下:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
二.配置
1.配置application.yml
配置連接rabbitmq的參數
server:port: 44000
spring:application:name: test-rabbitmq-producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtualHost: /
2.定義RabbitConfig類
定義RabbitConfig類,配置Exchange、Queue、及綁定交換機
本例配置Topic交換機
package com.xuecheng.test.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqConfig {public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";public static final String QUEUE_INFORM_SMS = "queue_inform_sms";public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";public static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static final String ROUTINGKEY_SMS="inform.#.sms.#";//聲明交換機@Bean(EXCHANGE_TOPICS_INFORM)public Exchange EXCHANGE_TOPICS_INFORM(){//durable(true) 持久化,mq重啟之后交換機還在return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();}//聲明QUEUE_INFORM_EMAIL隊列@Bean(QUEUE_INFORM_EMAIL)public Queue QUEUE_INFORM_EMAIL(){return new Queue(QUEUE_INFORM_EMAIL);}//聲明QUEUE_INFORM_SMS隊列@Bean(QUEUE_INFORM_SMS)public Queue QUEUE_INFORM_SMS(){return new Queue(QUEUE_INFORM_SMS);}//ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey@Beanpublic Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS隊列綁定交換機,指定routingKey@Beanpublic Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}}
三.生產端
使用RarbbitTemplate發送消息
package com.xuecheng.test.rabbitmq;
import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSendByTopics(){
for (int i=0;i<5;i++){
String message = "sms email inform to user"+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);
System.out.println("Send Message is:'" + message + "'");
}
}
}
四.消費端
創建消費端工程,添加依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency
使用@RabbitListener注解監聽隊列
package com.xuecheng.test.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ReceiveHandler {
//監聽email隊列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(String msg,Message message,Channel channel){
System.out.println(msg);
} /
/監聽sms隊列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receive_sms(String msg,Message message,Channel channel){
System.out.println(msg);
}
}