一、先說結論
得看線程池的實現,JUC 的線程池(ThreadPoolExecutor)的話
- 不會影響其他的線程
- 若是 submit 方法,或者任務為 future 任務,異常只有在 get 的時候才會拋出
- 若是 execute + runnable 任務,異常就會拋出,線程掛掉后,線程池移除該線程并創建一個新的線程
若通過 submit 提交任務,會將任務封裝到 future 里面或者原任務本身就是 future 任務,而 future 的 run 方法執行的時候,拋出的異常在其內部會被捕獲,等到 get 方法的時候才會拋出。
execute + runnable,并沒有涉及 future,雖然異常會被捕獲,但也以為被重新拋出,導致線程中斷,線程池需要移除線程,并創建新的線程
二、submit、execute 方法
線程運行任務核心代碼
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);try {task.run();afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
三、異常處理
比較全面的處理,主要是,afterExecute 方法,這個哪怕拋出異常也會執行
new ThreadPoolExecutor(IO_CORE,IO_MAX,KEEP_ALIVE_SECOND,TimeUnit.SECONDS,// 任務隊列存儲超過核心線程數的任務new LinkedBlockingDeque<>(QUEUE_SIZE),r -> {Thread thread = new Thread(r);thread.setDaemon(Boolean.TRUE);thread.setName(String.format("%s, message-process-thread-%d", threadName, NUM.getAndIncrement()));return thread;}
) {@Overrideprotected void afterExecute(Runnable r, Throwable t) {// 若 t 不為 null,正常處理if (Objects.nonNull(t)) {log.error(t.getMessage());}// 特別注意的是 futureTask 在 run 的時候不會立即拋異常,而是吞掉,在調用 get 的時候才能拋出// 如果是 submit 提交,原本的任務被封裝成 futureTask,異常不會在 t 里,而是在 futureTask 里(但原本的任務是 futureTask 的話,則應該是原本的任務 get 的時候才會拋異常)// 如果是 execute,則 r 還是原來的任務,但不排除 r 本來就是 futureTask,那么其錯誤信息本來就應該通過 get 獲取,在這里處理一下也無妨,不影響原本的處理結果即可// 原任務為 futureTask 的時候,get 時一定要處理異常if (r instanceof Future<?> futureTask) {try {futureTask.get();} catch (Exception e) {log.error(e.getMessage());}}}
};
log.info("線程池已經初始化");
EXECUTOR.allowCoreThreadTimeOut(Boolean.TRUE);
// JVM 關閉時的鉤子函數
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread("IO 密集型任務線程池", (Callable<Void>) () -> {shutdownThreadPoolGracefully(EXECUTOR);return null;})
);