進程是應用程序正在執行的實體,當程序執行時,也就創建了一個主線程。進程在創建和執行需要占用一定的資源,如內存、文件、I/O設備等。
線程是CPU使用的基本單元,由主線程創建,并使用這個進程的資源,因此線程創建成本低,可以實現并發處理,充分利用CPU。
1. 使用進程
程序只是一堆靜態的代碼,而進程則是程序的運行過程。同一個程序執行兩次,就是兩個進程。一個進程就是一個正在運行的任務。對于單核CPU來說,同一時間只能處理一個任務,如果要實現多任務并發處理,可以多任務之間輪換執行。
multiprocessing
是多進程管理包,可以編寫多進程和多線程。如編寫多線程,使用multiprocessing.dummy即可,用法與multiprocessing基本相同。
常用組件和功能:
- 管理進程模塊
Process
:用于創建進程模塊。Pool
:用于創建管理進程池。Queue
:用于進程通信,資源共享。Value,Array
:用于進程通信,資源共享。Pipe
:用于管理通信。Manager
:用于資源共享。
- 同步子進程模塊
Condition
:條件對象。Even
t:事件通信。Lock
:進程鎖。RLock
:遞歸鎖。Semaphore
:進程信號量。
使用多線程往往是用來處理CPU密集型(如科學計算)的需求,如果是IO密集型(如文件讀取、爬蟲等),則可以使用多線程去處理。
Process
是 multiprocessing 的子類,也是multiprocessing 的核心模塊,用來創建子進程。可以實現多進程的創建、啟動、關閉等操作。在multiprocessing中,每一個進程都用一個Process類表示。
multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={})
參數說明如下:
- group:線程組,目前還沒有實現,參數值必須為None。
- target:表示當前進程啟動時要執行的調用對象,一般為可執行的方法或函數。
- name:進程名稱,相當于給當前進程取一個別名。
- args:表示傳遞給target函數的位置參數,格式為元組。
- kwargs:表示傳遞給target函數的關鍵字參數,格式為字典。
Process
對象包含的實例方法如下:
- is_alive():判斷進程實例是否還在執行。
- join([timeout]):阻塞進程執行,直到進程終止,或者等待一段時間,具體時間由timeout(可選參數)設置,單位為s。
- start():啟動進程實例。
- run():如果沒有設置target參數,調用start()方法時,將執行對象的run()方法。
- terminate():不管任務是否完成,立即停止進程。
Process
對象的常用屬性:
- name:進程名稱。
- pid:進程ID,在進程被創造前返回None。
- exitcode:進程的退出碼,如果進程沒有結束,那么返回None;如果進程被信號N終結,則返回- N。
- authkey:進程的認證密鑰,為一個字節串。
- sentinel:當進程結束時變為ready狀態,可用于同時等待多個事件,否則用join()更簡單些。
- daemon:將父進程設置為守護進程,當父進程結束時,子進程也結束。
# 新建一個test2.py文件,輸入以下內容
from multiprocessing import Process
def foo(i):print('say hi',i)if __name__=='__main__':for i in range(10):p = Process(target=foo,args=(i,))p.start()
# 執行同時輸出以下10行數據
# 新建一個test3.py文件,輸入以下內容
import multiprocessing # 導入multiprocessing模塊
import time # 導入time模塊
def worker(): # 處理任務name = multiprocessing.current_process().name # 獲取進程的名稱print(name,'Starting')time.sleep(4) # 睡眠4sprint(name,'Exiting')def my_service(): # 處理任務name = multiprocessing.current_process().name # 獲取進程的名稱print(name,'Starting')time.sleep(5) # 睡眠5sprint(name,'Exiting')if __name__ == '__main__': # 主進程service = multiprocessing.Process( # 創建子進程1name = 'my_service', # 修改進程名稱target = my_service # 調用對象)worker_1 = multiprocessing.Process( # 創建子進程2name = 'worker 1', # 修改進程名稱target = worker # 調用對象)worker_2 = multiprocessing.Process( # 創建子進程3,保持默認的進程名稱target = worker # 調用對象)worker_1.start() # 啟動進程1worker_2.start() # 啟動進程2service.start() # 啟動進程3
自定義進程:簡單的任務,直接使用multiprocessing.Process實現多進程,而對于復雜的任務,通常自定義Process類,擴展Process功能。
# 新建一個test4.py文件,輸入以下內容
from multiprocessing import Process # 導入 Process 類
import time,os # 導入time和os模塊
class MyProcess(Process): # 自定義進程類,繼承自Processdef __init__(self,name): # 重寫初始化函數super().__init__() # 調用父類的初始化函數self.name = name # 重寫name屬性值def run(self): # 重寫 run方法print('%s is running'%self.name,os.getppid()) # 打印子進程信息,os.getppid()獲取父進程IDtime.sleep(3)print('%s is done'%self.name,os.getpid()) # 打印子進程信息,os.getpid()獲取子進程(當前進程)ID
if __name__ == '__main__':p = MyProcess('子進程1') # 創建子進程p.start() # 執行進程print('主進程',os.getppid()) # 打印主進程ID
管道:Pipe
可以創建管道,用來在兩個進程間進行通信,兩個進程分別位于管道的兩端。
Pipe([duplex])
# (conn1,conn2) = Pipe()
該方法返回兩個鏈接對象(conn1,conn2) 元組,代表管道的兩端。參數duplex為可選,默認值為True。
- 如果duplex為True,那么是雙工模式,即conn1和conn2均可收發消息。
- 如果duplex為False,conn1只負責接收消息,conn2只負責發送消息。
實例化的Pipe對象擁有connection的方法,5種常用的方法如下:
- send(obj):發送數據。
- recv():接收數據。如果沒有消息可接收,recv()方法一直阻塞。如果管道已經被關閉,那么recv()方法拋出EOFError錯誤。
- poll([timeout]):查看緩沖區是否有數據,可設置時間。如果timeout為None,則無限超時。
- send_bytes([buffer[,offset[,size]]):發送二進制字節數據。
- recv_bytes([maxlength]):接收二進制字節數據。
from multiprocessing import Process,Pipe # 導入 Process和Pipe
a,b = Pipe(True) # 如果改 a,b = Pipe(False)
a.send('Hi,b') # 發送數據
print(b.recv()) # 輸出 Hi,b# 一個可發送消息,另一個可以接受消息
# 新建一個test5.py文件,輸入以下內容
from multiprocessing import Process,Pipe # 導入 Process和Pipe
def send(pipe): # send傳輸一個列表pipe.send(['spam']+[42,'egg'])pipe.close()
if __name__ == '__main__':(conn1,conn2) = Pipe() # 創建兩個Pipe實例sender = Process(target=send,args=(conn1,)) # args一定是實例化后的Pipe變量,不能寫args=(Pipe(),)sender.start() # Process 類啟動進程print('conn2 got:%s'%conn2.recv()) # 管道的另一端conn2從send收到消息conn2.close() # 關閉管道
# 管道可以同時發送和接收消息
# 新建一個test6.py文件,輸入以下內容
from multiprocessing import Process,Pipe # 導入 Process和Pipe
def talk(pipe):pipe.send(dict(name='Bob',spam=42)) # 傳入一個字典reply = pipe.recv() # 接收傳輸的數據print('talker got:',reply)if __name__ == '__main__':(parentEnd,childEnd) = Pipe() # 創建兩個Pipe()實例child = Process(target=talk,args=(childEnd,)) # 創建一個Process進程,名稱為childchild.start() # 啟動進程print('parent got:',parentEnd.recv()) # parentEnd是一個Pip()管道,可以接受child Process進程傳輸的數據parentEnd.send({x*2 for x in 'spam'}) # 使用send方法來傳輸數據child.join()print('parent exit')
隊列:Queue
可以創建隊列,實現在多個進程間通信。Queue
是multiprocessing的子類
Queue([maxsize])
Queue
實例對象的常用方法:
- empty():判斷隊列是否為空,空返回True,否則返回False。
- full():判斷隊列是否已滿,滿返回True,否則返回False
- put(obj[,block[,timeout]]):寫入數據。
- get([block[,timeout]]):讀取數據。
- put_nowait():直接寫入數據。
- get_nowait():直接讀取數據。
- close():關閉隊列。
- qsize():返回隊列的大小。
from multiprocessing import Queue
q = Queue() # 創建一個隊列對象
# 使用put方法往隊列里面放值
q.put(1) # 添加數字1
q.put(2) # 添加數字2
q.put(3) # 添加數字3
# 使用get方法從隊列取值。先進先出,后進后出原則
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3
print(q.full()) # False
print(q.empty()) # True
get()方法將從隊列取值,并且把隊列內被取出來的值刪掉。如果get()沒有參數情況下就是默認一直等著取值,就算隊列里面沒有可取的值,程序也不會結束,就會卡在那里一直等待。
# 新建一個test7.py文件,輸入以下內容
from multiprocessing import Process,Queue # 導入 Process,Queue 類
def f(q,name,age): # 進程函數q.put([name,age]) # 添加數據if __name__ == '__main__':q = Queue() # 創建一個Queue對象p = Process(target=f,args=(q,'張三',18)) # 創建一個進程p.start() # 執行進程print(q.get()) # ['張三', 18]p.join() # 阻塞進程
進程池:Pool
可以提供指定數量的進程供用戶調用。
進程池對象 = Pool(進程池,初始函數,初始參數,最大任務數,上下文)
進程池對象常用方法:
- apply():執行進程函數。
- apply_async():異步執行進程函數。
- map():迭代執行進程函數。
- map_async():異步迭代執行進程函數。
- close():關閉進程池。
- terminal():結束工作進程。
- join():阻塞主進程等待退出。
# 新建一個test8.py文件,輸入以下內容
import time
from multiprocessing import Pool # 導入Pool 類def run(n): # 進程處理函數time.sleep(1) # 阻塞 1 sreturn n*n # 返回浮點數的平方
if __name__ == '__main__': # 主進程testFL = [1,2,3,4,5,6] # 待處理的數列print('順序執行') # 但進程s = time.time() # 計時開始for fn in testFL:run(fn)e1 = time.time() # 計時結束print('順序執行時間:',int(e1-s)) # 計算所用時差print('并行執行') # 創建多進程,并行執行pool = Pool(6) # 創建6個進程數量的進程池r1 = pool.map(run,testFL) # 并發執行運算pool.close() # 關閉進程池,不再接受新的進程pool.join() # 主進程阻塞等待子進程的退出e2 = time.time() # 計時結束print('并行執行時間:',int(e2-e1)) # 計算所用時差print(r1) # 打印計算結果
進程鎖:當多個進程使用同一資源時,容易引發數據安全或順序混亂問題,這時可以考慮為進程加鎖,使進程產生同步,確保數據的一致性。使用Lock可以創建鎖。
lock = multiprocessing.Lock() # 創建鎖
lock.acquire() # 獲取鎖
lock.release() # 釋放鎖# 新建一個test9.py文件,輸入以下內容
import os,time,random
from multiprocessing import Process,Lock,set_start_method
def work(lock,n):lock.acquire()print('%s:%s is runing'%(n,os.getpid()))time.sleep(random.random())print('%s:%s is down'%(n,os.getpid()))lock.release()
if __name__ == '__main__':set_start_method('fork')lock = Lock()for i in range(3): # 利用for循環模擬多線程p = Process(target=work,args=(lock,i))p.start()# 使用加鎖形式實現了順序執行,保證了數據的安全,類似于數據庫的事務。
2.使用線程
進程是執行的應用程序,而線程是進程的執行序列,一個進程可以有多個線程,線程(Thread)也叫輕量級進程,多線程類似于同時執行多個不同的程序。
多線程運行優點如下:
- 進程之間不能共享內存,但線程之間可以共享內存。
- 操作系統在創建進程時,需要為該進程重新分配系統資源,但創建線程的代價則小的多因此使用多線程實現多任務并發執行比使用多進程的效率高。
多線程編程的優勢如下:
- 使用線程可以把占據長時間程序的任務放到后臺處理。
- 用戶界面可以更加吸引人,如用戶點擊一個按鈕去觸發某些事件的處理,可以彈出一個進度條來顯示處理的進度。
- 程序的運行速度可能加快。
在一些等待的任務實現上如用戶輸入、文件讀寫和網絡收斂數據等,線程的優勢較明顯。
使用Thread
構造器可以創建線程,Thread
是threading模塊最核心的類,每個Threa對象代表一個線程,每個線程可以獨立處理不同的任務。
Thread(group=None,target=None,name=None,args=(),kwargs={})
# 新建一個test11.py文件,輸入以下內容
import time
import threading
def f(n):print('線程運算',n)time.sleep(1)if __name__ == '__main__':a = time.time()for i in range(5):# f(i+1) # 單線程運算:5st = threading.Thread(target=f,args=(i+1,)) # 多線程運算:不到1st.start()b = time.time()print('花費實際:%2f'%(b-a))
自定義線程:繼承 threading.Thread 自定義線程類,其本質是重構Thread類中的run方法。
# 新建一個test12.py文件,輸入以下內容
# 自定義Threading
import time
import threading
class MyThread(threading.Thread): # 以繼承的方式實現線程創建def __init__(self,n): # 重寫初始化函數super(MyThread,self).__init__() # 重構run函數必須重寫self.n = ndef run(self): # 重寫run函數print('task',self.n)time.sleep(1)print('2s')time.sleep(1)print('1s')time.sleep(1)print('0s')
if __name__ == '__main__':t1 = MyThread('t1') # 實例化線程對象t2 = MyThread('t2')t1.start() # 執行線程t2.start()
線程鎖:在多線程中,所有變量都由所有線程共享,任何一個變量都可以被任何一個線程修改。因此,線程之間共享數據同時改變一個變量時會把內容改亂。為了確保一個線程在修改變量時,別的線程一定不能改該變量,這就是鎖。
Lock
對象有兩個基本方法:
- acquire():可以阻塞或非阻塞地獲取鎖。
- release():釋放一個鎖。
線程鎖的優點:確保某段關鍵代碼只能由一個線程從頭到尾完整地執行。
線程鎖的缺點如下:
- 阻止了多線程并發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率降低了。
- 由于存在多個鎖,不同線程持有不同的鎖,可能回造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。
# 新建一個test13.py文件,輸入以下內容
import time
import threading
t = 0
lock = threading.Lock() # 創建Lock對象
def run_thread(n): # 線程處理函數global t # 聲明全局變量for i in range(1000000): # 無數次重復操作,對變量執行先存后取相同的值lock.acquire() # 獲取鎖try:t = t + nt = t - nfinally:lock.release() # 釋放鎖
t1 = threading.Thread(target=run_thread,args=(5,)) # 創建線程
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start() # 開始執行線程
t2.start()
t1.join() # 阻塞線程
t2.join()
print(t) # 0
遞歸鎖:RLock
允許在同一線程中多次調用acqire(),不回阻塞程序,這種鎖稱為遞歸鎖。acquire和release必須成對出現,即調用了n次acquire()方法,就必須調用n次release()方法,才能真正釋放所占用的鎖。
# 新建一個test13.py文件,輸入以下內容
import time
import threading
t = 0
rlock = threading.RLock() # 創建Lock對象
def run_thread(n): # 線程處理函數global t # 聲明全局變量for i in range(1000000): # 無數次重復操作,對變量執行先存后取相同的值rlock.acquire() # 獲取鎖rlock.acquire() # 獲取鎖try:t = t + nt = t - nfinally:rlock.release() # 釋放鎖rlock.release() # 釋放鎖
t1 = threading.Thread(target=run_thread,args=(5,)) # 創建線程
t2 = threading.Thread(target=run_thread,args=(8,))
t1.start() # 開始執行線程
t2.start()
t1.join() # 阻塞線程
t2.join()
print(t) # 0
條件對象:允許一個或多個線程在被其它線程通知之前處于等待中。Condition 時 threading 模塊的一個子類,用于維護多線程之間的同步協作。內部使用的也是Lock或Rlock,同時增加了等待池功能。常見方法如下:
- acquire():請求底層鎖。
- release():釋放底層鎖。
- wait():等待直到被通知或發送超時。
- wait_for():等待,直接條件計算為真。
- notify(n=1):默認喚醒一個等待這個條件的線程。
- notify_all():喚醒所有正在等待這個條件的線程。
import time
import threading
class Test1(threading.Thread):def __init__(self,name,cond):super().__init__()self.name = nameself.cond = conddef run(self):with self.cond:print(self.name,':1')time.sleep(1)print(self.name,':3')class Test2(threading.Thread):def __init__(self,name,cond):super().__init__()self.name = nameself.cond = conddef run(self):with self.cond:print(self.name,':2')time.sleep(1)print(self.name,':4')cond = threading.Condition()
a = Test1('A',cond)
b = Test2('B',cond)
a.start()
b.start()
事件通信:事件用于主線程控制其他線程的執行,事件是一個簡單的線程同步對象,其主要提供以下幾個方法:
- is_set():當且僅當內部標志為True時返回True。
- set():將內部標志設置為True。所有正在等待這個事件的線程將被喚醒。當標志為True時,調用wait()方法的線程不回被阻塞。
- clear():將內部標志設置為False。
- wait():等待設置標志。