1 rabbitmq設計
? ? ? ? 生產者并不是直接將消息投遞到queue,而是發送給exchange,由exchange根據type的規則來選定投遞的queue,這樣消息設計在生產者和消費者就實現解耦。
? ? ? ? rabbitmq會給沒有type預定義一些exchage,而實際我們卻應該使用自己定義的。
1.1 用戶注冊設計
? ? ? ? 用戶在官網注冊,因為官網與其他各子系統是分庫的,因此涉及到用戶注冊后,用戶的賬號信息也需要同步各子產品,于是就有了下面的設計。2018的時候SOA設計我還用通過otter進行同步,但是現在覺得還不如使用rabbitmq,因為消息隊列有很多作用,而且有些情況是,各個子系統承建時間不一樣,各自的創建用戶后,也會觸發其他的操作,這些otter的小表復制策略就不是那么好了。
1.1.1 生產者? ? ? ??
????????歲月云官網,可以看到這個里面只需要一個exchange名稱即可,將對象轉成字符串作為消息發送過去即可。
1.1.2 消費者
? ? ? ? 消費者中定義的監聽是針對queue,ignoreDeclarationExceptions是冪等設計,可以確保即使某個實例的聲明操作失敗(例如,因為另一個實例已經成功聲明了相同的資源),整個系統仍然可以正常工作。
? ? ? ? fanout是一種廣播,綁定到此eayc_user_add_change的queue都可以收到此消息。因為從官網下發的消息,到各子系統都應該收到,并各自創建。
? ? ? ? 下面是子系統acc的配置
? ? ? ? 具體消費的代碼如下所示,
? ? ? ? 下面是子系統ps的配置,與acc使用同一個exchange,但queue是不同的。
1.2 死信隊列和延時隊列
x-message-ttl定義了消息的時間生存期,有了這特性,就可以拓展一些功能,比如高并發的流量控制。
? ? ? ? 下面通過x-message-ttl設置了一個延遲隊列,通過DECLARE_DEAD_ROUTING_KEY與死信交換機declareDeadExchange進行匹配路由。
@Configuration
public class RabbitMQDelayConfig {@Value("${spring.rabbitmq.declare.exchange}")private String DECLARE_EXCHANGE;@Value("${spring.rabbitmq.declare.queue}")private String DECLARE_QUEUE;@Value("${spring.rabbitmq.declare.routing}")private String DECLARE_ROUTING_KEY;@Value("${spring.rabbitmq.declare.deadExchange}")private String DECLARE_EXCHANGE_DEAD;@Value("${spring.rabbitmq.declare.deadQueue}")private String DECLARE_QUEUE_DEAD;@Value("${spring.rabbitmq.declare.deadRouting}")private String DECLARE_DEAD_ROUTING_KEY;@Value("${spring.rabbitmq.declare.ttl}")private int DECLARE_TTL;/*** 申明自動申報業務交換機:*/@Beanpublic DirectExchange declareExchange() {return new DirectExchange(DECLARE_EXCHANGE);}/*** 申明自動申報業務死信交換機:*/@Beanpublic DirectExchange declareDeadExchange() {return new DirectExchange(DECLARE_EXCHANGE_DEAD);}/*** 申明自動申報業務隊列* 并綁定死信隊列*/@Beanpublic Queue declareQueue() {Map<String, Object> arguments = new HashMap<>(3);// 設置死信交換機arguments.put("x-dead-letter-exchange", DECLARE_EXCHANGE_DEAD);// 設置死信路由鍵arguments.put("x-dead-letter-routing-key", DECLARE_DEAD_ROUTING_KEY);// 設置過期時間arguments.put("x-message-ttl", DECLARE_TTL);return new Queue(DECLARE_QUEUE, true, false, false, arguments);}/*** 申明自動申報業務死信隊列*/@Beanpublic Queue declareDeadQueue() {return new Queue(DECLARE_QUEUE_DEAD);}/*** 綁定交換機和隊列*/@Beanpublic Binding declareQueueBinding(@Qualifier("declareQueue") Queue queue, @Qualifier("declareExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DECLARE_ROUTING_KEY);}/*** 綁定死信交換機和死信隊列*/@Beanpublic Binding declareDeadQueueBinding(@Qualifier("declareDeadQueue") Queue queue, @Qualifier("declareDeadExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DECLARE_DEAD_ROUTING_KEY);}}
? ? ? ? 生產者只需要往業務的exchange投遞消息即可
// 發送一條消息到rabbitmq延時隊列中,處理申報流程超時的情況message = new HashMap<>();message.put("dataId", taxDeclareDto.getDataId());message.put("batchId", req.getBatchId());rabbitTemplate.convertAndSend(DECLARE_EXCHANGE, DECLARE_ROUTING_KEY, gson.toJson(message));
? ? ? ? ?異常情況是監聽死信隊列,處理對應的邏輯。
/*** 監聽消息隊列,處理申報流程超時的申請記錄*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "${spring.rabbitmq.declare.deadQueue}"),exchange = @Exchange(name = "${spring.rabbitmq.declare.deadExchange}")))@RabbitHandlerpublic void declareTimeout(Message message){logger.info("收到rabbitMq申報超時消息:{}", message);Map<String, String> map = gson.fromJson((String) message.getPayload(), Map.class);if(CheckEmptyUtil.isNotEmpty(map.get("batchId"))) {// 如果是批量申報超時,中斷批次中所有的申報中的請求interruptDeclaresInBatch(map.get("batchId"));} else {String dataId = map.get("dataId");updateTaxDeclareStatus(new TaxDeclareYearStatusUpdateReq(dataId, null,false, StatementConstants.DeclareMessage.TIMEOUT));}}
1.3 重復消費
? ? ? ? 如果根據高內聚低耦合的設計原則,消費者側應該作重復消費設計,這個問題并不只是rabbitmq的問題,因為只要出現數據重復推送的問題,就會有重復消費的問題。比如有第三方系統定時同步數據到自己的系統,這個同步數據是由第三方承建的,你無法進行約束,必須在自己的系統進行冪等設計。
? ? ? ? springboot默認使用tomcat作為servlet容器,servlet容器使用線程池管理http請求,而controller和service都是單例,是線程不安全的,因此在接收到重復數據的請求時,如果其程序再新啟動了異步線程,就會出現重復的情況,如下所示:
? ? ? ? 主線程接收消息,做一些轉換,然后執行交給異步線程處理。
@PostMapping("/xx/batchSync")public ResponseResult xxBatchSync(@RequestBody CommonRequest commonRequest) {log.info("銷項發票同步請求:{}",commonRequest.getInfo());XxBatchSyncReq xxBatchSyncReq = JsonUtil.toPojo(commonRequest.getInfo(),XxBatchSyncReq.class);String zyCompanyId = xxBatchSyncReq.getZyCompanyId();if (!CheckEmptyUtil.isEmpty(xxBatchSyncReq.getInvoices())){// 賬套信息Integer asId = accAccountSetService.selectByZyCompanyId(xxBatchSyncReq.getZyCompanyId());if (asId==null){throw new RuntimeException(String.format("賬套信息不存在,企業id:%s",xxBatchSyncReq.getZyCompanyId() ));}// 異步寫入發票數據accInvoice4ZYService.xxBatchSync(asId,xxBatchSyncReq);}return new ResponseResult(true,"銷項發票接收成功");}
? ? ? ? 異步線程的邏輯如下,accInvoiceService.isExist看似基礎邏輯沒有問題,但是在多線程環境下會有問題,因為線程A添加進入到addInvoice方法添加發票的時候還沒有提交,這個時候線程B執行accInvoiceService.isExist的時候判斷已經是不存在的,于是他依舊會向下執行。導致出現數據重復寫入。由此判斷這個重復消費問題并不是消息隊列獨有的,還是業務處理的問題。
@Override@Async("loadDataExecutor")public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {// 發票模板AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){xxInvoiceDto.setAsId(asId);if (!accInvoiceService.isExist(asId,xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, xxBatchSyncReq.getZyCompanyId(),accInvoiceTemplate);addInvoice(accInvoiceDto);}}}
? ? ? ? 再看事務邏輯愿望是美好的,接收到批量發票,然后一張張提交。這里就很有問題,
@Override@Transactional
// @RedisReentrantLock(key = "'acc_invoice_lock_'+#accInvoiceDto.asId")public void addInvoice(AccInvoiceDto accInvoiceDto) {// 保存發票頭accInvoiceService.save(accInvoiceDto);Integer invoiceId = accInvoiceDto.getId();// 保存發票明細信息List<AccInvoiceDetail> accInvoiceDetails = accInvoiceDto.getAccInvoiceDetails();accInvoiceDetails.stream().forEach(accInvoiceDetail -> {accInvoiceDetail.setInvoiceId(invoiceId);});accInvoiceDetailService.saveBatch(accInvoiceDetails);}
? ? ? ? ?代碼作如下調整,下面的代碼依然會有問題,
@Override@Async("loadDataExecutor")public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {// 發票模板AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){xxInvoiceDto.setAsId(asId);addInvoice(xxBatchSyncReq.getZyCompanyId(),xxInvoiceDto,accInvoiceTemplate);}}@Override@Transactional(rollbackFor = Exception.class)public void addInvoice(String zyCompanyId,XxInvoiceDto xxInvoiceDto,AccInvoiceTemplate accInvoiceTemplate){if (!accInvoiceService.isExist(xxInvoiceDto.getAsId(),xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, zyCompanyId,accInvoiceTemplate);addInvoice(accInvoiceDto);}}
? ? ? ? 用MySQL來模擬一下,就可以看到問題。
? ? ? ? 另起一個事務,因為判斷還是不存在,依舊寫入進去,導致數據重復。那么為什么呢?Mysql的Repeatable Read事務隔離級別,不會出現臟讀、不會出現不可重復讀,而間隙鎖又解決了幻讀的問題,但這個業務問題卻需要自己認為去處理。
? ? ? ? ?解決方案最簡的辦法就是設置唯一鍵索引。另外一種辦法,可以參考redis——歲月云實戰,我們也可以采取加分布式鎖的方式來控制數據操作。
2 線上問題
2.1 內存設置問題
k8s部署rabbitmq集群,搭建環境后登錄web控制臺發現內存飄紅。進入到rabbitmq容器中,發現vm_memory_high_watermark.absolute = 100MB,這個就是從其他復制過來沒有經過大腦的原因。這個值應該是按照Pod中設置最大內存的75%進行設置
? ? ? ? 調整為3GB后,恢復正常。