智能BI(后端)-- 系統異步化

文章目錄

  • 系統問題分析
  • 什么是異步化?
  • 業務流程分析
    • 標準異步化的業務流程
    • 系統業務流程
  • 線程池
    • 為什么需要線程池?
    • 線程池兩種實現方式
    • 線程池的參數
    • 線程池的開發
  • 項目異步化改造

系統問題分析

問題場景:調用的服務能力有限,或者接口的處理(或返回)時長較長時,就應該考慮異步化了

什么是異步化?

不用等一件事做完,就可以做另外一件事,等第一件事完成時,可以收到一個通知

業務流程分析

標準異步化的業務流程

  1. 當用戶要進行耗時很長的操作時,點擊提交后,不需要在界面空等,而是應該把這個任務保存到數據庫中記錄下來
  2. 用戶要執行新任務時:
    a. 任務提交成功:
    ⅰ. 若程序存在空閑線程,可以立即執行此任務
    ⅱ. 若所有線程均繁忙,任務將入隊列等待處理
    b. 任務提交失敗:比如所有線程都在忙碌且任務隊列滿了
    ⅰ.選擇拒絕此任務,不再執行
    ⅱ.通過查閱數據庫記錄,發現提交失敗的任務,并在程序空閑時將這些任務取出執行
  3. 程序(線程)從任務隊列中取出任務依次執行,每完成一項任務,就更新任務狀態。
  4. 用戶可以查詢任務的執行狀態,或者在任務執行成功或失敗時接收通知(例如:發郵件、系統消息提示或短信),從而優化體驗
  5. 對于復雜且包含多個環節的任務,在每個小任務完成時,要在程序(數據庫中))記錄任務的執行狀態(進度)。

系統業務流程

  1. 用戶點擊智能分析頁提交按鈕時,先把圖表立刻保存到數據庫中(作為一個任務)
  2. 用戶可以在圖表管理查看所有圖表(已生成的,生成中的,生成失敗的)的信息和狀態
  3. 用戶可以修改生成失敗的圖表信息,點擊重新生成,以嘗試再次創建圖表
    在這里插入圖片描述

問題分析?

  1. 任務隊列的最大容量應該設置為多少
  2. 程序怎么從任務隊列中取出任務去執行?這個任務隊列的流程怎么實現?怎么保證程序最多同時執行多少個任務?

線程池實現

線程池

為什么需要線程池?

  1. 線程的管理比較復雜
  2. 任務存取比較復雜
  3. 線程池可以幫你輕松管理線程,協調任務的執行過程

線程池兩種實現方式

  1. Spring中,可以用ThreadPoolTaskExrcutor配合@Async注解來實現(不推薦)
  2. 在Java中,可以使用JUC并發編程包中的ThreadPoolExecutor來實現非常靈活地自定義線程池

