一、執行第一個Spark程序
1、執行程序
我們執行一下Spark自帶的一個例子,利用蒙特·卡羅算法求PI:
啟動Spark集群后,可以在集群的任何一臺機器上執行一下命令:
/home/spark/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/home/spark/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar \
100
在執行過程中bash上的信息:
執行完成bash上的信息:
執行過程中WebUI上的信息:
執行完以后WebUI上的信息:
2、命令解析
/home/spark/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
①spark-submit:提交任務,它是一個Driver,至于它的實現細節后續會有介紹
--class org.apache.spark.examples.SparkPi \
②—class 指定任務的類名(使用反射調用該類的main方法)
--master spark://master:7077 \
③—master 指定集群Master的地址
--executor-memory 1G \
④—executor 指定為每個executor分配的內存大小
--total-executor-cores 2 \
⑤—total-executor 指定分配給所有executor總的處理器核數
(這里先說一下executor是Worker啟動的子進程,executor負責執行任務,其細節以后會介紹的。)
/home/spark/spark-1.6.1-bin-hadoop2.6/lib/spark-examples-1.6.1-hadoop2.6.0.jar \
⑥指定任務的jar包地址
100
⑦任務的類的main方法的參數
二、使用 spark-shell
spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶可以在該命令行下用scala編寫spark程序,spark-shell也是一個Driver。
1、啟動 spark-shell
/home/spark/spark-1.6.1-bin-hadoop2.6/bin/spark-shell \
--master spark://master:7077 \
--executor-memory 1g \
--total-executor-cores 2
這個命令的解析其實和上面的spark-submit是一樣的,我在這再啰嗦一遍:
參數說明:
--master spark://master:7077 指定Master的地址
--executor-memory 1g 指定每個executor可用內存為1G
--total-executor-cores 2 指定所有executor總的處理器核數為2
還有一點需要非常注意:
如果啟動 spark shell 時沒有指定master地址,但是也可以正常啟動 spark shell 和執行 spark shell 中的程序,其實是啟動了spark的local模式,該模式僅在本機啟動一個進程,沒有與集群建立聯系。
還要說明一點:
spark shell 中已經默認將SparkContext類初始化為對象sc。用戶代碼如果需要用到,則直接應用 sc 即可。SparkContext是Spark集群的入口,Driver只有初始化了SparkContext才可以向Spark集群提交任務,所以這個SparkContext和重要,以后我們會詳細介紹整個SparkContext的初始化流程的,現在可以先記住SparkContext是集群的入口,就像Spring中的ApplicationContext一樣。
2、在spark shell中編寫WordCount程序
(1)首先啟動hdfs
(2)向hdfs上傳文件words.txt 到 hdfs://hadoop01:9000/spark/words.txt
words.txt的內容:
hello tom
hello jim
hello tom and kitty
(3)在spark shell 中用scala語言編寫spark程序:
sc.textFile("hdfs://hadoop01:9000/spark/words.txt").flatMap(_.split(" "))
.map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hadoop01:9000/spark/out")
(4)使用hdfs命令查看結果:
hdfs dfs -cat hdfs://hadoop01:9000/spark/out/part*
?
(jim,1)
(tom,2)
(hello,3)
(and,1)
(kitty,1)
(5)程序簡單說明:
sc.textFile("hdfs://hadoop01:9000/spark/words.txt") 從hdfs中讀取數據
flatMap(_.split(" ")) 先map再壓平
map((_,1)) 將單詞和1構成元組
reduceByKey(_+_) 按照key進行reduce,并將value累加
saveAsTextFile("hdfs://hadoop01:9000/spark/out") 將結果寫入到hdfs中
最后:
也可以使用 IDEA 編寫完一個程序后打包,使用spark-submit方式提交到集群,在這里我就不寫了。一定要注意spark-submit的配置命令不要出錯,還要注意自己的程序需要的參數的正確,不要忘了起hdfs。