任務進度狀態同步 萬能版 參考 工廠+策略+觀察者設計模式 +鎖設計 springboot+redission

文章目錄

    • 概要
    • 效果
    • 解釋
    • 狀態
    • 流轉說明
    • 設計
      • AI任務實體類
      • AI任務狀態枚舉
      • AI模型枚舉
      • 基礎實體類
      • 簡單字典接口
      • 工廠+策略模式 接口設計
        • AiJobProcessor
        • AiJobProcessorFactory
      • 觀察者模式
        • AI任務相關的Event
        • MyEventListener
        • MyEventPubLisher
      • RedissonConfig
      • 定時任務
    • 實現
      • ReplicateJobProcessor
      • ReplicateApi
      • OkHttpClientUtil 通用萬能版
      • 新建任務參考
    • 定時任務實現
      • IAiJobService
      • AiJobServiceImpl
    • 整體業務流程
    • 總結

概要

我發現在無論是什么項目中,都幾乎很難避免三方對接API的任務,或者本地長時間的服務,那么有必要設計一套穩定并發高且擴展性高的一套設計方案來完成這些任務的狀態監聽和進度更新,那么我在多年的探索中,設計一套適用于三方API的任務狀態更新的解決方案的設計。

效果

webscoket狀態同步效果展示

解釋

效果用的是webscoket通信來實現的,這里就不側重講了,本文章注重后端的設計,包您滿意

狀態

- DRAFT(0, "草稿", 0)
- SUBMITTED(1, "已提交", 16.67)
- QUEUED(2, "排隊中", 33.33)
- PROCESSING(3, "生成中", 50.00)
- GENERATED(4, "已生成", 66.67)
- MIGRATING(5, "遷移中", 83.33)
- SUCCESS(6, "成功", 100.00)
- FAILED(7, "失敗", 0)
- TIMEOUT(8, "超時", 0)
- CANCELED(9, "取消", 0)
這些狀態我發現是必不可少的,幾乎可以同時概括三方任務的狀態?為什么要有遷移,這個也是必須,三方給的文件(圖片,視頻)什么的畢竟都是三方不可靠的,要遷移到自己的oss或者存儲系統中去。

流轉說明

- 正常流程(進度遞增): 草稿(0) → 已提交(1) → 排隊中(2) → 生成中(3) → 已生成(4) → 遷移中(5) → 成功(6)
- 異常終止流程:
- 取消(CANCELED):可從任意非終止狀態主動觸發
- 超時(TIMEOUT):提交/排隊/處理/遷移階段超時時觸發
- 失敗(FAILED):提交/處理/遷移階段執行異常時觸發
- 終態節點: 成功(6)、失敗(7)、超時(8)、取消(9)為最終狀態,無后續流轉

設計

AI任務實體類

package com.cc672cc.entity.tb;import com.gitee.sunchenbin.mybatis.actable.annotation.*;
import com.gitee.sunchenbin.mybatis.actable.constants.MySqlTypeConstant;
import com.cc672cc.entity.BaseEntity;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import tk.mybatis.mapper.annotation.KeySql;
import javax.persistence.Id;
import javax.persistence.Table;
import java.math.BigDecimal;
import java.util.Date;/*** AI任務表*/
@Data
@Table(name = "tb_ai_job")
@TableComment("AI任務表")
@Schema(description = "AI任務表")
public class AiJob extends BaseEntity {/*** ID*/@Id@KeySql(useGeneratedKeys = true)@IsAutoIncrement@Column(name = "id", type = MySqlTypeConstant.BIGINT, isKey = true, isNull = false, comment = "ID")@Schema(description = "ID")private Long id;/*** 用戶id*/@Column(name = "user_id", type = MySqlTypeConstant.BIGINT, comment = "用戶id")@Schema(description = "用戶id")private Long userId;/*** 照片點評分析編碼*/@Column(name = "photo_review_analysis_code", type = MySqlTypeConstant.VARCHAR, length = 10, comment = "照片點評分析編碼")@Schema(description = "照片點評分析編碼")private String photoReviewAnalysisCode;/*** 類型* 生文本 生圖片 生視頻*/@Column(name = "type", type = MySqlTypeConstant.VARCHAR, length = 10, comment = "類型(生文本/生圖片/生視頻)")@Schema(description = "類型")private String type;/*** 動作*/@Column(name = "action", type = MySqlTypeConstant.VARCHAR, length = 20, comment = "動作")@Schema(description = "動作")private String action;/*** 編碼*/@Column(name = "code", type = MySqlTypeConstant.VARCHAR, length = 10, comment = "編碼")@Schema(description = "編碼")private String code;/*** 渠道*/@Column(name = "channel", type = MySqlTypeConstant.VARCHAR, length = 20, comment = "渠道")@Schema(description = "渠道")private String channel;/*** 平臺*/@Column(name = "platform", type = MySqlTypeConstant.VARCHAR, length = 20, comment = "平臺")@Schema(description = "平臺")private String platform;/*** 模型*/@Column(name = "model", type = MySqlTypeConstant.VARCHAR, length = 50, comment = "模型")@Schema(description = "模型")private String model;/*** 是否異步* 0否1是*/@Column(name = "asyn", type = MySqlTypeConstant.TINYINT, length = 1, comment = "是否異步(0否1是)")@Schema(description = "是否異步")private Boolean asyn;/*** 模型版本*/@Column(name = "model_version", type = MySqlTypeConstant.VARCHAR, length = 50, comment = "模型版本")@Schema(description = "模型版本")private String modelVersion;/*** 模型id*/@Column(name = "model_id", type = MySqlTypeConstant.VARCHAR, length = 50, comment = "模型id")@Schema(description = "模型id")private String modelId;/*** 模型名稱*/@Column(name = "model_name", type = MySqlTypeConstant.VARCHAR, length = 50, comment = "模型名稱")@Schema(description = "模型名稱")private String modelName;/*** 輸出數量*/@Column(name = "output_count", type = MySqlTypeConstant.INT, comment = "輸出數量", defaultValue = "1")@Schema(description = "輸出數量")private Integer outputCount = 1;/*** 創建日期*/@Column(name = "create_date", type = MySqlTypeConstant.VARCHAR, length = 10, comment = "創建日期")@Schema(description = "創建日期")private String createDate;/*** 請求時間*/@Column(name = "req_time", type = MySqlTypeConstant.DATETIME, comment = "請求時間")@Schema(description = "請求時間")private Date reqTime;/*** 響應時間*/@Column(name = "resp_time", type = MySqlTypeConstant.DATETIME, comment = "響應時間")@Schema(description = "響應時間")private Date respTime;/*** 耗時* 單位s*/@Column(name = "cost_time", type = MySqlTypeConstant.BIGINT, comment = "耗時(單位s)")@Schema(description = "耗時 單位s")private Long costTime;/*** 三方id*/@Column(name = "out_id", type = MySqlTypeConstant.VARCHAR, length = 100, comment = "三方id")@Schema(description = "三方id")private String outId;/*** 請參json*/@Column(name = "req_json", type = MySqlTypeConstant.TEXT, comment = "請參json")@Schema(description = "請參json")private String reqJson;/*** 反參json*/@Column(name = "resp_json", type = MySqlTypeConstant.TEXT, comment = "反參json")@Schema(description = "反參json")private String respJson;/*** 任務狀態(0草稿 1已提交 2排隊中 3生成中 4已生成 5遷移中 6成功 7失敗 8超時 9取消)*/@Column(name = "job_status", type = MySqlTypeConstant.INT, comment = "任務狀態(0草稿 1已提交 2排隊中 3生成中 4已生成 5遷移中 6成功 7失敗 8超時 9取消)")@Schema(description = "任務狀態(0草稿 1已提交 2排隊中 3生成中 4已生成 5遷移中 6成功 7失敗 8超時 9取消)")private Integer jobStatus;/*** 單元進度* 對應的是每個任務階段的進度*/@Column(name = "unit_progress", type = MySqlTypeConstant.DECIMAL, length = 5, decimalLength = 2, comment = "單元進度", defaultValue = "0")@Schema(description = "單元進度")private BigDecimal unitProgress;/*** 任務狀態描述*/@Column(name = "job_status_desc", type = MySqlTypeConstant.VARCHAR, length = 255, comment = "任務狀態描述")@Schema(description = "圖片狀態描述")private String jobStatusDesc;/*** 整體進度*/@Column(name = "overall_progress", type = MySqlTypeConstant.DECIMAL, length = 5, decimalLength = 2, comment = "整體進度", defaultValue = "0")@Schema(description = "整體進度")private BigDecimal overallProgress;}

