dask.dataframe.shuffle.set_index中獲取 divisions 的步驟分析

dask.dataframe.shuffle.set_index 中獲取 divisions 的步驟分析

主要流程概述

set_index 函數中,當 divisions=None 時,系統需要通過分析數據來動態計算分區邊界。這個過程分為以下幾個關鍵步驟:

1. 初始檢查和準備

if divisions is None:sizes = df.map_partitions(sizeof) if repartition else []divisions = index2._repartition_quantiles(npartitions, upsample=upsample)mins = index2.map_partitions(M.min)maxes = index2.map_partitions(M.max)divisions, sizes, mins, maxes = base.compute(divisions, sizes, mins, maxes)

步驟說明:

  • 計算每個分區的大小(如果啟用重新分區)
  • 調用 _repartition_quantiles 計算近似分位數
  • 并行計算每個分區的最小值和最大值
  • 使用 base.compute 觸發實際計算

2. 分位數計算過程 (_repartition_quantiles)

_repartition_quantiles 方法調用 partition_quantiles 函數,該函數執行以下步驟:

2.1 生成采樣策略
def sample_percentiles(num_old, num_new, chunk_length, upsample=1.0, random_state=None):# 計算隨機百分位比例random_percentage = 1 / (1 + (4 * num_new / num_old) ** 0.5)# 生成等間距和隨機百分位
2.2 創建計算圖
# 1. 數據類型信息
dtype_dsk = {(name0, 0): (dtype_info, df_keys[0])}# 2. 每個分區的百分位摘要
val_dsk = {(name1, i): (percentiles_summary, key, df.npartitions, npartitions, upsample, state)for i, (state, key) in enumerate(zip(state_data, df_keys))
}# 3. 合并和壓縮摘要
merge_dsk = create_merge_tree(merge_and_compress_summaries, sorted(val_dsk), name2)# 4. 最終處理
last_dsk = {(name3, 0): (pd.Series, (process_val_weights, merged_key, npartitions, (name0, 0)), qs, None, df.name)
}

3. 數據后處理

divisions = methods.tolist(divisions)
if type(sizes) is not list:sizes = methods.tolist(sizes)
mins = methods.tolist(mins)
maxes = methods.tolist(maxes)

4. 空數據檢測和重新分區

empty_dataframe_detected = pd.isnull(divisions).all()
if repartition or empty_dataframe_detected:total = sum(sizes)npartitions = max(math.ceil(total / partition_size), 1)npartitions = min(npartitions, df.npartitions)# 插值生成新的分界點divisions = np.interp(x=np.linspace(0, n - 1, npartitions + 1),xp=np.linspace(0, n - 1, n),fp=divisions,).tolist()

5. 數據類型特殊處理

if pd.api.types.is_categorical_dtype(index2.dtype):dtype = index2.dtypemins = pd.Categorical(mins, dtype=dtype).codes.tolist()maxes = pd.Categorical(maxes, dtype=dtype).codes.tolist()

6. 排序優化檢查

if (mins == sorted(mins) and maxes == sorted(maxes) and all(mx < mn for mx, mn in zip(maxes[:-1], mins[1:]))):divisions = mins + [maxes[-1]]result = set_sorted_index(df, index, drop=drop, divisions=divisions)return result.map_partitions(M.sort_index)

這個檢查的作用:

  • 如果數據已經按索引排序,可以直接使用最小值和最大值作為分界點
  • 避免昂貴的shuffle操作

分位數計算詳細過程

核心算法:percentiles_summary 函數
def percentiles_summary(df, num_old, num_new, upsample, state):"""Summarize data using percentiles and derived weights."""# 1. 生成采樣百分位qs = sample_percentiles(num_old, num_new, len(df), upsample, state)# 2. 計算百分位值vals = df.quantile(qs)# 3. 轉換為權重return percentiles_to_weights(qs, vals, len(df))
權重計算:percentiles_to_weights 函數
def percentiles_to_weights(qs, vals, length):"""Weigh percentile values by length and the difference between percentiles"""if length == 0:return ()diff = np.ediff1d(qs, 0.0, 0.0)weights = 0.5 * length * (diff[1:] + diff[:-1])return vals.tolist(), weights.tolist()

