Spark排錯與優化

一. 運維

1. Master掛掉,standby重啟也失效

Master默認使用512M內存,當集群中運行的任務特別多時,就會掛掉,原因是master會讀取每個task的event log日志去生成spark ui,內存不足自然會OOM,可以在master的運行日志中看到,通過HA啟動的master自然也會因為這個原因失敗。

解決

  1. 增加Master的內存占用,在Master節點spark-env.sh 中設置:

    export SPARK_DAEMON_MEMORY 10g # 根據你的實際情況
    
  2. 減少保存在Master內存中的作業信息

    spark.ui.retainedJobs 500   # 默認都是1000
    spark.ui.retainedStages 500
    

2. worker掛掉或假死

有時候我們還會在web ui中看到worker節點消失或處于dead狀態,在該節點運行的任務則會報各種 lost worker 的錯誤,引發原因和上述大體相同,worker內存中保存了大量的ui信息導致gc時失去和master之間的心跳。

解決

  1. 增加Master的內存占用,在Worker節點spark-env.sh 中設置:

    export SPARK_DAEMON_MEMORY 2g # 根據你的實際情況
    
  2. 減少保存在Worker內存中的Driver,Executor信息

    spark.worker.ui.retainedExecutors 200   # 默認都是1000
    spark.worker.ui.retainedDrivers 200   
    

二. 運行錯誤

1.shuffle FetchFailedException

Spark Shuffle FetchFailedException解決方案

錯誤提示

  1. missing output location

    org.apache.spark.shuffle.MetadataFetchFailedException: 
    Missing an output location for shuffle 0
    

    missing output location

  2. shuffle fetch faild

    org.apache.spark.shuffle.FetchFailedException:
    Failed to connect to spark047215/192.168.47.215:50268
    

    shuffle fetch faild

    當前的配置為每個executor使用1core,5GRAM,啟動了20個executor

解決

這種問題一般發生在有大量shuffle操作的時候,task不斷的failed,然后又重執行,一直循環下去,直到application失敗。

faild

一般遇到這種問題提高executor內存即可,同時增加每個executor的cpu,這樣不會減少task并行度。

  • spark.executor.memory 15G
  • spark.executor.cores 3
  • spark.cores.max 21

啟動的execuote數量為:7個

execuoterNum = spark.cores.max/spark.executor.cores 

每個executor的配置:

3core,15G RAM

消耗的內存資源為:105G RAM

15G*7=105G

可以發現使用的資源并沒有提升,但是同樣的任務原來的配置跑幾個小時還在卡著,改了配置后幾分鐘就能完成。

2.Executor&Task Lost

錯誤提示

  1. executor lost

    WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local):
    ExecutorLostFailure (executor lost)
    
  2. task lost

    WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217):
    java.io.IOException: Connection from /192.168.47.217:55483 closed
    
  3. 各種timeout

    java.util.concurrent.TimeoutException: Futures timed out after [120 second]ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 
    has been quiet for 120000 ms while there are outstanding requests.
    Assuming connection is dead; please adjust spark.network.
    timeout if this is wrong
    

解決

由網絡或者gc引起,worker或executor沒有接收到executor或task的心跳反饋。
提高 spark.network.timeout 的值,根據情況改成300(5min)或更高。
默認為 120(120s),配置所有網絡傳輸的延時,如果沒有主動設置以下參數,默認覆蓋其屬性

  • spark.core.connection.ack.wait.timeout
  • spark.akka.timeout
  • spark.storage.blockManagerSlaveTimeoutMs
  • spark.shuffle.io.connectionTimeout
  • spark.rpc.askTimeout or spark.rpc.lookupTimeout

3.傾斜

錯誤提示

  1. 數據傾斜

    數據傾斜

  2. 任務傾斜
    差距不大的幾個task,有的運行速度特別慢。

解決

大多數任務都完成了,還有那么一兩個任務怎么都跑不完或者跑的很慢,分為數據傾斜和task傾斜兩種。

  1. 數據傾斜
    數據傾斜大多數情況是由于大量的無效數據引起,比如null或者”“,也有可能是一些異常數據,比如統計用戶登錄情況時,出現某用戶登錄過千萬次的情況,無效數據在計算前需要過濾掉。
    數據處理有一個原則,多使用filter,這樣你真正需要分析的數據量就越少,處理速度就越快。

    sqlContext.sql("...where col is not null and col != ''")
    

    具體可參考:
    解決spark中遇到的數據傾斜問題

  2. 任務傾斜
    task傾斜原因比較多,網絡io,cpu,mem都有可能造成這個節點上的任務執行緩慢,可以去看該節點的性能監控來分析原因。以前遇到過同事在spark的一臺worker上跑R的任務導致該節點spark task運行緩慢。
    或者可以開啟spark的推測機制,開啟推測機制后如果某一臺機器的幾個task特別慢,推測機制會將任務分配到其他機器執行,最后Spark會選取最快的作為最終結果。

    • spark.speculation true
    • spark.speculation.interval 100 - 檢測周期,單位毫秒;
    • spark.speculation.quantile 0.75 - 完成task的百分比時啟動推測
    • spark.speculation.multiplier 1.5 - 比其他的慢多少倍時啟動推測。

