Python并發——concurrent.futures梳理
參考官方文檔: concurrent.futures — 啟動并行任務
Executor對象
class concurrent.funtures.Executor
該抽象類是 ThreadPoolExecutor
和 ProcessPoolExecutor
的父類,提供異步執行調用方法。要通過它的子類調用,而不是直接調用。
submit方法
submit(fn, /, *args, **kwargs)
提交一個可執行對象,fn 將按照如下方式被調用:fn(*args. **kwargs)
,并返回一個 Future
對象(下面會詳細說)表示執行的結果,
例:
with ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(pow, 323, 1235)print(future.result())
map方法
map(func, *iterables, timeout=None, chunksize=1)
-
類似于
map(func, *iterables)
函數,但是以下兩點不同:- iterables 是立即執行而不是延遲執行的;
- func 是異步執行的,對 func 的多個調用可以并發執行。
-
如果從原始調用到
Executor.map()
經過 timeout 秒后,__next__()
已被調用且返回的結果還不可用,那么已返回的迭代器將觸發concurrent.futures.TimeoutError
。 timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None ,則沒有超時限制。 -
使用
ProcessPoolExecutor
時,這個方法會將 iterables 分割任務塊并作為獨立的任務并提交到執行池中。這些塊的大概數量可以由 chunksize 指定正整數設置。 對很長的迭代器來說,使用大的 chunksize 值比默認值 1 能顯著地提高性能。 chunksize 對ThreadPoolExecutor
沒有效果。 -
如果 func 調用引發一個異常,當從迭代器中取回它的值時這個異常將被引發。
-
iterables 是一個可迭代對象,其中每個元素是傳遞給 func 的參數列表,與
submit
方法直接傳參不同的是,如果需要傳遞給 func 函數多個列表,通常需要再加一個中間層來進行列表參數的解析。python 線程池map()方法傳遞多參數list
shutdown方法
shutdown(wait=True, *, cancel_futures=False)
當待執行的 future
對象完成執行后向執行者發送信號,它就會釋放正在使用的任何資源。 在關閉后調用 Executor.submit()
和 Executor.map()
將會引發 RuntimeError
。
-
如果 wait 為 True 則此方法只有在所有待執行的
future
對象完成執行且釋放已分配的資源后才會返回; -
如果 wait 為 False,方法立即返回,所有待執行的
future
對象完成執行后會釋放已分配的資源。 不管 wait 的值是什么,整個 Python 程序將等到所有待執行的future
對象完成執行后才退出; -
如果 cancel_futures 為 True,此方法將取消所有執行器還未開始運行的掛起的
future
。 任何已完成或正在運行的future
將不會被取消,無論 cancel_futures 的值是什么; -
如果 cancel_futures 和 wait 均為
True
,則執行器已開始運行的所有 Future 將在此方法返回之前完成。 其余的 Future 會被取消。
如果使用 with 語句,你就可以避免顯式調用這個方法,它將會停止 Executor
(就相當于 Executor.shutdown()
調用時 wait 設為 True 一樣等待):
import shutil
with ThreadPoolExecutor(max_workers=4) as e:e.submit(shutil.copy, 'src1.txt', 'dest1.txt')e.submit(shutil.copy, 'src2.txt', 'dest2.txt')e.submit(shutil.copy, 'src3.txt', 'dest3.txt')e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
ThreadPoolExecutor
ThreadPoolExecutor
是 Executor
的子類,它使用線程池來異步執行調用。
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
-
使用最多 max_workers 個線程的線程池來異步執行調用。
-
(Python3.7之后)initializer 是在每個 worker 線程開始處調用的一個可選可調用對象。 initargs 是傳遞給 initializer 的元組參數。任何向池提交更多工作的嘗試, initializer 都將引發一個異常,當前所有等待的工作都會引發一個
BrokenThreadPool
。 -
(Python3.5之后)如果 max_workers 為 None 或沒有指定,將默認為機器處理器的個數,假如
ThreadPoolExecutor
則重于I/O操作而不是CPU運算,那么可以乘以 5 ,同時工作線程的數量可以比ProcessPoolExecutor
的數量高。 -
(Python3.6之后)thread_name_prefix 參數允許用戶控制由線程池創建的
threading.Thread
工作線程名稱以方便調試。 -
(Python3.8之后)max_workers 的默認值已改為
min(32, os.cpu_count() + 4)
。 這個默認值會保留至少 5 個工作線程用于 I/O 密集型任務。 對于那些釋放了 GIL 的 CPU 密集型任務,它最多會使用 32 個 CPU 核心。這樣能夠避免在多核機器上不知不覺地使用大量資源。現在 ThreadPoolExecutor 在啟動 max_workers 個工作線程之前也會重用空閑的工作線程。 -
當回調已關聯了一個
Future
然后再等待另一個Future
的結果時就會發產死鎖情況。例如:# 例一 import time def wait_on_b():time.sleep(5)print(b.result()) # b will never complete because it is waiting on a.return 5def wait_on_a():time.sleep(5)print(a.result()) # a will never complete because it is waiting on b.return 6executor = ThreadPoolExecutor(max_workers=2) a = executor.submit(wait_on_b) b = executor.submit(wait_on_a)# 例二 def wait_on_future():f = executor.submit(pow, 5, 2)# This will never complete because there is only one worker thread and# it is executing this function.print(f.result())executor = ThreadPoolExecutor(max_workers=1) executor.submit(wait_on_future)
ThreadPoolExecutor
實例:
import concurrent.futures
import urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']# Retrieve a single page and report the URL and contents
def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:# Start the load operations and mark each future with its URLfuture_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor
ProcessPoolExecutor
類是 Executor
的子類,它使用進程池來異步地執行調用。
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
-
ProcessPoolExecutor
會使用multiprocessing
模塊,這允許它繞過 GIL,但也意味著只可以處理和返回可封存的對象。 -
__main__
模塊必須可以被工作者子進程導入。這意味著ProcessPoolExecutor
不可以工作在交互式解釋器中。 -
從可調用對象中調用
Executor
或Future
的方法提交給ProcessPoolExecutor
會導致死鎖。這與ThreadPoolExecutor
類似。 -
(Python3.7之后)異步地執行調用的
Executor
子類使用最多具有 max_workers 個進程的進程池。 如果 max_workers 為 None 或未給出,它將默認為機器的處理器個數。 如果 max_workers 小于等于 0,則將引發ValueError
。 在 Windows 上,max_workers 必須小于等于 61,否則將引發ValueError
。 如果 max_workers 為 None,則所選擇的默認值最多為 61,即使存在更多的處理器。 mp_context 可以是一個多進程上下文或是 None。 它將被用來啟動工作進程。 如果 mp_context 為 None 或未給出,則將使用默認的多進程上下文。 -
(Python3.7之后)initializer 是一個可選的可調用對象,它會在每個工作進程啟動時被調用;initargs 是傳給 initializer 的參數元組。 如果 initializer 引發了異常,則所有當前在等待的任務以及任何向進程池提交更多任務的嘗試都將引發
BrokenProcessPool
。 -
(Python3.3之后)如果其中一個工作進程被突然終止,
BrokenProcessPool
就會馬上觸發。 可預計的行為沒有定義,但執行器上的操作或它的future
對象會被凍結或死鎖。
可以看到,大部分參數含義與 ThreadPoolExecutor
是一致的。
ProcessPoolExecutor
實例:
import concurrent.futures
import mathPRIMES = [112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]def is_prime(n):if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falsesqrt_n = int(math.floor(math.sqrt(n)))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef main():with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime))if __name__ == '__main__':main()
Future對象
Future
類將可調用對象封裝為異步執行。Future
實例由 Executor.submit()
創建。將可調用對象封裝為異步執行。Future
實例由 Executor.submit()
創建,除非測試,不應手動直接創建。
class concurrent.futures.Future
-
cancel()
嘗試取消調用。 如果調用正在執行或已結束運行不能被取消則該方法將返回 False,否則調用會被取消并且該方法將返回 True。
-
cancelled()
如果調用成功取消返回 True。
-
running()
如果調用正在執行而且不能被取消那么返回 True 。
-
done()
如果調用已被取消或正常結束那么返回 True。
-
result(timeout=None)
返回調用返回的值。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,
concurrent.futures.TimeoutError
將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那么等待時間就沒有限制。如果futrue
在完成前被取消則CancelledError
將被觸發。如果調用引發了一個異常,這個方法也會引發同樣的異常。 -
exception(timeout=None)
返回由調用引發的異常。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,
concurrent.futures.TimeoutError
將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那么等待時間就沒有限制。如果futrue
在完成前被取消則CancelledError
將被觸發。如果調用正常完成那么返回 None。 -
add_done_callback(fn)
附加可調用 fn 到
future
對象。當future
對象被取消或完成運行時,將會調用 fn,而這個future
對象將作為它唯一的參數。加入的可調用對象總被屬于添加它們的進程中的線程按加入的順序調用。如果可調用對象引發一個Exception
子類,它會被記錄下來并被忽略掉。如果可調用對象引發一個BaseException
子類,這個行為沒有定義。如果future
對象已經完成或已取消,fn 會被立即調用。
下面這些 Future
方法用于單元測試和 Executor
實現,一般不會用到,就不詳細介紹了。
-
set_running_or_notify_cancel()
-
set_result(result)
-
set_exception(exception)
concurrent.futures模塊的函數
-
wait
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待 fs 所指定的 Future 實例(可能有不同的 Exectutor 實例創建)完成。fs 若給出重復的 futures 將被移除,只返回一次。
-
返回值是一個 set 組成的命名二元組,其中第一個 set 叫做
done
,包含完成(包括 finished 和 cancelled)的 futures,第二個 set 叫做not_done
,包含未完成(包括 pending 和 running)的 futures。 -
timeout 可以用來控制返回前最大的等待秒數。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或為
None
,則不限制等待時間。 -
return_when 指定此函數應在何時返回。它必須為以下常數之一:
常量 描述 FIRST_COMPLETED
函數將在任意可等待對象結束或取消時返回。 FIRST_EXCEPTION
函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當于 ALL_COMPLETED
。ALL_COMPLETED
函數將在所有可等待對象結束或取消時返回。 -
-
as_complete
concurrent.futures.as_completed(fs, timeout=None)
返回一個包含 fs 所指定的
Future
實例(可能由不同的Executor
實例創建)的迭代器,這些實例會在完成時生成 future 對象(包括正常結束或被取消的 future 對象)。 任何由 fs 所指定的重復 future 對象將只被返回一次。 任何在as_completed()
被調用之前完成的 future 對象將優先被生成。 如果__next__()
被調用并且在對as_completed()
的原始調用 timeout 秒之后結果仍不可用,則返回的迭代器將引發concurrent.futures.TimeoutError
。 timeout 可以為整數或浮點數。 如果 timeout 未指定或為 None,則不限制等待時間。
Exception 類
-
concurrent.futures.CancelledError
future 對象被取消時會觸發。
-
concurrent.futures.TimeoutError
future 對象執行超出給定的超時數值時引發。
-
concurrent.futures.BrokenExecutor
當執行器被某些原因中斷而且不能用來提交或執行新任務時就會被引發派生于
RuntimeError
的異常類。3.7 新版功能. -
concurrent.futures.InvalidStateError
當某個操作在一個當前狀態所不允許的 future 上執行時將被引發。3.8 新版功能.
-
concurrent.futures.thread.BrokenThreadPool
當
ThreadPoolExecutor
中的其中一個工作者初始化失敗時會引發派生于BrokenExecutor
的異常類。3.7 新版功能. -
concurrent.futures.process.BrokenProcessPool
當 ThreadPoolExecutor
中的其中一個 worker 不完整終止時(比如,被外部殺死)會引發派生于 BrokenExecutor
( 原名 RuntimeError
) 的異常類。3.3 新版功能.