Java并發編程-線程池

Java并發編程-線程池

  • 線程池運行原理
  • 線程池生命周期
  • 線程池的核心參數
  • 線程池的阻塞隊列
  • 線程池的拒絕策略
  • 線程池的種類
    • newFixedThreadPool
    • newSingleThreadExecutor
    • newCachedThreadPool
    • newScheduledThreadPool
  • 創建線程池
    • jdk的Executors(不建議,會導致OOM)
    • jdk的ThreadPoolExecutor(阿里開發手冊推薦)
    • Spring內置的ThreadPoolTaskExecutor(開發常用)

線程池(Thread Pool) 是一種并發編程技術,用于管理一組線程,以便復用這些線程來執行多個任務。使用線程池的核心目的就是用來減少線程的創建和銷毀的開銷,從而能提高系統的響應性能,同時線程池對線程的管理也能避免線程創建過多導致內存溢出。

線程池運行原理

線程池運行原理

線程池生命周期

線程池生命周期

  1. running(運行狀態): 會接收新任務并且會處理隊列中的任務
  2. shutdown(關閉狀態): 不會接收新任務并且會處理隊列中的任務,任務處理完之后會中斷所有線程
  3. stop(停止狀態): 不會接收新任務并且不會處理隊列中的任務,并且會直接中斷所有線程
  4. tidying(整理狀態): 所有線程都停止之后,線程池的狀態就會轉為tidying,一旦達到此狀態,就會調用線程池的terminated(),此方法內部是空的,可由程序員自定義
  5. terminated(終止狀態): terminated()執行完之后就會轉變為terminated狀態

線程池的核心參數

new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)
  1. corePoolSize(核心線程數): 即使空閑也不會被回收的線程數量;
  2. maximumPoolSize(最大線程數): 線程池允許創建的最大線程數(含核心線程);
  3. keepAliveTime(空閑線程存活時間): 非核心線程空閑時的存活時間(超時后回收);
  4. unit(時間單位): keepAliveTime 的時間單位(如秒、毫秒);
  5. workQueue(任務隊列): 用于緩存未執行任務的阻塞隊列;
  6. ThreadFactory(線程工廠): 這是一個接口,用于創建新的線程,可自定義線程創建方式;
  7. RejectedExecutionHandler(拒絕策略): 當任務隊列滿且線程數達到上限時的處理策略;

應用程序大致可分為兩種類型,IO密集型任務和CPU密集型任務。
IO密集型任務,一般指文件讀寫、DB讀寫、網絡請求等,核心線程數大小設置為2N+1
CPU密集型任務,一般指計算型代碼、Bitmap轉換、Gson轉換等,特點是高并發,任務執行時間短,核心線程數大小設置為N+1,可減少線程上下文的切換。
可通過這段代碼獲取CPU的邏輯線程數 int poolSize = Runtime.getRuntime().availableProcessors();

線程池的阻塞隊列

workQueue:當沒有空閑核心線程時,新來任務會加入到此隊列排隊,隊列滿會創建救急線程執行任務。

  1. ArrayBlockingQueue:基于數組結構的有界阻塞隊列,FIFO
  2. LinkedBlockingQueue:基于鏈表結構的有界阻塞隊列,FIFO
  3. DelayedWorkQueue:一個優先級隊列,它可以保證每次出隊的任務都是當前隊列中執行時間最靠前的
  4. SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個移出操作。
LinkedBlockingQueueArrayBlockingQueue
默認無界,支持有界強制有界
底層是鏈表底層是數組
是懶惰的,創建節點的時候添加數據提前初始化 Node 數組
入隊會生成新 NodeNode需要是提前創建好的
兩把鎖(頭尾)一把鎖

線程池的拒絕策略

  1. AbortPolicy: 默認的拒絕策略,當任務無法提交的時候就會拋出 RejectedExecutionException 異常
  2. CallerRunsPolicy: 當任務無法提交的時候就會把這個任務交給調用線程執行,這樣就能避免任務被丟棄,但是有可能會導致調用者線程被阻塞
  3. DiscardPolicy: 當任務無法提交時,直接丟棄該任務,不做任何處理,如果任務不重要就可以用這個拒絕策略直接丟棄
  4. DiscardOldestPolicy: 當任務無法提交時,丟棄任務隊列中最舊的任務,然后嘗試重新提交當前任務,適合在需要盡快處理新任務的情況下使用

線程池的種類

newFixedThreadPool

