大數據SQL調優專題——Spark執行原理

引入

在深入MapReduce中有提到,MapReduce雖然通過“分而治之”的思想,解決了海量數據的計算處理問題,但性能還是不太理想,這體現在兩個方面:

  1. 每個任務都有比較大的overhead,都需要預先把程序復制到各個 worker 節點,然后啟動進程;
  2. 所有的中間數據都要讀寫多次硬盤。map 的輸出結果要寫到硬盤上,reduce抓取數據排序合并之后,也要先寫到本地硬盤上再進行讀取,所以快不起來。

除此之外,MapReduce還有以下的問題:

  1. 算子不夠豐富,僅有Map和Reduce,復雜算子的實現極為煩瑣;
  2. MapReduce在處理迭代計算、實時查詢和交互式數據挖掘任務時效率較低,因為每次迭代都需要將數據寫入磁盤,導致大量的I/O開銷;
  3. 無法支持血緣或上下游依賴的概念,失敗重試只能從頭開始,變相地無法實現迭代計算。

針對以上的缺陷,不同的計算引擎采取了不同的優化策略。例如Tez簡化了MapReduce過程,支持DAG(Directed Acyclic Graph,有向無環圖)?,細化MapReduce環節并靈活組合。Impala則專注于單節點純內存計算。而Spark依托DAG Lineage、純內存計算、RDD(分布式彈性數據集)等特性,以及與Hadoop生態極佳的兼容性,支持例如圖計算、機器學習、流(Micro-Batch)計算等多樣化的功能或場景,在一系列大數據引擎中脫穎而出,成為當今最主流的計算引擎之一。

Spark最初由加州大學伯克利分校的AMPLab開發,后來成為Apache軟件基金會的頂級項目。

Spark的核心概念

下面通過Spark的一些核心概念,去進一步了解它。

RDD(彈性分布式數據集)

定義

RDD(Resilient Distributed Dataset)是Spark的核心抽象,是一個不可變的、分布式的數據集合,可以并行操作。RDD可以通過在存儲系統中的文件或已有的Scala、Java、Python集合以及通過其他RDD的轉換操作來創建。

特性

  • 不可變性:RDD一旦創建,其數據就不能被修改。

  • 分區:RDD的數據被劃分為多個分區,每個分區可以獨立計算。

  • 依賴關系:RDD之間通過轉換操作形成依賴關系,這些依賴關系構成了DAG(有向無環圖)

DAG(有向無環圖)

定義

DAG是Spark中用于表示RDD之間依賴關系的有向無環圖。DAG調度器負責將DAG分解成多個階段(stages),每個階段是一系列并行的任務。

工作原理

  1. 操作解析:當代碼被提交到Spark的解釋器時,系統會生成一個操作圖,稱為Operator Graph,記錄了所有的Transformation操作及其依賴關系。

  2. DAG調度器:當一個Action操作(如collect或save)被觸發時,DAG調度器會將Operator Graph分解為多個Stage(階段)。窄依賴(Narrow Dependency)的操作如map和filter不需要數據重新分區,屬于同一階段;寬依賴(Wide Dependency)的操作如reduceByKey需要數據Shuffle,不同階段之間以寬依賴為界。

  3. 任務調度器:每個Stage會被拆分為多個基于數據分區的Task。Task調度器將這些Task分發到集群的Worker節點上執行。

  4. 執行與結果:每個Worker節點執行分配的Task,并將結果返回給Driver程序。DAG確保各個階段按依賴順序執行,并通過內存優化中間結果存儲,最大限度減少I/O和通信開銷。

作用

  • 任務依賴分析:DAG調度器通過分析RDD之間的依賴關系,決定任務的執行順序。

  • 內存計算優化:通過減少Shuffle和磁盤讀寫,DAG提高了計算效率。

  • 全局優化:DAG確保每個Stage都包含最少的任務,避免重復計算。

Shuffle機制

定義

Shuffle是Spark中的一種數據重新分區操作,通常在寬依賴(Wide Dependency)的操作中發生,如reduceByKey、groupByKey等。

工作原理

  1. 分區:Shuffle操作會將數據重新分區,通常會根據鍵(key)進行分區。

  2. 數據傳輸:數據從一個節點傳輸到另一個節點,以確保相同鍵的數據位于同一個節點上。

  3. 排序和分組:在目標節點上,數據會被排序和分組,以便進行后續的聚合操作。

優化策略

  • 減少數據傳輸:通過數據本地性優化,盡量減少數據在節點之間的傳輸。

  • 壓縮:在Shuffle過程中,可以啟用數據壓縮,減少網絡傳輸的開銷。

  • 緩存:在Shuffle之前,可以將數據緩存到內存中,減少重復計算。

數據緩存機制

定義

數據緩存機制是Spark中用于提高數據處理效率的一種機制,通過將數據緩存到內存中,減少重復計算的開銷。

實現方式

  • cache()cache()方法是persist()的簡化版,其底層實現直接調用persist(StorageLevel.MEMORY_AND_DISK),默認將數據存儲在內存中,如果內存不足,則溢寫到磁盤。

  • persist()persist()方法允許用戶選擇存儲級別StorageLevel,如MEMORY_ONLYMEMORY_AND_DISKDISK_ONLY等。

