歡迎來到"一起學點什么吧"的合集「NLP知微集」。在這里,我們不愿宏大敘事,只聚焦于自然語言處理領域中那些細微卻關鍵的“齒輪”與“螺絲釘”。我相信,真正深刻的理解,源于對細節的洞察。本期,我將為您拆解的是:[Python中的線程(三)]。
知微集:Python中的線程(一)
知微集:Python中的線程(二)
前兩期,我們一起對Python中的線程,已經有了較為初步的認識;今天,我們開始Python線程的第三期,主要從以下來探索Python線程:何時使用線程、線程阻塞調用、線程局部數據、線程互斥鎖、可重入線程鎖、線程條件、線程信號、線程事件、線程計時。
何時使用線程
鎖被釋放的情況包括:
- 當一個線程執行阻塞 IO 時。
- 當一個線程執行 C 代碼并顯式釋放鎖時。
完全避免鎖的方法,例如:
- 使用第三方 Python 解釋器執行 Python 代碼。
使用線程進行阻塞 IO
使用線程進行 IO 密集型任務
IO 密集型任務是一種涉及從設備、文件或套接字連接中讀取或寫入的任務。
這些操作涉及輸入和輸出(IO),而這些操作的速度受限于設備、硬盤或網絡連接。這就是為什么這些任務被稱為 IO 相關的。
CPU 真的非常快。現代 CPU,比如 4GHz 的 CPU,每秒可以執行 40 億條指令,而且你的系統很可能有多個 CPU 核心。
與 CPU 的速度相比,執行 IO 操作非常慢。
與設備交互、讀寫文件和套接字連接需要調用操作系統的指令(內核),內核將等待操作完成。如果該操作是 CPU 的主要焦點,例如在 Python 程序的主線程中執行,那么 CPU 將花費許多毫秒甚至許多秒無所事事地等待。
執行 IO 操作的線程將在整個操作期間阻塞。在阻塞期間,這會向操作系統發出信號,表明一個線程可以被掛起,而另一個線程可以執行,這被稱為上下文切換。
此外,當 Python 解釋器執行阻塞 IO 操作時,它會釋放 GIL,從而允許 Python 進程中的其他線程執行。
因此,阻塞 IO 為在 Python 中使用線程提供了一個絕佳的用例。
阻塞 IO 操作的例子包括:
- 從硬盤讀取或寫入文件。
- 向標準輸出、輸入或錯誤(stdin、stdout、stderr)進行讀取或寫入。
- 打印文檔
- 在與服務器的套接字連接上讀取或寫入字節。
- 下載或上傳文件。
- 查詢服務器。
- 查詢數據庫。
- 拍照或錄制視頻。
- …
使用釋放 GIL 的外部 C 語言線程
我們可能會調用那些會進一步調用第三方 C 庫的函數。
通常,這些函數調用會釋放 GIL,因為被調用的 C library 不會與 Python 解釋器交互。
這為 Python 進程中的其他線程提供了運行的機會。
例如,當使用 Python 標準庫中的“hash”模塊時,通過 hash.update() 函數對數據進行哈希處理時會釋放 GIL。
在使用 OpenSSL 提供的哈希算法時,當對大于 2047 字節的數據進行哈希更新操作,Python GIL 會被釋放以允許其他線程運行。
另一個例子是用于管理數據數組的 NumPy 庫,它在執行數組上的函數時會釋放 GIL。
當一個線程正在等待 IO(例如等待你輸入一些內容,或者等待網絡中有數據傳入)時,Python 會釋放 GIL 以便其他線程可以運行。更重要的是,當 NumPy 正在進行數組操作時,Python 同樣會釋放 GIL。
使用第三方 Python 解釋器
存在可以獲取并用于執行您的 Python 代碼的替代商業和開源 Python 解釋器。
這些解釋器中的一些可能會實現全局解釋器鎖(GIL),并且釋放它的頻率或多或少與 CPython 不同。其他解釋器則完全移除了 GIL,允許多個 Python 并發線程并行執行。
包含無全局解釋器鎖(GIL)的第三方 Python 解釋器的例子包括:
- Jython:一種用 Java 編寫的開源 Python 解釋器。
- IronPython:一個用.NET 編寫的開源 Python 解釋器。
… Jython 不受全局解釋器鎖(GIL)的限制。這是因為所有 Python 線程都被映射到 Java 線程,并使用標準的 Java 垃圾回收支持(CPython 中 GIL 存在的主要原因是因為引用計數垃圾回收系統)。這里的重要推論是,你可以使用線程來處理用 Python 編寫的計算密集型任務。
線程阻塞調用
阻塞調用是指一個函數調用在完成之前不會返回。
所有正常函數都是阻塞調用。沒什么大不了的。
在并發編程中,阻塞調用具有特殊含義。
阻塞調用是指那些會等待特定條件并通知操作系統在線程等待期間沒有發生事情的函數調用。
操作系統可能會注意到一個線程正在調用阻塞函數,并決定切換到另一個線程的上下文。操作系統管理哪些線程應該運行以及何時運行它們。它通過一種多任務處理方式實現這一點,即正在運行的線程被掛起,掛起的線程被恢復并繼續運行。線程的掛起和恢復稱為上下文切換。
操作系統傾向于從阻塞線程中切換上下文,允許非阻塞線程運行。
這意味著如果線程執行了一個阻塞函數調用,一個等待的調用,那么它可能會發出信號表示該線程可以被掛起,從而允許其他線程運行。
類似地,許多我們可能傳統上認為會阻塞的函數調用,在現代非阻塞并發 API(如 asyncio)中可能有非阻塞版本。
在并發編程中,需要考慮三種類型的阻塞函數調用,它們是:
- 并發原語上的阻塞調用。
- 用于 IO 的阻塞調用。
- 阻塞調用 sleep。
并發原語上的阻塞調用
在并發編程中,存在許多阻塞函數調用的例子。
常見的例子包括:
-
等待鎖,例如調用 threading.Lock 上的 acquire()。
通過 threading.Lock 獲取互斥鎖(mutex)是一個阻塞調用。
這是通過調用 acquire() 來實現的,如果鎖可用,它將立即返回(不會阻塞),否則將阻塞直到可用。
lock.acquire()
也可以通過使用上下文管理器更清晰地實現,如果鎖不可用,它也可能被阻塞。
with lock:
-
等待通知,例如調用 threading.Condition 對象的 wait()方法。
線程可以等待程序狀態發生某些變化的通知。
with condition:# block until notifiedcondition.wait()
-
等待線程終止,例如調用 threading.Thread 的 join() 方法。
一個線程在等待另一個線程終止時可能會被阻塞。
這是通過等待線程調用運行在其他線程上的 join()函數來實現的。
-
等待信號量,例如調用 threading.Semaphore 的 acquire() 方法。
一個 threading.Semaphore 提供了一個線程安全的計數器,該計數器的上限是預定義的。
一個線程必須使用 acquire() 函數來獲取信號量上的位置。一旦信號量已滿,額外的獲取嘗試必須被阻塞,直到一個位置變得可用。
-
等待事件,例如調用 threading.Event 的 wait() 方法。
一個 threading.Event 是一個線程安全的布爾標志。
一個線程可以通過 wait() 函數來阻塞等待事件被設置。
-
等待屏障,例如調用 threading.Barrier 的 wait() 方法。
一個 threading.Barrier 是一個同步原語,它允許多個線程進行協調。
線程將在屏障上阻塞,直到固定數量的參與者到達,然后它們將全部被釋放。
在并發編程中,阻塞于并發原語是正常現象。
I/O 的阻塞調用
傳統上,與 IO 交互的函數調用通常是阻塞函數調用。也就是說,它們與并發原語中的阻塞調用具有相同的意義。
等待 IO 設備響應是向操作系統發出線程可以上下文切換的另一個信號。
常見的例子包括:
- 硬盤驅動器(Hard disk drive):讀取、寫入、追加、重命名、刪除等文件。
- 外設(Peripherals):鼠標、鍵盤、屏幕、打印機、串口、攝像頭等。
- 互聯網(Internet):下載和上傳文件、獲取網頁、查詢 RSS 等。
- 數據庫(Database):選擇、更新、刪除等 SQL 查詢。
- Email:發送郵件、接收郵件、查詢收件箱等。
還有許多其他示例,大多與套接字相關。
與 CPU 相比,使用設備進行 IO 通常非常慢。
與向文件或套接字讀取或寫入一些字節相比,CPU 可以執行數量級的更多指令。
設備 IO 由操作系統和設備協調。這意味著操作系統可以從設備收集或發送一些字節,然后在需要時切換回阻塞線程,允許函數調用繼續進行。
阻塞調用 sleep
sleep() 函數是由底層操作系統提供的一項功能,我們可以在程序中加以利用。
這是一個阻塞函數調用,它會暫停線程,使其固定時間(以秒為單位)內阻塞。
這可以通過調用 time.sleep() 來實現。
它和并發原語以及阻塞 IO 函數調用具有相同意義上的阻塞調用。它向操作系統發出信號,表明該線程正在等待,并且是上下文切換的良好候選者。
在并發編程中,添加睡眠可以是一種通過線程模擬固定間隔計算工作量的有用方法。
在展示并發編程的工作示例時,我們經常使用睡眠,但向代碼中添加睡眠也有助于單元測試和調試并發失敗條件,例如通過強制動態應用程序中事件的時序錯誤來引發競態條件。
什么是wait
wait 是線程被阻塞時執行的操作。
通常在使用并發原語時,函數調用本身可能具有 wait() 或 await() 的名稱,這表示線程將阻塞,直到條件滿足。
線程局部數據
線程局部數據存儲是多線程編程中的一個機制,它允許數據以每個線程私有的方式存儲和訪問。
它可能被稱為“線程局部存儲”、“線程私有”或簡單地稱為“線程局部”。
通常這涉及創建一個線程局部對象實例,該實例在數據被設置和檢索的對象之間共享。
在線程局部對象上存儲和檢索的數據可能跨線程具有相同的變量名,這意味著相同的代碼可以在不同線程中使用,這是使用工作線程時的常見方法。
重要的是,對線程局部對象的讀取和寫入在線程級別上是獨特且私有的。這意味著一個線程可以寫入一個名為“address”的變量,而另一個線程可以讀取或寫入一個具有相同名稱的變量,但它不會與第一個線程存儲的變量交互。
如果執行相同的代碼并使用相同的線程局部實例,那么每個線程都有自己的命名變量的私有版本及其在線程局部存儲中分配的值。
當多個線程需要存儲本地數據,如部分解決方案或臨時變量,并且需要在執行時使用相同的代碼,例如相同的對象實例時,這會很有用。
線程可以通過 threading.local 類的實例存儲局部數據。
其他線程可以使用局部的相同屬性名,但值將限制在每個線程中。
這就像每個線程都有一個命名空間,稱為“線程局部數據”。這意味著線程無法訪問或讀取其他線程的局部數據。
重要的是,每個線程必須持有“局部”實例才能訪問存儲的數據。
使用線程局部數據的示例
# 線程局部存儲示例
from time import sleep
import threading# 自定義目標函數
def task(value):# 創建局部存儲local = threading.local()# 存儲數據local.value = value# 阻塞一會兒sleep(value)# 檢索值print(f'Stored value: {local.value}')# 創建并啟動線程
threading.Thread(target=task, args=(1,)).start()
# 創建并啟動另一個線程
threading.Thread(target=task, args=(2,)).start()
Stored value: 1
Stored value: 2
線程局部變量的令人興奮之處在于,我們可以在不同線程之間共享同一個實例。
共享線程局部實例的示例
可以創建一個線程局部對象的實例,并在多個線程之間共享它。
重要的是,每個線程可以在同一個同名的線程局部存儲中存儲唯一數據,并且不會相互干擾。
# 使用共享實例的線程局部存儲示例
from time import sleep
import threading# 自定義目標函數
def task(value, local):# 存儲數據local.value = value# 阻塞一會兒sleep(value)# 檢索值print(f'Stored value: {local.value}')# 創建共享的線程局部實例
local = threading.local()
# 創建并啟動線程
start_time = time.time()
threading.Thread(target=task, args=(1,local)).start()
print(f'Started in {time.time() - start_time:.2f}s')
# 等待一會兒
sleep(0.5)
# 創建并啟動另一個線程
threading.Thread(target=task, args=(2,local)).start()
print(f'Started in {time.time() - start_time:.2f}s')
Started in 0.00s
Started in 0.50s
Stored value: 1
Stored value: 2
首先運行示例會創建線程局部實例,并將其傳遞給第一個線程,該線程將其存儲在名為“value”的線程局部實例中,并休眠一秒鐘。
主線程隨后阻塞了一小段時間。
接下來,創建并啟動了第二個線程,將其值存儲在與相同名稱“value”相同的線程局部實例中,然后阻塞了兩秒鐘。
第一個線程醒來并報告其值為“1”,然后第二個線程醒來并報告其值為“2”。
這表明當線程局部實例在多個線程間共享,并且每個線程都將其私有數據存儲在相同的屬性中時,情況是這樣的。
全局線程局部實例示例
通過將線程局部實例設為全局變量,并在每個函數中直接訪問它來實現相同的結果。
通過在目標任務函數中顯式定義全局變量,然后像之前一樣使用它來實現
使用線程局部變量作為線程私有的存儲空間是線程局部機制的一種常見使用模式。
# 使用全局實例的線程本地存儲示例
import time
from time import sleep
import threading# 自定義目標函數
def task(value):global local# 存儲數據local.value = value# 阻塞一會兒sleep(value)# 檢索值print(f'Stored value: {local.value}')# 創建共享的線程本地實例
local = threading.local()
start_time = time.time()
# 創建并啟動線程
threading.Thread(target=task, args=(1,)).start()
print(f'Started in {time.time() - start_time:.2f}s')
# 等待一會兒
sleep(0.5)
# 創建并啟動另一個線程
threading.Thread(target=task, args=(2,)).start()
print(f'Started in {time.time() - start_time:.2f}s')
Started in 0.00s
Started in 0.50s
Stored value: 1
Stored value: 2
何時使用線程局部數據?
當每個線程必須存儲不同的數據,且這些數據不應被其他線程訪問時,線程本地數據很有用。
一個線程局部存儲機制允許每個線程:
- 執行相同的代碼。
- 使用相同的變量名。
- 共享相同的線程局部實例。
這是在確保線程存儲的數據只能被該線程訪問的同時實現的,使其在線程級別上是私有的,因此得名“線程局部”。
何時不應使用線程局部數據?
線程局部數據不應用于在多個線程之間共享數據。
這是因為數據變量對每個線程都是私有的,并且無法被訪問。
Thread-Local 的常見模式是什么?
- 全局線程局部:將線程局部作為全局變量創建,并在多個線程執行的函數中使用它。
- 靜態線程局部:將線程局部作為類變量(例如靜態)創建,并在多個方法和多個線程執行的相同對象的多個實例中使用它。
線程互斥鎖
在編寫并發程序時,我們可能需要在線程之間共享數據或資源,這些通常需要用互斥鎖來保護。
互斥鎖用于保護代碼的關鍵部分免受并發執行。
可以通過 threading.Lock 類在 Python 中使用互斥鎖(mutex)
互斥鎖
互斥鎖或互斥量是一種用于防止競態條件的同步原語。
競態條件是一種并發失敗情況,當兩個線程執行相同的代碼并訪問或更新相同的資源(例如數據變量、流等)時,會導致資源處于未知且不一致的狀態。
競態條件常常導致程序出現意外行為和/或數據損壞。
能夠被多個線程并發執行且可能導致競態條件的代碼敏感部分被稱為臨界區。臨界區可以指單個代碼塊,但也指多個函數從多個地方對同一數據變量或資源進行訪問。
確保互斥最常用的機制是互斥鎖或簡稱互斥鎖(mutex),它是一種在底層硬件中具有支持的特殊對象。基本思想是每個臨界區都由一個鎖來保護。
互斥鎖可用于確保同一時間只有一個線程執行代碼的關鍵部分,而所有其他試圖執行相同代碼的線程必須等待當前正在執行的線程完成關鍵部分并釋放鎖。
當一個線程“擁有”鎖——也就是說,它已經返回了鎖函數的調用,但還沒有調用解鎖函數——任何其他試圖執行關鍵部分代碼的線程都會在其鎖函數調用中等待。
每個線程必須在臨界區的開始嘗試獲取鎖。如果鎖尚未被獲取,則某個線程將獲取它,而其他線程必須等待獲取鎖的線程釋放它。
在線程執行臨界區代碼之前,它必須通過調用互斥鎖函數來“obtain獲取”互斥鎖,當它完成在臨界區執行代碼后,應該通過調用解鎖函數來“relinquish釋放”互斥鎖。
如果鎖沒有被獲取,我們可能稱其為處于“未鎖定(unlocked)”狀態。而如果鎖已被獲取,我們可能稱其為處于“已鎖定(locked)”狀態。
- UnLocked:鎖未被獲取,并且可以被下一個嘗試獲取的線程獲取。
- Locked:該鎖已被一個線程獲取,任何試圖獲取它的線程都必須等待它被釋放。
鎖是在未鎖定狀態下創建的。
如何使用互斥鎖
Python 通過 threading.Lock 類提供互斥鎖。
鎖的實例可以被創建,然后在線程訪問臨界區之前獲取,在臨界區之后釋放。
任何時候只有一個線程可以擁有鎖。如果一個線程沒有釋放已獲得的鎖,它將無法再次被獲取。
嘗試獲取鎖的線程將被阻塞,直到獲取到鎖,例如如果另一個線程當前持有鎖然后釋放了它。
我們可以通過將“blocking”參數設置為 False 來嘗試獲取鎖而不阻塞。如果無法獲取鎖,則返回 False 值。
lock.acquire(blocking=false)
我們也可以嘗試帶超時地獲取鎖,即在設定的秒數內等待獲取鎖,如果超時則放棄。如果無法獲取鎖,則返回值 False。
lock.acquire(timeout=10)
我們也可以通過上下文管理器協議,使用 with 語句來使用鎖,使得臨界區成為鎖使用過程中的一個代碼塊。
...
# create a lock
lock = Lock()
# acquire the lock
with lock:# ...
這是首選用法,因為它清楚地表明了受保護代碼的開始和結束位置,并確保鎖始終被釋放,即使臨界區內部出現異常或錯誤也是如此。
我們還可以通過 locked()函數檢查鎖是否當前被線程獲取。
...
# check if a lock is currently acquired
if lock.locked():# ...
使用互斥鎖的示例
# SuperFastPython.com
# 互斥鎖(mutex)示例
from time import sleep
from random import random
from threading import Thread
from threading import Lock# 工作函數
def task(lock, identifier, value):# 獲取鎖with lock:print(f'>thread {identifier} got the lock, sleeping for {value}')sleep(value)# 創建共享鎖
lock = Lock()
# 啟動幾個嘗試執行相同臨界區的線程
for i in range(10):# 啟動線程Thread(target=task, args=(lock, i, random())).start()
# 等待所有線程完成...
運行示例會啟動十個線程,所有線程都執行一個自定義的目標函數。
每個線程嘗試獲取鎖,一旦獲取成功,它們會報告一條包含其 ID 以及它們在釋放鎖之前將睡眠多長時間的消息。
由于使用了隨機數,您的具體結果可能會有所不同。
>thread 0 got the lock, sleeping for 0.3595121656333118
>thread 1 got the lock, sleeping for 0.4432990299676699
>thread 2 got the lock, sleeping for 0.20874337791325426
>thread 3 got the lock, sleeping for 0.9722774087786338
>thread 4 got the lock, sleeping for 0.7095006286856006
>thread 5 got the lock, sleeping for 0.5762733611611981
>thread 6 got the lock, sleeping for 0.27261346362585626
>thread 7 got the lock, sleeping for 0.6444981069336172
>thread 8 got the lock, sleeping for 0.43306723744296294
>thread 9 got the lock, sleeping for 0.9158837061873222
如果不釋放鎖會怎樣?
不釋放鎖是一種并發錯誤。
如果一個線程獲得了鎖但沒有釋放它,那么這個鎖就不能被再次獲取,受臨界區保護的代碼也無法再次執行。
如果忽略鎖會怎樣?
忽視鎖是一種并發錯誤。
可以編寫代碼,使得某些線程遵守鎖,而另一些線程不遵守。這很可能導致競爭條件,從而違背了設置鎖的初衷。
該鎖只有在所有對臨界區(數據、資源等)的訪問中都強制執行時,才能保護臨界區。
線程鎖 threading.Lock 是否可重入?
不可以
可重入鎖是一種鎖,如果被一個線程持有,該線程可以再次獲取該鎖。
threading.Thread 鎖不是可重入的,這意味著如果一個線程獲取了鎖,并在代碼的其他部分嘗試再次獲取它,這將導致阻塞,從而引發死鎖錯誤(一種并發失敗條件)。
可重入互斥鎖在 threading.RLock 類中可用。
可重入線程鎖
標準的互斥鎖不允許線程多次獲取鎖。這意味著在一個臨界區中的代碼不能調用或執行受同一鎖保護的另一個臨界區中的代碼。相反,需要一種不同類型的互斥鎖,稱為可重入鎖。
可重入鎖是一種可以被同一個線程多次獲取的鎖。
可以通過 threading.RLock 類在 Python 中使用可重入鎖
可重入鎖
可重入互斥鎖,簡稱“可重入互斥量”或“可重入鎖”,類似于互斥鎖,但它允許線程多次獲取該鎖。
可重入鎖是一種同步原語,可以被同一個線程多次獲取。 […] 在鎖定狀態下,某個線程擁有該鎖;在未鎖定狀態下,沒有線程擁有它。
一個線程可能因為多種原因需要多次獲取同一個鎖。
我們可以想象關鍵部分分布在一個或多個函數中,每個部分都由同一個鎖保護。線程在正常執行過程中可能會調用這些函數,也可能從一個關鍵部分調用另一個關鍵部分。
一個(非可重入)互斥鎖的局限性在于,如果一個線程已經獲取了該鎖,則它無法再次獲取該鎖。實際上,這種情況將導致死鎖,因為它將永遠等待鎖被釋放以便獲取,但它持有該鎖且不會釋放。
可重入鎖允許線程在已經獲取該鎖的情況下再次獲取相同的鎖。這允許線程在受相同可重入鎖保護的情況下,從臨界區內部執行臨界區。
每次線程獲取鎖時,也必須釋放它,這意味著擁有該線程的獲取和釋放操作存在遞歸層次。因此,這種類型的鎖有時被稱為“遞歸互斥鎖”。
鎖與可重入鎖的區別
Python 通過 threading.Lock 類提供互斥鎖,并通過 threading.RLock 類提供可重入鎖。
- threading.Lock: Python 互斥鎖。
- threading.RLock: Python 可重入互斥鎖。
一個 threading.Lock 只能被獲取一次,一旦被獲取,同一線程或任何其他線程都不能再次獲取它,直到它被釋放。
一個 threading.RLock 可以被同一線程多次獲取,盡管一旦被某個線程獲取,其他線程不能獲取它,直到它被釋放。
重要的是,每次當同一個線程獲取 threading.RLock 時,必須以相同次數釋放,直到它可被另一個線程再次獲取。這意味著 acquire()的調用次數必須與 release()的調用次數相同,RLock 才能返回到未鎖定狀態。
如何使用可重入鎖
Python 通過 threading.RLock 類提供鎖。
RLock 的實例可以在線程訪問臨界區之前創建并獲取,在臨界區之后釋放。
嘗試獲取鎖的線程將被阻塞,直到獲取到鎖,例如如果另一個線程當前持有鎖(一次或多次)然后釋放它。
我們可以通過將“blocking”參數設置為 False 來嘗試獲取鎖而不阻塞。如果無法獲取鎖,則返回 False 值。
lock.acquire(blocking=false)
我們也可以嘗試帶超時地獲取鎖,即在設定的秒數內等待獲取鎖,如果超時則放棄。如果無法獲取鎖,則返回值 False。
lock.acquire(timeout=10)
我們也可以通過上下文管理器協議使用 with 語句來使用可重入鎖,使得臨界區成為鎖使用過程中的一個代碼塊。
...
# create a reentrant lock
lock = RLock()
# acquire the lock
with lock:# ...
使用可重入鎖的示例
鑒于目標任務函數受鎖保護,且調用也受相同鎖保護的報告函數,我們可以使用可重入鎖,這樣如果線程在 task()中獲取了鎖,它將能夠在 report()函數中重新進入該鎖。
# 可重入鎖示例
from time import sleep
from random import random
from threading import Thread
from threading import RLock# 報告函數
def report(lock, identifier):# 獲取鎖with lock:print(f'>thread {identifier} done')# 工作函數
def task(lock, identifier, value):# 獲取鎖with lock:print(f'>thread {identifier} sleeping for {value}')sleep(value)# 報告report(lock, identifier)# 創建共享的可重入鎖
lock = RLock()
# 啟動幾個嘗試執行相同臨界區的線程
for i in range(10):# 啟動線程Thread(target=task, args=(lock, i, random())).start()
# 等待所有線程完成...
運行示例會啟動十個線程來執行目標任務函數。
一次只能有一個線程獲取鎖,獲取后,會阻塞并重新進入同一個鎖以報告完成消息。
如果使用非可重入鎖,例如 threading.Lock,則線程將永遠阻塞等待鎖變得可用,但它無法實現這一點,因為線程已經持有鎖。
線程條件
條件允許線程等待和被通知。
通過 threading.Condition 類在 Python 中使用線程條件對象。
線程條件(Threading Condition)
在并發中,條件(也稱為監視器)允許多個線程被通知某些結果。
結合了互斥鎖(mutex)和條件變量
一個互斥鎖可以用來保護一個臨界區,但它不能用來通知其他線程一個條件已經改變或滿足。
一個條件可以被一個線程(類似于互斥鎖)獲取,之后它可以等待另一個線程通知它某些事情已經改變。在等待時,該線程被阻塞,并釋放鎖以便其他線程獲取。
然后,另一個線程可以獲取條件變量,進行更改,并通知一個、全部或等待該條件變量的線程子集某些內容已更改。等待的線程隨后可以被喚醒(由操作系統調度),重新獲取條件變量(互斥鎖),檢查任何已更改的狀態,并執行所需的操作。
這表明一個條件內部使用互斥鎖(用于獲取/釋放條件),但它還提供了其他功能,例如允許線程在條件上等待,并允許線程通知其他在條件上等待的線程。
使用條件對象
Python 通過 threading.Condition 類提供了一個條件變量。
可以創建一個條件對象,默認情況下它將創建一個新的可重入互斥鎖(threading.RLock 類),該鎖將內部使用。
condition = threading.Condition()
...
# acquire the condition
condition.acquire()
# wait to be notified
condition.wait()
# release the condition
condition.release()
直接調用 acquire() 和 release() 函數的另一種方法是使用上下文管理器,它將自動為我們執行獲取/釋放操作
...
# acquire the condition
with condition:# wait to be notifiedcondition.wait()# condition.wait(timeout=10) #該參數允許線程在指定的秒數時間限制后停止阻塞。# condition.wait_for(all_data_collected) #函數可用于僅在滿足某個條件時才解鎖等待的線程,例如調用返回布爾值的函數。
通過 notify()函數通知一個正在等待的線程
...
# acquire the condition
with condition:# notify a waiting threadcondition.notify()
使用條件變量的等待和通知示例
# 使用條件變量的等待/通知示例
from time import sleep
from threading import Thread
from threading import Condition# 準備一些工作的目標函數
def task(condition, work_list):# 阻塞一會兒sleep(1)# 向工作列表添加數據work_list.append(33)# 通知等待的線程工作已完成print('Thread sending notification...')with condition:condition.notify()# 創建條件變量
condition = Condition()
# 準備工作列表
work_list = list()
# 等待通知數據已準備就緒
print('Main thread waiting for data...')
with condition:# 啟動新線程執行一些工作worker = Thread(target=task, args=(condition, work_list))worker.start()# 等待通知condition.wait()
# 我們知道數據已準備就緒
print(f'Got data: {work_list}')
首先運行示例會創建條件和任務列表。
新線程被定義并啟動。線程暫時阻塞,向列表中添加數據,然后通知等待的線程。
與此同時,主線程等待新線程的通知,一旦收到通知,它就知道數據已準備好并報告結果。
使用 wait() 和 notify_all() 的條件變量示例
from time import sleep
from random import random
from threading import Thread
from threading import Condition# 目標函數
def task(condition, number):# 等待通知print(f'Thread {number} waiting...')with condition:condition.wait()# 阻塞一會兒value = random()sleep(value)# 報告結果print(f'Thread {number} got {value}')# 創建條件變量
condition = Condition()
# 啟動一堆等待通知的線程
for i in range(5):worker = Thread(target=task, args=(condition, i))worker.start()
# 阻塞一會兒
sleep(1)
# 通知所有等待的線程可以運行
with condition:# 等待通知condition.notify_all()
# 阻塞直到所有非守護線程完成...
首先運行示例會創建五個線程,這些線程立即開始運行,并且都獲取了條件變量并阻塞等待通知。
主線程阻塞片刻,然后通知所有五個等待線程。等待線程被喚醒,逐一獲取條件變量中的鎖,執行其處理并報告其結果。
程序在所有線程完成其處理后將退出。
wait_for() 的使用示例:條件
# 使用條件變量等待的示例
from time import sleep
from random import random
from threading import Thread
from threading import Condition# 目標函數
def task(condition, work_list):# 獲取條件變量with condition:# 阻塞一會兒value = random()sleep(value)# 向列表添加工作work_list.append(value)print(f'Thread added {value}')# 通知等待的線程condition.notify()# 創建條件變量
condition = Condition()
# 定義工作列表
work_list = list()
# 啟動一堆線程,它們將向列表添加工作
for i in range(5):worker = Thread(target=task, args=(condition, work_list))worker.start()
# 等待所有線程將工作添加到列表中
with condition:# 等待通知condition.wait_for(lambda : len(work_list)==5)print(f'Done, got: {work_list}')
首先運行示例會啟動五個線程,每個線程將獲取條件變量,生成一個隨機值,將其添加到共享工作列表中,并通知主線程。
主線程等待條件變量,并在每個新線程完成時收到通知,但直到 lambda 可調用對象返回 True 時才實際繼續執行并打印消息,即當列表中的值數量與線程數量相匹配時。
線程信號量(Thread Semaphore)
信號量本質上是一個由互斥鎖保護的計數器,用于限制可以訪問資源的線程數量。
在 Python 中,可以通過 threading.Semaphore 類使用信號量。
什么是信號量
信號量是一種并發原語,它允許限制可以獲取保護關鍵區的鎖的線程數量。
它是一種互斥鎖(mutex)鎖的擴展,增加了可以獲取鎖的線程數量計數,以便在額外的線程阻塞之前。一旦滿員,新線程只能在一個持有信號量的現有線程釋放一個位置后才能獲取信號量的位置。
在內部,信號量維護一個由互斥鎖保護的計數器,每次獲取信號量時計數器會遞增,每次釋放信號量時計數器會遞減。
當創建一個信號量時,計數器的上限被設定。如果設定為 1,那么信號量將像互斥鎖一樣運行。
一個信號量提供了一個有用的并發工具,用于限制可以并發訪問資源的線程數量。一些例子包括:
- 限制對服務器的同時套接字連接。
- 限制硬盤上的并發文件操作。
- 限制并發計算。
如何使用信號量
Python 通過 threading.Semaphore 類提供信號量。
在創建 threading.Semaphore 實例時,必須對其進行配置以設置內部計數器的限制。此限制將匹配可以持有信號量的并發線程數量。
...
# create a semaphore with a limit of 100
semaphore = Semaphore(100)
在此實現中,每次獲取信號量時,內部計數器都會遞減。每次釋放信號量時,內部計數器都會遞增。如果信號量沒有可用的位置,則無法獲取信號量,在這種情況下,嘗試獲取它的線程必須阻塞,直到某個位置變得可用。
# 信號量可以通過調用 acquire()函數來獲取
# 默認情況下,這是一個阻塞調用,這意味著調用線程將被阻塞,直到信號量上出現一個可用位置。
semaphore.acquire()# "blocking" 參數可以設置為 False,在這種情況下,如果信號量上沒有可用位置,線程將不會阻塞,函數將立即返回,返回 False 值表示信號量未被獲取,或者返回 True 值表示成功獲取了位置。
semaphore.acquire(blocking=False)# “timeout”參數可以設置為調用線程在信號量不可用時愿意等待的秒數,如果無法獲得則放棄。同樣,acquire()函數在能夠獲得位置時將返回 True,否則返回 False。
semaphore.acquire(timeout=10)
一旦獲取,可以通過調用 release() 函數再次釋放信號量。
semaphore.release()
with semaphore:# ...
使用信號量的示例
# 使用信號量的示例
from time import sleep
from random import random
from threading import Thread
from threading import Semaphore# 目標函數
def task(semaphore, number):# 嘗試獲取信號量with semaphore:# 處理value = random()sleep(value)# 報告結果print(f'Thread {number} got {value}')# 創建信號量
semaphore = Semaphore(2)
# 創建一系列線程
for i in range(10):worker = Thread(target=task, args=(semaphore, i))worker.start()
# 等待所有工作線程完成...
首先運行示例會創建信號量實例,然后啟動十個工作線程。
所有十個線程嘗試獲取信號量,但每次只有兩個線程被授予位置。在信號量上的線程完成工作后,會在隨機的時間間隔內釋放信號量。
每個信號量的釋放(通過上下文管理器)允許另一個線程獲取一個位置并執行其計算,同時僅允許兩個線程在任何時候被處理,盡管所有十個線程都在執行它們的 run 方法。
線程事件(Thread Event)
事件是一個線程安全的布爾標志。
使用 Python 中的 threading.Event 類來使用一個事件對象。
如何使用事件對象
Python 通過 threading.Event 類提供了一個事件對象。
事件是一種簡單的并發原語,允許線程之間進行通信。
一個 threading.Event 對象封裝了一個布爾變量,該變量可以是“設置”(True)或“未設置”(False)。共享該事件實例的線程可以檢查事件是否已設置、設置事件、清除事件(使其未設置)或等待事件被設置。
線程中的 Event 提供了一種便捷的方式,用于在多個線程之間共享一個布爾變量,該變量可以作為觸發某個動作的信號。
這是線程間通信中最簡單的機制之一:一個線程發出事件信號,其他線程等待該信號。
# 創建一個事件對象,該事件將處于“未設置”狀態
event = threading.Event()# 創建后,我們可以通過 is_set() 函數檢查事件是否已被設置,如果事件已設置,該函數將返回 True,否則返回 False。
# check if the event is set
if event.is_set():# do something...# 該事件可以通過 set() 函數進行設置。任何等待該事件被設置的線程都將被通知。
event.set()# 通過 clear() 函數可以將事件標記為“未設置”(無論其當前是否已設置)。
event.clear()# 最后,線程可以通過 wait()函數等待事件被設置。調用此函數將阻塞,直到事件被標記為已設置(例如,另一個線程調用了 set()函數)。如果事件已經設置,wait()函數將立即返回。
event.wait()
# 在審查 threading.Event 的源代碼時,只有當 set() 函數被調用時,等待的線程才會被通知,而 clear() 函數被調用時則不會。# 可以向 wait() 函數傳遞一個“超時”參數,該參數將限制線程等待事件被標記為已設置的時間(以秒為單位)。
# 如果等待期間事件被設置,wait() 函數將返回 True;否則,返回 False 的值表示事件未被設置且調用超時。
event.wait(timeout=10)
使用事件對象的示例
# 使用事件對象的示例
from time import sleep
from random import random
from threading import Thread
from threading import Event# 目標任務函數
def task(event, number):# 等待事件被設置event.wait()# 開始處理value = random()sleep(value)print(f'Thread {number} got {value}')# 創建共享事件對象
event = Event()
# 創建一系列線程
for i in range(5):thread = Thread(target=task, args=(event, i))thread.start()
# 阻塞一會兒
print('Main thread blocking...')
sleep(2)
# 在所有線程中開始處理
event.set()
# 等待所有線程完成...
首先運行示例會創建并啟動五個線程。每個線程在開始處理之前都會等待它前面的事件。
主線程暫時阻塞,讓所有線程開始運行并等待事件。然后主線程設置該事件。這會觸發所有五個線程執行它們的處理并報告消息。
計時線程(Timer Threads)
一個計時器線程將在時間延遲后執行一個函數。
可以通過 threading.Timer 類在 Python 中使用計時器線程對象。
如何使用計時器線程
Python 在 threading.Timer 類中提供了一個計時器線程。
threading.Timer 是 threading.Thread 類的擴展,這意味著我們可以像使用普通線程實例一樣使用它。
它提供了一種在經過一定時間間隔后執行函數的實用方法。
# 創建一個計時器實例并進行配置。這包括在執行前等待的時間(以秒為單位)、觸發時執行的函數以及傳遞給目標函數的任何參數。
timer = Timer(10, task, args=(arg1, arg2))# 目標任務函數不會執行,直到時間流逝完畢。
# 一旦創建,必須通過調用 start() 函數來啟動線程,這將開始計時器。
timer.start()# 如果我們決定在目標函數執行之前取消定時器,可以通過調用 cancel()函數來實現。
timer.cancel()# 一旦時間已過,目標函數已執行,計時器線程就不能再被復用了。
使用計時器線程的示例
# 使用線程定時器對象的示例
from threading import Timer# 目標任務函數
def task(message):# 報告自定義消息print(message)# 創建線程定時器對象
timer = Timer(3, task, args=('Hello world',))
# 啟動定時器對象
timer.start()
# 等待定時器完成
print('Waiting for the timer...')
首先運行示例會創建一個 threading.Timer 對象,并指定目標任務函數 task()以及單個參數。
計時器隨后啟動,主線程等待計時器線程完成。
計時器線程運行,等待配置的 3 秒鐘,然后執行 task()函數報告自定義消息。
取消計時器線程的示例
# 使用線程定時器對象的示例
from time import sleep
from threading import Timer# 目標任務函數
def task(message):# 報告自定義消息print(message)# 創建線程定時器對象
timer = Timer(3, task, args=('Hello world',))
# 啟動定時器對象
timer.start()
# 阻塞一會兒
sleep(1)
# 取消線程
print('Canceling the timer...')
timer.cancel()
Canceling the timer...
運行示例會啟動計時器線程,目標任務函數 task()將在 3 秒后觸發。
主線程等待一秒鐘,然后取消計時器線程,阻止task()函數執行并向報告消息。
結語
“見微知著,積跬步以至千里”,至此,關于 [Python線程] 的微探索之旅才緩緩拉開序幕,后續將一起詳細去探索一下Python的線程相關問題。感謝您的耐心閱讀。希望這片小小的“知微”碎片,能讓你對Python線程有更清晰的認識。
點贊關注不迷路,點擊合集標簽「[#NLP知微集](javascript:😉」,不錯過每一次細微的洞察。
下期再見,繼續我們的拆解之旅!
Reference
行,計時器線程就不能再被復用了。
## 使用計時器線程的示例```python
# 使用線程定時器對象的示例
from threading import Timer# 目標任務函數
def task(message):# 報告自定義消息print(message)# 創建線程定時器對象
timer = Timer(3, task, args=('Hello world',))
# 啟動定時器對象
timer.start()
# 等待定時器完成
print('Waiting for the timer...')
首先運行示例會創建一個 threading.Timer 對象,并指定目標任務函數 task()以及單個參數。
計時器隨后啟動,主線程等待計時器線程完成。
計時器線程運行,等待配置的 3 秒鐘,然后執行 task()函數報告自定義消息。
取消計時器線程的示例
# 使用線程定時器對象的示例
from time import sleep
from threading import Timer# 目標任務函數
def task(message):# 報告自定義消息print(message)# 創建線程定時器對象
timer = Timer(3, task, args=('Hello world',))
# 啟動定時器對象
timer.start()
# 阻塞一會兒
sleep(1)
# 取消線程
print('Canceling the timer...')
timer.cancel()
Canceling the timer...
運行示例會啟動計時器線程,目標任務函數 task()將在 3 秒后觸發。
主線程等待一秒鐘,然后取消計時器線程,阻止task()函數執行并向報告消息。
結語
“見微知著,積跬步以至千里”,至此,關于 [Python線程] 的微探索之旅才緩緩拉開序幕,后續將一起詳細去探索一下Python的線程相關問題。感謝您的耐心閱讀。希望這片小小的“知微”碎片,能讓你對Python線程有更清晰的認識。
點贊關注不迷路,點擊合集標簽「[#NLP知微集](javascript:😉」,不錯過每一次細微的洞察。
下期再見,繼續我們的拆解之旅!
Reference
- https://superfastpython.com/threading-in-python/