Flink之Task解析

Flink之Task解析

??對Flink的Task進行解析前,我們首先要清楚幾個角色TaskManagerSlotTaskSubtaskTaskChain分別是什么

角色注釋
TaskManager在Flink中TaskManager就是一個管理task的進程,每個節點只有一個TaskManager
SlotSlot就是TaskManager中的槽位,一個TaskManager中可以存在多個槽位,取決于服務器資源和用戶配置,可以在槽位中運行Task實例
Task其實Task在Flink中就是一個類,其中可以包含一個或多個算子,這個取決于算子鏈的構成
SubTaskSubTask就是Task類的并行實例可以是一個或多個,也就是說當代碼執行的那一刻開始,就根據用戶所設置或者默認的并行度創建出多個SubTask
TaskChainTaskChain就是算子鏈,何為算子鏈?就是在一個Task實例中出現的串行算子,算子間必須是OneToOne模式且并行度相同.

??上面對幾個角色進行了一個簡單的闡述,后面會結合圖解和偽代碼進行講解,這里我們以計算中比較經典wordcount為例子,偽代碼如下所示:

public class FLinkWordCount {public static void main(String[] args) throws Exception {// 創建流處理環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\// 設置并行度3env.setParallelism(3)// 讀取數據文件DataStreamSource<String> streamSource = env.readTextFile("xxx");// 轉大寫DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())// 轉成tuple2格式,計數1SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));// 按照單詞分組KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);// 求和keyed.sum("f1")env.execute();}
}

??上面的代碼中我們使用了兩次map,一次keyBy,一次sum算子,我們下面就結合這幾個算子進行講解,講解之前有兩個條件需要先記住:

  • 同一個Task并行實例不能放在同一個TaskSlot上運行,一個TaskSlot上可以運行多個不同的Task并行實例
  • 同一個共享組的算子允許共享槽位,不同共享組的算子決不允許共享槽位

??上面這兩句話一定要記牢,以便于后面的理解.

算子鏈劃分及Task槽位分配

算子鏈劃分

可以根據上面的代碼理解下圖:
在這里插入圖片描述

上圖中我們可以看到兩個map組成一個task chain,keyBysum組成一個task chain,這里說一下原因,首先就是兩個map的并行度是一致的,而且是OneToOne模式,所以可以將兩個map綁定成一個算子鏈,并將其放入到一個SubTask中,而到了keyBy這里為什么不能再放入到一個task chain中,這里我們可以思考一下,keyBy時會發生什么?以spark的角度來說會發生shuffle對吧,這就導致了不能滿足OneToOne的模式,簡單來說我們也可以想清楚,如果keyBymap組成一個task chain那么還怎么做wordcount?

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-bckucHbv-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task2.png)]

通過上圖應該很容易理解了.

Task槽位分配

??上面講了關于task chain怎么劃分的,為什么這樣劃分,這里講一下為什么同一個Task的并行實例(SubTask)不能在同一個task slot中.其實這個也很容易就想清楚,如果同一Task的多個SubTask都出現在一個task slot中那么還有什么意義呢?當這些SubTask出現在一個task slot中時就會發生串行計算,那并行的意義也就沒有了.

??同時這種機制也保證了任務的容錯性,也就是說對于同一個Task一旦某一個task slot出現異常的情況,其他的task slot中的SubTask還能正常運行,如果將這些SubTask放到一個task slot中,當這個task slot出現異常情況時,就會影響整個任務的執行.

??總結來說,這種設計保證了Flink任務的隔離性、容錯性、資源利用性.這里用圖解的方式便于大家記憶,如下:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-rlgqeo6A-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task3.png)]

槽位共享及算子鏈斷/連

槽位共享

