rocketmq實現限流

目錄

問題背景

技術方向

方案確認

消息隊列(√)

分布式鎖(×)

方案實現

監控方向

業務方向


問題背景

公司郵件服務token有 分鐘內超200封的熔斷機制,當前token被熔斷后,系統發郵件操作會被忽略,所以郵件服務也沒有重試操作

人工發現token被熔斷后,需要聯系郵件群中值班人,將token恢復

分貨業務依賴郵件來查看分貨通知以及結果,并且分貨層層依賴,如果不能及時收到郵件會影響業務的分貨時效等,所以通過三個方面去解決這個問題

技術方向

系統內發郵件收口做限流

方案確認

方向:限流發郵件方法1分鐘內最大200次

實現:改造系統發郵件底層方法,1分鐘內最多發200個

消息隊列(√)

面臨問題:多出來的怎么處理?消息隊列(需要持久化)

實現:新建一個topic,調用發郵件方法的請求全部扔到MQ中,自己消費,通過設置消費者的拉取間隔以及最大拉取數量限制,分鐘內消費消息條數不超過200條

面臨問題:多分區多消費者?

實現:默認拉取數量為32,目前MQ服務端設置,限制最大拉取數量為32

(可行)設置1個分區,一個消費者組,目前有2個實例(此時其中一個實例不會消費),設置拉取間隔為10s

(不可行,有自動加實例機制)設置2個分區,一個消費者組,目前有2個實例,設置拉取間隔為10s,最大拉取條數為16;系統在流量激增的情況下會增加實例來分攤流量

最終實現方式

topic設置1個分區,一個消費者組,使用默認負載均衡策略:平均分配

//平均分配負載均衡核心邏輯
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);for(int i = 0; i < range; ++i) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
}return result;

解析兩個實例負載均衡過程

//第一個實例起來,觸發負載均衡
//index = 0
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 0 * 1 = 0
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 0) = 1
int range = Math.min(averageSize, mqAll.size() - startIndex);for(int i = 0; i < range; ++i) {//(0+0)%1 = 0,所以將第一個分區分給當前實例result.add(mqAll.get((startIndex + i) % mqAll.size()));
}//第二個實例起來,觸發負載均衡
//index = 1
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 1 * 1 + 1 = 2
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 2) = -1
int range = Math.min(averageSize, mqAll.size() - startIndex);//不會進入循環分配分區
for(int i = 0; i < range; ++i) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

所以只會有一個實例去消費當前這個分區,在集群消費模式下,一個分區只會被消費組內的一個消費者消費,rmq默認拉取數量為32,設置拉取間隔為10s,所以每分鐘內消費:32*6 = 192

分布式鎖(×)

當前場景的點,在于需要將超出1分鐘兩百條的那些郵件持久化存儲,等到下一個一分鐘去發送,而分布式鎖只能實現控制接口的流量,沒法保證超出流量那部分的存儲,所以沒法解決當前問題

方案實現

最終采用消息隊列,RocketMQ解決該問題

實現代碼,使用Java SDK,設置拉取間隔為10s即可

public void run(String... args) throws Exception {Properties properties = new Properties();properties.setProperty(ConfigKey.CONSUMER_GROUP, emailNotifyMqProperties.getConsumerGroup());properties.setProperty(ConfigKey.ACCESS_KEY, rocketMqProperties.getAccessKey());properties.setProperty(ConfigKey.SECRET_KEY, rocketMqProperties.getSecretKey());properties.setProperty(ConfigKey.NAME_SERVER_ADDR, rocketMqProperties.getServer());properties.setProperty(ConfigKey.ENABLE_MSG_TRACE, "true");//消費限流:解決發郵件分鐘內超過200封會被熔斷的問題properties.setProperty(ConfigKey.PULL_INTERVAL, "10000");NormalConsumer consumer = ClientFactory.createNormalConsumer(properties, this::consumeMessage);consumer.subscribe(emailNotifyMqProperties.getTopic(), null);consumer.start();
}

監控方向

1. 系統日志報警,配置郵件發送失敗報警

2. 關注token熔斷消息通知

業務方向

梳理當前系統中郵件通知的場景,分析報警內容,從以下方向減少郵件次數發送

