Spark SQL 架構概述
架構核心組件
-
API層(用戶接口)
- 輸入方式:SQL查詢;DataFrame/Dataset API。
- 統一性: 所有接口最終轉換為邏輯計劃樹(Logical Plan),進入優化流程。
-
編譯器層(Catalyst 優化器)
-
核心引擎: 基于規則的優化器(Rule-Based Optimizer, RBO)與成本優化器(Cost-Based Optimizer, CBO)。
-
處理流程:
階段 輸入 輸出 關鍵動作 解析 SQL/API 操作 未解析邏輯計劃 構建語法樹(AST),校驗語法正確性 分析 未解析邏輯計劃 解析后邏輯計劃 綁定元數據(表/列名、數據類型)、解析函數、檢查語義正確性 優化 解析后邏輯計劃 優化后邏輯計劃 應用優化規則(如謂詞下推、列剪裁、常量折疊、連接重排序) -
優化規則示例:
Predicate Pushdown(謂詞下推):將過濾條件推至數據源層,減少 I/O。
Column Pruning(列裁剪):僅讀取查詢涉及的列,減少數據傳輸。
-
-
執行計劃層(Planner)
-
物理計劃生成:將優化后的邏輯計劃轉換為物理計劃(Physical Plan)。
-
策略匹配: 根據數據分布、資源情況選擇最優執行策略(如
BroadcastHashJoin
vsSortMergeJoin
)。 -
物理優化:
全階段代碼生成(Whole-Stage Codegen):將多個操作合并為單個 JVM 函數,減少虛函數調用開銷。
謂詞下推至數據源:支持 Parquet/ORC 等格式的過濾條件下推。
-
-
執行引擎層(Tungsten + Spark Core)
-
Tungsten 引擎:
堆外內存管理:避免 JVM GC 開銷,直接操作二進制數據。
向量化計算:按列處理數據,提升 CPU 緩存命中率。
-
分布式執行:
物理計劃轉為 RDD DAG → 分解為 Stage → 調度 Task 到 Executor 并行執行。
利用 Spark Core 的血緣(Lineage)、內存管理、Shuffle 服務。
-
關鍵性能技術
- Catalyst 優化器
- 動態優化: 在邏輯計劃階段應用啟發式規則,減少冗余計算。
- 自適應查詢(AQE, Spark 3.0+):運行時根據 Shuffle 數據量動態調整 Join 策略、分區數。
- Tungsten 執行引擎
- 內存效率: 緊湊二進制格式存儲數據,減少內存占用 50%+。
- 代碼生成: 將查詢編譯為字節碼,性能接近手寫代碼。
- 統一數據源接入
- Data Source API V2: 支持擴展自定義數據源(如 Kafka、Cassandra),并實現下推優化。
graph LRA[SQL 查詢] --> B(Parser:生成語法樹)B --> C(Analyzer:解析元數據)C --> D(Optimizer:應用優化規則)D --> E(Planner:生成物理計劃)E --> F(Tungsten:代碼生成+內存優化)F --> G(Spark Core:分布式執行)
Spark SQL高級語法
復雜數據類型
-
數組 (ARRAY):同類型元素的有序集合(索引從0開始)。
- size():數組長度。
- explode():展開數組為多行。
- array_contains(arr, value):檢查元素是否存在。
- transform(arr, x -> x * 2):對每個元素應用Lambda函數。
-
映射(MAP<K,V>):鍵值對集合(鍵唯一)。
-
element_at(map, key):按鍵取值。
-
map_keys()/map_values():獲取所有鍵/值。
-
map_concat(map1, map2):合并兩個Map。
-
-
結構體 (
STRUCT<field1:T1, ...>
):包含多個字段的復合類型(類似JSON對象)。
高級聚合與分組
-
GROUPING SETS:自定義聚合維度組合,無關字段用NULL值填充。
SELECT city, department, SUM(salary) AS total_salary FROM employees GROUP BY GROUPING SETS ((city, department), (city), () )
-
ROLLUP:層級聚合
SELECT country, province, city,COUNT(*) AS count FROM locations GROUP BY ROLLUP(country, province, city) -- SQL結果會顯示(country, province, city)、(country, province)、(country)、()的聚合結果
-
CUBE:所有維度聚合
SELECT year, product, SUM(revenue) FROM sales GROUP BY CUBE(year, product) -- SQL結果會顯示(year, profucr)、(year)、(product)、()的聚合結果
-
聚合過濾(FILTER 子句):對特定條件聚合 (比WHERE更高效)
SELECT department,SUM(salary) FILTER (WHERE age > 30) AS senior_salary,AVG(salary) FILTER (WHERE gender = 'F') AS female_avg FROM employees GROUP BY department
窗口函數
-
核心結構
SELECTRANK() OVER (PARTITION BY dim ORDER BY metric DESCROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -- 窗口范圍) AS rank FROM table
-
聚合函數:**SUM()、AVG()、COUNT()、MAX()、MIN()**等
-
排名函數:
- ROW_NUMBER():為窗口內的每一行分配一個唯一的序號,序號連續且不重復。
- RANK():排名函數,允許有并列的名次,名次后面會出現空位。
- ENSE_RANK():排名函數,允許有并列的名次,名次后面不會空出位置,即序號連續。
-
分組窗口函數:
- NTILE():將窗口內的行分為指定數量的組,每組的行數盡可能相等。
-
分布窗口函數
- PERCENT_RANK():計算每一行的相對排名,返回一個介于0到1之間的值,表示當前行在分區中的排名百分比。
- CUME_DIST():計算小于或等于當前行的行數占窗口總行數的比例。
-
取值窗口函數
- LAG():訪問當前行之前的第n行數據。
- LEAD():訪問當前行之后的第n行數據。
- FIRST_VALUE():獲取窗口內第一行的值。
- LAST_VALUE():獲取窗口內最后一行的值。
- NTH_VALUE():獲取窗口內第n行的值,如果存在多行則返回第一個。
-
窗口范圍
- UNBOUNDED PRECEDING:從分區中的第一行開始(前面所有行)。
- CURRENT ROW:包括當前行。
- N PRECEDING:從當前行之前的第 nN行開始。
- N FOLLOWING:包括當前行之后第 N 行。
- UNBOUNDED FOLLOWING:到分區中的最后一行結束(后面所有行)。
Spark SQL內置函數
-
聚合函數 (Aggregate Functions)
函數 返回值 說明 approx_count_distinct Long 近似去重計數 (rsd=相對誤差) collect_list Array 收集值到數組 (保留重復) collect_set Array 收集值到集合 (去重) corr Double 相關系數 (-1~1) covar_pop/covar_samp Double 總體/樣本協方差 kurtosis Double 峰度 skewness Double 偏度 percentile_approx Double 近似百分位數 -
數組函數 (Array Functions)
函數 說明 array(e1, e2, …) 轉換為數組 array_contains(arr, val) 數組包含 array_distinct 數組去重 array_position(arr, val) 數組索引值 size 數組大小 -
Map 函數 (Map Functions)
函數 說明 map(k1,v1, k2,v2) 轉化為map element_at(map, key) 根據鍵獲取值 map_keys/map_values 獲取鍵列表、值列表 map_entries 獲取map entry -
日期時間函數 (Datetime Functions)
函數 說明 示例 date_add/date_sub 日期加減 date_add('2025-07-01', 7)
→2025-07-08
datediff 日期差 datediff('2025-06-01','2025-06-30')
→30
date_format 格式化 date_format(ts, 'yyyy-MM')
→"2025-07"
trunc 截斷日期 trunc('2025-07-01', 'MONTH')
→2025-07-01
window 時間窗口 流處理中按時間聚合 -
JSON 函數 (JSON Functions)
函數 說明 示例 get_json_object JSON路徑取值 get_json_object('{"a":1}', '$.a')
→1
json_tuple 多字段提取 json_tuple('{"name":"Bob"}', 'name')
→Bob
from_json 解析為結構體 from_json('{"id":1}', 'id INT')
to_json 結構體轉JSON to_json(struct('Tom' AS name))
→'{"name":"Tom"}'
schema_of_json 推斷Schema schema_of_json('[{"a":1}]')
→ARRAY<STRUCT<a:INT>>
-
字符串函數 (String Functions)
函數 說明 示例 concat_ws 字符串拼接join concat_ws('-','2025','07')
→"2025-07"
split 字符串解析轉數組 split('a,b,c', ',')
→["a","b","c"]
regexp_extract 正則表達式 regexp_extract('id=100','id=(\\d+)',1)
→"100"
translate 字符串子串轉化 translate('hello','el','ip')
→"hippo"
parse_url 解析URL parse_url('http://a.com?q=spark','QUERY')
→"q=spark"