RDD介紹

RDD設計背景

在實際應用中,存在許多迭代式計算,這些應用場景的共同之處是 : 不同計算階段之間會重用中間結果,即一個階段的輸出結果會作為下一個階段的輸入.
而目前的MapReduce框架都是把中間結果寫入到HDFS中,帶來了大量的數據復制、磁盤IO和序列化開銷;
如果能將結果保存在內存當中,就可以大量減少IO.
RDD就是為了滿足這種需求而出現的,它提供了一個抽象的數據架構,我們不必擔心底層數據的分布式特性,只需將具體的應用邏輯表達為一系列轉換處理,
不同RDD之間的轉換操作形成依賴關系,可以實現管道化,從而避免了中間結果的落地存儲,大大降低了數據復制、磁盤IO和序列化開銷,最終加快計算速度.

RDD概念

RDD,彈性分布式數據集.
一個RDD就是一個分布式對象集合,本質上是一個只讀的分區記錄集合;
一個RDD可以分成多個分區,每個分區就是一個數據集片段(HDFS上的塊);
一個RDD的不同分區可以被保存到集群中不同的節點上,從而可以在集群中的不同節點上進行并行計算.
彈性:既可以存儲在內存又可以存儲在磁盤
分布式:可以被分成多個分區,不同的分區可以被保存到不同的節點上進行并行計算
數據集:本質上是一個只讀的分區記錄集合
RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合;
不能直接修改,只能基于穩定的物理存儲中的數據集來創建RDD,或者通過在其他RDD上執行確定的轉換操作(如map、join和groupBy)而創建得到新的RDD.RDD提供了一組豐富的操作以支持常見的數據運算,分為'行動'(Action)'轉換'(Transformation)兩種類型,前者用于執行計算并指定輸出的形式,后者指定RDD之間的相互依賴關系.
兩類操作的主要區別是:轉換操作(比如map、filter、groupBy、join等)接受RDD并返回RDD,而行動操作(比如count、collect等)接受RDD但是返回非RDD(即輸出一個值或結果).

RDD執行過程

Spark用Scala語言實現了RDD的API,程序員可以通過調用API實現對RDD的各種操作.
RDD典型的執行過程如下:
1. 讀入外部數據源(或者內存中的集合)創建RDD;
2. RDD經過一系列的'Transformation'操作,每一次都會產生不同的RDD,供給下一個'Transformation'使用;
3. 最后一個RDD經'Action'操作進行處理,并輸出到外部數據源(或者變成Scala/JAVA集合或變量).
需要說明的是,RDD采用了惰性調用,即在RDD的執行過程中,真正的計算發生在RDD的'Action'操作,
對于'Action'之前的所有'Transformation'操作,Spark只是記錄下'Transformation'操作應用的一些基礎數據集以及RDD生成的軌跡,即相互之間的依賴關系,而不會觸發真正的計算.

在這里插入圖片描述

從輸入中邏輯上產生了A和C兩個RDD,經過一系列'Transformation'操作,邏輯上生成了F(也是一個RDD),之所以說是邏輯上,是因為這時候計算并沒有發生,Spark只是記錄了RDD之間的生成和依賴關系.
也就當F要進行輸出時,是當F進行'Action'操作的時候,Spark才會根據RDD的依賴關系生成DAG,并從起點開始真正的計算.

在這里插入圖片描述

血緣關系

上訴一系列處理稱為一個'血緣關系(Lineage)',即DAG拓撲排序的結果.
采用惰性調用,通過血緣關系連接起來的一系列RDD操作就可以實現管道化(pipeline),避免了多次轉換操作之間數據同步的等待,
而且不用擔心有過多的中間數據,因為具有血緣關系的操作都管道化了,一個操作得到的結果不需要保存為中間數據,而是直接管道式地流入到下一個操作進行處理.
同時,這種通過血緣關系就把一系列操作進行管道化連接的設計方式,也使得管道中每次操作的計算變得相對簡單,保證了每個操作在處理邏輯上的單一性;
相反,在MapReduce的設計中,為了盡可能地減少MapReduce過程,在單個MapReduce中會寫入過多復雜的邏輯.

