Flink,spark對比

三:az 如何調度Spark、Flink,MR 任務
首先,使用java編寫一個spark任務,定義一個類,它有main方法,里面寫好邏輯,sparkConf 和JavaSparkContext 獲取上下文,然后打成一個jar包,創建一個sh文件,使用spark提交任務的spark-submit 命令,指定jar包和對應的類名,和運行的參數,然后在job 文件里面指定sh 腳本,接著dependencies指定好依賴就行。最終打包成一個zip包上傳。

如果是提交flink任務呢,也是定義一個類,在main方法里,Flink 流批任務只需要分別使用StreamExecutionEnvironment或者ExecutionEnvironment獲取對應的執行環境,然后獲取到DataStream 或者DataSet, 然后進行一系列的轉換,最終達成一個jar 包,它是使用/bin/flink run 去提交任務的,后面的參數指定和spark 大同小異 ,az 也大同小異

MR 如何提交任務呢,肯定要編寫Mapper和Reducer的實現處理類,然后有個主類,獲取到Hadoop 的Configuration 的對應環境配置,獲取到job 指定輸入輸出以及Mapper以及reducer類,然后打包成一個jar包,使用hadoop jar xx.jar 提交任務。

四:簡單介紹下Flink
那就對比下Flink,Spark,MapReduce
Flink ,大數據分布式處理框架,從流處理開始,打造流批一體的框架,用于對無界和有界的數據流進行有狀態計算,提供了諸多高級api供用戶開發分布式任務,提供了數據分布,容錯機制,資源管理和調度等功能

4.1: 首先從編程模型來看,MR的基礎就是一條record,spark 就是RDD,rdd就是一批數據,而Flink 是DataStream 和 DataSet,這兩個也是一批數據;從這個最開始的編程模型的輸入來看就知道spark以及Flink 比 MR 快,后續的數據轉換spark和Flink 都有豐富的算子(transform和collect 算子,flink是operator chain),而MR就很局限了,要自己定義
4.2:從數據流轉的介質來看,MR會落盤,就是那個Map階段的結尾會落盤,涉及到磁盤I/O,比較耗費時間;其實Flink 和Spark 也會進行數據的落盤,但是他們和mr的最大的本質不同就是他們可以把數據放在內存中,最后再落盤,而MR一定會落盤;

4.3:算子方面,flink是dataset api,DataStream API, table api, sql;而spark 是 RDD, DataSet, DataFrame, sparkSql;Flink 的核心引擎是runTime,spark的是SparkCore

五:Flink 和sparkStreaming 的區別
5.1: 一個實時,一個微批
5.2: 一個使用StreamingExecutionEnvironment, 一個使用JavaStreamingContext;
5.3: 一個DataStream, 一個是Dstream 的流數據
5.4: 任務調度來說,一個是會依次創建StreamGraph, JobGraph, ExecutionGraph,JobManager 調度ExecutionGraph;而另一個是 創建DstreamGraph, JobGenerator, 和JobScheduler
5.5: 時間機制方面,一個是有數據時間,攝入時間和處理時間;而sparkStreaming 是只有處理時間
5.6: 容錯方面,Flink 有分布式快照,使用兩階段提交協議可以做到只有一次處理,而sparkStreaming 也有checkpoint ,能恢復數據,但是做不到恰好一次處理,可能會重復。

六:Flink 和spark的checkpoint 的異同點
6.1: checkpoint 說白了都是為了持久化數據的,Flink 是保存比如某個數據的狀態,說白了就是會動態變化的值,比如用戶的訂單總額就是用戶訂單數據的狀態,而spark 是保存RDD的數據到hdfs,截斷RDD,防止數據異常中斷,可以恢復;不過都是把內存中的數據持久化到外部的系統中,這里一般是hdfs,持久化嘛
6.2: checkpoint的觸發方式不一樣,Flink 的checkpoint 是由jobManager 定時觸發的,如果配置了的話;而Spark是需要在代碼中手動觸發的
6.3: checkpoint 的觸發機制不一樣,Flink的checkpoint 說白了有兩個階段,預提交階段和提交階段,預提交階段會做三個事,如下所示:
6.3.1: 進行checkpoint, 比如記錄了用戶1和2的訂單金額分別是200和300
6.3.2: 寫WAL 日志,就是用戶1和2又有新的動作,由增加了訂單金額100和50(這個可以認為是狀態)
6.3.3: 鎖定資源,告訴外部系統,用戶1和2的訂單總金額分別是300和350,但是讓外部系統知道,并不是立馬更新
如果上述有任何一步失敗,我們都會滾到上個checkpoint,然后接下來就是提交階段,會做兩個事:
6.3.4: 把checkpoint 的狀態提交
6.3.5: 外部系統更新對應的訂單總金額300和350

