以下是使用Spark連接MySQL數據庫、添加數據和讀取數據的步驟(基于Scala API):
?
1. 準備工作
?
- 添加MySQL驅動依賴
在Spark項目中引入MySQL Connector JAR包(如?mysql-connector-java-8.0.33.jar?),或通過Spark提交命令指定:
bash
spark-submit --jars mysql-connector-java-8.0.33.jar your_app.jar
?
?
- 確保MySQL服務運行
確認數據庫、表存在,且Spark所在節點可訪問MySQL端口(默認3306)。
?
2. 讀取MySQL數據
?
scala
import org.apache.spark.sql.SparkSession
?
val spark = SparkSession.builder()
? .appName("Spark MySQL Example")
? .master("local[*]") // 或集群地址
? .getOrCreate()
?
// 讀取參數配置
val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useSSL=false&useUnicode=true&characterEncoding=utf8"
val table = "users" // 表名
val user = "root"
val password = "your_password"
?
// 讀取數據為DataFrame
val df = spark.read.format("jdbc")
? .option("url", jdbcUrl)
? .option("dbtable", table)
? .option("user", user)
? .option("password", password)
? .load()
?
// 顯示數據
df.show()
?
?
3. 寫入數據到MySQL
?
scala
// 假設已有待寫入的DataFrame(如dfToSave)
val writeMode = "append" // 寫入模式:append(追加)、overwrite(覆蓋)、ignore(忽略重復)、errorIfExists(沖突報錯)
?
dfToSave.write.format("jdbc")
? .option("url", jdbcUrl)
? .option("dbtable", "new_users") // 目標表名
? .option("user", user)
? .option("password", password)
? .option("driver", "com.mysql.cj.jdbc.Driver") // 驅動類(可選,Spark會自動推斷)
? .mode(writeMode)
? .save()
?
?
關鍵參數說明
?
- ?jdbcUrl?:MySQL連接URL,需指定數據庫名和字符編碼(避免中文亂碼)。
?
- ?dbtable?:支持直接寫表名,或子查詢(如?"(SELECT * FROM users WHERE age > 18) AS subquery"?)。
?
- ?writeMode?:控制寫入行為,根據需求選擇模式。
?
注意事項
?
1.?驅動版本匹配:確保MySQL驅動版本與數據庫版本兼容(如MySQL 8.0+對應?mysql-connector-java 8.0+?)。
?
2.?分區并行讀取:若數據量大,可添加?partitionColumn?、?lowerBound?、?upperBound?參數并行讀取:
scala
.option("partitionColumn", "id") // 分區字段(需為數字類型)
.option("lowerBound", "1") // 分區最小值
.option("upperBound", "1000") // 分區最大值
.option("numPartitions", "4") // 分區數(并行度)
?
?
3.?事務支持:MySQL JDBC寫入不保證事務原子性,大規模寫入建議使用批量操作或外部工具(如Sqoop)。
?
通過以上代碼,可實現Spark與MySQL的數據交互。