數據加載與保存
通用方式:
????????SparkSQL 提供了通用的保存數據和數據加載的方式。這里的通用指的是使用相同的API,根據不同的參數讀取和保存不同格式的數據,SparkSQL 默認讀取和保存的文件格式為parquet
?
數據加載方法:
????????spark.read.load 是加載數據的通用方法。如果讀取不同格式的數據,可以對不同的數據格式進行設定。
????????spark.read.format("…")[.option("…")].load("…")
三種加載數據的方法:
????????使用?option
?參數加載數據,在"jdbc"格式下需要傳入 JDBC 相應參數,url、user、password 和 dbtable
????????????????????????(適用于需要傳入數據庫連接信息的情況。)
????????使用?load方法加載數據,在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要傳入加載數據的路徑。
????????????????????????(適用于指定數據路徑和類型的情況。)
?????????使用format?加載數據,指定加載的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和
"textFile"。
前面都是使用 read API 先把文件加載到 DataFrame 然后再查詢,其實,我們也可以直接在文件上進行查詢: 文件格式.'文件路徑'
spark.sql("select * from json.’ Spark-SQL/input/user.json’").show
?
數據保存方法:
主要介紹了兩種保存數據的方法,一種是df write.save的通用方法,另一種是通過指定format、option和save(需要指定數據格式和保存路徑的情況)路徑來保存。
format("…"):指定保存的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。
save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要傳入保存數據的路徑。
option("…"):在"jdbc"格式下需要傳入 JDBC 相應參數,url、user、password 和 dbta
????????save mode的不同選項,如append、error、overwrite和ignore,以及它們在文件已存在時的處理方式。
?
Parquet
????????Spark SQL 的默認數據源為 Parquet 格式。Parquet 是一種能夠有效存儲嵌套數據的列式
存儲格式。
????????加載數據:
????????????????val df = spark.read.load("examples/src/main/resources/users.parquet")
????????保存數據:
????????????????var df = spark.read.json("/opt/module/data/input/people.json")
????????????????df.write.mode("append").save("/opt/module/data/output")
?
?
?
?
數據格式與數據源
默認數據源介紹了 Spark 的默認數據源,能夠存儲嵌套數據,簡化了數據操作。強調了默認數據源的便利性,通常不需要修改配置。
JSON
????????JSON數據處理:
????????spark SQL自動檢測JSON數據集的結構,并將其加載為dataset。
????????可以通過 SparkSession.read.json()去加載 JSON 文件。
????????強調了spark中讀取的JSON文件每一行應為一個json串。
加載json文件
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)
創建臨時表
peopleDF.createOrReplaceTempView("people")
數據查詢
val resDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
?
CSV 數據
?CSV 文件的讀取方法,通常用于簡單的數據導入。
Spark SQL 可以配置 CSV 文件的列表信息,讀取 CSV 文件,CSV 文件的第一行設置為
數據列。
spark.read.format("csv").option("sep",";").option("inferSchema","true")
.option("header", "true").load("data/user.csv")
?
MySQL 數據操作
連接與加載
通過 JDBC 連接 MySQL 數據庫并加載數據的方法。
強調:驅動版本與 MySQL 版本匹配的重要性。
介紹了三種加載數據的方式:使用?option
?參數逐個設置連接信息。使用?options
?參數在 URL 中融合連接信息。使用?spark.read.jdbc
?方法直接傳入 JDBC 參數。
寫入數據
通過 JDBC 將數據寫入 MySQL 數據庫的方法。
舉例說明了如何創建 RDD 并將其轉換為 DataFrame 進行寫入操作。
強調了?save mode
?在寫入操作中的應用。
1) 導入依賴
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
</dependency>
MySQL8 <version>8.0.11</version>2) 讀取數據
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._
//通用的load方式讀取spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/system").option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver.option("user","root").option("password","123456").option("dbtable","user").load().show()spark.stop()//通用的load方法的另一種形式
spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show()//通過JDBC
val pros :Properties = new Properties()
pros.setProperty("user","root")
pros.setProperty("password","123456")
val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)
df.show()3) 寫入數據
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._
val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),Stu("zs", 30)))
val ds:Dataset[Stu] = rdd.toDS()ds.write.format("jdbc").option("url","jdbc:mysql://localhost:3306/system").option("driver","com.mysql.jdbc.Driver").option("user","root").option("password","123456").option("dbtable","user2").mode(SaveMode.Append).save()spark.stop()
?
Spark-SQL連接Hive
連接方式:內嵌Hive、外部Hive、Spark-SQL CLI、Spark beeline 以及代碼操作。
????????內嵌HIVE:在生產環境中幾乎不使用內嵌Hive模式。
????????外部HIVE:需要與虛擬機中的Hive相連,需下載并配置PS ML、CORE杠set SML、HDFS等文件,并修改配置文件以指向虛擬機的Have。
在虛擬機中下載以下配置文件
如果想在spark-shell中連接外部已經部署好的 Hive,需要通過以下幾個步驟:
Spark 要接管 Hive 需要把 hive-site.xml 拷貝到 conf/目錄下,并將url中的localhost改為node01
?
?
驅動放置:MySQL驅動?copy 需要放到 jars/目錄下
把 core-site.xml 和 hdfs-site.xml 拷貝到 conf/目錄下
重啟 spark-shell
?
運行Spark-SQL CLI
Spark SQL CLI 可以很方便的在本地運行 Hive 元數據服務以及從命令行執行查詢任務。在 Spark 目錄下執行如下命令啟動 Spark SQL CLI,直接執行 SQL 語句,類似于 Hive 窗口。
操作步驟:
- 將mysql的驅動放入jars/當中;
- 將hive-site.xml文件放入conf/當中;
- 運行bin/目錄下的spark-sql.cmd 或者打開cmd,在
D:\spark\spark-3.0.0-bin-hadoop3.2\bin當中直接運行spark-sql
可以直接運行SQL語句,如下所示:
?
運行Spark-SQL CLI的使用:
????????通過spark-sql.?cmd運行,可以直接輸入MySQL語句,不需要SQL括號和雙引號。
????????驅動和配置文件的放置位置與外部Hive相同。
?
導入依賴:需要導入與Spark版本一致的依賴包(如3.0.0版本),并與Hive版本保持一致。
虛擬機運行:強調所有操作需要在虛擬機運行的情況下進行,除非使用IDEA。
代碼實現:導入必要的包。創建配置對象和SQL對象,輸入SQL語句以展示數據庫和數據表。
?