4.OOM

錯誤提示

堆內存溢出

java.lang.OutOfMemoryError: Java heap space

解決

內存不夠,數據太多就會拋出OOM的Exeception,主要有driver OOM和executor OOM兩種

  1. driver OOM
    一般是使用了collect操作將所有executor的數據聚合到driver導致。盡量不要使用collect操作即可。

  2. executor OOM
    可以按下面的內存優化的方法增加code使用內存空間

    • 增加executor內存總量,也就是說增加spark.executor.memory的值
    • 增加任務并行度(大任務就被分成小任務了),參考下面優化并行度的方法

5.task not serializable

錯誤提示

org.apache.spark.SparkException: Job aborted due to stage failure: 
Task not serializable: java.io.NotSerializableException: ...

解決

如果你在worker中調用了driver中定義的一些變量,Spark就會將這些變量傳遞給Worker,這些變量并沒有被序列化,所以就會看到如上提示的錯誤了。

val x = new X()  //在driver中定義的變量
dd.map{r => x.doSomething(r) }.collect  //map中的代碼在worker(executor)中執行

除了上文的map,還有filter,foreach,foreachPartition等操作,還有一個典型例子就是在foreachPartition中使用數據庫創建連接方法。這些變量沒有序列化導致的任務報錯。

下面提供三種解決方法:

  1. 將所有調用到的外部變量直接放入到以上所說的這些算子中,這種情況最好使用foreachPartition減少創建變量的消耗。
  2. 將需要使用的外部變量包括sparkConf,SparkContext,都用 @transent進行注解,表示這些變量不需要被序列化
  3. 將外部變量放到某個class中對類進行序列化。

6.driver.maxResultSize太小

錯誤提示

Caused by: org.apache.spark.SparkException:Job aborted due to stage failure: Total size of serialized results of 374 tasks (1026.0 MB) is bigger thanspark.driver.maxResultSize (1024.0 MB)

解決

spark.driver.maxResultSize默認大小為1G 每個Spark action(如collect)所有分區的序列化結果的總大小限制,簡而言之就是executor給driver返回的結果過大,報這個錯說明需要提高這個值或者避免使用類似的方法,比如countByValue,countByKey等。

將值調大即可

spark.driver.maxResultSize 2g

7.taskSet too large

錯誤提示

WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.

這個WARN可能還會導致ERROR

Caused by: java.lang.RuntimeException: Failed to commit taskCaused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit

解決

如果你比較了解spark中的stage是如何劃分的,這個問題就比較簡單了。
一個Stage中包含的task過大,一般由于你的transform過程太長,因此driver給executor分發的task就會變的很大。
所以解決這個問題我們可以通過拆分stage解決。也就是在執行過程中調用cache.count緩存一些中間數據從而切斷過長的stage。

8. driver did not authorize commit

driver did not authorize commit

9. 環境報錯

  1. driver節點內存不足
    driver內存不足導致無法啟動application,將driver分配到內存足夠的機器上或減少driver-memory

    Java HotSpot(TM) 64-Bit Server VM warning: INFO:
    

    os::commit_memory(0x0000000680000000, 4294967296, 0) failed;
    error=’Cannot allocate memory’ (errno=12)

  2. hdfs空間不夠
    hdfs空間不足,event_log無法寫入,所以 ListenerBus會報錯 ,增加hdfs空間(刪除無用數據或增加節點)

    Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException):File /tmp/spark-history/app-20151228095652-0072.inprogress could only be replicated to 0 nodes instead of minReplication (=1)ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
    java.lang.reflect.InvocationTargetException
    
  3. spark編譯包與hadoop版本不一致
    下載對應hadoop版本的spark包或自己編譯。

    java.io.InvalidClassException: org.apache.spark.rdd.RDD;local class incompatible: stream classdesc serialVersionUID
    
  4. driver機器端口使用過多
    在一臺機器上沒有指定端口的情況下,提交了超過15個任務。

    16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI
    java.net.BindException: 地址已在使用: Service 'SparkUI' failed after 16 retries!
    

    提交任務時指定app web ui端口號解決:

    --conf spark.ui.port=xxxx
    