線程池的參數

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {

現狀:AI生成能力的并發是只允許4個任務同時去執行,AI能力允許20個任務排隊
corePoolSize(核心線程數):正常情況下,我們的系統應該能同時工作的線程數
maximumPoolSize(最大線程數):極限情況下,我們的線程池所擁有的線程
keepAliveTime(空閑線程存活時間):非核心線程在沒有任務的情況下,過多久要刪除,從而釋放無用的線程資源
unit(空閑線程存活時間的單位):分鐘,秒
workQueue(工作隊列):用于存放給線程執行的任務,存在一個隊列的長度(一定要設置)
threadFactory(線程工廠):控制每個線程的生成,線程的屬性
RejectedExecutionHandler(拒絕策略):任務隊列滿的時候,我們采取什么措施

線程池的開發

自定義線程池配置

package com.yupi.springbootinit.config;import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class ThreadPoolExecutorConfig {@Beanpublic ThreadPoolExecutor threadPoolExecutor(){// 創建一個線程工廠ThreadFactory threadFactory = new ThreadFactory(){// 初始化線程數為 1private int count = 1;// 創建一個新的線程@Override// 每當線程池需要創建新線程時,就會調用newThread方法// @NotNull Runnable r 表示方法參數 r 應該永遠不為null,public Thread newThread(@NotNull Runnable r) {Thread thread = new Thread(r);thread.setName("線程" + count ++);return thread;}};// 創建一個新的線程池,線程池核心大小為2,最大線程數為4,// 非核心線程空閑時間為100秒,任務隊列為阻塞隊列,長度為4,使用自定義的線程工廠創建線ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(4),threadFactory);return threadPoolExecutor;}
}

測試controller層(注意線上環境不要暴露出去)

package com.yupi.springbootinit.controller;import cn.hutool.json.JSONUtil;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;/*** 隊列測試controller*/
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"}) // 只在開發環境和本地環境生效
public class QueueController {@Resourceprivate ThreadPoolExecutor threadPoolExecutor;@GetMapping("/add")// 接收一個參數name,然后將任務添加到線程池中public void add(String name){// 使用CompletableFuture運行一個異步任務CompletableFuture.runAsync(()->{log.info("任務執行中:" + name + "執行人:" + Thread.currentThread().getName());try {// 讓線程休眠10分鐘,模擬長時間運行的任務Thread.sleep(600000);} catch (InterruptedException e) {throw new RuntimeException(e);} 異步任務在threadPoolExecutor中執行},threadPoolExecutor);}@GetMapping("/get")public String  get(){Map<String, Object> map = new HashMap<>();int size = threadPoolExecutor.getQueue().size();map.put("隊列長度",size);long taskCount = threadPoolExecutor.getTaskCount();map.put("任務總數",taskCount);long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();map.put("已完成的任務總數",completedTaskCount);int activeCount = threadPoolExecutor.getActiveCount();map.put("正在工作的線程數",activeCount);return JSONUtil.toJsonStr(map);}
}

項目異步化改造

/*** 智能分析(異步)** @param multipartFile* @param genChartByAiRequest* @param request* @return*/
@PostMapping("/gen/async")
public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile,GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {String name = genChartByAiRequest.getName();String goal = genChartByAiRequest.getGoal();String chartType = genChartByAiRequest.getChartType();// 校驗ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目標為空");ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名稱過長");// 校驗文件long size = multipartFile.getSize();String originalFilename = multipartFile.getOriginalFilename();// 校驗文件大小final long ONE_MB = 1024 * 1024L;ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超過 1M");// 校驗文件大小綴 aaa.pngString suffix = FileUtil.getSuffix(originalFilename);final List<String> validFileSuffixList = Arrays.asList("xlsx", "xls");ThrowUtils.throwIf(!validFileSuffixList.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后綴非法");User loginUser = userService.getLoginUser(request);// 限流判斷,每個用戶一個限流器redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());// 指定一個模型id(把id寫死,也可以定義成一個常量)long biModelId = 1659171950288818178L;// 分析需求:// 分析網站用戶的增長情況// 原始數據:// 日期,用戶數// 1號,10// 2號,20// 3號,30// 構造用戶輸入StringBuilder userInput = new StringBuilder();userInput.append("分析需求:").append("\n");// 拼接分析目標String userGoal = goal;if (StringUtils.isNotBlank(chartType)) {userGoal += ",請使用" + chartType;}userInput.append(userGoal).append("\n");userInput.append("原始數據:").append("\n");// 壓縮后的數據String csvData = ExcelUtils.excelToCsv(multipartFile);userInput.append(csvData).append("\n");// 先把圖表保存到數據庫中Chart chart = new Chart();chart.setName(name);chart.setGoal(goal);chart.setChartData(csvData);chart.setChartType(chartType);// 插入數據庫時,還沒生成結束,把生成結果都去掉
//        chart.setGenChart(genChart);
//        chart.setGenResult(genResult);// 設置任務狀態為排隊中chart.setStatus("wait");chart.setUserId(loginUser.getId());boolean saveResult = chartService.save(chart);ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "圖表保存失敗");// 在最終的返回結果前提交一個任務// todo 建議處理任務隊列滿了后,拋異常的情況(因為提交任務報錯了,前端會返回異常)CompletableFuture.runAsync(() -> {// 先修改圖表任務狀態為 “執行中”。等執行成功后,修改為 “已完成”、保存執行結果;執行失敗后,狀態修改為 “失敗”,記錄任務失敗信息。(為了防止同一個任務被多次執行)Chart updateChart = new Chart();updateChart.setId(chart.getId());// 把任務狀態改為執行中updateChart.setStatus("running");boolean b = chartService.updateById(updateChart);// 如果提交失敗(一般情況下,更新失敗可能意味著你的數據庫出問題了)if (!b) {handleChartUpdateError(chart.getId(), "更新圖表執行中狀態失敗");return;}// 調用 AIString result = aiManager.doChat(biModelId, userInput.toString());String[] splits = result.split("【【【【【");if (splits.length < 3) {handleChartUpdateError(chart.getId(), "AI 生成錯誤");return;}String genChart = splits[1].trim();String genResult = splits[2].trim();// 調用AI得到結果之后,再更新一次Chart updateChartResult = new Chart();updateChartResult.setId(chart.getId());updateChartResult.setGenChart(genChart);updateChartResult.setGenResult(genResult);updateChartResult.setStatus("succeed");boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {handleChartUpdateError(chart.getId(), "更新圖表成功狀態失敗");}},threadPoolExecutor);BiResponse biResponse = new BiResponse();
//        biResponse.setGenChart(genChart);
//        biResponse.setGenResult(genResult);biResponse.setChartId(chart.getId());return ResultUtils.success(biResponse);
}
// 上面的接口很多用到異常,直接定義一個工具類
private void handleChartUpdateError(long chartId, String execMessage) {Chart updateChartResult = new Chart();updateChartResult.setId(chartId);updateChartResult.setStatus("failed");updateChartResult.setExecMessage(execMessage);boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {log.error("更新圖表失敗狀態失敗" + chartId + "," + execMessage);
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/13177.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/13177.shtml
英文地址,請注明出處:http://en.pswp.cn/web/13177.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

離岸公司+外貿

為什么外貿公司老板都喜歡注冊離岸公司呢&#xff1f;怎樣利用離岸公司做進出口貿易呢&#xff1f; 今天大家花一分鐘時間來了解清楚 第一步就是注冊一家離岸公司&#xff0c;將這個離岸公司作為國際外貿的中轉站&#xff0c;與國外客戶簽訂單&#xff0c;你從國內工廠采購商…

【文檔理解】TextMonkey:一種OCR-Free的用于文檔理解的多模態大模型

背景 傳統的信息提取&#xff0c;通常是從文本中提取信息&#xff0c;相關技術也比較成熟。然而對于復雜領域&#xff0c;例如圖片&#xff0c;文檔等形式的數據&#xff0c;想要提取出高質量的、可信的數據難度就比較大了&#xff0c;這種任務也常稱為&#xff1a;視覺文檔理…

CTF網絡安全大賽web題目:just_sqli

這道題目是bugku的web題目 題目的 描  述: KosenCTF{} 原文鏈接&#xff1a; CTF網絡安全大賽web題目&#xff1a;just_sqli - 紅客網-網絡安全與滲透技術 題目Web源代碼&#xff1a; <?php$user NULL; $is_admin 0;if (isset($_GET["source"])) {highlig…

齊護K210系列教程(二十七)_語音識別

語音識別 1.燒錄固件和模型2.語音識別程序2.1訓練并識別2.2使用本地文件語音識別 3.課程資源聯系我們 1.燒錄固件和模型 注&#xff1a;本應用只適用于有麥克風功能的型號&#xff1a;AIstart_pro、AIstart_掌機、AIstart_Mini, 其它型號不支持&#xff01; 機器碼生成以及模…

linux中遠程服務器上傳輸文件的10個sftp命令示例

目錄 1. 如何連接到 SFTP 2. 幫助 3.檢查當前工作目錄 4. 使用 sftp 列出文件 遠程 本地 5. 使用 sftp 上傳文件 6. 使用 sftp 上傳多個文件 7. 使用 sftp 下載文件 8. 在 sftp 中切換目錄 遠程 本地 9. 使用 sftp 創建目錄 10. 使用 sftp 刪除目錄 11. 退出 sf…

(001)apidoc 的安裝

安裝 1.確定 node 和 npm 的匹配版本 node -vv10.14.1# 切換node 版本 nvm list nvm use 20.12.22.安裝 apidoc。 npm install -g apidoc3.生成文檔&#xff1a; apidoc -i ../ -o document/ -f ".java$"-i &#xff1a;指定掃描路徑。-o&#xff1a;輸出目錄。…

golang并發(同步)多任務高性能執行聚合

taskgroup golang并發執行多任務&#xff0c;并聚合多任務結果。 使用文檔、 項目github 使用: go get github.com/mlee-msl/taskgroup 功能特點 并發安全的執行多個任務將多個任務的結果進行聚合通過扇出/扇入模式&#xff0c;結合線程安全channel實現高效協程間通信多任務復…

【Linux:環境變量】

環境變量一般是指在操作系統中用來指定操作系統環境的一些參數 常見的環境變量&#xff1a; PATH 指定可執行程序的搜索路徑 系統級的文件&#xff1a;/etc/bashrc 用戶級文件&#xff1a;~/.bashrc ~/.bash_profile HOME 指定用戶的主要工作目錄&#xff08;當前用…

kettle從入門到精通 第六十一課 ETL之kettle 任務調度器,輕松使用xxl-job調用kettle中的job和trans

想真正學習或者提升自己的ETL領域知識的朋友歡迎進群&#xff0c;一起學習&#xff0c;共同進步。若二維碼失效&#xff0c;公眾號后臺加我微信入群&#xff0c;備注kettle。 1、大家都知道kettle設計的job流程文件有個缺點&#xff1a;只能設置簡單的定時任務&#xff0c;無法…

DPDK:用rte_wmb()來保序,對ARM和IA而言,RTE_WMB()的實現有何不同

rte_wmb()函數在DPDK中用于實現寫入屏障&#xff08;Write Memory Barrier&#xff09;&#xff0c;它的作用是確保在CPU執行寫操作之前&#xff0c;所有先前的寫操作已經被完全刷新到內存中。這個函數在IA和ARM處理器上的實現有一些不同。 對于Intel Architecture (IA)處理器而…

PHP黑魔法之既是0又是1/switch/$a==0可用.繞過(非數字都可繞過)/PHP://偽協議繞過

1、既是0又是1的情況 $a==1 & $test[$a]=t 時 知識點1)php在處理數字時,如果數字的位數超過 16 位是可以弱等于1的,也就是 var_dump( 9999999999999999999 == 1 );//true 因為當數字位數超過 16 位時,是將該數字轉換成了數值為 1 的字符串進行處理 知識點2)在科學…

LabVIEW和usrp連接實現ofdm通信系統 如何實現

1. 硬件準備 USRP設備&#xff1a;選擇合適的USRP硬件&#xff08;如USRP B210或N210&#xff09;&#xff0c;并確保其與計算機連接&#xff08;通常通過USB或以太網&#xff09;。天線&#xff1a;根據頻段需求選擇合適的天線。 2. 軟件安裝 LabVIEW&#xff1a;安裝LabVI…

【Golang】 Golang 的 GORM 庫中的 Rows 函數

文章目錄 前言一、Rows 函數解釋二、代碼實現三、總結 前言 在使用 Go 語言進行數據庫操作時&#xff0c;GORM&#xff08;Go Object-Relational Mapping&#xff09;庫是一個常用的工具。它提供了一種簡潔和強大的方式來處理數據庫操作。本文將介紹 GORM 庫中的 Rows 函數&am…

數據庫-索引(高級篇)

文章目錄 索引概念&#xff1f;索引演示&#xff1f;索引的優劣&#xff1f;為什么使用索引就快&#xff1f;本篇小結 更多相關內容可查看 索引概念&#xff1f; 索引&#xff08;index&#xff09;是幫助MySQL高效獲取數據的數據結構(有序)。在數據之外&#xff0c;數據庫系統…

生成完美口型同步的 AI 數字人視頻

目錄 摘要 關鍵詞 1 前言 1.1 研究背景 1.2 研究意義 2 技術框架 2.1 深度學習框架 2.2 語音識別 2.3 面部動作捕捉和口型同步 2.4 綜合項目 3 實現過程 3.1 環境搭建 3.2 代碼開發 3.3 整合代碼 3.4 部署 3.5 更多細節 4 測試過程 4.1 數據準備 4.2 面部檢測…

語法分析-文法

如果對于一部文法中&#xff0c;存在至少一個句子有兩個或者兩個以上的語法樹則該文法是二義性的。 我們可以以上面的例子進行解釋&#xff0c;對于第棵個語法樹&#xff0c;我們可以看到是先進行了加法運算再進行的乘法運算&#xff0c;因為需要先把EE作為整體運算完后再成為E…

上海亞商投顧:滬指低開低走 兩市成交額跌破8000億

上海亞商投顧前言&#xff1a;無懼大盤漲跌&#xff0c;解密龍虎榜資金&#xff0c;跟蹤一線游資和機構資金動向&#xff0c;識別短期熱點和強勢個股。 一.市場情緒 市場全天震蕩走低&#xff0c;三大股指尾盤均跌近1%。地產股逆勢走強&#xff0c;光大嘉寶、天地源、云南城投…

幻獸帕魯Palworld服務器手動+docker部署方法+備份遷移

目錄 帕魯部署官方文檔帕魯手動安裝法手動安裝steamcmd通過steamcmd安裝帕魯后端 docker容器一鍵部署幻獸帕魯綠聯云NAS機器部署幻獸帕魯客戶端連接附錄1&#xff1a;PalServer.sh的啟動項附錄2&#xff1a;配置文件游戲存檔保存和遷移 關于阿里云計算巢 帕魯部署官方文檔 htt…

學習MySQL(五):窗口函數

窗口函數介紹 窗口函數的引入是為了解決想要既顯示聚集前的數據&#xff0c;又要顯示聚集后的數據&#xff1b;窗口數對一組值進行操作&#xff0c;不需要使用GROUP BY子句對數據進行分組&#xff0c;能夠在同一行中同時返回基礎行的列和聚合列。 強調&#xff1a;使用MySQL …

?學者觀察 | 從區塊鏈應用創新看長安鏈發展——CCF區塊鏈專委會榮譽主任斯雪明

導語 2024年1月27日&#xff0c;斯雪明教授在長安鏈發布三周年慶暨生態年會上發表演講&#xff0c;認為在區塊鏈發展過程中&#xff0c;不僅需要技術創新&#xff0c;同時需要有價值、有特色、有示范意義的應用創新。斯雪明教授介紹了國內區塊鏈技術與應用發展的現狀、趨勢與挑…