Spark join數據傾斜調優

Spark中常見的兩種數據傾斜現象如下

  • stage部分task執行特別慢

在這里插入圖片描述

一般情況下是某個task處理的數據量遠大于其他task處理的數據量,當然也不排除是程序代碼沒有冗余,異常數據導致程序運行異常。

  • 作業重試多次某幾個task總會失敗
    在這里插入圖片描述

常見的退出碼143、53、137、52以及heartbeat timed out異常,通常可認為是executor內存被打滿。

RDD調優方法

  1. 查看數據分布
    Spark Core中shuffle算子出現數據傾斜時,可在Spark作業中加入查看key分布的代碼,也可以將代碼拆解出來使用spark-shell做測試
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))// 數據量較少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 數據量較大, 用sample采樣后在統計
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 調整shuffle并行度
    原理:Spark在做shuffle時,默認使用HashPartitioner(非Hash Shuffle)對數據進行分區。如果并行度設置的不合適如比較小,可能造成大量不相同的key對應的數據被分配到了同一個task上,造成該task所處理的數據遠大于其它task,從而造成數據傾斜
    在這里插入圖片描述

調優建議:

  • 使用spark.default.parallelism調整分區數,默認值200建議500或更大
  • 在shuffle的算子上直接設置分區數,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join轉map join
    原理:不使用join算子直接進行連接操作,而使用broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的出現
    在這里插入圖片描述

調優建議:

  • broadcast的數據量不要超過500M, 過大driver/executor可能會oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>val rdd1DataMap = rdd1Broadcast.value.toMaprdd1DataMap.get(x._1) match {case Some(v) => (x._1, (x._2, v))case None => (x._1, (x._2, null))}
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:將有數據傾斜的RDD1中傾斜key對應的數據集單獨抽取出來加鹽(隨機前綴),另外一個RDD2每條數據分別與所有的隨機前綴結合形成新的RDD(相當于將其數據增到到原來的N倍,N即為隨機前綴的總個數),然后將二者join之后去掉前綴;然后將不包含傾斜key的剩余數據進行join;最后將兩次join的結果集通過union合并,即可得到全部join結果。
    在這里插入圖片描述

調優建議:

// 1.統計數量最大的key
val skewedKeySet = rdd1.sample(false, 0.2).reduceByKey(_ + _).sortBy(_._2, false).take(10).map(x => x._1).toSet// 2.拆分異常的rdd, 傾斜key加上隨機數
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>val prefix = scala.util.Random.nextInt(10).toString(s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))// 3.正常rdd存在傾斜key的部分進行膨脹
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1)).flatMap { x =>val list = 0 until 10list.map(i => (s"${i}_${x._1}", x._2))}val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))// 4.傾斜key的rdd進行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd進行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.結果union
skewedRDD.union(sampleRDD)

SQL調優方法

  1. 查看數據分布
    統計某個查詢結果或表中出現次數超過200次的key
WITH a AS (${query})
SELECT k,s
FROM (SELECT ${key} AS k,count(*) AS sFROM aGROUP BY ${key}
)
WHERE s > 200
  1. 自動調整shuffle并行度
    原理:自適應執行開啟的前提下(AQE),假設我們設置的shuffle partition個數為5,在map stage結束之后,我們知道每一個partition的大小分別是70MB,30MB,20MB,10MB和50MB。假設我們設置每一個reducer處理的目標數據量是64MB,那么在運行時,我們可以實際使用3個reducer。第一個reducer處理partition 0 (70MB),第二個reducer處理連續的partition 1 到3,共60MB,第三個reducer處理partition 4 (50MB)
    在這里插入圖片描述

Spark參數:

參數說明推薦值
spark.sql.adaptive.enabled開啟自適應執行線上默認值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自適應執行中使用的最小shuffle后分區數,默認值executor*core數
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分區數量,默認值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分區到建議的目標值, 默認256m
spark.sql.shuffle.partitionsjoin等操作分區數,默認值200推薦500或更大
  1. 自動優化Join
    原理:自適應執行開啟的前提下(AQE),我們可以獲得SortMergeJoin兩個子stage的數據量,在滿足條件的情況下,即一張表小于broadcast閾值,可以將SortMergeJoin轉化成BroadcastHashJoin
    在這里插入圖片描述
