Spark SQL 概述
Spark SQL 是 Apache Spark 的一個模塊,專門用于處理結構化數據。它集成了 SQL 查詢和 Spark 編程的強大功能,使得處理大數據變得更加高效和簡便。通過 Spark SQL,用戶可以直接在 Spark 中使用 SQL 查詢,或者使用 DataFrame 和 DataSet API 進行數據操作。
- 一、Spark SQL 架構
- 二、Spark SQL 特點
- 三、Spark SQL 運行原理
- 四、Spark SQL API 相關概述
- 五、Spark SQL 依賴
- 六、Spark SQL 數據集
- 1、DataFrame
- 2、Dataset
- 3、DataFrame 和 Dataset 的關系
- 七、Spark Sql 基本用法
- 1、Scala 創建 SparkSession 對象
- 2、DataFrame 和 Dataset 的創建方式
- 3、DataFrame API
一、Spark SQL 架構
Spark SQL 的架構主要由以下幾個組件組成:
- SparkSession:Spark 應用的統一入口點,用于創建 DataFrame、DataSet 和執行 SQL 查詢。
- Catalyst 優化器:Spark SQL 的查詢優化引擎,負責解析、分析、優化和生成物理執行計劃。
- DataFrame 和 DataSet API:提供面向對象的編程接口,支持豐富的數據操作方法。
- 數據源接口:支持多種數據源,如 HDFS、S3、HBase、Cassandra、Hive 等。
- 執行引擎:將優化后的查詢計劃轉換為執行任務,并在分布式集群上并行執行這些任務。
二、Spark SQL 特點
- 統一數據訪問接口:支持多種數據源(如 CSV、JSON、Parquet、Hive、JDBC、HBase 等)并提供一致的查詢接口。
- DataFrame 和 Dataset API:提供面向對象的編程接口,支持類型安全的操作,便于數據處理。
- Catalyst 優化器:自動將用戶的查詢轉換為高效的執行計劃,提升查詢性能。
- 與 Hive 的集成:無縫集成 Hive,能夠直接訪問現存的 Hive 數據,并使用 Hive 的 UDF 和 UDAF。
- 高性能:通過 Catalyst 優化器和 Tungsten 執行引擎,實現高效的查詢性能和內存管理。
- 多種操作方式:支持 SQL 和 API 編程兩種操作方式,靈活性高。
- 外部工具接口:提供 JDBC/ODBC 接口供第三方工具借助 Spark 進行數據處理。
- 高級接口:提供了更高層級的接口,方便地處理數據。
三、Spark SQL 運行原理
查詢解析(Query Parsing):將 SQL 查詢解析成抽象語法樹(AST)。
邏輯計劃生成(Logical Plan Generation):將 AST 轉換為未優化的邏輯計劃。
邏輯計劃優化(Logical Plan Optimization):使用 Catalyst 優化器對邏輯計劃進行一系列規則優化。
物理計劃生成(Physical Plan Generation):將優化后的邏輯計劃轉換為一個或多個物理計劃,并選擇最優的物理計劃。
執行(Execution):將物理計劃轉換為 RDD,并在集群上并行執行。
四、Spark SQL API 相關概述
SparkContext:SparkContext 是 Spark 應用程序的主入口點,負責連接到 Spark 集群,管理資源和任務調度。在 Spark 2.0 之后,推薦使用 SparkSession 取代 SparkContext。
SQLContext:SQLContext 是 Spark SQL 的編程入口點,允許用戶通過 SQL 查詢或 DataFrame API 進行數據處理。它提供了基本的 Spark SQL 功能。
HiveContext:HiveContext 是 SQLContext 的子集,增加了對 Hive 的集成支持,可以直接訪問 Hive 中的數據和元數據,使用 Hive 的 UDF 和 UDAF。
SparkSession:SparkSession 是 Spark 2.0 引入的新概念,合并了 SQLContext 和 HiveContext 的功能,提供了統一的編程接口。SparkSession 是 Spark SQL 的建議入口點,支持使用 DataFrame 和 Dataset API 進行數據處理。
創建 SparkContext 和 SparkSession 的注意事項:如果同時需要創建 SparkContext 和 SparkSession,必須先創建 SparkContext,再創建 SparkSession。如果先創建 SparkSession,再創建 SparkContext,會導致異常,因為在同一個 JVM 中只能運行一個 SparkContext。
五、Spark SQL 依賴
<properties><spark.version>3.1.2</spark.version><spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${spark.scala.version}</artifactId><version>${spark.version}</version>
</dependency>
六、Spark SQL 數據集
在 Spark SQL 中,數據集主要分為以下幾種類型:DataFrame 和 Dataset。它們是處理和操作結構化和半結構化數據的核心抽象。
1、DataFrame
Dataset 是在 Spark 2.0 中引入的新的抽象數據結構,它是強類型的,可以存儲 JVM 對象。Dataset API 結合了 DataFrame 的操作簡便性和類型安全性,適用于需要更高級別數據類型控制和面向對象編程風格的場景。具體特點如下:
- 類似于二維表格:DataFrame 類似于傳統的關系數據庫中的二維表格。
- Schema(數據結構信息):在 RDD 的基礎上加入了 Schema,描述數據結構的信息。
- 支持嵌套數據類型:DataFrame 的 Schema 支持嵌套的數據類型,如
struct
、map
和array
。 - 豐富的 SQL 操作 API:提供更多類似 SQL 操作的 API,便于進行數據查詢和操作。
2、Dataset
Dataset 是在 Spark 2.0 中引入的新的抽象數據結構,它是強類型的,可以存儲 JVM 對象。Dataset API 結合了 DataFrame 的操作簡便性和類型安全性,適用于需要更高級別數據類型控制和面向對象編程風格的場景。具體特點如下:
- 強類型:Spark 1.6中引入的一個更通用的數據集合,Dataset 是強類型的,提供類型安全的操作。
- RDD + Schema:可以認為 Dataset 是 RDD 和 Schema 的結合,既有 RDD 的分布式計算能力,又有 Schema 描述數據結構的信息。
- 適用于特定領域對象:可以存儲和操作特定領域對象的強類型集合。
- 并行操作:可以使用函數或者相關操作并行地進行轉換和操作。
3、DataFrame 和 Dataset 的關系
- DataFrame 是特殊的 Dataset:DataFrame 是 Dataset 的一個特例,即
DataFrame = Dataset[Row]
。 - 數據抽象和操作方式的統一:DataFrame 和 Dataset 統一了 Spark SQL 的數據抽象和操作方式,提供了靈活且強大的數據處理能力。
七、Spark Sql 基本用法
1、Scala 創建 SparkSession 對象
import org.apache.spark.sql.SparkSession
object SparkSqlContext {def main(args: Array[String]): Unit = {// 創建 SparkConf 對象,設置應用程序的配置val conf: SparkConf = new SparkConf().setMaster("local[4]") // 設置本地運行模式,使用 4 個線程.setAppName("spark sql") // 設置應用程序名稱為 "spark sql"// 創建 SparkSession 對象,用于 Spark SQL 的編程入口val spark: SparkSession = SparkSession.builder().config(conf) // 將 SparkConf 配置應用于 SparkSession.getOrCreate() // 獲取現有的 SparkSession,或者新建一個// 獲取 SparkContext 對象,可以直接從 SparkSession 中獲取val sc: SparkContext = spark.sparkContext// 導入 SparkSession 的隱式轉換,可以使用 DataFrame API 的方法import spark.implicits._// 在這里可以編寫數據處理代碼,例如創建 DataFrame 和 Dataset,進行數據操作等...// 停止 SparkSession,釋放資源spark.stop()}
}
2、DataFrame 和 Dataset 的創建方式
1、從集合創建
case class Person(name: String, age: Int) // 下同val data1 = Seq(Person("Alice", 25), Person("Bob", 30))
val ds: Dataset[Person] = spark.createDataset(data) // 這里的spark是SparkSession對象(如上代碼),下同val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")
1、從文件系統讀取
val schema = StructType(Seq(StructField("name", StringType, nullable = false),StructField("age", IntegerType, nullable = false)
))val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]val dfCsv: DataFrame = spark.read// 使用.schema方法指定CSV文件的模式(schema)其定義了DataFrame的列名和類型。// 這是一個可選步驟,但如果CSV文件沒有頭部行,或者你想覆蓋文件中的頭部行,則必須指定。 .schema(schema) // 這里設置"header"為"true",表示CSV文件的第一行是列名,不需要Spark從文件中自動推斷。 .option("header", "true").csv("/path/to/csv/file")
3、從關系型數據庫讀取
val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)
4、從非結構化數據源讀取
val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")
5、手動創建 Dataset
import org.apache.spark.sql.types._val schema = StructType(Seq(StructField("name", StringType, nullable = false),StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]val dfManual: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), schema
)
3、DataFrame API
語法示例一
模擬數據(1000條):
id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子異,男,48,廣州市
3,孟秀英,女,46,上海市
4,金嘉倫,男,8,北京市
...
需求:哪些城市和性別組合在人口較多(ID數量>50)的情況下具有最高的平均年齡,以及這些組合在各自性別中的排名。
// 導入SparkSession的隱式轉換,這樣可以使用DataFrame的便捷方法(例如下面的'$'符號)
import spark.implicits._// 定義了一個DataFrame的schema,但在這個例子中,使用了CSV的header來自動推斷schema
val schema = StructType(Seq(StructField("id", LongType),StructField("name", StringType),StructField("gender", StringType),StructField("age", IntegerType),StructField("city", StringType),
))// 定義WindowSpec,用于后續的窗口函數操作,按gender分區,按avg_age降序排序,(復用使用此)
val WindowSpec: WindowSpec = Window.partitionBy($"gender").orderBy($"avg_age".desc)// 從CSV文件中讀取數據,使用header作為列名,然后選擇特定的列,進行分組和聚合操作
// 哪些城市和性別組合在人口較多(ID數量>50)的情況下具有最高的平均年齡,以及這些組合在各自性別中的排名。
spark.read// .schema(schema) // 應用我們定義的schema .option("header", "true") // 使用CSV的header作為列名.csv("D:\\projects\\sparkSql\\people.csv") // DataFrame.select($"id", $"name", $"age", $"city", $"gender") // 選擇需要的列(不寫默認就是全選).groupBy($"city", $"gender") // 按城市和性別分組.agg( // 多重聚合count($"id").as("count"), // 計算每個組的ID數量round(avg($"age"), 2).as("avg_age") // 計算每個組的平均年齡,并保留兩位小數).where($"count".gt(50)) // 過濾出ID數量大于(可以使用>)50的組.orderBy($"avg_age".desc) // 按平均年齡降序排序.select($"city", $"gender", $"avg_age",dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank")).show() // 顯示結果
結果:
+------+------+-------+-------------------+
| city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市| 男| 41.05| 1|
| 東莞| 男| 42.81| 2|
|上海市| 男| 43.92| 3|
|成都市| 男| 45.89| 4|
| 中山| 男| 47.08| 5|
|廣州市| 男| 47.47| 6|
| 深圳| 男| 48.36| 7|
|上海市| 女| 46.02| 1|
| 中山| 女| 49.55| 2|
+------+------+-------+-------------------+
語法示例二:視圖,sql
// 讀取CSV文件到DataFrame,使用header作為列名
val dfPeople: DataFrame = spark.read.option("header", "true") // 使用CSV的header作為列名.csv("D:\\projects\\sparkSql\\people.csv")// 將DataFrame注冊為臨時視圖
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL來查詢這個視圖了
// 例如,查詢所有人的姓名和年齡
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql("""|select * from people_view|where gender = '男'|""".stripMargin).show()
語法示例三:join
case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)val frmStu = spark.createDataFrame(Seq(Student("張三", 1),Student("李四", 1),Student("王五", 2),Student("趙六", 2),Student("李明", 2),Student("王剛", 4),Student("王朋", 5),)
)val frmClass = spark.createDataFrame(Seq(Class(1, "name1"),Class(2, "name2"),Class(3, "name3"),Class(4, "name4"))
)
left
左連接,rignt
右連接, full
全外連接,anti
左差集,semi
左交集
// 別名 + inner 內連接
frmStu.as("S").join(frmClass.as("C"), $"S.classId" === $"C.classId") // joinType 默認 inner內連接.show()// 使用左外連接將df和frmClass根據classId合并
frmStu.join(frmClass, Seq("classId"), "left") .show()// 左差集
frmStu.join(frmClass, Seq("classId"), "anti") .show()// 左交集
frmStu.join(frmClass, Seq("classId"), "semi") .show()
結果
別名 + inner 內連接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|張三| 1| 1| name1|
|李四| 1| 1| name1|
|王五| 2| 2| name2|
|趙六| 2| 2| name2|
|李明| 2| 2| name2|
|王剛| 4| 4| name4|
+----+-------+-------+---------+使用左外連接將df和frmClass根據classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
| 1|張三| name1|
| 1|李四| name1|
| 2|王五| name2|
| 2|趙六| name2|
| 2|李明| name2|
| 4|王剛| name4|
| 5|王朋| null|
+-------+----+---------+左差集
+-------+----+
|classId|name|
+-------+----+
| 5|王朋|
+-------+----+左交集
+-------+----+
|classId|name|
+-------+----+
| 1|張三|
| 1|李四|
| 2|王五|
| 2|趙六|
| 2|李明|
| 4|王剛|
+-------+----+