三. 一些python錯誤

1.python版本過低

java.io.UIException: Cannot run program "python2.7": error=2,沒有那個文件或目錄

spark使用的python版本為2.7,centOS默認python版本為2.6,升級即可。

2.python權限不夠

錯誤提示

部分節點上有錯誤提示

java.io.IOExeception: Cannot run program "python2.7": error=13, 權限不夠

解決

新加的節點運維裝2.7版本的python,python命令是正確的,python2.7卻無法調用,只要改改環境變量就好了。

3.pickle使用失敗

錯誤提示

TypeError: ('__cinit__() takes exactly 8 positional arguments (11 given)',<type 'sklearn.tree._tree.Tree'>, (10, array([1], dtype=int32), 1,<sklearn.tree._tree.RegressionCriterion object at 0x100077480>,50.0, 2, 1, 0.1, 10, 1, <mtrand.RandomState object at 0x10a55da08>))

解決

該pickle文件是在0.17版本的scikit-learn下訓練出來的,有些機器裝的是0.14版本,版本不一致導致,升級可解決,記得將老版本數據清理干凈,否則會報各種Cannot import xxx的錯誤。

四. 一些優化

1. 部分Executor不執行任務

有時候會發現部分executor并沒有在執行任務,為什么呢?

(1) 任務partition數過少,
要知道每個partition只會在一個task上執行任務。改變分區數,可以通過 repartition 方法,即使這樣,在 repartition 前還是要從數據源讀取數據,此時(讀入數據時)的并發度根據不同的數據源受到不同限制,常用的大概有以下幾種:

hdfs - block數就是partition數
mysql - 按讀入時的分區規則分partition
es - 分區數即為 es 的 分片數(shard)

(2) 數據本地性的副作用

taskSetManager在分發任務之前會先計算數據本地性,優先級依次是:

process(同一個executor) -> node_local(同一個節點) -> rack_local(同一個機架) -> any(任何節點)

Spark會優先執行高優先級的任務,任務完成的速度很快(小于設置的spark.locality.wait時間),則數據本地性下一級別的任務則一直不會啟動,這就是Spark的延時調度機制。

舉個極端例子:運行一個count任務,如果數據全都堆積在某一臺節點上,那將只會有這臺機器在長期計算任務,集群中的其他機器則會處于等待狀態(等待本地性降級)而不執行任務,造成了大量的資源浪費。

判斷的公式為:

curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)

其中 curTime 為系統當前時間,lastLaunchTime 為在某優先級下最后一次啟動task的時間

如果滿足這個條件則會進入下一個優先級的時間判斷,直到 any,不滿足則分配當前優先級的任務。

數據本地性任務分配的源碼在 taskSetManager.scala

如果存在大量executor處于等待狀態,可以降低以下參數的值(也可以設置為0),默認都是3s。

spark.locality.wait
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

當你數據本地性很差,可適當提高上述值,當然也可以直接在集群中對數據進行balance。

2. spark task 連續重試失敗

有可能哪臺worker節點出現了故障,task執行失敗后會在該 executor 上不斷重試,達到最大重試次數后會導致整個 application 執行失敗,我們可以設置失敗黑名單(task在該節點運行失敗后會換節點重試),可以看到在源碼中默認設置的是 0,

private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)  

spark-default.sh 中設置

spark.scheduler.executorTaskBlacklistTime 30000

task 在該 executor 運行失敗后會在其它 executor 中啟動,同時此 executor 會進入黑名單30s(不會分發任務到該executor)。

3. 內存

如果你的任務shuffle量特別大,同時rdd緩存比較少可以更改下面的參數進一步提高任務運行速度。

spark.storage.memoryFraction - 分配給rdd緩存的比例,默認為0.6(60%),如果緩存的數據較少可以降低該值。
spark.shuffle.memoryFraction - 分配給shuffle數據的內存比例,默認為0.2(20%)
剩下的20%內存空間則是分配給代碼生成對象等。

如果任務運行緩慢,jvm進行頻繁gc或者內存空間不足,或者可以降低上述的兩個值。
"spark.rdd.compress","true" - 默認為false,壓縮序列化的RDD分區,消耗一些cpu減少空間的使用

4. 并發

mysql讀取并發度優化

spark.default.parallelism
發生shuffle時的并行度,在standalone模式下的數量默認為core的個數,也可手動調整,數量設置太大會造成很多小任務,增加啟動任務的開銷,太小,運行大數據量的任務時速度緩慢。

