Spark SQL 概述

Spark SQL 概述

Spark SQL 是 Apache Spark 的一個模塊,專門用于處理結構化數據。它集成了 SQL 查詢和 Spark 編程的強大功能,使得處理大數據變得更加高效和簡便。通過 Spark SQL,用戶可以直接在 Spark 中使用 SQL 查詢,或者使用 DataFrame 和 DataSet API 進行數據操作。

  • 一、Spark SQL 架構
  • 二、Spark SQL 特點
  • 三、Spark SQL 運行原理
  • 四、Spark SQL API 相關概述
  • 五、Spark SQL 依賴
  • 六、Spark SQL 數據集
    • 1、DataFrame
    • 2、Dataset
    • 3、DataFrame 和 Dataset 的關系
  • 七、Spark Sql 基本用法
    • 1、Scala 創建 SparkSession 對象
    • 2、DataFrame 和 Dataset 的創建方式
    • 3、DataFrame API

一、Spark SQL 架構

Spark SQL 的架構主要由以下幾個組件組成:

  1. SparkSession:Spark 應用的統一入口點,用于創建 DataFrame、DataSet 和執行 SQL 查詢。
  2. Catalyst 優化器:Spark SQL 的查詢優化引擎,負責解析、分析、優化和生成物理執行計劃。
  3. DataFrame 和 DataSet API:提供面向對象的編程接口,支持豐富的數據操作方法。
  4. 數據源接口:支持多種數據源,如 HDFS、S3、HBase、Cassandra、Hive 等。
  5. 執行引擎:將優化后的查詢計劃轉換為執行任務,并在分布式集群上并行執行這些任務。

二、Spark SQL 特點

  • 統一數據訪問接口:支持多種數據源(如 CSV、JSON、Parquet、Hive、JDBC、HBase 等)并提供一致的查詢接口。
  • DataFrame 和 Dataset API:提供面向對象的編程接口,支持類型安全的操作,便于數據處理。
  • Catalyst 優化器:自動將用戶的查詢轉換為高效的執行計劃,提升查詢性能。
  • 與 Hive 的集成:無縫集成 Hive,能夠直接訪問現存的 Hive 數據,并使用 Hive 的 UDF 和 UDAF。
  • 高性能:通過 Catalyst 優化器和 Tungsten 執行引擎,實現高效的查詢性能和內存管理。
  • 多種操作方式:支持 SQL 和 API 編程兩種操作方式,靈活性高。
  • 外部工具接口:提供 JDBC/ODBC 接口供第三方工具借助 Spark 進行數據處理。
  • 高級接口:提供了更高層級的接口,方便地處理數據。

三、Spark SQL 運行原理

在這里插入圖片描述

查詢解析(Query Parsing):將 SQL 查詢解析成抽象語法樹(AST)。

邏輯計劃生成(Logical Plan Generation):將 AST 轉換為未優化的邏輯計劃。

邏輯計劃優化(Logical Plan Optimization):使用 Catalyst 優化器對邏輯計劃進行一系列規則優化。

物理計劃生成(Physical Plan Generation):將優化后的邏輯計劃轉換為一個或多個物理計劃,并選擇最優的物理計劃。

執行(Execution):將物理計劃轉換為 RDD,并在集群上并行執行。

四、Spark SQL API 相關概述

SparkContext:SparkContext 是 Spark 應用程序的主入口點,負責連接到 Spark 集群,管理資源和任務調度。在 Spark 2.0 之后,推薦使用 SparkSession 取代 SparkContext。

SQLContext:SQLContext 是 Spark SQL 的編程入口點,允許用戶通過 SQL 查詢或 DataFrame API 進行數據處理。它提供了基本的 Spark SQL 功能。

HiveContext:HiveContext 是 SQLContext 的子集,增加了對 Hive 的集成支持,可以直接訪問 Hive 中的數據和元數據,使用 Hive 的 UDF 和 UDAF。