作用

  • 加速重復計算:通過緩存數據,避免重復計算DAG中的父節點。

  • 靈活的存儲策略persist()方法提供了更靈活的存儲策略,適應內存、磁盤等不同環境。

適用場景

  • 數據需要被多次使用:適用于數據需要被多次使用,但不需要跨作業的容錯能力的場景。

  • 計算代價大:適用于計算代價大,但內存能夠容納數據的場景。

錯誤容忍機制

RDD的DAG Lineage(血緣)

指創建RDD所依賴的轉換操作序列。當某個RDD的分區數據丟失時,Spark可以通過Lineage信息重新計算該分區的數據。

RDD的 DAG Lineage?主要用于描述數據從源到目標的轉換過程,包括數據的流動、處理、轉換等各個步驟。DAG Lineage能夠清晰地展示數據的來源、去向以及數據在不同階段的變化,幫助用戶了解數據的全生命周期。

Checkpoint(檢查點)

通過將RDD的狀態保存到可靠的存儲系統(如HDFS、S3等),以支持容錯和優化長計算鏈。當Spark應用程序出現故障時,可以從檢查點恢復狀態。

故障恢復

  • 節點故障:當Worker節點故障時,Spark會利用RDD的血統信息重新計算丟失的數據分區。如果設置了檢查點,Spark會從檢查點位置開始重新執行,減少計算開銷。
  • 驅動節點故障:如果驅動節點故障,Spark會通過Apache Mesos等集群管理器重新啟動驅動節點,并恢復執行狀態。

內存管理機制

內存模型

執行內存(Execution Memory):主要用于存儲任務執行過程中的臨時數據,如Shuffle的中間結果等。這部分內存主要用于任務的執行期間,任務完成后會被釋放。

存儲內存(Storage Memory):用于緩存中間結果(RDD)和DataFrame/DataSet的持久化數據。這部分內存是為了加速重復計算而存在的,數據可以被多次復用。

內存分配

內存分配比例:內存分配比例可以通過配置項spark.executor.memory來設置總的內存大小,并通過spark.storage.memoryFraction來指定存儲內存所占的比例,默認為0.6。這意味著默認情況下,Executor的60%的內存用于存儲,剩余的40%用于執行。

內存回收

LRU緩存淘汰策略:Spark采用LRU(Least Recently Used)緩存淘汰策略來管理存儲內存中的數據。當存儲內存不足時,Spark會根據LRU算法淘汰最近最少使用的數據。

Spill to Disk:當執行內存不足時,Spark會將一部分數據溢寫到磁盤,以釋放內存空間。例如,在Shuffle操作期間,如果內存不足以存放所有中間結果,Spark會將部分數據寫入磁盤。

動態內存管理

動態調整內存分配:在Spark 2.x版本之后,引入了更先進的內存管理機制,支持動態調整執行內存和存儲內存之間的比例。這意味著在運行時,Spark可以根據實際內存使用情況動態調整內存分配,從而更好地利用資源。

內存配置

  • spark.executor.memory:設置Executor的總內存大小。
  • spark.storage.memoryFraction:設置存儲內存所占的比例。
  • spark.shuffle.spill.compress:是否啟用Shuffle數據的壓縮。
  • spark.serializer:設置序列化庫,默認為org.apache.spark.serializer.KryoSerializer。
  • spark.kryoserializer.buffer.max:設置Kryo序列化器的最大緩沖區大小。

Spark的執行原理

由于本專欄的重點是SQL,所以我們主要看Spark SQL的執行過程。相比于Hive的源碼,Spark就貼心很多了,提供了org.apache.spark.sql.execution.QueryExecution類,這個類是Spark SQL查詢執行的核心,它封裝了從SQL解析到最終執行的整個過程,為開發者提供了豐富的接口來理解和調試查詢執行的整個過程。

QueryExecution源碼注釋如下:

The primary workflow for executing relational queries using Spark. ?Designed to allow easy?access to the intermediate phases of query execution for developers.
While this is not a public class, we should avoid changing the function names for the sake of changing them, because a lot of developers use the feature for debugging.

翻譯:

使用 Spark 執行關系查詢的主要工作流程。設計目的是讓開發者能夠輕松訪問查詢執行的中間階段。
雖然這不是一個公共類,但我們應盡可能避免改變函數名稱,因為許多開發者使用此功能進行調試。

QueryExecution類的2.x源碼如下:

