Spark Shuffle Tracking 原理分析

Shuffle Tracking

Shuffle Tracking 是 Spark 在沒有 ESS(External Shuffle Service)情況,并且開啟 Dynamic Allocation 的重要功能。如在 K8S 上運行 spark 沒有 ESS。本文檔所有的前提都是基于以上條件的。

如果開啟了 ESS,那么 Executor 計算完后,把 shuffle 數據交給 ESS, Executor 沒有任務時,可以安全退出,下游任務從 ESS 拉取 shuffle 數據。

1. 背景

如果 Executor 執行了上游的 Shuffle Map Task 并且把 shuffle 數據些到本地。并且現在 Executor 沒有 Task 運行,那么此 Executor 是否能銷毀?

現狀是如果 Executor 沒有 active 的 shuffle 數據,則可以被銷毀。
active shuffle 的定義:如果 Shuffle Map Stage 的 task 把 shuffle 數據輸出到本地。如果依賴此 shuffle 的Stage 沒有計算完畢,則稱此 shuffle 為 active shuffle。因為依賴此 shuffle 的 Task 可能從 Driver 端獲取了 MapStatus,但是還沒有拉取完 shuffle 數據。

為了達到此目的,需要跟蹤每個 Stage 和每個 Task 的運行信息。并且啟動定時任務,定時掃描每個 Executor,判斷是否有任務運行,是否有 active 的 shuffle,如果沒有則可以退出。

退出有兩種,如果開啟了 decommission,則到期的 executors 進入 decommission 模式,否則執行 killExecutors。

參數配置

spark.dynamicAllocation.shuffleTracking.enabled: 默認 true,是否開啟 shuffle tracking。
spark.dynamicAllocation.shuffleTracking.timeout: 默認 Long.MaxValue,

2. 設計

ExecutorMonitor 為每個 Executor 創建一個 Tracker, 用于跟蹤此 Executor 的狀態。

private val executors = new ConcurrentHashMap[String, Tracker]()

定時任務間隔時間查找 timeout 的 executor,然后處理。

timedOutExecutors 方法的主要邏輯,就是遍歷 executors。如果 executor 沒有 active 的 shuffle 并且當前時間大于 executor 的超時時間 timeoutAt,則此 executor 可以被安全釋放。

為什么 executor 有 active shuffle 數據就不能 kill?
在這里插入圖片描述

  • Shuffle 的過程:
  1. MapTask 把 shuffle 寫到本地,并且把狀態匯報給 Driver.
  2. Reduce Task 從 Driver 獲取 shuffle status,并從 shuffle status 獲取每個 shuffle 數據的地址。
  3. 連接對應的 executor 獲取 shuffle 數據。

如果在 reduce 獲取完 shuffle status 后,MapTask 所在的 Executor 被 kill 掉,Reduce Task 就無法獲取 shuffle 數據。

如果執行 decommission 邏輯,把 MapTask 的 shuffle 數據長傳到 bos 等分布式存儲是否可以?

也是不可以的,因為 reduce 可能已經把 shuffle status 拿走,獲取的 shuffle status 沒有記錄 shuffle 數據在分布式存儲上。

參考: ExecutorMonitor,ExecutorAllocationManager

Executor 狀態的更新

ExecutorMonitor 實現了 SparkListner 接口,當 Job, Stage, Task 等 start 和 end 時,都會執行回調。

以 hasActiveShuffle 為例
每個 executor 用一個集合 shuffleIds 存儲其上擁有的 shuffle 數據。 當其為空時,說明沒有 shuffle 數據。

在 onTaskEnd 和 onBlockUpdated 時調用 addShuffle 向 shuffleIds 添加數據。

在以下時機刪除 shuffleIds 里的數據。

  1. 依賴 driver 端的 ContextCleaner,當 ShuffleRDD 僅有 weakReference 時觸發。
  2. rdd.cleanShuffleDependencies 方法,但是此方法僅在 org.apache.spark.ml.recommendation.ALS 使用。

timeoutAt 的計算邏輯

總結:timeoutAt 根據 idle 的時間,spark.dynamicAllocation.cachedExecutorIdleTimeout 和 spark.dynamicAllocation.shuffleTracking.timeout 這 3 個值中最大的值。

詳細計算邏輯:
timeoutAt 在一些事件發生時觸發計算,如 onBlockUpdated, onUnpersistRDD, updateRunningTasks, removeShuffle, updateActiveShuffles
timeoutAt 的計算邏輯:
當執行器有計算任務時 為 Long.MaxValue。
否則為 max(_cacheTimeout, _shuffleTimeout, idleTimeoutNs)
_cacheTimeout: 如果沒有 cache 數據,為0,否則為參數 spark.dynamicAllocation.cachedExecutorIdleTimeout 的值(默認 Long.MaxValue)。

