文章目錄
- Spring集成RabbitMQ
- 1. AMQP&SpringAMQP
- 2. SpringBoot集成RabbitMQ
- 3. 模型
- work模型
- 4.交換機
- Fanout交換機
- Direct交換機
- Topic交換機
- 5.聲明式隊列和交換機
- 基于API聲明
- 基于注解聲明
- 6.消息轉換器
Spring集成RabbitMQ
1. AMQP&SpringAMQP
- AMQP(高級消息隊列協議):Advanced Message Queuing Protocol,是用于在應用程序之間傳遞業務消息的開放標準。該協議與語言和平臺無關,更符合微服務中獨立性的要求。是一種面向消息通信的協議,就像HTTP協議是一種瀏覽器向服務器發消息的協議。
- SpringAMQP:Spring AMOP是基于AMQP協議定義的一套API規范,提供了模板來發送和接收消息。包含兩部分,其中spring-amqp是基礎抽象spring-rabbit是底層的默認實現。也就是說SpringAMQP只是一種思想,而spring-rabbit是其具體實現
2. SpringBoot集成RabbitMQ
在Maven依賴中引入amqp的起步依賴即可
<!--AMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在Spring配置文件中配置
spring:rabbitmq:host: 127.0.0.1port: 5672# 虛擬主機virtual-host: /hhyusername: hhypassword: hhy
RabbitTemplate
是Spring封裝好的操作RabbitMQ的工具類
生產者
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {String queueName = "hhy.q1";String msg = "hello, mq!666";rabbitTemplate.convertAndSend(queueName, msg);}
消費者
@Component
public class MqListener {@RabbitListener(queues = "hhy.q1")public void listenSimpleQueue(String msg){System.out.println("hhy.q1的消息:【" + msg +"】");}}
3. 模型
work模型
假設消息生產者生產消息的速度非常的快,消息消費者消費消息的速度趕不上生產的速度,就會導致MQ隊列中的消息越來越多,從而導致消息堆積問題,如何處理消息堆積問題?
- 讓多個消費者綁定一個隊列,加快消息處理速度
- 還可以在代碼層面使用異步操作,比說線程池
綁定多個消費者,每個消費者的處理能力也可能不一致,而Spring默認將消息以輪詢的方式發送給多個消費者,處理能力慢的消費者還是會影響處理速度,此時就可以通過添加配置prefetch
讓消費者只獲取一條消息處理完成后再獲取,進一步避免消息堆積問題
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息
work模型就是多個消費者綁定一個隊列
@Component
public class MqListener {@RabbitListener(queues = "work.q")public void workListen1(String msg){System.out.println("消費者1:work.q的消息:【" + msg +"】");}@RabbitListener(queues = "work.q")public void workListen2(String msg){System.err.println("消費者2:work.q的消息:【" + msg +"】");}
}
4.交換機
上訴實例代碼中并沒有使用交換機,生產者是直接將消息發送到隊列中,實際這種方式是不合理的,假設多個服務都需要訂閱同一條消息這種方式就無法滿足需求了,那么就要引入交換機。
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
交換機的類型有四種:
- Fanout:廣播,將消息交給所有綁定到交換機的隊列。我們最早在控制臺使用的正是Fanout交換機
- Direct:訂閱,基于RoutingKey(路由key)發送給訂閱了消息的隊列
- Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符
- Headers:頭匹配,基于MQ的消息頭匹配,用的較少。
Fanout交換機
Fanout交換機其實就是廣播,將生產者發布的消息廣播給綁定的自身的所有消息隊列。發送消息流程:
- 可以有多個隊列
- 每個隊列都要綁定到Exchange(交換機)
- 生產者發送的消息,只能發送到交換機
- 交換機把消息發送給綁定過的所有隊列
- 訂閱隊列的消費者都能拿到消息
根據上訴圖編寫代碼
// 消費者1消費隊列1
@RabbitListener(queues = "fanout.q1")
public void fanoutListen1(String msg){System.out.println("消費者1:fanout.q1的消息:【" + msg +"】");
}
// 消費者2消費隊列2
@RabbitListener(queues = "fanout.q2")
public void fanoutListen2(String msg){System.out.println("消費者1:fanout.q2的消息:【" + msg +"】");
}
生產者向Fanout類型交換機發送消息,前提需要創建Fanout類型的交換機
@Test
void testSendFanout() {// 交換機名稱String exchangeName = "amq.fanout";String msg = "hello, fanout!";rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
Direct交換機
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 消息的發送方在 向 Exchange發送消息時,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊列,而是根據消息的
Routing Key
進行判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
通過key進行綁定,如下圖也就是說生產者發送消息時指定key為test
兩個消費者內的隊列都能收到,key為java
時只有dirct.q1
隊列能收到,key為cpp
時只有dirct.q2
隊列能收到
消費者代碼
@RabbitListener(queues = "direct.q1")
public void fanoutDirect1(String msg){System.out.println("消費者1:direct.q1的消息:【" + msg +"】");
}
@RabbitListener(queues = "direct.q2")
public void fanoutDirect2(String msg){System.out.println("消費者2:direct.q2的消息:【" + msg +"】");
}
生產者代碼
生產者在指定消息時指定不同的key來發送消息
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "所有隊列都能收到該消息";rabbitTemplate.convertAndSend(exchangeName, "test", msg);
}
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "只有隊列direct.q1能收到消息";rabbitTemplate.convertAndSend(exchangeName, "java", msg);
}
@Test
void testSendDirect() {String exchangeName = "hhy.direct";String msg = "只有隊列direct.q2能收到消息";rabbitTemplate.convertAndSend(exchangeName, "cpp", msg);
}
Topic交換機
Topic
類型的Exchange
與Direct
相比,都是可以根據RoutingKey
把消息路由到不同的隊列。
只不過Topic
類型Exchange
可以讓隊列在綁定BindingKey
的時候使用通配符!也就是說Topic
交換機是非常靈活的,Bindingkey
支持模糊匹配。
BindingKey
一般都是有一個或多個單詞組成,多個單詞之間以.
分割,例如: china.hunan
通配符規則:
#
:匹配一個或多個詞*
:匹配不多不少恰好1個詞
假設有多個隊列綁定的Bindingkey分別為:
china.hunan.chenzhou.weather
:湖南郴州的天氣china.hunan.chenzhou.news
:湖南郴州的新聞china.zhejiang.hangzhou.weather
:浙江杭州的天氣japan.tokyo.news
:日本東京的新聞
那么使用通配符:
china.hunan.#
:表示接受湖南的所有新聞和天氣消息#.news
:表示接受所有新聞消息china.hunan.*.news
:表示接受湖南省各個市區的新聞
建立綁定關系:
代碼實例
// 消費者
@RabbitListener(queues = "topic.q1")
public void topicListen1(String msg){System.out.println("消費者1:topic.q1的消息:【" + msg +"】");
}@RabbitListener(queues = "topic.q2")
public void topicListen2(String msg){System.out.println("消費者2:topic.q2的消息:【" + msg +"】");
}
生產者代碼
這一條消息topic.q1
和topic.q2
兩個隊列都能收到消息,因為它們和交換機綁定的關系的時候指定的KEY:
#.news
:接受所有地方的新聞china.hunan.#
:接受湖南的新聞和天氣
@Test
void testSendTopic() {// 交換機名稱String exchangeName = "hhy.topic";String key = "china.hunan.chenzhou.news";String msg = "這是一條湖南郴州的新聞!";rabbitTemplate.convertAndSend(exchangeName, key, msg);
}
下面這條消息只有topic.q2
能收到,因為topic.q2
和交換機綁定時指定的KEY為china.hunan.#
,接受湖南的所有天氣和新聞消息
@Test
void testSendTopic() {// 交換機名稱String exchangeName = "hhy.topic";String key = "china.hunan.chenzhou.weather";String msg = "郴州今天多云轉晴";rabbitTemplate.convertAndSend(exchangeName, key, msg);
}
小結:
描述下Direct交換機與Topic交換機的差異?
- Topic交換機接收的消息RoutingKey必須是多個單詞,以
**.**
分割 - Topic交換機與隊列綁定時的bindingKey可以指定通配符
#
:代表0個或多個詞*
:代表1個詞
5.聲明式隊列和交換機
通過RabbitMQ提供的管理頁面創建隊列和交換機比較麻煩,SpringAMQP提供了對應API方便開發者來創建隊列和交換機。
基于API聲明
通過Spring提供的API創建fanout交換機和隊列并建立綁定關系
@Configuration
public class FanoutConfiguration {/*** 聲明式創建fanout交換機* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hhy.fanout");}/*** 聲明式創建隊列* @return*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 聲明式創建綁定關系* @param fanoutQueue1* @param fanoutExchange* @return*/@Beanpublic Binding fanoutBinding3(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}}
但如果使用這種方式創建Direct交換機就會非常麻煩,因為如果要綁定時要指定多個Key就會出現很多冗余代碼,每綁定一個不同的Key就需要多寫一份代碼
@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("test.direct");}@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}
基于注解聲明
基于@Bean的方式聲明隊列和交換機的方式比價麻煩,代碼有點冗余,Spring還為我們提供基于注解的方式來聲明。
使用注解的方式聲明Direct
模式的交換機和隊列,通過注解聲明這種創建方式更簡單清爽,一個注解直接創建交換機并且綁定隊列。并且對應消費者直接就可以監聽隊列接收消息
@Component
public class MqListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenSimpleQueue1(String msg){System.out.println("消費者1:收到了simple.queue的消息:【" + msg +"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenSimpleQueue2(String msg){System.out.println("消費者2:收到了simple.queue的消息:【" + msg +"】");}
}
6.消息轉換器
前面我們生產者發送的消息都是一些字符串,當我們發送的消息是一個對象的時候就會出現問題。
@Test
void testSendObject() {String exchangeName = "test.direct";Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);rabbitTemplate.convertAndSend(exchangeName, "red", msg);
}
如下圖RabbitMQ中的消息隊列中存儲的消息,數據類型是通過JDK自帶的序列化后的數據
而JDK自帶的序列化,存在以下問題:
- 消息體積大
- 毫無可讀性
- 有安全漏洞,利用Java字節碼反序列化能被替換惡意代碼
所以使用JDK自帶的序列化方式并不合適,那么我可以使用JSON的序列化方式來解決這個問題。
使用jackson就行,引入jackson依賴
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>
將消息轉換器交給Spring管理
@Bean
public MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();
}