AI任務狀態枚舉

package com.cc672cc.enums.dict;import com.cc672cc.enums.IDict;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;/*** AI任務狀態枚舉** @author CC* @date 2019/5/5 14:34**/
@Getter
@Schema(description = "AI任務狀態 0草稿 1已提交 2排隊中 3生成中 4已生成 5遷移中 6成功 7失敗 8超時 9取消")
public enum AiJobStatusEnum implements IDict {DRAFT(0, "草稿", BigDecimal.ZERO),SUBMITTED(1, "已提交", new BigDecimal("16.67")),QUEUED(2, "排隊中", new BigDecimal("33.33")),PROCESSING(3, "生成中", new BigDecimal("50.00")),GENERATED(4, "已生成", new BigDecimal("66.67")),MIGRATING(5, "遷移中", new BigDecimal("83.33")),SUCCESS(6, "成功", new BigDecimal("100.00")),FAILED(7, "失敗", BigDecimal.ZERO),TIMEOUT(8, "超時", BigDecimal.ZERO),CANCELED(9, "取消", BigDecimal.ZERO);private Integer code;private String description;/*** 進度*/private BigDecimal progress;public static Map<String, String> cdMap;public static Map<Integer, AiJobStatusEnum> map;AiJobStatusEnum(int code, String description, BigDecimal progress) {this.code = code;this.description = description;this.progress = progress;}@Overridepublic Map<String, String> dictMap() {if (cdMap == null) {cdMap = new LinkedHashMap<>();AiJobStatusEnum[] values = values();for (AiJobStatusEnum value : values) {cdMap.put(String.valueOf(value.getCode()), value.getDescription());}}return cdMap;}public static Map<Integer, AiJobStatusEnum> getMap() {if (map == null) {map = new LinkedHashMap<>();AiJobStatusEnum[] values = values();for (AiJobStatusEnum value : values) {map.put(value.getCode(), value);}}return map;}public static BigDecimal getProgress(Integer code) {return getMap().get(code).getProgress();}
}

AI模型枚舉

package com.cc672cc.enums.dict;import com.cc672cc.enums.IDict;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;import java.util.LinkedHashMap;
import java.util.Map;/*** AI模型枚舉** @author CC* @date 2019/5/5 14:34**/
@Getter
@Schema(description = "AI模型枚舉")
public enum AiModelEnum implements IDict {TTAPI_MIDJOURNEY_V7("ttapi_midjourney_v7", "TTAPI_MIDJOURNEY_V7", "TTApi Midjourney V7", "Midjourney", "7.0", "TTapi的Midjourney的第七版本", "TTApi", "TTApi", true, 1),REPLICATE_IMAGEUPSCALE_V1("replicate_imageupscale_v1", "REPLICATE_IMAGEUPSCALE_V1", "Replicate ImageUpscale V1", "ImageUpscale", "1.0", "Replicate的圖片放大的第一版本", "Replicate", "Replicate", true, 2),PHOTOREVIEW_ANALYSIS_V1("photoReview_analysis_v1", "PHOTOREVIEW_ANALYSIS_V1", "PhotoReview Analysis V1", "PhotoReviewAnalysis", "1.0", "照片點評的第一版本", "PhotoReview", "PhotoReview", true, 1);/*** 模型id*/private String modelId;/*** 模型編碼* 自定義的編碼 格式 {平臺}_{模型}_{版本}*/private String modelCode;private String modelName;private String model;private String modelVersion;private String modelDesc;/*** 平臺*/private String platform;/*** 渠道* 指哪個公司的 集團下面的*/private String channel;/*** 是否異步*/private Boolean asyn;/*** 單位積分* 就是每個輸出的積分*/private Integer unitPoint;public static Map<String, AiModelEnum> map;public Map<String, String> inmap;AiModelEnum(String modelId, String modelCode, String modelName, String model, String modelVersion, String modelDesc, String platform, String channel, Boolean asyn, Integer unitPoint) {this.modelId = modelId;this.modelCode = modelCode;this.modelName = modelName;this.model = model;this.modelVersion = modelVersion;this.modelDesc = modelDesc;this.platform = platform;this.channel = channel;this.asyn = asyn;this.unitPoint = unitPoint;}@Overridepublic Map<String, String> dictMap() {if (inmap == null) {inmap = new LinkedHashMap<>();AiModelEnum[] values = values();for (AiModelEnum value : values) {inmap.put(value.getModelId(), value.getModelName());}}return inmap;}public static Map<String, AiModelEnum> getMap() {if (map == null) {map = new LinkedHashMap<>();AiModelEnum[] values = values();for (AiModelEnum value : values) {map.put(value.getModelId(), value);}}return map;}}

基礎實體類

package com.cc672cc.entity;import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.gitee.sunchenbin.mybatis.actable.annotation.Column;
import com.gitee.sunchenbin.mybatis.actable.annotation.ColumnComment;
import com.gitee.sunchenbin.mybatis.actable.constants.MySqlTypeConstant;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;import java.util.Date;@Data
@Schema(description = "基礎實體類")
public class BaseEntity {/*** 狀態(0禁止1正常)*/@Column(type = MySqlTypeConstant.INT, length = 1, defaultValue = "1")@Schema(description = "狀態(0禁止1正常)")@ColumnComment("狀態(0禁止1正常)")@TableField(fill = FieldFill.INSERT)private Integer status;/*** 刪除狀態(0否1是)*/@TableLogic@Schema(description = "刪除狀態(0否1是)")@Column(type = MySqlTypeConstant.INT, length = 1, defaultValue = "0")@ColumnComment("刪除狀態(0否1是)")@TableField(fill = FieldFill.INSERT)private Integer del;/*** 排序權重*/@Schema(description = "排序權重")@Column(type = MySqlTypeConstant.INT, length = 4, defaultValue = "0")@ColumnComment("排序權重")private Integer sort;/*** 創建人*/@Column(type = MySqlTypeConstant.VARCHAR, length = 64)@ColumnComment("創建人")@TableField(fill = FieldFill.INSERT)private String createBy;/*** 創建時間*/@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@Schema(description = "創建時間")@Column(type = MySqlTypeConstant.DATETIME, defaultValue = "CURRENT_TIMESTAMP")@ColumnComment("創建時間")private Date createTime;/*** 更新人*/@Column(type = MySqlTypeConstant.VARCHAR, length = 64)@ColumnComment("更新人")@TableField(fill = FieldFill.UPDATE)private String updateBy;/*** 更新時間*/@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@Schema(description = "更新時間")@Column(type = MySqlTypeConstant.DATETIME, defaultValue = "NULL ON UPDATE CURRENT_TIMESTAMP")@ColumnComment("更新時間")private Date updateTime;/*** 版本*/@Column(type = MySqlTypeConstant.VARCHAR, length = 10, defaultValue = "v1")@TableField(fill = FieldFill.INSERT)@ColumnComment("版本")private String version;
}

簡單字典接口

public interface IDict {Map<String,String> dictMap();
}

工廠+策略模式 接口設計

