并發編程——15 線程池ForkJoinPool實戰及其工作原理分析

1 一道算法題引發的思考及其實現

1.1 算法題

  • 問:如何充分利用多核 CPU 的性能,快速對一個2千萬大小的數組進行排序?
    • 這道題可以通過歸并排序來解決;

1.2 什么是歸并排序?

  • 歸并排序(Merge Sort)是基于分治思想的排序算法,基本思想是把一個大數組分成兩個等大的子數組,對每個子數組分別排序,再將排好序的兩個子數組合并成一個有序大數組,常通過遞歸實現,步驟為分(拆分數組為子數組)、治(子數組排序)、合(合并有序子數組)。它的時間復雜度是 (O(n\log n)),空間復雜度是 (O(n))((n) 為數組長度)。

  • 分治思想是把規模為 (N) 的問題分解成 (K) 個規模更小、相互獨立且與原問題性質相同的子問題,解決子問題后,合并子問題的解得到原問題的解。步驟分為分解(拆分原問題為子問題)、求解(子問題足夠小時用簡單方法解決)、合并(合并子問題的解得到原問題解)。像歸并排序、快速排序、二分查找這些計算機經典算法都基于分治思想。分治任務模型圖:

    在這里插入圖片描述

  • 動圖演示:Comparison Sorting Visualization。

1.3 使用歸并排序解決算法題

1.3.1 單線程實現歸并排序

  • 單線程歸并算法的實現,它的基本思路是將序列分成兩個部分,分別進行遞歸排序,然后將排序好的子序列合并起來;

    public class MergeSort {private final int[] arrayToSort;  // 待排序的原始數組private final int threshold;      // 遞歸拆分的最小閾值,當子數組長度小于此值時將直接使用Arrays.sort()排序/*** 構造函數,初始化待排序數組和閾值* @param arrayToSort 待排序數組* @param threshold 拆分閾值*/public MergeSort(final int[] arrayToSort, final int threshold) {this.arrayToSort = arrayToSort;this.threshold = threshold;}/*** 對類內數組進行順序歸并排序的入口方法* @return 排序后的數組*/public int[] sequentialSort() {return sequentialSort(arrayToSort, threshold);}/*** 靜態方法:遞歸實現歸并排序* @param arrayToSort 待排序數組* @param threshold 拆分閾值* @return 排序后的數組*/public static int[] sequentialSort(final int[] arrayToSort, int threshold) {// 如果當前數組長度小于閾值,則直接使用JDK內置排序(通常是快速排序優化)if (arrayToSort.length < threshold) {Arrays.sort(arrayToSort);return arrayToSort;}// 計算中點,拆分數組為左右兩部分int midpoint = arrayToSort.length / 2;int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);   // 左半部分 [0, midpoint)int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length); // 右半部分 [midpoint, end)// 遞歸排序左右子數組leftArray = sequentialSort(leftArray, threshold);rightArray = sequentialSort(rightArray, threshold);// 合并兩個已排序的子數組return merge(leftArray, rightArray);}/*** 合并兩個已排序的數組* @param leftArray 已排序的左數組* @param rightArray 已排序的右數組* @return 合并后的有序數組*/public static int[] merge(final int[] leftArray, final int[] rightArray) {// 創建合并后的數組,長度為兩數組之和int[] mergedArray = new int[leftArray.length + rightArray.length];int mergedArrayPos = 0;  // 合并數組的當前位置int leftArrayPos = 0;    // 左數組的當前指針int rightArrayPos = 0;   // 右數組的當前指針// 同時遍歷左右數組,按升序選擇較小的元素放入合并數組while (leftArrayPos < leftArray.length && rightArrayPos < rightArray.length) {if (leftArray[leftArrayPos] <= rightArray[rightArrayPos]) {mergedArray[mergedArrayPos] = leftArray[leftArrayPos];leftArrayPos++;} else {mergedArray[mergedArrayPos] = rightArray[rightArrayPos];rightArrayPos++;}mergedArrayPos++;}// 如果左數組還有剩余元素,全部追加到合并數組while (leftArrayPos < leftArray.length) {mergedArray[mergedArrayPos] = leftArray[leftArrayPos];leftArrayPos++;mergedArrayPos++;}// 如果右數組還有剩余元素,全部追加到合并數組while (rightArrayPos < rightArray.length) {mergedArray[mergedArrayPos] = rightArray[rightArrayPos];rightArrayPos++;mergedArrayPos++;}return mergedArray;}
    }  
    

