Python并發——concurrent.futures梳理

Python并發——concurrent.futures梳理

參考官方文檔: concurrent.futures — 啟動并行任務

Executor對象

class concurrent.funtures.Executor

該抽象類是 ThreadPoolExecutorProcessPoolExecutor 的父類,提供異步執行調用方法。要通過它的子類調用,而不是直接調用。

submit方法

submit(fn, /, *args, **kwargs)

提交一個可執行對象,fn 將按照如下方式被調用:fn(*args. **kwargs),并返回一個 Future 對象(下面會詳細說)表示執行的結果,

例:

with ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(pow, 323, 1235)print(future.result())

map方法

map(func, *iterables, timeout=None, chunksize=1)
  • 類似于 map(func, *iterables) 函數,但是以下兩點不同:

    • iterables 是立即執行而不是延遲執行的;
    • func 是異步執行的,對 func 的多個調用可以并發執行。
  • 如果從原始調用到 Executor.map() 經過 timeout 秒后, __next__() 已被調用且返回的結果還不可用,那么已返回的迭代器將觸發 concurrent.futures.TimeoutErrortimeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None ,則沒有超時限制。

  • 使用 ProcessPoolExecutor 時,這個方法會將 iterables 分割任務塊并作為獨立的任務并提交到執行池中。這些塊的大概數量可以由 chunksize 指定正整數設置。 對很長的迭代器來說,使用大的 chunksize 值比默認值 1 能顯著地提高性能。 chunksizeThreadPoolExecutor 沒有效果。

  • 如果 func 調用引發一個異常,當從迭代器中取回它的值時這個異常將被引發。

  • iterables 是一個可迭代對象,其中每個元素是傳遞給 func 的參數列表,與 submit 方法直接傳參不同的是,如果需要傳遞給 func 函數多個列表,通常需要再加一個中間層來進行列表參數的解析。python 線程池map()方法傳遞多參數list

shutdown方法

shutdown(wait=True, *, cancel_futures=False)

當待執行的 future 對象完成執行后向執行者發送信號,它就會釋放正在使用的任何資源。 在關閉后調用 Executor.submit()Executor.map() 將會引發 RuntimeError

  • 如果 waitTrue 則此方法只有在所有待執行的 future 對象完成執行且釋放已分配的資源后才會返回;

  • 如果 waitFalse,方法立即返回,所有待執行的 future 對象完成執行后會釋放已分配的資源。 不管 wait 的值是什么,整個 Python 程序將等到所有待執行的 future 對象完成執行后才退出;

  • 如果 cancel_futuresTrue,此方法將取消所有執行器還未開始運行的掛起的 future。 任何已完成或正在運行的 future 將不會被取消,無論 cancel_futures 的值是什么;

  • 如果 cancel_futureswait 均為 True,則執行器已開始運行的所有 Future 將在此方法返回之前完成。 其余的 Future 會被取消。

如果使用 with 語句,你就可以避免顯式調用這個方法,它將會停止 Executor (就相當于 Executor.shutdown() 調用時 wait 設為 True 一樣等待):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:e.submit(shutil.copy, 'src1.txt', 'dest1.txt')e.submit(shutil.copy, 'src2.txt', 'dest2.txt')e.submit(shutil.copy, 'src3.txt', 'dest3.txt')e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ThreadPoolExecutor