/*** QueryExecution類負責執行關系查詢的主要工作流程。* 它設計用于讓開發人員可以輕松訪問查詢執行的中間階段。** @param sparkSession 用于執行查詢的SparkSession* @param logical 要執行的邏輯計劃*/
class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {// TODO: 將planner和optimizer從SessionState移動到這里/** 獲取查詢計劃器 */protected def planner: SparkPlanner = sparkSession.sessionState.plannerdef assertAnalyzed(): Unit = analyzeddef assertSupported(): Unit = {if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForBatch(analyzed)}}/** 懶加載已分析的邏輯計劃 */// SQL解析lazy val analyzed: LogicalPlan = {SparkSession.setActiveSession(sparkSession)sparkSession.sessionState.analyzer.executeAndCheck(logical)}/** 懶加載帶緩存數據的邏輯計劃 */lazy val withCachedData: LogicalPlan = {assertAnalyzed()assertSupported()sparkSession.sharedState.cacheManager.useCachedData(analyzed)}/** 懶加載優化后的邏輯計劃 */// 邏輯優化處理lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)/** 懶加載Spark物理計劃 */// 將邏輯計劃轉換成物理計劃// 邏輯計劃是不區分引擎的,而這里的物理計劃(SparkPlan)是面向Spark執行的lazy val sparkPlan: SparkPlan = {SparkSession.setActiveSession(sparkSession)// TODO: 目前我們使用next(),即取planner返回的第一個計劃,//       但我們將來會實現選擇最佳計劃的功能。planner.plan(ReturnAnswer(optimizedPlan)).next()}/** * executedPlan 不應用于初始化任何 SparkPlan。* 它僅應用于執行。*/// 預提交階段lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)/** RDD的內部版本。避免復制且沒有schema */// 最終提交階段lazy val toRdd: RDD[InternalRow] = {if (sparkSession.sessionState.conf.getConf(SQLConf.USE_CONF_ON_RDD_OPERATION)) {new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)} else {executedPlan.execute()}}/*** 為執行準備計劃好的SparkPlan。* 根據需要插入shuffle操作和內部行格式轉換。** @param plan 要準備執行的SparkPlan* @return 準備好執行的SparkPlan*/protected def prepareForExecution(plan: SparkPlan): SparkPlan = {preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }}/** * 在執行前應用于物理計劃的規則序列。* 這些規則將按順序應用。** @return 要應用的規則序列*/protected def preparations: Seq[Rule[SparkPlan]] = Seq(PlanSubqueries(sparkSession),EnsureRequirements(sparkSession.sessionState.conf),CollapseCodegenStages(sparkSession.sessionState.conf),ReuseExchange(sparkSession.sessionState.conf),ReuseSubquery(sparkSession.sessionState.conf))/*** 嘗試執行給定的操作并返回結果字符串。* 如果發生AnalysisException,則返回異常的字符串表示。** @param f 要執行的操作* @return 操作結果的字符串表示或異常信息*/protected def stringOrError[A](f: => A): String =try f.toString catch { case e: AnalysisException => e.toString }/*** 以Hive兼容的字符串序列形式返回結果。* 這在測試和CLI應用程序的SparkSQLDriver中使用。** @return Hive兼容的結果字符串序列*/def hiveResultString(): Seq[String] = executedPlan match {case ExecutedCommandExec(desc: DescribeTableCommand) =>// 如果是Hive表的describe命令,我們希望輸出格式與Hive相似desc.run(sparkSession).map {case Row(name: String, dataType: String, comment) =>Seq(name, dataType,Option(comment.asInstanceOf[String]).getOrElse("")).map(s => String.format("%-20s", s)).mkString("\t")}// Hive中的SHOW TABLES只輸出表名,而我們的輸出包括數據庫、表名和是否為臨時表case command @ ExecutedCommandExec(s: ShowTablesCommand) if !s.isExtended =>command.executeCollect().map(_.getString(1))case other =>val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq// 我們需要類型信息以輸出結構字段名val types = analyzed.output.map(_.dataType)// 重新格式化以匹配Hive的制表符分隔輸出result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))}/*** 根據給定的數據類型格式化數據,并返回其字符串表示。** @param a 包含數據和其對應DataType的元組* @return 格式化后的字符串表示*/private def toHiveString(a: (Any, DataType)): String = {val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, FloatType,BooleanType, ByteType, ShortType, DateType, TimestampType, BinaryType)/*** 格式化BigDecimal,去除尾隨的零。** @param d 要格式化的BigDecimal* @return 格式化后的字符串*/def formatDecimal(d: java.math.BigDecimal): String = {if (d.compareTo(java.math.BigDecimal.ZERO) == 0) {java.math.BigDecimal.ZERO.toPlainString} else {d.stripTrailingZeros().toPlainString}}/*** Hive輸出struct字段的方式與最外層屬性略有不同。* 這個方法處理struct、數組和map類型的特殊格式化。** @param a 包含數據和其對應DataType的元組* @return 格式化后的字符串*/def toHiveStructString(a: (Any, DataType)): String = a match {case (struct: Row, StructType(fields)) =>struct.toSeq.zip(fields).map {case (v, t) => s""""${t.name}":${toHiveStructString((v, t.dataType))}"""}.mkString("{", ",", "}")case (seq: Seq[_], ArrayType(typ, _)) =>seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")case (map: Map[_, _], MapType(kType, vType, _)) =>map.map {case (key, value) =>toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))}.toSeq.sorted.mkString("{", ",", "}")case (null, _) => "null"case (s: String, StringType) => "\"" + s + "\""case (decimal, DecimalType()) => decimal.toStringcase (interval, CalendarIntervalType) => interval.toStringcase (other, tpe) if primitiveTypes contains tpe => other.toString}// 主要的格式化邏輯a match {case (struct: Row, StructType(fields)) =>struct.toSeq.zip(fields).map {case (v, t) => s""""${t.name}":${toHiveStructString((v, t.dataType))}"""}.mkString("{", ",", "}")case (seq: Seq[_], ArrayType(typ, _)) =>seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")case (map: Map[_, _], MapType(kType, vType, _)) =>map.map {case (key, value) =>toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))}.toSeq.sorted.mkString("{", ",", "}")case (null, _) => "NULL"case (d: Date, DateType) =>DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))case (t: Timestamp, TimestampType) =>DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)case (interval, CalendarIntervalType) => interval.toStringcase (other, tpe) if primitiveTypes.contains(tpe) => other.toString}}/*** 返回執行計劃的簡單字符串表示。** @return 包含物理計劃樹的字符串*/def simpleString: String = withRedaction {s"""== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = false))}""".stripMargin.trim}/*** 返回查詢執行的詳細字符串表示。** @return 包含解析的邏輯計劃、分析的邏輯計劃、優化的邏輯計劃和物理計劃的字符串*/override def toString: String = withRedaction {def output = Utils.truncatedString(analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")val analyzedPlan = Seq(stringOrError(output),stringOrError(analyzed.treeString(verbose = true))).filter(_.nonEmpty).mkString("\n")s"""== Parsed Logical Plan ==|${stringOrError(logical.treeString(verbose = true))}|== Analyzed Logical Plan ==|$analyzedPlan|== Optimized Logical Plan ==|${stringOrError(optimizedPlan.treeString(verbose = true))}|== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = true))}""".stripMargin.trim}/*** 返回帶有統計信息的查詢執行字符串表示。** @return 包含優化的邏輯計劃和物理計劃(帶統計信息)的字符串*/def stringWithStats: String = withRedaction {// 觸發計算邏輯計劃的統計信息optimizedPlan.stats// 只顯示優化的邏輯計劃和物理計劃s"""== Optimized Logical Plan ==|${stringOrError(optimizedPlan.treeString(verbose = true, addSuffix = true))}|== Physical Plan ==|${stringOrError(executedPlan.treeString(verbose = true))}""".stripMargin.trim}/*** 對給定的字符串中的敏感信息進行編輯。** @param message 要編輯的原始消息* @return 編輯后的消息*/private def withRedaction(message: String): String = {Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)}/** * 一個特殊的命名空間,包含可用于調試查詢執行的命令。*/// scalastyle:offobject debug {// scalastyle:on/*** 將此計劃中找到的所有生成的代碼打印到標準輸出。* 即打印每個WholeStageCodegen子樹的輸出。*/def codegen(): Unit = {// scalastyle:off printlnprintln(org.apache.spark.sql.execution.debug.codegenString(executedPlan))// scalastyle:on println}/*** 獲取查詢計劃中的WholeStageCodegenExec子樹及其生成的代碼。** @return WholeStageCodegen子樹及其對應生成代碼的序列*/def codegenToSeq(): Seq[(String, String)] = {org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)}}
}

