一、A Bit of Jargon
1、關鍵術語解析
1.1 并發 (Concurrency)
定義:
并發是指同時處理多個待處理任務的能力,這些任務可以依次或并行(如果可能)進行,最終每個任務都會成功或失敗。
理解:
- 單核 CPU: 即使是單核 CPU 也可以實現并發,因為操作系統調度器會交錯執行多個任務,給人一種同時進行的錯覺。
- 多任務處理: 并發也常被稱為多任務處理。
舉例:
想象你在廚房里同時做三道菜:炒菜、煮湯和烤面包。你不會同時做所有事情,而是會交替進行,例如先炒一會兒菜,然后去攪拌湯,再去查看面包的狀態。這種交替進行的方式就是并發。
1.2 并行 (Parallelism)
定義:
并行是指同時執行多個計算任務的能力。
理解:
- 硬件要求: 并行需要多核 CPU、多個 CPU、GPU 或集群中的多臺計算機。
- 與并發的區別: 并發強調的是任務處理的交替進行,而并行則是真正的同時進行。
舉例:
回到廚房的例子,如果你有多個爐灶和幫手,你們可以同時進行炒菜、煮湯和烤面包,這就是并行。
1.3 執行單元 (Execution Unit)
定義:
執行單元是執行代碼的并發對象,每個執行單元都有獨立的執行狀態和調用棧。
Python 原生支持的執行單元:
- 進程 (Process): 獨立的內存空間,隔離性強,通信成本高。
- 線程 (Thread): 共享內存空間,通信方便,但需要處理數據競爭問題。
- 協程 (Coroutine): 在單個線程內運行,由事件循環調度,資源消耗低,但需要協作式調度。
1.4 進程 (Process)
定義:
進程是正在運行的計算機程序的實例,使用內存和 CPU 時間片。
特點:
- 隔離性: 每個進程擁有獨立的內存空間,彼此隔離。
- 通信方式: 進程間通過管道、套接字或內存映射文件進行通信,這些方式只能傳輸原始字節,因此 Python 對象需要序列化。
- 資源消耗: 進程比線程消耗更多資源。
- 搶占式多任務: 操作系統調度器會定期搶占(暫停)正在運行的進程,讓其他進程運行。這意味著一個凍結的進程不會凍結整個系統(理論上)。
舉例:
在 Python 中,可以使用 multiprocessing
或 concurrent.futures
庫啟動額外的 Python 進程。
1.5 線程 (Thread)
定義:
線程是進程內的執行單元。
特點:
- 共享內存: 同一進程內的線程共享相同的內存空間,這使得線程間數據共享變得容易,但也可能導致數據損壞(當多個線程同時更新同一個對象時)。
- 資源消耗: 線程比進程消耗更少的資源。
- 搶占式多任務: 線程也在操作系統調度器的監督下進行搶占式多任務處理。
舉例:
在 Python 中,可以使用 threading
或 concurrent.futures
庫創建額外的線程。
1.6 協程 (Coroutine)
定義:
協程是一種可以暫停自身執行并在之后恢復的函數。
Python 中的協程:
- 經典協程: 由生成器函數構建。
- 原生協程: 使用
async def
定義。
特點:
- 事件循環: Python 協程通常在單個線程內運行,由事件循環調度。
- 協作式多任務: 每個協程必須顯式地使用
yield
或await
關鍵字讓出控制權,以便其他協程可以并發(但不是并行)執行。 - 阻塞問題: 協程中的任何阻塞代碼都會阻塞事件循環和所有其他協程,這與進程和線程支持的搶占式多任務形成對比。
- 資源消耗: 每個協程消耗的資源比線程或進程少。
舉例:
使用 asyncio
, Curio
或 Trio
等異步編程框架可以提供事件循環和用于非阻塞、基于協程的 I/O 的支持庫。
1.7 隊列 (Queue)
定義:
隊列是一種數據結構,允許我們按 FIFO(先進先出)順序放入和取出項目。
用途:
隊列允許不同的執行單元交換應用程序數據和控制消息,例如錯誤代碼和終止信號。
Python 中的隊列實現:
queue
模塊: 提供支持線程的隊列類。multiprocessing
和asyncio
包: 實現自己的隊列類。- 其他類型的隊列:
LifoQueue
(后進先出)和PriorityQueue
(優先級隊列)。
1.8 鎖 (Lock)
定義:
鎖是執行單元可以用來同步其操作并避免數據損壞的對象。
工作原理:
當更新共享數據結構時,運行中的代碼應持有相關的鎖。這向程序的其他部分發出信號,在鎖釋放之前等待訪問相同的數據結構。
最簡單類型的鎖:
互斥鎖(mutex,用于互斥)。
1.9 競爭 (Contention)
定義:
競爭是指對有限資源的爭奪。
- 資源競爭: 當多個執行單元嘗試訪問共享資源(例如鎖或存儲)時,就會發生資源競爭。
- CPU 競爭: 當計算密集型進程或線程必須等待操作系統調度器分配 CPU 時間片時,就會發生 CPU 競爭。
2、Python 中的并發支持
2.1 進程和線程與 GIL
GIL (Global Interpreter Lock):
- 定義: GIL 是控制對對象引用計數和其他內部解釋器狀態的鎖。
- 作用: 在任何時候只有一個 Python 線程可以持有 GIL,這意味著即使有多個 CPU 核心,也只有一個線程可以執行 Python 代碼。
- 影響:
- 多線程性能: GIL 限制了多線程在 CPU 密集型任務上的性能,因為線程之間會競爭 GIL,導致上下文切換開銷。
- I/O 密集型任務: 對于 I/O 密集型任務,GIL 的影響較小,因為 I/O 操作會釋放 GIL,線程可以在等待 I/O 時讓出 CPU。
- CPU 密集型任務: 對于 CPU 密集型任務,順序的單線程代碼通常更簡單、更快。
GIL 的釋放:
- 內置函數和 C 擴展: 任何在 Python/C API 級別集成的內置函數或 C 擴展都可以在執行耗時任務時釋放 GIL。
- 標準庫函數: 所有執行系統調用的標準庫函數都會釋放 GIL,包括所有執行磁盤 I/O、網絡 I/O 和
time.sleep()
的函數。 - NumPy/SciPy: NumPy/SciPy 庫中的許多 CPU 密集型函數以及
zlib
和bz2
模塊中的壓縮/解壓縮函數也會釋放 GIL。 - 非 Python 線程: 在 Python/C API 級別集成的擴展也可以啟動其他不受 GIL 影響的非 Python 線程,但這些線程通常不能更改 Python 對象。
GIL 對網絡編程的影響:
由于 I/O 操作會釋放 GIL,并且網絡讀寫總是意味著高延遲(與內存讀寫相比),因此 GIL 對 Python 線程的網絡編程影響相對較小。因此,David Beazley 說:“Python 線程擅長什么都不做。”
多核 CPU 上的 CPU 密集型任務:
- 解決方案: 要在多核 CPU 上運行 CPU 密集型 Python 代碼,必須使用多個 Python 進程。
總結:
- 多線程適用場景: 如果您希望應用程序更好地利用多核機器的計算資源,建議使用
multiprocessing
或concurrent.futures.ProcessPoolExecutor
。 - 多線程適用場景: 如果您希望同時運行多個 I/O 密集型任務,
threading
仍然是一個合適的模型。
2.2 協程與 GIL
協程與 GIL 的關系:
- 默認情況: 默認情況下,協程在同一個 Python 線程中共享事件循環,因此 GIL 不會影響它們。
- 多線程協程: 在異步程序中使用多個線程是可能的,但最佳實踐是讓一個線程運行事件循環和所有協程,而額外的線程執行特定任務。
3、實際應用中的注意事項
-
選擇合適的并發模型:
- CPU 密集型任務: 使用多進程 (
multiprocessing
或ProcessPoolExecutor
)。 - I/O 密集型任務: 使用多線程 (
threading
或ThreadPoolExecutor
) 或協程 (asyncio
)。 - 混合任務: 可以結合使用多進程和多線程或協程。
- CPU 密集型任務: 使用多進程 (
-
避免 GIL 的限制:
- 使用多進程: 對于 CPU 密集型任務,避免使用多線程,轉而使用多進程。
- 使用 C 擴展: 利用 C 擴展釋放 GIL,可以提高多線程性能。
-
線程安全:
- 鎖的使用: 在多線程環境中,更新共享數據時必須使用鎖來防止數據競爭。
- 線程安全的數據結構: 使用線程安全的數據結構,例如
queue.Queue
,可以簡化并發編程。
-
協程的阻塞問題:
- 避免阻塞代碼: 在協程中避免使用阻塞代碼,例如
time.sleep()
,可以使用await asyncio.sleep()
代替。 - 異步編程實踐: 遵循異步編程的最佳實踐,例如使用異步庫和異步 I/O 操作。
- 避免阻塞代碼: 在協程中避免使用阻塞代碼,例如
二、A Concurrent Hello World
1、Spinner with Threads
1. 概述
本節將深入探討如何使用 Python 的 threading
模塊實現一個簡單的終端動畫 Spinner。該程序在執行耗時操作時,通過在終端顯示旋轉動畫來提示用戶程序正在運行,而不是卡死。
2. 主要概念
2.1 線程(Thread)
- 定義: 線程是操作系統能夠進行運算調度的最小單位,是進程中的一個執行流。
- 作用: 在本例中,我們使用線程來實現耗時操作與動畫的并發執行,從而避免阻塞主線程。
2.2 threading.Event
類
- 定義:
Event
是線程間最簡單的信號機制,用于在線程之間傳遞事件信號。 - 主要方法:
set()
: 將內部標志設置為True
,通知所有等待該事件的線程。wait(timeout=None)
: 阻塞調用線程,直到內部標志被設置為True
或超時(如果指定了timeout
)。
- 工作原理:
- 初始時,內部標志為
False
。 - 當某個線程調用
set()
時,標志變為True
,所有等待該事件的線程將被喚醒。 - 如果調用
wait()
時指定了timeout
,則線程會在超時后繼續執行,即使標志仍為False
。
- 初始時,內部標志為
3. 代碼詳解
3.1 spin
函數
import itertools
import time
from threading import Thread, Eventdef spin(msg: str, done: Event) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, end='', flush=True)if done.wait(0.1):breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')
- 功能: 在終端顯示旋轉動畫,直到接收到停止信號。
- 關鍵點:
itertools.cycle(r'\|/-')
: 創建一個無限循環的迭代器,依次生成字符'\', '|', '/', '-', '\', ...
,實現旋轉效果。'\r'
: 回車符,將光標移動到行首,實現覆蓋輸出,達到動畫效果。flush=True
: 強制刷新輸出緩沖區,確保動畫及時顯示。done.wait(0.1)
: 每隔 0.1 秒檢查一次停止信號。如果接收到信號,則退出循環。- 清除動畫: 通過打印與動畫相同長度的空格,再移動光標到行首,實現清除動畫的效果。
3.2 slow
函數
def slow() -> int:time.sleep(3)return 42
- 功能: 模擬一個耗時操作(例如網絡請求或復雜計算),阻塞調用線程 3 秒后返回結果
42
。 - 關鍵點:
time.sleep(3)
會阻塞調用線程,但會釋放全局解釋器鎖(GIL),允許其他線程運行。
3.3 supervisor
函數
def supervisor() -> int:done = Event()spinner = Thread(target=spin, args=('thinking!', done))print(f'spinner object: {spinner}')spinner.start()result = slow()done.set()spinner.join()return result
- 功能: 協調主線程和 spinner 線程的運行。
- 關鍵點:
Event()
: 創建一個事件對象,用于通知 spinner 線程停止。Thread(target=spin, args=('thinking!', done))
: 創建一個新線程,目標函數為spin
,參數為('thinking!', done)
。spinner.start()
: 啟動 spinner 線程,開始顯示動畫。slow()
: 調用耗時操作,阻塞主線程。done.set()
: 設置事件,通知 spinner 線程停止。spinner.join()
: 等待 spinner 線程結束,確保動畫被正確清除。
3.4 main
函數
def main() -> None:result = supervisor()print(f'Answer: {result}')
- 功能: 調用
supervisor
函數并輸出結果。
4. 運行流程
- 主線程創建
Event
對象done
。 - 主線程創建并啟動 spinner 線程,傳入消息
'thinking!'
和done
事件。 - 主線程調用
slow()
,阻塞自身 3 秒。 - Spinner 線程每隔 0.1 秒更新一次動畫字符,直到
done
事件被設置。 slow()
返回后,主線程設置done
事件,通知 spinner 線程停止。- 主線程等待 spinner 線程結束,然后輸出結果
Answer: 42
。
5. 關鍵點總結
import itertools
import time
from threading import Thread, Eventdef spin(msg: str, done: Event) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, end='', flush=True)if done.wait(0.1):breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')def slow() -> int:time.sleep(3)return 42def supervisor() -> int:done = Event()spinner = Thread(target=spin, args=('thinking!', done))print(f'spinner object: {spinner}')spinner.start()result = slow()done.set()spinner.join()return resultdef main() -> None:result = supervisor()print(f'Answer: {result}')if __name__ == '__main__':main()
- 線程與 GIL:
- Python 的全局解釋器鎖(GIL)允許同一時間只有一個線程執行 Python 字節碼。
time.sleep()
會釋放 GIL,因此其他線程可以運行。- 在本例中,
slow()
阻塞主線程,但 spinner 線程可以繼續運行,因為time.sleep()
釋放了 GIL。
- 線程間通信:
threading.Event
是一種簡單的線程間通信機制,用于通知事件的發生。- 在本例中,
done
事件用于通知 spinner 線程停止運行。
- 動畫實現:
- 使用
'\r'
回車符和flush=True
實現覆蓋輸出,達到動畫效果。 - 通過循環更新字符,實現旋轉效果。
- 使用
2、Spinner with Processes
1. 多進程與多線程概述
在 Python 中,實現并發編程主要有兩種方式:
- 多線程(Threading):在同一個進程中創建多個線程,共享內存空間,但受限于全局解釋器鎖(GIL),無法真正利用多核 CPU。
- 多進程(Multiprocessing):創建多個獨立的進程,每個進程擁有獨立的內存空間和 GIL,可以真正利用多核 CPU。
multiprocessing
模塊 旨在模擬 threading
模塊的 API,使得從多線程到多進程的轉換更加容易。
2. multiprocessing.Process
類:創建和管理子進程
2.1 基本用法
與 threading.Thread
類似,multiprocessing.Process
用于創建子進程。以下是 spinner_proc.py
的示例代碼:
import itertools
import time
from multiprocessing import Process, Event
from multiprocessing import synchronizedef spin(msg: str, done: synchronize.Event) -> None: for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, end='', flush=True)if done.wait(0.1):breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')def slow() -> int:time.sleep(3)return 42def supervisor() -> int:done = Event()spinner = Process(target=spin, args=('thinking!', done)) # 創建子進程print(f'spinner object: {spinner}')spinner.start() # 啟動子進程result = slow()done.set() # 通知子進程spinner.join() # 等待子進程結束return resultdef main() -> None:result = supervisor()print(f'Answer: {result}')if __name__ == '__main__':main()
關鍵點:
- 創建子進程:
Process(target=spin, args=('thinking!', done))
target
參數指定子進程要執行的函數。args
參數是傳遞給目標函數的參數元組。
- 啟動子進程:
spinner.start()
- 啟動子進程并執行目標函數。
- 等待子進程結束:
spinner.join()
- 主進程會阻塞,直到子進程結束。
輸出示例:
spinner object: <Process name='Process-1' parent=14868 initial>
Process-1
是子進程的名稱。14868
是運行spinner_proc.py
的 Python 進程的進程 ID。
2.2 與 threading.Thread
的對比
- API 相似性:兩者都提供了
start()
、join()
等方法,API 風格相似,易于從多線程切換到多進程。 - 實現差異:
threading.Event
是一個類,而multiprocessing.Event
是一個函數,返回一個synchronize.Event
實例。- 需要從
multiprocessing
模塊導入synchronize
以使用類型提示。
3. 進程間通信:挑戰與解決方案
3.1 進程隔離與數據共享
- 隔離性:操作系統將每個進程隔離,進程之間無法直接共享 Python 對象。
- 序列化與反序列化:跨進程邊界傳遞數據時,需要對數據進行序列化和反序列化,這會帶來開銷。
示例:
在 spinner_proc.py
中,只有 Event
對象的狀態在進程間傳遞。Event
是由 multiprocessing
模塊底層 C 代碼使用操作系統信號量實現的,因此可以高效地在進程間共享。
3.2 共享內存
從 Python 3.8 開始,multiprocessing.shared_memory
模塊提供了共享內存功能,但存在以下限制:
- 不支持用戶自定義類的實例。
- 支持的數據類型有限:
- 原始字節(
bytes
) ShareableList
:一種可變序列類型,可以存儲固定數量的int
、float
、bool
、None
以及最大 10MB 的str
和bytes
。
- 原始字節(
4. 實際應用中的注意事項
4.1 進程池(Process Pool)
- 概念:預先創建一組進程,重復利用這些進程執行任務,避免頻繁創建和銷毀進程帶來的開銷。
- 使用場景:適用于需要大量任務并行執行的場景,如 CPU 密集型任務。
示例:
from multiprocessing import Pooldef square(x):return x * xif __name__ == '__main__':with Pool(4) as pool:results = pool.map(square, range(10))print(results)
輸出示例:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4.2 進程間通信機制
除了共享內存,multiprocessing
還提供了多種進程間通信機制:
- 隊列(Queue):線程和進程安全的 FIFO 隊列。
- 管道(Pipe):用于在兩個進程之間建立雙向通信通道。
- 管理器(Manager):提供多種共享數據結構,如列表、字典等。
隊列示例:
from multiprocessing import Process, Queuedef worker(q):q.put('Hello from worker')if __name__ == '__main__':q = Queue()p = Process(target=worker, args=(q,))p.start()print(q.get()) # 輸出: Hello from workerp.join()
5. 總結
multiprocessing
模塊 提供了強大的多進程編程支持,彌補了多線程受限于 GIL 的不足。- API 相似性 使得從多線程切換到多進程變得相對容易,但需要注意進程間通信帶來的開銷和復雜性。
- 共享內存 提供了高效的進程間數據共享方式,但需要謹慎使用以避免數據競爭和死鎖。
- 進程池 是管理大量并行任務的有效工具,可以提高程序性能。
6. 補充示例:使用 multiprocessing
實現生產者-消費者模型
from multiprocessing import Process, Queue
import timedef producer(q, items):for item in items:q.put(item)print(f'Produced: {item}')time.sleep(1)q.put(None) # 發送終止信號def consumer(q):while True:item = q.get() # 這是一個阻塞操作if item is None:breakprint(f'Consumed: {item}')time.sleep(2)if __name__ == '__main__':q = Queue()p = Process(target=consumer, args=(q,))p.start()producer(q, range(5))p.join()
輸出示例:
Produced: 0
Consumed: 0
Produced: 1
Produced: 2
Consumed: 1
Produced: 3
Produced: 4
Consumed: 2
Consumed: 3
Consumed: 4
為什么需要 join()
-
等待子進程完成:
- 在主進程中啟動子進程后,主進程會繼續執行后續代碼。如果不使用
join()
,主進程可能會在子進程完成之前就結束,導致子進程成為孤兒進程或僵尸進程。 join()
方法會阻塞主進程,直到子進程p
結束。這樣可以確保子進程完成其任務后再繼續執行主進程的后續操作。
- 在主進程中啟動子進程后,主進程會繼續執行后續代碼。如果不使用
-
資源管理:
- 如果主進程過早退出,可能導致子進程無法正常結束,進而導致資源泄漏或其他不可預見的問題。
- 使用
join()
可以確保所有子進程在主進程退出前完成其任務,從而正確地釋放資源。
-
同步:
- 在多進程編程中,進程之間的同步非常重要。
join()
提供了一種簡單的同步機制,確保主進程在子進程完成后再繼續執行。 - 在你的代碼中,
join()
確保主進程等待消費者子進程完成消費所有隊列中的元素,包括接收到終止信號None
后退出循環。
- 在多進程編程中,進程之間的同步非常重要。
q.get()是如何做到阻塞操作的
要深入理解 Queue.get()
的阻塞實現,我們需要考慮一些底層的同步機制。Python 的 multiprocessing.Queue
是基于底層的操作系統原語來實現的,這些原語確保多進程環境中的數據安全和同步。
1. Queue 的基本結構
multiprocessing.Queue
實際上是通過 multiprocessing.Manager
和底層管道(pipe)來實現的。它使用進程間通信(IPC)機制來在進程間傳遞信息。關鍵的組件包括:
- 管道(Pipe):用于在進程之間傳遞字節流。
- 鎖(Lock):用來確保同時只有一個進程可以訪問或修改共享資源。
- 信號量(Semaphore)或條件變量(Condition Variable):用于管理進程的同步。
2. 阻塞機制的實現
具體到 Queue.get()
的實現,涉及以下幾個步驟:
a. 獲取鎖
每次調用 get()
方法時,首先需要獲取一個鎖。這是為了確保當一個進程正在操作隊列時,其他進程無法進行相同的操作。鎖的使用防止了數據競爭和不一致性。
b. 檢查隊列狀態
一旦鎖被獲取,get()
方法會檢查隊列中是否有可用的數據:
- 如果隊列不為空:直接從隊列中獲取數據,然后釋放鎖。進程可以繼續執行。
- 如果隊列為空:進程將進入等待狀態。
c. 進入等待狀態
如果隊列為空,get()
會讓當前進程進入等待狀態。這是通過信號量或條件變量實現的:
- 信號量:信號量會被初始化為 0。當隊列中加入新數據(
put()
操作)時,信號量被增加,表示有新的數據可用。阻塞的進程會被喚醒。 - 條件變量:條件變量允許進程等待特定的條件成立(如隊列中有數據)。當條件滿足時(新數據加入隊列),條件變量會通知等待的進程繼續執行。
d. 被喚醒和獲取數據
當有新數據被放入隊列時,put()
方法會更新信號量或條件變量,通知阻塞的 get()
操作可以繼續執行。被喚醒的 get()
操作將重新獲取鎖,檢查隊列并取出數據。
e. 釋放鎖
無論是成功獲取數據還是由于其他原因退出,get()
方法最后都會釋放鎖,以允許其他進程進行隊列操作。
經典問題:為什么用while,而不是if
這是一個多線程并發編程中經常會遇到的問題,尤其是在等待條件時,比如經典的生產者-消費者模型,很多人會問:
為什么在等待條件時,常常寫成
while not 條件: condition.wait()
,而不是if not 條件: condition.wait()
?
核心原因:防止虛假喚醒(Spurious Wakeup)和競態條件
1. 虛假喚醒(Spurious Wakeup)
- 在多線程環境下,有些操作系統或線程庫實現允許線程在沒有被顯式通知的情況下從
wait()
返回,這叫做“虛假喚醒”。 - 如果你用
if
,線程醒來后不會再次檢查條件,可能條件并沒有真正滿足,結果導致程序邏輯錯誤。 while
會在每次醒來后重新檢查條件,確保只有在真正條件滿足時才繼續執行。
2. 競爭條件(Race Condition)
- 在多線程環境中,可能多個線程被同時喚醒(例如
notify_all()
喚醒多個線程),這些線程會爭奪資源,只有一個能成功拿到,其余又要繼續等待。 - 如果用
if
,沒搶到資源的線程不會再次判斷條件,會直接往下執行,出錯。 while
可以讓沒搶到資源的線程再次進入等待,直到條件真正滿足。
一句話記憶
多線程等待條件時,用while不是if,因為醒來后必須重新檢查條件,防止虛假喚醒和數據競爭。
7. 常見誤區與調試技巧
- 避免在
__main__
之外創建進程:在 Windows 上,必須將創建進程的代碼放在if __name__ == '__main__'
塊中,否則會導致遞歸創建進程。 - 使用
if __name__ == '__main__'
保護主程序:確保子進程不會再次執行主程序代碼。 - 注意全局變量:進程之間不共享全局變量,需要使用進程間通信機制傳遞數據。
- 調試多進程程序:可以使用
print
語句、日志記錄或調試工具,但要注意進程間輸出的順序。
3、Spinner with Coroutines
1. 協程驅動的 Spinner 程序解析
1.1 程序結構
我們以 spinner_async.py
為例,探討如何使用協程實現一個簡單的 Spinner 程序,該程序在后臺顯示旋轉指示器,同時執行耗時操作。
import asyncio
import itertoolsdef main() -> None:result = asyncio.run(supervisor())print(f'Answer: {result}')async def supervisor() -> int:spinner = asyncio.create_task(spin('thinking!'))print(f'spinner object: {spinner}')result = await slow()spinner.cancel()return resultasync def spin(msg: str) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, flush=True, end='')try:await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')async def slow() -> int:await asyncio.sleep(3)return 42if __name__ == '__main__':main()
在這個例子中,任務調度是由 asyncio
的事件循環負責的。當一個任務(協程)在執行過程中遇到 await
關鍵字(如 await asyncio.sleep()
),它會將控制權交還給事件循環,讓事件循環調度其他準備好執行的任務。這種機制讓多個任務可以并發運行,而不會因為等待某個任務的完成而阻塞整個程序。
具體調度過程
-
事件循環:
- 當程序運行時,
asyncio.run(supervisor())
啟動了事件循環。 - 事件循環負責管理和調度所有由
asyncio.create_task()
創建的任務。
- 當程序運行時,
-
任務創建與調度:
- 在
supervisor
協程中,asyncio.create_task(spin('thinking!'))
創建了一個新的任務spin
。 - 事件循環立即開始調度
spin
的執行,因為它是通過create_task
顯式創建的并且不依賴其他任務完成。
- 在
-
遇到
await
關鍵字:- 在
spin
函數中,await asyncio.sleep(.1)
暫停了spin
的執行,將控制權交還給事件循環。 - 事件循環檢查其他任務,發現
supervisor
仍在等待slow
的結果,而slow
也在await asyncio.sleep(3)
上暫停。 - 因為
spin
的暫停時間很短(0.1 秒),所以事件循環會頻繁地重新調度spin
,顯示旋轉動畫。
- 在
-
并發執行:
- 由于
spin
和slow
都在使用await
進行異步等待,事件循環能夠在它們各自的等待期間切換執行。 - 這意味著即使
slow
在等待 3 秒,spin
仍能繼續執行,提供視覺反饋。
- 由于
-
任務取消:
- 一旦
slow
完成并返回結果,supervisor
立即調用spinner.cancel()
來取消spin
任務。 - 當
spin
檢測到取消請求時,通過捕獲asyncio.CancelledError
來終止動畫循環。
- 一旦
總結
- 調度角色: 是
asyncio
的事件循環負責調度任務。它在任務遇到await
時自動切換到其他準備好執行的任務。所以await后面跟可等待對象(協程對象、Future、Task對象),他們都可能發生I/O等待 - 自動切換: 任務在遇到
await asyncio.sleep()
或其他 I/O 操作時,會自動將控制權交還給事件循環,允許事件循環安排其他任務的執行。 - 并發執行: 通過這種調度機制,多個任務可以并發運行,提高程序的響應性和效率,特別是在 I/O 密集型操作中。
關于await
在 Python 的異步編程中,await
關鍵字用于暫停協程的執行,直到提供的可等待對象完成。可等待對象可以是協程對象、Future
、或 Task
。當代碼執行到 await
時,會發生以下事情:
-
暫停當前協程:
- 當協程執行到
await
語句時,它會暫停執行,將控制權交還給事件循環。 - 當前協程的狀態被保存,以便在可等待對象完成后可以恢復執行。
- 當協程執行到
-
返回控制權給事件循環:
await
后的操作通常是 I/O 操作或其他耗時任務。在這些操作進行時,協程會將控制權交還給事件循環。- 事件循環可以利用這個時間去調度其他可運行的任務。這種機制允許其他協程在當前協程等待時執行,從而實現并發。
-
事件循環調度其他任務:
- 事件循環會檢查所有已注冊的任務,并尋找可以運行的任務(即那些不在等待 I/O 的任務)。
- 如果有其他任務準備好執行,事件循環會調度它們運行。
-
等待的任務完成后:
- 一旦
await
后的可等待對象完成(例如,一個網絡請求完成,一個定時器到期,或一個文件被讀取完畢),事件循環會將控制權返回給之前暫停的協程。 - 協程從上次暫停的位置恢復執行。
- 一旦
-
恢復執行:
- 在可等待對象完成后,協程會繼續執行
await
之后的代碼。 - 這可能涉及處理從
await
表達式返回的結果。
- 在可等待對象完成后,協程會繼續執行
代碼示例中的調度過程
以你的代碼為例,來看 await
的具體調度過程:
async def spin(msg: str) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, flush=True, end='')try:# 這里的 await asyncio.sleep(.1) 將暫停 spin 協程await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')async def slow() -> int:# 這里的 await asyncio.sleep(3) 將暫停 slow 協程await asyncio.sleep(3)return 42
- 在
spin
中,遇到await asyncio.sleep(.1)
時,spin
協程暫停,讓出控制權。 - 事件循環會檢查其他任務,發現
slow
也在等待(await asyncio.sleep(3)
)。 - 因為
spin
的等待時間很短,事件循環在其他任務沒有準備好執行時,很快會回到spin
。 - 繼續執行
spin
,直到slow
完成 3 秒的等待并返回結果。 supervisor
協程恢復,取消spin
,打印結果。
這種機制使得 Python 的異步編程可以高效地處理大量 I/O 操作,而不會因為等待而阻塞整個程序。
為什么輸出是這樣的?
這個代碼的輸出為什么一直是先輸出spinner object: <Task pending name=‘Task-2’ coro=<spin() running at /fluent/test.py:15>>
然后再是動畫,最后是Answer: 42 ?
在Python的asyncio
中,創建一個任務(如使用asyncio.create_task()
)會安排該協程在事件循環中運行,但不會立即執行該協程的代碼。相反,它會繼續執行當前的同步代碼,直到遇到await
或其他需要切換上下文的點。
具體到你的代碼:
-
創建任務:
spinner = asyncio.create_task(spin('thinking!'))
這行代碼創建了一個任務對象
spinner
,并安排spin
協程在事件循環中運行。此時它只是在事件循環中注冊了spin
,并不會立即執行spin
的代碼。 -
打印任務對象信息:
print(f'spinner object: {spinner}')
創建任務后,代碼繼續執行這一行。這是一個普通的同步操作,所以會立即被執行,打印出任務對象的信息。
-
執行
spin
和slow
:
接下來是:result = await slow()
這行代碼會暫停
supervisor
協程的執行,直到slow()
完成。在這個暫停期間,事件循環會有機會調度并執行其他任務,包括spin
任務。
正是因為事件循環的這種調度機制,創建任務后,控制權會立即返回到當前的同步代碼塊,使得創建任務后的代碼(如打印任務對象信息)會在任務的協程代碼實際開始執行之前被執行。
1.2 關鍵概念解析
-
事件循環(Event Loop):
- 協程由事件循環驅動,事件循環負責管理協程隊列、調度協程執行、監控 I/O 操作事件,并在事件發生時將控制權交還給相應的協程。
- 事件循環和所有協程都在單線程內執行,因此,任何一個協程的阻塞都會導致事件循環變慢,進而影響其他協程的執行。
-
asyncio.run()
函數:- 用于啟動事件循環,并驅動傳入的協程對象(通常是程序的入口點,例如本例中的
supervisor
)。 - 該函數會阻塞,直到傳入的協程執行完畢,并返回協程的返回值。
- 用于啟動事件循環,并驅動傳入的協程對象(通常是程序的入口點,例如本例中的
-
原生協程(Native Coroutines):
- 使用
async def
關鍵字定義,例如supervisor
、spin
和slow
。 - 原生協程需要通過事件循環來驅動執行。
- 使用
-
asyncio.create_task()
函數:- 從協程內部調用,用于調度另一個協程的執行。
- 該函數會立即返回一個
asyncio.Task
對象,該對象包裝了協程對象,并提供了控制和管理協程狀態的方法。 - 例如,
spinner = asyncio.create_task(spin('thinking!'))
創建了一個用于執行spin
協程的任務,并將其賦值給spinner
變量。
-
await
關鍵字:- 從協程內部調用,用于將控制權轉移給另一個協程。
- 當前協程會被掛起,直到被
await
的協程執行完畢。 - 例如,
result = await slow()
會掛起supervisor
協程,直到slow
協程執行完畢,并將slow
的返回值賦給result
。
-
協程的運行方式:
asyncio.run(coro())
:從普通函數調用,用于驅動協程對象(通常是程序的入口點)。該調用會阻塞,直到協程體返回。asyncio.create_task(coro())
:從協程內部調用,用于調度另一個協程的執行。該調用不會掛起當前協程,并返回一個Task
實例。await coro()
:從協程內部調用,用于將控制權轉移給被await
的協程。該調用會掛起當前協程,直到被await
的協程返回。
-
協程的取消:
- 調用
Task.cancel()
方法會向協程拋出asyncio.CancelledError
異常,協程可以通過捕獲該異常來執行清理操作并退出。
- 調用
1.3 關鍵函數解析
-
spin
協程:- 使用
itertools.cycle
循環顯示旋轉指示器(\|/-
)。 - 使用
await asyncio.sleep(0.1)
暫停 0.1 秒,避免阻塞事件循環。 - 捕獲
asyncio.CancelledError
異常以優雅地退出循環。
- 使用
-
slow
協程:- 使用
await asyncio.sleep(3)
模擬耗時操作,并返回結果42
。 - 同樣使用
await asyncio.sleep
而不是time.sleep
,以避免阻塞事件循環。
- 使用
1.4 重要實驗:理解協程阻塞的影響
為了更好地理解協程的工作原理,我們進行以下實驗:
-
實驗內容:將
slow
協程中的await asyncio.sleep(3)
替換為time.sleep(3)
。 -
預期結果:
- 程序顯示 spinner 對象,例如
<Task pending name='Task-2' coro=<spin() running at /path/to/spinner_async.py:12>>
。 - Spinner 不會顯示,程序掛起 3 秒。
- 顯示
Answer: 42
并結束程序。
- 程序顯示 spinner 對象,例如
-
原因分析:
time.sleep(3)
會阻塞主線程,而主線程也是事件循環所在的線程。- 因此,在
time.sleep(3)
期間,事件循環無法執行任何其他協程,包括spin
協程。 - 這導致 Spinner 無法顯示,程序整體被阻塞。
-
關鍵結論:
- 永遠不要在 asyncio 協程中使用
time.sleep(…)
,除非你希望暫停整個程序。 - 如果協程需要等待一段時間,應該使用
await asyncio.sleep(DELAY)
,這會將控制權交還給事件循環,允許其他掛起的協程執行。
- 永遠不要在 asyncio 協程中使用
2. 協程與綠色線程(Greenlets)
-
綠色線程(Greenlets):
- 綠色線程是一種輕量級的協程實現,由
greenlet
包提供。 - 不需要使用
yield
或await
等特殊語法,更易于集成到現有的順序代碼庫中。 - 例如,SQL Alchemy 1.4 ORM 使用綠色線程來實現其與 asyncio 兼容的新異步 API。
- 綠色線程是一種輕量級的協程實現,由
-
gevent 庫:
gevent
是一個基于綠色線程的網絡庫,通過“猴子補丁(monkey patching)”的方式,將 Python 的標準 socket 模塊替換為非阻塞版本。- 對周圍代碼具有高度透明性,使得將順序應用程序和庫(如數據庫驅動程序)轉換為執行并發網絡 I/O 變得更加容易。
- 許多開源項目使用
gevent
,包括廣泛部署的 Gunicorn。
3. 總結
- 協程 是一種強大的并發編程工具,在單線程內實現高效的 I/O 操作處理。
- 事件循環 是協程的核心,負責調度和管理協程的執行。
- 避免使用
time.sleep()
,而應使用await asyncio.sleep()
,以確保事件循環的流暢運行。 - 綠色線程 和 gevent 提供了另一種異步編程的方式,適用于不同的應用場景。
4、Supervisors Side-by-Side
spinner_thread.py
和spinner_async.py
的代碼行數幾乎相同。supervisor
函數是這些示例的核心。下面我們來詳細比較一下它們。示例19 - 8列出了示例19 - 2中的supervisor
函數。
示例19 - 8 spinner_thread.py:使用線程的supervisor
函數
def supervisor() -> int:done = Event()spinner = Thread(target=spin, args=('thinking!', done))print('spinner object:', spinner)spinner.start()result = slow()done.set()spinner.join()return result
作為對比,示例19 - 9展示了示例19 - 4中的supervisor
協程。
示例19 - 9 spinner_async.py:異步的supervisor
協程
async def supervisor() -> int:spinner = asyncio.create_task(spin('thinking!'))print('spinner object:', spinner)result = await slow()spinner.cancel()return result
下面總結這兩個supervisor
實現之間的異同點:
asyncio.Task
大致相當于threading.Thread
。
示例對比:
使用線程的示例 (thread_example.py
):
import threading
import timedef task1():for i in range(5):print(f"Thread 1: {i}")time.sleep(1)def task2():for i in range(5):print(f"Thread 2: {i}")time.sleep(1.5)def run_threads():thread1 = threading.Thread(target=task1)thread2 = threading.Thread(target=task2)thread1.start()thread2.start()thread1.join()thread2.join()print("Both threads have finished.")if __name__ == "__main__":run_threads()
使用asyncio
的示例 (asyncio_example.py
):
import asyncioasync def task1():for i in range(5):print(f"Task 1: {i}")await asyncio.sleep(1)async def task2():for i in range(5):print(f"Task 2: {i}")await asyncio.sleep(1.5)async def run_tasks():task1_coroutine = asyncio.create_task(task1())task2_coroutine = asyncio.create_task(task2())await task1_coroutineawait task2_coroutineprint("Both tasks have finished.")if __name__ == "__main__":asyncio.run(run_tasks())
解釋:
- 并發執行: 在線程示例中,
task1
和task2
在不同的線程中并行執行。在asyncio
示例中,task1
和task2
作為協程在同一個線程中并發執行,通過事件循環調度。 - 任務調度: 線程由操作系統調度,而
asyncio
任務由事件循環調度。
Task
驅動一個協程對象,而Thread
調用一個可調用對象。
示例對比:
使用線程的示例 (thread_callable.py
):
import threadingclass MyTask:def run(self):print("Thread is running a callable object.")def run_thread():obj = MyTask()thread = threading.Thread(target=obj.run)thread.start()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_coroutine.py
):
import asyncioclass MyCoroutine:async def run(self):print("Coroutine is running.")async def run_coroutine():obj = MyCoroutine()await obj.run()print("Coroutine has finished.")if __name__ == "__main__":asyncio.run(run_coroutine())
解釋:
- 調用方式: 在線程示例中,
Thread
調用了一個類的run
方法,這是一個可調用對象。在asyncio
示例中,asyncio
直接調用了一個協程方法run
,這是因為asyncio
只能調度協程。
- 協程通過
await
關鍵字顯式地讓出控制權。
示例對比:
使用線程的示例 (thread_sleep.py
):
import threading
import timedef task():for i in range(5):print(f"Thread: {i}")time.sleep(1)def run_thread():thread = threading.Thread(target=task)thread.start()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_await.py
):
import asyncioasync def task():for i in range(5):print(f"Coroutine: {i}")await asyncio.sleep(1)async def run_coroutine():await task()print("Coroutine has finished.")if __name__ == "__main__":asyncio.run(run_coroutine())
解釋:
- 控制權讓渡: 在線程示例中,
time.sleep(1)
會阻塞當前線程。在asyncio
示例中,await asyncio.sleep(1)
會暫停當前協程的執行,讓事件循環調度其他協程執行。
- 你不能自己實例化
Task
對象,而是通過將一個協程傳遞給asyncio.create_task(…)
來獲取它們。
示例對比:
使用線程的示例 (thread_instantiate.py
):
import threadingdef task():print("Thread is running.")def run_thread():thread = threading.Thread(target=task)thread.start()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_create_task.py
):
import asyncioasync def task():print("Task is running.")async def run_tasks():task_coroutine = asyncio.create_task(task())await task_coroutineprint("Task has finished.")if __name__ == "__main__":asyncio.run(run_tasks())
解釋:
- 任務創建: 在線程示例中,
Thread
對象是通過threading.Thread
直接實例化的。在asyncio
示例中,Task
對象是通過asyncio.create_task
創建的,而不是通過實例化Task
類。
- 當
asyncio.create_task(…)
返回一個Task
對象時,它已經被安排好要運行了,但Thread
實例必須通過調用其start
方法來顯式地啟動運行。
示例對比:
使用線程的示例 (thread_start.py
):
import threadingdef task():print("Thread is running.")def run_thread():thread = threading.Thread(target=task)# 線程尚未啟動# thread.start() 必須調用才能啟動線程thread.start()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_schedule.py
):
import asyncioasync def task():print("Task is running.")async def run_tasks():task_coroutine = asyncio.create_task(task())# Task 已經被安排運行,無需顯式啟動await task_coroutineprint("Task has finished.")if __name__ == "__main__":asyncio.run(run_tasks())
解釋:
- 任務啟動: 在線程示例中,必須調用
thread.start()
才能啟動線程。在asyncio
示例中,asyncio.create_task
會立即安排協程執行,無需顯式啟動。
- 在使用線程的
supervisor
中,slow
是一個普通函數,由主線程直接調用。在異步的supervisor
中,slow
是一個由await
驅動的協程。
示例對比:
使用線程的示例 (thread_slow.py
):
import threading
import timedef slow():print("Slow function is running.")time.sleep(2)print("Slow function is done.")return 42def supervisor():thread = threading.Thread(target=slow)thread.start()thread.join()print("Supervisor has finished.")if __name__ == "__main__":supervisor()
使用asyncio
的示例 (asyncio_slow.py
):
import asyncioasync def slow():print("Slow coroutine is running.")await asyncio.sleep(2)print("Slow coroutine is done.")return 42async def supervisor():task = asyncio.create_task(slow())await taskprint("Supervisor has finished.")if __name__ == "__main__":asyncio.run(supervisor())
解釋:
- 函數調用: 在線程示例中,
slow
是一個普通函數,由主線程直接調用并在新線程中執行。在asyncio
示例中,slow
是一個協程,通過await
調用。
- 沒有從外部終止線程的API;相反,你必須發送一個信號,比如設置
done
這個Event
對象。對于任務,有Task.cancel()
實例方法,它會在協程體當前暫停的await
表達式處引發CancelledError
異常。
示例對比:
使用線程的示例 (thread_cancel.py
):
import threading
import time
from threading import Eventdef task(done):while not done.is_set():print("Thread is running.")time.sleep(1)print("Thread has been cancelled.")def run_thread():done = Event()thread = threading.Thread(target=task, args=(done,))thread.start()time.sleep(3)done.set()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_cancel.py
):
import asyncioasync def task():try:while True:print("Task is running.")await asyncio.sleep(1)except asyncio.CancelledError:print("Task has been cancelled.")async def run_tasks():task_coroutine = asyncio.create_task(task())await asyncio.sleep(3)task_coroutine.cancel()await task_coroutineprint("Task has finished.")if __name__ == "__main__":asyncio.run(run_tasks())
解釋:
- 取消機制: 在線程示例中,通過設置
Event
對象來通知線程停止。在asyncio
示例中,使用task.cancel()
來取消任務,協程會在await asyncio.sleep(1)
處捕獲CancelledError
異常。
supervisor
協程必須在main
函數中通過asyncio.run
來啟動。
示例對比:
使用線程的示例 (thread_main.py
):
import threading
import timedef task():print("Thread is running.")time.sleep(2)print("Thread is done.")def supervisor():thread = threading.Thread(target=task)thread.start()thread.join()print("Supervisor has finished.")if __name__ == "__main__":supervisor()
使用asyncio
的示例 (asyncio_main.py
):
import asyncioasync def task():print("Task is running.")await asyncio.sleep(2)print("Task is done.")async def supervisor():await task()print("Supervisor has finished.")if __name__ == "__main__":asyncio.run(supervisor())
解釋:
- 程序入口: 在線程示例中,
supervisor
函數作為主函數直接調用。在asyncio
示例中,supervisor
協程通過asyncio.run
啟動。
總結
通過這些具體的對比示例,我們可以看到threading.Thread
和asyncio.Task
在并發執行、調用方式、控制權讓渡、任務創建與啟動、取消機制以及程序入口等方面的不同。這些差異使得asyncio
在處理高并發和I/O密集型任務時更加高效。
關于線程和協程的最后一點:如果你用線程進行過任何復雜的編程,就會知道理解程序的邏輯有多困難,因為調度器可以隨時中斷線程。你必須記住持有鎖來保護程序的關鍵部分,以避免在多步驟操作過程中被中斷,否則可能會使數據處于無效狀態。
而對于協程,默認情況下你的代碼不會被中斷。你必須顯式地使用await
讓程序的其他部分運行。協程不是通過持有鎖來同步多個線程的操作,而是從定義上就是 “同步” 的:任何時刻只有一個協程在運行。當你想放棄控制權時,使用await
將控制權交回調度器。這就是為什么可以安全地取消一個協程:從定義上講,協程只有在await
表達式處暫停時才能被取消,所以你可以通過處理CancelledError
異常來進行清理工作。
time.sleep()
調用會阻塞但不做任何事情。現在我們將通過一個CPU密集型的調用來進行實驗,以便更好地理解全局解釋器鎖(GIL),以及CPU密集型函數在異步代碼中的影響。
三、The Real Impact of the GIL
——基于多進程、多線程、異步編程的對比分析
1、核心問題:GIL 對 CPU 密集型任務的影響
GIL(全局解釋器鎖) 是 CPython 解釋器的設計特性,它確保同一時刻僅有一個線程執行 Python 字節碼。這對 CPU 密集型任務有顯著影響:
- I/O 操作:等待網絡/磁盤時釋放 GIL,允許其他線程運行(如
time.sleep(3)
或 HTTP 請求)。 - CPU 密集型操作:長時間占用 CPU 的函數會阻塞其他線程(如素數計算)。
2、原始例子:素數判斷函數
import mathdef is_prime(n: int) -> bool:if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falseroot = math.isqrt(n)for i in range(3, root + 1, 2): # 僅檢查奇數if n % i == 0:return Falsereturn True
- 測試
is_prime(5_000_111_000_222_021)
耗時約 3.3 秒(CPU 密集型)。
3、三大并發模型的行為對比
假設將以下代碼中的 time.sleep(3)
或 asyncio.sleep(3)
替換為 is_prime(n)
:
-
多進程 (
spinner_proc.py
)- 結果:旋轉動畫持續運行。
- 原理:子進程負責動畫,父進程計算素數。進程間內存隔離,GIL 不共享。
# 偽代碼示例 from multiprocessing import Process, Eventdef spin():while not done_event.is_set():# 更新動畫...if __name__ == "__main__":done_event = Event()p = Process(target=spin) # 子進程運行動畫p.start()is_prime(n) # 主進程計算素數done_event.set()
-
多線程 (
spinner_thread.py
)- 結果:動畫持續運行(但可能卡頓)。
- 原理:
- Python 默認每 5ms 強制切換線程(通過
sys.setswitchinterval
設置)。 - 主線程計算素數時,每 5ms 被中斷一次,動畫線程獲得 GIL 更新狀態。
- Python 默認每 5ms 強制切換線程(通過
- 陷阱:若線程數 > CPU 核心數,程序效率會顯著下降!
# 偽代碼示例 import threadingdef spin():while not done_event.is_set():# 更新動畫(短暫占用 GIL)...done_event = threading.Event() t = threading.Thread(target=spin) t.start() is_prime(n) # 主線程計算素數 done_event.set()
-
異步編程 (
spinner_async.py
)- 結果:動畫完全凍結!
- 原理:
- 異步任務在單線程中運行,
is_prime
阻塞事件循環。 - 動畫任務 (
spinner
) 從未獲得執行機會。
- 異步任務在單線程中運行,
# 偽代碼示例 import asyncioasync def spin():while not done:# 更新動畫await asyncio.sleep(0.1)async def slow():is_prime(n) # 阻塞整個事件循環!return 42async def supervisor():spinner_task = asyncio.create_task(spin())result = await slow()spinner_task.cancel()return result
關鍵難點與解決方案
1. 異步編程中運行 CPU 密集型任務
問題:直接調用 is_prime
會阻塞事件循環。
方案一(不推薦):插入 await asyncio.sleep(0)
async def is_prime_async(n):for i in range(3, root + 1, 2):if n % i == 0:return Falseif i % 100_000 == 1: # 每 10 萬次迭代讓步一次await asyncio.sleep(0) # 讓出控制權return True
- 缺點:計算時間從 3.3s 增至 4.9s(效率下降 50%),且事件循環仍被拖慢。
方案二(推薦):使用 run_in_executor
移交到線程池
async def slow():loop = asyncio.get_running_loop()result = await loop.run_in_executor(None, is_prime, n) # 在子線程中運行return result
這種情況下的is_prime函數要是非async的,如果是async會報錯:RuntimeWarning: Enable tracemalloc to get the object allocation
loop.run_in_executor
方法用于在不同的線程中運行一個阻塞的同步函數,而不是一個協程。因此,這個問題導致了 is_prime
協程沒有被正確地等待(await),從而引發了 RuntimeWarning
。
所以需要將 is_prime
改為同步函數:如果 is_prime
函數本身不需要異步執行,你可以將它定義為一個普通的同步函數(去掉 async
關鍵字)。這樣可以直接使用 loop.run_in_executor
來調用它。
完整代碼
import mathdef is_prime(n: int) -> bool:if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falseroot = math.isqrt(n)for i in range(3, root + 1, 2): # 僅檢查奇數if n % i == 0:return Falsereturn Trueimport asyncio
import itertoolsasync def slow():loop = asyncio.get_running_loop()result = await loop.run_in_executor(None, is_prime, 5_000_111_000_222_021) # 在子線程中運行return resultdef main() -> None:result = asyncio.run(supervisor())print(f'Answer: {result}')async def supervisor() -> int:spinner = asyncio.create_task(spin('thinking!'))print(f'spinner object: {spinner}')result = await slow()spinner.cancel()return resultasync def spin(msg: str) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, flush=True, end='')try:await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')if __name__ == '__main__':main()
- 原理:將阻塞函數轉移到線程池執行,避免阻塞事件循環。
2. 多線程的適用場景
- 適合:I/O 密集型任務(如 HTTP 請求、文件讀寫)。
- 避免:CPU 密集型任務(除非任務能頻繁釋放 GIL,如 NumPy/C 擴展)。
- 對比實驗:
# 兩個 CPU 密集型線程 vs 順序執行
import threading
import time
import mathn = 5_000_111_000_222_021def is_prime(n: int) -> bool:if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falseroot = math.isqrt(n)for i in range(3, root + 1, 2): # 僅檢查奇數if n % i == 0:return Falsereturn Truedef task():for _ in range(2): # 并行執行兩次is_prime(n)# 順序執行(更快!)
start = time.time()
is_prime(n)
is_prime(n)
print("Sequential:", time.time() - start) # 多線程執行(更慢!)
t1 = threading.Thread(target=is_prime, args=(n,))
t2 = threading.Thread(target=is_prime, args=(n,))
t1.start(); t2.start()
t1.join(); t2.join()
print("Threads:", time.time() - start) # Sequential: 2.7513389587402344
# Threads: 5.548290014266968
工程實踐總結
場景 | 推薦模型 | 原因 |
---|---|---|
I/O 密集型(網絡/磁盤) | 異步編程 (asyncio ) | 高并發、低開銷 |
CPU 密集型 | 多進程 (multiprocessing ) | 繞過 GIL,利用多核 CPU |
混合任務 | 異步 + 線程池 | I/O 用 asyncio ,CPU 用 run_in_executor |
黃金法則:
- “I/O 用異步,CPU 用進程”
- 避免在異步事件循環中直接調用 CPU 密集型函數
四、A Homegrown Process Pool
本節是為了展示如何使用多個進程處理CPU密集型任務,以及使用隊列來分配任務和收集結果的常見模式。第20章將介紹一種向進程分配任務的更簡單方法:使用concurrent.futures
包中的ProcessPoolExecutor
,它內部使用了隊列。
1. 問題背景與核心概念
1.1 問題描述
我們需要檢測一組大數(20個從2到101?-1的整數)是否為素數。這是一個CPU密集型任務,因為判斷大數是否為素數需要大量計算。
primes.py:
#!/usr/bin/env python3import mathPRIME_FIXTURE = [(2, True),(142702110479723, True),(299593572317531, True),(3333333333333301, True),(3333333333333333, False),(3333335652092209, False),(4444444444444423, True),(4444444444444444, False),(4444444488888889, False),(5555553133149889, False),(5555555555555503, True),(5555555555555555, False),(6666666666666666, False),(6666666666666719, True),(6666667141414921, False),(7777777536340681, False),(7777777777777753, True),(7777777777777777, False),(9999999999999917, True),(9999999999999999, False),
]NUMBERS = [n for n, _ in PRIME_FIXTURE]# tag::IS_PRIME[]
def is_prime(n: int) -> bool:if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falseroot = math.isqrt(n)for i in range(3, root + 1, 2):if n % i == 0:return Falsereturn True
# end::IS_PRIME[]if __name__ == '__main__':for n, prime in PRIME_FIXTURE:prime_res = is_prime(n)assert prime_res == primeprint(n, prime)
1.2 核心挑戰
順序執行(單進程)效率低下,如何利用多核CPU加速計算?
1.3 關鍵解決方案
使用 多進程 (multiprocessing) 和 任務隊列 (Queue):
- 創建多個工作進程并行處理任務
- 使用隊列分配任務和收集結果
- 避免GIL(全局解釋器鎖)對CPU密集型任務的限制
2. 順序執行:性能基準
2.1 代碼實現 (sequential.py
)
#!/usr/bin/env python3
from time import perf_counter
from typing import NamedTuple
from primes import is_prime, NUMBERSclass Result(NamedTuple):prime: boolelapsed: floatdef check(n: int) -> Result:t0 = perf_counter()prime = is_prime(n)return Result(prime, perf_counter() - t0)def main() -> None:print(f'Checking {len(NUMBERS)} numbers sequentially:')t0 = perf_counter()for n in NUMBERS:prime, elapsed = check(n)label = 'P' if prime else ' 'print(f'{n:16} {label} {elapsed:9.6f}s')elapsed = perf_counter() - t0print(f'Total time: {elapsed:.2f}s')if __name__ == '__main__':main()
2.2 運行結果
Checking 20 numbers sequentially:2 P 0.000000s142702110479723 P 0.247988s299593572317531 P 0.338454s
3333333333333301 P 1.098308s
3333333333333333 0.000014s
3333335652092209 1.093885s
4444444444444423 P 1.270619s
4444444444444444 0.000001s
4444444488888889 1.343093s
5555553133149889 1.531583s
5555555555555503 P 1.479670s
5555555555555555 0.000006s
6666666666666666 0.000000s
6666666666666719 P 1.803434s
6666667141414921 1.786589s
7777777536340681 1.739732s
7777777777777753 P 2.059888s
7777777777777777 0.000009s
9999999999999917 P 1.965799s
9999999999999999 0.000005s
Total time: 17.76s
2.3 關鍵觀察
- 所有任務順序執行
- 小數字檢測快(微秒級),大素數檢測慢
- 總時間≈各任務耗時之和
3. 多進程解決方案 (procs.py
)
3.1 核心代碼解析
import sys
from time import perf_counter
from typing import NamedTuple
from multiprocessing import Process, SimpleQueue, cpu_count
from multiprocessing import queues
from primes import is_prime, NUMBERSclass PrimeResult(NamedTuple):n: int # 存儲原始數字prime: bool # 是否為素數elapsed: float # 檢測耗時# 類型別名提高可讀性
JobQueue = queues.SimpleQueue[int]
ResultQueue = queues.SimpleQueue[PrimeResult]def check(n: int) -> PrimeResult:t0 = perf_counter()res = is_prime(n)return PrimeResult(n, res, perf_counter() - t0)def worker(jobs: JobQueue, results: ResultQueue) -> None:while n := jobs.get(): # 從隊列獲取任務results.put(check(n)) # 提交結果# 毒丸處理:發送終止信號results.put(PrimeResult(0, False, 0.0))def start_jobs(procs: int, jobs: JobQueue, results: ResultQueue) -> None:# 填充任務隊列for n in NUMBERS:jobs.put(n)# 啟動工作進程for _ in range(procs):proc = Process(target=worker, args=(jobs, results))proc.start()# 添加毒丸(每個進程一個)for _ in range(procs):jobs.put(0)def main() -> None:# 確定進程數(默認為CPU核心數)procs = cpu_count() if len(sys.argv) < 2 else int(sys.argv[1])print(f'Checking {len(NUMBERS)} numbers with {procs} processes:')t0 = perf_counter()# 創建隊列jobs: JobQueue = SimpleQueue()results: ResultQueue = SimpleQueue()# 啟動任務start_jobs(procs, jobs, results)# 處理結果checked = 0procs_done = 0while procs_done < procs:n, prime, elapsed = results.get()if n == 0: # 毒丸檢測procs_done += 1else:checked += 1label = 'P' if prime else ' 'print(f'{n:16} {label} {elapsed:9.6f}s')elapsed = perf_counter() - t0print(f'{checked} checks in {elapsed:.2f}s')if __name__ == '__main__':main()
3.2 運行結果(8進程)
Checking 20 numbers with 8 processes:2 P 0.000001s
3333333333333333 0.000005s
4444444444444444 0.000001s142702110479723 P 0.419267s
5555555555555555 0.000004s
6666666666666666 0.000000s299593572317531 P 0.517399s
3333333333333301 P 1.804822s
3333335652092209 1.804431s
4444444444444423 P 1.953585s
7777777777777777 0.000005s
4444444488888889 2.084240s
9999999999999999 0.000007s
5555553133149889 2.250911s
5555555555555503 P 2.257135s
6666666666666719 P 2.442794s
6666667141414921 2.534752s
7777777777777753 P 2.199439s
7777777536340681 2.235224s
9999999999999917 P 2.372946s
20 checks in 4.40s
3.3 關鍵改進
- 并行處理:使用8個進程(對應8個物理CPU核心)
- 隊列管理:
JobQueue
:分發待檢測數字ResultQueue
:收集檢測結果
- 毒丸模式 (Poison Pill):用特殊值(0)通知進程終止
- 結果關聯:在結果中存儲原始數字(
PrimeResult.n
)解決亂序問題
3.4 性能提升
方案 | 總耗時 | 加速比 |
---|---|---|
順序執行 | 17.76s | 1x |
多進程(8) | 4.40s | 4.0x |
超線程的真相:雖然系統報告12個邏輯核心(6物理核心×2線程),但CPU密集型任務中實際性能接近物理核心數(6核心)
ps:上面的是原文的內容,但是這個是超線程(Hyper-Threading),是一種由英特爾開發的技術,用于提高處理器的并行處理能力。通過超線程,一個物理核心可以被虛擬化為兩個邏輯核心,這使得操作系統和應用程序可以將其視為兩個獨立的核心進行任務分配。 mac的m芯片并沒有這個超線程。
M2 芯片有 8 個核心,其中包括 4 個性能核心和 4 個能效核心。這意味著在進行 CPU 密集型任務時,你可以使用所有 8 個核心來處理任務。與超線程不同,M2 芯片的核心是物理核心,不是通過超線程技術增加的邏輯核心。因此,你可以在 CPU 密集型任務中充分利用這 8 個物理核心的性能。
在理想情況下,如果你有8個物理核心,并且任務可以完美地分割和并行執行,你可能期望接近8倍的加速。但實際還是比較難的
4. 關鍵技術與工程實踐
4.1 進程間通信 (IPC)
問題:進程間內存隔離,不能直接共享數據
解決方案:使用隊列(multiprocessing.Queue
)
# 創建隊列
from multiprocessing import Queue
job_queue = Queue()
result_queue = Queue()# 生產者(主進程)
job_queue.put(task_data)# 消費者(工作進程)
task = job_queue.get()
result = process(task)
result_queue.put(result)
4.2 毒丸模式 (Poison Pill)
作用:優雅終止工作進程
實現技巧:
- 使用不會與正常數據沖突的哨兵值(0、None、Ellipsis等)
- 每個進程一個毒丸
# 主進程
for _ in range(num_workers):job_queue.put(POISON_PILL) # 發送終止信號# 工作進程
while True:task = job_queue.get()if task is POISON_PILL:break# 處理正常任務
4.3 結果亂序處理
問題:任務完成順序 ≠ 提交順序
解決方案:結果中攜帶原始數據標識
# 錯誤做法(丟失關聯)
results.put(is_prime(n))# 正確做法(保留關聯)
results.put((n, is_prime(n), elapsed_time))
4.4 進程數優化
最佳實踐:進程數 ≈ 物理CPU核心數
圖:不同進程數下的執行時間(6核CPU最佳)
5. 多線程方案的陷阱 (threads.py
)
5.1 為何線程無效
# 偽代碼展示線程方案
from threading import Threaddef worker():while task := task_queue.get():# CPU密集型計算result = heavy_computation(task) # GIL在此阻塞其他線程result_queue.put(result)# 創建多個線程(實際無法并行執行CPU任務)
5.2 GIL (全局解釋器鎖) 的影響
- Python解釋器一次只允許一個線程執行字節碼
- CPU密集型任務無法充分利用多核
- 線程切換反而增加開銷
5.3 實測性能
方案 | 總耗時 | 對比順序執行 |
---|---|---|
順序執行 | 40.31s | 基準 |
12線程 | 42.5s | 更慢 |
關鍵結論:線程適合I/O密集型任務,不適合CPU密集型任務
6. 補充實例:I/O密集型任務對比
6.1 模擬I/O任務
import time
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorURLS = ['https://www.example.com/page1','https://www.example.com/page2',# ... 20個URL
]def fetch(url):start = time.perf_counter()response = requests.get(url) # I/O阻塞操作elapsed = time.perf_counter() - startreturn len(response.content), elapsed
6.2 性能對比
方案 | 任務類型 | 20個任務總耗時 |
---|---|---|
順序執行 | I/O | 12.7s |
多線程(20) | I/O | 1.3s |
多進程(20) | I/O | 3.8s |
6.3 關鍵結論
- I/O密集型:多線程最佳(避免進程創建開銷)
- CPU密集型:多進程最佳(避開GIL限制)
- 混合型任務:需根據實際情況測試選擇
7. 工程實踐建議
7.1 避免"自造輪子"
使用標準庫concurrent.futures
簡化代碼:
# 使用ProcessPoolExecutor重構procs.py
from concurrent.futures import ProcessPoolExecutordef main():with ProcessPoolExecutor(max_workers=12) as executor:futures = {executor.submit(is_prime, n): n for n in NUMBERS}for future in as_completed(futures):n = futures[future]prime = future.result()# 處理結果...
7.2 常見陷阱與解決方案
陷阱 | 解決方案 |
---|---|
忘記if __name__ == '__main__' | 始終使用守護模式 |
隊列阻塞導致死鎖 | 設置超時/使用非阻塞方法 |
進程資源泄漏 | 使用上下文管理器管理進程池 |
跨平臺序列化問題 | 避免傳遞lambda/閉包函數 |
7.3 調試技巧
- 簡化復現:減少進程數和任務量
- 日志記錄:
import logging logging.basicConfig(level=logging.DEBUG,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' )
- 使用
ProcessPoolExecutor
替代手動管理進程
8. 總結:Python在多核世界的生存之道
-
CPU密集型任務:
- 使用多進程繞過GIL限制
- 優選
concurrent.futures.ProcessPoolExecutor
- 進程數≈物理CPU核心數
-
I/O密集型任務:
- 使用多線程減少上下文切換開銷
- 優選
concurrent.futures.ThreadPoolExecutor
- 結合
asyncio
實現更高并發
-
混合任務優化:
- 分離CPU和I/O邏輯
- 使用進程池+線程池分層架構
- 考慮C擴展處理關鍵計算
-
分布式擴展:
- 對于超大規模任務
- 使用
Celery
或Dask
跨節點分發 - 結合云服務實現彈性擴展
關鍵洞見:Python通過多進程和異步編程模型,結合豐富的生態系統,能有效利用多核處理器應對各類計算需求。
總結
在介紹一些理論之后,本章展示了通過 Python 三種原生并發編程模型實現的 spinner 腳本:
- 基于
threading
包的線程模型 - 基于
multiprocessing
的進程模型 - 基于
asyncio
的異步協程模型
隨后我們通過實驗探究了全局解釋器鎖(GIL)的實際影響:修改 spinner 示例以計算大整數的素性,并觀察其行為。實驗直觀表明,異步編程(asyncio)中必須避免 CPU 密集型函數,因為它們會阻塞事件循環。實驗的線程版本雖然存在 GIL 限制,但仍能工作——這是因為 Python 會定期中斷線程,且該示例僅使用兩個線程:一個執行 CPU 密集型計算,另一個每秒僅驅動動畫 10 次。多進程版本則繞開了 GIL,通過啟動新進程專門處理動畫,而主進程執行素性檢查。
下一個計算多個素數的示例突出了多進程與線程的區別,證明只有進程能讓 Python 充分利用多核 CPU。對于重計算任務,Python 的 GIL 會使線程性能比順序代碼更差。
GIL 是 Python 并發與并行計算討論的核心,但我們不應高估其影響(如 725 頁“Python 與多核世界”所述)。例如,GIL 對 Python 在系統管理中的許多用例并無影響。另一方面,數據科學和服務器端開發社區已通過針對特定需求的工業級解決方案繞開了 GIL 的限制。最后兩節提到了支持 Python 服務器端應用規模化的兩個常見組件:WSGI 應用服務器和分布式任務隊列。