目錄
一、需求背景
項目背景:電控數據管理系統優化
優化方案:引入OLAP數據庫和動態線程池
線程池性能急需解決的問題
資源過載與閑置的平衡:
優先級處理與公平性:
任務類型適配性:
二、線程池介紹
2.1、線程池的核心設計分析
1. 頂層接口:Executor(任務提交與執行解耦)
2. 擴展接口:ExecutorService(增強任務管理和控制能力)
3. 抽象類:AbstractExecutorService(串聯任務執行流程)
4. 具體實現類:ThreadPoolExecutor(線程與任務協同管理)
2.2 線程池的執行流程
2.3 線程池中需要關注的幾個點
2.3.1 線程池狀態維護設計
1.?ctl變量的作用
2.?位運算的原理
3.?關鍵計算方法
2.3.2 線程池中的參數可以動態修改嗎
2.3.3 核心線程數可以單獨進行設置嗎
2.3.4? 隊列的數量怎么設置
2.3.4 線程池被創建后沒有任務過來,里面是不會有線程的,如何預熱。
2.3.4.1 全部啟動
2.3.4.2 只啟動一個
三、代碼實現
3.1 監聽naocs的配置中心,如果有變化更新參數
3.2 構建優先級枚舉類及其注冊分級線程池
3.2 構建線程池管理類和可調整隊列
3.2 構建具體任務類 和 場景策略枚舉調用
一、需求背景
項目背景:電控數據管理系統優化
電控數據管理系統主要用于處理汽車電子控制單元(ECU)的相關數據文件,包括解析、比對、合成和校驗等功能。核心文件包括:
- A2L文件:大小約10~20MB,每個文件包含約8000個變量(如傳感器參數)。
- Hex/S19文件:大小4~10MB,存儲ECU所有變量的地址和值(如內存映射數據)。
- DCM文件:大小約1MB,包含1~5000個變量(具體數量取決于主機廠的管理規則)。
系統需要執行以下關鍵操作:
- 文件解析:將文件內容解析為可處理的數據結構(如變量列表)。
- 參數比對:比較不同文件中的上萬個參數,涉及單值、數組和MAP類型(例如,檢查A2L和Hex文件中的變量是否一致)。
- 文件合成:合并多個文件(如基于比對結果生成新文件),涉及上萬文件的處理。
- 文件校驗:驗證數據完整性(如檢查文件是否損壞或篡改)。
- 數據入庫:將解析結果存儲到數據庫,供后續查詢和分析。
- 等其他耗時計算的操作
這些操作都是CPU密集型任務(需要大量計算),尤其在批量操作(如同時處理多個文件)或多用戶并發訪問時,負載急劇增加。例如:
- 單個文件解析可能需要幾秒到幾分鐘,取決于文件大小和復雜度。
- 當10個用戶同時上傳文件時,系統可能面臨數十個并發任務,導致響應延遲或資源爭用。
優化方案:引入OLAP數據庫和動態線程池
為解決性能瓶頸,引入兩項技術:
- OLAP數據庫(Doris):項目中用到了大量的分析功能,需要設計海量數據的處理分析。Doris支持列式存儲和實時分析用于高效存儲和查詢海量數據,能加速數據入庫和比對操作(例如,查詢變量值時,響應時間從秒級降到毫秒級)。
- 動態優先級線程池:用于管理任務執行線程,根據系統負載自動調整資源。線程池的核心是動態分配線程數,優先處理高優先級任務(如用戶實時請求),避免低優先級任務(如后臺批量處理)阻塞系統。
線程池性能急需解決的問題
動態線程池能顯著提升系統性能,但必須解決以下關鍵問題,否則在高負載下可能導致系統崩潰或效率低下:
-
資源過載與閑置的平衡:
- 問題:在用戶量激增時(如多用戶批量上傳文件),線程數不足會導致任務排隊,響應時間變慢(例如,解析延遲從1秒增至10秒)。反之,線程過多會耗盡CPU,引發系統卡頓。
- 急需解決:線程池必須根據解析時間(任務耗時)和用戶量(并發請求數)動態調整線程數。例如,當用戶量增加時,自動增加線程;當任務耗時較長時,減少線程以避免爭用。
-
優先級處理與公平性:
- 問題:不同任務優先級不同(如用戶交互操作優先于后臺合成),但靜態線程池可能讓低優先級任務“餓死”高優先級任務(例如,批量文件合成占用所有線程,導致用戶無法及時查看結果)。
- 急需解決:線程池需支持動態優先級隊列。高優先級任務(如文件校驗)應優先獲取線程;低優先級任務(如批量入庫)在系統空閑時執行。這需要算法實時計算任務權重(如基于用戶等待時間或任務類型)。
-
任務類型適配性:
- 問題:文件解析任務差異大(如A2L文件解析比DCM更耗時),線程池若“一刀切”分配線程,會造成資源浪費(例如,簡單任務占用線程但快速完成,復雜任務等待)。
- 急需解決:線程池應識別任務類型(如通過預估解析時間),并動態分配線程資源。例如,復雜任務(耗時>5秒)分配更多線程,簡單任務(耗時<1秒)合并處理。
二、線程池介紹
2.1、線程池的核心設計分析
(參考了美團的線程池文章)
ThreadPoolExecutor從繼承關系入手,整體采用分層設計:從頂層接口到抽象類,再到具體實現。以下按邏輯順序解析核心組件。
1. 頂層接口:Executor(任務提交與執行解耦)
- 核心思想:將任務提交和任務執行分離,用戶無需關心線程創建或調度細節。
- 工作方式:用戶提供Runnable對象(定義任務邏輯),Executor負責線程調配和任務執行。
- 優勢:簡化并發編程,提升代碼可維護性。
2. 擴展接口:ExecutorService(增強任務管理和控制能力)
- 功能擴展:在Executor基礎上,添加兩類關鍵能力:
- 任務執行增強:支持異步任務處理,如生成Future對象(用于跟蹤一批任務結果)。
- 線程池管控:提供停止線程池等方法(如shutdown()),實現更精細的資源控制。
3. 抽象類:AbstractExecutorService(串聯任務執行流程)
- 承上啟下作用:作為ExecutorService的實現骨架,將任務執行流程標準化。
- 設計目的:抽象通用邏輯(如任務提交、結果處理),確保子類僅需關注核心執行方法。
- 簡化開發:降低底層實現復雜度,便于擴展新線程池類型。
4. 具體實現類:ThreadPoolExecutor(線程與任務協同管理)
- 核心職責:作為最終實現,同時處理兩方面:
- 生命周期維護:管理線程池狀態(如運行、關閉)。
- 資源協調:高效調度線程與任務,實現并行執行(例如,通過工作隊列和線程復用機制)。
- 核心參數說明(標紅部分是最核心的參數部分,也是需要關注配置的地方):
-
corePoolSize(核心線程數大小):不管它們創建以后是不是空閑的。線程池需要保持 corePoolSize 數量的線程,除非設置了 allowCoreThreadTimeOut。
-
maximumPoolSize(最大線程數):線程池中最多允許創建 maximumPoolSize 個線程。
-
keepAliveTime:(存活時間):如果經過 keepAliveTime 時間后,超過核心線程數的線程還沒有接受到新的任務,那就回收。
-
unit(keepAliveTime 的時間單位。)
-
workQueue(存放待執行任務的隊列):當提交的任務數超過核心線程數大小后,再提交的任務就存放在這里。它僅僅用來存放被 execute 方法提交的 Runnable 任務。
-
threadFactory(線程工程):用來創建線程工廠。比如這里面可以自定義線程名稱,看著名字就知道這個線程是哪里來的。
-
handler?(拒絕策略):當隊列里面放滿了任務、最大線程數的線程都在工作時,這時繼續提交的任務線程池就處理不了,應該執行怎么樣的拒絕策略。
-
2.2 線程池的執行流程
運行邏輯可分為兩個核心部分:
-
任務管理(生產者角色)
當任務提交時,線程池根據當前狀態決定任務的流轉:- 如果線程可用,直接分配線程執行任務。
- 如果線程繁忙,將任務緩沖到隊列中等待。
- 如果系統過載,拒絕該任務。
-
線程管理(消費者角色)
線程池維護一組線程,它們:- 主動獲取并執行任務。
- 任務完成后,繼續從隊列中獲取新任務。
- 當長時間無任務可獲取時,線程被回收以節省資源。
2.3 線程池中需要關注的幾個點
2.3.1 線程池狀態維護設計
線程池內部使用一個名為ctl
的原子整數變量來高效維護運行狀態和線程數量,避免不一致性問題并提升性能。
1.?ctl
變量的作用
private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));
ctl
是一個整數變量(具體是AtomicInteger
類型),它同時存儲兩個關鍵信息:- 運行狀態(runState):表示線程池的生命周期狀態(如運行中、關閉等),存儲在
ctl
的高3位。 - 線程數量(workerCount):表示當前活躍線程的數量,存儲在
ctl
的低29位。
- 運行狀態(runState):表示線程池的生命周期狀態(如運行中、關閉等),存儲在
- 用一個變量存儲兩個值,好處是:在決策時(如添加或移除線程),無需額外鎖來保證狀態和數量的一致性,提高了并發效率。
2.?位運算的原理
ctl
的高3位和低29位互不干擾,通過位運算快速提取或組合值:- 這種設計基于二進制表示,例如:
- 如果
ctl
的值是二進制xxx yyyyy...
(x
代表高3位狀態,y
代表低29位數量),則:- 提取狀態時,只保留高3位。
- 提取數量時,只保留低29位。
- 如果
3.?關鍵計算方法
線程池提供了三個方法來操作ctl
:
- 獲取運行狀態:
runStateOf(c)
?- 解釋:通過按位與操作,屏蔽掉低29位,只保留高3位的狀態值。
-
private?static?int?runStateOf(int?c){?return?c?&?~CAPACITY;?}?//計算當前運行狀態
- 獲取線程數量:
workerCountOf(c)
- 解釋:通過按位與操作,屏蔽掉高3位,只保留低29位的數量值。
-
private?static?int?workerCountOf(int?c){?return?c?&?CAPACITY;?}//計算當前線程數量
- 組合狀態和數量:
ctlOf(rs, wc)
- 解釋:通過按位或操作,將高3位的狀態和低29位的數量合并成一個新值。
-
private?static?int?ctlOf(int?rs,?int?wc)?{?return?rs?|?wc;?}?//通過狀態和線程數生成ctl
2.3.2 線程池中的參數可以動態修改嗎
當線程池在運行期調用?setCorePoolSize
?方法設置新的?corePoolSize
?值時,線程池會立即覆蓋原有的?corePoolSize
?值。隨后,基于新值與原始值的比較結果,采取以下處理策略:
-
如果新值小于當前工作線程數:
表明存在多余的 worker 線程。線程池會向當前空閑(idle)的 worker 線程發送中斷請求以啟動回收過程;多余的 worker 線程在下次空閑時也會被回收。 -
如果新值大于原始值且任務隊列中有待執行任務:
線程池會創建新的 worker 線程,以執行隊列中的任務。
此設計確保了線程池資源能動態調整,以適應配置變化。
源碼中也寫了可以動態進行修改設置。
設置最大線程數量同時可以進行設置
2.3.3 核心線程數可以單獨進行設置嗎
此處需要注意的是最大線程數和核心線程數要一起進行設置。如果當前核心是3 最大線程是5,我們在單獨設置核心線程數為7,這個時候核心線程數會大于最大線程數。活躍線程數是不會變的,所以會出現設置出現問題,所以建議設置參數的時候一起進行設置
ava.util.concurrent.ThreadPoolExecutor#getTask
2.3.4? 隊列的數量怎么設置
隊列的 capacity 是被 final 修飾了,所以沒辦法進行設置,需要我們進行重寫一個類,把final去掉就可以設置線程池隊列的大小了。
2.3.4 線程池被創建后沒有任務過來,里面是不會有線程的,如何預熱。
2.3.4.1 全部啟動
2.3.4.2 只啟動一個
三、代碼實現
ok,下面進行代碼的展示,如何寫一個支持基于nacos動態刷新的分級線程池
3.1 監聽naocs的配置中心,如果有變化更新參數
/*** Nacos配置監聽,動態調整線程池參數*/
@RefreshScope
@Component
public class NacosThreadPoolConfig implements InitializingBean {private static final Logger logger = LoggerFactory.getLogger(DynamicStrategyTask.class);@Resourceprivate ThreadPoolManager threadPoolManager;@Autowiredprivate NacosConfigManager nacosConfigManager;@Autowiredprivate NacosConfigProperties nacosConfigProperties;/*** 監聽Nacos配置變化,動態調整線程池*/public void onConfigChange(String config) {try {// 解析為實體類ThreadPoolProperties TPconfig = YamlParser.parseToObject(config);// 處理高優先級線程池配置handleThreadPoolConfig(TPconfig, HIGH);// 處理中優先級線程池配置handleThreadPoolConfig(TPconfig, MEDIUM);// 處理低優先級線程池配置handleThreadPoolConfig(TPconfig, LOW);} catch (Exception e) {// 處理配置解析異常e.printStackTrace();}}private void handleThreadPoolConfig(ThreadPoolProperties configJson, PriorityEnum priority) {// 解析為Mapif(checkNotNull(configJson)){int core = configJson.getCore().getSize();int max = configJson.getMax().getSize();int queue = configJson.getQueue().getSize();switch (priority){case HIGH:core = core * 2;max = max * 2;break;case MEDIUM:queue = queue * 2;break;case LOW:core = core / 2;max = max / 2;queue = queue * 3;break;default:break;}threadPoolManager.adjustThreadPool(priority, core, max, queue);}}private boolean checkNotNull(ThreadPoolProperties configJson) {if (configJson == null || configJson.getCore()== null|| configJson.getMax() == null || configJson.getQueue() == null){return false;}return true;}@Overridepublic void afterPropertiesSet() throws Exception {//nacos配置變更監聽nacosConfigManager.getConfigService().addListener("thread-pool-config", nacosConfigProperties.getGroup(),new Listener() {@Overridepublic Executor getExecutor() {return null;}@Overridepublic void receiveConfigInfo(String configInfo) {//配置變更,修改線程池配置logger.info("線程池參數有變動請留意:【%s】",configInfo);onConfigChange(configInfo);}});}
3.2 構建優先級枚舉類及其注冊分級線程池
/*** 任務優先級枚舉*/
public enum PriorityEnum {HIGH(3),MEDIUM(2),LOW(1);private final int level;PriorityEnum(int level) {this.level = level;}public int getLevel() {return level;}public static PriorityEnum fromLevel(int level) {for (PriorityEnum priority : values()) {if (priority.level == level) {return priority;}}return MEDIUM;}
}
/*** 線程池配置類*/
@Configuration
@ConfigurationProperties(prefix = "threadpool")
public class ThreadPoolConfig {@Value("${threadpool.core-pool-size}")private Integer corePoolSize;@Value("${threadpool.maximum-pool-size}")private Integer maximumPoolSize;@Value("${threadpool.queue-capacity}")private Integer queueCapacity;/*** 高優先級線程池*/@Beanpublic DynamicThreadPool highPriorityThreadPool() {DynamicThreadPool threadPool = new DynamicThreadPool(PriorityEnum.HIGH);// 可以根據需要設置不同優先級線程池的默認參數threadPool.setCorePoolSize(corePoolSize * 2);threadPool.setMaxPoolSize(maximumPoolSize * 2);threadPool.setQueueCapacity(queueCapacity);threadPool.setThreadNamePrefix("High-Priority-Thread-");return threadPool;}/*** 中優先級線程池*/@Beanpublic DynamicThreadPool mediumPriorityThreadPool() {DynamicThreadPool threadPool = new DynamicThreadPool(PriorityEnum.MEDIUM);threadPool.setCorePoolSize(corePoolSize);threadPool.setMaxPoolSize(maximumPoolSize);threadPool.setQueueCapacity(queueCapacity * 2);threadPool.setThreadNamePrefix("Medium-Priority-Thread-");return threadPool;}/*** 低優先級線程池*/@Beanpublic DynamicThreadPool lowPriorityThreadPool() {DynamicThreadPool threadPool = new DynamicThreadPool(PriorityEnum.LOW);threadPool.setCorePoolSize(corePoolSize / 2);threadPool.setMaxPoolSize(maximumPoolSize / 2);threadPool.setQueueCapacity(queueCapacity * 3);threadPool.setThreadNamePrefix("Low-Priority-Thread-");return threadPool;}
}
/*** 動態線程池,支持動態調整參數*/
public class DynamicThreadPool extends ThreadPoolTaskExecutor {@Getterprivate final PriorityEnum priority;public DynamicThreadPool(PriorityEnum priority) {this.priority = priority;}/*** 動態調整線程池參數*/public void adjustParameters(int corePoolSize, int maximumPoolSize, int queueCapacity) {// 調整核心線程數setCorePoolSize(corePoolSize);// 調整最大線程數setMaxPoolSize(maximumPoolSize);setQueueCapacity(queueCapacity);}@Overrideprotected BlockingQueue<Runnable> createQueue(int queueCapacity) {if (queueCapacity > 0) {return new ResizeCapacityLinkedBlockingQueue<>(queueCapacity);}else {return new SynchronousQueue<>();}}
}
3.2 構建線程池管理類和可調整隊列
/*** 線程池管理器,管理不同優先級的線程池*/
@Component
public class ThreadPoolManager {// 用枚舉作為鍵存儲線程池private final Map<PriorityEnum, DynamicThreadPool> threadPools;// 自動注入所有DynamicThreadPool類型的Bean(key為Bean名稱,value為Bean實例)@Autowiredpublic ThreadPoolManager(List<DynamicThreadPool> dynamicThreadPools) {this.threadPools = new EnumMap<>(PriorityEnum.class);// 將Bean按名稱匹配到對應的枚舉for (DynamicThreadPool dtp : dynamicThreadPools) {try {// 假設Bean名稱與枚舉名一致(如"HIGH"對應PriorityEnum.HIGH)threadPools.put(dtp.getPriority(), dtp);} catch (IllegalArgumentException e) {// 處理不匹配的Bean名稱(可選:日志告警或忽略)System.out.println("未匹配到枚舉的線程池Bean:" + dtp.getPriority());}}}// 根據優先級獲取線程池public DynamicThreadPool getThreadPool(PriorityEnum priority) {return threadPools.get(priority);}/*** 提交任務到合適的線程池*/public <T> CompletableFuture<T> submit(PriorityTask<T> task) {PriorityEnum priority = task.getPriority();DynamicThreadPool threadPool = threadPools.get(priority);if (threadPool == null) {throw new IllegalArgumentException("No thread pool found for priority: " + priority);}// 可以根據任務預估時長做更精細的路由調整return CompletableFuture.supplyAsync(() -> {try {return task.call();} catch (Exception e) {throw new RuntimeException(e);}}, threadPool);}/*** 動態調整線程池參數*/public void adjustThreadPool(PriorityEnum priority, int corePoolSize, int maxPoolSize, int queueCapacity) {DynamicThreadPool threadPool = threadPools.get(priority);if (threadPool != null) {threadPool.adjustParameters(corePoolSize, maxPoolSize, queueCapacity);}}/*** 獲取線程池狀態信息*/public Map<String, Object> getThreadPoolStatus() {Map<String, Object> statusMap = new HashMap<>();threadPools.forEach((priority, pool) -> {Map<String, Object> poolStatus = new java.util.HashMap<>();poolStatus.put("corePoolSize", pool.getCorePoolSize());poolStatus.put("maxPoolSize", pool.getMaxPoolSize());poolStatus.put("queueCapacity", pool.getQueueCapacity());poolStatus.put("activeCount", pool.getActiveCount());poolStatus.put("queueSize", pool.getThreadPoolExecutor().getQueue().size());poolStatus.put("completedTaskCount", pool.getThreadPoolExecutor().getCompletedTaskCount());statusMap.put(priority.name(), poolStatus);});return statusMap;}
ublic class ResizeCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {//其他省略private int capacity;
}
3.2 構建具體任務類 和 場景策略枚舉調用
public class DynamicStrategyTask implements PriorityTask<String> {private static final Logger logger = LoggerFactory.getLogger(DynamicStrategyTask.class);private final String taskName;private final PriorityEnum priority;private final EvenStrategyInterface strategy;private final BaseDomainEvent event;public DynamicStrategyTask(String taskName, PriorityEnum priority, EvenStrategyInterface strategy,BaseDomainEvent event) {this.taskName = taskName;this.priority = priority;this.strategy = strategy;this.event = event;}@Overridepublic PriorityEnum getPriority() {return priority;}@Overridepublic long estimateExecutionTime() {return 0;}@Overridepublic String call() throws Exception {// 模擬任務執行Date startTime = new Date();logger.info("任務【%s】開始執行,執行線程為【%s】,開始時間【%s】",taskName,Thread.currentThread().getName(), startTime);strategy.execute(event);Date endTime = new Date();long diff = DateUtils.diff(endTime,startTime);logger.info("任務【%s】執行結束,執行線程為【%s】,結束時間【%s】,計時【%s】",taskName,Thread.currentThread().getName(), endTime,diff);return String.format("任務 [%s] 執行完成,優先級: %s,耗時: %dms",taskName, priority,diff);}
}
@Component
public class EvenStrategyContext {private static final Logger logger = LoggerFactory.getLogger(EvenStrategyContext.class);private EvenStrategyInterface strategy;@Autowiredprivate ThreadPoolManager threadPoolManager;public EvenStrategyContext getEvenStrategyContext(String cycleTypeCode) {if (EvenTypeEnum.getByDomainEvent(cycleTypeCode) == null || EvenTypeEnum.getByDomainEvent(cycleTypeCode).getEvenStrategyInterface() == null){this.strategy = null;}else{this.strategy = EvenTypeEnum.getByDomainEvent(cycleTypeCode).getEvenStrategyInterface();}return this;}public void execute(BaseDomainEvent event) {if (strategy == null){logger.info("沒有對應的監聽事件,以及對應的處理方法,請檢查傳入時間【%s】",event.getEventType());}else{threadPoolManager.submit(new DynamicStrategyTask(event.getEventType().getValue(), PriorityEnum.fromLevel(event.getEventType().getPriority()),strategy,event));}}
}
最后在說一下,關于線程池如何調整,美團給了一個公式,但是我目前還是根據經驗調整,監控動態線程池的執行情況適量進行調整,畢竟一般的公司服務器資源不是很充足,一個服務器可能部署很多的服務會占用線程資源,所以還得根據 實際情況,綜合進行考慮。