Flink
1?? 一 、知識要點
📖 1. Flink簡介
- Apache Flink? — Stateful Computations over Data Streams
- Apache Flink 是一個分布式大數據處理引擎,可對有界數據流和無界數據流進行有狀態的計算。Flink 能在所有常見集群環境中運行,并能以內存速度和任意規模進行計算
- 官網地址:http://flink.apache.org
1.1 處理無界和有界數據
-
任何類型的數據都可以形成一種事件流。信用卡交易、傳感器測量、機器日志、網站或移動應用程序上的用戶交互記錄,所有這些數據都形成一種流。
-
數據可以被作為無界或者有界流來處理。
(1) 無界流 有定義流的開始,但沒有定義流的結束。它們會無休止地產生數據。無界流的數據必須持續處理,即數據被攝取后需要立刻處理。我們不能等到所有數據都到達再處理,因為輸入是無限的,在任何時候輸入都不會完成。處理無界數據通常要求以特定順序攝取事件,例如事件發生的順序,以便能夠推斷結果的完整性。(2) 有界流 有定義流的開始,也有定義流的結束。有界流可以在攝取所有數據后再進行計算。有界流所有數據可以被排序,所以并不需要有序攝取。有界流處理通常被稱為批處理
- Apache Flink 擅長處理無界和有界數據集 精確的時間控制和狀態化使得 Flink 的運行時(runtime)能夠運行任何處理無界流的應用。有界流則由一些專為固定大小數據集特殊設計的算法和數據結構進行內部處理,產生了出色的性能。
1.2 部署應用到任意地方
-
Apache Flink 是一個分布式系統,它需要計算資源來執行應用程序。Flink 集成了所有常見的集群資源管理器,例如 Hadoop YARN、 Apache Mesos 和 Kubernetes,但同時也可以作為獨立集群運行。
Flink 被設計為能夠很好地工作在上述每個資源管理器中,這是通過資源管理器特定(resource-manager-specific)的部署模式實現的。Flink 可以采用與當前資源管理器相適應的方式進行交互。部署 Flink 應用程序時,Flink 會根據應用程序配置的并行性自動標識所需的資源,并從資源管理器請求這些資源。在發生故障的情況下,Flink 通過請求新資源來替換發生故障的容器。提交或控制應用程序的所有通信都是通過 REST 調用進行的,這可以簡化 Flink 與各種環境中的集成
1.3 運行任意規模應用
-
Flink 旨在任意規模上運行有狀態流式應用。因此,應用程序被并行化為可能數千個任務,這些任務分布在集群中并發執行。所以應用程序能夠充分利用無盡的 CPU、內存、磁盤和網絡 IO。而且 Flink 很容易維護非常大的應用程序狀態。其異步和增量的檢查點算法對處理延遲產生最小的影響,同時保證精確一次狀態的一致性。
Flink 用戶報告了其生產環境中一些令人印象深刻的擴展性數字:每天處理數萬億的事件可以維護幾TB大小的狀態可以部署上千個節點的集群
1.4 利用內存性能
- 有狀態的 Flink 程序針對本地狀態訪問進行了優化。任務的狀態始終保留在內存中,如果狀態大小超過可用內存,則會保存在能高效訪問的磁盤數據結構中。任務通過訪問本地(通常在內存中)狀態來進行所有的計算,從而產生非常低的處理延遲。Flink 通過定期和異步地對本地狀態進行持久化存儲來保證故障場景下精確一次的狀態一致性。
📖 2. Flink 的應用場景
在實際生產的過程中,大量數據在不斷地產生,例如金融交易數據、互聯網訂單數據、 GPS 定位數據、傳感器信號、移動終端產生的數據、通信信號數據等,以及我們熟悉的網絡 流量監控、服務器產生的日志數據,這些數據最大的共同點就是實時從不同的數據源中產生, 然后再傳輸到下游的分析系統。針對這些數據類型主要包括實時智能推薦、復雜事件處理、 實時欺詐檢測、實時數倉與 ETL 類型、流數據分析類型、實時報表類型等實時業務場景,而 Flink 對于這些類型的場景都有著非常好的支持。
-
1、實時智能推薦
智能推薦會根據用戶歷史的購買行為,通過推薦算法訓練模型,預測用戶未來可能會購 買的物品。對個人來說,推薦系統起著信息過濾的作用,對 Web/App 服務端來說,推薦系統 起著滿足用戶個性化需求,提升用戶滿意度的作用。推薦系統本身也在飛速發展,除了算法 越來越完善,對時延的要求也越來越苛刻和實時化。利用 Flink 流計算幫助用戶構建更加實 時的智能推薦系統,對用戶行為指標進行實時計算,對模型進行實時更新,對用戶指標進行 實時預測,并將預測的信息推送給 Wep/App 端,幫助用戶獲取想要的商品信息,另一方面也 幫助企業提升銷售額,創造更大的商業價值。
-
2、復雜事件處理
對于復雜事件處理,比較常見的案例主要集中于工業領域,例如對車載傳感器、機械設 備等實時故障檢測,這些業務類型通常數據量都非常大,且對數據處理的時效性要求非常高。 通過利用 Flink 提供的 CEP(復雜事件處理)進行事件模式的抽取,同時應用 Flink 的 Sql 進行事件數據的轉換,在流式系統中構建實時規則引擎,一旦事件觸發報警規則,便立即將 告警結果傳輸至下游通知系統,從而實現對設備故障快速預警監測,車輛狀態監控等目的。
-
3、實時欺詐檢測
在金融領域的業務中,常常出現各種類型的欺詐行為,例如信用卡欺詐、信貸申請欺詐等,而如何保證用戶和公司的資金安全,是來近年來許多金融公司及銀行共同面對的挑戰。 隨著不法分子欺詐手段的不斷升級,傳統的反欺詐手段已經不足以解決目前所面臨的問題。 以往可能需要幾個小時才能通過交易數據計算出用戶的行為指標,然后通過規則判別出具有 欺詐行為嫌疑的用戶,再進行案件調查處理,在這種情況下資金可能早已被不法分子轉移, 從而給企業和用戶造成大量的經濟損失。而運用 Flink 流式計算技術能夠在毫秒內就完成對 欺詐判斷行為指標的計算,然后實時對交易流水進行規則判斷或者模型預測,這樣一旦檢測 出交易中存在欺詐嫌疑,則直接對交易進行實時攔截,避免因為處理不及時而導致的經濟損失。
-
4、實時數倉與 ETL
結合離線數倉,通過利用流計算諸多優勢和 SQL 靈活的加工能力,對流式數據進行實時 清洗、歸并、結構化處理,為離線數倉進行補充和優化。另一方面結合實時數據 ETL 處理能力,利用有狀態流式計算技術,可以盡可能降低企業由于在離線數據計算過程中調度邏輯的 復雜度,高效快速地處理企業需要的統計結果,幫助企業更好地應用實時數據所分析出來的結果。
-
5、流數據分析
實時計算各類數據指標,并利用實時結果及時調整在線系統相關策略,在各類內容投放、 無線智能推送領域有大量的應用。流式計算技術將數據分析場景實時化,幫助企業做到實時化分析 Web 應用或者 App 應用的各項指標,包括 App 版本分布情況、Crash 檢測和分布等, 同時提供多維度用戶行為分析,支持日志自主分析,助力開發者實現基于大數據技術的精細 化運營、提升產品質量和體驗、增強用戶黏性。
-
6、實時報表分析
實時報表分析是近年來很多公司采用的報表統計方案之一,其中最主要的應用便是實時 大屏展示。利用流式計算實時得出的結果直接被推送到前端應用,實時顯示出重要指標的變 換情況。最典型的案例便是淘寶的雙十一活動,每年雙十一購物節,除瘋狂購物外,最引人 注目的就是天貓雙十一大屏不停跳躍的成交總額。在整個計算鏈路中包括從天貓交易下單購 買到數據采集、數據計算、數據校驗,最終落到雙十一大屏上展現的全鏈路時間壓縮在 5 秒以內,頂峰計算性能高達數三十萬筆訂單/秒,通過多條鏈路流計算備份確保萬無一失。 而在其他行業,企業也在構建自己的實時報表系統,讓企業能夠依托于自身的業務數據,快速提取出更多的數據價值,從而更好地服務于企業運行過程中。
📖 3. Flink基本技術棧
在flink整個軟件架構體系中。同樣遵循著分層的架構設計理念,在降低系統耦合度的同時,也為上層用戶構建flink應用提供了豐富且友好的接口。
- API & libraries層
作為分布式數據處理框架,fink同時提供了支撐流計算和批計算的接口,同時在此基礎之上抽象出不同的應用類型的組件庫。
如:基于流處理的CEP(復雜事件處理庫)、SQL&Table庫、FlinkML(機器學習庫)、Gelly(圖處理庫)
有流式處理API,批處理API。流式處理的支持事件處理,表操作。批處理的,支持機器學習,圖計算,也支持表操作。
- Runtime核心層
該層主要負責對上層的接口提供基礎服務,也就是flink分布式計算的核心實現。flink底層的執行引擎。
- 物理部署層
該層主要涉及到flink的部署模式,目前flink支持多種部署模式:
本地 local
集群 standalone/yarn
云 GCE/EC2 谷歌云、亞馬遜云
kubenetes
📖 4. Flink基本架構
? Flink 整個系統主要由兩個組件組成,分別為 JobManager 和 TaskManager,Flink 架構也遵循 Master-Slave 架構設計原則,JobManager 為 Master 節點,TaskManager 為 Worker(Slave)節點。所有組件之間的通信都是借助于 Akka Framework,包括任務的狀態以及 Checkpoint 觸發等信息
-
Client
客戶端負責將任務提交到集群,與 JobManager 構建 Akka 連接,然后將任務提交JobManager,通過和 JobManager 之間進行交互獲取任務執行狀態。客戶端提交任務可以采用CLI 方式或者通過使用 Flink WebUI 提交,也可以在應用程序中指定 JobManager 的 RPC 網絡端口構建 ExecutionEnvironment 提交 Flink 應用。
-
JobManager
JobManager 負責整個 Flink 集群任務的調度以及資源的管理,從客戶端中獲取提交的應用,然后根據集群中 TaskManager 上 TaskSlot 的使用情況,為提交的應用分配相應的 TaskSlot 資源并命令 TaskManager 啟動從客戶端中獲取的應用。JobManager 相當于整個集群的 Master 節點,且整個集群有且只有一個活躍的 JobManager ,負責整個集群的任務管理和資源管理。JobManager 和 TaskManager 之間通過 Actor System 進行通信,獲取任務執行的情況并通過 Actor System 將應用的任務執行情況發送給客戶端。同時在任務執行的過程中,Flink JobManager 會觸發 Checkpoint 操作,每個 TaskManager 節點 收到 Checkpoint 觸發指令后,完成 Checkpoint 操作,所有的 Checkpoint 協調過程都是在 Fink JobManager 中完成。當任務完成后,Flink 會將任務執行的信息反饋給客戶端,并且釋放掉 TaskManager 中的資源以供下一次提交任務使用。
-
TaskManager
TaskManager 相當于整個集群的 Slave 節點,負責具體的任務執行和對應任務在每個節點上的資源申請和管理。客戶端通過將編寫好的 Flink 應用編譯打包,提交到 JobManager,然后 JobManager 會根據已注冊在 JobManager 中 TaskManager 的資源情況,將任務分配給有資源的 TaskManager節點,然后啟動并運行任務。TaskManager 從 JobManager 接收需要部署的任務,然后使用 Slot 資源啟動 Task,建立數據接入的網絡連接,接收數據并開始數據處理。同時 TaskManager 之間的數據交互都是通過數據流的方式進行的。可以看出,Flink 的任務運行其實是采用多線程的方式,這和 MapReduce 多 JVM 進行的方式有很大的區別,Flink 能夠極大提高 CPU 使用效率,在多個任務和 Task 之間通過 TaskSlot 方式共享系統資源,每個 TaskManager 中通過管理多個 TaskSlot 資源池進行對資源進行有效管理。
📖 5. Flink的源碼編譯(了解)
-
我們可以對flink的源碼進行編譯,方便對我們各種hadoop的版本進行適配
-
參見:https://blog.csdn.net/h335146502/article/details/96483310
cd /kkb/soft
編譯flink-shaded包
wget https://github.com/apache/flink-shaded/archive/release-7.0.tar.gz
tar -zxvf flink-shaded-release-7.0.tar.gz -C /kkb/install/
cd /kkb/install/flink-shaded-release-7.0/
mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.14.2編譯flink源碼
wget http://archive.apache.org/dist/flink/flink-1.9.2/flink-1.9.2-src.tgztar -zxf flink-1.9.2-src.tgz -C /kkb/install/
cd /kkb/install/flink-1.9.2/
mvn -T2C clean install -DskipTests -Dfast -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.2
📖 6. Local模式安裝
-
1、安裝jdk,配置JAVA_HOME,建議使用jdk1.8以上
-
2、安裝包下載地址:
https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz
-
3、直接上傳安裝包到服務器
-
4、解壓安裝包并配置環境變量
tar -zxf flink-1.9.2-bin-scala_2.11.tgz -C /kkb/install/配置環境變量 sudo vim /etc/profile export FLINK_HOME=/kkb/install/flink-1.9.2 export PATH=:$FLINK_HOME/bin:$PATH
-
5、啟動服務
-
local模式,什么配置項都不需要配,直接啟動服務器即可
cd /kkb/install/flink-1.9.2 #啟動flink bin/start-cluster.sh #停止flink bin/stop-cluster.sh
-
-
6、Web頁面瀏覽
- http://node01:8081/#/overview
📖 7. Standalone模式安裝
- (1)集群規劃
主機名 | JobManager | TaskManager |
---|---|---|
node01 | 是 | 是 |
node02 | 是 | 是 |
node03 | 是 |
-
(2)依賴
- jdk1.8以上,配置JAVA_HOME
- 主機之間免密碼
-
(3)安裝步驟
node01修改以下配置文件(a) 修改conf/flink-conf.yaml#jobmanager地址
jobmanager.rpc.address: node01
#使用zookeeper搭建高可用
high-availability: zookeeper
##存儲JobManager的元數據到HDFS
high-availability.storageDir: hdfs://node01:8020/flink
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181(b) 修改conf/slaves
node01
node02
node03(c) 修改conf/masters
node01:8081
node02:8081(d)上傳flink-shaded-hadoop-2-uber-2.7.5-10.0.jar到flink的lib目錄下
將flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 這個jar包上傳到flink的安裝目錄的lib下(e) 拷貝到其他節點
scp -r /kkb/install/flink-1.9.2 node02:/kkb/install
scp -r /kkb/install/flink-1.9.2 node03:/kkb/install(f):node01(JobMananger)節點啟動
注意:啟動之前先啟動hadoop和zookeeper集群
cd /kkb/install/flink-1.9.2
bin/start-cluster.sh(g):訪問
http://node01:8081
http://node02:8081(h):關閉flink集群, 在主節點上執行
cd /kkb/install/flink-1.9.2
bin/stop-cluster.sh
- (4) StandAlone模式需要考慮的參數
jobmanager.heap.mb: jobmanager節點可用的內存大小
taskmanager.heap.mb: taskmanager節點可用的內存大小
taskmanager.numberOfTaskSlots: 每臺taskmanager節點可用的cpu數量
parallelism.default: 默認情況下任務的并行度
taskmanager.tmp.dirs: taskmanager的臨時數據存儲目錄
📖 8. Flink on Yarn模式安裝
- 首先安裝好Hadoop(yarn)
- 上傳一個flink的包,配置好hadoop的環境變量就可以了
- flink on yarn有兩種方式
8.1 第一種方式
-
內存集中管理模式(Yarn Session)
- 在Yarn中初始化一個Flink集群,開辟指定的資源,之后我們提交的Flink Jon都在這個Flink yarn-session中,也就是說不管提交多少個job,這些job都會共用開始時在yarn中申請的資源。這個Flink集群會常駐在Yarn集群中,除非手動停止。
8.2 第二種方式
- 內存Job管理模式==【yarn-cluster 推薦使用】==
- 在Yarn中,每次提交job都會創建一個新的Flink集群,任務之間相互獨立,互不影響并且方便管理。任務執行完成之后創建的集群也會消失。
- 在Yarn中,每次提交job都會創建一個新的Flink集群,任務之間相互獨立,互不影響并且方便管理。任務執行完成之后創建的集群也會消失。
8.3 不同模式的任務提交
-
第一種模式
-
【yarn-session.sh(開辟資源) + flink run(提交任務)】
-
1、在flink目錄啟動yarn-session
bin/yarn-session.sh -n 2 -tm 1024 -s 1 -d# -n 表示申請2個容器, # -s 表示每個容器啟動多少個slot # -tm 表示每個TaskManager申請1024M內存 # -d 表示以后臺程序方式運行
-
2、使用 flink 腳本提交任務
bin/flink run examples/batch/WordCount.jar \ -input hdfs://node01:8020/words.txt \ -output hdfs://node01:8020/output/result.txt##說明:如果啟動了很多的yarn-session, 在提交任務的時候可以通過參數 -yid 指定作業提交到哪一個yarn-session中運行 ##例如: bin/flink run \ -yid application_1597295374041_0008 \ examples/batch/WordCount.jar \ -input hdfs://node01:8020/words.txt \ -output hdfs://node01:8020/output/result.txt
- 3、停止任務
yarn application -kill application_1587024622720_0001
-
-
-
第二種模式
-
【flink run -m yarn-cluster (開辟資源+提交任務)】
-
1、啟動集群,執行任務
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 \ examples/batch/WordCount.jar \ -input hdfs://node01:8020/words.txt \ -output hdfs://node01:8020/output1注意:client端必須要設置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME環境變量,通過這個環境變量來讀取YARN和HDFS的配置信息,否則啟動會失敗。
-
-
-
help信息
-
yarn-session.sh 腳本參數
用法: 必選 -n,--container <arg> 分配多少個yarn容器 (=taskmanager的數量) 可選 -D <arg> 動態屬性 -d,--detached 獨立運行 -jm,--jobManagerMemory <arg> JobManager的內存 [in MB] -nm,--name 在YARN上為一個自定義的應用設置一個名字 -q,--query 顯示yarn中可用的資源 (內存, cpu核數) -qu,--queue <arg> 指定YARN隊列. -s,--slots <arg> 每個TaskManager使用的slots數量 -tm,--taskManagerMemory <arg> 每個TaskManager的內存 [in MB] -z,--zookeeperNamespace <arg> 針對HA模式在zookeeper上創建NameSpace -id,--applicationId <yarnAppId> YARN集群上的任務id,附著到一個后臺運行的yarn session中
-
flink run 腳本參數
run [OPTIONS] <jar-file> <arguments> "run" 操作參數: -c,--class <classname> 如果沒有在jar包中指定入口類,則需要在這里通過這個參數指定 -m,--jobmanager <host:port> 指定需要連接的jobmanager(主節點)地址,使用這個參數可以指定一個不同于配置文件中的jobmanager -p,--parallelism <parallelism> 指定程序的并行度。可以覆蓋配置文件中的默認值。默認查找當前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】: ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1連接指定host和port的jobmanager: ./bin/flink run -m node01:6123 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1啟動一個新的yarn-session: ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1注意:yarn session命令行的選項也可以使用./bin/flink 工具獲得。它們都有一個y或者yarn的前綴 例如:flink run -m yarn-cluster -yn 2 examples/batch/WordCount.jar
-
8.4 Flink on YARN集群部署
- (1) flink on yarn運行原理
- 其實Flink on YARN部署很簡單,就是只要部署好hadoop集群即可,我們只需要部署一個Flink客戶端,然后從flink客戶端提交Flink任務即可。類似于spark on yarn模式。
📖 9. 入門案例演示
9.1 實時需求分析
實時統計每隔1秒統計最近2秒單詞出現的次數
- 創建maven工程,添加pom依賴
<properties><flink.version>1.9.2</flink.version><scala.version>2.11.8</scala.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
9.1.1 實時代碼開發(scala版本,flink完全倒向了java,別用scala了)
-
代碼開發
package com.kaikeba.demo1import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time/*** 使用滑動窗口* 每隔1秒鐘統計最近2秒鐘的每個單詞出現的次數*/ object FlinkStream {def main(args: Array[String]): Unit = {//構建流處理的環境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//從socket獲取數據val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)//導入隱式轉換的包import org.apache.flink.api.scala._//對數據進行處理val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")) //按照空格切分.map(x => (x, 1)) //每個單詞計為1.keyBy(0) //按照下標為0的單詞進行分組 .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s處理2s的數據.sum(1) //按照下標為1累加相同單詞出現的次數//對數據進行打印result.print()//開啟任務env.execute("FlinkStream")}}
-
發送 socket 數據
##在node01上安裝nc服務 sudo yum -y install nc nc -lk 9999
-
打成jar包提交到yarn中運行
flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c com.kaikeba.demo1.FlinkStream original-flink_study-1.0-SNAPSHOT.jar
9.1.2 實時代碼開發(java版本)
-
代碼開發
package com.kaikeba.demo1;import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;/*** java代碼開發實時統計每隔1秒統計最近2秒單詞出現的次數*/ public class WindowWordCountJava {public static void main(String[] args) throws Exception {//步驟一:獲取流式處理環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//步驟二:獲取socket數據DataStreamSource<String> sourceDstream = env.socketTextStream("node01", 9999);//步驟三:對數據進行處理DataStream<WordCount> wordAndOneStream = sourceDstream.flatMap(new FlatMapFunction<String, WordCount>() {public void flatMap(String line, Collector<WordCount> collector) throws Exception {String[] words = line.split(" ");for (String word : words) {collector.collect(new WordCount(word, 1L));}}});DataStream<WordCount> resultStream = wordAndOneStream.keyBy("word") //按照單詞分組.timeWindow(Time.seconds(2), Time.seconds(1)) //每隔1s統計2s的數據.sum("count"); //按照count字段累加結果//步驟四:結果打印resultStream.print();//步驟五:任務啟動env.execute("WindowWordCountJava");}public static class WordCount{public String word;public long count;//記得要有這個空構建public WordCount(){}public WordCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "WordCount{" +"word='" + word + '\'' +", count=" + count +'}';}} }
-
發送socket數據
#在node01上執行命令,發送數據 nc -lk 9999
9.2 離線需求分析
對文件進行單詞計數,統計文件當中每個單詞出現的次數。
9.2.1 離線代碼開發(scala)
package com.kaikeba.demo1import org.apache.flink.api.scala.{ DataSet, ExecutionEnvironment}/*** scala開發flink的批處理程序*/
object FlinkFileCount {def main(args: Array[String]): Unit = {//todo:1、構建Flink的批處理環境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//todo:2、讀取數據文件val fileDataSet: DataSet[String] = env.readTextFile("d:\\words.txt")import org.apache.flink.api.scala._//todo: 3、對數據進行處理val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet.flatMap(x=> x.split(" ")).map(x=>(x,1)).groupBy(0).sum(1)//todo: 4、打印結果resultDataSet.print()//todo: 5、保存結果到文件resultDataSet.writeAsText("d:\\result")env.execute("FlinkFileCount")}
}
📖 10. Flink并行度&Slot&Task
Flink的每個TaskManager為集群提供solt。每個task slot代表了TaskManager的一個固定大小的資源子集。 solt的數量通常與每個TaskManager節點的可用CPU內核數成比例。一般情況下你的slot數是你每個節點的cpu的核數。
10.1 并行度
一個Flink程序由多個任務組成(source、transformation和 sink)。 一個任務由多個并行的實例(線程)來執行, 一個任務的并行實例 (線程) 數目就被稱為該任務的并行度。
10.2 并行度的設置
- 一個任務的并行度設置可以從多個級別指定
- Operator Level(算子級別)
- Execution Environment Level(執行環境級別)
- Client Level(客戶端級別)
- System Level(系統級別)
- 這些并行度的優先級為
- Operator Level > Execution Environment Level > Client Level > System Level
10.2.1 算子級別
10.2.2 執行環境級別
?
10.2.3 客戶端級別
-
并行度可以在客戶端將job提交到Flink時設定,對于CLI客戶端,可以通過-p參數指定并行度
bin/flink run -p 10 examples/batch/WordCount.jar
10.2.4 系統級別
-
在系統級可以通過設置flink-conf.yaml文件中的parallelism.default屬性來指定所有執行環境的默認并行度
parallelism.default: 1
10.3 并行度操作演示
-
為了方便在本地測試觀察任務并行度信息,可以在本地工程添加以下依賴
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink.version}</version> </dependency>
-
案例
- 注意獲取程序的執行環境發生變化了
- val environment=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
package com.kaikeba.demo1import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time/*** 本地調試并行度*/ object TestParallelism {def main(args: Array[String]): Unit = {//使用createLocalEnvironmentWithWebUI方法,構建本地流式處理環境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()//執行環境級別//environment.setParallelism(4)import org.apache.flink.api.scala._//接受socket數據val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)val countStream: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")).setParallelism(5) //算子級別.map(x => (x, 1)).keyBy(0).timeWindow(Time.seconds(2), Time.seconds(1)).sum(1)countStream.print()environment.execute()} }
- 設置并行度,觀察localhost:8081界面
2?? 總結
-
掌握Flink的編程規范
-
了解Flink的集群模式