可以看到這個類確實是封裝了SparkSQL查詢執行的整個過程,從邏輯計劃到物理計劃,再到最終的執行。

我們簡單梳理一下其主要的執行階段如下:

  1. analyzed: 對邏輯計劃進行分析
  2. withCachedData: 使用緩存數據
  3. optimizedPlan: 優化邏輯計劃
  4. sparkPlan: 生成物理計劃
  5. executedPlan: 準備提交執行物理計劃
  6. toRdd: 最終執行,將物理計劃轉換為RDD

而QueryExecution類的3.x源碼如下:


/*** QueryExecution 類代表了使用 Spark 執行關系查詢的主要工作流程。* * 該類設計用于讓開發者能夠輕松訪問查詢執行的中間階段,便于調試。** @param sparkSession 當前的 SparkSession* @param logical 要執行的邏輯計劃* @param tracker 用于跟蹤查詢計劃的 QueryPlanningTracker* @param mode 命令執行模式*/
class QueryExecution(val sparkSession: SparkSession,         // 當前的 Spark 會話,包含 Spark 的運行時環境val logical: LogicalPlan,             // 查詢的邏輯計劃,表示對數據的高級操作val tracker: QueryPlanningTracker = new QueryPlanningTracker, // 跟蹤查詢計劃的執行階段和時間val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL // 指定命令執行的模式
) extends Logging {val id: Long = QueryExecution.nextExecutionId // 為每個查詢執行分配唯一 ID// 暫未使用,計劃將 planner 和 optimizer 移至此處protected def planner = sparkSession.sessionState.planner// 確保邏輯計劃已被分析def assertAnalyzed() = analyzed// 確保查詢操作受支持def assertSupported() = {if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {UnsupportedOperationChecker.checkForBatch(analyzed)}}// 延遲計算,確保邏輯計劃已被分析lazy val analyzed: LogicalPlan = {// 執行分析階段val plan = executePhase(QueryPlanningTracker.ANALYSIS) {sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)}tracker.setAnalyzed(plan)plan}// 根據執行模式決定是否立即執行命令lazy val commandExecuted: LogicalPlan = mode match {case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)case CommandExecutionMode.SKIP => analyzed}// 定義命令執行名稱private def commandExecutionName(command: Command): String = command match {case _: CreateTableAsSelect => "create"case _: ReplaceTableAsSelect => "replace"case _: AppendData => "append"case _: OverwriteByExpression => "overwrite"case _: OverwritePartitionsDynamic => "overwritePartitions"case _ => "command"}// 立即執行命令private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {case c: Command =>// 設置查詢計劃追蹤器,標記查詢已準備好執行tracker.setReadyForExecution()// 執行命令計劃val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)// 執行命令計劃,獲取結果val result = SQLExecution.withNewExecutionId(qe, Some(commandExecutionName(c))) {qe.executedPlan.executeCollect()}// 創建命令結果CommandResult(qe.analyzed.output,qe.commandExecuted,qe.executedPlan,result)case other => other // 其他情況保持不變}// 對查詢計劃進行歸一化lazy val normalized: LogicalPlan = {val normalizationRules = sparkSession.sessionState.planNormalizationRulesif (normalizationRules.isEmpty) {commandExecuted} else {val planChangeLogger = new PlanChangeLogger[LogicalPlan]()// 應用歸一化規則val normalized = normalizationRules.foldLeft(commandExecuted) { (p, rule) =>val result = rule.apply(p)planChangeLogger.logRule(rule.ruleName, p, result)result}// 記錄歸一化過程planChangeLogger.logBatch("Plan Normalization", commandExecuted, normalized)normalized}}lazy val withCachedData: LogicalPlan = sparkSession.withActive {assertAnalyzed()assertSupported()// 克隆計劃,避免狀態共享sparkSession.sharedState.cacheManager.useCachedData(normalized.clone())}// 確保命令已執行def assertCommandExecuted(): Unit = commandExecuted// 獲得優化后的邏輯計劃lazy val optimizedPlan: LogicalPlan = {assertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {// 克隆計劃,避免狀態共享val plan = sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)// 標記計劃為已分析plan.setAnalyzed()plan}}// 確保計劃已優化def assertOptimized(): Unit = optimizedPlan// 獲得物理執行計劃lazy val sparkPlan: SparkPlan = {assertOptimized()executePhase(QueryPlanningTracker.PLANNING) {// 創建 Spark 計劃QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())}}// 確保物理計劃已準備好執行def assertSparkPlanPrepared(): Unit = sparkPlan// 獲得實際執行計劃lazy val executedPlan: SparkPlan = {assertOptimized()val plan = executePhase(QueryPlanningTracker.PLANNING) {// 準備執行計劃QueryExecution.prepareForExecution(preparations, sparkPlan.clone())}// 標記查詢已準備好執行tracker.setReadyForExecution()plan}// 確保實際執行計劃已準備好執行def assertExecutedPlanPrepared(): Unit = executedPlan// 獲取 RDD 內部版本lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)// 獲得查詢計劃的指標def observedMetrics: Map[String, Row] = CollectMetricsExec.collect(executedPlan)protected def preparations: Seq[Rule[SparkPlan]] = {QueryExecution.preparations(sparkSession,Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))), false)}// 執行查詢階段protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {QueryExecution.withInternalError(s"The Spark SQL phase $phase failed with an internal error.") {tracker.measurePhase(phase)(block)}}// 獲取查詢計劃的簡單字符串表示def simpleString: String = {val concat = new PlanStringConcat()simpleString(false, SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成查詢計劃的簡單字符串表示private def simpleString(formatted: Boolean,maxFields: Int,append: String => Unit): Unit = {append("== Physical Plan ==\n")if (formatted) {try {ExplainUtils.processPlan(executedPlan, append)} catch {case e: AnalysisException => append(e.toString)case e: IllegalArgumentException => append(e.toString)}} else {QueryPlan.append(executedPlan,append, verbose = false, addSuffix = false, maxFields = maxFields)}append("\n")}// 獲取查詢計劃的解釋字符串def explainString(mode: ExplainMode): String = {val concat = new PlanStringConcat()explainString(mode, SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 解釋查詢計劃,生成字符串表示private def explainString(mode: ExplainMode, maxFields: Int, append: String => Unit): Unit = {val queryExecution = if (logical.isStreaming) {new IncrementalExecution(sparkSession, logical, OutputMode.Append(), "<unknown>",UUID.randomUUID, UUID.randomUUID, 0, None, OffsetSeqMetadata(0, 0),WatermarkPropagator.noop())} else {this}mode match {case SimpleMode =>queryExecution.simpleString(false, maxFields, append)case ExtendedMode =>queryExecution.toString(maxFields, append)case CodegenMode =>try {org.apache.spark.sql.execution.debug.writeCodegen(append, queryExecution.executedPlan)} catch {case e: AnalysisException => append(e.toString)}case CostMode =>queryExecution.stringWithStats(maxFields, append)case FormattedMode =>queryExecution.simpleString(formatted = true, maxFields = maxFields, append)}}// 寫入各個階段的計劃private def writePlans(append: String => Unit, maxFields: Int): Unit = {val (verbose, addSuffix) = (true, false)append("== Parsed Logical Plan ==\n")QueryPlan.append(logical, append, verbose, addSuffix, maxFields)append("\n== Analyzed Logical Plan ==\n")try {if (analyzed.output.nonEmpty) {append(truncatedString(analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields))append("\n")}QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)append("\n== Optimized Logical Plan ==\n")QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)append("\n== Physical Plan ==\n")QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)} catch {case e: AnalysisException => append(e.toString)}}// 重寫 toString 方法override def toString: String = withRedaction {val concat = new PlanStringConcat()toString(SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成查詢計劃的字符串表示private def toString(maxFields: Int, append: String => Unit): Unit = {writePlans(append, maxFields)}// 獲取包含統計信息的計劃字符串def stringWithStats: String = {val concat = new PlanStringConcat()stringWithStats(SQLConf.get.maxToStringFields, concat.append)withRedaction {concat.toString}}// 生成包含統計信息的計劃字符串private def stringWithStats(maxFields: Int, append: String => Unit): Unit = {try {// 觸發邏輯計劃的統計計算optimizedPlan.collectWithSubqueries {case plan => plan.stats}} catch {case e: AnalysisException => append(e.toString + "\n")}// 只顯示優化后的邏輯計劃和物理計劃append("== Optimized Logical Plan ==\n")QueryPlan.append(optimizedPlan, append, verbose = true, addSuffix = true, maxFields)append("\n== Physical Plan ==\n")QueryPlan.append(executedPlan, append, verbose = true, addSuffix = false, maxFields)append("\n")}// 內部敏感信息的脫敏處理private def withRedaction(message: String): String = {Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, message)}// 用于調試查詢執行的特殊命名空間object debug {// 打印生成的代碼def codegen(): Unit = {println(org.apache.spark.sql.execution.debug.codegenString(executedPlan))}// 獲取生成的代碼和統計信息def codegenToSeq(): Seq[(String, String, ByteCodeStats)] = {org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)}// 將調試信息寫入文件def toFile(path: String,maxFields: Int = Int.MaxValue,explainMode: Option[String] = None): Unit = {val filePath = new Path(path)val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))try {val mode = explainMode.map(ExplainMode.fromString(_)).getOrElse(ExtendedMode)explainString(mode, maxFields, writer.write)if (mode != CodegenMode) {writer.write("\n== Whole Stage Codegen ==\n")org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)}log.info(s"Debug information was written at: $filePath")} finally {writer.close()}}}
}
/*** SPARK-35378: 命令應立即執行,以便像 `sql("INSERT ...")` 這樣的查詢可以立即觸發表插入,無需調用 `.collect()`。* 為了避免無窮遞歸,我們應在遞歸執行命令時使用 `NON_ROOT`。此外,我們不能執行帶有命令葉節點的查詢計劃,* 因為許多命令返回 `GenericInternalRow` 并不能直接放入查詢計劃中,否則查詢引擎可能會嘗試將 `GenericInternalRow`* 轉換為 `UnsafeRow` 并失敗。當運行 EXPLAIN 或命令包含在其他命令中時,應使用 `SKIP` 來避免立即觸發命令執行。*/
object CommandExecutionMode extends Enumeration {val SKIP, NON_ROOT, ALL = Value // 定義不同模式的值
}
object QueryExecution {private val _nextExecutionId = new AtomicLong(0) // 原子操作,確保線程安全private def nextExecutionId: Long = _nextExecutionId.getAndIncrement // 獲取下一個查詢執行的 ID// 構建用于準備執行的規則序列private[execution] def preparations(sparkSession: SparkSession,adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,subquery: Boolean): Seq[Rule[SparkPlan]] = {adaptiveExecutionRule.toSeq ++Seq(CoalesceBucketsInJoin,PlanDynamicPruningFilters(sparkSession),PlanSubqueries(sparkSession),RemoveRedundantProjects,EnsureRequirements(),ReplaceHashWithSortAgg,RemoveRedundantSorts,RemoveRedundantWindowGroupLimits,DisableUnnecessaryBucketedScan,ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules, outputsColumnar = false),CollapseCodegenStages()) ++(if (subquery) Nil else Seq(ReuseExchangeAndSubquery))}// 準備 SparkPlan 以供執行private[execution] def prepareForExecution(preparations: Seq[Rule[SparkPlan]],plan: SparkPlan): SparkPlan = {val planChangeLogger = new PlanChangeLogger[SparkPlan]()val preparedPlan = preparations.foldLeft(plan) { case (sp, rule) =>val result = rule.apply(sp)planChangeLogger.logRule(rule.ruleName, sp, result)result}planChangeLogger.logBatch("Preparations", plan, preparedPlan)preparedPlan}// 將邏輯計劃轉換為 Spark 計劃def createSparkPlan(sparkSession: SparkSession,planner: SparkPlanner,plan: LogicalPlan): SparkPlan = {planner.plan(ReturnAnswer(plan)).next()}// 準備 SparkPlan 以供執行def prepareExecutedPlan(spark: SparkSession, plan: SparkPlan): SparkPlan = {prepareForExecution(preparations(spark, subquery = true), plan)}// 準備子查詢的執行計劃def prepareExecutedPlan(spark: SparkSession, plan: LogicalPlan): SparkPlan = {val sparkPlan = createSparkPlan(spark, spark.sessionState.planner, plan.clone())prepareExecutedPlan(spark, sparkPlan)}// 使用自適應執行上下文準備執行計劃def prepareExecutedPlan(session: SparkSession,plan: LogicalPlan,context: AdaptiveExecutionContext): SparkPlan = {val sparkPlan = createSparkPlan(session, session.sessionState.planner, plan.clone())val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)prepareForExecution(preparationRules, sparkPlan.clone())}// 將斷言和空指針異常轉換為內部錯誤private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) =>SparkException.internalError(msg + " You hit a bug in Spark or the Spark plugins you use. Please, report this bug " +"to the corresponding communities or vendors, and provide the full stack trace.",e)case e: Throwable =>e}// 捕獲斷言和空指針異常,并將其轉換為內部錯誤private[sql] def withInternalError[T](msg: String)(block: => T): T = {try {block} catch {case e: Throwable => throw toInternalError(msg, e)}}
}

