一、ThreadPoolExcutors的作用
java提供了ThreadPoolExcutors來創建一個線程池,我們為什么要用線程池呢?
1.降低資源的消耗:通過重復利用已經創建好的線程降低線程的創建和銷毀帶來的損耗
2.提高響應速度:因為線程池中的線程處于等待分配任務的狀態,當任務來時無需創建新的線程就能執行
3.提高線程的可管理性
?
?
?
?
?
?
二、ThreadPoolExecutor構造函數參數詳細介紹
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
1.int corePoolSize(核心線程數)
線程池新建線程的時候,如果當前線程總數小于corePoolSize,則新建的是核心線程,如果超過corePoolSize,則新建的是非核心線程;核心線程默認情況下會一直存活在線程池中,即使這個核心線程啥也不干(閑置狀態);如果設置了 allowCoreThreadTimeOut 為 true,那么核心線程如果不干活(閑置狀態)的話,超過一定時間(時長下面參數決定),就會被銷毀掉。
2.int maximumPoolSize(線程池能容納的最大線程數量)
線程總數 = 核心線程數 + 非核心線程數。
3.long keepAliveTime(非核心線程空閑存活時長)
非核心線程空閑時長超過該時長將會被回收,主要應用在緩存線程池中,當設置了 allowCoreThreadTimeOut 為 true 時,對核心線程同樣起作用。
4.TimeUnit unit?
空閑線程的存活時間(keepAliveTime 的單位)它是一個枚舉類型,常用的如:TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)。
5.BlockingQueue workQueue(任務隊列)
當所有的核心線程都在干活時,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了,則新建非核心線程執行任務
常用的workQueue類型:
????(1)SynchornousQueue:這隊列接收到任務時會直接交給線程處理,而不保留它,如果所有的線程都在工作,那就創建一???個新的線程來處理這個任務,為了保證不出現線程數達到maxnumPoolSize而不能新建線程的錯誤,所以使用這個類型 的隊列時,maxnumPoolSize一般指定成Integer.MAX_VALUE,即無限大,然后核心線程數corePoolSize一般是0.
????(2)LinkedBlockingQueue:這個隊列接收到任務的時候,如果當前線程數小于核心線程數,則新建線程(核心線程)處理任務;如果當前線程數等于核心線程數,則進入隊列等待。由于這個隊列沒有最大值限制,即所有超過核心線程數的任務都將被添加到隊列中,這也就導致了 maximumPoolSize 的設定失效,因為總線程數永遠不會超過 corePoolSize。
????(3)ArrayBlockingQueue:可以限定隊列的長度,接收到任務的時候,如果沒有達到 corePoolSize 的值,則新建線程(核心線程)執行任務,如果達到了,則入隊等候,如果隊列已滿,則新建線程(非核心線程)執行任務,又如果總線程數到了 maximumPoolSize,并且隊列也滿了,則發生錯誤。
????(4)DelayQueue:隊列內元素必須實現 Delayed 接口,這就意味著你傳進去的任務必須先實現 Delayed 接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務。
6.ThreadFactory threadFactory(線程工廠):用來創建線程池中的線程,通常用默認的即可
7.RejectedExecutionHandler handler(拒絕策略):在線程池已經關閉的情況下和任務太多導致最大線程數和任務隊列已經飽和,無法再接收新的任務,在上面兩種情況下,只要滿足其中一種時,在使用 execute() 來提交新的任務時將會拒絕,線程池提供了以下 4 種策略:
AbortPolicy:默認策略,在拒絕任務時,會拋出RejectedExecutionException。
CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。
DiscardOldestPolicy:該策略將丟棄最老的一個請求,也就是即將被執行的任務,并嘗試再次提交當前任務。
DiscardPolicy:該策略默默的丟棄無法處理的任務,不予任何處理。
線程池遵循的原則:
其會優先創建核心線程,執行任務,當核心線程增加CorePoolSize后,我們會把任務添加到work Queue中,當work Queue里面的任務也塞滿了,線程池就會創建非核心線程執行去執行任務,當線程達到maximumPoolSize時候和work queue也達最大值時候我們會執行對應的拒絕策略
?
?
?
三、4類常見的線程池
newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool 創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
其實他們都是通過ThreadPoolExcutors創建的
1.CachedThreadPool
由Executors的newCachedThreadPool方法創建,不存在核心線程,只存在數量不定的非核心線程,而且其數量最大值為Integer.MAX_VALUE。當線程池中的線程都處于活動時(全滿),線程池會創建新的線程來處理新的任務,否則就會利用新的線程來處理新的任務,線程池中的空閑線程都有超時機制,默認超時時長為60s,超過60s的空閑線程就會被回收。和FixedThreadPool不同的是,CachedThreadPool的任務隊列其實相當于一個空的集合,這將導致任何任務都會被執行,因為在這種場景下SynchronousQueue是不能插入任務的,SynchronousQueue是一個特殊的隊列,在很多情況下可以理解為一個無法儲存元素的隊列。從CachedThreadPool的特性看,這類線程比較適合執行大量耗時較小的任務。當整個線程池都處于閑置狀態時,線程池中的線程都會因為超時而被停止回收,幾乎是不占任何系統資源。實現方式如下:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
}
2.FixedThreadPool
由Executors的newFixedThreadPool方法創建。它是一種線程數量固定的線程池,當線程處于空閑狀態時,他們并不會被回收,除非線程池被關閉。當所有的線程都處于活動狀態時,新的任務都會處于等待狀態,直到有線程空閑出來。FixedThreadPool只有核心線程,且該核心線程都不會被回收,這意味著它可以更快地響應外界的請求。jdk實現如下:FixedThreadPool沒有額外線程,只存在核心線程,而且核心線程沒有超時機制,而且任務隊列沒有長度的限制。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);
}
3.ScheduledThreadPool
通過Executors的newScheduledThreadPool方式創建,核心線程數量是固定的,而非核心線程是沒有限制的,并且當非核心線程閑置時它會被立即回收,ScheduledThreadPool這類線程池主要用于執行定時任務和具有固定時期的重復任務,實現方法如下:
?
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue(), threadFactory);
}
4.SingleThreadPool
通過Executors的newSingleThreadExecutor方法來創建。這類線程池內部只有一個核心線程,它確保所有的任務都在同一個線程中按順序執行。SingleThreadExecutor的意義在于統一所有外界任務一個線程中,這使得這些任務之間不需要處理線程同步的問題,實現方式如下:
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));
}
?
?
?
?
四、為什么《阿里巴巴Java開發手冊》上要禁止使用Executors來創建線程池
?Executors創建出來的線程池使用的全都是無界隊列,而使用無界隊列會帶來很多弊端,最重要的就是,它可以無限保存任務,因此很有可能造成OOM異常。同時在某些類型的線程池里面,使用無界隊列還會導致maxinumPoolSize、keepAliveTime、handler等參數失效?
?
?
?
五、我們用代碼測試下
import java.util.*;
import java.lang.*;
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;class Rextester
{ private static int produceTaskSleepTime = 5;private static int consumeTaskSleepTime = 1000;private static int taskMaxNumber = 10; //定義最大添加10個線程到線程池中public static void main(String args[]) {//構造一個線程池//該策略默默的丟棄無法處理的任務,不予任何處理。ThreadPoolExecutor threadPool = getTpe("DiscardPolicy");//該策略將丟棄最老的一個請求,也就是即將被執行的任務,并嘗試再次提交當前任務。//ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");for (int i = 1; i <= taskMaxNumber; i++) {try {//添加任務到線程池,我們要知道添加的任務是RunnableString value = "value is: " + i;System.out.println("put:" + value);threadPool.execute(new ThreadPoolTask(value));//線程休息一段時間,方便我們分析結果Thread. sleep(produceTaskSleepTime);} catch (Exception e) {e.printStackTrace();}}}/*** 線程池執行的任務*/public static class ThreadPoolTask implements Runnable {//保存任務所需要的數據private String value;public ThreadPoolTask(String value) {this.value = value;}public void run() {//打印語句System. out.println("start------" + value);try {Thread. sleep(consumeTaskSleepTime);} catch (Exception e) {e.printStackTrace();}value = "";}}/***初始化線程池 */public static ThreadPoolExecutor getTpe(String type) {if ("DiscardOldestPolicy".equals(type)) {return new ThreadPoolExecutor(2, 4, 3,TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy()); } else {return new ThreadPoolExecutor(2, 4, 3,TimeUnit. SECONDS, new ArrayBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardPolicy()); }//DiscardPolicy DiscardOldestPolicy}
}
運行結果:
put:value is: 1
start------value is: 1
put:value is: 2
start------value is: 2
put:value is: 3
put:value is: 4
put:value is: 5
put:value is: 6
start------value is: 6
put:value is: 7
start------value is: 7
put:value is: 8
put:value is: 9
put:value is: 10
start------value is: 3
start------value is: 4
start------value is: 5
我們把上面的生成線程池的代碼不//
ThreadPoolExecutor threadPool = getTpe("DiscardOldestPolicy");
運行結果如下
put:value is: 1
start------value is: 1
put:value is: 2
start------value is: 2
put:value is: 3
put:value is: 4
put:value is: 5
put:value is: 6
start------value is: 6
put:value is: 7
start------value is: 7
put:value is: 8
put:value is: 9
put:value is: 10
start------value is: 8
start------value is: 9
start------value is: 10
?
部分內容參考了博客:https://blog.csdn.net/qq_34835058/article/details/80497602