spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子

0185edac07ea52a24acd2f58ef1d4375.png
目錄
天小天:(一)Spark Streaming 算子梳理 — 簡單介紹streaming運行邏輯
天小天:(二)Spark Streaming 算子梳理 — flatMap和mapPartitions
天小天:(三)Spark Streaming 算子梳理 — transform算子
天小天:(四)Spark Streaming 算子梳理 — Kafka createDirectStream
天小天:(五)Spark Streaming 算子梳理 — foreachRDD
天小天:(六)Spark Streaming 算子梳理 — glom算子
天小天:(七)Spark Streaming 算子梳理 — repartition算子
天小天:(八)Spark Streaming 算子梳理 — window算子

前言

本文主要講解repartiion的作用及原理。

作用

repartition用來調整父RDD的分區數,入參為調整之后的分區數。由于使用方法比較簡單,這里就不寫例子了。

源碼分析

接下來從源碼的角度去分析是如何實現重新分區的。

DStream

/*** Return a new DStream with an increased or decreased level of parallelism. Each RDD in the* returned DStream has exactly numPartitions partitions.*/def repartition(numPartitions: Int): DStream[T] = ssc.withScope {this.transform(_.repartition(numPartitions))}

從方法中可以看到,實現repartition的方式是通過Dstreamtransform算子之間調用RDD的repartition算子實現的。

接下來就是看看RDD的repartition算子是如何實現的。

RDD

/*** Return a new RDD that has exactly numPartitions partitions.** Can increase or decrease the level of parallelism in this RDD. Internally, this uses* a shuffle to redistribute data.** If you are decreasing the number of partitions in this RDD, consider using `coalesce`,* which can avoid performing a shuffle.** TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.*/def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}

首先可以看到RDDrepartition的實現是調用時coalesce方法。其中入參有兩個第一個是numPartitions為重新分區后的分區數量,第二個參數為是否shuffle,這里的入參為true代表會進行shuffle。

