This lesson demonstrates two of the most important RDD operators, join and cogroup, through hands-on code.


Hands-on code for the join operator:

import org.apache.spark.{SparkConf, SparkContext}

// Demonstrate the join operator
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)

// Two key-value datasets that share the keys 1, 2 and 3
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)

// join pairs up the values whose keys appear in both RDDs
val rdd3 = rdd1.join(rdd2)
rdd3.collect().foreach(println)


Output:

(1,(Spark,100))
(3,(Tachyon,90))
(2,(Hadoop,70))
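
Note that join is an inner join: a key contributes to the output only if it appears in both RDDs. When unmatched keys must be kept, Spark also provides leftOuterJoin and rightOuterJoin. Here is a minimal sketch of the left-outer variant; the extra key 4 and the names left/right are illustrative assumptions, not part of the lesson's example:

// Hypothetical data: key 4 appears only in the left RDD
val left = sc.parallelize(Array((1, "Spark"), (4, "Flink")))
val right = sc.parallelize(Array((1, 100)))

// leftOuterJoin keeps every left key; the right-hand value becomes an Option
left.leftOuterJoin(right).collect().foreach(println)
// Expected: (1,(Spark,Some(100))) and (4,(Flink,None))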


Hands-on code for the cogroup operator:

First, the Java version:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

SparkConf conf = new SparkConf().setMaster("local").setAppName("Cogroup");
JavaSparkContext sc = new JavaSparkContext(conf);

// ID -> name pairs
List<Tuple2<Integer, String>> nameList = Arrays.asList(
        new Tuple2<Integer, String>(1, "Spark"),
        new Tuple2<Integer, String>(2, "Tachyon"),
        new Tuple2<Integer, String>(3, "Hadoop"));

// ID -> score pairs; note that IDs repeat
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
        new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 95),
        new Tuple2<Integer, Integer>(3, 80), new Tuple2<Integer, Integer>(1, 80),
        new Tuple2<Integer, Integer>(2, 110), new Tuple2<Integer, Integer>(2, 90));

JavaPairRDD<Integer, String> names = sc.parallelizePairs(nameList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);

// For each ID, cogroup gathers all names and all scores into two Iterables
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> nameAndScores = names.cogroup(scores);

nameAndScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
    public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
        System.out.println("ID:" + t._1);
        System.out.println("Name:" + t._2._1);
        System.out.println("Score:" + t._2._2);
    }
});

sc.close();


Output:

ID:1
Name:[Spark]
Score:[100, 80]

ID:3
Name:[Hadoop]
Score:[80]

ID:2
Name:[Tachyon]
Score:[95, 110, 90]
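
A point worth noting before moving on: unlike join, cogroup keeps a key even when it occurs in only one of the RDDs; the missing side simply appears as an empty collection. A minimal Scala sketch with illustrative data (key 4 having a name but no score is an assumption for demonstration, not part of the lesson):

// Hypothetical: key 4 has a name but no matching score
val n = sc.parallelize(Array((1, "Spark"), (4, "Flink")))
val s = sc.parallelize(Array((1, 100)))
n.cogroup(s).collect().foreach(println)
// (1,(CompactBuffer(Spark),CompactBuffer(100)))
// (4,(CompactBuffer(Flink),CompactBuffer()))  <- key 4 is kept, with an empty score side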


Now the same example in Scala:

val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)

// The score data now contains repeated keys
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90), Tuple2(1, 95), Tuple2(2, 65), Tuple2(1, 110))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)

// cogroup groups the values of both RDDs by key into a pair of Iterables
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect().foreach(println)
sc.stop()


Output:

(1,(CompactBuffer(Spark),CompactBuffer(100, 95, 110)))
(3,(CompactBuffer(Tachyon),CompactBuffer(90)))
(2,(CompactBuffer(Hadoop),CompactBuffer(70, 65)))
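
In fact, Spark implements join on top of cogroup: for each key it emits the cross product of the two value collections. Below is a minimal sketch of that idea, reusing rdd1 and rdd2 from the example above (it would have to run before sc.stop()); the name joinedViaCogroup is illustrative:

// Re-derive join from cogroup: cross the two Iterables for each key
val joinedViaCogroup = rdd1.cogroup(rdd2).flatMapValues {
  case (vs, ws) => for (v <- vs; w <- ws) yield (v, w)
}
joinedViaCogroup.collect().foreach(println)
// Produces the same (key,(name,score)) pairs as rdd1.join(rdd2)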


Note:

Source: DT_大數據夢工廠 (the "Spark Distribution Customization" course)

For more exclusive content, follow the WeChat public account: DT_Spark

If you are interested in big data and Spark, you can attend Wang Jialin's free Spark public course, held every evening at 20:00, in YY room 68917580