文章目錄
- 1 多進程
- 1.1 簡介
- 1.2 Linux下多進程
- 1.3 multiprocessing
- 1.4 Pool
- 1.5 進程間通信
- 1.6 分布式進程
1 多進程
1.1 簡介
要讓Python程序實現多進程(multiprocessing),我們先了解操作系統的相關知識。
Unix/Linux
操作系統提供了一個fork()
系統調用,它非常特殊。普通的函數調用,調用一次,返回一次,但是fork()
調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內返回。
子進程永遠返回0
,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork
出很多子進程,所以,父進程要記下每個子進程的ID
,而子進程只需要調用getppid()
就可以拿到父進程的ID。
1.2 Linux下多進程
Python的os模塊封裝了常見的系統調用,其中就包括fork,可以在Python程序中輕松創建子進程:
# multiprocessing.py
import osprint ('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid==0:print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
運行結果如下:Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由于Windows沒有fork調用,上面的代碼在Windows上無法運行。由于Mac系統是基于BSD(Unix的一種)內核,所以,在Mac下運行是沒有問題的
有了fork
調用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的Apache服務器就是由父進程監聽端口,每當有新的http
請求時,就fork
出子進程來處理新的http請求。
1.3 multiprocessing
如果打算編寫多進程的服務程序,Unix/Linux
無疑是正確的選擇。由于Windows沒有fork調用,難道在Windows上無法用Python編寫多進程的程序?
由于Python是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing
模塊就是跨平臺版本的多進程模塊。
multiprocessing
模塊提供了一個Process類來代表一個進程對象,下面的例子演示了啟動一個子進程并等待其結束:
from multiprocessing import Process
import os# 子進程要執行的代碼
def run_proc(name):print ('Run child process %s (%s)...' % (name, os.getpid()))if __name__=='__main__':print ('Parent process %s.' % os.getpid())p = Process(target=run_proc, args=('test',))print ('Process will start.')p.start()p.join()print 'Process end.'執行結果如下:
Parent process 928.
Process will start.
Run child process test (929)...
Process end.
創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()
方法啟動,這樣創建進程比fork()還要簡單。
join()
方法可以等待子進程結束后再繼續往下運行,通常用于進程間的同步。
1.4 Pool
如果要啟動大量的子進程,可以用進程池的方式批量創建子進程:
from multiprocessing import Pool
import os, time, random
def long_time_task(name):print ('Run task %s (%s)...' % (name, os.getpid()))start = time.time()time.sleep(random.random() * 3)end = time.time()print ('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__=='__main__':print 'Parent process %s.' % os.getpid()p = Pool()for i in range(5):p.apply_async(long_time_task, args=(i,))print ('Waiting for all subprocesses done...')p.close()p.join()print 'All subprocesses done.'執行結果如下:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
代碼解讀:
- 對Pool對象調用
join()
方法會等待所有子進程執行完畢,調用join()
之前必須先調用close()
,調用close()
之后就不能繼續添加新的Process
了。 - 注意輸出的結果,task 0,1,2,3是立刻執行的,而task 4要等待前面某個task完成后才執行,這是因為Pool的默認大小在電腦上是4,因此,最多同時執行4個進程。這是Pool有意設計的限制,并不是操作系統的限制。如果改成:
p = Pool(5)
,就可以同時跑5個進程。 - 由于
Pool
的默認大小是CPU的核數,如果擁有8核CPU,那么要提交至少9個子進程才能看到上面的等待效果。
1.5 進程間通信
Process
之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python
的multiprocessing
模塊包裝了底層的機制,提供了Queue、Pipes
等多種方式來交換數據。
我們以 Queue
為例,在父進程中創建兩個子進程,一個往Queue里寫數據,一個從Queue里讀數據:
from multiprocessing import Process, Queue
import os, time, random# 寫數據進程執行的代碼:
def write(q):for value in ['A', 'B', 'C']:print ('Put %s to queue...' % value)q.put(value)time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):while True:value = q.get(True)print ('Get %s from queue.' % value)if __name__=='__main__':# 父進程創建Queue,并傳給各個子進程:q = Queue()pw = Process(target=write, args=(q,))pr = Process(target=read, args=(q,))# 啟動子進程pw,寫入:pw.start()# 啟動子進程pr,讀取:pr.start()# 等待pw結束:pw.join()# pr進程里是死循環,無法等待其結束,只能強行終止:pr.terminate()運行結果如下:Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
在Unix/Linux
下,multiprocessing
模塊封裝了fork()
調用,使我們不需要關注fork()
的細節。由于Windows沒有fork調用,因此,multiprocessing
需要“模擬”出fork的效果,父進程所有Python
對象都必須通過pickle
序列化再傳到子進程去,所以,如果multiprocessing
在Windows
下調用失敗了,要先考慮是不是pickle失敗了。
1.6 分布式進程
Python
的multiprocessing
模塊不但支持多進程,其中managers
子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。
舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一臺機器上運行,現在,由于處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩臺機器上。怎么用分布式進程實現?
原有的Queue可以繼續使用,但是,通過managers
模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。
我們先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:
# taskmanager.pyimport random, time, Queue
from multiprocessing.managers import BaseManager# 發送任務的隊列:
task_queue = Queue.Queue()
# 接收結果的隊列:
result_queue = Queue.Queue()# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):pass# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 啟動Queue:
manager.start()
# 獲得通過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n)
# 從result隊列讀取結果:
print('Try get results...')
for i in range(10):r = result.get(timeout=10)print('Result: %s' % r)
# 關閉:
manager.shutdown()
請注意,當我們在一臺機器上寫多進程程序時,創建的Queue
可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue
不可以直接對原始的task_queue
進行操作,那樣就繞過了QueueManager
的封裝,必須通過manager.get_task_queue()
獲得的Queue接口添加。
然后,在另一臺機器上啟動任務進程(本機上啟動也可以):
# taskworker.pyimport time, sys, Queue
from multiprocessing.managers import BaseManager# 創建類似的QueueManager:
class QueueManager(BaseManager):pass# 由于這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')# 連接到服務器,也就是運行taskmanager.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與taskmanager.py設置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,并把結果寫入result隊列:
for i in range(10):try:n = task.get(timeout=1)print('run task %d * %d...' % (n, n))r = '%d * %d = %d' % (n, n, n*n)time.sleep(1)result.put(r)except Queue.Empty:print('task queue is empty.')
# 處理結束:
print('worker exit.')
任務進程要通過網絡連接到服務進程,所以要指定服務進程的IP。
現在,可以試試分布式進程的工作效果了。先啟動taskmanager.py服務進程:
$ python taskmanager.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
taskmanager進程發送完任務后,開始等待result隊列的結果。現在啟動taskworker.py進程:$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.
taskworker
進程結束,在taskmanager進程中會繼續打印出結果:
Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956
這個簡單的Manager/Worker模型有什么用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至幾十臺機器上,比如把計算n*n的代碼換成發送郵件,就實現了郵件隊列的異步發送。
Queue對象存儲在哪?注意到taskworker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在taskmanager.py進程中:
而Queue
之所以能通過網絡訪問,就是通過QueueManager
實現的。由于QueueManager
管理的不止一個Queue
,所以,要給每個Queue
的網絡調用接口起個名字,比如get_task_queue
。
authkey
:是為了保證兩臺機器正常通信,不被其他機器惡意干擾。如果taskworker.py
的authkey和taskmanager.py的authkey不一致,肯定連接不上。
注意
:Queue
的作用是用來傳遞任務
和接收結果
,每個任務的描述數據量要盡量小。比如發送一個處理日志文件的任務,就不要發送幾百兆的日志文件本身,而是發送日志文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。