如果是spark的checkpoint ,則直接把數據存儲到hdfs了,沒有啥特殊的。

7:Flink 和Spark的集群規模
Flink on yarn,一般是10臺;cpu核數是36;內存是128G;
spark on yarn,是200臺,pb級別的數據,cpu 核數是36,內存是128G

8:Flink 和spark, yarn 的集群角色
8.1:說明
Flink 是有client,jobManager 以及taskManger;client 是提交任務的作用,并且接收結果返回;而JobManager 接收提交任務,進行任務調度,故障恢復,容錯管理;管理tm;
spark 也是有driver,master 以及 worker,和flink的一一對應,此外還有個executor 和 clusterManager
yarn 則是有ResourceManager(整體資源的管理), NoderManager(管理節點上的資源), ApplicationMaster(一個應用程序的管理者),Container(實際運行程序的容器)以及Client

9:flink 以及 spark 還有Mr 提交任務到yarn上的流程對比
9.1:Flink 提交任務流程如下,Flink 支持三種模式,session 模式,perJob模式和Application 模式,前面兩者都相當于spark的yarn-cleint 模式,一個是共享資源,一個獨享資源;而Application 模式是相當于spark的yarn-cluster 模式,客戶端在yarn上,生產環境使用application模式,如下所示:
在這里插入圖片描述
這里的ResourceManager 是flink 自己的,不是yarn的

9.2:spark 在yarn上有yarn-client 模式和yarn-cluster 模式之分,一般我們使用yarn-cluster 模式,這個最主要的點就是driver 是在客戶端還是yarn上,這里的applicationMaster 就可以理解為Driver,生產環境如下:
在這里插入圖片描述
10. Flink 的TaskSlot
它的目的是為了控制一個taskManager 能運行多少個task,所以對資源進行了分配,劃分成不同的slot,一般和cpu是1:1 的關系,所以一個算子分布在不同的taskManger 上面,在一個tm的并行度和slot是一比一的關系,那么全局的并行度就是我們自己設置的并行度了,不過我們在考慮的時候就是考慮單個tm里面的并行度好點;slot 做了內存隔離,沒有做cpu的隔離。

11:Flink 和spark的常用算子比較
FLink 獨有的算子,keyBy, process, window
spark 獨有的,mapPartition, repartition,colease, union ; transformation 和 action 算子

12.Flink 分區策略
GlobalPartitioner; ShufflePartitioner, RebalancePartitioner; RescalePartitioner(根據上下游算子的并行度分發數據), BrodcastPartitioner,ForwardPartitioner(上下游算子并行度一致);KeyGroupStreamPartitioner(Hash分區),CustomPartitioner(自定義分區策略)
Flink的默認分區數就是等于并行度

spark的默認分區數等于cpu的核數,也可以使用repartition,

13:Flink 和Spark的編程流轉區別
Flink 流式這邊一直返回的會是DataStream, 批返回的是DataSet的數據集
而Spark這邊流失返回的會是Dstream以及衍生類的數據集,而批返回的則是RDD以及衍生類的數據集

14: Spark 和Flink 的序列化
為什么這兩者都要實現自己的序列化框架呢,因為Java的序列化存儲密度低,分布式計算的話內存要用在刀刃上,所以他們實現了自己的序列化框架,Spark 是使用了KyroSerializer 序列化,Flink的序列化的基本類是TypeInfomation.

15: Spark 和flink的反壓機制
spark.streaming.backpressure.enabled, sparkStreaming 動態調整,
Flink 手動調整,看并行度,算子處理情況。

16:flink 和spark 數據在內存的抽象
16.1: 就是java對象 --StreamRecord–Buffer–memorySegment–Byte數組
16.2 RDD在緩存到內存之前,partition中record對象實例在堆內other內存區域中的不連續空間中存儲。RDD的緩存過程中, 不連續存儲空間內的partition被轉換為連續存儲空間的Block對象,并在Storage內存區域存儲,此過程被稱作為Unroll(展開)。

