This lesson demonstrates two of the most important RDD operators, join and cogroup, through hands-on code.
Hands-on code for the join operator:
// Demonstrate the join operator: an inner join over keys, yielding (key, (value1, value2))
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)
val rdd3 = rdd1.join(rdd2)  // RDD[(Int, (String, Int))]
rdd3.collect().foreach(println)
Output:
(1,(Spark,100))
(3,(Tachyon,90))
(2,(Hadoop,70))
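Note that join is an inner join: a key missing from either RDD is dropped, and a key that occurs more than once in either RDD produces the Cartesian product of its matching values. A minimal sketch of the duplicate-key case, reusing the sc created above (the data here is hypothetical, purely for illustration):
// Hypothetical data: key 1 appears twice on each side
val left = sc.parallelize(Array((1, "a"), (1, "b"), (2, "c")))
val right = sc.parallelize(Array((1, 10), (1, 20), (2, 30)))
// Key 1 yields 2 x 2 = 4 pairs; key 2 yields 1
left.join(right).collect().foreach(println)
// (1,(a,10)) (1,(a,20)) (1,(b,10)) (1,(b,20)) (2,(c,30))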
Hands-on code for the cogroup operator. Unlike join, cogroup returns each key exactly once, pairing the full Iterable of matching values from each RDD, so duplicate keys are collected rather than multiplied out. First, the Java version:
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

SparkConf conf = new SparkConf().setMaster("local").setAppName("Cogroup");
JavaSparkContext sc = new JavaSparkContext(conf);
// (id, name) pairs
List<Tuple2<Integer, String>> nameList = Arrays.asList(new Tuple2<Integer, String>(1, "Spark"),
        new Tuple2<Integer, String>(2, "Tachyon"), new Tuple2<Integer, String>(3, "Hadoop"));
// (id, score) pairs; ids 1 and 2 occur more than once
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100),
        new Tuple2<Integer, Integer>(2, 95), new Tuple2<Integer, Integer>(3, 80),
        new Tuple2<Integer, Integer>(1, 80), new Tuple2<Integer, Integer>(2, 110),
        new Tuple2<Integer, Integer>(2, 90));
JavaPairRDD<Integer, String> names = sc.parallelizePairs(nameList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
// cogroup: for each id, gather all names and all scores into two Iterables
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> nameAndScores = names.cogroup(scores);
nameAndScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
    @Override
    public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
        System.out.println("ID:" + t._1);
        System.out.println("Name:" + t._2._1);
        System.out.println("Score:" + t._2._2);
    }
});
sc.close();
Output:
ID:1
Name:[Spark]
Score:[100, 80]
ID:3
Name:[Hadoop]
Score:[80]
ID:2
Name:[Tachyon]
Score:[95, 110, 90]
Now the Scala version:
val conf = new SparkConf().setAppName("RDDDemo").setMaster("local")
val sc = new SparkContext(conf)
val arr1 = Array(Tuple2(1, "Spark"), Tuple2(2, "Hadoop"), Tuple2(3, "Tachyon"))
// Keys 1 and 2 occur several times; cogroup collects all of their values
val arr2 = Array(Tuple2(1, 100), Tuple2(2, 70), Tuple2(3, 90), Tuple2(1, 95), Tuple2(2, 65), Tuple2(1, 110))
val rdd1 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr2)
val rdd3 = rdd1.cogroup(rdd2)  // RDD[(Int, (Iterable[String], Iterable[Int]))]
rdd3.collect().foreach(println)
sc.stop()
Output:
(1,(CompactBuffer(Spark),CompactBuffer(100, 95, 110)))
(3,(CompactBuffer(Tachyon),CompactBuffer(90)))
(2,(CompactBuffer(Hadoop),CompactBuffer(70, 65)))
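Incidentally, Spark implements join in terms of cogroup: the two RDDs are cogrouped, and the per-key pair of Iterables is expanded into a Cartesian product. A minimal sketch of that equivalence, reusing rdd1 and rdd2 from above (run it before sc.stop(); it illustrates the idea rather than reproducing Spark's exact source):
// cogroup first, then pair every left value with every right value per key
val joined = rdd1.cogroup(rdd2).flatMapValues {
  case (vs, ws) => for (v <- vs; w <- ws) yield (v, w)
}
joined.collect().foreach(println)  // same pairs as rdd1.join(rdd2)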
Note:
Source: DT_大數據夢工廠 (Spark release customization series)
Reposted from: https://blog.51cto.com/18610086859/1773197