RDD設計背景
在實際應用中,存在許多迭代式計算,這些應用場景的共同之處是 : 不同計算階段之間會重用中間結果,即一個階段的輸出結果會作為下一個階段的輸入.
而目前的MapReduce框架都是把中間結果寫入到HDFS中,帶來了大量的數據復制、磁盤IO和序列化開銷;
如果能將結果保存在內存當中,就可以大量減少IO.
RDD就是為了滿足這種需求而出現的,它提供了一個抽象的數據架構,我們不必擔心底層數據的分布式特性,只需將具體的應用邏輯表達為一系列轉換處理,
不同RDD之間的轉換操作形成依賴關系,可以實現管道化,從而避免了中間結果的落地存儲,大大降低了數據復制、磁盤IO和序列化開銷,最終加快計算速度.
RDD概念
RDD,彈性分布式數據集.
一個RDD就是一個分布式對象集合,本質上是一個只讀的分區記錄集合;
一個RDD可以分成多個分區,每個分區就是一個數據集片段( HDFS上的塊) ;
一個RDD的不同分區可以被保存到集群中不同的節點上,從而可以在集群中的不同節點上進行并行計算.
彈性:既可以存儲在內存又可以存儲在磁盤
分布式:可以被分成多個分區,不同的分區可以被保存到不同的節點上進行并行計算
數據集:本質上是一個只讀的分區記錄集合
RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合;
不能直接修改,只能基于穩定的物理存儲中的數據集來創建RDD,或者通過在其他RDD上執行確定的轉換操作( 如map、join和groupBy) 而創建得到新的RDD.RDD提供了一組豐富的操作以支持常見的數據運算,分為'行動' ( Action) 和'轉換' ( Transformation) 兩種類型,前者用于執行計算并指定輸出的形式,后者指定RDD之間的相互依賴關系.
兩類操作的主要區別是:轉換操作( 比如map、filter、groupBy、join等) 接受RDD并返回RDD,而行動操作( 比如count、collect等) 接受RDD但是返回非RDD( 即輸出一個值或結果) .
RDD執行過程
Spark用Scala語言實現了RDD的API,程序員可以通過調用API實現對RDD的各種操作.
RDD典型的執行過程如下:
1 . 讀入外部數據源( 或者內存中的集合) 創建RDD;
2 . RDD經過一系列的'Transformation' 操作,每一次都會產生不同的RDD,供給下一個'Transformation' 使用;
3 . 最后一個RDD經'Action' 操作進行處理,并輸出到外部數據源( 或者變成Scala/JAVA集合或變量) .
需要說明的是,RDD采用了惰性調用,即在RDD的執行過程中,真正的計算發生在RDD的'Action' 操作,
對于'Action' 之前的所有'Transformation' 操作,Spark只是記錄下'Transformation' 操作應用的一些基礎數據集以及RDD生成的軌跡,即相互之間的依賴關系,而不會觸發真正的計算.
從輸入中邏輯上產生了A和C兩個RDD,經過一系列'Transformation' 操作,邏輯上生成了F( 也是一個RDD) ,之所以說是邏輯上,是因為這時候計算并沒有發生,Spark只是記錄了RDD之間的生成和依賴關系.
也就當F要進行輸出時,是當F進行'Action' 操作的時候,Spark才會根據RDD的依賴關系生成DAG,并從起點開始真正的計算.
血緣關系
上訴一系列處理稱為一個'血緣關系(Lineage)' ,即DAG拓撲排序的結果.
采用惰性調用,通過血緣關系連接起來的一系列RDD操作就可以實現管道化( pipeline) ,避免了多次轉換操作之間數據同步的等待,
而且不用擔心有過多的中間數據,因為具有血緣關系的操作都管道化了,一個操作得到的結果不需要保存為中間數據,而是直接管道式地流入到下一個操作進行處理.
同時,這種通過血緣關系就把一系列操作進行管道化連接的設計方式,也使得管道中每次操作的計算變得相對簡單,保證了每個操作在處理邏輯上的單一性;
相反,在MapReduce的設計中,為了盡可能地減少MapReduce過程,在單個MapReduce中會寫入過多復雜的邏輯.
RDD特性
總體而言,Spark采用RDD以后能夠實現高效計算的主要原因如下:
( 1 ) 高效的容錯性
現有的分布式共享內存、鍵值存儲、內存數據庫等,為了實現容錯,必須在集群節點之間進行數據復制或者記錄日志,即在節點之間會發生大量的數據傳輸,這對于數據密集型應用而言會帶來很大的開銷.
而在RDD的設計中,數據只讀,不可修改,如果需要修改數據,必須從父RDD轉換到子RDD,由此在不同RDD之間建立了血緣關系; 所以,RDD是一種天生具有容錯機制的特殊集合,
不需要通過數據冗余的方式( 比如詳細的記錄操作的日志) 實現容錯,而只需通過RDD父子依賴( 血緣) 關系重新計算得到丟失的分區來實現容錯,無需回滾整個系統,這樣就避免了數據復制的高開銷,
而且重算過程可以在不同節點之間并行進行,實現了高效的容錯;
此外,RDD提供的轉換操作都是一些粗粒度的操作( 比如map、filter和join) ,RDD依賴關系只需要記錄這種粗粒度的轉換操作,而不需要記錄具體的數據和各種細粒度操作的日志( 比如對哪個數據項進行了修改) ,這就大大降低了數據密集型應用中的容錯開銷.
( 2 ) 中間結果持久化到內存
數據在內存中的多個RDD操作之間進行傳遞,不需要落地到磁盤上,避免了不必要的讀寫磁盤開銷.
( 3 ) 存放的數據可以是Java對象,避免了不必要的對象序列化和反序列化開銷.
RDD的依賴關系
RDD中不同的操作會使得不同RDD中的分區會產生不同的依賴; RDD中的依賴關系分為窄依賴( Narrow Dependency) 、寬依賴( Wide Dependency) .
寬依賴: 一個父RDD的一個分區對應一個子RDD的多個分區; 一對多,伴有shuffle過程.
窄依賴: 一個父RDD的分區對應于一個子RDD的分區,或多個父RDD的分區對應于一個子RDD的分區; 一對一或多對一.
總結:如果父RDD的一個分區只被一個子RDD的一個分區所使用就是窄依賴,否則就是寬依賴.
窄依賴典型的操作包括map、filter、union等,寬依賴典型的操作包括groupByKey、sortByKey等;
對于連接( join) 操作,可以分為兩種情況:
1 .對輸入進行協同劃分,屬于窄依賴.
協同劃分( co-partitioned) 是指多個父RDD的某一分區的所有'鍵(key)' 落在子RDD的同一個分區內,不會產生同一個父RDD的某一分區落在子RDD的兩個分區的情況.
2 .對輸入做非協同劃分,屬于寬依賴.
對于窄依賴的RDD,可以以流水線的方式計算所有父分區,不會造成網絡之間的數據混合;
對于寬依賴的RDD,則通常伴隨著Shuffle操作,即首先需要計算好所有父分區數據,然后在節點之間進行Shuffle.
階段劃分(stage)
Spark通過分析各個RDD的依賴關系生成了DAG,再通過分析各個RDD中的分區之間的依賴關系來決定如何劃分階段,
具體劃分方法是:在DAG中進行反向解析,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到當前的階段中; 將窄依賴盡量劃分在同一個階段中,可以實現流水線計算.
例如,假設從HDFS中讀入數據生成3個不同的RDD( 即A、C和E) ,通過一系列轉換操作后再將計算結果保存回HDFS;
對DAG進行解析時,在依賴圖中進行反向解析,由于從RDD A到RDD B的轉換以及從RDD B和F到RDD G的轉換,都屬于寬依賴,因此,在寬依賴處斷開后可以得到三個階段,即階段1、階段2和階段3.
可以看出,在階段2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,
比如,分區7通過map操作生成的分區9,可以不用等待分區8到分區10這個轉換操作的計算結束,而是繼續進行union操作,轉換得到分區13,這樣流水線執行大大提高了計算的效率.
由上述論述可知,把一個DAG圖劃分成多個'stage' 以后,每個階段都代表了一組關聯的、相互之間沒有Shuffle依賴關系的任務組成的任務集合;
每個任務集合會被提交給任務調度器( TaskScheduler) 進行處理,由任務調度器將任務分發給Executor運行.
Spark算子
RDD支持兩種類型的操作:
Transformation:從一個RDD轉換為一個新的RDD.
Action:基于一個數據集進行運算( 引起Job運算) ,并返回RDD.
例如,map是一個Transformation操作,map將數據集的每一個元素按指定的函數轉換為一個RDD返回; reduce是一個action操作.
Spark的所有Transformation操作都是懶執行,它們并不立馬執行,而是先記錄對數據集的一系列Transformation操作; 這種設計讓Spark的運算更加高效.
例如,對一個數據集map操作之后使用reduce只返回結果,而不返回龐大的map運算的結果集.
默認情況下,每個轉換的RDD在執行不同Action操作時都會重新計算; 即使兩個Action操作會使用同一個轉換的RDD,該RDD也會重新計算.
除非使用persist方法或cache方法將RDD緩存到內存,這樣在下次使用這個RDD時將會提高計算效率,也支持將RDD持久化到硬盤上或在多個節點上復制.
Transformation算子
下面列出了Spark常用的transformation操作,詳細的細節請參考RDD API文檔( Scala、Java、Python、R) 和鍵值對RDD方法文檔( Scala、Java) .map( func)
將原來RDD的每個數據項,使用map中用戶自定義的函數func進行映射,轉變為一個新的元素,并返回一個新的RDD.filter( func)
使用函數func對原RDD中數據項進行過濾,將符合func中條件的數據項組成新的RDD返回.flatMap( func)
類似于map,但是輸入數據項可以被映射到0個或多個輸出數據集合中,所以函數func的返回值是一個數據項集合而不是一個單一的數據項.mapPartitions( func)
類似于map,但是該操作是在每個分區上分別執行,所以當操作一個類型為T的RDD時func的格式必須是Iterator< T> = > Iterator< U> .
即mapPartitions需要獲取到每個分區的迭代器,在函數中通過這個分區的迭代器對整個分區的元素進行操作.mapPartitionsWithIndex( func)
類似于mapPartitions,但是需要提供給func一個整型值,這個整型值是分區的索引,所以當處理T類型的RDD時,func的格式必須為( Int, Iterator< T> ) = > Iterator< U> .union( otherDataset)
返回原數據集和參數指定的數據集合并后的數據集;
使用union函數時需要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合并的RDD元素數據類型相同;
該操作不進行去重操作,返回的結果會保存所有元素; 如果想去重,可以使用distinct( ) .intersection( otherDataset)
返回兩個數據集的交集.distinct( [ numTasks] ))
將RDD中的元素進行去重操作.groupByKey( [ numTasks] )
操作( K,V) 格式的數據集,返回( K, Iterable) 格式的數據集.
注意,如果分組是為了按key進行聚合操作( 例如,計算sum、average) ,此時使用reduceByKey或aggregateByKey計算效率會更高.
注意,默認情況下,并行情況取決于父RDD的分區數,但可以通過參數numTasks來設置任務數.reduceByKey( func, [ numTasks] )
使用給定的func,將( K,V) 對格式的數據集中key相同的值進行聚集,其中func的格式必須為( V,V) = > V,可選參數numTasks可以指定reduce任務的數目.aggregateByKey( zeroValue) ( seqOp, combOp,[ numTasks] )
對( K,V) 格式的數據按key進行聚合操作,聚合時使用給定的合并函數和一個初始值,返回一個( K,U) 對格式數據;
需要指定的三個參數:zeroValue為在每個分區中,對key值第一次讀取V類型的值時,使用的U類型的初始變量;
seqOp用于在每個分區中,相同的key中V類型的值合并到zeroValue創建的U類型的變量中; combOp是對重新分區后兩個分區中傳入的U類型數據的合并函數.sortByKey( [ ascending] , [ numTasks] )
( K,V) 格式的數據集,其中K已實現了Ordered,經過sortByKey操作返回排序后的數據集,指定布爾值參數ascending來指定升序或降序排列.join( otherDataset, [ numTasks] )
用于操作兩個鍵值對格式的數據集,操作兩個數據集( K,V) 和( K,W) 返回( K,( V, W)) 格式的數據集,通過leftOuterJoin、rightOuterJoin、fullOuterJoin完成外連接操作.cogroup( otherDataset, [ numTasks] )
用于操作兩個鍵值對格式數據集( K,V) 和( K,W) ,返回數據集格式為( K,( Iterable, Iterable)) .這個操作也稱為groupWith.
對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分別聚合為一個集合,并且返回兩個RDD中對應Key的元素集合的迭代器.cartesian( otherDataset)
對類型為T和U的兩個數據集進行操作,返回包含兩個數據集所有元素對的( T,U) 格式的數據集; 即對兩個RDD內的所有元素進行笛卡爾積操作.pipe( command, [ envVars] )
以管道( pipe) 方式將RDD的各個分區( partition) 使用shell命令處理( 比如一個 Perl或 bash腳本) ,
RDD的元素會被寫入進程的標準輸入( stdin) ,將進程返回的一個字符串型 RDD( RDD of strings) ,以一行文本的形式寫入進程的標準輸出( stdout) 中.coalesce( numPartitions)
把RDD的分區數降低到通過參數numPartitions指定的值,在得到的更大一些數據集上執行操作,會更加高效.repartition( numPartitions)
隨機地對RDD的數據重新洗牌( Reshuffle) ,從而創建更多或更少的分區,以平衡數據,總是對網絡上的所有數據進行洗牌( shuffles) .repartitionAndSortWithinPartitions( partitioner)
根據給定的分區器對RDD進行重新分區,在每個結果分區中,按照key值對記錄排序,這在每個分區中比先調用repartition再排序效率更高,因為它可以將排序過程在shuffle操作的機器上進行.
Action算子
下面列出了Spark支持的常用的action操作,詳細請參考RDD API文檔( Scala、Java、Python、R) 和鍵值對RDD方法文檔( Scala、Java) .reduce( func)
使用函數func聚集數據集中的元素,這個函數func輸入為兩個元素,返回為一個元素; 這個函數應該符合結合律和交換率,這樣才能保證數據集中各個元素計算的正確性.collect( )
在驅動程序中,以數組的形式返回數據集的所有元素; 通常用于filter或其它產生了大量小數據集的情況.count( )
返回數據集中元素的個數.first( )
返回數據集中的第一個元素,類似于take( 1 ) .take( n)
返回數據集中的前n個元素,類似于sql中的limit.takeOrdered( n,[ ordering] )
返回RDD按自然順序或自定義順序排序后的前n個元素.saveAsTextFile( path)
將數據集中的元素以文本文件或文本文件集合的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中;
Spark將在每個元素上調用toString方法,將數據元素轉換為文本文件中的一行記錄.saveAsSequenceFile( path) ( Java and Scala)
將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中;
該操作只支持對實現了Hadoop的Writable接口的鍵值對RDD進行操作,在Scala中,還支持隱式轉換為Writable的類型( Spark包括了基本類型的轉換,例如Int、Double、String等) .saveAsObjectFile( path) ( Java and Scala)
將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑; 這些保存該數據的文件,可以使用SparkContext.objectFile( ) 進行加載.countByKey( )
僅支持對( K,V) 格式的鍵值對類型的RDD進行操作; 返回( K,Int) 格式的Hashmap,( K,Int) 為每個key值對應的記錄數目.foreach( func)
對數據集中每個元素使用函數func進行處理.