在 Apache Spark 中,Executor(執行器) 是運行在集群工作節點(Worker Node)上的進程,負責執行具體的計算任務并管理數據。它是 Spark 分布式計算的核心組件之一,直接決定了任務的并行度和資源利用率。以下是 Executor 的詳細解析:
1. Executor 的核心職責
職責 | 說明 |
---|---|
執行 Task | 運行 Driver 分配的 Task(包括 Shuffle Map Task 和 Result Task)。 |
數據存儲 | 緩存 RDD 的分區數據(通過內存或磁盤),加速后續計算。 |
Shuffle 處理 | 處理 Shuffle 操作(如排序、聚合、溢寫磁盤)。 |
與 Driver 通信 | 向 Driver 發送心跳,報告 Task 狀態和塊(Block)信息。 |
資源管理 | 管理分配給它的內存和 CPU 核心,確保任務高效運行。 |
2. Executor 的內部結構
(1) 線程池(Task Runner Threads)
- 每個 Executor 內部維護一個線程池,線程數由
spark.executor.cores
決定。 - 每個線程處理一個 Task,實現并行計算。
- 示例:若
spark.executor.cores=4
,則 Executor 最多同時運行 4 個 Task。
(2) 內存管理
- Executor 的內存分為兩部分(通過
spark.memory.fraction
配置比例):- Execution Memory:用于計算(如 Shuffle、Join、Sort 的臨時內存)。
- Storage Memory:用于緩存 RDD 和廣播變量。
- 溢出機制:當內存不足時,數據溢寫到磁盤(可能影響性能)。
(3) BlockManager
- 管理 Executor 的數據塊(Block),包括本地和遠程數據。
- 負責與其他 Executor 交換 Shuffle 數據。
3. Executor 的啟動與資源分配
(1) 資源申請
- Driver 通過集群管理器(如 YARN、Kubernetes)申請 Executor 資源。
- 關鍵配置參數:
spark.executor.instances
:Executor 數量。spark.executor.memory
:每個 Executor 的內存(如4g
)。spark.executor.cores
:每個 Executor 的 CPU 核心數。
(2) Executor 啟動流程
- Driver 向集群管理器發送資源請求。
- 集群管理器(如 YARN 的 ResourceManager)分配 Container。
- 在 Container 中啟動
CoarseGrainedExecutorBackend
進程。 - Executor 向 Driver 注冊,準備接收 Task。
4. Executor 與 Task 的執行
(1) Task 分發
- Driver 將 Task 序列化后發送給 Executor。
- Executor 反序列化 Task 代碼并執行。
(2) 數據本地性(Locality)
- Executor 優先處理存儲在本地的數據(如 HDFS 塊),減少網絡傳輸。
- 本地性級別:
PROCESS_LOCAL
>NODE_LOCAL
>RACK_LOCAL
>ANY
。
(3) Shuffle 過程
- Map 階段:Executor 將 Shuffle 數據寫入本地磁盤(或內存)。
- Reduce 階段:Executor 從其他節點拉取 Shuffle 數據。
5. Executor 的容錯機制
- Task 失敗重試:若某個 Task 失敗,Driver 會重新調度該 Task(最多
spark.task.maxFailures
次)。 - Executor 崩潰:
- Driver 檢測到 Executor 失聯后,向集群管理器申請新 Executor。
- 丟失的緩存數據需重新計算(依賴 RDD 血統)。
6. 配置優化與常見問題
(1) 內存配置優化
- 避免 OOM:
- 增加
spark.executor.memory
。 - 調整
spark.memory.fraction
(默認 0.6)和spark.memory.storageFraction
(默認 0.5)。
- 增加
- 示例配置:
spark-submit \--executor-memory 8g \--executor-cores 4 \--conf spark.memory.fraction=0.7
(2) 并行度與數據傾斜
- 合理分區:確保每個 Task 處理的數據量均衡(通過
repartition
或調整分區數)。 - 處理傾斜:使用
salting
或自定義分區器。
(3) GC 調優
- 啟用 G1 垃圾回收器(減少停頓時間):
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC"
7. Executor 與 Driver 的對比
特性 | Executor | Driver |
---|---|---|
角色 | 執行任務的“工人” | 協調任務的“指揮官” |
運行位置 | 集群的工作節點(Worker Node) | 客戶端或集群節點(取決于部署模式) |
數據訪問 | 僅處理分配的分區數據 | 可訪問全局數據(如 collect() 結果) |
容錯 | 無狀態,失敗后由 Driver 重新調度 Task | 單點故障,崩潰則整個應用失敗 |
8. 典型問題與解決方案
(1) Executor 頻繁 Full GC
- 現象:任務停滯,日志顯示 GC 時間過長。
- 解決:
- 增加 Executor 內存。
- 減少緩存數據量,或使用序列化緩存(
MEMORY_ONLY_SER
)。
(2) Shuffle 數據溢出到磁盤
- 現象:任務變慢,磁盤 I/O 高。
- 解決:
- 增加
spark.executor.memory
。 - 優化 Shuffle 操作(如減少
groupByKey
,改用reduceByKey
)。
- 增加
(3) Executor 失聯
- 現象:Driver 日志顯示
ExecutorLostFailure
。 - 解決:
- 檢查集群資源是否充足(如 YARN 資源隊列)。
- 增加
spark.network.timeout
(默認 120s)。
總結
Executor 是 Spark 分布式計算的執行單元,負責 Task 運行、數據緩存和 Shuffle 處理。合理配置 Executor 的數量、內存和核心數是優化 Spark 應用性能的關鍵。通過調整資源參數、優化數據本地性和處理傾斜問題,可以顯著提升任務的執行效率。