一篇文章,帶你玩轉SparkCore

Spark Core

概念

前言

批處理(有界數據)

? 對靜態的、有限的數據集進行一次性處理,數據通常按固定周期(如每小時、每天)收集后統一計算。

特點:

  • 高吞吐量,適合大規模數據。
  • 高延遲(數據需積累到一定量才觸發計算)。
  • 典型場景:離線報表生成、歷史數據分析(如T+1的統計報表)。
流處理(無界數據)

? 對動態的、無限的數據流進行實時處理,數據產生后立即處理(逐條或微批次)。

特點:

  • 低延遲(毫秒級到秒級響應)。
  • 處理狀態化(如窗口聚合、事件時間處理)。
  • 典型場景:實時監控、欺詐檢測、實時推薦。
流批一體

? 用同一套API或框架同時處理批數據和流數據,底層實現邏輯統一(如將批視為流的特例)。

特點:

  • 開發效率高:無需維護兩套代碼。
  • 一致性保證:流和批的結果邏輯相同(如實時和離線統計口徑一致)。

簡介

? 是專門為大規模數據處理而設計的快速通用的計算引擎,是一種類似 Hadoop MapReduce 的通用并行計算框架。

和MapReduce的對比

MapReduceSpark
編程模型Map 和 Reduce不局限于 Map 和 Reduce,還提供多種數據集操作類型
運算效率每次迭代都要向磁盤寫入和讀取,中間數據 I/O 開銷大,效率低中間結果直接存放到內存,更高的迭代運算效率
調度機制N/A基于 DAG 的任務調度,執行機制更優

模塊

在這里插入圖片描述

模塊作用
Spark Core最基礎與最核心的功能。用于離線數據處理,批量計算。
Spark SQL交互式查詢。用于操作結構化數據,通過 SQL 或者 Hive 的 HQL 來查詢數據。
Spark Streaming準實時流式計算。
Spark MLlib機器學習。
Spark GraphX圖計算。

簡單入門

創建MAVEN項目,并引入依賴

<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-yarn_2.12</artifactId><version>3.3.2</version><scope>provided</scope></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.12</artifactId><version>2.7.0</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.12</artifactId><version>2.7.0</version></dependency></dependencies>

創建Scala單例對象,并實現