17: Spark 和Flink以及Hive 調優
都是從三個方面來說,
分別是資源調優,代碼性能調優,業務調優
17.1: 對于spark 和Flink 來說,資源調優方面,可以使得單個executor 或者taskManager 可以使用的內存和cpu最大的話就盡量可以配置最大,先說spark;
17.1.1: spark一般調整的就是num-executors ,相當于flink的tm的個數;executro-memory, executor-cores,以及driver-memory 分別相當于tm的內存,tm的slot 個數,jm的內存;spark.default.parallelism 也相當于flink的并行度,spark.storage.memoryFraction 是用來持久化RDD的那部分內存,一般是executor-memory 堆內內存的60%的50%;spark.shuffle.memoryFraction就是用來shuffle的內存,和剛剛的一樣,占有堆內內存的60%的50%;所以實際生產看看到底哪個用的多一點,就多給點

17.1.2: 在資源參數這里,hive需要調整的無非也是內存和cpu這方面,如下所示:
mapreduce.map.java.opts, map 階段的jvm進程的堆內存;
mapreduce.map.memory.mb,map階段的jvm 進程的堆內存和堆外內存的和;
mapreduce.reduce.java.opts,reduce 階段的jvm進程的堆內存;
mapreduce.reduce.memory.mb,reduce 階段的 的jvm 進程的堆內存和堆外內存的和;
mapreduce.map/reduce.cpu.vcores, map 和reduce 階段可用的cpu 的個數;當給大點

但是hive中的map和reduce 的task的數量取決于總文件的個數和每個文件數的大小,一般是每個文件數的大小起作用,如下所示:
mapred.min/max.split.size,就是可以分割文件的最小和最大文件大小,但是map的task數量還不是由這個決定的,還是由多個因素決定的,看下圖
在這里插入圖片描述
因為hadoop系統中dfs.block.size 一般是128M,所以如果我們沒有設置上述的最小和最大的話,就是默認按照128去分割,如果要提高task數量,要么提高mapred.map.tasks的數量,要么增大mapred.min.split.size 的大小,到256M也可以。

那么reduce的task的數量呢?
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, mapred.reduce.tasks);
所以最直接的辦法是通過mapred.reduce.tasks = 10 來設定就可以,當然設定太小了執行時間會長,所以要居中;太大的話則小文件過多,也不好。

17.2: 算子性能調優
17.2.1: spark算子性能調優
spark.sql.adaptive.enabled 默認為false 自適應執行框架的開關
spark.sql.adaptive.skewedJoin.enabled 默認為 false 傾斜處理開關
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 是用于聚合input的小文件,用于控制每個mapTask的輸入文件,防止小文件過多時候,產生太多的task
spark.sql.autoBroadcastJoinThreshold 用于控制在spark sql中使用BroadcastJoin時候表的大小閾值,適當增大可以讓一些表走BroadcastJoin,提升性能,但是如果設置太大又會造成driver內存壓力
用 reduceByKey( ) 和 aggregrateByKey( ) 來取代 groupByKey,因為前者會進行預聚合
操作數據庫建義采用foreachPartition( ) ,資源可以的情況下使用mapPartitions 代替map
數據復用使用persist
減少數據碎片使用 coalesce( )進行重分區
spark.shuffle.file.buffer參數是調節map端緩沖區大小,單位是kb,減少磁盤溢寫次數;
spark.reducer.maxSizeInFlight 參數是調節shuffle的時候reduce端的緩沖區大小,單位是MB
spark.shuffle.io.maxRetries reduce端拉取重試次數,以及拉取失敗等待間隔,spark.shuffle.io.retryWait,單位是s,比如60s
spark.shuffle.sort.bypassMergeThreshold, 如果確實不需要排序操縱,那就調大sortByPass的閾值,調大到400等,默認是200

17.2.2: Hive 性能調優
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 自動合并小文件
set hive.merge.mapredfiles = true; 設置reduce 端對輸出文件的合并
set hive.archive.enabled=true; 使用hadoop archive 文件對小文件歸檔
set hive.mapred.mode=strict 開啟嚴格模式;不允許對分區表查詢where不帶分區,order by 必須加上limit,不允許笛卡爾積等;
set hive.exec.parallel=true; //打開任務并行執行
set mapred.job.reuse.jvm.num.tasks=10 設置jvm重用
set hive.map.aggr=true; set hive.groupby.skewindata = true; 進行數據負載均衡,數據傾斜優化
set hive.fetch.task.conversion=more; 可以減少不必要的走mapreduce 任務
set hive.auto.convert.join = true; 開啟map join

