作者:IvanCodes
日期:2025年7月20日
專欄:Spark教程
本教程將從零開始,一步步指導您如何在 IntelliJ IDEA 中搭建一個基于 Maven 和 Scala 的 Spark 開發環境,并最終完成經典的 WordCount 案例。
一、創建 Maven 項目并配置 Scala 環境
1.1 新建 Maven 項目
首先,我們需要在 IDEA 中創建一個基礎的 Maven 項目。
- 打開 IntelliJ IDEA,點擊新建項目。
- 在彈出的窗口中,按照下圖的數字順序進行配置:
- 選擇新建項目。
- 左側列表選擇 Java。
- 構建系統選擇 Maven。
- 為項目選擇一個 JDK,推薦JDK 11 (這里演示用 Oracle OpenJDK 22)。
- 點擊創建。
1.2 為項目添加 Scala 框架支持
默認創建的是一個純 Java 的 Maven 項目,我們需要為它添加 Scala 支持。
- 在項目結構視圖中,右鍵點擊項目根目錄 (例如
SparkBasis
,后面的SparkCore,SparkRDD等等其他的也同理)。 - 在彈出的菜單中選擇打開模塊設置 或按快捷鍵
F4
。
- 在項目結構 窗口中:
- 確保左側選擇了模塊。
- 中間面板會顯示當前的項目模塊 (例如
SparkBasis
)。 - 點擊上方的 “+” 號按鈕來添加框架。
- 在彈出的菜單中,選擇 Scala。
1.3 配置 Scala SDK
添加 Scala 框架后,IDEA 會提示你配置 Scala SDK。
- 在彈出的“添加 Scala 支持”窗口中,按照下圖的數字順序操作:
- 如果“使用庫”下拉框中沒有可用的 Scala SDK,點擊創建 按鈕。
- 在下載 Scala SDK的窗口中,選擇一個版本。非常重要:這個版本必須與你稍后要在
pom.xml
中配置的 Spark 依賴版本相匹配。例如,Spark 3.x 版本通常對應 Scala 2.12 或 2.13。這里我們選擇 2.13.16。 - 點擊下載,IDEA 會自動下載并配置。
- 下載完成后,在選擇 Scala SDK的窗口中確認版本。
- 點擊確定。
- 返回到“添加 Scala 支持”窗口,再次點擊確定。
- 最后,在項目結構窗口點擊應用。
- 點擊確定 關閉窗口。
- 配置完成后,你的項目結構會發生變化,IDEA 會自動識別 Scala 源代碼。你可以在
src/main
目錄下新建一個scala
目錄 (如果不存在),并將其標記為源代碼根目錄。可以創建一個簡單的Hello.scala
對象來測試環境是否配置成功。
二、配置 Maven 依賴與日志系統
為了使用 Spark 并擁有一個干凈的運行環境,我們需要做兩件事:1) 在 pom.xml
文件中添加 Spark 的相關庫作為項目依賴;2) 配置日志系統,避免 Spark 運行時輸出過多的調試信息。
2.1 配置 Maven 依賴 (pom.xml)
pom.xml
文件是 Maven 項目的核心配置文件,它告訴 Maven 我們的項目需要哪些外部庫 (JARs)。
- 打開項目根目錄下的
pom.xml
文件。 - 在
<dependencies>
標簽內,添加spark-core
和spark-sql
的依賴。同時,為了避免 Spark 啟動時出現關于SLF4J
的警告,我們需要顯式添加一個日志實現庫,如slf4j-log4j12
。 - 為了加速依賴下載,可以配置一個國內的 Maven 鏡像倉庫,如阿里云。
以下是完整的 pom.xml
核心配置代碼:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.example</groupId><artifactId>SparkBasis</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>SparkCore</artifactId><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.32</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.13</artifactId><version>3.5.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.13</artifactId><version>3.5.1</version></dependency></dependencies><repositories><repository><id>aliyunmaven</id><name>Alibaba Cloud Maven</name><url>https://maven.aliyun.com/repository/public</url></repository></repositories></project>
重要提示:
artifactId
中的_2.13
必須與您在第一步中配置的 Scala SDK 主版本完全一致。- Spark 的版本 (
3.5.1
) 最好選擇一個穩定且常用的版本。
編輯完 pom.xml
后,IDEA 通常會自動提示或在右上角顯示一個Maven刷新圖標,點擊它讓 Maven 重新加載項目并下載新添加的依賴。
2.2 配置日志屬性 (log4j.properties)
當您首次運行 Spark 程序時,會發現控制臺被大量的INFO
級別日志刷屏,這些是 Spark 內部組件的運行日志,它們會淹沒我們自己程序的輸出結果,給調試帶來困擾。
為了讓輸出更清爽,只顯示警告 (WARN) 和錯誤 (ERROR) 級別的日志,我們可以通過添加一個 log4j.properties
文件來控制日志級別。
- 在項目的
src/main
目錄下,右鍵點擊 -> 新建 -> 目錄,創建一個名為resources
的目錄。 - 在
src/main/resources
目錄下,右鍵點擊 -> 新建 -> 文件,創建一個名為log4j.properties
的文件。
- 將以下內容復制到
log4j.properties
文件中:
# 將根日志級別設置為ERROR,這樣所有INFO和WARN信息都會被隱藏
log4j.rootCategory=ERROR, console# --- 配置控制臺輸出的格式 ---
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# --- 為Spark-shell單獨設置日志級別(可選) ---
# 運行spark-shell時,此級別會覆蓋根日志級別,以便為shell和常規應用設置不同級別
log4j.logger.org.apache.spark.repl.Main=WARN# --- 將一些特別“吵”的第三方組件的日志級別單獨調高 ---
log4j.logger.org.sparkproject.jetty=ERROR
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.spark.parquet=ERROR
log4j.logger.parquet=ERROR
配置好這兩個文件后,您的 Spark 項目就具備了必要的依賴庫和一個清爽的日志環境,可以準備進行下一步的開發了。
三、Windows 環境配置 (解決 winutils.exe
問題)
在 Windows 系統上直接運行 Spark 代碼,通常會因為缺少 Hadoop 的本地庫而報錯 (例如 NullPointerException
)。我們需要手動配置 winutils.exe
和 hadoop.dll
。
3.1 下載 winutils.exe 和 hadoop.dll
- 訪問
winutils
的 GitHub 倉庫:https://github.com/cdarlint/winutils/ - 根據你使用的Hadoop版本選擇對應的目錄。重要提示:Spark 3.5.1 通常與 Hadoop 3.3.x 版本兼容。因此我們進入
hadoop-3.3.5/bin
目錄。- GitHub倉庫鏈接:https://github.com/cdarlint/winutils/tree/master/hadoop-3.3.5/bin
- 在該目錄中,分別找到
hadoop.dll
和winutils.exe
文件,并點擊下載按鈕將它們保存到本地。
3.2 創建目錄并放置文件
- 在你的電腦上創建一個不含中文和空格的路徑作為 Hadoop 的主目錄,例如
C:\hadoop
。 - 在該目錄下再創建一個
bin
子目錄,即C:\hadoop\bin
。 - 將剛剛下載的
winutils.exe
和hadoop.dll
兩個文件復制到C:\hadoop\bin
文件夾中。
3.3 配置環境變量
為了讓系統和 Spark 能找到這些文件,需要配置兩個環境變量:HADOOP_HOME
和 Path
。
- 以管理員身份打開
PowerShell
。 - 執行以下兩條命令來設置系統級別的環境變量:
設置 HADOOP_HOME:
[System.Environment]::SetEnvironmentVariable('HADOOP_HOME', 'C:\hadoop', 'Machine')
將 HADOOP_HOME\bin 添加到 Path:
[System.Environment]::SetEnvironmentVariable('Path', ([System.Environment]::GetEnvironmentVariable('Path', 'Machine') + ';C:\hadoop\bin'), 'Machine')
- 配置完成后,重啟 IntelliJ IDEA (甚至重啟電腦) 以確保環境變量生效。
四、WordCount 案例實戰
環境全部準備就緒后,我們來編寫 WordCount 程序。
4.1 方法一:純 Scala 實現
這種方法不使用 Spark,僅用 Scala 自身的集合操作來處理本地文件,用于對比和理解基本邏輯。
代碼 (WordCount01.scala
):
package Spark.Core.WordCountimport scala.io.Sourceobject WordCount01 {def main(args: Array[String]): Unit = {// 1、文件路徑val filePaths = Seq("E:\\BigData\\Spark\\SparkPractice\\SparkBasis\\Datas\\WordCount\\1.txt","E:\\BigData\\Spark\\SparkPractice\\SparkBasis\\Datas\\WordCount\\2.txt")// 讀取所有文件內容val words = filePaths.flatMap(path => Source.fromFile(path).getLines()).flatMap(_.split("\\s+"))// 將單詞轉換成鍵值對形式val wordcounts = words.groupBy(word => word).map(kv => (kv._1, kv._2.size))wordcounts.foreach(println)}
}
4.2 方法二:Spark RDD 實現 (使用 reduceByKey )
這是最經典、最高效的 Spark WordCount 實現方式。
代碼 (WordCount02.scala
):
package Spark.Core.WordCountimport org.apache.spark.{SparkConf, SparkContext}object WordCount02 {def main(args: Array[String]): Unit = {// 1、創建 Spark 運行上下文val conf = new SparkConf().setAppName("WordCount_Reduce").setMaster("local[*]")val sc = new SparkContext(conf)// 2、讀取 textFile 獲取文件// 讀取單個或多個文件val linesRdd = sc.textFile("E:\\BigData\\Spark\\SparkPractice\\SparkBasis\\Datas\\WordCount\\*")// 3、扁平化操作val wordsRdd = linesRdd.flatMap(line => line.split("\\s+"))// 4、結構轉換(單詞, 1)val pairRdd = wordsRdd.map(word => (word, 1))// 5、利用 reduceByKey 完成聚合val wordCountsRdd = pairRdd.reduceByKey((x, y) => x + y)wordCountsRdd.collect().foreach(println)sc.stop()}
}
4.3 方法三:Spark RDD 實現 (使用 groupByKey )
這種方法也能實現 WordCount,但通常性能不如 reduceByKey
,因為它會導致大量的數據在網絡中Shuffle。
代碼 (WordCount03.scala
):
package Spark.Core.WordCountimport org.apache.spark.{SparkConf, SparkContext}object WordCount03 {def main(args: Array[String]): Unit = {// 1、創建 Spark 運行上下文val conf = new SparkConf().setAppName("WordCount_PatternMatching").setMaster("local[*]")val sc = new SparkContext(conf)// 2、讀取 textFile 獲取文件// 讀取單個或多個文件val linesRdd = sc.textFile("E:\\BigData\\Spark\\SparkPractice\\SparkBasis\\Datas\\WordCount\\*")// 3、扁平化操作val wordsRdd = linesRdd.flatMap(line => line.split("\\s+"))// 4、結構轉換val pairRdd = wordsRdd.map(word => (word, 1))// 5、利用 groupByKey 對 key 進行分組,再對 value 值進行聚合val groupedRdd = pairRdd.groupByKey()// 6、(自己選擇) 利用 map 將每個元素處理成最終結果val wordCountsRdd = groupedRdd.map {case (word, ones) => (word, ones.sum)// case (word, ones) => (word, ones.size) // 對于 (word, 1) 的情況, .size 和 .sum 結果一樣}wordCountsRdd.collect().foreach(println)sc.stop()}
}
總結
至此,您已經成功完成了在 IntelliJ IDEA 中搭建 Spark 開發環境的全過程,包括項目創建、Scala配置、Maven依賴管理,以及解決 Windows 環境下的關鍵問題,并通過三種不同的方式實現了 WordCount 案例。現在,您可以在這個強大的環境中開始您的Spark開發之旅了!