官方解釋: ''' In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) '''釋義: 在CPython中,這個全局解釋器鎖,也稱為GIL,是一個互斥鎖,防止多個線程在同一時間執行Python字節碼,這個鎖是非常重要的,
因為CPython的內存管理非線程安全的,很多其他的特性依賴于GIL,所以即使它影響了程序效率也無法將其直接去除 總結: 在CPython中,GIL會把線程的并行變成串行,導致效率降低
?
?
?
二.GIL帶來的問題
首先必須明確執行一個py文件,分為三個步驟
-
從硬盤加載Python解釋器到內存
-
從硬盤加載py文件到內存
-
解釋器解析py文件內容,交給CPU執行
其次需要明確的是每當執行一個py文件,就會立即啟動一個python解釋器,
當執行test.py時其內存結構如下:
GIL,叫做全局解釋器鎖,加到了解釋器上,并且是一把互斥鎖,那么這把鎖對應用程序到底有什么影響?
這就需要知道解釋器的作用,以及解釋器與應用程序代碼之間的關系
py文件中的內容本質都是字符串,只有在被解釋器解釋時,才具備語法意義,解釋器會將py代碼翻譯為當前系統支持的指令交給系統執行。
當進程中僅存在一條線程時,GIL鎖的存在沒有不會有任何影響,但是如果進程中有多個線程時,GIL鎖就開始發揮作用了。如下圖:
開啟子線程時,給子線程指定了一個target表示該子線程要處理的任務即要執行的代碼。代碼要執行則必須交由解釋器,即多個線程之間就需要共享解釋器,為了避免共享帶來的數據競爭問題,于是就給解釋器加上了互斥鎖!
由于互斥鎖的特性,程序串行,保證數據安全,降低執行效率,GIL將使得程序整體效率降低!
?
?
GIL與GC的孽緣 :
在使用Python中進行編程時,程序員無需參與內存的管理工作,這是因為Python有自帶的內存管理機制,簡稱GC。那么GC與GIL有什么關聯?
要搞清楚這個問題,需先了解GC的工作原理,Python中內存管理使用的是引用計數,每個數會被加上一個整型的計數器,表示這個數據被引用的次數,當這個整數變為0時則表示該數據已經沒有人使用,成了垃圾數據。
當內存占用達到某個閾值時,GC會將其他線程掛起,然后執行垃圾清理操作,垃圾清理也是一串代碼,也就需要一條線程來執行。
?
示例代碼:
from threading import Thread def task():a = 10print(a)# 開啟三個子線程執行task函數 Thread(target=task).start() Thread(target=task).start() Thread(target=task).start()
?
通過上圖可以看出,GC與其他線程都在競爭解釋器的執行權,而CPU何時切換,以及切換到哪個線程都是無法預支的,這樣一來就造成了競爭問題 !
假設線程1正在定義變量a=10,而定義變量第一步會先到到內存中申請空間把10存進去,第二步將10的內存地址與變量名a進行綁定,如果在執行完第一步后,CPU切換到了GC線程,GC線程發現10的地址引用計數為0則將其當成垃圾進行了清理,等CPU再次切換到線程1時,剛剛保存的數據10已經被清理掉了,導致無法正常定義變量。
當然其他一些涉及到內存的操作同樣可能產生問題,為了避免GC與其他線程競爭解釋器帶來的問題,CPython簡單粗暴的給解釋器加了互斥鎖
?
如下圖所示:
有了GIL后,多個線程將不可能在同一時間使用解釋器,從而保證了解釋器的數據安全。
?
?
GIL的加鎖與解鎖時機
加鎖的時機:
在調用解釋器時立即加鎖
解鎖時機:
-
當前線程遇到了IO時釋放
-
當前線程執行時間超過設定值時釋放
?
?
但我們并不能因此就否認Python這門語言,其原因如下:
-
GIL僅僅在CPython解釋器中存在,在其他的解釋器中沒有,并不是Python這門語言的缺點
-
在單核處理器下,多線程之間本來就無法真正的并行執行
-
在多核處理下,運算效率的確是比單核處理器高,但是要知道現代應用程序多數都是基于網絡的(qq,微信,爬蟲,瀏覽器等等),CPU的運行效率是無法決定網絡速度的,而網絡的速度是遠遠比不上處理器的運算速度,則意味著每次處理器在執行運算前都需要等待網絡IO,這樣一來多核優勢也就沒有那么明顯了
舉個例子:
任務1 從網絡上下載一個網頁,等待網絡IO的時間為1分鐘,解析網頁數據花費,1秒鐘
任務2 將用戶輸入數據并將其轉換為大寫,等待用戶輸入時間為1分鐘,轉換為大寫花費,1秒鐘
單核CPU下:1.開啟第一個任務后進入等待。2.切換到第二個任務也進入了等待。一分鐘后解析網頁數據花費1秒解析完成切換到第二個任務,轉換為大寫花費1秒,那么總耗時為:1分+1秒+1秒 = 1分鐘2秒
多核CPU下:1.CPU1處理第一個任務等待1分鐘,解析花費1秒鐘。1.CPU2處理第二個任務等待1分鐘,轉換大寫花費1秒鐘。由于兩個任務是并行執行的所以總的執行時間為1分鐘+1秒鐘 = 1分鐘1秒
可以發現,多核CPU對于總的執行時間提升只有1秒,但是這邊的1秒實際上是夸張了,轉換大寫操作不可能需要1秒,時間非常短!
上面的兩個任務都是需要大量IO時間的,這樣的任務稱之為IO密集型,與之對應的是計算密集型即IO操作較少大部分都是計算任務。
對于計算密集型任務,Python多線程的確比不上其他語言!為了解決這個弊端,Python推出了多進程技術,可以良好的利用多核處理器來完成計算密集任務。
總結:
1.單核下無論是IO密集還是計算密集GIL都不會產生任何影響
3.Cpython中IO密集任務應該采用多線程,計算密集型應該采用多進程
另外:之所以廣泛采用CPython解釋器,就是因為大量的應用程序都是IO密集型的,還有另一個很重要的原因是CPython可以無縫對接各種C語言實現的庫,這對于一些數學計算相關的應用程序而言非常的happy,直接就能使用各種現成的算法
?
計算密集型的效率測試:
from multiprocessing import Process from threading import Thread import timedef task():for i in range(10000000):i += 1if __name__ == '__main__':start_time = time.time()
# 多進程p1 = Process(target=task) # 2.053471565246582p2 = Process(target=task)p3 = Process(target=task)p4 = Process(target=task)# 多線程# p1 = Thread(target=task) # 3.169567823410034# p2 = Thread(target=task)# p3 = Thread(target=task)# p4 = Thread(target=task) p1.start()p2.start()p3.start()p4.start()p1.join()p2.join()p3.join()p4.join()print(time.time() - start_time)
?
IO密集型的效率測試 :
from multiprocessing import Process from threading import Thread import time def task():with open("test.txt",encoding="utf-8") as f:f.read() if __name__ == '__main__':start_time = time.time()# 多進程# p1 = Process(target=task)# p2 = Process(target=task)# p3 = Process(target=task)# p4 = Process(target=task)# 多線程p1 = Thread(target=task)p2 = Thread(target=task)p3 = Thread(target=task)p4 = Thread(target=task)p1.start()p2.start()p3.start()p4.start()p1.join()p2.join()p3.join()p4.join()print(time.time()-start_time)
?
?
五.自定義的線程鎖與GIL的區別
GIL保護的是解釋器級別的數據安全,比如對象的引用計數,垃圾分代數據等等,具體參考垃圾回收機制詳解。
from threading import Thread,Lock import timea = 0 def task():global atemp = atime.sleep(0.01) a = temp + 1t1 = Thread(target=task) t2 = Thread(target=task) t1.start() t2.start()t1.join() t2.join() print(a)
?
過程分析:
1.線程1獲得CPU執行權,并獲取GIL鎖執行代碼 ,得到a的值為0后進入睡眠,釋放CPU并釋放GIL
2.線程2獲得CPU執行權,并獲取GIL鎖執行代碼 ,得到a的值為0后進入睡眠,釋放CPU并釋放GIL
3.線程1睡醒后獲得CPU執行權,并獲取GIL執行代碼 ,將temp的值0+1后賦給a,執行完畢釋放CPU并釋放GIL
4.線程2睡醒后獲得CPU執行權,并獲取GIL執行代碼 ,將temp的值0+1后賦給a,執行完畢釋放CPU并釋放GIL,最后a的值也就是1
?
from threading import Thread,Lock import timelock = Lock() a = 0 def task():global alock.acquire()temp = atime.sleep(0.01)a = temp + 1lock.release() t1 = Thread(target=task) t2 = Thread(target=task)t1.start() t2.start()t1.join() t2.join() print(a)
?
過程分析:
1.線程1獲得CPU執行權,并獲取GIL鎖執行代碼 ,得到a的值為0后進入睡眠,釋放CPU并釋放GIL,不釋放lock
2.線程2獲得CPU執行權,并獲取GIL鎖,嘗試獲取lock失敗,無法執行,釋放CPU并釋放GIL
3.線程1睡醒后獲得CPU執行權,并獲取GIL繼續執行代碼 ,將temp的值0+1后賦給a,執行完畢釋放CPU釋放GIL,釋放lock,此時a的值為1
4.線程2獲得CPU執行權,獲取GIL鎖,嘗試獲取lock成功,執行代碼,得到a的值為1后進入睡眠,釋放CPU并釋放GIL,不釋放lock
5.線程2睡醒后獲得CPU執行權,獲取GIL繼續執行代碼 ,將temp的值1+1后賦給a,執行完畢釋放CPU釋放GIL,釋放lock,此時a的值為2
?
?
?
?
六:進程池與線程池
什么是進程/線程池?
池表示一個容器,本質上就是一個存儲進程或線程的列表
?
池子中存儲線程還是進程?
?
為什么需要進程/線程池?
在很多情況下需要控制進程或線程的數量在一個合理的范圍,例如TCP程序中,一個客戶端對應一個線程,雖然線程的開銷小,但肯定不能無限的開,否則系統資源遲早被耗盡,解決的辦法就是控制線程的數量。
線程/進程池不僅幫我們控制線程/進程的數量,還幫我們完成了線程/進程的創建,銷毀,以及任務的分配
?
進程池的使用:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,os# 創建進程池,指定最大進程數為3,此時不會創建進程,不指定數量時,默認為CPU和核數 pool = ProcessPoolExecutor(3)def task():time.sleep(1)print(os.getpid(),"working..")if __name__ == '__main__':for i in range(10):pool.submit(task) # 提交任務時立即創建進程# 任務執行完成后也不會立即銷毀進程time.sleep(2)for i in range(10):pool.submit(task) #再有新任務是 直接使用之前已經創建好的進程來執行
?
線程池的使用:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import current_thread,active_count import time,os# 創建進程池,指定最大線程數為3,此時不會創建線程,不指定數量時,默認為CPU和核數*5 pool = ThreadPoolExecutor(3) print(active_count()) # 只有一個主線def task():time.sleep(1)print(current_thread().name,"working..")if __name__ == '__main__':for i in range(10):pool.submit(task) # 第一次提交任務時立即創建線程# 任務執行完成后也不會立即銷毀time.sleep(2)for i in range(10):pool.submit(task) #再有新任務時 直接使用之前已經創建好的線程來執行
?
案例:TCP中的應用
?
?
?
七.同步異步-阻塞非阻塞
同步異步-阻塞非阻塞,經常會被程序員提及,并且概念非常容易混淆!
?
阻塞非阻塞 ------指的是程序的運行狀態
阻塞:當程序執行過程中遇到了IO操作,在執行IO操作時,程序無法繼續執行其他代碼,稱為阻塞!
非阻塞:程序在正常運行沒有遇到IO操作,或者通過某種方式使程序即使遇到了也不會停在原地,還可以執行其他操作,以提高CPU的占用率
?
同步-異步-------- 指的是提交任務的方式
同步:指調用發起任務后必須在原地等待任務執行完成,才能繼續執行
異步:指調用發起任務后不用等待任務執行,可以立即開啟執行其他操作
?
同步會有等待的效果但是這和阻塞是完全不同的,阻塞時程序會被剝奪CPU執行權,而同步調用則不會!
?
?
程序中的異步調用并獲取結果方式1:
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import timepool = ThreadPoolExecutor(3) def task(i):time.sleep(0.01)print(current_thread().name,"working..")return i ** iif __name__ == '__main__':objs = []for i in range(3):res_obj = pool.submit(task,i) # 異步方式提交任務# 會返回一個對象用于表示任務結果 objs.append(res_obj)# 該函數默認是阻塞的 會等待池子中所有任務執行結束后執行 pool.shutdown(wait=True)# 從結果對象中取出執行結果 for res_obj in objs:print(res_obj.result()) print("over")
?
程序中的異步調用并獲取結果方式2:
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import timepool = ThreadPoolExecutor(3) def task(i):time.sleep(0.01)print(current_thread().name,"working..")return i ** iif __name__ == '__main__':objs = []for i in range(3):res_obj = pool.submit(task,i) # 會返回一個對象用于表示任務結果print(res_obj.result()) #result是同步的一旦調用就必須等待 任務執行完成拿到結果 print("over")
?
?
?
8.異步回調
什么是異步回調
異步回調指的是:在發起一個異步任務的同時指定一個函數,在異步任務完成時會自動的調用這個函數
?
為什么需要異步回調
之前在使用線程池或進程池提交任務時,如果想要處理任務的執行結果則必須調用result函數或是shutdown函數,而它們都是是阻塞的,會等到任務執行完畢后才能繼續執行,這樣一來在這個等待過程中就無法執行其他任務,降低了效率,所以需要一種方案,即保證解析結果的線程不用等待,又能保證數據能夠及時被解析,該方案就是異步回調
?
異步回調的使用
先來看一個案例:
在編寫爬蟲程序時,通常都是兩個步驟:
1.從服務器下載一個網頁文件
2.讀取并且解析文件內容,提取有用的數據
按照以上流程可以編寫一個簡單的爬蟲程序
要請求網頁數據則需要使用到第三方的請求庫requests可以通過pip或是pycharm來安裝,在pycharm中點擊settings->解釋器->點擊+號->搜索requests->安裝
?
import requests,re,os,random,time from concurrent.futures import ProcessPoolExecutordef get_data(url):print("%s 正在請求%s" % (os.getpid(),url))time.sleep(random.randint(1,2))response = requests.get(url)print(os.getpid(),"請求成功 數據長度",len(response.content))#parser(response) # 3.直接調用解析方法 哪個進程請求完成就那個進程解析數據 強行使兩個操作耦合到一起了return responsedef parser(obj):data = obj.result()htm = data.content.decode("utf-8")ls = re.findall("href=.*?com",htm)print(os.getpid(),"解析成功",len(ls),"個鏈接")if __name__ == '__main__':pool = ProcessPoolExecutor(3)urls = ["https://www.baidu.com","https://www.sina.com","https://www.python.org","https://www.tmall.com","https://www.mysql.com","https://www.apple.com.cn"]# objs = []for url in urls:# res = pool.submit(get_data,url).result() # 1.同步的方式獲取結果 將導致所有請求任務不能并發# parser(res) obj = pool.submit(get_data,url) # obj.add_done_callback(parser) # 4.使用異步回調,保證了數據可以被及時處理,并且請求和解析解開了耦合# objs.append(obj)# pool.shutdown() # 2.等待所有任務執行結束在統一的解析# for obj in objs:# res = obj.result()# parser(res)# 1.請求任務可以并發 但是結果不能被及時解析 必須等所有請求完成才能解析# 2.解析任務變成了串行
?
總結:異步回調使用方法就是在提交任務后得到一個Futures對象,調用對象的add_done_callback來指定一個回調函數,
如果把任務比喻為燒水,沒有回調時就只能守著水壺等待水開,有了回調相當于換了一個會響的水壺,燒水期間可用作其他的事情,等待水開了水壺會自動發出聲音,這時候再回來處理。水壺自動發出聲音就是回調。
-
使用進程池時,回調函數都是主進程中執行執行
-
使用線程池時,回調函數的執行線程是不確定的,哪個線程空閑就交給哪個線程
-
回調函數默認接收一個參數就是這個任務對象自己,再通過對象的result函數來獲取任務的處理結果
?
?
?
1.Queue 先進先出隊列
與多進程中的Queue使用方式完全相同,區別僅僅是不能被多進程共享。
q = Queue(3) q.put(1) q.put(2) q.put(3) print(q.get(timeout=1)) print(q.get(timeout=1)) print(q.get(timeout=1))
?
2.LifoQueue 后進先出隊列
該隊列可以模擬堆棧,實現先進后出,后進先出
lq = LifoQueue()lq.put(1) lq.put(2) lq.put(3)print(lq.get()) print(lq.get()) print(lq.get())
?
3.PriorityQueue 優先級隊列
該隊列可以為每個元素指定一個優先級,這個優先級可以是數字,字符串或其他類型,但是必須是可以比較大小的類型,取出數據時會按照從小到大的順序取出
pq = PriorityQueue() # 數字優先級 pq.put((10,"a")) pq.put((11,"a")) pq.put((-11111,"a"))print(pq.get()) print(pq.get()) print(pq.get()) # 字符串優先級 pq.put(("b","a")) pq.put(("c","a")) pq.put(("a","a"))print(pq.get()) print(pq.get()) print(pq.get())
?
?
10.線程事件Event
什么是事件
事件表示在某個時間發生了某個事情的通知信號,用于線程間協同工作。
因為不同線程之間是獨立運行的狀態不可預測,所以一個線程與另一個線程間的數據是不同步的,當一個線程需要利用另一個線程的狀態來確定自己的下一步操作時,就必須保持線程間數據的同步,Event就可以實現線程間同步
Event介紹
?
可用方法:
event.isSet() #:返回event的狀態值; event.wait() #:將阻塞線程;直到event的狀態為True event.set() #:設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; event.clear() #:恢復event的狀態值為False
?
使用案例:
# 在鏈接mysql服務器前必須保證mysql已經啟動,而啟動需要花費一些時間,所以客戶端不能立即發起鏈接 需要等待msyql啟動完成后立即發起鏈接 from threading import Event,Thread import timeboot = False def start():global bootprint("正正在啟動服務器.....")time.sleep(5)print("服務器啟動完成!")boot = Truedef connect():while True:if boot:print("鏈接成功")breakelse:print("鏈接失敗")time.sleep(1)Thread(target=start).start() Thread(target=connect).start() Thread(target=connect).start()
?
使用Event改造后:
from threading import Event,Thread import timee = Event() def start():print("正正在啟動服務器.....")time.sleep(3)print("服務器啟動完成!")e.set()def connect():e.wait()print("鏈接成功")Thread(target=start).start() Thread(target=connect).start() Thread(target=connect).start()
?
from threading import Event,Thread import timee = Event() def start():global bootprint("正正在啟動服務器.....")time.sleep(5)print("服務器啟動完成!")e.set()def connect():for i in range(1,4):print("第%s次嘗試鏈接" % i)e.wait(1)if e.isSet():print("鏈接成功")breakelse:print("第%s次鏈接失敗" % i)else:print("服務器未啟動!")Thread(target=start).start() Thread(target=connect).start() # Thread(target=connect).start()
?
?
?