SparkSession:SparkSession 是 Spark 2.0 引入的新概念,合并了 SQLContext 和 HiveContext 的功能,提供了統一的編程接口。SparkSession 是 Spark SQL 的建議入口點,支持使用 DataFrame 和 Dataset API 進行數據處理。

創建 SparkContext 和 SparkSession 的注意事項:如果同時需要創建 SparkContext 和 SparkSession,必須先創建 SparkContext,再創建 SparkSession。如果先創建 SparkSession,再創建 SparkContext,會導致異常,因為在同一個 JVM 中只能運行一個 SparkContext。

五、Spark SQL 依賴

<properties><spark.version>3.1.2</spark.version><spark.scala.version>2.12</spark.scala.version>
</properties>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${spark.scala.version}</artifactId><version>${spark.version}</version>
</dependency>

六、Spark SQL 數據集

在 Spark SQL 中,數據集主要分為以下幾種類型:DataFrame 和 Dataset。它們是處理和操作結構化和半結構化數據的核心抽象。

1、DataFrame

Dataset 是在 Spark 2.0 中引入的新的抽象數據結構,它是強類型的,可以存儲 JVM 對象。Dataset API 結合了 DataFrame 的操作簡便性和類型安全性,適用于需要更高級別數據類型控制和面向對象編程風格的場景。具體特點如下:

  • 類似于二維表格:DataFrame 類似于傳統的關系數據庫中的二維表格。
  • Schema(數據結構信息):在 RDD 的基礎上加入了 Schema,描述數據結構的信息。
  • 支持嵌套數據類型:DataFrame 的 Schema 支持嵌套的數據類型,如 structmaparray
  • 豐富的 SQL 操作 API:提供更多類似 SQL 操作的 API,便于進行數據查詢和操作。

2、Dataset

Dataset 是在 Spark 2.0 中引入的新的抽象數據結構,它是強類型的,可以存儲 JVM 對象。Dataset API 結合了 DataFrame 的操作簡便性和類型安全性,適用于需要更高級別數據類型控制和面向對象編程風格的場景。具體特點如下:

  • 強類型:Spark 1.6中引入的一個更通用的數據集合,Dataset 是強類型的,提供類型安全的操作。
  • RDD + Schema:可以認為 Dataset 是 RDD 和 Schema 的結合,既有 RDD 的分布式計算能力,又有 Schema 描述數據結構的信息。
  • 適用于特定領域對象:可以存儲和操作特定領域對象的強類型集合。
  • 并行操作:可以使用函數或者相關操作并行地進行轉換和操作。

3、DataFrame 和 Dataset 的關系

  • DataFrame 是特殊的 Dataset:DataFrame 是 Dataset 的一個特例,即 DataFrame = Dataset[Row]
  • 數據抽象和操作方式的統一:DataFrame 和 Dataset 統一了 Spark SQL 的數據抽象和操作方式,提供了靈活且強大的數據處理能力。

七、Spark Sql 基本用法

1、Scala 創建 SparkSession 對象