權重計算原理:

  • 每個百分位值的權重 = 0.5 × 分區長度 × (前一個百分位差 + 后一個百分位差)
  • 這樣確保權重反映該值在數據分布中的重要性
合并和壓縮:merge_and_compress_summaries 函數
def merge_and_compress_summaries(vals_and_weights):"""Merge and sort percentile summaries that are already sorted."""vals, weights = zip(*vals_and_weights)vals = list(merge_sorted(*vals))weights = list(merge_sorted(*weights))# 壓縮重復值compressed_vals = []compressed_weights = []# ... 壓縮邏輯return compressed_vals, compressed_weights
最終處理:process_val_weights 函數

這個函數(我們之前分析過的)將合并后的值-權重對轉換為最終的分區邊界。

完整流程圖和示例

讓我創建一個詳細的流程圖來展示整個過程:

開始 set_index(df, index, divisions=None)↓
檢查 divisions 是否為 None↓
是 → 開始動態計算分界點↓
1. 并行計算:- 每個分區的大小 (sizes)- 每個分區的分位數 (divisions)- 每個分區的最小值 (mins)- 每個分區的最大值 (maxes)↓
2. 觸發計算:base.compute(divisions, sizes, mins, maxes)↓
3. 分位數計算過程:┌─────────────────────────────────────────┐│ 對每個分區執行 percentiles_summary:     ││ 1. 生成采樣百分位 (等間距 + 隨機)       ││ 2. 計算百分位值                        ││ 3. 轉換為權重                          │└─────────────────────────────────────────┘↓
4. 合并所有分區的摘要:┌─────────────────────────────────────────┐│ merge_and_compress_summaries:           ││ 1. 合并排序的值和權重                   ││ 2. 壓縮重復值                          │└─────────────────────────────────────────┘↓
5. 最終處理:┌─────────────────────────────────────────┐│ process_val_weights:                    ││ 1. 處理巨型值                          ││ 2. 計算最終分界點                      ││ 3. 處理數據類型                        │└─────────────────────────────────────────┘↓
6. 后處理:- 轉換為列表格式- 檢查空數據- 處理重新分區- 特殊數據類型處理↓
7. 排序優化檢查:- 如果數據已排序,使用 min/max 作為分界點- 否則繼續到 shuffle 階段↓
調用 set_partition 進行實際的數據重排↓
結束

關鍵優化策略

  1. 采樣策略:結合等間距和隨機百分位,平衡計算效率和準確性
  2. 排序檢測:如果數據已排序,避免昂貴的shuffle操作
  3. 數據類型感知:特別處理分類、時間等特殊數據類型
  4. 內存優化:通過壓縮和合并減少內存使用
  5. 分布式計算:利用Dask的并行計算能力

性能考慮

  • 時間復雜度:O(n log n),主要由排序和分位數計算決定
  • 空間復雜度:O(n),存儲采樣數據和權重
  • 網絡開銷:需要收集所有分區的統計信息
  • 計算開銷:需要兩次數據遍歷(統計 + shuffle)

總結

dask.dataframe.shuffle.set_index 中獲取 divisions 的過程是一個復雜的分布式算法,主要包含以下步驟:

核心步驟

  1. 并行統計:計算每個分區的分位數、大小、最小值、最大值
  2. 分位數計算:使用采樣策略生成代表性百分位
  3. 權重分配:根據數據分布為每個值分配權重
  4. 合并壓縮:合并所有分區的統計信息并壓縮重復值
  5. 分界點計算:使用 process_val_weights 計算最終分界點
  6. 優化檢查:檢測數據是否已排序,避免不必要的shuffle

關鍵特點

  • 分布式設計:充分利用Dask的并行計算能力
  • 智能采樣:結合等間距和隨機采樣策略
  • 類型感知:特別處理不同數據類型
  • 性能優化:檢測已排序數據,避免重復計算
  • 內存高效:通過壓縮和合并減少內存使用

