一、進程Process
????????擁有自己獨立的堆和棧,既不共享堆,也不共享棧,進程由操作系統調度;進程切換需要的資源很最大,效率低。?
????????對于操作系統來說,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,就啟動了一個記事本進程,打開兩個記事本就啟動了兩個記事本進程,打開一個Word就啟動了一個Word進程。
?1.1、進程介紹
????????進程就是程序的一次執行過程,就是一個正在執行的任務,一段程序的每一次 運行都會產生一個或多個進程。進程是有生命周期的,大部分會隨著程序的運行而創 建,隨著程序的結束而終止,也可以去手動結束進程。在操作系統中,進程是操作系 統進行資源分配和調度的基本單位。每個進程都有自己的私有地址空間、執行堆棧、 程序計數器、局部變量以及其他系統資源(如文件描述符、網絡連接等)等。通俗的 說,一個正在運行的程序就是一個進程,比如QQ、微信等,但也有可能這個程序會 生成多個進程。
1.2、進程和程序的關系
進程:進程是程序的一次執行過程,它是動態的,具備生命周期,在程序運行時存 在,程序執行完畢及用戶主動結束、系統錯誤等都會導致進程結束。
程序:程序是靜態的,沒有生命周期,在磁盤上存放,由一系列指令和數據組成的文 件,這些文件可以被操作系統加載到內存中并執行。
1.3、進程的優缺點?
進程的優點:
????????1.可以使用計算機多核,進行任務的并行執行,提高執行效率
????????2.運行不受其他進程影響,創建方便
????????3.空間獨立,數據安全
進程的缺點:
????????進程的創建和刪除消耗的系統資源較多
1.4、進程的種類?
常見的Windows進程類型:
1. 系統進程(System Process) 這些進程是操作系統啟動時由系統本身創建的,它們負責管理系統的核心功 能,如內存管理、設備驅動程序、安全性和其他系統級服務。 例如:lsass.exe(本地安全認證子系統服務)、wininit.exe(Windows初始化 進程)。
2. 服務進程(Service Process) 這些進程通常在系統啟動時或按需啟動,它們在后臺運行,提供網絡、安 全、系統維護等服務,通常不直接與用戶交互。 例如: DHCP Client(管理IP地址分配)。
3. 用戶進程(User Process) 這些是由用戶啟動的應用程序創建的進程,用于執行特定任務,如文字處 理、網頁瀏覽、游戲等。 例如: winword.exe(Microsoft Word)、 chrome.exe(Google Chrome 瀏覽器)。
4. 交互式進程(Interactive Process) 這些是與用戶交互的進程,它們通常在用戶登錄并直接與操作系統交互時運 行。 例如:命令提示符( cmd.exe)或PowerShell( powershell.exe)。
特殊進程:
1. 守護進程(Daemo Process)
????????守護進程(Daemon)是一種在Unix和類Unix操作系統中運行的后臺進程。它們 通常在系統啟動時自動啟動,并在后臺執行特定的系統級任務,不需要用戶直接 干預。在Windows操作系統中,沒有與Unix和類Unix系統中的守護進程 (Daemon)完全等同的概念。然而,Windows有類似的系統服務,它們在功能 上與守護進程相似,都是在后臺運行,執行系統級任務,通常不需要用戶直接干 預。
2. 僵尸進程(Zombie Process)
????????僵尸進程是指那些已經結束但仍然存在于進程表中,等待父進程收集其退出狀態 的進程。它們不再占用系統資源,除了進程表中的一個條目,以及一個退出狀態 碼。僵尸進程通常不會占用CPU時間,但它們仍然存在于進程表中,直到父進程 去收集其退出狀態。在Windows中這種情況不常見,因為進程在Windows中結 束后會通過特定的機制去處理子進程的終止狀態。
3. 孤兒進程(Orphan Process)
????????孤兒進程是指那些父進程已經結束,而它們仍然在運行的進程。由于孤兒進程的 父進程已經終止,它們會被init進程(進程號為1)所領養。init進程負責處理這 些孤兒進程,確保它們能夠正常結束。而在Windows中,孤兒進程會被系統進程 收養,確保他們能繼續運行或正常結束。?
1.5、PID?
????????PID是“進程標識符”(Process Identifier)的縮寫,它是一個由操作系統分配給每個 進程的唯一數字(大于等于0)。在操作系統中,每個進程都會被賦予一個唯一的 PID,以便系統可以追蹤和管理這些進程。
1. 唯一性:
????????在一個操作系統中,每個正在運行的進程都有一個唯一的PID。即使在 進程結束后,該PID通常也不會立即被重新分配給其他進程,以避免混淆。
2. 進程管理:
????????操作系統使用PID來識別和管理進程。例如,可以使用PID來發送信號 給進程(如終止信號)、檢查進程的狀態、或者調整進程的優先級。
3. 系統資源:
????????PID還用于關聯進程使用的系統資源,如打開的文件、網絡連接和內 存分配。
4. 父進程與子進程:
????????每個進程除了有自己的PID外,還有一個與之關聯的父PID (PPID)。父PID是指啟動該進程的進程的PID。
5. 工具和命令:
????????在命令行界面中,可以使用各種工具和命令來查看和管理進程,如 ps(Unix-like系統)、 tasklist(Windows)等,這些工具通常會顯示進程的 PID。?
kill命令:
????????在Windows中,可以在PowerShell中通過kill + PID 來終止該進程的運行。
????????在Linux中,kill命令會更加強大。?
1.6、進程的創建方式?
Python的標準庫提供了個模塊:multiprocessing
進程的創建可以通過分為兩種方式:
1. 方法包裝
2. 類包裝
創建進程后,使用start()啟動進程
1.6.1、?方法包裝
from multiprocessing import Process
import os
from time import sleepdef func1(name):print("當前進程ID:",os.getpid())print("父進程ID:",os.getppid())print(f"Process:{name} start")sleep(3)print(f"Process:{name} end")if __name__ =="__main__":print("當前進程ID:",os.getpid())# 創建進程p1 = Process(target=func1, args=('p1',))p1.start()p1.join()
'''
當前進程ID: 8560
當前進程ID: 14004
父進程ID: 8560
Process:p1 start
Process:p1 end
'''
1.6.2、?類包裝
使用 Process 類創建實例化對象,其本質是調用該類的構造方法創建新進程。Process 類的構造方法格式如下:
????def __init__(self,group=None,target=None,name=None,args=(),kwargs={})
其中,各個參數的含義為:
????????group
:該參數未進行實現,不需要傳參;
????????target
:為新建進程指定執行任務,也就是指定一個函數;
????????name
:為新建進程設置名稱;
????????args
:為 target 參數指定的參數傳遞非關鍵字參數;
????????kwargs
:為 target 參數指定的參數傳遞關鍵字參數。
from multiprocessing import Process
from time import sleepclass MyProcess(Process):def __init__(self, name):Process.__init__(self)self.name = namedef run(self):print(f"Process:{self.name} start")sleep(3)print(f"Process:{self.name} end")if __name__ == "__main__":#創建進程p1 = MyProcess("p1")p1.start()p1.join()
'''
Process:p1 start
Process:p1 end
'''
二、多進程
2.1、多進程介紹
????????多進程(Multiprocessing)是指在同一時間內同時執行多個程序的技術或能力,每 個進程都有自己的內存空間、文件描述符及其他系統資源。
2.2、特點
1. 并發執行:多進程可以在多核或多處理器系統上實現真正的并行執行,即不同的 進程可以在不同的CPU核心上同時運行。
2. 資源分配:每個進程通常擁有獨立的內存空間,這意味著它們不共享內存,這減 少了資源共享帶來的復雜性和潛在的問題。
3. 獨立性:全局變量在多個進程中不共享資源,進程之間的數據是獨立的,默認是 互不影響的。且由于進程間相對獨立,一個進程的失敗通常不會影響到其他進 程,這提高了系統的穩定性和可靠性。
4. 進程間通信(IPC):進程間通信機制(如管道、消息隊列、共享內存、信號量 等)用于允許進程之間交換數據和同步操作。
2.3、創建多進程
????????multiprocessing庫提供了創建和管理進程的方法,它允許程序員創 建進程,并提供了一系列的API來支持進程間數據共享、同步和通信。
multiprocessing.Process(group=None, target=None, name=None, args= (), kwargs={}, *, daemon=None)
group:通常不使用,是為將來可能的擴展預留的。
target:表示調用對象,即子進程要執行的任務。這個參數通常是一個函數的 名字。
name:進程的名稱。默認情況下,進程名稱為 Process-N,其中N是進程的序 號。可以通過這個參數來指定一個自定義的名稱。
args:表示調用target函數時傳遞的參數元組
kwargs:表示調用?target函數時傳遞的參數字典
daemon:如果設置為 True,則子進程將是一個守護進程。當主進程結束時,所 有守護進程都將被終止。默認值為 None,表示繼承當前進程的守護進程設置。?
?創建的對象方法:
1. start():啟動進程。這將執行在創建 用start時會自動調用run。
2. Process對象時指定的 target函數,調 run():此方法用于定義進程啟動時執行的操作。默認情況下,它調用傳遞給 target參數的函數。如果 進程行為。
3. target沒有指定,你可以覆蓋此方法來實現自定義的 join([timeout]):主進程等待子進程終止或直到達到指定的超時時間。如果 timeout被省略或為
4. None,則主進程將一直停留在這里。 is_alive():返回一個布爾值,表示進程是否仍然在運行。
5. terminate():強行終止進程,且不會進行任何清理操作。 如果該進程創建了 子進程,那么這個子進程就變成了僵尸進程,如果p還保存了一個鎖,那么這個 鎖也不會被釋放,會變成死鎖。
6. kill():終止進程。在Unix上,這是通過發送SIGKILL信號實現的;在Windows 上,則是通過調用 TerminateProcess。
7. close():關閉進程。此方法釋放Process對象所持有的資源,如果子進程仍在 進行,調用此方法將是錯誤的。?
屬性:?
1. name:返回或設置進程的名稱。
2. daemon:返回或設置進程是否為守護進程,如果設置的話,必須在start()之前 設置。
3. pid:返回進程的PID。
4. exitcode:返回進程的退出代碼,如果進程未結束,就返回None,負值-N表示 子進程被信號N終止,正常終止返回0。
5. authkey:返回或設置進程間通信的密鑰,用于進程間通信的身份認證
import multiprocessingdef worker(name):print(f"Worker {name}")if __name__ == "__main__":processes = []for i in range(3):process = multiprocessing.Process(target=worker, args=(f"Process-{i+1}",))processes.append(process)process.start()for process in processes:process.join()print("All workers have finished their work")'''
Worker Process-1
Worker Process-2
Worker Process-3
All workers have finished their work
'''
三、創建進程?
3.1、創建不傳參的進程?
3.1.1、查看各進程的PID及PPID
import os
from multiprocessing import Process
def func1():my_pid = os.getpid()ppid = os.getppid()print(f'執行func2的pid是:{my_pid}')print(f'執行func2的ppid是:{ppid}')
if __name__ == '__main__':p1 = Process(target=func1)p1.start()# 獲取當前進程的PIDcurrent_pid = os.getpid()print(f"主進程的PID為: {current_pid}")p1.join()
'''
主進程的PID為: 10264
執行func2的pid是:12924
執行func2的ppid是:10264
'''
3.1.2、運行時間統計
import time
from multiprocessing import Process
def func1():print("Function 1 is running.")time.sleep(2)print("Function 1 has finished.")
def func2():print("Function 2 is running.")time.sleep(2)print("Function 2 has finished.")
if __name__ == '__main__':start_time = time.time() # 記錄程序開始時間# 創建一個子進程來運行func2process = Process(target=func2)process.start()# 獲取創建的進程的PIDpid = process.pid# 主進程運行func1func1()# 等待子進程結束process.join()# 計算并打印整個程序的運行時間total_time = time.time() - start_timeprint(f"Total execution time: {total_time:.2f} seconds")'''
Function 1 is running.
Function 2 is running.
Function 1 has finished.
Function 2 has finished.
Total execution time: 2.09 seconds
'''
3.2、創建傳參的進程
3.2.1、使用args傳參
from multiprocessing import Process
import time
def say_hello(name):time.sleep(2)print(f'Hello {name}, Nice to meet you!')
if __name__ == '__main__':start = time.time()process1 = Process(target=say_hello, args=('Alice', ))process1.start()say_hello('Bob')process1.join()exe_time = time.time() - startprint(exe_time)
'''
Hello Bob, Nice to meet you!
Hello Alice, Nice to meet you!
2.094315528869629
'''
3.2.2、使用kwargs傳參
from multiprocessing import Process
import timedef say_hello(name):time.sleep(2)print(f'Hello {name}, Nice to meet you!')
if __name__ == '__main__':start = time.time()process1 = Process(target=say_hello, kwargs={'name': 'Alice'})process1.start()say_hello('Bob')process1.join()exe_time = time.time() - startprint(exe_time)'''
Hello Bob, Nice to meet you!
Hello Alice, Nice to meet you!
2.088036060333252
'''
3.3、一次性創建多個進程
import sys
from multiprocessing import Process
import time
def say_hello(name):time.sleep(1)print(f'Hello {name}, Nice to meet you!')# 刷新標準輸出緩沖區sys.stdout.flush()
if __name__ == '__main__':names = ['Zhangsan', 'Lisi', 'Wangwu', 'Zhaoliu', 'Tianqi']processes = []start = time.time()for i in range(len(names)):process = Process(target=say_hello, args=(names[i], ))process.start()processes.append(process)for process in processes:process.join()exe_time = time.time() - startprint(exe_time)'''
Hello Zhangsan, Nice to meet you!
Hello Lisi, Nice to meet you!
Hello Wangwu, Nice to meet you!
Hello Zhaoliu, Nice to meet you!
Hello Tianqi, Nice to meet you!
1.1083273887634277
'''
四、進程間通信
4.1、管道
管道是一種在父子進程間或兄弟進程間進行通信的機制。Python的 multiprocessing模塊提供了 Pipe()函數,可以用來創建管道。
4.1.1、創建管道
parent_conn, child_conn = Pipe()
使用 multiprocessing.Pipe()可以創建一個管道。這個函數返回一個由兩個連接 對象組成的元組,這兩個對象分別代表管道的兩端。默認情況下,管道是雙向的,每 個端點都可以即讀又寫。?
from multiprocessing import Process, Pipeparent_conn, child_conn = Pipe()
4.1.2、管道方法
parent_conn和child_conn都是管道對象,它們都擁有共同的方法:
send(obj):發送一個對象到管道的另一端。這個對象必須是可序列化的。
recv():從管道的另一端接收一個對象。該方法是阻塞的。
close():關閉管道連接。當不再需要管道時,應該調用這個方法來釋放資源。
fileno():返回由連接對象使用的文件描述符。
poll(timeout):返回連接對象中是否有可以讀取的數據。如果未指定timeout, 會馬上返回,如果timeout是一個數字,則指定了阻塞的最大秒數,如果未指定 timeout,那么將一直等待。
send_bytes(buffer, offset, size):通過連接發送buffer,offset是buffer中的 偏移量,size是要發送的字節數。數據以一條完整的數據發送。
recv_bytes(maxlength):以字符串的形式返回一條從連接對象另一端發送過來 的字節數據。此方法在接收到數據前一直阻塞。如果連接對象被關閉或沒有數據 可讀取,將拋出異常。如果消息長度大于maxlength,則會拋出異常且該連接對 象不可再讀。
recv_bytes_into(buffer, offset):將一條完整的數據讀入buffer中并返回消息 的字節數,此方法在接收到數據前一直阻塞。如果連接對象被關閉或沒有數據可 讀取,將拋出異常。offset指定buffer中放置消息處的字節偏移量。如果消息長 度大于buffer將拋出異常。
4.1.3、在進程中使用管道
from multiprocessing import Process,Pipedef asend(pipe):pipe.send('張三')def aresve(pipe):print(f'接收到了===={pipe.recv()}====')if __name__=='__main__':ppipe,pipe=Pipe()p1=Process(target=aresve,args=(pipe, ))p1.start()asend(ppipe)p1.join()
# 接收到了====張三====
4.1.4、管道特點
雙向通信:管道允許兩個方向的通信,即每個管道有一個接收端和一個發送端。
點對點連接:管道通常用于兩個進程之間的直接通信,不支持多個進程之間的通 信。
管道大小有限:管道的緩沖區大小是有限的。如果緩沖區滿了,發送操作會阻 塞。
4.1.5、注意事項
管道默認是雙向的,但也可以通過設置 duplex=False來創建單向管道。此時返 回的第一個對象只能接收消息,第二個對象只能發送消息。
當使用管道在進程間傳遞大量數據時,要注意管道可能會成為性能瓶頸。
from multiprocessing import Process, Pipe
def sender(conn):conn.send([42, None, 'hello'])conn.close()
if __name__ == '__main__':# 創建單向管道,parten_conn是只讀的,child_conn是只寫的parent_conn, child_conn = Pipe(duplex=False)# 創建發送者進程# sender函數應該使用child_conn,因為它是可寫的sender_process = Process(target=sender, args=(child_conn,))# 啟動進程sender_process.start()print(parent_conn.recv())# 等待進程結束sender_process.join()
# [42, None, 'hello']
4.2、消息隊列
????????消息隊列提供了一種在進程間傳輸數據的方式,這種方式是通過在內核中維護一個消 息隊列來實現的。進程可以發送數據到隊列,也可以從隊列中接收數據。在Python 的multiprocessing模塊中, Queue類提供了一個先進先出(FIFO)的消息隊列。
4.2.1、創建消息隊列
from multiprocessing import Queue# 創建一個消息隊列
queue = Queue(maxsize=10) # maxsize為隊列中最多可以存放的元素數量
4.2.2、消息隊列的方法
put(obj, block=True, timeout=None):將obj放入隊列,如果可選參數 block是True而且timeout是None,將會阻塞當前進程,直到有空的緩沖槽。如 果timeout是正數,將會在阻塞了最多timeout秒之后還是沒有可用的緩沖槽時拋 出異常。如果block是False,那么在沒有空的緩沖槽時,會立即拋出異常,此時 timeout會被忽略。
get(block=True, timeout=None):從消息隊列里獲取消息。該方法為阻塞等 待的方法。block和timeout的作用與put一致。
empty():如果隊列為空,返回 True,否則返回
full():如果隊列滿了,返回
qsize(): 返回隊列中當前元素的數量。 False。 True,否則返回 False。
get_nowait():立即嘗試從隊列里獲取一個元素,如果隊列為空,拋出 Queue.Empty異常。
put_nowait():立即嘗試向隊列里放入一個元素,如果隊列滿了,拋出 Queue.Full異常。?
4.2.3、在進程中使用消息隊列
from multiprocessing import Process, Queue
import time
def process1(process_queue):print('準備接收數據')print('接收到的數據為:', process_queue.get())
if __name__ == '__main__':process_queue = Queue(5)p1 = Process(target=process1, args=(process_queue, ))p1.start()time.sleep(2)process_queue.put('hello')p1.join()p1.close()
# 準備接收數據
# 接收到的數據為: hello
4.2.4、消息隊列的特點
先進先出(FIFO):隊列遵循先進先出(FIFO)的原則,即先放入隊列的元素會 先被取出。
同步訪問: Queue類提供了一系列同步方法,如 put()、 get()等,以確保多進 程對隊列的訪問是安全的。
容量限制:隊列可以指定最大容量,當隊列滿時,新元素將無法放入;當隊列空 時,試圖從隊列中獲取元素的進程將阻塞,直到有新元素放入隊列。
生產者-消費者模式: Queue類非常適合用于生產者-消費者模式,其中生產者進 程將數據放入隊列,而消費者進程從隊列中取出數據。
from multiprocessing import Process, Queue
import time
# 生產者函數
def producer(queue):for i in range(5):queue.put(f"Product {i}")print(f"Produced {i}")time.sleep(1)
# 消費者函數
def consumer(queue):while True:product = queue.get()if product is None:breakprint(f"Consumed {product}")if __name__ == '__main__':queue = Queue(5)p = Process(target=producer, args=(queue,))c = Process(target=consumer, args=(queue,))p.start()c.start()p.join()c.join()'''
Produced 0
Consumed Product 0
Produced 1
Consumed Product 1
Produced 2
Consumed Product 2
Produced 3
Consumed Product 3
Produced 4
Consumed Product 4
.......
'''
4.2.5、注意事項
避免全局共享: 不要在多個進程間共享同一個隊列實例,而應該為每個進程創建 單獨的隊列實例。
設置隊列大小: 如果沒有指定隊列的大小,它將默認為無限大小。這可能導致內 存問題,特別是當生產者產生消息的速度遠大于消費者消費消息的速度時。
處理隊列異常: 當隊列操作失敗時(如隊列已滿或為空),應當捕獲并處理相應 的異常。
隊列性能: 隊列操作可能會影響性能,尤其是在高并發環境下。需要根據應用需 求選擇合適的隊列實現和大小。
4.3、共享內存
共享內存是一種進程間通信(IPC)機制,顧名思義,它允許多個進程訪問同一塊內 存空間。每個進程都可以讀取或寫入這塊內存,從而實現數據的共享。
4.3.1、創建共享內存
共享內存分為兩種,一種是共享一個變量,一種是共享一個數組。
????????在Python中,使用 multiprocessing.Value(type_code, *args, lock=True)來創建一個共享變量,其中type_code表示類型代碼,*args表示初 始化變量的值。lock表示鎖,默認會創建一個鎖用來保護共享變量。如果傳入 False,Value的實例就不會被鎖保護,它將不是進程安全的。
???????? 在Python中,使用 multiprocessing.Array(type_code, size_or_initializer, lock=True)來創建一個共享數組,其中type_code表 示類型代碼,size_or_initializer表示數組的大小或初始化值,如果是一個整數, 則表示數組的長度,且數組將被初始化為0,如果是一組序列,則就是數組的初 始化值,其長度決定數組的長度。lock表示鎖,默認會創建一個鎖用來保護共享 數組。?
from multiprocessing import Value, Array
shared_num = Value('i', 0)
shared_array = Array('i', range(10))
4.3.2、共享內存的方法
value:對于 [:]:對于 Value對象, value屬性用于獲取或設置共享變量的值。
Array對象,可以使用切片操作來獲取或修改數組中的元素。
4.3.3、在進程中使用共享內存
from multiprocessing import Process, Value, Array
def func(shared_num, shared_array):shared_num.value += 1for i in range(len(shared_array)):shared_array[i] += 1if __name__ == '__main__':shared_num = Value('i', 0)shared_array = Array('i', range(10))p = Process(target=func, args=(shared_num, shared_array))p.start()p.join()print(shared_num.value) # 輸出 1print(shared_array[:]) # 輸出 [1, 2, 3, ..., 10]
4.3.4、共享內存的特點
高效的數據共享:共享內存比其他IPC機制(如消息隊列)更高效,因為它避免 了數據的復制。
同步問題:共享內存需要同步機制(如鎖)來防止競態條件。
類型限制:共享內存的數據類型有限,通常只能是基本數據類型。
4.3.5、注意事項
數據同步:在使用共享內存時,應該使用鎖( Lock或 RLock)來同步對共享數 據的訪問,以避免競態條件。
避免死鎖:在使用鎖時,要注意正確的加鎖和解鎖順序,以避免死鎖。
內存管理:共享內存不會自動清理,需要確保所有進程完成對共享內存的操作后,適當地關閉或釋放內存。
安全性:共享內存的使用需要謹慎,因為它可能會被未經授權的進程訪問。
初始化值:在創建共享內存時,應該提供正確的初始化值,以確保數據的一致 性。
五、進程同步
????????在Python中,進程同步是指在多進程環境下協調各個進程對共享資源的訪問,主要 解決的問題是當多個進程并發訪問共享資源時,如何確保任意時刻只有一個進程能夠 訪問該資源,從而避免由于進程間的無序競爭而導致的系統資源沖突,確保系統的穩 定運行。
進程同步通常涉及到以下幾個核心概念:
????????1. 臨界資源是指一段時間內僅允許一個進程訪問的資源,這可能是硬件資源,也可 能是軟件資源如變量、數據、表格、隊列等。
????????2. 臨界區是指訪問臨界資源的那部分代碼。在進入臨界區之前,需要檢查是否可以 訪問臨界資源,以確保資源的互斥訪問。
進程同步的機制應遵循以下規則:
????????1. 空則讓進:如果臨界資源處于空閑狀態,則進程可以進入其臨界區。
????????2. 忙則等待:如果臨界資源正在被使用,則請求訪問的進程需要等待。 常見的進程同步機制包括鎖、信號量、事件、條件變量等。
5.1、鎖
????????鎖(Lock)是一種用于控制多個進程訪問共享資源的機制,鎖的主要目的是防止多個 進程同時訪問共享資源時可能產生的競態條件(Race Condition),確保數據的一致 性和完整性。在Python中,最常用的鎖為互斥鎖(Lock)和遞歸鎖(RLock)。
5.1.1、互斥鎖
????????這是最常見的一種鎖,它確保同一時間只有一個進程可以訪問共享資源。當一個進程 正在使用資源時,它會鎖定該資源,其他進程必須等待鎖被釋放后才能訪問。
????????在 multiprocessing模塊中, Lock對象可以用來確保臨界區代碼的互斥執行。
方法:
????????acquire(blocking=True, timeout=-1):嘗試獲取鎖。如果 blocking為True并且timeout是默認值-1,該方法會阻塞直到鎖被獲取。如果 blocking為False,則立即返回而不阻塞。
????????release():釋放鎖。
from multiprocessing import Queue, Process, current_process,Lock
import timedef task(block,q,acount):while True:block.acquire()money = q.get()if money >= acount:money -= acountprint(f'{current_process().name}==={acount}==={money}')else:print(f'{current_process().name}沒錢了')q.put(money)block.release()breakq.put(money)time.sleep(1)block.release()if __name__ == '__main__':q = Queue()block = Lock()q.put(1000)p1 = Process(target=task, args=(block, q, 100), name='張三')p2 = Process(target=task, args=(block, q, 50), name='李四')p1.start()p2.start()p1.join()p2.join()
'''
張三===100===900
李四===50===850
張三===100===750
李四===50===700
張三===100===600
李四===50===550
張三===100===450
李四===50===400
張三===100===300
李四===50===250
張三===100===150
李四===50===100
張三===100===0
李四沒錢了
張三沒錢了
'''
????????要避免死鎖的出現。在Python中,多進程的死鎖(Deadlock)是指在多 個進程之間,每個進程都在等待其他進程釋放資源,但是這些資源又被其他進程持 有,導致所有進程都無法繼續執行,形成了一種僵持狀態。簡單來說,死鎖是多個進 程因競爭資源而造成的一種互相等待的僵局。
????????死鎖通常發生在以下四個條件同時滿足 時:
????????1. 互斥條件:資源不能被多個進程同時使用或不想被多個進程同時訪問。
????????2. 占有和等待條件:進程至少持有一個資源,并且正在等待獲取額外的資源,而該 資源又被其他進程持有。
????????3. 不可搶占條件:已經分配給進程的資源在該進程完成任務前不能被搶占。
????????4. 循環等待條件:存在一種進程資源的循環等待鏈,每個進程至少持有一個資源, 并等待獲取下一個進程所持有的資源。
????????避免死鎖的出現,可以使用with語句,通過with來幫我們自動釋放鎖,從而可以大幅 度減少死鎖出現的可能。?
from multiprocessing import Lock, Queue, Process, current_process
import time
# 定義生產者函數,它會不斷地向隊列中添加數據
def producer(queue):while True:# 循環5次,每次向隊列中添加'hello'for i in range(6):queue.put('hello')# 生產者進程休眠2秒time.sleep(2)
# 定義消費者函數,它會不斷地從隊列中取出數據
def consumer(queue, lock):while True:# 獲取鎖,確保在消費數據時不會被其他消費者進程打斷,并且在執行完畢后自動釋放鎖with lock:# 消費者進程休眠2秒time.sleep(2)# 檢查隊列是否為空if not queue.empty():# 如果隊列不為空,取出一個數據res = queue.get()# 打印取出的數據print(f'{current_process().name}: {res}')
# 主程序入口
if __name__ == '__main__':# 創建一個隊列,用于生產者和消費者之間傳遞數據queue = Queue(10)# 創建一個互斥鎖,用于保護隊列lock = Lock()# 創建生產者進程producer_process = Process(target=producer, args=(queue,))producer_process.start()# 創建多個消費者進程consumer1_process = Process(target=consumer, args=(queue,lock), name='consumer1')consumer1_process.start()consumer2_process = Process(target=consumer, args=(queue,lock), name='consumer2')consumer2_process.start()consumer3_process = Process(target=consumer, args=(queue,lock), name='consumer3')consumer3_process.start()consumer4_process = Process(target=consumer, args=(queue,lock), name='consumer4')consumer4_process.start()consumer5_process = Process(target=consumer, args=(queue,lock), name='consumer5')consumer5_process.start()# 等待生產者進程完成producer_process.join()# 等待所有消費者進程完成consumer1_process.join()consumer2_process.join()consumer3_process.join()consumer4_process.join()consumer5_process.join()
5.1.2、遞歸鎖
????????遞歸鎖與互斥鎖最大的不同就是它允許同一個進程多次獲取同一把鎖,這意味著如果 一個進程獲取了鎖,它還可以再次獲取鎖而不會導致死鎖。但是該鎖的內部有一個計 數器,每當一個進程獲取到鎖時,計數器就會增加,當進程釋放鎖時,計數器就會減 少。只有當計數器為0時,鎖才會真正釋放,才會允許其他的進程去獲取鎖,其他的 使用和互斥鎖一模一樣。
from multiprocessing import Lock, Queue, Process, current_process, RLock
import time
# 定義生產者函數,它會不斷地向隊列中添加數據
def producer(queue):while True:# 循環5次,每次向隊列中添加'hello'for i in range(6):queue.put('hello')# 生產者進程休眠2秒time.sleep(2)
# 定義消費者函數,它會不斷地從隊列中取出數據
def consumer(queue, lock):while True:# 獲取鎖,確保在消費數據時不會被其他消費者進程打斷,并且在執行完畢后自動釋放鎖with lock:# 消費者進程休眠2秒time.sleep(2)# 檢查隊列是否為空if not queue.empty():# 如果隊列不為空,取出一個數據res = queue.get()# 打印取出的數據print(f'{current_process().name}: {res}')
# 主程序入口
if __name__ == '__main__':# 創建一個隊列,用于生產者和消費者之間傳遞數據queue = Queue(10)# 創建一個互斥鎖,用于保護隊列lock = RLock()# 創建生產者進程producer_process = Process(target=producer, args=(queue,))producer_process.start()# 創建多個消費者進程consumer1_process = Process(target=consumer, args=(queue,lock), name='consumer1')consumer1_process.start()consumer2_process = Process(target=consumer, args=(queue,lock), name='consumer2')consumer2_process.start()consumer3_process = Process(target=consumer, args=(queue,lock), name='consumer3')consumer3_process.start()consumer4_process = Process(target=consumer, args=(queue,lock), name='consumer4')consumer4_process.start()consumer5_process = Process(target=consumer, args=(queue,lock), name='consumer5')consumer5_process.start()# 等待生產者進程完成producer_process.join()# 等待所有消費者進程完成consumer1_process.join()consumer2_process.join()consumer3_process.join()consumer4_process.join()consumer5_process.join()
遞歸鎖的應用場景通常涉及遞歸函數或遞歸操作,這些操作需要在同一時間點多次進 入臨界區。遞歸鎖允許同一個進程多次進入臨界區,而不會導致死鎖。以下是一些常 見的遞歸鎖應用場景:?
?1. 遞歸函數: 遞歸函數可能會在遞歸過程中多次進入相同的代碼塊。如果沒有遞歸鎖,這 些進入可能會相互阻塞,導致死鎖。
2. 遞歸操作: 例如,在文件系統的遍歷操作中,遞歸函數可能會在遞歸過程中多次訪問同 一目錄。
3. 遞歸數據庫操作: 在數據庫中執行遞歸查詢時,可能需要多次訪問同一數據集。
4. 遞歸網絡操作: 在遞歸處理網絡請求時,可能需要多次訪問同一服務器或同一數據。
5. 遞歸圖形處理: 在圖形處理或圖像處理中,遞歸算法可能會在遞歸過程中多次訪問同一像素 或同一圖像區域。
6. 遞歸算法: 在算法實現中,遞歸鎖可以用于確保遞歸算法在遞歸過程中不會因為鎖的競 爭而失敗。
7. 遞歸數據結構訪問: 在遞歸訪問數據結構(如樹或圖)時,遞歸鎖可以確保不會因為遞歸調用而 出現死鎖。
5.2、信號量
????????信號量是一個更高級的同步機制,它內部維護一個計數器,用于控制對共享資源的最 大并發訪問數量。在 multiprocessing模塊中, Semaphore對象用于此類同步。
multiprocessing.Semaphore(value=1): 創建一個信號量對象, 定了初始可用的數量,默認為1。 信號量的方法:
acquire([timeout=None]): 嘗試獲取信號量。如果信號量可用,則其值減一并 立即返回 True。如果信號量不可用,則阻塞直到超時或信號量變為可用。如果 沒有指定 timeout或 timeout為 None,則一直等待直至信號量可用。
release(): 釋放一個信號量,其值加一。如果信號量之前已被阻塞,則會喚醒 一個正在等待的進程。?
import sys
from multiprocessing import Queue, Semaphore, Process, current_process
import multiprocessing
import timedef pro(q):while True:q.put('hello')time.sleep(0.3)def cum1(semaphore,q):while True:with semaphore:if not q.empty():res=q.get()print(f'{current_process().name}===={res}')sys.stdout.flush()def cum2(semaphore,q):while True:with semaphore:if not q.empty():res=q.get()print(f'{current_process().name}===={res}')sys.stdout.flush()def cum3(semaphore,q):while True:with semaphore:if not q.empty():res=q.get()print(f'{current_process().name}===={res}')sys.stdout.flush()if __name__ == '__main__':q = Queue(10)semaphore=Semaphore(3)p = Process(target=pro, args=(q, ), name='生產者')p1 = Process(target=cum1, args=(semaphore, q), name='消費者1')p2 = Process(target=cum2, args=(semaphore, q), name='消費者2')p3 = Process(target=cum3, args=(semaphore, q), name='消費者3')p.start()p1.start()p2.start()p3.start()p.join()p1.join()p2.join()p3.join()
?PV操作是進程同步中的一種基本機制,也被稱為信號量機制。PV操作是由荷蘭計算 機科學家Edsger Dijkstra所提出的,用于解決進程間的同步問題,特別是為了解決臨 界區問題(即多個進程試圖訪問共享資源時的競爭問題)。
PV操作的基本概念 :
????????P操作(測試和等待操作,有時也稱為 wait操作):該操作會檢查信號量的值。 如果信號量的值大于零,則將其減一;如果信號量的值為零,則該進程會被掛 起,直到信號量的值變為正數。
????????V操作(信號操作,有時也稱為 signal或 post操作):該操作會增加信號量的 值。如果信號量的值增加后仍然小于等于零,則會喚醒一個因P操作而被掛起的 進程。
PV操作通常用于解決以下幾種問題:?
????????1.互斥:確保一次只有一個進程能夠進入臨界區。例如,多個進程共享一個文件, 只允許一個進程讀寫文件。
????????2. 同步:協調兩個或多個進程按照某種順序執行。例如,生產者-消費者問題中,生 產者進程生成數據放入緩沖區,消費者進程從緩沖區取出數據處理。
????????3. 死鎖避免:通過合理設計PV操作序列,可以避免出現死鎖情況。
5.3、事件
????????事件是一種簡單的同步機制,允許一個進程通知一個或多個等待的進程某些事件已經 發生,也就是發送一個信號,而其他進程可以根據這個信號做出反應。
????????Event對象的 使用場景:
????????????????一個進程等待另一個進程完成某項任務。
????????????????控制多個進程間的簡單通信。
????????????????實現對共享資源的訪問控制。
Python中使用 multiprocessing.Event創建事件對象,其基本方法有:?
1. is_set():返回事件是否已設置的狀態,如果被設置則返回 False。
2. set():將事件設置為真狀態,即 True,否則返回 True,表示可以喚醒正在等待該事件的所有 進程。
3. clear():將事件設置為假狀態,即 False,表示沒有進程會被喚醒。
4.wait([timeout]):阻塞當前進程直到事件被設置為真狀態或超時(如果提供 了timeout參數)。如果沒有設置超時時間,則會一直等待直到事件被設置。?
import sys
from multiprocessing import Queue, Event, Process, current_process
import multiprocessing
import timedef pro(event):event.set()# 設置事件以通知消費者可以開始消費while True:print('project')time.sleep(1)event.clear()time.sleep(1)def cum1(event):event.wait()# 等待事件被設置while True:time.sleep(1)print('1\n')def cum2(event):event.wait()# 等待事件被設置while True:time.sleep(1)print('2\n')def cum3(event):event.wait()# 等待事件被設置while True:time.sleep(1)print('3\n')if __name__ == '__main__':q = Queue(10)event=Event()p = Process(target=pro, args=(event, ), name='生產者')p1 = Process(target=cum1, args=(event, ), name='消費者1')p2 = Process(target=cum2, args=(event, ), name='消費者2')p3 = Process(target=cum3, args=(event, ), name='消費者3')p.start()p1.start()p2.start()p3.start()p.join()p1.join()p2.join()p3.join()
5.4、條件變量
????????條件變量(Condition Variables)是用于協調多個進程的一種機制,它通常與鎖一起 使用來實現更復雜的同步模式。Python的 multiprocessing模塊提供了 類,用于支持多進程間的同步。 Condition Condition對象可以看作是一個鎖加上一個或多個 條件隊列的組合。當一個進程持有鎖時,它可以等待特定的條件滿足,或者通知其他 等待該條件的進程繼續執行。
????????使用multiprocessiong.Condition(lock=None)來創建一個條件變量對象,其中lock 可以替換為自己指定的Lock或RLock對象,如果lock為None,則它自己創建一個新 的RLock對象并使用。
????????Condition對象的基本方法:
????????????????1. acquire():獲取內部鎖。如果無法立即獲取鎖,則調用者會阻塞直到鎖可用。
????????????????2. release():釋放內部鎖。
????????????????3. wait():釋放內部鎖,并使調用進程阻塞,直到接收到通知。當被喚醒時,它會 嘗試重新獲取鎖。
???????????????? 4. wait_for(predicate,timeout=None):釋放內部鎖,并使調用進程阻塞。 predicate 應該是一個可調用對象而且它的返回值可被解釋為一個布爾值,如果為 True,則進程被喚醒,False就繼續等待。 timeout 參數給出最大等待時間。
????????????????5. notify(n=1):喚醒一個或多個正在等待的進程。參數n指定了要喚醒的等待進程 的數量,默認為1。 6. notify_all():喚醒所有正在等待的進程。?
5.4.1、默認鎖
from multiprocessing import Condition, Process
import time
def producer(condition):while True:condition.acquire()condition.notify_all() # 通知所有等待的消費者condition.release()time.sleep(3) # 模擬生產者的工作周期
def consumer(condition, number):while True:condition.acquire()print(f'{number}正在等待condition')condition.wait() # 等待條件被滿足print(f'{number}已釋放condition')
# 證明condition自帶的是RLock,而不是Lock
# condition.release()
# 這里如果是使用默認的RLock的話,如果不去release,不會導致只有一個進程一直去執行,原因如下:
'''
因為在Condition中,如果鎖是RLock的話,wait在釋放鎖時并不是使用的
release,而是使用RLock的內部接口,
即使遞歸鎖被多次獲取也可以直接去令計數器歸零直接解鎖,這樣就可以讓其他
的進程去獲取到這個鎖,所以如果Condition
的鎖是RLock的話,即使在wait下面沒有release也不會出現只有一個進程不
斷獲取鎖的情況。如果是Lock的話,
在wait下面沒有release就會導致死鎖。
'''
if __name__ == '__main__':condition = Condition()producer_process = Process(target=producer, args=(condition,))producer_process.start()consumers = [Process(target=consumer, args=(condition, 1)),Process(target=consumer, args=(condition, 2)),Process(target=consumer, args=(condition, 3)),Process(target=consumer, args=(condition, 4)),Process(target=consumer, args=(condition, 5))]for c in consumers:c.start()producer_process.join()for c in consumers:c.join()
5.4.2、with語句
from multiprocessing import Condition, Process
import time
def producer(condition):while True:with condition:condition.notify_all() # 通知所有等待的消費者time.sleep(3) # 模擬生產者的工作周期
def consumer(condition, number):while True:with condition:print(f'{number}正在等待condition')condition.wait() # 等待條件被滿足print(f'{number}已釋放condition')
if __name__ == '__main__':condition = Condition()producer_process = Process(target=producer, args=(condition,))producer_process.start()consumers = [Process(target=consumer, args=(condition, 1)),Process(target=consumer, args=(condition, 2)),Process(target=consumer, args=(condition, 3)),Process(target=consumer, args=(condition, 4)),Process(target=consumer, args=(condition, 5))]for c in consumers:c.start()producer_process.join()for c in consumers:c.join()
5.4.3、自定義鎖
from multiprocessing import Condition, Lock, Process
import time
def producer(condition):while True:with condition:condition.notify_all() # 通知所有等待的消費者time.sleep(3) # 模擬生產者的工作周期
def consumer(condition, number):while True:with condition:print(f'{number}搶到了鎖,正在等待condition')condition.wait() # 等待條件被滿足print(f'condition已經觸發,{number}釋放了鎖,')
if __name__ == '__main__':lock = Lock()condition = Condition(lock=lock)producer_process = Process(target=producer, args=(condition,))producer_process.start()consumers = [Process(target=consumer, args=(condition, 1)),Process(target=consumer, args=(condition, 2)),Process(target=consumer, args=(condition, 3)),Process(target=consumer, args=(condition, 4)),Process(target=consumer, args=(condition, 5))]for c in consumers:c.start()producer_process.join()for c in consumers:c.join()
六、進程池
6.1、進程池的介紹
????????進程池是一組預先創建的空閑進程,它們等待執行任務,主進程負責將任務分配給進 程池中的空閑進程去執行。進程池可以管理進程的創建和銷毀,避免了頻繁地創建和 銷毀進程帶來的開銷,通過進程池可以輕松的實現多任務的并行處理。
效率:相比于手動創建和管理多個進程,使用進程池可以更高效地利用系統資 源。
簡化:進程池簡化了并行編程的復雜性,開發者不需要關注進程的創建和銷毀細 節。
控制:可以限制同時運行的進程數量,防止系統資源被過度消耗。
6.2、進程池的創建
6.2.1、使用multiprocessing庫
from multiprocessing import Pool
Pool(processes=None,initializer=None,initargs= (),maxtasksperchild=None)
processes:進程池中的進程數。如果 processes為 None,則默認使用系統的 處理器核心數。
initializer:每個工作進程啟動時要執行的可調用對象,默認為None。如果是 None,則調用initializer(*initargs)。
initargs:傳遞給 initializer的可變參數元組。
maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個新的 工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是 None,意味著只要Pool存在工作進程就會一直存活。?
返回對象的主要方法為:
apply(func, args=()):在一個池工作進程中執行func(args,*kwargs),然后返回 結果。需要強調的是:此操作并不會在所有池工作進程中并執行func函數。如果要 通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用 p.apply_async。它是阻塞的。apply很少使用。
apply_async(func, args=(), kwds={}, callback=None):異步地執行函數 func。 數字典。 args是傳遞給 func的位置參數元組, kwds是傳遞給 callback是一個回調函數,當 func的關鍵字參 func執行完成后會被調用。
map(func, iterable, chunksize=None):將 iterable中的每個元素作為參數 傳遞給func,并返回結果列表。此方法類似于內置函數 map,但是它是并行的。
map_async(func, iterable, chunksize=None, callback=None):與 map類 似,但是是異步的。 chunksize指定每次分配給進程的迭代器元素數量。
imap(func, iterable, chunksize):imap 與 map的區別是,map是當所有的 進程都已經執行完了,再返回結果,imap()則是立即返回一個iterable可迭代對 象。
imap_unordered(func, iterable, chunksize):與imap相同,只不過不保證返 回的結果順序與進程添加的順序一致。
close():阻止任何新的任務被提交到池中。一旦所有任務完成,工作進程會退 出。
terminate():立即終止所有工作進程,不再處理未處理的任務。
join():等待所有工作進程退出。必須在close()或 terminate()之后調用。?
6.2.1.1、apply函數和apply_async函數
同步操作:
????????在同步操作中,調用者必須等待直到被調用的操作完成才能繼續執行下一步。簡單來 說,就是按照順序一步一步地執行,每一步都必須等待前一步完成后才能開始。例 如,在同步函數調用中,主調函數會等待被調用的函數返回結果后才會繼續執行下面 的代碼。?
特點:
????????順序執行。
????????阻塞等待。
????????通常更易于理解和實現。
????????可能導致性能瓶頸,特別是在需要等待長時間操作時。
異步操作:?
????????在異步操作中,調用者不需要等待被調用的操作完成就可以繼續執行其他任務。當被 調用的操作完成時,會通過回調函數、事件通知等方式告知調用者。這種方式允許程 序同時處理多個任務,提高了效率和響應速度。?
特點:?
????????并行或多任務執行。
????????不阻塞等待。
????????可以提高系統的整體吞吐量。
????????實現相對復雜,需要處理回調或其他機制來管理非阻塞操作。
apply函數:
import time from multiprocessing import Pool def func(n):while True:if n <= 1:breakprint(n)n -= 1time.sleep(1) def test1():print(123) def test2():print(456) if __name__ == "__main__":with Pool(processes=4) as pool: # 使用單個進程進行同步計算result = pool.apply(func, (5,))pool.apply(test1)pool.apply(test2)''' 5 4 3 2 123 456 '''
apply_async函數:
from multiprocessing import Pool # 定義計算斐波那契數列的函數 def fibonacci(n):if n <= 1:return nelse:return fibonacci(n-1) + fibonacci(n-2) # 定義一個回調函數,用于處理計算結果 def handle_result(result):print(f"Fibonacci result: {result}") if __name__ == "__main__":# 創建一個進程池with Pool(processes=4) as pool:# 定義要計算的斐波那契數列的索引列表fib_indices = [35, 36, 37, 38] # 這些數字較大,計算可能需要一些時間# 使用apply_async異步計算斐波那契數列的值,并設置回調函數results = []for index in fib_indices:result = pool.apply_async(fibonacci, (index,),callback=handle_result)results.append(result)pool.close()pool.join() ''' Fibonacci result: 9227465 Fibonacci result: 14930352 Fibonacci result: 24157817 Fibonacci result: 39088169 '''
方法apply_async()和map_async()的返回值都具有以下的方法:
????????1. get(timeout):用于獲取執行結果。如果timeout不是None且在timeout秒內仍 然沒有執行完得到結果,就會拋出異常。
????????2. wait(timeout):阻塞,直到返回結果,或在timeout秒后超時。
????????3. ready():返回執行狀態,是否已經完成任務。
????????4. succesful():判斷調用是否已經完成并且未引發異常。如果還沒獲得結果就拋出 異常。?
6.2.1.2、map函數和map_async函數
map函數:
from multiprocessing import Pool def square(x):"""計算給定數字的平方"""return x * x if __name__ == "__main__":# 創建一個進程池with Pool(processes=4) as pool:# 定義要計算的數字列表numbers = [1, 2, 3, 4, 5]# 使用map同步計算每個數字的平方results = pool.map(square, numbers)print("Results:", results)#Results: [1, 4, 9, 16, 25]
map_async函數:
import time from multiprocessing import Pool def square(x):"""計算給定數字的平方"""return x * x if __name__ == "__main__":# 創建一個進程池with Pool(processes=4) as pool:# 定義要計算的數字列表numbers = [1, 2, 3, 4, 5]# 使用map_async異步計算每個數字的平方async_result = pool.map_async(square, numbers)# 主進程可以繼續執行其他任務print("Main process continues to run in parallel.")# 等待所有子進程完成pool.close()pool.join()# 獲取異步計算的結果results = async_result.get()print("Results:", results) # Main process continues to run in parallel. # Results: [1, 4, 9, 16, 25]
6.2.2、使用concurrent.futures庫
在Python中,使用concurrent.futures的ProcessPoolExecutor創建進程池。
from concurrent.futures import ProcessPoolExecutor
ProcessPoolExecutor(max_workers=None, mp_context=None,initializer=None, initargs=())
max_workers: 指定進程池中可以同時運行的最大進程數。如果設置為 None 或 未指定,則默認為機器的處理器數量,最多為61。
mp_context: 指定多進程上下文。默認情況下, multiprocessing.get_context() 來獲取上下文。這允許你選擇不同的上下文,例如 fork、spawn、 forkserver 等,這些上下文可能提供不同的功能, 如更好的資源隔離、更好的安全性等。
initializer: 一個可選的可調用對象,每個工作進程在啟動時都會調用它。這 可以用來執行進程的初始化操作,例如設置進程局部存儲。
initargs: 一個元組,其中包含傳遞給 initializer 的參數。?
其主要的方法為:
submit(fn, *args, **kwargs): 提交一個可調用對象 fn 到進程池,并返回一 個 Future 對象,該對象的result方法可以用來獲取結果。
map(func, *iterables, timeout=None, chunksize=1): 它允許你將一個函數 func 應用于多個可迭代對象 iterables 中的元素,并且并行地在多個進程 上執行這些函數調用。 timeout是可選參數,用于設置阻塞等待每個任務完成的 最大秒數。?chunksize是可選參數,指定每次提交 給進程池的任務數量。一個較大的 chunksize 可以減少進程間通信的開銷,但 它也會增加內存消耗,因為它會保存更多的任務結果。返回的結果是一個迭代 器。
shutdown(wait=True): 等待所有進程完成當前任務后關閉進程池。如果 參數設置為 True,進程池會等待所有任務完成;如果設置為 wait False,進程池會 立即返回,不等待任務完成。使用with語句管理時,with語句結束時會自動調用 shutdown。?
6.2.2.1、submit的應用
from concurrent.futures import ProcessPoolExecutor
import time
# 定義一個簡單的函數,用于執行一個計算密集型的任務
def compute_square(n):return n * n
if __name__ == '__main__':numbers = [1, 2, 3 ,4, 5]results = []# 使用with語句創建ProcessPoolExecutor實例with ProcessPoolExecutor(max_workers=3) as executor:for num in numbers:# 提交計算任務到進程池future = executor.submit(compute_square, num)results.append(future)print([res.result() for res in results])#[1, 4, 9, 16, 25]
6.2.2.2、map的應用
from concurrent.futures import ProcessPoolExecutor
import time
# 定義一個簡單的函數,計算一個數字的平方
def square(number):return number * number
if __name__ == '__main__':# 數字列表numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]# 使用with語句創建ProcessPoolExecutor實例with ProcessPoolExecutor(max_workers=3) as executor:# 使用map方法并行執行squareresults = executor.map(square, numbers)print(list(results))#[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
6.3、手動創建與進程池的比較
6.3.1、手動創建多進程
優點:
????????1. 靈活性:手動創建進程可以更精確地控制每個進程的創建和銷毀,可以針對特定 任務定制進程的行為。
????????2. 直接控制:可以直接與進程對象交互,例如,可以設置進程的名稱、守護狀態 等。
????????3. 簡單的并行結構:對于簡單的并行任務,手動創建進程可以更加直觀。
缺點:
????????1. 資源管理:需要手動管理進程的生命周期,包括創建、銷毀和異常處理。
????????2. 開銷大:進程的創建和銷毀開銷較大,如果頻繁創建和銷毀大量進程,可能會導 致性能問題。
????????3. 同步問題:需要手動處理進程間同步(例如,使用鎖、信號量等)。
6.3.2、進程池創建多進程
優點:
????????1. 高效管理:進程池會管理進程的生命周期,包括進程的創建和銷毀,減少了開 銷。
????????2. 資源限制:可以限制同時運行的進程數量,防止系統資源被過度消耗。
????????3. 簡化并行:簡化了并行任務的處理,不需要手動創建和銷毀進程。
????????4. 任務分發:可以使用 apply, apply_async,map, map_async等函數分發任務到進程池。
缺點:
????????1. 靈活性較低:與手動創建進程相比,進程池提供的是一種更通用的解決方案,可 能不適合所有特定場景。
????????2. 可能造成阻塞:如果進程池中的所有進程都在忙,而又有新的任務需要執行,那 么這些任務可能會被阻塞,直到有進程可用。