引入
在深入MapReduce中有提到,MapReduce雖然通過“分而治之”的思想,解決了海量數據的計算處理問題,但性能還是不太理想,這體現在兩個方面:
- 每個任務都有比較大的overhead,都需要預先把程序復制到各個 worker 節點,然后啟動進程;
- 所有的中間數據都要讀寫多次硬盤。map 的輸出結果要寫到硬盤上,reduce抓取數據排序合并之后,也要先寫到本地硬盤上再進行讀取,所以快不起來。
除此之外,MapReduce還有以下的問題:
- 算子不夠豐富,僅有Map和Reduce,復雜算子的實現極為煩瑣;
- MapReduce在處理迭代計算、實時查詢和交互式數據挖掘任務時效率較低,因為每次迭代都需要將數據寫入磁盤,導致大量的I/O開銷;
- 無法支持血緣或上下游依賴的概念,失敗重試只能從頭開始,變相地無法實現迭代計算。
針對以上的缺陷,不同的計算引擎采取了不同的優化策略。例如Tez簡化了MapReduce過程,支持DAG(Directed Acyclic Graph,有向無環圖)?,細化MapReduce環節并靈活組合。Impala則專注于單節點純內存計算。而Spark依托DAG Lineage、純內存計算、RDD(分布式彈性數據集)等特性,以及與Hadoop生態極佳的兼容性,支持例如圖計算、機器學習、流(Micro-Batch)計算等多樣化的功能或場景,在一系列大數據引擎中脫穎而出,成為當今最主流的計算引擎之一。
Spark最初由加州大學伯克利分校的AMPLab開發,后來成為Apache軟件基金會的頂級項目。
Spark的核心概念
下面通過Spark的一些核心概念,去進一步了解它。
RDD(彈性分布式數據集)
定義
RDD(Resilient Distributed Dataset)是Spark的核心抽象,是一個不可變的、分布式的數據集合,可以并行操作。RDD可以通過在存儲系統中的文件或已有的Scala、Java、Python集合以及通過其他RDD的轉換操作來創建。
特性
-
不可變性:RDD一旦創建,其數據就不能被修改。
-
分區:RDD的數據被劃分為多個分區,每個分區可以獨立計算。
-
依賴關系:RDD之間通過轉換操作形成依賴關系,這些依賴關系構成了DAG(有向無環圖)。
DAG(有向無環圖)
定義
DAG是Spark中用于表示RDD之間依賴關系的有向無環圖。DAG調度器負責將DAG分解成多個階段(stages),每個階段是一系列并行的任務。
工作原理
-
操作解析:當代碼被提交到Spark的解釋器時,系統會生成一個操作圖,稱為Operator Graph,記錄了所有的Transformation操作及其依賴關系。
-
DAG調度器:當一個Action操作(如collect或save)被觸發時,DAG調度器會將Operator Graph分解為多個Stage(階段)。窄依賴(Narrow Dependency)的操作如map和filter不需要數據重新分區,屬于同一階段;寬依賴(Wide Dependency)的操作如reduceByKey需要數據Shuffle,不同階段之間以寬依賴為界。
-
任務調度器:每個Stage會被拆分為多個基于數據分區的Task。Task調度器將這些Task分發到集群的Worker節點上執行。
-
執行與結果:每個Worker節點執行分配的Task,并將結果返回給Driver程序。DAG確保各個階段按依賴順序執行,并通過內存優化中間結果存儲,最大限度減少I/O和通信開銷。
作用
-
任務依賴分析:DAG調度器通過分析RDD之間的依賴關系,決定任務的執行順序。
-
內存計算優化:通過減少Shuffle和磁盤讀寫,DAG提高了計算效率。
-
全局優化:DAG確保每個Stage都包含最少的任務,避免重復計算。
Shuffle機制
定義
Shuffle是Spark中的一種數據重新分區操作,通常在寬依賴(Wide Dependency)的操作中發生,如reduceByKey、groupByKey等。
工作原理
-
分區:Shuffle操作會將數據重新分區,通常會根據鍵(key)進行分區。
-
數據傳輸:數據從一個節點傳輸到另一個節點,以確保相同鍵的數據位于同一個節點上。
-
排序和分組:在目標節點上,數據會被排序和分組,以便進行后續的聚合操作。
優化策略
-
減少數據傳輸:通過數據本地性優化,盡量減少數據在節點之間的傳輸。
-
壓縮:在Shuffle過程中,可以啟用數據壓縮,減少網絡傳輸的開銷。
-
緩存:在Shuffle之前,可以將數據緩存到內存中,減少重復計算。
數據緩存機制
定義
數據緩存機制是Spark中用于提高數據處理效率的一種機制,通過將數據緩存到內存中,減少重復計算的開銷。
實現方式
-
cache():
cache()
方法是persist()
的簡化版,其底層實現直接調用persist(StorageLevel.MEMORY_AND_DISK)
,默認將數據存儲在內存中,如果內存不足,則溢寫到磁盤。 -
persist():
persist()
方法允許用戶選擇存儲級別(StorageLevel
),如MEMORY_ONLY
、MEMORY_AND_DISK
、DISK_ONLY
等。
作用
-
加速重復計算:通過緩存數據,避免重復計算DAG中的父節點。
-
靈活的存儲策略:
persist()
方法提供了更靈活的存儲策略,適應內存、磁盤等不同環境。
適用場景
-
數據需要被多次使用:適用于數據需要被多次使用,但不需要跨作業的容錯能力的場景。
-
計算代價大:適用于計算代價大,但內存能夠容納數據的場景。
錯誤容忍機制
RDD的DAG Lineage(血緣)
指創建RDD所依賴的轉換操作序列。當某個RDD的分區數據丟失時,Spark可以通過Lineage信息重新計算該分區的數據。
RDD的 DAG Lineage?主要用于描述數據從源到目標的轉換過程,包括數據的流動、處理、轉換等各個步驟。DAG Lineage能夠清晰地展示數據的來源、去向以及數據在不同階段的變化,幫助用戶了解數據的全生命周期。
Checkpoint(檢查點)
通過將RDD的狀態保存到可靠的存儲系統(如HDFS、S3等),以支持容錯和優化長計算鏈。當Spark應用程序出現故障時,可以從檢查點恢復狀態。
故障恢復
- 節點故障:當Worker節點故障時,Spark會利用RDD的血統信息重新計算丟失的數據分區。如果設置了檢查點,Spark會從檢查點位置開始重新執行,減少計算開銷。
- 驅動節點故障:如果驅動節點故障,Spark會通過Apache Mesos等集群管理器重新啟動驅動節點,并恢復執行狀態。
內存管理機制
內存模型
執行內存(Execution Memory):主要用于存儲任務執行過程中的臨時數據,如Shuffle的中間結果等。這部分內存主要用于任務的執行期間,任務完成后會被釋放。
存儲內存(Storage Memory):用于緩存中間結果(RDD)和DataFrame/DataSet的持久化數據。這部分內存是為了加速重復計算而存在的,數據可以被多次復用。
內存分配
內存分配比例:內存分配比例可以通過配置項spark.executor.memory來設置總的內存大小,并通過spark.storage.memoryFraction來指定存儲內存所占的比例,默認為0.6。這意味著默認情況下,Executor的60%的內存用于存儲,剩余的40%用于執行。
內存回收
LRU緩存淘汰策略:Spark采用LRU(Least Recently Used)緩存淘汰策略來管理存儲內存中的數據。當存儲內存不足時,Spark會根據LRU算法淘汰最近最少使用的數據。
Spill to Disk:當執行內存不足時,Spark會將一部分數據溢寫到磁盤,以釋放內存空間。例如,在Shuffle操作期間,如果內存不足以存放所有中間結果,Spark會將部分數據寫入磁盤。
動態內存管理
動態調整內存分配:在Spark 2.x版本之后,引入了更先進的內存管理機制,支持動態調整執行內存和存儲內存之間的比例。這意味著在運行時,Spark可以根據實際內存使用情況動態調整內存分配,從而更好地利用資源。
內存配置
- spark.executor.memory:設置Executor的總內存大小。
- spark.storage.memoryFraction:設置存儲內存所占的比例。
- spark.shuffle.spill.compress:是否啟用Shuffle數據的壓縮。
- spark.serializer:設置序列化庫,默認為org.apache.spark.serializer.KryoSerializer。
- spark.kryoserializer.buffer.max:設置Kryo序列化器的最大緩沖區大小。
Spark的執行原理
由于本專欄的重點是SQL,所以我們主要看Spark SQL的執行過程。相比于Hive的源碼,Spark就貼心很多了,提供了org.apache.spark.sql.execution.QueryExecution類,這個類是Spark SQL查詢執行的核心,它封裝了從SQL解析到最終執行的整個過程,為開發者提供了豐富的接口來理解和調試查詢執行的整個過程。
QueryExecution源碼注釋如下:
The primary workflow for executing relational queries using Spark. ?Designed to allow easy?access to the intermediate phases of query execution for developers.
While this is not a public class, we should avoid changing the function names for the sake of changing them, because a lot of developers use the feature for debugging.翻譯:
使用 Spark 執行關系查詢的主要工作流程。設計目的是讓開發者能夠輕松訪問查詢執行的中間階段。
雖然這不是一個公共類,但我們應盡可能避免改變函數名稱,因為許多開發者使用此功能進行調試。
QueryExecution類的2.x源碼如下:
/*** QueryExecution類負責執行關系查詢的主要工作流程。* 它設計用于讓開發人員可以輕松訪問查詢執行的中間階段。** @param sparkSession 用于執行查詢的SparkSession* @param logical 要執行的邏輯計劃*/
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {// TODO: 將planner和optimizer從SessionState移動到這里/** 獲取查詢計劃器 */protected def planner: SparkPlanner = sparkSession.sessionState.plannerdef assertAnalyzed(): Unit = analyzeddef assertSupported(): Unit = {if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForBatch(analyzed)}}/** 懶加載已分析的邏輯計劃 */// SQL解析lazy val analyzed: LogicalPlan = {SparkSession.setActiveSession(sparkSession)sparkSession.sessionState.analyzer.executeAndCheck(logical)}/** 懶加載帶緩存數據的邏輯計劃 */lazy val withCachedData: LogicalPlan = {assertAnalyzed()assertSupported()sparkSession.sharedState.cacheManager.useCachedData(analyzed)}/** 懶加載優化后的邏輯計劃 */// 邏輯優化處理lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)/** 懶加載Spark物理計劃 */// 將邏輯計劃轉換成物理計劃// 邏輯計劃是不區分引擎的,而這里的物理計劃(SparkPlan)是面向Spark執行的lazy val sparkPlan: SparkPlan = {SparkSession.setActiveSession(sparkSession)// TODO: 目前我們使用next(),即取planner返回的第一個計劃,// 但我們將來會實現選擇最佳計劃的功能。planner.plan(ReturnAnswer(optimizedPlan)).next()}/** * executedPlan 不應用于初始化任何 SparkPlan。* 它僅應用于執行。*/// 預提交階段lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)/** RDD的內部版本。避免復制且沒有schema */// 最終提交階段lazy val toRdd: RDD[InternalRow] = {if (sparkSession.sessionState.conf.getConf(SQLConf.USE_CONF_ON_RDD_OPERATION)) {new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)} else {executedPlan.execute()}}/*** 為執行準備計劃好的SparkPlan。* 根據需要插入shuffle操作和內部行格式轉換。** @param plan 要準備執行的SparkPlan* @return 準備好執行的SparkPlan*/protected def prepareForExecution(plan: SparkPlan): SparkPlan = {preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }}/** * 在執行前應用于物理計劃的規則序列。* 這些規則將按順序應用。** @return 要應用的規則序列*/protected def preparations: Seq[Rule[SparkPlan]] = Seq(PlanSubqueries(sparkSession),EnsureRequirements(sparkSession.sessionState.conf),CollapseCodegenStages(sparkSession.sessionState.conf),ReuseExchange(sparkSession.sessionState.conf),ReuseSubquery(sparkSession.sessionState.conf))/*** 嘗試執行給定的操作并返回結果字符串。* 如果發生AnalysisException,則返回異常的字符串表示。** @param f 要執行的操作* @return 操作結果的字符串表示或異常信息*/protected def stringOrError[A](f: => A): String =try f.toString catch { case e: AnalysisException => e.toString }/*** 以Hive兼容的字符串序列形式返回結果。* 這在測試和CLI應用程序的SparkSQLDriver中使用。** @return Hive兼容的結果字符串序列*/def hiveResultString(): Seq[String] = executedPlan match {case ExecutedCommandExec(desc: DescribeTableCommand) =>// 如果是Hive表的describe命令,我們希望輸出格式與Hive相似desc.run(sparkSession).map {case Row(name: String, dataType: String, comment) =>Seq(name, dataType,Option(comment.asInstanceOf[String]).getOrElse("")).map(s => String.format("%-20s", s)).mkString("\t")}// Hive中的SHOW TABLES只輸出表名,而我們的輸出包括數據庫、表名和是否為臨時表case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>command.executeCollect().map(_.getString(1))case other =>val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq// 我們需要類型信息以輸出結構字段名val types = analyzed.output.map(_.dataType)// 重新格式化以匹配Hive的制表符分隔輸出result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))}/*** 根據給定的數據類型格式化數據,并返回其字符串表示。** @param a 包含數據和其對應DataType的元組* @return 格式化后的字符串表示*/private def toHiveString(a: (Any, DataType)): String = {val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, FloatType,BooleanType, ByteType, ShortType, DateType, TimestampType, BinaryType)/*** 格式化BigDecimal,去除尾隨的零。** @param d 要格式化的BigDecimal* @return 格式化后的字符串*/def formatDecimal(d: java.math.BigDecimal): String = {if (d.compareTo(java.math.BigDecimal.ZERO) == 0) {java.math.BigDecimal.ZERO.toPlainString} else {d.stripTrailingZeros().toPlainString}}/*** Hive輸出struct字段的方式與最外層屬性略有不同。* 這個方法處理struct、數組和map類型的特殊格式化。** @param a 包含數據和其對應DataType的元組* @return 格式化后的字符串*/def toHiveStructString(a: (Any, DataType)): String = a match {case (struct: Row, StructType(fields)) =>struct.toSeq.zip(fields).map {case (v, t) => s""""${t.name}":${toHiveStructString((v, t.dataType))}"""}.mkString("{", ",", "}")case (seq: Seq[_], ArrayType(typ, _)) =>seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")case (map: Map[_, _], MapType(kType, vType, _)) =>map.map {case (key, value) =>toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))}.toSeq.sorted.mkString("{", ",", "}")case (null, _) => "null"case (s: String, StringType) => "\"" + s + "\""case (decimal, DecimalType()) => decimal.toStringcase (interval, CalendarIntervalType) => interval.toStringcase (other, tpe) if primitiveTypes contains tpe => other.toString}// 主要的格式化邏輯a match {case (struct: Row, StructType(fields)) =>struct.toSeq.zip(fields).map {case (v, t) => s""""${t.name}":${toHiveStructString((v, t.dataType))}"""}.mkString("{", ",", "}")case (seq: Seq[_], ArrayType(typ, _)) =>seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")case (map: Map[_, _], MapType(kType, vType, _)) =>map.map {case (key, value) =>toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))}.toSeq.sorted.mkString("{", ",", "}")case (null, _) => "NULL"case (d: Date, DateType) =>DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))case (t: Timestamp, TimestampType) =>DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)case (interval, CalendarIntervalType) => interval.toStringcase (other, tpe) if primitiveTypes.contains(tpe) => other.toString}}/*** 返回執行計劃的簡單字符串表示。** @return 包含物理計劃樹的字符串*/def simpleString: String = withRedaction {s"""== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = false))}""".stripMargin.trim}/*** 返回查詢執行的詳細字符串表示。** @return 包含解析的邏輯計劃、分析的邏輯計劃、優化的邏輯計劃和物理計劃的字符串*/override def toString: String = withRedaction {def output = Utils.truncatedString(analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")val analyzedPlan = Seq(stringOrError(output),stringOrError(analyzed.treeString(verbose = true))).filter(_.nonEmpty).mkString("\n")s"""== Parsed Logical Plan ==|${stringOrError(logical.treeString(verbose = true))}|== Analyzed Logical Plan ==|$analyzedPlan|== Optimized Logical Plan ==|${stringOrError(optimizedPlan.treeString(verbose = true))}|== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = true))}""".stripMargin.trim}/*** 返回帶有統計信息的查詢執行字符串表示。** @return 包含優化的邏輯計劃和物理計劃(帶統計信息)的字符串*/def stringWithStats: String = withRedaction {// 觸發計算邏輯計劃的統計信息optimizedPlan.stats// 只顯示優化的邏輯計劃和物理計劃s"""== Optimized Logical Plan ==|${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}|== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = true))}""".stripMargin.trim}/*** 對給定的字符串中的敏感信息進行編輯。** @param message 要編輯的原始消息* @return 編輯后的消息*/private def withRedaction(message: String): String = {Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)}/** * 一個特殊的命名空間,包含可用于調試查詢執行的命令。*/// scalastyle:offobject debug {// scalastyle:on/*** 將此計劃中找到的所有生成的代碼打印到標準輸出。* 即打印每個WholeStageCodegen子樹的輸出。*/def codegen(): Unit = {// scalastyle:off printlnprintln(org.apache.spark.sql.execution.debug.codegenString(executedPlan))// scalastyle:on println}/*** 獲取查詢計劃中的WholeStageCodegenExec子樹及其生成的代碼。** @return WholeStageCodegen子樹及其對應生成代碼的序列*/def codegenToSeq(): Seq[(String, String)] = {org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)}}
}
可以看到這個類確實是封裝了SparkSQL查詢執行的整個過程,從邏輯計劃到物理計劃,再到最終的執行。
我們簡單梳理一下其主要的執行階段如下:
- analyzed: 對邏輯計劃進行分析
- withCachedData: 使用緩存數據
- optimizedPlan: 優化邏輯計劃
- sparkPlan: 生成物理計劃
- executedPlan: 準備提交執行物理計劃
- toRdd: 最終執行,將物理計劃轉換為RDD
而QueryExecution類的3.x源碼如下:
/*** QueryExecution 類代表了使用 Spark 執行關系查詢的主要工作流程。* * 該類設計用于讓開發者能夠輕松訪問查詢執行的中間階段,便于調試。** @param sparkSession 當前的 SparkSession* @param logical 要執行的邏輯計劃* @param tracker 用于跟蹤查詢計劃的 QueryPlanningTracker* @param mode 命令執行模式*/
class QueryExecution(val sparkSession: SparkSession, // 當前的 Spark 會話,包含 Spark 的運行時環境val logical: LogicalPlan, // 查詢的邏輯計劃,表示對數據的高級操作val tracker: QueryPlanningTracker = new QueryPlanningTracker, // 跟蹤查詢計劃的執行階段和時間val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL // 指定命令執行的模式
) extends Logging {val id: Long = QueryExecution.nextExecutionId // 為每個查詢執行分配唯一 ID// 暫未使用,計劃將 planner 和 optimizer 移至此處protected def planner = sparkSession.sessionState.planner// 確保邏輯計劃已被分析def assertAnalyzed() = analyzed// 確保查詢操作受支持def assertSupported() = {if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForBatch(analyzed)}}// 延遲計算,確保邏輯計劃已被分析lazy val analyzed: LogicalPlan = {// 執行分析階段val plan = executePhase(QueryPlanningTracker.ANALYSIS) {sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)}tracker.setAnalyzed(plan)plan}// 根據執行模式決定是否立即執行命令lazy val commandExecuted: LogicalPlan = mode match {case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)case CommandExecutionMode.SKIP => analyzed}// 定義命令執行名稱private def commandExecutionName(command: Command): String = command match {case _: CreateTableAsSelect => "create"case _: ReplaceTableAsSelect => "replace"case _: AppendData => "append"case _: OverwriteByExpression => "overwrite"case _: OverwritePartitionsDynamic => "overwritePartitions"case _ => "command"}// 立即執行命令private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {case c: Command =>// 設置查詢計劃追蹤器,標記查詢已準備好執行tracker.setReadyForExecution()// 執行命令計劃val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)// 執行命令計劃,獲取結果val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {qe.executedPlan.executeCollect()}// 創建命令結果CommandResult(qe.analyzed.output,qe.commandExecuted,qe.executedPlan,result)case other => other // 其他情況保持不變}// 對查詢計劃進行歸一化lazy val normalized: LogicalPlan = {val normalizationRules = sparkSession.sessionState.planNormalizationRulesif (normalizationRules.isEmpty) {commandExecuted} else {val planChangeLogger = new PlanChangeLogger[LogicalPlan]()// 應用歸一化規則val normalized = normalizationRules.foldLeft(commandExecuted) { (p, rule) =>val result = rule.apply(p)planChangeLogger.logRule(rule.ruleName, p, result)result}// 記錄歸一化過程planChangeLogger.logBatch("Plan Normalization", commandExecuted, normalized)normalized}}lazy val withCachedData: LogicalPlan = sparkSession.withActive {assertAnalyzed()assertSupported()// 克隆計劃,避免狀態共享sparkSession.sharedState.cacheManager.useCachedData(normalized.clone())}// 確保命令已執行def assertCommandExecuted(): Unit = commandExecuted// 獲得優化后的邏輯計劃lazy val optimizedPlan: LogicalPlan = {assertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {// 克隆計劃,避免狀態共享val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)// 標記計劃為已分析plan.setAnalyzed()plan}}// 確保計劃已優化def assertOptimized(): Unit = optimizedPlan// 獲得物理執行計劃lazy val sparkPlan: SparkPlan = {assertOptimized()executePhase(QueryPlanningTracker.PLANNING) {// 創建 Spark 計劃QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())}}// 確保物理計劃已準備好執行def assertSparkPlanPrepared(): Unit = sparkPlan// 獲得實際執行計劃lazy val executedPlan: SparkPlan = {assertOptimized()val plan = executePhase(QueryPlanningTracker.PLANNING) {// 準備執行計劃QueryExecution.prepareForExecution(preparations, sparkPlan.clone())}// 標記查詢已準備好執行tracker.setReadyForExecution()plan}// 確保實際執行計劃已準備好執行def assertExecutedPlanPrepared(): Unit = executedPlan// 獲取 RDD 內部版本lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)// 獲得查詢計劃的指標def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)protected def preparations: Seq[Rule[SparkPlan]] = {QueryExecution.preparations(sparkSession,Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)}// 執行查詢階段protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {QueryExecution.withInternalError(s"The Spark SQL phase $phase failed with an internal error.") {tracker.measurePhase(phase)(block)}}// 獲取查詢計劃的簡單字符串表示def simpleString: String = {val concat = new PlanStringConcat()simpleString(false, SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成查詢計劃的簡單字符串表示private def simpleString(formatted: Boolean,maxFields: Int,append: String => Unit): Unit = {append("== Physical Plan ==\n")if (formatted) {try {ExplainUtils.processPlan(executedPlan, append)} catch {case e: AnalysisException => append(e.toString)case e: IllegalArgumentException => append(e.toString)}} else {QueryPlan.append(executedPlan,append, verbose = false, addSuffix = false, maxFields = maxFields)}append("\n")}// 獲取查詢計劃的解釋字符串def explainString(mode: ExplainMode): String = {val concat = new PlanStringConcat()explainString(mode, SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 解釋查詢計劃,生成字符串表示private def explainString(mode: ExplainMode, maxFields: Int, append: String => Unit): Unit = {val queryExecution = if (logical.isStreaming) {new IncrementalExecution(sparkSession, logical, OutputMode.Append(), "<unknown>",UUID.randomUUID, UUID.randomUUID, 0, None, OffsetSeqMetadata(0, 0),WatermarkPropagator.noop())} else {this}mode match {case SimpleMode =>queryExecution.simpleString(false, maxFields, append)case ExtendedMode =>queryExecution.toString(maxFields, append)case CodegenMode =>try {org.apache.spark.sql.execution.debug.writeCodegen(append, queryExecution.executedPlan)} catch {case e: AnalysisException => append(e.toString)}case CostMode =>queryExecution.stringWithStats(maxFields, append)case FormattedMode =>queryExecution.simpleString(formatted = true, maxFields = maxFields, append)}}// 寫入各個階段的計劃private def writePlans(append: String => Unit, maxFields: Int): Unit = {val (verbose, addSuffix) = (true, false)append("== Parsed Logical Plan ==\n")QueryPlan.append(logical, append, verbose, addSuffix, maxFields)append("\n== Analyzed Logical Plan ==\n")try {if (analyzed.output.nonEmpty) {append(truncatedString(analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))append("\n")}QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)append("\n== Optimized Logical Plan ==\n")QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)append("\n== Physical Plan ==\n")QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)} catch {case e: AnalysisException => append(e.toString)}}// 重寫 toString 方法override def toString: String = withRedaction {val concat = new PlanStringConcat()toString(SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成查詢計劃的字符串表示private def toString(maxFields: Int, append: String => Unit): Unit = {writePlans(append, maxFields)}// 獲取包含統計信息的計劃字符串def stringWithStats: String = {val concat = new PlanStringConcat()stringWithStats(SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成包含統計信息的計劃字符串private def stringWithStats(maxFields: Int, append: String => Unit): Unit = {try {// 觸發邏輯計劃的統計計算optimizedPlan.collectWithSubqueries {case plan => plan.stats}} catch {case e: AnalysisException => append(e.toString + "\n")}// 只顯示優化后的邏輯計劃和物理計劃append("== Optimized Logical Plan ==\n")QueryPlan.append(optimizedPlan, append, verbose = true, addSuffix = true, maxFields)append("\n== Physical Plan ==\n")QueryPlan.append(executedPlan, append, verbose = true, addSuffix = false, maxFields)append("\n")}// 內部敏感信息的脫敏處理private def withRedaction(message: String): String = {Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)}// 用于調試查詢執行的特殊命名空間object debug {// 打印生成的代碼def codegen(): Unit = {println(org.apache.spark.sql.execution.debug.codegenString(executedPlan))}// 獲取生成的代碼和統計信息def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)}// 將調試信息寫入文件def toFile(path: String,maxFields: Int = Int.MaxValue,explainMode: Option[String] = None): Unit = {val filePath = new Path(path)val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))try {val mode = explainMode.map(ExplainMode.fromString(_)).getOrElse(ExtendedMode)explainString(mode, maxFields, writer.write)if (mode != CodegenMode) {writer.write("\n== Whole Stage Codegen ==\n")org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)}log.info(s"Debug information was written at: $filePath")} finally {writer.close()}}}
}
/*** SPARK-35378: 命令應立即執行,以便像 `sql("INSERT ...")` 這樣的查詢可以立即觸發表插入,無需調用 `.collect()`。* 為了避免無窮遞歸,我們應在遞歸執行命令時使用 `NON_ROOT`。此外,我們不能執行帶有命令葉節點的查詢計劃,* 因為許多命令返回 `GenericInternalRow` 并不能直接放入查詢計劃中,否則查詢引擎可能會嘗試將 `GenericInternalRow`* 轉換為 `UnsafeRow` 并失敗。當運行 EXPLAIN 或命令包含在其他命令中時,應使用 `SKIP` 來避免立即觸發命令執行。*/
object CommandExecutionMode extends Enumeration {val SKIP, NON_ROOT, ALL = Value // 定義不同模式的值
}
object QueryExecution {private val _nextExecutionId = new AtomicLong(0) // 原子操作,確保線程安全private def nextExecutionId: Long = _nextExecutionId.getAndIncrement // 獲取下一個查詢執行的 ID// 構建用于準備執行的規則序列private[execution] def preparations(sparkSession: SparkSession,adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,subquery: Boolean): Seq[Rule[SparkPlan]] = {adaptiveExecutionRule.toSeq ++Seq(CoalesceBucketsInJoin,PlanDynamicPruningFilters(sparkSession),PlanSubqueries(sparkSession),RemoveRedundantProjects,EnsureRequirements(),ReplaceHashWithSortAgg,RemoveRedundantSorts,RemoveRedundantWindowGroupLimits,DisableUnnecessaryBucketedScan,ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules, outputsColumnar = false),CollapseCodegenStages()) ++(if (subquery) Nil else Seq(ReuseExchangeAndSubquery))}// 準備 SparkPlan 以供執行private[execution] def prepareForExecution(preparations: Seq[Rule[SparkPlan]],plan: SparkPlan): SparkPlan = {val planChangeLogger = new PlanChangeLogger[SparkPlan]()val preparedPlan = preparations.foldLeft(plan) { case (sp, rule) =>val result = rule.apply(sp)planChangeLogger.logRule(rule.ruleName, sp, result)result}planChangeLogger.logBatch("Preparations", plan, preparedPlan)preparedPlan}// 將邏輯計劃轉換為 Spark 計劃def createSparkPlan(sparkSession: SparkSession,planner: SparkPlanner,plan: LogicalPlan): SparkPlan = {planner.plan(ReturnAnswer(plan)).next()}// 準備 SparkPlan 以供執行def prepareExecutedPlan(spark: SparkSession, plan: SparkPlan): SparkPlan = {prepareForExecution(preparations(spark, subquery = true), plan)}// 準備子查詢的執行計劃def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = {val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())prepareExecutedPlan(spark, sparkPlan)}// 使用自適應執行上下文準備執行計劃def prepareExecutedPlan(session: SparkSession,plan: LogicalPlan,context: AdaptiveExecutionContext): SparkPlan = {val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone())val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)prepareForExecution(preparationRules, sparkPlan.clone())}// 將斷言和空指針異常轉換為內部錯誤private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) =>SparkException.internalError(msg + " You hit a bug in Spark or the Spark plugins you use. Please, report this bug " +"to the corresponding communities or vendors, and provide the full stack trace.",e)case e: Throwable =>e}// 捕獲斷言和空指針異常,并將其轉換為內部錯誤private[sql] def withInternalError[T](msg: String)(block: => T): T = {try {block} catch {case e: Throwable => throw toInternalError(msg, e)}}
}
可以看到Spark 3.x實現的整個執行流程是和2.x區別不大的,主要是引入了兩個新參數如下:
- tracker: 用于跟蹤查詢計劃過程的工具
- mode: 命令執行模式
引入 tracker
和 mode
參數的主要目的是為了增強查詢執行的靈活性和可追蹤性。tracker
參數使得對查詢計劃的執行過程可以進行更細粒度的監控,而 mode
參數則提供了更靈活的查詢執行模式,使得 QueryExecution
類能夠根據不同的需求進行不同的操作。同時通過CommandExecutionMode 枚舉定義了命令執行的不同模式,來夠靈活地控制命令的執行方式。這些改進的目的都是為了提高 Spark SQL 查詢的性能和可調試性。
總結
本文介紹了Spark,并通過源碼梳理了Spark SQL的執行原理,其核心思路也是和我們在引入篇以及Hive執行原理,提到的解析(Parsing)、校驗(Validation)、優化(Optimization)和執行(Execution)是一致的。
Spark SQL的具體執行過程主要可以分為以下幾個步驟:
- 輸入SQL語句經過Antlr4解析,生成未解決的邏輯計劃;
- 綁定分析器,例如函數適配、通過Catalog獲取字段等,生成已解決的邏輯計劃;
- 優化器對已解決的邏輯計劃進行優化,基于CBO和RBO轉換,生成優化后的邏輯計劃;
- 將優化后的邏輯計劃轉換為多個可被識別或執行的物理計劃;
- 基于CBO在多個物理計劃中,選擇執行開銷最小的物理計劃;
- 轉為具體的RDDs執行。
感興趣的小伙伴可以深入源碼去探索一下具體的解析和優化實現。