2019獨角獸企業重金招聘Python工程師標準>>>
在Spar看中,我們知道一切的操作都是基于RDD的。在使用中,RDD有一種非常特殊也是非常實用的format——pair RDD,即RDD的每一行是(key, value)的格式。這種格式很像Python的字典類型,便于針對key進行一些處理。
針對pair RDD這樣的特殊形式,spark中定義了許多方便的操作,今天主要介紹一下reduceByKey和groupByKey,因為在接下來講解《在spark中如何實現SQL中的group_concat功能?》時會用到這兩個operations。
首先,看一看spark官網[1]是怎么解釋的:
reduceByKey(func,?numPartitions=None)
Merge the values for each key using an associative reduce function. This will also perform the merging?locally on each mapper?before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with?numPartitions?partitions, or the default parallelism level if?numPartitions?is not specified.
也就是,reduceByKey用于對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,并且merge操作可以通過函數自定義。
groupByKey(numPartitions=None)
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.?Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using?reduceByKey?or aggregateByKey will provide much better performance.
也就是,groupByKey也是對每個key進行操作,但只生成一個sequence。需要特別注意“Note”中的話,它告訴我們:如果需要對sequence進行aggregation操作(注意,groupByKey本身不能自定義操作函數),那么,選擇reduceByKey/aggregateByKey更好。這是因為groupByKey不能自定義函數,我們需要先用groupByKey生成RDD,然后才能對此RDD通過map進行自定義函數操作。
為了更好的理解上面這段話,下面我們使用兩種不同的方式去計算單詞的個數[2]:
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)
val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))
上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一樣的,但是,它們的內部運算過程是不同的。
(1)當采用reduceByKeyt時,Spark可以在每個分區移動數據之前將待輸出數據與一個共用的key結合。借助下圖可以理解在reduceByKey里究竟發生了什么。 注意在數據對被搬移前同一機器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函數)。然后lamdba函數在每個區上被再次調用來將所有值reduce成一個最終結果。整個過程如下:
(2)當采用groupByKey時,由于它不接收函數,spark只能先將所有的鍵值對(key-value pair)都移動,這樣的后果是集群節點之間的開銷很大,導致傳輸延時。整個過程如下:
因此,在對大數據進行復雜計算時,reduceByKey優于groupByKey。
另外,如果僅僅是group處理,那么以下函數應該優先于 groupByKey?:
(1)、combineByKey?組合數據,但是組合之后的數據類型與輸入時值的類型不一樣。
(2)、foldByKey合并每一個 key 的所有值,在級聯函數和“零值”中使用。
最后,對reduceByKey中的func做一些介紹:
如果是用python寫的spark,那么有一個庫非常實用:operator[3],其中可以用的函數包括:大小比較函數,邏輯操作函數,數學運算函數,序列操作函數等等。這些函數可以直接通過“from operator import *”進行調用,直接把函數名作為參數傳遞給reduceByKey即可。如下:
<span style="font-size:14px;">from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)]</span>
?