方案分析
在上一篇文檔中,詳細講述了如何通過Canal+MQ實現對分庫分表的數據庫和數據表進行數據同步,而在這個方案中,還有一個關鍵點是需要注意的:首先,數據增刪改的信息是保證寫入binlog的,Canal解析出增刪改的信息后寫入MQ,同步程序從MQ中讀取消息,如果MQ中的消息丟失了數據將無法進行同步。
因此就需要對MQ傳遞消息的可靠性進行研究
保證MQ消息的可靠性分為兩個方面:保證生產消息的可靠性、保證消費消息的可靠性。
保證生產消息可靠性
RabbitMQ提供生產者確認機制保證生產消息的可靠性,技術方案如下:
1. 首先發送消息的方法如果執行失敗會進行重試,重試次數耗盡記錄消息失敗
zhilian-framework包中的zhilian-rabbitmq工程下面的client包的消息處理類,用于發送消息
package com.zhilian.rabbitmq.client;import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.zhilian.common.expcetions.MqException;
import com.zhilian.common.utils.DateUtils;
import com.zhilian.common.utils.JsonUtils;
import com.zhilian.common.utils.NumberUtils;
import com.zhilian.rabbitmq.dao.FailMsgDao;
import com.zhilian.rabbitmq.plugins.DelayMessagePostProcessor;
import com.zhilian.rabbitmq.plugins.RabbitMqListenableFutureCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** 消息處理類**/
@Slf4j
@Service
public class RabbitClient {@Resourceprivate RabbitTemplate rabbitTemplate;@Autowired(required = false)private FailMsgDao failMsgDao;@Resourceprivate RabbitClient rabbitClient;public void sendMsg(String exchange, String routingKey, Object msg) {rabbitClient.sendMsg(exchange, routingKey, msg, null, null, false);}/*** 發送消息 重試3次** @param exchange 交換機* @param routingKey 路由key* @param msg 消息對象,會將對象序列化成json字符串發出* @param delay 延遲時間 秒* @param msgId 消息id* @param isFailMsg 是否是失敗消息* @return 是否發送成功*/@Retryable(value = MqException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5), recover = "saveFailMag")public void sendMsg(String exchange, String routingKey, Object msg, Integer delay, Long msgId, boolean isFailMsg) {// 1.發送消息前準備// 1.1獲取消息內容,如果非字符串將其序列化String jsonMsg = JsonUtils.toJsonStr(msg);// 1.2.全局唯一消息id,如果調用者設置了消息id,使用調用者消息id,如果為配置,默認雪花算法生成消息idmsgId = NumberUtils.null2Default(msgId, IdUtil.getSnowflakeNextId());// 1.3.設置默認延遲時間,默認立即發送delay = NumberUtils.null2Default(delay, -1);log.debug("消息發送!exchange = {}, routingKey = {}, msg = {}, msgId = {}", exchange, routingKey, jsonMsg, msgId);// 1.4.構建回調RabbitMqListenableFutureCallback futureCallback = RabbitMqListenableFutureCallback.builder().exchange(exchange).routingKey(routingKey).msg(jsonMsg).msgId(msgId).delay(delay).isFailMsg(isFailMsg).failMsgDao(failMsgDao).build();// 1.5.CorrelationData設置CorrelationData correlationData = new CorrelationData(msgId.toString());correlationData.getFuture().addCallback(futureCallback);// 1.6.構造消息對象Message message = MessageBuilder.withBody(StrUtil.bytes(jsonMsg, CharsetUtil.CHARSET_UTF_8))//持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息id.setMessageId(msgId.toString()).build();try {// 2.發送消息this.rabbitTemplate.convertAndSend(exchange, routingKey, message, new DelayMessagePostProcessor(delay), correlationData);} catch (Exception e) {log.error("send error:" + e);// 3.構建異常回調,并拋出異常MqException mqException = new MqException();mqException.setMsg(ExceptionUtil.getMessage(e));mqException.setMqId(msgId);throw mqException;}}/*** @param mqException mq異常消息* @param exchange 交換機* @param routingKey 路由key* @param msg mq消息* @param delay 延遲消息* @param msgId 消息id*/@Recoverpublic void saveFailMag(MqException mqException, String exchange, String routingKey, Object msg, Integer delay, String msgId) {//發送消息失敗,需要將消息持久化到數據庫,通過任務調度的方式處理失敗的消息failMsgDao.save(mqException.getMqId(), exchange, routingKey, JsonUtils.toJsonStr(msg), delay, DateUtils.getCurrentTime() + 10, ExceptionUtil.getMessage(mqException));}}
@Retryable注解可實現方法執行失敗進行重試,如下:
@Retryable(value = MqException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5), recover = "saveFailMag")
屬性說明如下:
value:拋出制定異常才會重試
include:和value一樣,默認為空,當exclude也為空時,默認所有異常
exclude:指定不處理的異常
maxAttempts:最大重試次數,默認3次
backoff:重試等待策略,默認使用@Backoff,@Backoff的value默認為1000L,我們設置為3000L;表示第一次失敗后等待3秒后重試,multiplier(指定延遲倍數)默認為0,如果把multiplier設置為1.5表示每次等待重試時間是上一次的1.5倍,則第一次重試為3秒,第二次為4.5秒,第三次為6.75秒。
Recover: 設置回調方法名,當重試耗盡時,通過recover屬性設置回調的方法名。通過@Recover注解定義重試失敗后的處理方法,在Recover方法中記錄失敗消息到數據庫。
2. 通過MQ提供的生產者確認機制保證生產消息的可靠性
使用生產者確認機制需要給每個信息指定一個唯一ID,生產者確認機制通過異步回調的方式進行,包括ConfirmCallback和Return回調。
ConfirmCallback:消息發送到Broker會有一個結果返回給發送者表示消息是否處理成功:
消息成功投遞到交換機,返回ack
消息未投遞到交換機,返回nack
發送消息時指定回調對象
// 1.4.構建回調RabbitMqListenableFutureCallback futureCallback = RabbitMqListenableFutureCallback.builder().exchange(exchange).routingKey(routingKey).msg(jsonMsg).msgId(msgId).delay(delay).isFailMsg(isFailMsg).failMsgDao(failMsgDao).build();// 1.5.CorrelationData設置CorrelationData correlationData = new CorrelationData(msgId.toString());correlationData.getFuture().addCallback(futureCallback);// 1.6.構造消息對象Message message = MessageBuilder.withBody(StrUtil.bytes(jsonMsg, CharsetUtil.CHARSET_UTF_8))//持久化.setDeliveryMode(MessageDeliveryMode.PERSISTENT)//消息id.setMessageId(msgId.toString()).build();
回調類:RabbitMqListenableFutureCallback
如果沒有返回ack則將消息記錄到失敗消息表,如果經過重試后返回了ack說明消息發送成功,此時將消息從失敗消息表刪除。
package com.zhilian.rabbitmq.plugins;import cn.hutool.core.exceptions.ExceptionUtil;
import com.zhilian.common.utils.DateUtils;
import com.zhilian.rabbitmq.dao.FailMsgDao;
import lombok.Builder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.util.concurrent.ListenableFutureCallback;@Builder
public class RabbitMqListenableFutureCallback implements ListenableFutureCallback<CorrelationData.Confirm> {//記錄失敗消息serviceprivate FailMsgDao failMsgDao;private String exchange;private String routingKey;private String msg;private Long msgId;private Integer delay;//是否是失敗消息private boolean isFailMsg=false;@Overridepublic void onFailure(Throwable ex) {if(failMsgDao == null) {return;}failMsgDao.save(msgId, exchange, routingKey, msg, delay, DateUtils.getCurrentTime() + 10, ExceptionUtil.getMessage(ex));}@Overridepublic void onSuccess(CorrelationData.Confirm result) {if(failMsgDao == null){return;}if(!result.isAck()){// 執行失敗保存失敗信息,如果已經存在保存信息,如果不在信息信息failMsgDao.save(msgId, exchange, routingKey, msg, delay,DateUtils.getCurrentTime() + 10, "MQ回復nack");}else if(isFailMsg && msgId != null){// 如果發送的是失敗消息,當收到ack需要從fail_msg刪除該消息failMsgDao.removeById(msgId);}}
}
Return回調:如果消息發送到交換機成功了但是并沒有到達隊列,此時會調用ReturnCallback回調方法,在回調方法中我們可以收到失敗的消息存入失敗消息表以便進行補償
要使用Return回調需要開啟設置:(在shared-rabbitmq.yaml中配置rabbitMQ參數:
spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true
說明:
publish-confirm-type:開啟publisher-confirm,這里支持兩種類型:
simple:同步等待confirm結果,直到超時
correlated:異步回調,定義ConfirmCallback,MQ返回結果時會回調這個ConfirmCallback
publish-returns:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallback
template.mandatory:定義消息路由失敗時的策略。true,則調用ReturnCallback;false:則直接丟棄消息
對于發送消息失敗之后將消息寫入失敗消息表的邏輯參考如下:
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 獲取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//定義returnCallback回調方法rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {byte[] body = returnedMessage.getMessage().getBody();//消息idString messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();String content = new String(body, Charset.defaultCharset());log.info("消息發送失敗,應答碼{},原因{},交換機{},路由鍵{},消息id{},消息內容{}",returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getExchange(),returnedMessage.getRoutingKey(),messageId,content);if (failMsgDao != null) {failMsgDao.save(NumberUtils.parseLong(messageId), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), content, null, DateUtils.getCurrentTime(), "returnCallback");}}});
}
保證消費信息可靠性
保證消費消息可靠性方案首先保證發送消息設置為持久化,其次通過MQ的消費確認機制保證消費者消費成功消息后再將消息刪除。
首先設置消息持久化,保證消息發送到MQ消息不丟失。具體需要設置交換機和隊列支持持久化,發送消息設置deliveryMode=2。
RabbitMQ是通過消費者回執來確認消費者是否成功處理消息的:消費者獲取消息后,向RabbitMQ發送ACK回執,表明自己已經處理完成消息,RabbitMQ收到ACK后刪除消息。
消費消息失敗重試3次,仍失敗則將消費失敗的消息放入失敗消息數據庫
通過任務調度掃描失敗消息隊列(錯誤消息隊列)重新發送,達到一定的次數還未成功則由人工處理
核心代碼實現:(從預定義的錯誤隊列中取出之前處理失敗的消息,重新發送到消息原本的目標地址
package com.zhilian.rabbitmq.plugins;import com.zhilian.common.utils.IoUtils;
import com.zhilian.common.utils.JsonUtils;
import com.zhilian.rabbitmq.domain.ErrorRabbitMqMessage;
import com.zhilian.rabbitmq.properties.RabbitmqProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;import javax.annotation.PreDestroy;
import java.io.IOException;
import java.nio.charset.Charset;@Slf4j
public class RabbitMqResender {private RabbitTemplate rabbitTemplate;private RabbitmqProperties rabbitmqProperties;private Channel channel;public RabbitMqResender(RabbitTemplate rabbitTemplate, RabbitmqProperties rabbitmqProperties) {this.rabbitTemplate = rabbitTemplate;this.rabbitmqProperties = rabbitmqProperties;channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);}/*** 從隊列中獲取一條數據并處理,如果沒有消息,返回false,有消息返回true*/public boolean getOneMessageAndProcess() {try {GetResponse response = channel.basicGet(rabbitmqProperties.getError().getQueue(), false);if(response == null) {return false;}ErrorRabbitMqMessage errorRabbitMqMessage = JsonUtils.toBean(new String(response.getBody()), ErrorRabbitMqMessage.class);Message message = MessageBuilder.withBody(errorRabbitMqMessage.getMessage().getBytes(Charset.defaultCharset())).build();rabbitTemplate.send(errorRabbitMqMessage.getOriginExchange(), errorRabbitMqMessage.getOriginRoutingKey(), message);channel.basicAck(response.getEnvelope().getDeliveryTag(), false);return true;}catch (IOException e) {log.error("消息重發失敗,e:",e);return false;}}@PreDestroypublic void destory() {log.info("rabbitmq銷毀...");IoUtils.close(channel);}
}
1. 核心功能//錯誤消息的撈取//從指定的錯誤隊列(error.queue,由RabbitmqProperties配置)中拉取未被確認(unacknowledged)的消息。2. 消息解析與重發//將錯誤隊列中的消息體解析為ErrorRabbitMqMessage對象(包含原始交換機、路由鍵、消息內容等元數據)。//使用RabbitTemplate將消息內容重新發送到原始的交換機(originExchange)和路由鍵(originRoutingKey)。
3. 異常處理與可靠性//解析失敗:若消息無法解析為ErrorRabbitMqMessage,直接拒絕消息并丟棄(不重新入隊),避免死循環。//發送失敗:若重發過程中拋出異常(如網絡問題),拒絕消息并允許重新入隊,等待下次重試。//資源安全:通過@PreDestroy確保程序關閉時正確釋放RabbitMQ的Channel和Connection,防止連接泄漏。
RabbitMQ提供三個確認模式:
?manual:手動ack,需要在業務代碼結束后,調用api發送ack。
?auto:自動ack,由spring監測listener代碼是否出現異常,沒有異常則返回ack;拋出異常則返回nack
?none:關閉ack,MQ假定消費者獲取消息后會成功處理,因此消息投遞后立即被刪除
這三種確認模式各有各的特點:
- none模式下,消息投遞是不可靠的,可能丟失
- auto模式類似事務機制,出現異常時返回nack,消息回滾到mq;沒有異常,返回ack
- manual:自己根據業務情況,判斷什么時候該ack
本項目的配置:
spring:rabbitmq:....listener:simple:acknowledge-mode: auto #,出現異常時返回nack,消息回滾到mq;沒有異常,返回ackretry:enabled: true # 開啟消費者失敗重試initial-interval: 1000 # 初識的失敗等待時長為1秒multiplier: 10 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false
本項目使用自動ack模式,當消費消息失敗會重試,重試3次如果還失敗會將消息投遞到失敗消息隊列,由定時任務程序定時讀取隊列的消息,達到一定的次數還未成功則由人工處理。
保證消息冪等性
消費者在消費消息時難免出現重復消費的情況,比如:消費者沒有向MQ返回ack導致重復消費,所以消費者需要保證消費信息冪等性
冪等性是指不論執行多少次其結果是一致的
比如:收到消息需要向數據新增一條記錄,如果重復消費則會出現重復添加記錄的問題。
根據場景分析解決方案:
- 查詢操作:本身具有冪等性
- 添加操作:如果主鍵是自增則可能重復添加記錄。
解決:保證冪等性可以設置數據庫的唯一約束,比如添加學生信息,將學號字段設置為唯一索引,即使重復添加相同學生,同一個學號只會添加一條記錄
- 更新操作:如果更新一個固定,比如update users set status = 1 where id = ?,本身就具有冪等性;
但是如果只允許更新成功一次?
解決:可以使用token機制,發送消息前生成一個token寫入redis,收到消息后解析出token,從redis查詢token,如果成功則說明沒有消費,此時更新成功,將token從redis刪除,當重復消費相同的消息時,由于token已經從redis刪除不會再執行更新操作
- 刪除操作:與更新操作類似,如果是刪除某個具體的記錄,比如:delete from users where id = ?,本身就具有冪等性,如果只允許刪除成功一次就可以采用更新操作相同的方法(操作緩存token機制)
根據以上分析:
為了保證消息冪等性,需要:
- 使用數據庫的唯一約束去控制
- 使用token機制:
-
- 消息具有唯一ID
- 發送消息時將消息ID寫入Redis
- 消費時根據消息ID查詢Redis判斷是否已經消費,如果已經消費則不再消費
能否百分百保證MQ消息可靠性?
當然不能!
保證消息可靠性分兩個方面:保證生產消息可靠性和保證消費消息可靠性
保證生產消息可靠性:
生產消息可靠性是通過MQ是否發送ack回執來進行判斷的。如果發nack表示發送消息失敗,此時會進行重發或記錄到失敗消息表,通過定時任務進行補償發送。
如果Java程序并沒有收到回執(如jvm進程異常結束,或斷電等因素),此時將無法保證生產消息的可靠性。
保證消費信息消息可靠性:
保證消費信息消息可靠性方案首先保證發送消息設置為持久化,其次通過MQ的消費確認機制保證消費者消費消息成功后再將消息刪除。
雖然設置了消息持久化,消息進入MQ首先是在緩存存儲,MQ會根據一定的規則進行刷盤,(比如每隔幾毫秒進行刷盤,如果在消息還沒有保存到磁盤時MQ進程終止,此時將會丟失消息)雖然可以使用鏡像隊列(用于在RabbitMQ集群中復制隊列的消息,這樣做的目的是提高隊列的可用性和容錯性,以防止在單個節點故障時導致消息的丟失)但是也不能百分百保證消息不丟失。
我們雖然加了很多的保證可靠性的機制,但是這只能去提高消息的可靠性,最終還是不能做到百分百的可靠,因此使用MQ的場景就必須要考慮消息可靠性問題的存在,做好補償處理任務
如何保證Canal+MQ同步消息的順序性
首先明確Canal解析binlog日志信息按順序發到MQ的隊列中,現在是要保證消費端如何按順序消費隊列中的消息。
生產中同一個服務會啟動多個jvm進程,每個進程作為canal-mq-jzo2o-foundations的消費者,如下圖:
現在對服務名稱先修改為aa再修改為bb,在MQ中的有兩個消息:
修改服務名稱為aa
修改服務名稱為bb
預期:最終將服務名稱修改為bb
此時兩條消息會被分發給兩個jvm進程,假設“修改服務名稱為aa”的消息發給jvm進程1,“修改服務名稱為bb”的消息發給jvm進程2,兩個進程分別去消費,此時無法控制兩個消息的先后順序,可能導致服務名稱最終并非修改為bb。
解決方法:
多個jvm進程監聽同一個隊列保證只有消費者活躍,即只有一個消費者接收消息。
消費隊列中的數據使用單線程。
隊列需要增加x-single-active-consumer參數,表示否啟用單一活動消費者模式。
配置完成查保證隊列上存在SAC標識,如下圖:
當有多個jvm進程都去監聽該隊列時,只有一個為活躍狀態
如果使用x-single-active-consumer參數需要修改為如下代碼:
在Queue中添加:arguments={@Argument(name="x-single-active-consumer", value = "true", type = "java.lang.Boolean") }
如下所示:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "canal-mq-jzo2o-foundations",arguments={@Argument(name="x-single-active-consumer", value = "true", type = "java.lang.Boolean") }),exchange = @Exchange(name="exchange.canal-jzo2o",type = ExchangeTypes.TOPIC),key="canal-mq-jzo2o-foundations"),concurrency="1")
public void onMessage(Message message) throws Exception{parseMsg(message);
}
concurrency=”1“表示 指定消費線程為1。