Spark核心之02:常用算子詳解

1、RDD操作詳解

# 啟動spark-shell
spark-shell --master local[2] 

1.1 基本轉換

1) map

map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD。 任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

舉例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每個元素都乘以2來產生一個新的RDD。

2) filter

filter 是對RDD中的每個元素都執行一個指定的函數來過濾產生一個新的RDD。 任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

val rdd = sc.parallelize(List(1,2,3,4,5,6))  
val filterRdd = rdd.filter(_ > 5)
filterRdd.collect() //返回所有大于5的數據的一個Array, Array(6,8,10,12)

3) flatMap

與map類似,區別是原RDD中的元素經map處理后只能生成一個元素,而原RDD中的元素經flatmap處理后可生成多個元素來構建新RDD。 舉例:對原RDD中的每個元素x產生y個元素(從1到y,y為元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

??4) mapPartitions

mapPartitions是map的一個變種。map的輸入函數是應用于RDD中每個元素,而mapPartitions的輸入函數是應用于每個分區,也就是把每個分區中的內容作為整體來處理的。 它的函數定義為:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]

f即為輸入函數,它處理每個分區里面的內容。每個分區中的內容將以Iterator[T]傳遞給輸入函數ff的輸出結果是Iterator[U]。最終的RDD由所有分區經過輸入函數處理后的結果合并起來的。

舉例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) {val cur = iter.nextres.::=(pre, cur)pre = cur  } res.iterator}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函數myfunc是把分區中一個元素和它的下一個元素組成一個Tuple。因為分區中最后一個元素沒有下一個元素了,所以(3,4)和(6,7)不在結果中。 mapPartitions還有些變種,比如mapPartitionsWithContext,它能把處理過程中的一些狀態信息傳遞給用戶指定的輸入函數。還有mapPartitionsWithIndex,它能把分區的index傳遞給用戶指定的輸入函數。

5) mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

函數作用同mapPartitions,不過提供了兩個參數,第一個參數為分區的索引。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分區
scala> var rdd2 = rdd1.mapPartitionsWithIndex{|     (x,iter) => {|       var result = List[String]()|       var i = 0|       while(iter.hasNext){|        i += iter.next()|       }|       result.::(x + "|" + i).iterator|     }| }//rdd2將rdd1中每個分區的數字累加,并在每個分區的累加結果前面加了分區索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)  //p-0(1,2) p-1(3,4,5)

🈂?好像沒用了🈂?6) mapWith

mapWith是map的另外一個變種,map只需要一個輸入函數,而mapWith有兩個輸入函數。它的定義如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一個函數constructA是把RDD的partition index(index從0開始)作為輸入,輸出為新類型A;

第二個函數f是把二元組(T, A)作為輸入(其中T為原RDD中的元素,A為第一個函數的輸出),輸出類型為U。

舉例:把partition index 乘以10加2,作為新的RDD的元素。

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
x.mapWith(a => a * 10)((b, a) => (b,a + 2)).collect 

結果:

(1,2)

(2,2)

(3,2)

(4,12)

(5,12)

(6,12)

(7,22)

(8,22)

(9,22)

(10,22)

🈂?好像沒用了🈂?7) flatMapWith

flatMapWith與mapWith很類似,都是接收兩個函數,一個函數把partitionIndex作為輸入,輸出是一個新類型A;另外一個函數是以二元組(T,A)作為輸入,輸出為一個序列,這些序列里面的元素組成了新的RDD。它的定義如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

舉例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

??8) coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

該函數用于將RDD進行重分區,使用HashPartitioner。

第一個參數為重分區的數目,第二個為是否進行shuffle,默認為false;

以下面的例子來看:

scala> var data = sc.parallelize(1 to 12, 3) 
scala> data.collect 
scala> data.partitions.size 
scala> var rdd1 = data.coalesce(1) 
scala> rdd1.partitions.size 
scala> var rdd1 = data.coalesce(4) 
scala> rdd1.partitions.size
res2: Int = 1   //如果重分區的數目大于原來的分區數,那么必須指定shuffle參數為true,//否則,分區數不便
scala> var rdd1 = data.coalesce(4,true) 
scala> rdd1.partitions.size
res3: Int = 4

??9) repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

該函數其實就是coalesce函數第二個參數為true的實現

scala> var data = sc.parallelize(1 to 12, 3) 
scala> data.collect 
scala> data.partitions.size 
scala> var rdd1 = data. repartition(1) 
scala> rdd1.partitions.size 
scala> var rdd1 = data. repartition(4) 
scala> rdd1.partitions.size
res3: Int = 4

10) randomSplit

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

