Spark Shuffle機制原理

文章目錄

  • 1.什么是Shuffle?
  • 2.Shuffle解決什么問題?
  • 3.Shuffle Write與Shuffle Read
  • 4.Shuffle的計算需求
    • 4.1 計算需求表
    • 4.2 partitionby
    • 4.3 groupByKey
    • 4.4 reduceByKey
    • 4.5 sortByKey
  • 5.Shuffle Write框架設計與實現
    • 5.1 Shuffle Write框架實現的功能
    • 5.2 Shuffle Write的多種情況
      • 5.2.1 不需要combine和sort
        • 5.2.1.1 操作流程
        • 5.2.1.2 優缺點
        • 5.2.1.3 適用性
      • 5.2.2 不需要combine,需要sort
        • 5.2.2.1 操作流程
        • 5.2.2.2 優缺點
        • 5.2.2.3 適用性
      • 5.2.3 需要combile,需要/不需要sort
        • 5.2.3.1 操作流程
        • 5.2.3.2 優缺點
        • 5.2.3.3 適用性
  • 6.Shuffle Read框架設計與實現
    • 6.1 Shuffle Read框架實現的功能
    • 6.2 Shuffle Read的不同情況
      • 6.2.1 不需要combine和sort
        • 6.2.1.1 操作流程
        • 6.2.1.2 優缺點
        • 6.2.1.3 適用性
      • 6.2.2 不需要combine,需要sort
        • 6.2.2.1 操作流程
        • 6.2.2.2 優缺點
        • 6.2.2.3 適用性
      • 6.2.3 需要combine,需要/不需要sort
        • 6.2.3.1 操作流程
        • 6.2.3.2 優缺點
        • 6.2.3.3 適用性

閱讀本篇文章前,需要閱讀 Spark執行計劃與UI分析

1.什么是Shuffle?

運行在不同stage、不同節點上的task間如何進行數據傳遞。這個數據傳遞過程通常被稱為Shuffle機制。

2.Shuffle解決什么問題?

如果是單純的數據傳遞,則只需要將數據進行分區、通過網絡傳輸即可,沒有太大難度,但Shuffle機制還需要進行各種類型的計算(如聚合、排序),而且數據量一般會很大。如何支持這些不同類型的計算,如何提高Shuffle的性能都是Shuffle機制設計的難點問題。

3.Shuffle Write與Shuffle Read

  • Shuffle Write:上游stage預先將輸出數據進行劃分,按照分區存放,分區個數與下游task個數一致,這個過程被稱為"Shuffle Write"。
  • Shuffle Read:上游數據按照分區存放完成后,下游的task將屬于自己分區的數據通過網絡傳輸獲取,然后將來自上游不同分區的數據聚合再一起處理,這個過程稱為"Shuffle Read"。

4.Shuffle的計算需求

4.1 計算需求表

所謂計算需求,也就是Shuffle要解決具體算子的哪些計算需求:
在這里插入圖片描述
這里我來分析幾個例子:

4.2 partitionby

在這里插入圖片描述
可以看到partitionby操作只進行了數據分區操作,并沒有涉及到數據的聚合和排序操作。

4.3 groupByKey

在這里插入圖片描述
可以看到groupByKey的操作既需要分區,又需要做聚合,并且在Shuffle Read階段做的聚合。

4.4 reduceByKey

在這里插入圖片描述
可以看到reduceByKey做了兩步聚合,在Shuffle Write中先執行func聚合一次(由spark內部執行,不生成新的rdd),然后進行分區數據傳輸,最后再在每個分區聚合一次,執行相同的func函數。同時func需要滿足交換律和結合律。兩次聚合(多了Shuffle Write端聚合)的優點是優化Shuffle的性能,一是傳輸的數據量大大減少,二是降低Shuffle Read端的內存消耗。

4.5 sortByKey

在這里插入圖片描述
分區后,在ShuffleRead端進行排序。sortByKey() 為了保證生成的RDD中的數據是全局有序(按照Key排序) 的, 采用Range劃分來分發數據。 Range劃分可以保證在生成的RDD中, partition 1中的所有record的Key小于(或大于) partition 2中所有的record的Key。
可以看到當前并沒有算子需要在Shuffle Write端進行排序的,但不能保證用戶實現的算子不會在Shuffle Write端進行排序,因此在spark實現Shuffle框架的時候保留了在Shuffle Write端進行排序的功能

5.Shuffle Write框架設計與實現

5.1 Shuffle Write框架實現的功能

