一、算子列表
編號 | 名稱 |
1 | map算子 |
2 | flatMap算子 |
3 | filter算子 |
4 | mapPartitions算子 |
5 | mapPartitionsWithIndex算子 |
6 | keys算子 |
7 | values算子 |
8 | mapValues算子 |
9 | flatMaplValues算子 |
10 | union算子 |
11 | reducedByKey算子 |
12 | combineByKey算子 |
13 | groupByKey算子 |
14 | foldByKey算子 |
15 | aggregateByKey算子 |
16 | ShuffledRDD算子 |
17 | distinct算子 |
18 | partitionBy算子 |
?二、代碼示例
package sparkCoreimport org.apache.hadoop.mapreduce.task.reduce.Shuffle
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext, TaskContext}/*** spark基本算子*/object basi_transform_02 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//1. map算子val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7),2)val map_rdd: RDD[Int] = rdd1.map(_ * 2)println("*****1. map算子************")map_rdd.foreach(println(_))//2.flatMap算子println("*****2.flatMap算子************")val arr: Array[String] = Array("Hive python spark","Java Hello Word")val rdd2: RDD[String] = sc.makeRDD(arr, 2)val flatMap_rdd: RDD[String] = rdd2.flatMap(_.split(" "))flatMap_rdd.foreach(println(_))//3.filter算子println("*****3.filter算子***********")val rdd3: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10))val filter_rdd :RDD[Int]= rdd3.filter(_ % 2 == 0)filter_rdd.foreach(println(_))//4. mapPartitions算子:將數據以分區的形式返回,進行map操作,一個分區對應一個迭代器// 應用場景: 比如在進行數據庫操作時,在操作數據之前,需要通過JDBC方式連接數據庫,如果使用map,那每條數據處理之前// 都需要連接一次數據庫,效率顯然很低.如果使用mapPartitions,則每個分區連接一次即可println("*****4. mapPartitions算子**********")val rdd4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10),2)val mapParition_rdd: RDD[Int] = rdd4.mapPartitions(iter => {print("模擬數據庫連接操作")iter.map(_ * 2)})mapParition_rdd.foreach(println(_))//5. mapPartitionsWithIndex算子,類似于mapPartitions,不過有兩個參數// 第一個參數是分區索引,第二個是對應的迭代器// 注意:函數返回的是一個迭代器println("*****5. mapPartitionsWithIndex算子**********")val rdd5: RDD[Int] = sc.parallelize(List(10, 20, 30, 40, 60),2)val mapPartitionWithIndex_Rdd: RDD[String] = rdd5.mapPartitionsWithIndex((index, it) => {it.map(e => s"partition:$index,val:$e")})mapPartitionWithIndex_Rdd.foreach(println(_))//6.keys算子: RDD中的數據是【對偶元組】類型,返回【對偶元組】的全部keyprintln("*****6.keys算子**********")val lst: List[(String, Int)] = List(("spark", 1), ("spark", 3), ("hive", 2),("Java", 1), ("Scala", 3), ("Python", 2))val rdd6: RDD[(String, Int)] = sc.parallelize(lst)val keysRdd: RDD[String] = rdd6.keyskeysRdd.foreach(println(_))//7.values: RDD中的數據是【對偶元組】類型,返回【對偶元組】的全部valueprintln("*****7.values算子**********")val values_RDD: RDD[Int] = rdd6.valuesvalues_RDD.foreach(println(_))//8.mapValues: RDD中的數據為對偶元組類型, 將value進行計算,然后與原Key進行組合返回(即返回的仍然是元組)println("*****8.mapValues算子**********")val lst2: List[(String, Int)] = List(("Hello", 1), ("world", 2),("I", 2), ("love", 3), ("you", 2))val rdd8: RDD[(String, Int)] = sc.parallelize(lst2, 2)val mapValues_rdd: RDD[(String, Int)] = rdd8.mapValues(_ * 10)mapValues_rdd.foreach(println(_))//9.flatMaplValues:RDD是對偶元組,將value應用傳入flatMap打平后,再與key組合println("*****9.flatMaplValues算子**********")// ("spark","1 2 3") => ("spark",1),("spark",2),("spark",3)val lst3: List[(String,String )] = List(("Hello", "1 2 3"), ("world", "4 5 6"),)val rdd9: RDD[(String, String)] = sc.parallelize(lst3)// 第一個_是指初始元組中的value;第二個_是指value拆分后的每一個值(轉換成整數)val flatMapValues: RDD[(String, Int)] = rdd9.flatMapValues(_.split(" ").map(_.toInt))flatMapValues.foreach(println(_))//10.union:將兩個類型一樣的RDD合并到一起,返回一個新的RDD,新的RDD分區數量是兩個RDD分區數量之和println("*****10.union算子**********")val union_rdd1 = sc.parallelize(List(1, 2, 3), 2)val union_rdd2 = sc.parallelize(List(4, 5, 6), 3)val union_rdd: RDD[Int] = union_rdd1.union(union_rdd2)union_rdd.foreach(println(_))//11.reducedByKey,在每個分區中進行局部分組聚合,然后將每個分區聚合的結果從上游拉到下游再進行全局分組聚合println("*****11.reducedByKey算子**********")val lst4: List[(String, Int)] = List(("spark", 1), ("spark", 1), ("hive", 3),("Python", 1), ("Java", 1), ("Scala", 3),("flink", 1), ("Mysql", 1), ("hive", 3))val rdd11: RDD[(String, Int)] = sc.parallelize(lst4, 2)val reduced_rdd: RDD[(String, Int)] = rdd11.reduceByKey(_ + _)reduced_rdd.foreach(println(_))//12.combineByKey:相比reducedByKey更底層的方法,后者分區內和分區之間相同Key對應的value值計算邏輯相同,但是前者可以分別定義不同的// 的計算邏輯.combineByKey 需要傳入三個函數作為參數:// 其中第一個函數:key在上游分區第一次出現時,對應的value該如何處理// 第二個函數:分區內相同key對應value的處理邏輯// 第三個函數: 分區間相同Key對應value的處理邏輯println("*****12.combineByKey算子**********")val f1 = (v:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f1 function invoked in stage: $stage,partiton:$partition")v}//分區內相同key對應的value使用乘積val f2 = (a:Int,b:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f2 function invoked in stage: $stage,partiton:$partition")a * b}//分區間相同key對應的value使用加法val f3 = (m:Int,n:Int) => {val stage = TaskContext.get().stageId()val partition = TaskContext.getPartitionId()println(s"f3 function invoked in stage: $stage,partiton:$partition")m + n}val rdd12: RDD[(String, Int)] = sc.parallelize(lst4,2)val combineByKey_rdd: RDD[(String, Int)] = rdd12.combineByKey(f1, f2, f3)combineByKey_rdd.foreach(println(_))//13.groupByKey:按key進行分組,返回的是(key,iter(value集合)println("*****13.groupByKey算子**********")val rdd13: RDD[(String, Int)] = sc.parallelize(lst4, 3)val groupByKey_rdd: RDD[(String, Iterable[Int])] = rdd13.groupByKey()groupByKey_rdd.foreach(println(_))//14.foldByKey:每個分區應??次初始值,先在每個進?局部聚合,然后再全局聚合(注意全局聚合的時候,初始值并不會被用到)// 局部聚合的邏輯與全局聚合的邏輯相同println("*****14.foldByKey算子**********")val lst5: List[(String, Int)] = List(("maple", 1), ("kelly", 1), ("Avery", 1),("maple", 1), ("kelly", 1), ("Avery", 1))val rdd14: RDD[(String, Int)] = sc.parallelize(lst5)val foldByKey_rdd: RDD[(String, Int)] = rdd14.foldByKey(1)(_ + _)foldByKey_rdd.foreach(println(_))//15.aggregateByKey:foldByKey,并且可以指定初始值,每個分區應??次初始值,傳?兩個函數,分別是局部聚合的計算邏輯// 和全局聚合的邏輯println("*****15.aggregateByKey算子**********")val rdd15: RDD[(String, Int)] = sc.parallelize(lst5)val aggregateByKey_rdd: RDD[(String, Int)] = rdd15.aggregateByKey(1)(_ + _,_ * _ )aggregateByKey_rdd.foreach(print(_))//16 ShuffledRDD:reduceByKey、combineByKey、aggregateByKey、foldByKey底層都是使?的ShuffledRDD,// 并且 mapSideCombine = trueprintln("*****16.ShuffledRDD算子**********")val rdd16: RDD[(String, Int)] = sc.parallelize(lst5,2)val partitioner = new HashPartitioner(rdd16.partitions.length)// 對rdd16按照指定分區器進行分區// String是rdd16中Key的數據類型,第一個Int是rdd16中value的數據類型,第二個Int是中間結果的數據類型(當然前提是傳入聚合器-里面包含計算邏輯// [可以據此知曉中間結果的數據類型])val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](rdd16,partitioner)// 設置一個聚合器: 指定rdd16的計算邏輯(包含三個函數,分別是分區內一個key對應value的處理邏輯;分區內相同key對應value計算邏輯// 和分區間相同Key對應value計算邏輯)val aggregator: Aggregator[String, Int, Int] = new Aggregator[String, Int, Int](f1, f2, f3)// 給shuffledRDD設置聚合器shuffledRDD.setAggregator(aggregator)shuffledRDD.setMapSideCombine(true) // 設置Map端聚合println(shuffledRDD.collect().toList)// 17.distinct算子:對RDD元素進行去重println("*****17.distinct算子**********")val lst6: Array[String] = Array("spark", "spark", "hive","Python", "Python", "Java")val rdd17: RDD[String] = sc.parallelize(lst6)val distinct_rdd: RDD[String] = rdd17.distinct()println(distinct_rdd.collect().toList)// 18.partitionBy: 按照指定的分區器進行分區(底層使用的是ShuffleRDD)println("***** 18.partitionBy算子**********")val rdd18: RDD[(String,Int)] = sc.parallelize(lst5,2)val partitioner2 = new HashPartitioner(rdd18.partitions.length)val partitioned_rdd: RDD[(String, Int)] = rdd18.partitionBy(partitioner2)println(partitioned_rdd.collect().toList)sc.stop()}
}



