本博文的主要內容是:
1、rdd基本操作實戰
2、transformation和action流程圖
3、典型的transformation和action
?
?
?
RDD有3種操作:
1、? Trandformation ?????對數據狀態的轉換,即所謂算子的轉換
2、? Action??? 觸發作業,即所謂得結果的
3、? Contoller? 對性能、效率和容錯方面的支持,如cache、persist、checkpoint
Contoller包括cache、persist、checkpoint。
?
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
傳入類型是T,返回類型是U。
?
?
?
元素之間,為什么reduce操作,要符合結合律和交換律?
答:因為,交換律,不知,哪個數據先過來。所以,必須符合交換律。
在交換律基礎上,想要reduce操作,必須要符合結合律。
/**
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
RDD.scala(源碼)
這里,新建包com.zhouls.spark.cores
package com.zhouls.spark.cores
/**
* Created by Administrator on 2016/9/27.
*/
object TextLines {
}
下面,開始編代碼
本地模式
自動 ,會寫好
源碼來看,
所以,?val lines = sc.textFile("C:\\Users\\Administrator\\Desktop\\textlines.txt") //通過HadoopRDD以及MapPartitionsRDD獲取文件中每一行的內容本身
?
?
val lineCount = lines.map(line => (line,1)) //每一行變成行的內容與1構成的Tuple
val textLines = lineCount.reduceByKey(_+_)
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))
?成功!
?現在,將此行代碼,
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))
改一改
textLines.foreach(pair => println(pair._1 + ":" + pair._2))
總結:
本地模式里,
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))
改一改
textLines.foreach(pair => println(pair._1 + ":" + pair._2))
運行正常,因為在本地模式下,是jvm,但這樣書寫,是不正規的。
?
?
集群模式里,
textLines.collect.foreach(pair => println(pair._1 + ":" + pair._2))
改一改
textLines.foreach(pair => println(pair._1 + ":" + pair._2))
運行無法通過,因為結果是分布在各個節點上。
collect源碼:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
得出,collect后array中就是一個元素,只不過這個元素是一個Tuple。
Tuple是元組。通過concat合并!
foreach源碼:
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
?
rdd實戰(rdd基本操作實戰)至此!
?
?
?
?
?rdd實戰(transformation流程圖)
?拿wordcount為例!
?
啟動hdfs集群
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh
?
?
?啟動spark集群
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh
?
?
啟動spark-shell
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g
?
?
scala> val partitionsReadmeRdd = ?sc.textFile("hdfs://SparkSingleNode:9000/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).saveAsTextFile("~/partition1README.txt")
?或者
?scala> val readmeRdd = sc.textFile("hdfs://SparkSingleNode:9000/README.md")
?scala> ?val partitionsReadmeRdd = readmeRdd.flatMap(_.split(" ")).map(word => (word,1)).reduceByKey(_+_,1)
.saveAsTextFile("~/partition1README.txt")
?
注意,~目錄,不是這里。
?
?
?
?為什么,我的,不是這樣的顯示呢?
?
?
?
RDD的transformation和action執行的流程圖
?
?
典型的transformation和action