一、項目工程守則
1.pdm
新建一個項目
命令行終端:pip install pdm
pdm init
版本號:x.y.z
- x:兼容版本
- y:新增功能
- z:補丁版本
pdm add pytest requests (添加依賴)
pdm是協助管理我們的項目?
?
2. black
就是規范我們的代碼風格的:
pdm add black
black
black test_api (./ 一次處理該目錄下所有.py文件)
?
?
3.flake8
檢查語法錯誤
pdm add flake8
新建配置文件 .flake8
里面寫
這樣就不會查所有的文件了
還要加上 extend-ignore = E501
?
4.isort
排序import
?
5.pytest
管理和執行測試用例
6.自定義命令
?將四個命令合并在一起
?
?7.項目管理
?src源代碼 tests單元測試
二、錯誤和異常
python只有一種錯誤 語法錯誤 如果有語法錯誤的話整個文件都跑不起來 跑到半路停下來的都是異常
異常:執行前沒有檢查出錯誤,半路上遇到預期之外的情況
異常的引發方式:
python自動引發:
1 assert 1 == 2
2. 屬性異常 print("beifan".beifan)
3. 導入異常 from sys import beifan
4. 索引異常 l = [ ]? ? ?print(l[100])
5. 鍵值對異常 ?d = {}? ?print(d["id"])
手動引發異常:
def add(a, b):if not isinstance(a, int):raise ValueError(f'{a=}, 不是整數,無法計算')if not isinstance(b, int):raise ValueError(f'{b=}, 不是整數,無法計算')return a + b
異常引發之后發生了什么
異常引發之后,Python 會進行下面的操作:
- 停止執行
- 收集信息
- 向上傳播
- 不僅 add 函數停止執行
- main 函數也停止執行
- python 也停止執行
如何想要改變這個默認的處理流程,需對異常進行捕捉(打斷傳播鏈條)
捕捉異常可以:
- 打斷異常的傳播
- 自動執行預定義的代碼
注意:
- 盡量只捕捉能處理的異常
- 不能處理的異常,應該繼續傳播
- 如果沒有代碼可以處理,就應該讓 Python 停止運行
?
?
Pycharm 提供的 Debug (更方便)
console:執行任意的 Python 代碼
debugger:控制代碼執行進度、顯示變量內容
步過:執行到下一行
步入:執行到下一行代碼的內部
步出:執行完函數
恢復運行:運行到下一個斷點處
Python 提供的 Debug(更核心) pdb:python debug
python -m pdb main.py
命令:
- l: 查看代碼
- n: 步過
- s: 步入
- c: 恢復執行
- b: 打斷點
- ! : 執行任意代碼
三、python多線程?
(1)Python 多線程的兩種實現方式
方式 1:繼承?Thread
?類
步驟:
- 繼承父類:定義一個類(如?
T1
?),繼承?threading.Thread
?; - 重寫?
run
?方法:在類里實現?run
?方法,寫線程要執行的任務(如調用?task_1()
?); - 調用?
start
?方法:創建線程對象后,用?start()
?啟動線程,自動執行?run
?里的任務。
from threading import Thread # 需導入 Thread 類# 1. 繼承 Thread 類
class T1(Thread): # 2. 重寫 run 方法:定義線程任務def run(self): print("線程任務開始")task_1() # 假設 task_1 是要執行的函數print("線程任務結束")# 3. 實例化線程對象 + 啟動線程
t_1 = T1()
t_1.start() # 啟動線程,自動執行 run 方法# 可選:等待線程結束(避免主線程提前退出)
t_1.join()
方式 2:實例化?Thread
?對象
步驟:
直接創建?Thread
?對象,通過?target
?參數指定線程要執行的任務(函數),再用?start()
?啟動。
from threading import Thread# 定義線程要執行的任務
def task_1(): print("線程任務開始")# 業務邏輯...print("線程任務結束")# 1. 實例化 Thread:用 target 指定任務函數
t_1 = Thread(target=task_1) # 2. 調用 start 方法:啟動線程,執行 target 對應的任務
t_1.start() # 可選:等待線程結束
t_1.join()
?
(2)線程的執行流程
不管用哪種方式創建線程,執行流程?都是固定的:
- 實例化:創建?
Thread
?對象(如?t_1 = T1()
?或?t_1 = Thread(target=task_1)
?); - 調用?
start
?方法:觸發線程創建,系統會分配新的線程資源; - 調用?
run
?方法:新線程啟動后,自動執行?run
?里的邏輯(方式 1 是重寫的?run
,方式 2 是?Thread
?內置?run
?會調用?target
?函數 ); - 執行任務:最終執行?
run
?里的代碼(或?target
?函數 )。
?
(3)關鍵方法解析
1.?start()
:啟動線程
- 作用:告訴系統 “創建新線程,執行任務”;
- 注意:不能重復調用,一個線程對象只能?
start()
?一次。
2.?run()
:線程的核心邏輯
- 作用:定義線程要做的事;
- 方式 1 中,需重寫?
run
?來定制任務; - 方式 2 中,
Thread
?內置的?run
?會自動調用?target
?指定的函數,無需手動重寫。
3.?join()
:等待線程結束
- 作用:主線程執行到?
join()
?時,會暫停自己,等子線程執行完再繼續; - 場景:如果主線程需要用到子線程的結果,或避免子線程還沒跑完,主線程就退出,就需要?
join()
。
?
(4)兩種方式的對比
方式 | 優點 | 缺點 | 適用場景 |
---|---|---|---|
繼承?Thread ?類 | 可以重寫多個方法,定制性強 | 類繼承會占用單繼承名額 | 需要復雜線程邏輯(如自定義?run ?) |
實例化?Thread | 簡單直觀,無需創建類 | 僅能通過?target ?傳任務 | 簡單任務,快速創建線程 |
?
(5)實際應用建議
- 簡單任務選方式 2:直接傳?
target
,代碼更簡潔; - 復雜邏輯選方式 1:比如需要在?
run
?里加前置 / 后置操作(如初始化資源、清理數據 ); - 必用?
join()
:如果主線程依賴子線程結果(如統計多線程下載的文件總數 ),一定要用?join()
?等待,否則可能出現 “子線程還沒跑完,主線程已經退出” 的問題。
四、線程池??
(1)為什么需要 “線程池”?
直接創建線程(如繼承?Thread
?類、實例化?Thread
?對象 )有個問題:線程用完就銷毀,頻繁創建 / 銷毀會浪費資源(比如測試 100 個任務,就要創建 100 個線程 )。
→?線程池的作用:
- 預先創建一批線程(比如 3 個),放進 “池子” 里復用;
- 有任務時,從池子里拿線程執行;任務結束后,線程不銷毀,放回池子等下次復用;
- 減少線程創建 / 銷毀的開銷,提升效率。
?
(2)線程池的核心邏輯
線程池的工作流程可以概括為 3 步:
- 創建線程池:根據需求設置最大線程數(
max_workers
),比如同時最多跑 3 個線程; - 分配任務:有任務時,線程池自動選空閑線程執行任務;
- 復用線程:任務結束后,線程回到池子,等待下一個任務,不用銷毀。
?
(3)線程池的使用步驟
步驟 1:導入模塊
使用線程池需要導入?concurrent.futures
?里的?ThreadPoolExecutor
:
from concurrent.futures import ThreadPoolExecutor
import time # 用于統計耗時
步驟 2:定義任務(假設已有任務函數)
假設我們有 3 個任務函數,需要并行執行:
def task_1():time.sleep(1) # 模擬任務耗時return "任務1結果"def task_2():time.sleep(1)return "任務2結果"def task_3():time.sleep(1)return "任務3結果"
步驟 3:創建線程池 + 提交任務(兩種方式)
方式 1:用?pool.map
?批量提交任務
pool.map
?會自動遍歷任務列表,把每個任務分配給線程池執行,返回結果列表。
# 記錄開始時間
start_time = time.time() # 1. 創建線程池:最多同時跑 3 個線程
pool = ThreadPoolExecutor(max_workers=3) # 2. 提交任務:用 map + lambda 執行任務列表
# lambda x: x() 表示“調用列表里的每個函數(task_1、task_2、task_3)”
res_list = pool.map(lambda x: x(), [task_1, task_2, task_3]) # 3. 關閉線程池:等待所有任務完成(也可以用 with 語法自動關閉)
pool.shutdown() # 記錄結束時間
end_time = time.time() # 輸出耗時
print(f'一共耗時:{end_time - start_time:.2f} 秒') # 遍歷結果
for res in res_list:print(res)
執行流程:
- 線程池創建 3 個線程;
pool.map
?自動把?task_1
、task_2
、task_3
?分配給線程執行;- 任務并行運行(因為?
max_workers=3
,3 個任務同時跑 ); - 所有任務結束后,
pool.shutdown()
?關閉線程池; - 遍歷?
res_list
?拿到每個任務的返回值。
?
方式 2:用?pool.submit
?單個提交任務
Future 對象通常通過線程池 / 進程池的?submit()
?方法創建
pool.submit
?可以單個提交任務,返回?Future
?對象,用于獲取結果。
start_time = time.time() pool = ThreadPoolExecutor(max_workers=3) # 提交任務,返回 Future 對象
f_1 = pool.submit(task_1)
f_2 = pool.submit(task_2)
f_3 = pool.submit(task_3) pool.shutdown() # 等待任務完成end_time = time.time()
print(f'一共耗時:{end_time - start_time:.2f} 秒') # 通過 Future 對象的 result() 方法拿結果
print(f_1.result())
print(f_2.result())
print(f_3.result())
執行流程:
- 逐個提交任務,線程池自動分配線程執行;
pool.shutdown()
?等待所有任務完成;- 用?
future.result()
?獲取每個任務的返回值。
(4)兩種提交任務方式的對比
方式 | 優點 | 缺點 | 適用場景 |
---|---|---|---|
pool.map | 自動遍歷任務列表,代碼簡潔 | 必須等所有任務完成才返回結果 | 任務數量明確,需要批量執行 |
pool.submit | 靈活控制任務提交,可單獨拿結果 | 代碼稍繁瑣 | 任務數量不確定,或需要實時拿結果 |
(5)線程池的關鍵細節
1.?max_workers
?的設置
max_workers
?是線程池里最多同時運行的線程數;- 設置太小(比如 1 ),任務會串行執行,失去并行優勢;
- 設置太大(比如 100 ),會占用過多資源(CPU、內存 ),甚至拖慢程序;
- 建議:根據任務類型(CPU 密集型 / IO 密集型 )調整,IO 密集型(如網絡請求 )可以設大些(比如 10-20 ),CPU 密集型(如大量計算 )設小些(比如 4-8,和 CPU 核心數匹配 )。
2.?pool.shutdown()
?的作用
- 關閉線程池,不再接受新任務;
- 但會等待已提交的任務全部完成后,才真正關閉;
- 也可以用?
with
?語法自動關閉(推薦 ):with ThreadPoolExecutor(max_workers=3) as pool:res_list = pool.map(lambda x: x(), [task_1, task_2, task_3]) # 離開 with 塊后,自動執行 pool.shutdown()
3. 異常處理
如果任務執行中拋出異常,pool.map
?會在遍歷?res_list
?時拋出異常;pool.submit
?則會在調用?future.result()
?時拋出異常。需要用?try-except
?捕獲:
with ThreadPoolExecutor(max_workers=3) as pool:try:res_list = pool.map(lambda x: x(), [task_1, task_2, task_3])for res in res_list:print(res)except Exception as e:print(f"任務執行異常:{e}")
五、線程并發
?
一、變量通信 🌟
原理
多個線程可以共享同一個 “全局變量”,像小盒子一樣📦 線程 A 往里塞數據,線程 B 能掏出數據~
但要注意!Python 里有個?GIL(全局解釋器鎖)?🚦 ,如果多個線程同時改變量,容易 “打架”(數據亂掉)!
代碼示例(反面教材👉 線程打架現場)
import threading
import time# 共享小盒子📦
counter = 0 def increment():global counterfor _ in range(100000):# 非原子操作?? 讀→改→寫,線程會打架!counter += 1 threads = []
for _ in range(5):t = threading.Thread(target=increment)threads.append(t)t.start()for t in threads:t.join()# 理想是 500000,實際可能亂套!因為線程打架啦🥊
print("最終 counter 值:", counter)
改進版(加鎖保護🔒 讓線程乖乖排隊)
import threading
import timecounter = 0
# 可愛小鎖🔒 同一時間只讓一個線程動小盒子!
lock = threading.Lock() def increment():global counterfor _ in range(100000):# 拿鎖!其他線程乖乖等~with lock: counter += 1 # 現在安全啦?threads = []
for _ in range(5):t = threading.Thread(target=increment)threads.append(t)t.start()for t in threads:t.join()# 這次一定是 500000!完美~
print("最終 counter 值:", counter)
二、隊列通信 📮
原理
隊列是?線程安全的 “傳送帶”?🛳? 一個線程當 “生產者”(往傳送帶上放東西),另一個當 “消費者”(從傳送帶上拿東西)~ 不用自己加鎖,超省心!
代碼示例(生產者 - 消費者 🍞→🥪 )
import threading
import queue
import time# 可愛傳送帶📦
q = queue.Queue() # 生產者:往傳送帶放面包🍞
def producer():for i in range(5):item = f"面包{i}"q.put(item) # 放傳送帶上~print(f"生產者放: {item} 👉 傳送帶")time.sleep(0.5)# 消費者:從傳送帶拿面包做三明治🥪
def consumer():while True:item = q.get() # 拿東西!沒東西就等~print(f"消費者拿: {item} 👉 做三明治")q.task_done() # 告訴傳送帶:我處理完啦?if q.empty(): # 傳送帶空了,下班!break# 啟動線程~
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)producer_thread.start()
consumer_thread.start()producer_thread.join()
q.join() # 等傳送帶所有任務完成~
效果:
生產者每隔 0.5 秒放面包,消費者馬上拿走做三明治🥪 完美配合,不會亂~
三、鎖通信 🔒
原理
鎖是?“魔法小令牌”?🪄 線程拿到令牌才能動共享資源!其他線程只能乖乖等令牌~ 這樣就不會打架啦!
代碼示例(保護小盒子📦 )
import threading
import timecounter = 0
# 魔法令牌🔒
lock = threading.Lock() def increment():global counterfor _ in range(100000):# 拿令牌!其他線程等~lock.acquire() try:counter += 1 # 安全操作?finally:lock.release() # 還令牌!其他線程可以拿啦~threads = []
for _ in range(5):t = threading.Thread(target=increment)threads.append(t)t.start()for t in threads:t.join()# 結果一定正確~因為令牌守護著!
print("最終 counter 值:", counter)
偷懶寫法(with
?自動管令牌🔄 )
import threading
import timecounter = 0
lock = threading.Lock() def increment():global counterfor _ in range(100000):# 進入 with 自動拿令牌,退出自動還~超方便!with lock: counter += 1 threads = []
for _ in range(5):t = threading.Thread(target=increment)threads.append(t)t.start()for t in threads:t.join()print("最終 counter 值:", counter)
四、總結(超可愛版對照表🐾 )
方式 | 可愛比喻 | 優點 | 缺點 |
---|---|---|---|
變量 | 共享小盒子📦 | 簡單直接 | 容易打架(需加鎖) |
隊列 | 傳送帶📮 | 線程安全、解耦省心 | 要維護隊列結構 |
鎖 | 魔法令牌🔒 | 精準保護共享資源 | 用不好會 “死鎖”(慎用) |