04-240606Spark筆記
1.行動算子-2
-
save相關算子:
格式:
def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit
例子:
?val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3))) ?rdd.saveAsTextFile("output")rdd.saveAsObjectFile("output1")// saveAsSequenceFile方法要求數據的格式必須為K-V類型rdd.saveAsSequenceFile("output2")
輸出結果:
-
foreach
格式:
def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
例子:
? ?val rdd = sc.makeRDD(List(1,2,3,4)) ?//foreach 其實是Driver端內存集合的循環遍歷方法rdd.collect().foreach(println) //Driverprintln("***************")// foreach 其實是Executor端內存數據打印rdd.foreach(println) // Executor// 算子 : Operator(操作)// ? ? ? ? RDD的方法和Scala集合對象的方法不一樣// ? ? ? ? 集合對象的方法都是在同一個節點的內存中完成的。// ? ? ? ? RDD的方法可以將計算邏輯發送到Executor端(分布式節點)執行// ? ? ? ? 為了區分不同的處理效果,所以將RDD的方法稱之為算子。// ? ? ? RDD的方法外部的操作都是在Driver端執行的,而方法內部的邏輯代碼是在Executor端執行。
輸出結果:
2. 序列化
2.1 閉包檢測
-
閉包檢測
因為Driver需要給兩個Executor共享User方法,共享就需要序列化
案例:
?def main(args: Array[String]): Unit = { ?val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf) ?val rdd = sc.makeRDD(List[Int]()) ?val user = new User() ?// SparkException: Task not serializable// NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User ?// RDD算子中傳遞的函數是會包含閉包操作,那么就會進行檢測功能// 閉包檢測rdd.foreach(num => {println("age = " + (user.age + num))}) ?sc.stop() ?}//class User extends Serializable {// 樣例類在編譯時,會自動混入序列化特質(實現可序列化接口)//case class User() {class User {var age : Int = 30}
-
RDD 的分區器
自己來寫分區器:
? ?def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf) ?val rdd = sc.makeRDD(List(("nba", "xxxxxxxxx"),("cba", "xxxxxxxxx"),("wnba", "xxxxxxxxx"),("nba", "xxxxxxxxx"),),3)val partRDD: RDD[(String, String)] = rdd.partitionBy( new MyPartitioner ) ?partRDD.saveAsTextFile("output") ?sc.stop()}
自定義的分區器:
? ?class MyPartitioner extends Partitioner{// 分區數量override def numPartitions: Int = 3 ?// 根據數據的key值返回數據所在的分區索引(從0開始)override def getPartition(key: Any): Int = {key match {case "nba" => 0case "wnba" => 1case _ => 2}}}
* 自定義分區器 * 1. 繼承Partitioner * 2. 重寫方法
輸出結果:
-
RDD 文件讀取與保存
案例1:
? ?def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf) ?val rdd = sc.textFile("output1")println(rdd.collect().mkString(",")) ?val rdd1 = sc.objectFile[(String, Int)]("output2")println(rdd1.collect().mkString(",")) ?val rdd2 = sc.sequenceFile[String, Int]("output3")println(rdd2.collect().mkString(",")) ?sc.stop()}
輸出結果:
案例2:
? ?def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")val sc = new SparkContext(sparConf) ?val rdd = sc.makeRDD(List(("a", 1),("b", 2),("c", 3))) ?rdd.saveAsTextFile("output1")rdd.saveAsObjectFile("output2")rdd.saveAsSequenceFile("output3") ?sc.stop()}
輸出結果:
1. 數據結構:
-
累加器
累加器用來把 Executor 端變量信息聚合到 Driver 端。
: Unit = { ?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) ?val rdd = sc.makeRDD(List(1,2,3,4)) ?// reduce : 分區內計算,分區間計算//val i: Int = rdd.reduce(_+_)//println(i)var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum) ?sc.stop() ?}
-
系統累加器
案例:
? ?def main(args: Array[String]): Unit = { ?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) ?val rdd = sc.makeRDD(List(1,2,3,4)) ?// 獲取系統累加器// Spark默認就提供了簡單數據聚合的累加器val sumAcc = sc.longAccumulator("sum") ?//sc.doubleAccumulator//sc.collectionAccumulator ?rdd.foreach(num => {// 使用累加器sumAcc.add(num)}) ?// 獲取累加器的值println(sumAcc.value) ?sc.stop() ?}
累加器的一些特殊情況:
少加:轉換算子中調用累加器,如果沒有行動算子的話,那么不會執行 多加:轉換算子中調用累加器,如果沒有行動算子的話,那么不會執行 一般情況下,累加器會放置在行動算子進
? ?def main(args: Array[String]): Unit = { ?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) ?val rdd = sc.makeRDD(List(1,2,3,4)) ?// 獲取系統累加器// Spark默認就提供了簡單數據聚合的累加器val sumAcc = sc.longAccumulator("sum") ?//sc.doubleAccumulator//sc.collectionAccumulator ?val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num}) ?// 獲取累加器的值// 少加:轉換算子中調用累加器,如果沒有行動算子的話,那么不會執行// 多加:轉換算子中調用累加器,如果沒有行動算子的話,那么不會執行// 一般情況下,累加器會放置在行動算子進行操作mapRDD.collect()mapRDD.collect()println(sumAcc.value) ?sc.stop() ?}
-
自定義累加器
分布式共享只寫變量
案例:
? ?def main(args: Array[String]): Unit = { ?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) ?val rdd = sc.makeRDD(List("hello", "spark", "hello")) ?// 累加器 : WordCount// 創建累加器對象val wcAcc = new MyAccumulator()// 向Spark進行注冊sc.register(wcAcc, "wordCountAcc") ?rdd.foreach(word => {// 數據的累加(使用累加器)wcAcc.add(word)}) ?// 獲取累加器累加的結果println(wcAcc.value) ?sc.stop() ?}/*自定義數據累加器:WordCount ?1. 繼承AccumulatorV2, 定義泛型IN : 累加器輸入的數據類型 StringOUT : 累加器返回的數據類型 mutable.Map[String, Long] ?2. 重寫方法(6)*/class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] { ?private var wcMap = mutable.Map[String, Long]() ?// 判斷是否初始狀態override def isZero: Boolean = {wcMap.isEmpty} ?override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()} ?override def reset(): Unit = {wcMap.clear()} ?// 獲取累加器需要計算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)} ?// Driver合并多個累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = { ?val map1 = this.wcMapval map2 = other.value ?map2.foreach{case ( word, count ) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}} ?// 累加器結果override def value: mutable.Map[String, Long] = {wcMap}}
-
廣播變量
實現原理:
廣播變量用來高效分發較大的對象。在多個并行操作中使用同一個變量,但是 Spark 會為每個任務
分別發送。
案例:
? ?def main(args: Array[String]): Unit = { ?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) ?val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3))) // ? ? ? val rdd2 = sc.makeRDD(List( // ? ? ? ? ? ("a", 4),("b", 5),("c", 6) // ? ? ? ))val map = mutable.Map(("a", 4),("b", 5),("c", 6)) ? ? ?// join會導致數據量幾何增長,并且會影響shuffle的性能,不推薦使用//val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)//joinRDD.collect().foreach(println)// (a, 1), ? (b, 2), ? (c, 3)// (a, (1,4)),(b, (2,5)),(c, (3,6))rdd1.map {case (w, c) => {val l: Int = map.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println) ? ? ?sc.stop() ?}
join會導致數據量幾何增長,并且會影響shuffle的性能,不推薦使用
Spark 中的廣播變量就可以將閉包的數據保存到Executor的內存中
Spark 中的廣播變量不能更改 : 分布式共享只讀變量
封裝廣播變量1
案例:
? ?def main(args: Array[String]): Unit = { ?val sparConf = new SparkConf().setMaster("local").setAppName("Acc")val sc = new SparkContext(sparConf) ?val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))val map = mutable.Map(("a", 4),("b", 5),("c", 6)) ?// 封裝廣播變量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map) ?rdd1.map {case (w, c) => {// 方法廣播變量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println) ? ? ?sc.stop() ?}