文章目錄
- 一、整體架構概述
- 二、核心組件詳解
- 1. SparkContext
- 2. DAG Scheduler
- 3. Task Scheduler
- 4. Executor
- 三、作業執行流程
- 1. DAG 生成與 Stage 劃分
- 2. Task 調度與執行
- 3. 內存管理
- 四、Shuffle 機制詳解
- 1. Shuffle 過程
- 2. Shuffle 優化
- 五、內存管理機制
- 1. 統一內存管理(Unified Memory Management)
- 2. Tungsten 優化
- 六、容錯機制
- 1. Lineage(血統)
- 2. Checkpoint
- 3. 任務重試
- 七、調度策略
- 1. 任務調度
- 2. 推測執行
- 八、性能優化關鍵點
- 1. 數據本地性
- 2. 并行度調整
- 3. 內存調優
- 九、高級特性
- 1. Catalyst 優化器
- 2. Tungsten 項目
- 十、監控與調試工具
- 1. Spark UI
- 2. 事件日志
- 3. Spark 性能調優工具
一、整體架構概述
Spark
采用主從架構(Master-Slave
),主要組件包括:
- Driver Program:運行用戶應用的 main 函數,負責創建
SparkContext
、分析作業、調度任務。 - Cluster Manager:資源管理器,如
YARN、Mesos、Standalone
。 - Worker Node:集群中的工作節點,負責執行具體任務。
- Executor:
Worker
節點上的進程,負責運行任務并緩存數據。
執行流程:
- 用戶提交應用,
Driver
啟動并創建SparkContext
。 SparkContext
連接Cluster Manager
,請求資源。Cluster Manager
分配資源,在Worker
節點上啟動Executor
。Driver
將任務分發給Executor
執行。Executor
向Driver
匯報任務狀態和結果。
二、核心組件詳解
1. SparkContext
- 是
Spark
應用的入口,負責與Cluster Manager
通信,協調資源分配。 - 管理
RDD
的依賴關系(血統圖),并生成DAG
(有向無環圖)。
2. DAG Scheduler
- 將作業(
Job
)分解為多個階段(Stage
),每個階段包含多個任務(Task
)。 - 根據
RDD
的依賴關系劃分Stage
:- 寬依賴(如
shuffle
)會觸發新的Stage
。 - 窄依賴(如
map、filter
)會被合并到同一個Stage
。
- 寬依賴(如
3. Task Scheduler
- 將
Task
分配給具體的Executor
執行。 - 負責任務調度、重試失敗的任務,以及處理推測執行(
Speculative Execution
)。
4. Executor
- 負責執行
Task
,并將結果返回給Driver
。 - 維護內存緩存,存儲
RDD
分區數據。
三、作業執行流程
1. DAG 生成與 Stage 劃分
# 示例代碼
rdd = sc.textFile("data.txt") # 讀取文件,創建 RDD
words = rdd.flatMap(lambda line: line.split()) # 轉換操作
pairs = words.map(lambda word: (word, 1)) # 轉換操作
counts = pairs.reduceByKey(lambda a, b: a + b) # 觸發 Shuffle
counts.collect() # 動作操作,觸發作業執行
執行流程:
collect()
觸發作業提交。DAG Scheduler
將作業劃分為兩個Stage
:Stage 1
:執行textFile、flatMap、map
操作。Stage 2
:執行reduceByKey
和collect
操作,依賴于Stage 1
的輸出。
2. Task 調度與執行
- ShuffleMapTask:執行
Stage 1
的任務,輸出中間結果(Shuffle
文件)。 - ResultTask:執行
Stage 2
的任務,讀取Shuffle
文件并聚合結果。
3. 內存管理
- Storage Memory:存儲緩存的
RDD
和DataFrame
。 - Execution Memory:執行
Shuffle
、聚合、排序等操作的內存。 - User Memory:用戶代碼使用的內存。
四、Shuffle 機制詳解
1. Shuffle 過程
-
Map 端:
- 將數據分區并寫入內存緩沖區。
- 緩沖區滿時溢寫到磁盤,生成多個小文件。
- 最終合并所有小文件為一個大文件,并生成索引。
-
Reduce 端:
- 從各個
Map
任務拉取屬于自己的數據。 - 合并數據并按
key
排序。 - 執行聚合或其他操作。
- 從各個
2. Shuffle 優化
- Sort Shuffle:默認實現,減少文件數量。
- Tungsten-Sort Shuffle:基于內存管理框架
Tungsten
,提高效率。 - 自適應執行(Spark 3.0+):動態調整
Shuffle
分區數。
五、內存管理機制
1. 統一內存管理(Unified Memory Management)
Spark 1.6+
引入,Storage
和Execution
內存可相互借用:# 內存配置參數 spark.memory.fraction = 0.6 # 統一內存占堆內存的比例 spark.memory.storageFraction = 0.5 # Storage 內存占統一內存的比例
2. Tungsten 優化
- 堆外內存:減少
GC
壓力,提高內存訪問效率。 - 二進制格式:直接操作二進制數據,避免
Java
對象開銷。
六、容錯機制
1. Lineage(血統)
RDD
記錄其創建過程(依賴關系),當部分分區丟失時,可通過重新計算恢復。
2. Checkpoint
- 將
RDD
寫入可靠存儲(如HDFS
),切斷血統關系,用于長依賴鏈的RDD
。rdd.checkpoint() # 設置檢查點
3. 任務重試
Task
失敗時,Task Scheduler
會自動重試(默認 4 次)。
七、調度策略
1. 任務調度
- FIFO(默認):先進先出。
- FAIR:公平調度,支持多作業共享資源。
# 啟用公平調度 spark.conf.set("spark.scheduler.mode", "FAIR")
2. 推測執行
- 當某個任務執行緩慢時,會在其他節點啟動副本任務,取最先完成的結果。
# 啟用推測執行 spark.conf.set("spark.speculation", "true")
八、性能優化關鍵點
1. 數據本地性
- PROCESS_LOCAL:數據在同一
JVM
內,最快。 - NODE_LOCAL:數據在同一節點,但需跨進程傳輸。
- RACK_LOCAL:數據在同一機架的不同節點。
- ANY:數據在任意位置。
2. 并行度調整
- 根據集群資源設置合理的并行度:
# 設置默認并行度 spark.conf.set("spark.default.parallelism", 200)
3. 內存調優
- 調整
Executor
內存和堆外內存:spark.executor.memory = 8g spark.memory.offHeap.enabled = true spark.memory.offHeap.size = 2g
九、高級特性
1. Catalyst 優化器
Spark SQL
的查詢優化器,將SQL
查詢轉換為高效的物理執行計劃:- 分析:解析
SQL
語句,檢查表和列是否存在。 - 邏輯優化:應用規則優化邏輯計劃(如謂詞下推、投影修剪)。
- 物理計劃生成:生成多個物理計劃并選擇最優。
- 代碼生成:將執行計劃編譯為
Java
字節碼。
- 分析:解析
2. Tungsten 項目
- 優化內存和
CPU
利用率:- 二進制數據處理,減少內存占用。
- 避免
Java
對象開銷,直接操作內存。
十、監控與調試工具
1. Spark UI
- 查看作業、階段、任務的執行情況,內存使用等指標。
2. 事件日志
- 記錄作業執行的詳細信息,可用于離線分析:
# 啟用事件日志 spark.eventLog.enabled = true spark.eventLog.dir = "hdfs:///spark-logs"
3. Spark 性能調優工具
- Shuffle 調優:分析
Shuffle
性能瓶頸。 - SQL 執行計劃分析:查看
SQL
查詢的優化過程。