import org.apache.spark.sql.SparkSession
object SparkSqlContext {def main(args: Array[String]): Unit = {// 創建 SparkConf 對象,設置應用程序的配置val conf: SparkConf = new SparkConf().setMaster("local[4]")   // 設置本地運行模式,使用 4 個線程.setAppName("spark sql") // 設置應用程序名稱為 "spark sql"// 創建 SparkSession 對象,用于 Spark SQL 的編程入口val spark: SparkSession = SparkSession.builder().config(conf) // 將 SparkConf 配置應用于 SparkSession.getOrCreate() // 獲取現有的 SparkSession,或者新建一個// 獲取 SparkContext 對象,可以直接從 SparkSession 中獲取val sc: SparkContext = spark.sparkContext// 導入 SparkSession 的隱式轉換,可以使用 DataFrame API 的方法import spark.implicits._// 在這里可以編寫數據處理代碼,例如創建 DataFrame 和 Dataset,進行數據操作等...// 停止 SparkSession,釋放資源spark.stop()}
}

2、DataFrame 和 Dataset 的創建方式

1、從集合創建

case class Person(name: String, age: Int)				// 下同val data1 = Seq(Person("Alice", 25), Person("Bob", 30))	
val ds: Dataset[Person] = spark.createDataset(data)		// 這里的spark是SparkSession對象(如上代碼),下同val data2 = Seq(("Alice", 25), ("Bob", 30))
val df: DataFrame = data.toDF("name", "age")

1、從文件系統讀取

val schema = StructType(Seq(StructField("name", StringType, nullable = false),StructField("age", IntegerType, nullable = false)
))val dsJson: Dataset[Person] = spark.read.json("/path/to/json/file").as[Person]val dfCsv: DataFrame = spark.read// 使用.schema方法指定CSV文件的模式(schema)其定義了DataFrame的列名和類型。// 這是一個可選步驟,但如果CSV文件沒有頭部行,或者你想覆蓋文件中的頭部行,則必須指定。  .schema(schema)		  // 這里設置"header"為"true",表示CSV文件的第一行是列名,不需要Spark從文件中自動推斷。 .option("header", "true").csv("/path/to/csv/file")

3、從關系型數據庫讀取

val url = "jdbc:mysql://localhost:3306/database"
val properties = new java.util.Properties()
properties.setProperty("user", "username")
properties.setProperty("password", "password")val dsDb: Dataset[Person] = spark.read.jdbc(url, "table", properties).as[Person]val dfDb: DataFrame = spark.read.jdbc(url, "table", properties)

4、從非結構化數據源讀取

val dsParquet: Dataset[Person] = spark.read.parquet("/path/to/parquet/file").as[Person]val dfParquet: DataFrame = spark.read.parquet("/path/to/parquet/file")

5、手動創建 Dataset

import org.apache.spark.sql.types._val schema = StructType(Seq(StructField("name", StringType, nullable = false),StructField("age", IntegerType, nullable = false)
))
val data = Seq(Row("Alice", 25), Row("Bob", 30))val dsManual: Dataset[Person] = spark.createDataFrame(spark.sparkContext.parallelize(data), schema).as[Person]val dfManual: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), schema
)

3、DataFrame API

語法示例一

模擬數據(1000條):

id,name,gender,age,city
1,邵睿,男,12,上海市
2,林子異,男,48,廣州市
3,孟秀英,女,46,上海市
4,金嘉倫,男,8,北京市
...

需求:哪些城市和性別組合在人口較多(ID數量>50)的情況下具有最高的平均年齡,以及這些組合在各自性別中的排名。

// 導入SparkSession的隱式轉換,這樣可以使用DataFrame的便捷方法(例如下面的'$'符號)
import spark.implicits._// 定義了一個DataFrame的schema,但在這個例子中,使用了CSV的header來自動推斷schema
val schema = StructType(Seq(StructField("id", LongType),StructField("name", StringType),StructField("gender", StringType),StructField("age", IntegerType),StructField("city", StringType),
))// 定義WindowSpec,用于后續的窗口函數操作,按gender分區,按avg_age降序排序,(復用使用此)
val WindowSpec: WindowSpec = Window.partitionBy($"gender").orderBy($"avg_age".desc)// 從CSV文件中讀取數據,使用header作為列名,然后選擇特定的列,進行分組和聚合操作
// 哪些城市和性別組合在人口較多(ID數量>50)的情況下具有最高的平均年齡,以及這些組合在各自性別中的排名。
spark.read// .schema(schema)	// 應用我們定義的schema .option("header", "true") 							// 使用CSV的header作為列名.csv("D:\\projects\\sparkSql\\people.csv")			// DataFrame.select($"id", $"name", $"age", $"city", $"gender") 	// 選擇需要的列(不寫默認就是全選).groupBy($"city", $"gender") 							// 按城市和性別分組.agg(			// 多重聚合count($"id").as("count"),   		// 計算每個組的ID數量round(avg($"age"), 2).as("avg_age") // 計算每個組的平均年齡,并保留兩位小數).where($"count".gt(50))  		// 過濾出ID數量大于(可以使用>)50的組.orderBy($"avg_age".desc)     // 按平均年齡降序排序.select($"city", $"gender", $"avg_age",dense_rank().over(Window.partitionBy($"gender").orderBy($"avg_age".desc)).as("gender_avg_age_rank")).show() // 顯示結果

