Scala Version
1) Create the Project
Add the Scala Plugin
Spark is developed in Scala. The Spark version used here is 3.2.0, which is compiled against Scala 2.13 by default, so we stick with that Scala version for the rest of the development. Before starting, make sure the Scala development plugin is installed in IDEA.
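The Scala binary version has to match the one your Spark artifacts were built against, otherwise jobs fail at runtime with binary-incompatibility errors. Once the project compiles, you can verify the versions actually on the classpath with a minimal sketch (the object name `VersionCheck` is illustrative, not part of the project):

```scala
package com.clear.spark

// Minimal sketch: print the Scala and Spark versions on the classpath.
object VersionCheck {
  def main(args: Array[String]): Unit = {
    println(s"Scala: ${scala.util.Properties.versionNumberString}") // e.g. 2.13.5
    println(s"Spark: ${org.apache.spark.SPARK_VERSION}")            // e.g. 3.2.0
  }
}
```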
Create the Maven Project
Create a Maven Project with the following GAV:
| GroupId | ArtifactId | Version |
| --- | --- | --- |
| com.clear.spark | bigdata-spark_2.13 | 1.0 |
Create a Maven Module with the following GAV:
| GroupId | ArtifactId | Version |
| --- | --- | --- |
| com.clear.spark | spark-core | 1.0 |
2) POM
Parent project POM (`bigdata-spark_2.13`):

```xml
<repositories>
    <!-- Repository locations, in order: aliyun, cloudera, jboss -->
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>https://repository.jboss.com/nexus/content/groups/public/</url>
    </repository>
</repositories>

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.13.5</scala.version>
    <scala.binary.version>2.13</scala.binary.version>
    <spark.version>3.2.0</spark.version>
    <hadoop.version>3.1.3</hadoop.version>
</properties>

<dependencies>
    <!-- Scala language -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <plugins>
        <!-- Maven compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.10.1</version>
            <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Compiles Scala sources into class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <!-- Bound to Maven's compile phase -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
Module POM (`spark-core`):

```xml
<dependencies>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.2.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Compiles Scala sources into class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <!-- Bound to Maven's compile phase -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
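The module POM above shows only the dependency and build sections; in a multi-module layout it would normally also declare the parent so the module can inherit its repositories and properties. A minimal sketch, assuming the GAV coordinates from the tables above:

```xml
<parent>
    <groupId>com.clear.spark</groupId>
    <artifactId>bigdata-spark_2.13</artifactId>
    <version>1.0</version>
</parent>
<artifactId>spark-core</artifactId>
```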
Configuration Files
Place the following three files under src/main/resources; they can be copied from the server:
- core-site.xml
- hdfs-site.xml
- log4j.properties
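If there is no cluster copy of log4j.properties at hand, a minimal sketch modeled on Spark's conf/log4j.properties.template also works for local runs (Spark 3.2 still ships log4j 1.x); WARN keeps the console output readable:

```properties
# Minimal log4j 1.x configuration for local Spark runs (assumed, not copied from a server)
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```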
3) Write the Code
```scala
package com.clear.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Word count (WordCount) with Spark Core in Scala:
 * read a file from HDFS, count the words, and save the result back to HDFS.
 */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    // TODO Create the SparkContext; it takes a SparkConf carrying the application settings
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // TODO Read the data and wrap it in an RDD
    val inputRDD = sc.textFile("/opt/data/wc/README.md")

    // Process the data with RDD operators
    val resultRDD = inputRDD
      .flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey((tmp, item) => tmp + item)

    // Save the data: print the result and write the final RDD to external storage
    resultRDD.foreach(tuple => println(tuple))
    resultRDD.saveAsTextFile(s"/opt/data/wc-${System.nanoTime()}")

    // The application is done; release resources
    sc.stop()
  }
}
```
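reduceByKey makes no ordering guarantees, so the (word, count) pairs are written out unordered. To list the most frequent words first, you could sort before saving; a small sketch reusing resultRDD from above:

```scala
// Sort by count, descending, and print the top 10 (illustrative addition, not in the job above)
resultRDD
  .sortBy(tuple => tuple._2, ascending = false)
  .take(10)
  .foreach(println)
```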
4) Testing
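Package the module first. Because the assembly plugin is bound to the package phase, mvn package should also produce a jar-with-dependencies; the exact jar names depend on the artifactId and any finalName you set (a sketch, assuming the module above):

```bash
mvn clean package
ls target/*.jar   # e.g. spark-core-1.0.jar and spark-core-1.0-jar-with-dependencies.jar
```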
Then submit the job (the main class is com.clear.spark.SparkWordCount, matching the package and object defined above):

```bash
[nhk@kk01 wordcount]$ $SPARK_HOME/bin/spark-submit --class com.clear.spark.SparkWordCount /opt/data/wordcount/spark-core-scala-1.0.jar
```
Java Version
1) POM
```xml
<dependencies>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.4</version>
            <configuration>
                <archive>
                    <manifest>
                        <!-- mainClass holds the program entry point -->
                        <mainClass>com.clear.wordcount.JavaSparkWordCount</mainClass>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                    </manifest>
                </archive>
                <classesDirectory></classesDirectory>
            </configuration>
        </plugin>
        <!-- Copy dependencies into the build directory -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <id>copy-dependencies</id>
                    <phase>package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
2) Code
```java
package com.clear.wordcount;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaSparkWordCount {
    public static void main(String[] args) {
        // Create a SparkConf object to configure the application
        SparkConf conf = new SparkConf()
                .setAppName("JavaSparkWordCount")
                .setMaster("local");
        // Create a JavaSparkContext from the SparkConf
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Load the file contents
        JavaRDD<String> lines = jsc.textFile("file:///opt/data/wordcount/README.md");
        // Split each line into a words RDD
        JavaRDD<String> words = lines.flatMap(line ->
                Arrays.asList(line.split(" ")).iterator());
        // Count the occurrences of each word
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((x, y) -> x + y);

        // Write out the result
        counts.saveAsTextFile("file:///opt/data/wordcount/wc");
        // Shut down the JavaSparkContext
        jsc.stop();
    }
}
```
3) Testing
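As with the Scala module, build first. With the jar and dependency plugins above, mvn package writes the application jar to target/ and copies its dependencies into target/lib/, matching the lib/ classpath prefix in the manifest (a sketch; the jar name is assumed from the submit command below):

```bash
mvn clean package
ls target/        # the application jar plus a lib/ directory with the copied dependencies
```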
Run:
```bash
[nhk@kk01 wordcount]$ $SPARK_HOME/bin/spark-submit --class com.clear.wordcount.JavaSparkWordCount /opt/data/wordcount/spark-core-demo-1.0.jar
```
Check the result:
```bash
[nhk@kk01 wc]$ pwd
/opt/data/wordcount/wc
[nhk@kk01 wc]$ ll
total 8
-rw-r--r--. 1 nhk nhk 4591 Jul 30 17:48 part-00000
-rw-r--r--. 1 nhk nhk    0 Jul 30 17:49 _SUCCESS
[nhk@kk01 wc]$ head part-00000
(package,1)
(For,3)
(Programs,1)
(processing.,2)
(Because,1)
(The,1)
(cluster.,1)
(its,1)
([run,1)
(APIs,1)
```