??前面講過同一個Task的多個SubTask不能出現在一個task slot中,但是不同TaskSubTask是可以共享同一個task slot的,但是在Flink中有一個機制,就是用戶(開發人員)可以自定義不同的算子間是否可以共享同一個task slot,如上面的例子中兩個map的并行度一致并且符合OneToOne的模式,在正常情況下必然會會分到一個task chain中,但是Flink給用戶提供了的slot group的概念,也就是說用戶可以將這兩個map分配到不同的slot group中,這種情況下兩個map就不會劃分到一個task chain中,試想一下當兩個map都不允許共享同一個task slot時,怎么可能劃分到同一個task chain中呢?

??偽代碼如下:

public class FLinkWordCount {public static void main(String[] args) throws Exception {// 創建流處理環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\// 設置并行度3env.setParallelism(1)// 讀取數據文件DataStreamSource<String> streamSource = env.readTextFile("xxx");// 轉大寫DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())// 通過slotSharingGroup()將upperCaseSource作為一個分組"g1"SingleOutputStreamOperator<String> slotGroup1 = upperCaseSource.slotSharingGroup("g1");// 轉成tuple2格式,計數1SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));// 通過slotSharingGroup()將mapStream作為一個分組"g3"SingleOutputStreamOperator<Tuple2<String, Integer>> slotGroup2 = mapStream.slotSharingGroup("g2");// 按照單詞分組KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);// 求和keyed.sum("f1")env.execute();}
}

上面的代碼中我們將upperCaseSourcemapStream分成了兩個task slot,這樣兩個map就不可以共享相同的task slot,同時代碼中將并行度改為了1,這樣便于圖解,如下圖:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-saLgMu0Q-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task4.png)]
如果說集群中總task slot只有3個,并且在代碼中兩個map設置了不同的task slot且兩個map的并行度都為3時會怎么樣?很簡單,提交任務時就會報錯,因為提交任務所需要的資源已經超出了集群的資源.

??這里說一下對于對task slot進行分組處理的實際用處,就以代碼中兩個map為例子,在實際的業務中如果兩個map處理的數據量都極大,如果將兩個map的計算都放到一個節點的一個task slot時會發生什么?數據的積壓、任務異常失敗等等都有可能發生,但是有slotSharingGroup我們就可以保證同一個task slot不會承載過大的計算任務,也就達到了資源合理分配的目的.

算子鏈斷/連

??前面講了關于將兩個map進行slotSharingGroup后會將兩個map劃分到不同的task chain,如果有這樣一個情況兩個map滿足OneToOne的模式且并行度相同時,我們不使用slotSharingGroup能否將兩個map劃分成不同的task chain?答案是當然可以的,Flink為我們提供了對應的API,偽代碼如下:

public class FLinkWordCount {public static void main(String[] args) throws Exception {// 創建流處理環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\// 設置并行度3env.setParallelism(3)// 讀取數據文件DataStreamSource<String> streamSource = env.readTextFile("xxx");// 轉大寫DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())// 轉成tuple2格式,計數1SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));// 將mapStream劃分到一個新的task chain中SingleOutputStreamOperator<Tuple2<String, Integer>> newTaskChainMapStream = mapStream.startNewChain();// 按照單詞分組KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);// 求和keyed.sum("f1")env.execute();}
}

在上面代碼中我們調用了startNewChain()后就可以將mapStream劃分到一個新的task chain中,這樣的情況下,兩個map既屬于不同的task chain又可以共享同一個task slot,如下圖:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-jOIlz8uH-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task5.png)]
以上就是對于Task的講解,如有錯誤歡迎指出,如有問題共同探討.

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/40736.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/40736.shtml
英文地址,請注明出處:http://en.pswp.cn/news/40736.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數據結構單鏈表

單鏈表 1 鏈表的概念及結構 概念&#xff1a;鏈表是一種物理存儲結構上非連續、非順序的存儲結構&#xff0c;數據元素的邏輯順序是通過鏈表中的指針鏈 接次序實現的 。 在我們開始講鏈表之前&#xff0c;我們是寫了順序表&#xff0c;順序表就是類似一個數組的東西&#xff0…

上海虛擬展廳制作平臺怎么選,蛙色3DVR 助力行業發展