AiJobProcessor
package com.cc672cc.processor;import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.pojo.vo.reqvo.BeautifyPhotoReqVO;public interface AiJobProcessor {/*** 創建請參Json** @param reqVO* @param photoReviewAnalysis* @return*/String buildReqJson(BeautifyPhotoReqVO reqVO, PhotoReviewAnalysis photoReviewAnalysis);/*** 處理任務** @param aiJob AI任務* @return 任務狀態*/Integer process(AiJob aiJob);/*** 查詢AI任務狀態** @param aiJob AI任務* @return 任務狀態*/Integer query(AiJob aiJob);/*** 遷移AI任務** @param aiJobId* @return*/Integer migrate(Long aiJobId, Integer jobStatus);/*** 獲取當前處理器支持的模型類型(與AiJob.model字段對應)** @return 模型類型*/String getSupportedModel();/*** 業務超時時間(秒)** @return*/Long businessTimeoutS();/*** 單元進度展示標志位* 0不展示 1展示* @return*/Integer[] unitProgressShowFlag();
}
AiJobProcessorFactory
package com.cc672cc.processor;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** AI任務處理器工廠(根據model動態獲取處理器)*/
@Component
public class AiJobProcessorFactory {private final Map<String, AiJobProcessor> processorMap = new HashMap<>();// 自動注入所有AiJobProcessor實現類@Autowiredpublic AiJobProcessorFactory(List<AiJobProcessor> processors) {for (AiJobProcessor processor : processors) {processorMap.put(processor.getSupportedModel(), processor);}}/*** 根據模型類型獲取處理器* @param model 模型類型(AiJob.model)* @return 處理器實例* @throws IllegalArgumentException 無對應處理器時拋出異常*/public AiJobProcessor getProcessor(String model) {AiJobProcessor processor = processorMap.get(model);if (processor == null) {throw new IllegalArgumentException("未找到模型[" + model + "]對應的處理器");}return processor;}
}

觀察者模式

AI任務相關的Event
public class AiJobMigrateEvent extends ApplicationEvent {private AiJob aiJob;public AiJobMigrateEvent(Object source) {super(source);}public AiJobMigrateEvent(Object source, AiJob aiJob) {super(source);this.aiJob = aiJob;}public AiJob getAiJob() {return aiJob;}public void setAiJob(AiJob aiJob) {this.aiJob = aiJob;}
}public class AiJobMsgEvent extends ApplicationEvent {private String msg;public AiJobMsgEvent(Object source) {super(source);}public AiJobMsgEvent(Object source, String msg) {super(source);this.msg = msg;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg = msg;}
}/*** AI任務需要立馬提交事件*/
public class AiJobNeedSubmitRightNowEvent extends ApplicationEvent {private AiJob aiJob;public AiJobNeedSubmitRightNowEvent(Object source) {super(source);}public AiJobNeedSubmitRightNowEvent(Object source, AiJob aiJob) {super(source);this.aiJob = aiJob;}public AiJob getAiJob() {return aiJob;}public void setAiJob(AiJob aiJob) {this.aiJob = aiJob;}
}public class AiJobStatusRefreshEvent extends ApplicationEvent {private AiJob aiJob;public AiJobStatusRefreshEvent(Object source) {super(source);}public AiJobStatusRefreshEvent(Object source, AiJob aiJob) {super(source);this.aiJob = aiJob;}public AiJob getAiJob() {return aiJob;}public void setAiJob(AiJob aiJob) {this.aiJob = aiJob;}
}
MyEventListener
@Component
@Slf4j
public class MyEventListener {private static final String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";@Autowiredprivate BusinessAsync businessAsync;/*** 統一監聽** @param applicationEvent*/@EventListener(classes = {LifyMsgEvent.class})public void listener(ApplicationEvent applicationEvent) {String simpleName = applicationEvent.getClass().getSimpleName();log.info("***** listener reception time : {} , simpleName : {} ***** , context : {}", DateUtil.format(DateUtil.date(), TIME_FORMAT), simpleName, JSON.toJSONString(applicationEvent));}/*** @param lifyMsgEvent*/@EventListenerpublic void listener(LifyMsgEvent lifyMsgEvent) {businessAsync.saveChatContext(lifyMsgEvent);businessAsync.saveChatHistory(lifyMsgEvent);}/*** @param photoReviewMsgEvent*/@EventListenerpublic void listener(PhotoReviewMsgEvent photoReviewMsgEvent) {businessAsync.websocketMsg(photoReviewMsgEvent);}/*** @param aiJobMsgEvent*/@EventListenerpublic void listener(AiJobMsgEvent aiJobMsgEvent) {businessAsync.websocketMsg(aiJobMsgEvent);}/*** @param taskMsgEvent*/@EventListenerpublic void listener(TaskMsgEvent taskMsgEvent) {businessAsync.websocketMsg(taskMsgEvent);}/*** @param aiJobNeedSubmitRightNowEvent*/@EventListenerpublic void listener(AiJobNeedSubmitRightNowEvent aiJobNeedSubmitRightNowEvent) {businessAsync.submitAiJob(aiJobNeedSubmitRightNowEvent);}@EventListenerpublic void listener(TaskNeedSubmitRightNowEvent taskNeedSubmitRightNowEvent) {businessAsync.submitTask(taskNeedSubmitRightNowEvent);}/*** @param aiJobStatusRefreshEvent*/@EventListenerpublic void listener(AiJobStatusRefreshEvent aiJobStatusRefreshEvent) {businessAsync.refreshAiJobStatusDetail(aiJobStatusRefreshEvent);}/*** @param taskStatusRefreshEvent*/@EventListenerpublic void listener(TaskStatusRefreshEvent taskStatusRefreshEvent) {businessAsync.refreshTaskStatusDetail(taskStatusRefreshEvent);}/*** @param aiJobMigrateEvent*/@EventListenerpublic void listener(AiJobMigrateEvent aiJobMigrateEvent) {businessAsync.migrateAiJobDetail(aiJobMigrateEvent);}/*** @param taskMigrateEvent*/@EventListenerpublic void listener(TaskMigrateEvent taskMigrateEvent) {businessAsync.migrateTaskDetail(taskMigrateEvent);}}
MyEventPubLisher
/*** 我的事件發布器** @author liaoqian* @since 2024-01-24*/
@Component
public class MyEventPubLisher {@Autowiredprivate IRedisService redisService;@Autowiredprivate ApplicationEventPublisher applicationEventPublisher;public void pushLifyMsgEvent(String msg) {applicationEventPublisher.publishEvent(new LifyMsgEvent(this, msg));}public void pushPhotoReviewMsgEvent(String msg) {applicationEventPublisher.publishEvent(new PhotoReviewMsgEvent(this, msg));}public void pushAiJobMsgEvent(String msg) {applicationEventPublisher.publishEvent(new AiJobMsgEvent(this, msg));}public void pushTaskMsgEvent(String msg) {applicationEventPublisher.publishEvent(new TaskMsgEvent(this, msg));}public void pushAiJobNeedSubmitRightNow(AiJob aiJob) {applicationEventPublisher.publishEvent(new AiJobNeedSubmitRightNowEvent(this, aiJob));}public void pushTaskNeedSubmitRightNow(Task task) {applicationEventPublisher.publishEvent(new TaskNeedSubmitRightNowEvent(this, task));}public void pushAiJobStatusRefreshEvent(AiJob aiJob) {String cacheAiJobTaskProcess = RedisPreKey.CACHE_AI_JOB_PROCESS;String redisPreKey = cacheAiJobTaskProcess + aiJob.getId();if (!redisService.hasKey(redisPreKey)) {applicationEventPublisher.publishEvent(new AiJobStatusRefreshEvent(this, aiJob));}}public void pushTaskStatusRefreshEvent(Task task) {String cacheTaskTaskProcess = RedisPreKey.CACHE_TASK_PROCESS;String redisPreKey = cacheTaskTaskProcess + task.getId();if (!redisService.hasKey(redisPreKey)) {applicationEventPublisher.publishEvent(new TaskStatusRefreshEvent(this, task));}}public void pushAiJobMigrateEvent(AiJob aiJob) {String cacheAiJobTaskProcess = RedisPreKey.CACHE_AI_JOB_PROCESS;String redisPreKey = cacheAiJobTaskProcess + aiJob.getId();if (!redisService.hasKey(redisPreKey)) {applicationEventPublisher.publishEvent(new AiJobMigrateEvent(this, aiJob));}}public void pushTaskMigrateEvent(Task task) {String cacheTaskTaskProcess = RedisPreKey.CACHE_TASK_PROCESS;String redisPreKey = cacheTaskTaskProcess + task.getId();if (!redisService.hasKey(redisPreKey)) {applicationEventPublisher.publishEvent(new TaskMigrateEvent(this, task));}}
}

RedissonConfig

/*** RedissonClient* 常用:分布式鎖功能* @author cc* @date 2020/05/13*/
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(RedisProperties prop) {String address = "redis://%s:%d";Config config = new Config();config.useSingleServer().setPassword(prop.getPassword()).setAddress(String.format(address, prop.getHost(), prop.getPort())).setDatabase(0);return Redisson.create(config);}
}

