目錄
Part.01 關于HDP
Part.02 核心組件原理
Part.03 資源規劃
Part.04 基礎環境配置
Part.05 Yum源配置
Part.06 安裝OracleJDK
Part.07 安裝MySQL
Part.08 部署Ambari集群
Part.09 安裝OpenLDAP
Part.10 創建集群
Part.11 安裝Kerberos
Part.12 安裝HDFS
Part.13 安裝Ranger
Part.14 安裝YARN+MR
Part.15 安裝HIVE
Part.16 安裝HBase
Part.17 安裝Spark2
Part.18 安裝Flink
Part.19 安裝Kafka
Part.20 安裝Flume
十七、安裝Spark2
1.安裝
添加Spark2服務
需要重啟HDFS、YARN、MapReduce2、Hive、HBase等相關服務
2.取消kerberos對頁面的認證
在CONFIGS->Advanced spark2-env下的content里,將下面內容加#注釋掉
export SPARK_HISTORY_OPTS='-Dspark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter -Dspark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=kerberos,kerberos.principal={{spnego_principal}},kerberos.keytab={{spnego_keytab}}"'
訪問頁面,http://hdp01.hdp.com:18081/
3.確認Spark on Yarn配置
查看/usr/hdp/3.1.5.0-152/spark2/conf/spark-env.sh
export HADOOP_HOME=${HADOOP_HOME:-/usr/hdp/3.1.5.0-152/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/usr/hdp/3.1.5.0-152/hadoop/conf}
/usr/hdp/3.1.5.0-152/hadoop-yarn/conf/yarn-site.xml
4.spark-shell交互式命令
每個Spark應用程序都需要一個Spark環境,這是Spark RDD API的主要入口點。Spark Shell提供了一個名為“sc”的預配置Spark環境和一個名為“spark”的預配置Spark會話。使用Spark Shell的時候,本身是預配置了sc,即SparkConf和SparkContext的,但是在實際使用編輯器編程過程中是需要設置這些配置的。
(1)啟動
啟動spark-shell
spark-shell --master local
正確界面如下:
(2)加載本地文件
通過預置sc加載本地文件
val textFile = sc.textFile("file:///root/wordcount_input")
val后面的是變量textFile,而sc.textFile()中的這個textFile是sc的一個方法名稱,這個方法用來加載文件數據。這兩個textFile不是一個東西,不要混淆。實際上,val后面的是變量textFile。
要加載本地文件,必須采用“file:///”開頭的這種格式。執行上上面這條命令以后,并不會馬上顯示結果,因為,Spark采用惰性機制,只有遇到“行動”類型的操作,才會從頭到尾執行所有操作。
textFile.first()
first()是一個“行動”(Action)類型的操作,會啟動真正的計算過程,從文件中加載數據到變量textFile中,并取出第一行文本。屏幕上會顯示很多反饋信息,這里不再給出,你可以從這些結果信息中,找到word.txt文件中的第一行的內容。
正因為Spark采用了惰性機制,在執行轉換操作的時候,即使我們輸入了錯誤的語句,spark-shell也不會馬上報錯,而是等到執行“行動”類型的語句時啟動真正的計算,那個時候“轉換”操作語句中的錯誤就會顯示出來。
(3)變量回寫到本地文件
將變量中的內容寫回到本地文件/root/output中
val textFile = sc.textFile("file:///root/wordcount_input")
textFile.saveAsTextFile("file:///root/output")
上面的saveAsTextFile()括號里面的參數是保存文件的路徑,不是文件名。saveAsTextFile()是一個“行動”(Action)類型的操作,所以,馬上會執行真正的計算過程,從wordcount_input中加載數據到變量textFile中,然后,又把textFile中的數據寫回到本地文件目錄“/root/output”下面
ll /root/output/
cat /root/output/part-00000
(4)加載HDFS中文件
與加載本地文件類似
val textFile = sc.textFile("hdfs://hdp315/testhdfs/tenant1/wordcount_input")
textFile.first()
實驗:Spark SQL-詞頻統計
(1)spark-shell方式
待統計文件為/root/wordcount_input
spark-shell --master local
val textFile = sc.textFile("file:///root/wordcount_input")
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).ruduceByKey((a,b) => a + b)
wordCount.collect()
textFile包含了多行文本內容,textFile.flatMap(line => line.split(" “))會遍歷textFile中的每行文本內容,當遍歷到其中一行文本內容時,會把文本內容賦值給變量line,并執行Lamda表達式line => line.split(” “)。line => line.split(” “)是一個Lamda表達式,左邊表示輸入參數,右邊表示函數里面執行的處理邏輯,這里執行line.split(” "),也就是針對line中的一行文本內容,采用空格作為分隔符進行單詞切分,從一行文本切分得到很多個單詞構成的單詞集合。這樣,對于textFile中的每行文本,都會使用Lamda表達式得到一個單詞集合,最終,多行文本,就得到多個單詞集合。textFile.flatMap()操作就把這多個單詞集合“拍扁”得到一個大的單詞集合。
然后,針對這個大的單詞集合,執行map()操作,也就是map(word => (word, 1)),這個map操作會遍歷這個集合中的每個單詞,當遍歷到其中一個單詞時,就把當前這個單詞賦值給變量word,并執行Lamda表達式word => (word, 1),這個Lamda表達式的含義是,word作為函數的輸入參數,然后,執行函數處理邏輯,這里會執行(word, 1),也就是針對輸入的word,構建得到一個tuple,形式為(word,1),key是word,value是1(表示該單詞出現1次)。
程序執行到這里,已經得到一個RDD,這個RDD的每個元素是(key,value)形式的tuple。最后,針對這個RDD,執行reduceByKey((a, b) => a + b)操作,這個操作會把所有RDD元素按照key進行分組,然后使用給定的函數(這里就是Lamda表達式:(a, b) => a + b),對具有相同的key的多個value進行reduce操作,返回reduce后的(key,value),比如(“hadoop”,1)和(“hadoop”,1),具有相同的key,進行reduce以后就得到(“hadoop”,2),這樣就計算得到了這個單詞的詞頻。
(2)spark-submit方式
建議找一臺有外網的服務器來做sbt,因為需要下載很多依賴包
安裝sbt
tar -zxvf /opt/sbt-1.8.2.tgz -C /usr/local/
將位于sbt/bin下面的sbt-launch.jar文件放在sbt目錄下。
cp /usr/local/sbt/bin/sbt-launch.jar /usr/local/sbt/
在sbt目錄下創建sbt腳本
chmod u+x /usr/local/sbt/sbt
確認是否成功
/usr/local/sbt/sbt sbtVersion
創建工程目錄及相關文件
scala文件,/data01/project/wordcount/src/main/scala/wordcount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConfobject WordCount {def main(args: Array[String]) {val inputFile = "hdfs://hdp315/testhdfs/ranger_yarn/wordcount_input"val conf = new SparkConf().setAppName("WordCount")val sc = new SparkContext(conf)val textFile = sc.textFile(inputFile)val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)wordCount.foreach(println)}
}
sbt文件,/data01/project/wordcount/wordcount.sbt
name := "WordCount Project"version := "1.0"scalaVersion := "2.11.12"libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
進入到工程目錄下,將整個工程打成jar包
/usr/local/sbt/sbt package
jar包在工程目錄下的./target/scala-2.11/下
回到hdp01上,通過spark-submit提交jar包執行
kinit -kt /root/keytab/ranger_yarn.keytab ranger_yarn
spark-submit --class "WordCount" /root/wordcount-project_2.11-1.0.jar --deploy-mode cluster --master yarn
查看結果
在spark中可以查看任務信息,已經結果
6.實驗:Spark Streaming-顯示實時流內容
將nc作為服務器端,用戶產生數據;啟動sparkstreaming客戶端程序,監聽服務器端發送過來的數據,并對其數據進行顯示。
在測試的nc服務端,啟動nc程序,端口為1234
nc -l 1234
配置sbt文件,增加sparking-streaming依賴包,/data01/project/streamPrint/streamPrint.sbt
name := "streamPrint Project"version := "1.0"scalaVersion := "2.11.12"libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.3.0",
"org.apache.spark" %% "spark-streaming" % "2.3.0"
)
配置scala文件,/data01/project/streamPrint/src/main/scala/streamPrint.scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevelobject StreamPrint {def main(args: Array[String]) {val conf = new SparkConf().setAppName("streamPrint")val sc = new StreamingContext(conf, Seconds(5))val lines = sc.socketTextStream("192.168.111.1", 1234, StorageLevel.MEMORY_AND_DISK)if (lines != null) {lines.print()println("start!")}sc.start()sc.awaitTermination()}
}
進入到工程目錄下,將整個工程打成jar包
/usr/local/sbt/sbt package
回到hdp01上,通過spark-submit提交jar包執行
kinit -kt /root/keytab/ranger_yarn.keytab ranger_yarn
spark-submit --class "StreamPrint" /root/streamprint-project_2.11-1.0.jar --deploy-mode cluster --master yarn
此時在nc服務端輸入內容后,可在spark streaming中看到相應的內容
Spark streaming中的間隔,是在scala程序中設置的,val sc = new StreamingContext(conf, Seconds(5))因此是5秒輸出一次。
7.spark-submit參數
–master
master的地址,提交任務到哪里執行
常見的選項有
local:提交到本地服務器執行,并分配單個線程
local[k]:提交到本地服務器執行,并分配k個線程
spark://HOST:PORT:提交到standalone模式部署的spark集群中,并指定主節點的IP與端口
mesos://HOST:PORT:提交到mesos模式部署的集群中,并指定主節點的IP與端口
yarn:提交到yarn模式部署的集群中
–deploy-mode
在本地(client)啟動driver或在cluster上啟動,默認是client
DEPLOY_MODE:設置driver啟動的位置,可選項如下,默認為client
client:在客戶端上啟動driver,這樣邏輯運算在client上執行,任務執行在cluster上
cluster:邏輯運算與任務執行均在cluster上,cluster模式暫時不支持于Mesos集群或Python應用程序
–class
應用程序的主類,僅針對java或scala應用
CLASS_NAME:指定應用程序的類入口,即主類,僅針對java、scala程序,不作用于python程序
–name
應用程序的名稱
–jars
用逗號分隔的本地jar包,設置后,jar包將包含在driver和executor的classpath下
–packages
包含在driver和executor的classpath中的jar的maven坐標
–exclude-packages
為了避免沖突,指定的參數–package中不包含的jars包
–repositories
遠程repository
附加的遠程資源庫(包含jars包)等,可以通過maven坐標進行搜索
–py-files
PY_FILES:逗號隔開的的.zip、.egg、.py文件,這些文件會放置在PYTHONPATH下,該參數僅針對python應用程序
–files
FILES:逗號隔開的文件列表,這些文件將存放于每一個工作節點進程目錄下
–conf PROP=VALUE
指定spark配置屬性的值,格式為PROP=VALUE,例如–confspark.executor.extraJavaOptions=“-XX:MaxPermSize=256m”
–properties-file
指定需要額外加載的配置文件,用逗號分隔,如果不指定,默認為conf/spark-defaults.conf
–driver-memory
Driver內存,默認1G
–driver-java-options
傳給driver的額外的Java選項
–driver-library-path
傳給driver的額外的庫路徑
–driver-class-path
傳給driver的額外的類路徑,用–jars添加的jar包會自動包含在類路徑里
–driver-cores
Driver的核數,默認是1。在yarn或者standalone下使用
–executor-memory
每個executor的內存,默認是1G
–total-executor-cores
所有executor總共的核數。僅僅在mesos或者standalone下使用
–num-executors
啟動的executor數量。默認為2。在yarn下使用
–executor-core
每個executor的核數。在yarn或者standalone下使用