你不會忘記你的根源

執行程序是具有單個執行方法的根接口。 任何實現Runnable接口的東西都可以作為參數傳遞。 但是,傻傻的執行器不支持Callable。

好消息: ExecutorService接口擴展了Executor,增加了對Callable的支持。 它的實現類是ThreadPoolExecutor。
我要假裝ScheduledExecutorService接口及其實現類ScheduledThreadPoolExecutor不存在,因為它們只是在ExecutorService和ThreadPoolExecutor之上添加了調度功能。 但是,當功能強大但無聊的java.util.Timer
不夠用,而功能強大的外部調度程序實在太多了時,請記住此類。
如果您不熟悉并發性,或者忘記了Callable和Runnable之間的區別,則可能需要先閱讀一點,然后再繼續閱讀。 虛擬指南在這里
ExecutorService.submit事實:
這三個提交變量:
將來提交(可調用任務)
將來提交(可運行任務)
將來提交(可運行任務,T結果)
- 所述
submit
的的ExecutorService的方法過載并且可以接受一個Callable
或Runnable
。 - 由于Runnable的
run
方法返回void,因此在任務完成時Future.get
總是返回null也就Future.get
了。Future<?> submit(Runnable task)
- 另一個接受
Runnable
和泛型的重載submit
方法將返回您作為第二個參數傳入的結果。<T> Future<T> submit(Runnable task, T result)
事實上,開放代碼( FutureTask
),你會發現, RunnableAdapter
頂級嵌套類的Executors
只需保存結果,并返回相同的結果run方法完成之后。
static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T [More ...] call() {task.run();return result;}
}
RunnableAdapter源
在這兩種情況下,如果您想要(應該!)終止程序,而不是執行程序線程阻止該程序并進入繁忙循環 ,則應按以下方式調用shutdown方法:
executorService.shutdown()
關閉事實
您可以想象shutdown
是購物中心的半關門。 不會有新客戶進入,但現有客戶一旦完成就可以離開購物中心。
重申一下,
-
shutdown
是一種禮貌的方法。 它實際上并不會立即關閉池中的任務。 它只是說不會接受任何新任務。 - 除非您使用
invokeAll
執行任務,否則需要等待所有正在進行的任務完成。 這可以通過調用awaitTermination
方法來實現。 (invokeAll并在帖子底部提交示例) - 當前所有任務完成后,執行程序服務將關閉。
如果您需要一種不禮貌的侵入方法,而該方法不關心當前線程是否已完成其任務,那么shutdownNow是您的理想選擇。 但是,不能保證該方法將關閉點上的服務,但這是您必須立即關閉的最接近的方法。
在awaitTermination上,您可以指定超時時間,直到主線程等待池線程完成其任務為止。
ExecutorService executorService=Executors.newFixedThreadPool(10);…future = executorService.submit(getInstanceOfCallable(count,sum));…executorService.shutdown();if (executorService.awaitTermination(10, TimeUnit.SECONDS)){System.out.println('All threads done with their jobs');}
執行者–工廠的家伙

