-
什么是消息中間件
簡單的來說就是消息隊列中間件,生產者發送消息到中間件,消息中間件用于
保存消息并發送消息到消費者。 -
消息中間件RabbitMQ的基本組件
1)producer -生產者
2)customer -消費者
3)broker (經紀人)- MQ服務器,管理消息對列、消息及相關消息。(接收并存儲生產者發送的消息,發送消息到消費者)
4)exchange-交換機,將生產者的消息按照一定規則發送給對應的消息對列queue
5)queue-消息對列,隊列,消息存放的容器,消息先進先出
6)Message-消息,程序間的通信的數據 -
什么是消息隊列queue(生產者生產msg-queue,消費者監聽queue-消費)
消息對列是一種分布式中的通信方式,它通過異步傳輸消息的方式,來解耦消 息的 生產者和消費者。在消息中間件中,生產者將消息發送到消息對列中,以為先進先出的方式,消費者從對列中取出消息(可以監聽對列是否有消息-@RabblitListener和@RabbitHandler) -
消息中間件的作用
主要有三個作用:分別是服務解耦、實現異步通信、流量削峰
1). 服務解耦:(場景-用戶下訂單、庫存服務工作)
例如訂單服務-用戶下訂單,庫存服務處理對應減庫存,才返回給用戶下單成功的消息。如果說庫存服務出現了問題,就會造成訂單丟失等問題。如果使用消息中間件(消息對列),可以把下的訂單信息—> mq就返回用戶下單這個,mq再發送給庫存服務,這樣生產者發送消息和消費者接收處理消息相互不影響,即使宕機了,消息還在中間件中。
2). 異步通信/異步調用:(用戶注冊新用戶,服務發送短信和郵件)
傳統的模式,用戶注冊系統新用戶,服務給用戶發送短信和郵件,三個操作都完成之后才返回用戶下單注冊的消息。因為短信和郵箱和注冊信息是沒有關系的服務,用戶注冊后消息發送給mq,用戶不需要等郵件和短信發送成功,mq直接返回用戶注冊成功,至此用戶注冊業務完成。至于短信和郵件交給mq發送給短信業務-去發送。
注意:
異步就是某線程發出請求,不需要等其他線程完成就接著完成操作。用戶注冊,消息發送給mq,不需要等短信服務完成,短信發布發送都與注冊無關,兩者是異步關系。異步不是并發,所有操作同時進行,異步是各過各的。
3). 流量削峰:(商品秒殺)
例如商品秒殺的時候,這時候數據庫并不能承受這么大的請求。可以把請求下訂單的信息暫存在mq中,返回給用戶下單成功,之后的操作由mq發送給對應的服務處理。緩存數據減少數據庫的壓力。
-
為什么需要使用消息中間件
服務解耦、異步通信、流量削峰 -
消息中間件在分布式系統中使用場景(異步)
6.1 服務解耦-訂單和庫存服務。用戶下訂單,消息發給mq,mq返回用戶下訂單成功,消費者-庫存服務接收mq消息再去調用減少庫存的消息。
6.2 異步通信-用戶注冊新賬戶 用戶注冊和admin發送短信和郵件異步
6.3 流量削峰-商品秒殺,先mq先存儲訂單信息,返回訂單服務下單成功,后慢慢處理。減少大并發對數據庫的影響/。 -
RabbitMQ的五種消息模型/工作模式、
1) simple 簡單的一對一模式,producce-queue-customer
2) word模式,一個消息對列queue—> 多個消費者,消費者爭搶消息隊列里面消息,注意一個消息只能被一個消費者消費。
3) fanout-廣播、訂閱者模式。交換機將消息發送給所有binding的對列,消費端可以有多個customer使用word模式消費對列的消息。
4) topic-主體模式,生產者的消息按照不同的路由規則,模糊匹配給不同滿足條件的消息對列,消費者再去消費對列中消息
5)routeKey,路由鍵(exchange-type-direct),按照不同的路由鍵發送到對應的queue中。 -
消息中間件是異步還是同步
異步,各干各的,互不影響。(異步并不是并發-同時請求一個請求,而是互不影響個干各的,沒有約束和先后順序)。received生產者的message,send消息到消費者。二者是異步,解耦合互不影響。 -
mq的消息確認機制confirm(MQ如何避免消息丟失?)
- . 對于生產者端來說,主要有兩種確認機制
a. message到broker后,mq立馬確認confirm并返回消息告知生產者消息發送成功,如果失敗也告知生產者,并重新發送。
b. message到MQ之后,如果消息對列沒有received成功(queue存儲msg成功),會確認并返回消息接收失敗到生產者
a b 保證了生產者端不會丟失消息。
2). 對于消費者來說。
a. 消費者接收到queue的消息后,默認自動確認,queue刪除該message。
b. 消費者接收到msg后,對數據進行邏輯處理,如果直接confirm-queue直接刪除msg,處理數據過程中可能會宕機消息丟失。
----設置為手動confirm確認收貨,數據處理完再收貨成功,queue再去刪除msg。也可以對數據不滿,退回到queue重新入隊,也可以直接刪除數據。
c. 接收失敗告知queue,不會刪除數據,MQ重新發送消息-這種操作很常見
這樣避免數據在消費者端丟失 - . 對于生產者端來說,主要有兩種確認機制
1、2兩種方式避免了mq的消息丟失。
-
MQ重復消費
1)如何造成重復消費
(1) 生產者端,傳輸到MQ-queue消息對列接收成功,MQ因為網絡問題沒有ack->producer,導致生產者又發送了一次消息到MQ。queue-customer-這樣msg就被消費了兩次。
(2)消費者端,MQ-queue消息對列消息傳到customer。一種是消費者沒有接收成功,因為網絡問題沒有ack queue,queue重復發送,這種不會造成msg重復消費。另一種是消費者消費成功,但是因為不可控因素沒有ack queue,消息對列重復發送mgs-to-customer-重復消費。2)解決方法
對于冪等性消息(查詢),消費者重復消費也沒有關系。
對于非冪等性消息,消費者重復消費就會有影響了。
方法:消費者在消費消息之前,獲取msg唯一id,到redis進行存儲判斷setnx(判斷是已經存在并存儲key-value)。
Boolen flag=stringRadisTemplate.opsForValue.setAbsent(id,value);
1-flag=true,key不存在,未被消費,c正常消費msg
2-false,key存在,已經被消費(兩種可能-正在消費或者完成消費-忘記告知ack-queue了),無論哪種情況都直接丟棄。
注意一個問題:如果redis顯示有消費記錄-且消費者正在消費,此時消費者執行業務宕機了,redis分布式鎖會成死鎖-解決方法在IfAbsent方法加上過期時間和單位。
一句話就是:消費之前,緩存中有消費記錄則丟棄消息,不二次消費。
redis緩存中沒有消費記錄則,重復存入緩存并消費(設計鎖過期時間)。
以下是消息中間件MQ的相關代碼和配置信息
- 使用MQ的步驟
1)在pom文件中加上依賴amqp
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency
2) 配置文件配置rabbit服務器的對應信息(spring.rabbitmq host、port,username,ps等)
spring.rabbitmq.host=rabbitmq服務器地址信息
spring.rabbitmq.port=端口號
spring.rabbitmq.username=賬戶name
spring.rabbitmq.password=密碼
spring.rabbitmq.virtual-host=/
#1. 生產者發送message, mq收到消息就確認回復到生產者
spring.rabbitmq.publisher-confirms=tr
#2. queue消息對列接收生產者的消息失敗,就確認返回消息到生產操者
spring.rabbitmq.publisher-returns=true
#3. 消費者接收queue消息對列的消息之后,手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3) 服務啟動類上面加上注解@EnableRabbit-開啟MQ
在springboot啟動類加上 @EnableRabbit-開啟MQ
4) 業務使用消息中間件存儲消息的時候
(1) 創建交換機(注意有不同類型的交換機 direct-fanout-topic)
public void createExchange() {
// 1. 創建direct類型的exchange 交換機的名字-hello.java.exchangeDirectExchange directExchange = new DirectExchange("hello.java.exchange", true, false);
// 2. 聲明交換機amqpAdmin.declareExchange(directExchange);log.info("exchange創建成功1111", "hello.java.exchange");}
(2)創建消息隊列queue
public void createQueue() {
// 1. 創建隊列-queue 隊列名稱-hello-java-queueQueue queue = new Queue("hello.java.queue", true, false, false);
// 2. 聲明mq隊列amqpAdmin.declareQueue(queue);log.info("queue創建成功1111", "hello.java.queue");}
(3)交換機和消息隊列直接關系綁定
public void bindEQ() {
// 1. 創建綁定對象( "hello.java.queue"--消息對列, "hello.java.exchange"--交換機,"hello.java"-綁定關系的route-key)Binding binding = new Binding("hello.java.queue",Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);// 2. 聲明綁定關系(這個關系實際也是一個對象)amqpAdmin.declareBinding(binding);log.info("Binding創建成功1111", "hello.java.binding");}
(4)使用MQ的操作工具類 RabbitTemplate-操作發送消息
對象注入
@AutowiredRabbitTemplate rabbitTemplate;
生產者發送消息,需要攜帶消息-mgs和發送給哪個queue的route-key。注意發送消息需要一個唯一id,后面防止重復發送需要此id判斷
public void sendMessageStr() throws InterruptedException {String msg = "測試數據測試數";
// 發送10條message到exchange中
// new CorrelationData(UUID.randomUUID().toString() 發送的消息的唯一id mq可以接收并處理rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java" , msg+ "11111111111111", new CorrelationData(UUID.randomUUID().toString()));rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", msg + "222222222222", new CorrelationData(UUID.randomUUID().toString()));log.info("交換機消息發送成功----------->");}
(4)消費者監聽消息對列消息,消費消息使用@RabbitListener監聽消息對列,使用RabbitHandler接收對應類型的消息。前者放在類上面,后者放到監聽方法上面。
queues是消息對列名稱的集合
@RabbitListener(queues = {"hello.java.queue"})
使用@RabbitHandler監聽不同類型的消息
// 消息是TestEntity2 類型,會自動匹配到對應方法接收
@RabbitHandler
public void receiveOfSecond(TestEntity2 testEntity2) throws InterruptedException {System.out.println("receiveOfSecond-監聽接受queue的數據是----->" + testEntity2);
}@RabbitHandler
public void receiveOfFirst(TestEntity testEntity) throws InterruptedException {System.out.println("receiveOfFirst-監聽接受queue的數據是----->" + testEntity);
}