前言
在現代編程中,異步編程已成為提高程序效率和性能的重要方式。
Python 作為一種流行的編程語言,自然也提供了強大的異步編程支持。
本文將詳細介紹 Python 中的協程,以及 async def、async for、await 和 yield 等關鍵字的使用。
協程簡介
協程是一種比傳統函數更高級的控制結構。
它們在一個過程中暫停,然后在另一個地方恢復執行。
協程可以在程序的多個點之間切換,從而實現并發執行,而無需多線程或多進程的開銷。
協程 vs 線程
與線程不同,協程由程序員手動控制其切換。
線程在操作系統級別進行調度,可能導致頻繁的上下文切換開銷。
協程則由 Python 解釋器調度,開銷更低,且不會發生競爭資源的問題。
一、async def 和 await
在
Python 3.5
及之后版本中,引入了async
和await
關鍵字,使得定義和調用協程變得更為簡潔和直觀。
async def
async def 用于定義一個協程函數。
與普通函數不同,協程函數在調用時不會立即執行,而是返回一個協程對象,直到被 await 調用時才會運行。
import asyncioasync def my_coroutine():print("Hello")await asyncio.sleep(1)print("World")# 調用協程函數
coroutine = my_coroutine()
await
await 用于暫停協程的執行,等待另一個協程完成,并獲取其結果。
await 后面必須跟隨一個可等待對象。
如協程、Future對象或其他實現了 __await__方法的對象。
async def main():print("Start")await my_coroutine()print("End")# 運行主協程
# asyncio.run(main())
- 在上面的示例中,
await my_coroutine()
會暫停main
的執行,直到my_coroutine
運行結束。 asyncio.run
這個函數是Python 3.7
之后才有的特性。- 可以讓
Python
的協程接口變得非常簡單,一個好的編程規范是,asyncio.run(main())
作為主程序的入口函數,在程序運行周期內,只調用一次asyncio.run
。
二、async for 和 async with
Python 3.6 引入了 async for 和 async with,使得異步迭代和上下文管理變得更加方便。
async for
async for
用于異步迭代可等待對象的異步迭代器。它的工作方式類似于普通的
for
循環,但可以在異步環境中使用。
class AsyncIterator:def __init__(self):self.count = 0async def __aiter__(self):return selfasync def __anext__(self):if self.count < 5:self.count += 1return self.countelse:raise StopAsyncIterationasync def async_for_example():async for number in AsyncIterator():print(number)asyncio.run(async_for_example())
async with
async with
用于異步上下文管理器。它的作用與
with
語句類似,但適用于異步環境,確保在異步操作前后執行特定的設置和清理操作。
class AsyncContextManager:async def __aenter__(self):print("Enter context")return selfasync def __aexit__(self, exc_type, exc, tb):print("Exit context")async def async_with_example():async with AsyncContextManager() as manager:print("Inside context")asyncio.run(async_with_example())
三、yield 和 yield from
yield 和 yield from 是生成器相關的關鍵字。
但它們也可以用于協程中,尤其是在生成器協程(Python 3.3之前的異步實現)中。
yield
yield
用于定義生成器函數。生成器函數在每次
yield
語句處暫停,并在下次調用next()
方法時繼續執行。
def simple_generator():yield 1yield 2yield 3for value in simple_generator():print(value)
yield from
yield from 用于委派生成器,允許一個生成器將部分操作委托給另一個生成器。
def generator1():yield 1yield 2def generator2():yield from generator1()yield 3for value in generator2():print(value)
在異步編程中,yield 和 yield from 也可以用于異步生成器和異步迭代器。
四、create_task 和 gather
asyncio.create_task 和 asyncio.gather 是兩個重要的工具,用于并發運行多個協程。
asyncio.create_task
asyncio.create_task
用于將協程包裝成任務,使其能夠在事件循環中并發運行。
import asyncioasync def task1():await asyncio.sleep(1)print("Task 1 completed")async def task2():await asyncio.sleep(2)print("Task 2 completed")async def main():task1_task = asyncio.create_task(task1())task2_task = asyncio.create_task(task2())# 等待所有任務完成await task1_taskawait task2_taskasyncio.run(main())
在這個示例中,我們創建了兩個任務 task1
和 task2
。
并通過 asyncio.create_task
將它們包裝成可并發運行的任務。
然后,我們使用 await
等待所有任務完成。
asyncio.gather
asyncio.gather
用于并行運行多個協程,并收集它們的結果。它比
create_task
更加方便,尤其是當我們需要同時運行多個任務并獲取它們的結果時。
import asyncioasync def task1():await asyncio.sleep(1)print("Task 1 completed")return "Result 1"async def task2():await asyncio.sleep(2)print("Task 2 completed")return "Result 2"async def main():results = await asyncio.gather(task1(), task2())print(results)asyncio.run(main())
在這個示例中,asyncio.gather
并行運行 task1
和 task2
。
并在所有任務完成后返回一個包含結果的列表,這樣我們可以更方便地管理和處理多個協程任務。
異常處理與取消任務
在實際應用中,協程可能會拋出異常,或者需要在執行過程中取消某些任務。
我們可以通過
asyncio.gather
的return_exceptions
參數來收集異常,同時也可以使用cancel
方法來取消任務。
import asyncioasync def worker_1():await asyncio.sleep(1)return 1async def worker_2():await asyncio.sleep(2)return 2 / 0async def worker_3():await asyncio.sleep(3)return 3async def main():task_1 = asyncio.create_task(worker_1())task_2 = asyncio.create_task(worker_2())task_3 = asyncio.create_task(worker_3())await asyncio.sleep(2)task_3.cancel()res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)print(res)# 輸出: [1, ZeroDivisionError('division by zero'), CancelledError()]
asyncio.run(main())
- 在這個示例中,我們創建了三個任務
worker_1、worker_2 和 worker_3
。 - 其中
worker_2
會拋出一個除零異常
,而worker_3
會在執行過程中被取消。 - 我們使用
asyncio.gather
來收集所有任務的結果,并通過return_exceptions=True
參數捕獲所有異常。 - 最終的輸出包含了正常完成任務的結果、拋出的異常以及取消任務的狀態。
通過 asyncio.create_task
和 asyncio.gather
,我們可以有效地并行運行多個協程任務,極大地提高程序的并發性能。
這在處理大量I/O操作或需要同時執行多個獨立任務的場景中尤為重要。
五、并發度控制asyncio.Semaphore
asyncio.Semaphore
是Python
的asyncio
模塊中的一個重要工具,用于控制并發任務的數量。它在處理大量并發操作時尤為重要,尤其是在需要限制同時運行的任務數量以避免過載或超出限制的場景中。
asyncio.Semaphore
是一種異步互斥量
,允許在同一時間內有固定數量
的任務訪問某個資源。它可以幫助你在異步編程中控制并發級別,
防止系統過載或超出外部服務的限制。
工作原理
1. 初始化:
semaphore = asyncio.Semaphore(value)
value
表示信號量的初始值,也即允許同時運行的任務數量。- 默認值是
1
,表示互斥量(類似于鎖)。
2. 獲取信號量:
async with semaphore:# 受控的代碼塊
async with semaphore
是異步上下文管理器,獲取信號量(即允許繼續執行的許可證)并進入受控的代碼塊。- 當代碼塊執行完畢時,信號量會自動釋放,使其他任務能夠繼續執行。
3. 釋放信號量:
- 在
async with
語句塊結束時,信號量會自動釋放。 - 這確保了每個獲取信號量的操作都有一個匹配的釋放操作。
使用場景
- 控制并發任務數量:
使用 asyncio.Semaphore
來限制同時進行的任務數量。例如,當處理大量網絡請求時,控制并發度可以防止超出 API 的速率限制或避免過載。
- 避免資源爭用:
當多個任務訪問共享資源時,信號量可以確保資源訪問的有序性和一致性,避免資源爭用問題。
示例
以下是一個簡單的示例,展示了如何使用 asyncio.Semaphore
來限制同時運行的任務數量:
import asyncioasync def worker(semaphore, worker_id):async with semaphore:print(f"Worker {worker_id} is working")await asyncio.sleep(1)print(f"Worker {worker_id} has finished")async def main():semaphore = asyncio.Semaphore(3) # Limit concurrency to 3tasks = [worker(semaphore, i) for i in range(10)]await asyncio.gather(*tasks)if __name__ == "__main__":asyncio.run(main())
在這個例子中,Semaphore(3)
限制了最多同時運行 3
個 worker
任務。
當有更多任務時,它們必須等待直到有信號量可用。
因此上述代碼的運行日志為:
Worker 0 is working
Worker 1 is working
Worker 2 is working
Worker 0 has finished
Worker 1 has finished
Worker 2 has finished
Worker 3 is working
Worker 4 is working
Worker 5 is working
Worker 3 has finished
Worker 4 has finished
Worker 5 has finished
Worker 6 is working
Worker 7 is working
Worker 8 is working
Worker 6 has finished
Worker 7 has finished
Worker 8 has finished
Worker 9 is working
Worker 9 has finished
總結
asyncio.Semaphore
是控制異步操作并發度的一個強大工具,它能夠有效管理任務并發,避免超載和資源爭用。理解和正確使用信號量可以幫助你在異步編程中實現更高效、更可靠的代碼。
六、協程與生成器的關系
協程與生成器有很多相似之處,都能夠在函數執行過程中暫停并恢復,但它們的設計目的和使用場景有所不同。
相似之處
暫停與恢復:兩者都可以在執行過程中暫停,并在之后恢復。
關鍵字:協程使用
await
暫停執行,生成器使用yield
暫停執行。
不同之處
生成器:主要用于生成一系列值,常用于迭代。
協程:主要用于處理異步操作,管理并發任務。
生成器:使用
yield
關鍵字。協程:使用
async def
定義,await
關鍵字用于暫停。
控制流:
生成器:由調用方(迭代器)控制。
協程:由事件循環控制。
結合使用
在某些情況下,可以結合使用生成器和協程。
例如,在異步生成器中使用
yield
生成值,并使用await
等待異步操作完成。
async def async_generator():for i in range(5):await asyncio.sleep(1)yield iasync def main():async for value in async_generator():print(value)asyncio.run(main())
在這個示例中,我們定義了一個異步生成器函數 async_generator
,它每秒生成一個值,并在主協程中異步迭代這些值。
七、實際應用場景
異步編程在實際中有廣泛的應用,尤其是在處理I/O密集型任務時,如網絡請求、文件操作等。
通過異步編程,可以在等待I/O操作時執行其他任務,從而提高程序的并發性能。
異步網絡請求
import aiohttpasync def fetch(session, url):async with session.get(url) as response:return await response.text()async def main():async with aiohttp.ClientSession() as session:html = await fetch(session, 'http://example.com')print(html)asyncio.run(main())
在這個示例中,我們使用 aiohttp
庫進行異步網絡請求,大大提高了效率。
異步文件操作
import aiofilesasync def read_file(filename):async with aiofiles.open(filename, 'r') as f:contents = await f.read()print(contents)asyncio.run(read_file('example.txt'))
通過 aiofiles
庫,我們可以實現異步的文件讀寫操作,提高文件I/O操作的性能。
異步大模型流式服務
在大模型(如GPT-4)
相關的應用中,流式服務是一種常見的需求。
通過異步編程,可以實現高效的流式數據處理,提高服務響應速度。
import asyncioasync def stream_handler(reader, writer):while True:data = await reader.read(100)if not data:breakprint(f"Received: {data.decode()}")response = f"Echo: {data.decode()}"writer.write(response.encode())await writer.drain()writer.close()await writer.wait_closed()async def main():server = await asyncio.start_server(stream_handler, '127.0.0.1', 8888)async with server:await server.serve_forever()asyncio.run(main())
在這個示例中,我們使用 asyncio
庫創建了一個簡單的流式服務。
客戶端發送的數據會被接收并立即返回給客戶端,實現了基本的流式處理功能。
八、總結
Python
的協程和異步編程為開發高效的并發程序提供了強大的工具。通過
async def、await、async for 和 async with 等關鍵字
,我們可以編寫簡潔、易讀的異步代碼。理解和熟練應用這些關鍵字,將大大提高你的編程效率和程序性能。