如第四節中的圖所示,每個數據操作只需要其中的一個或兩個功能。Spark為了支持所有的情況,設計了一個通用的Shuffle Write框架,框架的計算順序為“map()輸出→數據聚合→排序→分區”輸出。
在這里插入圖片描述
map task每計算出一個record及其partitionId,就將record放入類似HashMap的數據結構中進行聚合;聚合完成后,再將HashMap中的數據放入類似Array的數據結構中進行排序,既可按照partitionId,也可以按照partitionId+Key進行排序;最后根據partitionId將數據寫入不同的數據分區中,存放到本地磁盤上。partitionId=Hash(Key)% 下游分區數

5.2 Shuffle Write的多種情況

5.2.1 不需要combine和sort

在這里插入圖片描述
這種Shuffle Write方式稱為:BypassMergeSortShuffleWriter
這種情況最簡單,只需要實現分區功能:

5.2.1.1 操作流程

map()依次輸出KV record,并計算其partitionId(PID),Spark根據 partitionId,將record依次輸出到不同的buffer中,每當buffer填滿就將record溢寫到磁盤上的分區文件中。分配buffer的原因是map()輸出record的速度很快,需要進行緩沖來減少磁盤I/O。

5.2.1.2 優缺點

該模式的優點是速度快,直接將record輸出到不同文件中。缺點是資源消耗過高,每個分區都需要有一個buffer(默認大小為32KB,由spark.Shuffle.file.buffer進行控制),當分區數過大時,內存消耗會很高。

5.2.1.3 適用性

適用于Shuffle Write端不需要聚合和排序且分區個數較少(小于spark.Shuffle.sort.bypassMergeThreshold,默認值為200),例如groupBy(100),partitionBy(100),sortByKey(100)。

5.2.2 不需要combine,需要sort

在這里插入圖片描述

這種Shuffle模式被命名為:SortShuffleWriter(KeyOrdering=true),使用的Array被命名為PartitionedPairBuffer

5.2.2.1 操作流程
  • 這種情況需要使用partitionId+key進行排序,Spark采用的實現方法是建立一個Array:PartitionedPairBuffer,來存放map()輸出的record,并將每個<K,V>record轉化為<(PartitionId,K),V>record,然后按照PartitionId+Key對record進行排序,最后將所有record寫入寫入一個文件中,通過建立索引來標示每個分區
  • 如果Array存放不下,則會先擴容,如果還存放不下,就將Array中的record排序后spill到磁盤上,等待map()輸出完以后,再將Array中的record與磁盤上已排序的record進行全局排序,得到最終有序的record,并寫入文件中。
5.2.2.2 優缺點
  • 優點是只需要一個Array結構就可以支持按照partitionId+Key進行排序,Array大小可控,而且具有擴容和spill到磁盤上的功能,支持從小規模到大規模數據的排序。同時,輸出的數據已經按照partitionId進行排序,因此只需要一個分區文件存儲,即可標示不同的分區數據,克服了BypassMergeSortShuffleWriter中建立文件數過多的問題,適用于分區個數很大的情況。缺點是排序增加計算時延。
5.2.2.3 適用性
  • map()端不需要聚合(combine)、Key需要排序、分區個數無限制。目前,Spark本身沒有提供這種排序類型的數據操作,但不排除用戶會自定義,或者系統未來會提供這種類型的操作。sortByKey()操作雖然需要按Key進行排序,但這個排序過程在Shuffle Read端完成即可,不需要在Shuffle Write端進行排序。

最后,使用這種Shuffle如何解決BypassMergeSortShuffleWriter存在的buffer分配過多的問題?我們只需要將“按PartitionId+Key排序”改為“只按PartitionId排序”,就可以支持“不需要map()端combine、不需要按照Key進行排序,分區個數過大”的操作。例如,groupByKey(300)、partitionBy(300)、sortByKey(300)。

5.2.3 需要combile,需要/不需要sort

在這里插入圖片描述
這種Shuffle模式被稱為:sort-based Shuffle Write,哈希表為:PartitionedAppendOnlyMap