可以看到Spark 3.x實現的整個執行流程是和2.x區別不大的,主要是引入了兩個新參數如下:

  • tracker: 用于跟蹤查詢計劃過程的工具
  • mode: 命令執行模式

引入 trackermode 參數的主要目的是為了增強查詢執行的靈活性和可追蹤性。tracker 參數使得對查詢計劃的執行過程可以進行更細粒度的監控,而 mode 參數則提供了更靈活的查詢執行模式,使得 QueryExecution 類能夠根據不同的需求進行不同的操作。同時通過CommandExecutionMode 枚舉定義了命令執行的不同模式,來夠靈活地控制命令的執行方式。這些改進的目的都是為了提高 Spark SQL 查詢的性能和可調試性。

總結

本文介紹了Spark,并通過源碼梳理了Spark SQL的執行原理,其核心思路也是和我們在引入篇以及Hive執行原理,提到的解析(Parsing)、校驗(Validation)、優化(Optimization)和執行(Execution)是一致的。

Spark SQL的具體執行過程主要可以分為以下幾個步驟:

  1. 輸入SQL語句經過Antlr4解析,生成未解決的邏輯計劃;
  2. 綁定分析器,例如函數適配、通過Catalog獲取字段等,生成已解決的邏輯計劃;
  3. 優化器對已解決的邏輯計劃進行優化,基于CBO和RBO轉換,生成優化后的邏輯計劃;
  4. 將優化后的邏輯計劃轉換為多個可被識別或執行的物理計劃;
  5. 基于CBO在多個物理計劃中,選擇執行開銷最小的物理計劃;
  6. 轉為具體的RDDs執行。