該函數根據weights權重,將一個RDD切分成多個RDD。

該權重參數為一個Double數組

第二個參數為random的種子,基本可忽略。

scala> var rdd = sc.makeRDD(1 to 12,12)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21scala> rdd.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)  scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23, 
MapPartitionsRDD[18] at randomSplit at :23, 
MapPartitionsRDD[19] at randomSplit at :23, 
MapPartitionsRDD[20] at randomSplit at :23)//這里注意:randomSplit的結果是一個RDD數組
scala> splitRDD.size
res8: Int = 4
//由于randomSplit的第一個參數weights中傳入的值有4個,因此,就會切分成4個RDD,
//把原來的rdd按照權重0.5, 0.1, 0.2, 0.2,隨機劃分到這4個RDD中,權重高的RDD,劃分到//的幾率就大一些。
//注意,權重的總和加起來為1,否則會不正常 
scala> splitRDD(0).collect
res10: Array[Int] = Array(1, 4)scala> splitRDD(1).collect
res11: Array[Int] = Array(3)                                                    scala> splitRDD(2).collect
res12: Array[Int] = Array(5, 9)scala> splitRDD(3).collect
res13: Array[Int] = Array(2, 6, 7, 8, 10)

11) glom

def glom(): RDD[Array[T]]

該函數是將RDD中每一個分區中類型為T的元素轉換成Array[T],這樣每一個分區就只有一個數組元素

scala> var rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21
scala> rdd.partitions.size
res33: Int = 3  //該RDD有3個分區
scala> rdd.glom().collect
res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
//glom將每個分區中的元素放到一個數組中,這樣,結果就變成了3個數組

??12) union并集

val rdd1 = sc.parallelize(List(5, 6, 4, 3))val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求并集val rdd3 = rdd1.union(rdd2)rdd3.collect

??13) distinct去重

val rdd1 = sc.parallelize(List(5, 6, 4, 3))val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求并集val rdd3 = rdd1.union(rdd2)//去重輸出rdd3.distinct.collect

??14) intersection交集

val rdd1 = sc.parallelize(List(5, 6, 4, 3))val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求交集val rdd4 = rdd1.intersection(rdd2) rdd4.collect

??15) subtract

def subtract(other: RDD[T]): RDD[T]def subtract(other: RDD[T], numPartitions: Int): RDD[T]def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

該函數返回在RDD中出現,并且不在otherRDD中出現的元素,不去重。

val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3))val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求差集val rdd4 = rdd1.subtract(rdd2)rdd4.collect

16) subtractByKey

def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

subtractByKey和基本轉換操作中的subtract類似,只不過這里是針對K的,返回在主RDD中出現,并且不在otherRDD中出現的元素。

參數numPartitions用于指定結果的分區數

參數partitioner用于指定分區函數

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) scala> rdd1.subtractByKey(rdd2).collectres13: Array[(String, String)] = Array((B,2))

17) groupbyKey

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求并集
val rdd4 = rdd1 union rdd2
//按key進行分組
val rdd5 = rdd4.groupByKe
rdd5.collect

18) reduceByKey

顧名思義,reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行reduce,因此,Key相同的多個元素的值被reduce為一個值,然后與原RDD中的Key組成一個新的KV對。

舉例:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求并集
val rdd4 = rdd1 union rdd2
//按key進行分組
val rdd6 = rdd4.reduceByKey(_ + _)
rdd6.collect()

19) sortByKey

將List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,并按名稱排序

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))val rdd3 = rdd1.union(rdd2)//按key進行聚合val rdd4 = rdd3.reduceByKey(_ + _)//false降序val rdd5 = rdd4.sortByKey(false)rdd5.collect

20) sortBy

將List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,并按數值排序

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))val rdd3 = rdd1.union(rdd2)//按key進行聚合val rdd4 = rdd3.reduceByKey(_ + _)//false降序val rdd5 = rdd4.sortBy(_._2, false)rdd5.collect

21) zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

zip函數用于將兩個RDD組合成Key/Value形式的RDD,這里默認兩個RDD的partition數量以及元素數量都相同,否則會拋出異常。

scala> var rdd1 = sc.makeRDD(1 to 5,2)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21scala> rdd1.zip(rdd2).collectres0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))      scala> rdd2.zip(rdd1).collectres1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21scala> rdd1.zip(rdd3).collectjava.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions//如果兩個RDD分區數不同,則拋出異常

22) zipPartitions

zipPartitions函數將多個RDD按照partition組合成為新的RDD,該函數需要組合的RDD具有相同的分區數,但對于每個分區內的元素數量沒有要求。