結果:

+------+------+-------+-------------------+
|  city|gender|avg_age|gender_avg_age_rank|
+------+------+-------+-------------------+
|北京市|    男|  41.05|                  1|
|  東莞|    男|  42.81|                  2|
|上海市|    男|  43.92|                  3|
|成都市|    男|  45.89|                  4|
|  中山|    男|  47.08|                  5|
|廣州市|    男|  47.47|                  6|
|  深圳|    男|  48.36|                  7|
|上海市|    女|  46.02|                  1|
|  中山|    女|  49.55|                  2|
+------+------+-------+-------------------+

語法示例二:視圖,sql

// 讀取CSV文件到DataFrame,使用header作為列名
val dfPeople: DataFrame = spark.read.option("header", "true") // 使用CSV的header作為列名.csv("D:\\projects\\sparkSql\\people.csv")// 將DataFrame注冊為臨時視圖
dfPeople.createOrReplaceTempView("people_view")
// 可以使用Spark SQL來查詢這個視圖了
// 例如,查詢所有人的姓名和年齡
spark.sql("SELECT name, age FROM people_view").show()
// 二
spark.sql("""|select * from people_view|where gender = '男'|""".stripMargin).show()

語法示例三:join

case class Student(name: String, classId: Int)
case class Class(classId: Int, className: String)val frmStu = spark.createDataFrame(Seq(Student("張三", 1),Student("李四", 1),Student("王五", 2),Student("趙六", 2),Student("李明", 2),Student("王剛", 4),Student("王朋", 5),)
)val frmClass = spark.createDataFrame(Seq(Class(1, "name1"),Class(2, "name2"),Class(3, "name3"),Class(4, "name4"))
)

left 左連接,rignt 右連接, full 全外連接,anti左差集,semi左交集

// 別名 + inner 內連接
frmStu.as("S").join(frmClass.as("C"), $"S.classId" === $"C.classId")	// joinType 默認 inner內連接.show()// 使用左外連接將df和frmClass根據classId合并
frmStu.join(frmClass, Seq("classId"), "left")	.show()// 左差集
frmStu.join(frmClass, Seq("classId"), "anti")	.show()// 左交集
frmStu.join(frmClass, Seq("classId"), "semi")	.show()

結果

別名 + inner 內連接
+----+-------+-------+---------+
|name|classId|classId|className|
+----+-------+-------+---------+
|張三|      1|      1|    name1|
|李四|      1|      1|    name1|
|王五|      2|      2|    name2|
|趙六|      2|      2|    name2|
|李明|      2|      2|    name2|
|王剛|      4|      4|    name4|
+----+-------+-------+---------+使用左外連接將df和frmClass根據classId合并
+-------+----+---------+
|classId|name|className|
+-------+----+---------+
|      1|張三|    name1|
|      1|李四|    name1|
|      2|王五|    name2|
|      2|趙六|    name2|
|      2|李明|    name2|
|      4|王剛|    name4|
|      5|王朋|     null|
+-------+----+---------+左差集
+-------+----+
|classId|name|
+-------+----+
|      5|王朋|
+-------+----+左交集
+-------+----+
|classId|name|
+-------+----+
|      1|張三|
|      1|李四|
|      2|王五|
|      2|趙六|
|      2|李明|
|      4|王剛|
+-------+----+

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/44210.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/44210.shtml
英文地址,請注明出處:http://en.pswp.cn/web/44210.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ubuntu16.04安裝低版本cmake(安裝cmake安裝)

文章目錄 ubuntu16.04安裝低版本cmake&#xff08;安裝cmake安裝&#xff09;1. **下載并解壓CMake壓縮文件**&#xff1a;- 首先&#xff0c;你需要從CMake的官方網站或其他可靠來源下載cmake-2.8.9-Linux-i386.tar.gz文件。- 然后在終端中使用以下命令解壓文件&#xff1a; 2…