感興趣的小伙伴可以深入源碼去探索一下具體的解析和優化實現。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/70029.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/70029.shtml
英文地址,請注明出處:http://en.pswp.cn/web/70029.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MYSQL下載安裝及使用

MYSQL官網下載地址&#xff1a;https://downloads.mysql.com/archives/community/ 也可以直接在服務器執行指令下載&#xff0c;但是下載速度比較慢。還是自己下載好拷貝過來比較快。 wget https://dev.mysql.com/get/Downloads/mysql-5.7.38-linux-glibc2.12-x86_64.tar.gz 1…

CentOS 7.8 安裝MongoDB 7 副本集(Replica Set)

文章目錄 1 環境假設步驟1&#xff1a;在兩臺服務器上安裝MongoDB步驟2&#xff1a;配置副本集步驟3&#xff1a;初始化副本集步驟4&#xff1a;驗證副本集配置步驟5&#xff1a;設置安全性&#xff08;可選&#xff09;擴展配置示例&#xff1a;最佳實踐&#xff1a;仲裁節點步…

AJAX 與 ASP 的深入探討

AJAX 與 ASP 的深入探討 引言 隨著互聯網技術的飛速發展,Web應用程序的交互性和性能要求越來越高。AJAX(Asynchronous JavaScript and XML)和ASP(Active Server Pages)作為兩種重要的Web開發技術,在提高Web應用程序性能和用戶體驗方面發揮著重要作用。本文將深入探討AJ…