該函數有好幾種實現,可分為三類:

1. 參數是一個RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

這兩個區別就是參數preservesPartitioning,是否保留父RDD的partitioner分區信息

映射方法f參數為兩個RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21//rdd1兩個分區中元素分布:
scala> rdd1.mapPartitionsWithIndex{|     (x,iter) => {|      var result = List[String]()|       while(iter.hasNext){|        result ::= ("part_" + x + "|" + iter.next())|       }|       result.iterator|       |     }|    }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)//rdd2兩個分區中元素分布
scala> rdd2.mapPartitionsWithIndex{|     (x,iter) => {|      var result = List[String]()|       while(iter.hasNext){|        result ::= ("part_" + x + "|" + iter.next())|       }|       result.iterator|       |     }|    }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){|    (rdd1Iter,rdd2Iter) => {|     var result = List[String]()|     while(rdd1Iter.hasNext && rdd2Iter.hasNext) {|      result::=(rdd1Iter.next() + "_" + rdd2Iter.next())|     }|     result.iterator|    }|   }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
2. 參數是兩個RDD
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

用法同上面,只不過該函數參數為兩個RDD,映射方法f輸入參數為兩個RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21//rdd3中個分區元素分布
scala> rdd3.mapPartitionsWithIndex{|     (x,iter) => {|      var result = List[String]()|       while(iter.hasNext){|        result ::= ("part_" + x + "|" + iter.next())|       }|       result.iterator|       |     }|    }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)//三個RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){|    (rdd1Iter,rdd2Iter,rdd3Iter) => {|     var result = List[String]()|     while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {|      result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())|     }|     result.iterator|    }|   }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)
3. 參數是三個RDD
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

用法同上面,只不過這里又多了個一個RDD而已。

23) zipWithIndex

def zipWithIndex(): RDD[(T, Long)]

該函數將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵/值對。

scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21scala> rdd2.zipWithIndex().collectres27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

24) zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]

該函數將RDD中元素和一個唯一ID組合成鍵/值對,該唯一ID生成算法如下:

每個分區中第一個元素的唯一ID值為:該分區索引號,

每個分區中第N個元素的唯一ID值為:(前一個元素的唯一ID值) + (該RDD總的分區數)

看下面的例子:

scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21//rdd1有兩個分區,scala> rdd1.zipWithUniqueId().collectres32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))//總分區數為2//第一個分區第一個元素ID為0,第二個分區第一個元素ID為1//第一個分區第二個元素ID為0+2=2,第一個分區第三個元素ID為2+2=4//第二個分區第二個元素ID為1+2=3,第二個分區第三個元素ID為3+2=5

1.2 鍵值轉換

??25) partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

該函數根據partitioner函數生成新的ShuffleRDD,將原RDD重新分區。

scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21scala> rdd1.partitions.size
res20: Int = 2//查看rdd1中每個分區的元素
scala> rdd1.mapPartitionsWithIndex{|     (partIdx,iter) => {|      var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()|       while(iter.hasNext){|        var part_name = "part_" + partIdx;|        var elem = iter.next()|        if(part_map.contains(part_name)) {|         var elems = part_map(part_name)|         elems ::= elem|         part_map(part_name) = elems|        } else {|         part_map(part_name) = List[(Int,String)]{elem}|        }|       }|       part_map.iterator|       |     }|    }.collect
res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))//(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中//使用partitionBy重分區
scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23scala> rdd2.partitions.size
res23: Int = 2//查看rdd2中每個分區的元素
scala> rdd2.mapPartitionsWithIndex{|     (partIdx,iter) => {|      var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()|       while(iter.hasNext){|        var part_name = "part_" + partIdx;|        var elem = iter.next()|        if(part_map.contains(part_name)) {|         var elems = part_map(part_name)|         elems ::= elem|         part_map(part_name) = elems|        } else {|         part_map(part_name) = List[(Int,String)]{elem}|        }|       }|       part_map.iterator|     }|    }.collect
res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))//(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中

26) mapValues

mapValues顧名思義就是輸入函數應用于RDD中Kev-Value的Value,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函數只適用于元素為KV對的RDD。

舉例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)scala> val b = a.map(x => (x.length, x))scala> b.mapValues("x" + _ + "x").collectres5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

27) flatMapValues

flatMapValues類似于mapValues,不同的在于flatMapValues應用于元素為KV對的RDD中Value。每個一元素的Value被輸入函數映射為一系列的值,然后這些值再與原RDD中的Key組成一系列新的KV對。

舉例

val a = sc.parallelize(List((1, 2), (3, 4), (5, 6)))val b = a.flatMapValues(x => 1.to(x))b.collect.foreach(println)

28) combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

該函數用于將RDD[K,V]轉換成RDD[K,C],這里的V類型和C類型可以相同也可以不同。

其中的參數:

createCombiner:組合器函數,用于將V類型轉換成C類型,輸入參數為RDD[K,V]中的V,輸出為C ,分區內相同的key做一次

mergeValue:合并值函數,將一個C類型和一個V類型值合并成一個C類型,輸入參數為(C,V),輸出為C,分區內相同的key循環做

mergeCombiners:分區合并組合器函數,用于將兩個C類型值合并成一個C類型,輸入參數為(C,C),輸出為C,分區之間循環做

numPartitions:結果RDD分區數,默認保持原有的分區數

partitioner:分區函數,默認為HashPartitioner

mapSideCombine:是否需要在Map端進行combine操作,類似于MapReduce中的combine,默認為true

看下面例子:

scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21 scala> rdd1.combineByKey(|    (v : Int) => v + "_",  |    (c : String, v : Int) => c + "@" + v,  |    (c1 : String, c2 : String) => c1 + "$" + c2|   ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

其中三個映射函數分別為:

createCombiner: (V) => C

(v : Int) => v + “” //在每一個V值后面加上字符,返回C類型(String)

mergeValue: (C, V) => C

(c : String, v : Int) => c + “@” + v //合并C類型和V類型,中間加字符@,返回C(String)

mergeCombiners: (C, C) => C

(c1 : String, c2 : String) => c1 + “ ” + c 2 / / 合并 C 類型和 C 類型,中間加 ” + c2 //合并C類型和C類型,中間加 +c2//合并C類型和C類型,中間加,返回C(String)

其他參數為默認值。

最終,將RDD[String,Int]轉換為RDD[String,String]。

再看例子:

rdd1.combineByKey((v : Int) => List(v),(c : List[Int], v : Int) => v :: c,(c1 : List[Int], c2 : List[Int]) => c1 ::: c2
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

最終將RDD[String,Int]轉換為RDD[String,List[Int]]。

29) foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] 

該函數用于RDD[K,V]根據K將V做折疊、合并處理,其中的參數zeroValue表示先根據映射函數將zeroValue應用于V,進行初始化V,再將映射函數應用于初始化后的V.

例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))scala> rdd1.foldByKey(0)(_+_).collectres75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) //將rdd1中每個key對應的V進行累加,注意zeroValue=0,需要先初始化V,映射函數為+操//作,比如("A",0), ("A",2),先將zeroValue應用于每個V,得到:("A",0+0), ("A",2+0),即://("A",0), ("A",2),再將映射函數應用于初始化后的V,最后得到(A,0+2),即(A,2)

再看:

scala> rdd1.foldByKey(2)(_+_).collectres76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))//先將zeroValue=2應用于每個V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再將映射函//數應用于初始化后的V,最后得到:(A,2+4),即:(A,6)

再看乘法操作:

scala> rdd1.foldByKey(0)(__).collectres77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))//先將zeroValue=0應用于每個V,注意,這次映射函數為乘法,得到:("A",00), ("A",20),//即:("A",0), ("A",0),再將映射函//數應用于初始化后的V,最后得到:(A,00),即:(A,0)//其他K也一樣,最終都得到了V=0scala> rdd1.foldByKey(1)(__).collectres78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))//映射函數為乘法時,需要將zeroValue設為1,才能得到我們想要的結果。

在使用foldByKey算子時候,要特別注意映射函數及zeroValue的取值。

30) reduceByKeyLocally

def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

該函數將RDD[K,V]中每個K對應的V值根據映射函數來運算,運算結果映射到一個Map[K,V]中,而不是RDD[K,V]。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21scala> rdd1.reduceByKeyLocally((x,y) => x + y)res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)

31) cogroup和groupByKey的區別

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))//cogroupval rdd3 = rdd1.cogroup(rdd2)//groupbykeyval rdd4 = rdd1.union(rdd2).groupByKey//注意cogroup與groupByKey的區別rdd3.foreach(println)rdd4.foreach(println)

??32) join

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))//求jionval rdd3 = rdd1.join(rdd2)rdd3.collect

33) leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

leftOuterJoin類似于SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。

參數numPartitions用于指定結果的分區數

參數partitioner用于指定分區函數

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)scala> rdd1.leftOuterJoin(rdd2).collectres11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))

34) rightOuterJoin

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] 

rightOuterJoin類似于SQL中的有外關聯right outer join,返回結果以參數中的RDD為主,關聯不上的記錄為空。只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。

參數numPartitions用于指定結果的分區數