RDD特性

總體而言,Spark采用RDD以后能夠實現高效計算的主要原因如下:
(1) 高效的容錯性
現有的分布式共享內存、鍵值存儲、內存數據庫等,為了實現容錯,必須在集群節點之間進行數據復制或者記錄日志,即在節點之間會發生大量的數據傳輸,這對于數據密集型應用而言會帶來很大的開銷.
而在RDD的設計中,數據只讀,不可修改,如果需要修改數據,必須從父RDD轉換到子RDD,由此在不同RDD之間建立了血緣關系;所以,RDD是一種天生具有容錯機制的特殊集合,
不需要通過數據冗余的方式(比如詳細的記錄操作的日志)實現容錯,而只需通過RDD父子依賴(血緣)關系重新計算得到丟失的分區來實現容錯,無需回滾整個系統,這樣就避免了數據復制的高開銷,
而且重算過程可以在不同節點之間并行進行,實現了高效的容錯;
此外,RDD提供的轉換操作都是一些粗粒度的操作(比如map、filter和join),RDD依賴關系只需要記錄這種粗粒度的轉換操作,而不需要記錄具體的數據和各種細粒度操作的日志(比如對哪個數據項進行了修改),這就大大降低了數據密集型應用中的容錯開銷.
(2) 中間結果持久化到內存
數據在內存中的多個RDD操作之間進行傳遞,不需要落地到磁盤上,避免了不必要的讀寫磁盤開銷.
(3) 存放的數據可以是Java對象,避免了不必要的對象序列化和反序列化開銷.

RDD的依賴關系

RDD中不同的操作會使得不同RDD中的分區會產生不同的依賴; RDD中的依賴關系分為窄依賴(Narrow Dependency)、寬依賴(Wide Dependency).
寬依賴: 一個父RDD的一個分區對應一個子RDD的多個分區; 一對多,伴有shuffle過程.
窄依賴: 一個父RDD的分區對應于一個子RDD的分區,或多個父RDD的分區對應于一個子RDD的分區;一對一或多對一.
總結:如果父RDD的一個分區只被一個子RDD的一個分區所使用就是窄依賴,否則就是寬依賴.

在這里插入圖片描述

窄依賴典型的操作包括map、filter、union等,寬依賴典型的操作包括groupByKey、sortByKey等;
對于連接(join)操作,可以分為兩種情況:
1.對輸入進行協同劃分,屬于窄依賴.
協同劃分(co-partitioned)是指多個父RDD的某一分區的所有'鍵(key)'落在子RDD的同一個分區內,不會產生同一個父RDD的某一分區落在子RDD的兩個分區的情況.
2.對輸入做非協同劃分,屬于寬依賴.
對于窄依賴的RDD,可以以流水線的方式計算所有父分區,不會造成網絡之間的數據混合;
對于寬依賴的RDD,則通常伴隨著Shuffle操作,即首先需要計算好所有父分區數據,然后在節點之間進行Shuffle.

階段劃分(stage)

Spark通過分析各個RDD的依賴關系生成了DAG,再通過分析各個RDD中的分區之間的依賴關系來決定如何劃分階段,
具體劃分方法是:在DAG中進行反向解析,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到當前的階段中;將窄依賴盡量劃分在同一個階段中,可以實現流水線計算.
例如,假設從HDFS中讀入數據生成3個不同的RDD(即A、C和E),通過一系列轉換操作后再將計算結果保存回HDFS;
對DAG進行解析時,在依賴圖中進行反向解析,由于從RDD A到RDD B的轉換以及從RDD B和F到RDD G的轉換,都屬于寬依賴,因此,在寬依賴處斷開后可以得到三個階段,即階段1、階段2和階段3.
可以看出,在階段2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,
比如,分區7通過map操作生成的分區9,可以不用等待分區8到分區10這個轉換操作的計算結束,而是繼續進行union操作,轉換得到分區13,這樣流水線執行大大提高了計算的效率.