1. 用戶是否需要關注(用戶長時間使用下來,部分通知發現自己并不關注的,比如節點報錯可重試成功的

2. 是否可以批量發送(多條通知集合到一條郵件發送:多用戶,多單據等)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/41856.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/41856.shtml
英文地址,請注明出處:http://en.pswp.cn/web/41856.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python中的原子操作簡介

深入理解Python中的原子操作 在現代編程中&#xff0c;多線程是提高程序執行效率的常用技術。然而&#xff0c;當多個線程并發執行時&#xff0c;如何確保數據的一致性和操作的正確性成為了一個關鍵問題。原子操作&#xff08;Atomic Operation&#xff09;便是解決這一問題的…

責任鏈模式(大話設計模式)C/C++版本

責任鏈模式 C #include <iostream> #include <memory>using namespace std; // 請求類 struct Request {std::string requestType; // 請求類型int number; // 該請求類型的數量std::string requestContent; // 請求內容 };// 抽象經理類 clas…

MySQL學習記錄 —— ?? CentOS7.9環境下的MySQL8.4 安裝和配置

文章目錄 1、安裝和配置2、MySQL 包位置3、主要程序介紹 本篇開始在之前mysql博客的基礎上繼續延伸&#xff0c;適合有一定基礎的mysql使用者閱讀 環境 &#xff1a;CentOS 7.9 root 用戶&#xff0c;MySQL 8.4 1、安裝和配置 看一下當前系統版本 cat /etc/redhat-release應當…

前端重點之:Vue+websocket通信詳細用法和websocket心跳機制的使用,websocket斷開實時監測,websocket實時通信

今年年初找工作,好多gou面試官總喜歡問關于websocket通信的使用方式,此次又用到了,在此做個總結:主要包含websocket的具體使用方法,和重點:(心跳機制的使用),就是主要是前端實時監測websocket是否有斷連和數據的處理 在前端開發中,WebSocket 是一種常見的技術,用于…

淺談序列化及文本格式

序列化及文本格式 需求背景 軟件項目在開發過程中&#xff0c;將大量初始化配置項在一定程度上保存在配置文件中。肯定有很多人有疑問&#xff0c;為什么不將這些信息放在軟件內存中。開機時與用戶交互進行確認&#xff1f;這肯定是一個好想法&#xff0c;但是如果配置太多或…

眾所周知沃爾瑪1P是怎么運營?

??沃爾瑪的1P模式&#xff0c;即第一方供應商模式&#xff0c;是其獨特的采購策略。在這種模式下&#xff0c;供應商先將商品賣給沃爾瑪&#xff0c;由沃爾瑪負責庫存管理和銷售。沃爾瑪通過強大的采購和物流能力控制庫存&#xff0c;確保商品品質&#xff0c;為客戶提供更加…

FPGA問題

fpga 問題 第一道坎&#xff0c;安裝軟件&#xff1b;沒有注冊&#xff0c;無法產生sop文件&#xff0c;無法下載 沒有相應的庫的quartus ii版本&#xff0c;需要另下載 第二道坎&#xff0c;模擬器的下載&#xff0c;安裝&#xff1b; 第三道&#xff0c;verilog 語法&#x…

deepspeed huggingface傳入參數 optimizer和lr_scheduler測試

Trainer中 首先&#xff1a; WarmupDecayLR --lr_scheduler_type linear WarmupLR --lr_scheduler_type constant_with_warmup 1 TrainArgument不傳lr_scheduler_type、optim&#xff0c;warmup_steps15 ds config文件中定義如下&#xff1a; 注意&#xff1a;如果不在Trai…

LangChain(四)工具調用的底層原理!給大模型按上雙手吧!(新手向)

背景 經過前面三篇的內容&#xff0c;我想大家對于大模型的構建、Langchain的優勢、Chain的構建有了相當程度的理解&#xff08;雖然只是最簡單的示例&#xff0c;但是足夠有代表性&#xff09;。 后續Chain的使用將會更加豐富多彩&#xff0c;您會了解Langchain開發的大模型…

14-31 劍和詩人5 - 使用 AirLLM 和分層推理在單個 4GB GPU 上運行 LLama 3 70B

利用分層推理實現大模型語言(LLM) 大型語言模型 (LLM) 領域最近取得了顯著進展&#xff0c;LLaMa 3 70B 等模型突破了之前認為可能實現的極限。然而&#xff0c;這些模型的龐大規模給其部署和實際使用帶來了巨大挑戰&#xff0c;尤其是在資源受限的設備上&#xff0c;例如內存…

怎么壓縮pdf文件的大小?減小PDF文件大小的四種方法

怎么壓縮pdf文件的大小&#xff1f;文件大小不僅影響傳輸速度&#xff0c;還可能涉及存儲空間的管理。當處理大型PDF文件時&#xff0c;可能會面臨電子郵件附件限制或云存儲容量不足的問題。此外&#xff0c;過大的文件在瀏覽和加載時也會導致延遲&#xff0c;影響閱讀體驗。這…

3款自己電腦就可以運行AI LLM的項目

AnythingLLM、LocalGPT和PrivateGPT都是與大語言模型&#xff08;LLM&#xff09;相關的項目&#xff0c;它們允許用戶在本地環境中與文檔進行交互&#xff0c;但它們在實現方式和特點上存在一些差異。AnythingLLM使用Pinecone和ChromaDB來處理矢量嵌入&#xff0c;并使用OpenA…

【C語言】return 關鍵字詳解

在C語言中&#xff0c;return是一個關鍵字&#xff0c;用于從函數中返回值或者結束函數的執行。它是函數的重要組成部分&#xff0c;負責將函數的計算結果返回給調用者&#xff0c;并可以提前終止函數的執行。 主要用途和原理&#xff1a; 返回值給調用者&#xff1a; 當函數執…

mysql數據庫創建用戶并授權某個庫的所有權限

這個就直接上語句吧&#xff01;只是注意要用管理員帳號執行&#xff0c;比如root去執行。 -- 創建新用戶&#xff08;替換new_user為您的用戶名&#xff0c;password為您的密碼&#xff09; CREATE USER new_user% IDENTIFIED BY password; -- 授予權限&#xff08;替換data…

社交媒體數據分析:賦能企業營銷策略的利器

在這個數字化時代&#xff0c;社交媒體不僅是品牌與消費者互動的舞臺&#xff0c;更是企業洞察市場趨勢、優化營銷策略的金礦。本文將探討如何利用社交媒體數據分析賦能企業營銷&#xff0c;通過實戰案例與技巧分享&#xff0c;揭示這把“利器”如何幫助企業精準定位目標受眾、…

【論文閱讀】-- Visual Traffic Jam Analysis Based on Trajectory Data

基于軌跡數據的可視化交通擁堵分析 摘要1 引言2 相關工作2.1 交通事件檢測2.2 交通可視化2.3 傳播圖可視化 3 概述3.1 設計要求3.2 輸入數據說明3.3 交通擁堵數據模型3.4 工作流程 4 預處理4.1 路網處理4.2 GPS數據清理4.3 地圖匹配4.4 道路速度計算4.5 交通擁堵檢測4.6 傳播圖…

架構面試-場景題-單點登錄(SSO)怎么實現的

文章目錄 概述基于Cookie基于Token(OAuth, JWT)集中式認證服務 (CAS, SAML)分布式Session:輕型目錄訪問協議&#xff08;LDAP&#xff09;OAuth 2.0/OIDCKerberos 概述 單點登錄&#xff08;Single Sign-On&#xff0c;簡稱SSO&#xff09;是一種身份驗證機制&#xff0c;允許…

掌握【Python異常處理】:打造健壯代碼的現代編程指南

目錄 ?編輯 1. 什么是異常&#xff1f; 知識點 示例 小李的理解 2. 常見的內置異常類型 知識點 示例 小李的理解 3. 異常機制的意義 知識點 示例 小李的理解 4. 如何處理異常 知識點 示例 小李的理解 5. 拋出異常 知識點 示例 小李的理解 6. Python內置…

Springboot整合Jsch-Sftp

背景 開發一個基于jsch的sftp工具類&#xff0c;方便在以后的項目中使用。寫代碼的過程記錄下來&#xff0c;作為備忘錄。。。 Maven依賴 springboot依賴 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-par…

codeforces 1633A

文章目錄 1. 題目鏈接2. 題目代碼正確代碼錯誤代碼 3. 題目總結 1. 題目鏈接 Div. 7 2. 題目代碼 正確代碼 #include<iostream> using namespace std; int main(){int testCase;cin >> testCase;while(testCase --){int ingeter;cin >> ingeter;if(!(inget…