目錄
引入
標準庫中的線程池 -- ThreadPoolExecutor
研究一下這個方法的幾個參數
1. int corePoolSize
2. int maximumPoolSize?
3. long keepAliveTime
4. TimeUnit unit
6. ThreadFactory threadFactory?
7. RejectedExecutionHandler handler
四種拒絕策略
Executors 工廠類
如何確定線程池的數量???
實現一個簡單的線程池
完
引入
池,是一個非常重要的概念,我們有常量池,數據庫連接池,線程池,進程池,內存池....
池的作用:
? ? ? ? 1. 提前把要用的對象準備好。
? ? ? ? 2. 把用完的對象也不立即釋放,先留著以備下次使用 ==》 提高效率!!!
舉個栗子: 我是個美女,談了一個男朋友。如果我有一天對這個男人厭倦了,如何才能提高更換男朋友的效率呢? ==》 備胎池...
最開始,進程能夠解決并發編程的問題,之后因為頻繁的創建和銷毀進程,成本太高了,所以我們引入了輕量級的進程 -->?線程,但如果創建和銷毀線程的頻率進一步提高,此時線程的創建和銷毀的開銷,也不能夠無視了。(拋開劑量談毒性,都是耍流氓)我們就需要想辦法來優化此處的線程的創建銷毀的效率。
? ? ? ? 1. 引入輕量級 線程 --> 纖程 / 協程(Java 21 里引入 “虛擬線程” 就是這個東西)。協程本質,是程序員在用戶態的代碼中進行調度,不是靠內核的調度器調度 ==》節省了很多調度上的開銷。(我們在此處不做深入研究...)
? ? ? ? 2.?線程池?把要使用的線程提前創建好了,用完了也不直接釋放而是以備下次使用。這樣就節省了創建 / 銷毀 線程的開銷。在這個過程中,并沒有真的頻繁創建銷毀線程,只是從線程池里面,取線程使用,用完了就還給線程池。(在這個過程中,會占用比較多的空間,這個代價是無法避免的,可以接收)
那為什么,從線程池里面取線程,就比從系統申請更加高效呢???
舉個栗子來說明:還是銀行的例子,柜臺里面,相當于內核態,大堂相當于用戶態。
當我們要辦理業務,需要一個身份證復印件的時候,我們并沒有帶,這時候,柜員就說我們有兩個途徑取解決:
? ? ? ? 1. 自己拿著身份證,去自助復印機上復印即可(純用戶態代碼)
? ? ? ? 2. 把身份證交給柜員,他拿著身份證幫你去復印。(柜員拿到我們的身份證之后,就消失在我們的視野中了,此時我們無法知道他要花費多長時間,也不知道他都要做那些事情,我們唯一能做的,就是等,等他回來 ~~)
基本的結論:
如果一個工作,滑稽老鐵自己就能完成,就更加可控,更加高效。 從線程池里面取線程,就是純用戶代碼,就是可控的。
如果一個工作,滑稽老鐵要拜托銀行的柜員來完成,就不可控,更低效。通過系統申請創建線程,就是需要內核來完成的,不太可控。
標準庫中的線程池 -- ThreadPoolExecutor
我們可以在 Java 官方文檔中,找到 java.util.concurrent 包,在下面的 Classes 中就可以找到 ThreadPoolExecutor,往下翻可以找到構造方法,有 4 種
我們只需要關注最后一個即可(最后一個的參數是最全的)
研究一下這個方法的幾個參數
1. int corePoolSize
表示的是?核心線程數(一個線程池里面,最少得有多少個線程)
2. int maximumPoolSize?
表示的是?最大線程數(一個線程池里面,最多最多能有多少個線程)
注意: 標準庫提供的線程池,持有的線程個數,并非是一成不變的,會根據當前任務量,自適應線程的個數。(任務非常多,就多搞幾個線程;任務比較少,就少搞幾個線程)
舉個栗子:假設一個公司,里面有 10 個員工(正式簽勞動合同的員工)。當公司的業務非常繁忙的時候,10 個人干不過來了,就需要招聘,一個成本比較低的做法是,招聘實習生(非正式員工),比如可以再招聘 5 個實習生(廉價勞動力)
過了一段時間,公司沒那么忙了,大家都閑下來開始摸魚了,10 個正式員工,是不能被隨便裁員的(勞動仲裁~~~)但是這 5 個實習生,是可以隨便裁的。把這 5 個實習生裁掉,使當前這 10 個正式員工也沒有那么空閑了,整體的成本就降低了。
如果過了一段時間,公司業務又多了起來,10 個人又忙不過來了,此時重新再招幾個實習生就好啦~~~
通過實習生,來應對突發的峰值!!!
10 個正式員工,就是核心線程數(參數叫 核心線程數, 而不是 最小線程數)
10 + 5 正式員工 + 實習生 就是最大線程數了
3. long keepAliveTime
表示的是?保持存活時間
4. TimeUnit unit
表示的是?時間單位(s,min,ms,hour)
再用我們的栗子解釋:keepAliveTime 就是實習生線程,允許最大的空閑摸魚時間。即,如果發現某個實習生正在摸魚(這個線程空閑了),此時要立即馬上把這個實習生開除掉嗎???不應該!!!可能發生,這邊一空閑馬上就開除,結果下一時刻,任務又突然多起來了!!!
此處的 keepAliveTime 意思就是實習生線程,空閑時間超過了這個時間閾值,就會被銷毀掉。注意:實習生線程被銷毀之后,就沒有了,在未來的某一天,線程還會重新招聘實習生,但不是之前的那個了
5. BolckingQueue<Runnable> workQueue
這個和定時器是類似的阻塞隊列,用來存放等待執行的任務。當核心線程都在忙碌的時候,新任務會被放入這個隊列中排隊等待。用 Runnable 來作為描述任務的主體。 ==》 也可以設置 PriorityBlockingQueue 帶有優先級
6. ThreadFactory threadFactory?
這個表示線程工廠。
工廠模式,也是一種常見的設計模式。通過專門的 “工廠類 / 工廠對象”來創建指定的對象~~
工廠模式,本質上是給 Java 語法填坑的?
舉個栗子:
我們會發現,上面的代碼,無法通過編譯~~
在 c++ / Java 中要想提供多個版本的構造方法,就需要讓這多個方法能夠構成重載
重載的要求:
1. 方法名字相同(構造方法的名字本身都相同)
2. 形參的 個數 / 類型 不同!
上面的代碼不符合第二個要求,所以無法通過編譯。為了解決上述問題,就引入了 “工廠模式”,使用普通的方法來創建對象,把構造方法封裝了一層。
如果把工廠方法放到一個其他的類里面,這個其他的類就叫做“工廠類”。
總的來說,通過靜態方法封裝 new 操作,無需實例化對象,在方法內部設定不同的屬性完成對象初始化,構造對象的過程,就是工廠模式。
回過頭來說我們的參數:ThreadFactory threadFactory 通過這個工廠類,來創建線程對象(Thread 對象)在這個類里面提供了方法(也不一定非得是靜態的)讓方法封裝 new Thread 的操作,并且給 Thread 設置一些屬性,就構成了 ThreadFactory 線程工廠!
7. RejectedExecutionHandler handler
上述參數中,這個是最重要的!!!
這個表示的是拒絕策略。在線程池中,有一個阻塞隊列,能夠容納的元素是有上限的。當任務隊列已經滿了的時候,如果繼續往隊列里面添加元素,那么線程池會怎么辦呢??? ==》 就是這個拒絕策略參數了!
在官方文檔中,構造方法的上面就是拒絕策略
四種拒絕策略
1.?繼續添加任務,直接拋出異常。此時就是“撂挑子”的狀態,新任務 舊任務 都不執行了!!!
2. 新的任務,由添加任務的線程負責執行,此時新的任務會執行,不過并不是線程池執行,而是調用者執行。同事讓我幫忙,我自己都忙的焦頭爛額,只能忙自己的,同事的忙還得他自己解決。
3.?丟棄最老的任務。將最老的任務舍棄一個,然后執行新的任務。
4.?丟棄最新的任務。直接拋棄新的任務,新的任務就無了,不執行了,調用的線程不會進程, 線程池也不會執行。
Executors 工廠類
ThreadPoolExecutor 本身使用起來還是比較復雜, 因此 Java 標準庫還提供了另一個版本,把 ThreadPoolExecutor 給封裝了一下~~
Executors 工廠類,通過這個類來創建出不同的線程池對象(在內部把 ThreadPoolExecutor 創建好了并且設置了不同的參數)
我們可以創建一個線程池用如下的方式
可以看到 Executors 這個工廠類中有許多不同的線程池
newSingleThreadExecutor() 是一個定時器類似物,也能延時執行任務
newScheduleThreadPool 是只包含單個線程的線程池
newCachedThreadPool 是線程數目能夠動態擴容是線程池
newFixedThreadPool() 是線程數目固定的線程池
示例代碼如下:
打印結果如下:
ThreadPoolExecutor 也是通過 submit 添加任務的,只是構造方法不同
什么時候使用 Executors 什么時候使用 ThreadPoolExecutor 呢???
當我們只是簡單使用一個線程池的時候,就可以使用 Executors
當我們需要一個高度定制化的線程池的時候,就可以使用 ThreadPoolExecutor
網上流傳的 阿里巴巴Java開發編程規范中,寫了不建議使用 Executors,一定要使用 ThreadPoolExecutor,用 ThreadPoolExecutor 意味著一切盡在掌握之中,不會出現一些不可控的因素~~ 我們可以參考,但還是要以具體的公司編程規范要求為準啦...
如何確定線程池的數量???
創建線程池的時候,很多時候,需要設定線程池的數量。這個數量應該怎么設置比較合適???我們上面只是隨意的設置了一個 4 ,到底怎么樣是合適的呢?
網上有很多說法,假如 CPU 的邏輯核心數是 N ,網上的說法:線程數量應該是 N,N + 1,1.5N,2N... ==》?都是錯誤的。
不同的程序,能夠設定的線程的數量是不同的,必須要具體問題具體分析。
要區分,一個線程是 CPU 密集型的任務,還是 IO 密集型的任務。
CPU 密集型的任務:這個線程大部分的時間,都在要 CPU 上運行,進行計算。 比如,在線程 run 里面計算 1 + 2 + ... + 10w 這種就是 CPU 密集型
IO 密集型的任務:這個線程大部分的時間都在等待 IO,不需要去?CPU 上運行,比如,線程 run 里,搞一個 scanner,讀取用戶的輸入,就是 IO 密集型
如果一個進程中,所有的線程都是 CPU 密集型的,每個線程所有的工作都是在 CPU 上執行的(假定的一種極端情況~~~)此時,線程的數目就不應該超過 N(CPU 邏輯核心數)
如果一個進程中,所有的線程都是 IO 密集型的,每個線程的大部分工作都是在等待 IO,CPU 消耗非常少,此時線程的數目就可以很多很多,遠遠超過 N(CPU 邏輯核心數)
上面的兩個場景,是兩種非常極端的情況,實際上,一個進程中的線程,一部分是 IO,一份是 CPU,這里的比例是不好確定的。
綜上,由于程序的復雜性,很難直接對線程的數量進行估計。更合適的做法應該是:通過實驗 / 測試的方式,找到合適的線程數目。==》 嘗試給線程池,設定不同的線程數目,分別進行性能測試,衡量每種線程數目下,總的時間開銷 和 系統資源占用的開銷,找到這兩者的合適值。
實現一個簡單的線程池
我們這里直接寫一個固定數目的線程池,暫時不考慮線程數目的增多和減少。
? ? ? ? 1. 提供構造方法,指定創建多少個線程。
? ? ? ? 2. 在構造方法中,把這些線程都創建好
? ? ? ? 3. 有一個阻塞隊列,能夠持有要執行的方法
? ? ? ? 4. 提供 subbmit 方法,可以添加新的任務。
代碼如下:
成員變量有 threadList,用來存儲管理線程;queue 用來保存任務的隊列,這里因為 ArrayBlockingQueue 是線程安全的,所以在下面的構造方法并沒有上鎖。在線程中,利用 n 來創建指定個線程。在 while(true) 循環中,線程持續運行,不斷從任務隊列中 take 任務并 run 執行。
submit 方法:將新的任務添加到隊列中
解釋:
測試代碼:
但是在測試代碼中,發現有一個小小的紅色波浪線,在 i 下面,為啥會編譯報錯呢?
==》?變量捕獲!!!
run 回調函數訪問當前外部作用域的變量就是變量捕獲,我們之前講過,變量捕獲的值,要不然是 final 修飾的常量值,要不然是一個“事實 final” 變量,但現在 i 是一直變化的,怎么辦呢?
在創建一個 n,把 i 賦值給 n,此處的 n 就是一個“事實 final” 變量,每次循環,都是一個新的 n,n 本身沒有改變,就可以被捕獲!!!
運行起來,注意,這些多個線程之間的執行順序的不確定的!!!某個線程獲取到了某個任務,但是并非立即執行,這個過程中可能其他線程就到前面執行了。(此處的這些線程,彼此之間都是等價的...)
完整代碼如下:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;class MyThreadPoolExecutor {// 用 List 數據結構來存儲線程private List<Thread> threadList = new ArrayList<>();// 是一個用來保存任務的隊列private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);// 通過 n 指定創建多少個線程public MyThreadPoolExecutor (int n) {for (int i = 0; i < n; i++) {Thread t = new Thread(() -> {// 線程要做的事情是 把任務隊列中的任務不停的取出來,并且進行執行while (true) {try {// 此處的 take 是帶有阻塞功能的// 如果隊列為空,此處的 take 會阻塞Runnable runnable = queue.take();// 取出一個任務執行一個任務runnable.run();} catch (InterruptedException e){e.printStackTrace();}}});t.start();threadList.add(t);}}// 將新任務添加到任務隊列里面!public void submit(Runnable runnable) throws InterruptedException {queue.put(runnable);}}
public class ThreadDemo43 {public static void main(String[] args) throws InterruptedException {MyThreadPoolExecutor executor = new MyThreadPoolExecutor(4);for (int i = 0; i < 1000; i++) {int n = i;executor.submit(new Runnable() {@Overridepublic void run() {System.out.println("執行任務 " + n + "當前線程為:+ " + Thread.currentThread().getName());}});}}
}
代碼細節完善:?
? ? ? ? 1. 我們還可以補充一個關閉線程的方法 shutdown
? ? ? ? 2. 補充 shutdown 的成員變量 isShutdown 并且優化捕捉到異常時候的操作
? ? ? ? 3. submit 前先進行一個判斷:
? ? ? ? 4. 打印完畢之后,大約 5s 程序結束
完整改善代碼:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;class MyThreadPoolExecutor {// 用 List 數據結構來存儲線程private List<Thread> threadList = new ArrayList<>();// 是一個用來保存任務的隊列private BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);// 線程是否關閉的標志private volatile boolean isShutdown = false;// 通過 n 指定創建多少個線程public MyThreadPoolExecutor (int n) {for (int i = 0; i < n; i++) {Thread t = new Thread(() -> {// 線程要做的事情是 把任務隊列中的任務不停的取出來,并且進行執行while (!isShutdown) {try {// 此處的 take 是帶有阻塞功能的// 如果隊列為空,此處的 take 會阻塞Runnable runnable = queue.take();// 取出一個任務執行一個任務runnable.run();} catch (InterruptedException e){// 當捕捉到 異常的時候,恢復線程的中斷狀態并退出循環Thread.currentThread().interrupt();break;}}});t.start();threadList.add(t);}}// 將新任務添加到任務隊列里面!public void submit(Runnable runnable) throws InterruptedException {if (isShutdown) {throw new IllegalStateException("線程池已經關閉,無法提交新的任務");}queue.put(runnable);}public void shutdown() {isShutdown = true;for (Thread t : threadList) {t.interrupt();}}
}
public class ThreadDemo43 {public static void main(String[] args) throws InterruptedException {MyThreadPoolExecutor executor = new MyThreadPoolExecutor(4);for (int i = 0; i < 1000; i++) {int n = i;executor.submit(new Runnable() {@Overridepublic void run() {System.out.println("執行任務 " + n + "當前線程為:+ " + Thread.currentThread().getName());}});}Thread.sleep(5000);executor.shutdown();}
}