三:az 如何調度Spark、Flink,MR 任務
首先,使用java編寫一個spark任務,定義一個類,它有main方法,里面寫好邏輯,sparkConf 和JavaSparkContext 獲取上下文,然后打成一個jar包,創建一個sh文件,使用spark提交任務的spark-submit 命令,指定jar包和對應的類名,和運行的參數,然后在job 文件里面指定sh 腳本,接著dependencies指定好依賴就行。最終打包成一個zip包上傳。
如果是提交flink任務呢,也是定義一個類,在main方法里,Flink 流批任務只需要分別使用StreamExecutionEnvironment或者ExecutionEnvironment獲取對應的執行環境,然后獲取到DataStream 或者DataSet, 然后進行一系列的轉換,最終達成一個jar 包,它是使用/bin/flink run 去提交任務的,后面的參數指定和spark 大同小異 ,az 也大同小異
MR 如何提交任務呢,肯定要編寫Mapper和Reducer的實現處理類,然后有個主類,獲取到Hadoop 的Configuration 的對應環境配置,獲取到job 指定輸入輸出以及Mapper以及reducer類,然后打包成一個jar包,使用hadoop jar xx.jar 提交任務。
四:簡單介紹下Flink
那就對比下Flink,Spark,MapReduce
Flink ,大數據分布式處理框架,從流處理開始,打造流批一體的框架,用于對無界和有界的數據流進行有狀態計算,提供了諸多高級api供用戶開發分布式任務,提供了數據分布,容錯機制,資源管理和調度等功能
4.1: 首先從編程模型來看,MR的基礎就是一條record,spark 就是RDD,rdd就是一批數據,而Flink 是DataStream 和 DataSet,這兩個也是一批數據;從這個最開始的編程模型的輸入來看就知道spark以及Flink 比 MR 快,后續的數據轉換spark和Flink 都有豐富的算子(transform和collect 算子,flink是operator chain),而MR就很局限了,要自己定義
4.2:從數據流轉的介質來看,MR會落盤,就是那個Map階段的結尾會落盤,涉及到磁盤I/O,比較耗費時間;其實Flink 和Spark 也會進行數據的落盤,但是他們和mr的最大的本質不同就是他們可以把數據放在內存中,最后再落盤,而MR一定會落盤;
4.3:算子方面,flink是dataset api,DataStream API, table api, sql;而spark 是 RDD, DataSet, DataFrame, sparkSql;Flink 的核心引擎是runTime,spark的是SparkCore
五:Flink 和sparkStreaming 的區別
5.1: 一個實時,一個微批
5.2: 一個使用StreamingExecutionEnvironment, 一個使用JavaStreamingContext;
5.3: 一個DataStream, 一個是Dstream 的流數據
5.4: 任務調度來說,一個是會依次創建StreamGraph, JobGraph, ExecutionGraph,JobManager 調度ExecutionGraph;而另一個是 創建DstreamGraph, JobGenerator, 和JobScheduler
5.5: 時間機制方面,一個是有數據時間,攝入時間和處理時間;而sparkStreaming 是只有處理時間
5.6: 容錯方面,Flink 有分布式快照,使用兩階段提交協議可以做到只有一次處理,而sparkStreaming 也有checkpoint ,能恢復數據,但是做不到恰好一次處理,可能會重復。
六:Flink 和spark的checkpoint 的異同點
6.1: checkpoint 說白了都是為了持久化數據的,Flink 是保存比如某個數據的狀態,說白了就是會動態變化的值,比如用戶的訂單總額就是用戶訂單數據的狀態,而spark 是保存RDD的數據到hdfs,截斷RDD,防止數據異常中斷,可以恢復;不過都是把內存中的數據持久化到外部的系統中,這里一般是hdfs,持久化嘛
6.2: checkpoint的觸發方式不一樣,Flink 的checkpoint 是由jobManager 定時觸發的,如果配置了的話;而Spark是需要在代碼中手動觸發的
6.3: checkpoint 的觸發機制不一樣,Flink的checkpoint 說白了有兩個階段,預提交階段和提交階段,預提交階段會做三個事,如下所示:
6.3.1: 進行checkpoint, 比如記錄了用戶1和2的訂單金額分別是200和300
6.3.2: 寫WAL 日志,就是用戶1和2又有新的動作,由增加了訂單金額100和50(這個可以認為是狀態)
6.3.3: 鎖定資源,告訴外部系統,用戶1和2的訂單總金額分別是300和350,但是讓外部系統知道,并不是立馬更新
如果上述有任何一步失敗,我們都會滾到上個checkpoint,然后接下來就是提交階段,會做兩個事:
6.3.4: 把checkpoint 的狀態提交
6.3.5: 外部系統更新對應的訂單總金額300和350
如果是spark的checkpoint ,則直接把數據存儲到hdfs了,沒有啥特殊的。
7:Flink 和Spark的集群規模
Flink on yarn,一般是10臺;cpu核數是36;內存是128G;
spark on yarn,是200臺,pb級別的數據,cpu 核數是36,內存是128G
8:Flink 和spark, yarn 的集群角色
8.1:說明
Flink 是有client,jobManager 以及taskManger;client 是提交任務的作用,并且接收結果返回;而JobManager 接收提交任務,進行任務調度,故障恢復,容錯管理;管理tm;
spark 也是有driver,master 以及 worker,和flink的一一對應,此外還有個executor 和 clusterManager
yarn 則是有ResourceManager(整體資源的管理), NoderManager(管理節點上的資源), ApplicationMaster(一個應用程序的管理者),Container(實際運行程序的容器)以及Client
9:flink 以及 spark 還有Mr 提交任務到yarn上的流程對比
9.1:Flink 提交任務流程如下,Flink 支持三種模式,session 模式,perJob模式和Application 模式,前面兩者都相當于spark的yarn-cleint 模式,一個是共享資源,一個獨享資源;而Application 模式是相當于spark的yarn-cluster 模式,客戶端在yarn上,生產環境使用application模式,如下所示:
這里的ResourceManager 是flink 自己的,不是yarn的
9.2:spark 在yarn上有yarn-client 模式和yarn-cluster 模式之分,一般我們使用yarn-cluster 模式,這個最主要的點就是driver 是在客戶端還是yarn上,這里的applicationMaster 就可以理解為Driver,生產環境如下:
10. Flink 的TaskSlot
它的目的是為了控制一個taskManager 能運行多少個task,所以對資源進行了分配,劃分成不同的slot,一般和cpu是1:1 的關系,所以一個算子分布在不同的taskManger 上面,在一個tm的并行度和slot是一比一的關系,那么全局的并行度就是我們自己設置的并行度了,不過我們在考慮的時候就是考慮單個tm里面的并行度好點;slot 做了內存隔離,沒有做cpu的隔離。
11:Flink 和spark的常用算子比較
FLink 獨有的算子,keyBy, process, window
spark 獨有的,mapPartition, repartition,colease, union ; transformation 和 action 算子
12.Flink 分區策略
GlobalPartitioner; ShufflePartitioner, RebalancePartitioner; RescalePartitioner(根據上下游算子的并行度分發數據), BrodcastPartitioner,ForwardPartitioner(上下游算子并行度一致);KeyGroupStreamPartitioner(Hash分區),CustomPartitioner(自定義分區策略)
Flink的默認分區數就是等于并行度
spark的默認分區數等于cpu的核數,也可以使用repartition,
13:Flink 和Spark的編程流轉區別
Flink 流式這邊一直返回的會是DataStream, 批返回的是DataSet的數據集
而Spark這邊流失返回的會是Dstream以及衍生類的數據集,而批返回的則是RDD以及衍生類的數據集
14: Spark 和Flink 的序列化
為什么這兩者都要實現自己的序列化框架呢,因為Java的序列化存儲密度低,分布式計算的話內存要用在刀刃上,所以他們實現了自己的序列化框架,Spark 是使用了KyroSerializer 序列化,Flink的序列化的基本類是TypeInfomation.
15: Spark 和flink的反壓機制
spark.streaming.backpressure.enabled, sparkStreaming 動態調整,
Flink 手動調整,看并行度,算子處理情況。
16:flink 和spark 數據在內存的抽象
16.1: 就是java對象 --StreamRecord–Buffer–memorySegment–Byte數組
16.2 RDD在緩存到內存之前,partition中record對象實例在堆內other內存區域中的不連續空間中存儲。RDD的緩存過程中, 不連續存儲空間內的partition被轉換為連續存儲空間的Block對象,并在Storage內存區域存儲,此過程被稱作為Unroll(展開)。
17: Spark 和Flink以及Hive 調優
都是從三個方面來說,
分別是資源調優,代碼性能調優,業務調優
17.1: 對于spark 和Flink 來說,資源調優方面,可以使得單個executor 或者taskManager 可以使用的內存和cpu最大的話就盡量可以配置最大,先說spark;
17.1.1: spark一般調整的就是num-executors ,相當于flink的tm的個數;executro-memory, executor-cores,以及driver-memory 分別相當于tm的內存,tm的slot 個數,jm的內存;spark.default.parallelism 也相當于flink的并行度,spark.storage.memoryFraction 是用來持久化RDD的那部分內存,一般是executor-memory 堆內內存的60%的50%;spark.shuffle.memoryFraction就是用來shuffle的內存,和剛剛的一樣,占有堆內內存的60%的50%;所以實際生產看看到底哪個用的多一點,就多給點
17.1.2: 在資源參數這里,hive需要調整的無非也是內存和cpu這方面,如下所示:
mapreduce.map.java.opts, map 階段的jvm進程的堆內存;
mapreduce.map.memory.mb,map階段的jvm 進程的堆內存和堆外內存的和;
mapreduce.reduce.java.opts,reduce 階段的jvm進程的堆內存;
mapreduce.reduce.memory.mb,reduce 階段的 的jvm 進程的堆內存和堆外內存的和;
mapreduce.map/reduce.cpu.vcores, map 和reduce 階段可用的cpu 的個數;當給大點
但是hive中的map和reduce 的task的數量取決于總文件的個數和每個文件數的大小,一般是每個文件數的大小起作用,如下所示:
mapred.min/max.split.size,就是可以分割文件的最小和最大文件大小,但是map的task數量還不是由這個決定的,還是由多個因素決定的,看下圖
因為hadoop系統中dfs.block.size 一般是128M,所以如果我們沒有設置上述的最小和最大的話,就是默認按照128去分割,如果要提高task數量,要么提高mapred.map.tasks的數量,要么增大mapred.min.split.size 的大小,到256M也可以。
那么reduce的task的數量呢?
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, mapred.reduce.tasks);
所以最直接的辦法是通過mapred.reduce.tasks = 10 來設定就可以,當然設定太小了執行時間會長,所以要居中;太大的話則小文件過多,也不好。
17.2: 算子性能調優
17.2.1: spark算子性能調優
spark.sql.adaptive.enabled 默認為false 自適應執行框架的開關
spark.sql.adaptive.skewedJoin.enabled 默認為 false 傾斜處理開關
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 是用于聚合input的小文件,用于控制每個mapTask的輸入文件,防止小文件過多時候,產生太多的task
spark.sql.autoBroadcastJoinThreshold 用于控制在spark sql中使用BroadcastJoin時候表的大小閾值,適當增大可以讓一些表走BroadcastJoin,提升性能,但是如果設置太大又會造成driver內存壓力
用 reduceByKey( ) 和 aggregrateByKey( ) 來取代 groupByKey,因為前者會進行預聚合
操作數據庫建義采用foreachPartition( ) ,資源可以的情況下使用mapPartitions 代替map
數據復用使用persist
減少數據碎片使用 coalesce( )進行重分區
spark.shuffle.file.buffer參數是調節map端緩沖區大小,單位是kb,減少磁盤溢寫次數;
spark.reducer.maxSizeInFlight 參數是調節shuffle的時候reduce端的緩沖區大小,單位是MB
spark.shuffle.io.maxRetries reduce端拉取重試次數,以及拉取失敗等待間隔,spark.shuffle.io.retryWait,單位是s,比如60s
spark.shuffle.sort.bypassMergeThreshold, 如果確實不需要排序操縱,那就調大sortByPass的閾值,調大到400等,默認是200
17.2.2: Hive 性能調優
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 自動合并小文件
set hive.merge.mapredfiles = true; 設置reduce 端對輸出文件的合并
set hive.archive.enabled=true; 使用hadoop archive 文件對小文件歸檔
set hive.mapred.mode=strict 開啟嚴格模式;不允許對分區表查詢where不帶分區,order by 必須加上limit,不允許笛卡爾積等;
set hive.exec.parallel=true; //打開任務并行執行
set mapred.job.reuse.jvm.num.tasks=10 設置jvm重用
set hive.map.aggr=true; set hive.groupby.skewindata = true; 進行數據負載均衡,數據傾斜優化
set hive.fetch.task.conversion=more; 可以減少不必要的走mapreduce 任務
set hive.auto.convert.join = true; 開啟map join
17.2.3: Flink 性能調優
算子方面暫無,主要是資源和傾斜方面,要改代碼
17.3: 業務代碼調優
最典型的問題,數據傾斜怎么辦?
hive只能是自己可以通過剛剛那個skew_in_data 去均衡,那么flink 和spark呢?
17.3.1: spark和flink 數據傾斜處理
17.3.1.1: 碰到大量空值的或者就是某個大量值的,加上隨機字符串,均勻shuffle
17.3…1.2: 把聚合的步驟往前放,放到hive或者mapreudce 里面去做
17.3.1.3: 過濾掉少數導致傾斜的key
17.3.1.4: 提高shuffle操作的并行度,增加并行處理能力
17.3.1.5: 兩階段聚合,局部聚合+全局聚合,對于傾斜的key打上隨機淺醉,聚合后再去掉再聚合,這個適合聚合算子,不適合join
17.3.1.6: Reduce join 換成MapJoin
17.3.1.7: 傾斜key 拆分join,打上隨機前綴,然后后續不傾斜的擴容和它join,最終過濾掉前綴得到正確結果