一。概念
rdd.combineByKey(lambda x:"%d_" %x, lambda a,b:"%s@%s" %(a,b), lambda a,b:"%s$%s" %(a,b))
三個參數(都是函數)
第一個參數:給定一個初始值,用函數生成初始值。
第二個參數:combinbe聚合邏輯。
第三個參數:reduce端聚合邏輯。
二。代碼
from pyspark.conf import SparkConf from pyspark.context import SparkContext conf = SparkConf().setMaster("local").setAppName("CombineByKey") sc = SparkContext(conf = conf) rdd = sc.parallelize([("A",1),("B",2),("B",3),("B",4),("B",5),("C",1),("A",2)], 2) def f(index,items):print "partitionId:%d" %indexfor val in items:print valreturn items rdd.mapPartitionsWithIndex(f).count()combinerRDD = rdd.combineByKey(lambda x:"%d_" %x, lambda a,b:"%s@%s" %(a,b), lambda a,b:"%s$%s" %(a,b)) combinerRDD.foreach(p) groupByKeyRDD.foreach(p)sc.stop()
三。解釋
第一個函數作用于每一個組的第一個元素上,將其變為初始值
第二個函數:一開始a是初始值,b是分組內的元素值,比如A[1_],因為沒有b值所以不能調用combine函數,第二組因為函數內元素值是[2_,3]調用combine函數后為2_@3,以此類推
第三個函數:reduce端大聚合,把相同的key的數據拉取到一個節點上,然后分組。
?
四。結果
?五。拓展
1.用combinebykey實現groupbykey的邏輯
1.1 combinebykey的三個參數
第一個應該返回一個列表,初始值
第二個函數中的a依賴于第一個函數的返回值
第三個函數的a,b依賴于第二個函數的返回值
1.2 解釋:
?
1.3 代碼:
def mergeValue(list1,b):list1.append(b)return list1def mergeCombiners(list1,list2):list1.extend(list2)return list1groupByKeyRDD = rdd.combineByKey(lambda a:[a],mergeValue,mergeCombiners)
?
1.4結果
?
2.使用combineBykey把相同的key和對應的邏輯相加起來
代碼:
reduceByKeyRDD = rdd.combineByKey(lambda a:a,lambda a,b:a+b,lambda a,b:a+b)
結果:
?
持續更新中。。。。,歡迎大家關注我的公眾號LHWorld.