14、ForkJoin
ForkJoin
框架是Java中用于并行執行任務的框架,特別適合處理可以分解為多個子任務的復雜計算。它基于“分而治之”的思想,將一個大任務分解為多個小任務,這些小任務可以并行執行,最后將結果合并。
ForkJoin框架的核心組件
-
ForkJoinPool
:線程池,用于管理線程和任務的執行。 -
RecursiveTask
:有返回值的任務。 -
RecursiveAction
:無返回值的任務。
工作原理
-
任務分解:將大任務分解為多個小任務。
-
并行執行:小任務在
ForkJoinPool
中并行執行。 -
任務合并:將小任務的結果合并為最終結果。
-
工作竊取:線程池中的線程如果完成自己的任務,可以“竊取”其他線程的任務來執行,提高資源利用率。
代碼示例:計算斐波那契數列
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;public class FibonacciExample {// 定義一個閾值,當任務大小小于這個值時,直接計算而不是進一步分解private static final int THRESHOLD = 10;public static void main(String[] args) {// 創建ForkJoinPoolForkJoinPool forkJoinPool = new ForkJoinPool();// 提交任務FibonacciTask task = new FibonacciTask(30);int result = forkJoinPool.invoke(task);// 輸出結果System.out.println("Fibonacci(30) = " + result);}// 定義RecursiveTaskstatic class FibonacciTask extends RecursiveTask<Integer> {private final int n;public FibonacciTask(int n) {this.n = n;}@Overrideprotected Integer compute() {// 如果任務大小小于閾值,直接計算if (n <= THRESHOLD) {return computeFibonacci(n);} else {// 分解任務FibonacciTask leftTask = new FibonacciTask(n - 1);FibonacciTask rightTask = new FibonacciTask(n - 2);// 異步執行一個子任務leftTask.fork();// 執行另一個子任務int rightResult = rightTask.compute();// 等待第一個子任務完成并獲取結果int leftResult = leftTask.join();// 合并結果return leftResult + rightResult;}}// 計算斐波那契數列的值private int computeFibonacci(int n) {if (n <= 1) {return n;}return computeFibonacci(n - 1) + computeFibonacci(n - 2);}}
}
代碼解析
-
ForkJoinPool
:-
ForkJoinPool
是ForkJoin
框架的核心,用于管理線程和任務的執行。在上面的代碼中,我們創建了一個ForkJoinPool
實例。 -
使用
invoke
方法提交任務并獲取結果。
-
-
RecursiveTask
:-
RecursiveTask
是一個有返回值的任務類。在上面的代碼中,我們定義了一個FibonacciTask
類,繼承自RecursiveTask<Integer>
。 -
在
compute
方法中,我們實現了任務的分解和結果的合并。
-
-
任務分解:
-
如果任務大小小于閾值(
THRESHOLD
),直接計算結果。 -
如果任務大小大于閾值,將任務分解為兩個子任務:
F(n - 1)
和F(n - 2)
。 -
使用
fork
方法異步執行一個子任務,使用compute
方法同步執行另一個子任務。 -
使用
join
方法等待異步執行的子任務完成并獲取結果。
-
-
結果合并:
-
將兩個子任務的結果相加,得到最終結果。
-
下面詳細闡述ForkJoin
. ForkJoinPool
類
ForkJoinPool
是ForkJoin
框架的核心,用于管理線程和任務的執行。
構造方法
-
ForkJoinPool()
:創建一個默認的ForkJoinPool
,其線程數量等于可用的處理器數量。
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinPool(int parallelism)
:創建一個具有指定并行級別的ForkJoinPool
。
ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 指定并行級別為4
常用方法
invoke(ForkJoinTask<?> task)
:執行給定的任務,等待其完成,并返回結果。
int result = forkJoinPool.invoke(new FibonacciTask(30));
execute(ForkJoinTask<?> task)
:異步執行給定的任務,不等待其完成。
forkJoinPool.execute(new FibonacciTask(30));
submit(ForkJoinTask<?> task)
:異步執行給定的任務,并返回一個ForkJoinTask
對象,可以用來獲取結果。
ForkJoinTask<Integer> task = forkJoinPool.submit(new FibonacciTask(30));
int result = task.get(); // 獲取結果
shutdown()
:關閉線程池,不再接受新任務,但會等待已提交的任務完成。
forkJoinPool.shutdown();
shutdownNow()
:立即關閉線程池,嘗試取消所有未完成的任務。
forkJoinPool.shutdownNow();
RecursiveTask
類
構造方法
RecursiveTask()
:無參構造方法,通常在子類中使用。
public class FibonacciTask extends RecursiveTask<Integer> {private final int n;public FibonacciTask(int n) {this.n = n;}
}
常用方法
compute()
:任務的主體,用于執行任務的邏輯。可以在這個方法中分解任務并調用fork
和join
方法。
@Override
protected Integer compute() {if (n <= THRESHOLD) {return computeFibonacci(n);} else {FibonacciTask leftTask = new FibonacciTask(n - 1);FibonacciTask rightTask = new FibonacciTask(n - 2);leftTask.fork();int rightResult = rightTask.compute();int leftResult = leftTask.join();return leftResult + rightResult;}
}
fork()
:異步執行當前任務,返回一個ForkJoinTask
對象。
leftTask.fork();
join()
:等待任務完成并獲取結果。如果任務已經完成,直接返回結果。
RecursiveAction
類
RecursiveAction
是一個無返回值的任務類,用于表示可以分解為多個子任務的任務。
構造方法
RecursiveAction()
:無參構造方法,通常在子類中使用。
public class SortTask extends RecursiveAction {private final int[] array;private final int start;private final int end;public SortTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}
}
常用方法
compute()
:任務的主體,用于執行任務的邏輯。可以在這個方法中分解任務并調用fork
和join
方法。
@Override
protected void compute() {if (end - start < THRESHOLD) {Arrays.sort(array, start, end);} else {int middle = (start + end) / 2;SortTask leftTask = new SortTask(array, start, middle);SortTask rightTask = new SortTask(array, middle, end);leftTask.fork();rightTask.compute();leftTask.join();}
}
fork()
:異步執行當前任務。
leftTask.fork();
join()
:等待任務完成。由于RecursiveAction
沒有返回值,join
方法不返回任何結果。
leftTask.join();
總結
ForkJoin
框架提供了多種方法來支持任務的分解、執行和結果合并。通過合理使用這些方法,可以高效地處理復雜的并行計算任務。以下是一些關鍵點:
-
ForkJoinPool
:用于管理線程和任務的執行。 -
RecursiveTask
:有返回值的任務類,適用于需要返回結果的任務。 -
RecursiveAction
:無返回值的任務類,適用于不需要返回結果的任務。 -
compute()
:任務的主體,用于實現任務的邏輯。 -
fork()
:異步執行任務。 -
join()
:等待任務完成并獲取結果(對于RecursiveTask
)或等待任務完成(對于RecursiveAction
)。