5.2.3.1 操作流程
  • 需要實現按Key進行聚合(combine)的功能,Spark采用的實現方法是建立一個類似HashMap的數據結構對map()輸出的record進行聚合。HashMap中的Key是“partitionId+Key”,HashMap中的Value是經過相同combine的聚合結果。在圖中,combine()是sum()函數,那么Value中存放的是多個record對應的Value相加的結果。
  • 聚合完成后,Spark對HashMap中的record進行排序。如果不需要按Key進行排序,如上圖所示,那么只按partitionId進行排序;如果需要按Key進行排序,如圖6.7的下圖所示,那么按partitionId+Key進行排序。最后,將排序后的record寫入一個分區文件中。其中使用的hash表既可以實現聚合功能,也可以實現排序功能。
  • 如果HashMap存放不下,則會先擴容為兩倍大小,如果還存放不下,就將HashMap中的record排序后spill到磁盤上。此時,HashMap被清空,可以繼續對map()輸出的record進行聚合,如果內存再次不夠用,那么繼續spill到磁盤上,此過程可以重復多次。當map()輸出完成以后,將此時HashMap中的reocrd與磁盤上已排序的record進行再次聚合(merge),得到最終的record,輸出到分區文件中。
5.2.3.2 優缺點
  • 優缺點同5.4.2
5.2.3.3 適用性
  • 適合map()端聚合(combine)、需要或者不需要按Key進行排序、分區個數無限制的應用,如reduceByKey()、aggregateByKey()等。

6.Shuffle Read框架設計與實現

6.1 Shuffle Read框架實現的功能

在這里插入圖片描述
reduce task不斷從各個map task的分區文件中獲取數據(Fetch records),然后使用類似HashMap的結構來對數據進行聚(aggregate),該過程是邊獲取數據邊聚合。聚合完成后,將HashMap中的數據放入類似Array的數據結構中按照Key進行排序(sort byKey),最后將排序結果輸出或者傳遞給下一個操作。

6.2 Shuffle Read的不同情況

6.2.1 不需要combine和sort

在這里插入圖片描述

6.2.1.1 操作流程
  • 這種情況最簡單,只需要實現數據獲取功能即可。等待所有的map task結束后,reduce task開始不斷從各個map task獲取<K,V>record,并將record輸出到一個buffer中(大小為spark.reducer.maxSizeInFlight=48MB),下一個操作直接從buffer中獲取數據即可。
6.2.1.2 優缺點
  • 優點是邏輯和實現簡單,內存消耗很小。缺點是不支持聚合、排序等復雜功能。
6.2.1.3 適用性
  • 適合既不需要聚合也不需要排序的應用,如partitionBy()等。

6.2.2 不需要combine,需要sort

在這里插入圖片描述
使用的Array結構:PartitionedPairBuffer

6.2.2.1 操作流程
  • 獲取數據后,將buffer中的record依次輸出到一個Array結構(PartitionedPairBuffer)中。由于這里采用了本來用于Shuffle Write端的PartitionedPairBuffer結構,所以還保留了每個record的partitionId。然后,對Array中的record按照Key進行排序,并將排序結果輸出或者傳遞給下一步操作。
  • 當內存無法存下所有的record時,PartitionedPairBuffer將record排序后spill到磁盤上,最后將內存中和磁盤上的record進行全局排序,得到最終排序后的record。
6.2.2.2 優缺點
  • 優點是只需要一個Array結構就可以支持按照Key進行排序,Array大小可控,而且具有擴容和spill到磁盤上的功能,不受數據規模限制。缺點是排序增加計算時延。
6.2.2.3 適用性
  • 適合reduce端不需要聚合,但需要按Key進行排序的操作,如sortByKey()、sortBy()等。

6.2.3 需要combine,需要/不需要sort

在這里插入圖片描述
哈希表:ExternalAppendOnlyMap

6.2.3.1 操作流程
  • 獲取record后,Spark建立一個類似HashMap的數據結構(ExternalAppendOnlyMap)對buffer中的record進行聚合,HashMap中的Key是record中的Key,HashMap中的Value是經過相同聚合函數(func())計算后的結果。
  • 聚合函數是sum()函數,那么Value中存放的是多個record對應Value相加后的結果。之后,如果需要按照Key進行排序,如下圖所示,則建立一個Array結構,讀取HashMap中的record,并對record按Key進行排序,排序完成后,將結果輸出或者傳遞給下一步操作。
  • 如果HashMap存放不下,則會先擴容為兩倍大小,如果還存放不下,就將HashMap中的record排序后spill到磁盤上。此時,HashMap被清空,可以繼續對buffer中的record進行聚合。如果內存再次不夠用,那么繼續spill到磁盤上,此過程可以重復多次。當聚合完成以后,將此時HashMap中的reocrd與磁盤上已排序的record進行再次聚合,得到最終的record,輸出到分區文件中。
    注意,這里的排序和聚合依然使用的同一個數據結構。
