
目錄
天小天:(一)Spark Streaming 算子梳理 — 簡單介紹streaming運行邏輯
天小天:(二)Spark Streaming 算子梳理 — flatMap和mapPartitions
天小天:(三)Spark Streaming 算子梳理 — transform算子
天小天:(四)Spark Streaming 算子梳理 — Kafka createDirectStream
天小天:(五)Spark Streaming 算子梳理 — foreachRDD
天小天:(六)Spark Streaming 算子梳理 — glom算子
天小天:(七)Spark Streaming 算子梳理 — repartition算子
天小天:(八)Spark Streaming 算子梳理 — window算子
前言
本文主要講解repartiion
的作用及原理。
作用
repartition
用來調整父RDD
的分區數,入參為調整之后的分區數。由于使用方法比較簡單,這里就不寫例子了。
源碼分析
接下來從源碼的角度去分析是如何實現重新分區的。
DStream
/*** Return a new DStream with an increased or decreased level of parallelism. Each RDD in the* returned DStream has exactly numPartitions partitions.*/def repartition(numPartitions: Int): DStream[T] = ssc.withScope {this.transform(_.repartition(numPartitions))}
從方法中可以看到,實現repartition
的方式是通過Dstream
的transform
算子之間調用RDD的repartition
算子實現的。
接下來就是看看RDD的repartition
算子是如何實現的。
RDD
/*** Return a new RDD that has exactly numPartitions partitions.** Can increase or decrease the level of parallelism in this RDD. Internally, this uses* a shuffle to redistribute data.** If you are decreasing the number of partitions in this RDD, consider using `coalesce`,* which can avoid performing a shuffle.** TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.*/def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}
首先可以看到RDD
的repartition
的實現是調用時coalesce
方法。其中入參有兩個第一個是numPartitions
為重新分區后的分區數量,第二個參數為是否shuffle,這里的入參為true
代表會進行shuffle。
接下來看下coalesce
是如何實現的。
def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] = withScope {require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")if (shuffle) {// 是否經過shuffle,repartition是走這個邏輯/** Distributes elements evenly across output partitions, starting from a random partition. */// distributePartition是shuffle的邏輯,// 對迭代器中的每個元素分派不同的key,shuffle時根據這些key平均的把元素分發到下一個stage的各個partition中。val distributePartition = (index: Int, items: Iterator[T]) => {var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)items.map { t =>// Note that the hash code of the key will just be the key itself. The HashPartitioner// will mod it with the number of total partitions.position = position + 1(position, t)}} : Iterator[(Int, T)]// include a shuffle step so that our upstream tasks are still distributednew CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), // 為每個元素分配key,分配的邏輯為distributePartitionnew HashPartitioner(numPartitions)), // ShuffledRDD 根據key進行混洗numPartitions,partitionCoalescer).values} else {// 如果不經過shuffle之間返回CoalescedRDDnew CoalescedRDD(this, numPartitions, partitionCoalescer)}}
從源碼中可以看到無論是否經過shuffle最終返回的都是CoalescedRDD
。其中區別是經過shuffle需要為每個元素分配key,并根據key將所有的元素平均分配到task中。
CoalescedRDD
private[spark] class CoalescedRDD[T: ClassTag](@transient var prev: RDD[T], // 父RDDmaxPartitions: Int, // 最大partition數量,這里就是重新分區后的partition數量partitionCoalescer: Option[PartitionCoalescer] = None // 重新分區算法,入參默認為None)extends RDD[T](prev.context, Nil) { // Nil since we implement getDependenciesrequire(maxPartitions > 0 || maxPartitions == prev.partitions.length,s"Number of partitions ($maxPartitions) must be positive.")if (partitionCoalescer.isDefined) {require(partitionCoalescer.get.isInstanceOf[Serializable],"The partition coalescer passed in must be serializable.")}override def getPartitions: Array[Partition] = {// 獲取重新算法,默認為DefaultPartitionCoalescerval pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())// coalesce方法是根據傳入的rdd和最大分區數計算出每個新的分區處理哪些舊的分區pc.coalesce(maxPartitions, prev).zipWithIndex.map {case (pg, i) => // pg為partitionGroup即舊的partition組成的集合,集合里的partition對應一個新的partitionval ids = pg.partitions.map(_.index).toArraynew CoalescedRDDPartition(i, prev, ids, pg.prefLoc) //組成一個新的parititon}}override def compute(partition: Partition, context: TaskContext): Iterator[T] = {// 當執行到這里時分區已經重新分配好了,這部分代碼也是執行在新的分區的task中的。// 新的partition取出就的partition對應的所有partition并以此調用福rdd的迭代器執行next計算。partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>firstParent[T].iterator(parentPartition, context)}}override def getDependencies: Seq[Dependency[_]] = {Seq(new NarrowDependency(prev) {def getParents(id: Int): Seq[Int] =partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices})}override def clearDependencies() {super.clearDependencies()prev = null}/*** Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,* then the preferred machine will be one which most parent splits prefer too.* @param partition* @return the machine most preferred by split*/override def getPreferredLocations(partition: Partition): Seq[String] = {partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq}
}
對于CoalescedRDD
來講getPartitions
方法是最核心的方法。舊的parition對應哪些新的partition就是在這個方法里計算出來的。具體的算法是在DefaultPartitionCoalescer
的coalesce
方法體現出來的。
compute
方法是在新的task中執行的,即分區已經重新分配好,并且拉取父RDD指定parition
對應的元素提供給下游迭代器計算。
圖示
寫下來用兩張圖解釋下是如何repartition
無shuffle

有shuffle

總結
以上repartition
的邏輯基本就已經介紹完了。其中DefaultPartitionCoalescer
中重新分區的算法邏輯并沒有展開說。這里以后如果有時間會再寫一篇詳細介紹。