Executors.newFixedThreadPool(nThreads); 創建一個定長的線程池,可以控制線程的最大并發數,超出的線程會在隊列中等待,特點如下:

  1. 核心線程數和最大線程數相同,意味者線程池能處理的任務就是 核心線程數 + 隊列長度
  2. 隊列使用了 LinkedBlockingQueue,長度沒有設置,意味者里面用了一個無界隊列,需要注意任務過多導致的內存溢出的問題
  3. 適用于任務量已知,相對耗時的任務
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

Executors.newSingleThreadExecutor(); 創建一個單線程的線程池,所有任務將按照提交的順序依次執行,特點如下:

  1. 核心線程數為 1,最大線程數是 1,意味者只有一個線程
  2. 隊列使用了 LinkedBlockingQueue,容量沒有限制,需要注意任務過多導致的內存溢出的問題
  3. 適用于對執行順序有要求的任務
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool

Executors.newCachedThreadPool(); 創建一個可緩存的線程池,如果線程池的大小超過了需要,可以靈活回收空閑線程,如果沒有可回收線程,則新建線程,特點如下:

  1. 核心線程數為 0,最大線程數是 Integer.MAX_VALUE,這意味著能處理的任務數沒有限制,同時創建出來的線程 60S 內沒有處理任務就會被回收掉
  2. 隊列使用了 SynchronousQueue,不存放任務,需要注意線程創建過多導致的內存溢出的問題
  3. 適合處理大量短期任務,也就是任務數比較密集,但每個任務執行時間較短的情況
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

newScheduledThreadPool

Executors.newScheduledThreadPool(corePoolSize); 創建一個可調度任務的線程池,適用于需要延遲執行或定期執行任務的場景,特點如下:

  1. 核心線程數可設置,最大線程數是 Integer.MAX_VALUE,意味者可以接收的任務沒有限制
  2. 隊列使用了 DelayedWorkQueue,支持延遲執行和定期執行任務,需要注意線程過多導致的內存溢出的問題
  3. 適合用于執行一些定時任務的場景,比如定時提醒
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// scheduled 提交任務到線程池中。
// 參數一,提交的任務;參數二,任務執行的延遲時間;參數三,時間單位
scheduled.schedule(() -> {System.out.println("1234");
}, 10, TimeUnit.SECONDS);

創建線程池

jdk的Executors(不建議,會導致OOM)

創建方式參考【線程池的種類】這一章節。Executors返回的線程池對象的弊端如下:

  1. FixedThreadPool和SingleThreadPool
    允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM
  2. CachedThreadPool和ScheduledThreadPool
    允許的創建線程數量為 Iteger.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。

jdk的ThreadPoolExecutor(阿里開發手冊推薦)

int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 1;
TimeUnit unit = TimeUnit.MINUTES;
// 阻塞隊列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
// 線程池工廠
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 異常處理策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
// 手動構造線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

使用線程池,并發獲取數據并,整合到一起