ThreadPoolExecutorExecutor 的子類,它使用線程池來異步執行調用。

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
  • 使用最多 max_workers 個線程的線程池來異步執行調用。

  • (Python3.7之后)initializer 是在每個 worker 線程開始處調用的一個可選可調用對象。 initargs 是傳遞給 initializer 的元組參數。任何向池提交更多工作的嘗試, initializer 都將引發一個異常,當前所有等待的工作都會引發一個 BrokenThreadPool

  • (Python3.5之后)如果 max_workersNone 或沒有指定,將默認為機器處理器的個數,假如ThreadPoolExecutor 則重于I/O操作而不是CPU運算,那么可以乘以 5 ,同時工作線程的數量可以比 ProcessPoolExecutor 的數量高。

  • (Python3.6之后)thread_name_prefix 參數允許用戶控制由線程池創建的 threading.Thread 工作線程名稱以方便調試。

  • (Python3.8之后)max_workers 的默認值已改為 min(32, os.cpu_count() + 4)。 這個默認值會保留至少 5 個工作線程用于 I/O 密集型任務。 對于那些釋放了 GIL 的 CPU 密集型任務,它最多會使用 32 個 CPU 核心。這樣能夠避免在多核機器上不知不覺地使用大量資源。現在 ThreadPoolExecutor 在啟動 max_workers 個工作線程之前也會重用空閑的工作線程。

  • 當回調已關聯了一個 Future 然后再等待另一個Future 的結果時就會發產死鎖情況。例如:

    # 例一
    import time
    def wait_on_b():time.sleep(5)print(b.result())  # b will never complete because it is waiting on a.return 5def wait_on_a():time.sleep(5)print(a.result())  # a will never complete because it is waiting on b.return 6executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)# 例二
    def wait_on_future():f = executor.submit(pow, 5, 2)# This will never complete because there is only one worker thread and# it is executing this function.print(f.result())executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_on_future)
    

ThreadPoolExecutor 實例:

import concurrent.futures
import urllib.requestURLS = ['http://www.foxnews.com/','http://www.cnn.com/','http://europe.wsj.com/','http://www.bbc.co.uk/','http://some-made-up-domain.com/']# Retrieve a single page and report the URL and contents
def load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:# Start the load operations and mark each future with its URLfuture_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 類是 Executor 的子類,它使用進程池來異步地執行調用。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
  • ProcessPoolExecutor 會使用 multiprocessing 模塊,這允許它繞過 GIL,但也意味著只可以處理和返回可封存的對象。

  • __main__ 模塊必須可以被工作者子進程導入。這意味著 ProcessPoolExecutor不可以工作在交互式解釋器中。

  • 從可調用對象中調用 ExecutorFuture 的方法提交給 ProcessPoolExecutor 會導致死鎖。這與 ThreadPoolExecutor 類似。

  • (Python3.7之后)異步地執行調用的 Executor 子類使用最多具有 max_workers 個進程的進程池。 如果 max_workersNone 或未給出,它將默認為機器的處理器個數。 如果 max_workers 小于等于 0,則將引發 ValueError。 在 Windows 上,max_workers 必須小于等于 61,否則將引發 ValueError。 如果 max_workersNone,則所選擇的默認值最多為 61,即使存在更多的處理器。 mp_context 可以是一個多進程上下文或是 None。 它將被用來啟動工作進程。 如果 mp_contextNone 或未給出,則將使用默認的多進程上下文。

  • (Python3.7之后)initializer 是一個可選的可調用對象,它會在每個工作進程啟動時被調用;initargs 是傳給 initializer 的參數元組。 如果 initializer 引發了異常,則所有當前在等待的任務以及任何向進程池提交更多任務的嘗試都將引發 BrokenProcessPool

  • (Python3.3之后)如果其中一個工作進程被突然終止,BrokenProcessPool 就會馬上觸發。 可預計的行為沒有定義,但執行器上的操作或它的 future 對象會被凍結或死鎖。

可以看到,大部分參數含義與 ThreadPoolExecutor 是一致的。

ProcessPoolExecutor 實例:

import concurrent.futures
import mathPRIMES = [112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]def is_prime(n):if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falsesqrt_n = int(math.floor(math.sqrt(n)))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef main():with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime))if __name__ == '__main__':main()

Future對象

Future 類將可調用對象封裝為異步執行。Future 實例由 Executor.submit() 創建。將可調用對象封裝為異步執行。Future 實例由 Executor.submit()創建,除非測試,不應手動直接創建。