參數partitioner用于指定分區函數

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)scala> rdd1.rightOuterJoin(rdd2).collectres12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))

1.3 Action操作

35) first

def first(): T

first返回RDD中的第一個元素,不排序。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21scala> rdd1.firstres14: (String, String) = (A,1)scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21scala> rdd1.firstres8: Int = 10

36) count

def count(): Long

count返回RDD中的元素數量。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21scala> rdd1.countres15: Long = 3

36) reduce

def reduce(f: (T, T) ? T): T

根據映射函數f,對RDD中的元素進行二元計算,返回計算結果。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21scala> rdd1.reduce(_ + _)
res18: Int = 55scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21scala> rdd2.reduce((x,y) => {|    (x._1 + y._1,x._2 + y._2)|   })
res21: (String, Int) = (CBBAA,6)

37) collect

def collect(): Array[T]

collect用于將一個RDD轉換成數組。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

38) take

def take(num: Int): Array[T]

take用于獲取RDD中從0到num-1下標的元素,不排序。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21scala> rdd1.take(1)res0: Array[Int] = Array(10)                           scala> rdd1.take(2)res1: Array[Int] = Array(10, 4)

39) top

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

top函數用于從RDD中,按照默認(降序)或者指定的排序規則,返回前num個元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21scala> rdd1.top(1)
res2: Array[Int] = Array(12)scala> rdd1.top(2)
res3: Array[Int] = Array(12, 10)//指定排序規則
scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499efscala> rdd1.top(1)
res4: Array[Int] = Array(2)scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)

40) takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

takeOrdered和top類似,只不過以和top相反的順序返回元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21scala> rdd1.top(1)
res4: Array[Int] = Array(12)scala> rdd1.top(2)
res5: Array[Int] = Array(12, 10)scala> rdd1.takeOrdered(1)
res6: Array[Int] = Array(2)scala> rdd1.takeOrdered(2)
res7: Array[Int] = Array(2, 3)

??41) aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ? U, combOp: (U, U) ? U)(implicit arg0: ClassTag[U]): U

aggregate用戶聚合RDD中的元素,先使用seqOp將RDD中每個分區中的T類型元素聚合成U類型,再使用combOp將之前每個分區聚合后的U類型聚合成U類型,特別注意seqOp和combOp都會使用zeroValue的值,zeroValue的類型為U。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.mapPartitionsWithIndex{(partIdx,iter) => {var part_map = scala.collection.mutable.Map[String,List[Int]]()while(iter.hasNext){var part_name = "part_" + partIdx;var elem = iter.next()if(part_map.contains(part_name)) {var elems = part_map(part_name)elems ::= elempart_map(part_name) = elems} else {part_map(part_name) = List[Int]{elem}}}part_map.iterator}}.collect
res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))##第一個分區中包含5,4,3,2,1
##第二個分區中包含10,9,8,7,6scala> rdd1.aggregate(1)(|      {(x : Int,y : Int) => x + y}, |      {(a : Int,b : Int) => a + b}|   )
res17: Int = 58

結果為什么是58,看下面的計算過程:

##先在每個分區中迭代執行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16##part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41##再將兩個分區的結果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

再比如:

scala> rdd1.aggregate(2)(|      {(x : Int,y : Int) => x + y}, |      {(a : Int,b : Int) => a  b}|   )
res18: Int = 1428
##這次zeroValue=2
##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##最后:zeroValuepart_0part_1 = 2  17  42 = 1428

因此,zeroValue即確定了U的類型,也會對結果產生至關重要的影響,使用時候要特別注意。

42) fold

def fold(zeroValue: T)(op: (T, T) ? T): T

fold是aggregate的簡化,將aggregate中的seqOp和combOp使用同一個函數op。

var rdd1 = sc.makeRDD(1 to 10, 2)
scala> rdd1.fold(1)(|    (x,y) => x + y   |   )
res19: Int = 58
##結果同上面使用aggregate的第一個例子一樣,即:
scala> rdd1.aggregate(1)(|      {(x,y) => x + y}, |      {(a,b) => a + b}|   )
res20: Int = 58

43) lookup

def lookup(key: K): Seq[V]

lookup用于(K,V)類型的RDD,指定K值,返回RDD中該K對應的所有V值。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)

44) countByKey

def countByKey(): Map[K, Long]

countByKey用于統計RDD[K,V]中每個K的數量。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)

45) foreach

def foreach(f: (T) ? Unit): Unit

foreach用于遍歷RDD,將函數f應用于每一個元素。

但要注意,如果對RDD執行foreach,只會在Executor端有效,而并不是Driver端。

