?進程與線程的基本概念
特性 | 進程 (Process) | 線程 (Thread) |
---|---|---|
定義 | 操作系統分配資源的基本單位(獨立的內存空間) 多進程可真正并行(利用多核 CPU) | 進程內的執行單元(共享進程資源) |
獨立性 | 完全獨立,崩潰后不影響其他進程 | 共享進程資源,一個線程崩潰可能導致整個進程崩潰 |
資源開銷 | 高(需要分配獨立的內存、文件句柄等) 不可直接共享,需通過 IPC(如管道、共享內存) | 低(共享進程資源,僅需少量棧和寄存器) 可直接訪問全局變量(需同步機制避免競爭) |
數據共享 | 必須通過 IPC(進程間通信):<br>?- 管道(Pipe)<br>?- 共享內存(Shared Memory)<br>?- 消息隊列(Queue) | 直接共享進程內存(需鎖機制) |
同步機制 | 通常不需要 | 需要鎖(Lock)、信號量(Semaphore)等。 |
場景 | 進程 | 線程 |
---|---|---|
CPU 密集型任務 (如數值計算、機器學習) | ? 多進程(繞過 GIL,利用多核)。 | ? 多線程(受 GIL 限制,無法并行計算)。 |
I/O 密集型任務 (如網絡請求、文件讀寫) | ?? 可用,但資源開銷高。 | ? 多線程(輕量級,等待 I/O 時可切換任務)。 |
需要高隔離性 | ? 進程崩潰不影響其他進程。 | ? 線程崩潰可能拖垮整個進程。 |
Joblib庫
基礎Parallel并行處理
n_jobs:設置并行運行的作業數。
verbose:設置后端的詳細程度。
prefer:設置要使用的首選后端。可以是“processes”(進程)或“threads”(線程)。
require:設置對后端的請求。可以是“sharedmem”(共享內存)。
inner_max_num_threads:限制第三方庫使用的最大線程數,這些庫管理它們自己的 C 級線程池。此參數僅受 backend 支持,這些 backend 將 supports_inner_max_num_threads 類屬性設置為 True,例如“loky”后端。
temp_folder、max_nbytes、mmap_mode:控制后端的自動內存映射行為。有關更多詳細信息,請參閱在共享內存(內存映射)中使用數值數據
def process_row(row,idx,function):print('已經進入processrow')result = function(row)#print(f"處理完成 #{idx}: {str(result)[:50]}...")print(f"處理完成 #{idx}")result_clean=json_repair.loads(result)return result_clean# 使用Parallel進行并行處理,并添加verbose參數(日志詳細程度)
results0= Parallel(n_jobs=10, verbose=10)(delayed(process_row)(row, i,function) for i, row in data0.iterrows()
)
通過Parallel的return_as參數,實現結果的即使反饋
return_as="generator"
輸出可以是一個生成器,它會盡快返回可用的結果,即使后續任務尚未完成。輸出的順序始終與輸入提交的順序一致
return_as="generator_unordered"
?
在這種情況下,輸出的順序取決于workers的并發情況,并且是不確定的,這意味著每次執行代碼時結果的順序可能不同。
傳統的?
Parallel(...)(...)
?會先把所有任務結果收集完,再一次性返回一個?list
。當任務數量大、單個結果對象很大時,可能導致內存占用暴漲,甚至 OOM(內存溢出)。使用?
return_as="generator"
?則:1.每次只保留一個任務的結果在內存中 2.避免累積數百/數千個中間結果。
注意事項
generator使用過比如next/list()后無法回到上一個狀態,想要關閉的話使用.close()
環境準備
首先可以更新一下joblib包,1.0.1版本的是不支持return_as的參數的
import joblib
print(joblib.__version__)
!pip install --upgrade joblib
Embarrassingly parallel for loops — joblib 1.6.dev0 documentation
可以參考文檔測試一下代碼
from math import sqrt
from joblib import Parallel, delayed
parallel = Parallel(n_jobs=2, return_as="generator")
output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
print(type(output_generator))
print(next(output_generator))
print(next(output_generator))
print(list(output_generator))
?實現中斷后可以繼續處理未處理的數據
import os
import json
import pandas as pd
from joblib import Parallel, delayed
import json_repair# 假設這個是你已有的 DataFrame
# data0 = pd.read_csv(...) 或別的方式加載SAVE_DIR = "results_json"
os.makedirs(SAVE_DIR, exist_ok=True)# 檢查是否已處理過
def has_processed(idx):return os.path.exists(os.path.join(SAVE_DIR, f"{idx}.json"))# 每一行任務的處理函數
def process_row(row, idx, openai_thinking_is_match):if has_processed(idx):print(f"跳過 #{idx}:已存在結果")return Noneprint(f"開始處理 #{idx}")result = openai_thinking_is_match(row)print(f"處理完成 #{idx}")result_clean = json_repair.loads(result)# 保存結果到獨立文件with open(os.path.join(SAVE_DIR, f"{idx}.json"), "w", encoding="utf-8") as f:json.dump(result_clean, f, ensure_ascii=False, indent=2)return result_clean# 執行并行任務parallel = Parallel(n_jobs=50, return_as="generator", verbose=10)output_generator = parallel(delayed(process_row)(row, idx, function)for idx, row in data0.iterrows())# 可選:邊執行邊消費for _ in output_generator:pass