參數說明推薦值
spark.sql.adaptive.enabled開啟自適應執行線上默認值true
spark.sql.autoBroadcastJoinThreshold默認10M,設置為-1可以禁用廣播;實際根據hive表存儲的統計信息或文件預估大小與此值做判斷看是否做broadcast,由于文件是壓縮格式一般情況下此參數并不可靠建議膨脹系數spark.sql.sources.fileCompressionFactor=10推薦此參數保持默認,調整自適應的broadcast參數
spark.sql.adaptive.autoBroadcastJoinThreshold此參數僅影響自適應執行階段join優化時broadcast閾值;設置為-1可以禁用廣播;默認值spark.sql.autoBroadcastJoinThreshold自適應執行得到的數據比較準確,driver內存足夠的前提下可以將此值調大如200M
  1. 自動處理數據傾斜
    原理:自適應執行開啟的前提下(AQE),我們可以在運行時很容易地檢測出有數據傾斜的partition。當執行某個stage時,我們收集該stage每個mapper 的shuffle數據大小和記錄條數。如果某一個partition的數據量或者記錄條數超過中位數的N倍,并且大于某個預先配置的閾值,我們就認為這是一個數據傾斜的partition,需要進行特殊的處理
    在這里插入圖片描述
參數說明推薦值
spark.sql.adaptive.enabled開啟自適應執行線上默認值true
spark.sql.adaptive.skewJoin.enabled開啟自動解決數據傾斜,默認值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影響因子,某分區數據大小超過所有分區中位數與影響因子乘積,才會被認為發生了數據傾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes視為傾斜分區的分區數據最小值

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/38170.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/38170.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/38170.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【電路筆記】-放大器類型

放大器類型 文章目錄 放大器類型1、概述2、關于偏置的注意事項3、A類(Class A)放大器4、B類(Class B)放大器5、AB類(Class AB)放大器6、C類(Class C)放大器7、總結1、概述 放大器通常根據輸出級的結構進行分類。 事實上,功率放大確實發生在該階段,因此輸出信號的質量和…

Arduino (esp ) 下String的內存釋放

在個人的開源項目 GitHub - StarCompute/tftziku: 這是一個通過單片機在各種屏幕上顯示中文的解決方案 中為了方便快速檢索使用了string,于是這個string在esp8266中占了40多k,原本以為當string設置為""的時候這個40k就可以回收,結果發覺不行…

【JS異步編程】async/await——用同步代碼寫異步

歷史小劇場 懂得暴力的人,是強壯的;懂得克制暴力的人,才是強大的。----《明朝那些事兒》 什么是 async/await async: 聲明一個異步函數 自動將常規函數轉換成Promise,返回值也是一個Promise對象;只有async函數內部的異…

Java SE入門及基礎(59) 線程的實現(上) 線程的創建方式 線程內存模型 線程安全

目錄 線程(上) 1. 線程的創建方式 Thread類常用構造方法 Thread類常用成員方法 Thread類常用靜態方法 示例 總結 2. 線程內存模型 3.線程安全 案例 代碼實現 執行結果 線程(上) 1. 線程的創建方式 An application t…

利用 Docker 簡化 Nacos 部署:快速搭建 Nacos 服務

利用 Docker 簡化 Nacos 部署:快速搭建 Nacos 服務 引言 在微服務架構中,服務注冊與發現是確保服務間通信順暢的關鍵組件。Nacos(Dynamic Naming and Configuration Service)作為阿里巴巴開源的一個服務發現和配置管理平臺&…

任務調度器——任務切換

一、開啟任務調度器 函數原型: void vTaskStartScheduler( void ) 作用:用于啟動任務調度器,任務調度器啟動后, FreeRTOS 便會開始進行任務調度 內部實現機制(以動態創建為例): &#xff0…

Linux 安裝、配置Tomcat 的HTTPS

Linux 安裝 、配置Tomcat的HTTPS 安裝Tomcat 這里選擇的是 tomcat 10.X ,需要Java 11及更高版本 下載頁 ->Binary Distributions ->Core->選擇 tar.gz包 下載、上傳到內網服務器 /opt 目錄tar -xzf 解壓將解壓的根目錄改名為 tomat-10 并移動到 /opt 下, 形成個人…

測評推薦:企業管理u盤的軟件有哪些?

U盤作為一種便攜的存儲設備,方便易用,被廣泛應用于企業辦公、個人學習及日常工作中。然而,U盤的使用也帶來了數據泄露、病毒傳播等安全隱患。為了解決這些問題,企業管理U盤的軟件應運而生。 本文將對市面上流行的幾款U盤管理軟件…

