RDD的操作
1.1 概述
? ?RDD整體包含兩大類操作
transformation?從現有中創建一個新的數據集
action?在對數據集做一定程度的計算后將結果返回
對于所有的transformation,都是Lazy的,也就是說它不會立即執行,只是單純的記住怎么樣從原來的數據集進行轉換的邏輯而已,它僅在某一個計算需要的情況下,才會被真正執行.
因為transformation?的Lazy性,RDD支持在每次計算時都進行重新計算,當然你可以將這個RDD保存下來 (persist? or cache方法)避免每次重計算
可以通過設置不同的存儲級別,將數據保存到硬盤,內存,或者選擇同步多個副本到多個節點中.
? ?1.2 集群環境下的變量與操作
集群環境,所有操作最終會交給executors去執行.而變量,會以數據副本的形式交給executors.很多時候,這與我們非集群環境下的開發思維有非常大的不同.
1.2.1 集群下的閉包
? RDD是支持閉包操作的.但務必注意的是Spark不保證對閉包之外的對象引用進行的變化.
原因是閉包的會被序列化發生給每一個executor,對于閉包的之外的對象引用會拷貝一個副本給executor.這時多個executor執行至少是跨JVM的
??這時對這個副本對象的變更沒有任何意義,因為每個JVM(executor)的副本都是獨立的.
1.2.2 集群下的print
集群環境下,print不會在driver端有任何輸出.
原因也是一樣,print最終是在每個executor執行,其輸出也是在每個executor的stdout上,在driver端,是不會有這些輸出的.
如果想在driver輸出,一個比較簡單的辦法是調用collect()將結果發送到driver端在進行print,但這樣可能會造成driver內存爆掉(所有executor的數據涌入).
比較推薦的做法是rdd.take(100).foreach(println)
? 1.2.3 共享變量
因為集群下,變量只會以副本方式交給executor,而不會將變量的值變化傳回driver,所以一個可讀寫共享的變量是非常有用的.
Spark提供了兩種共享變量?broadcast(廣播變量) 和?accumulators(累加器)
1.2.3.1 廣播變量(broadcast)
廣播變量允許將一個只讀變量的副本發送到每個機器上(executor機器),而不是對每一個任務發送一個副本.這樣在同一機器上的多個任務,就可以反復使用這個變量了.
注意:
廣播變量只會對每個節點分發一次,所以一般來說,廣播變量不應該再被修改了.以保證每個廣播變量的副本的值都是一致的
如果廣播變量被修改,則需要將廣播變量重新分發
另:
舉個例子:Spark的action操作本身是通過一系列的stage來完成的,這些Stage是通過分布式的shuffle操作來進行切分的.而在每個Stage之間,Spark自動使用廣播變量.
這里用法說明,只有數據本身會在多個Stage的多個任務中反復使用,或者說緩存這個數據是非常重要且非常必要的情況下,使用廣播變量才有意義.
廣播變量的使用如下:
// SparkContext.broadcast(v)進行創建,返回的是廣播變量的封裝器,以.value讀取廣播變量的的值val broadcastVar = sc.broadcast(Array(1, 2, 3))val v = broadcastVar.value
1.2.3.2 累加器(accumulators)?
累加器變量僅支持累加操作,因為可以在并行計算執行一些特殊的計算(比計數或者求和).并且累加器的變化是可以在UI的Task界面上看見的(注,不支持Python)
累加器操作,依然遵循RDD的Lazy原則:
累加器更新操作是在Action中,并且在每個任務中只會執行一次(如果任務失敗重啟了,累加器更新不會執行)
而在transformation中,累加器依然不會立即執行更新,如果transformation被重新執行了,則累加器操作會重復執行
對于累加器變量,Spark原生支持數值類型.一個使用例子如下
val accum = sc.longAccumulator("My Accumulator")sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))println(accum.value)
? 也可以創建繼承AccumulatorV2的類型,來實現自定義類型的累加器,例子如下:
//兩個泛型參數->累加的元素類型和結果類型可以不同的class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {private val myVector: MyVector = MyVector.createZeroVectordef reset(): Unit = {myVector.reset()}def add(v: MyVector): Unit = {myVector.add(v)}...}// 創建一個自定義的累加器類型:val myVectorAcc = new VectorAccumulatorV2//將這個觸發器注冊到SparkContext:sc.register(myVectorAcc, "MyVectorAcc1")
1.3 RDD的一些基本操作
1.3.1?Transformations 依賴關系
RDD是由父RDD經過轉換函數形成一個個子RDD(子RDD依賴父RDD).針對不同的轉換函數,以父RDD分區與子RDD分區的關系為標準,Spark將這些依賴關系分為兩類.
窄依賴
窄依賴是指轉換后,父RDD的每個分區只會被某一個子RDD分區使用.(一對一或者多對一的關系).
所以窄依賴一般出現在map,filter等子分區沿用父分區,不會發生重分區的時候.
? ?寬依賴
寬依賴是指轉換后,父RDD的某個或某些分區會被幾個子RDD分區使用.(某個分區數據部分在這個RDD,部分在那個RDD,一對多關系)
寬依賴一般出現在groupByKey等子分區一定會發生重分區的時候
兩種依賴關系的對比
一般來說,窄依賴比寬依賴對執行優化更加有利
i).窄依賴允許集群節點上以流水線的形式直接計算所有分區
? ?寬依賴則需要先計算好父分區的分區信息,然后再以一個shuffle完成重分區,
? ?ii).某個子分區異常需要重計算時,會對這個子分區所依賴的所有分區進行計算.(這是寬窄依賴都必須的),但是針對分區數據而言
窄依賴,一個或多個父分區完全對應一個子分區.對這些父分區的重計算,利用率是100%
寬依賴,父分區的數據不完全對應一個子分區(一對多關系,父分區的某些部分是其它分區的),但此時依然需要重計算父分區全部數據,造成計算浪費(因為白計算其它分區的數據)
1.3.2 Transformations 操作
map
對RDD中的元素執行一個指定函數,將執行結果作為新元素產生一個新的RDD.
與其它map系函數區別,map新元素的完全是Map函數的執行結果返回,所以新RDD的數量與老RDD是一一對應的.
val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)rdd.map(rec=>rec.split(" ")>).collect().map(println(_))//返回結果是rec.split(" ")結果(一維數組)=>[["aa","bb"],["cc","dd"]]
flatMap?
與map相同,但結果會扁平化.即如果結果是迭代器容器的,會將元素從容器中取出再返回
val rdd = sc.parallelize(Seq("aa bb","cc"),2)rdd.flatMap(rec=>rec.split(" ")).collect().map(println(_));//返回結果["aa","bb","cc"]//flatMap如以下這種方式使用是不行,flatMap返回結果必須是TraversableOnce[U](可迭代一次的類型)//rdd.flatMap(rec=>(rec,1)).collect().map(println(_));
mapPartitions
與map相同,不過是以分區為單位,所以語法要求必須為?f: Iterator[T] => Iterator[U],注意返回結果不是以分區為單位,而是所有分區執行函數的結果的合并
val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)rdd.mapPartitions(part=>part.map(rec=>rec.split(" "))).collect().map(println(_))//結果是 [["aa","bb"],["cc","dd"],["ee","ff"]]
mapPartitionsWithIndex
與mapPartitions類似,不過它帶有分區的index以供使用.所以語法要求為f: (Int, Iterator[T]) => Iterator[U]
val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)rdd.mapPartitionsWithIndex((partIdx,part)=>part.map(rec=>(partIdx,rec))).collect().map(println(_))//返回結果 (0,aa bb),(1,cc dd),(1,ee ff)
? sample
抽樣函數.可以從數據集中按一定比例抽取部分數據,抽取之后可以選擇是否返回
語法要求?withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong
val rdd = sc.parallelize(1 to 50)rdd.sample(false,0.2,System.currentTimeMillis).map(rec=>(rec,1)).collect().map(print(_)+" ")//返回結果 (8,1)(21,1)(26,1)(27,1)(34,1)(43,1)(46,1)(49,1)
union
將兩個數據集合并(包含數據重復)
val rdd = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(11 to 20)rdd.union(rdd2).map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))//返回結果 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
intersection
將兩個數據集合并,取交集作為結果返回
val rdd = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(5 to 15)rdd.intersection(rdd2).map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))//返回結果 6 7 9 8 10 5
? ?distinct
對當前結果集去重返回
val rdd = sc.parallelize(1 to 10)val rdd2 = sc.parallelize(5 to 15)rdd.union(rdd2).distinct().map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))//返回結果 4 14 6 8 12 10 2 13 15 11 1 3 7 9 5
groupByKey
將一個鍵值對類型的結果集按照key進行分組(如果是為分組聚合,groupByKey相比reduceByKey效率更低,因為少一個map-shuffer的combine)
val rdd = sc.parallelize(Seq("aa bb","cc dd","bb cc"),2)rdd.flatMap(rec=>rec.split(" ")).map(rec=>(rec,1)).groupByKey().collect().map(rec=>print(s" ${rec._1} ${rec._2.sum} |"));//返回結果 aa 1 | dd 1 | bb 2 | cc 2 |
reduceByKey
將一個鍵值對類型數據集,使用指定的函數分組聚合為另一個鍵值對類型數據集,(相比groupByKey性能更高,因為可以在map-shuffer進行combine減少數據量)
val rdd = sc.parallelize(Seq("aa bb","cc dd","bb cc"),2)rdd.flatMap(rec=>rec.split(" ")).map(rec=>(rec,1)).reduceByKey((value1,value2)=>value1+value2).collect().map(rec=>print(s"${rec} "))//返回結果 (aa,1) (dd,1) (bb,2) (cc,2)
? aggregate
? 給出一個默認基準值,先使用seqOp遍歷分區內元素傳入基準值進行聚合,再對分區間結果使用combOp聚合為最后結果.
? 注意aggregate返回的結果直接是聚合結果(不是RDD),并且要與原RDD的類型一致
val rdd = sc.parallelize(1 to 10);/*** zeroValue:預定義一個初始值 (0,0)* seqOp: (U, T) => U 分區內元素聚合,遍歷元素傳入基準值執行函數.(類似Map-Shuffle)* U:當前基準值,T:當前元素* 執行的邏輯是 (基準值(默認初始值), 元素No.1) 執行seqOp ,結果再作為基準值,執行(基準值(上步結果),元素No.2),以此類推* combOp: (U, U) => U 分區間聚合,將各分區執行seqOp函數的結果再使用combOp聚合 (類似Reduce-Shuffle)*/val aggregateResult = rdd.aggregate((0,0))(seqOp=(sv,tv)=>(sv._1+tv,sv._2+1),combOp=(v1,v2)=>(v1._1+v2._1,v2._2+v2._2))println(aggregateResult)//輸出結果 (55,10) (1-10的總和,1-10的個數) <=非RDD結果,并且類型必須是Int
aggregateByKey
? 與aggregate類似,但針對的是key分組,aggregateBykey是以key組為單位,對分組內元素遍歷使用seqOp,再使用combOp聚合分組內
val rdd = sc.parallelize(Seq("a b c", "b c d"));rdd.flatMap(rec => rec.split(" ")).map(rec => (rec, 1)).aggregateByKey(0)(seqOp = (sv, tv) => (sv + tv),combOp = (v1, v2) => (v1 + v2)).collect().map(rec => print(s"${rec} |"))//輸出結果 (d,1) |(b,2) |(a,1) |(c,2) |
sortByKey
將一個鍵值對RDD按key排序轉換為另一個RDD
join
將兩個鍵值對RDD((K, V),(K, W)),按Key合并為一個RDD(K, (V, W)) .(Spark同時還提供?leftOuterJoin,rightOuterJoin,fullOuterJoin)
val rdd = sc.parallelize(Seq("a b")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));val rdd2 = sc.parallelize(Seq("b c")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));rdd.join(rdd2).collect().map(rec => print(s"${rec} |"))//兩個RDD交集 (b,(b,b))rdd.leftOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))//leftOuterJoin左邊全數據,右邊Opt (b,(b,Some(b))) |(a,(a,None)) |rdd.rightOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))//rightOuterJoin右邊全數據,左邊Opt (b,(Some(b),b)) |(c,(None,c)) |rdd.fullOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))//笛卡爾乘積,Opt (b,(Some(b),Some(b))) |(a,(Some(a),None)) |(c,(None,Some(c))) |
cogroup
將多個鍵值對RDD按Key合并在一起.合并為全數據(沒有丟失)
與fullOuterJoin區別在與多個RDD情況下,cogroup按key合并為一個,fullOuterJoin為多個的笛卡爾積
注意,如果某個數據集少某一個key,合并時是在這個數據集的位置上占CompactBuffer()的位置,而不是直接跳過
val rdd = sc.parallelize(Seq("a b")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));val rdd2 = sc.parallelize(Seq("b c")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));rdd.cogroup(rdd2).collect().map(rec => print(s"${rec} |"))//(b,(CompactBuffer(b),CompactBuffer(b))) |(a,(CompactBuffer(a),CompactBuffer())) |(c,(CompactBuffer(),CompactBuffer(c))) |
cartesian
返回兩個RDD的笛卡爾積結果
pipe
使用Shell的語法操作RDD
coalesce
重新調整RDD的分區后形成一個新的RDD.語法要求:numPartitions: Int, shuffle: Boolean = false.
numPartitions表示要重新調整的分區數量,shuffle表示重新調整分區時是否允許發生shuffle過程.
如果子分區數往下減少,則子分區數設置一定會成功.但要注意,在這種情況下會造成任務的并行度降低(分區數,任務數降了),任務內存更容易爆出(單個任務的數據增大了)
如果子分區數往上增加,則子分區數設置必須要設置shuffle=true,才會成功,否則子分區依然等于父分區
謹記:如果沒有shuffle的參與,RDD只能減少分區(窄依賴),不能增加分區
repartition
只是coalesce的shuffle等于true的快捷方式.?coalesce(numPartitions, shuffle = true)
? ? ? ? ? ? ? ? ? ? ??repartitionAndSortWithinPartitions
?
1.3.2 Action 操作
reduce
RDD中元素前兩個傳給輸入函數,產生一個新的return值,新產生的return值與RDD中下一個元素組成兩個元素,再被傳給輸入函數,直到最后只有一個值為止
與reduceByKey的區別是? reduceByKey是一個轉換操作,返回的是RDD,?reduce是一個action操作,返回的是數據結果
val rdd = sc.parallelize(1 to 100,2);val value = rdd.reduce((v1,v2)=>v1+v2)println(value)//輸出結果 5050
collect
將一個RDD的所有元素以數組的形式發回driver端.注意這個RDD必須是足夠小的數據集,否則很容易將driver端的內存撐爆
count
返回一個RDD的元素的個數
first
返回一個RDD的第一個元素
take(n)
返回一個RDD的前N個元素
takeSample(withReplacement,?num, [seed])
返回一個RDD的百分比抽樣(withReplacement標識元素是否放回RDD以供多次使用)
? ? ? ?takeOrdered(n,?[ordering])
返回一個RDD按照設定的排序規則后的前N個元素
countByKey
只支持鍵值對類型,返回一個RDD的按照Key分組后的每組計數
saveAsTextFile(path)
將一個RDD的全部元素寫入一個文本方式的本地文件,HDFS或其它任何Hadoop支持的存儲系統中.(每行等于每個元素調用toString()的結果)
saveAsSequenceFile(path)
將一個RDD的全部元素寫入一個二進制方式的本地文件,HDFS或其它任何Hadoop支持的存儲系統中.
在Scala中,它還可以用于隱式轉換為可寫類型的類型(Spark包含對基本類型的轉換,如Int、Double、String等)。
saveAsObjectFile(path)
將一個RDD的全部元素使用Java序列化以簡單的格式編寫數據集的元素(可以使用SparkContext.objectFile()加載這些元素)。
foreach(func)
在數據集的每個元素上運行函數func。這通常用于處理副作用,如更新累加器或與外部存儲系統交互
注意:不可以修改foreach()之外的累加器之外的變量,見前面集群下的變量與閉包一節
1.4 Shuffle過程
Spark的某些操作,會引起一個Shuffle過程.Shuffle是指不同節點上的不同分區數據整合重新分區分組的機制.
所以Shuffle是一個代價很高的操作,因為它會導致executor和不同的機器節點之間進行數據復制.
1.4.1 Shuffle簡述
以reduceByKey為例,將原始數據中key相同的記錄聚合為一個組.這里挑戰是原始數據很可能是存在不同分區不同機器的(參考MapReduce執行過程)
Spark-Shuffle與MapReduce-Shuffle的區別
MapReduce-Shuffle結果是分區有序,分區內再按Key排序
Spark-Shuffle結果是分區有序,但分區內Key無序.
要對Spark-Shuffle的分區內再排序,有以下方法:
mapPartitions 在已有的每個分區上再使用.sort排序
repartitionAndSortWithinPartitions? 重建分區,并排序
sortBy提前對RDD本身做一個全范圍排序
1.4.2 RDD中引起Shuffle的操作
?repartition類
操作 例如:repartition
、coalesce
_ByKey
操作(除了counting相關操作)例如:groupByKey
、reduceByKey
join?例如:cogroup
、join
? ?1.4.3?Shuffle的性能影響
Shuffle本身是同時高耗內存,高耗磁盤IO,高耗網絡IO的昂貴操作.
Spark會啟動一系列的MapReduce(Hadoop MapReduce),產生大量的數據緩沖區與歸并排序,大量的pill文件與歸并Merge等等