fork join框架是java 7中引入框架,這個框架的引入主要是為了提升并行計算的能力。
fork join主要有兩個步驟,第一就是fork,將一個大任務分成很多個小任務,第二就是join,將第一個任務的結果join起來,生成最后的結果。如果第一步中并沒有任何返回值,join將會等到所有的小任務都結束。
還記得之前的文章我們講到了thread pool的基本結構嗎?
ExecutorService - ForkJoinPool 用來調用任務執行。
workerThread - ForkJoinWorkerThread 工作線程,用來執行具體的任務。
task - ForkJoinTask 用來定義要執行的任務。
下面我們從這三個方面來詳細講解fork join框架。
ForkJoinPool
ForkJoinPool是一個ExecutorService的一個實現,它提供了對工作線程和線程池的一些便利管理方法。
public class ForkJoinPool extends AbstractExecutorService
一個work thread一次只能處理一個任務,但是ForkJoinPool并不會為每個任務都創建一個單獨的線程,它會使用一個特殊的數據結構double-ended queue來存儲任務。這樣的結構可以方便的進行工作竊取(work-stealing)。
什么是work-stealing呢?
默認情況下,work thread從分配給自己的那個隊列頭中取出任務。如果這個隊列是空的,那么這個work thread會從其他的任務隊列尾部取出任務來執行,或者從全局隊列中取出。這樣的設計可以充分利用work thread的性能,提升并發能力。
下面看下怎么創建一個ForkJoinPool。
最常見的方法就是使用ForkJoinPool.commonPool()來創建,commonPool()為所有的ForkJoinTask提供了一個公共默認的線程池。
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
另外一種方式是使用構造函數:
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
這里的參數是并行級別,2指的是線程池將會使用2個處理器核心。
ForkJoinWorkerThread
ForkJoinWorkerThread是使用在ForkJoinPool的工作線程。
public class ForkJoinWorkerThread extends Thread
}
和一般的線程不一樣的是它定義了兩個變量:
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
一個是該worker thread所屬的ForkJoinPool。 另外一個是支持 work-stealing機制的Queue。
再看一下它的run方法:
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
簡單點講就是從Queue中取出任務執行。
ForkJoinTask
ForkJoinTask是ForkJoinPool中運行的任務類型。通常我們會用到它的兩個子類:RecursiveAction和RecursiveTask。
他們都定義了一個需要實現的compute()方法用來實現具體的業務邏輯。不同的是RecursiveAction只是用來執行任務,而RecursiveTask可以有返回值。
既然兩個類都帶了Recursive,那么具體的實現邏輯也會跟遞歸有關,我們舉個使用RecursiveAction來打印字符串的例子:
public class CustomRecursiveAction extends RecursiveAction {
private String workload = "";
private static final int THRESHOLD = 4;
private static Logger logger =
Logger.getAnonymousLogger();
public CustomRecursiveAction(String workload) {
this.workload = workload;
}
@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}
private List createSubtasks() {
List subtasks = new ArrayList<>();
String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());
subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));
return subtasks;
}
private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}
上面的例子使用了二分法來打印字符串。
我們再看一個RecursiveTask的例子:
public class CustomRecursiveTask extends RecursiveTask {
private int[] arr;
private static final int THRESHOLD = 20;
public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}
@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}
private Collection createSubtasks() {
List dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}
private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}
和上面的例子很像,不過這里我們需要有返回值。
在ForkJoinPool中提交Task
有了上面的兩個任務,我們就可以在ForkJoinPool中提交了:
int[] intArray= {12,12,13,14,15};
CustomRecursiveTask customRecursiveTask= new CustomRecursiveTask(intArray);
int result = forkJoinPool.invoke(customRecursiveTask);
System.out.println(result);
上面的例子中,我們使用invoke來提交,invoke將會等待任務的執行結果。
如果不使用invoke,我們也可以將其替換成fork()和join():
customRecursiveTask.fork();
int result2= customRecursiveTask.join();
System.out.println(result2);
fork() 是將任務提交給pool,但是并不觸發執行, join()將會真正的執行并且得到返回結果。
本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/tree/master/forkjoin
到此這篇關于java中的fork join框架的使用的文章就介紹到這了,更多相關java fork join框架內容請搜索云海天教程以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持云海天教程!