在這里插入圖片描述

由上述論述可知,把一個DAG圖劃分成多個'stage'以后,每個階段都代表了一組關聯的、相互之間沒有Shuffle依賴關系的任務組成的任務集合;
每個任務集合會被提交給任務調度器(TaskScheduler)進行處理,由任務調度器將任務分發給Executor運行.

Spark算子

RDD支持兩種類型的操作:
Transformation:從一個RDD轉換為一個新的RDD.
Action:基于一個數據集進行運算(引起Job運算),并返回RDD.
例如,map是一個Transformation操作,map將數據集的每一個元素按指定的函數轉換為一個RDD返回;reduce是一個action操作.
Spark的所有Transformation操作都是懶執行,它們并不立馬執行,而是先記錄對數據集的一系列Transformation操作;這種設計讓Spark的運算更加高效.
例如,對一個數據集map操作之后使用reduce只返回結果,而不返回龐大的map運算的結果集.
默認情況下,每個轉換的RDD在執行不同Action操作時都會重新計算;即使兩個Action操作會使用同一個轉換的RDD,該RDD也會重新計算.
除非使用persist方法或cache方法將RDD緩存到內存,這樣在下次使用這個RDD時將會提高計算效率,也支持將RDD持久化到硬盤上或在多個節點上復制.

Transformation算子

下面列出了Spark常用的transformation操作,詳細的細節請參考RDD API文檔(Scala、Java、Python、R)和鍵值對RDD方法文檔(Scala、Java).map(func)
將原來RDD的每個數據項,使用map中用戶自定義的函數func進行映射,轉變為一個新的元素,并返回一個新的RDD.filter(func)
使用函數func對原RDD中數據項進行過濾,將符合func中條件的數據項組成新的RDD返回.flatMap(func)
類似于map,但是輸入數據項可以被映射到0個或多個輸出數據集合中,所以函數func的返回值是一個數據項集合而不是一個單一的數據項.mapPartitions(func)
類似于map,但是該操作是在每個分區上分別執行,所以當操作一個類型為T的RDD時func的格式必須是Iterator<T> => Iterator<U>.
即mapPartitions需要獲取到每個分區的迭代器,在函數中通過這個分區的迭代器對整個分區的元素進行操作.mapPartitionsWithIndex(func)
類似于mapPartitions,但是需要提供給func一個整型值,這個整型值是分區的索引,所以當處理T類型的RDD時,func的格式必須為(Int, Iterator<T>) => Iterator<U>.union(otherDataset)
返回原數據集和參數指定的數據集合并后的數據集;
使用union函數時需要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合并的RDD元素數據類型相同;
該操作不進行去重操作,返回的結果會保存所有元素;如果想去重,可以使用distinct().intersection(otherDataset)
返回兩個數據集的交集.distinct([numTasks]))
將RDD中的元素進行去重操作.groupByKey([numTasks])
操作(K,V)格式的數據集,返回(K, Iterable)格式的數據集.
注意,如果分組是為了按key進行聚合操作(例如,計算sum、average),此時使用reduceByKey或aggregateByKey計算效率會更高.
注意,默認情況下,并行情況取決于父RDD的分區數,但可以通過參數numTasks來設置任務數.reduceByKey(func, [numTasks])
使用給定的func,將(K,V)對格式的數據集中key相同的值進行聚集,其中func的格式必須為(V,V) => V,可選參數numTasks可以指定reduce任務的數目.aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])(K,V)格式的數據按key進行聚合操作,聚合時使用給定的合并函數和一個初始值,返回一個(K,U)對格式數據;
需要指定的三個參數:zeroValue為在每個分區中,對key值第一次讀取V類型的值時,使用的U類型的初始變量;
seqOp用于在每個分區中,相同的key中V類型的值合并到zeroValue創建的U類型的變量中;combOp是對重新分區后兩個分區中傳入的U類型數據的合并函數.sortByKey([ascending], [numTasks])
(K,V)格式的數據集,其中K已實現了Ordered,經過sortByKey操作返回排序后的數據集,指定布爾值參數ascending來指定升序或降序排列.join(otherDataset, [numTasks])
用于操作兩個鍵值對格式的數據集,操作兩個數據集(K,V)(K,W)返回(K,(V, W))格式的數據集,通過leftOuterJoin、rightOuterJoin、fullOuterJoin完成外連接操作.cogroup(otherDataset, [numTasks])
用于操作兩個鍵值對格式數據集(K,V)(K,W),返回數據集格式為(K,(Iterable, Iterable)).這個操作也稱為groupWith.
對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分別聚合為一個集合,并且返回兩個RDD中對應Key的元素集合的迭代器.cartesian(otherDataset)
對類型為T和U的兩個數據集進行操作,返回包含兩個數據集所有元素對的(T,U)格式的數據集;即對兩個RDD內的所有元素進行笛卡爾積操作.pipe(command, [envVars])
以管道(pipe)方式將RDD的各個分區(partition)使用shell命令處理(比如一個 Perl或 bash腳本),
RDD的元素會被寫入進程的標準輸入(stdin),將進程返回的一個字符串型 RDD(RDD of strings),以一行文本的形式寫入進程的標準輸出(stdout)中.coalesce(numPartitions)
把RDD的分區數降低到通過參數numPartitions指定的值,在得到的更大一些數據集上執行操作,會更加高效.repartition(numPartitions)
隨機地對RDD的數據重新洗牌(Reshuffle),從而創建更多或更少的分區,以平衡數據,總是對網絡上的所有數據進行洗牌(shuffles).repartitionAndSortWithinPartitions(partitioner)
根據給定的分區器對RDD進行重新分區,在每個結果分區中,按照key值對記錄排序,這在每個分區中比先調用repartition再排序效率更高,因為它可以將排序過程在shuffle操作的機器上進行.

