spark入門
啟停spark
sbin/start-all.shsbin/stop-all.sh
spark-shell
進入spark/bin目錄,執行:
./spark-shell
輸出中有這么一行:
Spark context Web UI available at http://xx.xx.xx.188:4040
意味著我們可以從web頁面查看spark的運行情況,特別要注意的是,我們可從中看到節點的classpath,了解每個節點自帶了哪些jar包。
如要查看集群各節點的信息,也可以查看http://xx.xx.xx.188:8080.
spark-shell默認運行于本地,要想運行于集群,需加上–master參數:
./spark-shell --master spark://xx.xx.xx.188:7077
helloworld
scala代碼是:
object HelloWorld{def main(args: Array[String]): Unit = {//配置Spark應用名稱val conf = new SparkConf().setAppName("CollectFemaleInfo")// 提交spark作業val sc = new SparkContext(conf)//讀取數據。其是傳入參數args(0)指定數據路徑val text = sc.textFile(args(0))//篩選女性網民上網時間數據信息val data = text.filter(_.contains("female"))// 匯總每個女網民上網時間val femaleData:RDD[(String, Int)] = data.map { line =>val t = line.split(',')(t(0), t(2).toInt)}.reduceByKey(_+_)// 篩選出時間大于兩小時的女網民val result = femaleData.filter(line => line._2 > 120)println("result count: " + result.count())result.collect().foreach(println)}
}
注意
:
1、sparkContext的textFile默認加載hdfs的文件,要處理本地文件,需加上file://前綴。用本地數據文件的話,要求spark集群里的每個節點上都有這個本地文件,否則會報文件找不到的錯誤。所以,方便起見,最好是將數據放到hdfs上。
2、RDD的轉換(transformation,例如map、flatMap、filter等)操作都是lazy的(亦即,只是創建一個新的RDD實例,而未做任何實際計算),只有count、collect這樣的行動(action)操作才會真正去求值。這跟java stream的表現是一樣的。
提交jar包
將代碼用maven打包為jar,接著提交給spark運行,提交本地執行的命令如下:
./spark-submit --class com.lee.ConsistencyCheck /export/home/data/com.lee.distrulechecker.service-1.0-SNAPSHOT.jar ExtArea
提交spark集群運行的命令如下:
./spark-submit --class com.lee.ConsistencyCheck --master spark://xx.xx.xx.188:7077 /export/home/data/com.lee.distrulechecker.service-1.0-SNAPSHOT.jar ExtArea./spark-submit --class com.lee.ConsistencyCheck --master spark://xx.xx.xx.188:7077 --conf spark.cores.max=5 /export/home/data/com.lee.distrulechecker.service-1.0-SNAPSHOT.jar ExtArea./spark-submit --class com.lee.ConsistencyCheck --master local /export/home/data/com.lee.distrulechecker.service-1.0-SNAPSHOT.jar ExtArea
注意
:
1、–master 指定集群URL,支持的選項如下:
local 本地單線程local[K] 本地多線程(指定K個內核)local[*] 本地多線程(指定所有可用內核)spark://HOST:PORT 連接到指定的 Spark standalone cluster master,需要指定端口。mesos://HOST:PORT 連接到指定的 Mesos 集群,需要指定端口。yarn-client客戶端模式 連接到 YARN 集群。需要配置 HADOOP_CONF_DIR。yarn-cluster集群模式 連接到 YARN 集群
如果不指定–master選項默認就在local跑。
2、內存不夠可用
--driver-memory 512M --executor-memory 512M
強制限制內存。
修改日志打印級別
spark-submit提交時會打印很多INFO信息,影響結果查看,可通過修改日志級別解決。
spark/conf目錄下復制log4j.properties.template為log4j.properties,修改:
log4j.rootCategory=INFO, console
為
log4j.rootCategory=WARN, console
則在用spark-submit提交后不會出現大量的INFO信息。
提交集群執行時報錯:Initial job has not accepted any resources
網上搜了以下,幾個原因:
1、主機名和ip是否配置正確,查看/etc/hosts,同時在spark-shell里鍵入:
sc.getConf.getAll.foreach(println)
查看conf信息
2、內存不足,SPARK_EXECUTOR_MEMORY參數默認會使用1G內存,如果不夠,可以在spark-submit里指定小于1G的數值,例如:
–executor-memory 512M
3、端口號被占用,之前的程序已運行。我的情況就是這樣,spark-shell使用的集群模式,會把7077端口占用掉,導致隨后的spark-submit必然失敗。
提交jar時的庫依賴
Java 和Scala 用戶可以通過spark-submit 的–jars 標記提交獨立的JAR 包依賴。當只有一兩個庫的簡單依賴,并且這些庫本身不依賴于其他庫時,這種方法比較合適。但是一般Java 和Scala 的工程會依賴很多庫。當你向Spark 提交應用時,你必須把應用的整個依賴傳遞圖中的所有依賴都傳給集群。為此,常規的做法是使用構建工具,生成單個大JAR 包,包含應用的所有的傳遞依賴。這通常被稱為超級(uber)JAR 或者組合(assembly) JAR。
SparkSQL
我們可以在sparkSQL里寫出比較復雜的sql,比如case when:
select case when (a.NAME <> b.EXTNAME) then 1 else 0 end from OBJ1 a join OBJ2 b on a.RID=b.RID
spark的python接口
spark通過py4j來做到python和java的互操作。我個人的猜測,由于spark計算的效率瓶頸應該在分布式計算上,使用python的效率未必比java或scala相差很多,就好比我們產品的程序,性能瓶頸都在sql上,用啥語言組織業務更多的出于使用方便的考量。
Spark進階
driver/executor和master/worker的概念詳解
《spark快速大數據分析》里對driver/executor和master/worker的介紹:
在分布式環境下,Spark 集群采用的是主/ 從結構。在一個Spark 集群中,有一個節點負責中央協調,調度各個分布式工作節點。這個中央協調節點被稱為驅動器(Driver)節點。與之對應的工作節點被稱為執行器(executor)節點。driver節點可以和大量的executor節點進行通信,它們也都作為獨立的Java 進程運行。驅動器節點和所有的執行器節點一起被稱為一個Spark 應用(application)。Spark 文檔中始終使用驅動器節點和執行器節點的概念來描述執行Spark應用的進程。而主節點(master)和工作節點(worker)的概念則被用來分別表述集群管理器中的中心化的部分和分布式的部分。這些概念很容易混淆,所以要格外小心。
上述說法比較抽象,具化后是這樣:
1、一個節點就是一個JVM進程,所以driver/executor和master/worker是四種進程;
2、master/worker進程是靜態的、常駐的,spark集群起來后它們就存在了,我們在主機上執行ps命令可以看到master進程;在從機上ps,可以看到worker進程
3、driver/executor進程是動態的、隨application存在的,application可以簡單的認為就是用spark-submit提交的jar包。我們用 spark-submit提交jar包時,就會啟動driver進程,driver進程好比監工,master進程好比總包工頭,監工向總包工頭提要求:“該干活了”,于是master通知它管理的小包工頭(worker進程):“來來分點活給你們干”。worker進程就會去叫醒手下的工人(同一臺從機上的executor進程):“你干這個、你干那個,手腳麻利點”。所以真正干活的是executor進程,driver還干點數據匯總的活,master/worker可都是“管理者”。application運行的時候,我們可以在從機上用ps命令看,會有好幾個executor進程。這些進程由application觸發啟動,通過線程池運行實際的任務,等application結束,它們就會自然消亡。
因此,一個application的運行會有若干管理開銷,比如數據的跨節點傳輸、啟停executor進程、啟停executor進程里的線程池等,若數據量較小,這些管理開銷占的比重反而較大,得不償失。舉個例子,要處理1000條記錄,3臺機器,每臺機器上4個executor進程,結果每個進程就處理80條記錄,才開始就要結束,實在太浪費了。
spark調優參數
每個executor占用的核數
spark.executor.coresThe number of cores to use on each executor. In standalone and Mesos coarse-grained modes, setting this parameter allows an application to run multiple executors on the same worker, provided that there are enough cores on that worker. Otherwise, only one executor per application will run on each worker.
注意,這是每個executor可以使用的core數。所以,如果一臺機器上僅有8個core且spark.executor.cores=4,那么每臺機器上最多能起2個executor進程。
yarn集群下,該參數默認值為1,即每個executor進程使用一個core;standalone集群下則是該節點可用的所有core,考慮到standalone集群對application的調度默認是獨占的,這個默認值就不難理解了,所以我們在各個worker上僅看到一個executor進程。
application占用的最大核數
spark.cores.maxWhen running on a standalone deploy cluster or a Mesos cluster in "coarse-grained" sharing mode, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos.
每個executor占用的內存
spark.executor.memoryAmount of memory to use per executor process (e.g. 2g, 8g).
默認1g。
spark的優缺點
優點:
1、擴展性好,只需增加cpu和內存,就能在增加數據量的情況下保證性能不受較大影響。
實測中,數據量10倍增長,但耗時增長遠低于10倍(當然,超過10w條記錄后我們啟用了多核,之前都是單核運行)。
2、資源獨占(或采用靜態資源分配策略)的情況下,效率始終比較穩定,不像數據庫要受主鍵、背景數據量及統計信息的影響;
缺點:
1、比較重量級,小數據量計算的額外開銷反而較大。這時設置spark.cores.max為很小的值(例如1),減少并行度,反而能提升效率。盡管如此,小數據量下相比于DB依然沒有優勢,兩張千條記錄的表連接在DB上耗時不超過1s,但在spark上仍需4s,這還不包括數據提取到hdfs的時間。
2、可能由于硬件資源有限(主要是core數),應用的并發度無法做到很高,最多不能超過總的核數。從測試情況來看,10w條記錄以內的應用只需1核就能保證效率,但超過10w條,就要考慮多核了,像100w條,在10核時才能保證執行時間最短。