1、Spark基礎介紹
1.1、Spark基礎概念
Spark是一種基于內存的快速、通用、可擴展的大數據分析計算引擎
1.2、Spark運行架構
運行過程:
-
Driver 執行用戶程序(Application)的
main()
方法并創建 SparkContext,與 Cluster Manager 建立通信 -
Cluster Manager 為用戶程序分配計算資源,返回可供使用的 Executor 列表
-
獲取 Executor 資源后,Spark 會將用戶程序代碼及依賴包(Application jar)傳遞給 Executor
-
最后,SparkContext 發送 tasks(經拆解后的任務序列)到 Executor,由其執行計算任務并輸出結果
Driver的職責:
-
作業解析與 DAG 生成:
-
將用戶編寫的 Spark 程序(如
JavaSparkContext
)解析為有向無環圖(DAG)。
-
-
任務調度:
-
將 DAG 劃分為多個階段(Stage)和任務(Task)。
-
為每個任務分配資源并調度到 Executor 上執行。
-
-
與集群管理器通信:
-
向集群管理器Cluster Manager申請資源(Executor)。
-
監控 Executor 的狀態和資源使用情況。
-
-
結果收集:
-
接收 Executor 返回的計算結果。
-
Executor的職責:
-
執行計算任務:
-
接收 Driver 發送的任務(如
map
、reduce
等操作)。 -
在本地執行具體的計算邏輯。
-
-
內存管理:
-
緩存 RDD 和中間計算結果(如通過
persist()
或cache()
)。 -
管理任務執行時的內存分配(如堆內 / 堆外內存)。
-
-
與 Driver 通信:
-
向 Driver 匯報任務狀態(如完成、失敗)。
-
返回計算結果給 Driver。
-
Cluster Manager的職責:
-
資源分配與調度:
-
接收 Driver 的資源請求,并在集群節點上啟動相應的 Executor 進程
-
-
節點監控與故障恢復:
-
監控集群中各節點的健康狀態,在節點故障時進行處理(如重新分配資源、重啟 Executor)
-
1.3、Spark生態圈
1.3.1、數據來源
Spark支持多種數據來源。包括文件系統、數據庫、結構化數據、實時數據等。
1.3.2、運行模式
1.3.2.1、Local模式(單機)
Local模式就是運行在一臺計算機上的模式,通常就是用于在本機上練手和測試。
1.3.2.2、Standalone模式(集群)
Standalone模式是Spark自帶的資源調度引擎,構建一個由Master + Worker構成的Spark集群,Spark運行在集群中。
運行過程:
-
SparkContext連接到Master,向Master注冊并申請資源(CPU Core 和Memory);
-
Master根據SparkContext的資源申請要求和Worker心跳周期內報告的信息決定在哪個Worker上分配資源,并在該Worker上獲取資源,然后啟動StandaloneExecutorBackend;
-
StandaloneExecutorBackend向SparkContext注冊;
-
SparkContext將Applicaiton代碼發送給StandaloneExecutorBackend;并且SparkContext解析Applicaiton代碼,構建DAG圖,并提交給DAG Scheduler分解成Stage,然后以Stage提交給Task Scheduler,Task Scheduler負責將Task分配到相應的Worker,最后提交給StandaloneExecutorBackend執行;
-
StandaloneExecutorBackend會建立Executor線程池,開始執行Task,并向SparkContext報告,直至Task完成。
-
所有Task完成后,SparkContext向Master注銷,釋放資源。
1.3.2.3、Yarn模式(集群)
Spark使用Hadoop的Yarn組件進行資源與任務調度。分為Yarn-client模式和Yarn-cluster模式。
(1)Yarn-client模式(默認)
Yarn-Client模式中,Driver在客戶端本地運行,這種模式可以使得Spark Application和客戶端進行交互,因為Driver在客戶端,所以可以通過webUI訪問Driver的狀態,默認是http://{IP}:4040訪問。
運行過程:
-
Client向Yarn的ResourceManager申請啟動Application Master。同時在SparkContext初始化中將創建DAGScheduler和TASKScheduler等,由于我們選擇的是Yarn-Client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend;
-
ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,與YARN-Cluster區別的是在該ApplicationMaster不運行SparkContext,只與SparkContext進行聯系進行資源的分派;
-
Client中的SparkContext初始化完畢后,與ApplicationMaster建立通訊,向ResourceManager注冊,根據任務信息向ResourceManager申請資源(Container);
-
一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向Client中的SparkContext注冊并申請Task;
-
Client中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task并向Driver匯報運行的狀態和進度,以讓Client隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;
-
應用程序運行完成后,Client的SparkContext向ResourceManager申請注銷并關閉自己。
(2)Yarn-cluster模式
Yarn-cluster模式中,Driver程序運行在由ResourceManager啟動的ApplicationMaster,適用于生產環境。
運行過程:
-
Client向Yarn中提交應用程序,包括ApplicationMaster程序、啟動ApplicationMaster的命令、需要在Executor中運行的程序等;
-
ResourceManager收到請求后,在集群中選擇一個NodeManager,為該應用程序分配第一個Container,要求它在這個Container中啟動應用程序的ApplicationMaster,其中ApplicationMaster進行SparkContext等的初始化;
-
ApplicationMaster向ResourceManager注冊,這樣用戶可以直接通過ResourceManage查看應用程序的運行狀態,然后它將采用輪詢的方式通過RPC協議為各個任務申請資源,并監控它們的運行狀態直到運行結束;
-
一旦ApplicationMaster申請到資源(也就是Container)后,便與對應的NodeManager通信,要求它在獲得的Container中啟動啟動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動后會向ApplicationMaster中的SparkContext注冊并申請Task。
-
ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task并向ApplicationMaster匯報運行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而可以在任務失敗時重新啟動任務;
-
應用程序運行完成后,ApplicationMaster向ResourceManager申請注銷并關閉自己。
(3)Yarn-cluster 和 Yarn-client 區別與聯系
-
從廣義上講,Yarn-cluster適用于生產環境;而Yarn-client適用于交互和調試,也就是希望快速地看到application的輸出。
-
從深層次的含義講,Yarn-cluster和Yarn-client模式的區別其實就是Application Master進程的區別,Yarn-cluster模式下,driver運行在AM(Application Master)中,它負責向YARN申請資源,并監督作業的運行狀況。當用戶提交了作業之后,就可以關掉Client,作業會繼續在YARN上運行。然而Yarn-cluster模式不適合運行交互類型的作業。而Yarn-client模式下,Application Master僅僅向YARN請求executor,client會和請求的container通信來調度他們工作,也就是說Client不能離開。
1.3.2.4、Mesos模式(集群)
Spark使用Mesos平臺進行資源與任務的調度。
1.3.3、Spark核心模塊
(1)Spark Core:Spark的基礎組件,提供了分布式數據處理的核心功能,包括任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。還包含了對彈性分布式數據集(Resilient Distributed DataSet,簡稱RDD)的API定義。
(2)Spark SQL:提供了對結構化數據的處理能力,支持SQL查詢和Spark DataFrame API。
(3)Spark Streaming:用于處理實時數據流,支持高吞吐量的數據流處理。它可以將實時數據源如Kafka、Flume等集成到Spark應用中,實現實時數據處理和分析。
(4)Spark MLlib:機器學習庫,提供了豐富的機器學習算法,包括分類、回歸、聚類、協同過濾等。MLlib支持多種編程語言,如Scala、Java、Python和R。
(5)Spark GraghX:用于圖數據處理和分析的庫,支持圖計算、圖算法等。GraphX可以高效地處理大規模圖數據,適用于社交網絡分析、推薦系統等領域。
1.4、Spark與MapReduce
1.4.1、MapReduce框架
MapReduce 是一種分布式計算編程模型,由 Google 在 2004 年提出,用于大規模數據處理。
Hadoop MapReduce 是其開源實現,成為 Hadoop 生態的核心組件之一,用于在集群上并行處理海量數據。
數據處理流程是從數據源獲取數據,經過分析計算后,將結果輸出到指定位置,核心是一次計算,不適合迭代計算。
1.4.2、Spark框架
基于內存處理,可以將多個計算任務迭代,中間結果可以不落盤(注:Shuffle是落盤的)
1.4.3、Spark與MapReduce的區別
特性 | Spark | MapReduce |
開發語言 | Java+Scala | Java |
編程模型 | 多種API,多語言接口,多種操作方式(DataFrame、Dataset 和 SQL) | Map 和 Reduce |
延遲性 | 低 | 高 |
適用場景 | 迭代計算、實時處理 | 批處理 |
容錯機制 | RDD 血統 | 檢查點和重新計算 |
資源管理 | Stanalone、Yarn、Mesos | Hadoop Yarn |
數據處理類型 | 批處理、流處理、交互式查詢 | 批處理 |
處理方式 | 在MR的基礎上優化了計算過程 | 出現的較早,只考慮單一的操作 |
計算模式 | 內存計算為主 | 磁盤計算為主 |
1.5、Spark快速開始
1.5.1、安裝包下載
下載地址:Downloads | Apache Spark
1.5.2、上傳并解壓安裝包
tar -zxvf spark-3.3.1-bin-hadoop3.tgz
1.5.3、執行官方示例
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.3.1.jar \
10
參數解釋:
bin/spark-submit:提交spark任務
--class org.apache.spark.examples.SparkPi:要執行程序的主類
--master local[2]:
(1)local:沒有指定線程數,則所有計算都運行在一個線程當中,沒有任何并行計算
(2)local[K]:指定使用K個Core來運行計算,比如local[2]就是運行2個Core來執行
(3)local[*]:自動幫你按照CPU最多核來設置線程數
./examples/jars/spark-examples_2.12-3.3.1.jar:要運行的程序
10:要運行程序的輸入參數
1.5.4、結果截圖展示
2、Spark Core模塊
Spark 計算框架為了能夠進行高并發和高吞吐的數據處理,封裝了三大數據結構,用于處理不同的應用場景。三大數據結構分別是:
-
RDD : 彈性分布式數據集
-
累加器:分布式共享只寫變量
-
廣播變量:分布式共享只讀變量
2.1、RDD
2.1.1、RDD基本概念
RDD叫做彈性分布式數據集,是Spark中最基本的數據抽象。代碼中是一個抽象類,它代表一個彈性的、不可變的、可分區的、里面的元素可并行計算的集合。
-
彈性:
-
自動進行內存和磁盤數據存儲的切換
-
Spark優先把數據放到內存中,如果內存放不下,就會放到磁盤里面,程序進行自動的存儲切換
-
-
基于血統的高效容錯機制
-
在RDD進行轉換和動作的時候,會形成RDD的Lineage依賴鏈,當某一個RDD失效的時候,可以通過重新計算上游的RDD來重新生成丟失的RDD數據。
-
-
Task如果失敗會自動進行特定次數的重試
-
RDD的計算任務如果運行失敗,會自動進行任務的重新計算,默認次數是4次。
-
-
Stage如果失敗會自動進行特定次數的重試
-
如果Job的某個Stage階段計算失敗,框架也會自動進行任務的重新計算,默認次數也是4次。
-
-
Checkpoint和Persist可主動或被動觸發
-
RDD可以用Persist持久化將RDD緩存到內存或者磁盤,當再次用到該RDD時直接讀取就行。也可以將RDD進行檢查點,檢查點會將數據存儲在HDFS中,該RDD的所有父RDD依賴都會被移除。
-
-
數據調度彈性
-
Spark把這個Job執行模型抽象為通用的有向無環圖DAG,可以將多Stage的任務串聯或并行執行,調度引擎自動處理Stage的失敗以及Task的失敗。
-
-
數據分片的高度彈性
-
可以根據業務的特征,動態調整數據分片的個數,提升整體的應用執行效率。
-
不可變性:
-
RDD是不可變的,這意味著一旦創建,其內容就不能被改變。這種設計保證了RDD的只讀屬性,使得Spark能夠在分布式環境中安全地高并發并行處理數據,而不用擔心數據在處理過程中的修改。不可變性簡化了并行計算的實現,因為它允許Spark優化數據的訪問和存儲,避免了并發修改帶來的復雜性。
-
-
分區:
-
RDD是可分區的,這意味著可以將數據集分割成多個分區,每個分區可以被存儲在集群中的不同節點上。這種分區機制允許Spark并行處理數據,每個分區可以在不同的節點上同時處理,極大地提高了處理速度和效率。通過合理的分區策略,可以優化數據的本地訪問和減少跨節點通信的開銷。
-
-
并行計算:
-
RDD支持并行計算,這是Spark的核心優勢之一。通過對數據進行分區,Spark可以將計算任務分配到多個節點上并行執行。這種并行處理能力使得Spark能夠處理大規模數據集,同時保持較高的處理速度和效率。通過使用如
map
、filter
、reduce
等操作,可以在不同的分區上獨立地執行數據處理邏輯,從而實現高效的并行計算。
-
2.1.2、RDD創建方式
2.1.2.1、從集合(內存)中創建RDD:parallelize
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> stringRDD = jsc.parallelize(Arrays.asList("hello", "spark"), 2);//2:代表分區數
2.1.2.2、從外部存儲(文件)中創建RDD
由外部存儲系統的數據集創建RDD包括:本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、HBase等。
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lineRDD = jsc.textFile("input.text");JavaPairRDD<LongWritable, Text> hadoopRDD =sc.hadoopFile("input", TextInputFormat.class, LongWritable.class, Text.class, 2);
2.1.3.3、從其他RDD中創建
主要是通過一個RDD運算完后,再產生新的RDD。
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> stringRDD = jsc.parallelize(Arrays.asList("hello", "spark"), 2);JavaRDD<String> mapRDD = stringRDD.map(s -> s + "123");
2.1.3、分區(Partition)和分區器(Partitioner)
2.1.3.1、分區(Partition)
分區是 RDD(彈性分布式數據集)的基本組成單位,是一個邏輯上的概念,表示數據的分片。每個分區可被單獨處理,分布在集群的不同節點上,實現并行計算。
分區數的設置優先級:
1. 優先使用方法參數
jsc.parallelize(Arrays.asList("hello", "spark"), 2);jsc.textFile("input.text", 2);
2. 使用配置參數:spark.default.parallelism
sparkConf.set("spark.default.parallelism", "4");
3. 采用環境默認總核值
sparkConf.setMaster("local[*]");
分區數據的分配規則:
針對集合(內存)中創建的RDD:
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
i:索引編號,從0開始;length:數據的數組長度;numSlices:分區數
針對外部存儲中創建的RDD:
Spark框架文件的操作沒有自己的實現的。采用MR庫(Hadoop)來實現,當前讀取文件的切片數量不是由Spark決定的,而是由Hadoop決定。
注:底層使用函數minPartitions = Math.min(defaultParallelism, 2),以上設置的分區上不一定是最終分區數
2.1.3.2、分區器(Partitioner)
分區器是一種策略,用于決定 RDD 中的鍵值對(Key-Value)如何被分配到各個分區中。Spark目前支持Hash分區器、Range分區器和用戶自定義分區器。Hash分區器為當前的默認分區器。分區器直接決定了RDD中分區的個數、RDD中每條數據經過Shuffle后進入哪個分區和Reduce的個數。
注意:
(1)只有Key-Value類型的pairRDD才有分區器,非Key-Value類型的RDD分區的值是None,數據分布由系統自動管理,通常采用輪詢(round-robin)方式分配到不同分區
(2)每個RDD的分區ID范圍:0~numPartitions-1,決定這個值是屬于那個分區的。
Hash分區器:
對于給定的key,計算其hashCode,并除以分區的個數取余,如果余數小于0,則用余數+分區的個數(否則加0),最后返回的值就是這個key所屬的分區ID。
Range分區器:
將一定范圍內的數映射到某一個分區內,盡量保證每個分區中數據量均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都比另一個分區內的元素小或者大,但是分區內的元素是不能保證順序的。
用戶自定義分區器:
-
定義分區器類:繼承
org.apache.spark.Partitioner
類,并實現其抽象方法。 -
實現
numPartitions
方法:返回分區數量。 -
實現
getPartition
方法:根據鍵返回其分區ID。 -
在操作中使用自定義分區器:在執行如
groupByKey
或reduceByKey
等操作時,通過設置參數來使用自定義分區器。
2.1.4、RDD算子
將數據轉化為RDD之后,就需要進行RDD的計算,RDD提供了計算方法RDD的方法又稱為RDD算子。RDD 支持兩種類型的算子:轉換算子(transformation) 和動作算子( action)。轉換算子可以將已有RDD轉換得到一個新的RDD,而動作算子則是基于RDD的計算,并將結果返回給驅動器(driver)。
2.1.4.1、轉換算子-transformation
用于從現有 RDD 創建新的 RDD,原有的RDD保持不變,不會觸發實際計算。轉換算子是惰性的(Lazy),僅記錄計算邏輯(DAG),不立即執行。
針對Value類型(部分方法):
方法 | 解釋 | 示例 |
map()映射 | 參數f是一個函數,可以寫作匿名子類,它可以接收一個參數。當某個RDD執行map方法時,會遍歷該RDD中的每一個數據項,并依次應用f函數,從而產生一個新的RDD。 | JavaRDD<String> lineRDD = sc.textFile("input/1.txt"); JavaRDD<String> mapRDD = lineRDD.map(s -> s + "||"); |
flatMap()扁平化 | 與map操作類似,將RDD中的每一個元素通過應用f函數依次轉換為新的元素,并封裝到RDD中。 | List<List<Integer>> datas = Arrays.asList( Arrays.asList(1, 2), Arrays.asList(3, 4) ); JavaRDD<List<Integer>> rdd = jsc.parallelize(datas, 2); JavaRDD<Integer> flatMapRDD = rdd.flatMap(new FlatMapFunction<List<Integer>, Integer>() { @Override public Iterator<Integer> call(List<Integer> list) { return list.iterator(); } }); |
mapPartitions()按分區批量處理 | 對每個分區的數據批量應用函數,函數輸入為整個分區的迭代器。 | JavaRDD<Integer> integerJavaRDD = parallelize.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() { @Override public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception { List<Integer> list = new ArrayList<>(); while (integerIterator.hasNext()) { Integer next = integerIterator.next(); list.add(next * 2); } return list.iterator(); } }); |
groupBy()分組 | 按照傳入函數的返回值進行分組。將相同的key對應的值放入一個迭代器。 groupBy會存在shuffle過程 shuffle:將不同的分區數據進行打亂重組的過程 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = integerJavaRDD.groupBy((Function<Integer, Integer>) v1 -> v1 % 2); |
filter()過濾 | 接收一個返回值為布爾類型的函數作為參數。當某個RDD調用filter方法時,會對該RDD中每一個元素應用f函數,如果返回值類型為true,則該元素會被添加到新的RDD中。 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); JavaRDD<Integer> filterRDD = integerJavaRDD.filter((Function<Integer, Boolean>) v1 -> v1 % 2 == 0); |
distinct()去重 | 對內部的元素去重,并將去重后的元素放到新的RDD中。 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); JavaRDD<Integer> distinct = integerJavaRDD.distinct(); |
sortBy()排序 | 該操作用于排序數據。在排序之前,可以將數據通過f函數進行處理,之后按照f函數處理的結果進行排序,默認為正序排列。排序后新產生的RDD的分區數與原RDD的分區數一致。Spark的排序結果是全局有序。 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(5, 8, 1, 11, 20), 2); JavaRDD<Integer> sortByRDD = integerJavaRDD.sortBy((Function<Integer, Integer>) v1 -> v1, true, 2); |
針對Key-Value類型(部分方法):要想使用Key-Value類型的算子首先需要使用mapToPair方法轉換為PairRDD
方法 | 解釋 | 示例 |
mapValues() | 針對于(K,V)形式的類型只對V進行操作 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); JavaPairRDD<String, String> pairRDD = integerJavaRDD.mapToPair((PairFunction<Integer, String, String>) integer -> new Tuple2<>(integer.toString(), integer.toString())); JavaPairRDD<String, String> mapValuesRDD = pairRDD.mapValues((Function<String, String>) v1 -> v1 + "|||"); |
groupByKey() | groupByKey對每個key進行操作,但只生成一個seq,并不進行聚合。該操作可以指定分區器或者分區數(默認使用HashPartitioner) | JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2); JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)); JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey(); |
reduceByKey() | 該操作可以將RDD[K,V]中的元素按照相同的K對V進行聚合。其存在多種重載形式,還可以設置新RDD的分區數。與groupByKey()的區別在于在shuffle之前有combine(預聚合)操作,在不影響業務邏輯的前提下,優先選用reduceByKey() | JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2); JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1)); JavaPairRDD<String, Integer> result = pairRDD.reduceByKey((Function2<Integer, Integer, Integer>) Integer::sum); |
sortByKey() | 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD | JavaPairRDD<Integer, String> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(4, "a"), new Tuple2<>(3, "c"), new Tuple2<>(2, "d"))); JavaPairRDD<Integer, String> pairRDD = javaPairRDD.sortByKey(false); |
2.1.4.2、動作算子-action
行動算子是觸發了整個作業的執行。
方法 | 解釋 | 示例 |
collect() | 以數組Array的形式返回數據集的所有元素 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); List<Integer> collect = integerJavaRDD.collect(); |
count() | 返回RDD中元素的個數 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); long count = integerJavaRDD.count(); |
first() | 返回RDD中的第一個元素 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); Integer first = integerJavaRDD.first(); |
take() | 返回一個由RDD的前n個元素組成的數組 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); List<Integer> list = integerJavaRDD.take(3); |
countByKey() | 統計每種key的個數 | JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 8), new Tuple2<>("b", 8), new Tuple2<>("a", 8), new Tuple2<>("d", 8))); Map<String, Long> map = pairRDD.countByKey(); |
saveAsTextFile() | 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對于每個元素,Spark將會調用toString方法,將它轉換為文件中的文本,每條記錄占一行,適用于存儲普通文本或 JSON 等可解析的字符串數據。 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); integerJavaRDD.saveAsTextFile("output"); |
saveAsObjectFile() | 將RDD中的元素序列化成對象,存儲到文件中(二進制格式保存),適用于存儲復雜對象(如自定義類或復雜結構數據),但需配合 Spark 提供的序列化框架(如 Kryo 序列化)使用。 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2); integerJavaRDD.saveAsObjectFile("output1"); |
foreach() | 遍歷RDD中每一個元素 | JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),4); integerJavaRDD.foreach((VoidFunction<Integer>) System.out::println); |
foreachPartition() | 遍歷RDD中每一個分區 | JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); parallelize.foreachPartition((VoidFunction<Iterator<Integer>>) integerIterator -> { // 一次處理一個分區的數據 while (integerIterator.hasNext()) { Integer next = integerIterator.next(); System.out.println(next); } }); |
2.1.4.3、混洗算子-shuffle
混洗算子(shuffle operator)指的是那些導致Spark重新分配數據以達到重新組織數據的目的的操作。Shuffle操作通常發生在需要基于鍵(key)對數據進行重新分組或聚合時,比如在執行groupByKey
、reduceByKey
、aggregateByKey
、join
(特別是大key的join)等操作時。
示例:在執行groupByKey
時,Spark需要將具有相同鍵的數據聚合到同一個節點上,這就需要將數據從一個節點移動到包含所有具有該鍵數據的節點上。
優化混洗操作的策略
-
減少混洗的數據量:盡量減少需要混洗的數據量。例如,通過在map階段進行過濾,減少后續混洗操作的數據量。
-
使用合適的分區策略:合理設置分區數可以減少混洗的開銷。例如,在執行
groupByKey
之前使用repartition(重新分配數據,會觸發shuffle)
或coalesce(合并分區,不會觸發shuffle)
來調整數據的分區數。 -
避免寬依賴:盡量避免寬依賴(wide dependency),即盡量避免在混洗操作中使用大寬度的鍵。例如,使用
mapToPair
和reduceByKey
組合代替groupByKey
,因為reduceByKey
通常有更好的性能。 -
廣播變量:對于小數據集的join操作,使用廣播變量可以減少混洗的需求。
-
使用緩存:對頻繁使用的數據進行緩存可以減少混洗操作的重復執行。
2.1.5、RDD依賴關系
2.1.5.1、RDD血緣關系
RDD只支持粗粒度轉換,即在大量記錄上執行的單個操作。將創建RDD的一系列Lineage(血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行為,當該RDD的部分分區數據丟失時,它可以根據這些信息來重新運算和恢復丟失的數據分區。通過.toDebugString()方法查看RDD血緣關系。
(8) ParallelCollectionRDD[0] at parallelize at SparkLineage.java:30 []**************mapToPair依賴parallelize****************(8) MapPartitionsRDD[1] at mapToPair at SparkLineage.java:33 []| ParallelCollectionRDD[0] at parallelize at SparkLineage.java:30 []**************reduceByKey依賴mapToPair****************(8) ShuffledRDD[2] at reduceByKey at SparkLineage.java:36 []+-(8) MapPartitionsRDD[1] at mapToPair at SparkLineage.java:33 []| ParallelCollectionRDD[0] at parallelize at SparkLineage.java:30 []
2.1.5.2、寬依賴
寬依賴表示同一個父RDD的Partition被多個子RDD的Partition依賴(只能是一對多),會引起Shuffle。寬依賴的算子包括sort、reduceByKey、groupByKey、join和調用rePartition函數的任何操作,在不影響業務的情況下,應避免使用。
2.1.5.3、窄依賴
窄依賴表示每一個父RDD的Partition最多被子RDD的一個Partition使用(一對一or多對一)。
2.1.5.4、DAG有向無環圖
DAG(Directed Acyclic Graph)有向無環圖是由點和線組成的拓撲圖形,該圖形具有方向,不會閉環。例如,DAG記錄了RDD的轉換過程和任務的階段。
2.1.5.5、RDD任務劃分
-
Application:初始化一個SparkContext即生成一個Application;
-
Job:一個Action算子就會生成一個Job;
-
Stage:Stage等于寬依賴的個數加1;
-
Task:一個Stage階段中,最后一個RDD的分區個數就是Task的個數。
注意:Application->Job->Stage->Task每一層都是1對n的關系。
2.1.6、序列化
在實際開發中我們往往需要自己定義一些對于RDD的操作,那么此時需要注意的是,初始化工作是在Driver端進行的,而實際運行程序是在Executor端進行的,這就涉及到了跨進程通信,是需要序列化的。
示例:
在以下示例中,若User未實現implements Serializable接口,會報錯SparkException: Task not serializable和java.io.NotSerializableException: com.example.spark.User
User zhangsan = new User("zhangsan", 13);User lisi = new User("lisi", 13);JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(zhangsan, lisi), 2);JavaRDD<User> mapRDD = userJavaRDD.map((Function<User, User>) v1 -> new User(v1.getName(), v1.getAge() + 1));mapRDD. collect().forEach(System.out::println);
序列化方式:Java序列化(默認)、Kryo序列化(推薦)
Java的序列化能夠序列化任何的類。但是比較重,序列化后對象的體積也比較大。Spark出于性能的考慮,Spark2.0開始支持另外一種Kryo序列化機制。Kryo速度是Serializable的10倍。當RDD在Shuffle數據的時候,簡單數據類型、數組和字符串類型已經在Spark內部使用Kryo來序列化。
示例:
引入依賴(Spark已內置)
<dependency><groupId>com.esotericsoftware</groupId><artifactId>kryo</artifactId><version>5.6.1</version></dependency>
代碼中修改序列化機制和注冊要序列化的類
SparkConf conf = new SparkConf();conf.setMaster("local[*]");conf.setAppName("sparkCore");// 替換默認的序列化機制conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");// 注冊需要使用kryo序列化的自定義類conf.registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});
2.1.7、RDD持久化
2.1.7.1、RDD Cache緩存
RDD通過Cache或者Persist方法將前面的計算結果緩存,默認情況下會把數據以序列化的形式緩存在JVM的堆內存中。但是并不是這兩個方法被調用時立即緩存,而是觸發后面的action算子時,該RDD將會被緩存在計算節點的內存中,并供后面重用。注意:cache操作會增加血緣關系,不改變原有的血緣關系
默認存儲級別是 StorageLevel.MEMORY_ONLY(以Java序列化方式存到內存里)。完整的存儲級別列表如下:
存儲級別 | 含義 |
MEMORY_ONLY | 以未序列化的 Java 對象形式將 RDD 存儲在 JVM 內存中。如果RDD不能全部裝進內存,那么將一部分分區緩存,而另一部分分區將每次用到時重新計算。這個是Spark的RDD的默認存儲級別。 |
MEMORY_AND_DISK | 以未序列化的Java對象形式存儲RDD在JVM中。如果RDD不能全部裝進內存,則將不能裝進內存的分區放到磁盤上,然后每次用到的時候從磁盤上讀取。 |
MEMORY_ONLY_SER | 以序列化形式存儲 RDD(每個分區一個字節數組)。通常這種方式比未序列化存儲方式要更省空間,尤其是如果你選用了一個比較好的序列化協議(fast serializer),但是這種方式也相應的會消耗更多的CPU來讀取數據。 |
MEMORY_AND_DISK_SER | 和 MEMORY_ONLY_SER 類似,只是當內存裝不下的時候,會將分區的數據吐到磁盤上,而不是每次用到都重新計算。 |
DISK_ONLY | RDD 數據只存儲于磁盤上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 | 和上面沒有”_2″的級別相對應,只不過每個分區數據會在兩個節點上保存兩份副本。 |
緩存有可能丟失,或者存儲于內存的數據由于內存不足而被刪除,RDD的緩存容錯機制保證了即使緩存丟失也能保證計算的正確執行。通過基于RDD的一系列轉換,丟失的數據會被重算,由于RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,并不需要重算全部Partition。
設置緩存示例:
mapRDD.cache();mapRDD.persist(StorageLevel.MEMORY_ONLY());
2.1.7.2、RDD CheckPoint檢查點
由于血緣依賴過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果檢查點之后有節點出現問題,可以從檢查點開始重做血緣,減少了開銷。檢查點的目的是將RDD中間結果寫入磁盤。檢查點存儲路徑通常是存儲在HDFS等容錯、高可用的文件系統。檢查點數據存儲格式為:二進制的文件。檢查點觸發時間和Cache緩存一樣,不會馬上被執行,必須執行Action操作才能觸發。檢查點會切斷血緣關系。而且檢查點為了數據安全,會從血緣關系的最開始執行一遍。推薦做法是先進行Cache緩存操作,再進行CheckPoint檢查點操作。
sparkContext.setCheckpointDir("hdfs://hadoop102:9000/spark/checkpoint");//設置檢查點路徑mapRDD.cache();//執行cache方法,緩存數據mapRDD.checkpoint();//設置檢查點
2.1.7.3、緩存和檢查點的的區別
-
Cache緩存只是將數據保存起來,不切斷血緣依賴。Checkpoint檢查點切斷血緣依賴。
-
Cache緩存的數據通常存儲在磁盤、內存等地方,可靠性低。Checkpoint的數據通常存儲在HDFS等容錯、高可用的文件系統,可靠性高。
-
建議對checkpoint()的RDD使用Cache緩存,這樣checkpoint的job只需從Cache緩存中讀取數據即可,否則需要再從頭計算一次RDD。
-
如果使用完了緩存,可以通過unpersist()方法釋放緩存。
2.2、累加器
累加器:分布式共享只寫變量
累加器支持在所有節點間進行累加操作(如計數或求和),但僅允許驅動程序(Driver Program)讀取最終結果。
2.2.1、系統累加器
Spark內置了三種類型的Accumulator,分別是LongAccumulator用來累加整數型,DoubleAccumulator用來累加浮點型,CollectionAccumulator用來累加集合元素。
代碼示例:
LongAccumulator longAccumulator = sc.sc().longAccumulator();// LongAccumulator: 數值型累加DoubleAccumulator doubleAccumulator = sc.sc().doubleAccumulator();// DoubleAccumulator: 小數型累加CollectionAccumulator<Integer> collectionAccumulator = sc.sc().collectionAccumulator();// CollectionAccumulator:集合累加List<Integer> integers = parallelize.map((Function<Integer, Integer>) v1 -> {longAccumulator.add(v1);doubleAccumulator.add(v1);collectionAccumulator.add(v1);return v1;}).collect();System.out.println(collectionAccumulator.value());//driver端獲取累加值System.out.println(longAccumulator.value());//driver端獲取累加值System.out.println(doubleAccumulator.value());//driver端獲取累加值
2.2.2、自定義累加器
當內置的Accumulator無法滿足要求時,可以繼承AccumulatorV2實現自定義的累加器。
代碼示例:
1. 繼承AccumulatorV2,實現相關方法
class BigIntegerAccumulator extends AccumulatorV2<BigInteger, BigInteger> {private BigInteger num = BigInteger.ZERO;public BigIntegerAccumulator() {}public BigIntegerAccumulator(BigInteger num) {this.num = new BigInteger(num.toString());}//檢查當前值是否為零@Overridepublic boolean isZero() {return num.compareTo(BigInteger.ZERO) == 0;}//創建當前累加器的副本@Overridepublic AccumulatorV2<BigInteger, BigInteger> copy() {return new BigIntegerAccumulator(num);}//重置累加器的值(通常用于重新開始累加)@Overridepublic void reset() {num = BigInteger.ZERO;}//向累加器添加數值@Overridepublic void add(BigInteger num) {this.num = this.num.add(num);}//將當前值與本地值相加,并更新本地累加器的值@Overridepublic void merge(AccumulatorV2<BigInteger, BigInteger> other) {num = num.add(other.value());}//返回當前累加器的值,僅在Driver端可用@Overridepublic BigInteger value() {return num;}}
2. 創建自定義Accumulator的實例,然后在SparkContext上注冊它
// 直接new自定義的累加器BigIntegerAccumulator bigIntegerAccumulator = new BigIntegerAccumulator();// 然后在SparkContext上注冊一下sparkContext.register(bigIntegerAccumulator, "bigIntegerAccumulator");
2.3、廣播變量
廣播變量:分布式共享只讀變量。
廣播變量用來高效分發較大的對象。向所有工作節點發送一個較大的只讀值,以供一個或多個Spark Task操作使用。
代碼示例:
final JavaRDD<String> rdd = sparkContext.parallelize(Arrays.asList("Hello", "Spark", "Hadoop", "Flink", "Spark", "Hadoop"));List<String> okList = Arrays.asList("Spark", "Hadoop");//假設一個大對象final Broadcast<List<String>> broadcast = sparkContext.broadcast(okList);//構建一個廣播變量final JavaRDD<String> filterRDD = rdd.filter(s -> broadcast.value().contains(s)//broadcast.value()拉取廣播變量的值);
3、Spark SQL模塊
3.1、概述
Spark SQL是用于結構化數據處理的Spark模塊。與基本的Spark RDD API不同,Spark SQL提供的接口為Spark提供了有關數據結構和正在執行的計算的更多信息。在內部,Spark SQL使用這些額外的信息來執行額外的優化。與Spark SQL交互的方式有多種,包括SQL和Dataset API。計算結果時,使用相同的執行引擎,與您用于表達計算的API/語言無關。
3.2、Spark SQL特點
- 易整合
無縫的整合了SQL查詢和Spark編程。
Dataset<Row> ds = sparkSession.read().json("data/user.json");//將數據模型轉換成表,方便SQL的使用ds.createOrReplaceTempView("user");//使用SQL文的方式操作數據String sql = "select avg(age) from user";Dataset<Row> sqlDS = sparkSession.sql(sql);//展示數據模型的效果sqlDS.show();
- 統一的數據訪問方式
使用相同的方式連接不同的數據源。
Dataset<Row> ds = sparkSession.read().json("data/user.json");//將數據模型轉換成表,方便SQL的使用ds.createOrReplaceTempView("user");
-
兼容Hive
在已有的倉庫上直接運行SQL或者HQL。
SparkSession sparkSession = SparkSession.builder().enableHiveSupport() //啟用Hive的支持.master("local[*]").appName("SparkSQL").getOrCreate();sparkSession.sql("show tables").show();sparkSession.sql("create table user_info(name String,age bigint)");
-
標準的數據連接
通過JDBC或者ODBC來連接。
Dataset<Row> jdbc = sparkSession.read().jdbc("jdbc:mysql://localhost:3306/gmall", "activity_info", properties);//連接mysql數據庫
3.3、Spark SQL發展歷程
RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)
如果同樣的數據都給到這三個數據結構,他們分別計算之后,都會給出相同的結果。不同的是他們的執行效率和執行方式。在現在的版本中,Dataset性能最好,已經成為了唯一使用的接口。其中Dataframe已經在底層被看做是特殊泛型的DataSet<Row>。
RDD、DataFrame、DataSet三者的共性:
-
三者都是Spark平臺下的分布式彈性數據集,為處理超大型數據提供便利。
-
三者都有惰性機制,在進行創建、轉換,如map方法時,不會立即執行,只有在遇到Action行動算子如foreach時,三者才會開始遍歷運算。
-
三者有許多共同的函數,如filter,排序等。
-
三者都會根據Spark的內存情況自動緩存運算。
-
三者都有分區的概念。
3.4、Spark SQL編程
3.3.1、SparkSession
在老的版本中,SparkSQL提供兩種SQL查詢起始點:
-
一個叫SQLContext,用于Spark自己提供的SQL查詢;
-
一個叫HiveContext,用于連接Hive的查詢。
SparkSession是Spark最新的SQL查詢起始點,實質上是SQLContext和HiveContext的組合,所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。
SparkSession內部封裝了SparkContext,所以計算實際上是由SparkContext完成的。
//第一種方式SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");SparkContext sc = new SparkContext(conf);SparkSession sparkSession = new SparkSession(sc);//第二種方式SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").enableHiveSupport() // 關鍵:開啟Hive功能.getOrCreate();//第三種方式SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
3.3.3、Spark SQL入門
//構建環境對象SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate();//對接文件數據源時,會將文件中的一行數據封裝為Row對象Dataset<Row> ds = sparkSession.read().json("data/user.json");//可以轉換成RDD對象JavaRDD<Row> javaRDD = ds.javaRDD();//將數據模型轉換成臨時視圖,方便SQL的使用ds.createOrReplaceTempView("user");//使用SQL文的方式操作數據String sql = "select * from user";//執行sql查詢Dataset<Row> sqlDS = sparkSession.sql(sql);//展示數據模型的效果sqlDS.show();//DSL語法ds.select("*").show();//DSL語法ds.select("name","age").where(ds.col("name").equalTo("張三")).show();//釋放資源sparkSession.close();
3.3.4、數據的加載與保存
3.3.4.1、CSV文件
Dataset<Row> csv = sparkSession.read().option("header", "true") //配置是否第一行為表頭(列名).option("sep","_") //配置分隔符,默認是英文的逗號(,).csv("data/user.csv");csv.write().mode(SaveMode.Append)//追加模式,不影響原有文件。Overwrite:覆蓋原有文件;Ignore:有文件則不操作;ErrorIfExists:存在文件就報錯.option("header", "true") //配置第一行是否是表頭.csv("output");
3.3.4.2、JSON文件
Dataset<Row> json = sparkSession.read().json("data/user.json");json.write().json("output");
3.3.4.3、Parquet文件(列式存儲文件)
Dataset<Row> csv = sparkSession.read().json("data/user.json");csv.write().parquet("output");
3.3.4.4、對接MySQL
Properties properties = new Properties();properties.setProperty("user","root");properties.setProperty("password","000000");Dataset<Row> jdbc = sparkSession.read().jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true", "activity_info", properties);jdbc.show();
3.3.6、用戶自定義函數
3.3.6.1、UDF
用于處理單行數據并返回單個值。其輸入和輸出為1:1關系,即輸入一行數據后輸出單個結果。
sparkSession.udf().register("prefixName", new UDF1<String, String>() {@Overridepublic String call(String name) throws Exception {return "Name:" + name;}}, DataTypes.StringType);String sql = "select prefixName(name) from user";Dataset<Row> sqlDS = sparkSession.sql(sql);
3.3.6.2、UDAF
用于處理多行數據并返回單個聚合結果,通常用于統計、分組匯總等場景。
第一步:自定義UDAF函數
//自定義緩沖區的數據類型@Data@AllArgsConstructorpublic class AvgAgeBuffer implements Serializable {private Long total;private Long cnt;}//1. 創建自定義的【公共】類//2. 繼承 org.apache.spark.sql.expressions.Aggregator//3. 設定泛型// IN : 輸入數據類型// BUFF : 緩沖區的數據類型// OUT : 輸出數據類型//4. 重寫方法public class MyAvgAgeUDAF extends Aggregator<Long, AvgAgeBuffer, Long> {@Override// TODO 緩沖區的初始化操作public AvgAgeBuffer zero() {return new AvgAgeBuffer(0L, 0L);}@Override// TODO 將輸入的年齡和緩沖區的數據進行聚合操作public AvgAgeBuffer reduce(AvgAgeBuffer buffer, Long in) {buffer.setTotal(buffer.getTotal() + in);buffer.setCnt(buffer.getCnt() + 1);return buffer;}@Override// TODO 合并緩沖區的數據public AvgAgeBuffer merge(AvgAgeBuffer b1, AvgAgeBuffer b2) {b1.setTotal(b1.getTotal() + b2.getTotal());b1.setCnt(b1.getCnt() + b2.getCnt());return b1;}@Override// TODO 計算最終結果public Long finish(AvgAgeBuffer buffer) {return buffer.getTotal() / buffer.getCnt();}@Overridepublic Encoder<AvgAgeBuffer> bufferEncoder() {return Encoders.bean(AvgAgeBuffer.class);}@Overridepublic Encoder<Long> outputEncoder() {return Encoders.LONG();}}
第二步:使用
sparkSession.udf().register("avgAge", udaf(new MyAvgAgeUDAF(), Encoders.LONG()));String sql = "select avgAge(age) from user";Dataset<Row> sqlDS = sparkSession.sql(sql);