115、Spark的任務執行流程
driver和executor,結構式一主多從模式,
driver:spark的驅動節點,用于執行spark任務中的main方法,負責實際代碼的執行工作;主要負責:將代碼邏輯轉換為任務、在executor之間調度任務、跟蹤executor的執行情況。
Executor:spark的執行節點,是jvm的一個進程,負責在spark作業中運行具體的任務,任務之間相互獨立,spark應用啟動時候,executor節點被同時啟動,伴隨整個spark應用的生命周期而存在;
主要功能:負責運行組成spark應用的任務,并將結果返回給驅動進程、
在任務提交之后,都先啟動driver,然后driver向集群管理中注冊應用程序,根據任務的配置文件分配executor并啟動,然后driver等待資源滿足,執行main函數;spark為懶加載,當執行到action算子時候才開始真正執行,開始反向推算,根據寬依賴進行stage的劃分,隨后每一個stage對應一個個taskset,一個taskset中有多個task,task會被指定到executor中執行,
116、Spark提交job的流程
117、Spark的階段劃分
spark的階段劃分分為兩個階段:轉換階段和動作階段,分別對應轉換算子和行動算子
每遇到一個寬依賴就劃分一個stage
在一個stage內部會有很多task被執行,同一個stage中所有的task結束之后才能根據DAG依賴執行下一個stage中的task
階段:劃分stage的依據就是RDD之間的寬窄依賴,遇到寬依賴就劃分stage,每個stage包含一個或者多個task任務,stage是由一組并行的task組成;切割規則:遇到寬依賴就切割stage(遇到一個shuffle就轉為一個新的階段)
階段的劃分等于shuffle依賴的數量+1
根據行動算子劃分job、根據shuffle劃分stage、根據RDD的分區數劃分Task(Job>stage>task,一個job中可以有多個stage,一個stage中可以多個task)
118、Sparkjoin的分類
根據join操作方式進行分類,分為shuffle join和broadcastjoin
Shuffle join:spark將參與join操作的數據集按照join的條件進行分區,并將具有相同鍵的數據分發到同一個節點上進行join操作
Broadcast join:spark將一個較小的數據集復制到每一個節點的內存中,然后將參與join操作的大數據集分發到各個節點上進行join操作(通常進行大表join小表)
119、spark mapjoin的實現原理
Map join在內存中將兩個數據集進行連接,從而避免磁盤io的開銷
1、數據劃分:spark將兩個數據集劃分為多個分區,每個分區的數據流盡可能均勻
2、數據廣播:spark將其中一個較小的數據集廣播到每一個節點的內存中
3、分區處理:每個節點接收到廣播的數據后,將其本地的另一個數據集進行聯接操作
4、結果匯總:每個節點將自己的結果發送到驅動節點,由驅動節點進行最終的節點匯總(map join適用于,兩個數據集至少有一個可以完全放入內存中)
120、spark shuffle以及優點
可以用于在數據分區過程中重新分配和重組數據,在spark執行對數據進行重分區或者聚合操作時候,將數據重新發送到不同的節點上,進行下一步的計算
優點:
數據本地性:shuffle可以在節點之間移動數據,以便在計算過程中最大限度地利用數據本地性,減少數據傳輸過程中的開銷
分布式計算:shuffle運行spark在多個節點之間執行計算的時候,從而實現了分布式計算的能力
補充:Spark的shuffle怎么了解?能講講Spark的shuffle的過程嗎?
121、什么時候會產生shuffle
數據重分區:需要將數據重新分區進行后續的數據處理操作時候
聚合操作:當需要對數據進行聚合操作時候,會使用到shuffle操作
排序操作:需要對數據進行排序的時候,使用shuffle
122、spark為什么適合迭代處理
spark是基于內存計算的,存儲的數據在內存中,而不是在磁盤上,從而提高了數據處理的速度
可以保留中間結果:RDD可以內存中保留中間結果,對于迭代處理來說,每次迭代都是可以重用中間結果,而不是重新計算基于DAG執行引擎:
123、Spark為什么快?
1、spark是基于內存計算,MR是基于磁盤計算
2、spark中具有DAG有向無環圖,在此過程中減少了shuffle以及落地磁盤的次數
3、spark是粗粒度的資源申請,也就是當提交了spark application時候,application會將所有資源申請完畢,task在執行的時候就不需要申請資源,task執行快,當最后一個task執行完之后才會被釋放;MR是細粒度的資源申請,task需要自己申請資源并釋放,故application執行比較緩慢;
124、Spark數據傾斜問題,如何定位,解決方案?
spark中數據傾斜主要是指shuffle過程中出現的數據傾斜問題,不同的key對應的數據量不同導致不同的task處理的數據量不同的問題
數據傾斜是指少數的task被分配了極大量的數據,少數task運行緩慢
解決方案:
增加分區:如果數據分布不均勻,可以增加分區數,使得數據能夠更加均勻地分配到不同的分區中
重新分桶/哈希:對于鍵值對沖突的情況,嘗試重新分桶或者通過哈希函數重新計算鍵值,使得數據分布均勻
增加緩存:對于某些數據,可以將其緩存到內存中,減少重復計算
隨機前綴/后綴:對于簡直沖突的情況,增加鍵的前綴或者后綴,降低沖突
傾斜數據單獨處理:
補充:美團梳理spark解決數據傾斜的問題
數據傾斜原理簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,如果某一個key對應的數據量特別大的話,就會發生數據傾斜,大部分key對應10條數據,個別key對應的100W條數據,導致運行task結束事件不同,因此,整個saprk作業的運行進度是由運行時間最長的那個task決定的。
了解了spark的stage劃分原理,有助于快速定位數據傾斜發生的位置查看導致數據傾斜的key的數據分布情況:
1、如果是spark SQL中的group by、join語句導致的數據傾斜,那么就查詢一下SQL中使用的表的key的分布情況。
2、如果是對SparkRDD執行的shuffle算子導致的數據傾斜,那么可以在Spark作業中加入查看key分布的代碼,比如RDD.countByKey()。然后對統計出來的各個key出現的次數,collect/take到客戶端打印一下,就可以看到key的分布情況。
解決方案
1、使用hive
ETL預處理數據:評估是否可以通過hive進行數據預處理,從根源上解決了數據傾斜問題,徹底避免了spark中執行的shuffle類算子,我們只是把數據傾斜提升到了hive中,避免了spark中發生數據傾斜。2、過濾少數導致傾斜的key:如果發現傾斜的key就少數幾個,并且對計算本身的影響不大,就可以直接過濾少數幾個key;如果每次執行作業時候,動態判定哪些key的數據量最多然后再進行過濾,可以使用sample算子對RDD進行采樣,計算每個key的數量,取數據量最多的key過濾即可。
3、提高shuffle操作的并行度:這是最簡單的一種方案,對RDD執行shuffle算子時候,給shuffle傳入一個參數,該參數就設置了shuffle算子執行時候的reduce
task數量;在實際時候治標不治本,無法徹底解決數據傾斜的問題。
4、局部聚合和全局聚合:第一次是局部聚合,先給每個key都打上一個隨機數,然后對打上隨機數后的數據執行reducebykey的操作,進行局部聚合,然后將各個key的前綴去掉,再進行全局聚合操作;對于聚合類的shuffle操作導致的數據傾斜,效果很好,可以大幅度甚至解決數據傾斜問題。
5、將reduce join轉換為map
join:不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類的操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量;接著對另外一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來;普通的join是會走shuffle過程的,而一旦shuffle,就相當于會將相同key的數據拉取到一個shuffle
read task中再進行join,此時就是reduce
join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是map
join,此時就不會發生shuffle操作,也就不會發生數據傾斜。 6、采樣傾斜key并分拆join操作
7、隨機前綴和擴容RDD進行join
125、spark中的寬窄依賴
兩個相鄰的RDD之間的依賴關系(寬、窄依賴是根據上下游RDD的分區而言的)
寬依賴:上游一個RDD的partition可以被下游RDD的多個partition依賴
窄依賴:上游一個RDD的partition可以被下游RDD的多個partition依賴
RDD不會保存數據,只會保存血緣關系。提高容錯性,將RDD之間的關系恢復重新進行讀取
126、spark join在什么情況下會變成窄依賴
當兩個RDD進行join時候,分區方式以及分區數目相同,并且每個分區中的數據量也相當,這樣就將每個分區數據進行一對一匹配,形成窄依賴
當進行shuffle操作的key值較少時候,通過增大分區來減少每個分區中數據量,使得每個分區的數據量相對較少,
127、spark的內存模型
spark是基于分布式內存計算的,由dirver和executor
spark內存分為堆內內存和堆外內存,堆內內存基于JVM內存模型,堆外內存則通過調用底層JDK unsafeAPI
1、堆內內存
其大小由spark應用程序啟動時候的-executor-momery參數配置,executor運行的并發任務共享JVM堆內內存,該任務在緩存RDD數據和廣播數據時占用的內存被規劃為存儲內存(Storage),這些任務在執行shuffle時占用的內存被規劃為執行內存(Execution)
2、堆外內存
可以直接在工作節點的系統內存中開辟空間,spark可以直接操作堆外內存,減少了不必要的內存開銷和頻繁的垃圾掃描,默認情況下不開啟
128、為什么要劃分寬窄依賴
目的在于執行計算中進行優化。spark通過識別窄依賴來執行一些優化,在同一個節點上對多個窄依賴的轉化操作進行合并,從而減少網絡傳輸的開銷。對于寬依賴,spakr會根據分區的數量和大小來據欸的那個是否進行數據重分區。
129、spark中的轉換算子和行動算子的區分
轉換算子得到的是一個新的RDD,但不會立即執行計算,只是記錄下當前的操作,
行動算子是指觸發RDD進行計算的操作,(所以spark中作業的劃分是根據行動算子來確定的)
130、Spark的哪些算子會有shuffle過程?
groupByKey:將具有相同鍵的鍵值對分組到一起,必須進行shuffle以重新分配數據到不同的分區。
reduceByKey:對具有相同鍵的鍵值對進行聚合操作,需要將具有相同鍵的數據重新分配到不同的分區。
sortByKey:按照鍵對數據進行排序,需要將數據重新分區以進行排序。
join:將兩個具有相同鍵的數據集進行連接操作,需要將具有相同鍵的數據重新分配到不同的分區。
distinct:去除數據集中的重復元素,需要對元素進行重新分區以進行重復元素的合并。
cogroup:將具有相同鍵的數據集進行分組,需要將具有相同鍵的數據重新分配到不同的分區。
131、Spark有了RDD,為什么還要有Dataform和DataSet?
引入DF和DS是為了實現更高級的數據處理和優化
RDD是強類型的,它在編譯時候無法檢查數據類型的準確性,如果在運行過程中類型不匹配,只能在運行時拋出。DF和DS是基于RDD的抽象,提供了更加攻擊的類型安全性,允許編譯器在編譯時候檢查數據類型的準確性
RDD是基于函數式編程的,需要手動編寫復雜的轉換和操作邏輯。DF和DS提供了基于SQL的高級抽象,可以使用sql語句進行數據操作
132、Spark的RDD、DataFrame、DataSet、DataStream區別?
RDD式彈性分布式數據集,是基于分區進行操作,通過轉換算子和行動算子來進行數據處理
DF是一種以結構化數據為中心的數據抽象概念,DF是一個分布式數據,具有類似關系型數據庫表的結構
DS式DF的擴展,提供類型安全和更高級的API,強類型的數據集合
補充:spark中dataframe表格的類型
createGlobalTempView:全局臨時視圖,spark中sql的臨時視圖是session級別的,會隨著session的消失而消失,如果希望一個臨時視圖跨session而存在,可以建立一個全局臨時視圖,全局臨時視圖存在于系統數據庫global_temp中,必須加上庫名引用它;
createOrReplaceGlobalTempView:創建一個可替換的全局視圖,
createTempView:臨時視圖
createOrReplaceTempView:創建一個臨時視圖,如果該視圖已經存在,則替換它,session級別的
補充:RDD的彈性體現在哪些方面
-
自動進行內存和磁盤切換
-
基于lineage(血緣關系)的高效容錯(出錯時候可以進行恢復)task如果失敗會特定次數的重試
-
stage如果失敗會自動進行特定次數的重試,而且只會計算失敗的分片
-
checkpoint:每次對RDD操作都會產生新的RDD,如果鏈條比較長,就算笨重,就把數據放在磁盤中
-
persist:內存或磁盤中對數據進行復用
133、Spark的Spark Application、Job、Stage、Task分別介紹下,如何劃分?
application(應用):一個獨立的spark作業是由一系列的tasks組成的,一個application通常包含多個任務,每個作業由一個或者多個RDD轉換和操作組成。提交一個任務就是一個application
Job(作業):job是一組相互依賴的RDD轉化和動作操作的有向無環圖,一個job代表了一個完整的作業執行流程(一個action算子就會生成一個job)
Stage(階段):stage是job的劃分,一個job可以由多個stage組成,stage是根據RDD之間的寬窄依賴劃分的,一個stage中的所有任務都可以并行執行,不同的stage之間的任務需要等待前一個stage的任務完成
Task(任務):task是最小的作業單元,每個stage包含多個任務,每個任務負責處理一個RDD分區的數據(一個stage中,最后一個RDD的分區個數就是task的個數)
Job代表一個完整的作業執行過程,Stage是Job的劃分,根據RDD之間的寬依賴關系劃分,Task是Stage的執行單元,負責對RDD進行實際的操作和計算
注意:Application->Job->Stage->Task每一層都是1對n的關系。
134、Stage的內部邏輯
stage是由一個具有相同寬依賴關系的RDD組成的,一個stage可以看作一個邏輯的劃分
內部邏輯:
1、DAG生成:在stage內部,spark會根據RDD之間的依賴關系生成一個有向無環圖
2、任務劃分:會將每個stage劃分為多個task,每個task對應的RDD的分區
3、任務調度:spark會將task調度到集群中的執行器上執行
4、任務執行
5、數據傳輸
135、spark為什么要劃分stage
劃分satge的目的是為了優化任務的執行過程,提高計算性能和效率
136、stage的數量等于什么
stage的數量等于寬依賴的個數+1
137、Spark容錯機制?
138、RDD的容錯機制
RDD的容錯性是指其發射發生故障能夠自動恢復,并且不會丟失任何數據
容錯實現方式:
1、數據復制:RDD將數據劃分為多個分區,并將每個分區的數據復制到集群的多個節點上
139、Spark廣播變量的實現和原理?
廣播變量是一種分布式共享變量,允許開發者在每個節點上緩存一個只讀變量,而不是將其復制到每個任務中,用于在每個節點上緩存一個較大的數據集,方便在執行任務期間共享數據
在多個并行操作中使用同一個變量,但是 Spark 會為每個任務分別發送。
140、轉換算子
1、map
將處理的數據逐條進行映射轉換,可以是類型的轉換,也可以是值的轉換。不會減少或者增多數據
2、mapPartitions
將待處理的數據以分區為單位發送到計算節點上進行處理,可以減少或者增多數據
Map 算子是分區內一個數據一個數據的執行,類似于串行操作。而 mapPartitions 算子 是以分區為單位進行批處理操作。
比如,將RDD中的所有數據通過JDBC連接寫入數據庫,如果使用map函數,可能要為每一個元素都創建一個connection,這樣開銷很大,如果使用mapPartitions,那么只需要針對每一個分區建立一個connection
3、mapPartitionsWithIndex
將待處理的數據以分區為單位發送到計算節點進行處理,并且可以獲取當前分區索引
4、flatmap
將待處理的數據進行扁平化后再映射
5、glom
將RDD中的分區數據直接轉換為相同類型的RDD,分區不變 將每一個分區形成一個數組,形成新的RDD類型時RDD[Array[T]]
6、Groupby
根據指定的規則進行分組,分區默認不變,數據會被打亂重新組合,一個組的數據在一個分區中,涉及shuffle
7、filter
根據指定規則進行篩選過濾,符合規則的數據保留,不符合的數據丟棄
8、sample
- 從數據集中抽取數據,采樣從大規模數據中抽取數據
- 第一個參數:抽取數據后是否將數據放回
- 第二個參數:數據源中每條數據被抽取的概率(如果抽取不放回,表示數據源中每條數據被抽取的概率)(如果抽取放回的場合:表示數據源中每條數據被抽取的可能次數)
- 第三個參數:表示隨機算法的種子
9、distinct
數據去重。
10、coalesce
縮減分區,用于大數據集過濾以后,提高小數據集的執行效率
11、repartition
擴大分區,內部還是執行的coalesce算子,只是默認執行shuffle操作,沒有shuffle操作的話,就沒有意義
12、sortBy算子
按照一定的規則進行排序
13、交集并集差集拉鏈
intersection、union、subtract、zip都是針對兩個value類型的
在交集、并集、補集、差集中,數據類型必須一致
在zip中,數據類型可以不一致,但是數據的個數一定要一樣
14、partitionBy
15、reduceByKey
(a,1)(a,1)(b,1)(b,1)(b,1)(b,1)
按照指定的鍵,對value做聚合(a,2),(b,3)
支持分區內預聚合,可以有效減少shuffle時落盤的數據量
分區內和分區間計算規則是相同的
16、groupByKey
按照指定的鍵,將value聚合成一個迭代器 (a,(1,2,3))
類比groupBy,(a,((a,1), (a,2), (a,3)))
reduceByKey對比groupByKey
都存在shuffle的操作,但是reduceByKey可以在shuffle前對分區內的數據進行預聚合,這樣會減少落盤的數據量。
groupByKey只是進行分組,不存在數據量減少的問題
reduceByKey性能高
reduceByKey包含分組和聚合的功能,GroupByKey只能分組,不能聚合
17、aggregateByKey
分區內計算和分區間的計算規則可以不同,自己定義
第一個參數:表示計算的初始值
第二個參數列表:
分區內的計算規則
分區間的計算規則
18、foldByKey
如果分區內和分區間的計算規則相同了,那么就是用foldByKey算子
19、combinByKey
是一個通用的聚合操作,
reduceByKey、foldByKey、aggregateByKey、combineByKey 的區別?
行動算子
count、countbykey、countbyvalue
141、reduceByKey和groupByKey的區別和作用?
reducebykey將具有相同的鍵的值進行聚合,并返回一個新的鍵值RDD,
groupbykey將具有相同的鍵的所有值分組,并返回一個新的鍵值對RDD
從shuffle的角度:兩者都存在shuffle操作,但是reducebykey可以在shuffle前對分區內相同的key的數據進行預聚合,減少落盤的數量,groupbykey只是進行分組,不存在數據量減少的問題,前者性能較高
功能角度:reducebykey只包含了分組和聚合的功能,后者只能分組
142、reducebykey和reduce的區別
兩者都是進行聚合操作的方法
reducebykey是轉換算子,將RDD中具有相同鍵的元素進行聚合,返回一個新的RDD,并將結果作為新的鍵
reduce是一個行動算子,它將RDD中所有元素進行聚合,并返回一個單個的結果
reduceByKey適用于對鍵值對RDD進行聚合操作,返回一個新的鍵值對RDD,而reduce操作適用于對整個RDD進行聚合,返回一個單一結果。
reduceByKey可以在分區上并行地進行聚合操作,而reduce操作是在整個RDD上進行的。
reduceByKey需要指定一個聚合函數來合并具有相同鍵的元素,而reduce操作只需要指定一個聚合函數即可。
143、使用reduceByKey出現數據傾斜怎么辦?
144、Spark SQL的執行原理?
和RDD不同,sparksql的DS和sql并不是直接生成計劃交給集群執行,而是經過了一個叫Catalyst的優化器,幫助開發者優化代碼
回答:
1、首先SparkSQL底層解析成RDD,通過兩個階段RBO和CBO
2、RBO就是通過邏輯執行計劃通過常見的優化達到邏輯執行計劃
3、CBO就是從優化后的邏輯計劃到物理執行計劃
145、Spark SQL的優化?
1、catalyst優化器:自動推斷查詢計劃的最優執行方式
2、列式存儲;采用列式存儲的方式來存儲和處理數據
3、數據劃分和分區:可以將大規模的數據集劃分成多個小塊進行處理
4、數據裁剪和推測執行:數據裁剪可以根據查詢條件不想不關的數據過濾掉,減少數據的傳輸和處理量
5、并行執行和動態分配資源
146、Spark RDD持久化
checkpoint
用于將spark應用程序的中間數據保存到持久存儲中,以便在發生故障或者重啟時候恢復應用程序的狀態,當用戶啟動checkpoint后,spark會將DAG的中間數據保存到可靠的存儲系統中,即使發生故障也可以從checkpoint中恢復數據,執行方法:
sparkcontext.setCheckpointDir(" ")
Cache緩存
RDD通過cache或者persist方法將前面的計算結果緩存,默認情況下會把數據以緩存在JVM的堆內存中,
緩存和檢查點區別
cache只是將數據保存起來,不切斷血緣依賴,checkpoint檢查點切斷血緣依賴
cache緩存的數據通常在磁盤,內存地方,checkpoint數據通常存儲在hdfs
145、DF和DS的創建
補充:RDD、DF、DS三者之間的轉換
1、DF和DS——>RDD,只需要調用.rdd()就能實現
2、RDD——>DS,將RDD的每一行封裝成樣例類,再調用toDS()
3、RDD——>DF,調用.toDF()
3、DF——>DS,DF就是DS的特例,是可以相互轉換,使用.as()
4、DS——>DF,使用.toDF()
146、HashPartitioner和RangePartitioner的實現
兩者都是spark的分區函數,繼承于partitioner
HashPartitioner:哈希分區,對于給定的key,計算其hashCode,并除于分區的個數取余,會后返回的余數就是這個key所屬的分區ID
RangePartitioner:將一定范圍內的數據映射到一個分區中,盡量保證每個分區的數據均勻,而且分區間有序,也就是說一個分區中的元素肯定都是比另一個分區內的元素小或者大,但是分區內的元素是不能保證順序的
147、為什么spark比MR快
1、內存計算
spark將數據存儲在內存中進行計算和處理,而hadoop則將數據存儲在磁盤上,速度更慢
2、DAG執行引擎
spark使用DAG執行引擎,通過將任務劃分為多個階段進行優化,可以有效地減少任務之間的數據傳輸和磁盤讀寫
3、運行模式
spark支持多種運行模式,本地、yarn。獨立模式更,根據需求進行選擇
4、緩存機制
spark具有強大的緩存機制,可以將結果存儲在內存中,避免了重復計算和磁盤讀寫操作
5、數據流水線
spark可以將多個數據處理操作連接成一個數據流水線,減少了中間數據的存儲的傳輸
6、資源調度
spark是粗粒度的資源申請,也就是當提交了spark application時候,application會將所有資源申請完畢,task在執行的時候就不需要申請資源,task執行快,當會后一個task執行完之后才會被釋放;MR是細粒度的資源申請,task需要自己申請資源并釋放,故application執行比較緩慢。
補充:Hive On Spark 和 Spark SQL的區別
Hive on spark:
將spark作為hive的計算引擎,通過將hive的查詢作為spark任務提交到spark集群上進行計算,
繼承了hive的數據倉庫功能,包括元數據管理,數據存儲,查詢優化等,還支持UDF函數和存儲過程
sparkSQL:是spark項目的一部分,用于處理結構化的數據,基于Dataframe,
sparkSQL可以更直接利用spark引擎的優化器和執行引擎,可以更緊密地集成到spark生態系統中,更好利用集群資源(優化器和執行引擎、數據格式和存儲、內存計算、執行計劃)