? ??
? ? ?在JAVA7之前,并行處理數據非常麻煩。第一,你得明確把包含數據的數據結構分成若干份。第二,你要將每個子部分分配給一個獨立的線程。第三,你要在恰當的時候對它們進行同步避免不希望的競爭條件,等待所有線程完成,最后把這些部分結果合并起來。在Java 7引入了分支/合并框架,讓這些操作更穩定、更不容易出錯。
? ? ?分支/合并框架的目的是以遞歸的方式將可以并行的任務拆分為更小的任務,然后將每個子任務的結果合并起來生成整體結果。要把子任務提交到ForkJoinPool必須創建RecursiveTask<R>的子類。需要實現它唯一的抽象方法 protected abstract R compute(); ?在這個方法中定義了將任務拆分成子任務的邏輯,以及無法拆分時生成單個子任務結果的邏輯。
?
?
? 計算1到10000000的和
?
/*** Desc:Fork/Join框架的目的是以遞歸方式將可以并行的任務拆分為更小的任務,然后將每個子任務的結果合并起來生成一個整體結果。* 要把任務提交到ForkJoinPool必須創建RecursiveTask<T> 的一個子類* * @author wei.zw* @since 2016年7月6日 下午9:27:56* @version v 0.1*/
public class ForkJoinSumCalculator extends RecursiveTask<Long> {/** */private static final long serialVersionUID = -8013303660374621470L;private final long[] numbers;private final int start;private final int end;private static final long THRESHOLD = 1000;/*** @param numbers* @param start* @param end*/public ForkJoinSumCalculator(long[] numbers, int start, int end) {super();this.numbers = numbers;this.start = start;this.end = end;}/*** @param numbers*/public ForkJoinSumCalculator(long[] numbers) {super();this.numbers = numbers;this.start = 0;this.end = numbers.length;}/*** @see java.util.concurrent.RecursiveTask#compute()*/@Overrideprotected Long compute() {int length = end - start;if (length <= THRESHOLD) {return computeSequentially();}ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);leftTask.fork();ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);rightTask.fork();Long rightResult = 0L;try {rightResult = rightTask.get();} catch (Exception e) {}Long leftResult = leftTask.join();return leftResult + rightResult;}/*** * @return* @author wei.zw*/private Long computeSequentially() {long sum = 0;for (int i = start; i < end; i++) {sum += numbers[i];}return sum;}public static void main(String[] args) {long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();long start = System.currentTimeMillis();System.out.println(new ForkJoinPool().invoke(new ForkJoinSumCalculator(numbers)) + " 耗時:"+ (System.currentTimeMillis() - start));}}
?結果是:50000005000000 耗時:37
?
優化后的
/*** @see java.util.concurrent.RecursiveTask#compute()*/@Overrideprotected Long compute() {int length = end - start;if (length <= THRESHOLD) {return computeSequentially();}ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);leftTask.fork();ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);Long rightResult = rightTask.compute();Long leftResult = leftTask.join();return leftResult + rightResult;}
?計算結果是:50000005000000 耗時:25
?
使用Fork/Join框架的最佳做法:
- 對一個任務調用join方法會阻塞調用方,直到該任務作出結果。因此,又必須要在兩個子任務的計算都開始之后再調用它。
- 不應該在RecursiveTask內部使用ForkJoinPool的invoke方法,應該直接調用compute或者fork方法
- 對子任務調用fork方法可以將這個子任務排進ForkJoinPool。同時對左右兩邊的子任務都調用似乎很自然,但是這樣做的效率比直接對其中一個調用compute方法低。這樣做可以為其中一個子任務重用同一線程,從而避免在線程池中多分配一個任務造成的開銷。
看完了基本示例,在分析一下源碼;首先看一下RecursiveTask,通過名稱可以知道這是一個遞歸Task.源碼很簡單
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {private static final long serialVersionUID = 5232453952276485270L;//計算結果V result;//抽象的計算方法protected abstract V compute();//獲取計算結果public final V getRawResult() {return result;}//設置計算結果protected final void setRawResult(V value) {result = value;}//執行計算protected final boolean exec() {result = compute();return true;}}
RecursiveTask源碼看完以后,繼續分析ForkJoinTask
??
?