- 目錄
- 前言:
- 1、Spark概述
- 1.1、什么是Spark(官網:http://spark.apache.org)
- 1.2、為什么要學Spark
- 1.3、Spark特點
- 2、RDD概述
- 2.1、什么是RDD
- 2.2、RDD的屬性
- 2.3、創建RDD的兩種方式
- 2.4、RDD編程API
- 2.5、RDD的依賴關系
- 2.6、RDD的緩存
- 2.7、DAG的生成
- 總結:
目錄
前言:
本篇文章只是簡單介紹下Spark,然后對Spark的RDD在做一個全面的介紹。由于博主知識有限,這里只是做一個簡單的介紹。若有些地方有問題,請大家及時指出。后續隨著深入的學習,會再進一步總結自己的學習成果。
1、Spark概述
1.1、什么是Spark(官網:http://spark.apache.org)
Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生于加州大學伯克利分校AMPLab,2010年開源,2013年6月成為Apache孵化項目,2014年2月成為Apache頂級項目。目前,Spark生態系統已經發展成為一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基于內存計算的大數據并行計算框架。Spark基于內存計算,提高了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark部署在大量廉價硬件之上,形成集群。Spark得到了眾多大數據公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用于鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了很多生產系統的推薦算法;騰訊Spark集群達到8000臺的規模,是當前已知的世界上最大的Spark集群。
1.2、為什么要學Spark
中間結果輸出:基于MapReduce的計算引擎通常會將中間結果輸出到磁盤上,進行存儲和容錯。出于任務管道承接的,考慮,當一些查詢翻譯到MapReduce任務時,往往會產生多個Stage,而這些串聯的Stage又依賴于底層文件系統(如HDFS)來存儲每一個Stage的輸出結果
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。
1.3、Spark特點
1.3.1、快
與Hadoop的MapReduce相比,Spark基于內存的運算要快100倍以上,基于硬盤的運算也要快10倍以上。Spark實現了高效的DAG執行引擎,可以通過基于內存來高效處理數據流。
1.3.2、易用
Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶可以快速構建不同的應用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來驗證解決問題的方法。
1.3.3、通用
Spark提供了統一的解決方案。Spark可以用于批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同類型的處理都可以在同一個應用中無縫使用。Spark統一的解決方案非常具有吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減少開發和維護的人力成本和部署平臺的物力成本。
1.3.4、兼容性
Spark可以非常方便地與其他的開源產品進行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,并且可以處理所有Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對于已經部署Hadoop集群的用戶特別重要,因為不需要做任何數據遷移就可以使用Spark的強大處理能力。Spark也可以不依賴于第三方的資源管理和調度器,它實現了Standalone作為其內置的資源管理和調度框架,這樣進一步降低了Spark的使用門檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。
2、RDD概述
2.1、什么是RDD
RDD(Resilient Distributed Dataset)叫做分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可并行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能夠重用工作集,這極大地提升了查詢速度。
2.2、RDD的屬性
一組分片(Partition),即數據集的基本組成單位。對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目。
一個計算每個分區的函數。Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不需要保存每次計算的結果。
RDD之間的依賴關系。*RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。*
一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。
一個列表,存儲存取每個Partition的優先位置(preferred location)。對于一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。
2.3、創建RDD的兩種方式
由一個已經存在的Scala集合創建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))由外部存儲系統的數據集創建,包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile(“hdfs://node1.itcast.cn:9000/words.txt”)
2.4、RDD編程API
2.4.1、Transformation
RDD中的所有轉換都是延遲加載的,也就是說,它們并不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。
常用的Transformation:
轉換 含義
map(func) 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成
filter(func) 返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成
flatMap(func) 類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func) 類似于map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似于mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用于指定隨機數生成器種子
union(otherDataset) 對源RDD和參數RDD求并集后返回一個新的RDD
intersection(otherDataset) 對源RDD和參數RDD求交集后返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重后返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似,但是更靈活
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
2.4.2、 Action
動作 含義
reduce(func) 通過func函數聚集RDD中的所有元素,這個功能必須是課交換且可并聯的
collect() 在驅動程序中,以數組的形式返回數據集的所有元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(類似于take(1))
take(n) 返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed]) 返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用于指定隨機數生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對于每個元素,Spark將會調用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。
saveAsObjectFile(path)
countByKey() 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在數據集的每一個元素上,運行函數func進行更新。
2.5、RDD的依賴關系
RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
2.5.1、窄依賴
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用
總結:窄依賴我們形象的比喻為獨生子女
2.5.2、寬依賴
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition
總結:寬依賴我們形象的比喻為超生
2.6、RDD的緩存
Spark速度非常快的原因之一,就是在不同操作中可以在內存中持久化或緩存個數據集。當持久化某個RDD后,每一個節點都將把計算的分片結果保存在內存中,并在對此RDD或衍生出的RDD進行的其他動作中重用。這使得后續的動作變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特征之一。可以說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。
2.6.1、RDD緩存方式
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是并不是這兩個方法被調用時立即緩存,而是觸發后面的action時,該RDD將會被緩存在計算節點的內存中,并供后面重用。
通過查看源碼發現cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
緩存有可能丟失,或者存儲存儲于內存的數據由于內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基于RDD的一系列轉換,丟失的數據會被重算,由于RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,并不需要重算全部Partition。
2.7、DAG的生成
DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就就形成了DAG,根據RDD之間的依賴關系的不同將DAG劃分成不同的Stage,對于窄依賴,partition的轉換處理在Stage中完成計算。對于寬依賴,由于有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是劃分Stage的依據。
總結:
關于本篇文章,你需要了解以下幾個知識點:
1、Spark是什么、用來干什么的、有什么特點。
2、RDD是什么、常用的RDD有哪些對應的作用是什么、RDD的依賴關系有哪些。
3、DAG是什么。