distributed.client.Client 用戶可調用函數分析
1. 核心計算函數
任務提交和執行
-
submit(func, *args, key=None, workers=None, resources=None, retries=None, priority=0, fifo_timeout='60s', allow_other_workers=False, actor=False, actors=False, pure=None, **kwargs)
- 提交單個函數到集群執行
- 返回 Future 對象
-
map(func, *iterables, key=None, workers=None, retries=None, priority=0, allow_other_workers=False, fifo_timeout='60s', batch_size=None, **kwargs)
- 將函數映射到多個輸入上
- 類似 Python 的 map 函數
-
compute(collections, optimize_graph=True, **kwargs)
- 計算 Dask 集合(如 DataFrame, Array 等)
- 觸發實際的計算執行
-
persist(collections, optimize_graph=True, **kwargs)
- 將集合持久化到分布式內存中
- 返回新的集合,數據保留在集群中
-
get(dsk, keys, **kwargs)
- 執行 Dask 圖并獲取結果
- 底層計算接口
2. 數據管理函數
數據分發和收集
-
scatter(data, workers=None, broadcast=False, timeout=None, hash=True, **kwargs)
- 將本地數據分發到集群的 workers
- 支持廣播模式
-
gather(futures, errors='raise', direct=None, asynchronous=None)
- 從分布式內存中收集 Future 結果
- 將遠程數據拉取到本地
數據集管理
-
publish_dataset(*args, **kwargs)
- 發布命名數據集到調度器
- 使數據在集群中可被其他客戶端訪問
-
unpublish_dataset(name, **kwargs)
- 從調度器移除命名數據集
-
list_datasets(**kwargs)
- 列出調度器上可用的命名數據集
-
get_dataset(name, default=NO_DEFAULT_PLACEHOLDER, **kwargs)
- 從調度器獲取命名數據集
3. 任務控制函數
任務生命周期管理
-
cancel(futures, asynchronous=None, force=False)
- 取消正在運行的任務
- 支持強制取消
-
retry(futures, asynchronous=None)
- 重試失敗的任務
-
restart(**kwargs)
- 重啟分布式網絡
- 清理并重新初始化集群
4. 集群管理函數
集群操作
-
close(timeout=no_default)
- 關閉客戶端連接
- 清理資源
-
shutdown()
- 關閉連接的調度器和 workers
- 完全關閉集群
-
wait_for_workers(n_workers=0, timeout=None)
- 等待指定數量的 workers 啟動
- 阻塞調用
集群信息查詢
-
scheduler_info(**kwargs)
- 獲取集群中 workers 的基本信息
-
nthreads(workers=None, **kwargs)
- 獲取每個 worker 節點的線程/核心數
-
who_has(futures=None, **kwargs)
- 查詢存儲每個 future 數據的 workers
-
has_what(workers=None, **kwargs)
- 查詢每個 worker 持有的鍵
-
processing(workers=None)
- 查詢每個 worker 當前運行的任務
-
nbytes(keys=None, summary=True, **kwargs)
- 查詢集群中每個鍵占用的字節數
5. 監控和診斷函數
性能監控
-
profile(start=None, stop=None, plot=False, filename=None, server=False, scheduler=False, workers=None, merge_workers=True, plot_width=700, plot_height=300, **kwargs)
- 性能分析和可視化
- 生成性能報告
-
call_stack(futures=None, keys=None)
- 獲取相關鍵的活躍調用棧
日志和事件
-
get_scheduler_logs(n=None)
- 獲取調度器日志
-
get_worker_logs(n=None, workers=None, nanny=False)
- 獲取 workers 日志
-
log_event(topic, msg)
- 在指定主題下記錄事件
-
get_events(topic: str = None)
- 檢索結構化主題日志
6. 數據優化函數
數據重新分布
-
rebalance(futures=None, workers=None, **kwargs)
- 在集群中重新平衡數據
- 優化數據分布
-
replicate(futures, n=None, workers=None, branching_factor=2, **kwargs)
- 設置 futures 在集群中的復制
- 提高容錯性
7. 工具函數
實用工具
-
get_executor(**kwargs)
- 返回用于在此客戶端上提交任務的 concurrent.futures Executor
-
normalize_collection(collection)
- 用已存在的 futures 替換集合的任務
-
futures_of(futures)
- 從輸入中提取 Future 對象
-
write_scheduler_file(scheduler_file)
- 將調度器信息寫入 JSON 文件
元數據管理
-
get_metadata(keys, default=no_default)
- 從調度器獲取任意元數據
-
set_metadata(key, value)
- 在調度器中設置任意元數據
版本信息
get_versions(check=False, packages=[])
- 返回調度器、所有 workers 和客戶端的版本信息
8. 特殊執行函數
在特定位置執行
-
run_on_scheduler(function, *args, **kwargs)
- 在調度器進程上運行函數
-
run(function, *args, **kwargs)
- 在所有 workers 上運行函數(在任務調度系統外)
-
run_coroutine(function, *args, **kwargs)
- 在所有 workers 上生成協程
9. 開發工具函數
IPython 集成
-
start_ipython_workers(workers=None, magic_names=False, qtconsole=False, qtconsole_args=None)
- 在 workers 上啟動 IPython 會話
-
start_ipython_scheduler(magic_name="scheduler_if_ipython", qtconsole=False, qtconsole_args=None)
- 在調度器上啟動 IPython 會話
10. 文件管理函數
upload_file(filename, **kwargs)
- 上傳本地包到 workers
11. 上下文管理
-
as_current()
- 線程本地、任務本地上下文管理器
- 使 Client.current 類方法返回當前客戶端
-
current(cls, allow_global=True)
- 類方法,返回當前上下文中的客戶端
12. 狀態查詢函數
-
asynchronous()
- 查詢是否在事件循環中運行
-
dashboard_link()
- 獲取集群儀表板鏈接
這些函數涵蓋了 Dask 分布式計算的所有主要功能,從基本的任務提交到高級的集群管理和監控。用戶可以根據具體需求選擇合適的方法來操作分布式集群。