比如:rdd.foreach(println),只會在Executor的stdout中打印出來,Driver端是看不到的。

我在Spark1.4中是這樣,不知道是否真如此。

這時候,使用accumulator共享變量與foreach結合,倒是個不錯的選擇。

scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21scala> rdd1.foreach(x => cnt += x)scala> cnt.value
res51: Int = 55scala> rdd1.collect.foreach(println) 

??46) foreachPartition

def foreachPartition(f: (Iterator[T]) ? Unit): Unit

foreachPartition和foreach類似,只不過是對每一個分區使用f。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0scala>   rdd1.foreachPartition { x => {|    allsize += x.size|   }}
scala> println(allsize.value)
#10

??47) sortBy

def sortBy[K](f: (T) ? K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

sortBy根據給定的排序k函數將RDD中的元素進行排序。

scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)scala> rdd1.sortBy(x => x).collectres1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默認升序scala> rdd1.sortBy(x => x,false).collectres2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  //降序//RDD[K,V]類型scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))scala> rdd1.sortBy(x => x).collectres3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))//按照V進行降序排序scala> rdd1.sortBy(x => x._2,false).collectres4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))

48) saveAsTextFile

def saveAsTextFile(path: String): Unitdef saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile用于將RDD以文本文件的格式存儲到文件系統中。

codec參數可以指定壓縮的類名。

var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFShadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r--r--  2 lxw1234 supergroup     0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--  2 lxw1234 supergroup     21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000hadoop fs -cat /tmp/lxw1234.com/part-00000

注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)將文件保存到本地文件系統,那么只會保存在Executor所在機器的本地目錄。

## 指定壓縮格式保存rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec])hadoop fs -ls /tmp/lxw1234.com
-rw-r--r--  2 lxw1234 supergroup   0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--  2 lxw1234 supergroup   71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzohadoop fs -text /tmp/lxw1234.com/part-00000.lzo

49) saveAsSequenceFile

saveAsSequenceFile用于將RDD以SequenceFile的文件格式保存到HDFS上。

用法同saveAsTextFile。

50) saveAsObjectFile

def saveAsObjectFile(path: String): Unit

saveAsObjectFile用于將RDD中的元素序列化成對象,存儲到文件中。

對于HDFS,默認采用SequenceFile保存。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
  1. saveAsHadoopFile
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unitdef saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf =, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

saveAsHadoopFile是將RDD存儲在HDFS上的文件中,支持老版本Hadoop API。

可以指定outputKeyClass、outputValueClass以及壓縮格式。

每個分區輸出一個文件。

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritablerdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],classOf[com.hadoop.compression.lzo.LzopCodec])

52) saveAsHadoopDataset

def saveAsHadoopDataset(conf: JobConf): Unit

saveAsHadoopDataset用于將RDD保存到除了HDFS的其他存儲中,比如HBase。

在JobConf中,通常需要關注或者設置五個參數:

文件的保存路徑、key值的class類型、value值的class類型、RDD的輸出格式(OutputFormat)、以及壓縮相關的參數。

##使用saveAsHadoopDataset將RDD保存到HDFS中import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConfvar rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/lxw1234/")
rdd1.saveAsHadoopDataset(jobConf)

結果:

hadoop fs -cat /tmp/lxw1234/part-00000
A    2
A    1hadoop fs -cat /tmp/lxw1234/part-00001
B    6
B    3
B    7

##保存數據到HBASE

##HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritablevar conf = HBaseConfiguration.create()var jobConf = new JobConf(conf)jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")jobConf.set("zookeeper.znode.parent","/hbase")jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")jobConf.setOutputFormat(classOf[TableOutputFormat])var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))rdd1.map(x => {var put = new Put(Bytes.toBytes(x._1))put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))(new ImmutableBytesWritable,put)}).saveAsHadoopDataset(jobConf)
##結果:
hbase(main):005:0> scan 'lxw1234'
ROW   COLUMN+CELL                                                 A    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02                        B    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06                        C    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07                        
3 row(s) in 0.0550 seconds

注意:保存到HBase,運行時候需要在SPARK_CLASSPATH中加入HBase相關的jar包。

53) saveAsNewAPIHadoopFile

def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unitdef saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit

saveAsNewAPIHadoopFile用于將RDD數據保存到HDFS上,使用新版本Hadoop API。

用法基本同saveAsHadoopFile。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritablevar rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

54) saveAsNewAPIHadoopDataset

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

作用同saveAsHadoopDataset,只不過采用新版本Hadoop API。

以寫入HBase為例:

HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

完整的Spark應用程序:

