創建進程
os.fork()
該方法只能在linux和mac os中使用,因為其主要基于系統的fork來實現。window中沒有這個方法。
通過os.fork()方法會創建一個子進程,子進程的程序集為該語句下方的所有語句。
import os??print("主進程的PID為:" , os.getpid())w = 1pid = os.fork() # 創建子進程print('fork方法的返回值為: ', pid)if pid == 0:print(f'子進程PID: {os.getpid()}, 主進程PID: {os.getppid()}, 子進程中w: {w}')else:print(f'主進程PID: {os.getpid()}, 主進程PID: {os.getppid()}, 子進程中w: {w}')
multiprocessing(target為函數)
通過multiprocessing模塊中的Process類創建一個進程實例對象,并通過其start方法啟動該進程。
進程中的程序集為Process類的target參數,可以是一個函數也可以是一個方法。
需要注意的是windows系統中創建進程的過程需要放在if __name__== "__main__"
代碼塊中,因為其實現數據集的復制時,是通過import語句實現。
而在Linux和MacOS系統下則不需要,因為他們原生支持fork方法。
import multiprocessingimport osimport time?def task():for i in range(3):print("wating... 當前pid為 : ", os.getpid(), "父進程為: ", os.getppid())time.sleep(1)???if __name__ == "__main__":print('主進程pid:', os.getpid())?process = multiprocessing.Process(target=task)process.start()
multiprocessing(taget為方法)
創建Process實例對象的target參數,不僅可以是函數名,還可以是類的方法名。
- 如果需要往target中傳入參數,可以通過args和kwargs兩個參數進行相應的傳參
import multiprocessingimport time?class Tasks():def task1(self):time.sleep(1)print('task1')?def task2(self):time.sleep(1)print('task2')?if __name__ == "__main__":t = Tasks()process1 = multiprocessing.Process(target=t.task1)process1.start()?process2 = multiprocessing.Process(target=t.task2)process2.start()
繼承Process類
通過繼承multiprocessing.Process
類,并重寫其中的run方法。
- 必須要重寫
run
方法,Process子類的實例對象的run方法,就是進程執行的程序
import timefrom multiprocessing import Process??class MyProcess(Process):def __init__(self, i):super().__init__()self.name = str(i)?def run(self):time.sleep(2)print(f'子進程-{self.name}')??if __name__ == '__main__':for i in range(5):p = MyProcess(i)p.start()?print('主進程')
進程阻塞
目前系統一般都是多核的,當處理多任務時,一般都以并發或并行的方式處理多任務。所以系統一般以異步的方式處理多進程。
在Process
的實例方法中,通過join
方法表示進程阻塞時,主將處于等待狀態,并不會處理其他進程。
單進程阻塞
針對每個進程開啟后立馬啟用join方法,這種方法效率低下。使得系統的處理方式編程同步阻塞
,使得主進程依次處理子進程。
import timefrom multiprocessing import Process?def eat():time.sleep(2)print('eat')?def drink():time.sleep(2)print('drink')?if __name__ == '__main__':process1 = Process(target=eat)process1.start()process1.join()?process2 = Process(target=drink)process2.start()process2.join()??print('主進程')
多進程阻塞
先利用start方法將多個進程同時創建并啟動,然后在創建完成后統一阻塞進程。
- 統一創建進程,并讓其統一運行
- 統一等待進程結束,避免每個進程都等一段時間
import timefrom multiprocessing import Process?def eat():time.sleep(2)print('eat')?def drink():time.sleep(2)print('drink')???if __name__ == '__main__':process1 = Process(target=eat)process1.start()?process2 = Process(target=drink)process2.start()?for p in [process1, process2]:p.join()?print('主進程')
進程鎖
當多進程編輯同一文件或數據時,往往會導致數據不一致問題,針對這種情況,需要在進程中對處理文件或數據的代碼前后進行加鎖和解鎖操作。
如果沒有鎖,會導致數據的不一致
import time, jsonfrom multiprocessing import Process??def read_ticket(user):with open('ticket.txt') as f:num = json.load(f)['ticket']time.sleep(1)print(f'User {user}: 當前剩余{num}張票')return num??def order_ticket(user, num):time.sleep(1)num -= 1with open('ticket.txt', 'w') as f:json.dump({'ticket': num}, f)print(f'User {user}: 購票成功')??def ticket(user):num = read_ticket(user)if num > 0:order_ticket(user, num)else:print(f'User {user}: 購票失敗')??if __name__ == '__main__':queue = []for i in range(5):p = Process(target=ticket, args=(i,))p.start()queue.append(p)for q in queue:q.join()?print('運行結束')
加鎖/解鎖
在編輯數據的之前通過acquire
方法加鎖,當數據編輯完成后,通過release
方法解鎖。
- 在主進程中創建一個鎖對象
- 然后在每個修改共同數據的進程中傳入已經創建的鎖對象
- 在修改數據的代碼前后分別加鎖和解鎖
"""@Time: 2024/6/28 20:18@Author: 'Ethan'@Email: ethanzhou4406@outlook.com@File: 1. 同步阻塞.py@Project: python@Feature:"""import time, jsonfrom multiprocessing import Process, Lock??def read_ticket(user):with open('ticket.txt') as f:num = json.load(f)['ticket']time.sleep(0.1)print(f'User {user}: 當前剩余{num}張票')??def order_ticket(user):time.sleep(0.1)with open('ticket.txt') as f:num = json.load(f)['ticket']if num > 0:with open('ticket.txt', 'w') as f:num -= 1json.dump({'ticket': num}, f)print(f'User {user}: 購票成功')else:print(f'User {user}: 購票失敗')??def ticket(user,lock):read_ticket(user)lock.acquire()order_ticket(user)lock.release()??if __name__ == '__main__':lock = Lock()queue = []for i in range(5):p = Process(target=ticket, args=(i, lock))p.start()queue.append(p)for q in queue:q.join()?print('運行結束')?
鎖的上下文管理器
如果在代碼加鎖后,解鎖前,代碼出現了異常就會導致進程沒有來得及解鎖,而導致死鎖現象。通過鎖的上下文管理器語法,可以有效避免這種情況的發生。
import time, jsonfrom multiprocessing import Process, Lock??def read_ticket(user):with open('ticket.txt') as f:num = json.load(f)['ticket']time.sleep(0.1)print(f'User {user}: 當前剩余{num}張票')??def order_ticket(user):time.sleep(0.1)with open('ticket.txt') as f:num = json.load(f)['ticket']if num > 0:with open('ticket.txt', 'w') as f:num -= 1json.dump({'ticket': num}, f)print(f'User {user}: 購票成功')else:print(f'User {user}: 購票失敗')??def ticket(user,lock):read_ticket(user)with lock:order_ticket(user)??if __name__ == '__main__':lock = Lock()queue = []for i in range(5):p = Process(target=ticket, args=(i, lock))p.start()queue.append(p)for q in queue:q.join()?print('運行結束')?
進程間通信
進程之間可以進行通信,主要是通過各個進程之中傳入一個公共的溝通工具,所有的進程都通過這個工具進行溝通。multiprocessing中提供了兩種進程間溝通的工具Queue
和Pipe
Queue方式
Queue是基于文件傳輸的socket通信方式,并且它是帶鎖機制的。它的數據主要的特點是先進先出,后進后出。
當一個對象被放入一個隊列中時,這個對象首先會被一個后臺線程用pickle
序列化,并將序列化后的數據通過一個底層管道的管道傳遞給隊列中。
主要使用如下方法:
- qsize(): 返回隊列的大致的長度。返回的值由于多線程或多進程的上下文而變得不可靠
- empty(): 隊列為空返回True,否則返回False。返回的值由于多線程或多進程的上下文而變得不可靠
- full(): 隊列滿了返回True,否則返回False。返回的值由于多線程或多進程的上下文而變得不可靠
- put(obj[, block[, timeout]]): 將obj放入隊列。
- 如果block為True(默認值)而且timeout是None(默認值),將會阻塞當前進程,直到有空的緩沖槽。
- 如果timeout是正數,將會阻塞了最多timeout秒之后還是沒有可用的緩沖槽時拋出
queue.Full
異常 - 反之block為False,僅當有可用緩沖槽時才放入對象,否則拋出
queue.Full
異常(這種情況下timeout參數會被忽略)
- get([block[, timeout]]): 從隊列中取出并返回對象。如果可選參數block是True而且timeout是None,將會阻塞當前進程,直到隊列中出現可用對象。如果timeout是正數,將會阻塞了最多timeout秒之后還是沒有可用的對象時拋出
queue.Empty
異常。- 反之,block是False時,僅當有可用對象能夠取出時返回,否則拋出
queue.Empty
異常(這種情況下timeout參數會被忽略)
- 反之,block是False時,僅當有可用對象能夠取出時返回,否則拋出
import time, jsonfrom multiprocessing import Process, Queue??def task(i, queue: Queue):time.sleep(1)queue.put(i)print(f'task {i}, 入列')??if __name__ == '__main__':queue = Queue()process_queue = []for i in range(5):p = Process(target=task, args=(i, queue))p.start()process_queue.append(p)for p in process_queue:p.join()?for i in range(5):print(f'主進程中消費隊列內容{queue.get()}')?print('運行結束')?
Pipe方式
Pipe方式是進程之間通信的另一種方式和Queue不同之處在于,它不帶鎖,且信息順序無法得到保障。
主要的使用方法:
- send(obj): 將一個對象發送到鏈接的另一端,可以用
recv()
讀取,發送的對象必須是可序列化的,多大的對象(接近32MiB)可能引發ValueError
異常 - recv(): 返回一個由另一端使用
send()
發送的對象,該方法會一直阻塞直到接收到對象。如果對端關閉了鏈接或者沒有東西可接收,將拋出EOFError
異常
import timefrom multiprocessing import Process, Pipefrom multiprocessing.connection import Connection??def task(pipe:Connection):print('子進程往管道里加了內容')time.sleep(1)pipe.send("子進程往管道中加了點東西")??if __name__ == '__main__':pipe1, pipe2 = Pipe()p = Process(target=task, args=(pipe1,))p.start()p.join()print('主進程中獲取管道內的內容為:', pipe2.recv())print('運行結束')