rabbitmq——歲月云實戰筆記

1 rabbitmq設計

? ? ? ? 生產者并不是直接將消息投遞到queue,而是發送給exchange,由exchange根據type的規則來選定投遞的queue,這樣消息設計在生產者和消費者就實現解耦。

? ? ? ? rabbitmq會給沒有type預定義一些exchage,而實際我們卻應該使用自己定義的。

1.1 用戶注冊設計

? ? ? ? 用戶在官網注冊,因為官網與其他各子系統是分庫的,因此涉及到用戶注冊后,用戶的賬號信息也需要同步各子產品,于是就有了下面的設計。2018的時候SOA設計我還用通過otter進行同步,但是現在覺得還不如使用rabbitmq,因為消息隊列有很多作用,而且有些情況是,各個子系統承建時間不一樣,各自的創建用戶后,也會觸發其他的操作,這些otter的小表復制策略就不是那么好了。

1.1.1 生產者? ? ? ??

????????歲月云官網,可以看到這個里面只需要一個exchange名稱即可,將對象轉成字符串作為消息發送過去即可。

1.1.2 消費者

? ? ? ? 消費者中定義的監聽是針對queue,ignoreDeclarationExceptions是冪等設計,可以確保即使某個實例的聲明操作失敗(例如,因為另一個實例已經成功聲明了相同的資源),整個系統仍然可以正常工作。

? ? ? ? fanout是一種廣播,綁定到此eayc_user_add_change的queue都可以收到此消息。因為從官網下發的消息,到各子系統都應該收到,并各自創建。

? ? ? ? 下面是子系統acc的配置

? ? ? ? 具體消費的代碼如下所示,

? ? ? ? 下面是子系統ps的配置,與acc使用同一個exchange,但queue是不同的。

1.2 死信隊列和延時隊列

x-message-ttl定義了消息的時間生存期,有了這特性,就可以拓展一些功能,比如高并發的流量控制。

? ? ? ? 下面通過x-message-ttl設置了一個延遲隊列,通過DECLARE_DEAD_ROUTING_KEY與死信交換機declareDeadExchange進行匹配路由。

@Configuration
public class RabbitMQDelayConfig {@Value("${spring.rabbitmq.declare.exchange}")private String DECLARE_EXCHANGE;@Value("${spring.rabbitmq.declare.queue}")private String DECLARE_QUEUE;@Value("${spring.rabbitmq.declare.routing}")private String DECLARE_ROUTING_KEY;@Value("${spring.rabbitmq.declare.deadExchange}")private String DECLARE_EXCHANGE_DEAD;@Value("${spring.rabbitmq.declare.deadQueue}")private String DECLARE_QUEUE_DEAD;@Value("${spring.rabbitmq.declare.deadRouting}")private String DECLARE_DEAD_ROUTING_KEY;@Value("${spring.rabbitmq.declare.ttl}")private int DECLARE_TTL;/*** 申明自動申報業務交換機:*/@Beanpublic DirectExchange declareExchange() {return new DirectExchange(DECLARE_EXCHANGE);}/*** 申明自動申報業務死信交換機:*/@Beanpublic DirectExchange declareDeadExchange() {return new DirectExchange(DECLARE_EXCHANGE_DEAD);}/*** 申明自動申報業務隊列* 并綁定死信隊列*/@Beanpublic Queue declareQueue() {Map<String, Object> arguments = new HashMap<>(3);// 設置死信交換機arguments.put("x-dead-letter-exchange", DECLARE_EXCHANGE_DEAD);// 設置死信路由鍵arguments.put("x-dead-letter-routing-key", DECLARE_DEAD_ROUTING_KEY);// 設置過期時間arguments.put("x-message-ttl", DECLARE_TTL);return new Queue(DECLARE_QUEUE, true, false, false, arguments);}/*** 申明自動申報業務死信隊列*/@Beanpublic Queue declareDeadQueue() {return new Queue(DECLARE_QUEUE_DEAD);}/*** 綁定交換機和隊列*/@Beanpublic Binding declareQueueBinding(@Qualifier("declareQueue") Queue queue, @Qualifier("declareExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DECLARE_ROUTING_KEY);}/*** 綁定死信交換機和死信隊列*/@Beanpublic Binding declareDeadQueueBinding(@Qualifier("declareDeadQueue") Queue queue, @Qualifier("declareDeadExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DECLARE_DEAD_ROUTING_KEY);}}

? ? ? ? 生產者只需要往業務的exchange投遞消息即可

// 發送一條消息到rabbitmq延時隊列中,處理申報流程超時的情況message = new HashMap<>();message.put("dataId", taxDeclareDto.getDataId());message.put("batchId", req.getBatchId());rabbitTemplate.convertAndSend(DECLARE_EXCHANGE, DECLARE_ROUTING_KEY, gson.toJson(message));

? ? ? ? ?異常情況是監聽死信隊列,處理對應的邏輯。