這個算法是Dask DataFrame實現高效分布式排序和分區的核心,通過巧妙的采樣和合并策略,在保證準確性的同時實現了良好的性能。

自己實現

import numpy as np
import pandas as pd# 1?? 采樣百分位
def sample_percentiles(num_old, num_new, chunk_length, upsample=1.0, random_state=None):"""簡單版本:等間距百分位"""return np.linspace(0, 1, num_new + 1)# 2?? 計算百分位摘要(值+權重)
def percentiles_summary(series, num_old, num_new):qs = sample_percentiles(num_old, num_new, len(series))vals = series.quantile(qs).to_numpy()diff = np.ediff1d(qs, 0.0, 0.0)weights = 0.5 * len(series) * (diff[1:] + diff[:-1])return vals.tolist(), weights.tolist()# 3?? 合并多個分區的摘要
def merge_and_compress_summaries(summaries):all_vals = []all_weights = []for vals, weights in summaries:all_vals.extend(vals)all_weights.extend(weights)# 按值排序order = np.argsort(all_vals)vals = np.array(all_vals)[order]weights = np.array(all_weights)[order]# 壓縮重復值compressed_vals = []compressed_weights = []last_val = Nonefor v, w in zip(vals, weights):if last_val is not None and v == last_val:compressed_weights[-1] += welse:compressed_vals.append(v)compressed_weights.append(w)last_val = vreturn np.array(compressed_vals), np.array(compressed_weights)# 4?? 最終處理:計算分界點
def process_val_weights(vals, weights, npartitions):if len(vals) == 0:return np.array([])if len(vals) == npartitions + 1:return valselif len(vals) < npartitions + 1:q_weights = np.cumsum(weights)q_target = np.linspace(q_weights[0], q_weights[-1], npartitions + 1)return np.interp(q_target, q_weights, vals)else:target_weight = weights.sum() / npartitionsjumbo_mask = weights >= target_weightjumbo_vals = vals[jumbo_mask]trimmed_vals = vals[~jumbo_mask]trimmed_weights = weights[~jumbo_mask]trimmed_npartitions = npartitions - len(jumbo_vals)q_weights = np.cumsum(trimmed_weights)q_target = np.linspace(0, q_weights[-1], trimmed_npartitions + 1)left = np.searchsorted(q_weights, q_target, side="left")right = np.searchsorted(q_weights, q_target, side="right") - 1lower = np.minimum(left, right)trimmed = trimmed_vals[lower]rv = np.concatenate([trimmed, jumbo_vals])rv.sort()return rv# 5?? 模擬 set_index 中 divisions 的獲取
def simulate_set_index(df, column, npartitions):num_old = len(df)# 假設原始有分區(這里手動切分成2塊模擬)partitions = np.array_split(df[column], 2)summaries = [percentiles_summary(p, num_old, npartitions) for p in partitions]vals, weights = merge_and_compress_summaries(summaries)divisions = process_val_weights(vals, weights, npartitions)return divisions# ========== DEMO 使用 ==========
df = pd.DataFrame({"x": np.random.randint(0, 100, size=50)})divs = simulate_set_index(df, "x", npartitions=4)print("原始數據示例:\n", df.head())
print("\n計算得到的 divisions:", divs)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/95659.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/95659.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/95659.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ai生成ppt工具有哪些?10款主流AI生成PPT工具盤點

隨著人工智能技術的飛速發展&#xff0c;AI生成PPT工具逐漸成為職場人士、學生和創作者提升效率的得力助手。這類工具通過智能算法&#xff0c;能夠快速將文本、數據或創意轉化為結構化、視覺化的演示文稿&#xff0c;大幅節省設計時間。1、AiPPT星級評分&#xff1a;★★★★★…

Qt多線程編程學習

