引言
由于項目中有處理大量后臺任務并且串行
執行的需求,特意寫了一個簡易的任務調度器,方便監控每個任務執行和異常情況,任務之間互不影響。正如上所述,Kotlin中的TaskScheduler類提供了一個強大的解決方案,用于使用ScheduledExecutorService異步地排隊和執行任務。
使用方法
1.初始化:
val taskListener = object : TaskScheduler.TaskListener {override fun beforeExecute(task: TaskScheduler.NamedRunnable) {println("開始任務:${task.name}")}override fun afterExecute(task: TaskScheduler.NamedRunnable, exception: Exception?) {println("完成任務:${task.name},異常:$exception")}
}
val scheduler = TaskScheduler(taskListener, 5)
2.提交任務:
scheduler.submit("加載數據") {// 加載數據的代碼
}
scheduler.submit("處理數據") {// 處理數據的代碼
}
3.優雅關閉:
當所有任務完成后,調度器將在指定的超時后自動關閉,確保不浪費資源。
完整代碼
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBooleanclass TaskScheduler(private val listener: TaskListener? = null, private val timeout: Long = 5) {private val taskQueue = ConcurrentLinkedQueue<NamedRunnable>()private val isTaskRunning = AtomicBoolean(false)private var executorService: ScheduledExecutorService? = null@Synchronizedfun submit(name: String, task: Runnable) {ensureExecutorService()taskQueue.offer(NamedRunnable(name, task))if (isTaskRunning.compareAndSet(false, true)) {executorService?.submit { processTasks() }}}private fun processTasks() {try {while (taskQueue.isNotEmpty()) {val nextTask = taskQueue.poll()listener?.beforeExecute(nextTask)var exception: Exception? = nulltry {nextTask.run()} catch (e: Exception) {exception = e}listener?.afterExecute(nextTask, exception)}} finally {isTaskRunning.set(false)scheduleShutdown()}}private fun ensureExecutorService() {if (executorService == null || executorService!!.isShutdown) {executorService = Executors.newSingleThreadScheduledExecutor()println("ensureExecutorService newSingleThreadScheduledExecutor")}}private fun scheduleShutdown() {executorService?.schedule({if (taskQueue.isEmpty() && isTaskRunning.compareAndSet(false, true)) {executorService?.shutdown()executorService = nullprintln("scheduleShutdown shutdown")} else {isTaskRunning.set(false) // 確保新任務可以觸發執行器重啟}}, timeout, TimeUnit.SECONDS)}interface TaskListener {fun beforeExecute(task: NamedRunnable)fun afterExecute(task: NamedRunnable, exception: Exception?)}class NamedRunnable(val name: String, private val task: Runnable) : Runnable {override fun run() {task.run()}}
}
最后
簡要概括下優缺點:
- 資源自動管理,超時自動釋放資源
- 任務命名,更清晰的了解每個任務執行情況
- 線程安全,不用擔心多線程添加任務導致順序紊亂
優點:
- 靈活性:允許動態添加任務,并根據任務負載需要創建或關閉執行器,從而管理執行器的生命周期。
缺點:
- 單線程限制:當前實現使用單線程執行器,這意味著任務是順序執行的,而不是并行執行。這可能是CPU密集型任務的瓶頸。