BFS:多源BFS問題

一、多源BFS簡介 超級源點&#xff1a;其實就是把相應的原點一次性都丟到隊列中 二、01矩陣 . - 力扣&#xff08;LeetCode&#xff09; class Solution { public:const int dx[4]{1,-1,0,0};const int dy[4]{0,0,1,-1};vector<vector<int>> updateMatrix(vector…

Makefile--自動識別編譯環境(x86還是arm)進行編譯

在日常工作中&#xff0c;我們會在虛擬機下的x86系統進行架叉編譯&#xff0c;有時需要在arm上直接進行編譯。但工程都是一樣的&#xff0c;只是Makefile不一樣&#xff0c;這時就涉及到Makefile的靈活運用了。以下是一個自動識別編譯環境的通用Makefile&#xff1a; TARGET_A…

headerpwn:一款針對服務器響應與HTTP Header的模糊測試工具

關于headerpwn headerpwn是一款針對服務器響應與HTTP Header的模糊測試工具&#xff0c;廣大研究人員可以利用該工具查找網絡異常并分析服務器是如何響應不同HTTP Header的。 功能介紹 當前版本的headerpwn支持下列功能&#xff1a; 1、服務器安全與異常檢測&#xff1b; 2、…

PyTorch 1-深度學習

深度學習-PyTorch 一: Pytorch1> pytorch簡介2> PyTorch 特點&優勢3> pytorch簡史4> pytorch 庫5> PyTorch執行流程6> PyTorch 層次結構二: PyTorch常用的高級API和函數1> 自動求導(Autograd)2> 模型容器(Module)3> 優化器(Optimizer)4&g…

Java Stream API詳解:高效處理集合數據的利器

引言 Java 8引入了許多新特性&#xff0c;其中最為顯著的莫過于Lambda表達式和Stream API。Stream API提供了一種高效、簡潔的方法來處理集合數據&#xff0c;使代碼更加簡潔明了&#xff0c;且具有較高的可讀性和可維護性。本文將深入探討Java Stream API的使用&#xff0c;包…

QFileDialog的簡單了解

ps&#xff1a;寫了點垃圾&#xff08;哈哈哈&#xff09; 它繼承自QDialog 這是Windows自己的文件夾 這是兩者的對比圖&#xff1a; 通過看QFileDialog的源碼&#xff0c;來分析它是怎么實現這樣的效果的。 源碼組成&#xff1a; qfiledialog.h qfiledialog_p.h&#xff…

Python面試寶典第11題:最長連續序列

題目 給定一個未排序的整數數組 nums &#xff0c;找出數字連續的最長序列&#xff08;不要求序列元素在原數組中連續&#xff09;的長度。請你設計并實現時間復雜度為 O(n) 的算法解決此問題。 示例 1&#xff1a; 輸入&#xff1a;nums [100,4,200,1,3,2] 輸出&#xff1a;…

微信小程序中的數據通信

方法1: 使用回調函數 在app.js中:可以在修改globalData后執行一個回調函數,這個回調函數可以是頁面傳遞給app的一個更新函數。// app.js App({globalData: {someData: ,},setSomeData(newData, callback) {this.globalData.someData = newData;if (typeof callback === funct…

打造熱銷爆款:LazadaShopee店鋪測評與關鍵詞策略

面對Lazada和Shopee平臺上店鋪銷量難以突破的困境&#xff0c;賣家們往往尋求各種解決方案。其中&#xff0c;店鋪測評作為提升店鋪信譽、優化產品排名及增加曝光度的有效手段&#xff0c;正逐漸成為賣家關注的焦點。以下將深入探討店鋪測評的好處、實施技巧及自養號的關鍵要素…

提升校園效率:智慧校園后勤管理中的尋物管理功能

在智慧校園后勤管理體系中&#xff0c;尋物管理功能扮演著連接遺失與找回的橋梁角色&#xff0c;它充分利用現代信息技術&#xff0c;為校園內的師生提供了一套高效、便捷的失物招領解決方案。此功能圍繞以下幾個核心方面展開。 首先&#xff0c;它支持在線報失與信息登記。一旦…