1.3.2 Fork/Join實現并行歸并排序

  • 并行歸并排序是一種利用多線程實現的歸并排序算法。它的基本思路是將數據分成若干部分,然后在不同線程上對這些部分進行歸并排序,最后將排好序的部分合并成有序數組。在多核 CPU 上,這種算法也能夠有效提高排序速度;

  • 可以使用 Java 的 Fork/Join 框架來實現歸并排序的并行化:

    public class MergeSortTask extends RecursiveAction {private final int threshold; // 任務拆分的最小閾值,當子數組長度小于等于此值時將直接進行排序private int[] arrayToSort;   // 當前任務需要排序的數組/*** 構造函數,初始化待排序數組和閾值* @param arrayToSort 待排序數組* @param threshold 拆分閾值*/public MergeSortTask(final int[] arrayToSort, final int threshold) {this.arrayToSort = arrayToSort;this.threshold = threshold;}/*** Fork/Join框架的核心方法,實現任務的分解與執行*/@Overrideprotected void compute() {// 如果數組長度小于等于閾值,直接進行排序(基準情況)if (arrayToSort.length <= threshold) {Arrays.sort(arrayToSort); // 使用JDK優化排序算法return;}// 計算中點,將數組拆分為左右兩個子數組int midpoint = arrayToSort.length / 2;int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);    // 左半部分 [0, midpoint)int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length); // 右半部分 [midpoint, end)// 創建左右子任務MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);// 將左右任務提交到Fork/Join線程池異步執行(fork操作)leftTask.fork();  // 將左任務提交到工作隊列,由其他工作線程執行rightTask.fork(); // 將右任務提交到工作隊列,由其他工作線程執行// 等待左右子任務執行完成(join操作)leftTask.join();  // 阻塞當前線程直到左任務完成rightTask.join(); // 阻塞當前線程直到右任務完成// 合并兩個已排序的子數組結果arrayToSort = MergeSort.merge(leftTask.getSortedArray(), rightTask.getSortedArray());}/*** 獲取排序后的數組* @return 排序完成的數組*/public int[] getSortedArray() {return arrayToSort;}
    }
    
  • 在這個示例中,使用 Fork/Join 框架實現了歸并排序算法,并通過遞歸調用實現了并行化;

  • 使用 Fork/Join 框架實現歸并排序算法的關鍵在于將排序任務分解成小的任務,使用 Fork/Join 框架將這些小任務提交給線程池中的不同線程并行執行,并在最后將排序后的結果進行合并。這樣可以充分利用多核CPU的并行處理能力,提高程序的執行效率。