import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;public class ApplicationMainTest {// 手動構造線程池private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));public static void main(String[] args) {// 使用線程安全的 ConcurrentLinkedDeque 存儲查詢結果集ConcurrentLinkedDeque<List<Integer>> resultList = new ConcurrentLinkedDeque<>();threadPoolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 開始工作...");List<Integer> queryList = buildQueryData(); // 模擬數據查詢操作System.out.println(queryList);resultList.offer(queryList);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成!!!");});threadPoolExecutor.submit(() -> {System.out.println(Thread.currentThread().getName() + " 開始工作...");List<Integer> queryList = buildQueryData(); // 模擬數據查詢操作System.out.println(queryList);resultList.offer(queryList);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " 工作完成!!!");});// 關閉線程池threadPoolExecutor.shutdown();while (!threadPoolExecutor.isTerminated()) { // 檢查所有任務是否完成System.out.println("等待所有任務完成...");try {Thread.sleep(100); // 每100毫秒檢查一次} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("所有任務已完成");System.out.println("分隔符==============");// 匯總結果List<Integer> mergedList = new ArrayList<>();for (List<Integer> list : resultList) {mergedList.addAll(list);}System.out.println("Merged List: " + mergedList);}private static List<Integer> buildQueryData() {List<Integer> list = new ArrayList<>();Random rand = new Random();for (int i = 0; i < 3; i++) {list.add(rand.nextInt(100));}return list;}}

Spring內置的ThreadPoolTaskExecutor(開發常用)

Spring框架內置線程池

package cn.study.com.configbean;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@EnableAsync
public class AsyncScheduledTaskConfig implements AsyncConfigurer {@Bean("checkAsync")  // 將當前方法返回的對象,存到容器里public Executor scheduledTaskAsync() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//最大線程數executor.setMaxPoolSize(10);//核心線程數executor.setCorePoolSize(5);//任務隊列大小executor.setQueueCapacity(5);//線程存活時間 當超過30s后,線程池中存有的線程數量大于核心線程,觸發超時回收executor.setKeepAliveSeconds(30);//拒絕處理策略:回收最老的任務executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());// or 自定義拒絕策略【1.記錄錯誤信息 2.通知消息系統,發送告警信息】executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {}});executor.setThreadNamePrefix("Async-Thread-Pool-");//初始化executor.initialize();return executor;}
}

使用注解,通過線程池異步執行方法。

@Async("checkAsync")
public Boolean deleteFolder(String bucketName, String path) {log.info("當前Minio文件夾清除線程:"+Thread.currentThread().getName());MinioFileManager minioFileManager = new MinioFileManager(minioHost, accessKey, secretKey);return minioFileManager.deleteFolder(bucketName, path);
}

依賴注入,并發執行任務。

@Autowired
private ExecutorService executorService;Future<List<User>> f1 = executorService.submit(()-> {List<User> userList = queryUserList();return userList;
});
Future<List<Order>> f2 = executorService.submit(()-> {List<Order> orderList = queryOrderList();return orderList;
});
// 匯總信息
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("user", f1.get());
resultMap.put("order", f2.get());

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/77791.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/77791.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/77791.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【前沿】成像“跨界”測量——掃焦光場成像

01 背景 眼睛是人類認識世界的重要“窗口”&#xff0c;而相機作為眼睛的“延伸”&#xff0c;已經成為生產生活中最常見的工具之一&#xff0c;廣泛應用于工業檢測、醫療診斷與影音娛樂等領域。傳統相機通常以“所見即所得”的方式記錄場景&#xff0c;傳感器捕捉到的二維圖像…

TM1640學習手冊及示例代碼

數據手冊 TM1640數據手冊 數據手冊解讀 這里我們看管腳定義DIN和SCLK&#xff0c;一個數據線一個時鐘線 SEG1~SEG8為段碼&#xff0c;GRID1~GRID16為位碼&#xff08;共陰極情況下&#xff09; 這里VDD給5V 數據指令 數據命令設置 地址命令設置 顯示控制命令 共陰極硬件連接圖…

uni-app 開發企業級小程序課程

課程大小&#xff1a;7.7G 課程下載&#xff1a;https://download.csdn.net/download/m0_66047725/90616393 更多資源下載&#xff1a;關注我 備注&#xff1a;缺少兩個視頻5-14 tabs組件進行基本的數據展示和搜索歷史 處理searchData的刪除操作 1-1導學.mp4 2-10小程序內…

判斷點是否在多邊形內

代碼段解析: const intersect = ((yi > y) !== (yj > y)) && (x < (xj - xi) * (y - yi) / (yj - yi) + xi); 第一部分:(yi > y) !== (yj > y) 作用:檢查點 (x,y) 的垂直位置是否跨越多邊形的當前邊。 yi > y 和 yj > y 分別檢查邊的兩個端…

【redis】集群 如何搭建集群詳解

文章目錄 集群搭建1. 創建目錄和配置2. 編寫 docker-compose.yml完整配置文件 3. 啟動容器4. 構建集群超時 集群搭建 基于 docker 在我們云服務器上搭建出一個 redis 集群出來 當前節點&#xff0c;主要是因為我們只有一個云服務器&#xff0c;搞分布式系統&#xff0c;就比較…

[langchain教程]langchain03——用langchain構建RAG應用

RAG RAG過程 離線過程&#xff1a; 加載文檔將文檔按一定條件切割成片段將切割的文本片段轉為向量&#xff0c;存入檢索引擎&#xff08;向量庫&#xff09; 在線過程&#xff1a; 用戶輸入Query&#xff0c;將Query轉為向量從向量庫檢索&#xff0c;獲得相似度TopN信息將…

C語言復習筆記--字符函數和字符串函數(下)

在上篇我們了解了部分字符函數及字符串函數,下面我們來看剩下的字符串函數. strstr 的使用和模擬實現 老規矩,我們先了解一下strstr這個函數,下面看下這個函數的函數原型. char * strstr ( const char * str1, const char * str2); 如果沒找到就返回NULL指針. 下面我們看下它的…

FreeRTOS中的優先級翻轉問題及其解決方案:互斥信號量詳解

FreeRTOS中的優先級翻轉問題及其解決方案&#xff1a;互斥信號量詳解 在實時操作系統中&#xff0c;任務調度是基于優先級的&#xff0c;高優先級任務應該優先于低優先級任務執行。但在實際應用中&#xff0c;有時會出現"優先級翻轉"的現象&#xff0c;嚴重影響系統…

深度學習-全連接神經網絡

四、參數初始化 神經網絡的參數初始化是訓練深度學習模型的關鍵步驟之一。初始化參數&#xff08;通常是權重和偏置&#xff09;會對模型的訓練速度、收斂性以及最終的性能產生重要影響。下面是關于神經網絡參數初始化的一些常見方法及其相關知識點。 官方文檔參考&#xff1…

GIS開發筆記(9)結合osg及osgEarth實現三維球經緯網格繪制及顯隱

一、實現效果 二、實現原理 按照5的間隔分別創建經緯線的節點,掛在到組合節點,組合節點掛接到根節點。可以根據需要設置間隔度數和線寬、線的顏色。 三、參考代碼 //創建經緯線的節點 osg::Node *GlobeWidget::createGraticuleGeometry(float interval, const osg::Vec4 …

《Relay IR的基石:expr.h 中的表達式類型系統剖析》

TVM Relay源碼深度解讀 文章目錄 TVM Relay源碼深度解讀一 、從Constant看Relay表達式的設計哲學1. 類定義概述2. ConstantNode 詳解1. 核心成員2. 關鍵方法3. 類型系統注冊 3. Constant 詳解1. 核心功能 二. 核心內容概述(1) Relay表達式基類1. RelayExprNode 和 RelayExpr 的…

自動駕駛地圖數據傳輸協議ADASIS v2

ADASIS&#xff08;Advanced Driver Assistance Systems Interface Specification&#xff09;直譯過來就是 ADAS 接口規格&#xff0c;它要負責的東西其實很簡單&#xff0c;就是為自動駕駛車輛提供前方道路交通相關的數據&#xff0c;這些數據被抽象成一個標準化的概念&#…

Flutter 狀態管理 Riverpod

Android Studio版本 Flutter SDK 版本 將依賴項添加到您的應用 flutter pub add flutter_riverpod flutter pub add riverpod_annotation flutter pub add dev:riverpod_generator flutter pub add dev:build_runner flutter pub add dev:custom_lint flutter pub add dev:riv…

【EasyPan】MySQL主鍵與索引核心作用解析

【EasyPan】項目常見問題解答&#xff08;自用&持續更新中…&#xff09;匯總版 MySQL主鍵與索引核心作用解析 一、主鍵&#xff08;PRIMARY KEY&#xff09;核心作用 1. 數據唯一標識 -- 創建表時定義主鍵 CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,use…

IcePlayer音樂播放器項目分析及學習指南

IcePlayer音樂播放器項目分析及學習指南 項目概述 IcePlayer是一個基于Qt5框架開發的音樂播放器應用程序&#xff0c;使用Visual Studio 2013作為開發環境。該項目實現了音樂播放、歌詞顯示、專輯圖片獲取等功能&#xff0c;展現了桌面應用程序開發的核心技術和設計思想。 技…

vscode 打開新頁簽

目錄 vscode 打開新頁簽 完整settings.json內容&#xff1a; vscode 打開新頁簽 .vscode目錄中 新建settings.json 在 settings.json 文件中&#xff0c;添加或修改以下行&#xff1a; json "workbench.editor.enablePreview": false 這將禁用預覽模式&#xff0…

C語言高頻面試題——常量指針與指針常量區別

1. 常量指針&#xff08;Pointer to Constant&#xff09; 定義&#xff1a; 常量指針是指向一個常量數據的指針&#xff0c;即指針指向的內容不能通過該指針被修改。 語法&#xff1a; const int* ptr;或者&#xff1a; int const* ptr;解釋&#xff1a; const修飾的是指…

c++基礎·列表初始化

目錄 一、列表初始化的核心優勢 二、基礎數據類型與數組初始化 1. 基礎類型初始化 2. 數組初始化 三、類與結構體初始化 1. 構造函數匹配規則 2. 注意事項 四、標準容器初始化 五、聚合類型&#xff08;Aggregate Types&#xff09;初始化 1. 聚合類型定義 2. 初始化…

數據分析與產品、運營、市場之間如何有效對齊

數據分析的重要性在于它能夠將海量的原始信息轉化為可操作的洞察。以產品開發為例,通過用戶行為數據的分析,產品經理可以清晰了解哪些功能被頻繁使用,哪些設計導致用戶流失,從而優化迭代方向。運營團隊則依靠數據分析來監控供應鏈效率、預測需求波動,甚至通過實時數據調整…

[C]基礎11.深入理解指針(3)

博客主頁&#xff1a;向不悔本篇專欄&#xff1a;[C]您的支持&#xff0c;是我的創作動力。 文章目錄 0、總結1、字符指針變量2、數組指針變量2.1 數組指針變量是什么&#xff1f;2.2 數組指針變量怎么初始化&#xff1f; 3、二維數組傳參的本質4、函數指針變量4.1 函數指針變量…