接下來看下coalesce是如何實現的。

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] = withScope {require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")if (shuffle) {// 是否經過shuffle,repartition是走這個邏輯/** Distributes elements evenly across output partitions, starting from a random partition. */// distributePartition是shuffle的邏輯,// 對迭代器中的每個元素分派不同的key,shuffle時根據這些key平均的把元素分發到下一個stage的各個partition中。val distributePartition = (index: Int, items: Iterator[T]) => {var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)items.map { t =>// Note that the hash code of the key will just be the key itself. The HashPartitioner// will mod it with the number of total partitions.position = position + 1(position, t)}} : Iterator[(Int, T)]// include a shuffle step so that our upstream tasks are still distributednew CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), // 為每個元素分配key,分配的邏輯為distributePartitionnew HashPartitioner(numPartitions)), // ShuffledRDD 根據key進行混洗numPartitions,partitionCoalescer).values} else {// 如果不經過shuffle之間返回CoalescedRDDnew CoalescedRDD(this, numPartitions, partitionCoalescer)}}

從源碼中可以看到無論是否經過shuffle最終返回的都是CoalescedRDD。其中區別是經過shuffle需要為每個元素分配key,并根據key將所有的元素平均分配到task中。

CoalescedRDD

private[spark] class CoalescedRDD[T: ClassTag](@transient var prev: RDD[T], // 父RDDmaxPartitions: Int, // 最大partition數量,這里就是重新分區后的partition數量partitionCoalescer: Option[PartitionCoalescer] = None // 重新分區算法,入參默認為None)extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependenciesrequire(maxPartitions > 0 || maxPartitions == prev.partitions.length,s"Number of partitions ($maxPartitions) must be positive.")if (partitionCoalescer.isDefined) {require(partitionCoalescer.get.isInstanceOf[Serializable],"The partition coalescer passed in must be serializable.")}override def getPartitions: Array[Partition] = {// 獲取重新算法,默認為DefaultPartitionCoalescerval pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())// coalesce方法是根據傳入的rdd和最大分區數計算出每個新的分區處理哪些舊的分區pc.coalesce(maxPartitions, prev).zipWithIndex.map {case (pg, i) => // pg為partitionGroup即舊的partition組成的集合,集合里的partition對應一個新的partitionval ids = pg.partitions.map(_.index).toArraynew CoalescedRDDPartition(i, prev, ids, pg.prefLoc) //組成一個新的parititon}}override def compute(partition: Partition, context: TaskContext): Iterator[T] = {// 當執行到這里時分區已經重新分配好了,這部分代碼也是執行在新的分區的task中的。// 新的partition取出就的partition對應的所有partition并以此調用福rdd的迭代器執行next計算。partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>firstParent[T].iterator(parentPartition, context)}}override def getDependencies: Seq[Dependency[_]] = {Seq(new NarrowDependency(prev) {def getParents(id: Int): Seq[Int] =partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices})}override def clearDependencies() {super.clearDependencies()prev = null}/*** Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,* then the preferred machine will be one which most parent splits prefer too.* @param partition* @return the machine most preferred by split*/override def getPreferredLocations(partition: Partition): Seq[String] = {partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq}
}

對于CoalescedRDD來講getPartitions方法是最核心的方法。舊的parition對應哪些新的partition就是在這個方法里計算出來的。具體的算法是在DefaultPartitionCoalescercoalesce方法體現出來的。

compute方法是在新的task中執行的,即分區已經重新分配好,并且拉取父RDD指定parition對應的元素提供給下游迭代器計算。

圖示

寫下來用兩張圖解釋下是如何repartition

無shuffle

f64a60fdf06c5d550098d544ab9b95bf.png

有shuffle

4e759bdcdeda0607f968367be31f6c98.png

總結

以上repartition的邏輯基本就已經介紹完了。其中DefaultPartitionCoalescer中重新分區的算法邏輯并沒有展開說。這里以后如果有時間會再寫一篇詳細介紹。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/533412.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/533412.shtml
英文地址,請注明出處:http://en.pswp.cn/news/533412.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

爐石傳說服務器維護有補償嗎,爐石傳說維護公告全文及具體補償方案 爐石數據回檔怎么補償?...

爐石傳說超長維護事件的最終解決方法是回檔到1月14日,并且會有一定的補償,下面是官方公告各位親愛的爐石玩家,首先向大家抱以最誠摯的歉意,同時也感謝大家在游戲維護的這段時間的耐心等待以及關注。上周六下午(北京時間1月14日15:…

550什么意思_布草知識 | 都是羽絨,為什么價格大不同?

為什么羽絨的價格有些便宜有些這么貴呢?讓小編來告訴你!市面上羽絨制品這么多,從幾百到上萬都有,中間的區別到底在哪里呢?接下來,我們就來說說,它們到底貴在哪里~01暖不暖,看蓬松度羽…

可調整大小的div_氣液增壓缸運行速度的調整以及壓力的調節方式

點擊藍字 關注我們增壓缸的行程及噸位絕對了設備整體速度,以下的調整只是在原基礎上起到微調的作用。(1)氣液增壓缸時間調整:增壓缸四個步驟動作是依靠時間繼電器來控制的,可根據不同的產品的需求,通過時間繼電器來調整每個步驟的…

pytest測試實戰 電子書_電子書丨Selenium 3+Python 3自動化測試項目實戰:從菜鳥到高手...

▊《Selenium 3Python 3自動化測試項目實戰:從菜鳥到高手》田春成 著電子書售價:39.5元2019年9月出版Selenium是目前非常流行的一種自動化測試工具。本書基于Python 3語言講述了新的Selenium 3的基本理論與操作,涉及各種高級應用,…

phpcms移動端和pc端_移動端調試大法

文章:樊秀寶(北京中心—小易F8技術小組)排版:suny在日常項目中的開發中,接觸移動端開發的小伙伴們免不了要和移動端調試打交道。本文總結了常用的移動端調試方法,歡迎大家學習和補充。01谷歌瀏覽器谷歌瀏覽器是我們前端開發中必不…

redis 中一個字段 修改map_CTO 指名點姓讓我帶頭沖鋒,熬了一個通宵,終于把Redis中7千萬個Key刪完了...

由于有一條業務線不理想,高層決定下架業務。對于我們技術團隊而言,其對應的所有服務器資源和其他相關資源都要釋放。釋放了 8 臺應用服務器;1 臺 ES 服務器;刪除分布式定時任務中心相關的業務任務;備份并刪除 MySQL 數…

太陽花圖片_長壽花扔水里,光長葉不開花?趕緊加點營養液

養個花可不簡單,春天一到還得操心換盆、換土,如果你像偷懶的話,還不如養些能水培的花,給它一杯水就夠了,實在太省心啦!銅錢草銅錢草實在太好養了,摘一枝放在水杯里就能活,還挺有意境…

hystrix 全局熔斷_跟我學Spring Cloud(Finchley版)14Feign使用Hystrix

Feign默認已經整合了Hystrix,本節詳細探討Feign使用Hystrix的具體細節。服務降級1 加配置,默認Feign是不啟用Hystrix的,需要添加如下配置啟用Hystrix,這樣所有的Feign Client都會受到Hystrix保護!feign:hystrix:enable…

ubuntu 改屏幕分辨率命令_Ubuntu被曝嚴重漏洞!!!

GitHub安全研究員Kevin Backhouse發現的一個Ubuntu系統大漏洞。無需系統密碼,就能添加新的sudo用戶、獲取root權限,事后還能刪除不留痕跡。這種攻擊方法非常簡單,Backhouse在官方博客中寫道:“使用終端中的一些簡單命令&#xff0…

swag您的裝置不支持_一件充滿意境的中國風水墨粒子、電子屏風交互裝置

不久前有人留言怎么用粒子做水墨,今天投石科技給大家分享個水墨粒子裝置作品案例,大家可以發揮自己的想象去做中國風的一些東西,希望能對大家有些幫助吧。《墨跡》這是一個數字山水畫的交互裝置,它通過攝像頭捕捉手部運動進行互動…

弱電工程集成商_弱電工程樓宇自控系統基礎知識培訓資料

前言:弱電行業里面樓宇自控系統是非常難的一個子系統,涉及到很多其他專業,樓宇自控系統的設計一般為廠家設計,但是也有系統集成商來設計的,樓宇自控系統主要學習它的控制原理,學習完以后學習DDC箱子的繪制&…

刪除單元格_VBA(實驗1)用VBA 刪除某列空單元格的3種方法:刪除法,轉移到其他列方法,數組方法...

1 要解決的問題:刪除某列中的空單元格/空行暫時只實現了刪除一列中的空行,并沒有實現多行的判斷空行和刪除方法。----之后再做更復雜的1.1 需求分析用VBA刪除如下內容,解決思路都不同刪除1列的空行(本文要做的)刪除整個…

安卓qpythonttsspeak_當python遇到Android手機 那么,萬物皆可盤

囂張開場今天不跟大家講python知識,是不很失望?No,看過了今天的內容,你python的裝13指數,至少上升1w,并附帶暴擊、濺射、眩暈、致盲效果。沒錯,就是這么囂張.....當python遇到安卓手機我們日常的…

winpe制作u盤啟動盤_怎么制作u盤啟動盤 u盤啟動盤制作方法【介紹】

使用u盤裝系統時就需要先將u盤制作成一個啟動u盤,這樣才能夠通過u盤啟動裝系統操作,那么 如何制作u盤啟動盤 呢?為此,今天我們就為小伙伴們詳細的介紹 怎樣制作u盤啟動盤 的操作。制作u盤啟動盤準備工作:① 、準備一個空間容量大…

插入空行_如何一鍵插入表格空行,這個方法才最高級!

100萬職場人都在看后臺回復禮包領199元職場干貨很久很久之前,小可教過大家如何一鍵刪除空行,回顧請戳→《如何一鍵刪除表格空行,這個方法才最高級!》這次,小可反過來,教大家如何一鍵插入很多空行&#xff0…

的控制臺主題_【12.11最新版】芯片機/大氣層主題軟件NXThemesInstaller

Switch的主題的安裝和管理主要通過自制軟件——NXThemesInstaller軟件地址:https://github.com/exelix11/SwitchThemeInjector本文只傳了工具,主題需要自行去下載,可以按照自己喜歡的更換!!教程簡單概括如下這是最常見…

數據卡片_E015 如何批量匯總工作簿數據,形成獨立工作簿信息卡片

Hi,How are you doing?我是職場編碼(CodeVoc)。在E000中,我們介紹了Node.js、Ruby、Electron等工具下載安裝。這期,給你演示一下由Electron聯合Ruby制作的小工具。知乎視頻?www.zhihu.com借助Electron官方Demo&#…

linux 編譯3g驅動_linux重新編譯內核

1.內容簡介linux內核簡介linux內核版本號linux為什么重新編譯內核linux內核編譯模式linux內核功能劃分linux內核編譯步驟2linux內核簡介內核,是一個操作系統的核心。它負責管理系統的進程、內存、設備驅動、文件和網絡系統,決定著系統的性能和穩定性。3.…

r語言平均值顯著性檢驗_R語言:常用統計檢驗方法

轉自http://blog.sciencenet.cn/home.php?modspace&uid255662&doblog&id240107正態總體均值的假設檢驗t檢驗單個總體例一某種元件的壽命X(小時),服從正態分布,N(mu,sigma^2),其中mu,sigma^2均未知,16只元件的壽命如下…

redis哨兵模式沒有切換主機_Redis哨兵(Sentinel)模式

Redis哨兵(Sentinel)模式在這里插入圖片描述一、主從復制高可用當我們使用主從復制出現的問題手動故障轉移寫能力和存儲能力受限主從復制 -master 宕機故障處理主從切換技術的方法是:當主服務器宕機后,需要手動把一臺從服務器切換為主服務器,…