_shuffleTimeout: 如果沒有 shuffle數據,為 0, 否則為參數 spark.dynamicAllocation.shuffleTracking.timeout 的值(默認 Long.MaxValue)。
idleTimeoutNs 為 spark.dynamicAllocation.executorIdleTimeout

3. 測試

測試命令

spark-shell  \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.initialExecutors=2 \--conf spark.dynamicAllocation.maxExecutor=400 \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.shuffle.service.enabled=false \--conf spark.dynamicAllocation.shuffleTracking.enabled=true

參考資料:

https://www.waitingforcode.com/apache-spark/what-new-apache-spark-3-shuffle-service-changes/read

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/713350.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/713350.shtml
英文地址,請注明出處:http://en.pswp.cn/news/713350.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL 表的基本操作,結合項目的表自動初始化來講

有了數據庫以后,我們就可以在數據庫中對表進行增刪改查了,這也就意味著,一名真正的 CRUD Boy 即將到來(😁)。 查表 查看當前數據庫中所有的表,使用 show tables; 命令 由于當前數據庫中還沒有…

基于Python3的數據結構與算法 - 09 希爾排序

一、引入 希爾排序是一種分組插入排序的算法。 二、排序思路 首先取一個整數d1 n/2,將元素分為d1個組,每組相鄰量取元素距離為d1,在各組內直接進行插入排序;取第二個整數d2 d1/2, 重復上述分組排序過程&#xff0…

Angular 2 中的樣式綁定和 NgStyle

在 Angular 2 模板中綁定內聯樣式很容易。以下是一個綁定單個樣式值的示例&#xff1a; 你還可以指定單位&#xff0c;例如在這里我們將單位設置為 em&#xff0c;但也可以使用 px、% 或 rem&#xff1a; <p [style.font-size.em]"3">A paragraph at 3em! &l…

CSS 自測題 -- 用 flex 布局繪制骰子(一、二、三【含斜三點】、四、五、六點)

一點 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>css flex布局-畫骰子</title><sty…

vue3 滾動條觸底監聽

問題&#xff1a;指定區域內&#xff0c;顯示返回的數據&#xff0c;要求先顯示20條&#xff0c;區域超出部分滾動顯示&#xff0c;對滾動條進行監聽&#xff0c;滾動條觸底后&#xff0c;繼續顯示下20條... 解決過程&#xff1a; 1.在區域的div上&#xff0c;添加scroll事件…

Unity 切換場景

場景切換前必須要將場景拖動到Build中 同步加載場景 using System.Collections; using System.Collections.Generic; //using UnityEditor.SearchService; using UnityEngine; // 場景管理 需要導入該類 using UnityEngine.SceneManagement;public class c3 : MonoBehaviour {…

redis五大基礎類型【重點】

之前寫過一點小知識&#xff1a;https://blog.csdn.net/qq_45927881/article/details/134959181?spm1001.2014.3001.5501 參考鏈接 https://xiaolincoding.com/redis/data_struct/command.html#%E4%BB%8B%E7%BB%8D 目錄 1. string&#xff08;字符串&#xff09;2. Hash&#…

MySql安全加固:配置不同用戶不同賬號禁止使用舊密碼禁止MySql進程管理員權限

MySql安全加固&#xff1a;配置不同用戶不同賬號&禁止使用舊密碼&禁止MySql進程管理員權限 1.1 檢查是否配置不同用戶不同賬號1.2 檢查是否禁止使用舊密碼1.3 禁止MySql進程管理員權限 &#x1f496;The Begin&#x1f496;點點關注&#xff0c;收藏不迷路&#x1f496…

【c++】通訊錄管理系統

1.系統功能介紹及展示 2.創建項目 3.菜單實現 4.退出功能實現 5.添加聯系人—結構體設計 6.添加聯系人—功能實現 7.顯示聯系人 8.刪除練習人—檢測聯系人是否存在 9.刪除聯系人—功能實現 10.查找聯系人 11.修改聯系人 12.清空通訊錄 #include <iostream> #include <…

什么是VR虛擬社區|VR元宇宙平臺|VR主題館加盟

