title: java線程池使用 author: 哪吒 date: '2023-06-15'
點擊勘誤issues,哪吒感謝大家的閱讀
Java線程池使用指南
1. 線程池基礎使用
1.1 創建線程池的方式
方式一:使用Executors工具類(不推薦)
// 1. 固定大小線程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);// 2. 緩存線程池
ExecutorService cachedPool = Executors.newCachedThreadPool();// 3. 單線程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();// 4. 定時任務線程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
為什么不推薦使用Executors?
問題分析:
┌─────────────────────────────────────────────────────────┐
│ ?newFixedThreadPool & newSingleThreadExecutor ? ? ? ? ?│
│ ?↓ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? │
│ ?使用LinkedBlockingQueue(無界隊列) ? ? ? ? ? ? ? ? ? ?│
│ ?↓ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? │
│ ?可能導致內存溢出(OOM) ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?│
│ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? │
│ ?newCachedThreadPool ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?│
│ ?↓ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? │
│ ?最大線程數為Integer.MAX_VALUE ? ? ? ? ? ? ? ? ? ? ? ? ?│
│ ?↓ ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? │
│ ?可能創建大量線程導致系統崩潰 ? ? ? ? ? ? ? ? ? ? ? ? ? ?│
└─────────────────────────────────────────────────────────┘
方式二:手動創建ThreadPoolExecutor(推薦)
public?class?ThreadPoolFactory?{/*** 創建標準線程池*/public?static?ThreadPoolExecutor?createStandardPool()?{returnnew?ThreadPoolExecutor(5, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 核心線程數10, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??// 最大線程數60L, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?// 空閑時間TimeUnit.SECONDS, ? ? ? ? ? ? ? ? ? ? ??// 時間單位new?ArrayBlockingQueue<>(100), ? ? ? ? ?// 工作隊列new?ThreadFactory() { ? ? ? ? ? ? ? ? ??// 線程工廠privatefinal?AtomicInteger threadNumber =?new?AtomicInteger(1);@Overridepublic?Thread?newThread(Runnable r)?{Thread t =?new?Thread(r,?"CustomPool-"?+ threadNumber.getAndIncrement());t.setDaemon(false); ?// 設置為用戶線程t.setPriority(Thread.NORM_PRIORITY);return?t;}},new?ThreadPoolExecutor.CallerRunsPolicy()?// 拒絕策略);}
}
2. 常用線程池類型及應用場景
2.1 CPU密集型任務線程池
public?class?CPUIntensiveThreadPool?{/*** CPU密集型任務線程池配置* 核心線程數 = CPU核心數 + 1*/public?static?ThreadPoolExecutor?createCPUIntensivePool()?{int?corePoolSize = Runtime.getRuntime().availableProcessors() +?1;returnnew?ThreadPoolExecutor(corePoolSize,corePoolSize,0L,TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<>(50),new?ThreadFactory() {privatefinal?AtomicInteger threadNumber =?new?AtomicInteger(1);@Overridepublic?Thread?newThread(Runnable r)?{Thread t =?new?Thread(r,?"CPU-Pool-"?+ threadNumber.getAndIncrement());t.setDaemon(false);return?t;}},new?ThreadPoolExecutor.AbortPolicy());}// 使用示例:計算密集型任務public?static?void?main(String[] args)?{ThreadPoolExecutor executor = createCPUIntensivePool();// 提交計算任務for?(int?i =?0; i <?10; i++) {finalint?taskId = i;executor.submit(() -> {long?result = fibonacci(35);?// 計算斐波那契數列System.out.println("任務"?+ taskId +?"計算結果:"?+ result +?" - 線程:"?+ Thread.currentThread().getName());});}shutdownGracefully(executor);}private?static?long?fibonacci(int?n)?{if?(n <=?1)?return?n;return?fibonacci(n -?1) + fibonacci(n -?2);}private?static?void?shutdownGracefully(ExecutorService executor)?{executor.shutdown();try?{if?(!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}}?catch?(InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}
}
2.2 IO密集型任務線程池
public?class?IOIntensiveThreadPool?{/*** IO密集型任務線程池配置* 核心線程數 = CPU核心數 * 2*/public?static?ThreadPoolExecutor?createIOIntensivePool()?{int?corePoolSize = Runtime.getRuntime().availableProcessors() *?2;returnnew?ThreadPoolExecutor(corePoolSize,corePoolSize *?2,60L,TimeUnit.SECONDS,new?LinkedBlockingQueue<>(200),new?ThreadFactory() {privatefinal?AtomicInteger threadNumber =?new?AtomicInteger(1);@Overridepublic?Thread?newThread(Runnable r)?{Thread t =?new?Thread(r,?"IO-Pool-"?+ threadNumber.getAndIncrement());t.setDaemon(false);return?t;}},new?ThreadPoolExecutor.CallerRunsPolicy());}// 使用示例:文件讀寫任務public?static?void?main(String[] args)?{ThreadPoolExecutor executor = createIOIntensivePool();// 提交IO任務for?(int?i =?0; i <?20; i++) {finalint?taskId = i;executor.submit(() -> {try?{// 模擬文件讀寫操作Thread.sleep(1000);?// 模擬IO等待System.out.println("任務"?+ taskId +?"完成文件操作 - 線程:"?+?Thread.currentThread().getName());}?catch?(InterruptedException e) {Thread.currentThread().interrupt();}});}shutdownGracefully(executor);}private?static?void?shutdownGracefully(ExecutorService executor)?{executor.shutdown();try?{if?(!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}}?catch?(InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}
}
2.3 定時任務線程池
public?class?ScheduledThreadPoolExample?{public?static?void?main(String[] args)?throws?InterruptedException?{ScheduledThreadPoolExecutor scheduler =?new?ScheduledThreadPoolExecutor(3,new?ThreadFactory() {privatefinal?AtomicInteger threadNumber =?new?AtomicInteger(1);@Overridepublic?Thread?newThread(Runnable r)?{Thread t =?new?Thread(r,?"Scheduler-"?+ threadNumber.getAndIncrement());t.setDaemon(false);return?t;}});// 1. 延遲執行任務scheduler.schedule(() -> {System.out.println("延遲3秒執行的任務 - "?+?new?Date());},?3, TimeUnit.SECONDS);// 2. 固定頻率執行任務ScheduledFuture<?> fixedRateTask = scheduler.scheduleAtFixedRate(() -> {System.out.println("每2秒執行一次的任務 - "?+?new?Date());},?1,?2, TimeUnit.SECONDS);// 3. 固定延遲執行任務ScheduledFuture<?> fixedDelayTask = scheduler.scheduleWithFixedDelay(() -> {System.out.println("上次執行完成后延遲1秒再執行 - "?+?new?Date());try?{Thread.sleep(500);?// 模擬任務執行時間}?catch?(InterruptedException e) {Thread.currentThread().interrupt();}},?1,?1, TimeUnit.SECONDS);// 運行10秒后停止Thread.sleep(10000);fixedRateTask.cancel(false);fixedDelayTask.cancel(false);scheduler.shutdown();}
}
3. 線程池監控與管理
3.1 線程池狀態監控
public?class?ThreadPoolMonitor?{/*** 創建可監控的線程池*/public?static?ThreadPoolExecutor?createMonitorablePool()?{returnnew?ThreadPoolExecutor(5,?10,?60L, TimeUnit.SECONDS,new?ArrayBlockingQueue<>(100),new?ThreadFactory() {privatefinal?AtomicInteger threadNumber =?new?AtomicInteger(1);@Overridepublic?Thread?newThread(Runnable r)?{Thread t =?new?Thread(r,?"Monitor-Pool-"?+ threadNumber.getAndIncrement());t.setDaemon(false);return?t;}},new?ThreadPoolExecutor.CallerRunsPolicy()) {@Overrideprotected?void?beforeExecute(Thread t, Runnable r)?{super.beforeExecute(t, r);System.out.println("任務開始執行 - 線程:"?+ t.getName());}@Overrideprotected?void?afterExecute(Runnable r, Throwable t)?{super.afterExecute(r, t);if?(t !=?null) {System.err.println("任務執行異常:"?+ t.getMessage());}?else?{System.out.println("任務執行完成");}}@Overrideprotected?void?terminated()?{super.terminated();System.out.println("線程池已終止");}};}/*** 打印線程池狀態信息*/public?static?void?printPoolStatus(ThreadPoolExecutor executor)?{System.out.println("\n=== 線程池狀態信息 ===");System.out.println("核心線程數:"?+ executor.getCorePoolSize());System.out.println("最大線程數:"?+ executor.getMaximumPoolSize());System.out.println("當前線程數:"?+ executor.getPoolSize());System.out.println("活躍線程數:"?+ executor.getActiveCount());System.out.println("隊列中任務數:"?+ executor.getQueue().size());System.out.println("已完成任務數:"?+ executor.getCompletedTaskCount());System.out.println("總任務數:"?+ executor.getTaskCount());System.out.println("是否關閉:"?+ executor.isShutdown());System.out.println("是否終止:"?+ executor.isTerminated());System.out.println("========================\n");}
}
3.2 線程池異常處理
public?class?ThreadPoolExceptionHandling?{/*** 創建帶異常處理的線程池*/public?static?ThreadPoolExecutor?createSafeThreadPool()?{returnnew?ThreadPoolExecutor(5,?10,?60L, TimeUnit.SECONDS,new?ArrayBlockingQueue<>(50),new?ThreadFactory() {privatefinal?AtomicInteger threadNumber =?new?AtomicInteger(1);@Overridepublic?Thread?newThread(Runnable r)?{Thread t =?new?Thread(r,?"Safe-Pool-"?+ threadNumber.getAndIncrement());t.setDaemon(false);// 設置未捕獲異常處理器t.setUncaughtExceptionHandler((thread, ex) -> {System.err.println("線程 "?+ thread.getName() +?" 發生未捕獲異常:"?+ ex.getMessage());ex.printStackTrace();});return?t;}},new?ThreadPoolExecutor.CallerRunsPolicy());}/*** 安全任務包裝器*/public?static?Runnable?wrapTask(Runnable task)?{return?() -> {try?{task.run();}?catch?(Exception e) {System.err.println("任務執行異常:"?+ e.getMessage());e.printStackTrace();// 可以在這里添加異常上報邏輯}};}public?static?void?main(String[] args)?{ThreadPoolExecutor executor = createSafeThreadPool();// 提交可能拋異常的任務executor.submit(wrapTask(() -> {System.out.println("正常任務執行");}));executor.submit(wrapTask(() -> {thrownew?RuntimeException("模擬任務異常");}));// 使用Future處理異常Future<?> future = executor.submit(() -> {thrownew?RuntimeException("Future異常");});try?{future.get();}?catch?(ExecutionException e) {System.err.println("通過Future捕獲異常:"?+ e.getCause().getMessage());}?catch?(InterruptedException e) {Thread.currentThread().interrupt();}executor.shutdown();}
}
4. 最佳實踐與注意事項
4.1 線程池參數配置指南
public?class?ThreadPoolConfigGuide?{/*** 根據任務類型配置線程池*/publicstaticclass?ThreadPoolConfigurator?{// CPU密集型任務配置public?static?ThreadPoolExecutor?forCPUIntensive()?{int?processors = Runtime.getRuntime().availableProcessors();returnnew?ThreadPoolExecutor(processors, ? ? ? ? ? ? ? ? ? ?// 核心線程數 = CPU核心數processors, ? ? ? ? ? ? ? ? ? ?// 最大線程數 = CPU核心數0L, TimeUnit.MILLISECONDS, ? ??// 無空閑時間new?ArrayBlockingQueue<>(processors *?2),?// 隊列大小適中new?ThreadPoolExecutor.AbortPolicy());}// IO密集型任務配置public?static?ThreadPoolExecutor?forIOIntensive()?{int?processors = Runtime.getRuntime().availableProcessors();returnnew?ThreadPoolExecutor(processors *?2, ? ? ? ? ? ? ? ?// 核心線程數 = CPU核心數 * 2processors *?4, ? ? ? ? ? ? ? ?// 最大線程數 = CPU核心數 * 460L, TimeUnit.SECONDS, ? ? ? ??// 空閑60秒回收new?LinkedBlockingQueue<>(1000),?// 較大的隊列new?ThreadPoolExecutor.CallerRunsPolicy());}// 混合型任務配置public?static?ThreadPoolExecutor?forMixed()?{int?processors = Runtime.getRuntime().availableProcessors();returnnew?ThreadPoolExecutor(processors +?1, ? ? ? ? ? ? ? ?// 核心線程數 = CPU核心數 + 1processors *?2, ? ? ? ? ? ? ? ?// 最大線程數 = CPU核心數 * 260L, TimeUnit.SECONDS,new?ArrayBlockingQueue<>(200),new?ThreadPoolExecutor.CallerRunsPolicy());}}
}
4.2 常見問題與解決方案
public?class?ThreadPoolTroubleshooting?{/*** 問題1:線程池任務積壓* 解決方案:動態調整線程池大小*/publicstaticclass?DynamicThreadPool?{privatefinal?ThreadPoolExecutor executor;privatefinal?ScheduledExecutorService monitor;public?DynamicThreadPool(ThreadPoolExecutor executor)?{this.executor = executor;this.monitor = Executors.newSingleThreadScheduledExecutor();startMonitoring();}private?void?startMonitoring()?{monitor.scheduleAtFixedRate(() -> {int?queueSize = executor.getQueue().size();int?activeCount = executor.getActiveCount();int?corePoolSize = executor.getCorePoolSize();int?maxPoolSize = executor.getMaximumPoolSize();// 隊列積壓嚴重,增加線程if?(queueSize >?100?&& activeCount >= corePoolSize *?0.8) {int?newCoreSize = Math.min(corePoolSize +?1, maxPoolSize);if?(newCoreSize > corePoolSize) {executor.setCorePoolSize(newCoreSize);System.out.println("增加核心線程數至:"?+ newCoreSize);}}// 隊列空閑,減少線程if?(queueSize <?10?&& activeCount < corePoolSize *?0.5?&& corePoolSize >?2) {executor.setCorePoolSize(corePoolSize -?1);System.out.println("減少核心線程數至:"?+ (corePoolSize -?1));}},?0,?30, TimeUnit.SECONDS);}}/*** 問題2:任務執行時間過長* 解決方案:任務超時控制*/publicstaticclass?TimeoutTaskExecutor?{privatefinal?ThreadPoolExecutor executor;public?TimeoutTaskExecutor(ThreadPoolExecutor executor)?{this.executor = executor;}public?<T>?T?executeWithTimeout(Callable<T> task,?long?timeout, TimeUnit unit)?throws?InterruptedException, ExecutionException, TimeoutException?{Future<T> future = executor.submit(task);try?{return?future.get(timeout, unit);}?catch?(TimeoutException e) {future.cancel(true);?// 取消任務throw?e;}}}
}
4.3 性能優化建議
線程池性能優化清單:1. 參數配置優化? 根據任務類型選擇合適的核心線程數? 設置合理的最大線程數和隊列大小? 選擇適當的拒絕策略2. 任務設計優化? 避免任務執行時間過長? 合理拆分大任務? 避免任務間的強依賴關系3. 監控與調優? 定期監控線程池狀態? 記錄任務執行時間? 根據監控數據調整參數4. 資源管理? 及時關閉線程池? 避免創建過多線程池? 合理設置線程優先級
5. 總結
Java線程池是并發編程的重要工具,正確使用線程池可以:
提高性能:減少線程創建和銷毀的開銷
控制資源:限制并發線程數量,避免系統資源耗盡
提高響應性:復用線程,快速響應任務請求
便于管理:統一管理線程生命周期
關鍵要點:
避免使用
Executors
創建線程池,推薦手動創建ThreadPoolExecutor
根據任務類型(CPU密集型/IO密集型)合理配置參數
實施有效的監控和異常處理機制
注意線程池的優雅關閉
定期評估和調優線程池配置
通過遵循這些最佳實踐,可以充分發揮線程池的優勢,構建高性能、穩定的并發應用程序。