2019獨角獸企業重金招聘Python工程師標準>>>
python多進程
進程簡介
進程是程序在計算機上的一次執行活動。當你運行一個程序,你就啟動了一個進程。顯然,程序是死的(靜態的),進程是活的(動態的)。進程可以分為系統進程和用戶進程。凡是用于完成操作系統的各種功能的進程就是系統進程,它們就是處于運行狀態下的操作系統本身;用戶進程就不必我多講了吧,所有由你啟動的進程都是用戶進程。進程是操作系統進行資源分配的單位。
在操作系統的管理下,所有正在運行的進程輪流使用CPU,每個進程允許占用CPU的時間非常短(比如10毫秒),這樣用戶根本感覺不出來CPU是在輪流為多個進程服務,就好象所有的進程都在不間斷地運行一樣。但實際上在任何一個時間內有且僅有一個進程占有CPU。
進程和線程
進程(process)和線程(thread)
單個CPU一次只能運行一個任務;在任一時刻,CPU總是運行一個進程,其他進程處于非運行狀態;
一個進程可以包含多個線程;
進程沒有任何共享狀態,進程修改的數據,改動僅限于該進程內;
一個進程的內存空間是共享的,每個線程都可以使用這些共享內存;
一個線程使用某些共享內存時,其他線程必須等它結束才能使用這一塊內存;防止多個線程同時讀寫某一塊內存區域,采用互斥鎖(Mutual exclusion,縮寫Mutex);
某些內存區域只能供給固定數目的線程使用,此時通過信號量(Semaphore)保證多個線程不會互相沖突;
多進程形式,運行多個任務同時運行;多線程形式,允許單個任務分成不同的部分運行;
多線程使用的是cpu的一個核,適合io密集型;
多進程使用的是cpu的多個核,適合運算密集型。
在linux中可以使用ps -efL查看進程和線程ID。以memcached進程為例,輸出結果如下
1 2 3 4 5 6 7 8 9 10 11 12 | [root@VM_0_4_centos ~]# ps -efL |grep memcached root 24421 1 24421 0 10 May19 ? 00:00:03 memcached -d -u root root 24421 1 24422 0 10 May19 ? 00:00:01 memcached -d -u root root 24421 1 24423 0 10 May19 ? 00:00:00 memcached -d -u root root 24421 1 24424 0 10 May19 ? 00:00:00 memcached -d -u root root 24421 1 24425 0 10 May19 ? 00:00:00 memcached -d -u root root 24421 1 24426 0 10 May19 ? 00:00:00 memcached -d -u root root 24421 1 24427 0 10 May19 ? 00:00:00 memcached -d -u root root 24421 1 24428 0 10 May19 ? 00:00:00 memcached -d -u root root 24421 1 24429 0 10 May19 ? 00:00:09 memcached -d -u root root 24421 1 24430 0 10 May19 ? 00:00:00 memcached -d -u root root 32169 31101 32169 0 1 23:23 pts/0 00:00:00 grep --color=auto memcached |
?
第一行UID
(用戶ID),第二行為PID
(進程ID),第三行PPID
(父進程ID),第四行LWP
(線程ID)。
從示例可以看出,進程24421子進程有10個,對應線程ID分別為24421-24430。
multiprocess
python中的多線程無法利用多核優勢,若要充分使用多核CPU資源,在python中大部分情況使用多進程。python提供了非常好用的多進程包multiprocessing。
multiprocessing模塊用來開啟子進程,并在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
Process類
創建一個Process對象
1 | p = multiprocessing.Process(target=worker_1, args=(2, )) |
?
-
參數
target:函數名字
args:函數需要的參數,以tuple的形式傳入(單個元素的tuple必須有逗號) -
方法
p.is_alive() 判斷進程p是否存活,是返回True
p.run() 啟動進程,它去調用target指定的函數
p.start() 啟動進程,它會自動調用run方法,推薦使用start
p.join(timeout) 主線程等待p終止(主線程處于等的狀態,p處于運行狀態)。p.join只能join使用start開啟的進程,不能join使用run開啟的進程
p.terminate() 強制進程p退出,不會進行任何清理操作,如果p創建了子進程,該子進程就變成了僵尸進程 - 屬性
p.name 進程的名字
p.pid 進程的pid
p.daemon 默認為False,如果設置為True代表p為后臺運行的守護進程,當p的父進程終止時p也隨之終止,并且設置為True后,p不能創建自己的新進程,必須在p.start()之前設置1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
import multiprocessing import timedef worker(args, interval):print("start worker {0}".format(args))time.sleep(interval)print("end worker {0}".format(args))def main():print("start main")p1 = multiprocessing.Process(target=worker, args=(1, 1))p2 = multiprocessing.Process(target=worker, args=(2, 2))p3 = multiprocessing.Process(target=worker, args=(3, 3))p1.start()p2.start()p3.start()print("end main")if __name__ == '__main__':main()
輸出結果
1 2 3 4 5 6 7 8 | start main end main start worker 1 start worker 2 start worker 3 end worker 1 end worker 2 end worker 3 |
?
multprocessing用到的兩個方法
cpu_count():統計cpu總數
active_children():獲得所有子進程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | import multiprocessing import timedef worker(args, interval):print("start worker {0}".format(args))time.sleep(interval)print("end worker {0}".format(args))def main():print("start main")p1 = multiprocessing.Process(target=worker, args=(1, 1))p2 = multiprocessing.Process(target=worker, args=(2, 2))p3 = multiprocessing.Process(target=worker, args=(3, 3))p1.start()p1.join(timeout=0.5) #此處保證了p1優先執行p2.start()p3.start()print("the number of CPU is: {0}".format(multiprocessing.cpu_count()))for p in multiprocessing.active_children():print("The name of active children is: {0}, pid is: {1} is alive".format(p.name, p.pid))print("end main")if __name__ == '__main__':main() |
?
輸出結果
1 2 3 4 5 6 7 8 9 10 11 12 | start main start worker 1 the number of CPU is: 4 The name of active children is: Process-1, pid is: 25360 is alive The name of active children is: Process-2, pid is: 24500 is alive The name of active children is: Process-3, pid is: 26100 is alive end main start worker 3 start worker 2 end worker 1 end worker 2 end worker 3 |
?
lock組件
當我們用多進程來讀寫文件的時候,如果一個進程是寫文件,一個進程是讀文件,如果兩個文件同時進行,肯定是不行的,必須是文件寫結束以后,才可以進行讀操作。或者是多個進程在共享一些資源的時候,同時只能有一個進程進行訪問,那就要有一個鎖機制進行控制。
下面使用2個進程分別進行+1
和+3
操作為例
- 不加鎖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
import time import multiprocessingdef add(value, number):print("start add{0} number= {1}".format(value, number))for i in range(1, 3):number += valuetime.sleep(0.3)print("number = {0}".format(number))if __name__ == '__main__':print("start main")number = 0p1 = multiprocessing.Process(target=add, args=(1, number))p3 = multiprocessing.Process(target=add, args=(3, number))p1.start()p3.start()print("end main")
輸出結果
1 2 3 4 5 6 7 8 | start main end main start add1 number= 0 start add3 number= 0 number = 1 number = 3 number = 2 number = 6 |
?
- 加鎖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
import time import multiprocessingdef add(lock, value, number):with lock:print("start add{0} number= {1}".format(value, number))for i in range(1, 3):number += valuetime.sleep(0.3)print("number = {0}".format(number))if __name__ == '__main__':print("start main")number = 0lock = multiprocessing.Lock()p1 = multiprocessing.Process(target=add, args=(lock, 1, number))p3 = multiprocessing.Process(target=add, args=(lock, 3, number))p1.start()p3.start()print("end main")
輸出結果
1 2 3 4 5 6 7 8 | start main end main start add1 number= 0 number = 1 number = 2 start add3 number= 0 number = 3 number = 6 |
?
鎖的獲取可以使用lock.acquire()
獲取,lock.release()
釋放
1 2 3 4 5 6 7 8 9 10 11 12 13 | def add(lock, value, number):lock.acquire()print("start add3 number= {0}".format(number))try:for i in range(1, 5):number += valuetime.sleep(0.3)print("number = {0}".format(number))except Exception as e:raise efinally:lock.release()pass |
?
共享內存
一般變量在進程之間是沒法進行通訊的,但是multiprocessing提供了Value
和Array
模塊,可以在不同的進程中使用同一變量。Value
和Array
結構內部都實現了鎖機制,因此多進程是安全的。
Value和Array都需要設置其中存放值的類型,d是double類型,i是int類型。類型設置和array
模塊的值類似,更多的類型可以點擊array — Efficient arrays of numeric values查看。
上面的示例中,兩個進程執行后number結果分別為2和6,假如兩個進程可以共享變量,name輸出結果將會是8。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | import multiprocessing from multiprocessing import Valuedef add(value, number):print("start add{0} number= {1}".format(value, number.value))for i in range(1, 3):number.value += valueprint("number = {0}".format(number.value))if __name__ == '__main__':print("start main")number = Value('d', 0) #使用Value創建變量p1 = multiprocessing.Process(target=add, args=(1, number))p3 = multiprocessing.Process(target=add, args=(3, number))p1.start()p3.start()print("end main") |
?
輸出結果
1 2 3 4 5 6 7 8 | start main end main start add1 number= 0.0 start add3 number= 0.0 number = 1.0 number = 4.0 number = 5.0 number = 8.0 |
?
number最終結果是8,但是具體輸出結果每次執行可能存在差異。
更多關于multiprocessing
的內容可以點擊multiprocessing — Process-based parallelism查看官方介紹。
Share