文章目錄
- 系統問題分析
- 什么是異步化?
- 業務流程分析
- 標準異步化的業務流程
- 系統業務流程
- 線程池
- 為什么需要線程池?
- 線程池兩種實現方式
- 線程池的參數
- 線程池的開發
- 項目異步化改造
系統問題分析
問題場景:調用的服務能力有限,或者接口的處理(或返回)時長較長時,就應該考慮異步化了
什么是異步化?
不用等一件事做完,就可以做另外一件事,等第一件事完成時,可以收到一個通知
業務流程分析
標準異步化的業務流程
- 當用戶要進行耗時很長的操作時,點擊提交后,不需要在界面空等,而是應該把這個任務保存到數據庫中記錄下來
- 用戶要執行新任務時:
a. 任務提交成功:
ⅰ. 若程序存在空閑線程,可以立即執行此任務
ⅱ. 若所有線程均繁忙,任務將入隊列等待處理
b. 任務提交失敗:比如所有線程都在忙碌且任務隊列滿了
ⅰ.選擇拒絕此任務,不再執行
ⅱ.通過查閱數據庫記錄,發現提交失敗的任務,并在程序空閑時將這些任務取出執行 - 程序(線程)從任務隊列中取出任務依次執行,每完成一項任務,就更新任務狀態。
- 用戶可以查詢任務的執行狀態,或者在任務執行成功或失敗時接收通知(例如:發郵件、系統消息提示或短信),從而優化體驗
- 對于復雜且包含多個環節的任務,在每個小任務完成時,要在程序(數據庫中))記錄任務的執行狀態(進度)。
系統業務流程
- 用戶點擊智能分析頁提交按鈕時,先把圖表立刻保存到數據庫中(作為一個任務)
- 用戶可以在圖表管理查看所有圖表(已生成的,生成中的,生成失敗的)的信息和狀態
- 用戶可以修改生成失敗的圖表信息,點擊重新生成,以嘗試再次創建圖表
問題分析?
- 任務隊列的最大容量應該設置為多少
- 程序怎么從任務隊列中取出任務去執行?這個任務隊列的流程怎么實現?怎么保證程序最多同時執行多少個任務?
線程池實現
線程池
為什么需要線程池?
- 線程的管理比較復雜
- 任務存取比較復雜
- 線程池可以幫你輕松管理線程,協調任務的執行過程
線程池兩種實現方式
- Spring中,可以用ThreadPoolTaskExrcutor配合@Async注解來實現(不推薦)
- 在Java中,可以使用JUC并發編程包中的ThreadPoolExecutor來實現非常靈活地自定義線程池
線程池的參數
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
現狀:AI生成能力的并發是只允許4個任務同時去執行,AI能力允許20個任務排隊
corePoolSize(核心線程數):正常情況下,我們的系統應該能同時工作的線程數
maximumPoolSize(最大線程數):極限情況下,我們的線程池所擁有的線程
keepAliveTime(空閑線程存活時間):非核心線程在沒有任務的情況下,過多久要刪除,從而釋放無用的線程資源
unit(空閑線程存活時間的單位):分鐘,秒
workQueue(工作隊列):用于存放給線程執行的任務,存在一個隊列的長度(一定要設置)
threadFactory(線程工廠):控制每個線程的生成,線程的屬性
RejectedExecutionHandler(拒絕策略):任務隊列滿的時候,我們采取什么措施
線程池的開發
自定義線程池配置
package com.yupi.springbootinit.config;import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class ThreadPoolExecutorConfig {@Beanpublic ThreadPoolExecutor threadPoolExecutor(){// 創建一個線程工廠ThreadFactory threadFactory = new ThreadFactory(){// 初始化線程數為 1private int count = 1;// 創建一個新的線程@Override// 每當線程池需要創建新線程時,就會調用newThread方法// @NotNull Runnable r 表示方法參數 r 應該永遠不為null,public Thread newThread(@NotNull Runnable r) {Thread thread = new Thread(r);thread.setName("線程" + count ++);return thread;}};// 創建一個新的線程池,線程池核心大小為2,最大線程數為4,// 非核心線程空閑時間為100秒,任務隊列為阻塞隊列,長度為4,使用自定義的線程工廠創建線ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(4),threadFactory);return threadPoolExecutor;}
}
測試controller層(注意線上環境不要暴露出去)
package com.yupi.springbootinit.controller;import cn.hutool.json.JSONUtil;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;/*** 隊列測試controller*/
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"}) // 只在開發環境和本地環境生效
public class QueueController {@Resourceprivate ThreadPoolExecutor threadPoolExecutor;@GetMapping("/add")// 接收一個參數name,然后將任務添加到線程池中public void add(String name){// 使用CompletableFuture運行一個異步任務CompletableFuture.runAsync(()->{log.info("任務執行中:" + name + "執行人:" + Thread.currentThread().getName());try {// 讓線程休眠10分鐘,模擬長時間運行的任務Thread.sleep(600000);} catch (InterruptedException e) {throw new RuntimeException(e);} 異步任務在threadPoolExecutor中執行},threadPoolExecutor);}@GetMapping("/get")public String get(){Map<String, Object> map = new HashMap<>();int size = threadPoolExecutor.getQueue().size();map.put("隊列長度",size);long taskCount = threadPoolExecutor.getTaskCount();map.put("任務總數",taskCount);long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();map.put("已完成的任務總數",completedTaskCount);int activeCount = threadPoolExecutor.getActiveCount();map.put("正在工作的線程數",activeCount);return JSONUtil.toJsonStr(map);}
}
項目異步化改造
/*** 智能分析(異步)** @param multipartFile* @param genChartByAiRequest* @param request* @return*/
@PostMapping("/gen/async")
public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile,GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {String name = genChartByAiRequest.getName();String goal = genChartByAiRequest.getGoal();String chartType = genChartByAiRequest.getChartType();// 校驗ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目標為空");ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名稱過長");// 校驗文件long size = multipartFile.getSize();String originalFilename = multipartFile.getOriginalFilename();// 校驗文件大小final long ONE_MB = 1024 * 1024L;ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超過 1M");// 校驗文件大小綴 aaa.pngString suffix = FileUtil.getSuffix(originalFilename);final List<String> validFileSuffixList = Arrays.asList("xlsx", "xls");ThrowUtils.throwIf(!validFileSuffixList.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后綴非法");User loginUser = userService.getLoginUser(request);// 限流判斷,每個用戶一個限流器redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());// 指定一個模型id(把id寫死,也可以定義成一個常量)long biModelId = 1659171950288818178L;// 分析需求:// 分析網站用戶的增長情況// 原始數據:// 日期,用戶數// 1號,10// 2號,20// 3號,30// 構造用戶輸入StringBuilder userInput = new StringBuilder();userInput.append("分析需求:").append("\n");// 拼接分析目標String userGoal = goal;if (StringUtils.isNotBlank(chartType)) {userGoal += ",請使用" + chartType;}userInput.append(userGoal).append("\n");userInput.append("原始數據:").append("\n");// 壓縮后的數據String csvData = ExcelUtils.excelToCsv(multipartFile);userInput.append(csvData).append("\n");// 先把圖表保存到數據庫中Chart chart = new Chart();chart.setName(name);chart.setGoal(goal);chart.setChartData(csvData);chart.setChartType(chartType);// 插入數據庫時,還沒生成結束,把生成結果都去掉
// chart.setGenChart(genChart);
// chart.setGenResult(genResult);// 設置任務狀態為排隊中chart.setStatus("wait");chart.setUserId(loginUser.getId());boolean saveResult = chartService.save(chart);ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "圖表保存失敗");// 在最終的返回結果前提交一個任務// todo 建議處理任務隊列滿了后,拋異常的情況(因為提交任務報錯了,前端會返回異常)CompletableFuture.runAsync(() -> {// 先修改圖表任務狀態為 “執行中”。等執行成功后,修改為 “已完成”、保存執行結果;執行失敗后,狀態修改為 “失敗”,記錄任務失敗信息。(為了防止同一個任務被多次執行)Chart updateChart = new Chart();updateChart.setId(chart.getId());// 把任務狀態改為執行中updateChart.setStatus("running");boolean b = chartService.updateById(updateChart);// 如果提交失敗(一般情況下,更新失敗可能意味著你的數據庫出問題了)if (!b) {handleChartUpdateError(chart.getId(), "更新圖表執行中狀態失敗");return;}// 調用 AIString result = aiManager.doChat(biModelId, userInput.toString());String[] splits = result.split("【【【【【");if (splits.length < 3) {handleChartUpdateError(chart.getId(), "AI 生成錯誤");return;}String genChart = splits[1].trim();String genResult = splits[2].trim();// 調用AI得到結果之后,再更新一次Chart updateChartResult = new Chart();updateChartResult.setId(chart.getId());updateChartResult.setGenChart(genChart);updateChartResult.setGenResult(genResult);updateChartResult.setStatus("succeed");boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {handleChartUpdateError(chart.getId(), "更新圖表成功狀態失敗");}},threadPoolExecutor);BiResponse biResponse = new BiResponse();
// biResponse.setGenChart(genChart);
// biResponse.setGenResult(genResult);biResponse.setChartId(chart.getId());return ResultUtils.success(biResponse);
}
// 上面的接口很多用到異常,直接定義一個工具類
private void handleChartUpdateError(long chartId, String execMessage) {Chart updateChartResult = new Chart();updateChartResult.setId(chartId);updateChartResult.setStatus("failed");updateChartResult.setExecMessage(execMessage);boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {log.error("更新圖表失敗狀態失敗" + chartId + "," + execMessage);
}