spark.sql.shuffle.partitions
sql聚合操作(發生shuffle)時的并行度,默認為200,如果該值太小會導致OOM,executor丟失,任務執行時間過長的問題

相同的兩個任務:
spark.sql.shuffle.partitions=300:

并行度300

spark.sql.shuffle.partitions=500:

并行度500

速度變快主要是大量的減少了gc的時間。

但是設置過大會造成性能惡化,過多的碎片task會造成大量無謂的啟動關閉task開銷,還有可能導致某些task hang住無法執行。

這里寫圖片描述

修改map階段并行度主要是在代碼中使用rdd.repartition(partitionNum)來操作。

5. shuffle

spark-sql join優化
map-side-join 關聯優化

6. 磁盤

磁盤IO優化

7.序列化

kryo Serialization

8.數據本地性

Spark不同Cluster Manager下的數據本地性表現
spark讀取hdfs數據本地性異常

9.代碼

編寫Spark程序的幾個優化點

轉載于:https://www.cnblogs.com/xiaomaohai/p/6158031.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/374319.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/374319.shtml
英文地址,請注明出處:http://en.pswp.cn/news/374319.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在MySQL上使用帶密碼的GlassFish JDBC安全性

我在該博客上最成功的文章之一是有關在GlassFish上使用基于表單的身份驗證來建立JDBC安全領域的文章 。 對這篇文章的一些評論使我意識到&#xff0c;要真正使它安全&#xff0c;應該做的還很多。 開箱即用的安全性 圖片&#xff1a; TheKenChan &#xff08; CC BY-NC 2.0 &a…

mgo寫入安全機制

mgo寫入安全機制 mongo寫入安全mgo寫入安全mongo寫入安全 mongo本身也有一整套的寫入安全機制,但是在這篇的內容里只介紹一小部分相關部分.先放一個鏈接可以跳過本節不看直接看這個 鏈接. WriteConcern.NONE:沒有異常拋出WriteConcern.NORMAL:僅拋出網絡錯誤異常&#xff0c;沒…

C學習雜記(二)筆試題:不使用任何中間變量如何將a、b的值進行交換

常見的方法如下 void swap1(int *a, int *b) {int temp *a;*a *b;*b temp; } 不使用中間變量的方法 void swap2(int *a, int *b) {*a *a *b;*b *a - *b;*a *a - *b; } 這種方法是不可取的&#xff0c;因為ab和a-b的運算可能會導致數據溢出。 void swap3(int *a, in…

利用python進行數據分析_利用python進行數據分析復現(1)

&#xfeff;一直以來&#xff0c;都想學習python數據分析相關的知識&#xff0c;總是拖拖拉拉&#xff0c;包括這次這個分享也是。《利用python進行數據分析 第2版》是一次無意之間在簡書上看到的一個分享&#xff0c;我決定將很詳細。一直都想著可以復現一下。但總有理由&…

在運行時交換出Spring Bean配置

如今&#xff0c;大多數Java開發人員都定期與Spring打交道&#xff0c;而我們當中的許多人已經熟悉了Spring的功能和局限性。 最近&#xff0c;我遇到了一個我從未遇到過的問題&#xff1a;引入了基于運行時引入的配置來重新連接Bean內部的功能。 這對于簡單的配置更改或交換掉…

Proximal Algorithms--Accelerated proximal gradient method

4.3 Accelerated proximal gradient method&#xff1a; 加速近端梯度方法&#xff1a; 基本的近端梯度方法的所謂的“加速”版本&#xff0c;就是在算法中包含了一個外推(extrapolation)步驟&#xff0c;一個簡單的版本是&#xff1a; yk1:xkωk(xk?xk?1)xk1:proxλkg(yk1?…

C語言代碼規范(七)#define

#define 宏定義的使用 #define MAX(x, y) ( ((x) > (y)) ? (x) : (y) ) #define MIN(x, y) ( ((x) < (y)) ? (x) : (y) ) 在宏定義中要把參數用括號擴起來( ((x) > (y)) ? (x) : (y) )。 因為宏只是簡單的文本替換&#xff0c;如果不注意&#xff0c;很容…

http 二進制_淺談HTTP協議

HTTP一、HTTP協議http協議&#xff0c;是超文本傳輸協議&#xff0c;此協議是基于TCP/IP的協議&#xff0c;是互聯網上應用最為廣泛的一直網絡協議是一種無狀態協議&#xff0c;默認端口為80,。設計HTTP的最初目的是為了提供一種發布和接受HTML頁面的方法。通過HTTP或者HTTPS協…

