通用方式?
SparkSQL提供了通用的數據加載方式,使用spark.read.loa方法,并可通過format指定數據類型(如csv、jdbc、json、orc、parquet、textFile)。
load方法后需傳入數據路徑(針對csv、jdbc、json、orc、parquet、textFile格式)。
option方法用于設置特定格式的參數,如jdbc的url、user、password、dbtable。
特定格式加載?
Parquet?:Spark SQL的默認數據源,無需指定format即可載。
JSON?:Spark SQL能自動推測JSON數據集結構,使用spark.read.json(path)加載。注意,每行應為一個JSON串。
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)
查詢數據:可以通過SQL語句查詢JSON數據。
val resDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19"
CSV?:需指定format為csv,并可通過option設置分隔符、是否推斷schema、是否包含表頭等信息。
MySQL?:通過JDBC從關系型數據庫讀取數據,使用spark.read.format("jdbc").option(...)方式,并傳入數據庫連接信息。
數據保存
通用方式?
使用df.write.save方法保存數據,同樣可通過format指定數據類型。
save方法后需傳入保存路徑(針對csv、orc、parquet、textFile格式)。
option方法用于設置特定格式的參數。
保存操作可使用SaveMode來指明如何處理數據,如覆蓋(overwrite)、追加(append)等,通過mode方法設置。
特定格式保存?
與加載類似,Parquet、JSON、CSV等格式均可通過指定format進行保存。
MySQL等關系型數據庫的寫入也通過JDBC實現,需指定format為jdbc,并傳入數據庫連接信息及表名。
注意事項
在處理JSON數據時,需確保文件格式符合Spark的要求,即每行一個JSON串。
在讀取CSV文件時,可通過設置option來指定分隔符、是否推斷schema等信息,以便正確解析文件內容。
在通過JDBC連接數據庫時,需確保數據庫驅動已正確導入,并正確配置數據庫連接信息。
在保存數據時,需根據實際需求選擇合適的SaveMode,以避免數據覆蓋或丟失。
Spark SQL與Hive的集成
Spark SQL可以編譯時包含Hive支持,從而提供對Hive表訪問、UDF(用戶自定義函數)、Hive查詢語言(HQL)等特性的支持。在使用時,無需事先安裝Hive,但最好在編譯Spark SQL時引入Hive支持。
IDEA通過JDBC對MySQL進行操作:
讀取數據
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
//通用的load方式讀取
spark.read.format("jdbc")
? .option("url","jdbc:mysql://localhost:3306/system")
? .option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver
? .option("user","root")
? .option("password","123456")
? .option("dbtable","user")
? .load().show()
spark.stop()
//通用的load方法的另一種形式
spark.read.format("jdbc")
? .options(
??? Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))
? .load().show()
//通過JDBC
val pros :Properties = new Properties()
pros.setProperty("user","root")
pros.setProperty("password","123456")
val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)
df.show()
?寫入數據
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")
val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),
? Stu("zs", 30)))
val ds:Dataset[Stu] = rdd.toDS()
ds.write.format("jdbc")
? .option("url","jdbc:mysql://localhost:3306/system")
? .option("driver","com.mysql.jdbc.Driver")
? .option("user","root")
? .option("password","123456")
? .option("dbtable","user2")
? .mode(SaveMode.Append)
? .save()
spark.stop()