手寫一個簡單的線程池
項目倉庫:https://gitee.com/bossDuy/hand-tearing-thread-pool
基于一個b站up的課程:https://www.bilibili.com/video/BV1cJf2YXEw3/?spm_id_from=333.788.videopod.sections&vd_source=4cda4baec795c32b16ddd661bb9ce865
理解線程池的原理
線程池就是為了減少頻繁的創建和銷毀線程帶來的性能損耗,工作原理:
在這里插入圖片描述
簡單的說:線程池就是有一個存放線程的集合和一個存放任務的阻塞隊列。當提交一個任務的時候,判斷核心線程是否滿了,沒滿就會創建一個核心線程加入線程池并且執行任務,核心線程是不會被銷毀的即使沒有任務執行;滿了就會放入任務隊列等待;如果隊列滿了的話就會創建非核心線程進行執行任務,這些非核心線程在不執行任務的時候就會等一段時間銷毀(配置的過期時間),如果創建的線程達到了最大線程數,那么就會執行拒絕策略。
可以簡要整理如下:
提交任務 -> 核心任務是否已滿為滿,創建核心線程并執行任務已滿,則加入任務隊列隊列未滿 -> 等待執行隊列已滿 -> 創建非核心線程達到線程最大數量 -> 拒絕策略未達到最大數量 -> 執行任務
自己實現簡單的線程池
第一步:實現了一個線程復用的線程池
package com.yb0os1;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class MyThreadPool {//1、線程什么時候創建?/**核心線程中我們要保證線程是可以復用的,那么就不可以直接new Thread(task).start(); 這樣執行完task線程就會被銷毀了我們將接收到的任務對象放到隊列中,然后線程從隊列中取出任務,通過任務的run方法進行調用,這樣就是在該線程上調用任務,并且調用完后不會銷毀線程*///2、我們一開始使用 while (true) if(!tasks.isEmpty()) Runnable task = tasks.remove(0);/**這樣如果任務隊列一直為空就會一循環,消耗cpu資源。此時就是阻塞隊列出現了,當為空阻塞等待 非空執行*/BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(1024);Thread thread = new Thread(()->{while (true){if(!taskQueue.isEmpty()){try {Runnable task = taskQueue.take();task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}},"唯一線程");{thread.start();//啟動線程}public void execute(Runnable task){taskQueue.offer(task);//向隊列添加元素 盡量是否offer 滿則返回false add滿則排除異常}
}
package com.yb0os1;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool();for (int i = 0; i < 5; i++) {myThreadPool.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {//InterruptedException這個是線程中斷異常,// 這個異常一般都是線程在等待或者阻塞中被中斷了就會拋出的,// sleep wait等等都是有,除了LockSupport.park 這個會記錄中斷位 不會拋出這個異常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"執行完畢");});}System.out.println("主線程沒有被阻塞");}
}
測試結果:
第二步:實現多個線程復用的線程池
package com.yb0os1;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class MyThreadPool {//任務隊列private final BlockingQueue<Runnable> taskQueue;//核心線程的數量private final int corePoolSize;//最大線程的數量private final int maxPoolSize;private final int keepAliveTime;private final TimeUnit unit;public MyThreadPool(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> taskQueue) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.taskQueue = taskQueue;}//核心線程List<Thread> coreList = new ArrayList<>();//非核心線程List<Thread> supportList = new ArrayList<>();//添加元素和判斷長度不是原子的,所以存在線程安全問題 可以加鎖 CAS等解決public void execute(Runnable command) {//目前線程列表中線程數量小于核心線程的數量,則創建線程if (coreList.size() < corePoolSize) {Thread thread = new CoreThread();coreList.add(thread);thread.start();
// return;}//成功添加到阻塞隊列if (taskQueue.offer(command)) {return;}//任務隊列也滿了 需要創建非核心線程//核心線程滿 任務隊列滿 但是非核心線程沒有滿才可以添加if (coreList.size() + supportList.size() < maxPoolSize) {Thread thread = new SupportThread();supportList.add(thread);thread.start();return;}//我們創建完線程之后 并沒有處理剛才的command 不能確定是否隊列真的滿了if (!taskQueue.offer(command)) {//真的滿了 拋出異常throw new RuntimeException("線程池已滿");}}class CoreThread extends Thread {@Overridepublic void run() {while (true) {try {Runnable task = taskQueue.take();task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread {@Overridepublic void run() {while (true) {try {Runnable command = taskQueue.poll(keepAliveTime, unit);//等待一秒沒有獲取就會返回nullif (command == null) {//線程結束break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println(Thread.currentThread().getName()+"非核心線程結束");supportList.remove(Thread.currentThread());System.out.println("當前非核心線程數量為:" + supportList.size());}}
}
package com.yb0os1;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(2,4,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2));for (int i = 0; i < 4; i++) {myThreadPool.execute(()->{try {Thread.sleep(1000);} catch (InterruptedException e) {//InterruptedException這個是線程中斷異常,// 這個異常一般都是線程在等待或者阻塞中被中斷了就會拋出的,// sleep wait等等都是有,除了LockSupport.park 這個會記錄中斷位 不會拋出這個異常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"執行完畢");});}System.out.println("主線程沒有被阻塞");}
}
存在問題,任務沒有被正確的執行:
b站評論區指出的:if (blockingQueue.offer(command)) { return; }
這里如果任務成功放入隊列,方法就直接 return 了。 但在 創建 SupportThread 的邏輯中,沒有保證這個任務會被執行,因為 offer() 失敗后你才創建新線程。 但 command 并沒有交給這個新線程,而是再次嘗試 offer(),如果失敗就直接走拒絕策略了。 這樣的話,可能 SupportThread 已經啟動,但任務卻沒被執行。
理解:如果隊列滿了,我們創建非核心線程,但是并沒有將這任務直接交給我們創建的新線程,而是再次嘗試加入隊列中,這就導致了一個不確定的狀態:
- 如果此時隊列還是滿的(
offer
返回false
),就會直接拋出異常,任務未被執行 - 如果隊列此時恰好有空間(可能因為其他線程剛剛完成了任務,從而騰出了隊列空間),那么任務會被放入隊列,后續由某個線程(可能是核心線程,也可能是其他非核心線程)從隊列中取出并執行。但新創建的非核心線程可能并沒有真正處理這個任務。
解決方案:如果隊列滿了,我們要創建非核心線程并且由這個線程執行任務
也可以說 讓線程執行當前 command 之后,再從 queue 中拿任務
第三步:修復bug 設計拒絕策略
package com.yb0os1;import com.yb0os1.reject.DiscardRejectHandle;
import com.yb0os1.reject.ThrowRejectHandle;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {MyThreadPool myThreadPool = new MyThreadPool(2,4,1, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2),new DiscardRejectHandle());for (int i = 0; i < 8; i++) {int finalI = i;myThreadPool.execute(()->{try {Thread.sleep(100);} catch (InterruptedException e) {//InterruptedException這個是線程中斷異常,// 這個異常一般都是線程在等待或者阻塞中被中斷了就會拋出的,// sleep wait等等都是有,除了LockSupport.park 這個會記錄中斷位 不會拋出這個異常e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"執行完畢---"+ finalI);});}System.out.println("主線程沒有被阻塞");}
}
package com.yb0os1;import com.yb0os1.reject.RejectHandle;import java.sql.SQLOutput;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;public class MyThreadPool {public BlockingQueue<Runnable> getTaskQueue() {return taskQueue;}//任務隊列private final BlockingQueue<Runnable> taskQueue;//核心線程的數量private final int corePoolSize;//最大線程的數量private final int maxPoolSize;private final int keepAliveTime;private final TimeUnit unit;//拒絕策略private final RejectHandle rejectHandle;public MyThreadPool(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> taskQueue, RejectHandle rejectHandle) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = keepAliveTime;this.unit = unit;this.taskQueue = taskQueue;this.rejectHandle = rejectHandle;}//核心線程List<Thread> coreList = new ArrayList<>();//非核心線程List<Thread> supportList = new ArrayList<>();//添加元素和判斷長度不是原子的,所以存在線程安全問題 可以加鎖 CAS等解決public void execute(Runnable command) {//目前線程列表中線程數量小于核心線程的數量,則創建線程if (coreList.size() < corePoolSize) {Thread thread = new CoreThread(command);coreList.add(thread);thread.start();return;}//成功添加到阻塞隊列if (taskQueue.offer(command)) {return;}//任務隊列也滿了 需要創建非核心線程//核心線程滿 任務隊列滿 但是非核心線程沒有滿才可以添加if (coreList.size() + supportList.size() < maxPoolSize) {Thread thread = new SupportThread(command);supportList.add(thread);thread.start();return;}//我們創建完線程之后 并沒有處理剛才的command 不能確定是否隊列真的滿了if (!taskQueue.offer(command)) {//真的滿 使用拒絕策略rejectHandle.reject(command,this);}}//優先處理傳過來的 然后再去阻塞隊列中獲取class CoreThread extends Thread {private final Runnable command;CoreThread(Runnable command) {this.command = command;}@Overridepublic void run() {command.run();while (true) {try {Runnable task = taskQueue.take();System.out.println("核心線程"+Thread.currentThread().getName()+"正在執行任務");task.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}}}class SupportThread extends Thread {private final Runnable command;SupportThread(Runnable command) {this.command = command;}@Overridepublic void run() {command.run();while (true) {try {Runnable command = taskQueue.poll(keepAliveTime, unit);//等待一秒沒有獲取就會返回nullif (command == null) {//線程結束break;}command.run();} catch (InterruptedException e) {throw new RuntimeException(e);}}System.out.println(Thread.currentThread().getName()+"非核心線程結束");supportList.remove(Thread.currentThread());
// System.out.println("當前非核心線程數量為:" + supportList.size());}}
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public interface RejectHandle {void reject(Runnable command, MyThreadPool myThreadPool);
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public class DiscardRejectHandle implements RejectHandle{@Overridepublic void reject(Runnable command, MyThreadPool myThreadPool) {myThreadPool.getTaskQueue().poll();System.out.println("任務被丟棄");}
}
package com.yb0os1.reject;import com.yb0os1.MyThreadPool;public class ThrowRejectHandle implements RejectHandle{@Overridepublic void reject(Runnable command, MyThreadPool myThreadPool) {throw new RuntimeException("線程池已滿");}
}
思考
1.你能給線程池增加一個shutdown功能嗎
答:關閉線程池分兩種情況, 一個是清空任務隊列、線程全部完成任務后關閉; 二是等線程完成后直接關,不管隊列中的任務。
2、怎么理解拒絕策略
答:首先它是一個策略模式,在線程池的代碼中,當任務隊列滿時就會觸發該接口的方法,所以我們只要實現這個接口方法,再把實現類傳入線程池即可,并且方法里還可以拿到被拒絕的任務、線程池對象來實現自己的拒絕邏輯。
3、ThreadFactory參數
答:這個參數是線程池用來創建核心、輔助線程的方法,我們可以自定義線程名稱等參數。