以下是使用 Spark 進行數據提取(讀取)和保存(寫入)的常見場景及代碼示例(基于 Scala/Java/Python,不含圖片操作):
?
一、數據提取(讀取)
?
1. 讀取文件數據(文本/CSV/JSON/Parquet 等)
?
Scala
?
scala ??
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
? .appName("Data Read")
? .getOrCreate()
// 讀取 CSV(含表頭)
val csvDf = spark.read.format("csv")
? .option("header", "true")
? .option("inferSchema", "true") // 自動推斷數據類型
? .load("path/to/csv/file.csv")
// 讀取 JSON
val jsonDf = spark.read.json("path/to/json/file.json")
// 讀取 Parquet(Spark 原生格式,高效)
val parquetDf = spark.read.parquet("path/to/parquet/dir")
?
?
Python
?
python ??
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data Read").getOrCreate()
# 讀取 CSV
csv_df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)
# 讀取 JSON
json_df = spark.read.json("path/to/json/file.json")
# 讀取 Parquet
parquet_df = spark.read.parquet("path/to/parquet/dir")
?
?
2. 讀取數據庫數據(如 MySQL/Hive)
?
Scala(以 MySQL 為例)
?
scala ??
val jdbcDf = spark.read.format("jdbc")
? .option("url", "jdbc:mysql://host:port/db?useSSL=false")
? .option("dbtable", "table_name")
? .option("user", "username")
? .option("password", "password")
? .load()
?
?
Python(以 Hive 為例,需啟用 Hive 支持)
?
python ??
# 讀取 Hive 表(需在 SparkSession 中啟用 Hive)
hive_df = spark.sql("SELECT * FROM hive_table")
?
?
二、數據保存(寫入)
?
1. 保存為文件(CSV/JSON/Parquet 等)
?
Scala
?
scala ??
// 保存為 CSV(覆蓋模式,含表頭)
csvDf.write.format("csv")
? .option("header", "true")
? .mode("overwrite") // 模式:overwrite/append/ignore/errorIfExists
? .save("output/csv_result")
// 保存為 Parquet(分區存儲,提升查詢性能)
parquetDf.write.partitionBy("category") // 按字段分區
? .mode("append")
? .parquet("output/parquet_result")
?
?
Python
?
python ??
# 保存為 JSON
json_df.write.json("output/json_result", mode="overwrite")
# 保存為 Parquet(指定壓縮格式)
parquet_df.write.parquet("output/parquet_result", compression="snappy")
?
?
2. 保存到數據庫(如 MySQL/Hive)
?
Scala(以 MySQL 為例)
?
scala ??
jdbcDf.write.format("jdbc")
? .option("url", "jdbc:mysql://host:port/db?useSSL=false")
? .option("dbtable", "target_table")
? .option("user", "username")
? .option("password", "password")
? .mode("append") // 追加模式
? .save()
?
?
Python(以 Hive 為例)
?
python ??
# 保存為 Hive 表(需啟用 Hive 支持)
hive_df.write.saveAsTable("hive_target_table", mode="overwrite")
?
?
三、關鍵參數說明
?
1.?讀取模式(文件)
?
- ?inferSchema?: 是否自動推斷數據類型(適用于 CSV/JSON,需讀取少量數據,影響性能)。
?
- ?header?: CSV 是否包含表頭(?true/false?)。
?
2.?寫入模式(?mode?)
?
- ?overwrite?: 覆蓋已有數據。
?
- ?append?: 追加到現有數據。
?
- ?ignore?: 忽略寫入(不報錯)。
?
- ?errorIfExists?: 存在則報錯(默認)。
?
3.?數據庫連接
?
- 需添加對應數據庫驅動(如 MySQL 的 ?mysql-connector-java?)。
?
- 對于大規模數據,建議使用分區并行寫入(如 ?option("numPartitions", "4")?)。
?
四、典型場景示例
?
場景:從 MySQL 讀取數據,清洗后保存為 Parquet
?
scala ??
// 讀取 MySQL 數據
val mysqlDf = spark.read.jdbc(
? url = "jdbc:mysql://host:port/source_db",
? dbtable = "source_table",
? properties = Map("user" -> "u", "password" -> "p")
)
// 數據清洗(示例:過濾空值)
val cleanedDf = mysqlDf.na.drop("any")
// 保存為 Parquet(按日期分區)
cleanedDf.write.partitionBy("date")
? .parquet("output/cleaned_data")
?
?
通過以上方法,可靈活使用 Spark 完成數據提取和保存任務,支持多種數據源和格式。