Java自定義IO密集型和CPU密集型線程池

文章目錄

  • 前言
  • 線程池各類場景描述
  • 常見場景案例設計思路
    • 公共類
      • 自定義工廠類-MyThreadFactory
      • 自定義拒絕策略-RejectedExecutionHandlerFactory
      • 自定義阻塞隊列-TaskQueue(實現 核心線程->最大線程數->隊列)
    • 場景1:CPU密集型場景
      • 思路&計算公式
      • 實現代碼
    • 場景2:IO密集型場景
      • 思路&計算公式
      • 實現代碼
  • 其他部分組成
    • 拒絕策略兜底方案
      • 思路設計及思考
      • 設計1:數據庫持久化方案
      • 設計2:Netty兩種拒絕策略實現(根據場景來進行是否重試入隊 + 失敗拋異常)
      • 設計3:ActiveMQ(有效時間內嘗試入隊+入隊失敗拋出異常)
      • 設計4:dubbo設計思路(dump文件+拋出異常)
      • 設計5: 自定義設計-阻塞入隊
  • 參考文章

稿定智能設計202502031721

前言

本章節配套源碼:

  • gitee:https://gitee.com/changluJava/demo-exer/tree/master/JUC/src/main/java/demo10

線程池各類場景描述

**類型場景:**不同的場景設置參數也各不相同

  • 第一種:CPU密集型:最大線程數應該等于CPU核數+1,這樣最大限度提高效率。
// 通過該代碼獲取當前運行環境的cpu核數
Runtime.getRuntime().availableProcessors();
  • **第二種:**IO密集型:主要是進行IO操作,執行IO操作的時間較長,這時cpu出于空閑狀態,導致cpu的利用率不高。線程數為2倍CPU核數。當其中的線程在IO操作的時候,其他線程可以繼續用cpu,提高了cpu的利用率。

  • 第三種:混合型:如果CPU密集型和IO密集型執行時間相差不大那么可以拆分;如果兩種執行時間相差很大,就沒必要拆分了。

  • **第四種(了解):**在IO優化中,線程等待時間所占比越高,需要線程數越多;線程cpu時間占比越高,需要越少線程數。

線程池初始化所有參數:

