原文地址:http://blog.csdn.net/qq_23660243/article/details/51435257
--------------------------------------------
最近經常使用到reduceByKey這個算子,懵逼的時間占據多數,所以沉下心來翻墻上國外的帖子仔細過了一遍,發現一篇不錯的,在此加上個人的理解整體過一遍這個算子,那么我們開始:
國外的大牛一上來給出這么一句話,個人感覺高度概括了reduceByKey的功能:
那么這就基本奠定了reduceByKey的作用域是key-value類型的鍵值對,并且是只對每個key的value進行處理,如果含有多個key的話,那么就對多個values進行處理。這里的函數是我們自己傳入的,也就是說是可人為控制的【其實這是廢話,人為控制不了這算子一點用沒有】。那么舉個例子:
??
scala> val x = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1),| ("a", 1), ("b", 1), ("b", 1),| ("b", 1), ("b", 1)), 3)
我們創建了一個Array的字符串,并把其存入Spark的集群上,設置了三個分區【這里我們不關注分區,只關注操作】。那么我們調用reduceByKey并且傳入函數進行相應操作【本處我們對相同key的value進行相加操作,類似于統計單詞出現次數】:
scala> val y = x.reduceByKey((pre, after) => (pre + after))這里兩個參數我們邏輯上讓他分別代表同一個key的兩個不同values,那么結果想必大家應該猜到了:
scala> y.collect res0: Array[(String, Int)] = Array((a,3), (b,5))嗯,到這里大家對reduceByKey有了初步的認識和體會。論壇中有一段寫的也很有幫助,由于英文不好怕翻譯過來誤導大家,所以每次附上原話:
看到這大家對這個算子應該有了更加深入的認識,那么再附上我的Scala的一個小例
子,同樣是統計字母出現次數:
結果是:ArrayBuffer((a,3), (b,5)),很簡單對吧。論壇給出了java和python的版本的,如下:
Java:
packagecom.backtobazics.sparkexamples;importjava.util.Arrays;importorg.apache.spark.api.java.JavaPairRDD; importorg.apache.spark.api.java.JavaRDD; importorg.apache.spark.api.java.JavaSparkContext; importorg.apache.spark.api.java.function.Function2;importscala.Tuple2;public classReduceByKeyExample {public static void main(String[] args) throws Exception {JavaSparkContext sc = new JavaSparkContext();//Reduce Function for sum Function2<Integer, Integer, Integer> reduceSumFunc = (accum, n) -> (accum + n);// Parallelized with 2 partitions JavaRDD<String> x = sc.parallelize(Arrays.asList("a", "b", "a", "a", "b", "b", "b", "b"),3);// PairRDD parallelized with 3 partitions// mapToPair function will map JavaRDD to JavaPairRDD JavaPairRDD<String, Integer> rddX = x.mapToPair(e -> newTuple2<String,Integer>(e, 1));// New JavaPairRDD JavaPairRDD<String, Integer> rddY = rddX.reduceByKey(reduceSumFunc);//Print tuples for(Tuple2<String, Integer> element : rddY.collect()){System.out.println("("+element._1+", "+element._2+")");}} }// Output:// (b, 5)// (a, 3)python:
Bazic reduceByKey example in python# creating PairRDD x with key value pairs>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1), ... ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)# Applying reduceByKey operation on x>>> y = x.reduceByKey(lambda accum, n: accum + n) >>> y.collect() [('b', 5), ('a', 3)]# Define associative function separately >>>def sumFunc(accum, n): ... return accum + n ... >>> y = x.reduceByKey(sumFunc) >>> y.collect() [('b', 5), ('a', 3)]感謝大家捧場,客官慢走。