17.2.3: Flink 性能調優
算子方面暫無,主要是資源和傾斜方面,要改代碼

17.3: 業務代碼調優
最典型的問題,數據傾斜怎么辦?
hive只能是自己可以通過剛剛那個skew_in_data 去均衡,那么flink 和spark呢?
17.3.1: spark和flink 數據傾斜處理
17.3.1.1: 碰到大量空值的或者就是某個大量值的,加上隨機字符串,均勻shuffle
17.3…1.2: 把聚合的步驟往前放,放到hive或者mapreudce 里面去做
17.3.1.3: 過濾掉少數導致傾斜的key
17.3.1.4: 提高shuffle操作的并行度,增加并行處理能力
17.3.1.5: 兩階段聚合,局部聚合+全局聚合,對于傾斜的key打上隨機淺醉,聚合后再去掉再聚合,這個適合聚合算子,不適合join
17.3.1.6: Reduce join 換成MapJoin
17.3.1.7: 傾斜key 拆分join,打上隨機前綴,然后后續不傾斜的擴容和它join,最終過濾掉前綴得到正確結果

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/42216.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/42216.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/42216.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數據結構——二叉樹相關題目

1.尋找二叉樹中數值為x的節點 //尋找二叉樹中數值為x的節點 BTNode* TreeFind(BTNode* root, BTDataType x)//傳過來二叉樹的地址和根的地址,以及需要查找的數據 {if (root Null){return Null;}//首先需要先判斷這個樹是否為空,如果為空直接返回空if (…

【JavaWeb程序設計】JSP實現購物車功能

