1. Thread event objects (threading.Event)
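`threading.Event` lets threads communicate: one thread can signal the others to stop what they are doing. It coordinates threads through a shared "event set" flag.
Core methods:
- Create an event object: `event = threading.Event()`
- Set the event: `event.set()` (marks the event as set)
- Check the event state: `event.is_set()` (returns `True` once the event has been set)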
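As a minimal sketch of these calls, a worker thread below loops until the main thread sets the event. The names `run_until_stopped` and `ticks` are hypothetical and only serve the illustration. The sketch also uses `Event.wait(timeout)`, a standard method not listed above, in place of `time.sleep` so the worker reacts to the signal promptly.

```python
import threading


def run_until_stopped(stop_event: threading.Event, ticks: list) -> None:
    """Record a tick roughly every 10 ms until stop_event is set."""
    while not stop_event.is_set():
        ticks.append(1)
        # wait() returns early as soon as another thread calls set()
        stop_event.wait(0.01)


stop_event = threading.Event()
ticks = []
worker = threading.Thread(target=run_until_stopped, args=(stop_event, ticks))
worker.start()

stop_event.wait(0.05)  # the event is not set yet, so this just pauses ~50 ms
stop_event.set()       # signal the worker to stop
worker.join()

print(stop_event.is_set(), worker.is_alive())
```

Because every thread holds a reference to the same `Event`, a single `set()` call is enough to stop any number of workers.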
Example: animal race (tortoise vs. rabbit)
An `Event` implements the rule "as soon as one runner reaches the finish line, every thread stops":

    from threading import Thread, Event
    from abc import ABC, abstractmethod
    import time


    class Animal(Thread, ABC):
        max_distance = 2000  # total race distance (mm)

        def __init__(self, speed, name, time1=0, state=False, stop_event: Event = None):
            super().__init__(name=name)
            self.speed = speed    # speed (mm per second)
            self.time1 = time1    # elapsed running time (s)
            self.state = state    # whether this animal reached the finish line
            self.stop_event = stop_event or Event()  # shared stop event

        @abstractmethod
        def distance(self):
            """Abstract method: return the distance covered so far."""

        def run(self):
            # Keep looping until a stop signal arrives
            while not self.stop_event.is_set():
                self.time1 += 0.1  # each iteration adds 0.1 s
                length = round(self.distance())  # current distance
                print(f"{self.name} has run {length} mm after {self.time1:.1f} s")
                # Check whether the finish line has been reached
                if length >= self.max_distance:
                    self.state = True
                    print(f"\n{self.name} reached the finish line!")
                    self.stop_event.set()  # tell the other threads to stop
                    break
                time.sleep(0.1)  # simulate the passage of time


    class Turtle(Animal):
        def distance(self):
            # The tortoise moves at a constant speed
            return self.time1 * self.speed


    class Rabbit(Animal):
        def distance(self):
            # The rabbit sprints for 5 s, rests for 180 s, then runs again
            if self.time1 < 5:
                return self.time1 * self.speed  # first 5 s: running
            elif self.time1 < 5 + 180:
                return 5 * self.speed  # 5-185 s: resting (distance stays at the 5 s mark)
            else:
                rest_end_time = 5 + 180  # time at which the rest ends
                extra_time = self.time1 - rest_end_time  # running time after the rest
                return 5 * self.speed + extra_time * self.speed  # total distance


    if __name__ == '__main__':
        race_stop_event = Event()  # shared stop event
        rabbit = Rabbit(speed=90, name="Rabbit", stop_event=race_stop_event)
        turtle = Turtle(speed=10, name="Tortoise", stop_event=race_stop_event)
        print("The race begins!")
        rabbit.start()
        turtle.start()
        rabbit.join()
        turtle.join()
        # Decide the result
        if rabbit.state:
            print("The rabbit wins!")
        elif turtle.state:
            print("The tortoise wins!")

2. Thread safety and queues (queue.Queue)
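When several threads share data, plain structures such as lists provide no synchronization of their own, so compound operations (check, then modify) can race. `queue.Queue` is thread-safe, is commonly used for communication between threads, and follows FIFO (first in, first out) order.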
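Common queue types:
- `Queue`: FIFO queue
- `PriorityQueue`: priority-based queue (the lowest-valued item is retrieved first)
- `LifoQueue`: LIFO (last in, first out) queue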
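To make the difference between the three types concrete, the sketch below puts the same items into each queue and drains them. The helper `drain` is a hypothetical name introduced only for this illustration.

```python
from queue import Queue, LifoQueue, PriorityQueue


def drain(q):
    """Remove and return every item currently in the queue."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items


fifo, lifo, prio = Queue(), LifoQueue(), PriorityQueue()
for item in (3, 1, 2):
    fifo.put(item)
    lifo.put(item)
    prio.put(item)

fifo_order = drain(fifo)  # insertion order: [3, 1, 2]
lifo_order = drain(lifo)  # reverse insertion order: [2, 1, 3]
prio_order = drain(prio)  # smallest value first: [1, 2, 3]
print(fifo_order, lifo_order, prio_order)
```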
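Core queue methods:
Method | Description |
---|---|
`empty()` | Return whether the queue is empty |
`full()` | Return whether the queue is full (only meaningful for a queue with a size limit) |
`qsize()` | Return the number of items in the queue |
`put(item, timeout=None)` | Add an item; blocks while the queue is full, with an optional timeout (raises `queue.Full` when it expires) |
`put_nowait(item)` | Add an item without blocking; raises `queue.Full` if the queue is full |
`get(timeout=None)` | Remove and return an item; blocks while the queue is empty, with an optional timeout (raises `queue.Empty` when it expires) |
`get_nowait()` | Remove and return an item without blocking; raises `queue.Empty` if the queue is empty |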
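The blocking and non-blocking variants differ mainly in how they report a full or empty queue. The following sketch, which uses a hypothetical `events` list to record what happens, exercises both exceptions on a queue of size 2. Note that `timeout` is passed by keyword, because the second positional parameter of `put` and the first of `get` is `block`.

```python
import queue

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")

events = []
try:
    q.put_nowait("c")          # the queue is full, so this raises immediately
except queue.Full:
    events.append("full")

events.append(q.get())         # "a" (FIFO order)
events.append(q.get_nowait())  # "b"

try:
    q.get(timeout=0.1)         # the queue is empty; raises after about 0.1 s
except queue.Empty:
    events.append("empty")

print(events)  # ['full', 'a', 'b', 'empty']
```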
3. Condition objects (threading.Condition)
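`Condition` handles more complex synchronization between threads through a wait/notify mechanism. It carries its own lock (an `RLock` by default), and `wait()` and `notify()` must be called while that lock is held.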
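Core methods:
- `wait(timeout=None)`: block the current thread until another thread wakes it or the timeout expires
- `notify(n=1)`: wake up to `n` waiting threads (one by default)
- `notify_all()`: wake all waiting threads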
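A minimal sketch of the wait/notify pattern follows. The `state` dictionary, the `waiter` function, and the `results` list are hypothetical names for illustration. Each waiter re-checks the shared flag in a `while` loop, which is the usual way to guard against waking up when the condition does not actually hold.

```python
import threading

condition = threading.Condition()
state = {"ready": False}  # shared flag, protected by the condition's lock
results = []


def waiter():
    with condition:
        # Re-check the predicate after every wake-up
        while not state["ready"]:
            condition.wait()
        results.append("woken")


threads = [threading.Thread(target=waiter) for _ in range(3)]
for t in threads:
    t.start()

with condition:
    state["ready"] = True
    condition.notify_all()  # wake every thread currently waiting

for t in threads:
    t.join()

print(results)  # ['woken', 'woken', 'woken']
```

A waiter that starts after `notify_all()` has already run sees `state["ready"]` set and never waits, so the example finishes regardless of scheduling order.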
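Example: producer-consumer (picking apples and eating apples)
The producer picks apples and stores them in a queue, notifying the consumers when the queue is full; the consumers eat apples and notify the producer when the queue is empty: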

    from threading import Thread, Condition, current_thread
    from queue import Queue
    import time


    class Apple:
        __number_code__ = 0  # apple number counter

        def __init__(self):
            self.__class__.__number_code__ += 1
            self.number = self.__class__.__number_code__

        def __repr__(self):
            return f"<Apple : {self.number}>"


    def product_apple(queue: Queue, condition: Condition):
        while True:
            with condition:  # locked block: keeps the whole step atomic
                while queue.full():
                    condition.wait()  # wait while the queue is full
                apple = Apple()
                print(f"{current_thread().name} is picking {apple}")
                queue.put(apple)
                time.sleep(0.2)  # simulate the time it takes to pick an apple
                if queue.full():
                    condition.notify_all()  # queue is full: wake the consumers


    def eat_apple(queue: Queue, condition: Condition):
        while True:
            with condition:
                while queue.empty():
                    condition.wait()  # wait while the queue is empty
                apple = queue.get()
                print(f"{current_thread().name} is eating {apple}~~")
                time.sleep(0.5)  # simulate the time it takes to eat an apple
                if queue.empty():
                    condition.notify_all()  # queue is empty: wake the producer


    if __name__ == '__main__':
        apple_queue = Queue(maxsize=50)  # the queue holds at most 50 apples
        condition = Condition()
        # Start the producer and the consumers
        productor = Thread(target=product_apple, args=(apple_queue, condition), name='Producer01')
        productor.start()
        for x in range(2):
            consumer = Thread(target=eat_apple, args=(apple_queue, condition), name=f'Consumer{x}')
            consumer.start()

4. Thread states
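A thread's life cycle consists of the following states and transitions:
- New: the thread object has been created but `start()` has not been called yet
- Ready: `start()` has been called and the thread is waiting to be scheduled on the CPU
- Running: the thread has been given a CPU time slice and is executing its `run()` method
- Blocked: the thread is paused by an operation such as `sleep()`, `join()`, or `input()`; when the block ends it returns to the ready state
- Dead: `run()` has finished or terminated with an exception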
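Python's `threading` module does not expose these states by name. `Thread.is_alive()` only distinguishes a started, unfinished thread from one that is new or dead. The sketch below observes that much; the `gate` event and `observed` list are hypothetical names for illustration.

```python
import threading

gate = threading.Event()
t = threading.Thread(target=gate.wait)  # the thread blocks until the gate is set

observed = [t.is_alive()]      # new: start() has not been called yet
t.start()
observed.append(t.is_alive())  # started, currently blocked in gate.wait()
gate.set()                     # unblock the thread so run() can return
t.join()
observed.append(t.is_alive())  # dead: run() has finished

print(observed)  # [False, True, False]
```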
5. Thread pools (concurrent.futures.ThreadPoolExecutor)
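A thread pool reuses its threads, which reduces the cost of creating and destroying them, and suits workloads made of many tasks.
Core usage:
- Create a thread pool: `executor = ThreadPoolExecutor(max_workers)` (`max_workers` is the maximum number of threads)
- Submit a task: `executor.submit(func, *args)` (returns a `Future` object used to obtain the result)
- Get the result: `future.result()` (blocks until the task completes, then returns its result)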
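The three calls fit together as in the following sketch. The `square` function is a hypothetical task chosen for illustration, and the `with` block is one common way to make sure the pool is shut down once all tasks have finished.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n: int) -> int:
    return n * n


# Leaving the with-block calls executor.shutdown(wait=True)
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(square, n) for n in range(5)]
    # Reading the futures in list order yields results in submission order
    squares = [f.result() for f in futures]

print(squares)  # [0, 1, 4, 9, 16]
```

Tasks may complete in any order, but because `result()` is called on the futures in the order they were submitted, the collected list is still ordered.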
Example 1: selling tickets with a thread pool

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    from queue import Queue, Empty
    import time


    def execute_task(queue: Queue):
        while True:
            time.sleep(0.5)
            try:
                ticket = queue.get(timeout=1)  # stop if no ticket arrives within 1 s
                print(f"{current_thread().name} is selling ticket {ticket}; {queue.qsize()} left")
            except Empty:  # catch only the timeout, not every exception
                print(f"{current_thread().name}: sold out")
                break


    if __name__ == '__main__':
        executor = ThreadPoolExecutor(5)  # thread pool with 5 threads
        queue = Queue(maxsize=100)
        for x in range(1, 101):
            queue.put(f"NO.{x:>08}")  # initialize 100 tickets
        for _ in range(5):
            executor.submit(execute_task, queue)

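Example 2: multithreaded crawler (fetching novel chapters)
The main thread fetches the novel's chapter list, the thread pool fetches the chapter contents concurrently, and the results are then combined and saved: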

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    from queue import Queue, Empty
    import requests
    import re
    import time
    from traceback import print_exc


    def parse_url(url, count=0):
        """Fetch a URL and return its text, trying at most 3 times."""
        headers = {
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0",
            "cookie": "ckAC = 1;Hm_lvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;Hm_lpvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;HMACCOUNT = FDAFB0724C56B8F8",
            "referer": "https://www.bqgui.cc/"
        }
        if count >= 3:
            return None
        try:
            response = requests.get(url, headers=headers, timeout=10)
            assert response.status_code == 200, f"Request failed with status code {response.status_code}"
            return response.text
        except Exception:
            print_exc()
            return parse_url(url, count + 1)  # retry


    def parse_caption(queue: Queue):
        """Fetch the content of a single chapter."""
        try:
            time.sleep(0.5)  # throttle requests to reduce the risk of being blocked
            index, href, title = queue.get(timeout=1)
        except Empty:
            print(f"{current_thread().name}: all chapters have been fetched")
            return None
        print(f"{current_thread().name} is fetching {title}~~~")
        text = parse_url(f"https://www.c3719.lol{href}")
        if text is None:
            return index, title, ""
        print(f"{current_thread().name} fetched {title} successfully!!!")
        # Extract the chapter body (re.DOTALL lets "." match newlines)
        regex = r'<div\s+id="chaptercontent".*?>(.*?)請收藏本站'
        match = re.search(regex, text, re.DOTALL)
        if not match:
            return index, title, ""
        # Clean the content (remove <br> tags and whitespace)
        caption = re.sub(r"<br\s*/?>|\s+", "", match.group(1))
        return index, title, caption


    if __name__ == '__main__':
        # The main thread fetches the chapter list
        url = "https://www.c3719.lol/book/61808/"
        noval_name = '斗羅大陸V重生唐三'
        text = parse_url(url)
        if text is None:
            raise SystemExit("Failed to fetch the chapter list")
        # Extract every chapter link and title
        regex = r'<dd><a\s+href\s*="(.*?)">(.*?)</a></dd>'
        all_captions = re.findall(regex, text)
        # Put the chapter information into the queue
        queue = Queue()
        for index, caption in enumerate(all_captions):
            queue.put((index, *caption))
        # Fetch concurrently with the thread pool
        executor = ThreadPoolExecutor(50)
        futures = [executor.submit(parse_caption, queue) for _ in all_captions]
        # Collect the results, skipping tasks that returned nothing, and sort them
        result = [r for r in (f.result() for f in futures) if r is not None]
        result.sort(key=lambda x: x[0])  # sort by chapter order
        # Save to a file
        with open(f'{noval_name}.txt', 'w', encoding='utf-8') as f:
            for index, title, text in result:
                f.write(title + "\n")
                f.write(text + "\n")
                f.flush()
        print(f"Finished fetching {noval_name}")

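Homework 1: thread communication (producer-consumer model)
Requirements:
a) Define one producer thread that picks apples and stores them in a queue; when the queue is full, it notifies the consumers to eat and stops picking.
b) Define several consumer threads that eat apples; when the queue is empty, they notify the producer to resume picking.
Implementation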
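
    from queue import Queue
    from threading import Thread, Condition, current_thread
    import time


    # Apple class: records each apple's number
    class Apple:
        __number_code__ = 0  # class variable that counts apple numbers

        def __init__(self):
            self.__class__.__number_code__ += 1  # increment for every new apple
            self.number = self.__class__.__number_code__

        def __repr__(self):
            return f"<Apple> : {self.number}"  # custom print format for an apple


    # Producer function: pick apples and put them into the queue
    def produce_apple(queue: Queue, condition: Condition):
        while True:
            with condition:  # the Condition's lock keeps the step atomic
                # Wait while the queue is full (releases the lock until a consumer notifies)
                while queue.full():
                    condition.wait()
                # Pick an apple and put it into the queue
                apple = Apple()
                queue.put(apple)
                print(f"{current_thread().name} picked: <{apple}>")
                time.sleep(0.2)  # simulate the time it takes to pick an apple
                # Notify all consumers once the queue is full
                if queue.full():
                    condition.notify_all()


    # Consumer function: take apples from the queue and "eat" them
    def consume_apple(queue: Queue, condition: Condition):
        while True:
            with condition:  # the Condition's lock keeps the step atomic
                # Wait while the queue is empty (releases the lock until the producer notifies)
                while queue.empty():
                    condition.wait()
                # Take an apple from the queue and "eat" it
                apple = queue.get()
                print(f"{current_thread().name} is eating: <{apple}>")
                time.sleep(0.2)  # simulate the time it takes to eat an apple
                # Notify the producer once the queue is empty
                if queue.empty():
                    # Wakes one waiter; there is a single producer here, and notify_all() also works
                    condition.notify()


    # Main program: start the producer and the consumers
    if __name__ == '__main__':
        # Create the queue (maximum capacity 20) and the condition object
        apple_queue = Queue(maxsize=20)
        condition = Condition()
        # Start 1 producer thread
        producer = Thread(target=produce_apple, args=(apple_queue, condition), name="Producer")
        producer.start()
        # Start 3 consumer threads
        for i in range(3):
            consumer = Thread(target=consume_apple, args=(apple_queue, condition), name=f"Consumer<{i}>")
            consumer.start()
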
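Walkthrough of the core logic
- `Apple` class: the class variable `__number_code__` keeps the running apple count; every new apple automatically gets a unique number, which makes it easy to trace.
- `Condition` coordination:
  - The producer blocks in `condition.wait()` while the queue is full and waits for a consumer to wake it; once the queue becomes full it calls `notify_all()` to notify every consumer.
  - A consumer blocks in `condition.wait()` while the queue is empty and waits for the producer to wake it; once the queue becomes empty it calls `notify()` to notify the producer.
- Thread safety: `with condition` ensures that each produce/consume step runs while holding the lock, which avoids concurrent conflicts.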
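The homework loops forever, which makes it awkward to check. As a hedged variation rather than the homework's own method, the sketch below builds a small bounded buffer on one `Condition` and stops after a fixed number of apples. It differs in two ways: it calls `notify_all()` on every change instead of only when the buffer is full or empty, and it uses a sentinel to end the consumers. `BoundedBuffer`, `STOP`, `producer`, `consumer`, and `eaten` are all hypothetical names.

```python
import threading
from collections import deque


class BoundedBuffer:
    """A tiny bounded FIFO guarded by one Condition (illustrative only)."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.items = deque()
        self.condition = threading.Condition()

    def put(self, item) -> None:
        with self.condition:
            while len(self.items) >= self.capacity:  # full: wait for a consumer
                self.condition.wait()
            self.items.append(item)
            self.condition.notify_all()              # state changed: wake waiters

    def get(self):
        with self.condition:
            while not self.items:                    # empty: wait for the producer
                self.condition.wait()
            item = self.items.popleft()
            self.condition.notify_all()
            return item


STOP = object()  # sentinel telling a consumer to exit


def producer(buffer, count, consumer_count):
    for number in range(1, count + 1):
        buffer.put(number)
    for _ in range(consumer_count):  # one sentinel per consumer
        buffer.put(STOP)


def consumer(buffer, eaten, eaten_lock):
    while True:
        item = buffer.get()
        if item is STOP:
            break
        with eaten_lock:
            eaten.append(item)


buffer = BoundedBuffer(capacity=5)
eaten, eaten_lock = [], threading.Lock()
threads = [threading.Thread(target=producer, args=(buffer, 30, 3))]
threads += [threading.Thread(target=consumer, args=(buffer, eaten, eaten_lock))
            for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(eaten) == list(range(1, 31)))  # True
```

Notifying on every change costs a few extra wake-ups, but it removes any dependence on which waiting thread happens to be woken.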
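Homework 2: multithreaded novel scraping and download
Requirement: use multiple threads (a thread pool) to fetch every chapter of a novel from a Biquge-style site (such as https://www.biqu03.cc/) and save it as a local file.
Implementation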

    import re
    import time
    import requests
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    from queue import Queue, Empty

    # Request headers shared by every request
    HEADERS = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0",
        "cookie": "ckAC = 1;Hm_lvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;Hm_lpvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;HMACCOUNT = FDAFB0724C56B8F8",
        "referer": "https://www.bqgui.cc/"
    }


    # Parse the novel's chapter list (links and titles of all chapters)
    def parse_url(url):
        # Request the novel's table-of-contents page
        response = requests.get(url, headers=HEADERS, timeout=10)
        assert response.status_code == 200, "Table-of-contents request failed"
        text = response.text
        # Extract links and titles (<dd><a href="link">title</a></dd>);
        # non-greedy groups stop each match at the first closing quote or tag
        regex = r'<dd><a\s+href\s*="(.*?)">(.*?)</a></dd>'
        all_title = re.findall(regex, text)  # [(link1, title1), (link2, title2), ...]
        return all_title


    # Parse one chapter (take chapter info from the queue, fetch the body, return it)
    def parse_caption(queue: Queue):
        try:
            time.sleep(0.5)  # throttle requests to avoid the site's anti-crawling measures
            # Take the chapter info (index, link, title) from the queue
            index, href, title = queue.get(timeout=1)
            print(f"{current_thread().name} is fetching {title}~~~")
            # Build the chapter page URL and request it
            new_url = f"https://www.c3719.lol{href}"  # replace with the actual site domain
            response = requests.get(new_url, headers=HEADERS, timeout=10)
            assert response.status_code == 200, f"Request for {title} failed"
            text = response.text
            print(f"{current_thread().name} fetched {title} successfully!!!")
            # Extract the body (content of the div whose id is chaptercontent)
            regex = r'<div\s+id="chaptercontent".*?>(.*?)請收藏本站'
            match = re.search(regex, text, re.DOTALL)  # re.DOTALL lets "." match newlines
            if not match:
                return index, title, ""  # no body found
            # Clean the body (remove <br> tags and whitespace)
            caption = match.group(1)
            caption = re.sub(r"<br\s*/?>|\s+", "", caption)
            return index, title, caption
        except Empty:
            print(f"{current_thread().name}: all chapters have been fetched")
            return None
        except Exception as exc:
            print(f"{current_thread().name}: failed to fetch a chapter: {exc}")
            return None


    # Main program: coordinate the fetching and save the result
    if __name__ == '__main__':
        # URL of the novel's table of contents (replace with the target novel)
        url = "https://www.c3719.lol/book/61808/"
        all_tit = parse_url(url)  # all chapter information
        # Put the chapter information into the queue (consumed by the thread pool)
        queue = Queue()
        for index, x in enumerate(all_tit):
            queue.put((index, *x))
        # Novel name (used for the output file)
        noval_name = '斗羅大陸V重生唐三'
        # Fetch every chapter concurrently with the thread pool
        executor = ThreadPoolExecutor(50)  # 50 concurrent threads
        futures = [executor.submit(parse_caption, queue) for _ in all_tit]
        # Collect the results, dropping failed tasks, and sort by chapter index
        result = [r for r in (f.result() for f in futures) if r is not None]
        result.sort(key=lambda d: d[0])  # keeps the chapters in order
        # Save to a local file
        with open(f'{noval_name}.txt', 'w', encoding='utf-8') as f:
            for index, title, text in result:
                f.write(title + "\n")  # write the title
                f.write(text + "\n")   # write the body
                f.flush()              # flush the buffer right away
        print(f"Finished fetching {noval_name}; saved as a local file!")

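Walkthrough of the core logic
- Parsing the chapter list: `parse_url` requests the novel's table-of-contents page and uses a regular expression to extract every chapter's link and title, in preparation for fetching.
- Concurrent fetching: a `ThreadPoolExecutor` provides the thread pool; multiple threads take chapter information from the queue and request the chapter pages concurrently, which improves throughput.
- Extracting and cleaning the body: a regular expression matches the HTML element that holds the chapter body, then redundant tags (such as `<br>`) and whitespace are removed to leave clean text.
- Sorting and saving: the results are sorted by chapter index so the chapters appear in the right order, then written to a local TXT file.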
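The extraction, cleaning, and ordering steps can be tried without any network access. In the sketch below, the HTML snippets in `SAMPLE_PAGES`, the `CHAPTERS` list, and the helper names `extract_body` and `fetch_chapter` are all made up for illustration and stand in for real responses. Jobs are also passed to the pool as arguments rather than through a queue, which is a simplification of the homework code.

```python
import re
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pages standing in for real HTTP responses
SAMPLE_PAGES = {
    "/book/1/1.html": '<div id="chaptercontent" class="c">First<br/>\n  line<br />請收藏本站</div>',
    "/book/1/2.html": '<div id="chaptercontent" class="c">Second<br>line 請收藏本站</div>',
}
CHAPTERS = [("/book/1/1.html", "Chapter 1"), ("/book/1/2.html", "Chapter 2")]

# re.DOTALL lets "." match the newline inside the first sample body
BODY_RE = re.compile(r'<div\s+id="chaptercontent".*?>(.*?)請收藏本站', re.DOTALL)


def extract_body(html: str) -> str:
    """Return the cleaned chapter text, or "" if the pattern does not match."""
    match = BODY_RE.search(html)
    if not match:
        return ""
    # Remove <br> tags and all whitespace
    return re.sub(r"<br\s*/?>|\s+", "", match.group(1))


def fetch_chapter(job):
    index, (href, title) = job
    return index, title, extract_body(SAMPLE_PAGES[href])


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(fetch_chapter, job) for job in enumerate(CHAPTERS)]
    results = sorted(f.result() for f in futures)  # tuples sort by chapter index first

print(results)
```

Carrying the index inside each result is what lets the final sort restore chapter order no matter which thread finishes first.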