Action算子

下面列出了Spark支持的常用的action操作,詳細請參考RDD API文檔(Scala、Java、Python、R)和鍵值對RDD方法文檔(Scala、Java).reduce(func)
使用函數func聚集數據集中的元素,這個函數func輸入為兩個元素,返回為一個元素;這個函數應該符合結合律和交換率,這樣才能保證數據集中各個元素計算的正確性.collect()
在驅動程序中,以數組的形式返回數據集的所有元素;通常用于filter或其它產生了大量小數據集的情況.count()
返回數據集中元素的個數.first()
返回數據集中的第一個元素,類似于take(1).take(n)
返回數據集中的前n個元素,類似于sql中的limit.takeOrdered(n,[ordering])
返回RDD按自然順序或自定義順序排序后的前n個元素.saveAsTextFile(path)
將數據集中的元素以文本文件或文本文件集合的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中;
Spark將在每個元素上調用toString方法,將數據元素轉換為文本文件中的一行記錄.saveAsSequenceFile(path) (Java and Scala)
將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中;
該操作只支持對實現了Hadoop的Writable接口的鍵值對RDD進行操作,在Scala中,還支持隱式轉換為Writable的類型(Spark包括了基本類型的轉換,例如Int、Double、String等).saveAsObjectFile(path) (Java and Scala)
將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑;這些保存該數據的文件,可以使用SparkContext.objectFile()進行加載.countByKey()
僅支持對(K,V)格式的鍵值對類型的RDD進行操作;返回(K,Int)格式的Hashmap,(K,Int)為每個key值對應的記錄數目.foreach(func)
對數據集中每個元素使用函數func進行處理.

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/15480.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/15480.shtml
英文地址,請注明出處:http://en.pswp.cn/web/15480.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

為何程序員35歲就開始被嫌棄了?程序員該如何避免中年危機?

文章目錄 一、為何程序員35歲就開始被嫌棄了&#xff1f;1、技術更新迅速2、職業發展瓶頸3、成本考慮4、年齡歧視5、市場供需變化6、個人因素 二、程序員該如何避免中年危機&#xff1f;1、持續學習與技能更新2、拓展技術廣度與深度3、提升軟技能4、關注行業趨勢與市場變化5、建…