定時任務

package com.cc672cc.scheduler;import com.cc672cc.service.IAiJobService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** AI任務調度器*/
@Slf4j
@Component
public class AiJobScheduler {private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate IAiJobService aiJobService;/*** 每5秒* 刷新AI任務狀態*/@Scheduled(initialDelay = 10000, fixedDelay = 5000)public void refreshAiJobStatus() {log.info("refreshAiJobStatus start-->{}", sdf.format(new Date()));int refreshCount = aiJobService.refreshAiJobStatus();log.info("refreshAiJobStatus  end refreshCount-->{}", refreshCount);}/*** 每5秒* 遷移AI任務*/@Scheduled(initialDelay = 10000, fixedDelay = 5000)public void migrateAiJob() {log.info("migrateAiJob start-->{}", sdf.format(new Date()));int migrateCount = aiJobService.migrateAiJob();log.info("migrateAiJob  end  migrateCount-->{}", migrateCount);}
}

實現

就拿其中的一個來舉例,具體實現還得看自己的業務,拿其中的ReplicateJobProcessor舉例

ReplicateJobProcessor

package com.cc672cc.processor.aijob;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.cc672cc.common.utils.CodeUtil;
import com.cc672cc.common.utils.DateUtil;
import com.cc672cc.common.utils.MessageUtils;
import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoBeautify;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.enums.dict.AiJobStatusEnum;
import com.cc672cc.enums.dict.DelEnum;
import com.cc672cc.enums.dict.StatusEnum;
import com.cc672cc.pojo.vo.reqvo.BeautifyPhotoReqVO;
import com.cc672cc.pojo.vo.reqvo.ReplicateImageUpscaleBeautifyPhotoReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateCommonReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateImageUpscaleReqVO;
import com.cc672cc.pojo.vo.respvo.client.ReplicateCommonRespVO;
import com.cc672cc.processor.AiJobProcessor;
import com.cc672cc.service.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;@Slf4j
@Component
public class ReplicateJobProcessor implements AiJobProcessor {@Autowiredprivate IReplicateService replicateService;@Autowired@Lazyprivate IAiJobService aiJobService;@Autowiredprivate IQiNiuService qiNiuService;@Lazy@Autowiredprivate IPhotoBeautifyService photoBeautifyService;private List<String> enhanceModelList = Arrays.asList("Standard V2", "Low Resolution V2", "CGI", "High Fidelity V2", "Text Refine");private List<Integer> upscaleFactorList = Arrays.asList(2, 4, 6);@Overridepublic Integer[] unitProgressShowFlag() {return new Integer[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};}@Overridepublic Long businessTimeoutS() {return 60 * 60 * 24L;}@Overridepublic String buildReqJson(BeautifyPhotoReqVO reqVO, PhotoReviewAnalysis photoReviewAnalysis) {ReplicateImageUpscaleBeautifyPhotoReqVO detailReqVo = JSON.parseObject(JSON.toJSONString(reqVO.getParams()), ReplicateImageUpscaleBeautifyPhotoReqVO.class);if (StringUtils.isBlank(detailReqVo.getEnhanceModel())) {throw new RuntimeException("增強模型不能為空");}if (!enhanceModelList.contains(detailReqVo.getEnhanceModel())) {throw new RuntimeException("增強模型錯誤");}if (detailReqVo.getUpscaleFactor() == null) {throw new RuntimeException("放大倍數不能為空");}if (!upscaleFactorList.contains(detailReqVo.getUpscaleFactor())) {throw new RuntimeException("放大倍數錯誤");}ReplicateCommonReqVO<ReplicateImageUpscaleReqVO> req = new ReplicateCommonReqVO<>();ReplicateImageUpscaleReqVO input = new ReplicateImageUpscaleReqVO();input.setImage(photoReviewAnalysis.getOriImgUrl());input.setEnhanceModel(detailReqVo.getEnhanceModel());input.setUpscaleFactor(detailReqVo.getUpscaleFactor() + "x");req.setInput(input);return JSON.toJSONString(req);}@Overridepublic Integer process(AiJob aiJob) {String reqJson = aiJob.getReqJson();if (StringUtils.isBlank(reqJson)) {aiJob.setJobStatusDesc("reqJson is blank");return AiJobStatusEnum.FAILED.getCode();}Type type = new TypeReference<ReplicateCommonReqVO<ReplicateImageUpscaleReqVO>>() {}.getType();ReplicateCommonReqVO<ReplicateImageUpscaleReqVO> reqVO = JSON.parseObject(reqJson, type);ReplicateCommonRespVO<String> respVO = null;String message = "";try {respVO = replicateService.imageUpscale(reqVO);message = respVO != null ? respVO.getError() : "";if (respVO != null && StringUtils.isNotBlank(respVO.getId())) {String jobId = respVO.getId();aiJob.setOutId(jobId);return AiJobStatusEnum.SUBMITTED.getCode();}} catch (Exception e) {message = MessageUtils.normalMaxLength(e.getMessage());}aiJob.setJobStatusDesc(message);return AiJobStatusEnum.FAILED.getCode();}@Overridepublic Integer query(AiJob aiJob) {String jobId = aiJob.getOutId();ReplicateCommonRespVO<String> ajax = replicateService.query(jobId);Integer jobStatus = aiJob.getJobStatus();String jobStatusDesc = aiJob.getJobStatusDesc();String respJson = aiJob.getRespJson();Date respTime = aiJob.getRespTime() == null ? new Date() : aiJob.getRespTime();Long costTime = aiJob.getCostTime();Date reqTime = aiJob.getReqTime();if (ajax != null && StringUtils.isNotBlank(ajax.getStatus())) {String message = ajax.getError();String status = ajax.getStatus();String data = ajax.getOutput();respTime = new Date();respJson = JSON.toJSONString(ajax);if ("processing".equals(status) && AiJobStatusEnum.SUBMITTED.getCode().equals(jobStatus)) {jobStatus = AiJobStatusEnum.QUEUED.getCode();} else if ("processing".equals(status) && AiJobStatusEnum.QUEUED.getCode().equals(jobStatus)) {jobStatus = AiJobStatusEnum.PROCESSING.getCode();} else if ("succeeded".equals(status) && StringUtils.isNotBlank(data)) {jobStatus = AiJobStatusEnum.GENERATED.getCode();}}costTime = (respTime.getTime() - reqTime.getTime()) / 1000;if (costTime >= businessTimeoutS()) {jobStatus = AiJobStatusEnum.TIMEOUT.getCode();jobStatusDesc = "任務業務超時";}aiJob.setCostTime(costTime);aiJob.setRespTime(respTime);aiJob.setRespJson(respJson);aiJob.setJobStatus(jobStatus);aiJob.setJobStatusDesc(jobStatusDesc);aiJob.setUpdateTime(new Date());return jobStatus;}@Overridepublic Integer migrate(Long aiJobId, Integer jobStatus) {Date now = new Date();try {// 如果是已生成,則直接遷移if (AiJobStatusEnum.GENERATED.getCode().equals(jobStatus)) {jobStatus = AiJobStatusEnum.MIGRATING.getCode();return jobStatus;}// 1. 查詢AiJob信息AiJob aiJob = aiJobService.selectOneById(aiJobId);if (aiJob == null) {log.error("遷移任務失敗:未找到AiJob記錄,aiJobId={}", aiJobId);return jobStatus;}// 2. 解析響應JSONString respJson = aiJob.getRespJson();if (StringUtils.isBlank(respJson)) {log.error("遷移任務失敗:AiJob[aiJobId={}]的respJson為空", aiJobId);return jobStatus;}Type type = new TypeReference<ReplicateCommonRespVO<String>>() {}.getType();ReplicateCommonRespVO<String> respVO;try {respVO = JSON.parseObject(respJson, type);} catch (Exception e) {log.error("遷移任務失敗:AiJob[aiJobId={}]的respJson解析失敗,json={}", aiJobId, respJson, e);return jobStatus;}String data = respVO.getOutput();if (data == null) {log.error("遷移任務失敗:AiJob[aiJobId={}]的響應數據data為空", aiJobId);return jobStatus;}String mediaData = data;List<String> images = new ArrayList<>();images.add(mediaData);// 3. 下載圖片并創建PhotoBeautify列表List<PhotoBeautify> addList = new ArrayList<>();for (int i = 0; i < images.size(); i++) {String originalImgUrl = images.get(i);long start = System.currentTimeMillis();try {// 下載網絡圖片到七牛云(假設downloadWebFile已處理異常)String imageUrl = qiNiuService.downloadWebFile(originalImgUrl);PhotoBeautify photoBeautify = new PhotoBeautify();photoBeautify.setAiJobId(aiJobId);photoBeautify.setStatus(StatusEnum.EFFECTIVE.getCode());photoBeautify.setDel(DelEnum.NOT_DELETED.getCode());photoBeautify.setCode(CodeUtil.getRandomCode(10));photoBeautify.setImgUrl(imageUrl);photoBeautify.setOriginalImgUrl(originalImgUrl);photoBeautify.setImgStatus(4);photoBeautify.setCreateDate(DateUtil.format(now, "yyyy-MM-dd"));photoBeautify.setCreateTime(now);photoBeautify.setPhotoReviewAnalysisCode(aiJob.getPhotoReviewAnalysisCode());photoBeautify.setModelName(aiJob.getModelName());long end = System.currentTimeMillis();// 計算耗時 單位秒photoBeautify.setCostTime((end - start) / 1000);photoBeautify.setSort(i);addList.add(photoBeautify);} catch (IOException e) {log.error("遷移任務失敗:下載圖片[url={}]失敗,aiJobId={}", originalImgUrl, aiJobId, e);// 可選擇繼續處理后續圖片或直接返回失敗(根據業務需求)return AiJobStatusEnum.FAILED.getCode();}}// 4. 批量插入數據庫boolean insertResult = photoBeautifyService.batchAdd(addList);if (!insertResult) {log.error("遷移任務失敗:批量插入PhotoBeautify失敗,aiJobId={}", aiJobId);return AiJobStatusEnum.FAILED.getCode();}log.info("遷移任務成功,aiJobId={},共遷移圖片{}張", aiJobId, images.size());return AiJobStatusEnum.SUCCESS.getCode();} catch (Exception e) {log.error("遷移任務發生未知異常,aiJobId={}", aiJobId, e);return jobStatus;}}@Overridepublic String getSupportedModel() {return "ImageUpscale";}
}

ReplicateApi

package com.cc672cc.client;import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.cc672cc.common.utils.OkHttpClientUtil;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateCommonReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateImageUpscaleReqVO;import com.cc672cc.pojo.vo.respvo.client.ReplicateCommonRespVO;
import com.cc672cc.properties.ReplicateProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.lang.reflect.Type;import java.util.Map;@Component
public class ReplicateApi {@Autowiredprivate ReplicateProperties replicateProperties;public ReplicateCommonRespVO<String> imageUpscale(ReplicateCommonReqVO<ReplicateImageUpscaleReqVO> req) {String uri = "/v1/models/topazlabs/image-upscale/predictions";Map<String, String> headers = Map.of("Authorization", String.format("Bearer %s", replicateProperties.getAppKey()));Map<String, Object> body = JSON.parseObject(JSON.toJSONString(req), Map.class);// 使用 TypeReference 傳遞完整泛型類型Type type = new TypeReference<ReplicateCommonRespVO<String>>() {}.getType();return OkHttpClientUtil.ajax(replicateProperties.getBaseUrl(), uri, "POST", headers,OkHttpClientUtil.EMPTY_MAP, body, type);}public ReplicateCommonRespVO<String> query(String jobId) {String uri = "/v1/predictions/" + jobId;Map<String, String> headers = Map.of("Authorization", String.format("Bearer %s", replicateProperties.getAppKey()));Type type = new TypeReference<ReplicateCommonRespVO<String>>() {}.getType();return OkHttpClientUtil.ajax(replicateProperties.getBaseUrl(), uri, "GET", headers, OkHttpClientUtil.EMPTY_MAP, OkHttpClientUtil.EMPTY_MAP, type);}}

OkHttpClientUtil 通用萬能版

package com.cc672cc.common.utils;import cn.hutool.http.ContentType;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.cc672cc.common.model.ReturnInfo;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.HashMap;import java.util.Map;
import java.util.concurrent.TimeUnit;/*** 簡單封裝的okhttpcient工具用于返回同一反參** @author liaoqian* @date 2024-01-17*/
@Slf4j
@SuppressWarnings("all")
public class OkHttpClientUtil {private static final String okHttpClientName = "okHttpClientUtil";public static final String METHOD_POST = "POST";public static final String METHOD_GET = "GET";public static final Map<String, Object> EMPTY_MAP = new HashMap<>();/*** 連接超時時間**/private static final int CONNECT_TIMEOUT_SECONDS = 60;/*** 讀取返回信息超時時間**/private static final int READ_TIMEOUT_SECONDS = 60;/*** 讀取返回信息超時時間*/private static final int CALL_TIMEOUT_SECONDS = 120;/*** 讀取返回信息超時時間**/private static final int WRITE_TIMEOUT_SECONDS = 300;private static OkHttpClient okHttpClient;static {if (okHttpClient == null) {synchronized (OkHttpClientUtil.class) {if (okHttpClient == null) {okHttpClient = new OkHttpClient.Builder().connectTimeout(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS).readTimeout(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS).callTimeout(CALL_TIMEOUT_SECONDS, TimeUnit.SECONDS).writeTimeout(WRITE_TIMEOUT_SECONDS, TimeUnit.SECONDS).retryOnConnectionFailure(true).build();}}}}/*** @param host      請求host* @param uri       請求uri* @param method    請求方式* @param headers   請求頭* @param paramsObj 請求參數* @param bodyObj   請求體* @return 返回JSONObject*/public static JSONObject ajax(String host, String uri, String method, Map<String, String> headers, Object paramsObj, Object bodyObj) {Map params = JSON.parseObject(JSON.toJSONString(paramsObj), Map.class);Map body = JSON.parseObject(JSON.toJSONString(bodyObj), Map.class);return ajax(host, uri, method, headers, params, body);}/*** @param host    請求host* @param uri     請求uri* @param method  請求方式* @param headers 請求頭* @param params  請求參數* @param body    請求體* @return 返回JSONObject*/public static JSONObject ajax(String host, String uri, String method, Map<String, String> headers, Map<String, Object> params, Map<String, Object> body) {Response response = ajaxProcess(host, uri, method, headers, params, body);JSONObject jsonObject = null;if (response != null) {try (ResponseBody responseBody = response.body()) {String respContentType = response.header("Content-Type");if (StringUtils.isNotBlank(respContentType)) {if ("text/event-stream".equals(respContentType)) {StringBuilder sb = new StringBuilder();// 將這個response的內容轉為字符串BufferedReader reader = new BufferedReader(new InputStreamReader(responseBody.byteStream()));String line;while ((line = reader.readLine()) != null) {if (line.startsWith("event: text")) {sb.append(extractTextData(reader));}}ReturnInfo returnInfo = new ReturnInfo(sb.toString());jsonObject = new JSONObject(returnInfo);return jsonObject;}}String result = responseBody.string();log.info("***** {} ajax result : {} *****", okHttpClientName, result);if (JSON.isValid(result)) {jsonObject = new JSONObject(result);} else {ReturnInfo returnInfo = new ReturnInfo(result);jsonObject = new JSONObject(returnInfo);}} catch (Exception e) {log.error("***** {} ajaxProcess  e : {}  *****", okHttpClientName, e);}}return jsonObject;}private static String extractTextData(BufferedReader reader) throws IOException {StringBuilder sb = new StringBuilder();String line;while ((line = reader.readLine()) != null && !line.isEmpty()) {if (line.startsWith("data: ")) {String substring = line.substring("data: ".length());substring = substring.replace("\"", "");sb.append(substring);}}return sb.toString();}private static Response ajaxProcess(String host, String uri, String method, Map<String, String> headers, Map<String, Object> params, Map<String, Object> body) {OkHttpClient client = okHttpClient;String url = host + uri;Request.Builder builder = new Request.Builder();// 請求頭處理if (headers != null && !headers.isEmpty()) {builder.headers(Headers.of(headers));}// 請求方式處理if (true) {if (params != null && !params.isEmpty()) {StringBuilder sb = new StringBuilder();sb.append("?");params.entrySet().stream().forEach(e -> {sb.append(e.getKey()).append("=").append(String.valueOf(e.getValue())).append("&");});sb.delete(sb.length() - 1, sb.length());url += sb.toString();}builder.get();}if (METHOD_POST.equals(method.toUpperCase())) {if (body != null) {builder.post(RequestBody.create(MediaType.parse(ContentType.JSON.toString()), JSON.toJSONString(body)));}}Request request = builder.url(url).build();Response response = null;try {response = client.newCall(request).execute();} catch (IOException e) {log.error("***** {} ajaxProcess  e : {}  *****", okHttpClientName, e);}return response;}public static <T> T ajax(String host, String uri, String method, Map<String, String> headers, Map<String, Object> params, Map<String, Object> body, Type type) {JSONObject ajax = ajax(host, uri, method, headers, params, body);if (ajax != null) {return JSON.parseObject(ajax.toString(), type);} else {return null;}}}

新建任務參考

public AiJobStatusModel beautifyPhoto(BeautifyPhotoReqVO reqVO) {UserInfo userInfo = userService.getCurLoginUser(true);String photoReviewAnalysisCode = reqVO.getPhotoReviewAnalysisCode();PhotoReviewAnalysis photoReviewAnalysis = photoReviewService.selectOneByCode(photoReviewAnalysisCode);if (photoReviewAnalysis == null) {throw new BusinessException(ExceptionEnum.DATA_NOT_FOUND);}if (!userInfo.getId().equals(photoReviewAnalysis.getUserId())) {throw new BusinessException(ExceptionEnum.PERMISSION_DENIED);}Boolean beautifyAbility = photoReviewAnalysis.getBeautifyAbility();if (Boolean.FALSE.equals(beautifyAbility)) {throw new RuntimeException("圖片暫不支持美化,請查看【開啟美化能力依據】");}Map<String, AiModelEnum> map = AiModelEnum.getMap();AiModelEnum aiModelEnum = map.get(reqVO.getModelId());if (aiModelEnum == null) {throw new RuntimeException("模型不存在");}Long userId = userInfo.getId();Date now = new Date();String dateFormat = DateUtil.format(now, "yyyy-MM-dd");UserPointPerDayCheckModel checkModel = userBenefitsService.checkPointLimitPerDay(userId, dateFormat);if (checkModel.getLimit()) {throw new RuntimeException("已到達今日次數上限,請明日再來試試吧");}String code = CodeUtil.getRandomCode(10);AiJob aiJob = new AiJob();aiJob.setStatus(StatusEnum.EFFECTIVE.getCode());aiJob.setDel(DelEnum.NOT_DELETED.getCode());aiJob.setJobStatus(AiJobStatusEnum.DRAFT.getCode());aiJob.setUnitProgress(BigDecimal.ZERO);aiJob.setOverallProgress(BigDecimal.ZERO);aiJob.setUserId(userInfo.getId());aiJob.setPhotoReviewAnalysisCode(photoReviewAnalysis.getCode());aiJob.setAction(reqVO.getAction());aiJob.setType(reqVO.getType());aiJob.setCode(code);aiJob.setModel(aiModelEnum.getModel());aiJob.setModelVersion(aiModelEnum.getModelVersion());aiJob.setModelId(aiModelEnum.getModelId());aiJob.setModelName(aiModelEnum.getModelName());aiJob.setOutputCount(reqVO.getOutputCount());aiJob.setPlatform(aiModelEnum.getPlatform());aiJob.setChannel(aiModelEnum.getChannel());aiJob.setAsyn(aiModelEnum.getAsyn());aiJob.setCreateDate(DateUtil.format(now, "yyyy-MM-dd"));aiJob.setReqTime(now);String reqJson = processorFactory.getProcessor(aiModelEnum.getModel()).buildReqJson(reqVO, photoReviewAnalysis);aiJob.setReqJson(reqJson);aiJob.setCreateTime(now);Long id = aiJobService.add(aiJob);if (id > 0) {// 給個人賬戶加1userBenefitsService.addPointCountPerDay(userId, dateFormat, aiModelEnum, reqVO.getOutputCount());AiJobStatusModel res = BeanHelper.copyProperties(aiJob, AiJobStatusModel.class);// 這里推送消息myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(res));// 立馬去提交任務myEventPubLisher.pushAiJobNeedSubmitRightNow(aiJob);return res;}throw new RuntimeException("生成美化圖片創建失敗,請稍后再試");}

定時任務實現

IAiJobService

package com.cc672cc.service;import com.cc672cc.entity.tb.AiJob;public interface IAiJobService {/*** 添加任務** @param aiJob* @return 任務ID*/Long add(AiJob aiJob);/*** 提交任務* @param submitAiJob 提交的任務* @return*/int submitAiJob(AiJob submitAiJob);/*** 刷新AI任務狀態** @return 刷新的任務數量*/int refreshAiJobStatus();/*** 刷新AI任務狀態詳情* @param aiJob AI任務* @return*/int refreshAiJobStatusDetail(AiJob aiJob);/*** 遷移AI任務* @return*/int migrateAiJob();/*** 遷移AI任務詳情* @param aiJob AI任務* @return*/int migrateAiJobDetail(AiJob aiJob);/**** @param aiJobId* @return*/AiJob selectOneById(Long aiJobId);}

AiJobServiceImpl

package com.cc672cc.service.impl;import com.alibaba.fastjson2.JSON;
import com.cc672cc.common.constants.RedisPreKey;
import com.cc672cc.common.model.AiJobStatusModel;
import com.cc672cc.common.utils.BeanHelper;
import com.cc672cc.dp.listenermode.publisher.MyEventPubLisher;
import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.enums.dict.AiJobStatusEnum;
import com.cc672cc.enums.dict.AiModelEnum;
import com.cc672cc.enums.dict.DelEnum;
import com.cc672cc.enums.dict.StatusEnum;
import com.cc672cc.mapper.AiJobMapper;
import com.cc672cc.processor.AiJobProcessorFactory;
import com.cc672cc.service.IAiJobService;
import com.cc672cc.service.IPhotoReviewService;
import com.cc672cc.service.IUserBenefitsService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import tk.mybatis.mapper.entity.Example;import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;@Slf4j
@Service
public class AiJobServiceImpl implements IAiJobService {@Autowiredprivate AiJobMapper aiJobMapper;@Autowiredprivate RedissonClient redissonClient;@Lazy@Autowiredprivate AiJobProcessorFactory processorFactory;@Lazy@Autowiredprivate IUserBenefitsService userBenefitsService;@Lazy@Autowiredprivate IPhotoReviewService photoReviewService;@Lazy@Autowiredprivate MyEventPubLisher myEventPubLisher;@Overridepublic Long add(AiJob aiJob) {int insert = aiJobMapper.insert(aiJob);return insert > 0 ? aiJob.getId() : null;}@Overridepublic int submitAiJob(AiJob submitAiJob) {int res = 0;List<AiJob> aiJobs = new ArrayList<>();if (submitAiJob != null) {aiJobs.add(submitAiJob);} else {Example example = new Example(AiJob.class);Example.Criteria criteria = example.createCriteria();criteria.andEqualTo("status", StatusEnum.EFFECTIVE.getCode());criteria.andEqualTo("del", DelEnum.NOT_DELETED.getCode());criteria.andIn("jobStatus", List.of(AiJobStatusEnum.DRAFT.getCode()));example.orderBy("reqTime").desc();aiJobs = aiJobMapper.selectByExample(example);}String redisPreKey = RedisPreKey.CACHE_AI_JOB_PROCESS;if (aiJobs != null && !aiJobs.isEmpty()) {for (AiJob aiJob : aiJobs) {String lock = redisPreKey + aiJob.getId();RLock rLock = redissonClient.getLock(lock);try {boolean tryLock = rLock.tryLock(5, 60, TimeUnit.SECONDS);if (!tryLock) {continue;}AiJob newAiJob = aiJobMapper.selectByPrimaryKey(aiJob.getId());String model = newAiJob.getModel();Integer aiJobStatus = processorFactory.getProcessor(model).process(newAiJob);Integer[] unitProgressShowFlag = processorFactory.getProcessor(model).unitProgressShowFlag();if (List.of(AiJobStatusEnum.SUBMITTED.getCode(),AiJobStatusEnum.QUEUED.getCode(),AiJobStatusEnum.PROCESSING.getCode(),AiJobStatusEnum.GENERATED.getCode(),AiJobStatusEnum.SUCCESS.getCode(),AiJobStatusEnum.FAILED.getCode(),AiJobStatusEnum.CANCELED.getCode(),AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {newAiJob.setJobStatus(aiJobStatus);newAiJob.setOverallProgress(AiJobStatusEnum.getProgress(aiJobStatus));Integer showFlag = unitProgressShowFlag[aiJobStatus];if (0 == showFlag) {newAiJob.setUnitProgress(null);}int update = aiJobMapper.updateByPrimaryKey(newAiJob);if (update == 1) {AiJobStatusModel aiJobStatusModel = BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));res++;}}} catch (Exception e) {log.error("提交AI任務失敗", e);} finally {if (rLock != null && rLock.isHeldByCurrentThread()) {rLock.unlock();}}}}return res;}@Overridepublic int refreshAiJobStatus() {int res = 0;Example example = new Example(AiJob.class);Example.Criteria criteria = example.createCriteria();criteria.andEqualTo("status", StatusEnum.EFFECTIVE.getCode());criteria.andEqualTo("del", DelEnum.NOT_DELETED.getCode());criteria.andIn("jobStatus", List.of(AiJobStatusEnum.SUBMITTED.getCode(),AiJobStatusEnum.QUEUED.getCode(),AiJobStatusEnum.PROCESSING.getCode()));example.orderBy("reqTime").desc();List<AiJob> aiJobs = aiJobMapper.selectByExample(example);if (aiJobs != null && !aiJobs.isEmpty()) {for (AiJob aiJob : aiJobs) {myEventPubLisher.pushAiJobStatusRefreshEvent(aiJob);res++;}}return res;}@Overridepublic int refreshAiJobStatusDetail(AiJob aiJob) {String redisPreKey = RedisPreKey.CACHE_AI_JOB_PROCESS;if (aiJob != null && aiJob.getId() != null) {myEventPubLisher.pushAiJobStatusRefreshEvent(aiJob);String lock = redisPreKey + aiJob.getId();RLock rLock = redissonClient.getLock(lock);try {boolean tryLock = rLock.tryLock(5, 30, TimeUnit.SECONDS);if (!tryLock) {return 0;}AiJob newAiJob = aiJobMapper.selectByPrimaryKey(aiJob.getId());String model = newAiJob.getModel();Integer aiJobStatus = processorFactory.getProcessor(model).query(newAiJob);Integer[] unitProgressShowFlag = processorFactory.getProcessor(model).unitProgressShowFlag();if (List.of(AiJobStatusEnum.QUEUED.getCode(),AiJobStatusEnum.PROCESSING.getCode(),AiJobStatusEnum.GENERATED.getCode(),AiJobStatusEnum.SUCCESS.getCode(),AiJobStatusEnum.FAILED.getCode(),AiJobStatusEnum.CANCELED.getCode(),AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {newAiJob.setJobStatus(aiJobStatus);newAiJob.setOverallProgress(AiJobStatusEnum.getProgress(aiJobStatus));Integer showFlag = unitProgressShowFlag[aiJobStatus];if (0 == showFlag) {newAiJob.setUnitProgress(null);}int update = aiJobMapper.updateByPrimaryKey(newAiJob);if (update == 1) {AiJobStatusModel aiJobStatusModel = BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));return 1;}}} catch (Exception e) {log.error("刷新AI任務失敗", e);} finally {if (rLock != null && rLock.isHeldByCurrentThread()) {rLock.unlock();}}}return 0;}@Overridepublic int migrateAiJob() {int res = 0;Example example = new Example(AiJob.class);Example.Criteria criteria = example.createCriteria();criteria.andEqualTo("status", StatusEnum.EFFECTIVE.getCode());criteria.andEqualTo("del", DelEnum.NOT_DELETED.getCode());criteria.andIn("jobStatus", List.of(AiJobStatusEnum.GENERATED.getCode(),AiJobStatusEnum.MIGRATING.getCode()));example.orderBy("reqTime").desc();List<AiJob> aiJobs = aiJobMapper.selectByExample(example);if (aiJobs != null && !aiJobs.isEmpty()) {for (AiJob aiJob : aiJobs) {myEventPubLisher.pushAiJobMigrateEvent(aiJob);res++;}}return res;}@Overridepublic int migrateAiJobDetail(AiJob aiJob) {String redisPreKey = RedisPreKey.CACHE_AI_JOB_PROCESS;if (aiJob != null && aiJob.getId() != null) {String lock = redisPreKey + aiJob.getId();RLock rLock = redissonClient.getLock(lock);try {boolean tryLock = rLock.tryLock(5, 300, TimeUnit.SECONDS);if (!tryLock) {return 0;}AiJob newAiJob = aiJobMapper.selectByPrimaryKey(aiJob.getId());String model = newAiJob.getModel();Integer jobStatus = newAiJob.getJobStatus();if (AiJobStatusEnum.SUCCESS.getCode().equals(jobStatus)) {AiJobStatusModel aiJobStatusModel = BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));return 1;}Integer aiJobStatus = processorFactory.getProcessor(model).migrate(newAiJob.getId(), jobStatus);Integer[] unitProgressShowFlag = processorFactory.getProcessor(model).unitProgressShowFlag();if (List.of(AiJobStatusEnum.MIGRATING.getCode(),AiJobStatusEnum.SUCCESS.getCode(),AiJobStatusEnum.FAILED.getCode(),AiJobStatusEnum.CANCELED.getCode(),AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {newAiJob.setJobStatus(aiJobStatus);newAiJob.setOverallProgress(AiJobStatusEnum.getProgress(aiJobStatus));Integer showFlag = unitProgressShowFlag[aiJobStatus];if (0 == showFlag) {newAiJob.setUnitProgress(null);}int update = aiJobMapper.updateByPrimaryKey(newAiJob);if (update == 1) {AiJobStatusModel aiJobStatusModel = BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));return 1;}}if (List.of(AiJobStatusEnum.FAILED.getCode(),AiJobStatusEnum.CANCELED.getCode(),AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {Long userId = newAiJob.getUserId();newAiJob.setJobStatusDesc("遷移照片過程失敗,已返還每日積分");String modelId = newAiJob.getModelId();AiModelEnum aiModelEnum = AiModelEnum.getMap().get(modelId);Integer outputCount = newAiJob.getOutputCount();userBenefitsService.subtractPointCountPerDay(userId, newAiJob.getCreateDate(), aiModelEnum, outputCount);String photoReviewAnalysisCode = newAiJob.getPhotoReviewAnalysisCode();PhotoReviewAnalysis photoReviewAnalysis = photoReviewService.selectOneByCode(photoReviewAnalysisCode);photoReviewAnalysis.setBeautifyImage(Boolean.FALSE);photoReviewAnalysis.setUpdateTime(new Date());photoReviewService.updateById(photoReviewAnalysis);}} catch (Exception e) {log.error("遷移AI任務失敗", e);} finally {if (rLock != null && rLock.isHeldByCurrentThread()) {rLock.unlock();}}}return 0;}@Overridepublic AiJob selectOneById(Long aiJobId) {return aiJobMapper.selectByPrimaryKey(aiJobId);}
}

整體業務流程

提交AI任務->定時任務刷新狀態->定時任務遷移 (配合webscoket實時推送狀態)

總結

該方案適用于所有需要對接三方異步 API 的場景(如 AI 生成、視頻處理、數據分析等),通過標準化狀態管理與流程控制,解決了異步任務的復雜性與不可靠性問題。通用95%以上的場景,非常好用

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/918282.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/918282.shtml
英文地址,請注明出處:http://en.pswp.cn/news/918282.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

printf函數格式化輸出攻略

目錄 一、基本用法 二、占位符 基本用法 常用占位符 字符串占位符示例 多占位符示例 注意事項 三、占位符列表 基本數據類型占位符 浮點數占位符 特殊類型占位符 長度修飾符 使用示例 注意事項 四、輸出格式 1、限定寬度 基本用法 左對齊輸出 浮點數寬度限制…

AI小智單片機esps32-s3燒錄教程

1. 下載代碼到本地 代碼地址&#xff1a;https://github.com/78/xiaozhi-esp32 2. vscode安裝環境 安裝一下這個插件 3. esp32-s3通過數據線連接電腦 【圖片】 4. vscode選擇對應配置 如果是用自己的服務還得改下地址 5. 點擊構建 6. 點擊燒錄

socket編程中系統調用send()詳細講解

在 socket 編程中&#xff0c;send() 是用于在已連接的套接字上發送數據的系統調用&#xff0c;主要用于 TCP 協議&#xff08;也可用于 UDP&#xff0c;但需配合連接操作&#xff09;。它負責將用戶態的數據傳遞到內核緩沖區&#xff0c;再由內核協議棧&#xff08;如 TCP/IP&…

Eclipse JSP/Servlet:深入解析與最佳實踐

Eclipse JSP/Servlet:深入解析與最佳實踐 引言 Eclipse JSP/Servlet 是 Java 企業版(Java EE)的核心技術之一,它為 Web 開發者提供了一個強大的平臺來構建動態和交互式的 Web 應用程序。本文將深入探討 Eclipse JSP/Servlet 的概念、工作原理以及最佳實踐,幫助開發者更好…

CNB私有化部署Dify(零成本)教程

本篇文章介紹一下如何進行dify的私有化部署的過程&#xff1a;dify定位是開源的LLM應用平臺&#xff0c;搭建自己的這個AI應用&#xff0c;而我們非常熟悉的這個coze實際上是通過搭積木一樣搭建AI應用&#xff0c;我選擇從dify開始進行了解學習 1.前置準備 我們需要有這個騰訊云…

imx6ull-驅動開發篇16——信號量與互斥體

目錄 前言 信號量 概念與特性 信號量 API 函數 互斥體 概念與特性 互斥體 API 函數 前言 Linux 內核提供的幾種并發和競爭的處理方法&#xff0c;我們學習了&#xff1a; 驅動開發篇14——原子操作 驅動開發篇15——linux自旋鎖 本講我們就繼續學習&#xff1a;信號…

Bug 記錄:SecureRandom.getInstanceStrong()導致驗證碼獲取阻塞

問題描述&#xff1a; 在發送驗證碼到郵件中&#xff0c;接口調用時卡在生成驗證碼階段&#xff0c;導致驗證碼功能完全不可用&#xff1b; 經排查開發環境一切正常&#xff0c;測試環境會重現此問題&#xff1b;問題分析&#xff1a; 現象&#xff1a; 代碼卡在 SecureRandom.…

商派小程序商城(小程序/官網/APP···)的范式躍遷與增長再想象

2025年&#xff0c;品牌官方商城應該如何定義&#xff1f;—— 還是一套“電商貨架”&#xff1f;在商派看來&#xff0c;現如今“品牌官方商城”則需要重新定義&#xff0c;結合不同品牌企業的業務發展需求&#xff0c;也就有著更多豐富的定義和組合想象——如&#xff0c;商城…

基于串口實現可擴展的硬件函數 RPC 框架(附完整 Verilog 源碼)

本文介紹如何使用簡單的串口協議搭建一個支持寄存器讀寫與硬件函數調用的通用交互框架&#xff0c;適用于 FPGA 調試、嵌入式接口、中小型控制系統等場合。 特性&#xff1a; 輕量協議、30 個32位寄存器、函數調用、狀態反饋&#xff0c;源碼清晰易擴展。&#x1f4a1; 背景與目…

jenkins-飛書通知機制

一. 目標&#xff1a; 觸發構建和結束后&#xff0c;自動觸發到飛書工作群&#xff0c;發布同學只需要關注消息即可&#xff0c;而無需人工盯梢。 二. 實現方式&#xff1a; 1. 機器人配置&#xff1a; 創建一個群&#xff0c; 配置機器人&#xff1a; 保管好下面的webhoo…

GoLand 項目從 0 到 1:第五天 —— 角色權限中間件實現與事務控制

第五天核心任務:權限校驗鏈路閉環 第五天的開發聚焦于權限控制的核心實現,完成了角色權限中間件的開發,實現了接口級別的權限校驗,并基于事務控制確保用戶權限操作的數據一致性。通過這部分工作,系統的權限管理從設計階段正式進入可運行階段,為后續業務模塊的安全接入提…

【工具變量】地級市固定資產投資數據(2000-2023年)

數據簡介&#xff1a;地級市固定資產投資是衡量地方經濟發展活力與動能的重要指標&#xff0c;其規模、結構及增速不僅反映區域產業布局和政策導向&#xff0c;也直接影響基礎設施完善、產業升級和民生改善。清晰展現長期經濟發展脈絡&#xff0c;助力捕捉經濟增長與波動規律。…

Kotlin初體驗

前言&#xff1a; 在當今的軟件開發領域&#xff0c;隨著技術的不斷發展&#xff0c;開發者總是在尋找更高效、更簡潔的編程語言來提升開發效率并減少代碼中的潛在問題。而 Kotlin&#xff0c;作為一門現代的編程語言&#xff0c;逐漸贏得了越來越多開發者的青睞&#xff0c;尤…

五十五、【Linux系統nginx服務】nginx安裝、用戶認證、https實現

一、Nginx 核心功能全景圖 #mermaid-svg-j5M1WUQFrtyrWjAD {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-j5M1WUQFrtyrWjAD .error-icon{fill:#552222;}#mermaid-svg-j5M1WUQFrtyrWjAD .error-text{fill:#552222;s…

AtCoder Beginner Contest 418

文章目錄A Im a teapotB Youre a teapotC FlushD XNOR OperationE TrapeziumF Were teapotsG Binary OperationAtCoder Beginner Contest 418A I’m a teapot Takahashi is a teapot. Since he is a teapot, he will gladly accept tea, but will refuse any other liquid. Det…

多級緩存詳解

多級緩存 傳統緩存&#xff1a; 傳統緩存策略一般是請求到達Tomcat后&#xff0c;先查詢Redis&#xff0c;如果未命中則查詢數據庫。 這種模式下請求一定要經過Tomcat處理&#xff0c;Tomcat的性能就成為了整個系統的瓶頸&#xff1b;并且Redis的緩存也有過期時間&#xff0c;一…

接口自動化-JSON Schema

目錄 1.介紹 2.安裝 3.使用 3.1type關鍵字 3.2最大值最小值 3.2.1minimum 、 maximum 3.2.2 exclusiveMinimum 、exclusiveMaximum 3.3字符串特殊校驗 3.4數據約束 3.5對象約束 3.6必須屬性 3.7依賴關系 4.總結 1.介紹 JSON Schema 是一個用來定義和校驗 JSON 的…

前端技術架構設計文檔(Vue2+Antd+Sass)

前端技術架構設計文檔&#xff08;Vue2AntdSass&#xff09; 文檔信息項目名稱前端系統&#xff08;基于 Vue2 技術棧&#xff09;技術棧核心Vue2 Ant Design Vue Sass版本號V1.0.0技術棧核心Vue2 Ant Design Vue Sass編制日期2025-08-071. 技術棧選型 1.1 核心技術框架類別…

【設計模式】抽象工廠模式 (工具(Kit)模式)

抽象工廠模式&#xff08;Abstract Factory Pattern&#xff09;詳解一、抽象工廠模式簡介 抽象工廠模式&#xff08;Abstract Factory Pattern&#xff09; 是一種 創建型設計模式&#xff08;對象創建型模式&#xff09;&#xff0c;它提供了一種創建一系列相關或相互依賴對象…

Android初學者系統開發學習路線參考

Android初學者系統開發學習路線參考 文章目錄Android初學者系統開發學習路線參考一、前言二、Android初學的學習計劃第一階段&#xff08;一個月&#xff09;UI相關學習&#xff1a;開發環境與 UI 基礎&#xff0c;第一周&#xff1a;UI 控件與布局進階&#xff0c;第二周&…