1. Setting up a Spark development environment for Java (from http://www.cnblogs.com/eczhou/p/5216918.html)
1.1 JDK installation
Install the Oracle JDK. I installed JDK 1.7. After installation, create a new system environment variable JAVA_HOME with the value "C:\Program Files\Java\jdk1.7.0_79" (adjust to your own installation path).
Also append C:\Program Files\Java\jdk1.7.0_79\bin and C:\Program Files\Java\jre7\bin to the system Path variable.
1.2 Spark environment variables
Download the build matching your Hadoop version from http://spark.apache.org/downloads.html. I downloaded spark-1.6.0-bin-hadoop2.6.tgz, i.e. Spark 1.6 built against Hadoop 2.6.
Unpack the archive; assume the extraction directory is D:\spark-1.6.0-bin-hadoop2.6. Add D:\spark-1.6.0-bin-hadoop2.6\bin to the system Path variable, and create a new SPARK_HOME variable with the value D:\spark-1.6.0-bin-hadoop2.6.
1.3 Hadoop binaries
Spark runs on top of Hadoop and calls into the Hadoop libraries at runtime. Without a Hadoop runtime configured you will see error messages (they do not actually prevent the program from running), but it is still worth setting the Hadoop libraries up properly here.
1.3.1 Download a pre-built Hadoop 2.6 package from https://www.barik.net/archive/2015/01/19/172716/. I downloaded hadoop-2.6.0.tar.gz.
1.3.2 Unpack the archive and add the binaries directory to the system Path variable: D:\hadoop-2.6.0\bin. Also create a new HADOOP_HOME variable with the value D:\hadoop-2.6.0.
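As a quick sanity check that the three variables from sections 1.1 through 1.3 are visible to the JVM, a small sketch like the one below can be run from any IDE or command line. The helper also runs against a hand-built sample map (with HADOOP_HOME deliberately omitted) so the detection logic itself can be verified; the class and method names here are my own, not part of any Spark tooling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EnvCheck {

    // Collect the required variable names that are absent (or empty) in the given map
    static List<String> missingVars(Map<String, String> env, List<String> required) {
        List<String> missing = new ArrayList<String>();
        for (String name : required) {
            String value = env.get(name);
            if (value == null || value.isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> required = Arrays.asList("JAVA_HOME", "SPARK_HOME", "HADOOP_HOME");

        // Check the real environment of the current JVM
        System.out.println("Missing from this JVM's environment: "
                + missingVars(System.getenv(), required));

        // Deterministic check against a sample map (HADOOP_HOME deliberately absent)
        Map<String, String> sample = new HashMap<String, String>();
        sample.put("JAVA_HOME", "C:\\Program Files\\Java\\jdk1.7.0_79");
        sample.put("SPARK_HOME", "D:\\spark-1.6.0-bin-hadoop2.6");
        System.out.println("Sample map missing: " + missingVars(sample, required));
    }
}
```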
1.4 Eclipse setup
Create a plain Java project and add D:\spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar to the project's build path; that is all that is needed.
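To confirm the assembly jar is actually on the project's classpath before writing any Spark code, a one-off check like this can help (the class name SparkClasspathCheck is my own; JavaSparkContext is the Spark class the WordCount program below depends on):

```java
public class SparkClasspathCheck {
    public static void main(String[] args) {
        // JavaSparkContext ships inside spark-assembly-1.6.0-hadoop2.6.0.jar;
        // if the jar was added to the build path, this lookup succeeds
        boolean found;
        try {
            Class.forName("org.apache.spark.api.java.JavaSparkContext");
            found = true;
        } catch (ClassNotFoundException e) {
            found = false;
        }
        System.out.println("Spark classes on classpath: " + found);
    }
}
```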
2. A Spark WordCount program in Java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {

        // Create the SparkConf object and apply the necessary configuration
        SparkConf conf = new SparkConf()
                .setAppName("WordCount").setMaster("local");

        // Create the context object from the conf
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create the initial RDD
        JavaRDD<String> lines = sc.textFile("D://spark.txt");

        // ---- Apply transformation operators to the RDD ----

        // Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // ---- Trigger the job with an action operator ----
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times");
            }
        });

        // Release the context's resources when the job is done
        sc.close();
    }
}
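The transformation chain above (flatMap to split lines, mapToPair to build (word, 1) pairs, reduceByKey to sum) can be sanity-checked without Spark at all: the same logic collapses into a single HashMap pass over an in-memory list. The sample lines below are made up as a stand-in for the contents of D://spark.txt.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalWordCount {
    public static void main(String[] args) {
        // Stand-in for the lines of D://spark.txt (hypothetical sample data)
        List<String> lines = Arrays.asList("hello spark", "hello java");

        // flatMap + mapToPair + reduceByKey, collapsed into one HashMap pass
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                Integer current = counts.get(word);
                counts.put(word, current == null ? 1 : current + 1);
            }
        }

        // Same report format as the Spark foreach action above
        System.out.println("hello appeared " + counts.get("hello") + " times");
        System.out.println("spark appeared " + counts.get("spark") + " times");
        System.out.println("java appeared " + counts.get("java") + " times");
    }
}
```

Running the Spark version on a file with these two lines should report the same counts, modulo output ordering, since foreach runs on partitions in no guaranteed order.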