上面的課程都很棒。 但是,例如,您想創建一個單線程執行器,您將編寫類似
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
比較一下
Executors.newSingleThreadExecutor()
所以,你去。 Executors
是僅具有工廠方法的類,用于使用各種常用默認值創建各種形式的executor服務。 請注意,除了很棒的工廠方法外,它沒有為表帶來任何新功能。
建議您快速查看工廠方法的實現,并檢查它是否適合您的需求。
invokeAll和提交
ExecutorService
的invokeAll
方法的All
部分毫不奇怪。 它只是說您需要傳遞Callable
的Collection。 再次,正如預期的那樣,該方法直到所有線程完成其任務后才返回。 因此,對于僅在所有工作完成后才對結果感興趣的情況, invokeAll
是您的最佳選擇。
另一方面, submit
方法在可調用對象被提交給執行者服務之后立即返回。 除非您在Callable
call
方法中什么都不做,否則理想情況下,當submit
方法返回時,工作線程應該正在運行。
以下示例可能對您有用。 這些程序只是嘗試找到所有自然數的和,直到100(當然是蠻力)
package me.rerun.incubator;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;public class ExecutorInvokeAll {public void runApp() throws InterruptedException, ExecutionException{//variable to store the sumAtomicInteger sum=new AtomicInteger();//Use our friendly neighbourhood factory method of the Executors.ExecutorService executorService=Executors.newFixedThreadPool(10);List<Callable<AtomicInteger>> callableList=new ArrayList<Callable<AtomicInteger>>();for (int count = 0; count <= 100;count++) {callableList.add(getInstanceOfCallable(count,sum));}//returns only after all tasks are completeList<Future<AtomicInteger>> resultFuture = executorService.invokeAll(callableList);//Prints 5050 all throughfor (Future<AtomicInteger> future : resultFuture) {//Didn't deliberately put a timeout here for the get method. Remember, the invoke All does not return until the task is done.System.out.println("Status of future : " + future.isDone() +". Result of future : "+future.get().get());}executorService.shutdown();// You might as well call a resultFuture.get(0).get().get() and that would give you the same //result since all your worker threads hold reference to the same atomicinteger sum.System.out.println("Final Sum : "+sum); }//Adds count to the sum and returns the reference of the sum as the resultprivate Callable<AtomicInteger> getInstanceOfCallable(final int count, final AtomicInteger sum) {Callable<AtomicInteger> clientPlanCall=new Callable<AtomicInteger>(){public AtomicInteger call() {sum.addAndGet(count);System.out.println("Intermediate sum :"+sum);return sum;}};return clientPlanCall;}public static void main(String[] args) throws ExecutionException {try {new ExecutorInvokeAll().runApp();} catch (InterruptedException e) {e.printStackTrace();}} }
package me.rerun.incubator;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;public class ExecutorSubmit {public void runApp() throws InterruptedException, ExecutionException{//holder for the total sumAtomicInteger sum=new AtomicInteger();//Use the factory method of ExecutorsExecutorService executorService=Executors.newFixedThreadPool(10);Future<AtomicInteger> future = null;for (int count = 0; count <= 100; count++) {future = executorService.submit(getInstanceOfCallable(count,sum));//prints intermediate sumtry {System.out.println("Status of future : " + future.isDone() +". Result of future : "+future.get(1000, TimeUnit.MILLISECONDS).get());} catch (TimeoutException e) {System.out.println("<IGNORE> Timeout exception for count : "+count);//e.printStackTrace();}//System.out.println("Result of future : "+future.get().get() +".Status of future : " + future.isDone());}executorService.shutdown();if (executorService.awaitTermination(10, TimeUnit.SECONDS)){System.out.println("All threads done with their jobs");}//execSystem.out.println("Final Sum : "+sum);}//Adds count to the sum and returns the reference of the sum as the resultprivate Callable<AtomicInteger> getInstanceOfCallable(final int count, final AtomicInteger sum) {Callable<AtomicInteger> clientPlanCall=new Callable<AtomicInteger>(){public AtomicInteger call() {sum.addAndGet(count);//System.out.println("Intermediate sum :"+sum);return sum;}};return clientPlanCall;}public static void main(String[] args) throws ExecutionException {try {new ExecutorSubmit().runApp();} catch (InterruptedException e) {e.printStackTrace();}} }
進一步閱讀:
亞歷克斯·米勒的驚人博客
亞歷克斯·米勒的并發陷阱
Vogella關于與原始API進行比較的文章
總體上很好地介紹了并發
強烈推薦有關Java并發性的書
祝您編程愉快,別忘了分享!
參考: Rerun.me博客上的JCG合作伙伴 Arun Manivannan的Java并發執行器懶惰開發人員簡介 。
翻譯自: https://www.javacodegeeks.com/2012/10/a-lazy-developers-introduction-to-java.html