目錄 一、結合之前所學的相關技術,編寫代碼實現以下購物車功能 1. 我實現的功能運行截圖如下 (1)商品列表頁面home.jsp (2)登錄賬號頁面/未登錄點擊結賬頁面 (3)重新登錄頁面(記…

昇思25天學習打卡營第18天|ShuffleNet圖像分類

一、簡介: ShuffleNetV1是曠視科技提出的一種計算高效的CNN模型,和MobileNet, SqueezeNet等一樣主要應用在移動端,所以模型的設計目標就是利用有限的計算資源來達到最好的模型精度。ShuffleNetV1的設計核心是引入了兩種操作:Poin…

如何在centos7安裝Docker

在centOS7中我們可以使用火山引擎鏡像源鏡像安裝Docker,以下是具體的安裝步驟。 step 1: 安裝必要的一些系統工具 sudo yum install -y yum-utils Step 2: 添加軟件源信息 sudo yum-config-manager --add-repo https://mirrors.ivolces.com/docker/linux/centos/docker-ce.r…

力扣雙指針算法題目:二叉樹的層序遍歷(BFS)

目錄 1.題目 2.思路解析 3.代碼 1.題目 . - 力扣(LeetCode) 2.思路解析 對二叉樹進行層序遍歷,顧名思義,就是按每一層的順序對二叉樹一層一層地進行遍歷 思路如下 從第一層開始,先將二叉樹地頭放入隊列q&#xff0…

獨孤思維:副業被罵煞筆,割韭菜

做副業不要被外界干擾,不要被情緒牽絆。 不要因為別人的無心謾罵,隨口一評,就偃旗息鼓。 不要因為自己的情緒需要,找存在感,尋求人安慰。 他強任他強,清風拂山崗。 他橫由他橫,明月照大江。…

2007-2022年中國各企業數字化轉型與供應鏈效率

企業數字化轉型與供應鏈效率是現代企業管理和發展的兩個關鍵方面。以下是對中國各企業數字化轉型與供應鏈效率數據的介紹: 數據簡介 企業數字化轉型:指企業通過采用數字技術與創新方法,改造業務流程、組織結構和產品服務,以提升…

UCOS-III 系統移植

1. 移植前準備 1.1 源碼下載 UCOS-III Kernel Source: https://github.com/weston-embedded/uC-OS3.git Micriμm CPU Source : https://github.com/weston-embedded/uC-CPU.git Micriμm Lib Source: https://github.com/weston-embedded…

Nginx配置文件全解:從入門到設計

Nginx配置文件全解:從入門到架構設計 1. Nginx配置文件基礎 Nginx的主配置文件通常位于/etc/nginx/nginx.conf?。配置文件使用簡單的文本格式,由指令和指令塊組成。 1.1 基本語法規則 每個指令以分號(;)結束指令塊用大括號({})包圍配置文件支持使用…

多方SQL計算場景下,如何達成雙方共識,確認多方計算作業的安全性

安全多方計算在SQL場景下的限制 隨著MPC、隱私計算等概念的流行, 諸多政府機構、金融企業開始考慮參與到多方計算的場景中, 擴展數據的應用價值。 以下面這個場景為例, 銀行可能希望獲取水電局和稅務局的數據,來綜合計算得到各…

DolphinScheduler-3.1.9 資源中心實踐

前言 目前DolphinScheduler最新的穩定版本是 3.1.9 ,基于此做些探索,逐漸深化學習路徑,以便于加深理解。 3.2.1 是最新的版本。目前的穩定版本是 3.1.9 基礎環境:Hadoop3.3, Java 8, Python3, MacOS14.2.1 一、本地偽分布式安裝…

學習筆記——動態路由——IS-IS中間系統到中間系統(開銷)

四、IS-IS開銷 1、IS-IS 開銷簡介 在IS-IS協議剛面世時,互聯網網絡結構還非常簡單,因此IS-IS早期的版本中只使用了6bit來描述鏈路開銷,鏈路開銷的取值范圍是1-63。一條路由的開銷范圍只有10bit,取值范圍是0-1023。 隨著計…

前端實現無縫自動滾動動畫

1. 前言: 前端使用HTMLCSS實現一個無縫滾動的列表效果 示例圖: 2. 源碼 html部分源碼: <!--* Author: wangZhiyu <w3209605851163.com>* Date: 2024-07-05 23:33:20* LastEditTime: 2024-07-05 23:49:09* LastEditors: wangZhiyu <w3209605851163.com>* File…

【ubuntu】安裝(升級)顯卡驅動,黑屏|雙屏無法使用問題解決方法

every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 ubuntu 安裝(升級)顯卡驅動&#xff0c;黑屏|雙屏無法使用問題解決方法 由于項目需要&#xff0c;對顯卡驅動進行升級。升級完就黑屏。。。。&#xff0…

Fast R-CNN(論文閱讀)

論文名&#xff1a;Fast R-CNN 論文作者&#xff1a;Ross Girshick 期刊/會議名&#xff1a;ICCV 2015 發表時間&#xff1a;2015-9 ?論文地址&#xff1a;https://arxiv.org/pdf/1504.08083 源碼&#xff1a;https://github.com/rbgirshick/fast-rcnn 摘要 這篇論文提出了一…

WordPress禁止用戶注冊某些用戶名

不管在任何網站&#xff0c;用戶注冊時都有一個屏蔽非法關鍵詞&#xff0c;就是禁止注冊某些用戶名&#xff0c;原因是因為防止用戶使用一些特定的用戶名&#xff0c;例如管理員、官方等用戶名&#xff0c;還有就是那些攻擊性的詞語了。 加網站添加了屏蔽非法關鍵詞&#xff0…

BAT-致敬精簡

什么是bat bat是windows的批處理程序&#xff0c;可以批量完成一些操作&#xff0c;方便快速。 往往我們可以出通過 winR鍵來打開指令窗口&#xff0c;這里輸入的就是bat指令 這里就是bat界面 節約時間就是珍愛生命--你能想象以下2分鐘的操作&#xff0c;bat只需要1秒鐘 我…

考慮數據庫粒度的設計-提升效率

目錄 概要 場景 設計思路 小結 概要 公開的資料顯示&#xff0c;數據庫粒度是&#xff1a;“在數據庫領域&#xff0c;特別是數據倉庫的設計中&#xff0c;粒度是一個核心概念&#xff0c;它直接影響到數據分析的準確性和存儲效率。粒度的設定涉及到數據的詳細程度和精度&…

【JVM基礎篇】Java的四種垃圾回收算法介紹

文章目錄 垃圾回收算法垃圾回收算法的歷史和分類垃圾回收算法的評價標準標記清除算法優缺點 復制算法優缺點 標記整理算法&#xff08;標記壓縮算法&#xff09;優缺點 分代垃圾回收算法&#xff08;常用&#xff09;JVM參數設置使用Arthas查看內存分區垃圾回收執行流程分代GC算…

【SpringBoot】IDEA查看spring bean的依賴關系

前因&#xff1a;在研究springcloud config組件時&#xff0c;我發現config-server包下的EnvironmentController可以響應客戶端的請求&#xff0c;但EnvironmentController并不在啟動類所在的包路徑下&#xff0c;所以我推測它是作為某個Bean方法在生效&#xff0c;尋找bean的依…