目錄
1、什么是延時任務,分別可以使用哪些技術實現?
1.2 使用? Redis 和 DB 相結合的思路圖以及分析
2、實現添加任務、刪除任務、拉取任務
3、實現未來數據的定時更新
4、將數據庫中的任務數據,同步到 Redis 中
1、什么是延時任務,分別可以使用哪些技術實現?
延時任務:有固定周期的,有明確的觸發時間
延遲隊列:沒有固定的開始時間,它常常是由一個事件觸發的,而在這個事件觸發之后的一段時間內觸發另一個事件,任務可以立即執行,也可以延遲
使用場景:
場景一:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統自動取消訂單;如果期間下單成功,則任務取消
場景二:接口對接出現網絡問題,1分鐘后重試,如果失敗,2分鐘重試,直到出現閾值終止
常用的技術方案:
DelayQueue(JDK自帶):是一個支持延時獲取元素的阻塞隊列, 內部采用優先隊列 PriorityQueue 存儲元素,同時元素必須實現 Delayed 接口;在創建元素時可以指定多久才可以從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素
弊端:使用線程池或者原生 DelayQueue 程序掛掉之后,任務都是放在內存,需要考慮未處理消息的丟失帶來的影響,如何保證數據不丟失,需要持久化(磁盤)
RabbitMQ(消息中間件):允許不同應用之間通過消息傳遞進行通信,提供了可靠的消息傳遞機制(將消息保存在磁盤中),支持多種消息模式,包括點對點和發布/訂閱。RabbitMQ基于AMQP(高級消息隊列協議)設計,具有高度的可擴展性和靈活性
使用 Redis 結合 DB 實現:能夠充分利用Redis的高性能特性和靈活的數據結構,同時結合數據庫的持久化和數據管理能力(存在磁盤,不易丟失),為系統提供高效、實時、可靠的延時任務處理機制
這里我們選用的是 Redis 結合DB進行實現??????????????????????????????????????????
【問題】
為什么選用 Redis + DB ,而不選用 RabbitMQ ?
1、Redis 相對于 RabbitMQ 更加輕量級,對于簡單的延時任務隊列,可能更傾向于使用輕量級的Redis而不是引入RabbitMQ等消息中間件的復雜性
2、Redis通常更容易集成和維護,因為它是一個簡單的鍵值存儲系統,而RabbitMQ是一個完整的消息中間件系統。對于一些小型項目或者對于消息中間件功能的需求不是很大的情況下,選擇Redis可能更為經濟實惠
1.2 使用? Redis 和 DB 相結合的思路圖以及分析
【整體流程圖】
【分析問題】
1、為什么任務需要存儲在數據庫中?
延遲任務是一個通用的服務,任何需要延遲得任務都可以調用該服務,需要考慮數據持久化的問題,存儲數據庫中是一種數據安全的考慮(不容易丟失)
2、為什么 Redis 中使用兩種數據類型,list 和 zset?
結合場景,考慮效率問題以及算法的時間復雜度
3、在添加 zset 數據的時候,為什么需要預加載?
任務模塊是一個通用的模塊,項目中任何需要延遲隊列的地方,都可以調用這個接口,要考慮到數據量的問題;如果數據量特別大,為了防止阻塞,只需要把未來幾分鐘要執行的數據存入緩存即可
2、實現添加任務、刪除任務、拉取任務
【數據庫表結構信息】
Taskinfo
TaskinfoLog
【添加任務】
將任務添加到數據庫中
這里 TaskinfoLog 內置了 version 版本號,即樂觀鎖,保證同一時刻只有一個線程執行成功;其中,Task 是 DTO 數據,Taskinfo(任務) 與?TaskinfoLog(任務日志)是DB數據
private boolean addTackToDB(Task task) {boolean loop = false;try {//1.保存任務表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);task.setTaskId(taskinfo.getTaskId()); //將 任務ID 傳給前端//2.保存日志數據TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); //初始化taskinfoLogsMapper.insert(taskinfoLogs);loop = true;}catch (Exception exception){exception.printStackTrace();}return loop;}
將任務添加到 Redis 中
這里調用 Calender.getInstance() 獲得任務預設時間(這里是當前時間5min后);將小于等于 LocalTime 的任務放入 List 中,否則,則將預設任務放入 Zset 進行暫存
private void addTaskToRedis(Task task) {String key = task.getTaskType() + "_" +task.getPriority();//1.獲取未來 5 分鐘之后的預設時間Calendar calendar = Calendar.getInstance(); //獲取當前日期和時間的日歷實例calendar.add(Calendar.MINUTE,5);long calendarTimeInMillis = calendar.getTimeInMillis(); //獲取其毫秒值//2.1 若任務執行的時間小于當前時間,則直接放入 list 數據結構中if(task.getExecuteTime() <= System.currentTimeMillis()){cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));}else if(task.getExecuteTime() <= calendarTimeInMillis){//2.2 若任務執行的時間大于當前時間 并且 小于等于預設時間(未來5分鐘),則直接放入 zset 中按照分值排序進行存儲cacheService.zAdd(ScheduleConstants.FUTURE+ key,JSON.toJSONString(task),task.getExecuteTime());}}
?調用以上方法
public long addTask(Task task) {//1.添加任務到 DB 中,保證任務的持久化boolean res = addTackToDB(task);if(res) {//2.將任務添加到 redis 中addTaskToRedis(task);}return task.getTaskId();}
【刪除任務】
刪除數據庫中的任務,并更新對應任務的任務日志
private Task deleteTask_UpdateTaskLog(long taskId, int status) {Task task =null;try {//1.刪除任務taskinfoMapper.deleteById(taskId);//2.更新任務日志TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); //更新當前執行時間}catch (Exception e){log.error("任務處理失敗,異常任務ID:{}",taskId);e.printStackTrace();}return task;}
?根據任務的時間類型,刪除 Redis 中 List 與 Zset 中保存的任務信息
private void removeTaskFromRedis(Task task) {String key = task.getTaskType() + "_" +task.getPriority();//1. 執行時間小于當前時間,則進行刪除任務if(task.getExecuteTime() <= System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task)); //list}else{cacheService.zRemove(ScheduleConstants.FUTURE+key,JSON.toJSONString(task)); //zset}}
調用以上方法
public boolean cancelTask(long taskId) {boolean loop = false;//1.刪除任務,更新任務日志Task task = deleteTask_UpdateTaskLog (taskId,ScheduleConstants.CANCELLED);//2.刪除 redis 中的數據if(task!=null){removeTaskFromRedis(task);loop = true;}return loop;}
【拉取任務】
?由于 List 中存儲的任務是以 JSON 的形式進行存儲的,所以需要將其進行 parseObj?序列化
? 使用 lRightPop() 將需要立即執行的任務從 List 中拉取出來,并更新任務日志的狀態
public Task pullTask(int type, int priority) {Task task = null;try {String key = type + "_" +priority;//1.從 list 中使用 pop 拉取任務String taskJSON = cacheService.lRightPop(key); //解析出來的信息是 JSON 字段if(StringUtils.isNotBlank(taskJSON)){task = JSON.parseObject(taskJSON, Task.class);//1.1.在數據庫中刪除任務,更新任務日志deleteTask_UpdateTaskLog(task.getTaskId(), ScheduleConstants.EXECUTED); //已執行}}catch (Exception e){e.printStackTrace();log.error("拉取任務異常!");}return task;}
3、實現未來數據的定時更新
將任務根據執行的時間,分別存入 Redis 中的 List 與 Zset 中后
還需要判斷?Zset 中進行預設時間的任務,是否到了需要執行的時間,到了的話需要進行任務消費
所以,需要設定一個時間,定時的將 Zset 中的數據推送到 List 中,避免任務的堆積與消費延時
【分析問題】
? 在任務推送時,需要將 Redis?中所有的 future 任務提取出來進行遍歷判斷(通過 key 獲取)
? 在進行全局模糊匹配?Key 值獲取的時候,一般有兩種方法:Keys? 和? Scan
? Keys:keys的模糊匹配功能很方便也很強大,但是在生產環境需要慎用;開發中使用 keys的模糊匹配卻發現 Redis 的 CPU 使用率極高,Redis是單線程,會被堵塞
? Scan:SCAN 命令是一個基于游標的迭代器,SCAN 命令每次被調用之后, 都會向用戶返回一個新的游標, 用戶在下次迭代時需要使用這個新游標作為 SCAN 命令的游標參數, 以此來延續之前的迭代過程
這里,我們使用 Scan 技術進行模糊匹配
根據模糊匹配獲取對應的任務后,需要進行消息的推送,Redis 中一般存在兩種消息交互的方法:
普通 Redis 客戶端和服務器交互模式
Pipeline 消息管道的請求模型
根據場景以及考慮到效率的問題,這里我們使用管道技術進行消息的推送
? 以上代碼實現:?
//1.查詢所有未來數值的 keySet<String> future_keys = cacheService.scan(ScheduleConstants.FUTURE + "*");future_keys.forEach(new Consumer<String>() { //future_100_20@Overridepublic void accept(String future_key) {//以 future 進行分組 =》 future + 100_20 ,然后以 topic 前綴進行拼接String topic_Key = ScheduleConstants.TOPIC + future_key.split(ScheduleConstants.FUTURE)[1];//1.1 根據 key 查詢符合條件的信息(即判斷執行的時間是否大于當前時間,若小于或等于,則符合條件)Set<String> tasks = cacheService.zRangeByScore(future_key, 0, System.currentTimeMillis());//2. 進行同步數據if (!tasks.isEmpty()) {//2.1 使用管道技術,將任務數據批量同步到 list 中,等待消費cacheService.refreshWithPipeline(future_key, topic_Key, tasks);log.info("將定時任務 " + future_key + " 刷新到了 " + topic_Key);}}});
【分析問題】
? 這是在單服務下進行消息的推送,若在多服務下進行,由于多個 Tomcat 中對應著不同的 JVM ,所以所控制的鎖也不一樣,這樣,就又會出現線程同步問題
【解決問題】
? ?對于這種情況,使用分布式鎖可能是最好的選擇;而實現分布式鎖的方法多種多樣,而 Redis 中所提供的 SetNX 正好可以解決
? SetNX?分布式鎖代碼如下:
/*** 使用 setnx 實現分布式鎖*/public String tryLock(String name, long expire) {name = name + "_lock";String token = UUID.randomUUID().toString();RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();RedisConnection conn = factory.getConnection();try {//參考redis命令://set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result = conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result != null && result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null;}
完整代碼如下:
@Scheduled(cron = "0 */1 * * * ?") //定時,每分鐘刷新一次public void refreshTask(){String token = cacheService.tryLock("FUTURE_TASK_SN", 1000 * 30);if(StringUtils.isNotBlank(token) && token.length()!=0) { //進行 NX 加鎖操作,使不同服務下同一時刻只能有一個搶占當前任務//1.查詢所有未來數值的 keySet<String> future_keys = cacheService.scan(ScheduleConstants.FUTURE + "*");future_keys.forEach(new Consumer<String>() { //future_100_20@Overridepublic void accept(String future_key) {//以 future 進行分組 =》 future + 100_20 ,然后以 topic 前綴進行拼接String topic_Key = ScheduleConstants.TOPIC + future_key.split(ScheduleConstants.FUTURE)[1];//1.1 根據 key 查詢符合條件的信息(即判斷執行的時間是否大于當前時間,若小于或等于,則符合條件)Set<String> tasks = cacheService.zRangeByScore(future_key, 0, System.currentTimeMillis());//2. 進行同步數據if (!tasks.isEmpty()) {//2.1 使用管道技術,將任務數據批量同步到 list 中,等待消費cacheService.refreshWithPipeline(future_key, topic_Key, tasks);log.info("將定時任務 " + future_key + " 刷新到了 " + topic_Key);}}});}}
??
4、將數據庫中的任務數據,同步到 Redis 中
由于時間是流動的,任務的執行時間是死的,所以需要進行動態的數據更新,保證數據的有效性
流程圖如下所示:
? 為了數據同步的時候,避免數據庫中的數據,與 Redis 中未消費的任務的重復;所以,需要清除 Redis 中所有任務的緩存數據,以確保同步到 Redis 中的數據是最新的
public void clearCacheByRedis(){Set<String> topic_keys = cacheService.scan(ScheduleConstants.TOPIC + "*"); //list 中的所有任務的 keySet<String> future_keys = cacheService.scan(ScheduleConstants.FUTURE + "*"); //zset 中所有任務中的 keycacheService.delete(topic_keys);cacheService.delete(future_keys);}
任務同步的代碼如下:
這里使用?@PostConstruct 注解?進行方法的初始化操作(根據實際情況定義)
@PostConstruct //進行初始化操作,每當啟動微服務時,當前方法就會執行一次@Scheduled(cron = "0 */5 * * * ?") //每五分鐘執行一次public void renewDBTasks_To_Redis(){//1.清除 redis 中的緩存clearCacheByRedis();//2.查詢 DB 中執行時間小于預設時間的任務//2.1.獲取未來 5 分鐘之后的預設時間Calendar calendar = Calendar.getInstance(); //獲取當前日期和時間的日歷實例calendar.add(Calendar.MINUTE,5);long calendarTimeInMillis = calendar.getTimeInMillis(); //獲取其毫秒值LambdaQueryWrapper<Taskinfo> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.lt(Taskinfo::getExecuteTime,calendarTimeInMillis);List<Taskinfo> taskInfos = taskinfoMapper.selectList(queryWrapper);//3.將數據庫中數據同步保存到 redis 中if(taskInfos!=null && taskInfos.size()>0) {taskInfos.forEach(new Consumer<Taskinfo>() {@Overridepublic void accept(Taskinfo taskinfo) {Task task = new Task();BeanUtils.copyProperties(taskinfo,task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());//3.1 由它內部判斷,是存儲在 list 中還是 zset 中addTaskToRedis(task);}});}log.info("成功將數據庫中的數據更新同步到了 redis 中");}
所有方法的完整代碼:
@Slf4j
@Service
@Transactional
public class TaskServiceImpl implements TaskService {@Resourceprivate TaskinfoMapper taskinfoMapper;@Resourceprivate TaskinfoLogsMapper taskinfoLogsMapper;@Resourceprivate CacheService cacheService;/*** 添加任務* @param task 任務對象* @return 任務ID*/@Overridepublic long addTask(Task task) {//1.添加任務到 DB 中,保證任務的持久化boolean res = addTackToDB(task);if(res) {//2.將任務添加到 redis 中addTaskToRedis(task);}return task.getTaskId();}/*** 將已完成的任務刪除*/@Overridepublic boolean cancelTask(long taskId) {boolean loop = false;//1.刪除任務,更新任務日志Task task = deleteTask_UpdateTaskLog (taskId,ScheduleConstants.CANCELLED);//2.刪除 redis 中的數據if(task!=null){removeTaskFromRedis(task);loop = true;}return loop;}/*** 按照類型和優先級進行拉取 list 中的任務*/@Overridepublic Task pullTask(int type, int priority) {Task task = null;try {String key = type + "_" +priority;//1.從 list 中使用 pop 拉取任務String taskJSON = cacheService.lRightPop(key); //解析出來的信息是 JSON 字段if(StringUtils.isNotBlank(taskJSON)){task = JSON.parseObject(taskJSON, Task.class);//1.1.在數據庫中刪除任務,更新任務日志deleteTask_UpdateTaskLog(task.getTaskId(), ScheduleConstants.EXECUTED); //已執行}}catch (Exception e){e.printStackTrace();log.error("拉取任務異常!");}return task;}/*** 未來數據的更新,將 zset 中的任務推送到 list 中*/@Scheduled(cron = "0 */1 * * * ?") //定時,每分鐘刷新一次public void refreshTask(){String token = cacheService.tryLock("FUTURE_TASK_SN", 1000 * 30);if(StringUtils.isNotBlank(token) && token.length()!=0) { //進行 NX 加鎖操作,使不同服務下同一時刻只能有一個搶占當前任務//1.查詢所有未來數值的 keySet<String> future_keys = cacheService.scan(ScheduleConstants.FUTURE + "*");future_keys.forEach(new Consumer<String>() { //future_100_20@Overridepublic void accept(String future_key) {//以 future 進行分組 =》 future + 100_20 ,然后以 topic 前綴進行拼接String topic_Key = ScheduleConstants.TOPIC + future_key.split(ScheduleConstants.FUTURE)[1];//1.1 根據 key 查詢符合條件的信息(即判斷執行的時間是否大于當前時間,若小于或等于,則符合條件)Set<String> tasks = cacheService.zRangeByScore(future_key, 0, System.currentTimeMillis());//2. 進行同步數據if (!tasks.isEmpty()) {//2.1 使用管道技術,將任務數據批量同步到 list 中,等待消費cacheService.refreshWithPipeline(future_key, topic_Key, tasks);log.info("將定時任務 " + future_key + " 刷新到了 " + topic_Key);}}});}}/*** 數據庫中的任務同步到 redis 中,保證數據的一致性*/@PostConstruct //進行初始化操作,每當啟動微服務時,當前方法就會執行一次@Scheduled(cron = "0 */5 * * * ?") //每五分鐘執行一次public void renewDBTasks_To_Redis(){//1.清除 redis 中的緩存clearCacheByRedis();//2.查詢 DB 中執行時間小于預設時間的任務//2.1.獲取未來 5 分鐘之后的預設時間Calendar calendar = Calendar.getInstance(); //獲取當前日期和時間的日歷實例calendar.add(Calendar.MINUTE,5);long calendarTimeInMillis = calendar.getTimeInMillis(); //獲取其毫秒值LambdaQueryWrapper<Taskinfo> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.lt(Taskinfo::getExecuteTime,calendarTimeInMillis);List<Taskinfo> taskInfos = taskinfoMapper.selectList(queryWrapper);//3.將數據庫中數據同步保存到 redis 中if(taskInfos!=null && taskInfos.size()>0) {taskInfos.forEach(new Consumer<Taskinfo>() {@Overridepublic void accept(Taskinfo taskinfo) {Task task = new Task();BeanUtils.copyProperties(taskinfo,task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());//3.1 由它內部判斷,是存儲在 list 中還是 zset 中addTaskToRedis(task);}});}log.info("成功將數據庫中的數據更新同步到了 redis 中");}/******************************************************************************************************************** 刪除 redis 中對應的任務*/private void removeTaskFromRedis(Task task) {String key = task.getTaskType() + "_" +task.getPriority();//1. 執行時間小于當前時間,則進行刪除任務if(task.getExecuteTime() <= System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task)); //list}else{cacheService.zRemove(ScheduleConstants.FUTURE+key,JSON.toJSONString(task)); //zset}}/*** 刪除 redis 中所有的緩存數據*/public void clearCacheByRedis(){Set<String> topic_keys = cacheService.scan(ScheduleConstants.TOPIC + "*"); //list 中的所有任務的 keySet<String> future_keys = cacheService.scan(ScheduleConstants.FUTURE + "*"); //zset 中所有任務中的 keycacheService.delete(topic_keys);cacheService.delete(future_keys);}/*** 在數據庫中刪除任務,更新任務日志*/private Task deleteTask_UpdateTaskLog(long taskId, int status) {Task task =null;try {//1.刪除任務taskinfoMapper.deleteById(taskId);//2.更新任務日志TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); //更新當前執行時間}catch (Exception e){log.error("任務處理失敗,異常任務ID:{}",taskId);e.printStackTrace();}return task;}/*** 將任務存到 redis 中*/private void addTaskToRedis(Task task) {String key = task.getTaskType() + "_" +task.getPriority();//1.獲取未來 5 分鐘之后的預設時間Calendar calendar = Calendar.getInstance(); //獲取當前日期和時間的日歷實例calendar.add(Calendar.MINUTE,5);long calendarTimeInMillis = calendar.getTimeInMillis(); //獲取其毫秒值//2.1 若任務執行的時間小于當前時間,則直接放入 list 數據結構中if(task.getExecuteTime() <= System.currentTimeMillis()){cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));}else if(task.getExecuteTime() <= calendarTimeInMillis){//2.2 若任務執行的時間大于當前時間 并且 小于等于預設時間(未來5分鐘),則直接放入 zset 中按照分值排序進行存儲cacheService.zAdd(ScheduleConstants.FUTURE+ key,JSON.toJSONString(task),task.getExecuteTime());}}/*** 將任務添加到數據庫中*/private boolean addTackToDB(Task task) {boolean loop = false;try {//1.保存任務表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);task.setTaskId(taskinfo.getTaskId()); //將 任務ID 傳給前端//2.保存日志數據TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); //初始化taskinfoLogsMapper.insert(taskinfoLogs);loop = true;}catch (Exception exception){exception.printStackTrace();}return loop;}