目錄
目標
Python版本
官方文檔
概述
線程池
實戰
創建線程池的基本語法
批量提交任務
生產者&消費者模型
目標
????????掌握線程池的基本概念和使用方法。
Python版本
? ? ? ? Python 3.9.18
官方文檔
concurrent.futures — Launching parallel taskshttps://docs.python.org/3.9/library/concurrent.futures.html
概述
線程池
????????線程的創建和銷毀是需要消耗資源的,比如:CPU資源、內存、上下文切換等。線程池使得線程被創建后不停地被復用,大概過程是:
- 程序啟動后線程池會預先創建一些線程,并將這些線程放入線程池中,這些線程是閑置著的,隨時準備執行任務。
- 當需要任務執行時,程序會向線程池提交任務,線程池會取出一個空閑的線程來執行任務。
- 任務執行完成后線程不會被銷毀,而是再次放回到線程池中,等待下一個任務。
我們創建線程池可以指定最大線程數量,也可以不指定。官方文檔對于默認線程數量做了描述:
if max_workers is None:?
# ThreadPoolExecutor is often used to:?
# * CPU bound task which releases GIL?
# * I/O bound task (which releases GIL, of course)?
#?
# We use cpu_count + 4 for both types of tasks.?
# But we limit it to 32 to avoid consuming surprisingly large resource?
# on many core machine. max_workers = min(32, (os.cpu_count() or 1) + 4)
即默認設置的最大線程數量是:
- 操作系統核心數量+4,
- 如果獲取不到操作系統核心數量則最大線程數量是1個。
- 默認不超過32個線程數量。
實戰
創建線程池的基本語法
import random
import time
from concurrent.futures import ThreadPoolExecutordef fun(url):time.sleep(random.randint(1,3))# 獲取當前時間戳current_time = time.localtime()# 格式化時間為yyyy-MM-dd HH:mm:ssformatted_time = time.strftime('%Y-%m-%d %H:%M:%S', current_time)return f"請求到了{url}的網絡數據:{formatted_time}"if __name__ == '__main__':executor = ThreadPoolExecutor(max_workers=4)task_1=executor.submit(fun, "www.baidu.com")task_2=executor.submit(fun, "www.bilibili.com")task_3=executor.submit(fun, "www.jd.com")task_4=executor.submit(fun, "www.taobao.com")task_5 = executor.submit(fun, "www.tianmao.com")task_1.done()task_2.done()task_3.done()task_4.done()task_5.done()result_1=task_1.result()result_2=task_2.result()result_3=task_3.result()result_4=task_4.result()result_5=task_5.result()print(result_1)print(result_2)print(result_3)print(result_4)print(result_5)print("主線程結束")
批量提交任務
需求
? ? ? ? 創建一個線程池,用來批量執行不同頁面的請求任務。
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef get_page_html(page_num):time.sleep(random.randint(1,5))# 獲取當前時間戳current_time = time.localtime()# 格式化時間為yyyy-MM-dd HH:mm:ssformatted_time = time.strftime('%Y-%m-%d %H:%M:%S', current_time)return f"請求到第{page_num}頁數據:{formatted_time}"if __name__ == '__main__':executor=ThreadPoolExecutor()#請求第一頁至第一百頁的數據。page_num_list=[page_num for page_num in range(1,101)]task_list = [executor.submit(get_page_html,page_num) for page_num in page_num_list]#按照task_list元素的順序返回結果,即使后面的頁面數據提前請求到了數據,也會靠后返回。for task in task_list:print("按照task_list中的元素順序返回數據:",task.result())#按照先執行完,先返回結果。for future in as_completed(task_list):print("按照先請求到數據就先返回數據:",future.result())print("主線程結束")
生產者&消費者模型
需求
????????每次生產出一只烤鴨,就會被等待的消費者消費,最多生產100000只烤鴨。要求用線程池實現。
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor# 最多生產烤鴨數量
max_duck_count = 100000
# 當前生產的烤鴨數量
duck_count = 0
# 把生產的烤鴨放入隊列中
duck_queue = queue.Queue()def produce_duck(duck_queue):global duck_count# 生產者不停地生產烤鴨while True:if duck_count >= max_duck_count:breakduck_count += 1duck_queue.put(duck_count)print(f"===========生產者{threading.current_thread().name}生產了{duck_count}只烤鴨。")# 所有生產者完成后放入退出信號duck_queue.put(None)def consume_duck(duck_queue):# 消費者不停地消費烤鴨while True:duck = duck_queue.get()if duck is None:# 收到退出信號后,消費者退出duck_queue.put(None) # 傳遞退出信號給其他消費者breakprint(f"——————————消費者{threading.current_thread().name}消費了{duck}只烤鴨。")# 初始化線程,設置名稱
def init_thread():thread_id = threading.get_ident()threading.current_thread().name = f"線程_{thread_id}"if __name__ == '__main__':thread_pool = ThreadPoolExecutor(max_workers=10,initializer=init_thread)# 啟動多個生產者線程for i in range(5):thread_pool.submit(produce_duck, duck_queue)# 啟動多個消費者線程for i in range(8):thread_pool.submit(consume_duck, duck_queue)# 等待線程池中的任務執行完畢thread_pool.shutdown(wait=True)print("主線程結束。")