登陸注冊

登陸注冊&#xff0c;注冊的賬號存在服務器的數據庫里&#xff0c;成功了就給你返回成功&#xff0c;失敗了就返回失敗 有三種登陸方式&#xff1a;普通注冊&#xff0c;手機號注冊&#xff0c;第三方注冊轉載于:https://www.cnblogs.com/SensenCoder/p/4885606.html

Java并發教程–線程池

Java 1.5中提供的最通用的并發增強功能之一是引入了可自定義的線程池。 這些線程池使您可以對諸如線程數&#xff0c;線程重用&#xff0c;調度和線程構造之類的東西進行大量控制。 讓我們回顧一下。 首先&#xff0c;線程池。 讓我們直接進入java.util.concurrent.ExecutorSer…

HTTPPost/AFNetWorking/JSONModel/NSPredicate

一、HTTPPost 1. POST方式發送請求 HTTP協議下默認數據發送請求方法是GET方式&#xff0c;若需要使用POST方法&#xff0c;則需要對發送的請求也就是request對象&#xff0c;進行屬性設置。 步驟如下&#xff1a; > 要發送的請求對象&#xff0c;需要使用可變請求對象 [[NSM…

C語言代碼規范(八)使用const修飾值不允許改變的變量

使用const限定一個變量的值不允許被改變&#xff0c;從而保護被修飾的東西&#xff0c;防止意外&#xff0c;提高程序的可靠性和安全性。

教育小思

父母的時代是“攢錢&#xff0c;買房&#xff0c;生子&#xff0c;終老”&#xff0c;而現在的時代是“教育&#xff0c;創造&#xff0c;傳承&#xff0c;成長”。 改變世界&#xff0c;從教育起步。 傳統教育的不足之處&#xff1a; 1. 學習體驗不佳&#xff0c;學習者被迫…

linux redis客戶端_為什么單線程Redis能那么快?

我們通常說&#xff0c;Redis 是單線程&#xff0c;主要是指 Redis 的網絡 IO 和鍵值對讀寫是由一個線程來完成的&#xff0c;這也是 Redis 對外提供鍵值存儲服務的主要流程。但 Redis 的其他功能&#xff0c;比如持久化、異步刪除、集群數據同步等&#xff0c;其實是由額外的線…

servlet中文亂碼處理

servlet中文亂碼處理 如果是post設置req.setCharacterEncoding("utf-8");如果是get&#xff0c;不去修改服務器配置的情況下new String(name.getBytes("iso-8859-1"),"utf-8")數據庫亂碼?useUnicodetrue&characterEncodingUTF-8轉載于:http…

C語言開發筆記(七)const和指針

const修飾變量是常用的&#xff0c;不容易犯錯&#xff0c;而const和指針一起使用時很容易混淆。 (一)const int *p #include <stdio.h>int main(void) {int a 10;int b 20;const int *p &a;*p b;return 0; } const在int *的左側&#xff0c;即指針指向內容為…

從JavaFX 1.3遷移到JavaFX 2.0

幾天前&#xff0c;我完成了將Modellus的源代碼從JavaFX 1.3腳本遷移到JavaFX 2.0 Java語言的過程。 因此&#xff0c;我認為寫關于我在此過程中學到的知識會很好。 我想指出&#xff0c;如果您想繼續在JavaFX 2.0中使用JavaFX腳本&#xff0c;則可以使用Visage&#xff1a; ht…

九度OJ 1034:尋找大富翁 (排序)

時間限制&#xff1a;1 秒 內存限制&#xff1a;32 兆 特殊判題&#xff1a;否 提交&#xff1a;5925 解決&#xff1a;2375 題目描述&#xff1a;浙江桐鄉烏鎮共有n個人,請找出該鎮上的前m個大富翁.輸入&#xff1a;輸入包含多組測試用例.每個用例首先包含2個整數n&#xff08…

ubuntu php 無法執行exec_利用webhook使php項目自動部署

php中文網最新課程每日17點準時技術干貨分享1.先來講一下自動部署的原理&#xff0c;一般在我們push代碼的時候&#xff0c;可以自動請求webhook中設置的url&#xff0c;完成一次請求與響應。那么只要我們設置的url地址請求的php文件內容是執行命令行git push命令&#xff0c;則…

android-verticalseekbar——Android可視化SeekBar類庫

android-verticalseekbar——Android可視化SeekBar類庫轉載于:https://www.cnblogs.com/zhujiabin/p/5706246.html