package com.mrhelloworld.wordcountimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount01Demo {def main(args: Array[String]): Unit = {// ==================== 建立連接 ====================// 初始化配置對象val conf = new SparkConf()// 設置運行模式與 AppNameconf.setMaster("local").setAppName("WordCount")// 根據配置對象初始化上下文對象val sc = new SparkContext(conf)// ==================== 業務處理 ====================// 讀取文件,按行讀取val lines: RDD[String] = sc.textFile("data/wordcount")// 按空格拆分每一行數據,拆分為一個一個的單詞val words: RDD[String] = lines.flatMap(w => w.split("\\s+"))// 將數據根據單詞進行分組,便于統計val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(w => w)// 對分組后的數據進行統計val wordCount: RDD[(String, Int)] = wordGroup.map(kv => (kv._1, kv._2.size))// 從結果集中獲取指定條數的數據val wordCountTopN: Array[(String, Int)] = wordCount.take(10)// 將結果打印在控制臺wordCountTopN.foreach(println)// 簡寫方式lines.flatMap(_.split("\\s+")).groupBy(w => w).map(kv => (kv._1, kv._2.size)).take(10).foreach(println)// ==================== 關閉連接 ====================if (!sc.isStopped) sc.stop()}
}

運行架構

主要角色

  • 資源層面的主從
    • Master,負責管理與分配整個集群中的資源(CPU Core 和 Memory)
    • Worker,負責接收資源并執行作業中的任務
  • 作業層面的主從
    • Driver,負責管理整個集群中的作業任務調度
    • Executor,負責執行具體的任務

注意:無論什么運行模式都會存在這些角色,只是在不同的運行模式下,這些角色的分布會有所不同

通用運行架構

在這里插入圖片描述

注意:一個App -> 一個或多個Job -> 一個或多個Stage -**> ** 一個或多個Task:Task是由分區數決定的

運行流程

  • 啟動集群后,Worker 節點會向 Master 節點心跳匯報資源(CPU Core 和 Memory)情況;
  • Client 提交 Application,根據不同的運行模式在不同的位置創建 Driver 進程;
  • SparkContext 連接到 Master,向 Master 注冊應用并申請資源(Executor 的 CPU Core 和 Memory);
  • Master 根據 SparkContext 的資源申請并根據 Worker 心跳周期內報告的信息決定在哪個 Worker 上分配資源,也就是Executor;
  • Worker 節點創建 Executor 進程,Executor 向 Driver 進行反向注冊;
  • 資源滿足后(Executor 注冊完畢),SparkContext 解析 Applicaiton 代碼,創建 RDD,構建 DAG,并提交給DAGScheduler 分解成 Stage(當碰到 Action 算子時,就會催生 Job,每個 Job 中含有 1 個或多個 Stage),然后將Stage(或者稱TaskSet)提交給 TaskScheduler,TaskScheduler 負責將 Task 分配到相應的Worker,最后提交給Executor 執行(發送到 Executor 的線程池中);
  • 每個 Executor 會持有一個線程池,Executor 通過啟動多個線程(Task)來對 RDD 的 Partition 進行并行計算,并向SparkContext 報告,直至 Task 完成。

DAG:有向無環圖,將作業的調度順序規定好,不會出現回頭執行相同任務或階段的可能,類似于找一條路線一次發試卷,不要出現走一樣的路的情況

運行模式

本地模式

? Local 模式,就是不需要其他任何節點資源就可以在本地執行 Spark 代碼的環境,一般用于教學,調試,演示等。本地模式就是一個獨立的進程,通過其內部的多個線程來模擬整個 Spark 運行時環境,每個線程代表一個 Worker。

? 本地模式,在運行時也有不同的模式:

模式特點
local只有一個工作進程,無并行計算能力
local[N]N 個工作進程,通常設置 N 為機器的 CPU 數量
local[*]工作進程數量等于機器的 CPU 核心數量
local[N, M]指定線程個數以及失敗重試次數,失敗重試次數為 M(最多重試M-1次)
local[*, M]指定線程個數以及失敗重試次數,失敗重試次數為 M(最多重試M-1次)
local-cluster
[numSlaves,coresPerSlave,memeoryPerySlave]
本地偽分布式集群

numSlaves :模擬集群的 Worker 節點個數;
coresPerSlave :模擬集群的各個 Worker 節點上的內核數;
memoryPerSlave :模擬集群的各個 Worker 節點上的內存大小,單位 M。

Standalone 獨立模式

? Spark 自身提供的集群模式,也就是所謂的 Standalone 獨立模式。Spark 的獨立模式體現了經典的 Master-Worker 模式

? 在搭建時的不同模式:

模式特點
Standalone-SingleMaster 單節點模式,如果 Master 掛了就無法提交應用程序
Standalone-HAMaster 高可用模式。可以使用 FileSystem (文件系統)或 ZooKeeper (分布式協調服務)來實現主備切換
優點
  • 獨立模式屬于自己獨立一套集群(Client/Master/Worker),是 Spark 原生的集群管理器,自帶完整的服務,可單獨部
    署,無需依賴任何其他資源管理系統
  • 使用 Standalone 可以很方便地搭建一個集群
缺點

? 資源不利于充分利用

YARN 模式

? Spark 可以基于 YARN 來計算(將 Spark 應用提交到 YARN 上運行),Spark 客戶端直接連接 YARN,不需要額外構建 Spark 集群Spark 中的各個角色運行在 YARN 的容器內部,并組成Spark 集群環境。

? 在 Spark On Yarn 模式下,Executor 進程名稱為 CoarseGrainedExecutorBackend。一個 CoarseGrainedExecutorBackend

有且僅有一個 Executor 對象, 負責將 Task 包裝成 taskRunner,并從線程池中抽取一個空閑線程運行 Task,每一個
CoarseGrainedExecutorBackend 能并行運行 Task 的數量取決于分配給它的 CPU 個數。

在這里插入圖片描述

Client模式和Cluster模式

? Client模式和Cluster模式是在上述模式的基礎上,運行時的不同模式(不包括普通的本地模式)

模式區別
Client模式客戶端在提交任務后,不能離開,并會將最后的結果返回給到客戶端,提交任務的節點就是Driver所在的節點
Cluster模式客戶端在提交任務后,將直接離開,由Master決定Driver所在節點,Driver管理任務的完成,并將任務結果打印在日志中,結果只能在日志中查看(Yarn中,Driver 在AplicationMaster中)

核心編程

RDD

概念和特性
概念:

? RDD 是 Resilient Distributed Dataset 的縮寫,意思為彈性分布式數據集(一種數據結構),是一個讀取分區記錄的集合,是 Spark 對需要處理的數據的基本抽象。源碼中是一個抽象類,代表一系列彈性的、不可變、可分區、里面元素可并行計算的集合。

注意:RDD中存放的是數據的讀取邏輯和數據的計算邏輯,并發給Executor。

特性
  • 彈性
    • 彈性存儲:內存與磁盤自動切換;
    • 彈性容錯:數據丟失可以自動恢復;
    • 彈性計算:計算出錯重試機制;
    • 彈性分片:可根據需求重新分片。
  • 分布式:數據存儲在大數據集群不同的節點上;
  • 數據集:RDD 只是封裝了計算邏輯,并不保存數據;
  • 數據抽象:RDD 是一個抽象類,需要子類具體實現;
  • 不可變:RDD 封裝了計算邏輯,是不可改變的,想要改變,只能產生新的 RDD,在新的 RDD 中封裝新的計算邏輯;
  • 可分區:RDD 是一種分布式的數據集,由于數據量很大,因此計算時要被切分并存儲在各個結點的分區當中;
  • 并行計算:一個分區對應一個任務。分區是 Spark 計算任務的基本處理單位,決定了并行計算的粒度。
  • 依賴關系:如果某個 RDD 丟失了,則可以根據血緣關系,從父 RDD 計算得來。
  • 惰性執行:Spark 對于 Transformation 轉換算子采用惰性計算機制,遇到 Transformation 時并不會立即計算結果,而是要等遇到 Action 行動算子時才會一起執行。
創建RDD
通過集合創建RDD(通過內存)
def main(args: Array[String]): Unit = {// 建立連接val sc = new SparkContext("local[*]", "CreateRDD")// 通過內存創建 RDDval list = List(1, 2, 3, 4, 5)// parallelize 表示并行度,為了方便使用可以調用 makeRDD//val rdd: RDD[Int] = sc.parallelize(list)// 通過源碼得知 makeRDD 內部調用了 parallelizeval rdd: RDD[Int] = sc.makeRDD(list)rdd.foreach(println)// 關閉連接if (!sc.isStopped) sc.stop()}
通過文件創建RDD
def main(args: Array[String]): Unit = {// 建立連接val sc = new SparkContext("local[*]", "CreateRDD")// path 可以是具體的文件,也可以是目錄,當 path 是目錄時,則獲取目錄下所有文件val rdd01: RDD[String] = sc.textFile("data/test.txt")rdd01.foreach(println)val rdd02: RDD[String] = sc.textFile("data/wordcount")rdd02.foreach(println)// 還支持通配符匹配val rdd03: RDD[String] = sc.textFile("data/wordcount/wd1*.txt")rdd03.foreach(println)// 訪問的節點必須為 Active NameNode,也就是說 node01 必須是 Active NameNodeval rdd04: RDD[String] = sc.textFile("hdfs://node02:8020/bd/wordcount")rdd04.foreach(println)// textFile 是按行讀取,wholeTextFiles 是按整個文件讀取// 返回格式為元組:(文件地址+文件名, 文件內容)val rdd05: RDD[(String, String)] = sc.wholeTextFiles("data/test.txt")rdd05.foreach(println)val rdd06: RDD[(String, String)] = sc.wholeTextFiles("hdfs://node02:8020/bd/wordcount")rdd06.foreach(println)// 關閉連接if (!sc.isStopped) sc.stop()}

分區

? Spark RDD 是一種分布式的數據集,由于數據量很大,因此計算時要被切分并存儲在各個結點的分區當中

在這里插入圖片描述

具體分區方法
集合的分區處理makeRDD通過設置numSlices 參數來設置分區數,不傳遞時將使用默認值defaultParallelism ,該默認值表示將按當前運行環境的最大可用 CPU 核數進行分區
文件的分區處理textFile讀取文件數據時,數據會按照 Hadoop 文件讀取的規則進行分區
支持重分區的算子\在Spark中有一些算子也可以支持數據的重新分區,詳見算子
分區器的分區處理HashPartitioner RangePartitioner等繼承Partitioner的自定義類通過繼承Partitioner來自己定義實現不同的分區方式

算子

? 算子的本質就是函數,不同的RDD之間的依賴關系,是一個函數空間到另一個函數空間的映射。

轉換算子

? 轉換往往是從一個 RDD 到另一個 RDD 的計算。在執行應用的程序時,遇到轉換算子,并不會立即觸發計算操作,而是延時到遇到 Action 算子時才會操作。

單 Value的轉換算子
算子作用語法例如
map將處理的數據逐條進行映射轉換,將返回值構成新的 RDDdef map[U: ClassTag](f: T => U): RDD[U]
mapPartitions以分區為單位進行數據轉換操作,將整個分區的數據加載到內存def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
mapPartitionsWithIndexmapPartitionsWithIndex 跟 mapPartitions 差不多,只是參數多了個分區號def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
flatMap首先將函數作用于集合中的每個元素,然后將結果展平,返回新的集合def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
glom將同一個分區的多個單個數據直接轉換為相同類型的單個數組進行處理def glom(): RDD[Array[T]]
groupBy(shuffle)將數據按指定條件進行分組,從而方便我們對數據進行統計分析def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
filter指過濾出符合一定條件的元素def filter(f: T => Boolean): RDD[T]
sample是從大量的數據中獲取少量的數據def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]
distinct(shuffle)將數據集中重復的數據去重def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Shuffer:

sample采樣的策略:

  • withReplacement:表示抽出樣本數據后是否再放回去,true 表示會放回去,這也就意味著抽出的樣本可能有重復;

  • fraction:

    • 抽取不放回的情況下為每個元素被抽取的概率,在 0 ~ 1 之間,e.g. 0.3 表示抽出 30%;1 全取,0 全不取;
    • 抽取放回的情況下為每個元素可能被抽取的次數,e.g. 3 表示可能會抽取 3 次,當然也有可能抽取 1 次或者 2 次,或者 1 次都抽不到或者抽到更多次。
    • seed:表示隨機種子可以將這個參數設置為定值。種子相同時,指定相同的 fraction 每次抽樣的數據也會相同,如果此時出現數據不相同的情況那就是數據出問題了

coalesce 函數除了可以縮減分區數,還可以擴增分區數,但是擴增分區數時一定要設置 shuffle 為 true

雙 Value的轉換算子
算子作用語法例如
union對兩個 RDD 進行并集(合并)操作,且不去重rdd1.union(rdd2)
intersection表示對兩個 RDD 取交集(相同)rdd1.intersection(rdd2)
subtract表示對兩個 RDD 取差集(不同)rdd1.subtract(rdd2)
cartesian表示對兩個 RDD 進行笛卡爾積操作rdd1.cartesian(rdd2)
zip將兩個 RDD 合并成一個 RDD``rdd1.zip(rdd2)`

zip:兩個 RDD 的 Partition 數量以及元素數量都必須相同,否則會拋出異常

轉換算子(Key-Value)
算子作用語法例如
partitionBy(shuffle)按照指定分區數重新進行分區def partitionBy(partitioner: Partitioner): RDD[(K, V)]mkRDD.partitionBy(new HashPartitioner(2))
sortByKey(shuffle)將 K, V 格式數據的 Key 根據指定的規則進行排序,默認為升序def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]mkRDD.sortByKey()
reduceByKey(shuffle)將相同 Key 的值聚合到一起def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
mkRDD.reduceByKey(_ + _)
groupByKey(shuffle)按 K, V 格式數據的 Key 進行分組,會返回 (K, Iterable[V]) 格式數據def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
mkRDD.groupByKey()
aggregateByKey(shuffle)按 K, V 格式數據的 Key 進行分組,可以實現分區內和分區間不同的計算邏輯def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,<br/> combOp: (U, U) => U): RDD[(K, U)]
mkRDD.aggregateByKey(0)(math.max(_, _), _ + _)
mapValues針對于 K, V 形式的數據只對 V 進行操作def mapValues[U](f: V => U): RDD[(K, U)]aggregateByKeyRDD.mapValues(t => t._1 / t._2)
foldByKey(shuffle)當分區內和分區間計算邏輯相同時使用def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V,partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
mkRDD.foldByKey(0)(_ + _)
combineByKey(shuffle)第一個參數是給每個分區的第一個 Key 的 Value 一個初始值規則def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)]
mkRDD.combineByKey( (_, 1),(t, v) => (t._1 + v, t._2 + 1),(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
join在類型為 K, V 和 K, W 的 RDD 上調用,返回一個相同 Key 對應的所有元素對在一起的 K, (V, W) 的 RDDdef join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]rdd1.join(rdd2)

分區器Partitioner:

  • HashPartitioner
  • RangePartitioner
  • 繼承Partitioner的自定義分區器

五種聚合算子(上表中加粗的算子)最終都會調用到 combineByKeyWithClassTag 這個函數:

在這里插入圖片描述

行動算子

? 一個行動往往代表一種輸出到外部系統的操作。在執行應用的程序時,遇到行動算子,會立即產生一個 Job,對已有的 RDD 中的數據執行計算后產生結果,將結果返回 Driver 程序或寫入到外部物理存儲。

算子作用語法例如
reduce通過函數聚合 RDD 中的所有元素,先聚合分區內數據,再聚合分區間數據def reduce(f: (T, T) => T): TmkRDD.reduce(_ + _)
count返回 RDD 中元素的個數def count(): LongmkRDD.count()
take返回 RDD 的前 n 個元素組成的數組def take(num: Int): Array[T]mkRDD.take(3)
takeOrdered返回 RDD 排序后的前 n 個元素組成的數組,默認正序def take(num: Int): Array[T]```mkRDD.takeOrdered(3)(Ordering.Int.reverse)`
first返回 RDD 中的第一個元素,底層就是 take(1)def first(): TmkRDD.first()
foreach循環遍歷數據集中的每個元素,運行相應的計算邏輯(函數)def foreach(f: T => Unit): UnitmkRDD.foreach(println)
foreachPartition按分區循環遍歷數據集中的每個元素,并運行相應的計算邏輯(函數)def foreachPartition(f: Iterator[T] => Unit): UnitmkRDD.foreachPartition(partitionOfRecords => println(s"分區:${TaskContext.getPartitionId()},記錄:${partitionOfRecords.toArray.mkString(",")}"))
collect將不同分區的數據收集到 Driver 并以數組的形式返回數據def collect(): TmkRDD.collect()
aggregate將每個分區里面的元素通過 seqOp 函數和初始值進行聚合,然后用 combOp 函數將每個分區的結果和初始值(zeroValue)進行 combine 操作def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): UmkRDD.aggregate(10)(_ + _, _ + _)
fold當 aggregate 的分區內和分區間計算邏輯相同時,Spark 為了讓程序員更方便的使用,提供了 fold 算子def fold(zeroValue: T)(op: (T, T) => T): TmkRDD.fold(10)(_ + _)
countByKey針對 K, V 類型的 RDD,返回一個 K, Int 的 map,表示每一個 Key 對應的元素個數,countByKey 底層調用的是 reduceByKeydef countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}mkRDD.map(_ -> 1).countByKey()
countByValue針對 K, V 類型的 RDD,返回一個 K, Int 的 map,表示每一個 Key 對應的元素個數countByValue 底層調用的是map().countByKey()def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()}mkRDD.countByValue()

save 系列:

  • saveAsTextFile:將數據集的元素以 TextFile 的形式保存到 HDFS 文件系統或者其他文件系統
  • saveAsSequenceFile:將數據集中的元素以 Hadoop SequenceFile 的格式保存到 HDFS 文件系統或者其他文件系統
  • saveAsObjectFile:將 RDD 中的元素序列化成對象,存儲到文件中
控制算子

? 可以將 RDD 持久化,持久化的單位是 Partition。

算子作用語法例如
cache保存到內存,效率高,數據不安全容易丟失def cache(): TmkRDD.cache()
persist保存到磁盤(臨時文件,作業結束后會刪除),效率低,數據安全def cache(StorageLevel.xxx): TmkRDD.persist(StorageLevel.MEMORY_AND_DISK)
checkpoint保存到磁盤(永久保存,一般存儲在分布式文件系統中,例如 HDFS),效率低,數據安全\核心代碼:sparkContext.setCheckpointDir("hdfs://node02:8020/yjx/cp")<br />rdd3.checkpoint()

注意:

  • 當我們沒有對RDD持久化及沒有使用控制算子時,我們使用行動算子時會自動從數據源開始再進行一次全部邏輯
  • checkpoint 可以理解為改變了數據源,因為關鍵數據已經計算完成,沒有必要重頭進行讀取,所以 checkpoint
    算子不僅能將 RDD 持久化到磁盤,還能切斷 RDD 之間的依賴關系

閉包檢測

閉包的概念

? Spark Job 產生的任務會并行執行,也就是說會分發給多個 Executor 去執行。那么從計算的角度,算子以外的代碼都是在 Driver 端執行,算子里面的代碼都是在 Executor 端執行,這樣就會導致算子內可能會用到算子外的
數據,就會形成閉包的效果。

? 而算子外的數據想要被 Executor 端的任務使用,就必須先序列化然后通過網絡 IO 進行傳輸,此時如果算子外的數據

無法序列化,就會發生錯誤。所以為了降低程序出錯的可能性,Spark 的算子增加了閉包檢查功能,會在執行計算任務
前,檢測閉包內的對象是否可以進行序列化。

常見問題

在這里插入圖片描述

算子外的數據無法序列化會出現以上問題

解決方案
  • 繼承 scala.Serializable
  • 使用樣例類(推薦),樣例類默認繼承了 Serializable
Kryo 序列化框架
  • Spark 內部默認使用的序列化框架 Kryo。
  • 由于 Java 自身的序列化比較重(字節多),所以出于性能的考慮,Spark 2.0 開始支持另外一種序列化機制 Kryo。
  • Kryo 的速度是 Serializable 的 10 倍。當 RDD 在 Shuffle 數據的時候,簡單數據類型、數組和字符串類型已經在 Spark
    內部使用 Kryo 來序列化。
  • 使用 Kryo 序列化,也需要繼承 Serializable 接口。
  • 配置 Kryo 序列化方式如下:
val conf: SparkConf = new SparkConf()
.setMaster("local[*]").setAppName("SerializDemo")
// 設置 Kryo 序列化器
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊 Kryo 序列化器
.registerKryoClasses(Array(classOf[User]))

血統和依賴關系

血統

? Spark 根據用戶 Application 中的 RDD 的轉換算子和行動算子,會生成 RDD 之間的依賴關系,多個 RDD 之間的關系又

形成一條關系鏈叫做 RDD 的血統(Lineage)。

依賴關系
窄依賴(Narrow Dependency)

? 窄依賴指的是父 RDD 和子 RDD 的 Partition 之間的關系是一對一的(獨生子女)。

寬依賴(Wide Dependency)

? 寬依賴指的是父 RDD 與子 RDD 的 Partition 之間的關系是一對多(多胎),寬依賴會有 Shuffle 的產生。

通常情況下,產生了Shuffer就是寬依賴。

Stage階段

· 每個 Job 會被拆分成多個 Task,作為一個 TaskSet 任務集,其名稱為 Stage,Stage 的劃分和調度是由 DAGScheduler
來負責的。Stage 的切割規則為:從后往前,遇到寬依賴就切割 Stage 。

在這里插入圖片描述

? 總結:

  • 從后向前推理,遇到寬依賴就斷開,遇到窄依賴就把當前的 RDD 加入到 Stage 中;
  • 默認情況下每個 Stage 里面的 Task 的數量是由該 Stage 中最后一個 RDD 的 Partition 數量決定的(一個 Partition 對應一
    個 Task);
  • 最后一個 Stage(ResultStage) 里面的任務類型是 ResultTask,前面所有其他 Stage(ShuffleMapStage) 里面的任務類型都是
    ShuffleMapTask;
  • 代表當前 Stage 的算子一定是該 Stage 的最后一個計算步驟。

Job作業

? Job 包含了多個 Task 組成的并行計算,往往由 Spark Action 算子觸發生成, 一個 Application 中往往會產生多個

Job。一個Job 包含 N 個 Transformation 算子和 1 個 Action 算子。

總結:

  • Application:初始化一個 SparkContext 即生成一個 Application;
  • Job 是 Application 的子集,以 Spark Action 算子為界,遇到一個 Action 算子就觸發一個 Job;
  • Stage 是 Job 的子集,以 RDD 寬依賴(即 Shuffle)為界,遇到 Shuffle 就做一次劃分;
  • Task 是 Stage 的子集,以并行度(分區數)來衡量,分區數是多少,就有多少個 Task。

提示:Spark 中所謂的并行度是指 RDD 中的分區數,即 RDD 中的 Task 數。
注意:Application → Job → Stage → Task,每一層都是 1 對 N 的關系。

數據本地化

數據本地化級別:

  • PROCESS_LOCAL進程本地化,性能最好。指代碼和數據在同一個進程中,也就是同一個 Executor 中;計算數據的
    Task 由 Executor 執行,此時數據在 Executor 的 BlockManager 中;
  • NODE_LOCAL節點本地化。代碼和數據在同一個節點中,數據存儲在節點的 HDFS Block,Task 在節點的某個
    Executror 執行;或者數據和 Task 在同一個節點不同的 Executor 中,數據需要跨進程傳輸;
  • NO_PREF :沒有最佳位置這一說,數據從哪里訪問都一樣,不需要位置優先,比如 SparkSQL 直接讀取 MySQL;
  • RACK_LOCAL機架本地化。數據和 Task 在一個機架的兩個節點上,數據需要通過網絡在節點之間進行傳輸;
  • ANY :數據和 Task 可能在集群中的任何地方,而且不在一個機架中,性能最差

Process Locality:相關參數 spark.locality.wait 默認值是 3s。Task 任務分配的時候,先是按照 PROCESS_LOCAL
方式去分配 Task,如果 PROCESS_LOCAL 不滿足,默認等待 3 秒,看能不能按照這個級別去分配,如果等了 3 秒也實現不
了,那么就按 NODE_LOCAL 級別去分配 ,以此類推。

// 全局設置,默認 3s
spark.locality.wait=3s
// 建議 60s
spark.locality.wait.process=60s
// 建議 30s
spark.locality.wait.node=30s
// 建議 20s
spark.locality.wait.rack=20s

廣播變量(分布式共享只讀變量)

? Spark 提供了廣播變量,一種分布式共享只讀變量,使用了廣播變量存儲 regex 后,Executor 端

就只會存儲一份該數據供多個 Task 使用,為了防止數據被修改,所以是只讀變量。

在這里插入圖片描述

工作流程
  • 廣播變量初始的時候在 Drvier 端會有一份副本
  • Task 在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的 Executor 對應的 BlockManager 中,嘗試獲取變量副本
  • 如果本地沒有,那么就從 Driver 端遠程拉取變量副本,并保存在本地的 Executor 對應的 BlockManager 中
  • 此后這個 Executor 上的其他 Task 都會直接使用本地的BlockManager 中的副本
  • Executor 的 BlockManager 除了從 Driver 端上拉取,也能從其他節點的 BlockManager 上拉取變量副本,距離越近越好
優點
  • 不是每個 Task 一份變量副本,而是每個節點的 Executor 一份副本,可以讓變量產生的副本數大大減少
  • 變量一旦被定義為一個廣播變量,那么這個變量只能讀取,不能修改

注意:

  • 不能將RDD廣播出去, RDD 是不存儲數據的。可以將 RDD 的結果廣播出去
  • 廣播變量只能在 Driver 端定義,不能在 Executor 端定義。
  • 在 Driver 端可以修改廣播變量的值,在 Executor 端無法修改廣播變量的值
  • 如果 Executor 端用到了 Driver 端的變量,不使用廣播變量,在 Executor 有多少 Task 就有多少 Driver 端的變量副本
  • 如果 Executor 端用到了 Driver 端的變量,使用廣播變量,在每個 Executor 中只有一份 Driver 端的變量副本

累加器(分布式共享只寫變量)

? 累加器是 Spark 提供的一種分布式共享只寫變量,主要用于在分布式計算中高效地執行全局聚合操作。

在這里插入圖片描述

累加器在 Driver 端定義并賦初始值,累加器只能在 Driver 端讀取最后的值,在 Excutor 端更新。

自定義累加器

? Spark 還支持自定義累加器,只需要繼承 AccumulatorV2 即可。

簡單示例:

package com.yjxxt.accumulator
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object WordCountAccumulatorDemo {def main(args: Array[String]): Unit = {// 建立連接val sc = new SparkContext("local[*]", "OnlineCountDemo")// 初始化自定義累加器val wordCountAccumulator = new WordCountAccumulator// 注冊累加器sc.register(wordCountAccumulator, "wordCountAccumulator")// 創建 RDDval rdd: RDD[String] = sc.textFile("data/wordcount", 2)// 自定義累加器 實現 WordCountrdd.flatMap(_.split("\\s+")).foreach(wordCountAccumulator.add)// 獲取累加器println(wordCountAccumulator.value)// 關閉連接if (!sc.isStopped) sc.stop()}/*** 自定義累加器*/class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {// 定義可變 Map 存放 Word 和 Count(這個就是自定義的累加器)private var wd = mutable.Map[String, Long]()// 累加器是否為零(空)override def isZero: Boolean = this.wd.isEmpty// 拷貝新的累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = new WordCountAccumulator// 重置累加器override def reset(): Unit = this.wd.clear()// 累加器相加override def add(word: String): Unit = {val newCount = wd.getOrElse(word, 0L) + 1Lwd.update(word, newCount)}// 累加器合并override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val m1 = this.wdval m2 = other.valuem2.foreach {case (word, count) => {val newCount = m1.getOrElse(word, 0L) + countm1.update(word, newCount)}}}// 獲取累加器![在這里插入圖片描述](https://i-blog.csdnimg.cn/direct/cebec725e8ed4da0b5c31eecbd6f0da3.png#pic_center)
override def value: mutable.Map[String, Long] = wd}
}

內存管理 (同一內存管理)

		儲存內存和執行內存**共享同一塊空間**,可以動態**借用對方的空閑區域**。

內存分配

在這里插入圖片描述

其中最重要的優化在于動態借用機制,其規則如下:

  • 設定基本的存儲內存大小和執行內存大小( spark.memory.storageFraction 參數,默認為 0.5,即各占一半),
    該設定確定了雙方各自擁有的空間范圍;
  • 雙方空間都不足時,則存儲到硬盤;若己方空間不足而對方空間空余時,可借用對方的空間(存儲空間不足是指不足
    以放下一個完整的 Block);
  • 執行內存的空間被對方占用后,可讓對方將占用的部分轉存到硬盤,然后“歸還”借用的空間;
  • 存儲內存的空間被對方占用后,無法讓對方“歸還”,因為需要考慮 Shuffle 過程中的很多因素,實現起來較為復雜。

內存估算

? 內存具體估算公式如下:

  • 估算 Other 內存 = 自定義數據結構大小 * 每個 Executor 核數;
  • 估算 Storage 內存 = 廣播變量數據大小 + Cache / Executor 數量。例如 Cache 了 100G 數據,10 個 Executor,那么每個
    Executor 分別存儲 10G 數據。在 Spark WEBUI 的 Storage 標簽頁可以直接查看所有的內存占用,大致就對應 Storage
    Memory。
  • 估算 Execution 內存 = 知道了 Storage Memory,Execution Memory 也就知道了,因為默認情況下 Execution Memory 和
    Storage Memory 占用 Spark Memory 的比例是相同的。
  • 估算 Executor 內存 = 每個 Executor 核數 *(數據集大小 / 并行度)。例如 100G 數據,并行度 200(Task),單個并行
    度的數據量為 100G / 200 = 500M。然后 executor-cores 為 4,最終 Executor 內存最少需要 2G 內存。
  • Task 被執行的并行度 = Executor 個數 * 每個 Executor 的 Core 核數。

內存配置

? 一般情況下,各個區域的內存比例保持默認值即可。如果需要更加精確的控制內存分配,可以按照如下思路:

  • Unified 統一內存比例 spark.memory.fraction = (估算 Storage 內存 + 估算 Execution 內存)/(估算 Storage 內存 + 估算
    Execution 內存 + 估算 Other 內存)。
  • Storage/Execution 內存比例 spark.memory.storageFraction = (估算 Storage 內存)/(估算 Storage 內存 + 估算
    Execution 內存)。

配置參數說明:

  • Unified 統一內存比例:spark.memory.fraction,默認為 0.6;
  • Storage/Execution 內存比例:spark.memory.storageFraction,默認為 0.5。

? 代入公式計算:

  • Storage 內存 = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
  • Execution 內存 = (spark.executor.memdry - 300MB) * spark.memory.fraction * (1 - spark.memory.storageFraction)
    4,最終 Executor 內存最少需要 2G 內存。
  • Task 被執行的并行度 = Executor 個數 * 每個 Executor 的 Core 核數。

內存配置

? 一般情況下,各個區域的內存比例保持默認值即可。如果需要更加精確的控制內存分配,可以按照如下思路:

  • Unified 統一內存比例 spark.memory.fraction = (估算 Storage 內存 + 估算 Execution 內存)/(估算 Storage 內存 + 估算
    Execution 內存 + 估算 Other 內存)。
  • Storage/Execution 內存比例 spark.memory.storageFraction = (估算 Storage 內存)/(估算 Storage 內存 + 估算
    Execution 內存)。

配置參數說明:

  • Unified 統一內存比例:spark.memory.fraction,默認為 0.6;
  • Storage/Execution 內存比例:spark.memory.storageFraction,默認為 0.5。

? 代入公式計算:

  • Storage 內存 = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
  • Execution 內存 = (spark.executor.memdry - 300MB) * spark.memory.fraction * (1 - spark.memory.storageFraction)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/92952.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/92952.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/92952.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

VRRP技術

VRRP的概念及應用場景 VRRP&#xff08;虛擬路由冗余協議&#xff09;概念 VRRP&#xff08;Virtual Router Redundancy Protocol&#xff0c;虛擬路由冗余協議&#xff09;是一種路由容錯協議&#xff0c;用于在多個路由器之間提供網關冗余&#xff0c;確保當主路由器故障時&a…

表驅動法-靈活編程范式

表驅動法&#xff1a;從理論到實踐的靈活編程范式 一、為什么需要表驅動法&#xff1f; 在處理多分支邏輯&#xff08;如消息解析、命令分發&#xff09;時&#xff0c;傳統的 if-else 或 switch-case 存在明顯局限&#xff1a; 當分支數量龐大&#xff08;如成百上千條命令&am…

零基礎-動手學深度學習-10.2. 注意力匯聚:Nadaraya-Watson 核回歸

上節介紹了框架下的注意力機制的主要成分 圖10.1.3&#xff1a; 查詢&#xff08;自主提示&#xff09;和鍵&#xff08;非自主提示&#xff09;之間的交互形成了注意力匯聚&#xff1b; 注意力匯聚有選擇地聚合了值&#xff08;感官輸入&#xff09;以生成最終的輸出。 本節將…

nginx高新能web服務器

一、Nginx 概述和安裝 Nginx是免費的、開源的、高性能的HTTP和反向代理服務器、郵件代理服務器、以及TCP/UDP代理服務器。 Nginx 功能介紹 靜態的web資源服務器html&#xff0c;圖片&#xff0c;js&#xff0c;css&#xff0c;txt等靜態資源 http/https協議的反向代理 結合F…

Unity大型場景性能優化全攻略:PC與安卓端深度實踐 - 場景管理、渲染優化、資源調度 C#

本文將深入探討Unity在大型場景中的性能優化策略&#xff0c;涵蓋場景管理、渲染優化、資源調度等核心內容&#xff0c;并提供針對PC和安卓平臺的優化方案及實戰案例。 提示&#xff1a;內容純個人編寫&#xff0c;歡迎評論點贊。 文章目錄1. 大型場景性能挑戰1.1 性能瓶頸定位…

Java集合框架、Collection體系的單列集合

Java集合框架、Collection1. 認識Java集合框架及結構1.1 集合框架整體結構1.2 集合框架的核心作用2. Collection的兩大常用集合體系及各個系列集合的特點2.1 List系列集合&#xff08;有序、可重復&#xff09;2.2 Set系列集合&#xff08;無序、不可重復&#xff09;3. Collec…

HTML <picture> 元素:讓圖片根據設備 “智能切換” 的響應式方案

在響應式設計中&#xff0c;圖片適配是一個繞不開的難題&#xff1a;同一張高清圖片在大屏設備上清晰美觀&#xff0c;但在小屏手機上可能加載緩慢&#xff1b;而適合手機的小圖在桌面端又會模糊失真。傳統的解決方案往往需要用JavaScript判斷設備尺寸并動態替換圖片源&#xf…

Spring Boot 監控與日志管理實戰

在 Spring Boot 應用開發中&#xff0c;指標監控和日志管理是保障應用穩定運行的核心環節。指標監控能實時掌握應用健康狀態、性能瓶頸&#xff0c;日志管理則用于問題排查和安全審計。本文基于 Spring Boot 提供的 Actuator 監控工具、Spring Boot Admin 可視化平臺&#xff0…

【排序算法】②希爾排序

系列文章目錄 第一篇&#xff1a;【排序算法】①直接插入排序-CSDN博客 第二篇&#xff1a;【排序算法】②希爾排序-CSDN博客 第三篇&#xff1a;【排序算法】③直接選擇排序-CSDN博客 第四篇&#xff1a;【排序算法】④堆排序-CSDN博客 第五篇&#xff1a;【排序算法】⑤冒…

Linux Shell為文件添加BOM并自動轉換為unix格式

1.添加并查看BOM添加bomvim -c "set bomb|set fileencodingutf-8|wq" ./gradlew查看bomhead -c 3 ./gradlew | hexdump -C2.安裝dos2unix并轉換為unix格式安裝sudo apt install dos2unix轉換dos2unix ./gradlew

華清遠見25072班C語言學習day5

重點內容&#xff1a;數組&#xff1a;為什么有數組&#xff1f;為了便于存儲多個數據特點&#xff1a;連續存儲多個同種數據類型元素(連續指內存地址連續)數組名&#xff1a;數組中首元素的地址&#xff0c;是一個地址常量。一維整形數組&#xff1a;定義&#xff1a;數據類型…

安全守護,溫情陪伴 — 智慧養老產品上新

- 養老智慧看護終端接入螢石開放平臺 - 在2025 ECDC螢石云開發者大會&#xff0c;螢石產品經理已經介紹了基于螢石云服務AI能力適老化設備的養老智能能力開放。 而今天&#xff0c;養老智慧看護終端再升級&#xff0c;集成跌倒檢測、物理隱私遮蔽、火柴人遮蔽、AI語音智能體…

鴻蒙flutter項目接入極光推送

推送的自分類權益 需要審核15個工作日&#xff0c;實際約3個工作日 項目使用極光推送flutter代碼&#xff0c;代碼端已經配置的東西&#xff08;需要配置flutter端和對應各自平臺原生端&#xff09;&#xff0c;我的工程是多target&#xff0c;所以和單target有一點不同。 一、…

2025牛客多校第八場 根號-2進制 個人題解

J.根號-2進制 #數學 #FFT 思路 賽后發現身邊的同學都是通過借位來解決進位問題的&#xff0c;在此提供一種全程不出現減法的順推做法 首先A,BA,BA,B可以理解為兩個多項式&#xff1a;A0A1?2A2(?2)2…A_{0}A_{1}\sqrt{ -2 }A_{2}(\sqrt{ -2 })^2\dotsA0?A1??2?A2?(?…

DataEase官方出品丨SQLBot:基于大模型和RAG的智能問數系統

2025年8月7日&#xff0c;DataEase開源項目組發布SQLBot開源項目&#xff08;github.com/dataease/SQLBot&#xff09;。SQLBot是一款基于大語言模型&#xff08;Large Language Model&#xff0c;LLM&#xff09;和RAG&#xff08;Retrieval Augmented Generation&#xff0c;…

第十四節 代理模式

在代理模式&#xff08;Proxy Pattern&#xff09;中&#xff0c;一個類代表另一個類的功能。這種類型的設計模式屬于結構型模式。在代理模式中&#xff0c;我們創建具有現有對象的對象&#xff0c;以便向外界提供功能接口。介紹意圖&#xff1a;為其他對象提供一種代理以控制對…

訓推一體 | 暴雨X8848 G6服務器 x Intel?Gaudi? 2E AI加速卡

近日&#xff0c;暴雨信息攜手英特爾&#xff0c;針對Gaudi 2E AI加速器HL-288 PCIe卡&#xff08;簡稱IntelGaudi 2E PCIe卡&#xff0c;下同&#xff09;完成專項調優與適配工作&#xff0c;并重磅推出Intel Eagle Stream平臺4U8卡解決方案。該方案通過軟硬件協同優化&#x…

GB17761-2024標準與電動自行車防火安全的技術革新

隨著我國電動自行車保有量突破3.5億輛&#xff0c;這一便捷的交通工具已成為城市出行的重要組成。然而&#xff0c;伴隨市場規模擴大而來的是日益突出的安全問題——2023年全國電動自行車火災事故高達2.5萬起&#xff0c;年均增長率約20%&#xff0c;火災中塑料件加速燃燒并釋放…

利用容器編排完成haproxy和nginx負載均衡架構實施

1 創建測試目錄和文件[rootdocker-a ~]# mkdir lee [rootdocker-a ~]# cd lee/ [rootdocker-a lee]# touch docker-compose.yml # 容器編排工具Docker Compose 默認識別docker-compose.yml文件2 編寫docker-compose.yml文件和haproxy.cfg文件2.1 核心配置說明2.1.1 服務結構共定…

WinRAR v7.13 烈火漢化穩定版,解壓縮全格式專家

[軟件名稱]: WinRAR v7.13 烈火漢化穩定版 [軟件大小]: 3.8 MB [下載通道]: 夸克盤 | 迅雷盤 軟件介紹 WinRAR 壓縮文件管理器&#xff0c;知名解壓縮軟件&#xff0c;電腦裝機必備軟件&#xff0c;國內最流行最好用的壓縮文件管理器、解壓縮必備軟件。它提供 RAR 和 ZIP 文…