Qt多線程編程學習 1. 項目概述 本項目展示了Qt中多線程編程的基本用法&#xff0c;通過繼承QThread類創建自定義線程&#xff0c;并演示了線程的啟動、執行和銷毀過程。項目包含一個簡單的用戶界面&#xff0c;用戶可以通過按鈕控制線程的啟動和結束。 1.1 項目結構 項目包含以…

加密貨幣武器化:惡意npm包利用以太坊智能合約實現隱蔽通信

ReversingLabs研究人員發現兩個惡意npm包利用以太坊&#xff08;Ethereum&#xff09;智能合約隱藏并傳播惡意軟件。這兩個名為colortoolsv2和mimelib2的軟件包于2025年7月被識別&#xff0c;展現了開源安全攻防戰中的新戰術。惡意軟件包偽裝成實用工具攻擊活動始于7月7日發布的…

Spring Boot 全局字段處理最佳實踐

在日常開發中&#xff0c;我們總會遇到一些瑣碎但又無處不在的字段處理需求&#xff1a;? 請求處理: 用戶提交的表單&#xff0c;字符串前后帶了多余的空格&#xff0c;需要手動 trim()。? 響應處理: 返回給前端的 BigDecimal 金額&#xff0c;因為精度問題導致JS處理出錯&am…

三坐標測量機在汽車制造行業中的應用

在汽車制造業中&#xff0c;零部件精度決定著整車性能。從發動機活塞的微米級公差&#xff0c;到車身焊接的毫米級間隙&#xff0c;汽車制造“差之毫厘&#xff0c;謬以千里” &#xff0c;任何細微偏差都可能引發連鎖反應&#xff1a;發動機抖動、異響、油耗飆升&#xff0c;車…

機床夾具設計 +選型

機床夾具設計—第2組&#xff08;鉆床夾具&#xff09;仿真組裝視頻_嗶哩嗶哩_bilibili 夾具-商品搜索-怡合達一站式采購平臺 米思米FA標準品電子目錄new 可能要吧這些定位塊單獨用yolo訓練一邊才能搞識別分析 3長條一短銷定位&#xff0c;黃色的用來夾緊 一個面加一短軸一棱…

表格識別技術:通過計算機視覺和OCR,實現非結構化表格向結構化數據的轉換,推動數字化轉型。

在日常工作和生活中&#xff0c;我們無處不在與表格打交道。從財務報表、發票收據&#xff0c;到科研論文中的數據表、醫療報告&#xff0c;表格以其清晰、結構化的方式&#xff0c;承載著大量關鍵信息。然而&#xff0c;當這些表格以紙質或圖片等非結構化形式存在時&#xff0…

Go基礎(②Viper)

Viper 讀取配置創建一個配置文件 config.yamlserver:port: 8080timeout: 30 # 超時時間&#xff08;秒&#xff09; database:host: "localhost"user: "root"password: "123456"name: "mydb"然后用 Viper 讀取這個配置&#xff0c;代…

kafka Partition(分區)詳解

一、什么是 PartitionPartition&#xff08;分區&#xff09; 是 Kafka Topic&#xff08;主題&#xff09; 的最小并行單位。一個 Topic 可以包含多個 Partition&#xff0c;每個 Partition 底層對應一個有序、不可變的消息隊列&#xff0c;消息只會順序追加。Partition 內部消…

中創中間件適配HGDB

文章目錄環境文檔用途詳細信息環境 系統平臺&#xff1a;Microsoft Windows (64-bit) 10 版本&#xff1a;5.6.5 文檔用途 本文章主要介紹中創中間件簡單適配HGDB。 詳細信息 一、數據源配置 1.數據庫準備 &#xff08;1&#xff09;安裝HGDB并創建一個名為myhgdb的數據…

服務器內存和普通計算機內存在技術方面有什么區別?

服務器內存和普通計算機內存在技術上的區別&#xff0c;主要體現在為滿足不同工作場景和要求而采用的設計和特性上。下面這個表格匯總了它們的主要技術差異&#xff0c;方便你快速了解&#xff1a; ?技術特性??服務器內存??普通計算機內存??錯誤校驗 (ECC)??支持ECC(…

