一、協程
1.1、協程
協程,Coroutines,也叫作纖程(Fiber)
協程,全稱是“協同程序”,用來實現任務協作。是一種在線程中,比線程更加輕量級的存在,由程序員自己寫程序來管理。
當出現IO阻塞時,CPU一直等待IO返回,處于空轉狀態。這時候用協程,可以執行其他任務。當IO返回結果后,再回來處理數據。充分利用了IO等待的時間,提高了效率。
進程與線程是操作系統管理和調度的基本單位,而協程則是由程序員 實現的一種輕量級的、用戶空間級別的多任務機制,通常不由操作系統直接提供支持。
1.2、協程的核心(控制流的讓出和恢復)
1.每個協程有自己的執行棧,可以保存自己的執行現場
2.可以由用戶程序按需創建協程(比如:遇到io操作)
3.協程“主動讓出(yield)”執行權時候,會保存執行現場(保存中斷時的寄存器上下文和棧),然后切換到其他協程
4.協程恢復執行(resume)時,根據之前保存的執行現場恢復到中斷前的狀態,繼續執行,這樣就通過協程實現了輕量的由用戶態調度的多任務模型
1.3、協程的特點
1. 占用資源少:協程通常只需要少量的棧空間,這是因為它們采用協作式的多任務 處理機制,可以在固定的棧空間內通過狀態保存和恢復來實現任務的切換,相比 多線程和多進程,協程占用的系統資源更少。
2. 切換開銷小:協程的切換是在用戶態進行的,不需要進行系統調用,也不涉及內 核態的上下文切換,因此其切換開銷非常小,遠遠低于線程間的上下文切換。
3. 可暫停和可恢復的函數:協程允許函數在執行過程中主動暫停(通常是遇到I/O操 作或其他耗時操作時),并將控制權交還給調度器,以便其他協程可以運行。在 I/O操作或其他耗時操作完成后,該協程可以從暫停的地方繼續執行,而不會阻塞 整個線程。這種特性使得協程非常適合于處理I/O密集型任務,可以在等待I/O操 作完成時釋放CPU,從而提高程序的并發性能和資源利用率。
1.4、協程的優點
1.由于自身帶有上下文和棧,無需線程上下文切換的開銷,屬于程序級別的切換,操作系統完全感知不到,因而更加輕量級;
2.無需原子操作的鎖定及同步的開銷;
3.方便切換控制流,簡化編程模型
4.單線程內就可以實現并發的效果,最大限度地利用cpu,且可擴展性高,成本低(注:一個CPU支持上萬的協程都不是問題。所以很適合用于高并發處理)
asyncio協程是寫爬蟲比較好的方式。比多線程和多進程都好. 開辟新的線程和進程是非常耗時的。
1.5、協程的缺點
1.無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上。
1.6、與進程和線程的比較
多進程是在操作系統層面實現的并行執行方式,每個進程擁有獨立的內存空間和 系統資源,進程間通過進程間通信(IPC)機制(如管道、消息隊列、共享內存 等)進行交互,這增加了通信的復雜性。多進程可以充分利用多核處理器的性 能,實現真正的并行計算。由于進程間的隔離性,系統的安全性和穩定性也得到 了提高。然而,進程間通信和同步的開銷相對較高,且每個進程的創建和銷毀通 常伴隨著較大的資源開銷。
多線程是在一個進程內部實現的并發執行方式,多個線程共享該進程的內存空間 和資源,這使得線程間通信和數據共享相對容易。但是,這也引入了線程安全問題,需要通過同步機制(如互斥鎖、信號量、條件變量等)來避免數據沖突。多 線程的優點在于能夠實現并發執行,但線程間的上下文切換開銷相比進程較小, 但相比協程則較大,且需要謹慎處理線程安全問題。?
協程是一種輕量級的線程,與多進程和多線程相比,它具有占用資源少、切換開 銷小、可以實現高效異步執行等優點。協程通過非阻塞I/O操作來等待數據,當數 據就緒時自動恢復執行,從而提高了程序的執行效率和響應速度。然而,協程也 有其局限性,它只能在單個線程內執行,因此它對于CPU密集型任務來說并沒有 什么好處。?
二、實現協程的方法?
在python中,實現協程的方法有以下幾種:
????????1. 使用async/await關鍵字:python3.5及以后出現,到目前為止這是目前最主流的 實現協程的方法。
????????2. 使用yield關鍵字:使用yield關鍵字及其send方法可以實現協程的效果。
????????3. 使用asyncio.coroutine:在python3.4發布之后可以使用該裝飾器與yield from 配合來實現協程,不過在python3.8棄用。
????????4. 使用第三方庫:通過其他的第三方庫也可以實現協程,如greenlet。
def consumer():print("消費者準備接收數據。")while True:# 接收生產者發送的數據data = (yield)print("消費者接收到了數據:", data)
def producer(consumer_generator):# 啟動生成器,使它準備好接收數據next(consumer_generator)for i in range(5):print("生產者發送數據:", i)# 發送數據給消費者consumer_generator.send(i)# 終止生成器consumer_generator.close()
if __name__ == '__main__':# 創建消費者生成器consumer_coroutine = consumer()# 創建生產者producer(consumer_coroutine)
'''
消費者準備接收數據。
生產者發送數據: 0
消費者接收到了數據: 0
生產者發送數據: 1
消費者接收到了數據: 1
生產者發送數據: 2
消費者接收到了數據: 2
生產者發送數據: 3
消費者接收到了數據: 3
生產者發送數據: 4
消費者接收到了數據: 4
'''
2.1、async
async關鍵字是Python異步編程的核心組成部分,用于定義協程函數。協程函數與 普通函數不同,它們在調用時不會執行函數里面的代碼,而是會返回一個協程對象。
# 定義一個協程函數
async def func():print('123')# 直接調用協程函數會發出警告,并且函數內部的功能也不會執行
func()
想要運行函數體里面的代碼,需要進行兩個方面的準備:
????????1. 獲取事件循環。
????????2. 將協程對象封裝為Task對象并提交到事件循環中。?
2.2、await
????????在Python中, await關鍵字用于掛起(暫停)異步函數的執行,直到被等待的協程 (coroutine)完成。這是異步編程中的一個關鍵概念,因為它允許程序在等待結果 的同時執行其他任務。
2.2.1、await的基本用法
1. 只能在異步函數內部使用: await關鍵字只能在一個使用了 異步函數內部使用。它不能在普通的同步函數中使用。
2. 等待協程: async def定義的 await后面通常跟一個協程對象(一個異步函數的調用)。當執行到 await時,當前協程會暫停執行,等待右側的協程完成。
2.2.2、await的工作原理
1. 掛起與恢復: 當執行到 await時,當前協程會掛起,并讓出控制權給事件循環 (event loop)。事件循環可以在這段時間內運行其他協程或處理其他事件。一 旦await后面的協程完成,事件循環會恢復執行原來的協程, 果就是協程的返回值。
2. 非阻塞: 盡管 await表達式的結 await看起來像是同步代碼中的阻塞操作,但實際上它是非阻塞 的。這是因為事件循環負責協程之間的切換,從而實現并發。
import asyncio
import time
# 定義一個異步函數say_after,它接受延遲時間和要打印的消息作為參數
async def say_after(delay, what):
# 使用await關鍵字掛起當前協程,直到指定的延遲時間結束后再繼續執行await asyncio.sleep(delay)# 打印消息print(what)
# 定義主異步函數main
async def main():# 記錄開始時間print(f"started at {time.strftime('%X')}")# 調用say_after函數,等待1秒后打印'hello'await say_after(1, 'hello')# 調用say_after函數,等待2秒后打印'world'# 注意:這里的執行不是并行的,而是順序的,因為兩個await語句是順序執行的await say_after(2, 'world')# 記錄結束時間print(f"finished at {time.strftime('%X')}")
# 調用asyncio.run()來啟動主協程
# 這將創建一個新的事件循環并運行main()直到完成
asyncio.run(main())
'''
started at xx:xx:xx
hello
world
finished at xx:xx:xx
'''
三、事件循環
????????事件循環是一種處理程序執行、事件和消息分發的機制。它不斷地等待事件的發生, 當事件發生時,事件循環會將其分發給相應的處理程序進行處理。事件循環的核心是 一個循環,它會不斷地檢查是否有事件需要處理,如果有,就調用相應的回調函數來 處理這些事件。
其工作流程為:
1. 啟動:創建并啟動事件循環。
2. 注冊事件:將各種事件(如網絡套接字、文件描述符、定時器等)注冊到事件循 環中。
3. 事件循環:進入一個循環,等待事件的發生,并處理這些事件。
4. 執行任務:當事件發生時,事件循環會調用相關的處理函數或恢復相應的協程。?
5. 關閉:當所有任務完成后,關閉事件循環。
事件循環的創建隨著Python版本的不同而不同,在Python3.7版本之前,事件循環需 要先使用 asyncio.get_event_loop()來獲取循環,然后使用 run_until_complete()來執行任務。在Python3.7及以后的版本,直接使用 asyncio.run()來直接執行任務。
import asyncio
# 定義一個異步函數func1
async def func1():print('start func1') # 打印信息,表示func1開始執行await asyncio.sleep(2) # 模擬異步I/O操作,協程在這里掛起2秒鐘print('end func1') # 打印信息,表示func1執行結束
# 定義另一個異步函數func2
async def func2():print('start func2') # 打印信息,表示func2開始執行await asyncio.sleep(2) # 模擬異步I/O操作,協程在這里掛起2秒鐘print('end func2') # 打印信息,表示func2執行結束
asyncio.run(func1())
asyncio.run(func2())
'''
start func1
end func1
start func2
end func2
'''
四、Task對象
????????Task對象是 asyncio庫中的一個實現,它用來在事件循環中安排協程的執行。一個 Task是對協程的一個封裝,簡單來說,協程本身并不會自動運行,當一個協程被封 裝為一個Task對象并提交到事件循環中時,它才會在事件循環中被安排執行。當協程 執行完畢后, Task會提供協程的返回值或異常,并且相比協程對象, 更加豐富的方法供我們使用。
????????將協程對象封裝為 Task對象擁有 Task是通過asyncio庫中的函數進行的,但隨著Python版本的不 同,其所用函數也不同。
????????在python3.7之前,Task的創建使用的是 asyncio.ensure_future()函數,通過該 函數將使用 async定義的協程函數所返回的協程對象提交到事件循環中。在 python3.7之后,創建Task對象的方法變得更加直接和明確,可以使用 asyncio.create_task()函數來創建,且python3.8版本之后,添加了name參數可 以為任務指定名稱。這個函數接受一個協程對象作為參數,并返回一個新的Task對 象。
import asyncio
# 定義一個異步函數func1
async def func1():print('start func1') # 打印信息,表示func1開始執行await asyncio.sleep(2) # 模擬異步I/O操作,協程在這里掛起2秒鐘print('end func1') # 打印信息,表示func1執行結束
# 定義另一個異步函數func2
async def func2():print('start func2') # 打印信息,表示func2開始執行await asyncio.sleep(2) # 模擬異步I/O操作,協程在這里掛起2秒鐘print('end func2') # 打印信息,表示func2執行結束
# 定義主異步函數main,它將作為程序的入口點
async def main():# 創建任務列表,使用asyncio.create_task來創建任務tasks = [asyncio.create_task(func1()), # 創建并調度func1作為異步任務asyncio.create_task(func2())# 創建并調度func2作為異步任務]# 使用asyncio.wait等待所有任務完成# asyncio.wait接收一個任務列表,并等待這些任務完成done, pending = await asyncio.wait(tasks)
# 使用 asyncio.run() 來運行主函數
# asyncio.run()是Python 3.7引入的,它會創建一個新的事件循環,運行傳入的協程,并在協程完成后關閉事件循環
# 等同于asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
'''
start func1
start func2
end func1
end func2
'''
import asyncio
# 定義一個異步函數func1
async def func1():print('start func1') # 打印信息,表示func1開始執行await asyncio.sleep(2) # 模擬異步I/O操作,協程在這里掛起2秒鐘print('end func1') # 打印信息,表示func1執行結束
# 定義另一個異步函數func2
async def func2():print('start func2') # 打印信息,表示func2開始執行await asyncio.sleep(2) # 模擬異步I/O操作,協程在這里掛起2秒鐘print('end func2') # 打印信息,表示func2執行結束
# 定義主異步函數main,它將作為程序的入口點
async def main():# 直接調用asyncio.gather不需要將協程對象先手動封裝為Task對象# 該函數會負責將它們作為任務調度到事件循環中# asyncio.gather會返回一個包含所有結果的列表await asyncio.gather(func1(), func2())
# 使用 asyncio.run() 來運行主函數
# asyncio.run()會創建一個新的事件循環,運行傳入的協程,并在協程完成后關閉事件循環
asyncio.run(main())
'''
start func1
start func2
end func1
end func2
'''
五、協程間通信
????????與線程相似,協程之間的通信也只有消息隊列一種,且擁有不同種類的消息隊列。 在python中,協程所使用的消息隊列在 asyncio.Queue庫下,其中共存在三種類型 的隊列,分別為標準的先進先出隊列 Queue、先進后出隊列 LifoQueue和優先級隊 列PriorityQueue。
5.1、Queue?
先進先出的原則
asyncio.Queue(maxsize=0)
maxsize:隊列的最大尺寸,如果maxsize小于等于零,則隊列尺寸是無限的。如果 是大于0的整數,則當隊列達到maxsize時, await put()將阻塞至某個元素被 get()取出。
類方法 :
● ? ? ? ?Queue.qsize():返回隊列中當前有幾條消息。
● ? ? ? ?Queue.empty():如果隊列為空,返回True,否則返回 False。
● ? ? ? ?Queue.full():如果隊列已滿(達到最大尺寸),返回 True,否則返回 False。
● ? ? ? ?Queue.put(item, block=True, timeout=None):將 item 放入隊列。如果 block 是True是 None(默認),則在必要時阻塞至有空閑的插槽, 如果timeout 是正數,將最多阻塞 timeout 秒,如果在這段時間內沒有可 用的插槽,將引發 queue.Full 異常。?
● ? ? ? ?Queue.put_nowait(item):相當于 Queue.put(item, block=False)。如果隊列已滿,立即引發 queue.Full 異常。
● ? ? ? ?Queue.get(b1ock=True,timeout=None):從隊列中移除并返回一個元素。如果 block是 True 且 timeout是 None(默認),則在必要時阻塞至隊列中有項目可用。如果 timeout 是正數,將最多阻塞 timeout 秒,如果在這段時間內沒有項目可用,將引發 queue.Empty 異常。
● ? ? ? ?Queue.get_nowait():相當于 Queue.get(block=False)。如果隊列為空立即引發 queue.Empty 異常。
?● ? ? ? Queue.task_done():指示之前入隊的一個任務已經完成。由隊列的消費者線程使用。每個Queue. get()調用之后,需要調用 Queue.task_done()告訴隊列該任務處理完成。
● ? ? ? ?Queue.join():阻塞直到隊列中的所有元素都被處理完。當元素添加到隊列的 時候,未完成任務的計數就會增加,每當消費協程調用 task_done()表示這個元 素已經處理完畢,那么未完成計數就會減少。當未完成計數降到零的時候, join()阻塞被接觸。
import asyncio
# 生產者協程,負責生成一系列數字并將它們放入隊列中
async def producer(queue, n):for i in range(1, n + 1): # 循環從1到n,生成數字print(f'生產者生產了: {i}') # 打印當前生產的數字await queue.put(i) # 將數字放入隊列,如果隊列已滿,則阻塞直到有空位await asyncio.sleep(1) # 模擬生產耗時,等待1秒鐘print('生產者完成生產。') # 所有數字生產完畢,打印完成消息await queue.put(None) # 放入一個None作為結束信號,通知消費者沒有更多數字
# 消費者協程,負責從隊列中取出數字并打印它們
async def consumer(queue):while True: # 無限循環,直到接收到結束信號item = await queue.get() # 從隊列中取出一個元素,如果隊列為空,則阻塞直到有元素if item is None: # 檢查是否接收到結束信號queue.task_done() # 通知隊列,當前任務已經完成break # 如果是結束信號,退出循環print(f'消費者消費了: {item}') # 打印當前消費的數字queue.task_done() # 通知隊列,當前任務已經完成print('消費者完成消費。') # 打印完成消費的消息
# 主協程,負責啟動生產者和消費者協程,并等待它們完成
async def main():queue = asyncio.Queue(5) # 創建一個隊列實例,用于生產者和消費者之間的通信# 創建生產者和消費者協程producer_coro = producer(queue, 5) # 生產者協程,生產1到5的數字consumer_coro = consumer(queue) # 消費者協程# 使用asyncio.gather等待生產者和消費者協程完成# gather允許同時運行多個協程,并在它們都完成時返回結果await asyncio.gather(producer_coro, consumer_coro)
# 運行主協程,啟動事件循環
asyncio.run(main())
'''
生產者生產了: 1
消費者消費了: 1
生產者生產了: 2
消費者消費了: 2
生產者生產了: 3
消費者消費了: 3
生產者生產了: 4
消費者消費了: 4
生產者生產了: 5
消費者消費了: 5
生產者完成生產。
消費者完成消費。
'''
5.2、LifoQueue?
后進先出
asyncio.LifoQueue(maxsize=0)
maxsize:隊列的最大尺寸。如果設置為小于或等于0的數,則隊列的尺寸是無 限的。?
常用方法:?
● ? ? ? ?LifoQueue.put(item, block=True, timeout=None):將 如果 block 是 True 且 timeout 是 item 放入隊列。 None(默認),則在必要時阻塞至有空閑 的插槽。如果 timeout 是正數,將最多阻塞 timeout 秒,如果在這段時間內沒 有可用的插槽,將引發完全異常。?
● ? ? ? ?LifoQueue.put_nowait(item):相當于LifoQueue.put(item,b1ock=False)。如果隊列已滿,立即引發完全異常。
● ? ? ? ?LifoQueue.get(block=True,timeout=None):從隊列中移除并返回一個元素。如果 block是True且timeout是 None(默認),則在必要時阻塞至隊列中有項目可用。如果 timeout 是正數,將最多阻塞timeout秒,如果在這段時間內沒有項目可用,將引發完全異常。?
● ? ? ? ?LifoQueue.get_nowait():相當于 LifoQueue.get(block=False)。如果隊列為空,立即引發完全異常。
● ? ? ? ?LifoQueue.qsize():返回隊列中的項目數量
● ? ? ? ?LifoQueue.empty():如果隊列為空,返回 True,否則返回 False。
● ? ? ? ?LifoQueue.full():如果隊列已滿(達到最大尺寸),返回 True,否則返回False 。
●????????LifoQueue.task_done(): 指示之前入隊的一個任務已經完成,即 get出來的元素相關操作已經完成。由隊列中的 get()端掌控,每次 get用于一個任務時,任務最后要調用 task_done()告訴隊列,任務已經完成。
●????????LifoQueue.join():阻塞直到隊列中的所有元素都被處理完。當元素添加到隊 列的時候,未完成任務的計數就會增加,每當消費協程調用 task_done()表示這 個元素已經處理完畢,那么未完成計數就會減少。當未完成計數降到零的時候, join()阻塞被接觸。
import asyncio
# 生產者協程,負責生成一系列數字并將它們放入隊列中
async def producer(queue, n):for i in range(1, n + 1): # 循環從1到n,生成數字print(f'生產者生產了: {i}') # 打印當前生產的數字await queue.put(i) # 將數字放入隊列,如果隊列已滿,則阻塞直到有空位await asyncio.sleep(1) # 模擬生產耗時,等待1秒鐘print('生產者完成生產。') # 所有數字生產完畢,打印完成消息await queue.put(None) # 放入一個None作為結束信號,通知消費者沒有更多數字
# 消費者協程,負責從隊列中取出數字并打印它們
async def consumer(queue):await asyncio.sleep(5)while True: # 無限循環,直到接收到結束信號item = await queue.get() # 從隊列中取出一個元素,如果隊列為空,則阻塞直到有元素if item is None: # 檢查是否接收到結束信號queue.task_done() # 通知隊列,當前任務已經完成break # 如果是結束信號,退出循環print(f'消費者消費了: {item}') # 打印當前消費的數字queue.task_done() # 通知隊列,當前任務已經完成print('消費者完成消費。') # 打印完成消費的消息
# 主協程,負責啟動生產者和消費者協程,并等待它們完成
async def main():queue = asyncio.LifoQueue(10) # 創建一個隊列實例,用于生產者和消費者之間的通信# 創建生產者和消費者協程producer_coro = producer(queue, 5) # 生產者協程,生產1到5的數字consumer_coro = consumer(queue) # 消費者協程# 使用asyncio.gather等待生產者和消費者協程完成# gather允許同時運行多個協程,并在它們都完成時返回結果await asyncio.gather(producer_coro, consumer_coro)# 等待隊列中的所有項目都被處理完畢await queue.join()print('所有任務都已處理完畢。')
# 運行主協程,啟動事件循環
asyncio.run(main())
'''
生產者生產了: 1
生產者生產了: 2
生產者生產了: 3
生產者生產了: 4
生產者生產了: 5
消費者消費了: 5
消費者消費了: 4
消費者消費了: 3
消費者消費了: 2
消費者消費了: 1
生產者完成生產。
消費者完成消費。
所有任務都已處理完畢。
'''
5.3、PriorityQueue
實現優先級隊列
asyncio.PriorityQueue(maxsize=0)?
maxsize:隊列的最大尺寸。如果設置為小于或等于0的數,則隊列的尺寸是無 限的。
常用方法:?
● ???????PriorityQueue.put((priority, item), block=True, timeout=None):將 item 放入隊列,并為其指定一個優先級 timeout 是 priority。如果 block 是 True 且 None(默認),則在必要時阻塞至有空閑的插槽。如果 timeout 是正數,將最多阻塞 timeout 秒,如果在這段時間內沒有可用的插槽,將引發 完全異常。
● ???????PriorityQueue.put_nowait((item, priority):相當于 PriorityQueue.put((item, priority), block=False)。如果隊列已滿,立 即引發完全異常。
● ???????PriorityQueue.get(block=True, timeout=None):從隊列中移除并返回一 個元素。如果 block 是 True 且 timeout 是 None(默認),則在必要時阻塞 至隊列中有項目可用。如果 timeout 是正數,將最多阻塞 timeout 秒,如果在 這段時間內沒有項目可用,將引發完全異常。
● ???????PriorityQueue.get_nowait():相當于 PriorityQueue.get(block=False)。如果隊列為空,立即引發完全異常。
● ???????PriorityQueue.qsize():返回隊列中的項目數量。
● ???????PriorityQueue.empty():如果隊列為空,返回True,否則返回False。
● ???????PriorityQueue.full():如果隊列已滿(達到最大尺寸),返回True,否則返回 False。?
●????????PriorityQueue.task_done():指示之前入隊的一個任務已經完成,即 來的元素相關操作已經完成。由隊列中的 get()端掌控,每次get用于一個任務 時,任務最后要調用 get出 task_done()告訴隊列,任務已經完成。?
import asyncio
# 生產者協程,負責生成一系列數字(這里實際上是字典鍵值對)并將它們放入隊列中
async def producer(queue, n):fx = {4: 'd', 5: 'e', 2: 'b', 3: 'c', 1: 'a'} # 定義一個字典,包含數字和字母的映射fx_tuples = [(key, value) for key, value in fx.items()] # 將字典轉換為元組列表for i in range(0, n): # 循環從0到n-1,但由于字典是無序的,這里的i僅用作索引限制# 注意:如果n大于fx_tuples的長度,將會引發IndexErrorprint(f'生產者生產了: {fx_tuples[i]}') # 打印當前生產的元組await queue.put(fx_tuples[i]) # 將元組放入隊列,如果隊列已滿,則阻塞直到有空位await asyncio.sleep(1) # 模擬生產耗時,等待1秒鐘print('生產者完成生產。') # 所有指定的元組生產完畢,打印完成消息await queue.put(None) # 放入一個None作為結束信號,通知消費者沒有更多元組
# 消費者協程,負責從隊列中取出元組并打印它們
async def consumer(queue):await asyncio.sleep(5) # 消費者在開始消費前等待5秒(模擬其他任務或延遲)while True: # 無限循環,直到接收到結束信號item = await queue.get() # 從隊列中取出一個元素,如果隊列為空,則阻塞直到有元素if item is None: # 檢查是否接收到結束信號queue.task_done() # 通知隊列,當前任務(即處理None作為結束信號的任務)已經完成break # 如果是結束信號,退出循環print(f'消費者消費了: {item}') # 打印當前消費的元組queue.task_done() # 通知隊列,當前任務(即處理一個元組的任務)已經完成print('消費者完成消費。') # 打印完成消費的消息
# 主協程,負責啟動生產者和消費者協程,并等待它們完成
async def main():queue = asyncio.PriorityQueue(10) # 創建一個優先隊列實例,用于生產者和消費者之間的通信,容量為10# 創建生產者和消費者協程producer_coro = producer(queue, 5) # 生產者協程,嘗試生產前5個字典中的鍵值對(注意字典無序)consumer_coro = consumer(queue) # 消費者協程# 使用asyncio.gather等待生產者和消費者協程完成# gather允許同時運行多個協程,并在它們都完成時返回結果(這里不關心具體返回值)await asyncio.gather(producer_coro, consumer_coro)# 等待隊列中的所有項目都被處理完畢(即等待所有task_done()被調用)await queue.join()print('所有任務都已處理完畢。')
# 運行主協程,啟動事件循環
asyncio.run(main())
'''
生產者生產了: (4, 'd')
生產者生產了: (5, 'e')
生產者生產了: (2, 'b')
生產者生產了: (3, 'c')
生產者生產了: (1, 'a')
消費者消費了: (1, 'a')
消費者消費了: (2, 'b')
消費者消費了: (3, 'c')
消費者消費了: (4, 'd')
消費者消費了: (5, 'e')
生產者完成生產。
消費者完成消費。
所有任務都已處理完畢。
'''
六、協程同步
與進程、線程類似,協程也有同步機制,包括Lock、Semaphore、Event、 Condition。
6.1、Lock
在協程中,可以使用Lock來確保同一時間只有一個協程可以訪問某個資源。
asyncio.Lock()?
其方法為:
????????acquire():獲取鎖。此方法會等待直至鎖為unlocked,然后將其設為locked并 返回True。當有一個以上的協程在 acquire()中被阻塞則會等待解鎖,最終只 有一個協程會被執行。鎖的獲取是公平的,被執行的協程將是第一個開始等待鎖 的協程。
????????release():釋放鎖。當鎖為locked時,將其設為unlocked并返回。如果鎖為 unlocked,則會拋出異常。
????????locked():如果鎖為locked則返回 True。?
為了避免死鎖,建議使用async with 來管理Lock。?
import asyncio # 導入asyncio模塊,提供異步編程的原語和工具
async def worker(lock, id): # 定義一個協程函數,接收一個鎖和一個標識符while True: # 無限循環,模擬持續工作的協程async with lock: # 使用async with語句獲取鎖,確保同一時間只有一個協程執行這部分代碼print(f"Worker {id} is working") # 打印當前協程正在工作的消息await asyncio.sleep(1) # 模擬I/O操作,掛起協程1秒鐘
async def main(): # 定義主協程函數,用于啟動和管理其他協程lock = asyncio.Lock() # 創建一個鎖,用于同步協程,防止它們同時執行某些代碼塊# 創建兩個協程,并將它們傳遞給asyncio.gather()函數# asyncio.gather()用于并發運行多個協程,并等待它們全部完成await asyncio.gather(worker(lock, 1), worker(lock, 2)) # 并發運行兩個worker協程
asyncio.run(main()) # 運行主協程,啟動事件循環并執行主協程
6.2、Semaphore
????????在協程中,可以使用Semaphore來控制對資源的訪問數量。Semaphore會管理一個 內部計數器,該計數器會隨每次 acquire調用遞減并隨每次 release調用遞增,計 數器的值永遠不會降到零以下。當 acquire發現其值為零時,它將保持阻塞直到有某 個任務調用了 release。
asyncio.Semaphore(value=1)?
value:value參數用來為內部計數器賦初始值,默認為1。如果給定的值小于0則會 拋出異常。?
其方法為:?
????????acquire():獲取一個信號量。如果內部計數器的值大于零,則將其減一并立即 返回True。如果其值為零,則會等待直到 release被調用。
????????release():釋放一個信號量,將內部計數器的值加一。可以喚醒一個正在等待 獲取信號量對象的任務。?
????????locked():如果信號量對象無法被立即獲取則返回True
建議使用async with來管理Semaphore。?
import asyncio
async def car(semaphore, car_id):"""模擬車輛進入停車場并離開的過程"""print(f"Car {car_id} 正在等待停車位")async with semaphore: # 獲取信號量,相當于獲取停車位print(f"Car {car_id} 進入停車場了.")await asyncio.sleep(2) # 模擬車輛在停車場內停留的時間print(f"Car {car_id} 離開停車場了")
# 信號量在退出async with塊時自動釋放,相當于車輛離開停車場
async def main():# 假設停車場只有3個停車位parking_spaces = asyncio.Semaphore(3)# 創建5個車輛協程cars = [car(parking_spaces, i) for i in range(1, 6)]# 并發運行所有車輛協程await asyncio.gather(*cars)
asyncio.run(main())
'''
Car 1 正在等待停車位
Car 1 進入停車場了.
Car 2 正在等待停車位
Car 2 進入停車場了.
Car 3 正在等待停車位
Car 3 進入停車場了.
Car 4 正在等待停車位
Car 5 正在等待停車位
Car 1 離開停車場了
Car 2 離開停車場了
Car 3 離開停車場了
Car 4 進入停車場了.
Car 5 進入停車場了.
Car 4 離開停車場了
Car 5 離開停車場了
'''
6.3、Event
????????在python中使用Event允許一個協程通知一個或多個協程某個事件已經發生。Event 對象會管理一個內部標志,可通過 set方法將其設為True并通過 設為False。 clear方法將其重 wait方法會阻塞直至該標志被設為True。該標志初始時會被設為 False。
asyncio.Event()?
其方法為:?
????????wait():協程等待直至事件被設置。如果事件已被設置,則立即返回True,否則將阻塞直至另一個任務調用set()
????????set():設置事件。所有等待事件被設置的任務將被立即喚醒。?
????????clear():清空(取消設置)事件。通過wait()進行等待的任務現在將會阻塞直至set()方法被再次調用。
????????is_set():如果事件已被設置則返回 True。
import asyncio
import random
async def producer(event, data):"""生產者協程,它在準備好數據后設置事件"""print(f"Producer is preparing data: {data}")time = random.uniform(0.5, 2)print(time)await asyncio.sleep(time) # 模擬數據準備時間print(f"Producer has prepared data: {data}")event.set() # 設置事件,表示數據已經準備好了print("Producer has notified the consumer.")
async def consumer(event):"""消費者協程,它在事件被設置后開始消費數據"""print("Consumer is waiting for data.")await event.wait() # 等待事件被設置print("Consumer has received the notification and is consuming data.")# 模擬數據處理await asyncio.sleep(random.uniform(0.5, 2))print("Consumer has finished consuming data.")
async def main():# 創建一個事件對象event = asyncio.Event()# 創建生產者和消費者協程producer_coro = producer(event, "data1")consumer_coro = consumer(event)# 并發運行生產者和消費者協程await asyncio.gather(producer_coro, consumer_coro)
asyncio.run(main())
'''
Producer is preparing data: data1
1.4196680751620212
Consumer is waiting for data.
Producer has prepared data: data1
Producer has notified the consumer.
Consumer has received the notification and is consuming data.
Consumer has finished consuming data.
'''
6.4、Condition
?????????在python中允許協程等待某些條件成立,然后被通知恢復執行。在本質上, Condition 對象合并了 Event和 Lock 的功能。 多個 Condition 對象有可能共享一個 Lock,這允許關注于共享資源的特定狀態的不同任務實現對共享資源的協同獨占訪 問。
asyncio.Condition(lock=None)?
????????lock:lock參數必須為自己創建的 Lock對象或None,在后一種情況下會自動創建?一個新的Lock對象。
其方法為:?
????????acquire():獲取下層的鎖。此方法會等待直至下層的鎖為 unlocked,將其設為 locked 并返回 True。
????????notify(n=1):喚醒最多 n 個正在等待此條件的任務(默認為 1 個)。如果沒 有任務正在等待則此方法為空操作。鎖必須在此方法被調用前被獲取并在隨后被 快速釋放。 如果通過一個 unlocked 鎖調用則會引發異常。
????????locked():如果下層的鎖已被獲取則返回 True。
????????notify_all():喚醒所有正在等待此條件的任務。此方法的行為類似于 notify,但會喚醒所有正在等待的任務。鎖必須在此方法被調用前被獲取并在 隨后被快速釋放。 如果通過一個 unlocked 鎖調用則會引發異常。
????????release():釋放下層的鎖。在未鎖定的鎖調用時,會引發異常。
????????wait():等待直至收到通知。當此方法被調用時如果調用方任務未獲得鎖,則 會引發異常。這個方法會釋放下層的鎖,然后保持阻塞直到被 notify()或 notify_all()調用所喚醒。 一旦被喚醒,Condition 會重新獲取它的鎖并且此 方法將返回 True。
????????wait_for(predicate):等待直到目標值變為 true。目標必須為一個可調用對 象,其結果將被解讀為一個布爾值。?
建議使用async with來管理Condition。?
import asyncio
# 生產者函數,負責通知所有等待的消費者
async def producer(condition):while True: # 無限循環,模擬持續的生產活動await condition.acquire() # 獲取條件變量的鎖condition.notify_all() # 通知所有等待的消費者condition.release() # 釋放條件變量的鎖await asyncio.sleep(1) # 暫停一秒,模擬生產活動的時間間隔
# 消費者函數,負責等待生產者的通知
async def consumer(condition, number):while True: # 無限循環,模擬持續的消費活動await condition.acquire() # 獲取條件變量的鎖print(f'{number}正在等待condition') # 打印消費者正在等待的通知await condition.wait() # 等待生產者的通知print(f'{number}已釋放condition') # 打印消費者收到通知后的消息condition.release() # 釋放條件變量的鎖
# 主函數,負責啟動生產者和消費者任務
async def main():condition = asyncio.Condition() # 創建一個條件變量# 創建任務列表,包括一個生產者和多個消費者tasks = [asyncio.create_task(producer(condition)), # 創建生產者任務asyncio.create_task(consumer(condition, 1)), # 創建消費者任務,編號為1asyncio.create_task(consumer(condition, 2)), # 創建消費者任務,編號為2asyncio.create_task(consumer(condition, 3)), # 創建消費者任務,編號為3asyncio.create_task(consumer(condition, 4)), # 創建消費者任務,編號為4asyncio.create_task(consumer(condition, 5)), # 創建消費者任務,編號為5]# 等待所有任務完成,由于生產者是無限循環,這里實際上會無限等待await asyncio.wait(tasks)
# 運行主函數,啟動事件循環
asyncio.run(main())
七、將協程分布到線程池/進程池中
????????一般情況下,程序的異步開發要么使用協程,要么使用進程池或線程池,但是也會碰 到有一些情況需要既使用協程又使用進程池或線程池,而進程池、線程池 submit后 返回的 Future和協程的 Future又不是一回事,不能直接使用await,因此就需要進 行一個對象的轉換。
????????在Python中,可以通過 asyncio.wrap_future()來將一個 concurrent.futures.Future轉化為asyncio.Future,這樣就可以去使用協程的相關內容了。
import asyncio
import concurrent.futures
import time
# 這是一個普通函數
def func1():time.sleep(5)print('in func1')
# 這是一個普通函數
def func2():time.sleep(3)print('in func2')
async def main():
# 創建一個進程池with concurrent.futures.ProcessPoolExecutor() as pool:# 使用進程池提交任務future1 = pool.submit(func1)future2 = pool.submit(func2)# 將 concurrent.futures.Future 轉換為 asyncio.Futureasync_future1 = asyncio.wrap_future(future1)async_future2 = asyncio.wrap_future(future2)# 使用 asyncio 的 await 等待結果result = await asyncio.gather(async_future1,async_future2)print(f"The result is {result}")
# 注意:進程就需要放到主模塊中去執行
if __name__ == '__main__':asyncio.run(main())
'''
in func2
in func1
The result is [None, None]
'''
?使用 loop.run_in_executor()直接轉換
????????使用 asyncio.get_running_loop()時,如果當前沒有正在運行的事件循環,就拋 出異常。而上面的 asyncio.get_event_loop()則是在當前沒有正在運行的事件循 環的基礎上,會創建一個新的事件循環。相對來說, asyncio.get_running_loop()更適合在協程或異步函數內部使用, asyncio.get_event_loop()適用于更廣泛的情況。?
loop.run_in_executor(executor, func, *args):?
????????executor:一個執行器對象,通常是 concurrent.futures.ThreadPoolExecutor 或 concurrent.futures.ProcessPoolExecutor 的實例。它管理同步函數的執 行,如果不指定就默認創建一個線程池。
????????func:要執行的同步函數。
????????*args:傳遞給 func的位置參數。?
import asyncio
import time# 示例同步函數,模擬耗時操作
def slow_function1():# 打印信息,表示函數開始執行print("Function 1 is running")# 模擬耗時操作,線程睡眠2秒time.sleep(2)# 打印信息,表示函數執行完畢print("Function 1 is done")# 返回函數執行結束的信息return 'func1 end'def slow_function2():# 打印信息,表示函數開始執行print("Function 2 is running")# 模擬耗時操作,線程睡眠2秒time.sleep(2)# 打印信息,表示函數執行完畢print("Function 2 is done")# 返回函數執行結束的信息return 'func2 end'async def main():# 獲取當前正在運行的事件循環loop = asyncio.get_running_loop()print('before run')# 使用線程池執行器并發運行兩個同步函數# run_in_executor的第一個參數為None,表示使用默認的線程池執行器task1 = loop.run_in_executor(None, slow_function1)task2 = loop.run_in_executor(None, slow_function2)print('after run')# 等待兩個函數執行完成,并獲取它們的返回值result1 = await task1result2 = await task2print('after await')# 打印兩個函數的執行結果print(f"Result of function 1: {result1}")print(f"Result of function 2: {result2}")if __name__ == '__main__':# 記錄程序開始執行的時間start = time.time()# 運行主函數asyncio.run(main())# asyncio.get_running_loop()# 打印程序執行的總時間print('total_time', time.time() - start)
'''
before run
Function 1 is running
Function 2 is running
after run
Function 1 is done
Function 2 is done
after await
Result of function 1: func1 end
Result of function 2: func2 end
total_time 2.0069408416748047
'''