引言&#xff1a; 在數字化時代&#xff0c;虛擬展廳成為了企業宣傳的重要手段。而作為一家位于上海的實力平臺&#xff0c;上海蛙色3DVR憑借其卓越的功能和創新的技術&#xff0c;成為了企業展示和宣傳的首選。 一、虛擬展廳的優勢 虛擬展廳的崛起是指隨著科技的進步&#x…

36_windows環境debug Nginx 源碼-使用 VSCode 和WSL

文章目錄 配置 WSL編譯 NginxVSCode 安裝插件launch.json配置 WSL sudo apt-get -y install gcc cmake sudo apt-get -y install pcre sudo apt-get -y install libpcre3 libpcre3-dev sudo apt-get

手機商城網站的分析與設計(論文+源碼)_kaic

目錄 摘 要 1 1 緒論 2 1.1選題背景意義 2 1.2國內外研究現狀 2 1.2.1國內研究現狀 2 1.2.2國外研究現狀 3 1.3研究內容 3 2 網上手機商城網站相關技術 4 2.1.NET框架 4 2.2Access數據庫 4 2.3 JavaScript技術 4 3網上手機商城網站分析與設…

Grafana+Prometheus技術文檔-進階使用-監控spring-boot項目

阿丹&#xff1a; 之前已經實現了使用Prometheus來對服務器進行了監控和儀表盤的創建&#xff0c;現在就需要對這些監控方法使用在spring-boot中去。 實現思路&#xff1a; 1、集成Actuator 2、加入Prometheus的依賴 3、配置開放端口、以及開放監控 4、配置Prometheus中的配置…

一次網絡不通“爭吵“引發的思考

作者&#xff1a; 鄭明泉、余凱 為啥爭吵&#xff0c;吵什么&#xff1f; “你到底在說什么啊&#xff0c;我K8s的ecs節點要訪問clb的地址不通和本地網卡有什么關系…” 氣憤語氣都從電話那頭傳了過來&#xff0c;這時電話兩端都沉默了。過了好一會傳來地鐵小姐姐甜美的播報聲…

iview默認樣式覆蓋

scoped 屬性是 HTML5 中的新屬性。 當style標簽擁有scoped屬性時&#xff0c;它的css樣式只能用于當前的Vue組件&#xff0c;可以使組件的樣式不相互污染。 如果一個項目的所有style標簽都加上了scoped屬性&#xff0c;相當于實現了樣式的模塊化。 1、全頁面覆蓋 不添加scoped…

【一】ubuntu20.04上搭建containerd版( 1.2.4 以上)k8s及kuboard V3

k8s 部署全程在超級用戶下進行 sudo su本文請根據大綱順序閱讀&#xff01; 一、配置基礎環境&#xff08;在全部節點執行&#xff09; 1、安裝docker 使用apt安裝containerd 新版k8s已經棄用docker轉為containerd&#xff0c;如果要將docker改為containerd詳見&#xff1a…

對dubbo的DubboReference.check的參數進行剖析

背景 在使用dubbo的時候&#xff0c;發現當消費者啟動的時候&#xff0c;如果提供者沒有啟動&#xff0c;即使提供者后來啟動了&#xff0c;消費者也調不通提供者提供的接口了。 注冊中心使用都是nacos dubbo版本是3.0.4 例子 接口 public interface DemoService {String…

“深入解析JVM內部機制:探秘Java虛擬機的奧秘“

標題&#xff1a;深入解析JVM內部機制&#xff1a;探秘Java虛擬機的奧秘 摘要&#xff1a;本文將深入解析JVM&#xff08;Java虛擬機&#xff09;的內部機制&#xff0c;從字節碼執行到垃圾回收&#xff0c;逐步揭示Java程序運行的奧秘。通過理論分析和示例代碼&#xff0c;讀…

thinkphp5.1 trace 不顯示sql語句

config/app.php app_debug > true,//線上環境為 false // 應用Trace app_trace > true,//線上環境為 false config/database.php debug > true, config/log.php close > false, thinkphp5.1x 設計非常奇怪 必須開啟…

