接上文:
Redisson的RDelayedQueue
Redisson他是Redis的兒子(Redis son),基于Redis實現了非常多的功能,其中最常使用的就是Redis分布式鎖的實現,但是除了實現Redis分布式鎖之外,它還實現了延遲隊列的功能。
先來個demo
引入pom
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.1</version>
</dependency>
封裝了一個RedissonDelayQueue類
@Component
@Slf4j
public?class?RedissonDelayQueue?{private?RedissonClient?redissonClient;private?RDelayedQueue<String>?delayQueue;private?RBlockingQueue<String>?blockingQueue;@PostConstructpublic?void?init()?{initDelayQueue();startDelayQueueConsumer();}private?void?initDelayQueue()?{Config?config?=?new?Config();SingleServerConfig?serverConfig?=?config.useSingleServer();serverConfig.setAddress("redis://localhost:6379");redissonClient?=?Redisson.create(config);blockingQueue?=?redissonClient.getBlockingQueue("SANYOU");delayQueue?=?redissonClient.getDelayedQueue(blockingQueue);}private?void?startDelayQueueConsumer()?{new?Thread(()?->?{while?(true)?{try?{String?task?=?blockingQueue.take();log.info("接收到延遲任務:{}",?task);}?catch?(Exception?e)?{e.printStackTrace();}}},?"SANYOU-Consumer").start();}public?void?offerTask(String?task,?long?seconds)?{log.info("添加延遲任務:{}?延遲時間:{}s",?task,?seconds);delayQueue.offer(task,?seconds,?TimeUnit.SECONDS);}}
這個類在創建的時候會去初始化延遲隊列,創建一個RedissonClient對象,之后通過RedissonClient對象獲取到RDelayedQueue和RBlockingQueue對象,傳入的隊列名字叫SANYOU,這個名字無所謂。
當延遲隊列創建之后,會開啟一個延遲任務的消費線程,這個線程會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務。
添加任務的時候是通過RDelayedQueue的offer方法添加的。
controller類,通過接口添加任務,延遲時間為5s
@RestController
public?class?RedissonDelayQueueController?{@Resourceprivate?RedissonDelayQueue?redissonDelayQueue;@GetMapping("/add")public?void?addTask(@RequestParam("task")?String?task)?{redissonDelayQueue.offerTask(task,?5);}}
啟動項目,在瀏覽器輸入如下連接,添加任務
http://localhost:8080/add?task=sanyou
靜靜等待5s,成功獲取到任務。
實現原理
如下是Redisson延遲隊列的實現原理
SANYOU前面的前綴都是固定的,Redisson創建的時候會拼上前綴。
-
redisson_delay_queue_timeout:SANYOU
,sorted set數據類型,存放所有延遲任務,按照延遲任務的到期時間戳(提交任務時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲隊列中最早要被執行的任務,這個概念很重要 -
redisson_delay_queue:SANYOU
,list數據類型,也是存放所有的任務,但是研究下來發現好像沒什么用。。 -
SANYOU
,list數據類型,被稱為目標隊列,這個里面存放的任務都是已經到了延遲時間的,可以被消費者獲取的任務,所以上面demo中的RBlockingQueue的take方法是從這個目標隊列中獲取到任務的 -
redisson_delay_queue_channel:SANYOU
,是一個channel,用來通知客戶端開啟一個延遲任務
任務提交的時候,Redisson會將任務放到redisson_delay_queue_timeout:SANYOU
中,分數就是提交任務的時間戳+延遲時間,就是延遲任務的到期時間戳
Redisson客戶端內部通過監聽redisson_delay_queue_channel:SANYOU
這個channel來提交一個延遲任務,這個延遲任務能夠保證將redisson_delay_queue_timeout:SANYOU
中到了延遲時間的任務從redisson_delay_queue_timeout:SANYOU
中移除,存到SANYOU
這個目標隊列中。
于是消費者就可以從SANYOU
這個目標隊列獲取到延遲任務了。
所以從這可以看出,Redisson的延遲任務的實現跟前面說的MQ的實現都是殊途同歸,最開始任務放到中間的一個地方,叫做redisson_delay_queue_timeout:SANYOU
,然后會開啟一個類似于定時任務的一個東西,去判斷這個中間地方的消息是否到了延遲時間,到了再放到最終的目標的隊列供消費者消費。
Redisson的這種實現方式比監聽Redis過期key的實現方式更加可靠,因為消息都存在list和sorted set數據類型中,所以消息很少丟。
上述說的兩種Redis的方案更詳細的介紹,可以查看我之前寫的用Redis實現延遲隊列,我研究了兩種方案,發現并不簡單這篇文章。
Netty的HashedWheelTimer
先來個demo
@Slf4j
public?class?NettyHashedWheelTimerDemo?{public?static?void?main(String[]?args)?{HashedWheelTimer?timer?=?new?HashedWheelTimer(100,?TimeUnit.MILLISECONDS,?8);timer.start();log.info("提交延遲任務");timer.newTimeout(timeout?->?log.info("執行延遲任務"),?5,?TimeUnit.SECONDS);}}
測試結果
實現原理
如圖,時間輪會被分成很多格子(上述demo中的8就代表了8個格子),一個格子代表一段時間(上述demo中的100就代表一個格子是100ms),所以上述demo中,每800ms會走一圈。
當任務提交的之后,會根據任務的到期時間進行hash取模,計算出這個任務的執行時間所在具體的格子,然后添加到這個格子中,通過如果這個格子有多個任務,會用鏈表來保存。所以這個任務的添加有點像HashMap儲存元素的原理。
HashedWheelTimer內部會開啟一個線程,輪詢每個格子,找到到了延遲時間的任務,然后執行。
由于HashedWheelTimer也是單線程來處理任務,所以跟Timer一樣,長時間運行的任務會導致其他任務的延時處理。
前面Redisson中提到的客戶端延遲任務就是基于Netty的HashedWheelTimer實現的。
Hutool的SystemTimer
Hutool工具類也提供了延遲任務的實現SystemTimer
demo
@Slf4j
public?class?SystemTimerDemo?{public?static?void?main(String[]?args)?{SystemTimer?systemTimer?=?new?SystemTimer();systemTimer.start();log.info("提交延遲任務");systemTimer.addTask(new?TimerTask(()?->?log.info("執行延遲任務"),?5000));}}
執行結果
Hutool底層其實也用到了時間輪。
Qurtaz
Qurtaz是一款開源作業調度框架,基于Qurtaz提供的api也可以實現延遲任務的功能。
demo
依賴
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version>
</dependency>
SanYouJob實現Job接口,當任務到達執行時間的時候會調用execute的實現,從context可以獲取到任務的內容
@Slf4j
public?class?SanYouJob?implements?Job?{@Overridepublic?void?execute(JobExecutionContext?context)?throws?JobExecutionException?{JobDetail?jobDetail?=?context.getJobDetail();JobDataMap?jobDataMap?=?jobDetail.getJobDataMap();log.info("獲取到延遲任務:{}",?jobDataMap.get("delayTask"));}
}
測試類
public?class?QuartzDemo?{public?static?void?main(String[]?args)?throws?SchedulerException,?InterruptedException?{//?1.創建Scheduler的工廠SchedulerFactory?sf?=?new?StdSchedulerFactory();//?2.從工廠中獲取調度器實例Scheduler?scheduler?=?sf.getScheduler();//?6.啟動?調度器scheduler.start();//?3.創建JobDetail,Job類型就是上面說的SanYouJobJobDetail?jb?=?JobBuilder.newJob(SanYouJob.class).usingJobData("delayTask",?"這是一個延遲任務").build();//?4.創建TriggerTrigger?t?=?TriggerBuilder.newTrigger()//任務的觸發時間就是延遲任務到的延遲時間.startAt(DateUtil.offsetSecond(new?Date(),?5)).build();//?5.注冊任務和定時器log.info("提交延遲任務");scheduler.scheduleJob(jb,?t);}
}
執行結果:
實現原理
核心組件
-
Job:表示一個任務,execute方法的實現是對任務的執行邏輯
-
JobDetail:任務的詳情,可以設置任務需要的參數等信息
-
Trigger:觸發器,是用來觸發業務的執行,比如說指定5s后觸發任務,那么任務就會在5s后觸發
-
Scheduler:調度器,內部可以注冊多個任務和對應任務的觸發器,之后會調度任務的執行
啟動的時候會開啟一個QuartzSchedulerThread調度線程,這個線程會去判斷任務是否到了執行時間,到的話就將任務交給任務線程池去執行。
無限輪詢延遲任務
無限輪詢的意思就是開啟一個線程不停的去輪詢任務,當這些任務到達了延遲時間,那么就執行任務。
demo
@Slf4j
public?class?PollingTaskDemo?{private?static?final?List<DelayTask>?DELAY_TASK_LIST?=?new?CopyOnWriteArrayList<>();public?static?void?main(String[]?args)?{new?Thread(()?->?{while?(true)?{try?{for?(DelayTask?delayTask?:?DELAY_TASK_LIST)?{if?(delayTask.triggerTime?<=?System.currentTimeMillis())?{log.info("處理延遲任務:{}",?delayTask.taskContent);DELAY_TASK_LIST.remove(delayTask);}}TimeUnit.MILLISECONDS.sleep(100);}?catch?(Exception?e)?{}}}).start();log.info("提交延遲任務");DELAY_TASK_LIST.add(new?DelayTask("三友的java日記",?5L));}@Getter@Setterpublic?static?class?DelayTask?{private?final?String?taskContent;private?final?Long?triggerTime;public?DelayTask(String?taskContent,?Long?delayTime)?{this.taskContent?=?taskContent;this.triggerTime?=?System.currentTimeMillis()?+?delayTime?*?1000;}}}
任務可以存在數據庫又或者是內存,看具體的需求,這里我為了簡單就放在內存里了。
執行結果:
這種操作簡單,但是就是效率低下,每次都得遍歷所有的任務。