介紹:
????????線程(Threads)是操作系統提供的一種輕量級的執行單元,可以在一個進程內并發執行多個任務。每個線程都有自己的執行上下文,包括棧、寄存器和程序計數器。
????????在Python中,可以使用threading
模塊創建和管理線程。線程可以同時執行多個任務,可以在一個線程中執行耗時操作,而不會阻塞其他線程的執行。線程之間共享進程的資源,如內存空間,因此需要注意線程安全的問題。
????????然而,Python的線程在特定情況下可能會受到全局解釋器鎖(Global Interpreter Lock,GIL)的限制。GIL是一種機制,它確保同一時刻只有一個線程可以執行Python字節碼。這意味著在多線程場景下,即使有多個線程,也無法真正實現并行執行。因此,在CPU密集型的任務中,Python的線程并不能充分利用多核處理器的能力。
1、導入threading
模塊
在使用Python線程之前,首先需要導入?threading
?模塊。可以使用以下語句導入該模塊:
import threading
2、創建線程
????????使用?threading.Thread
?類創建線程對象。可以通過傳遞一個可調用的目標函數和其他參數來實例化線程對象。目標函數是線程實際執行的任務。
# 定義一個目標函數作為線程的執行任務
def my_task(arg1, arg2):# 執行任務的代碼# 創建線程對象
my_thread = threading.Thread(target=my_task, args=(arg1, arg2))
3、啟動線程
????????通過調用線程對象的?start()
?方法來啟動線程。啟動線程后,它將在后臺運行,執行目標函數中的代碼。
my_thread.start()
4、等待線程完成
????????可以使用?join()
?方法等待線程執行完畢。調用?join()
?方法會阻塞當前線程,直到目標線程執行完成。
my_thread.join()
5、線程同步
????????在多線程編程中,線程之間的同步是一項重要的任務,旨在確保線程按照預期的順序執行,并避免競態條件和數據不一致的問題。Python提供了幾種同步原語,常用的包括鎖(Lock)、信號量(Semaphore)、事件(Event)和條件變量(Condition)。下面詳細介紹這些同步原語的特點和使用方法:
鎖(Lock)
????????鎖是一種最基本的同步原語,在Python中由?threading.Lock
?類實現。它提供了兩個主要方法:acquire()
?和?release()
。一個線程可以通過調用?acquire()
?來獲取鎖,如果鎖當前沒有被其他線程持有,則該線程將獲得鎖并繼續執行,否則將被阻塞直到鎖被釋放。當線程完成對臨界區的訪問后,應該調用?release()
?來釋放鎖,以便其他線程可以獲取它。
import threading# 創建鎖對象
lock = threading.Lock()# 線程函數
def thread_function():lock.acquire()# 臨界區代碼lock.release()
鎖還支持上下文管理器的使用方式,可以使用?with
?語句來自動獲取和釋放鎖:
import threading# 創建鎖對象
lock = threading.Lock()# 線程函數
def thread_function():with lock:# 臨界區代碼
信號量(Semaphore)
????????信號量是一種更高級的同步原語,用于控制對共享資源的并發訪問。Python中的信號量由?threading.Semaphore
?類實現。信號量維護一個內部計數器,線程可以通過調用?acquire()
?來減少計數器的值,如果計數器為零,則線程將被阻塞。線程在完成對共享資源的訪問后,應該調用?release()
?來增加計數器的值,以便其他線程可以獲取信號量。
import threading# 創建信號量對象
semaphore = threading.Semaphore(value=3) # 設置初始計數器值為3# 線程函數
def thread_function():semaphore.acquire()# 訪問共享資源semaphore.release()
信號量的計數器可以控制同時訪問共享資源的線程數量。
事件(Event)
????????事件是一種用于線程間通信的同步原語,由?threading.Event
?類實現。事件有兩種狀態:已設置和未設置。線程可以通過調用?set()
?來設置事件,將其狀態設置為已設置;通過調用?clear()
?可以將事件狀態設置為未設置。線程可以通過調用?wait()
?來等待事件的設置,如果事件已設置,則線程可以繼續執行,否則將被阻塞。
import threading# 創建事件對象
event = threading.Event()# 線程函數
def thread_function():event.wait() # 等待事件設置# 執行操作# 主線程設置事件
event.set()
事件還可以使用?is_set()
?方法來檢查事件的狀態。
條件變量(Condition)
????????條件變量是一種復雜的同步原語,由?threading.Condition
?類實現。它提供了一個條件隊列,允許線程等待某個條件的發生。條件變量結合鎖一起使用,可以實現更復雜的線程間同步。
import threading# 創建條件變量對象
condition = threading.Condition()# 線程函數 A
def thread_function_a():with condition:while not condition_predicate():condition.wait()# 執行操作# 線程函數 B
def thread_function_b():with condition:# 修改條件condition.notify() # 通知等待的線程# 主線程
with condition:# 修改條件condition.notify() # 通知等待的線程
????????在線程函數 A 中,線程會等待條件謂詞成立的情況下才繼續執行,否則會調用?wait()
?方法將線程掛起。線程函數 B 可以在某個條件發生變化時調用?notify()
?方法來通知等待的線程。
6、共享數據
????????共享數據是指多個線程同時訪問和修改的數據。當多個線程同時讀寫共享數據時,可能會發生競態條件(Race Condition)和數據損壞的問題。為了確保線程安全性,需要采取適當的同步措施來保護共享數據。以下是一些常用的同步機制和技術:
鎖(Lock)
????????鎖是一種最常見的同步原語,用于保護共享數據的互斥訪問。在多線程環境中,一個線程可以通過獲取鎖來獨占地訪問共享數據,其他線程必須等待鎖釋放后才能訪問。鎖可以使用?threading.Lock
?類來實現,通過調用?acquire()
?和?release()
?方法來獲取和釋放鎖。
import threading# 創建鎖對象
lock = threading.Lock()# 共享數據
shared_data = 0# 線程函數
def thread_function():global shared_datalock.acquire()# 訪問和修改共享數據shared_data += 1lock.release()
在訪問共享數據之前獲取鎖,確保同一時間只有一個線程可以修改數據,從而避免競態條件。
信號量(Semaphore)
????????信號量也可以用于保護共享數據的訪問,在多線程環境中控制并發訪問的數量。信號量維護一個內部計數器,線程在訪問共享數據之前通過獲取信號量來減少計數器的值,如果計數器為零,則線程將被阻塞。線程在完成對共享數據的訪問后,通過釋放信號量來增加計數器的值,從而允許其他線程繼續訪問。
import threading# 創建信號量對象
semaphore = threading.Semaphore()# 共享數據
shared_data = 0# 線程函數
def thread_function():global shared_datasemaphore.acquire()# 訪問和修改共享數據shared_data += 1semaphore.release()
通過適當設置信號量的初始值,可以控制同時訪問共享數據的線程數量。
其他同步原語
????????Python還提供了其他一些同步原語,如條件變量(Condition)和事件(Event)。它們可以用于更復雜的同步需求,如線程之間的通信和等待特定條件的發生。
import threading# 創建條件變量對象
condition = threading.Condition()# 共享數據
shared_data = []# 線程函數 A
def thread_function_a():with condition:while not condition_predicate():condition.wait()# 訪問和修改共享數據# 線程函數 B
def thread_function_b():with condition:# 修改條件condition.notify() # 通知等待的線程
????????在上述示例中,線程函數 A等待條件謂詞成立的情況下才能訪問共享數據,線程函數 B在條件發生變化時通過?notify()
?方法通知等待的線程。
7、線程狀態
????????線程狀態是指線程在不同的時間點上所處的狀態,它反映了線程的執行情況和可用性。在多線程編程中,線程可以處于以下幾種不同的狀態:
新建(New)狀態
????????當創建線程對象但尚未啟動線程時,線程處于新建狀態。此時線程對象已經被創建,但尚未分配系統資源和執行代碼。可以通過實例化線程類或者從線程池中獲取線程來創建新線程。
import threading# 創建新線程對象
thread = threading.Thread(target=thread_function)
就緒(Runnable)狀態
????????當線程準備好執行,但由于系統調度的原因還未開始執行時,線程處于就緒狀態。線程已經分配了系統資源,并且等待調度器將其放入運行隊列中。多個就緒狀態的線程可能會競爭CPU資源,調度器會根據調度算法決定哪個線程被選中執行。
運行(Running)狀態
????????當線程獲得CPU資源并開始執行線程函數時,線程處于運行狀態。此時線程的代碼正在被執行,它可能會與其他線程并發執行或通過時間片輪轉進行切換。只有一個線程可以處于運行狀態。
阻塞(Blocked)狀態
????????當線程被暫停執行,等待某個條件的發生時,線程處于阻塞狀態。在阻塞狀態下,線程不會占用CPU資源,直到滿足特定條件后才能繼續執行。常見的阻塞原因包括等待I/O操作、獲取鎖失敗、等待其他線程的通知等。
終止(Terminated)狀態
????????當線程完成了它的執行任務或被顯式終止時,線程處于終止狀態。線程函數執行完畢或者出現異常時,線程將自動終止。也可以通過調用線程對象的?join()
?方法來等待線程執行完畢。
# 等待線程執行完畢
thread.join()
????????線程狀態之間可以相互轉換,線程的狀態轉換通常由操作系統的調度器和線程的執行情況決定。例如,當線程處于就緒狀態并獲得CPU資源時,它將進入運行狀態;當線程在執行期間發生阻塞,它將進入阻塞狀態;當線程執行完畢或被終止時,它將進入終止狀態。
8、線程屬性和方法
???threading.Thread
?類提供了一些屬性和方法來管理和操作線程。其中一些常用的屬性和方法包括:
name
:獲取或設置線程的名稱。ident
:獲取線程的標識符。is_alive()
:檢查線程是否處于活動狀態。setDaemon(daemonic)
:將線程設置為守護線程,當主線程退出時,守護線程也會被終止。start()
:啟動線程。join(timeout)
:等待線程執行完成,可選地設置超時時間。run()
:線程的執行入口點,在線程啟動時被調用。sleep(secs)
:線程休眠指定的秒數。
9、線程間通信
????????線程間通信是指在多線程編程中,多個線程之間進行數據傳遞和共享的過程。線程間通信的目的是實現線程之間的協作和數據交換,以完成復雜的任務。在Python中,可以使用?queue
?模塊提供的隊列來實現線程安全的數據傳遞。
queue
?模塊提供了幾種隊列類型,常用的有以下三種:
Queue(先進先出隊列)
???Queue
?是最常用的線程安全隊列,它使用先進先出(FIFO)的方式存儲和獲取數據。多個線程可以安全地將數據放入隊列中,并從隊列中獲取數據。Queue
?類提供了以下常用方法:
put(item[, block[, timeout]])
:將數據放入隊列,可指定是否阻塞和超時時間。get([block[, timeout]])
:從隊列中獲取數據,可指定是否阻塞和超時時間。empty()
:判斷隊列是否為空。full()
:判斷隊列是否已滿。qsize()
:返回隊列中的元素數量。
import queue# 創建隊列對象
q = queue.Queue()# 線程函數 A
def thread_function_a():while True:item = q.get()# 處理數據# 線程函數 B
def thread_function_b():while True:# 產生數據q.put(item)
????????在上述示例中,線程函數 A從隊列中獲取數據并進行處理,線程函數 B產生數據并放入隊列中,兩個線程通過隊列進行數據交換。
LifoQueue(后進先出隊列)
????????LifoQueue
?是一種后進先出(LIFO)的隊列類型,與?Queue
?不同的是,它的獲取順序與放入順序相反。其他方法與?Queue
?類相同。
import queue# 創建后進先出隊列對象
q = queue.LifoQueue()
后進先出隊列適用于某些特定的場景,例如需要按照相反的順序處理數據。
PriorityQueue(優先級隊列)
? ? PriorityQueue
?是一種根據優先級排序的隊列類型,可以為隊列中的每個元素指定一個優先級。優先級高的元素先被獲取。元素的優先級可以是數字、元組或自定義對象。其他方法與?Queue
?類相同。
import queue# 創建優先級隊列對象
q = queue.PriorityQueue()
優先級隊列適用于需要根據優先級順序處理數據的場景。
10、線程池
????????線程池是一種用于管理和復用線程的機制,可以有效地管理大量線程的生命周期,并提供簡化的接口來提交和管理任務。在Python中,可以使用?concurrent.futures
?模塊中的?ThreadPoolExecutor
?類來創建線程池。
線程池的特點
- 線程復用:線程池中的線程可以被重復使用,避免了線程頻繁創建和銷毀的開銷。
- 線程管理:線程池負責管理線程的生命周期,包括線程的創建、銷毀和回收。
- 并發控制:線程池可以限制并發執行的線程數量,防止系統資源被過度占用。
- 異步提交:線程池提供了異步提交任務的方法,可以在后臺執行任務并返回結果。
創建線程池
????????可以使用?ThreadPoolExecutor
?類來創建線程池。可以指定線程池的大小(可同時執行的線程數量)和其他相關參數。
from concurrent.futures import ThreadPoolExecutor# 創建線程池
pool = ThreadPoolExecutor(max_workers=5)
在上述示例中,創建了一個最大同時執行 5 個線程的線程池。
提交任務
????????可以使用線程池的?submit()
?方法提交任務,該方法會返回一個?Future
?對象,用于獲取任務的執行結果。
# 定義任務函數
def task_function():# 任務邏輯# 提交任務到線程池
future = pool.submit(task_function)
在上述示例中,將任務函數?task_function
?提交到線程池,并獲得了一個?Future
?對象。
獲取任務結果
????????可以使用?Future
?對象的?result()
?方法來獲取任務的執行結果。如果任務尚未完成,result()
?方法將會阻塞直到任務完成并返回結果。
# 獲取任務結果
result = future.result()
關閉線程池
????????在使用完線程池后,應該調用?shutdown()
?方法來關閉線程池。關閉線程池后,將不再接受新的任務提交,并且會等待所有已提交的任務執行完畢后再退出。
# 關閉線程池
pool.shutdown()