   /*** 監聽消息隊列,處理申報流程超時的申請記錄*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "${spring.rabbitmq.declare.deadQueue}"),exchange = @Exchange(name = "${spring.rabbitmq.declare.deadExchange}")))@RabbitHandlerpublic void declareTimeout(Message message){logger.info("收到rabbitMq申報超時消息:{}", message);Map<String, String> map = gson.fromJson((String) message.getPayload(), Map.class);if(CheckEmptyUtil.isNotEmpty(map.get("batchId"))) {// 如果是批量申報超時,中斷批次中所有的申報中的請求interruptDeclaresInBatch(map.get("batchId"));} else {String dataId = map.get("dataId");updateTaxDeclareStatus(new TaxDeclareYearStatusUpdateReq(dataId, null,false, StatementConstants.DeclareMessage.TIMEOUT));}}

1.3 重復消費

? ? ? ? 如果根據高內聚低耦合的設計原則,消費者側應該作重復消費設計,這個問題并不只是rabbitmq的問題,因為只要出現數據重復推送的問題,就會有重復消費的問題。比如有第三方系統定時同步數據到自己的系統,這個同步數據是由第三方承建的,你無法進行約束,必須在自己的系統進行冪等設計。

? ? ? ? springboot默認使用tomcat作為servlet容器,servlet容器使用線程池管理http請求,而controller和service都是單例,是線程不安全的,因此在接收到重復數據的請求時,如果其程序再新啟動了異步線程,就會出現重復的情況,如下所示:

? ? ? ? 主線程接收消息,做一些轉換,然后執行交給異步線程處理。

 @PostMapping("/xx/batchSync")public ResponseResult xxBatchSync(@RequestBody CommonRequest commonRequest) {log.info("銷項發票同步請求:{}",commonRequest.getInfo());XxBatchSyncReq xxBatchSyncReq = JsonUtil.toPojo(commonRequest.getInfo(),XxBatchSyncReq.class);String zyCompanyId = xxBatchSyncReq.getZyCompanyId();if (!CheckEmptyUtil.isEmpty(xxBatchSyncReq.getInvoices())){// 賬套信息Integer asId = accAccountSetService.selectByZyCompanyId(xxBatchSyncReq.getZyCompanyId());if (asId==null){throw new RuntimeException(String.format("賬套信息不存在,企業id:%s",xxBatchSyncReq.getZyCompanyId() ));}// 異步寫入發票數據accInvoice4ZYService.xxBatchSync(asId,xxBatchSyncReq);}return new ResponseResult(true,"銷項發票接收成功");}

? ? ? ? 異步線程的邏輯如下,accInvoiceService.isExist看似基礎邏輯沒有問題,但是在多線程環境下會有問題,因為線程A添加進入到addInvoice方法添加發票的時候還沒有提交,這個時候線程B執行accInvoiceService.isExist的時候判斷已經是不存在的,于是他依舊會向下執行。導致出現數據重復寫入。由此判斷這個重復消費問題并不是消息隊列獨有的,還是業務處理的問題

    @Override@Async("loadDataExecutor")public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {// 發票模板AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){xxInvoiceDto.setAsId(asId);if (!accInvoiceService.isExist(asId,xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, xxBatchSyncReq.getZyCompanyId(),accInvoiceTemplate);addInvoice(accInvoiceDto);}}}

? ? ? ? 再看事務邏輯愿望是美好的,接收到批量發票,然后一張張提交。這里就很有問題,

    @Override@Transactional
//    @RedisReentrantLock(key = "'acc_invoice_lock_'+#accInvoiceDto.asId")public void addInvoice(AccInvoiceDto accInvoiceDto) {// 保存發票頭accInvoiceService.save(accInvoiceDto);Integer invoiceId = accInvoiceDto.getId();// 保存發票明細信息List<AccInvoiceDetail> accInvoiceDetails = accInvoiceDto.getAccInvoiceDetails();accInvoiceDetails.stream().forEach(accInvoiceDetail -> {accInvoiceDetail.setInvoiceId(invoiceId);});accInvoiceDetailService.saveBatch(accInvoiceDetails);}

? ? ? ? ?代碼作如下調整,下面的代碼依然會有問題,

    @Override@Async("loadDataExecutor")public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {// 發票模板AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){xxInvoiceDto.setAsId(asId);addInvoice(xxBatchSyncReq.getZyCompanyId(),xxInvoiceDto,accInvoiceTemplate);}}@Override@Transactional(rollbackFor = Exception.class)public void addInvoice(String zyCompanyId,XxInvoiceDto xxInvoiceDto,AccInvoiceTemplate accInvoiceTemplate){if (!accInvoiceService.isExist(xxInvoiceDto.getAsId(),xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, zyCompanyId,accInvoiceTemplate);addInvoice(accInvoiceDto);}}

? ? ? ? 用MySQL來模擬一下,就可以看到問題。

? ? ? ? 另起一個事務,因為判斷還是不存在,依舊寫入進去,導致數據重復。那么為什么呢?Mysql的Repeatable Read事務隔離級別,不會出現臟讀、不會出現不可重復讀,而間隙鎖又解決了幻讀的問題,但這個業務問題卻需要自己認為去處理。

? ? ? ? ?解決方案最簡的辦法就是設置唯一鍵索引。另外一種辦法,可以參考redis——歲月云實戰,我們也可以采取加分布式鎖的方式來控制數據操作。

2 線上問題

2.1 內存設置問題

k8s部署rabbitmq集群,搭建環境后登錄web控制臺發現內存飄紅。進入到rabbitmq容器中,發現vm_memory_high_watermark.absolute = 100MB,這個就是從其他復制過來沒有經過大腦的原因。這個值應該是按照Pod中設置最大內存的75%進行設置

? ? ? ? 調整為3GB后,恢復正常。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/65305.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/65305.shtml
英文地址,請注明出處:http://en.pswp.cn/web/65305.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

2.系統學習-邏輯回歸

邏輯回歸 前言最大似然估計概率似然函數(likelihood function)最大似然估計 邏輯回歸邏輯回歸的似然函數與梯度 分類問題常用評價指標項目案例拓展內容作業 前言 邏輯回歸與線性回歸均屬于廣義線性模型&#xff0c;區別在于線性回歸用于解決回歸問題&#xff0c;例如身高、銷量…

記錄一次電腦被入侵用來挖礦的過程(Trojan、Miner、Hack、turminoob)

文章目錄 0、總結1、背景2、端倪3、有個微軟的系統更新&#xff0c;就想著更新看看&#xff08;能否沖掉問題&#xff09;4、更新沒成功&#xff0c;自動重啟電腦5、風險文件&#xff08;好家伙命名還挺規范&#xff0c;一看名字就知道出問題了&#xff09;6、開機有一些注冊表…

行為樹詳解(6)——黑板模式

【動作節點數據共享】 行為樹中需要的參數可以來自游戲中的各個模塊&#xff0c;如果僅需從多個模塊獲取少量參數&#xff0c;那么可以直接在代碼中調用其他模塊的單例繼而層層調用獲取數據。 如果獲取的參數量很大&#xff0c;從架構上看&#xff0c;我們需要通過加一個中間…

阿里云 人工智能與機器學習

阿里云的 人工智能&#xff08;AI&#xff09;與機器學習&#xff08;ML&#xff09; 服務為企業提供了全面的AI解決方案&#xff0c;幫助用戶在多個行業實現數據智能化&#xff0c;提升決策效率&#xff0c;推動業務創新。阿里云通過先進的技術和豐富的工具&#xff0c;支持用…

如果Adobe 退出中國后怎么辦

最近聽說Adobe要退出中國了?那咱們的設計師們可得好好想想怎么搞到正版軟件了。別急&#xff0c;今天教大家一個超酷的福利——Edu郵箱&#xff01; Edu郵箱是什么&#xff1f;有什么好處&#xff1f; Edu郵箱就是學校給學生和老師們發的郵箱&#xff0c;一般結尾是.edu。有了…

Structured-Streaming集成Kafka

一、上下文 《Structured-Streaming初識》博客中已經初步認識了Structured-Streaming&#xff0c;Kafka作為目前最流行的一個分布式的實時流消息系統&#xff0c;是眾多實時流處理框架的最優數據源之一。下面我們就跟著官方例子來看看Structured-Streaming是如何集成Kafka的&a…

Spring Boot 項目中集成 Kafka-03

在 Spring Boot 項目中集成 Kafka 有多種方式&#xff0c;適應不同的應用場景和需求。以下將詳細介紹幾種常用的集成方法&#xff0c;包括&#xff1a; 使用 Spring Kafka (KafkaTemplate 和 KafkaListener)使用 Spring Cloud Stream 與 Kafka Binder使用 Spring for Apache K…

生物醫學信號處理--緒論

前言 參考書籍&#xff1a;劉海龍&#xff0c;生物醫學信號處理&#xff0c;化學工業出版社 生物醫學信號分類 1、由生理過程自發或者誘發產生的電生理信號和非電生理信號 ? 電生理信號&#xff1a;ECG/心電、EEG/腦電、EMG/肌電、 EGG/胃電、 EOG/眼電 ? 非電生理信號&am…

unity 播放 序列幀圖片 動畫

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 前言一、方法一&#xff1a;代碼控制播放序列幀1、設置圖片屬性2、創建Image組件3、簡單的代碼控制4、掛載代碼并賦值 二、方法二&#xff1a;直接使用1.Image上添加…

QT c++ 自定義按鈕類 加載圖片 美化按鈕

如果你有需要利用圖片美化按鈕的情況&#xff0c;本文能幫助你。 鼠標左鍵按下按鈕和松開&#xff0c;按鈕顯示不同的圖片。 1.按鈕類 //因為此類比較簡單&#xff0c;1個頭文件搞定&#xff0c;沒有cpp文件 #ifndef CUSTOMBUTTON_H #define CUSTOMBUTTON_H #include <Q…

web漏洞之文件包含漏洞

一、文件包含漏洞 1、把DVWA頁面改為low級別&#xff0c;然后點擊File Inclusion頁面 文件包含漏洞有四種include()/require()/include_once()/require_once() 常見的文件包含漏洞代碼如下 <?php$file$_GET[filename]; filename隨意定義include($file); ?> -----…

小程序與物聯網(IoT)融合:開啟智能生活新篇章

一、引言 隨著移動互聯網技術的飛速發展&#xff0c;小程序作為一種輕量級的應用形式&#xff0c;憑借其無需下載安裝、即用即走的特點&#xff0c;迅速滲透到人們生活的各個領域。與此同時&#xff0c;物聯網&#xff08;IoT&#xff09;技術也在不斷進步&#xff0c;將各種物…

Ubuntu無法創建python venv環境

排查步驟如下 1. python3 -m venv venv he virtual environment was not created successfully because ensurepip is not available. On Debian/Ubuntu systems, you need to install the python3-venv package using the following command.apt install python3.8-venvYou…

如何很快將文件轉換成另外一種編碼格式?編碼?按指定編碼格式編譯?如何檢測文件編碼格式?Java .class文件編碼和JVM運行期內存編碼?

如何很快將文件轉換成另外一種編碼格式? 利用VS Code右下角的"選擇編碼"功能&#xff0c;選擇"通過編碼保存"可以很方便將文件轉換成另外一種編碼格式。尤其&#xff0c;在測試w/ BOM或w/o BOM, 或者ANSI編碼和UTF編碼轉換&#xff0c;特別方便。VS文件另…

PCL點云庫入門——PCL庫點云特征之PFH點特征直方圖(Point Feature Histograms -PHF)

1、算法原理 PFH點&#xff08;Point Feature Histogram&#xff09;特征直方圖的原理涉及利用參數化查詢點與鄰域點之間的空間差異&#xff0c;并構建一個多維直方圖以捕捉點的k鄰域幾何屬性。這個高維超空間為特征表示提供了一個可度量的信息空間&#xff0c;對于點云對應曲面…

5. CSS引入方式

5.1 CSS的三種樣式 按照 CSS 樣式書寫的位置(或者引入的方式)&#xff0c;CSS樣式表可以分為三大類&#xff1a; 1.行內樣式表&#xff08;行內式&#xff09; 2.內部樣式表&#xff08;嵌入式&#xff09; 3. 外部樣式表&#xff08;鏈接式&#xff09; 5.2 內部樣式表 …

為什么ip屬地一會河南一會江蘇

在使用互聯網的過程中&#xff0c;許多用戶可能會遇到這樣一個問題&#xff1a;自己的IP屬地一會兒顯示為河南&#xff0c;一會兒又變成了江蘇。這種現象可能會讓人感到困惑&#xff0c;甚至產生疑慮&#xff0c;擔心自己的網絡活動是否受到了某種影響。為了解答這一疑問&#…

unity3d-搞個場景漫游如何實現Alpha

要處理兩個問題&#xff1a; 如何設置地面人不掉下去 方法一、 游戲物體加剛體&#xff0c;將游戲物體和地面加collider。如果是地形&#xff0c;可以使用 Terrain Collider&#xff1b;如果是簡單的平面&#xff0c;可以添加 Box Collider 或者 Mesh Collider&#xff08;如果…

git merge rebase

merge操作 Git自己分支合并dev分支 rebase 操作 git rebase