Hadoop3:Yarn容量調度器配置多隊列案例

一、情景描述 需求1: default隊列占總內存的40%,最大資源容量占總資源60%,hive隊列占總內存的60%,最大資源容量占總資源80%。 二、多隊列優點 (1)因為擔心員工不小心,寫遞歸死循環代碼&#…

數據處理:四選一、四關聯

今天去面試,面試官們給我一個‘選擇’,有四個選項:‘展示你的才華’、‘展示你的美貌’、‘展示你的才華與美貌’、‘都不展示’ {label: “選擇”,children: [{label: “展示你的才華”,children: [],isShow: talentModal,click: () > {i…

電路筆記(電源模塊): 基于FT2232HL實現的jtag下載器硬件+jtag的通信引腳說明

JTAG接口說明 JTAG 接口根據需求可以選擇20針或14針的配置,具體選擇取決于應用場景和需要連接的功能。比如之前的可編程邏輯器件XC9572XL使用JTAG引腳(TCK、TDI、TDO、TMS、VREF、GND)用于與器件進行調試和編程通信。更詳細的內容可以閱讀11…

51單片機STC8H8K64U通過RA8889/RA8876如何控制彩屏(SPI源碼下載)

【硬件部份】 一、硬件連接實物: STC8H系列單片機不需要外部晶振和外部復位,在相同的工作頻率下,速度比傳統的8051單片機要快12倍,具有高可靠抗干擾的優秀特性,與瑞佑的RA8889/RA8876控制芯片剛好可以完美搭配用于工…

redis實戰-緩存雪崩問題及解決方案

定義理解 緩存雪崩是指在同一時間段,大量緩存的key同時失效,或者Redis服務宕機,導致大量請求到達數據庫,帶來巨大壓力 和緩存擊穿的區別: 緩存雪崩是由于緩存中的大量數據同時失效或緩存服務器故障引起的&#xff1b…

(漏洞檢查項) | 服務端請求偽造 SSRF

(漏洞檢查項)|服務端請求偽造 SSRF 漏洞場景 服務端請求偽造(SSRF,Server-Side Request Forgery)漏洞發生在應用程序允許攻擊者通過構造惡意請求,利用服務器端發起HTTP請求,并訪問內部資源或進行其他未授權操作。 漏…

css_20_定位

相對定位 設置相對定位 給元素設置 position: relative 即可實現相對定位。 可以使用 left、right、top 、 bottom 四個屬性調整位置。 相對定位的參考點是相對自己原來的位置相對定位的特點: 1.不會脫離文檔流,元素位置的變化,只…

機器學習周記(第四十五周:Graphformer)2024.6.24~2024.6.30

目錄 摘要ABSTRACT1 論文信息1.1 論文標題1.2 論文摘要1.3 論文引言1.4 論文貢獻 2 論文模型2.1 問題定義2.2 模型架構2.2.1 自注意下采樣模塊(Self-attention down-sampling module)2.2.2 稀疏圖自注意力機制(Sparse graph self-attention m…

python自動移除excel文件密碼(小工具)

安裝 msoffcrypto-tool 使用pip命令安裝: 打開命令行工具(如終端、命令提示符或Powershell),然后輸入以下命令來安裝msoffcrypto-tool: pip install msoffcrypto-tool庫,進行自動移除excel文件密碼 import msoffcrypt…

【C++】using namespace std 到底什么意思

📢博客主頁:https://blog.csdn.net/2301_779549673 📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正! 📢本文作為 JohnKi 的學習筆記,引用了部分大佬的案例 📢未來很長&a…

新手練習項目 7:猜數字游戲

名人說:莫聽穿林打葉聲,何妨吟嘯且徐行。—— 蘇軾《定風波莫聽穿林打葉聲》 Code_流蘇(CSDN)(一個喜歡古詩詞和編程的Coder) 目錄 一、項目描述二、項目實現三、項目步驟四、項目擴展方向 更多項目內容,請關注我、訂…

comsol學習筆記

comsol巖土力學與流固耦合的學習 comsol的相關視頻教程 https://www.bilibili.com/video/BV1Cu4y1r7Gn/?spm_id_from333.337.search-card.all.click&vd_source02b2bad477a153eaeb9c48cbbedaf8df [這里面有講解地應力平衡技術] https://www.bilibili.com/video/BV17C4y1j…