哪款AI生成PPT工具對職場新人最友好?操作門檻最低的是哪個?

一句話生成專業PPT&#xff0c;職場新人也能輕松做出高質量演示文稿現代職場節奏快&#xff0c;PPT制作已成為必備技能。然而&#xff0c;職場新人常面臨兩大挑戰&#xff1a;缺乏設計經驗&#xff0c;以及需要在有限時間內完成高質量演示。傳統PPT制作耗時費力&#xff0c;需梳…

1.注解的力量:Spring Boot如何用注解重構IoC容器

文章目錄1.1 IoC容器&#xff1a;Spring的智能管家1.2 注解驅動&#xff1a;給管家下指令1.2.1 SpringBootApplication&#xff1a;總管家的聘書1.2.2 組件注解&#xff1a;員工的身份標識1.2.3 Autowired&#xff1a;依賴注入的三種方式1.2.4 Bean注解&#xff1a;手動招聘特殊…

【算法】92.翻轉鏈表Ⅱ--通俗講解

一、題目是啥?一句話說清 給你一個鏈表和兩個整數 left 和 right,反轉從第 left 個節點到第 right 個節點的子鏈表,并返回反轉后的鏈表。其他部分保持不變。 示例: 輸入:head = [1,2,3,4,5], left = 2, right = 4 輸出:[1,4,3,2,5](反轉了從第2到第4個節點) 二、解題…

Nature子刊:新發現!深層腦網絡中發現強迫癥癥狀的神經生物標志物

強迫癥&#xff08;OCD&#xff09;是一種令人困擾的精神疾病&#xff0c;患者常常被強迫思維和強迫行為所困擾。例如&#xff0c;有些人會反復洗手&#xff0c;無法控制自己的清潔沖動&#xff1b;還有些人會不斷檢查門窗是否關好&#xff0c;即便他們已經確認過無數次。這些行…

Onlyoffice集成與AI交互操作指引(Iframe版)

Onlyoffice集成與AI交互操作指引&#xff08;Iframe版&#xff09; 本文檔系統介紹了軟件系統集成OnlyOffice實現在線編輯與AI輔助功能的方案。主要內容包括&#xff1a;后端需提供文檔配置信息并實現Callback接口以處理文檔保存&#xff1b;前端通過Vue集成編輯器&#xff0c…

TypeScript 中 keyof、typeof 和 instanceof

在 TypeScript 開發中&#xff0c;keyof、typeof 和 instanceof 是核心的類型操作符和操作符&#xff0c;專門用于提升類型安全、代碼可讀性和維護性。1. keyof 操作符定義和用途&#xff1a;keyof 是一個類型操作符&#xff0c;用于獲取對象類型的所有鍵&#xff08;屬性名&am…

分布式專題——1.1 Redis單機、主從、哨兵、集群部署

1 Redis 部署 下面演示在 Linux 環境下部署 Redis7。 1.1 單機部署 1.1.1 檢查安裝 gcc 環境Redis 是由 C 語言編寫的&#xff0c;它的運行需要 C 環境&#xff0c;因此我們需要先安裝 gcc&#xff1b; # 關閉防?墻 systemctl stop firewalld.service # 查看防火墻狀態 firewa…

2025年滲透測試面試題總結-54(題目+回答)

安全領域各種資源&#xff0c;學習文檔&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各種好玩的項目及好用的工具&#xff0c;歡迎關注。1、SQL注入的防護方法有哪些&#xff1f; 2、永恒之藍的漏洞原理是什么&#xff1f;怎么做到的&#xff1f; 3、命令…

安卓學習 之 按鈕點擊事件

今天學習安卓應用中的按鈕點擊事件&#xff1a;總結下來在安卓應用中的Button注冊點擊事件的方法主要是以下4種方法&#xff0c;稍后會逐個介紹&#xff1a; 第一種方法&#xff1a;自定義內部類的方法 第二種方法&#xff1a;匿名內部類的方法 第三種方法&#xff1a;當前Acti…