Spark-SQL
一.Spark-SQL核心編程(四)
1.數據加載與保存:
1)通用方式:
SparkSQL 提供了通用的保存數據和數據加載的方式。這里的通用指的是使用相同的API,根據不同的參數讀取和保存不同格式的數據,SparkSQL 默認讀取和保存的文件格式為parquet。
2)加載數據:
spark.read.load 是加載數據的通用方法。如果讀取不同格式的數據,可以對不同的數據格式進行設定。
spark.read.format("…")[.option("…")].load("…")
format("…"):指定加載的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要傳入加載數據的路徑。
option("…"):在"jdbc"格式下需要傳入 JDBC 相應參數,url、user、password 和 dbtable
我們前面都是使用 read API 先把文件加載到 DataFrame 然后再查詢,其實,我們也可以直接在文件上進行查詢: 文件格式.`文件路徑`
spark.sql("select * from json.’ Spark-SQL/input/user.json’").show
3)保存數據:
df.write.save 是保存數據的通用方法。如果保存不同格式的數據,可以對不同的數據格式進行設定。
df.write.format("…")[.option("…")].save("…")
format("…"):指定保存的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要傳入保存數據的路徑。
option("…"):在"jdbc"格式下需要傳入 JDBC 相應參數,url、user、password 和 dbtable
保存操作可以使用 SaveMode, 用來指明如何處理數據,使用 mode()方法來設置。
?
例如:df.write.mode("append").json("Spark-SQL/output")
?
?
2.Parquet
Spark SQL 的默認數據源為 Parquet 格式。Parquet 是一種能夠有效存儲嵌套數據的列式
存儲格式。數據源為 Parquet 文件時,Spark SQL 可以方便的執行所有的操作,不需要使用 format。修改配置項 spark.sql.sources.default,可修改默認數據源格式。
1)加載數據:
Val df =s park.read.load("examples/src/main/resources/users.parquet")
2)保存數據:
var df = spark.read.json("/opt/module/data/input/people.json")
df.write.mode("append").save("/opt/module/data/output")
3.JSON
Spark SQL 能夠自動推測 JSON 數據集的結構,并將它加載為一個 Dataset[Row]. 可以
通過 SparkSession.read.json()去加載 JSON 文件。
注意:Spark 讀取的 JSON 文件不是傳統的 JSON 文件,每一行都應該是一個 JSON 串
?
加載json文件
val path = "/opt/module/spark-local/people.json"?
val peopleDF = spark.read.json(path)
創建臨時表
peopleDF.createOrReplaceTempView("people")
數據查詢
val resDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
4.CSV
Spark SQL 可以配置 CSV 文件的列表信息,讀取 CSV 文件,CSV 文件的第一行設置為
數據列。
spark.read.format("csv").option("sep",";").option("inferSchema","true")
.option("header", "true").load("data/user.csv")
5.MySQL
Spark SQL 可以通過 JDBC 從關系型數據庫中讀取數據的方式創建 DataFrame,通過對
DataFrame 一系列的計算后,還可以將數據再寫回關系型數據庫中。
IDEA通過JDBC對MySQL進行操作:
1)導入依賴
<dependency>
? ? <groupId>mysql</groupId>
? ? <artifactId>mysql-connector-java</artifactId>
? ? <version>5.1.27</version>
</dependency>
MySQL8 <version>8.0.11</version>
?
2)讀取數據
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
?
import spark.implicits._
//通用的load方式讀取
?
spark.read.format("jdbc")
? .option("url","jdbc:mysql://localhost:3306/system")
? .option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver
? .option("user","root")
? .option("password","123456")
? .option("dbtable","user")
? .load().show()
?
spark.stop()
?
?
?
?
//通用的load方法的另一種形式
spark.read.format("jdbc")
? .options(
? ? Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))
? .load().show()
?
?
//通過JDBC
val pros :Properties = new Properties()
pros.setProperty("user","root")
pros.setProperty("password","123456")
val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)
df.show()
?
1)寫入數據
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
?
import spark.implicits._
val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),
? Stu("zs", 30)))
val ds:Dataset[Stu] = rdd.toDS()
?
ds.write.format("jdbc")
? .option("url","jdbc:mysql://localhost:3306/system")
? .option("driver","com.mysql.jdbc.Driver")
? .option("user","root")
? .option("password","123456")
? .option("dbtable","user2")
? .mode(SaveMode.Append)
? .save()
?
spark.stop()
?
二.Spark-SQL核心編程(五)
1.Spark-SQL連接Hive
Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 編譯時可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表訪問、UDF (用戶自定義函數)、Hive 查詢語言(HQL)等。需要強調的一點是,如果要在 Spark SQL 中包含Hive 的庫,并不需要事先安裝 Hive。一般來說,最好還是在編譯 Spark SQL 時引入 Hive支持,這樣就可以使用這些特性了。
使用方式分為內嵌Hive、外部Hive、Spark-SQL CLI、Spark beeline 以及代碼操作。
1)內嵌的 HIVE
如果使用 Spark 內嵌的 Hive, 則什么都不用做, 直接使用即可。但是在實際生產活動當中,幾乎沒有人去使用內嵌Hive這一模式。
2)外部的 HIVE
在虛擬機中下載以下配置文件:
?
如果想在spark-shell中連接外部已經部署好的 Hive,需要通過以下幾個步驟:
Spark 要接管 Hive 需要把 hive-site.xml 拷貝到 conf/目錄下,并將url中的localhost改為node01
?
?
?
把 MySQL 的驅動 copy 到 jars/目錄下
?
把 core-site.xml 和 hdfs-site.xml 拷貝到 conf/目錄下
重啟 spark-shell
?
?
3)運行 Spark beeline(了解)
Spark Thrift Server 是 Spark 社區基于 HiveServer2 實現的一個 Thrift 服務。旨在無縫兼容HiveServer2。因為 Spark Thrift Server 的接口和協議都和 HiveServer2 完全一致,因此我們部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 訪問 Spark Thrift Server 執行相關語句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依舊可以和 Hive Metastore進行交互,獲取到 hive 的元數據。
如果想連接 Thrift Server,需要通過以下幾個步驟:
Spark 要接管 Hive 需要把 hive-site.xml 拷貝到 conf/目錄下
把 Mysql 的驅動 copy 到 jars/目錄下
把 core-site.xml 和 hdfs-site.xml 拷貝到 conf/目錄下
啟動 Thrift Server
使用 beeline 連接 Thrift Server
beeline -u jdbc:hive2://node01:10000 -n root
4)運行Spark-SQL CLI
Spark SQL CLI 可以很方便的在本地運行 Hive 元數據服務以及從命令行執行查詢任務。在 Spark 目錄下執行如下命令啟動 Spark SQL CLI,直接執行 SQL 語句,類似于 Hive 窗口。
操作步驟:
1.將mysql的驅動放入jars/當中;
2.將hive-site.xml文件放入conf/當中;
3.運行bin/目錄下的spark-sql.cmd 或者打開cmd,在D:\spark\spark-3.0.0-bin-hadoop3.2\bin當中直接運行spark-sql
?
可以直接運行SQL語句,如下所示:
5)代碼操作Hive
1.導入依賴。
<dependency>
? ? <groupId>org.apache.spark</groupId>
? ? <artifactId>spark-hive_2.12</artifactId>
? ? <version>3.0.0</version>
</dependency>
<dependency>
? ? <groupId>org.apache.hive</groupId>
? ? <artifactId>hive-exec</artifactId>
? ? <version>2.3.3</version>
</dependency>
?
可能出現下載jar包的問題:
D:\maven\repository\org\pentaho\pentaho-aggdesigner-algorithm\5.1.5-jhyde
?
2. 將hive-site.xml 文件拷貝到項目的 resources 目錄中。
3.代碼實現。
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("hive")
val spark:SparkSession = SparkSession.builder()
? .enableHiveSupport()
? .config(sparkConf)
? .getOrCreate()
?
spark.sql("show databases").show()
spark.sql("create database spark_sql")
spark.sql("show databases").show()
?
?
注意:
1.如果在執行操作時,出現如下錯誤:
?
可以在代碼最前面增加如下代碼解決:
System.setProperty("HADOOP_USER_NAME", "node01")
此處的 node01 改為自己的 hadoop 用戶名稱
2.在開發工具中創建數據庫默認是在本地倉庫,通過參數修改數據庫倉庫的地址: config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse")
?