如何連接到公司的服務器?

1.下載FileZilla FileZilla的下載與安裝以及簡單使用&#xff08;有圖解超簡單&#xff09;-CSDN博客 2.打開 3.輸入主機 用戶名 密碼 端口 注&#xff1a;主機支持的協議類型&#xff1a; 4.連接成功 其他方式也有很多&#xff0c;比如通過cmd&#xff0c;html網頁等等 3個…

昇思25天學習打卡營第19天|ShuffleNet圖像分類

今天是參加昇思25天學習打卡營的第19天&#xff0c;今天打卡的課程是“ShuffleNet圖像分類”&#xff0c;這里做一個簡單的分享。 1.簡介 在第15-18日的學習內容中&#xff0c;我們陸陸續續學習了計算機視覺相關的模型包括圖像語義分割、圖像分類、目標檢測等內容&#xff0c…

面試遲到了怎么辦

嗨&#xff0c;我是蘭若姐姐。作為一名面試官&#xff0c;最近面試了很多的測試候選人&#xff0c;有了很多感慨&#xff0c;借此抒發一下&#xff0c;我不知道別人面試更看重的是什么&#xff0c;但是在我這里&#xff0c;我最看重的是態度&#xff0c;其次才是技能 我覺得作…

vivado EXTRACT_ENABLE、EXTRACT_RESET

可提取 EXTRACT_ENABLE控制寄存器推斷是否啟用。通常&#xff0c;Vivado工具 提取或不提取基于啟發式方法&#xff0c;通常有利于最大程度的 設計。如果Vivado的行為不符合預期&#xff0c;此屬性將覆蓋 工具的默認行為。如果有不希望的啟用連接到CE引腳 觸發器&#xff0c;此屬…

中關村軟件園發布“數據合規與出境評估服務平臺”

在2024中關村論壇年會期間&#xff0c;中關村軟件園發布“數據合規與出境評估服務平臺”。該平臺是中關村軟件園結合北京市“兩區”建設&#xff0c;立足軟件園國家數字服務出口基地和數字貿易港建設&#xff0c;圍繞園區內外部企業用戶的業務合作、科研創新、跨國運營等場景需…

Python UDP編程之實時聊天與網絡監控詳解

概要 UDP(User Datagram Protocol,用戶數據報協議)是網絡協議中的一種,主要用于快速、簡單的通信場景。與TCP相比,UDP沒有連接、確認、重傳等機制,因此傳輸效率高,但也不保證數據的可靠性和順序。本文將詳細介紹Python中如何使用UDP協議進行網絡通信,并包含相應的示例…

七天.NET 8操作SQLite入門到實戰 - 第一天 SQLite 簡介

什么是SQLite&#xff1f; SQLite是一個輕量級的嵌入式關系型數據庫&#xff0c;它以一個小型的C語言庫的形式存在。它的設計目標是嵌入式的&#xff0c;而且已經在很多嵌入式產品中使用了它&#xff0c;它占用資源非常的低&#xff0c;在嵌入式設備中&#xff0c;可能只需要幾…

Vscode插件推薦——智能切換輸入法(Smart IME)

前言 相信廣大程序員朋友在寫代碼的時候一定會遇到過一個令人非常頭疼的事情——切換輸入法&#xff0c;特別是對于那些勤于寫注釋的朋友&#xff0c;簡直就是噩夢&#xff0c;正所謂懶人推動世界發展&#xff0c;這不&#xff0c;今天就向大家推薦一款好用的vscode插件&#…

ES6 Class(類) 總結(九)

ES6 中的 class 是一種面向對象編程的語法糖&#xff0c;提供了一種簡潔的方式來定義對象的結構和行為。 JavaScript 語言中&#xff0c;生成實例對象的傳統方法是通過構造函數。下面是一個例子。 function Point(x, y) {this.x x;this.y y; } Point.prototype.toString fu…