使用dockerfile手動構建JDK11鏡像運行容器并校驗

Docker官方維護鏡像的公共倉庫網站 Docker Hub 國內無法訪問了&#xff0c;大部分鏡像無法下載&#xff0c;準備逐步構建自己的鏡像庫。【轉載aliyun官方-容器鏡像服務 ACR】Docker常見問題 阿里云容器鏡像服務ACR&#xff08;Alibaba Cloud Container Registry&#xff09;是面…

內網穿透-外遠程連接中的RabbitMQ服務

文章目錄 前言1.安裝erlang 語言2.安裝rabbitMQ3. 內網穿透3.1 安裝cpolar內網穿透(支持一鍵自動安裝腳本)3.2 創建HTTP隧道 4. 公網遠程連接5.固定公網TCP地址5.1 保留一個固定的公網TCP端口地址5.2 配置固定公網TCP端口地址 前言 RabbitMQ是一個在 AMQP(高級消息隊列協議)基…

Linux:shell腳本:基礎使用(4)《正則表達式-grep工具》

正則表達式定義&#xff1a; 使用單個字符串來描述&#xff0c;匹配一系列符合某個句法規則的字符串 正則表達式的組成&#xff1a; 普通字符串: 大小寫字母&#xff0c;數字&#xff0c;標點符號及一些其他符號 元字符&#xff1a;在正則表達式中具有特殊意義的專用字符 正則表…

python中__main__的解釋

源自于&#xff1a;https://zhuanlan.zhihu.com/p/340997807 Python程序運行時是從模塊頂行開始&#xff0c;最頂層&#xff08;沒有被縮進&#xff09;的代碼都會被執行&#xff0c;所以Python中并不需要一個統一的main()作為程序的入口。 __name__是Python的內置變量&#…

藍橋杯嵌入式省一教程:(三)按鍵掃描與定時器中斷

在第一講中曾經提到&#xff0c;GPIO有輸入輸出兩種模式。在點亮LED時&#xff0c;我們已經使用了GPIO輸出模式&#xff0c;在按鍵識別中&#xff0c;我們將要使用GPIO輸入模式。首先來看看按鍵的電路原理圖&#xff08;下圖在選手資源數據包——CT117E-M4產品手冊中&#xff0…

Android CCodec (二十) CCodec Native服務實現分析

1、C2解碼服務registerAsService注冊流程 google實現CCodec的vendor默認解碼服務代碼路徑是在frameworks/av/media/codec2/hidl/services/vendor.cpp中,而其注冊的是HIDL服務,本文就對HIDL服務注冊做簡要分析。首先看下vendor.cpp中的代碼注冊流程。 int main(int /* argc *…

高等數學教材重難點題型總結(三)微分中值定理和導數的應用

第三章&#xff0c;微分中值定理的證明題等&#xff0c;非常重要&#xff0c;需要牢牢掌握 1.證明中值定理對某函數在給定區間上的正確性 2.與中值定理有關的證明題 3.微分中值定理應用于求證不等式 4.洛必達法則求極限 5.洛必達的經典錯誤反例 6.按某項實現多項式冪展開 7.求帶…

以 Java NIO 的角度理解 Netty

文章目錄 前言Java NIO 工作原理Selector 的創建ServerSocketChannel 的創建ServerSocketChannel 注冊 Selector對事件的處理總結 前言 上篇文章《Netty 入門指南》主要涵蓋了 Netty 的入門知識&#xff0c;包括 Netty 的發展歷程、核心功能與組件&#xff0c;并且通過實例演示…

rabbitmq的發布確認

生產者將信道設置成 confirm 模式&#xff0c;一旦信道進入 confirm 模式&#xff0c; 所有在該信道上面發布的 消息都將會被指派一個唯一的 ID (從 1 開始)&#xff0c;一旦消息被投遞到所有匹配的隊列之后&#xff0c;broker 就會發送一個確認給生產者(包含消息的唯一 ID)&…