JAVA接口調用限速器

目錄

1、并發限速

2、串行限速??


?

需求:批量調用第三方ERP接口,對方接口限流時,減緩調用速率。

1、并發限速


@Slf4j
@RestController
public class ApiCallTask {//第三方接口@Resourceprivate ErpService erpService;//異步線程池@Resourceprivate ThreadPoolTaskExecutor taskExecutor;//定時調度器@Resourceprivate ThreadPoolTaskScheduler taskScheduler;private static final BlockingQueue<Seller> sellerQueue = new LinkedBlockingQueue<>(1000);private static final RateLimiter rateLimiter = RateLimiter.create(200.0 / 60.0);private static final ObjectMapper objectMapper = new ObjectMapper();private static final int MAX_RETRY_COUNT = 5;private static final int BATCH_SIZE = 10;@Scheduled(cron = "0 0 2 * * ?")@RequestMapping(value = "/jobAfterSalesSync")public ResponseEntity<String> jobAfterSalesSync() {log.info("開始同步商家售后數據...");Map<String, String> queryMap = Maps.newHashMap();queryMap.put("status", "2");List<商家seller> sellerList = erpService.getSellerList(queryMap);List<商家seller> sellerList = sellerList  != null ? sellerList  : new ArrayList<>();log.info("共 {} 個商家待處理", sellerList.size());for (Seller seller : sellerList) {if (!sellerQueue.offer(seller)) {log.warn("隊列已滿,商家 {} 未加入隊列", seller.getSellerName());} else {log.debug("商家 {} 已加入隊列", seller.getSellerName());}}processBatch(); // 啟動分批處理log.info("任務已提交,線程池活躍線程數: {}", taskExecutor.getActiveCount());return ResponseEntity.ok("任務已觸發");}/*** 異步處理任務* taskScheduler與rateLimiter的分工*      processBatch 中每批完成后等待 3 秒再調度下一批,這是批次之間的宏觀控制。Instant.now().plusSeconds(3)*      rateLimiter.acquire():在每批內部的 10 個任務中,控制每個 API 調用的微觀速率。* 隊列作用*      processBatch 在每次批次完成后檢查 sellerQueue.isEmpty()。如果隊列非空,通過 taskScheduler.schedule 調度下一次 processBatch,形成遞歸調用。保證所有seller都被處理*      限流(801)時,handleRetry 確保 Seller 被重新加入 sellerQueue。即使隊列滿,也通過延遲重試保證任務不丟失。* CompletableFuture作用*      CompletableFuture 是對傳統 Future 的增強,支持鏈式調用、異常處理和任務組合,用于異步執行 callErpApi,實現每批 10 個 Seller 的并發處理。*      將 callErpApi 的執行從主線程中分離出來,提交給線程池(如 taskExecutor)異步運行。*      submit 方法返回一個 Future 對象(這里未使用返回值),表示任務已交給線程池處理。*      CompletableFuture.runAsync 創建異步任務,執行 callErpApi。在批處理中,每個 Seller 的 API 調用是獨立的異步任務。*      futures 收集所有任務的 CompletableFuture 實例。*      使用 CompletableFuture.allOf 等待一批任務全部完成,然后觸發后續操作(如調度下一批)。*      通過 .exceptionally 或 .whenComplete 處理異步任務中的異常,確保任務鏈不會因錯誤中斷。* taskExecutor 的整體作用*      異步執行:*      將 processBatch 和 callErpApi 從主線程(定時任務或 HTTP 請求線程)中分離出來,避免阻塞主線程。*      例如,HTTP 請求可以快速響應,而實際處理在后臺進行。*      并發處理:*      在 processBatch 中,10 個 callErpApi 任務可以并行執行(取決于線程池大小),提高處理效率。*      例如,如果線程池核心線程數為 10,則每批 10 個任務可以同時運行。*      與 taskScheduler 的分工:*      taskExecutor:負責執行具體的任務(processBatch 和 callErpApi)。*      taskScheduler:負責調度任務的執行時間(例如批次間隔 3 秒或限流重試延遲)。*      與代碼目標的關系*      分批執行:taskExecutor 使每批 10 個任務并發運行。*      持續執行:與 taskScheduler 配合,確保隊列非空時任務持續調度。*      限流控制:rateLimiter 限制速率,taskExecutor 提供并發支持,二者結合實現高效且受控的處理。*/private void processBatch() {if (sellerQueue.isEmpty()) {log.info("隊列處理完成,剩余大小: {}", sellerQueue.size());return;}List<Seller> batch = new ArrayList<>();int drained = sellerQueue.drainTo(batch, BATCH_SIZE); // 取出最多 10 個,出隊log.info("從隊列中取出 {} 個元素,開始處理", drained);List<CompletableFuture<Void>> futures = new ArrayList<>();/**** 對于每批的 10 個 Seller,使用 CompletableFuture.runAsync 將 callErpApi(seller) 提交到 taskExecutor 執行。* runAsync 的第二個參數指定了執行器(taskExecutor),確保這些任務在 taskExecutor 的線程池中并行運行。*/for (Seller seller : batch) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> callErpApi(seller), taskExecutor);futures.add(future);}// 等待當前批次所有任務完成。在批次完成后執行回調,檢查隊列并調度下一次 processBatch(延遲 3 秒)。CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((result, throwable) -> {if (throwable != null) {log.error("批處理異常: {}", throwable.getMessage());}log.info("完成一批處理,剩余隊列大小: {}", sellerQueue.size());if (!sellerQueue.isEmpty()) {// 只要 sellerQueue 中還有元素,processBatch 會在每次批次完成后通過 taskScheduler.schedule 重新調用自己。//限流(801 錯誤)時,handleRetry 會將 Seller 重新加入 sellerQueue,保持隊列非空。//每次批次完成后,只要隊列非空,就延遲 3 秒調度下一批,直到隊列為空。taskScheduler.schedule(this::processBatch, Instant.now().plusSeconds(3));} else {log.info("隊列已空,任務結束");}});}//調用ERP接口private void callErpApi(Seller seller) {rateLimiter.acquire();try {String response = request(seller);if (StringUtils.isBlank(response)) {log.info("商家 {} 處理成功(隊列剩余: {})", seller.getSellerName(), sellerQueue.size());} else {JsonNode jsonResponse = objectMapper.readTree(response);if (jsonResponse.has("code") && jsonResponse.get("code").asInt() == 801) {int waitTime = extractWaitTime(jsonResponse.get("message").asText());log.warn("限流,商家 {} 暫停 {} 秒后重試", seller.getSellerName(), waitTime);//限流時,sellerQueue.offer(seller) 嘗試入隊。如果隊列滿,延遲 waitTime 秒后重試。handleRetry(seller, waitTime);} else {log.warn("其他錯誤,商家: {}, 接口返回: {},視為成功", seller.getSellerName(), response);}}} catch (Exception e) {log.error("API 調用異常,商家: {},視為成功", seller.getSellerName(), e);}}//重試處理private void handleRetry(Seller seller, int waitTime) {if (seller.getRetryCount() < MAX_RETRY_COUNT) {seller.incrementRetry();boolean requeued = sellerQueue.offer(seller);//入隊if (requeued) {log.info("商家 {} 重試次數: {},已重新入隊,等待下次批處理", seller.getSellerName(), seller.getRetryCount());} else {log.warn("隊列已滿,商家 {} 延遲 {} 秒后重試", seller.getSellerName(), waitTime);taskScheduler.schedule(() -> handleRetry(seller, waitTime), Instant.now().plusSeconds(waitTime));}} else {log.error("商家 {} 達到最大重試次數 {},丟棄", seller.getSellerName(), MAX_RETRY_COUNT);}}//獲取限速接口中等待時間private static int extractWaitTime(String message) {Pattern pattern = Pattern.compile("(\\d+)\\s*秒");Matcher matcher = pattern.matcher(message);return matcher.find() ? Integer.parseInt(matcher.group(1)) : 30;}//請求接口public String request(Seller seller) {Map<String, Object> params = Maps.newHashMap();String[] range = DateUtil.getDateRange(14);params.put("limit", 200);params.put("page", 1);params.put("start_time", range[0]);params.put("end_time", range[1]);params.put("shop_nick", seller.getSellerName());try {String result = erpService.afterSalesData(params);return  result != null ? result  : "";} catch (Exception e) {log.error("請求 API 失敗,商家: {}", seller.getSellerName(), e);return "{}";}}}

2、串行限速??

以上代碼仍然有限速問題,調用接口限速頻率太高,改造并優化。


@Slf4j
@RestController
public class ErpApiCallTask {@Resourceprivate ErpService erpService;@Resourceprivate ThreadPoolTaskExecutor taskExecutor;@Resourceprivate ThreadPoolTaskScheduler taskScheduler;private static final BlockingQueue<Seller> sellerQueue = new LinkedBlockingQueue<>(1000);private static final RateLimiter rateLimiter = RateLimiter.create(1.0 / 5.0); // 每 5 秒 1 次private static final ObjectMapper objectMapper = new ObjectMapper();private static final int MAX_RETRY_COUNT = 5;private static final long WAIT_INTERVAL = 30000; // 等待 30 秒檢查新入隊元素@Scheduled(cron = "0 0 2 * * ?")@RequestMapping(value = "/jobAfterSalesSync")public ResponseEntity<String> jobAfterSalesSync() {log.info("開始從ERP系統同步商家售后數據...");Map<String, String> queryMap = Maps.newHashMap();queryMap.put("status", "2");List<商家seller> sellerList = erpService.getSellerList(queryMap);log.info("共 {} 個商家待處理", sellerList.size());for (Seller seller : sellerList) {if (!sellerQueue.offer(seller)) {log.warn("隊列已滿,商家 {} 未加入隊列,當前隊列大小: {}", seller.getSellerName(), sellerQueue.size());} else {log.debug("商家 {} 已加入隊列,當前隊列大小: {}", seller.getSellerName(), sellerQueue.size());}}log.info("隊列初始化完成,當前隊列大小: {}", sellerQueue.size());// 異步啟動處理taskExecutor.submit(this::processQueue);log.info("任務已提交,線程池活躍線程數: {}", taskExecutor.getActiveCount());return ResponseEntity.ok("任務已觸發");}/*** 串行處理隊列,使用 RateLimiter 控制每 5 秒 1 次調用,隊列為空時等待新入隊元素* 移除批處理和并發:原代碼按批次處理(每次 10 個),并通過 CompletableFuture 并發執行。現在改為 processQueue,串行處理隊列中的每個 Seller。* 移除 CompletableFuture:不需要并發,直接在單線程中順序調用 callErpApi。* 串行執行:使用 orderQueue.poll() 逐個取出 Seller,每次處理一個后等待 5 秒。* 保留 taskExecutor.submit:異步啟動處理,避免阻塞主線程(定時任務或 HTTP 請求)。處理邏輯在后臺線程中串行執行。* 使用 RateLimiter 控制速率,在每次調用 callErpApi 前獲取令牌,確保5秒最多 1 次調用。* 相比 Thread.sleep(1000),RateLimiter 更靈活,能動態調整速率并處理突發請求。* 限流處理,限流(801)時,延遲 waitTime 秒后重新入隊。隊列滿時遞歸重試,確保任務不丟失。* 新增等待機制:當 orderQueue.isEmpty() 時,不直接退出,而是等待 WAIT_INTERVAL(秒),然后再次檢查隊列。如果等待后隊列仍為空,設置 hasMoreTasks = false,結束循環;否則繼續處理。* 新增標志變量 hasMoreTasks:用布爾變量控制外層循環,避免無限等待。*/private void processQueue() {boolean hasMoreTasks = true;while (hasMoreTasks) {if (!sellerQueue.isEmpty()) {log.info("開始處理隊列,當前隊列大小: {}", sellerQueue.size());Seller seller = sellerQueue.poll(); // 取出隊列頭部元素,if (seller != null) {log.info("從隊列中取出商家: {},剩余隊列大小: {}", seller.getSellerName(), sellerQueue.size());rateLimiter.acquire(); // 獲取令牌,控制速率callErpApi(seller);}} else {log.info("隊列當前為空,等待 {} 毫秒檢查新入隊元素", WAIT_INTERVAL);try {Thread.sleep(WAIT_INTERVAL); // 等待一段時間,檢查是否有新元素} catch (InterruptedException e) {log.error("等待被中斷", e);Thread.currentThread().interrupt();}if (sellerQueue.isEmpty()) {log.info("等待后隊列仍為空,任務結束");hasMoreTasks = false; // 隊列仍為空,結束循環} else {log.info("檢測到新入隊元素,繼續處理,當前隊列大小: {}", sellerQueue.size());}}}log.info("隊列處理完成,剩余大小: {}", sellerQueue.size());}private void callErpApi(Seller seller) {try {String response = request(seller);if (StringUtils.isBlank(response)) {log.info("商家 {} 處理成功,當前隊列大小: {}", seller.getSellerName(), sellerQueue.size());} else {JsonNode jsonResponse = objectMapper.readTree(response);if (jsonResponse.has("code") && jsonResponse.get("code").asInt() == 801) {int waitTime = extractWaitTime(jsonResponse.get("message").asText());log.warn("限流,商家 {} 暫停 {} 秒后重試,當前隊列大小: {}",seller.getSellerName(), waitTime, sellerQueue.size());handleRetry(seller, waitTime);} else {log.warn("其他錯誤,商家: {},接口返回: {},視為成功,當前隊列大小: {}",seller.getSellerName(), response, sellerQueue.size());}}} catch (Exception e) {log.error("API 調用異常,商家: {},視為成功,當前隊列大小: {}",seller.getSellerName(), sellerQueue.size(), e);}}private void handleRetry(Seller seller, int waitTime) {if (seller.getRetryCount() < MAX_RETRY_COUNT) {seller.incrementRetry();// 延遲 waitTime 秒后重新入隊taskScheduler.schedule(() -> {boolean requeued = sellerQueue.offer(seller);if (requeued) {log.info("商家 {} 重試次數: {},已重新入隊,當前隊列大小: {}",seller.getSellerName(), seller.getRetryCount(), sellerQueue.size());} else {log.warn("隊列已滿,商家 {} 延遲 {} 秒后重試,當前隊列大小: {}",seller.getSellerName(), waitTime, sellerQueue.size());handleRetry(seller, waitTime); // 遞歸重試}}, Instant.now().plusSeconds(waitTime));} else {log.error("商家 {} 達到最大重試次數 {},丟棄,當前隊列大小: {}",seller.getSellerName(), MAX_RETRY_COUNT, sellerQueue.size());}}private static int extractWaitTime(String message) {Pattern pattern = Pattern.compile("(\\d+)\\s*秒");Matcher matcher = pattern.matcher(message);return matcher.find() ? Integer.parseInt(matcher.group(1)) : 30;}public String request(Seller seller) {Map<String, Object> params = Maps.newHashMap();String[] range = DateUtil.getDateRange(14);params.put("limit", 200);params.put("page", 1);params.put("start_time", range[0]);params.put("end_time", range[1]);params.put("shop_nick", seller.getSellerName());try {String result = erpService.afterSalesSyncHandler(params);return result  != null ? result  : "";} catch (Exception e) {log.error("請求 API 失敗,商家: {}", seller.getSellerName(), e);return "{}";}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/899622.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/899622.shtml
英文地址,請注明出處:http://en.pswp.cn/news/899622.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

STM32 CAN控制器硬件資源與用法

1、硬件結構圖 以STM32F4為例&#xff0c;他有2個can控制器&#xff0c;分別為 CAN1 CAN2。 每個CAN控制器&#xff0c;都有3個發送郵箱、2個接收fifo&#xff0c;每個接收fifo又由3個接收郵箱組成。也即每個CAN控制器都有9個郵箱&#xff0c;其中3個供發送用&#xff0c;3個…

【C++ 繼承】—— 青花分水、和而不同,繼承中的“明明德”與“止于至善”

歡迎來到ZyyOvO的博客?&#xff0c;一個關于探索技術的角落&#xff0c;記錄學習的點滴&#x1f4d6;&#xff0c;分享實用的技巧&#x1f6e0;?&#xff0c;偶爾還有一些奇思妙想&#x1f4a1; 本文由ZyyOvO原創??&#xff0c;感謝支持??&#xff01;請尊重原創&#x1…

Qt warning LNK4042: 對象被多次指定;已忽略多余的指定

一、常規原因&#xff1a; pro或pri 文件中源文件被多次包含 解決&#xff1a;刪除變量 SOURCES 和 HEADERS 中重復條目 二、誤用 對于某些pri庫可以使用如下代碼簡寫包含 INCLUDEPATH $$PWDHEADERS $$PWD/*.hSOURCES $$PWD/*.cpp但是假如該目錄下只有頭文件&#xff0c;沒…

Visual Studio Code 無法打開源文件解決方法

&#x1f308; 個人主頁&#xff1a;Zfox_ &#x1f525; 系列專欄&#xff1a;Linux &#x1f525; 系列專欄&#xff1a;C從入門到精通 目錄 一&#xff1a;&#x1f525; 突發狀況 二&#xff1a;&#x1f525; 共勉 一&#xff1a;&#x1f525; 突發狀況 &#x1f42c;…

js文字兩端對齊

目錄 一、問題 二、原因及解決方法 三、總結 一、問題 1.text-align: justify; 不就可以了嗎&#xff1f;但是實際測試無效 二、原因及解決方法 1.原因&#xff1a;text-align只對非最后一行文字有效。只有一行文字時&#xff0c;text-align無效&#xff0c;要用text-alig…

LeetCode算法題(Go語言實現)_20

題目 給你兩個下標從 0 開始的整數數組 nums1 和 nums2 &#xff0c;請你返回一個長度為 2 的列表 answer &#xff0c;其中&#xff1a; answer[0] 是 nums1 中所有 不 存在于 nums2 中的 不同 整數組成的列表。 answer[1] 是 nums2 中所有 不 存在于 nums1 中的 不同 整數組成…

每天認識一個設計模式-橋接模式:在抽象與實現的平行宇宙架起彩虹橋

一、前言&#xff1a;虛擬機橋接的啟示 使用過VMware或者Docker的同學們應該都接觸過網絡橋接&#xff0c;在虛擬機網絡配置里&#xff0c;橋接模式是常用的網絡連接方式。選擇橋接模式時&#xff0c;虛擬機會通過虛擬交換機與物理網卡相連&#xff0c;獲取同網段 IP 地址&…

java筆記02

運算符 1.隱式轉換和強制轉換 類型轉換的分類 1.隱式轉換&#xff1a; 取值范圍小的數值 轉換為 取值范圍大的數值 2.強制轉換&#xff1a; 取值范圍大的數值 轉換為 取值范圍小的數值隱式轉換的兩種提升規則 取值范圍小的&#xff0c;和取值范圍大的進行運算&#xff0c;小的…

Redis-07.Redis常用命令-集合操作命令

一.集合操作命令 SADD key member1 [member2]&#xff1a; sadd set1 a b c d sadd set1 a 0表示沒有添加成功&#xff0c;因為集合中已經有了這個元素了&#xff0c;因此無法重復添加。 SMEMBERS key: smembers set1 SCARD key&#xff1a; scard set1 SADD key member1 …

李飛飛、吳佳俊團隊新作:FlowMo如何以零卷積、零對抗損失實現ImageNet重構新巔峰

目錄 一、摘要 二、引言 三、相關工作 四、方法 基于擴散先前的離散標記化器利用廣告 架構 階段 1A&#xff1a;模式匹配預訓練 階段 1B&#xff1a;模式搜索后訓練 采樣 第二階段&#xff1a;潛在生成建模 五、Coovally AI模型訓練與應用平臺 六、實驗 主要結果 …

CSS3:現代Web設計的魔法卷軸

一、布局革命&#xff1a;從平面到多維空間 1.1 Grid布局的次元突破 星際戰艦布局系統 .galaxy {display: grid;grid-template-areas: "nav nav nav""sidebar content ads""footer footer footer";grid-template-rows: 80px 1fr 120p…

美觀快速的react 的admin框架

系統特色&#xff1a; - &#x1f3a8; 精心設計的UI主題系統&#xff0c;提供優雅的配色方案和視覺體驗 - &#x1f4e6; 豐富完整的組件庫&#xff0c;包含大量開箱即用的高質量組件 - &#x1f528; 詳盡的組件使用示例&#xff0c;降低開發者的學習成本 - &#x1f680…

【C++】 string底層封裝的模擬實現

目錄 前情提要Member functions —— 成員函數構造函數拷貝構造函數賦值運算符重載析構函數 Element access —— 元素訪問Iterator —— 迭代器Capacity —— 容量sizecapacityclearemptyreserveresize Modifiers —— 修改器push_backappendoperator(char ch)operator(const …

計算機網絡相關知識小結

計算機網絡 1.計算機網絡&#xff1a;獨立計算機&#xff0c;通信線路連接&#xff0c;實現資源共享 2.組成&#xff1a;資源子網和通信子網 3.拓撲分類 4.范圍&#xff1a;LAN, MAN. WAN 5、有線和無線 6.按照方向&#xff1a;單工、雙工&#xff0c;全雙工 7.傳輸對象方式&a…

16-CSS3新增選擇器

知識目標 掌握屬性選擇器的使用掌握關系選擇器的使用掌握結構化偽類選擇器的使用掌握偽元素選擇器的使用 如何減少文檔內class屬性和id屬性的定義&#xff0c;使文檔變得更加簡潔&#xff1f; 可以通過屬性選擇器、關系選擇器、結構化偽類選擇器、偽元素選擇器。 1. 屬性選擇…

【彈性計算】異構計算云服務和 AI 加速器(四):FPGA 虛擬化技術

《異構計算云服務和 AI 加速器》系列&#xff0c;共包含以下文章&#xff1a; 異構計算云服務和 AI 加速器&#xff08;一&#xff09;&#xff1a;功能特點異構計算云服務和 AI 加速器&#xff08;二&#xff09;&#xff1a;適用場景異構計算云服務和 AI 加速器&#xff08;…

Java進階——位運算

位運算直接操作二進制位&#xff0c;在處理底層數據、加密算法、圖像處理等領域具有高效性能和效率。本文將深入探討Java中的位運算。 本文目錄 一、位運算簡介1. 與運算2. 或運算異或運算取反運算左移運算右移運算無符號右移運算 二、位運算的實際應用1. 權限管理2. 交換兩個變…

OpenAI深夜直播「偷襲」谷歌!GPT-4o原生圖像生成:奧特曼帶梗圖,AGI戰場再燃戰火

引言&#xff1a;AI戰場的「閃電戰」 當谷歌剛剛發布「地表最強」Gemini 2.5 Pro時&#xff0c;OpenAI立即以一場深夜直播「閃電反擊」——GPT-4o的原生圖像生成功能正式上線&#xff01;從自拍變梗圖到相對論漫畫&#xff0c;奧特曼&#xff08;OpenAI團隊&#xff09;用一連…

鴻蒙harmonyOS:筆記 正則表達式

從給出的文本中&#xff0c;按照既定的相關規則&#xff0c;匹配出符合的數據&#xff0c;其中的規則就是正則表達式&#xff0c;使用正則表達式&#xff0c;可以使得我們用簡潔的代碼就能實現一定復雜的邏輯&#xff0c;比如判斷一個郵箱賬號是否符合正常的郵箱賬號&#xff0…

[首發]烽火HG680-KD-海思MV320芯片-2+8G-安卓9.0-強刷卡刷固件包

烽火HG680-KD-海思MV320芯片-28G-安卓9.0-強刷卡刷固件包 U盤強刷刷機步驟&#xff1a; 1、強刷刷機&#xff0c;用一個usb2.0的8G以下U盤&#xff0c;fat32&#xff0c;2048塊單分區格式化&#xff08;強刷對&#xff35;盤非常非常挑剔&#xff0c;usb2.0的4G U盤兼容的多&a…