文章目錄
RDD-Resilient Distributed Dataset
一、RDD五大特性
二、RDD創建方式
RDD-Resilient Distributed Dataset
在 Apache Spark 編程中,RDD(Resilient Distributed Dataset,彈性分布式數據集)是 Spark Core 中最基本的數據抽象,代表一個不可變、可分區、可并行計算的元素集合。RDD 允許用戶在集群上以容錯的方式執行計算。
一、RDD五大特性
首先回顧下Spark WordCount的核心代碼流程:
sc.textFile("...").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).foreach(println)
結合以上代碼,我們理解RDD的五大特性,RDD理解圖如下:
RDD五大特性:
1) RDD由一系列Partition組成(A list of partitions)
RDD由多個Partition組成,這些Partition分布在集群的不同節點上。如果讀取的是HDFS中的數據,每個partition對應一個Split,每個Split大小默認與每個Block大小一樣,。
2) 函數是作用在每個Partition(Split)上的(A function for computing each split)
RDD 定義了在每個分區上進行計算的函數,例如 flatMap、map 等操作,這些函數對每個分區中的數據進行處理。
3) RDD之間有依賴關系(A list of dependencies on other RDDs)
RDD之間存在依賴關系,上圖中RDD2可以基于RDD1生成,RDD1叫做父RDD,RDD2叫做子RDD。
4) 分區器作用在K,V格式的RDD上(Optionally, a Partitioner for key-value RDDs)
上圖中RDD3中的數據是Tuple類型,這種類型叫做K,V格式的RDD。Spark分區器作用是決定數據發往下游RDD哪個Partition中,分區器只能作用在這種K,V格式的RDD中,默認根據Key的hash值與下游RDD的Partition個數取模決定該條數據去往下游RDD的哪個Paritition中。
5) RDD提供一系列最佳的計算位置(Optionally, a list of preferred locations to compute each split on)
RDD 提供每個分區的最佳計算位置,通常是數據所在的節點,這樣可以將計算task調度到數據所在的位置,減少數據傳輸,提高計算效率(計算移動,數據不移動原則)。
關于RDD的注意點如下:
- textFile底層讀取文件方式與MR讀取文件方式類似,首先對數據split,默認Split是一個block大小。
- 讀取數據文件時,RDD的Paritition個數默認與Split個數相同,也可以在創建RDD的時候指定,Partition是分布在不同節點上的。
- RDD雖然叫做數據集,但實際上不存儲數據,RDD類似迭代器,對象不可變,處理數據時,下游RDD會依次向上游RDD獲取對應數據,這就是RDD之間為什么有依賴關系的原因。
- 如果RDD中數據類型為二元組對象,那么這種RDD我們稱作K,V格式的RDD。
- RDD的彈性體現在RDD中Partition個數可以由用戶設置、RDD可以根據依賴關系基于上一個RDD按照迭代器方式計算出下游RDD。
- RDD提供最佳計算位置,task發送到相應的partition節點上處理數據,體現了“計算移動,數據不移動”的理念。
二、RDD創建方式
在Spark中創建RDD可以通過讀取集合、讀取文件方式創建,還可以基于已有RDD轉換創建,后續我們主要使用第三種方式,這里先介紹前兩種方式。下面分別使用Java和Scala API演示RDD的創建。
- Java API
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("generateRDD");
JavaSparkContext sc = new JavaSparkContext(conf);
//1.從集合中創建RDD,并指定并行度為3,默認并行度為1
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c", "d"),3);
System.out.println("rdd1并行度為:"+rdd1.getNumPartitions());
rdd1.foreach(new VoidFunction<String>() {@Overridepublic void call(String s) throws Exception {System.out.println(s);}
});//2.從集合創建K,V格式RDD
JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(new Tuple2<String, Integer>("a", 1),new Tuple2<String, Integer>("b", 2),new Tuple2<String, Integer>("c", 3),new Tuple2<String, Integer>("d", 4)
));rdd2.foreach(new VoidFunction<Tuple2<String, Integer>>() {@Overridepublic void call(Tuple2<String, Integer> tp) throws Exception {System.out.println(tp._1 + " " + tp._2);}
});//3.從文件中創建RDD,并指定并行度為3,默認并行度為1
JavaRDD<String> rdd3 = sc.textFile("./data/data.txt",3);
System.out.println("rdd3并行度為:"+rdd3.getNumPartitions());
rdd3.foreach(new VoidFunction<String>() {@Overridepublic void call(String s) throws Exception {System.out.println(s);}
});
- Scala API
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("GenerateRDDTest")
val sc = new SparkContext(conf)
//1.從集合創建RDD,并指定并行度為3,默認并行度為1
val rdd1 =sc.parallelize(1 to 20,3)
println(s"rdd1 并行度為:${rdd1.getNumPartitions}")
rdd1.foreach(println)//2.從集合創建K,V格式RDD
val rdd1KV:RDD[(String,Int)] = sc.parallelize(Array(("a",1),("b",2),("c",3),("d",4),("e",5)))
println(s"rdd1KV 并行度為:${rdd1KV.getNumPartitions}")
rdd1KV.foreach(println)//3.從集合創建RDD,并指定并行度為3,默認并行度為1
val rdd2 =sc.makeRDD(1 to 20,3)
println(s"rdd2 并行度為:${rdd2.getNumPartitions}")
rdd2.foreach(println)//4.從文件創建RDD,并指定并行度為3,默認并行度為1
val rdd3 = sc.textFile("./data/data.txt",3)
println(s"rdd3 并行度為:${rdd2.getNumPartitions}")
rdd3.foreach(println)
注意以下兩點:
1、無論是基于集合或者文件創建RDD,默認RDD分區數為1,也可以在創建時指定RDD paritition個數;
2、Scala API中parallelize方法可以從集合中得到K,V或者非K,V格式RDD,還可以通過makeRDD方法讀取集合轉換成RDD。formation算子對RDD進行轉換處理。
- 📢博客主頁:https://lansonli.blog.csdn.net
- 📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正!
- 📢本文由 Lansonli 原創,首發于 CSDN博客🙉
- 📢停下休息的時候不要忘了別人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活?