?Spark- Linux下安裝Spark
前期部署
1.JDK安裝,配置PATH
可以參考之前配置hadoop等配置
2.下載spark-1.6.1-bin-hadoop2.6.tgz,并上傳到服務器解壓
[root@srv01 ~]# tar -xvzf spark-1.6.1-hadoop2.6.tgz /usr/spark-1.6.1-hadoop2.6
3.在?/usr?下創建軟鏈接到目標文件夾
[root@srv01 usr]# ln -s spark-1.6.1-bin-hadoop2.6 spark
4.修改配置文件,目標目錄?/usr/spark/conf/?
[root@srv01 conf]# ls
docker.properties.template log4j.properties.template slaves.template spark-env.sh.template
fairscheduler.xml.template metrics.properties.template spark-defaults.conf.template
這里需要把spark-env.sh.template改名為spark-env.sh
export JAVA_HOME=/usr/jdk
#這個是單機版的配置,不能實現高可用
export SPARK_MASTER_IP=srv01
export SPARK_MASTER_PORT=7077
再配置slaves ,都是我的集群的機器的hostname
srv01
srv02
srv03
5.分發到集群各個機器上,再軟鏈接一下,保持集群一致性,參考step-3
[root@srv01 usr]# scp -r spark-1.6.1-bin-hadoop2.6 srv02:/usr
[root@srv01 usr]# scp -r spark-1.6.1-bin-hadoop2.6 srv03:/usr
6.Spark-sell
配置好,啟動spark-shell,注意記得先關閉防火墻(也可以將spark寫進PATH中)
輸入?sc?,如果顯示下面的,表示安裝正常
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@18811c42
7.測試單詞計算案例
scala> sc.textFile("/root/file.log").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect.toBuffer
res7: scala.collection.mutable.Buffer[(String, Int)] = ArrayBuffer((scala,2), (spark,2), (hive,1), (hadoop,2), (mapreduce,1), (zookeeper,1), (hello,1), (redis,1), (world,1))
?8.啟動Spark集群模式(前提是3臺機器的spark配置一樣,配置文件spark-env.sh和slaves文件保持一致)
進入Spark的sbin目錄下啟動
./start-all.sh
這個腳本文件在sbin目錄
通過Jps查看角色
[root@srv01 conf]# jps
13079 Master
13148 Worker? //這個worker的啟動通過配置文件slaves
13234 Jps
下面是我的slaves的配置文件
srv01
srv02
srv03
slaves配置的決定了在哪幾臺機器上啟動worker
下面的配置文件決定了在哪臺機器上啟動Master
啟動Spark集群(如果有使用hdfs的場景,需要把hadoop的conf目錄下的core-site.xml和dhfs-site.xml拷貝到spark的conf目錄下,才能使用高可用的hdfs url)
?
?
然后在通過網頁查看spark的相關信息:
http://192.168.1.88:8080/
執行第一個Spark程序
指定運行程序的主機名(Master)
?
./spark-submit --class org.apache.spark.examples.SparkPi --master spark://srv01:7077 --executor-memory 1G --total-executor-cores 2 /usr/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar 500
?
?
?
?
IDEA上面編碼使用集群上的spark運行程序
package com.rz.spark.baseimport org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext}object transactionApp {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.OFF)val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("spark://hdp:7077")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))println(rdd1.partitions.length)sc.stop()} }
?