目錄
問題背景
技術方向
方案確認
消息隊列(√)
分布式鎖(×)
方案實現
監控方向
業務方向
問題背景
公司郵件服務token有 分鐘內超200封的熔斷機制,當前token被熔斷后,系統發郵件操作會被忽略,所以郵件服務也沒有重試操作
人工發現token被熔斷后,需要聯系郵件群中值班人,將token恢復
分貨業務依賴郵件來查看分貨通知以及結果,并且分貨層層依賴,如果不能及時收到郵件會影響業務的分貨時效等,所以通過三個方面去解決這個問題
技術方向
系統內發郵件收口做限流
方案確認
方向:限流發郵件方法1分鐘內最大200次
實現:改造系統發郵件底層方法,1分鐘內最多發200個
消息隊列(√)
面臨問題:多出來的怎么處理?消息隊列(需要持久化)
實現:新建一個topic,調用發郵件方法的請求全部扔到MQ中,自己消費,通過設置消費者的拉取間隔以及最大拉取數量限制,分鐘內消費消息條數不超過200條
面臨問題:多分區多消費者?
實現:默認拉取數量為32,目前MQ服務端設置,限制最大拉取數量為32
(可行)設置1個分區,一個消費者組,目前有2個實例(此時其中一個實例不會消費),設置拉取間隔為10s
(不可行,有自動加實例機制)設置2個分區,一個消費者組,目前有2個實例,設置拉取間隔為10s,最大拉取條數為16;系統在流量激增的情況下會增加實例來分攤流量
最終實現方式
topic設置1個分區,一個消費者組,使用默認負載均衡策略:平均分配
//平均分配負載均衡核心邏輯
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);for(int i = 0; i < range; ++i) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
}return result;
解析兩個實例負載均衡過程
//第一個實例起來,觸發負載均衡
//index = 0
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 0 * 1 = 0
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 0) = 1
int range = Math.min(averageSize, mqAll.size() - startIndex);for(int i = 0; i < range; ++i) {//(0+0)%1 = 0,所以將第一個分區分給當前實例result.add(mqAll.get((startIndex + i) % mqAll.size()));
}//第二個實例起來,觸發負載均衡
//index = 1
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 1 * 1 + 1 = 2
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 2) = -1
int range = Math.min(averageSize, mqAll.size() - startIndex);//不會進入循環分配分區
for(int i = 0; i < range; ++i) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
所以只會有一個實例去消費當前這個分區,在集群消費模式下,一個分區只會被消費組內的一個消費者消費,rmq默認拉取數量為32,設置拉取間隔為10s,所以每分鐘內消費:32*6 = 192
分布式鎖(×)
當前場景的點,在于需要將超出1分鐘兩百條的那些郵件持久化存儲,等到下一個一分鐘去發送,而分布式鎖只能實現控制接口的流量,沒法保證超出流量那部分的存儲,所以沒法解決當前問題
方案實現
最終采用消息隊列,RocketMQ解決該問題
實現代碼,使用Java SDK,設置拉取間隔為10s即可
public void run(String... args) throws Exception {Properties properties = new Properties();properties.setProperty(ConfigKey.CONSUMER_GROUP, emailNotifyMqProperties.getConsumerGroup());properties.setProperty(ConfigKey.ACCESS_KEY, rocketMqProperties.getAccessKey());properties.setProperty(ConfigKey.SECRET_KEY, rocketMqProperties.getSecretKey());properties.setProperty(ConfigKey.NAME_SERVER_ADDR, rocketMqProperties.getServer());properties.setProperty(ConfigKey.ENABLE_MSG_TRACE, "true");//消費限流:解決發郵件分鐘內超過200封會被熔斷的問題properties.setProperty(ConfigKey.PULL_INTERVAL, "10000");NormalConsumer consumer = ClientFactory.createNormalConsumer(properties, this::consumeMessage);consumer.subscribe(emailNotifyMqProperties.getTopic(), null);consumer.start();
}
監控方向
1. 系統日志報警,配置郵件發送失敗報警
2. 關注token熔斷消息通知
業務方向
梳理當前系統中郵件通知的場景,分析報警內容,從以下方向減少郵件次數發送
1. 用戶是否需要關注(用戶長時間使用下來,部分通知發現自己并不關注的,比如節點報錯可重試成功的
2. 是否可以批量發送(多條通知集合到一條郵件發送:多用戶,多單據等)