class concurrent.futures.Future
  • cancel()

    嘗試取消調用。 如果調用正在執行或已結束運行不能被取消則該方法將返回 False,否則調用會被取消并且該方法將返回 True

  • cancelled()

    如果調用成功取消返回 True

  • running()

    如果調用正在執行而且不能被取消那么返回 True

  • done()

    如果調用已被取消或正常結束那么返回 True

  • result(timeout=None)

    返回調用返回的值。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那么等待時間就沒有限制。如果 futrue 在完成前被取消則 CancelledError 將被觸發。如果調用引發了一個異常,這個方法也會引發同樣的異常。

  • exception(timeout=None)

    返回由調用引發的異常。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那么等待時間就沒有限制。如果 futrue 在完成前被取消則 CancelledError 將被觸發。如果調用正常完成那么返回 None

  • add_done_callback(fn)

    附加可調用 fnfuture 對象。當 future 對象被取消或完成運行時,將會調用 fn,而這個 future 對象將作為它唯一的參數。加入的可調用對象總被屬于添加它們的進程中的線程按加入的順序調用。如果可調用對象引發一個 Exception 子類,它會被記錄下來并被忽略掉。如果可調用對象引發一個 BaseException 子類,這個行為沒有定義。如果 future 對象已經完成或已取消,fn 會被立即調用。

下面這些 Future 方法用于單元測試和 Executor 實現,一般不會用到,就不詳細介紹了。

  • set_running_or_notify_cancel()

  • set_result(result)

  • set_exception(exception)

concurrent.futures模塊的函數

  • wait

    concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
    

    等待 fs 所指定的 Future 實例(可能有不同的 Exectutor 實例創建)完成。fs 若給出重復的 futures 將被移除,只返回一次。

    • 返回值是一個 set 組成的命名二元組,其中第一個 set 叫做 done,包含完成(包括 finished 和 cancelled)的 futures,第二個 set 叫做 not_done,包含未完成(包括 pending 和 running)的 futures。

    • timeout 可以用來控制返回前最大的等待秒數。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或為 None ,則不限制等待時間。

    • return_when 指定此函數應在何時返回。它必須為以下常數之一:

    常量描述
    FIRST_COMPLETED函數將在任意可等待對象結束或取消時返回。
    FIRST_EXCEPTION函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當于 ALL_COMPLETED
    ALL_COMPLETED函數將在所有可等待對象結束或取消時返回。
  • as_complete

    concurrent.futures.as_completed(fs, timeout=None)
    

    返回一個包含 fs 所指定的 Future 實例(可能由不同的 Executor實例創建)的迭代器,這些實例會在完成時生成 future 對象(包括正常結束或被取消的 future 對象)。 任何由 fs 所指定的重復 future 對象將只被返回一次。 任何在 as_completed()被調用之前完成的 future 對象將優先被生成。 如果 __next__() 被調用并且在對 as_completed()的原始調用 timeout 秒之后結果仍不可用,則返回的迭代器將引發 concurrent.futures.TimeoutErrortimeout 可以為整數或浮點數。 如果 timeout 未指定或為 None,則不限制等待時間。

Exception 類

  • concurrent.futures.CancelledError

    future 對象被取消時會觸發。

  • concurrent.futures.TimeoutError

future 對象執行超出給定的超時數值時引發。

  • concurrent.futures.BrokenExecutor

    當執行器被某些原因中斷而且不能用來提交或執行新任務時就會被引發派生于 RuntimeError 的異常類。3.7 新版功能.

  • concurrent.futures.InvalidStateError

    當某個操作在一個當前狀態所不允許的 future 上執行時將被引發。3.8 新版功能.

  • concurrent.futures.thread.BrokenThreadPool

    ThreadPoolExecutor 中的其中一個工作者初始化失敗時會引發派生于 BrokenExecutor的異常類。3.7 新版功能.

  • concurrent.futures.process.BrokenProcessPool

ThreadPoolExecutor 中的其中一個 worker 不完整終止時(比如,被外部殺死)會引發派生于 BrokenExecutor( 原名 RuntimeError ) 的異常類。3.3 新版功能.

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/532483.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/532483.shtml
英文地址,請注明出處:http://en.pswp.cn/news/532483.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

TensorRT ONNX 基礎