內網下,Ubuntu (24.10) 離線安裝docker最新版教程

一般在數據比較敏感的情況下&#xff0c;是無法使用網絡的&#xff0c;而對于Ubuntu系統來說&#xff0c;怎么離線安裝docker呢&#xff1f; 下面我給大家來講一下&#xff1a; 采用二進制安裝&#xff1a; 1.下載docker離線包 官網下載&#xff1a; Index of linux/static…

Copilot Next Edit Suggestions(預覽版)

作者&#xff1a;Brigit Murtaugh&#xff0c;Burke Holland 排版&#xff1a;Alan Wang 我們很高興向你介紹在本次 Visual Studio Code 發布中&#xff0c;關于 GitHub Copilot 的三個預覽功能&#xff1a; Next Edit Suggestions&#xff08;NES&#xff09;Copilot Edits 的…

高性能內存對象緩存Memcached詳細實驗操作

目錄 前提準備&#xff1a; cache1&#xff0c;2&#xff1a; 客戶端cache-api&#xff08;一定得是LAMP環境&#xff09; memcache實現主主復制以及高可用(基于以上完成) cache1,2: memcachekeepalived(基于以上完成) cache1,2: 前提準備&#xff1a; 1. 準備三臺cent…

全單模矩陣及其在分支定價算法中的應用

全單模矩陣及其在分支定價算法中的應用 目錄 全單模矩陣的定義與特性全單模矩陣的判定方法全單模矩陣在優化中的核心價值分支定價算法與矩陣單模性的關系非全單模問題的挑戰與系統解決方案總結與工程實踐建議 1. 全單模矩陣的定義與特性 關鍵定義 單模矩陣&#xff08;Unimo…

Spring AI發布!讓Java緊跟AI賽道!

