?Spark SQL 的數據源------通用的數據 加載/保存功能
? ? ? ? ? ?Spark SQL支持通過DataFrame接口在各種數據源上進行操作。DataFrame可以使用關系變換進行操作,也可以用來創建臨時視圖。將DataFrame? ? ? 注冊為臨時視圖允許您對其數據運行SQL查詢。本節介紹使用Spark Data Sources加載和保存數據的一般方法,然后介紹可用于內置數據源的特定選? ? ? ? 項。
?1, 常用的加載和保存功能。
? ? ?最簡單的形式,默認的數據源(parquet除非另有配置 spark.sql.sources.default)將用于所有的操作。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
?2,手動指定選項
? ? ? 您也可以手動指定將要使用的數據源以及您想要傳遞給數據源的其他選項。數據源通過其全名指定(即org.apache.spark.sql.parquet),但內置的來源,你也可以使用自己的短名稱(json,parquet,jdbc,orc,libsvm,csv,text)。從任何數據源類型加載的數據框可以使用此語法轉換為其他類型。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
3,?直接在文件上運行SQL ? ? ? ?您可以使用SQL直接查詢該文件,而不是使用讀取API將文件加載到DataFrame中并進行查詢。
val peopleDFCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("examples/src/main/resources/people.csv")val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
其中people.csv的數據為: name;age;job
Jorge;30;Developer
Bob;32;Developer
4,保存模式 ? ??保存操作可以選擇一個Save Mode,指定如何處理現有的數據(如果存在)。認識到這些保存模式不使用任何鎖定數據而不是原子性的操作數據是很重要的。另外,執行時重寫數據,數據在寫出新數據之前將被刪除。常見類型如下:
Scala/Java | ? ? ?Any Language | Meaning |
SaveMode.ErrorIfExists (default) | ? "error" (default)? ? ? | 如果數據已經存在,將DataFrame保存到數據源時,則預計會拋出異常。 |
SaveMode.Append | "append" | 如果data / table已經存在,將DataFrame保存到數據源時,則DataFrame的內容將被添加到現有數據中。 |
SaveMode.Overwrite | "overwrite" | 覆蓋模式意味著將DataFrame保存到數據源時,如果data / table已經存在,則現有數據將被DataFrame的內容覆蓋。 |
SaveMode.Ignore | "ignore"? ? ? ? ? ? ? ? ?? | 忽略模式意味著,當將DataFrame保存到數據源時,如果數據已經存在,保存操作將不會保存DataFrame的內容,也不會更改現有數據。這與CREATE TABLE IF NOT EXISTSSQL中的類似。 |
? ? ? ? ?DataFrames也可以使用該saveAsTable 命令將其作為持久表保存到Hive Metastore中。請注意,現有的Hive部署對于使用此功能不是必需的。Spark將為您創建一個默認的本地Hive Metastore(使用Derby)。與createOrReplaceTempView命令不同的是, saveAsTable將實現DataFrame的內容并創建指向Hive Metastore中的數據的指針。即使您的Spark程序重新啟動后,永久性表格仍然存在,只要您保持與同一Metastore的連接即可。用于持久表的DataFrame可以通過使用表的名稱調用tablea方法來創建SparkSession。
? ? ? ? 對于基于文件的數據源,例如文本,parquet,json等,您可以通過path選項指定一個自定義表格路徑 ,例如df.write.option("path", "/some/path").saveAsTable("t")。當表被刪除時,自定義表路徑將不會被刪除,表數據仍然存在。如果沒有指定自定義表格路徑,Spark會將數據寫入倉庫目錄下的默認表格路徑。當表被刪除時,默認的表路徑也將被刪除。
? ? ? ?從Spark 2.1開始,持久數據源表具有存儲在Hive Metastore中的每個分區元數據。這帶來了幾個好處:
? ? ? ? ? ?1) 由于Metastore只能返回查詢所需的分區,因此不再需要發現第一個查詢的所有分區。
? ? ? ? ? ?2) Hive DDL如ALTER TABLE PARTITION ... SET LOCATION現在可用于使用Datasource API創建的表。
請注意,創建外部數據源表(具有path選項的那些表)時,默認情況下不會收集分區信息。要同步Metastore中的分區信息,可以調用MSCK REPAIR TABLE。
6,Bucketing(分段), Sorting(排序) and Partitioning(分區)
? ? ?對于基于文件的數據源,也可以對輸出進行分類。分段和排序僅適用于持久表:
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
? ??而分區則可以同時使用save和saveAsTable使用數據集API。 usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
? 同時也可以對單個表使用分區和分區:
peopleDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")
partitionBy
創建一個目錄結構,如“?Partition Discovery?”部分所述。因此,對基數高的柱子的適用性有限。相比之下?bucketBy
,通過固定數量的桶分配數據,并且可以在大量唯一值無界時使用。上述完整的例子代碼如下:
private def runBasicDataSourceExample(spark: SparkSession): Unit = {val usersDF = spark.read.load("examples/src/main/resources/users.parquet")usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")val peopleDFCsv = spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("examples/src/main/resources/people.csv")val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")peopleDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed")spark.sql("DROP TABLE IF EXISTS people_bucketed")spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")}
其中people.json測試數據如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}