package com.lxw1234.testimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Putobject Test {def main(args : Array[String]) {val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")val sc = new SparkContext(sparkConf);var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")var job = new Job(sc.hadoopConfiguration)job.setOutputKeyClass(classOf[ImmutableBytesWritable])job.setOutputValueClass(classOf[Result])job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])rdd1.map(x => {var put = new Put(Bytes.toBytes(x._1))put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))(new ImmutableBytesWritable,put)}   ).saveAsNewAPIHadoopDataset(job.getConfiguration)sc.stop()  }
}

注意:保存到HBase,運行時候需要在SPARK_CLASSPATH中加入HBase相關的jar包。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72502.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72502.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72502.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL 8.X 報錯處理

1.重新加載配置 reload the configuration mysql> ALTER INSTANCE RELOAD KEYRING; ERROR 1227 (42000): Access denied; you need (at least one of) the ENCRYPTION_KEY_ADMIN privilege(s) for this operation 提示需要ENCRYPTION_KEY_ADMIN權限 重新授權 GRANT ENCR…

SQL注入練習場:PHPStudy+SQLI-LABS靶場搭建教程(零基礎友好版)

注意&#xff1a;文中涉及演示均為模擬測試&#xff0c;切勿用于真實環境&#xff0c;任何未授權測試都是違法行為&#xff01; 一、環境準備 下載PHPStudy 官網下載地址&#xff1a;https://www.xp.cn/php-study&#xff08;選擇Windows版&#xff09; 安裝時建議選擇自定…

現今大語言模型性能(準確率)比較

現今大語言模型性能(準確率)比較 表頭信息:表的標題為“大語言模型性能比較結果”(英文:Table 1: Large Language Model Performance Comparison Results),表明該表是用于對比不同大語言模型的性能。列信息: 模型:列出參與比較的不同大語言模型名稱,包括LLAMA3(70B)…

Docker創建自定義網橋并指定網段

前言 docker0是Docker默認網絡的核心組件, 通過虛擬網橋和NAT技術, 實現了容器間的通信以及容器與外部網絡的交互。然而, docker0網段是固定的(通常是172.17.0.0/16), 為了更靈活地管理容器網絡&#xff0c;Docker支持創建自定義網橋&#xff0c;允許用戶指定網段。 例如, 在…

【向量數據庫Weaviate】 和Elasticsearch的區別

Weaviate 和 Elasticsearch 是兩種不同類型的數據庫&#xff0c;設計目標和應用場景有顯著差異。以下是它們的核心區別和適用場景的詳細對比&#xff1a; 1. 設計目標與核心能力 維度WeaviateElasticsearch核心能力向量數據庫 圖數據庫&#xff08;語義搜索優先&#xff09;全…

藍橋杯每日一題:第一周周四哞叫時間

藍橋杯每日一題&#xff1a;第一周周四哞叫時間 疑惑&#xff1a;如何把復雜度控制在Q&#xff08;n&#xff09;&#xff0c;怎么枚舉a和b&#xff0c;longlong的形式又該怎么輸入&#xff08;考慮用string&#xff09; 思路&#xff1a;枚舉倒數第二個b前面有多少個a 這是一…

在 macOS 使用 .pem 私鑰免密登錄騰訊云服務器

前言 在騰訊云上創建服務器時&#xff0c;如果選擇了「密鑰對」的登錄方式&#xff0c;就會得到一個 .pem 文件作為私鑰。很多小伙伴在使用 macOS 系統時&#xff0c;可能不清楚如何使用這個私鑰文件來 SSH 免密登錄遠程服務器。本文將詳細介紹如何在本地配置 .pem 私鑰文件并…

AI學習筆記:LM studio大模型加載參數說明

LM Studio加載大模型時參數設置頁面的常見參數及設置方法如下&#xff1a; 上下文長度&#xff08;Context Length&#xff09; 意義&#xff1a;表示模型可以處理的最大上下文長度&#xff0c;即模型一次能夠考慮的輸入文本的最大token數量。較大的上下文長度能讓模型更好地…

Spring項目中常用操作記錄

List 基礎操作 創建 // 使用 ArrayList&#xff08;基于動態數組&#xff0c;適合隨機訪問&#xff09; List<String> arrayList new ArrayList<>();// 使用 LinkedList&#xff08;基于鏈表&#xff0c;適合頻繁插入/刪除&#xff09; List<Integer> li…

騰訊 TDF 即將開源 Kuikly 跨端框架,Kotlin 支持全平臺