1. 序言 在當今技術發展的背景下&#xff0c;人工智能&#xff08;AI&#xff09;已經成為各行各業中不可忽視的重要技術。無論是在互聯網公司&#xff0c;還是傳統行業&#xff0c;AI技術的應用都在大幅提升效率、降低成本、推動創新。從智能客服到個性化推薦&#xff0c;從語…

【kafka系列】Kafka如何保證消息不丟失?

目錄 1. 生產者端&#xff1a;確保消息成功發送到Broker 核心機制&#xff1a; 關鍵步驟&#xff1a; 2. Broker端&#xff1a;持久化與副本同步 核心機制&#xff1a; 關鍵源碼邏輯&#xff1a; 3. 消費者端&#xff1a;可靠消費與Offset提交 核心機制&#xff1a; 關…

利用二分法+布爾盲注、時間盲注進行sql注入

一、布爾盲注&#xff1a; import requestsdef binary_search_character(url, query, index, low32, high127):while low < high:mid (low high 1) // 2payload f"1 AND ASCII(SUBSTRING(({query}),{index},1)) > {mid} -- "res {"id": payloa…

UART(一)——UART基礎

一、定義 UART(Universal Asynchronous Receiver/Transmitter)是一種廣泛使用的串行通信協議,用于在設備間通過異步方式傳輸數據。它無需共享時鐘信號,而是依賴雙方預先約定的參數(如波特率)完成通信。 功能和特點 基本的 UART 系統只需三個信號即可提供穩健的中速全雙工…

【PHP】php+mysql 活動信息管理系統(源碼+論文+數據庫+數據庫文件)【獨一無二】

&#x1f449;博__主&#x1f448;&#xff1a;米碼收割機 &#x1f449;技__能&#x1f448;&#xff1a;C/Python語言 &#x1f449;專__注&#x1f448;&#xff1a;專注主流機器人、人工智能等相關領域的開發、測試技術。 【PHP】php 活動信息管理系統&#xff08;源碼論文…

數據結構——單向循環鏈表、雙鏈表、雙向循環鏈表

目錄 一、單向循環鏈表 1.1 單向循環鏈表的概念 1.2 單向循環鏈表的操作 1.2.1 單向循環鏈表的創建 1.2.2 單向循環鏈表的頭插 1.2.3 單向循環鏈表的遍歷 1.2.4 單向循環鏈表的頭刪 1.2.5 單向循環鏈表的尾插 1.2.6 單向循環鏈表的尾刪 1.2.7 約瑟夫環 1.3 單向循環列表所有程…

Apache Iceberg 與 Apache Hudi:數據湖領域的雙雄對決

在數據存儲和處理不斷發展的領域中&#xff0c;數據湖倉的概念已經嶄露頭角&#xff0c;成為了一種變革性的力量。數據湖倉結合了數據倉庫和數據湖的最佳元素&#xff0c;提供了一個統一的平臺&#xff0c;支持數據科學、商業智能、人工智能/機器學習以及臨時報告等多種關鍵功能…

JavaScript數組-數組的概念

在JavaScript編程中&#xff0c;數組&#xff08;Array&#xff09;是一種非常重要的數據結構&#xff0c;它允許我們將多個值存儲在一個單獨的變量中。數組可以包含任意類型的元素&#xff0c;如數字、字符串、對象甚至是其他數組&#xff0c;并提供了豐富的內置方法來操作這些…

AcWing 800. 數組元素的目標和

題目來源&#xff1a; 登錄 - AcWing 題目內容&#xff1a; 給定兩個升序排序的有序數組 A 和 B&#xff0c;以及一個目標值 x。 數組下標從 0開始。 請你求出滿足 A[i]B[j]x的數對 (i,j)。 數據保證有唯一解。 輸入格式 第一行包含三個整數 n,m,x&#xff0c;分別表示 …

wordpress資訊類網站整站打包

wordpress程序&#xff0c;內置了價值499元的模板.但是有了模板沒有全自動采集相信大多數人都搞不懂&#xff0c;目錄那么多&#xff0c;全靠原創幾乎是不可能的事情&#xff0c;除非你是大公司&#xff0c;每人控制一個板塊&#xff0c; 這套源碼里面最有價值的應該是這個采集…

python中的with是做什么的,有什么作用,什么時候需要用到with

&#x1f4cc; Python 中的 with 語句&#xff1a;作用 & 什么時候用 1?? with 是干嘛的&#xff1f; with 主要用來 自動管理資源&#xff0c;確保資源&#xff08;文件、數據庫連接等&#xff09;在使用完后能自動釋放&#xff0c;避免資源泄露問題。 換句話說&…

瀏覽器的Cookie 過期時間存儲

Cookie 是服務器發送到瀏覽器的小型文本數據&#xff0c;用于跟蹤用戶狀態&#xff08;如登錄信息、偏好設置&#xff09;&#xff0c;存儲大小通常限制為 4KB&#xff0c;每個域名下最多允許約 20-50 個 Cookie&#xff08;不同瀏覽器不同&#xff09;。 屬性 屬性說明示例注…

hive全量遷移腳本

#!/bin/bash #場景&#xff1a;數據在同一庫下&#xff0c;并且hive是內部表&#xff08;前綴的hdfs地址是相同的&#xff09;#1.讀取一個文件&#xff0c;獲取表名#echo "時間$dt_jian_2-------------------------" >> /home/hadoop/qianyi_zengliang/rs.txt#…