1.3.3 測試結果對比

  • 測試代碼:

    public class Utils {/*** 生成指定大小的隨機整數數組* @param size 數組的大小* @return 包含隨機整數的數組*/public static int[] buildRandomIntArray(final int size) {int[] arrayToCalculateSumOf = new int[size];Random generator = new Random();// 生成0-100,000,000范圍內的隨機整數填充數組for (int i = 0; i < arrayToCalculateSumOf.length; i++) {arrayToCalculateSumOf[i] = generator.nextInt(100000000);}return arrayToCalculateSumOf;}
    }public class ArrayToSortMain {public static void main(String[] args) {// 生成包含2000萬個隨機整數的測試數組int[] arrayToSortByMergeSort = Utils.buildRandomIntArray(20000000);// 創建數組副本,確保兩種排序方法使用相同的數據進行公平比較int[] arrayToSortByForkJoin = Arrays.copyOf(arrayToSortByMergeSort, arrayToSortByMergeSort.length);// 獲取當前系統的處理器核心數,用于優化并行計算int processors = Runtime.getRuntime().availableProcessors();// 創建單線程歸并排序實例,使用處理器數量作為拆分閾值MergeSort mergeSort = new MergeSort(arrayToSortByMergeSort, processors);long startTime = System.nanoTime(); // 記錄開始時間(納秒級精度)// 執行單線程歸并排序mergeSort.mergeSort();long duration = System.nanoTime()-startTime; // 計算耗時// 將納秒轉換為毫秒并輸出結果System.out.println("單線程歸并排序時間: "+(duration/(1000f*1000f))+"毫秒");// 創建Fork/Join并行排序任務,同樣使用處理器數量作為閾值MergeSortTask mergeSortTask = new MergeSortTask(arrayToSortByForkJoin, processors);// 構建Fork/Join線程池,線程數設置為處理器核心數ForkJoinPool forkJoinPool = new ForkJoinPool(processors);startTime = System.nanoTime(); // 重新記錄開始時間// 在線程池中執行并行排序任務(invoke會阻塞直到任務完成)forkJoinPool.invoke(mergeSortTask);duration = System.nanoTime()-startTime; // 計算并行排序耗時// 輸出并行排序時間System.out.println("fork/join排序時間: "+(duration/(1000f*1000f))+"毫秒");}
    }
    
  • 測試結果:數組越大,利用 Fork/Join 框架實現的并行化歸并排序比單線程歸并排序的效率更高

    在這里插入圖片描述

1.4 并行實現歸并排序的優化和注意事項

  • 在實際應用里,為充分利用多核CPU,同時保障算法的正確性與效率,得考量數據分布均勻性、內存使用、線程切換開銷等因素;

  • 具體而言:

    • 任務大小:它的選擇會影響并行算法的效率與負載均衡。任務太小,任務劃分和合并的開銷就會過大;任務太大,又難以充分利用多核 CPU 的并行處理能力。所以要結合數據量、CPU 核心數等,選合適的任務大小;

    • 負載均衡:并行算法得保證各線程執行的任務大小和時間盡可能相等,不然有的線程會負載過重,有的則過輕。歸并排序可通過遞歸調用來實現負載均衡,但遞歸層數不能太深,否則會增加線程創建和合并的開銷;

    • 數據分布:數據分布均勻與否會影響并行算法效率和負載均衡。若數據分布不均,會出現有的線程處理數據量過大,有的過小的情況。實際應用要考慮數據分布,盡量把數據分成大小相等的子數組;

    • 內存使用:處理大規模數據時,內存使用情況對算法執行效率至關重要。歸并排序可通過原地歸并節約內存,但要注意歸并的實現方式,避免數據覆蓋和不穩定排序等問題;

    • 線程切換:線程切換是并行算法的一大開銷,要盡量減少切換次數以提升效率。歸并排序可通過設置線程池大小和調整任務大小來控制線程數量和切換開銷,實現最優性能。

2 Fork/Join框架

2.1 簡介

  • Fork/Join 是一個支持分治任務模型的并行計算框架,其中:

    • Fork:對應分治任務模型里的任務分解,就是把一個大任務拆分成許多小任務;
    • Join:對應結果合并,即等小任務并行執行完后,將它們的結果合并成一個大結果;
  • 它適用于可采用分治策略的計算密集型任務,像大規模數組排序、圖形渲染、復雜算法求解等場景;

  • 下圖清晰展示了其工作流程:大任務(Big Task)先通過 Fork 不斷分解為更小的子任務(SubTask),子任務按順序計算得到各自結果(Result),之后通過 Join 逐步合并這些結果,最終得到大任務的最終結果(Final Result);

    在這里插入圖片描述

2.2 應用場景

  • 并行計算:能便捷執行大規模計算任務,把大任務分解為小任務,借助工作竊取算法并行執行,充分利用多核處理器優勢,提升計算效率;
  • 遞歸任務處理:適合遞歸式的任務分解與執行,可遞歸地將大任務拆成諸多小任務,再通過工作竊取算法動態分配給工作線程執行;
  • 并行流操作:Java 8 引入的 Stream API 用于集合的函數式編程操作,ForkJoinPool 常用來執行并行流操作里的并行計算部分,比如對元素進行過濾、映射、聚合等;
  • 高性能任務執行:提供高性能的任務執行機制,通過動態調度任務和管理線程池,有效利用系統資源,在多核處理器上實現任務并行執行。

2.3 使用

  • Fork/Join 框架的主要組成部分是 ForkJoinPool、ForkJoinTask;
    • ForkJoinPool 是一個線程池,它用于管理 Fork/Join 中任務的執行;
    • ForkJoinTask 是一個抽象類,用于表示可以被分割成更小部分的任務。

2.3.1 ForkJoinPool

  • ForkJoinPool 是 Fork/Join 框架中的線程池類,它用于管理 Fork/Join 任務的線程;
  • ForkJoinPool 類包括一些重要的方法,例如submit()invoke()shutdown()awaitTermination()等,用于提交任務、執行任務、關閉線程池和等待任務的執行結果;
  • ForkJoinPool 類中還包括一些參數,例如線程池的大小、工作線程的優先級、任務隊列的容量等,可以根據具體的應用場景進行設置。

2.3.2 構造器

  • ForkJoinPool 有四個核心參數,用于控制線程池的并行數、工作線程創建、異常處理和模式指定:

    • int parallelism:指定并行級別(parallelism level),ForkJoinPool 會據此決定工作線程數量。若未設置,會用 Runtime.getRuntime().availableProcessors() 獲取可用處理器數量來設置并行級別;

    • ForkJoinWorkerThreadFactory factoryForkJoinPool 創建線程時,通過該工廠創建。需實現 ForkJoinWorkerThreadFactory 接口,若不指定,由默認的 DefaultForkJoinWorkerThreadFactory 負責線程創建;

    • UncaughtExceptionHandler handler:指定異常處理器,當任務運行出錯時,由該處理器處理異常;

    • boolean asyncMode:設置隊列工作模式。asyncModetrue 時,使用先進先出隊列;為 false 時,使用后進先出模式;

  • 代碼示例:

    // 獲取處理器數量
    int processors = Runtime.getRuntime().availableProcessors();
    // 構建forkjoin線程池,用獲取到的處理器數量作為并行級別參數
    ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
    

2.3.3 任務提交方式

返回值方法
提交異步執行voidexecute(ForkJoinTask)<?> task):提交一個ForkJoinTask類型的任務進行異步執行
execute(Runnable task):提交一個Runnable類型的任務進行異步執行
等待并獲取結果Tinvoke(ForkJoinTask task):提交ForkJoinTask類型的任務,等待任務執行完畢后獲取結果
提交執行獲取Future結果ForkJoinTasksubmit(ForkJoinTask task):提交ForkJoinTask類型的任務,返回對應的 ForkJoinTask
submit(Callable task):提交Callable類型的任務(Callable 有返回值),返回對應的ForkJoinTask
submit(Runnable task):提交Runnable類型的任務(Runnable 無返回值),返回對應的ForkJoinTask
submit(Runnable task, T result):提交Runnable類型的任務,同時指定一個結果,返回對應的ForkJoinTask

2.3.4 調用方法

  • ForkJoinTask 最核心的是fork()方法和join()方法,承載著主要的任務協調作用,一個用于任務提交,一個用于結果獲取;

  • fork():用于向當前任務所運行的線程池中提交任務。如果當前線程是 ForkJoinWorkerThread 類型,將會放入該線程的工作隊列,否則放入 common 線程池的工作隊列中;

    在 Fork/Join 框架中,“該線程的工作隊列”指的是當前執行任務的 ForkJoinWorkerThread 所擁有的工作隊列

    • ForkJoinWorkerThreadForkJoinPool 中用于執行任務的工作線程。每個 ForkJoinWorkerThread 內部都維護了一個雙端隊列(Deque),這個隊列就是“該線程的工作隊列”;
    • 當通過 fork() 方法提交任務時,如果當前線程是 ForkJoinWorkerThread 類型,任務會被放入當前 ForkJoinWorkerThread 自己的雙端隊列中,后續該線程會從自己的隊列里取出任務執行;
    • 這樣的設計是為了配合“工作竊取(Work - Stealing)”算法:當一個線程的隊列中沒有任務時,它可以去其他線程的隊列中“竊取”任務來執行,從而提高線程的利用率和整體并行效率;
  • join():用于獲取任務的執行結果。調用join()時,將阻塞當前線程直到對應的子任務完成運行并返回結果。

2.3.5 處理遞歸任務

  • 比如計算斐波那契數列:

    public class Fibonacci extends RecursiveTask<Integer> {final int n;  // 要計算斐波那契數列的第n項Fibonacci(int n) {this.n = n;}/*** 重寫RecursiveTask的compute()方法,實現斐波那契數列的遞歸計算* @return 斐波那契數列的第n項值*/protected Integer compute() {// 基準情況:斐波那契數列的前兩項直接返回if (n <= 1)return n;// 創建計算F(n-1)的子任務Fibonacci f1 = new Fibonacci(n - 1);// 將子任務提交到ForkJoinPool異步執行(fork操作)f1.fork();// 創建計算F(n-2)的子任務,并在當前線程同步執行Fibonacci f2 = new Fibonacci(n - 2);// 合并結果:F(n) = F(n-1) + F(n-2)return f2.compute() + f1.join();  // f2.compute()同步計算,f1.join()等待異步任務完成}public static void main(String[] args) {// 構建forkjoin線程池(使用默認并行級別,通常是處理器核心數)ForkJoinPool pool = new ForkJoinPool();// 創建計算斐波那契數列第10項的任務Fibonacci task = new Fibonacci(10);// 提交任務到線程池并阻塞等待執行完成,返回最終結果int result = pool.invoke(task);System.out.println(result);  // 輸出結果:55}
    }
    

    繼承RecursiveTask:表示這是一個有返回值的遞歸任務,返回Integer類型結果

    斐波那契數列定義F(0)=0,F(1)=1,F(n)=F(n?1)+F(n?2)F(0)=0, F(1)=1, F(n)=F(n-1)+F(n-2)F(0)=0,F(1)=1,F(n)=F(n?1)+F(n?2)

    任務分解策略

    • F(n)F(n)F(n)分解為F(n?1)F(n-1)F(n?1)F(n?2)F(n-2)F(n?2)兩個子問題
    • F(n?1)F(n-1)F(n?1)通過fork()異步執行
    • F(n?2)F(n-2)F(n?2)通過compute()在當前線程同步執行

    結果合并:使用f1.join()等待異步任務完成,然后與同步計算結果相加

    Fork/Join工作流程

    • fork():將任務推入工作隊列,供其他工作線程竊取執行
    • join():阻塞當前線程直到任務完成并返回結果

    性能問題:這種實現方式存在大量重復計算,時間復雜度為O(2n)O(2^n)O(2n),實際應用中應使用動態規劃等優化方法

    線程池使用pool.invoke(task)同步提交任務并等待結果,適合計算密集型遞歸任務

  • 如果 n 為100000,執行上面的代碼會發生什么問題?在上面的例子中,由于遞歸計算 Fibonacci 數列的任務數量呈指數級增長,當 n 較大時,就容易出現 StackOverflowError 錯誤。這個錯誤通常發生在遞歸過程中,由于遞歸過程中每次調用函數都會在棧中創建一個新的棧幀,當遞歸深度過大時,棧空間就會被耗盡,導致 StackOverflowError 錯誤;

  • 那么如何解決棧溢出呢?可以使用迭代的方式計算 Fibonacci 數列,以避免遞歸過程中占用大量的棧空間。示例代碼:

    public class Fibonacci {public static void main(String[] args) {int n = 100000;  // 要計算斐波那契數列的第100000項// 創建數組用于存儲斐波那契數列值,長度為n+1(包含0到n項)long[] fib = new long[n + 1];// 初始化斐波那契數列的前兩項fib[0] = 0;  // 第0項為0fib[1] = 1;  // 第1項為1// 使用動態規劃(自底向上)方法計算斐波那契數列for (int i = 2; i <= n; i++) {// 遞推公式:F(n) = F(n-1) + F(n-2)fib[i] = fib[i - 1] + fib[i - 2];}// 輸出斐波那契數列的第100000項System.out.println(fib[n]);}
    }
    
  • 對于一些遞歸深度較大的任務,使用 Fork/Join 框架可能會出現任務調度內存消耗的問題;

    • 當遞歸深度較大時,會產生大量的子任務,這些子任務可能被調度到不同的線程中執行,而線程的創建和銷毀以及任務調度的開銷都會占用大量的資源,從而導致性能下降;
    • 此外,對于遞歸深度較大的任務,由于每個子任務所占用的棧空間較大,可能會導致內存消耗過大,從而引起內存溢出的問題;
    • 因此,在使用 Fork/Join 框架處理遞歸任務時,需要根據實際情況來評估遞歸深度和任務粒度,以避免任務調度和內存消耗的問題。如果遞歸深度較大,可以嘗試采用其他方法來優化算法,如使用迭代方式替代遞歸,或者限制遞歸深度來減少任務數量,以避免 Fork/Join 框架的缺點。

2.3.6 處理阻塞任務

  • 在 ForkJoinPool 中使用阻塞型任務時需要注意以下幾點:

    • 防止線程饑餓
      • 當一個線程在執行一個阻塞型任務時,它將會一直等待任務完成,這時如果沒有其他線程可以竊取任務,那么該線程將一直被阻塞,直到任務完成為止;
      • 為了避免這種情況,應該避免在 ForkJoinPool 中提交大量的阻塞型任務;
    • 使用特定的線程池
      • 為了最大程度地利用 ForkJoinPool 的性能,可以使用專門的線程池來處理阻塞型任務,這些線程不會被 ForkJoinPool 的竊取機制所影響;
      • 例如,可以使用 ThreadPoolExecutor 來創建一個線程池,然后將這個線程池作為 ForkJoinPool 的執行器,這樣就可以使用 ThreadPoolExecutor 來處理阻塞型任務,而使用 ForkJoinPool 來處理非阻塞型任務;
    • 不要阻塞工作線程
      • 如果在 ForkJoinPool 中使用阻塞型任務,那么需要確保這些任務不會阻塞工作線程,否則會導致整個線程池的性能下降;
      • 為了避免這種情況,可以將阻塞型任務提交到一個專門的線程池中,或者使用 CompletableFuture 等異步編程工具來處理阻塞型任務;
  • 下面是一個使用阻塞型任務的例子,這個例子展示了如何使用 CompletableFuture 來處理阻塞型任務:

    public class BlockingTaskDemo {public static void main(String[] args) {// 構建一個forkjoin線程池(使用默認配置,通常為核心數)ForkJoinPool pool = new ForkJoinPool();// 創建一個異步任務,使用CompletableFuture并將其提交到ForkJoinPool中執行CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {// 模擬一個耗時的任務(阻塞5秒鐘)TimeUnit.SECONDS.sleep(5);return "Hello, world!";  // 任務完成后的返回值} catch (InterruptedException e) {e.printStackTrace();return null;  // 發生中斷異常時返回null}}, pool);  // 指定使用ForkJoinPool作為執行器try {// 阻塞等待任務完成,并獲取結果(get()方法會阻塞當前線程直到任務完成)String result = future.get();System.out.println(result);  // 輸出結果:"Hello, world!"} catch (InterruptedException e) {// 處理線程中斷異常e.printStackTrace();} catch (ExecutionException e) {// 處理任務執行過程中拋出的異常e.printStackTrace();} finally {// 關閉ForkJoinPool,釋放線程資源(優雅關閉,等待已提交任務完成)pool.shutdown();}}
    }
    
    • 在這個例子中,使用了 CompletableFuture 來處理阻塞型任務,因為它可以避免阻塞 ForkJoinPool 中的工作線程;
    • 另外,我們也可以使用專門的線程池來處理阻塞型任務,例如 ThreadPoolExecutor 等。不管是哪種方式,都需要避免在 ForkJoinPool 中提交大量的阻塞型任務,以免影響整個線程池的性能。

2.4 ForkJoinPool 工作原理

2.4.1 原理概述

  • 當通過 ForkJoinPool 的 invoke()submit() 方法提交任務時,ForkJoinPool 會依據路由規則將任務提交到某個任務隊列。若任務執行過程中創建子任務,子任務會被提交到工作線程對應的任務隊列;

  • 若工作線程對應的任務隊列空了,ForkJoinPool 支持任務竊取機制,空閑的工作線程可“竊取”其他工作任務隊列里的任務,確保所有工作線程都能有效利用,不閑置;

  • 核心設計

    • ForkJoinPool 內部存儲有 WorkQueue 數組,提交給它的任務會被分配到指定的 WorkQueue 上執行;

    • 每個 WorkQueue 內部維護一個 ForkJoinTask 數組來存儲待執行任務,還配有一個獨立的 ForkJoinWorkerThread 來真正執行任務;

    在這里插入圖片描述

2.4.2 工作線程 ForkJoinWorkerThread

  • ForkJoinWorkerThread 是 ForkJoinPool 里專門執行任務的線程。創建它時,會自動向 ForkJoinPool 注冊一個 WorkQueue,這個 WorkQueue 是該線程專屬的任務存儲隊列,且只能出現在 workqueues[] 的奇數位;

  • ForkJoinWorkerThread 工作線程啟動后,會掃描并 “竊取” 任務執行。另外,當它在 ForkJoinTask#join() 等待返回結果時,若被 ForkJoinPool 線程池發現其任務隊列為空,或者當前任務已執行完畢,也會通過工作竊取算法從其他任務隊列獲取任務,分配到自己的任務隊列中執行;

    在這里插入圖片描述

2.4.3 工作隊列 WorkQueue

  • WorkQueue 一個是雙端隊列,用于存儲工作線程自己的任務。每個工作線程都維護一個本地 WorkQueue,優先執行本地隊列中的任務。當本地隊列任務執行完,工作線程會嘗試從其他線程的 WorkQueue 中竊取任務;

  • WorkQueue 任務隊列分兩種類型:一種是外部提交任務所用隊列,在任務隊列數組中數組下標為偶數;另一種是工作線程私有的任務隊列,存儲大任務 fork 分解出的任務,在任務隊列數組中數組下標為奇數;

  • 下圖展示了 WorkQueue 的操作,如 externalSubmit(task) 外部提交任務、subtask(fork) 子任務提交,還有 pushpop 等操作,以及根據 asynctrue 還是 false,分別對應 FIFO_QUEUE(先進先出隊列)和 LIFO_QUEUE(后進先出隊列)的工作模式;

    在這里插入圖片描述

2.4.4 工作竊取機制

  • ForkJoinPool 和 ThreadPoolExecutor 有很大不同,ForkJoinPool 引入了工作竊取設計,這是其性能保障的關鍵之一;

  • 工作竊取指的是允許空閑線程從繁忙線程的雙端隊列中竊取任務。默認情況下,工作線程從自己雙端隊列的頭部獲取任務;當自己的任務為空時,線程會從其他繁忙線程雙端隊列的尾部獲取任務,這種方式能最大限度減少線程競爭任務的可能性;

  • 下圖展示了主線程提交任務到 ForkJoinPool 后,任務被推入工作隊列(workQueue),ForkJoinWorkerThread - 1ForkJoinWorkerThread - 2 不僅會處理自身隊列中的任務(如通過 fork 分解任務),當自身任務不足時,還會通過“steal 工作竊取”從其他工作隊列獲取任務;

    在這里插入圖片描述

  • 通過工作竊取,Fork/Join 框架能實現任務的自動負載均衡,充分利用多核 CPU 的計算能力,同時避免線程饑餓和延遲問題;

  • 如果想了解更多關于 ForkJoinPool 的內容,可以參考Doug Lea 的論文。

2.5 ForkJoinPool 執行流程

在這里插入圖片描述

  • 應用程序外部向Fork/Join框架提交任務(通過execute/submit/invoke)。此時有兩種處理路徑:

    • 路徑1:直接提交到workQueues。調用 externalPush(task),將任務直接放入工作隊列(workQueues)。若提交成功,會觸發 signalWork(w, q),嘗試用工作線程執行任務;

    • 路徑2:初始化workQueues并提交任務。調用 externalSubmit(task),先初始化工作隊列,再將任務提交到隊列,后續邏輯與“路徑1”類似,最終也會觸發 signalWork(w, q)

  • signalWork(w, q) 執行后,會判斷是否有空閑線程

    • 有空閑線程(Y分支):直接從空閑線程中選一個,執行任務(進入下面的任務執行階段);

    • 無空閑線程(N分支):需要創建新的工作線程,流程如下:

      1. tryAddWorker(c):嘗試添加新工作線程
      2. createWorker():創建工作線程實例
      3. new ForkJoinWorkerThread(pool):實例化ForkJoinWorkerThread
      4. forkJoinWorkerThread.start():啟動工作線程,觸發線程的run()方法
      5. forkJoinWorkerThread.run():線程啟動后,執行run()方法,此時會先執行 registerWorker(this),將新線程注冊到workQueues,完成線程與隊列的綁定;
  • 工作線程就緒后,進入任務執行的核心邏輯:

    1. 任務入隊等待執行。若任務是通過ForkJoinTask.fork()提交的,會調用 workQueue.push(this),將任務放入隊列等待執行;

    2. 工作線程執行任務。調用 workQueue.runTask(),從隊列中取出任務執行。執行時調用 forkJoinTask.doExec(),而doExec()最終會觸發任務的核心邏輯:

      • 若任務是RecursiveTask(有返回值的分治任務),會調用 RecursiveTask.compute()(用戶自定義的“任務拆分與計算邏輯”就在這里實現);
      • 執行的入口也可通過 exec() 方法觸發,最終導向compute()
    3. 任務掃描與等待

      • 若當前隊列“有任務執行”,則持續執行;

      • 若“未掃描到任務”,則調用 scan(w, r) 掃描其他隊列的任務(Fork/Join的工作竊取(Work-Stealing)機制:空閑線程會從其他線程的隊列“偷”任務執行,提升資源利用率);

      • 若掃描后仍無任務,調用 awaitWork(w, r) 讓線程進入“等待任務”狀態;

    4. 持續運行工作線程。整個執行過程由 runWorker(workQueue) 驅動,持續從隊列中獲取并執行任務。

2.6總結

  • Fork/Join是基于分治思想(把大任務拆成小任務,并行執行后合并結果)的編程模型,適合計算密集型任務(大量CPU運算的場景),效率提升源于兩點:

    • 任務切分:將大任務拆成更小的子任務,讓更多線程同時參與計算,充分利用多核CPU的并行能力;

    • 任務竊取:空閑的線程會“竊取”其他線程隊列中未執行的任務,減少線程閑置,同時降低線程間的競爭(因為每個線程優先處理自己隊列的任務,竊取是“補充性”的);

  • 使用ForkJoinPool(Fork/Join框架的線程池)時,要關注任務類型

    • 最適合的是純函數計算型任務:任務不依賴外部狀態、不修改共享變量(無副作用),這樣并行執行時更安全,不會因“狀態不一致”出問題;

    • 若要處理阻塞型任務(如I/O操作、等待鎖),需謹慎評估:雖然ForkJoinPool能處理,但會導致線程被長時間占用(阻塞),破壞“任務竊取”的高效性,增加線程管理成本;

  • 普通線程池(如ThreadPoolExecutor)更適合IO密集型(如網絡請求、文件讀寫)或“短小任務”,而Fork/Join針對大規模并行計算,核心區別有四點:

    • 工作竊取算法

      • ForkJoinPool:線程空閑時,會從其他線程的任務隊列中“偷取”任務執行,最大化線程利用率;
      • 普通線程池:任務存在公共隊列,線程直接從公共隊列取任務,無“竊取”邏輯,空閑線程只能等新任務入隊;
    • 任務的分解與合并

      • ForkJoinPool:支持分治——大任務拆成小任務并行執行,最終合并小任務結果得到大任務結果;
      • 普通線程池:任務是“獨立的”,不支持“自動拆分-合并”,需手動處理任務依賴和結果聚合;
    • 工作線程的數量

      • ForkJoinPool:會自動根據CPU核心數設置工作線程數量,避免線程過多(減少上下文切換開銷),最大化CPU性能;
      • 普通線程池:需手動指定線程數,若設置不合理(如線程過多導致切換頻繁,或過少導致CPU閑置),會影響性能;
    • 任務類型適配

      • ForkJoinPool:適合大規模并行計算任務(如大數據排序、矩陣運算);
      • 普通線程池:適合短小的、IO密集型任務(如Web服務器處理請求、數據庫操作)。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/95280.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/95280.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/95280.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Kafka面試精講 Day 6:Kafka日志存儲結構與索引機制

【Kafka面試精講 Day 6】Kafka日志存儲結構與索引機制 在“Kafka面試精講”系列的第6天&#xff0c;我們將深入剖析 Kafka的日志存儲結構與索引機制。這是Kafka高性能、高吞吐量背后的核心設計之一&#xff0c;也是中高級面試中的高頻考點。面試官常通過這個問題考察候選人是否…

Linux 字符設備驅動框架學習記錄(三)

Linux字符設備驅動開發新框架詳解 一、新舊驅動框架對比 傳統字符設備驅動流程 手動分配設備號 (register_chrdev_region)實現file_operations結構體使用mknod手動創建設備節點 新式驅動框架優勢 自動設備號分配&#xff1a;動態申請避免沖突自動節點創建&#xff1a;通過class…

《計算機網絡安全》實驗報告一 現代網絡安全挑戰 拒絕服務與分布式拒絕服務攻擊的演變與防御策略(1)

目 錄 摘 要 一、研究背景與目的 1.1 介紹拒絕服務&#xff08;DoS&#xff09;和分布式拒絕服務&#xff08;DDoS&#xff09;攻擊的背景 &#xff08;1&#xff09;拒絕服務攻擊&#xff08;DoS&#xff09;  &#xff08;2&#xff09;分布式拒絕服務攻擊&#xff0…

深度學習篇---模型組成部分

模型組成部分&#xff1a;在 PyTorch 框架下進行圖像分類任務時&#xff0c;深度學習代碼通常由幾個核心部分組成。這些部分中有些可以在不同網絡間復用&#xff0c;有些則需要根據具體任務或網絡結構進行修改。下面我將用通俗易懂的方式介紹這些組成部分&#xff1a;1. 數據準…

關于ANDROUD APPIUM安裝細則

1&#xff0c;可以先參考一下連接 PythonAppium自動化完整教程_appium python教程-CSDN博客 2&#xff0c;appium 需要對應的版本的node&#xff0c;可以用nvm對node 進行版本隔離 3&#xff0c;對應需要安裝android stuido 和對應的sdk &#xff0c;按照以上連接進行下載安…

八、算法設計與分析

1 算法設計與分析的基本概念 1.1 算法 定義 &#xff1a;算法是對特定問題求解步驟的一種描述&#xff0c;是有限指令序列&#xff0c;每條指令表示一個或多個操作。特性 &#xff1a; 有窮性&#xff1a;算法需在有限步驟和時間內結束。確定性&#xff1a;指令無歧義&#xff…

機器學習從入門到精通 - 神經網絡入門:從感知機到反向傳播數學揭秘

機器學習從入門到精通 - 神經網絡入門&#xff1a;從感知機到反向傳播數學揭秘開場白&#xff1a;點燃你的好奇心 各位&#xff0c;有沒有覺得那些能識圖、懂人話、下棋碾壓人類的AI特別酷&#xff1f;它們的"大腦"核心&#xff0c;很多時候就是神經網絡&#xff01;…

神經網絡模型介紹

如果你用過人臉識別解鎖手機、刷到過精準推送的短視頻&#xff0c;或是體驗過 AI 聊天機器人&#xff0c;那么你已經在和神經網絡打交道了。作為深度學習的核心技術&#xff0c;神經網絡模仿人腦的信息處理方式&#xff0c;讓機器擁有了 “學習” 的能力。一、什么是神經網絡&a…

蘋果開發中什么是Storyboard?object-c 和swiftui 以及Storyboard到底有什么關系以及邏輯?優雅草卓伊凡

蘋果開發中什么是Storyboard&#xff1f;object-c 和swiftui 以及Storyboard到底有什么關系以及邏輯&#xff1f;優雅草卓伊凡引言由于最近有個客戶咨詢關于 蘋果內購 in-purchase 的問題做了付費咨詢處理&#xff0c;得到問題&#xff1a;“昨天試著把您的那幾部分code 組裝成…

孩子玩手機都近視了,怎樣限制小孩的手機使用時長?

最近兩周&#xff0c;我給孩子檢查作業時發現娃總是把眼睛瞇成一條縫&#xff0c;而且每隔幾分鐘就會用手背揉眼睛&#xff0c;有時候揉得眼圈都紅了。有一次默寫單詞&#xff0c;他把 “太陽” 寫成了 “大陽”&#xff0c;我給他指出來&#xff0c;他卻盯著本子說 “沒有錯”…

醫療AI時代的生物醫學Go編程:高性能計算與精準醫療的案例分析(六)

第五章 案例三:GoEHRStream - 實時電子病歷數據流處理系統 5.1 案例背景與需求分析 5.1.1 電子病歷數據流處理概述 電子健康記錄(Electronic Health Record, EHR)系統是現代醫療信息化的核心,存儲了患者從出生到死亡的完整健康信息,包括 demographics、診斷、用藥、手術、…

GEM5學習(2):運行x86Demo示例

創建腳本 配置腳本內容參考官網的說明gem5: Creating a simple configuration script 首先根據官方說明創建腳本文件 mkdir configs/tutorial/part1/ touch configs/tutorial/part1/simple.py simple.py 中的內容如下&#xff1a; from gem5.prebuilt.demo.x86_demo_board…

通過 FinalShell 訪問服務器并運行 GUI 程序,提示 “Cannot connect to X server“ 的解決方法

FinalShell 是一個 SSH 客戶端&#xff0c;默認情況下 不支持 X11 圖形轉發&#xff08;不像 ssh -X 或 ssh -Y&#xff09;&#xff0c;所以直接運行 GUI 程序&#xff08;如 Qt、GNOME、Matplotlib 等&#xff09;會報錯&#xff1a; Error: Cant open display: Failed to c…

1.人工智能——概述

應用領域 替代低端勞動&#xff0c;解決危險、高體力精力損耗領域 什么是智能制造&#xff1f;數字孿生&#xff1f;邊緣計算&#xff1f; 邊緣計算 是 數字孿生 的 “感官和神經末梢”&#xff0c;負責采集本地實時數據和即時反應。瑣碎數據不上傳總服務器&#xff0c;實時進行…

傳統園區能源轉型破局之道:智慧能源管理系統驅動的“源-網-荷-儲”協同賦能

傳統園區能源結構轉型 政策要求&#xff1a;福建提出2025年可再生能源滲透率≥25%&#xff0c;山東強調“源網荷儲一體化”&#xff0c;安徽要求清潔能源就地消納。系統解決方案&#xff1a;多能協同調控&#xff1a;集成光伏、儲能、充電樁數據&#xff0c;通過AI算法動態優化…

[光學原理與應用-353]:ZEMAX - 設置 - 可視化工具:2D視圖、3D視圖、實體模型三者的區別,以及如何設置光線的數量

在光學設計軟件ZEMAX中&#xff0c;2D視圖、3D視圖和實體模型是三種不同的可視化工具&#xff0c;分別用于從不同維度展示光學系統的結構、布局和物理特性。它們的核心區別體現在維度、功能、應用場景及信息呈現方式上&#xff0c;以下是詳細對比&#xff1a;一、維度與信息呈現…

《sklearn機器學習》——交叉驗證迭代器

sklearn 交叉驗證迭代器 在 scikit-learn (sklearn) 中&#xff0c;交叉驗證迭代器&#xff08;Cross-Validation Iterators&#xff09;是一組用于生成訓練集和驗證集索引的工具。它們是 model_selection 模塊的核心組件&#xff0c;決定了數據如何被分割&#xff0c;從而支持…

Trae+Chrome MCP Server 讓AI接管你的瀏覽器

一、核心優勢1、無縫集成現有瀏覽器環境直接復用用戶已打開的 Chrome 瀏覽器&#xff0c;保留所有登錄狀態、書簽、擴展及歷史記錄&#xff0c;無需重新登錄或配置環境。對比傳統工具&#xff08;如 Playwright&#xff09;需獨立啟動瀏覽器進程且無法保留用戶環境&#xff0c;…

Shell 編程 —— 正則表達式與文本處理器

目錄 一. 正則表達式 1.1 定義 1.2 用途 1.3 Linux 正則表達式分類 1.4 正則表達式組成 &#xff08;1&#xff09;普通字符 &#xff08;2&#xff09;元字符&#xff1a;規則的核心載體 &#xff08;3&#xff09; 重復次數 &#xff08;4&#xff09;兩類正則的核心…

Springboot 監控篇

在 Spring Boot 中實現 JVM 在線監控&#xff08;包括線程曲線、內存使用、GC 情況等&#xff09;&#xff0c;最常用的方案是結合 Spring Boot Actuator Micrometer 監控可視化工具&#xff08;如 Grafana、Prometheus&#xff09;。以下是完整實現方案&#xff1a; 一、核…