Python基礎之多進程

文章目錄

  • 1 多進程
    • 1.1 簡介
    • 1.2 Linux下多進程
    • 1.3 multiprocessing
    • 1.4 Pool
    • 1.5 進程間通信
    • 1.6 分布式進程

1 多進程

1.1 簡介

要讓Python程序實現多進程(multiprocessing),我們先了解操作系統的相關知識。
Unix/Linux操作系統提供了一個fork()系統調用,它非常特殊。普通的函數調用,調用一次,返回一次,但是fork()調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內返回。

子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調用getppid()就可以拿到父進程的ID。

1.2 Linux下多進程

Python的os模塊封裝了常見的系統調用,其中就包括fork,可以在Python程序中輕松創建子進程:

# multiprocessing.py
import osprint ('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid==0:print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
運行結果如下:Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于Windows沒有fork調用,上面的代碼在Windows上無法運行。由于Mac系統是基于BSD(Unix的一種)內核,所以,在Mac下運行是沒有問題的

有了fork調用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的Apache服務器就是由父進程監聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。

1.3 multiprocessing

如果打算編寫多進程的服務程序,Unix/Linux無疑是正確的選擇。由于Windows沒有fork調用,難道在Windows上無法用Python編寫多進程的程序?
由于Python是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing模塊就是跨平臺版本的多進程模塊。

multiprocessing模塊提供了一個Process類來代表一個進程對象,下面的例子演示了啟動一個子進程并等待其結束:

from multiprocessing import Process
import os# 子進程要執行的代碼
def run_proc(name):print ('Run child process %s (%s)...' % (name, os.getpid()))if __name__=='__main__':print ('Parent process %s.' % os.getpid())p = Process(target=run_proc, args=('test',))print ('Process will start.')p.start()p.join()print 'Process end.'執行結果如下:
Parent process 928.
Process will start.
Run child process test (929)...
Process end.

創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動,這樣創建進程比fork()還要簡單。
join()方法可以等待子進程結束后再繼續往下運行,通常用于進程間的同步。

1.4 Pool

如果要啟動大量的子進程,可以用進程池的方式批量創建子進程:

from multiprocessing import Pool
import os, time, random
def long_time_task(name):print ('Run task %s (%s)...' % (name, os.getpid()))start = time.time()time.sleep(random.random() * 3)end = time.time()print ('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__=='__main__':print 'Parent process %s.' % os.getpid()p = Pool()for i in range(5):p.apply_async(long_time_task, args=(i,))print ('Waiting for all subprocesses done...')p.close()p.join()print 'All subprocesses done.'執行結果如下:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

代碼解讀:

  • 對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的Process了。
  • 注意輸出的結果,task 0,1,2,3是立刻執行的,而task 4要等待前面某個task完成后才執行,這是因為Pool的默認大小在電腦上是4,因此,最多同時執行4個進程。這是Pool有意設計的限制,并不是操作系統的限制。如果改成:p = Pool(5),就可以同時跑5個進程。
  • 由于Pool的默認大小是CPU的核數,如果擁有8核CPU,那么要提交至少9個子進程才能看到上面的等待效果。

1.5 進程間通信

Process之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Pythonmultiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。

我們以 Queue 為例,在父進程中創建兩個子進程,一個往Queue里寫數據,一個從Queue里讀數據:

from multiprocessing import Process, Queue
import os, time, random# 寫數據進程執行的代碼:
def write(q):for value in ['A', 'B', 'C']:print ('Put %s to queue...' % value)q.put(value)time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):while True:value = q.get(True)print ('Get %s from queue.' % value)if __name__=='__main__':# 父進程創建Queue,并傳給各個子進程:q = Queue()pw = Process(target=write, args=(q,))pr = Process(target=read, args=(q,))# 啟動子進程pw,寫入:pw.start()# 啟動子進程pr,讀取:pr.start()# 等待pw結束:pw.join()# pr進程里是死循環,無法等待其結束,只能強行終止:pr.terminate()運行結果如下:Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Unix/Linux下,multiprocessing模塊封裝了fork()調用,使我們不需要關注fork()的細節。由于Windows沒有fork調用,因此,multiprocessing需要“模擬”出fork的效果,父進程所有Python對象都必須通過pickle序列化再傳到子進程去,所以,如果multiprocessingWindows下調用失敗了,要先考慮是不是pickle失敗了。

1.6 分布式進程

Pythonmultiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。

舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一臺機器上運行,現在,由于處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩臺機器上。怎么用分布式進程實現?
原有的Queue可以繼續使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。
我們先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:

# taskmanager.pyimport random, time, Queue
from multiprocessing.managers import BaseManager# 發送任務的隊列:
task_queue = Queue.Queue()
# 接收結果的隊列:
result_queue = Queue.Queue()# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):pass# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 啟動Queue:
manager.start()
# 獲得通過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n)
# 從result隊列讀取結果:
print('Try get results...')
for i in range(10):r = result.get(timeout=10)print('Result: %s' % r)
# 關閉:
manager.shutdown()

請注意,當我們在一臺機器上寫多進程程序時,創建的Queue可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。
然后,在另一臺機器上啟動任務進程(本機上啟動也可以):

# taskworker.pyimport time, sys, Queue
from multiprocessing.managers import BaseManager# 創建類似的QueueManager:
class QueueManager(BaseManager):pass# 由于這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')# 連接到服務器,也就是運行taskmanager.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與taskmanager.py設置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,并把結果寫入result隊列:
for i in range(10):try:n = task.get(timeout=1)print('run task %d * %d...' % (n, n))r = '%d * %d = %d' % (n, n, n*n)time.sleep(1)result.put(r)except Queue.Empty:print('task queue is empty.')
# 處理結束:
print('worker exit.')

任務進程要通過網絡連接到服務進程,所以要指定服務進程的IP。

現在,可以試試分布式進程的工作效果了。先啟動taskmanager.py服務進程:

$ python taskmanager.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
taskmanager進程發送完任務后,開始等待result隊列的結果。現在啟動taskworker.py進程:$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

taskworker進程結束,在taskmanager進程中會繼續打印出結果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

這個簡單的Manager/Worker模型有什么用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至幾十臺機器上,比如把計算n*n的代碼換成發送郵件,就實現了郵件隊列的異步發送。

Queue對象存儲在哪?注意到taskworker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在taskmanager.py進程中:
在這里插入圖片描述
Queue之所以能通過網絡訪問,就是通過QueueManager實現的。由于QueueManager管理的不止一個Queue,所以,要給每個Queue的網絡調用接口起個名字,比如get_task_queue

authkey:是為了保證兩臺機器正常通信,不被其他機器惡意干擾。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定連接不上。

注意Queue的作用是用來傳遞任務接收結果,每個任務的描述數據量要盡量小。比如發送一個處理日志文件的任務,就不要發送幾百兆的日志文件本身,而是發送日志文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/38670.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/38670.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/38670.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

豆包文科成績超了一本線,為什么理科不行?

卡奧斯智能交互引擎是卡奧斯基于海爾近40年工業生產經驗積累和卡奧斯7年工業互聯網平臺建設的最佳實踐,基于大語言模型和RAG技術,集合海量工業領域生態資源方優質產品和知識服務,旨在通過智能搜索、連續交互,實時生成個性化的內容…

使用Java構建可擴展的微服務架構

使用Java構建可擴展的微服務架構 大家好,我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編,也是冬天不穿秋褲,天冷也要風度的程序猿!今天我們將探討如何使用Java構建可擴展的微服務架構,這是現代軟件開…

Java - 程序員面試筆記記錄 實現 - Part2

2.1 輸入輸出流 流可以被看作一組有序的字節集合,即數據在兩個設備間的傳輸。 字節流:以字節作為單位,讀到一個字節就返回一個字節;InputStream & OutputStream。 字符流:使用字節流讀到一個到多個字節先查詢碼…

【Invalid mapping pattern】SpringMVC路徑匹配

報錯: Description:Invalid mapping pattern detected: /**/{[path:[^.]] ^ No more pattern data allowed after {...} or ** pattern elementAction:Fix this pattern in your application or switch to the legacy parser implementation with spring.mvc.pathm…

VLC for Unity播放RTSP延遲高的解決辦法

VLC for Unity播放RTSP延遲高的解決辦法&#xff1a; 設置網絡緩存時長network-caching100 public void Open(){Log("VLCPlayerExample Open");if (mediaPlayer.Media ! null)mediaPlayer.Media.Dispose();List<string> options new List<string>();o…

Eureka在微服務架構中的服務降級策略解析

引言 微服務架構因其靈活性和可擴展性而受到現代軟件開發的青睞。然而&#xff0c;隨著服務數量的增加&#xff0c;系統的復雜性也隨之上升&#xff0c;服務間的依賴關系可能導致單點故障&#xff0c;影響整個系統的穩定性。服務降級是一種常見的應對策略&#xff0c;用于在服…

基于RabbitMQ的異步消息傳遞:發送與消費

引言 RabbitMQ是一個流行的開源消息代理&#xff0c;用于在分布式系統中實現異步消息傳遞。它基于Erlang語言編寫&#xff0c;具有高可用性和可伸縮性。在本文中&#xff0c;我們將探討如何在Python中使用RabbitMQ進行消息發送和消費。 安裝RabbitMQ 在 Ubuntu 上安裝 Rabbi…

提升寫作效率:探索AI在現代辦公自動化中的應用

工欲善其事&#xff0c;必先利其器。 隨著AI技術與各個行業或細分場景的深度融合&#xff0c;日常工作可使用的AI工具呈現出井噴式發展的趨勢&#xff0c;AI工具的類別也從最初的AI文本生成、AI繪畫工具&#xff0c;逐漸擴展到AI思維導圖工具、AI流程圖工具、AI生成PPT工具、AI…

精通SQL Server端口管理:添加與刪除監聽端口的指南

引言 SQL Server的端口管理是數據庫管理員(DBA)必須掌握的關鍵技能之一。端口配置不僅關系到數據庫的網絡通信能力&#xff0c;還直接影響到數據庫的安全性和性能。本文將詳細介紹如何在SQL Server中添加和刪除監聽端口&#xff0c;以及相關的配置策略和最佳實踐。 SQL Serve…

ubuntu 系統中 使用docker 制作 Windows 系統,從此告別 vmware虛擬機

我的系統是 ubuntu 24 前期準備工作&#xff1a; 安裝dockerdocker pull 或者 手動制作鏡像 docker build 的話 必須要 科學上網&#xff0c; 好像阿里鏡像都下不下來。需要 知道 docker 和docker compose 命令的使用方式 我是給docker 掛了 http代理 如果你能pull下來鏡像 …

springboot健身房管理系統-計算機畢業設計源碼031807

摘 要 大數據時代下&#xff0c;數據呈爆炸式地增長。為了迎合信息化時代的潮流和信息化安全的要求&#xff0c;利用互聯網服務于其他行業&#xff0c;促進生產&#xff0c;已經是成為一種勢不可擋的趨勢。在健身房管理的要求下&#xff0c;開發一款整體式結構的健身房管理系統…

Windows環境使用SpringBoot整合Minio平替OSS

目錄 配置Minio環境 一、下載minio.exe mc.exe 二、設置用戶名和密碼 用管理員模式打開cmd 三、啟動Minio服務器 四、訪問WebUI給的地址 SpringBoot整合Minio 一、配置依賴&#xff0c;application.yml 二、代碼部分 FileVO MinioConfig MinioUploadService MinioController 三…

使用Python繪制太陽系圖

使用Python繪制太陽系圖 太陽系圖太陽系圖的優點使用場景 效果代碼 太陽系圖 太陽系圖&#xff08;Sunburst Chart&#xff09;是一種層次結構圖表&#xff0c;用于表示數據的分層結構。它使用同心圓表示各個層級&#xff0c;中心圓代表最高層級&#xff0c;向外的圓環代表逐級…

CCT技術

概念介紹 多個功能核心的集成可以通過片上系統(SOC)或封裝中系統(SIP)設備的開發來實現。SOC器件將核心集成到單個集成電路中。SIP集成是將多個集成電路組合到單個封裝中。核心數量 的增加可能導致必要的測試人員資源和/或測試時間的增加。這直接影響了與測試這些設備相關的…

CesiumJS【Basic】- #031 繪制虛線(Entity方式)

文章目錄 繪制虛線(Entity方式)1 目標2 代碼2.1 main.ts繪制虛線(Entity方式) 1 目標 使用Entity方式繪制虛線 2 代碼 2.1 main.ts import * as Cesium from cesium;const viewer = new Cesium.Viewer(

SAP實現特別總賬的憑證預制

SAP實現特別總賬的憑證預制 仔細理解只有”其他”的特殊總帳標識才可預制憑證這句話. F-29/f-48不可預制。F-29/f-48預制時出現錯誤消息號 FP 030&#xff0c;提示特殊總帳標志類型“匯票和”預付定金“的特別總帳標志的過帳代碼不能預制&#xff0c;這是系統寫死的&#xff…

現在電氣真的比不過計算機嗎 ?

電氣工程和計算機科學在今天的科技和工業領域中各有其重要性和發展空間&#xff0c;并不存在簡單的比較誰“比不過”誰的情況。我收集制作一份plc學習包&#xff0c;對于新手而言簡直不要太棒&#xff0c;里面包括了新手各個時期的學習方向&#xff0c;包括了編程教學&#xff…

Pycharm的終端(Terminal)中切換到當前項目所在的虛擬環境

1.在Pycharm最下端點擊終端/Terminal, 2.點擊終端窗口最上端最右邊的∨&#xff0c; 3.點擊Command Prompt&#xff0c;切換環境&#xff0c; 可以看到現在環境已經由默認的PS(Window PowerShell)切換為項目所使用的虛擬環境。 4.更近一步&#xff0c;如果想讓Pycharm默認顯示…

Linux常用工具使用方式

目錄 常用工具&#xff1a; 安裝包管理工具&#xff1a; 查找含有關鍵字的軟件包 安裝軟件 安裝文件傳輸工具 安裝編輯器 C語言編譯器 C編譯器 安裝調試器 安裝項目版本管理工具 cmake 卸載軟件 安裝jsoncpp 安裝boost庫 安裝mariadb 安裝tree&#xff08;讓目錄…

基于Java的區塊鏈數字身份認證

基于Java的區塊鏈數字身份認證 大家好&#xff0c;我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編&#xff0c;也是冬天不穿秋褲&#xff0c;天冷也要風度的程序猿&#xff01;今天我們將探討基于Java的區塊鏈數字身份認證&#xff0c;這是區塊鏈技術在安全領…