6.2.3.2 優缺點
  • 同上一節。
6.2.3.3 適用性
  • 適合reduce端需要聚合、不需要或需要按Key進行排序的操作,如reduceByKey()、aggregateByKey()等。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/918825.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/918825.shtml
英文地址,請注明出處:http://en.pswp.cn/news/918825.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Cursor vs Trae vs VSCode:2025終極IDE橫評,誰才是開發者的效率之選?

前言 2025年的編程世界&#xff0c;AI不再只是輔助&#xff0c;而是編程工作流的核心驅動者。從微軟的VSCode 到新銳 Cursor 與國產黑馬 Trae &#xff0c;三大 IDE 正在重新定義“人機協作”的邊界。本文從架構設計、AI能力、場景適配等維度&#xff0c;帶你看透工具本質&…

Vue 安裝指定版本依賴包、刪除某個依賴包、依賴管理

如何安裝指定版本的依賴包安裝指定版本&#xff1a;一旦你知道了想要的版本號&#xff0c;比如3.4.0&#xff0c;你可以使用以下命令來安裝這個版本的vue-router&#xff1a;npm install vue-router3.4.0 --save這里的^表示安裝3.4.0的最新小版本更新&#xff0c;但不會超過主版…

psycopg2 如何驗證鏈接是否有效

在 psycopg2 中&#xff0c;驗證數據庫連接是否有效&#xff08;即連接是否仍然活躍&#xff09;可以通過以下幾種方法實現&#xff1a;1. 使用 conn.closed 屬性 psycopg2 的連接對象有一個 closed 屬性&#xff0c;可以檢查連接是否已關閉&#xff1a; import psycopg2conn …

數據科學與計算-電商雙11美妝數據分析

一、項目背景&#xff1a;雙 11 美妝數據的價值所在 每年的 “雙 11” 購物節都是電商行業的盛宴&#xff0c;而美妝品類作為消費熱門領域&#xff0c;蘊含著豐富的用戶行為與市場趨勢信息。該項目聚焦雙 11 期間的美妝電商數據&#xff0c;旨在通過數據分析揭示以下核心問題&…

簡單了解MongoDB數據存儲

官方文檔&#xff1a;MongoDB中文手冊|官方文檔中文版 | MongoDB-CN-Manual 什么是MongoDB? MongnDB是一個分布式文件存儲數據庫(或叫文檔數據庫)&#xff0c;是一個介于 關系數據庫和非關系數據庫之間的產品&#xff0c;是非關系數據庫當中功能最豐富&#xff0c;最像關系數…

web網站開發,在線%射擊比賽成績管理%系統開發demo,基于html,css,jquery,python,django,model,orm,mysql數據庫

經驗心得 這個也是crud業務單子&#xff0c;第二個聊點其他的&#xff0c;從最早的無分層開發&#xff0c;到三層開發&#xff0c;工廠&#xff0c;各種接口&#xff0c;再到后面多層&#xff0c;代碼無痕aop&#xff0c;各種框架等&#xff0c;都是在方便我們快速打架一個程序…

[QtADS]解析ads.pro

本文來源 &#xff1a; 騰訊元寶subdirs : 子目錄TEMPLATE subdirs的作用????核心功能??&#xff1a;聲明當前項目為“多項目管理”模式。Qt 的構建系統&#xff08;qmake&#xff09;會遍歷 SUBDIRS中列出的子目錄&#xff0c;在每個子目錄中尋找 .pro文件并遞歸構建。…

三方相機問題分析六:【沒用相機,詭異的手電筒不可使用】下拉狀態欄,手電筒置灰,無法打開,提提示相機正在使用

【關注我,后續持續新增專題博文,謝謝!!!】 上一篇我們講了: 這一篇我們開始講: 三方相機問題分析六:【沒用相機,詭異的手電筒不可使用】下拉狀態欄,點擊手電筒,手電筒置灰,無法打開,提提示相機正在使用9348353 目錄 一、問題背景 二、:問題分析過程 2.1:基于…

Java Selenium 自動打開瀏覽器保存截圖

// 代碼 public class ScreenshotExample {public static void main(String[] args) {// 1. 設置瀏覽器驅動路徑&#xff08;根據實際路徑修改&#xff09;System.setProperty("webdriver.chrome.driver", "D:\\chromedriver-win64\\chromedriver.exe");//…

新商機:為了減少輻射,可以用座機打機房中心再轉手機

