文章目錄
- 前言
- 線程池各類場景描述
- 常見場景案例設計思路
- 公共類
- 自定義工廠類-MyThreadFactory
- 自定義拒絕策略-RejectedExecutionHandlerFactory
- 自定義阻塞隊列-TaskQueue(實現 核心線程->最大線程數->隊列)
- 場景1:CPU密集型場景
- 思路&計算公式
- 實現代碼
- 場景2:IO密集型場景
- 思路&計算公式
- 實現代碼
- 其他部分組成
- 拒絕策略兜底方案
- 思路設計及思考
- 設計1:數據庫持久化方案
- 設計2:Netty兩種拒絕策略實現(根據場景來進行是否重試入隊 + 失敗拋異常)
- 設計3:ActiveMQ(有效時間內嘗試入隊+入隊失敗拋出異常)
- 設計4:dubbo設計思路(dump文件+拋出異常)
- 設計5: 自定義設計-阻塞入隊
- 參考文章
前言
本章節配套源碼:
- gitee:https://gitee.com/changluJava/demo-exer/tree/master/JUC/src/main/java/demo10
線程池各類場景描述
**類型場景:**不同的場景設置參數也各不相同
- 第一種:CPU密集型:最大線程數應該等于CPU核數+1,這樣最大限度提高效率。
// 通過該代碼獲取當前運行環境的cpu核數
Runtime.getRuntime().availableProcessors();
-
**第二種:**IO密集型:主要是進行IO操作,執行IO操作的時間較長,這時cpu出于空閑狀態,導致cpu的利用率不高。線程數為2倍CPU核數。當其中的線程在IO操作的時候,其他線程可以繼續用cpu,提高了cpu的利用率。
-
第三種:混合型:如果CPU密集型和IO密集型執行時間相差不大那么可以拆分;如果兩種執行時間相差很大,就沒必要拆分了。
-
**第四種(了解):**在IO優化中,線程等待時間所占比越高,需要線程數越多;線程cpu時間占比越高,需要越少線程數。
線程池初始化所有參數:
corePoolSize : 核心線程數,當線程池中的線程數量為 corePoolSize 時,即使這些線程處于空閑狀態,也不會銷毀(除非設置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大線程數,線程池中允許的線程數量的最大值。
keepAliveTime : 線程空閑時間,當線程池中的線程數大于 corePoolSize 時,多余的空閑線程將在銷毀之前等待新任務的最長時間。
workQueue : 任務隊列
unit : 線程空閑時間的單位。
threadFactory : 線程工廠,線程池創建線程時使用的工廠。
handler : 拒絕策略,因達到線程邊界和任務隊列滿時,針對新任務的處理方法。CallerRunsPolicy:由提交任務的線程直接執行任務,避免任務丟失。適合任務量波動較大的場景。AbortPolicy:直接拋出 RejectedExecutionException 異常。適合任務量可控的場景。DiscardPolicy:靜默丟棄任務,不拋出異常。適合對任務丟失不敏感的場景。DiscardOldestPolicy:丟棄隊列中最舊的任務,然后重新嘗試提交當前任務。適合對任務時效性要求較高的場景。
核心線程池execute邏輯代碼:
public void execute(Runnable command) {//任務判空if (command == null)throw new NullPointerException();//查看當前運行的線程數量int c = ctl.get();//若小于核心線程則直接添加一個工作線程并執行任務if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果線程數等于核心線程數則嘗試將任務入隊if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//入隊失敗,調用addWorker參數為false,嘗試創建應急線程處理突發任務else if (!addWorker(command, false))//如果創建應急線程失敗,說明當前線程數已經大于最大線程數,這個任務只能拒絕了reject(command);}
常見場景案例設計思路
公共類
自定義工廠類-MyThreadFactory
MyThreadFactory.java
:自定義了線程池工廠類,可以自行進行命名
package demo10;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定義線程池工廠類*/
public class MyThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public MyThreadFactory(String factoryName) {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = factoryName + "-pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
}
自定義拒絕策略-RejectedExecutionHandlerFactory
RejectedExecutionHandlerFactory.java
:包含有多種拒絕策略,其中包含本次需要使用的阻塞入隊拒絕策略
package demo10;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;/*** 拒絕策略工廠類**/
@Slf4j
public class RejectedExecutionHandlerFactory {private static final AtomicLong COUNTER = new AtomicLong();/*** 拒絕執行,拋出 RejectedExecutionException* @param source name for log* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newAbort(String source) {return (r, e) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r);throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + source);};}/*** 直接丟棄該任務* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newDiscard(String source) {return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Discard, Maybe you need to adjust the ThreadPool config!", source, p, r);};}/*** 調用線程運行* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newCallerRun(String source) {System.out.println("thread =>" + Thread.currentThread().getName() + "觸發阻塞中...");return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, p, r);if (!p.isShutdown()) {r.run();}};}/*** 新線程運行* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newThreadRun(String source) {return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);if (!p.isShutdown()) {String threadName = source + "-T-" + COUNTER.getAndIncrement();log.info("[{}] create new thread[{}] to run job", source, threadName);new Thread(r, threadName).start();}};}/*** 依據阻塞隊列put 阻塞添加到隊列中* @return 拒絕策略執行器*/public static RejectedExecutionHandler blockCallerPolicy(String source) {return new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, e, r);if (!e.isShutdown()) {try {// 阻塞入隊操作,阻塞方為調用方執行submitjob的線程e.getQueue().put(r);} catch (InterruptedException ex) {log.error("reject put queue error", ex);}}}};}}
自定義阻塞隊列-TaskQueue(實現 核心線程->最大線程數->隊列)
TaskQueue.java
:線程池中實現先使用核心線程數
package demo10;import java.util.concurrent.*;public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {private transient ThreadPoolExecutor parent;public TaskQueue(int capacity) {super(capacity);}public void setExecutor(ThreadPoolExecutor parent) {this.parent = parent;}/*** 核心線程 -> 最大核心線程數 -> 隊列* @param runnable the element to add* @return*/@Overridepublic boolean offer(Runnable runnable) {// 如果沒有線程池父類,則直接嘗試入隊if (parent == null) return super.offer(runnable);// 若是工作線程數 < 最大線程數,則優先創建線程跑任務if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;// 工作線程數 >= 最大線程數,入隊return super.offer(runnable);}
}
場景1:CPU密集型場景
思路&計算公式
**場景:**具體是指那種包含大量運算、在持有的 CPU 分配的時間片上一直在執行任務、幾乎不需要依賴或等待其他任何東西。處理起來其實沒有多少優化空間,因為處理時幾乎沒有等待時間,所以一直占有 CPU 進行執行,才是最好的方式。
**可優化的點:**就是當單個線程累計較多任務時,其他線程能進行分擔,類似fork/join框架
的概念。
設置參數:設置線程數時,針對單臺機器,最好就是有幾個 CPU ,就創建幾個線程,然后每個線程都在執行這種任務,永不停歇。
Nthreads=Ncpu+1
w/c =0
理解也是正確的,+1 主要是防止因為系統上下文切換,讓系統資源跑滿!
實現代碼
這里核心+最大線程數使用的是CPU核心數+1:
package demo10.cpu;import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** cpu密集型場景任務提交* 自定義隊列:核心線程 -> 最大線程 -> 隊列* 自定義拒絕策略:自定義采用執行阻塞隊列的put操作來實現任務阻塞入隊,而非直接使用調用者線程來直接跑任務* 非影響主線程執行流程:批次1000個任務統一在一個線程中去進行處理,與主流程main線程隔離**/
@Slf4j
public class CPUThreadPoolExample {public static void main(String[] args) {// 獲取 CPU 核心數int cpuCores = Runtime.getRuntime().availableProcessors();// 自定義線程池參數int corePoolSize = cpuCores + 1; // 核心線程數 cpu核心數+1int maximumPoolSize = corePoolSize; // 最大線程數 cpu核心數+1long keepAliveTime = 60L; // 空閑線程存活時間TimeUnit unit = TimeUnit.SECONDS; // 時間單位// 自定義任務隊列 核心線程 -> 最大核心線程數 -> 隊列TaskQueue<Runnable> taskQueue = new TaskQueue<>(500); // 隊列容量為核心線程數的 2 倍// 創建自定義線程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,taskQueue,new MyThreadFactory("IOIntensiveThreadPool"), // 默認線程工廠 Executors.defaultThreadFactory() | 自定義工廠支持自定義線程池名字RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool"));// 將線程池對象設置到任務隊列中taskQueue.setExecutor(executor);// 統計任務的執行數量int jobNums = 1000000;final AtomicInteger count = new AtomicInteger(0);// 記錄任務開始時間long startTime = System.currentTimeMillis();// 單獨開一個線程(后續可改為線程池 核心、最大就1個場景)去完成整個任務提交處理// 如果submitjob阻塞,僅僅只會影響該thread線程new Thread(() -> {CountDownLatch latch = new CountDownLatch(jobNums);// 模擬1000個任務 (可改造為queue隊列形式去在這個線程中去消費)for (int i = 0; i < jobNums; i++) {final int taskId = i;executor.submit(() -> {// CPU計算int sum = 0;for (int j = 0; j < 100000; j++) {sum += j;}System.out.println(Thread.currentThread().getName() + " 任務 " + taskId + " 完成!sum = " + sum);count.incrementAndGet(); // 原子操作,+1 并返回新值latch.countDown();});}System.out.println("所有任務提交完成!");// 關閉線程池,等待任務全部執行完畢try {latch.await();System.out.println("所有任務執行結束!");// 記錄任務結束時間long endTime = System.currentTimeMillis();// 計算任務執行時間long duration = endTime - startTime;System.out.println("任務執行總耗時: " + duration + " 毫秒");} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {executor.shutdown();}}).start();try {// 等待所有任務完成if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 強制關閉}System.out.println("執行完任務數統計:" + count.get());} catch (InterruptedException e) {executor.shutdownNow();}}
}
效果:
場景2:IO密集型場景
思路&計算公式
**場景:**其消耗的主要資源就是 IO 了,所接觸到的 IO ,大致可以分成兩種:磁盤 IO
和網絡 IO
。IO 操作的特點就是需要等待,我們請求一些數據,由對方將數據寫入緩沖區
,在這段時間中,需要讀取數據的線程根本無事可做,因此可以把 CPU 時間片讓出去,直到緩沖區
寫滿。
- 磁盤 IO ,大多都是一些針對磁盤的讀寫操作,最常見的就是文件的讀寫,假如你的數據庫、 Redis 也是在本地的話,那么這個也屬于磁盤 IO。
- 網絡 IO ,這個應該是大家更加熟悉的,我們會遇到各種網絡請求,比如 http 請求、遠程數據庫讀寫、遠程 Redis 讀寫等等。
設置參數:
# 如果存在IO,那么肯定w/c>1(阻塞耗時一般都是計算耗時的很多倍),但是需要考慮系統內存有限(每開啟一個線程都需要內存空間),這里需要上服務器測試具體多少個線程數適合(CPU占比、線程數、總耗時、內存消耗)。如果不想去測試,保守點取1即,Nthreads=Ncpu*(1+1)=2Ncpu。這樣設置一般都OK
# 通用就是2倍的CPU核心數(如果要效率最大化,需要測算當前系統環境每個線程任務的阻塞等待時間與實際計算時間)
Nthreads=Ncpu*(1+w/c)
公式中 W/C 為系統 阻塞率 w:等待時間 c:計算時間
實現代碼
IOIntensiveThreadPoolExample2.java
:這里最終實現的Example2類來進行測試
package demo10.io;import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** io密集型場景任務提交* demo3:基于demo2自定義拒絕策略* 自定義隊列:核心線程 -> 最大線程 -> 隊列* 自定義拒絕策略:自定義采用執行阻塞隊列的put操作來實現任務阻塞入隊,而非直接使用調用者線程來直接跑任務* 非影響主線程執行流程:批次1000個任務統一在一個線程中去進行處理,與主流程main線程隔離**/
@Slf4j
public class IOIntensiveThreadPoolExample2 {public static void main(String[] args) {// 獲取 CPU 核心數int cpuCores = Runtime.getRuntime().availableProcessors();// 自定義線程池參數int corePoolSize = cpuCores * 2; // 核心線程數(IO 密集型任務可以設置較大)int maximumPoolSize = cpuCores * 4; // 最大線程數long keepAliveTime = 60L; // 空閑線程存活時間TimeUnit unit = TimeUnit.SECONDS; // 時間單位// 自定義任務隊列 核心線程 -> 最大核心線程數 -> 隊列TaskQueue<Runnable> taskQueue = new TaskQueue<>(corePoolSize * 2); // 隊列容量為核心線程數的 2 倍// 創建自定義線程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,taskQueue,new MyThreadFactory("IOIntensiveThreadPool"), // 默認線程工廠 Executors.defaultThreadFactory() | 自定義工廠支持自定義線程池名字RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool"));// 將線程池對象設置到任務隊列中taskQueue.setExecutor(executor);// 統計任務的執行數量int jobNums = 1000;final AtomicInteger count = new AtomicInteger(0);// 記錄任務開始時間long startTime = System.currentTimeMillis();// 單獨開一個線程(后續可改為線程池 核心、最大就1個場景)去完成整個任務提交處理// 如果submitjob阻塞,僅僅只會影響該thread線程new Thread(() -> {CountDownLatch latch = new CountDownLatch(jobNums);// 模擬1000個任務 (可改造為queue隊列形式去在這個線程中去消費)for (int i = 0; i < jobNums; i++) {final int taskId = i;executor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 正在執行任務 " + taskId + "...");try {Thread.sleep(500); // 模擬 IO 操作(如網絡請求或文件讀寫)10s// xxxio類耗時操作} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {System.out.println(Thread.currentThread().getName() + " 任務 " + taskId + " 完成!");count.incrementAndGet(); // 原子操作,+1 并返回新值latch.countDown();}});}System.out.println("所有任務提交完成!");// 關閉線程池,等待任務全部執行完畢try {latch.await();System.out.println("所有任務執行結束!");// 記錄任務結束時間long endTime = System.currentTimeMillis();// 計算任務執行時間long duration = endTime - startTime;System.out.println("任務執行總耗時: " + duration + " 毫秒");} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {executor.shutdown();}}).start();try {// 等待所有任務完成if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 強制關閉}System.out.println("執行完任務數統計:" + count.get());} catch (InterruptedException e) {executor.shutdownNow();}}
}
一個任務耗時0.5s,1000個任務執行如下:
說明:經過測試驗證,如果IO阻塞時間特別長,調大最大核心線程數效果更好。
其他部分組成
拒絕策略兜底方案
思路設計及思考
如果核心線程、最大線程、隊列都滿了的情況下該如何處理?如果本身就是單臺機器資源打滿,就需要在設計策略上改變線程池的調度方案,如果我的目的是任何一個任務都不丟棄,同時在服務器上有余力及時處理?
方案1:持久化數據庫設計
- 如:設計一張任務表間任務存儲到 MySQL 數據庫中;redis緩存;任務提交到中間件來緩沖。
設計思路可以如下:參考https://zhuanlan.zhihu.com/p/700719289
方案2:Netty 為例,它的拒絕策略則是直接創建一個線程池以外的線程處理這些任務,為了保證任務的實時處理,這種做法可能需要良好的硬件設備且臨時創建的線程無法做到準確的監控。
- 后續通過翻閱源碼發現一種在拒絕策略場景帶退避的重試策略。
方案3:ActiveMQ 則是嘗試在指定的時效內盡可能的爭取將任務入隊,以保證最大交付
**方案4:**dubbo設計思路(dump文件+拋出異常)
方案5:線程阻塞隊列
思路:隊列采用阻塞隊列,在拒絕策略方法中使用put方法實現阻塞效果。
可能情況:阻塞主線程任務執行。
設計1:數據庫持久化方案
設計思路:自定義拒絕策略,在拒絕策略情況下進行數據庫持久化;自定義實現隊列,在poll的時候優先從db獲取任務,接著再從隊列中獲取。
**詳細具體實現可見:**某大廠線程池拒絕策略連環問 https://blog.csdn.net/shark_chili3007/article/details/137042400
設計2:Netty兩種拒絕策略實現(根據場景來進行是否重試入隊 + 失敗拋異常)
實現思路1:創建新線程執行任務
說明:為了保證任務的實時處理,這種做法需要良好的硬件設備且臨時創建的線程無法做到準確的監控。
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {NewThreadRunsPolicy() {super();}public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {//創建一個臨時線程處理任務final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}
}
弊端:如果任務數特別多無上限場景,就會出現oom情況,導致服務掛掉。
實現思路2:拒絕策略場景帶退避的重試策略。
源碼地址:https://github.dev/netty/netty
- 具體代碼文件:RejectedExecutionHandlers
/*** Tries to backoff when the task can not be added due restrictions for an configured amount of time. This* is only done if the task was added from outside of the event loop which means* {@link EventExecutor#inEventLoop()} returns {@code false}.*/
public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {// 檢查 retries 參數是否為正數,如果不是則拋出異常ObjectUtil.checkPositive(retries, "retries");// 將退避時間轉換為納秒final long backOffNanos = unit.toNanos(backoffAmount);// 返回一個實現了 RejectedExecutionHandler 接口的匿名類return new RejectedExecutionHandler() {@Overridepublic void rejected(Runnable task, SingleThreadEventExecutor executor) {// 檢查當前線程是否不是事件循環線程if (!executor.inEventLoop()) {// 進行最多 retries 次重試for (int i = 0; i < retries; i++) {// 嘗試喚醒事件循環線程,以便它能夠處理任務隊列中的任務executor.wakeup(false);// 當前線程休眠指定的退避時間LockSupport.parkNanos(backOffNanos);// 嘗試將任務重新加入任務隊列if (executor.offerTask(task)) {// 如果任務成功加入隊列,則直接返回return;}}}// 如果當前線程是事件循環線程,或者重試次數用盡后仍然無法加入任務隊列,// 則拋出 RejectedExecutionException 異常throw new RejectedExecutionException();}};
}
設計3:ActiveMQ(有效時間內嘗試入隊+入隊失敗拋出異常)
說明:嘗試在指定的時效內盡可能的爭取將任務入隊,以保證最大交付,超過時間內則返回false。
github地址:https://github.dev/apache/activemq
- 對應代碼:BrokerService#getExecutor
new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {try {// 在60s內進行嘗試入隊,如果入隊失敗,則拋出異常if (!executor.getQueue().offer(r, 60, TimeUnit.SECONDS)) {throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");}} catch (InterruptedException e) {throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");}}}
設計4:dubbo設計思路(dump文件+拋出異常)
github地址:
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!"+ " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d),"+ " Task: %d (completed: %d),"+ " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(),e.getCompletedTaskCount(),e.isShutdown(),e.isTerminated(),e.isTerminating(),url.getProtocol(),url.getIp(),url.getPort());// 0-1 - Thread pool is EXHAUSTED!logger.warn(COMMON_THREAD_POOL_EXHAUSTED, "too much client requesting provider", "", msg);if (Boolean.parseBoolean(url.getParameter(DUMP_ENABLE, Boolean.TRUE.toString()))) {// 進行dump文件dumpJStack();}// 指派發送消息給listener監聽器dispatchThreadPoolExhaustedEvent(msg);throw new RejectedExecutionException(msg);
}
dubbo的工作線程觸發了線程拒絕后,主要做了三個事情,原則就是盡量讓使用者清楚觸發線程拒絕策略的真實原因。
1)輸出了一條警告級別的日志,日志內容為線程池的詳細設置參數,以及線程池當前的狀態,還有當前拒絕任務的一些詳細信息。可以說,這條日志,使用dubbo的有過生產運維經驗的或多或少是見過的,這個日志簡直就是日志打印的典范,其他的日志打印的典范還有spring。得益于這么詳細的日志,可以很容易定位到問題所在
2)輸出當前線程堆棧詳情,這個太有用了,當你通過上面的日志信息還不能定位問題時,案發現場的dump線程上下文信息就是你發現問題的救命稻草。
3)繼續拋出拒絕執行異常,使本次任務失敗,這個繼承了JDK默認拒絕策略的特性
設計5: 自定義設計-阻塞入隊
在線程池初始化的時候自定義拒絕策略:阻塞入隊操作,阻塞方為調用方執行submitjob的線程
new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", "IOIntensiveThreadPool", e, r);if (!e.isShutdown()) {try {// 阻塞入隊操作,阻塞方為調用方執行submitjob的線程e.getQueue().put(r);} catch (InterruptedException ex) {log.error("reject put queue error", ex);}}}
}
如果要執行的任務數量過多,核心線程數、最大核心線程數占滿、任務隊列占滿,此時讓任務進行入隊阻塞,等待隊列中任務有空余位置。
參考文章
[1]. Java 線程池講解——針對 IO 密集型任務:https://www.jianshu.com/p/66b6dfcf3173(提出dubbo 或者 tomcat 的線程池中自定義Queue的實現,核心線程數 -> 最大線程數 -> 隊列中)
[2]. 某大廠線程池拒絕策略連環問 https://blog.csdn.net/shark_chili3007/article/details/137042400
[3]. 線程池拒絕策略:https://blog.csdn.net/qq_40428665/article/details/121680262
[4]. Java線程池如何合理配置核心線程數:https://www.cnblogs.com/Vincent-yuan/p/16022613.html
[5]. 線程池參數配置:https://blog.csdn.net/whp404/article/details/131960756(計算公式)