corePoolSize : 核心線程數,當線程池中的線程數量為 corePoolSize 時,即使這些線程處于空閑狀態,也不會銷毀(除非設置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大線程數,線程池中允許的線程數量的最大值。
keepAliveTime : 線程空閑時間,當線程池中的線程數大于 corePoolSize 時,多余的空閑線程將在銷毀之前等待新任務的最長時間。
workQueue : 任務隊列
unit : 線程空閑時間的單位。
threadFactory : 線程工廠,線程池創建線程時使用的工廠。
handler : 拒絕策略,因達到線程邊界和任務隊列滿時,針對新任務的處理方法。CallerRunsPolicy:由提交任務的線程直接執行任務,避免任務丟失。適合任務量波動較大的場景。AbortPolicy:直接拋出 RejectedExecutionException 異常。適合任務量可控的場景。DiscardPolicy:靜默丟棄任務,不拋出異常。適合對任務丟失不敏感的場景。DiscardOldestPolicy:丟棄隊列中最舊的任務,然后重新嘗試提交當前任務。適合對任務時效性要求較高的場景。

核心線程池execute邏輯代碼:

public void execute(Runnable command) {//任務判空if (command == null)throw new NullPointerException();//查看當前運行的線程數量int c = ctl.get();//若小于核心線程則直接添加一個工作線程并執行任務if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果線程數等于核心線程數則嘗試將任務入隊if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//入隊失敗,調用addWorker參數為false,嘗試創建應急線程處理突發任務else if (!addWorker(command, false))//如果創建應急線程失敗,說明當前線程數已經大于最大線程數,這個任務只能拒絕了reject(command);}

image-20250201004416120


常見場景案例設計思路

公共類

image-20250202175504266

自定義工廠類-MyThreadFactory

MyThreadFactory.java:自定義了線程池工廠類,可以自行進行命名

package demo10;import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;/*** 自定義線程池工廠類*/
public class MyThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public MyThreadFactory(String factoryName) {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = factoryName + "-pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
}

自定義拒絕策略-RejectedExecutionHandlerFactory

RejectedExecutionHandlerFactory.java:包含有多種拒絕策略,其中包含本次需要使用的阻塞入隊拒絕策略

package demo10;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;/*** 拒絕策略工廠類**/
@Slf4j
public class RejectedExecutionHandlerFactory {private static final AtomicLong COUNTER = new AtomicLong();/*** 拒絕執行,拋出 RejectedExecutionException* @param source name for log* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newAbort(String source) {return (r, e) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r);throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + source);};}/*** 直接丟棄該任務* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newDiscard(String source) {return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Discard, Maybe you need to adjust the ThreadPool config!", source, p, r);};}/*** 調用線程運行* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newCallerRun(String source) {System.out.println("thread =>" + Thread.currentThread().getName() + "觸發阻塞中...");return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, p, r);if (!p.isShutdown()) {r.run();}};}/*** 新線程運行* @param source log name* @return A handler for tasks that cannot be executed by ThreadPool*/public static RejectedExecutionHandler newThreadRun(String source) {return (r, p) -> {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);if (!p.isShutdown()) {String threadName = source + "-T-" + COUNTER.getAndIncrement();log.info("[{}] create new thread[{}] to run job", source, threadName);new Thread(r, threadName).start();}};}/*** 依據阻塞隊列put 阻塞添加到隊列中* @return 拒絕策略執行器*/public static RejectedExecutionHandler blockCallerPolicy(String source) {return new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, e, r);if (!e.isShutdown()) {try {// 阻塞入隊操作,阻塞方為調用方執行submitjob的線程e.getQueue().put(r);} catch (InterruptedException ex) {log.error("reject put queue error", ex);}}}};}}

自定義阻塞隊列-TaskQueue(實現 核心線程->最大線程數->隊列)

TaskQueue.java:線程池中實現先使用核心線程數

package demo10;import java.util.concurrent.*;public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {private transient ThreadPoolExecutor parent;public TaskQueue(int capacity) {super(capacity);}public void setExecutor(ThreadPoolExecutor parent) {this.parent = parent;}/*** 核心線程 -> 最大核心線程數 -> 隊列* @param runnable the element to add* @return*/@Overridepublic boolean offer(Runnable runnable) {// 如果沒有線程池父類,則直接嘗試入隊if (parent == null) return super.offer(runnable);// 若是工作線程數 < 最大線程數,則優先創建線程跑任務if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;// 工作線程數 >= 最大線程數,入隊return super.offer(runnable);}
}

場景1:CPU密集型場景

思路&計算公式

**場景:**具體是指那種包含大量運算、在持有的 CPU 分配的時間片上一直在執行任務、幾乎不需要依賴或等待其他任何東西。處理起來其實沒有多少優化空間,因為處理時幾乎沒有等待時間,所以一直占有 CPU 進行執行,才是最好的方式。

**可優化的點:**就是當單個線程累計較多任務時,其他線程能進行分擔,類似fork/join框架的概念。

設置參數:設置線程數時,針對單臺機器,最好就是有幾個 CPU ,就創建幾個線程,然后每個線程都在執行這種任務,永不停歇。

Nthreads=Ncpu+1 
w/c =0 
理解也是正確的,+1 主要是防止因為系統上下文切換,讓系統資源跑滿!

實現代碼

image-20250202184351200

這里核心+最大線程數使用的是CPU核心數+1:

package demo10.cpu;import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** cpu密集型場景任務提交*  自定義隊列:核心線程 -> 最大線程 -> 隊列*  自定義拒絕策略:自定義采用執行阻塞隊列的put操作來實現任務阻塞入隊,而非直接使用調用者線程來直接跑任務*  非影響主線程執行流程:批次1000個任務統一在一個線程中去進行處理,與主流程main線程隔離**/
@Slf4j
public class CPUThreadPoolExample {public static void main(String[] args) {// 獲取 CPU 核心數int cpuCores = Runtime.getRuntime().availableProcessors();// 自定義線程池參數int corePoolSize = cpuCores + 1; // 核心線程數 cpu核心數+1int maximumPoolSize = corePoolSize; // 最大線程數 cpu核心數+1long keepAliveTime = 60L; // 空閑線程存活時間TimeUnit unit = TimeUnit.SECONDS; // 時間單位// 自定義任務隊列 核心線程 -> 最大核心線程數 -> 隊列TaskQueue<Runnable> taskQueue = new TaskQueue<>(500); // 隊列容量為核心線程數的 2 倍// 創建自定義線程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,taskQueue,new MyThreadFactory("IOIntensiveThreadPool"), // 默認線程工廠 Executors.defaultThreadFactory() | 自定義工廠支持自定義線程池名字RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool"));// 將線程池對象設置到任務隊列中taskQueue.setExecutor(executor);// 統計任務的執行數量int jobNums = 1000000;final AtomicInteger count = new AtomicInteger(0);// 記錄任務開始時間long startTime = System.currentTimeMillis();// 單獨開一個線程(后續可改為線程池 核心、最大就1個場景)去完成整個任務提交處理// 如果submitjob阻塞,僅僅只會影響該thread線程new Thread(() -> {CountDownLatch latch = new CountDownLatch(jobNums);// 模擬1000個任務 (可改造為queue隊列形式去在這個線程中去消費)for (int i = 0; i < jobNums; i++) {final int taskId = i;executor.submit(() -> {// CPU計算int sum = 0;for (int j = 0; j < 100000; j++) {sum += j;}System.out.println(Thread.currentThread().getName() + " 任務 " + taskId + " 完成!sum = " + sum);count.incrementAndGet(); // 原子操作,+1 并返回新值latch.countDown();});}System.out.println("所有任務提交完成!");// 關閉線程池,等待任務全部執行完畢try {latch.await();System.out.println("所有任務執行結束!");// 記錄任務結束時間long endTime = System.currentTimeMillis();// 計算任務執行時間long duration = endTime - startTime;System.out.println("任務執行總耗時: " + duration + " 毫秒");} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {executor.shutdown();}}).start();try {// 等待所有任務完成if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 強制關閉}System.out.println("執行完任務數統計:" + count.get());} catch (InterruptedException e) {executor.shutdownNow();}}
}

效果:

image-20250202184444336


場景2:IO密集型場景

思路&計算公式

**場景:**其消耗的主要資源就是 IO 了,所接觸到的 IO ,大致可以分成兩種:磁盤 IO網絡 IO。IO 操作的特點就是需要等待,我們請求一些數據,由對方將數據寫入緩沖區,在這段時間中,需要讀取數據的線程根本無事可做,因此可以把 CPU 時間片讓出去,直到緩沖區寫滿。

  • 磁盤 IO ,大多都是一些針對磁盤的讀寫操作,最常見的就是文件的讀寫,假如你的數據庫、 Redis 也是在本地的話,那么這個也屬于磁盤 IO。
  • 網絡 IO ,這個應該是大家更加熟悉的,我們會遇到各種網絡請求,比如 http 請求、遠程數據庫讀寫、遠程 Redis 讀寫等等。

設置參數:

# 如果存在IO,那么肯定w/c>1(阻塞耗時一般都是計算耗時的很多倍),但是需要考慮系統內存有限(每開啟一個線程都需要內存空間),這里需要上服務器測試具體多少個線程數適合(CPU占比、線程數、總耗時、內存消耗)。如果不想去測試,保守點取1即,Nthreads=Ncpu*(1+1)=2Ncpu。這樣設置一般都OK
# 通用就是2倍的CPU核心數(如果要效率最大化,需要測算當前系統環境每個線程任務的阻塞等待時間與實際計算時間)
Nthreads=Ncpu*(1+w/c)
公式中 W/C 為系統 阻塞率  w:等待時間 c:計算時間

實現代碼

image-20250202183217126

IOIntensiveThreadPoolExample2.java:這里最終實現的Example2類來進行測試

package demo10.io;import demo10.MyThreadFactory;
import demo10.RejectedExecutionHandlerFactory;
import demo10.TaskQueue;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** io密集型場景任務提交* demo3:基于demo2自定義拒絕策略*  自定義隊列:核心線程 -> 最大線程 -> 隊列*  自定義拒絕策略:自定義采用執行阻塞隊列的put操作來實現任務阻塞入隊,而非直接使用調用者線程來直接跑任務*  非影響主線程執行流程:批次1000個任務統一在一個線程中去進行處理,與主流程main線程隔離**/
@Slf4j
public class IOIntensiveThreadPoolExample2 {public static void main(String[] args) {// 獲取 CPU 核心數int cpuCores = Runtime.getRuntime().availableProcessors();// 自定義線程池參數int corePoolSize = cpuCores * 2; // 核心線程數(IO 密集型任務可以設置較大)int maximumPoolSize = cpuCores * 4; // 最大線程數long keepAliveTime = 60L; // 空閑線程存活時間TimeUnit unit = TimeUnit.SECONDS; // 時間單位// 自定義任務隊列 核心線程 -> 最大核心線程數 -> 隊列TaskQueue<Runnable> taskQueue = new TaskQueue<>(corePoolSize * 2); // 隊列容量為核心線程數的 2 倍// 創建自定義線程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,taskQueue,new MyThreadFactory("IOIntensiveThreadPool"), // 默認線程工廠 Executors.defaultThreadFactory() | 自定義工廠支持自定義線程池名字RejectedExecutionHandlerFactory.blockCallerPolicy("IOIntensiveThreadPool"));// 將線程池對象設置到任務隊列中taskQueue.setExecutor(executor);// 統計任務的執行數量int jobNums = 1000;final AtomicInteger count = new AtomicInteger(0);// 記錄任務開始時間long startTime = System.currentTimeMillis();// 單獨開一個線程(后續可改為線程池 核心、最大就1個場景)去完成整個任務提交處理// 如果submitjob阻塞,僅僅只會影響該thread線程new Thread(() -> {CountDownLatch latch = new CountDownLatch(jobNums);// 模擬1000個任務 (可改造為queue隊列形式去在這個線程中去消費)for (int i = 0; i < jobNums; i++) {final int taskId = i;executor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 正在執行任務 " + taskId + "...");try {Thread.sleep(500); // 模擬 IO 操作(如網絡請求或文件讀寫)10s// xxxio類耗時操作} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {System.out.println(Thread.currentThread().getName() + " 任務 " + taskId + " 完成!");count.incrementAndGet(); // 原子操作,+1 并返回新值latch.countDown();}});}System.out.println("所有任務提交完成!");// 關閉線程池,等待任務全部執行完畢try {latch.await();System.out.println("所有任務執行結束!");// 記錄任務結束時間long endTime = System.currentTimeMillis();// 計算任務執行時間long duration = endTime - startTime;System.out.println("任務執行總耗時: " + duration + " 毫秒");} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);}finally {executor.shutdown();}}).start();try {// 等待所有任務完成if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow(); // 強制關閉}System.out.println("執行完任務數統計:" + count.get());} catch (InterruptedException e) {executor.shutdownNow();}}
}

一個任務耗時0.5s,1000個任務執行如下:

image-20250202183457306

說明:經過測試驗證,如果IO阻塞時間特別長,調大最大核心線程數效果更好。


其他部分組成

拒絕策略兜底方案

思路設計及思考

如果核心線程、最大線程、隊列都滿了的情況下該如何處理?如果本身就是單臺機器資源打滿,就需要在設計策略上改變線程池的調度方案,如果我的目的是任何一個任務都不丟棄,同時在服務器上有余力及時處理?

方案1:持久化數據庫設計

  • 如:設計一張任務表間任務存儲到 MySQL 數據庫中;redis緩存;任務提交到中間件來緩沖。

設計思路可以如下:參考https://zhuanlan.zhihu.com/p/700719289

image-20250201084054264

方案2:Netty 為例,它的拒絕策略則是直接創建一個線程池以外的線程處理這些任務,為了保證任務的實時處理,這種做法可能需要良好的硬件設備且臨時創建的線程無法做到準確的監控。

  • 后續通過翻閱源碼發現一種在拒絕策略場景帶退避的重試策略

方案3:ActiveMQ 則是嘗試在指定的時效內盡可能的爭取將任務入隊,以保證最大交付

**方案4:**dubbo設計思路(dump文件+拋出異常)

方案5:線程阻塞隊列

思路:隊列采用阻塞隊列,在拒絕策略方法中使用put方法實現阻塞效果。

可能情況:阻塞主線程任務執行。


設計1:數據庫持久化方案

設計思路:自定義拒絕策略,在拒絕策略情況下進行數據庫持久化;自定義實現隊列,在poll的時候優先從db獲取任務,接著再從隊列中獲取。

**詳細具體實現可見:**某大廠線程池拒絕策略連環問 https://blog.csdn.net/shark_chili3007/article/details/137042400


設計2:Netty兩種拒絕策略實現(根據場景來進行是否重試入隊 + 失敗拋異常)

實現思路1:創建新線程執行任務

說明:為了保證任務的實時處理,這種做法需要良好的硬件設備且臨時創建的線程無法做到準確的監控

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {NewThreadRunsPolicy() {super();}public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {//創建一個臨時線程處理任務final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}
}

弊端:如果任務數特別多無上限場景,就會出現oom情況,導致服務掛掉。

實現思路2:拒絕策略場景帶退避的重試策略

源碼地址:https://github.dev/netty/netty

  • 具體代碼文件:RejectedExecutionHandlers
/*** Tries to backoff when the task can not be added due restrictions for an configured amount of time. This* is only done if the task was added from outside of the event loop which means* {@link EventExecutor#inEventLoop()} returns {@code false}.*/
public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) {// 檢查 retries 參數是否為正數,如果不是則拋出異常ObjectUtil.checkPositive(retries, "retries");// 將退避時間轉換為納秒final long backOffNanos = unit.toNanos(backoffAmount);// 返回一個實現了 RejectedExecutionHandler 接口的匿名類return new RejectedExecutionHandler() {@Overridepublic void rejected(Runnable task, SingleThreadEventExecutor executor) {// 檢查當前線程是否不是事件循環線程if (!executor.inEventLoop()) {// 進行最多 retries 次重試for (int i = 0; i < retries; i++) {// 嘗試喚醒事件循環線程,以便它能夠處理任務隊列中的任務executor.wakeup(false);// 當前線程休眠指定的退避時間LockSupport.parkNanos(backOffNanos);// 嘗試將任務重新加入任務隊列if (executor.offerTask(task)) {// 如果任務成功加入隊列,則直接返回return;}}}// 如果當前線程是事件循環線程,或者重試次數用盡后仍然無法加入任務隊列,// 則拋出 RejectedExecutionException 異常throw new RejectedExecutionException();}};
}

設計3:ActiveMQ(有效時間內嘗試入隊+入隊失敗拋出異常)

說明:嘗試在指定的時效內盡可能的爭取將任務入隊,以保證最大交付,超過時間內則返回false。

github地址:https://github.dev/apache/activemq

  • 對應代碼:BrokerService#getExecutor
new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {try {// 在60s內進行嘗試入隊,如果入隊失敗,則拋出異常if (!executor.getQueue().offer(r, 60, TimeUnit.SECONDS)) {throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");}} catch (InterruptedException e) {throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");}}}

設計4:dubbo設計思路(dump文件+拋出異常)

github地址:

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!"+ " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d),"+ " Task: %d (completed: %d),"+ " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(),e.getCompletedTaskCount(),e.isShutdown(),e.isTerminated(),e.isTerminating(),url.getProtocol(),url.getIp(),url.getPort());// 0-1 - Thread pool is EXHAUSTED!logger.warn(COMMON_THREAD_POOL_EXHAUSTED, "too much client requesting provider", "", msg);if (Boolean.parseBoolean(url.getParameter(DUMP_ENABLE, Boolean.TRUE.toString()))) {// 進行dump文件dumpJStack();}// 指派發送消息給listener監聽器dispatchThreadPoolExhaustedEvent(msg);throw new RejectedExecutionException(msg);
}

dubbo的工作線程觸發了線程拒絕后,主要做了三個事情,原則就是盡量讓使用者清楚觸發線程拒絕策略的真實原因

1)輸出了一條警告級別的日志,日志內容為線程池的詳細設置參數,以及線程池當前的狀態,還有當前拒絕任務的一些詳細信息。可以說,這條日志,使用dubbo的有過生產運維經驗的或多或少是見過的,這個日志簡直就是日志打印的典范,其他的日志打印的典范還有spring。得益于這么詳細的日志,可以很容易定位到問題所在

2)輸出當前線程堆棧詳情,這個太有用了,當你通過上面的日志信息還不能定位問題時,案發現場的dump線程上下文信息就是你發現問題的救命稻草。

3)繼續拋出拒絕執行異常,使本次任務失敗,這個繼承了JDK默認拒絕策略的特性


設計5: 自定義設計-阻塞入隊

在線程池初始化的時候自定義拒絕策略:阻塞入隊操作,阻塞方為調用方執行submitjob的線程

new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", "IOIntensiveThreadPool", e, r);if (!e.isShutdown()) {try {// 阻塞入隊操作,阻塞方為調用方執行submitjob的線程e.getQueue().put(r);} catch (InterruptedException ex) {log.error("reject put queue error", ex);}}}
}

如果要執行的任務數量過多,核心線程數、最大核心線程數占滿、任務隊列占滿,此時讓任務進行入隊阻塞,等待隊列中任務有空余位置。


參考文章

[1]. Java 線程池講解——針對 IO 密集型任務:https://www.jianshu.com/p/66b6dfcf3173(提出dubbo 或者 tomcat 的線程池中自定義Queue的實現,核心線程數 -> 最大線程數 -> 隊列中)

[2]. 某大廠線程池拒絕策略連環問 https://blog.csdn.net/shark_chili3007/article/details/137042400

[3]. 線程池拒絕策略:https://blog.csdn.net/qq_40428665/article/details/121680262

[4]. Java線程池如何合理配置核心線程數:https://www.cnblogs.com/Vincent-yuan/p/16022613.html

[5]. 線程池參數配置:https://blog.csdn.net/whp404/article/details/131960756(計算公式)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/894587.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/894587.shtml
英文地址,請注明出處:http://en.pswp.cn/news/894587.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【VM】VirtualBox安裝ubuntu22.04虛擬機

閱讀本文之前&#xff0c;請先根據 安裝virtualbox 教程安裝virtulbox虛擬機軟件。 1.下載Ubuntu系統鏡像 打開阿里云的鏡像站點&#xff1a;https://developer.aliyun.com/mirror/ 找到如圖所示位置&#xff0c;選擇Ubuntu 22.04.3(destop-amd64)系統 Ubuntu 22.04.3(desto…

Pandas基礎08(分箱操作/時間序列/畫圖)

3.8.1 Pandas分箱操作 數據分箱&#xff08;Binning&#xff09; 是一種數據預處理方法&#xff0c;用于將連續型變量的數值范圍分割成若干個區間或“箱”&#xff08;bins&#xff09;&#xff0c;將數據按照這些區間進行分類&#xff0c;從而轉換為離散型變量。這種方法常用…

C#,shell32 + 調用控制面板項(.Cpl)實現“新建快捷方式對話框”(全網首發)

Made By 于子軒&#xff0c;2025.2.2 不管是使用System.IO命名空間下的File類來創建快捷方式文件&#xff0c;或是使用Windows Script Host對象創建快捷方式&#xff0c;亦或是使用Shell32對象創建快捷方式&#xff0c;都對用戶很不友好&#xff0c;今天小編為大家帶來一種全新…

國產編輯器EverEdit - 輸出窗口

1 輸出窗口 1.1 應用場景 輸出窗口可以顯示用戶執行某些操作的結果&#xff0c;主要包括&#xff1a; 查找類&#xff1a;查找全部&#xff0c;篩選等待操作&#xff0c;可以把查找結果打印到輸出窗口中&#xff1b; 程序類&#xff1a;在執行外部程序時(如&#xff1a;命令窗…

Vue-data數據

目錄 一、Vue中的data數據是什么&#xff1f;二、data支持的數據類型有哪些&#xff1f; 一、Vue中的data數據是什么&#xff1f; Vue中用到的數據定義在data中。 二、data支持的數據類型有哪些&#xff1f; data中可以寫復雜類型的數據&#xff0c;渲染復雜類型數據時只要遵…

02.03 遞歸運算

使用遞歸求出 1 1/3 -1/5 1/7 - 1/9 ... 1/n的值。 1>程序代碼 #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #inc…

數據分析系列--⑥RapidMiner構建決策樹(泰坦尼克號案例含數據)

一、資源下載 二、數據處理 1.導入數據 2.數據預處理 三、構建模型 1.構建決策樹 2.劃分訓練集和測試集 3.應用模型 4.結果分析 一、資源下載 點擊下載數據集 二、數據處理 1.導入數據 2.數據預處理 三、構建模型 1.構建決策樹 雖然決策樹已經構建,但對于大多數初學者或…

高階開發基礎——快速入門C++并發編程6——大作業:實現一個超級迷你的線程池

目錄 實現一個無返回的線程池 完全代碼實現 Reference 實現一個無返回的線程池 實現一個簡單的線程池非常簡單&#xff0c;我們首先聊一聊線程池的定義&#xff1a; 線程池&#xff08;Thread Pool&#xff09; 是一種并發編程的設計模式&#xff0c;用于管理和復用多個線程…

pytorch實現主成分分析 (PCA):用于數據降維和特征提取

人工智能例子匯總&#xff1a;AI常見的算法和例子-CSDN博客 使用 PyTorch 實現主成分分析&#xff08;PCA&#xff09;可以通過以下步驟進行&#xff1a; 標準化數據&#xff1a;首先&#xff0c;需要對數據進行標準化處理&#xff0c;確保每個特征的均值為 0&#xff0c;方差…

100 ,【8】 buuctf web [藍帽杯 2021]One Pointer PHP(別看)

進入靶場 沒提示&#xff0c;去看源代碼。 user.php <?php // 定義一個名為 User 的類&#xff0c;該類可用于表示用戶相關信息或執行與用戶有關的操作 class User{// 聲明一個公共屬性 $count&#xff0c;可在類的內部和外部直接訪問// 這個屬性可能用于記錄與用戶相關…

巧妙利用數據結構優化部門查詢

目錄 一、出現的問題 部門樹接口超時 二、問題分析 源代碼分析 三、解決方案 具體實現思路 四、優化的效果 一、出現的問題 部門樹接口超時 無論是在A項目還是在B項目中&#xff0c;都存在類似的頁面&#xff0c;其實就是一個部門列表或者叫組織列表。 從頁面的展示形式…

QT簡單實現驗證碼(字符)

0&#xff09; 運行結果 1&#xff09; 生成隨機字符串 Qt主要通過QRandomGenerator類來生成隨機數。在此之前的版本中&#xff0c;qrand()函數也常被使用&#xff0c;但從Qt 5.10起&#xff0c;推薦使用更現代化的QRandomGenerator類。 在頭文件添加void generateRandomNumb…

JavaFX - 3D 形狀

在前面的章節中&#xff0c;我們已經了解了如何在 JavaFX 應用程序中的 XY 平面上繪制 2D 形狀。除了這些 2D 形狀之外&#xff0c;我們還可以使用 JavaFX 繪制其他幾個 3D 形狀。 通常&#xff0c;3D 形狀是可以在 XYZ 平面上繪制的幾何圖形。它們由兩個或多個維度定義&#…

深入理解開放尋址法中的三種探測序列

一、引言 開放尋址法是解決散列表中沖突的一種重要方法&#xff0c;當發生沖突&#xff08;即兩個不同的鍵通過散列函數計算得到相同的散列值&#xff09;時&#xff0c;它會在散列表中尋找下一個可用的存儲位置。而探測序列就是用于確定在發生沖突后&#xff0c;依次嘗試哪些…

【雙指針題目】

雙指針 美麗區間&#xff08;滑動窗口&#xff09;合并數列&#xff08;雙指針的應用&#xff09;等腰三角形全部所有的子序列 美麗區間&#xff08;滑動窗口&#xff09; 美麗區間 滑動窗口模板&#xff1a; int left 0, right 0;while (right < nums.size()) {// 增大…

為什么命令“echo -e “\033[9;0]“ > /dev/tty0“能控制開發板上的LCD不熄屏?

為什么命令"echo -e “\033[9;0]” > /dev/tty0"能控制開發板上的LCD不熄屏&#xff1f; 在回答這個問題前請先閱讀我之前寫的與tty和終端有關的博文 https://blog.csdn.net/wenhao_ir/article/details/145431655 然后再來看這條命令的解釋就要容易些了。 這條…

嵌入式八股文面試題(一)C語言部分

1. 變量/函數的聲明和定義的區別&#xff1f; &#xff08;1&#xff09;變量 定義不僅告知編譯器變量的類型和名字&#xff0c;還會分配內存空間。 int x 10; // 定義并初始化x int x; //同樣是定義 聲明只是告訴編譯器變量的名字和類型&#xff0c;但并不為它分配內存空間…

go-zero學習筆記(三)

利用goctl生成rpc服務 編寫proto文件 // 聲明 proto 使用的語法版本 syntax "proto3";// proto 包名 package demoRpc;// golang 包名(可選) option go_package "./demo";// 如需為 .proto 文件添加注釋&#xff0c;請使用 C/C 樣式的 // 和 /* ... */…

Javascript代碼庫-jQuery入門

摘自千鋒教育kerwin的js教程 jQuery 是一個前端庫&#xff0c;也是一個方法庫他里面封裝著一些列的方法供我們使用我們常用的一些方法它里面都有&#xff0c;我們可以直接拿來使用就行了jQuery 之所以好用&#xff0c;很多人愿意使用&#xff0c;是因為他的幾個優點太強大了 優…

【25考研】南開軟件考研復試復習重點!

一、復試內容 復試采取現場復試的方式。復試分為筆試、機試和面試三部分。三部分合計100分&#xff0c;其中筆試成績占30%、機試成績占30%、面試成績占40%。 1.筆試&#xff1a;專業綜合基礎測試 考核方式&#xff1a;閉卷考試&#xff0c;時長為90分鐘。 筆試考查內容范圍…