Value類型:
9) distinct
??函數簽名
def distinct()(implicit?ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit?ord: Ordering[T] = null): RDD[T]
??函數說明
將數據集中重復的數據去重
?
val?dataRDD?=?sparkContext.makeRDD(List(
?1,2,3,4,1,2
))
val?dataRDD1 =?dataRDD.distinct()
val?dataRDD2 =?dataRDD.distinct(2)
?
10) coalesce
??函數簽名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] =?Option.empty)?
(implicit?ord: Ordering[T] = null)
: RDD[T]
??函數說明
根據數據量縮減分區,用于大數據集過濾后,提高小數據集的執行效率
當?spark 程序中,存在過多的小任務的時候,可以通過 coalesce 方法,收縮合并分區,減少分區的個數,減小任務調度成本
val?dataRDD?=?sparkContext.makeRDD(List(
?1,2,3,4,1,2
),6)
val?dataRDD1 =?dataRDD.coalesce(2)
?
11) repartition
??函數簽名
def repartition(numPartitions: Int)(implicit?ord: Ordering[T] = null): RDD[T]
??函數說明
該操作內部其實執行的是?coalesce 操作,參數 shuffle 的默認值為 true。無論是將分區數多的RDD 轉換為分區數少的 RDD,還是將分區數少的 RDD 轉換為分區數多的 RDD,repartition操作都可以完成,因為無論如何都會經?shuffle 過程。
val?dataRDD?=?sparkContext.makeRDD(List(
?1,2,3,4,1,2
),2)
val?dataRDD1 =?dataRDD.repartition(4)
?
12)?sortBy
??函數簽名
def?sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int =?this.partitions.length)?
(implicit?ord: Ordering[K],?ctag:?ClassTag[K]): RDD[T]
??函數說明
該操作用于排序數據。在排序之前,可以將數據通過f 函數進行處理,之后按照 f 函數處理的結果進行排序,默認為升序排列。排序后新產生的?RDD 的分區數與原 RDD 的分區數一致。中間存在?shuffle 的過程。
val?dataRDD?=?sparkContext.makeRDD(List(
?1,2,3,4,1,2
),2)
val?dataRDD1 =?dataRDD.sortBy(num=>num,?false,?4)
val?dataRDD2 =?dataRDD.sortBy(num=>num,?true,?4)
?
?
雙Value類型:
13) intersection
??函數簽名
def intersection(other: RDD[T]): RDD[T]
??函數說明
對源?RDD 和參數 RDD 求交集后返回一個新的 RDD
val?dataRDD1 =?sparkContext.makeRDD(List(1,2,3,4))
val?dataRDD2 =?sparkContext.makeRDD(List(3,4,5,6))
val?dataRDD?= dataRDD1.intersection(dataRDD2)
?
14) union
??函數簽名
def union(other: RDD[T]): RDD[T]
??函數說明
對源?RDD 和參數 RDD 求并集后返回一個新的 RDD(重復數據不會去重)
val?dataRDD1 =?sparkContext.makeRDD(List(1,2,3,4))
val?dataRDD2 =?sparkContext.makeRDD(List(3,4,5,6))
val?dataRDD?= dataRDD1.union(dataRDD2)
?
15) subtract
??函數簽名
def subtract(other: RDD[T]): RDD[T]
??函數說明
以源?RDD 元素為主,去除兩個 RDD 中重復元素,將源RDD的其他元素保留下來。(求差集)
val?dataRDD1 =?sparkContext.makeRDD(List(1,2,3,4))
val?dataRDD2 =?sparkContext.makeRDD(List(3,4,5,6))
val?dataRDD?= dataRDD1.subtract(dataRDD2)
?
16) zip
??函數簽名
def zip[U:?ClassTag](other: RDD[U]): RDD[(T, U)]
??函數說明
將兩個?RDD 中的元素,以鍵值對的形式進行合并。其中,鍵值對中的 Key 為第 1?個?RDD
中的元素,Value 為第 2?個?RDD 中的相同位置的元素。
val?dataRDD1 =?sparkContext.makeRDD(List("a","b","c","d"))
val?dataRDD2 =?sparkContext.makeRDD(List(1,2,3,4))
val?dataRDD?= dataRDD1.zip(dataRDD2)
flatMap
??函數簽名
def?flatMap[U:?ClassTag](f: T =>?TraversableOnce[U]): RDD[U]
??函數說明
將處理的數據進行扁平化后再進行映射處理,所以算子也稱之為扁平映射。
val?dataRDD?=?sparkContext.makeRDD(List(
?List(1,2),List(3,4)
),1)
val?dataRDD1?=?dataRDD.flatMap(
?list => list
)
?
map和flatMap的區別:
?
map會將每一條輸入數據映射為一個新對象。
?
flatMap包含兩個操作:會將每一個輸入對象輸入映射為一個新集合,然后把這些新集合連成一個大集合。
partitionBy
??函數簽名
def?partitionBy(partitioner: Partitioner): RDD[(K, V)]
??函數說明
將數據按照指定?Partitioner 重新進行分區。Spark 默認的分區器是?HashPartitioner
val?rdd: RDD[(Int,?String)] =
?sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
val?rdd2: RDD[(Int,?String)] =
?rdd.partitionBy(new?HashPartitioner(2))
函數說明
將數據源的數據根據?key 對 value 進行分組
val?dataRDD1 =
?sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val?dataRDD2 = dataRDD1.groupByKey()
val?dataRDD3 = dataRDD1.groupByKey(2)
val?dataRDD4 = dataRDD1.groupByKey(new?HashPartitioner(2))
可以將數據按照相同的?Key 對 Value 進行聚合
val?dataRDD1 =?sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val?dataRDD2 = dataRDD1.reduceByKey(_+_)
val?dataRDD3 = dataRDD1.reduceByKey(_+_,?2)
將數據根據不同的規則進行分區內計算和分區間計算val?dataRDD1 =
?sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val?dataRDD2 =
?dataRDD1.aggregateByKey(0)(_+_,_+_)
val?dataRDD1 =
?sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))
val?dataRDD2 = dataRDD1.foldByKey(0)(_+_)
現有數據 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),求每個key的總值及每個key對應鍵值對的個數
val?list:?List[(String, Int)] =?List(("a",?88), ("b",?95), ("a",?91), ("b",?93),("a",?95), ("b",?98))
val?input: RDD[(String, Int)] =?sc.makeRDD(list,?2)
val?combineRDD: RDD[(String, (Int, Int))] =?input.combineByKey(
?(_,?1),?//a=>(a,1)
?(acc: (Int, Int), v) => (acc._1 + v, acc._2 +?1),?//acc_1為數據源的value,acc_2為key出現的次數,二者進行分區內部的計算
?(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)?//將分區內部計算的結果進行分區間的匯總計算,得到每個key的總值以及每個key出現的次數
)
在一個(K,V)的 RDD 上調用,K 必須實現 Ordered 接口(特質),返回一個按照 key 進行排序
val?dataRDD1 =?sc.makeRDD(List(("a",1),("b",2),("c",3)))
val?sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val?sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)
?
?
?