TensorRT ONNX 基礎 tensorRT從零起步邁向高性能工業級部署&#xff08;就業導向&#xff09; 課程筆記&#xff0c;講師講的不錯&#xff0c;可以去看原視頻支持下。 概述 TensorRT 的核心在于對模型算子的優化&#xff08;合并算子、利用當前 GPU 特性選擇特定的核函數等多種…

回文子串、回文子序列相關題目

回文子串、回文子序列相關題目 回文子串是要連續的&#xff0c;回文子序列可不是連續的。 516. 最長回文子序列 dp數組含義&#xff1a;dp[i][j]dp[i][j]dp[i][j] 表示子序列 s[i,j]s[i,j]s[i,j] 中的最長回文子序列的長度。 dp數組初始化&#xff1a;子序列長度為 1 時&am…

mmdetection tools工具梳理

mmdetection tools工具梳理 mmdetection 是一個非常好用的開源目標檢測框架&#xff0c;我們可以用它方便地訓練自己的目標檢測模型&#xff0c;mmdetection 項目倉庫提供許多實用的工具來實現幫助我們進行各種測試。本篇將梳理以下 mmdetection 項目倉庫 tools 目錄下的各種實…

TensorRT ONNX 基礎(續)

TensorRT ONNX 基礎&#xff08;續&#xff09; PyTorch正確導出ONNX 幾條推薦的原則&#xff0c;可以減少潛在的錯誤&#xff1a; 對于任何使用到 shape、size 返回值的參數時&#xff0c;例如 tensor.view(tensor.size(0), -1) 這類操作&#xff0c;避免直接使用 tensor.s…

frp實現內網穿透極簡教程

frp實現內網穿透極簡教程 本文是內網穿透極簡教程&#xff0c;為求簡潔&#xff0c;我們不介紹為什么內網穿透也不介紹其原理&#xff0c;這里假設各位讀者都已經明確的知道自己的目的&#xff0c;本文僅介紹如何安裝配置 frp 實現內網穿透。 簡單來說&#xff0c;內網穿透就…

圖像預處理之warpaffine與雙線性插值及其高性能實現

圖像預處理之warpaffine與雙線性插值及其高性能實現 視頻講解&#xff1a;https://www.bilibili.com/video/BV1ZU4y1A7EG 代碼Repo&#xff1a;https://github.com/shouxieai/tensorRT_Pro 本文為視頻講解的個人筆記。 warpaffine矩陣變換 對于坐標點的變換&#xff0c;我們通…

LeetCode-10 正則表達式匹配

LeetCode-10 正則表達式匹配 動態規劃 10. 正則表達式匹配 dp數組含義&#xff1a;dp[i][j]dp[i][j]dp[i][j] 表示 s[0:i?1]s[0:i-1]s[0:i?1] 能否被 p[0:j?1]p[0:j-1]p[0:j?1] 成功匹配。 狀態轉移方程 &#xff1a; 如果 s[i?1]p[j?1]s[i-1]p[j-1]s[i?1]p[j?1] …

shell if判斷和for循環常見寫法

shell if判斷和for循環常見寫法 轉自&#xff1a; Shell中for循環的幾個常用寫法 Shell中if 條件判斷總結 if常見寫法 一、if的基本語法: if [ command ];then符合該條件執行的語句 elif [ command ];then符合該條件執行的語句 else符合該條件執行的語句 fibash shell會按順序…

關于pytorch使用多個dataloader并使用zip和cycle來進行循環時出現的顯存泄漏的問題

關于pytorch使用多個dataloader并使用zip和cycle來進行循環時出現的顯存泄漏的問題 如果我們想要在 Pytorch 中同時迭代兩個 dataloader 來處理數據&#xff0c;會有兩種情況&#xff1a;一是我們按照較短的 dataloader 來迭代&#xff0c;長的 dataloader 超過的部分就丟棄掉…

neovim及coc.nvim自動補全初探

neovim及coc.nvim自動補全初探 安裝 # mac # 安裝 brew install neovim # 查看neovim安裝路徑 brew list nvim# ubuntu apt install neovim習慣了打開 vi/vim 的方式&#xff0c;可以用個 alias 在 ~/.zshrc 中設置一下&#xff1a; alias vi"nvim"插件 vim-plug…

