線程池核心概述
- Executors工廠類使用?? ??? ??? ??? ??? ??? ?
- Executors工廠類底層源碼分析詳解?? ??? ??? ??? ?
- ThreadPoolExecutor自定義線程池?? ??? ??? ??? ?
- ThreadPoolExecutor拒絕策略詳解?? ??? ??? ??? ?
- 計算機密集型與IO密集型詳解?? ??? ??? ?
- 如何正確的使用線程池
線程池初步
- 線程池,一般高并發其實是一個非常抽象的概念,要實現高并發其實不僅僅是一個JAVA 線程集合類、或者JAVA基礎層面就能搞定的事情,在互聯網大廠中,高并發其實涉及方方面面,從前端到后端,到支持高并發的中間組件(redis、zookeper等),最后到數據存儲,持久化層面等等,都需要對高并發做一些考量和設計
- 管理控制:首先,從管理角度就是為了更好的控制線程,使用線程池來幫助我們去管理線程,使得我們對線程的生命周期、初始化、運行狀態、銷毀等各個環節有一個把控
- 系統資源:另外一點,從系統資源的角度考慮,線程池可以控制線程的數量,根據任務的多少去對線程池中的線程個數進行添加或者減少,可以回收空閑狀態的線程,減少線程的頻繁初始化和銷毀,避免不必要的系統開銷,節省系統資源,保障穩定性
- 應用性能:從性能的角度去考慮,線程池可以配合高并發容器的設置,對任務和工作項進行緩存,異步的多線程的去處理任務,從而提高應用服務的吞吐率、消費性能,也從而提高單個線程的利用率
- 兜底策略:從健壯性的角度去分析,線程池提供了很多拒絕策略,我們在任務過多或者處理不過來的時候,可以進行有效的拒絕策略、降級方案,以補償的形式進行處理任務,避免因為線程池的問題對系統產生較為嚴重的影響
Executors
- JDK提供了一套線程框架Executors,存儲在java.util.concurrent包中,是JDK并發包的核心
- Executors:線程工廠的角色,通過Executors可以創建特定功能的線程池
Executors創建線程池的方法
- newFixedThreadPool()方法:該方法返回一個固定數量的線程池,該方法的線程數始終不變,當有一個任務提交時,若線程池中空閑,則立即執行,若沒有,則會被暫緩在一個任務隊列中等待有空閑的線程去執行
- newSingleThreadPool ()方法:創建一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務列隊中
- newCachedThreadPool()方法:返回一個可根據實際情況調整線程個數的線程池,不限制最大線程數量,若有任務,則創建線程,若無任務則不創建線程。如果沒有任務則線程在60s后自動回收(空閑時間60s)
- newScheduledThreadPool()方法:該方法返回一個SchededExecutorService對象,但該線程池可以指定線程的數量
自定義線程池ThreadPoolExecutor
- 自定義線程池:若Executors工廠無法滿足我們的需求,可以自己創建自定義線程池,其實Executors工廠類里面的創建線程方法其內部實現均是用了ThreadPoolExecutor這個類,這個類可以自定義線程。構造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
- 使用有界隊列:在使用有界隊列時,若有新的任務需要執行,如果線程池實際線程數小于corePoolSize,則優先創建線程,若大于corePoolSize,則會將任務加入隊列,若隊列已滿,則在總線程數不大于maximumPoolSize的前提下,創建新的線程,若線程數大于maximumPoolSize,則執行拒絕策略。或其他自定義方式
- 使用無界隊列:在使用無界隊列時:LinkedBlockingQueue。與有界隊列相比,除非系統資源耗盡,否則無界的任務隊列不存在任務入隊失敗的情況。當有新任務到來,系統的線程數小于corePoolSize時,則新建線程執行任務。當達到corePoolSize后,就不會繼續增加。若后續仍有新的任務加入,而有沒有空閑的線程資源,則任務直接進入隊列等待。若任務創建和處理的速度差異很大,無界隊列會保持快速增長,直到耗盡系統內存
線程池的拒絕策略
- AbortPolicy:直接拋出異常阻止系統正常工作
- CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務
- DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務
- DiscardPolicy:丟棄無法處理的任務,不給予任何處理
- 如果需要自定義拒絕策略可以實現RejectedExecutionHandler接口
//RejectedExecutionHandler接口 public class MyRejected implements RejectedExecutionHandler{public MyRejected(){}@Overridepublic void rejectExecution(Runnable r,ThreadPoolExecutor executor){} }
如何使用好線程池
- 線程個數大小的設置
- 線程池相關參數配置
- 利用Hook嵌入你的行為
- 線程池的關閉
線程池大小設置(計算密集型/IO密集型)
- 計算密集型: 顧名思義就是應用需要非常多的CPU計算資源,在多核CPU時代,我們要讓每一個CPU核心都參與計算,將CPU的性能充分利用起來,這樣才算是沒有浪費服務器配置,如果在非常好的服務器配置上還運行著單線程程序那將是多么重大的浪費。對于計算密集型的應用,完全是靠CPU的核數來工作,所以為了讓它的優勢完全發揮出來,避免過多的線程上下文切換。比較理想方案是: 線程數 = CPU核數+1,也可以設置成CPU核數*2,但還要看JDK的版本以及CPU配置(服務器的CPU有超線程)
- IO密集型: 就很好理解了,我們現在做的開發大部分都是WEB應用,涉及到大量的網絡傳輸,不僅如此,與數據庫,與緩存間的交互也涉及到IO,一旦發生IO,線程就會處于等待狀態,當IO結束,數據準備好后,線程才會繼續執行。因此從這里可以發現,對于IO密集型的應用,我們可以多設置一些線程池中線程的數量,這樣就能讓在等待IO的這段時間內,線程可以去做其它事,提高并發處理效率。那么這個線程池的數據量是不是可以隨便設置呢?當然不是的,請一定要記得,線程上下文切換是有代價的。目前總結了一套公式,對于IO密集型應用: 線程數 = CPU核心數/(1-阻塞系數) 這個阻塞系數一般為0.8~0.9之間,也可以取0.8或者0.9。 套用公式,對于雙核CPU來說,它比較理想的線程數就是20,當然這都不是絕對的,需要根據實際情況以及實際業務來調整:final int poolSize = (int)(cpuCore/(1-0.9))
線程池相關參數配置注意事項
- 避免線上操作數據庫,查詢、修改都很麻煩
- 使用線程池的時候都不要選擇沒有上限限制的配置項
- 不要使用沒有上限的線程池和設置無界隊列
- newCachedThreadPool的設置與無界隊列的設置因為某些不可預期的情況,線程池會出現系統異常,導致線程暴增的情況或者任務隊列不斷膨脹,內存耗盡導致系統崩潰和異常。 我們推薦使用自定義線程池來避免該問題,這也是在使用線程池規范的首要原則
- 合理設置線程數量、和線程空閑回收時間,根據具體的任務執行周期和時間去設定,避免頻繁的回收和創建,雖然我們使用線程池的目的是為了提升系統性能和吞吐量,但是也要考慮下系統的穩定性,不然出現不可預期問題會很麻煩
- 根據實際場景,選擇適用于自己的拒絕策略。進行補償,不要亂用JDK支持的自動補償機制!盡量采用自定義的拒絕策略去進行兜底
利用Hook嵌入你的行為
- 利用Hook,留下線程池執行軌跡
- 例如ThreadPoolExecutor提供了protected類型可以被覆蓋的鉤子方法,允許用戶在任務執行之前會執行之后做一些事情。我們可以通過它來實現比如初始化ThreadLocal、收集統計信息、如記錄日志等操作。這類Hook如beforeExecute和afterExecute。另外還有一個Hook可以用來在任務被執行完的時候讓用戶插入邏輯,如rerminated
- 如果hook方法執行失敗,則內部的工作線程的執行將會失敗或被中斷
???????關閉線程池
- 內容當線程池不在被引用并且工作線程數為0的時候,線程池將被終止。我們也可以調用shutdown來手動終止線程池。如果我們忘記調用shutdown,為了讓線程資源被釋放,我們還可以使用keepAliveTime和allowCoreThreadTimeOut來達到目的
- 當然,穩妥的方式是使用虛擬機Runtime.getRuntime().addShutdownHook方法,手工去調用線程池的關閉方法
相關代碼
package com.bfxy.thread.core.pool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class UseThreadPoolExecutor {public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(1, // corePoolSize: 核心線程數,線程池初始化的時候就會被創建3, // maximumPoolSize: 線程池的最大上限 //在使用無界隊列的時候, 此參數 不起作用60, //線程的存活時間TimeUnit.SECONDS,//workQueue:BlockingQueue接口下面的實現類//new ArrayBlockingQueue<>(2), //使用有界隊列: ArrayBlockingQueuenew LinkedBlockingQueue<>(), //使用無界隊列: LinkedBlockingQueuenew ThreadFactory() { //threadFactory 線程工廠, 用于獲取一個新的線程, 然后把該線程 投遞到我們的線程池中去@Overridepublic Thread newThread(Runnable r) {Thread th = new Thread(r, "order-thread");if(th.getPriority() != Thread.NORM_PRIORITY) {th.setPriority(Thread.NORM_PRIORITY);}if(th.isDaemon()) {th.setDaemon(false);}return th;}}, //使用無界隊列時, 拒絕策略不起到作用new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.err.println("當前的任務已經被拒絕: " + r.toString());}});Task t1 = new Task(1);Task t2 = new Task(2);Task t3 = new Task(3);Task t4 = new Task(4);Task t5 = new Task(5);Task t6 = new Task(6);/**//線程池提交任務的方法:pool.execute(t1); //execute: 如果你的任務沒有返回值, 則使用該方法提交任務pool.submit(t1); //submit: 如果你的任務有返回值, 則使用該方法提交任務, 返回一個Future對象(Future模式)*//*** * 在使用有界隊列時:* 1 若有新的任務需要執行,如果線程池實際線程數小于corePoolSize,則優先創建線程* 2 若大于corePoolSize,則會將任務加入隊列* 3 若隊列已滿,則在總線程數不大于maximumPoolSize的前提下,創建新的線程* 4 若線程數大于maximumPoolSize,則執行拒絕策略。*/// 1 若有新的任務需要執行,如果線程池實際線程數小于corePoolSize,則優先創建線程pool.execute(t1); //core size = 1 t1任務會被核心線程執行// 2 若大于corePoolSize,則會將任務加入隊列pool.execute(t2); // 有界隊列容量為: 2pool.execute(t3);// 3 若隊列已滿,則在總線程數不大于maximumPoolSize的前提下,創建新的線程, 并執行該任務pool.execute(t4); // 線程池中的總線程數 2 , maximumPoolSize = 3 pool.execute(t5); // 線程池中的總線程數 3 , maximumPoolSize = 3 // 4 若線程數大于maximumPoolSize,則執行拒絕策略。pool.execute(t6);pool.shutdown();}
}
?