Spark Core
概念
前言
批處理(有界數據)
? 對靜態的、有限的數據集進行一次性處理,數據通常按固定周期(如每小時、每天)收集后統一計算。
特點:
- 高吞吐量,適合大規模數據。
- 高延遲(數據需積累到一定量才觸發計算)。
- 典型場景:離線報表生成、歷史數據分析(如T+1的統計報表)。
流處理(無界數據)
? 對動態的、無限的數據流進行實時處理,數據產生后立即處理(逐條或微批次)。
特點:
- 低延遲(毫秒級到秒級響應)。
- 處理狀態化(如窗口聚合、事件時間處理)。
- 典型場景:實時監控、欺詐檢測、實時推薦。
流批一體
? 用同一套API或框架同時處理批數據和流數據,底層實現邏輯統一(如將批視為流的特例)。
特點:
- 開發效率高:無需維護兩套代碼。
- 一致性保證:流和批的結果邏輯相同(如實時和離線統計口徑一致)。
簡介
? 是專門為大規模數據處理而設計的快速通用的計算引擎,是一種類似 Hadoop MapReduce
的通用并行計算框架。
和MapReduce的對比
MapReduce | Spark | |
---|---|---|
編程模型 | Map 和 Reduce | 不局限于 Map 和 Reduce,還提供多種數據集操作類型 |
運算效率 | 每次迭代都要向磁盤寫入和讀取,中間數據 I/O 開銷大,效率低 | 中間結果直接存放到內存,更高的迭代運算效率 |
調度機制 | N/A | 基于 DAG 的任務調度,執行機制更優 |
模塊
模塊 | 作用 |
---|---|
Spark Core | 最基礎與最核心的功能。用于離線數據處理,批量計算。 |
Spark SQL | 交互式查詢。用于操作結構化數據,通過 SQL 或者 Hive 的 HQL 來查詢數據。 |
Spark Streaming | 準實時流式計算。 |
Spark MLlib | 機器學習。 |
Spark GraphX | 圖計算。 |
簡單入門
創建MAVEN項目,并引入依賴
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-yarn_2.12</artifactId><version>3.3.2</version><scope>provided</scope></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>2.7.0</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.12</artifactId><version>2.7.0</version></dependency></dependencies>
創建Scala單例對象,并實現
package com.mrhelloworld.wordcountimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount01Demo {def main(args: Array[String]): Unit = {// ==================== 建立連接 ====================// 初始化配置對象val conf = new SparkConf()// 設置運行模式與 AppNameconf.setMaster("local").setAppName("WordCount")// 根據配置對象初始化上下文對象val sc = new SparkContext(conf)// ==================== 業務處理 ====================// 讀取文件,按行讀取val lines: RDD[String] = sc.textFile("data/wordcount")// 按空格拆分每一行數據,拆分為一個一個的單詞val words: RDD[String] = lines.flatMap(w => w.split("\\s+"))// 將數據根據單詞進行分組,便于統計val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(w => w)// 對分組后的數據進行統計val wordCount: RDD[(String, Int)] = wordGroup.map(kv => (kv._1, kv._2.size))// 從結果集中獲取指定條數的數據val wordCountTopN: Array[(String, Int)] = wordCount.take(10)// 將結果打印在控制臺wordCountTopN.foreach(println)// 簡寫方式lines.flatMap(_.split("\\s+")).groupBy(w => w).map(kv => (kv._1, kv._2.size)).take(10).foreach(println)// ==================== 關閉連接 ====================if (!sc.isStopped) sc.stop()}
}
運行架構
主要角色
- 資源層面的主從
- Master,負責管理與分配整個集群中的資源(CPU Core 和 Memory)
- Worker,負責接收資源并執行作業中的任務
- 作業層面的主從
- Driver,負責管理整個集群中的作業任務調度
- Executor,負責執行具體的任務
注意:無論什么運行模式都會存在這些角色,只是在不同的運行模式下,這些角色的分布會有所不同
通用運行架構
注意:一個App -> 一個或多個Job -> 一個或多個Stage -**> ** 一個或多個Task:Task是由分區數決定的
運行流程
- 啟動集群后,Worker 節點會向 Master 節點心跳匯報資源(CPU Core 和 Memory)情況;
- Client 提交 Application,根據不同的運行模式在不同的位置創建 Driver 進程;
- SparkContext 連接到 Master,向 Master 注冊應用并申請資源(Executor 的 CPU Core 和 Memory);
- Master 根據 SparkContext 的資源申請并根據 Worker 心跳周期內報告的信息決定在哪個 Worker 上分配資源,也就是Executor;
- Worker 節點創建 Executor 進程,Executor 向 Driver 進行反向注冊;
- 資源滿足后(Executor 注冊完畢),SparkContext 解析 Applicaiton 代碼,創建 RDD,構建 DAG,并提交給DAGScheduler 分解成 Stage(當碰到 Action 算子時,就會催生 Job,每個 Job 中含有 1 個或多個 Stage),然后將Stage(或者稱TaskSet)提交給 TaskScheduler,TaskScheduler 負責將 Task 分配到相應的Worker,最后提交給Executor 執行(發送到 Executor 的線程池中);
- 每個 Executor 會持有一個線程池,Executor 通過啟動多個線程(Task)來對 RDD 的 Partition 進行并行計算,并向SparkContext 報告,直至 Task 完成。
DAG:有向無環圖,將作業的調度順序規定好,不會出現回頭執行相同任務或階段的可能,類似于找一條路線一次發試卷,不要出現走一樣的路的情況
運行模式
本地模式
? Local 模式,就是不需要其他任何節點資源就可以在本地執行 Spark 代碼的環境,一般用于教學,調試,演示等。本地模式就是一個獨立的進程,通過其內部的多個線程來模擬整個 Spark 運行時環境,每個線程代表一個 Worker。
? 本地模式,在運行時也有不同的模式:
模式 | 特點 |
---|---|
local | 只有一個工作進程,無并行計算能力 |
local[N] | N 個工作進程,通常設置 N 為機器的 CPU 數量 |
local[*] | 工作進程數量等于機器的 CPU 核心數量 |
local[N, M] | 指定線程個數以及失敗重試次數,失敗重試次數為 M(最多重試M-1次) |
local[*, M] | 指定線程個數以及失敗重試次數,失敗重試次數為 M(最多重試M-1次) |
local-cluster [numSlaves,coresPerSlave,memeoryPerySlave] | 本地偽分布式集群 |
numSlaves :模擬集群的 Worker 節點個數;
coresPerSlave :模擬集群的各個 Worker 節點上的內核數;
memoryPerSlave :模擬集群的各個 Worker 節點上的內存大小,單位 M。
Standalone 獨立模式
? Spark 自身提供的集群模式,也就是所謂的 Standalone 獨立模式。Spark 的獨立模式體現了經典的 Master-Worker 模式
? 在搭建時的不同模式:
模式 | 特點 |
---|---|
Standalone-Single | Master 單節點模式,如果 Master 掛了就無法提交應用程序 |
Standalone-HA | Master 高可用模式。可以使用 FileSystem (文件系統)或 ZooKeeper (分布式協調服務)來實現主備切換 |
優點
- 獨立模式屬于自己獨立一套集群(Client/Master/Worker),是 Spark 原生的集群管理器,自帶完整的服務,可單獨部
署,無需依賴任何其他資源管理系統 - 使用 Standalone 可以很方便地搭建一個集群
缺點
? 資源不利于充分利用
YARN 模式
? Spark 可以基于 YARN 來計算(將 Spark 應用提交到 YARN 上運行),Spark 客戶端直接連接 YARN,不需要額外構建 Spark 集群Spark 中的各個角色運行在 YARN 的容器內部,并組成Spark 集群環境。
? 在 Spark On Yarn 模式下,Executor 進程名稱為 CoarseGrainedExecutorBackend。一個 CoarseGrainedExecutorBackend
有且僅有一個 Executor 對象, 負責將 Task 包裝成 taskRunner,并從線程池中抽取一個空閑線程運行 Task,每一個
CoarseGrainedExecutorBackend 能并行運行 Task 的數量取決于分配給它的 CPU 個數。
Client模式和Cluster模式
? Client模式和Cluster模式是在上述模式的基礎上,運行時的不同模式(不包括普通的本地模式)
模式 | 區別 |
---|---|
Client模式 | 客戶端在提交任務后,不能離開,并會將最后的結果返回給到客戶端,提交任務的節點就是Driver所在的節點 |
Cluster模式 | 客戶端在提交任務后,將直接離開,由Master決定Driver所在節點,Driver管理任務的完成,并將任務結果打印在日志中,結果只能在日志中查看(Yarn中,Driver 在AplicationMaster中) |
核心編程
RDD
概念和特性
概念:
? RDD 是 Resilient Distributed Dataset 的縮寫,意思為彈性分布式數據集(一種數據結構),是一個讀取分區記錄的集合,是 Spark 對需要處理的數據的基本抽象。源碼中是一個抽象類,代表一系列彈性的、不可變、可分區、里面元素可并行計算的集合。
注意:RDD中存放的是數據的讀取邏輯和數據的計算邏輯,并發給Executor。
特性
- 彈性
- 彈性存儲:內存與磁盤自動切換;
- 彈性容錯:數據丟失可以自動恢復;
- 彈性計算:計算出錯重試機制;
- 彈性分片:可根據需求重新分片。
- 分布式:數據存儲在大數據集群不同的節點上;
- 數據集:RDD 只是封裝了計算邏輯,并不保存數據;
- 數據抽象:RDD 是一個抽象類,需要子類具體實現;
- 不可變:RDD 封裝了計算邏輯,是不可改變的,想要改變,只能產生新的 RDD,在新的 RDD 中封裝新的計算邏輯;
- 可分區:RDD 是一種分布式的數據集,由于數據量很大,因此計算時要被切分并存儲在各個結點的分區當中;
- 并行計算:一個分區對應一個任務。分區是 Spark 計算任務的基本處理單位,決定了并行計算的粒度。
- 依賴關系:如果某個 RDD 丟失了,則可以根據血緣關系,從父 RDD 計算得來。
- 惰性執行:Spark 對于 Transformation 轉換算子采用惰性計算機制,遇到 Transformation 時并不會立即計算結果,而是要等遇到 Action 行動算子時才會一起執行。
創建RDD
通過集合創建RDD(通過內存)
def main(args: Array[String]): Unit = {// 建立連接val sc = new SparkContext("local[*]", "CreateRDD")// 通過內存創建 RDDval list = List(1, 2, 3, 4, 5)// parallelize 表示并行度,為了方便使用可以調用 makeRDD//val rdd: RDD[Int] = sc.parallelize(list)// 通過源碼得知 makeRDD 內部調用了 parallelizeval rdd: RDD[Int] = sc.makeRDD(list)rdd.foreach(println)// 關閉連接if (!sc.isStopped) sc.stop()}
通過文件創建RDD
def main(args: Array[String]): Unit = {// 建立連接val sc = new SparkContext("local[*]", "CreateRDD")// path 可以是具體的文件,也可以是目錄,當 path 是目錄時,則獲取目錄下所有文件val rdd01: RDD[String] = sc.textFile("data/test.txt")rdd01.foreach(println)val rdd02: RDD[String] = sc.textFile("data/wordcount")rdd02.foreach(println)// 還支持通配符匹配val rdd03: RDD[String] = sc.textFile("data/wordcount/wd1*.txt")rdd03.foreach(println)// 訪問的節點必須為 Active NameNode,也就是說 node01 必須是 Active NameNodeval rdd04: RDD[String] = sc.textFile("hdfs://node02:8020/bd/wordcount")rdd04.foreach(println)// textFile 是按行讀取,wholeTextFiles 是按整個文件讀取// 返回格式為元組:(文件地址+文件名, 文件內容)val rdd05: RDD[(String, String)] = sc.wholeTextFiles("data/test.txt")rdd05.foreach(println)val rdd06: RDD[(String, String)] = sc.wholeTextFiles("hdfs://node02:8020/bd/wordcount")rdd06.foreach(println)// 關閉連接if (!sc.isStopped) sc.stop()}
分區
? Spark RDD 是一種分布式的數據集,由于數據量很大,因此計算時要被切分并存儲在各個結點的分區當中
具體分區方法 | ||
---|---|---|
集合的分區處理 | makeRDD | 通過設置numSlices 參數來設置分區數,不傳遞時將使用默認值defaultParallelism ,該默認值表示將按當前運行環境的最大可用 CPU 核數進行分區 |
文件的分區處理 | textFile | 讀取文件數據時,數據會按照 Hadoop 文件讀取的規則進行分區 |
支持重分區的算子 | \ | 在Spark中有一些算子也可以支持數據的重新分區,詳見算子 |
分區器的分區處理 | HashPartitioner RangePartitioner等繼承Partitioner的自定義類 | 通過繼承Partitioner來自己定義實現不同的分區方式 |
算子
? 算子的本質就是函數,不同的RDD之間的依賴關系,是一個函數空間到另一個函數空間的映射。
轉換算子
? 轉換往往是從一個 RDD 到另一個 RDD 的計算。在執行應用的程序時,遇到轉換算子,并不會立即觸發計算操作,而是延時到遇到 Action 算子時才會操作。
單 Value的轉換算子
算子 | 作用 | 語法 | 例如 |
---|---|---|---|
map | 將處理的數據逐條進行映射轉換,將返回值構成新的 RDD | def map[U: ClassTag](f: T => U): RDD[U] | |
mapPartitions | 以分區為單位進行數據轉換操作,將整個分區的數據加載到內存 | def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] | |
mapPartitionsWithIndex | mapPartitionsWithIndex 跟 mapPartitions 差不多,只是參數多了個分區號 | def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] | |
flatMap | 首先將函數作用于集合中的每個元素,然后將結果展平,返回新的集合 | def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] | |
glom | 將同一個分區的多個單個數據直接轉換為相同類型的單個數組進行處理 | def glom(): RDD[Array[T]] | |
groupBy(shuffle) | 將數據按指定條件進行分組,從而方便我們對數據進行統計分析 | def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] | |
filter | 指過濾出符合一定條件的元素 | def filter(f: T => Boolean): RDD[T] | |
sample | 是從大量的數據中獲取少量的數據 | def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] | |
distinct(shuffle) | 將數據集中重復的數據去重 | def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] |
Shuffer:
sample采樣的策略:
withReplacement:表示抽出樣本數據后是否再放回去,true 表示會放回去,這也就意味著抽出的樣本可能有重復;
fraction:
- 抽取不放回的情況下為每個元素被抽取的概率,在 0 ~ 1 之間,e.g. 0.3 表示抽出 30%;1 全取,0 全不取;
- 抽取放回的情況下為每個元素可能被抽取的次數,e.g. 3 表示可能會抽取 3 次,當然也有可能抽取 1 次或者 2 次,或者 1 次都抽不到或者抽到更多次。
- seed:表示隨機種子可以將這個參數設置為定值。種子相同時,指定相同的 fraction 每次抽樣的數據也會相同,如果此時出現數據不相同的情況那就是數據出問題了
coalesce 函數除了可以縮減分區數,還可以擴增分區數,但是擴增分區數時一定要設置 shuffle 為 true
雙 Value的轉換算子
算子 | 作用 | 語法 | 例如 |
---|---|---|---|
union | 對兩個 RDD 進行并集(合并)操作,且不去重 | rdd1.union(rdd2) | |
intersection | 表示對兩個 RDD 取交集(相同) | rdd1.intersection(rdd2) | |
subtract | 表示對兩個 RDD 取差集(不同) | rdd1.subtract(rdd2) | |
cartesian | 表示對兩個 RDD 進行笛卡爾積操作 | rdd1.cartesian(rdd2) | |
zip | 將兩個 RDD 合并成一個 RDD | ``rdd1.zip(rdd2)` |
zip:兩個 RDD 的 Partition 數量以及元素數量都必須相同,否則會拋出異常
轉換算子(Key-Value)
算子 | 作用 | 語法 | 例如 |
---|---|---|---|
partitionBy(shuffle) | 按照指定分區數重新進行分區 | def partitionBy(partitioner: Partitioner): RDD[(K, V)] | mkRDD.partitionBy(new HashPartitioner(2)) |
sortByKey(shuffle) | 將 K, V 格式數據的 Key 根據指定的規則進行排序,默認為升序 | def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] | mkRDD.sortByKey() |
reduceByKey(shuffle) | 將相同 Key 的值聚合到一起 | def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] | mkRDD.reduceByKey(_ + _) |
groupByKey(shuffle) | 按 K, V 格式數據的 Key 進行分組,會返回 (K, Iterable[V]) 格式數據 | def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] | mkRDD.groupByKey() |
aggregateByKey(shuffle) | 按 K, V 格式數據的 Key 進行分組,可以實現分區內和分區間不同的計算邏輯 | def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,<br/> combOp: (U, U) => U): RDD[(K, U)] | mkRDD.aggregateByKey(0)(math.max(_, _), _ + _) |
mapValues | 針對于 K, V 形式的數據只對 V 進行操作 | def mapValues[U](f: V => U): RDD[(K, U)] | aggregateByKeyRDD.mapValues(t => t._1 / t._2) |
foldByKey(shuffle) | 當分區內和分區間計算邏輯相同時使用 | def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] | mkRDD.foldByKey(0)(_ + _) |
combineByKey(shuffle) | 第一個參數是給每個分區的第一個 Key 的 Value 一個初始值規則 | def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)] | mkRDD.combineByKey( (_, 1),(t, v) => (t._1 + v, t._2 + 1),(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2) |
join | 在類型為 K, V 和 K, W 的 RDD 上調用,返回一個相同 Key 對應的所有元素對在一起的 K, (V, W) 的 RDD | def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] | rdd1.join(rdd2) |
分區器Partitioner:
- HashPartitioner
- RangePartitioner
- 繼承Partitioner的自定義分區器
五種聚合算子(上表中加粗的算子)最終都會調用到 combineByKeyWithClassTag 這個函數:
行動算子
? 一個行動往往代表一種輸出到外部系統的操作。在執行應用的程序時,遇到行動算子,會立即產生一個 Job,對已有的 RDD 中的數據執行計算后產生結果,將結果返回 Driver 程序或寫入到外部物理存儲。
算子 | 作用 | 語法 | 例如 |
---|---|---|---|
reduce | 通過函數聚合 RDD 中的所有元素,先聚合分區內數據,再聚合分區間數據 | def reduce(f: (T, T) => T): T | mkRDD.reduce(_ + _) |
count | 返回 RDD 中元素的個數 | def count(): Long | mkRDD.count() |
take | 返回 RDD 的前 n 個元素組成的數組 | def take(num: Int): Array[T] | mkRDD.take(3) |
takeOrdered | 返回 RDD 排序后的前 n 個元素組成的數組,默認正序 | def take(num: Int): Array[T] | ```mkRDD.takeOrdered(3)(Ordering.Int.reverse)` |
first | 返回 RDD 中的第一個元素,底層就是 take(1) | def first(): T | mkRDD.first() |
foreach | 循環遍歷數據集中的每個元素,運行相應的計算邏輯(函數) | def foreach(f: T => Unit): Unit | mkRDD.foreach(println) |
foreachPartition | 按分區循環遍歷數據集中的每個元素,并運行相應的計算邏輯(函數) | def foreachPartition(f: Iterator[T] => Unit): Unit | mkRDD.foreachPartition(partitionOfRecords => println(s"分區:${TaskContext.getPartitionId()},記錄:${partitionOfRecords.toArray.mkString(",")}")) |
collect | 將不同分區的數據收集到 Driver 并以數組的形式返回數據 | def collect(): T | mkRDD.collect() |
aggregate | 將每個分區里面的元素通過 seqOp 函數和初始值進行聚合,然后用 combOp 函數將每個分區的結果和初始值(zeroValue)進行 combine 操作 | def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U | mkRDD.aggregate(10)(_ + _, _ + _) |
fold | 當 aggregate 的分區內和分區間計算邏輯相同時,Spark 為了讓程序員更方便的使用,提供了 fold 算子 | def fold(zeroValue: T)(op: (T, T) => T): T | mkRDD.fold(10)(_ + _) |
countByKey | 針對 K, V 類型的 RDD,返回一個 K, Int 的 map,表示每一個 Key 對應的元素個數,countByKey 底層調用的是 reduceByKey | def countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap} | mkRDD.map(_ -> 1).countByKey() |
countByValue | 針對 K, V 類型的 RDD,返回一個 K, Int 的 map,表示每一個 Key 對應的元素個數countByValue 底層調用的是map().countByKey() | def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()} | mkRDD.countByValue() |
save 系列:
- saveAsTextFile:將數據集的元素以 TextFile 的形式保存到 HDFS 文件系統或者其他文件系統
- saveAsSequenceFile:將數據集中的元素以 Hadoop SequenceFile 的格式保存到 HDFS 文件系統或者其他文件系統
- saveAsObjectFile:將 RDD 中的元素序列化成對象,存儲到文件中
控制算子
? 可以將 RDD 持久化,持久化的單位是 Partition。
算子 | 作用 | 語法 | 例如 |
---|---|---|---|
cache | 保存到內存,效率高,數據不安全容易丟失 | def cache(): T | mkRDD.cache() |
persist | 保存到磁盤(臨時文件,作業結束后會刪除),效率低,數據安全 | def cache(StorageLevel.xxx): T | mkRDD.persist(StorageLevel.MEMORY_AND_DISK) |
checkpoint | 保存到磁盤(永久保存,一般存儲在分布式文件系統中,例如 HDFS),效率低,數據安全 | \ | 核心代碼:sparkContext.setCheckpointDir("hdfs://node02:8020/yjx/cp")<br />rdd3.checkpoint() |
注意:
- 當我們沒有對RDD持久化及沒有使用控制算子時,我們使用行動算子時會自動從數據源開始再進行一次全部邏輯
- checkpoint 可以理解為改變了數據源,因為關鍵數據已經計算完成,沒有必要重頭進行讀取,所以 checkpoint
算子不僅能將 RDD 持久化到磁盤,還能切斷 RDD 之間的依賴關系
閉包檢測
閉包的概念
? Spark Job 產生的任務會并行執行,也就是說會分發給多個 Executor 去執行。那么從計算的角度,算子以外的代碼都是在 Driver 端執行,算子里面的代碼都是在 Executor 端執行,這樣就會導致算子內可能會用到算子外的
數據,就會形成閉包的效果。
? 而算子外的數據想要被 Executor 端的任務使用,就必須先序列化然后通過網絡 IO 進行傳輸,此時如果算子外的數據
無法序列化,就會發生錯誤。所以為了降低程序出錯的可能性,Spark 的算子增加了閉包檢查功能,會在執行計算任務
前,檢測閉包內的對象是否可以進行序列化。
常見問題
算子外的數據無法序列化會出現以上問題
解決方案
- 繼承 scala.Serializable
- 使用樣例類(推薦),樣例類默認繼承了 Serializable
Kryo 序列化框架
- Spark 內部默認使用的序列化框架 Kryo。
- 由于 Java 自身的序列化比較重(字節多),所以出于性能的考慮,Spark 2.0 開始支持另外一種序列化機制 Kryo。
- Kryo 的速度是 Serializable 的 10 倍。當 RDD 在 Shuffle 數據的時候,簡單數據類型、數組和字符串類型已經在 Spark
內部使用 Kryo 來序列化。 - 使用 Kryo 序列化,也需要繼承 Serializable 接口。
- 配置 Kryo 序列化方式如下:
val conf: SparkConf = new SparkConf()
.setMaster("local[*]").setAppName("SerializDemo")
// 設置 Kryo 序列化器
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊 Kryo 序列化器
.registerKryoClasses(Array(classOf[User]))
血統和依賴關系
血統
? Spark 根據用戶 Application 中的 RDD 的轉換算子和行動算子,會生成 RDD 之間的依賴關系,多個 RDD 之間的關系又
形成一條關系鏈叫做 RDD 的血統(Lineage)。
依賴關系
窄依賴(Narrow Dependency)
? 窄依賴指的是父 RDD 和子 RDD 的 Partition 之間的關系是一對一的(獨生子女)。
寬依賴(Wide Dependency)
? 寬依賴指的是父 RDD 與子 RDD 的 Partition 之間的關系是一對多(多胎),寬依賴會有 Shuffle 的產生。
通常情況下,產生了Shuffer就是寬依賴。
Stage階段
· 每個 Job 會被拆分成多個 Task,作為一個 TaskSet 任務集,其名稱為 Stage,Stage 的劃分和調度是由 DAGScheduler
來負責的。Stage 的切割規則為:從后往前,遇到寬依賴就切割 Stage 。
? 總結:
- 從后向前推理,遇到寬依賴就斷開,遇到窄依賴就把當前的 RDD 加入到 Stage 中;
- 默認情況下每個 Stage 里面的 Task 的數量是由該 Stage 中最后一個 RDD 的 Partition 數量決定的(一個 Partition 對應一
個 Task); - 最后一個 Stage(ResultStage) 里面的任務類型是 ResultTask,前面所有其他 Stage(ShuffleMapStage) 里面的任務類型都是
ShuffleMapTask; - 代表當前 Stage 的算子一定是該 Stage 的最后一個計算步驟。
Job作業
? Job 包含了多個 Task 組成的并行計算,往往由 Spark Action 算子觸發生成, 一個 Application 中往往會產生多個
Job。一個Job 包含 N 個 Transformation 算子和 1 個 Action 算子。
總結:
- Application:初始化一個 SparkContext 即生成一個 Application;
- Job 是 Application 的子集,以 Spark Action 算子為界,遇到一個 Action 算子就觸發一個 Job;
- Stage 是 Job 的子集,以 RDD 寬依賴(即 Shuffle)為界,遇到 Shuffle 就做一次劃分;
- Task 是 Stage 的子集,以并行度(分區數)來衡量,分區數是多少,就有多少個 Task。
提示:Spark 中所謂的并行度是指 RDD 中的分區數,即 RDD 中的 Task 數。
注意:Application → Job → Stage → Task,每一層都是 1 對 N 的關系。
數據本地化
數據本地化級別:
PROCESS_LOCAL
:進程本地化,性能最好。指代碼和數據在同一個進程中,也就是同一個 Executor 中;計算數據的
Task 由 Executor 執行,此時數據在 Executor 的 BlockManager 中;NODE_LOCAL
:節點本地化。代碼和數據在同一個節點中,數據存儲在節點的 HDFS Block,Task 在節點的某個
Executror 執行;或者數據和 Task 在同一個節點不同的 Executor 中,數據需要跨進程傳輸;NO_PREF
:沒有最佳位置這一說,數據從哪里訪問都一樣,不需要位置優先,比如 SparkSQL 直接讀取 MySQL;RACK_LOCAL
:機架本地化。數據和 Task 在一個機架的兩個節點上,數據需要通過網絡在節點之間進行傳輸;ANY
:數據和 Task 可能在集群中的任何地方,而且不在一個機架中,性能最差。
Process Locality:相關參數 spark.locality.wait 默認值是 3s。Task 任務分配的時候,先是按照 PROCESS_LOCAL
方式去分配 Task,如果 PROCESS_LOCAL 不滿足,默認等待 3 秒,看能不能按照這個級別去分配,如果等了 3 秒也實現不
了,那么就按 NODE_LOCAL 級別去分配 ,以此類推。// 全局設置,默認 3s
spark.locality.wait=3s
// 建議 60s
spark.locality.wait.process=60s
// 建議 30s
spark.locality.wait.node=30s
// 建議 20s
spark.locality.wait.rack=20s
廣播變量(分布式共享只讀變量)
? Spark 提供了廣播變量,一種分布式共享只讀變量,使用了廣播變量存儲 regex 后,Executor 端
就只會存儲一份該數據供多個 Task 使用,為了防止數據被修改,所以是只讀變量。
工作流程
- 廣播變量初始的時候在 Drvier 端會有一份副本
- Task 在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的 Executor 對應的 BlockManager 中,嘗試獲取變量副本
- 如果本地沒有,那么就從 Driver 端遠程拉取變量副本,并保存在本地的 Executor 對應的 BlockManager 中
- 此后這個 Executor 上的其他 Task 都會直接使用本地的BlockManager 中的副本
- Executor 的 BlockManager 除了從 Driver 端上拉取,也能從其他節點的 BlockManager 上拉取變量副本,距離越近越好
優點
- 不是每個 Task 一份變量副本,而是每個節點的 Executor 一份副本,可以讓變量產生的副本數大大減少
- 變量一旦被定義為一個廣播變量,那么這個變量只能讀取,不能修改
注意:
- 不能將RDD廣播出去, RDD 是不存儲數據的。可以將 RDD 的結果廣播出去。
- 廣播變量只能在 Driver 端定義,不能在 Executor 端定義。
- 在 Driver 端可以修改廣播變量的值,在 Executor 端無法修改廣播變量的值
- 如果 Executor 端用到了 Driver 端的變量,不使用廣播變量,在 Executor 有多少 Task 就有多少 Driver 端的變量副本
- 如果 Executor 端用到了 Driver 端的變量,使用廣播變量,在每個 Executor 中只有一份 Driver 端的變量副本
累加器(分布式共享只寫變量)
? 累加器是 Spark 提供的一種分布式共享只寫變量,主要用于在分布式計算中高效地執行全局聚合操作。
累加器在 Driver 端定義并賦初始值,累加器只能在 Driver 端讀取最后的值,在 Excutor 端更新。
自定義累加器
? Spark 還支持自定義累加器,只需要繼承 AccumulatorV2 即可。
簡單示例:
package com.yjxxt.accumulator
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object WordCountAccumulatorDemo {def main(args: Array[String]): Unit = {// 建立連接val sc = new SparkContext("local[*]", "OnlineCountDemo")// 初始化自定義累加器val wordCountAccumulator = new WordCountAccumulator// 注冊累加器sc.register(wordCountAccumulator, "wordCountAccumulator")// 創建 RDDval rdd: RDD[String] = sc.textFile("data/wordcount", 2)// 自定義累加器 實現 WordCountrdd.flatMap(_.split("\\s+")).foreach(wordCountAccumulator.add)// 獲取累加器println(wordCountAccumulator.value)// 關閉連接if (!sc.isStopped) sc.stop()}/*** 自定義累加器*/class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {// 定義可變 Map 存放 Word 和 Count(這個就是自定義的累加器)private var wd = mutable.Map[String, Long]()// 累加器是否為零(空)override def isZero: Boolean = this.wd.isEmpty// 拷貝新的累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new WordCountAccumulator// 重置累加器override def reset(): Unit = this.wd.clear()// 累加器相加override def add(word: String): Unit = {val newCount = wd.getOrElse(word, 0L) + 1Lwd.update(word, newCount)}// 累加器合并override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val m1 = this.wdval m2 = other.valuem2.foreach {case (word, count) => {val newCount = m1.getOrElse(word, 0L) + countm1.update(word, newCount)}}}// 獲取累加器
override def value: mutable.Map[String, Long] = wd}
}
內存管理 (同一內存管理)
儲存內存和執行內存**共享同一塊空間**,可以動態**借用對方的空閑區域**。
內存分配
其中最重要的優化在于動態借用機制,其規則如下:
- 設定基本的存儲內存大小和執行內存大小( spark.memory.storageFraction 參數,默認為 0.5,即各占一半),
該設定確定了雙方各自擁有的空間范圍; - 雙方空間都不足時,則存儲到硬盤;若己方空間不足而對方空間空余時,可借用對方的空間(存儲空間不足是指不足
以放下一個完整的 Block); - 執行內存的空間被對方占用后,可讓對方將占用的部分轉存到硬盤,然后“歸還”借用的空間;
- 存儲內存的空間被對方占用后,無法讓對方“歸還”,因為需要考慮 Shuffle 過程中的很多因素,實現起來較為復雜。
內存估算
? 內存具體估算公式如下:
- 估算 Other 內存 = 自定義數據結構大小 * 每個 Executor 核數;
- 估算 Storage 內存 = 廣播變量數據大小 + Cache / Executor 數量。例如 Cache 了 100G 數據,10 個 Executor,那么每個
Executor 分別存儲 10G 數據。在 Spark WEBUI 的 Storage 標簽頁可以直接查看所有的內存占用,大致就對應 Storage
Memory。 - 估算 Execution 內存 = 知道了 Storage Memory,Execution Memory 也就知道了,因為默認情況下 Execution Memory 和
Storage Memory 占用 Spark Memory 的比例是相同的。 - 估算 Executor 內存 = 每個 Executor 核數 *(數據集大小 / 并行度)。例如 100G 數據,并行度 200(Task),單個并行
度的數據量為 100G / 200 = 500M。然后 executor-cores 為 4,最終 Executor 內存最少需要 2G 內存。 - Task 被執行的并行度 = Executor 個數 * 每個 Executor 的 Core 核數。
內存配置
? 一般情況下,各個區域的內存比例保持默認值即可。如果需要更加精確的控制內存分配,可以按照如下思路:
- Unified 統一內存比例 spark.memory.fraction = (估算 Storage 內存 + 估算 Execution 內存)/(估算 Storage 內存 + 估算
Execution 內存 + 估算 Other 內存)。 - Storage/Execution 內存比例 spark.memory.storageFraction = (估算 Storage 內存)/(估算 Storage 內存 + 估算
Execution 內存)。
配置參數說明:
- Unified 統一內存比例:spark.memory.fraction,默認為 0.6;
- Storage/Execution 內存比例:spark.memory.storageFraction,默認為 0.5。
? 代入公式計算:
- Storage 內存 = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
- Execution 內存 = (spark.executor.memdry - 300MB) * spark.memory.fraction * (1 - spark.memory.storageFraction)
4,最終 Executor 內存最少需要 2G 內存。 - Task 被執行的并行度 = Executor 個數 * 每個 Executor 的 Core 核數。
內存配置
? 一般情況下,各個區域的內存比例保持默認值即可。如果需要更加精確的控制內存分配,可以按照如下思路:
- Unified 統一內存比例 spark.memory.fraction = (估算 Storage 內存 + 估算 Execution 內存)/(估算 Storage 內存 + 估算
Execution 內存 + 估算 Other 內存)。 - Storage/Execution 內存比例 spark.memory.storageFraction = (估算 Storage 內存)/(估算 Storage 內存 + 估算
Execution 內存)。
配置參數說明:
- Unified 統一內存比例:spark.memory.fraction,默認為 0.6;
- Storage/Execution 內存比例:spark.memory.storageFraction,默認為 0.5。
? 代入公式計算:
- Storage 內存 = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
- Execution 內存 = (spark.executor.memdry - 300MB) * spark.memory.fraction * (1 - spark.memory.storageFraction)