sed 簡明教程

sed 簡明教程 轉自&#xff1a;https://coolshell.cn/articles/9104.html awk于1977年出生&#xff0c;今年36歲本命年&#xff0c;sed比awk大2-3歲&#xff0c;awk就像林妹妹&#xff0c;sed就是寶玉哥哥了。所以 林妹妹跳了個Topless&#xff0c;他的哥哥sed坐不住了&#xf…

awk 簡明教程

awk 簡明教程 轉自&#xff1a;https://coolshell.cn/articles/9070.html 有一些網友看了前兩天的《Linux下應該知道的技巧》希望我能教教他們用awk和sed&#xff0c;所以&#xff0c;出現了這篇文章。我估計這些80后的年輕朋友可能對awk/sed這類上古神器有點陌生了&#xff0c…

應該知道的LINUX技巧

應該知道的LINUX技巧 轉自&#xff1a;https://coolshell.cn/articles/8883.html 這篇文章來源于Quroa的一個問答《What are some time-saving tips that every Linux user should know?》—— Linux用戶有哪些應該知道的提高效率的技巧。我覺得挺好的&#xff0c;總結得比較好…

[深度][PyTorch] DDP系列第一篇:入門教程

[深度][PyTorch] DDP系列第一篇&#xff1a;入門教程 轉自&#xff1a;[原創][深度][PyTorch] DDP系列第一篇&#xff1a;入門教程 概覽 想要讓你的PyTorch神經網絡在多卡環境上跑得又快又好&#xff1f;那你definitely需要這一篇&#xff01; No one knows DDP better than I…

[深度][PyTorch] DDP系列第二篇:實現原理與源代碼解析

[深度][PyTorch] DDP系列第二篇&#xff1a;實現原理與源代碼解析 轉自&#xff1a;https://zhuanlan.zhihu.com/p/187610959 概覽 想要讓你的PyTorch神經網絡在多卡環境上跑得又快又好&#xff1f;那你definitely需要這一篇&#xff01; No one knows DDP better than I do! …

[深度][PyTorch] DDP系列第三篇:實戰與技巧

[深度][PyTorch] DDP系列第三篇&#xff1a;實戰與技巧 轉自&#xff1a;https://zhuanlan.zhihu.com/p/250471767 零. 概覽 想要讓你的PyTorch神經網絡在多卡環境上跑得又快又好&#xff1f;那你definitely需要這一篇&#xff01; No one knows DDP better than I do! – – …

PIL、OpenCV中resize算子實現不同的問題

PIL、OpenCV中resize算子實現不同的問題 測試圖像&#xff1a;https://raw.githubusercontent.com/TropComplique/ssd-pytorch/master/images/dogs-and-cats.jpg &#xff08;直接 wget 可獲得&#xff09; 測試版本&#xff1a; opencv-python 4.4.0.46Pillow 8.0.1 測試代…

mac X11 XQuartz的安裝與使用

mac X11 XQuartz的安裝與使用 本地系統&#xff1a;MacOS 12.4 遠程主機系統&#xff1a;Ubuntu 18.04 命令說明 ssh命令 ssh 命令大家很熟悉了&#xff0c;這里僅介紹與 X11 forwarding 相關的幾個選項。 本部分譯自 ssh 命令手冊&#xff0c;可見 man ssh -X &#xf…

機器學習:系統設計與實現 分布式訓練

機器學習系統:設計與實現 分布式訓練 轉自&#xff1a;https://openmlsys.github.io/chapter_distributed_training/index.html 隨著機器學習的進一步發展&#xff0c;科學家們設計出更大型&#xff0c;更多功能的機器學習模型&#xff08;例如說&#xff0c;GPT-3&#xff09;…

Linux命令行及各常用工具代理設置

Linux命令行及各常用工具代理設置 命令行代理設置 1 通過命令行指定 直接為當前命令行設置代理 對當前終端的全部工具&#xff08;apt、curl、wget、git 等全都有效&#xff09;以下僅以 http 代理為例&#xff0c;如果是其他協議&#xff08;如 socks 等&#xff09;自行改…