Spark Core基礎與源碼剖析全景手冊
Spark作為大數據領域的明星計算引擎,其核心原理、源碼實現與調優方法一直是面試和實戰中的高頻考點。本文將系統梳理Spark Core與Hadoop生態的關系、經典案例、聚合與分區優化、算子底層原理、集群架構和源碼剖析,結合流程圖、源碼片段和速記口訣,幫助你快速掌握Spark核心知識。
1. Spark Core與Hadoop生態復習
1.1 Spark Core概述
Spark Core作用
Spark Core是Spark的內核,負責RDD(彈性分布式數據集)管理、任務調度、內存管理和容錯機制等,是所有Spark組件的基礎。
核心特性
- RDD(彈性分布式數據集):核心數據抽象,支持分布式、不可變、容錯。
- 懶加載(Lazy Evaluation):轉換操作不會立即執行,觸發Action時才真正計算。
- 容錯機制:DAG血緣追蹤,自動重算丟失分區。
- 內存計算:極大提升大數據處理速度。
口訣:RDD彈性,懶加載,血緣容錯,快如閃電。
1.2 Hadoop生態系統梳理
- HDFS:分布式文件存儲
- MapReduce:分布式計算模型
- YARN:資源調度框架
- 生態組件:Hive(數據倉庫)、HBase(NoSQL)、Zookeeper、Sqoop、Flume、Oozie等
口訣:三駕馬車(HDFS、MR、YARN),生態百花齊放。
1.3 Spark核心術語
- RDD:不可變、分區、彈性容錯的數據集
- Partition:RDD的基本分片單位
- Stage:DAG中的階段,窄依賴劃分
- Task:作用于Partition的計算單元
- Job:用戶提交的完整計算邏輯
口訣:Job拆Stage,Stage分Task,Task跑分區,RDD串血緣。
1.4 HadoopRDD源碼剖析
Spark通過HadoopRDD與Hadoop生態(如HDFS、HBase)對接,底層讀取數據采用Hadoop InputFormat。
getPartitions源碼
override def getPartitions: Array[Partition] = {val jobContext = new JobContextImpl(conf, jobId)val inputFormat = inputFormatClass.newInstance()val rawSplits = inputFormat.getSplits(jobContext)val result = new Array[Partition](rawSplits.size)for (i <- 0 until rawSplits.size) {result(i) = new HadoopPartition(id, i, rawSplits(i))}result
}
- 流程:Hadoop切分→Spark封裝分區→數據對接
compute源碼
override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {val hadoopPartition = split.asInstanceOf[HadoopPartition]val attemptId = newTaskAttemptID()val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)val inputFormat = inputFormatClass.newInstance()val reader = inputFormat.createRecordReader(hadoopPartition.inputSplit.value, hadoopAttemptContext)reader.initialize(hadoopPartition.inputSplit.value, hadoopAttemptContext)new Iterator[(K, V)] {private var havePair = falseprivate var finished = falseprivate def getNext(): Boolean = {if (!finished && reader.nextKeyValue()) { havePair = true; true }else { finished = true; false }}override def hasNext: Boolean = if (!havePair) getNext() else trueoverride def next(): (K, V) = {if (!hasNext) throw new NoSuchElementException("End of stream")havePair = false(reader.getCurrentKey, reader.getCurrentValue)}}
}
- 流程圖:
InputFormat.getSplits → InputSplit[] → HadoopPartition[]
HadoopPartition → RecordReader → (K,V) Iterator
口訣:分片分區,讀器遍歷,KV產出,迭代輸出。
2. Spark常用案例與算子剖析
2.1 WordCount源碼分析與圖解
val lines = sc.textFile("file.txt")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val counts = pairs.reduceByKey(_ + _)
counts.collect().foreach(println)
- 流程圖:
文本 → flatMap → map → reduceByKey
行 拆詞 (詞,1) 詞頻聚合
口訣:讀拆映聚,詞頻統計。
關鍵算子源碼
- textFile(HadoopRDD + map(_._2))
- flatMap/map(MapPartitionsRDD)
- reduceByKey(combineByKey底層實現)
2.2 常用集合操作API
- map:逐元素映射
- flatMap:映射并扁平化
- filter:條件過濾
- groupByKey:按key分組
- reduceByKey:按key聚合
口訣:映射分組,過濾聚合,操作靈活。
2.3 PV/UV分析案例
val pv = logs.count()
val uv = logs.map(_.userId).distinct().count()
- PV:count計數
- UV:distinct去重后計數
口訣:PV計數,UV去重。
2.4 RDD源碼結構與血緣
- 核心屬性:partitions, dependencies, compute, iterator
- 依賴類型:NarrowDependency(窄依賴)、ShuffleDependency(寬依賴)
口訣:分區依賴,懶計算,血緣追溯,容錯重算。
3. 聚合與分區優化源碼剖析
3.1 聚合API與底層實現
reduceByKey
:底層調用combineByKey
aggregateByKey
:可設初值,分區內/間聚合combineByKey
:三步(初始、分區內、分區間)
口訣:簡聚reduce,初值aggregate,靈活combine。
combineByKey源碼
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,numPartitions: Int
): RDD[(K, C)] = {new ShuffledRDD[K, V, C](self, part, serializer).setAggregator(new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners))
}
口訣:首遇建桶,內聚合并,跨分區合。
3.2 分區調優
- 參數:
spark.default.parallelism
,numPartitions
- 方法:
repartition
(全shuffle),coalesce
(減少分區,少shuffle)
口訣:分區合理,快慢有度,合并coalesce,重分repartition。
4. Shuffle底層實現與優化
4.1 Shuffle寫入(map端)
- 流程:分桶(partitioner)、排序(可選)、序列化與溢寫(ExternalSorter)、輸出(data file + index file)
- 文件結構:
shuffle_0_0.data
:所有桶數據合一shuffle_0_0.index
:記錄每個桶起止偏移
口訣:分桶排序,內存溢寫,索引分隔,文件合一。
4.2 Shuffle讀取(reduce端)
- 流程:獲取元數據(MapOutputTracker)、遠程拉取(ShuffleBlockFetcherIterator)、聚合排序、容錯重試
口訣:元數據定位,遠程拉取,聚合排序,自動容錯。
4.3 ShuffleManager對比
- SortShuffleManager(默認):合桶存儲,節省空間,支持排序
- HashShuffleManager:每桶一文件,文件爆炸,不支持排序
- Tungsten/UnsafeShuffleManager:堆外內存,性能優先
口訣:排序合桶,節省空間;哈希分桶,文件爆炸;Tungsten堆外,性能優先。
5. 聚合算子物理執行與優化
5.1 reduceByKey底層流程
- 分區內聚合(mergeValue)
- shuffle分桶寫文件
- reduce端拉取分桶數據并聚合(mergeCombiners)
流程簡圖:
分區1: (A,1),(A,2),(B,1)
分區2: (A,3),(B,2)
→ 分區內聚合 → (A,3),(B,1); (A,3),(B,2)
→ shuffle分桶
→ reduce端聚合 → (A,6),(B,3)
口訣:分區先合,桶間再聚。
5.2 combineByKey三步
- createCombiner:首次遇key建立桶
- mergeValue:分區內聚合
- mergeCombiners:跨分區聚合
口訣:首次建桶,分區內合,跨區再聚,減少傳輸。
6. 調度系統深度剖析
6.1 DAGScheduler與TaskScheduler
- DAGScheduler:將RDD操作DAG劃分為Stage,管理Stage依賴
- TaskScheduler:負責將Stage中的分區轉為Task,分發到Executor
源碼流程:
// DAGScheduler.submitJob
val finalStage = newStage(...)
submitStage(finalStage)def submitStage(stage: Stage): Unit = {if (stage.parents.isEmpty) {taskScheduler.submitTasks(taskSet)} else {stage.parents.foreach(submitStage)}
}
口訣:DAG劃分,Stage遞進,Task分發,本地優先。
7. 容錯、推測執行與參數調優
7.1 容錯與推測執行
- 血緣(Lineage)容錯:RDD依賴鏈可重算丟失分區
- Shuffle文件丟失:Driver可重算map task
- 推測執行:檢測慢task,允許冗余執行,避免慢節點拖后腿
口訣:血緣追溯,丟失重算,推測執行,容錯加速。
7.2 重點參數
spark.shuffle.compress
:是否壓縮spark.shuffle.file.buffer
:文件緩沖區spark.reducer.maxSizeInFlight
:reduce端拉取并發量spark.shuffle.sort.bypassMergeThreshold
:bypass優化spark.speculation
:推測執行開關
口訣:壓縮節流,緩沖調優,推測補位,參數先行。
8. 集群架構與部署運維
8.1 角色與架構
- Driver:任務提交與調度
- Executor:執行Task與緩存
- Cluster Manager:YARN/Standalone/K8s
- Worker:Standalone模式下運行Executor
口訣:Driver調度,Executor計算,Manager分配。
8.2 資源調度與高可用
- 參數:
spark.executor.memory
、spark.executor.cores
、spark.driver.memory
- HA:Standalone支持Zookeeper主備
- History Server:歷史作業追蹤與調優
口訣:內存CPU,合理分配。主備高可用,ZK做協調。歷史追蹤,日志可查。
8.3 YARN集群搭建與JAR包管理
- YARN模式:client/cluster
- 調優參數:
yarn.nodemanager.resource.memory-mb
- JAR包管理:
--jars
、--packages
、推薦HDFS分發
口訣:YARN調度,參數適配。依賴合規,HDFS分發。
9. 經典面試與實戰答題模板
-
reduceByKey底層流程?
分區內本地聚合,shuffle分桶寫文件,reduce端拉取分桶數據再聚合,采用索引+數據文件結構,丟失可血緣重算,慢任務可推測執行。 -
SortShuffle與HashShuffle區別?
SortShuffle合桶存儲、索引分隔、文件少、支持排序;HashShuffle每桶一文件,文件數多,不支持排序。 -
Shuffle讀寫的網絡協議和容錯?
基于Netty RPC,reduce端并發拉取數據,失敗自動重試或重算map task。
10. 速記口訣大合集
口訣 | 適用場景 | 詳細解釋 |
---|---|---|
RDD彈性,懶加載,血緣容錯,快如閃電 | Spark Core本質 | RDD靈活、延遲、容錯、快 |
分片分區,讀器遍歷,KV產出,迭代輸出 | HadoopRDD源碼 | 分片分區、RecordReader迭代KV |
讀拆映聚,詞頻統計 | WordCount | 讀文件、拆詞、映射、聚合 |
分區依賴,懶計算,血緣追溯,容錯重算 | RDD血緣與依賴 | 窄依賴、懶執行、血緣追溯、自動容錯 |
分區先聚,跨區再合 | reduceByKey等聚合算子 | 先本地聚合,后跨節點聚合 |
三步合并,聚合核心,shuffle分發 | combineByKey底層 | 初始、分區內、分區間三步合并 |
分桶排序,內存溢寫,索引分隔,文件合一 | shuffle寫端 | 分桶排序、溢寫磁盤、索引分隔、合一文件 |
元數據定位,遠程拉取,聚合排序,自動容錯 | shuffle讀端 | 查元數據、拉取聚合、自動重試 |
路徑規范,偏移定位,RPC拉取,容錯重算 | shuffle文件與拉取 | 路徑規范、index偏移、RPC拉取、重算 |
首次建桶,分區內合,跨區再聚,減少傳輸 | combineByKey聚合流程 | 本地聚合減少數據量,跨區再聚合 |
血緣追溯,丟失重算,推測執行,容錯加速 | 容錯與推測執行 | 血緣可重算,慢任務推測執行 |
DAGScheduler劃分,Task分發,本地優先 | 調度原理 | DAG分Stage,Task分發本地優先 |
結語
本手冊結合源碼、流程、架構、調度、底層實現與調優要點,輔以口訣助記,既適合Spark初學者體系化學習,也為有經驗者面試、查漏補缺與實戰調優提供一站式參考。
如需**某一環節(如DAGScheduler狀態流轉、推測執行源碼、具體shuffle二進制結構、Executor資源分配源碼等)**進一步源碼剖析,歡迎留言或私信交流!
關注我,獲取更多大數據實戰與源碼剖析干貨!