最近做的頭條項目其中有個功能是創作者發表的文章可以設置在未來某個時間發表,在實現這個功能的時候就在想該怎么實現呢?剛開始想的是利用Spring的定時任務定時的去數據庫中查詢,可以這個查詢頻率該怎么設置,每次從數據庫中需要查詢文章延遲發布表的全部信息,未免有點太消耗時間了,況且MySQL還是存在本地磁盤的,讀取成本過高。
這個時候我就想既然遇到了讀寫速度問題,就要找緩存來解決了,利用Redis存儲在內存的特性,將馬上就要發布的文章信息存進Redis中,利用定時任務一分鐘查詢一次緩存,查看是否有要發布的文章,拉取對應的文章信息進行審核發布
我在redis中使用兩種數據結構來存儲文章發布信息
list
?是一個簡單的字符串列表,按照插入順序排序。你可以在列表的頭部(左邊)或尾部(右邊)插入元素。
?使用list存儲發布時間小于等于當前時間也就是立刻就要發布的文章,每次都從列表的左邊插入文章信息,定時任務消費的時候從右邊拉取數據,以形成一定的時間順序
?
zset
?是一種特殊的集合,它和普通集合一樣,成員都是唯一的,但每個成員都會關聯一個分數(score),Redis 會根據分數對成員進行從小到大的排序。
zset和list的區別就是元素唯一,并且每個元素綁定一個分數, 該集合會根據分數的大小進行排序,正好就可以把文章的預發布時間當作score,這樣每次取score最小的文章也就是最早要發布的文章,符合業務邏輯
延遲發布任務的存儲處理
處理流程如上圖所示,當有用戶發起文章發布請求時
1 先將文章相關的信息存入本地數據庫中做備份(防止因為系統或斷電導致緩存丟失)
2 然后判斷文章的預發布時間
????????如果小于等于當前時間,直接放入list中等待定時任務消費
??????? 否則如果發布時間在未來五分鐘以內,放入zset中
public long addTask(Task task) {//先存儲進本地數據庫boolean success=addTaskToDb(task);//存進緩存if(success){addTaskToCache(task);}return task.getTaskId();}/*** 把任務添加到redis中** @param task*/private void addTaskToCache(Task task) {String key = task.getTaskType() + "_" + task.getPriority();//獲取5分鐘之后的時間 毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();//2.1 如果任務的執行時間小于等于當前時間,存入listif (task.getExecuteTime() <= System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));} else if (task.getExecuteTime() <= nextScheduleTime) {//2.2 如果任務的執行時間大于當前時間 && 小于等于預設時間(未來5分鐘) 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());}}//MySQL數據private boolean addTaskToDb(Task task) {//用來標記是否存儲成功boolean flag = false;try {//保存任務表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//設置taskIDtask.setTaskId(taskinfo.getTaskId());//保存任務日志數據TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}
以上僅僅只是請求到來后初步的處理,當消費列表list中的文章發布任務處理完畢后怎么辦呢?
Redis數據處理
針對Redis中zset的數據(未來五分鐘內發布的文章),需要每分鐘查詢是否有發布時間小于等于當前時間的,然后從zset中移動到list中
這就涉及到Redis列表的搜索算法了,目前常用的匹配對應元素的方法有keys 的模糊匹配、Scan掃描,由于keys模糊匹配非常占用CPU的時間,所以一般使用SCAN掃描符合要求的數據
當用戶數據量較大是,如果從zset中一條一條的將文章發布任務移動到list中也很占用時間,恰好Redis提供了Pipeline請求服務,可以一次傳送大量數據,大大節省時間
(同時,基于分布式的軟件架構下可能有多個端同時處理文章預發布信息,這里使用redis的分布式鎖,占用時間三十秒)
定時任務代碼如下
@Scheduled(cron = "0 */1 * * * ?")public void refresh() {//使用redis的分布式鎖,三十秒后結束String token= cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);if(StringUtils.isNotBlank(token)){System.out.println(System.currentTimeMillis() / 1000 + "執行了定時任務");// 獲取所有未來數據集合的key值,使用scan而非keysSet<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//獲取該組key下當前需要消費的任務數據Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//將這些任務數據添加到消費者隊列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println("成功的將" + futureKey + "下的當前需要執行的任務數據刷新到" + topicKey + "下");}}}}
普通redis客戶端和服務器交互模式
Pipeline請求模型
官方測試結果數據對比
延遲發布任務的消費
上面已經解決了文章預發布任務的處理,下面就是從緩存中定時的拉取任務進行文章發布了
在自媒體段使用Feign接口遠程調用任務模塊的poll方法拉取緩存中的任務
@Scheduled(fixedRate = 1000)@SneakyThrows@Overridepublic void scanNewsByTask() {log.info("文章審核---消費任務執行---begin---");//從緩存中拉取文章發布任務ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(),TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200) && responseResult.getData() != null){String json_str = JSON.toJSONString(responseResult.getData());Task task = JSON.parseObject(json_str, Task.class);byte[] parameters = task.getParameters();WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);//審核文章內容wmNewsAutoScanService.autoScanWmNews(wmNews.getId());}log.info("文章審核---消費任務執行---end---");}
?任務模塊的poll
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新數據庫信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;}