1. 添加依賴
在項目的 `pom.xml`(Maven)中添加以下依賴:
```xml
<!-- Spark SQL -->
<dependency>
? ? <groupId>org.apache.spark</groupId>
? ? <artifactId>spark-sql_2.12</artifactId>
? ? <version>3.3.0</version>
</dependency>
?
<!-- MySQL Connector -->
<dependency>
? ? <groupId>mysql</groupId>
? ? <artifactId>mysql-connector-java</artifactId>
? ? <version>8.0.33</version>
</dependency>
代碼
import org.apache.spark.sql.{SparkSession, SaveMode}
object SparkMySQLDemo {
? def main(args: Array[String]): Unit = {
? ? // 創建 SparkSession
? ? val spark = SparkSession.builder()
? ? ? .appName("SparkMySQLDemo")
? ? ? .master("local[*]") // 生產環境需改為集群模式,如 yarn
? ? ? .config("spark.sql.shuffle.partitions", "5") // 優化分區數
? ? ? .getOrCreate()
? ? // 設置 MySQL 連接參數
? ? val jdbcUrl = "jdbc:mysql://localhost:3306/your_database"
? ? val jdbcUsername = "your_username"
? ? val jdbcPassword = "your_password"
? ? try {
? ? ? // 從 MySQL 讀取數據
? ? ? val df = spark.read
? ? ? ? .format("jdbc")
? ? ? ? .option("url", jdbcUrl)
? ? ? ? .option("dbtable", "source_table") // 要讀取的表名
? ? ? ? .option("user", jdbcUsername)
? ? ? ? .option("password", jdbcPassword)
? ? ? ? .load()
? ? ? // 執行計算(示例:按 category 分組求和)
? ? ? val resultDF = df.groupBy("category")
? ? ? ? .agg(
? ? ? ? ? sum("amount").alias("total_amount"),
? ? ? ? ? count("*").alias("record_count")
? ? ? ? )
? ? ? // 打印計算結果(調試用)
? ? ? resultDF.show()
? ? ? // 將結果寫入 MySQL
? ? ? resultDF.write
? ? ? ? .format("jdbc")
? ? ? ? .option("url", jdbcUrl)
? ? ? ? .option("dbtable", "result_table") // 目標表名
? ? ? ? .option("user", jdbcUsername)
? ? ? ? .option("password", jdbcPassword)
? ? ? ? .mode(SaveMode.Append) // 寫入模式:覆蓋/追加
? ? ? ? .save()
? ? ? println("數據寫入 MySQL 成功!")
? ? } catch {
? ? ? case e: Exception => e.printStackTrace()
? ? } finally {
? ? ? spark.stop()
? ? }
? }
}