在現代軟件開發中,性能是一個永恒的話題。特別是在處理網絡請求、文件讀寫等 I/O 密集型任務時,傳統的同步編程模型可能會因為等待而浪費大量時間。為了解決這個問題,異步編程應運而生。Python 通過內置的 asyncio
庫,為開發者提供了強大而優雅的異步編程能力。 [1][2]
本文將帶你從零開始,逐步深入 asyncio
的世界,理解其核心概念,并最終通過實戰案例掌握其用法。
1. 什么是異步編程?為什么要用它?
想象一下你在廚房做飯,需要同時燒水、切菜和炒菜。
- 同步 (Synchronous):你先把水壺放到灶上,然后就一直盯著它,直到水燒開。之后,你再去切菜,切完所有菜后,最后才開始炒菜。在這個過程中,當你在等待水燒開時,你什么也做不了,時間被白白浪費。
- 異步 (Asynchronous):你把水壺放到灶上后,就不管它了,直接去切菜。切菜的間隙,你抽空看一眼水開了沒。水一開,你就去處理。這樣,等待水燒開的時間被你用來切菜,整個做飯的效率大大提高。 [3]
代碼的世界也是如此。同步編程就是一次只做一件事,必須等前一件事(比如一次網絡請求)完成后才能做下一件。 [1] 而異步編程允許程序在等待一個耗時操作(通常是 I/O 操作)時,切換去執行其他任務,從而提高整體效率。 [1][3]
asyncio
正是 Python 用于實現這種高效工作模式的標準庫,它特別適合 I/O 密集型和高層級的網絡代碼。 [3]
2. asyncio
的核心基石:async
/await
與協程
要使用 asyncio
,首先需要理解幾個關鍵概念。
協程 (Coroutine)
在 Python 中,使用 async def
關鍵字定義的函數,我們稱之為協程函數。調用它并不會立即執行函數體,而是會返回一個協程對象。 [4] 協程可以被看作是一種可以暫停和恢復執行的特殊函數。 [1]
await
這個關鍵字只能在 async def
函數內部使用。它的作用是“等待”一個可等待對象 (Awaitable) 執行完成。 [5] 可等待對象包括協程、任務 (Task) 和 Future 對象。 [5] 當程序執行到 await
時,它會告訴事件循環:“這個操作有點耗時,我先在這里暫停,你可以去忙別的,等我好了再回來繼續。” [6]
事件循環 (Event Loop)
事件循環是 asyncio
的心臟。 [2][6] 你可以把它想象成一個大管家,負責調度和執行所有的異步任務。它會不斷檢查是否有任務已經準備好可以繼續運行,或者是否有新的任務需要開始。 [2]
asyncio.run()
這是啟動異步程序的入口。它會創建一個新的事件循環,運行你傳入的頂級協程(通常是 main
函數),并在協程執行完畢后關閉事件循環。 [7][8]
3. 牛刀小試:你的第一個 asyncio
程序
讓我們來看一個最簡單的例子。
import asyncio
import time# 使用 async def 定義一個協程函數
async def say_hello(delay, message):"""一個簡單的協程,會延遲指定秒數后打印消息。"""print(f"[{time.strftime('%X')}] 開始任務: {message}")# asyncio.sleep 是一個異步的 time.sleep()# 當遇到 await asyncio.sleep() 時,事件循環會切換到其他任務await asyncio.sleep(delay)print(f"[{time.strftime('%X')}] 完成任務: {message}")# 定義主入口協程
async def main():print(f"[{time.strftime('%X')}] 程序開始")# 直接 await 調用協程await say_hello(2, "你好")await say_hello(1, "世界")print(f"[{time.strftime('%X')}] 程序結束")# 使用 asyncio.run() 啟動程序
if __name__ == "__main__":asyncio.run(main())
運行結果分析:
[13:30:00] 程序開始
[13:30:00] 開始任務: 你好
[13:30:02] 完成任務: 你好
[13:30:02] 開始任務: 世界
[13:30:03] 完成任務: 世界
[13:30:03] 程序結束
你會發現,這段代碼雖然是異步的,但執行順序和同步代碼一樣,總共耗時 3 秒。這是因為我們依次 await
了兩個協程,必須等第一個完成后,第二個才會開始。
那么,如何讓它們“同時”運行呢?
4. 并發執行:asyncio.gather
與 asyncio.create_task
為了真正實現并發,我們需要讓多個任務在事件循環中同時被調度。 [3]
asyncio.gather
asyncio.gather()
可以接收一個或多個可等待對象,將它們并發執行,并按輸入順序返回所有結果。 [9]
修改上面的 main
函數:
async def main():print(f"[{time.strftime('%X')}] 程序開始")# 使用 asyncio.gather 并發運行兩個協程await asyncio.gather(say_hello(2, "你好"),say_hello(1, "世界"))print(f"[{time.strftime('%X')}] 程序結束")# ... 其他代碼不變 ...
新的運行結果:
[13:32:10] 程序開始
[13:32:10] 開始任務: 你好
[13:32:10] 開始任務: 世界
[13:32:11] 完成任務: 世界
[13:32:12] 完成任務: 你好
[13:32:12] 程序結束
觀察時間戳,兩個任務幾乎是同時開始的。耗時1秒的任務先結束,耗時2秒的后結束。整個程序的總耗時取決于最長的那個任務,也就是 2 秒,而不是之前的 3 秒。這就是并發帶來的效率提升! [3]
asyncio.create_task
asyncio.create_task()
用于將一個協程包裝成一個任務 (Task),并提交給事件循環立即開始執行,而不需要馬上 await
它。 [6][7] Task
是 Future
的一個子類,專門用于管理協程。 [4]
這就像是“發射后不管”(fire-and-forget),你創建了一個任務讓它在后臺運行,然后可以繼續做其他事情。 [6]
async def main():print(f"[{time.strftime('%X')}] 程序開始")# 創建任務,任務會立即開始在事件循環中被調度task1 = asyncio.create_task(say_hello(2, "你好"))task2 = asyncio.create_task(say_hello(1, "世界"))print(f"[{time.strftime('%X')}] 任務已創建")# 在這里可以做其他事情await asyncio.sleep(0.5)print(f"[{time.strftime('%X')}] 主程序做了一些其他工作")# 等待任務完成await task1await task2print(f"[{time.strftime('%X')}] 程序結束")
運行結果:
[13:35:20] 程序開始
[13:35:20] 任務已創建
[13:35:20] 開始任務: 你好
[13:35:20] 開始任務: 世界
[13:35:20] 主程序做了一些其他工作
[13:35:21] 完成任務: 世界
[13:35:22] 完成任務: 你好
[13:35:22] 程序結束
create_task
和 gather
的區別在于控制的粒度。gather
是一種更高級的抽象,適合一次性并發運行多個任務并收集結果的場景。 [9] create_task
則提供了更靈活的控制,允許你在任務運行期間執行其他邏輯。 [6][9]
5. 實戰演練:使用 aiohttp
并發下載網頁
理論講了這么多,讓我們來看一個最能體現 asyncio
價值的場景:并發網絡請求。我們將使用流行的異步 HTTP 客戶端庫 aiohttp
。 [10][11]
首先,你需要安裝 aiohttp
:
pip install aiohttp
下面的例子將對比同步和異步方式獲取多個網頁標題所花費的時間。
import asyncio
import time
import aiohttp
import requests # 用于同步對比urls = ['https://www.python.org','https://github.com','https://www.wikipedia.org','https://www.youtube.com','https://www.amazon.com',
]def get_title_sync(url):"""同步獲取網頁標題"""try:resp = requests.get(url, timeout=10)# 一個簡單的解析,實際應用中建議使用 BeautifulSoupreturn resp.text.split('<title>')[1].split('</title>')[0].strip()except Exception as e:return f"Error: {e}"async def get_title_async(session, url):"""異步獲取網頁標題"""try:# aiohttp 使用 session.get() 發起請求async with session.get(url, timeout=10) as resp:# resp.text() 是一個協程,需要 awaithtml = await resp.text()return html.split('<title>')[1].split('</title>')[0].strip()except Exception as e:return f"Error: {e}"async def main_async():# aiohttp 建議使用一個 ClientSession 來執行所有請求async with aiohttp.ClientSession() as session:tasks = [get_title_async(session, url) for url in urls]# 使用 gather 并發執行所有任務titles = await asyncio.gather(*tasks)for url, title in zip(urls, titles):print(f"{url}: {title}")if __name__ == "__main__":# --- 同步版本 ---print("--- 開始同步請求 ---")start_time_sync = time.time()for url in urls:title = get_title_sync(url)print(f"{url}: {title}")end_time_sync = time.time()print(f"同步請求總耗時: {end_time_sync - start_time_sync:.2f} 秒\n")# --- 異步版本 ---print("--- 開始異步請求 ---")start_time_async = time.time()asyncio.run(main_async())end_time_async = time.time()print(f"異步請求總耗時: {end_time_async - start_time_async:.2f} 秒")
典型的運行結果:
--- 開始同步請求 ---
https://www.python.org: Welcome to Python.org
https://github.com: GitHub: Let’s build from here
https://www.wikipedia.org: Wikipedia
https://www.youtube.com: YouTube
https://www.amazon.com: Amazon.com. Spend less. Smile more.
同步請求總耗時: 4.58 秒--- 開始異步請求 ---
https://www.python.org: Welcome to Python.org
https://github.com: GitHub: Let’s build from here
https://www.wikipedia.org: Wikipedia
https://www.youtube.com: YouTube
https://www.amazon.com: Amazon.com. Spend less. Smile more.
異步請求總耗時: 0.95 秒
結果一目了然。異步版本的速度比同步版本快了數倍。 [5] 這是因為 asyncio
在等待一個網站響應時,沒有閑著,而是立即去請求下一個網站,極大地利用了網絡 I/O 的等待時間。 [11]
6. 進階:協程間的同步與通信
當我們有多個協程并發運行時,有時它們需要訪問同一個資源,或者需要相互傳遞工作任務。這時,為了避免數據混亂和協調工作流程,就需要用到同步和通信機制。asyncio
提供了與多線程編程中類似的工具,但它們是為協程專門設計的。
6.1 資源保護:asyncio.Lock
在并發環境中,如果多個任務同時嘗試修改一個共享資源(例如一個變量或文件),就可能導致競爭條件 (Race Condition),使得最終結果不可預測。
雖然 asyncio
在單線程上運行,不會有真正的并行執行,但一個協程可以在 await
處被掛起,此時事件循環會運行另一個協程。如果這兩個協程都在修改同一個數據,問題依然存在。
asyncio.Lock
就是用來解決這個問題的。它保證在任何時候,只有一個協程能夠獲得鎖并執行“臨界區”代碼。
使用場景:保護對共享資源的訪問,確保操作的原子性。
讓我們看一個例子:多個協程同時增加一個共享計數器。
import asyncio# 一個共享的資源
shared_counter = 0async def unsafe_worker():"""一個沒有鎖保護的協程"""global shared_counter# 1. 讀取當前值current_value = shared_counter# 在這里,協程可能會被掛起,切換到另一個 workerawait asyncio.sleep(0.01) # 2. 基于舊值計算新值new_value = current_value + 1# 3. 寫入新值shared_counter = new_valueasync def safe_worker(lock):"""一個有鎖保護的協程"""global shared_counter# 使用 async with lock 語法可以自動獲取和釋放鎖async with lock:current_value = shared_counterawait asyncio.sleep(0.01)new_value = current_value + 1shared_counter = new_valueasync def main():global shared_counter# --- 演示不安全的情況 ---print("--- 演示不安全的情況 ---")shared_counter = 0tasks_unsafe = [unsafe_worker() for _ in range(100)]await asyncio.gather(*tasks_unsafe)print(f"沒有鎖保護,100個任務完成后的計數器值: {shared_counter}") # 結果通常遠小于100# --- 演示安全的情況 ---print("\n--- 演示安全的情況 ---")shared_counter = 0lock = asyncio.Lock()tasks_safe = [safe_worker(lock) for _ in range(100)]await asyncio.gather(*tasks_safe)print(f"使用鎖保護,100個任務完成后的計數器值: {shared_counter}") # 結果總是100if __name__ == "__main__":asyncio.run(main())
代碼解讀與結果分析:
unsafe_worker
: 在讀取 (current_value = ...
) 和寫入 (shared_counter = ...
) 之間有一個await
。這給了事件循環切換到另一個unsafe_worker
的機會。多個 worker 可能會基于同一個舊值進行計算,導致一些增加操作丟失。因此,最終結果會小于 100。safe_worker
: 使用了async with lock:
。當一個協程進入這個代碼塊時,它會獲取鎖。如果此時其他協程也想進入,它們必須await
,直到第一個協程執行完畢并自動釋放鎖。這確保了“讀-改-寫”這個操作的完整性,所以最終結果總是正確的 100。
6.2 任務分發:asyncio.Queue
asyncio.Queue
是一個為異步編程設計的隊列,它非常適合經典的生產者-消費者 (Producer-Consumer) 模型。
- 生產者 (Producer):創建任務或數據,并將其放入隊列。
- 消費者 (Consumer):從隊列中取出任務或數據,并進行處理。
隊列本身處理了所有的同步邏輯:
- 如果消費者試圖從空隊列中獲取 (
get
) 數據,它會自動await
,直到隊列中有新數據。 - 如果生產者試圖向一個已滿的隊列(如果創建時指定了
maxsize
)中放入 (put
) 數據,它會自動await
,直到隊列有空位。
使用場景:解耦任務的創建和執行,實現任務分發系統,控制并發處理任務的數量。
讓我們構建一個簡單的爬蟲模型:一個生產者負責發現 URL 并放入隊列,多個消費者負責從隊列中取出 URL 并“下載”。
import asyncio
import randomasync def producer(queue, num_urls):"""生產者:生成一些模擬的URL并放入隊列"""print("生產者啟動...")for i in range(num_urls):url = f"https://example.com/page/{i}"# 模擬發現URL需要一些時間await asyncio.sleep(random.uniform(0.1, 0.5))# 將URL放入隊列await queue.put(url)print(f"生產者放入: {url}")print("生產者完成任務。")async def consumer(name, queue):"""消費者:從隊列中獲取URL并處理"""print(f"消費者 {name} 啟動...")# 持續從隊列中獲取任務while True:# 從隊列中獲取URL,如果隊列為空,會在此處等待url = await queue.get()print(f"消費者 {name} 正在處理: {url}")# 模擬處理任務需要的時間await asyncio.sleep(random.uniform(0.5, 1.5))print(f"消費者 {name} 完成處理: {url}")# 必須調用 task_done() 來通知隊列這個任務已經處理完畢queue.task_done()async def main():# 創建一個不限大小的隊列task_queue = asyncio.Queue()num_urls_to_produce = 10num_consumers = 3# 啟動生產者producer_task = asyncio.create_task(producer(task_queue, num_urls_to_produce))# 啟動多個消費者consumer_tasks = []for i in range(num_consumers):task = asyncio.create_task(consumer(f"C{i+1}", task_queue))consumer_tasks.append(task)# 等待生產者完成所有URL的放入await producer_taskprint("所有URL已放入隊列,等待消費者處理...")# 等待隊列中的所有任務都被處理完畢# queue.join() 會阻塞,直到隊列中每個項目的 task_done() 都被調用await task_queue.join()print("所有任務處理完畢!")# 所有任務都處理完了,消費者們還在 while True 循環里等待新任務# 為了讓程序能正常退出,我們需要取消這些消費者任務for task in consumer_tasks:task.cancel()if __name__ == "__main__":asyncio.run(main())
代碼解讀與關鍵點:
queue.put(item)
: 生產者使用它來異步地添加項目。queue.get()
: 消費者使用它來異步地獲取項目。這是主要的同步點。queue.task_done()
: 這是至關重要的一步!消費者處理完一個項目后,必須調用此方法。它會減少隊列的內部計數器。queue.join()
:main
函數用它來等待所有項目都被處理。它會一直阻塞,直到隊列的內部計數器歸零。這確保了我們在程序結束前,所有工作都已完成。- 任務取消: 因為消費者通常在一個無限循環中工作,當所有工作完成后,我們需要顯式地取消它們,否則
asyncio.run(main())
將永遠不會退出。
7. 總結
asyncio
為 Python 帶來了強大的并發能力,是構建高性能網絡應用和服務的利器。
核心要點回顧:
- 適用場景:I/O 密集型任務(如網絡爬蟲、Web 服務器、數據庫連接等)。
- 核心語法:
async def
定義協程,await
暫停協程并等待結果。 - 啟動方式:
asyncio.run()
是現代 Python 中啟動異步程序的標準方式。 - 并發執行:使用
asyncio.gather()
或asyncio.create_task()
來并發運行多個任務。 - 同步與通信:使用
asyncio.Lock
保護共享資源,避免競爭條件;使用asyncio.Queue
構建生產者-消費者模型,高效地分發和處理任務。 - 生態系統:需要配合
aiohttp
,aiodns
,asyncpg
等異步庫才能發揮最大威力。
從 Python 3.4 首次引入 asyncio
至今,它已經變得越來越成熟和易用。雖然異步編程的思維方式需要一些時間來適應,但一旦你掌握了它,它將成為你工具箱中應對高并發挑戰的一把“瑞士軍刀”。希望這篇博客能為你打開異步編程的大門。
參考文章
- asyncio 教程- 什么是異步? - Graia 官方文檔
- Python asyncio 模塊 - 菜鳥教程
- Asyncio in Python: A Comprehensive Guide with Examples. | by Obafemi - Medium
- 使用asyncio - python并發編程-中文版
- Python asyncio 從不會到上路 - MyApollo
- Solve Common Asynchronous Scenarios With Python’s “asyncio” - Better Programming
- Coroutines and Tasks — Python 3.13.5 documentation
- 使用asyncio - Python教程- 廖雪峰的官方網站
- Is it more efficient to use create_task(), or gather()? - Stack Overflow
- python asyncio 異步I/O - 實現并發http請求(asyncio + aiohttp) - yuminhu - 博客園
- python asyncio 異步I/O - 實現并發http請求(asyncio + aiohttp) - 上海-悠悠- 博客園
- asyncio教程原創 - CSDN博客