vue3 input輸入框輸入限制(數字)

輸入框限制輸入的內容格式&#xff0c;如&#xff08;金額&#xff0c;數字&#xff09; 金額限制小數點后2位數 <el-input placeholder"請填寫費用" v-model"formMoney.total_money" keyup"formMoney.total_money checkPrice(formMoney.total_…

20240521(代碼整潔和測試入門學習)

測試: 1.測試工程師、測試工具開發工程師、自動化測試工程師。 python&#xff1a; 1、發展背景和優勢&#xff1b; 2、開始多需的工具 interpreter(解釋器) refactor(重構) 2、變量和注釋的基礎語法 3、輸入輸出 i 1 for i in range(1, 11): print(i, end ) 不換行打印…

jupyter notebook 實現聯邦學習模型

聯邦學習(Federated Learning)是一種機器學習框架,它允許多個參與方(例如,移動設備或服務器)在本地數據集上訓練模型,而無需將數據集中到一個位置。這有助于保護數據隱私,并允許在分布式環境中進行模型訓練。 要在Jupyter Notebook中實現聯邦學習模型,你可以遵循以下…

性能大爆炸!為你的Matomo換一個高性能的環境!

隨著我的 Matomo 越來越大&#xff0c;功能需求的增多&#xff0c;插件也變得越來越多&#xff0c;使用傳統的LNMP架構或者LAMP架構都會發現性能正在急劇下級&#xff0c;為此&#xff0c;我們發現了使用FrankenPHP&#xff08;以下簡稱FPHP&#xff09;的方案 首先&#xff0…

Android kotlin協程

說明 可代替線程整異步可控制&#xff0c;靈活 &#xff08;控制優先級&#xff0c;內存占用等&#xff09;速度快 效率高有數量上限 使用 runBlocking 一般用于測試 不建議使用GlobalScope.launch 全局的 生命周期跟隨application 不建議使用CoroutineScope(job) 用 基本使…

櫻花下落的速度是每秒5厘米,我們的心又該以什么速度去接近呢

櫻花下落的速度是每秒五厘米。5年前第一次接觸秒速五厘米的時候&#xff0c;我還在念初中&#xff0c;那時候的我尚且理解不了作品里的太多東西&#xff0c;只是為那輛列車隔開了明里和貴樹感到悲傷&#xff0c;為他們二人那段無疾而終的感情感到遺憾。五年后再一次重溫&#x…

GEE批量導出逐日、逐月、逐季節和逐年的遙感影像(以NDVI為例)

影像導出 1.逐日數據導出2.逐月數據導出3.季節數據導出4.逐年數據導出 最近很多小伙伴們私信我&#xff0c;問我如何高效導出遙感數據&#xff0c;從逐日到逐季度&#xff0c;我都有一套自己的方法&#xff0c;今天就來和大家分享一下&#xff01; ??&#x1f50d;【逐日導出…

Scala 入門介紹和環境搭建

一、簡介 Scala 是一門以 Java 虛擬機&#xff08;JVM&#xff09;為運行環境并將面向對象和函數式編程的最佳特性結合在一起的靜態類型編程語言 (靜態語言需要提前編譯&#xff0c;如&#xff1a;Java、c、c 等&#xff0c;動態語言如&#xff1a;js)Scala 是一門多范式的編程…

【介紹下Pwn,什么是Pwn?】

&#x1f308;個人主頁: 程序員不想敲代碼啊 &#x1f3c6;CSDN優質創作者&#xff0c;CSDN實力新星&#xff0c;CSDN博客專家 &#x1f44d;點贊?評論?收藏 &#x1f91d;希望本文對您有所裨益&#xff0c;如有不足之處&#xff0c;歡迎在評論區提出指正&#xff0c;讓我們共…

CSS3文字與字體

