02、體驗Spark shell下RDD編程
1、Spark RDD介紹
RDD是Resilient Distributed Dataset,中文翻譯是彈性分布式數據集。該類是Spark是核心類成員之一,是貫穿Spark編程的始終。初期階段,我們可以把RDD看成是Java中的集合就可以了,在后面的章節中會詳細講解RDD的內部結構和工作原理。
2、Spark-shell下實現對本地文件的單詞統計
2.1思路
word count是大數據學習的經典案例,很多功能實現都可以歸結為是word count的使用。工作過程為使用SparkContext對象的textFile方法加載文件形成Spark RDD1,RDD1中每個元素就是文件中的每一行文本,然后對RDD的每個元素進行壓扁flatMap操作,形成RDD2,RDD2中每個元素是將RDD1的每行拆分出來產生的單詞,因此RDD2就是單詞的集合,然后再對RDD2進行標一成對,形成(單詞,1)的元組的集合RDD3,最后對RDD3進行按照key進行聚合操作形成RDD4,最終將RDD4計算后得到的集合就是每個單詞的數量
2.2 處理流程
App->SparkContext: textFile加載文件
SparkContext->RDD1: 創建RDD
RDD1-->App: 返回RDD1
App->RDD1: flatMap壓扁操作
RDD1->RDD2: 產生RDD2
RDD2-->App: 返回RDD2
App->RDD2: map標一成對
RDD2->RDD3: 產生RDD3
RDD3-->App: 返回RDD3
App->RDD3: reduceByKey聚合
RDD3->RDD4: 產生RDD4
RDD4-->App: 返回RDD4
App->RDD4: collect收集結果數據
2.3 分步實現代碼
// 進入spark shell環境
$>spark-shell// 1.加載文件
scala>val rdd1 = sc.textFile("file:///homec/centos/1.txt")// 2.壓扁每行
scala>val rdd2 = rdd1.flatMap(_.split(" ")) // 3.標1成對
scala>val rdd3 = rdd2.map(w=>(w,1))// 4.按照key聚合每個key下的所有值
scala>val rdd4 = rdd3.reduceByKey(_+_)// 5.顯式數據
scala>rdd4.collect()
2.4 一步實現代碼
$scala>sc.textFile("file:///home/centos/1.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
3、Spark-shell下實現對氣溫數據的最大最小聚合
3.1 思路分析
氣溫數據數各年度內氣溫列表,將每一行變換成(year,temp)元組后,按照yearn進行聚合即可。
3.2 處理流程
App->SparkContext: textFile加載文件
SparkContext->RDD1: 產生RDD1
RDD1-->App: 返回RDD1
App->RDD1: map變換每行為(year,(max,min))元組
RDD1->RDD2: 產生RDD2
RDD2-->App: 返回RDD2
App->RDD2: reduceByKey雙聚合氣溫極值
RDD2->RDD3:產生RDD3
App->RDD3: collect()收集結果
3.3 分步實現代碼
// 進入spark shell環境
$>spark-shell// 1.加載氣溫數據文件
scala>val rdd1 = sc.textFile("/home/centos/temps.dat")// 2.壓扁每行
scala>val rdd2 = rdd1.flatMap(e=>{val arr = e.split(" ")(arr(0).toInt, (arr(1).toInt ,arr(1).toInt))
}) // 3.reduceByKey
scala>val rdd3 = rdd2.reduceByKey((a,b)=>{import scala.math(math.max(a(0),b(0)) , math.min(a(1),b(1)))
})// 4.收集日志
scala>rdd3.collect()
3.4 一步實現代碼
$scala>sc.textFile("file:///home/centos/temps.dat").map(line=>{val arr = line.split(" ")(arr(0).toInt,(arr(1).toInt , arr(1).toInt))}).reduceByKey((a,b)=>{import scala.math(math.max(a(0) , b(0)) , math.min(a(1) , b(1)))}).collect()