一、Exectuor框架簡介?? ? ?
?Java從1.5版本開始,為簡化多線程并發編程,引入全新的并發編程包:java.util.concurrent及其并發編程框架(Executor框架)。?Executor框架是指java 5中引入的一系列并發庫中與executor相關的一些功能類,其中包括線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他們的關系為? ?
??
?
?
在Executor框架中,使用執行器(Exectuor)來管理Thread對象,從而簡化了并發編程。
?
二、認識Exectuor(執行器)
1、并發編程的一種編程方式是把任務拆分為一系列的小任務,即Runnable,然后將這些任務提交給一個Executor執行,Executor.execute(Runnalbe)?。Executor在執行時使用其內部的線程池來完成操作。
? ? ? Executor的子接口有:ExecutorService,ScheduledExecutorService,已知實現類:AbstractExecutorService,ScheduledThreadPoolExecutor,ThreadPoolExecutor。
?
2、Executor屬于public類型的接口。可以用于提交,管理或者執行Runnable任務。實現Executor接口的class還可以控制Runnable任務執行線程的具體細節。包括線程使用的細節、調度等。一般來說,Runnable任務開辟在新線程中的使用方法為:new Thread(new RunnableTask())).start()
?
3、但在Executor中,可以使用Executor而不用顯示地創建線程。例如,可以使用以下方法創建線程,而不是像第2點中為一種任務中的每個任務都調用new Thread(...)的方法。
?
- Exectuor?executor?=?anExecutor();??
- executor.execute(new?RunnableTask());?//?異步執行??
- executor.execute(new?RunnableTask());??
?
?
?
4、但是,Executor接口并沒有嚴格地要求執行必須是異步/同步的,一切都相當自由。在最簡單的情況下,執行程序可以在調用者的線程中立即運行已提交的任務,
?
- class?DirectExecutor?implements?Executor?{????????
- ???????public?void?execute(Runnable?r)?{????????????
- ??????????????r.run();??????
- ???????}????
- }??
?更常見的是,任務在某個不是調用者線程的線程中執行的。如在另一個線程中啟動:
?
- class?ThreadPerTaskExecutor?implements?Executor?{????????
- ???????????public?void?execute(Runnable?r)?{????????????
- ??????????????new?Thread(r).start();????????
- ????????????}????
- }??
?
?
?也可以在實現中用另一個Executor來序列化執行過程:
?
- class?SerialExecutor?implements?Executor?{????
- ????final?Queue<Runnable>?tasks?=?new?ArrayDeque<Runnable>();????
- ????final?Executor?executor;????
- ????Runnable?active;????
- ????
- ????SerialExecutor(Executor?executor)?{????
- ????????this.executor?=?executor;????
- ????}????
- ????
- ????public?synchronized?void?execute(final?Runnable?r)?{????
- ????????tasks.offer(new?Runnable()?{????
- ????????????public?void?run()?{????
- ????????????????try?{????
- ????????????????????r.run();????
- ????????????????}?finally?{????
- ????????????????????scheduleNext();????
- ????????????????}????
- ????????????}????
- ????????});????
- ????????if?(active?==?null)?{????
- ????????????scheduleNext();????
- ????????}????
- ????}????
- ????
- ????protected?synchronized?void?scheduleNext()?{????
- ????????if?((active?=?tasks.poll())?!=?null)?{????
- ????????????executor.execute(active);????
- ????????}????
- ????}????
- }????
?
?
?
?
?5、ThreadPoolExecutor類提供了一個可供可擴展的線程池實現。Executors類為Executor接口及其實現提供了便捷的工廠方法。
?
6、 Executor中的方法execute。void execute(Runnable command)表示在未來的某個時間執行給定的命令。該命令可能在新的線程、已經入池的線程或者正在調用的線程中執行。
?
三、Executors類: 主要用于提供線程池相關的操作
Executors類,提供了一系列工廠方法用于創建線程池,返回的線程池都實現了ExecutorService接口。
?1、public static ExecutorService newFiexedThreadPool(int Threads)?創建固定數目線程的線程池。
?
2、public static ExecutorService newCachedThreadPool():創建一個可緩存的線程池,調用execute 將重用以前構造的線程(如果線程可用)。如果沒有可用的線程,則創建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
?
3、public static ExecutorService newSingleThreadExecutor():創建一個單線程化的Executor。
?
4、public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
創建一個支持定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。
?
?四、ExecutorService與生命周期
?
1、ExecutorService可以理解為程序員提供了一堆操作Executor的API
?
2、ExecutorService擴展了Executor并添加了一些生命周期管理的方法。一個Executor的生命周期有三種狀態
運行、關閉和終止。
? ? ?Executor創建時處于運行狀態。當調用ExecutorService.shutdown()后,處于關閉狀態,isShutdown()方法返回true。這時,不應該再向Executor中添加任務,所有已添加的任務執行完畢后,Executor處于終止狀態,isTerminated()返回true。如果Executor處于關閉狀態,往Executor提交任務會拋出unchecked exception RejectedExecutionException。
?
3、本質
? ? 接口ExecutorService 表述了異步執行的機制,并且可以讓任務在后臺執行。一個ExecutorService 實例因此特別像一個線程池。事實上,在 java.util.concurrent 包中的 ExecutorService 的實現就是一個線程池的實現。
?
- ExecutorService?executorService?=?Executors.newFixedThreadPool(10);??
- ???
- executorService.execute(new?Runnable()?{??
- ????public?void?run()?{??
- ????????System.out.println("Asynchronous?task");??
- ????}??
- });??
- ???
- executorService.shutdown();??
?
?
? ?該示例代碼首先使用 newFixedThreadPool() 工廠方法創建一個ExecutorService ,上述代碼創建了一個可以容納10個線程任務的線程池。其次,向 execute() 方法中傳遞一個異步的 Runnable 接口的實現,這樣做會讓 ExecutorService 中的某個線程執行這個Runnable 線程。
?
4、任務的委托
下方展示了一個線程的把任務委托異步執行的ExecutorService的示意圖。
?一旦線程把任務委托給 ExecutorService,該線程就會繼續執行與運行任務無關的其它任務。
?
5、ExecutorService 的實現
由于 ExecutorService 只是一個接口,ExecutorService 接口在 java.util.concurrent 包中有如下實現類:
- ThreadPoolExecutor
-
ScheduledThreadPoolExecutor
6、ExecutorService 使用方法
這里有幾種不同的方式讓你將任務委托給一個ExecutorService:
?
- execute(Runnable)??
- submit(Runnable)??
- submit(Callable)??
- invokeAny()??
- invokeAll()??
?
?
7、execute(Runnable)
方法 execute(Runnable) 接收一個java.lang.Runnable 對象作為參數,并且以異步的方式執行它。如下是一個使用 ExecutorService 執行 Runnable 的例子:
?
- ExecutorService?executorService?=?Executors.newSingleThreadExecutor();??
- ???
- executorService.execute(new?Runnable()?{??
- ????public?void?run()?{??
- ????????System.out.println("Asynchronous?task");??
- ????}??
- });??
- ???????
- executorService.shutdown();??
使用這種方式沒有辦法獲取執行 Runnable 之后的結果,如果你希望獲取運行之后的返回值,就必須使用接收 Callable 參數的 execute() 方法。接下來會提到。
?
?
8、submit(Runnable)
方法 submit(Runnable) 同樣接收一個Runnable 的實現作為參數,但是會返回一個Future 對象。這個Future 對象可以用于判斷 Runnable 是否結束執行。如下是一個ExecutorService 的 submit() 方法的例子:
?
- Future?future?=?executorService.submit(new?Runnable()?{??
- ????public?void?run()?{??
- ????????System.out.println("Asynchronous?task");??
- ????}??
- });??
- //如果任務結束執行則返回?null??
- System.out.println("future.get()="?+?future.get());??
?
?
9、submit(Callable)
方法 submit(Callable) 和方法 submit(Runnable) 比較類似,但是區別則在于它們接收不同的參數類型。Callable 的實例與 Runnable 的實例很類似,但是 Callable 的 call() 方法可以返回一個結果。方法 Runnable.run() 則不能返回結果。
Callable 的返回值可以從方法 submit(Callable) 返回的 Future 對象中獲取。如下是一個 ExecutorService Callable 的樣例:
?
- Future?future?=?executorService.submit(new?Callable(){??
- ????public?Object?call()?throws?Exception?{??
- ????????System.out.println("Asynchronous?Callable");??
- ????????return?"Callable?Result";??
- ????}??
- });??
- ???
- System.out.println("future.get()?=?"?+?future.get());??
?上述樣例代碼會輸出如下結果:
?
- Asynchronous?Callable??
- future.get()?=?Callable?Result??
?
10、inVokeAny()
方法 invokeAny() 接收一個包含 Callable 對象的集合作為參數。調用該方法不會返回 Future 對象,而是返回集合中某一個Callable 對象的結果,而且無法保證調用之后返回的結果是哪一個 Callable,只知道它是這些 Callable 中一個執行結束的 Callable 對象。如果一個任務運行完畢或者拋出異常,方法會取消其它的 Callable 的執行。
以下是一個樣例:
- ExecutorService?executorService?=?Executors.newSingleThreadExecutor();??
- ???
- Set<Callable<String>>?callables?=?new?HashSet<Callable<String>>();??
- ???
- callables.add(new?Callable<String>()?{??
- ????public?String?call()?throws?Exception?{??
- ????????return?"Task?1";??
- ????}??
- });??
- callables.add(new?Callable<String>()?{??
- ????public?String?call()?throws?Exception?{??
- ????????return?"Task?2";??
- ????}??
- });??
- callables.add(new?Callable<String>()?{??
- ????public?String?call()?throws?Exception?{??
- ????????return?"Task?3";??
- ????}??
- });??
- ???
- String?result?=?executorService.invokeAny(callables);??
- ???
- System.out.println("result?=?"?+?result);??
- ???
- executorService.shutdown();??
?以上樣例代碼會打印出在給定的集合中的某一個Callable 的返回結果。嘗試運行后發現每次結果都在改變。有時候返回結果是"Task 1",有時候是"Task 2",等等。
?
11、invokeAll()
方法 invokeAll() 會調用存在于參數集合中的所有 Callable 對象,并且返回一個包含 Future 對象的集合,你可以通過這個返回的集合來管理每個 Callable 的執行結果。需要注意的是,任務有可能因為異常而導致運行結束,所以它可能并不是真的成功運行了。但是我們沒有辦法通過 Future 對象來了解到這個差異。
12、ExecutorService服務的關閉
? ? ? 當使用 ExecutorService 完畢之后,我們應該關閉它,這樣才能保證線程不會繼續保持運行狀態。?
? ? ? 舉例來說,如果你的程序通過 main() 方法啟動,并且主線程退出了你的程序,如果還有一個活動的 ExecutorService 存在于程序中,那么程序將會繼續保持運行狀態。存在于 ExecutorService 中的活動線程會阻止Java虛擬機關閉。?
? ? ? 為了關閉在 ExecutorService 中的線程,需要調用 shutdown() 方法。但ExecutorService 并不會馬上關閉,而是不再接收新的任務,一旦所有的線程結束執行當前任務,ExecutorServie 才會真的關閉。所有在調用 shutdown() 方法之前提交到 ExecutorService 的任務都會執行。?
? ? ?如果你希望立即關閉 ExecutorService,你可以調用 shutdownNow() 方法。這個方法會嘗試馬上關閉所有正在執行的任務,并且跳過所有已經提交但是還沒有運行的任務。但是對于正在執行的任務,是否能夠成功關閉它是無法保證的,有可能他們真的被關閉掉了,也有可能它會一直執行到任務結束。這是一個最好的嘗試。
?
五、CompletionService
? ? ? ? 根據上面的介紹我們知道,現在在Java中使用多線程通常不會再使用Thread對象了。而是會用到java.util.concurrent包下的ExecutorService來初始化一個線程池供我們使用。使用ExecutorService類的時候,我們常維護一個list保存submit的callable task所返回的Future對象。然后在主線程中遍歷這個list并調用Future的get()方法取到Task的返回值。
? ? ? ?其實除了使用ExecutorService外,還可通過CompletionService包裝ExecutorService,然后調用其take()方法去取Future對象。
? ? ? ?CompletionService和ExecutorService的主要的區別在于submit的task不一定是按照加入自己維護的list順序完成的。
? ? ? ?ExecutorService中從list中遍歷的每個Future對象并不一定處于完成狀態,這時調用get()方法就會被阻塞住,如果系統是設計成每個線程完成后就能根據其結果繼續做后面的事,這樣對于處于list后面的但是先完成的線程就會增加了額外的等待時間。
? ? ? ?而CompletionService的實現是維護一個保存Future對象的BlockingQueue。只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有完成的Future對象加入到Queue中。所以,先完成的必定先被取出。這樣就減少了不必要的等待時間。
?
六、使用Callable,Future返回結果
? ? ? ?Future<V>代表一個異步執行的操作,通過get()方法可以獲得操作的結果,如果異步操作還沒有完成,則,get()會使當前線程阻塞。FutureTask<V>實現了Future<V>和Runable<V>。Callable代表一個有返回值的操作。
- Callable<Integer>?func?=?new?Callable<Integer>(){??
- ????public?Integer?call()?throws?Exception?{??
- ????????System.out.println("inside?callable");??
- ????????Thread.sleep(1000);??
- ????????return?new?Integer(8);??
- ????}?????????
- };????????
- FutureTask<Integer>?futureTask??=?new?FutureTask<Integer>(func);??
- Thread?newThread?=?new?Thread(futureTask);??
- newThread.start();??
- ??
- try?{??
- ????System.out.println("blocking?here");??
- ????Integer?result?=?futureTask.get();??
- ????System.out.println(result);??
- }?catch?(InterruptedException?ignored)?{??
- }?catch?(ExecutionException?ignored)?{??
- }?
?
? ? ? ?ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,返回Future。如果Executor后臺線程池還沒有完成Callable的計算,則調用返回Future對象的get()方法,會阻塞直到計算完成。
? ? ? ?
? ? ? ?Java5以后可以利用Future來跟蹤異步計算的結果。在此之前主線程要想獲得工作線程(異步計算線程)的結果是比較麻煩的事情,需要我們進行特殊的程序結構設計,比較繁瑣而且容易出錯。有了Future我們就可以設計出比較優雅的異步計算程序結構模型:根據分而治之的思想,我們可以把異步計算的線程按照職責分為3類:
? ? ? 1. 異步計算的發起線程(控制線程):負責異步計算任務的分解和發起,把分解好的任務交給異步計算的work線程去執行,發起異步計算后,發起線程可以獲得Futrue的集合,從而可以跟蹤異步計算結果。
? ? ? 2. 異步計算work線程:負責具體的計算任務
? ? ? 3. 異步計算結果收集線程:從發起線程那里獲得Future的集合,并負責監控Future的狀態,根據Future的狀態來處理異步計算的結果。
本文轉自農夫山泉別墅博客園博客,原文鏈接:http://www.cnblogs.com/yaowen/p/6323689.html,如需轉載請自行聯系原作者