VR虛擬社區是指一種基于虛擬現實技術構建的在線社交平臺或環境&#xff0c;用戶可以在其中創建虛擬化的個人形象&#xff08;也稱為avatars&#xff09;并與其他用戶進行交流、互動和合作。在VR虛擬社區中&#xff0c;用戶可以選擇不同的虛擬場景和環境&#xff0c;如虛擬公園、…

fly-barrage 前端彈幕庫(3):滾動彈幕的設計與實現

項目官網地址&#xff1a;https://fly-barrage.netlify.app/&#xff1b; &#x1f451;&#x1f40b;&#x1f389;如果感覺項目還不錯的話&#xff0c;還請點下 star &#x1f31f;&#x1f31f;&#x1f31f;。 Gitee&#xff1a;https://gitee.com/fei_fei27/fly-barrage&a…

顯示器開機正常,插入HDMI線卻不顯示畫面,換了HDMI線還是不行?

環境&#xff1a; 惠普/P24VG4 DELL筆記本 問題描述&#xff1a; 顯示器開機正常&#xff0c;插入HDMI線卻不顯示畫面&#xff0c;換了HDMI線還是不行&#xff0c;是不是顯示器壞了&#xff1f; 解決方案&#xff1a; 1.前往顯示器設置菜單里面查看input 2.把輸入源默認設…

二百二十五、海豚調度器——用DolphinScheduler調度執行Flume數據采集任務

一、目的 數倉的數據源是Kafka&#xff0c;因此離線數倉需要用Flume采集Kafka中的數據到HDFS中 在實際項目中&#xff0c;不可能一直在Xshell中啟動Flume任務&#xff0c;一是項目的Flume任務很多&#xff0c;二是一旦Xshell頁面關閉Flume任務就會停止&#xff0c;這樣非常不…

案例研究|DataEase助力眾陶聯應對產業鏈數據可視化挑戰

佛山眾陶聯供應鏈服務有限公司&#xff08;以下簡稱為“眾陶聯”&#xff09;成立于2016年&#xff0c;是由34家陶瓷企業共同創辦的建陶行業工業互聯網平臺&#xff0c;股東產值占整個行業的22.5%。眾陶聯以數據賦能為核心&#xff0c;積極探索新的交易和服務模式&#xff0c;構…

ant-design-vue如何限制圖片上傳的尺寸?

handleBeforeUpload(file, fileList) {// fileList 只包含了當次上傳的文件列表&#xff0c;不包含已上傳的文件列表// 所以長度要加上已上傳的文件列表的長度const isLimit this.fileList.length fileList.length > this.limit;const indexOfFile fileList.findIndex(it…

C++ STL 之容器 vector 常見用法

一. 什么是vector vector為“變長數組”&#xff0c;即長度根據需要而自動改變的數組。 頭文件&#xff1a; #include <vector>using namespace std;單獨定義一個vector&#xff1a;vector<typename> name&#xff0c;相當于一維數組 name[SIZE] &#xff0c;其長…

mac-docker-php容器連接mac中的pgsql數據庫失敗以及出現table_msg存錯誤時的解決方法

以php中的thinkphp 5.1為例&#xff0c;php容器連接mac中的pgsql數據庫失敗時&#xff0c;出現如下錯誤 [7] PDOException in Connection.php line 528 SQLSTATE[08006] [7] could not connect to server: Connection refused Is the server running on host "localhost&…

Git 配置處理客戶端無法正常訪問到 github 原網站時,npm 下載依賴包失敗的問題

Git 配置處理客戶端無法正常訪問到 github 原網站時&#xff0c;npm 下載依賴包失敗的問題 使用 github 的鏡像網站地址或類似的替代產品地址&#xff0c;代替到 npm 拉取依賴包的 git 地址本地Git配置 例如&#xff1a;執行一下命令&#xff0c;則是以https://kgithub.com 替…

requests庫/urllib3庫返回WEB響應內容的處理差異

requests庫是一個廣泛使用的HTTP庫&#xff0c;用于發送HTTP請求和處理響應。 以下是requests庫中一些主要類和方法的詳細介紹&#xff1a;requests庫主要類和方法 類:requests.models.Response: status_code: 響應狀態碼。text: 以Unicode形式返回響應內容。content: 以字節形…

MySQL的主從同步原理

MySQL的主從同步&#xff08;也稱為復制&#xff09;是一種數據同步技術&#xff0c;用于將一個MySQL服務器&#xff08;主服務器&#xff09;上的數據和變更實時復制到另一個或多個MySQL服務器&#xff08;從服務器&#xff09;。這項技術支持數據備份、讀寫分離、故障恢復等多…