Java并發編程-線程池
- 線程池運行原理
- 線程池生命周期
- 線程池的核心參數
- 線程池的阻塞隊列
- 線程池的拒絕策略
- 線程池的種類
- newFixedThreadPool
- newSingleThreadExecutor
- newCachedThreadPool
- newScheduledThreadPool
- 創建線程池
- jdk的Executors(不建議,會導致OOM)
- jdk的ThreadPoolExecutor(阿里開發手冊推薦)
- Spring內置的ThreadPoolTaskExecutor(開發常用)
線程池(Thread Pool) 是一種并發編程技術,用于管理一組線程,以便復用這些線程來執行多個任務。使用線程池的核心目的就是用來減少線程的創建和銷毀的開銷,從而能提高系統的響應性能,同時線程池對線程的管理也能避免線程創建過多導致內存溢出。
線程池運行原理
線程池生命周期
- running(運行狀態): 會接收新任務并且會處理隊列中的任務
- shutdown(關閉狀態): 不會接收新任務并且會處理隊列中的任務,任務處理完之后會中斷所有線程
- stop(停止狀態): 不會接收新任務并且不會處理隊列中的任務,并且會直接中斷所有線程
- tidying(整理狀態): 所有線程都停止之后,線程池的狀態就會轉為tidying,一旦達到此狀態,就會調用線程池的terminated(),此方法內部是空的,可由程序員自定義
- terminated(終止狀態): terminated()執行完之后就會轉變為terminated狀態
線程池的核心參數
new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)
- corePoolSize(核心線程數): 即使空閑也不會被回收的線程數量;
- maximumPoolSize(最大線程數): 線程池允許創建的最大線程數(含核心線程);
- keepAliveTime(空閑線程存活時間): 非核心線程空閑時的存活時間(超時后回收);
- unit(時間單位): keepAliveTime 的時間單位(如秒、毫秒);
- workQueue(任務隊列): 用于緩存未執行任務的阻塞隊列;
- ThreadFactory(線程工廠): 這是一個接口,用于創建新的線程,可自定義線程創建方式;
- RejectedExecutionHandler(拒絕策略): 當任務隊列滿且線程數達到上限時的處理策略;
應用程序大致可分為兩種類型,IO密集型任務和CPU密集型任務。
IO密集型任務,一般指文件讀寫、DB讀寫、網絡請求等,核心線程數大小設置為2N+1
CPU密集型任務,一般指計算型代碼、Bitmap轉換、Gson轉換等,特點是高并發,任務執行時間短,核心線程數大小設置為N+1,可減少線程上下文的切換。
可通過這段代碼獲取CPU的邏輯線程數 int poolSize = Runtime.getRuntime().availableProcessors();
線程池的阻塞隊列
workQueue:當沒有空閑核心線程時,新來任務會加入到此隊列排隊,隊列滿會創建救急線程執行任務。
- ArrayBlockingQueue:基于數組結構的有界阻塞隊列,FIFO
- LinkedBlockingQueue:基于鏈表結構的有界阻塞隊列,FIFO
- DelayedWorkQueue:一個優先級隊列,它可以保證每次出隊的任務都是當前隊列中執行時間最靠前的
- SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個移出操作。
LinkedBlockingQueue | ArrayBlockingQueue |
---|---|
默認無界,支持有界 | 強制有界 |
底層是鏈表 | 底層是數組 |
是懶惰的,創建節點的時候添加數據 | 提前初始化 Node 數組 |
入隊會生成新 Node | Node需要是提前創建好的 |
兩把鎖(頭尾) | 一把鎖 |
線程池的拒絕策略
- AbortPolicy: 默認的拒絕策略,當任務無法提交的時候就會拋出 RejectedExecutionException 異常
- CallerRunsPolicy: 當任務無法提交的時候就會把這個任務交給調用線程執行,這樣就能避免任務被丟棄,但是有可能會導致調用者線程被阻塞
- DiscardPolicy: 當任務無法提交時,直接丟棄該任務,不做任何處理,如果任務不重要就可以用這個拒絕策略直接丟棄
- DiscardOldestPolicy: 當任務無法提交時,丟棄任務隊列中最舊的任務,然后嘗試重新提交當前任務,適合在需要盡快處理新任務的情況下使用
線程池的種類
newFixedThreadPool
Executors.newFixedThreadPool(nThreads); 創建一個定長的線程池,可以控制線程的最大并發數,超出的線程會在隊列中等待,特點如下:
- 核心線程數和最大線程數相同,意味者線程池能處理的任務就是 核心線程數 + 隊列長度
- 隊列使用了 LinkedBlockingQueue,長度沒有設置,意味者里面用了一個無界隊列,需要注意任務過多導致的內存溢出的問題
- 適用于任務量已知,相對耗時的任務
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor
Executors.newSingleThreadExecutor(); 創建一個單線程的線程池,所有任務將按照提交的順序依次執行,特點如下:
- 核心線程數為 1,最大線程數是 1,意味者只有一個線程
- 隊列使用了 LinkedBlockingQueue,容量沒有限制,需要注意任務過多導致的內存溢出的問題
- 適用于對執行順序有要求的任務
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool
Executors.newCachedThreadPool(); 創建一個可緩存的線程池,如果線程池的大小超過了需要,可以靈活回收空閑線程,如果沒有可回收線程,則新建線程,特點如下:
- 核心線程數為 0,最大線程數是 Integer.MAX_VALUE,這意味著能處理的任務數沒有限制,同時創建出來的線程 60S 內沒有處理任務就會被回收掉
- 隊列使用了 SynchronousQueue,不存放任務,需要注意線程創建過多導致的內存溢出的問題
- 適合處理大量短期任務,也就是任務數比較密集,但每個任務執行時間較短的情況
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
newScheduledThreadPool
Executors.newScheduledThreadPool(corePoolSize); 創建一個可調度任務的線程池,適用于需要延遲執行或定期執行任務的場景,特點如下:
- 核心線程數可設置,最大線程數是 Integer.MAX_VALUE,意味者可以接收的任務沒有限制
- 隊列使用了 DelayedWorkQueue,支持延遲執行和定期執行任務,需要注意線程過多導致的內存溢出的問題
- 適合用于執行一些定時任務的場景,比如定時提醒
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// scheduled 提交任務到線程池中。
// 參數一,提交的任務;參數二,任務執行的延遲時間;參數三,時間單位
scheduled.schedule(() -> {System.out.println("1234");
}, 10, TimeUnit.SECONDS);
創建線程池
jdk的Executors(不建議,會導致OOM)
創建方式參考【線程池的種類】這一章節。Executors返回的線程池對象的弊端如下:
- FixedThreadPool和SingleThreadPool
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM - CachedThreadPool和ScheduledThreadPool
允許的創建線程數量為 Iteger.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
jdk的ThreadPoolExecutor(阿里開發手冊推薦)
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 1;
TimeUnit unit = TimeUnit.MINUTES;
// 阻塞隊列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
// 線程池工廠
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 異常處理策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
// 手動構造線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
使用線程池,并發獲取數據并,整合到一起
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;public class ApplicationMainTest {// 手動構造線程池private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));public static void main(String[] args) {// 使用線程安全的 ConcurrentLinkedDeque 存儲查詢結果集ConcurrentLinkedDeque<List<Integer>> resultList = new ConcurrentLinkedDeque<>();threadPoolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 開始工作...");List<Integer> queryList = buildQueryData(); // 模擬數據查詢操作System.out.println(queryList);resultList.offer(queryList);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成!!!");});threadPoolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 開始工作...");List<Integer> queryList = buildQueryData(); // 模擬數據查詢操作System.out.println(queryList);resultList.offer(queryList);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成!!!");});// 關閉線程池threadPoolExecutor.shutdown();while (!threadPoolExecutor.isTerminated()) { // 檢查所有任務是否完成System.out.println("等待所有任務完成...");try {Thread.sleep(100); // 每100毫秒檢查一次} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("所有任務已完成");System.out.println("分隔符==============");// 匯總結果List<Integer> mergedList = new ArrayList<>();for (List<Integer> list : resultList) {mergedList.addAll(list);}System.out.println("Merged List: " + mergedList);}private static List<Integer> buildQueryData() {List<Integer> list = new ArrayList<>();Random rand = new Random();for (int i = 0; i < 3; i++) {list.add(rand.nextInt(100));}return list;}}
Spring內置的ThreadPoolTaskExecutor(開發常用)
Spring框架內置線程池
package cn.study.com.configbean;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@EnableAsync
public class AsyncScheduledTaskConfig implements AsyncConfigurer {@Bean("checkAsync") // 將當前方法返回的對象,存到容器里public Executor scheduledTaskAsync() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//最大線程數executor.setMaxPoolSize(10);//核心線程數executor.setCorePoolSize(5);//任務隊列大小executor.setQueueCapacity(5);//線程存活時間 當超過30s后,線程池中存有的線程數量大于核心線程,觸發超時回收executor.setKeepAliveSeconds(30);//拒絕處理策略:回收最老的任務executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());// or 自定義拒絕策略【1.記錄錯誤信息 2.通知消息系統,發送告警信息】executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {}});executor.setThreadNamePrefix("Async-Thread-Pool-");//初始化executor.initialize();return executor;}
}
使用注解,通過線程池異步執行方法。
@Async("checkAsync")
public Boolean deleteFolder(String bucketName, String path) {log.info("當前Minio文件夾清除線程:"+Thread.currentThread().getName());MinioFileManager minioFileManager = new MinioFileManager(minioHost, accessKey, secretKey);return minioFileManager.deleteFolder(bucketName, path);
}
依賴注入,并發執行任務。
@Autowired
private ExecutorService executorService;Future<List<User>> f1 = executorService.submit(()-> {List<User> userList = queryUserList();return userList;
});
Future<List<Order>> f2 = executorService.submit(()-> {List<Order> orderList = queryOrderList();return orderList;
});
// 匯總信息
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("user", f1.get());
resultMap.put("order", f2.get());