今天&#xff0c;在騰訊的 Shiply 平臺看 Flutter 動態化自研框架 Conch 時&#xff0c;在側邊欄看到了有「跨端開發框架」的介紹&#xff0c;點開發現有兩個產品&#xff1a; Hippy&#xff1a;面向前端技術棧的跨端開發框架&#xff0c;Web原生開發體驗&#xff0c;支持 Rea…

SQL AND OR 操作符詳解

SQL AND & OR 操作符詳解 在SQL(結構化查詢語言)中,AND 和 OR 是兩種非常重要的邏輯操作符,它們用于在查詢條件中組合多個條件。理解并正確使用這些操作符對于編寫有效的SQL查詢至關重要。 引言 在處理數據庫查詢時,我們常常需要根據多個條件來篩選數據。AND 和 OR…

nginx accesslog 打印自定義header

比如我在請求的header中添加了一個path-match-type&#xff0c;那我現在nginx的accesslog 中打印出來&#xff0c;應該如何配置呢&#xff1f; rootnginx-59f5d66df6-jw5k8:/# cat /etc/nginx/nginx.conf user nginx; worker_processes auto;error_log /var/log/nginx/erro…

響應式布局的設計規范

響應式設計&#xff08;Responsive Design&#xff09; 是一種 web 設計技術&#xff0c;旨在使網頁在不同的設備和屏幕尺寸上都有良好的顯示效果。響應式設計的核心思想是網頁的布局能夠根據設備的屏幕寬度、分辨率以及其他特性自動調整&#xff0c;使其適應桌面、平板和手機等…

說一下redis事務底層原理

Redis事務 1. 事務的基本流程 Redis 事務通過 MULTI、EXEC、WATCH 等命令實現&#xff0c;底層原理可以分為以下幾個步驟&#xff1a; (1) MULTI 命令 當客戶端發送 MULTI 命令時&#xff0c;Redis 會將客戶端標記為“事務模式”。在事務模式下&#xff0c;客戶端發送的所有…

【我的Android進階之旅】如何使用NanoHttpd在Android端快速部署一個HTTP服務器?

文章目錄 開篇:程序員的"摸魚神器"?一、為什么選擇NanoHttpd?二、五分鐘極速上車指南2.1 ? 第一步:引入依賴的哲學2.2 ? 第二步:創建服務器類:繼承大法好2.3 ? 第三步:啟動服務的儀式感三、高級玩法:讓服務器不再單調3.1 ?? 場景1:變身文件服務器3.2 ?…

播放器系列3——解碼

FFmpeg解碼過程詳解 解碼流程 #mermaid-svg-FGu92IEtteOdO2tO {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-FGu92IEtteOdO2tO .error-icon{fill:#552222;}#mermaid-svg-FGu92IEtteOdO2tO .error-text{fill:#5522…

SimPO算法-Simple Preference Optimizationwith a Reference-Free Reward

偏好優化&#xff08;preference optimization &#xff09;算法大全&#xff1a; 本篇介紹下SimPO SimPO&#xff08;Simple Preference Optimization&#xff09;的設計核心在于簡化偏好優化過程&#xff0c;同時提升模型的表現。其設計主要圍繞兩個關鍵點展開&#xff1a;長…

AIGC時代:如何快速搞定Spring Boot+Vue全棧開發

文章目錄 一、Spring Boot基礎二、Vue.js基礎三、Spring Boot與Vue.js集成四、性能優化與最佳實踐《快速搞定Spring BootVue全棧開發》 內容簡介作者簡介目錄前言/序言本書內容本書特點讀者對象 隨著人工智能生成內容&#xff08;AIGC&#xff09;技術的迅速發展&#xff0c;…

探秘基帶算法:從原理到5G時代的通信變革【六】CRC 校驗

文章目錄 2.5 CRC 校驗2.5.1 前言2.5.2 CRC算法簡介2.5.3 CRC計算的詳細過程2.5.4 CRC校驗的兩種方法詳解**分離比較法****整體運算法****不同位出錯與余數的關系****總結** 2.5.5 CRC計算的C實現及工具介紹**C實現CRC計算****CRC計算工具推薦** **2.5.6 總結&#xff1a;CRC校…

AUTOSAR微控制器抽象層(MCAL)詳解及綜合實例

目錄 1. 微控制器抽象層(MCAL)概述 1.1 MCAL的核心功能 1.2 MCAL的模塊劃分 1.3 MCAL的工作流程 2. MCAL的詳細功能解析 2.1 微控制器驅動 2.1.1 時鐘配置 2.1.2 電源管理 2.1.3 實例:時鐘配置 2.2 通信驅動 2.2.1 CAN驅動 2.2.2 實例:CAN通信的實現 2.3 I/O驅…