某些人痛恨自家附近有基站&#xff0c;說是輻射太大。你不能說人家迷信。一般解決辦法就是拆基站。而我覺得&#xff0c;商機來了。現在座機基本沒人裝了。新商機就是座機。附近沒有基站&#xff0c;又要打電話&#xff0c;怎么辦&#xff1f;裝座機。用座機打電話時&#xff0…

【Java|第十九篇】面向對象九——String類和枚舉類

&#xff08;四&#xff09;面向對象11、String類&#xff08;1&#xff09;概述<1>String是一個類&#xff0c;引用數據類型&#xff0c;用來表示字符串&#xff1b;<2>String是Lang包下的類&#xff0c;使用不需要導包&#xff1b;<3>字符串的值不能變&…

超越相似名稱:Elasticsearch semantic text 如何在簡潔、高效、集成方面超越 OpenSearch semantic 字段

作者&#xff1a;來自 Elastic Mike Pellegrini, Nick Chow 及 Libby Lin 比較 Elasticsearch 語義文本和 OpenSearch 語義字段在簡潔性、可配置性和效率方面的表現。 自己動手體驗向量搜索&#xff0c;使用這個自定進度的 Search AI 實操學習。你現在可以開始免費的云試用&am…

OpenAI發布最新大模型GPT5、本地部署GPT開源模型

OpenAI發布最新大模型GPT5、本地部署GPT開源模型 GPT-5概述 北京時間 2025年8月8日 凌晨1點 OPENAI舉行了1個小時的線上發布會&#xff0c;正式推出了其史上最聰明、最強大的大模型GPT-5。 GPT-5是OpenAI發布的最新一代大型語言模型&#xff0c;它基于Transformer架構&#xff…

容器網絡模式選擇在云服務器多節點部署中的連通性驗證方案

容器網絡模式選擇在云服務器多節點部署中的連通性驗證方案在云計算環境中&#xff0c;容器網絡模式的選擇直接影響著多節點部署的通信效率和安全性。本文將從Docker原生網絡驅動對比入手&#xff0c;深入分析Overlay、Host、Bridge等主流網絡模式在跨主機通信場景下的性能表現&…

電商雙11美妝數據分析

1、數據初步了解2.數據清洗2.1 重復值處理 直接刪除重復值。2.2 缺失值處理通過上面觀察數據發現sale_count,comment_count 存在缺失值,先觀察存在缺失值的行的基本情況存在的缺失值很可能意味著售出的數量為0或者評論的數量為0&#xff0c;所以我們用0來填補缺失值。2.3 數據挖…

Pytest項目_day14(參數化、數據驅動)

parametrize 參數化可以組裝測試數據。在測試前定義好測試數據&#xff0c;并在測試用例中使用 單參數單次循環 我們可以在裝飾器中使用mark.parametrize&#xff0c;來定義參數名和參數值列表 參數名還需要傳給函數參數名需要用字符串來定義&#xff0c;參數值列表需要用可迭代…

Nest.js、Knex.js、Nuxt.js、Next.js 和 Spring Boot的異同相關概念

總述Nest.js、Knex.js、Nuxt.js、Next.js 和 Spring Boot 是分屬不同技術領域的工具&#xff0c;涵蓋前端框架、后端框架、數據庫工具等角色&#xff0c;它們在開發中既有功能交集&#xff0c;也有明確的定位差異。一、相同點服務端參與能力五者均能在服務端發揮作用&#xff1…

第2節 大模型分布式推理架構設計原則

大模型推理系統的設計過程,本質上是在多重約束下尋找最優解的過程。硬件資源的物理限制、場景對性能的剛性要求、系統的可擴展性需求,共同構成了設計的邊界條件。明確這些約束的具體表現形式,理解性能指標之間的權衡邏輯,確立架構設計的核心原則,是構建高效分布式推理系統…

快速部署一個鑒黃服務

1.安裝依賴pip install opennsfw22.代碼實現import opennsfw2 as n2# 將自動下載預訓練模型 open_nsfw_weights.h5 到 C:\Users\Administrator\.opennsfw2\weights # pip install opennsfw2# 單張預測 image_path 1.jpg nsfw_probability n2.predict_image(image_path) print…

Camera open failed

前言 由前面的幾篇博客可以知道&#xff0c;openCamera&#xff0c;createCaptureSession&#xff0c;setRepeatingRequest&#xff0c;capture是非常重要的過程&#xff0c;如果其中一個環節出了問題時該如何分析呢&#xff0c;這里我們首先從打開相機流程時&#xff0c;打開…