文字與字體 @font-face 用途:定義一種自定義字體,使其可以在網頁中使用。通過@font-face規則,可以指定字體名稱、來源(通常是URL)以及字體的各種變體(如常規、粗體、斜體等)。 @font-face {font-family: MyCustomFont;src: url(mycustomfont.woff2) format(woff2

馮喜運:5.25黃金價格和原油價格加速看跌?未來如何走勢?

【黃金消息面分析】&#xff1a;本周黃金市場經歷劇烈波動&#xff0c;金價創下五個半月來最糟糕的單周表現&#xff0c;盡管周五因美元下跌小幅回升。美聯儲的鷹派立場和美國經濟數據強勁削弱了降息預期&#xff0c;導致金價承壓。然而&#xff0c;分析師對未來金價走勢看法不…

Rolla‘s homework:Image Processing with Python Final Project

對比學習Yolo 和 faster rcnn 兩種目標檢測 要求 Image Processing with Python Final Project Derek TanLoad several useful packages that are used in this notebook:Image Processing with Python Final Project Project Goals: ? Gain an understanding of the object …

leetcode 1049.最后一塊石頭的重量II

思路&#xff1a;01背包 其實這道題我們可以轉化一下&#xff0c;乍一看有點像區間dp&#xff0c;很像區間合并那種類型。 但是&#xff0c;后來發現&#xff0c;這道題的精髓在于你如何轉成背包問題。我們可以把這個石頭分成兩堆&#xff0c;然后求出來這兩堆的最小差值就行…

使用git生成SSH公鑰,并設置SSH公鑰

1、在git命令行里輸入以下命令 ssh-keygen -t rsa 2、按回車&#xff0c;然后會看到以下字眼 Generating public/private rsa key pair. Enter file in which to save the key (/c/Users/xxx/.ssh/id_rsa) 例&#xff1a; 3、繼續回車&#xff0c;然后會看到以下字眼 Enter…

【面試干貨】數據庫樂觀鎖,悲觀鎖的區別,怎么實現

【面試干貨】數據庫樂觀鎖&#xff0c;悲觀鎖的區別&#xff0c;怎么實現 1、樂觀鎖&#xff0c;悲觀鎖的區別2、總結 &#x1f496;The Begin&#x1f496;點點關注&#xff0c;收藏不迷路&#x1f496; 1、樂觀鎖&#xff0c;悲觀鎖的區別 悲觀鎖&#xff08;Pessimistic Lo…

web前端框架設計第十課-組件

web前端框架設計第十課-組件 一.預習筆記 組件&#xff1a;Vue最強大的功能之一 1.局部組件注冊 注意事項&#xff1a;template標簽中只能有一個根元素 2.全局組件的注冊 注意事項&#xff1a;組件名的大小寫需要注意&#xff08;實踐&#xff09; 3.案例&#xff08;查詢框…

Vivado 使用教程(個人總結)

Vivado 是 Xilinx 公司推出的一款用于 FPGA 設計的集成開發環境 (IDE)&#xff0c;提供了從設計輸入到實現、驗證、調試和下載的完整流程。本文將詳細介紹 Vivado 的使用方法&#xff0c;包括項目創建、設計輸入、約束文件、綜合與實現、仿真、調試、下載配置等步驟。 一、創建…

設計模式--責任鏈模式

責任鏈模式是一種行為設計模式&#xff0c;它允許將請求沿著處理者鏈進行發送。請求會沿鏈傳遞&#xff0c;直到某個處理者對象負責處理它。這種模式在許多應用場景中非常有用&#xff0c;例如在處理用戶輸入、過濾請求以及實現多級審核時。 應用場景 處理用戶輸入&#xff1…

kafka之consumer參數auto.offset.reset

Kafka的auto.offset.reset 參數是用于指定消費者在啟動時如何處理偏移量&#xff08;offset&#xff09;的。這個參數有三個主要的取值&#xff1a;earliest、latest和none。 earliest&#xff1a; 當各分區下有已提交的offset時&#xff0c;從提交的offset開始消費&#xff1b…