Flink深入淺出之01:應用場景、基本架構、部署模式

Flink

1?? 一 、知識要點

📖 1. Flink簡介

  • Apache Flink? — Stateful Computations over Data Streams
  • Apache Flink 是一個分布式大數據處理引擎,可對有界數據流無界數據流進行有狀態的計算。Flink 能在所有常見集群環境中運行,并能以內存速度和任意規模進行計算
    • 官網地址:http://flink.apache.org

在這里插入圖片描述

1.1 處理無界和有界數據
  • 任何類型的數據都可以形成一種事件流。信用卡交易、傳感器測量、機器日志、網站或移動應用程序上的用戶交互記錄,所有這些數據都形成一種流。

  • 數據可以被作為無界或者有界流來處理。

(1) 無界流 有定義流的開始,但沒有定義流的結束。它們會無休止地產生數據。無界流的數據必須持續處理,即數據被攝取后需要立刻處理。我們不能等到所有數據都到達再處理,因為輸入是無限的,在任何時候輸入都不會完成。處理無界數據通常要求以特定順序攝取事件,例如事件發生的順序,以便能夠推斷結果的完整性。(2) 有界流 有定義流的開始,也有定義流的結束。有界流可以在攝取所有數據后再進行計算。有界流所有數據可以被排序,所以并不需要有序攝取。有界流處理通常被稱為批處理

在這里插入圖片描述

  • Apache Flink 擅長處理無界和有界數據集 精確的時間控制和狀態化使得 Flink 的運行時(runtime)能夠運行任何處理無界流的應用。有界流則由一些專為固定大小數據集特殊設計的算法和數據結構進行內部處理,產生了出色的性能。
1.2 部署應用到任意地方
  • Apache Flink 是一個分布式系統,它需要計算資源來執行應用程序。Flink 集成了所有常見的集群資源管理器,例如 Hadoop YARN、 Apache Mesos 和 Kubernetes,但同時也可以作為獨立集群運行。

    	Flink 被設計為能夠很好地工作在上述每個資源管理器中,這是通過資源管理器特定(resource-manager-specific)的部署模式實現的。Flink 可以采用與當前資源管理器相適應的方式進行交互。部署 Flink 應用程序時,Flink 會根據應用程序配置的并行性自動標識所需的資源,并從資源管理器請求這些資源。在發生故障的情況下,Flink 通過請求新資源來替換發生故障的容器。提交或控制應用程序的所有通信都是通過 REST 調用進行的,這可以簡化 Flink 與各種環境中的集成
    
1.3 運行任意規模應用
  • Flink 旨在任意規模上運行有狀態流式應用。因此,應用程序被并行化為可能數千個任務,這些任務分布在集群中并發執行。所以應用程序能夠充分利用無盡的 CPU、內存、磁盤和網絡 IO。而且 Flink 很容易維護非常大的應用程序狀態。其異步和增量的檢查點算法對處理延遲產生最小的影響,同時保證精確一次狀態的一致性。

    Flink 用戶報告了其生產環境中一些令人印象深刻的擴展性數字:每天處理數萬億的事件可以維護幾TB大小的狀態可以部署上千個節點的集群
    
1.4 利用內存性能
  • 有狀態的 Flink 程序針對本地狀態訪問進行了優化。任務的狀態始終保留在內存中,如果狀態大小超過可用內存,則會保存在能高效訪問的磁盤數據結構中。任務通過訪問本地(通常在內存中)狀態來進行所有的計算,從而產生非常低的處理延遲。Flink 通過定期和異步地對本地狀態進行持久化存儲來保證故障場景下精確一次的狀態一致性。

在這里插入圖片描述

📖 2. Flink 的應用場景

  在實際生產的過程中,大量數據在不斷地產生,例如金融交易數據、互聯網訂單數據、 GPS 定位數據、傳感器信號、移動終端產生的數據、通信信號數據等,以及我們熟悉的網絡 流量監控、服務器產生的日志數據,這些數據最大的共同點就是實時從不同的數據源中產生, 然后再傳輸到下游的分析系統。針對這些數據類型主要包括實時智能推薦、復雜事件處理、 實時欺詐檢測、實時數倉與 ETL 類型、流數據分析類型、實時報表類型等實時業務場景,而 Flink 對于這些類型的場景都有著非常好的支持。
  • 1、實時智能推薦

    	智能推薦會根據用戶歷史的購買行為,通過推薦算法訓練模型,預測用戶未來可能會購 買的物品。對個人來說,推薦系統起著信息過濾的作用,對 Web/App 服務端來說,推薦系統 起著滿足用戶個性化需求,提升用戶滿意度的作用。推薦系統本身也在飛速發展,除了算法 越來越完善,對時延的要求也越來越苛刻和實時化。利用 Flink 流計算幫助用戶構建更加實 時的智能推薦系統,對用戶行為指標進行實時計算,對模型進行實時更新,對用戶指標進行 實時預測,并將預測的信息推送給 Wep/App 端,幫助用戶獲取想要的商品信息,另一方面也 幫助企業提升銷售額,創造更大的商業價值。
    
  • 2、復雜事件處理

    	對于復雜事件處理,比較常見的案例主要集中于工業領域,例如對車載傳感器、機械設 備等實時故障檢測,這些業務類型通常數據量都非常大,且對數據處理的時效性要求非常高。 通過利用 Flink 提供的 CEP(復雜事件處理)進行事件模式的抽取,同時應用 Flink 的 Sql 進行事件數據的轉換,在流式系統中構建實時規則引擎,一旦事件觸發報警規則,便立即將 告警結果傳輸至下游通知系統,從而實現對設備故障快速預警監測,車輛狀態監控等目的。
    
  • 3、實時欺詐檢測

    	在金融領域的業務中,常常出現各種類型的欺詐行為,例如信用卡欺詐、信貸申請欺詐等,而如何保證用戶和公司的資金安全,是來近年來許多金融公司及銀行共同面對的挑戰。 隨著不法分子欺詐手段的不斷升級,傳統的反欺詐手段已經不足以解決目前所面臨的問題。 以往可能需要幾個小時才能通過交易數據計算出用戶的行為指標,然后通過規則判別出具有 欺詐行為嫌疑的用戶,再進行案件調查處理,在這種情況下資金可能早已被不法分子轉移, 從而給企業和用戶造成大量的經濟損失。而運用 Flink 流式計算技術能夠在毫秒內就完成對 欺詐判斷行為指標的計算,然后實時對交易流水進行規則判斷或者模型預測,這樣一旦檢測 出交易中存在欺詐嫌疑,則直接對交易進行實時攔截,避免因為處理不及時而導致的經濟損失。
    
  • 4、實時數倉與 ETL

    	結合離線數倉,通過利用流計算諸多優勢和 SQL 靈活的加工能力,對流式數據進行實時 清洗、歸并、結構化處理,為離線數倉進行補充和優化。另一方面結合實時數據 ETL 處理能力,利用有狀態流式計算技術,可以盡可能降低企業由于在離線數據計算過程中調度邏輯的 復雜度,高效快速地處理企業需要的統計結果,幫助企業更好地應用實時數據所分析出來的結果。
    
  • 5、流數據分析

    	實時計算各類數據指標,并利用實時結果及時調整在線系統相關策略,在各類內容投放、 無線智能推送領域有大量的應用。流式計算技術將數據分析場景實時化,幫助企業做到實時化分析 Web 應用或者 App 應用的各項指標,包括 App 版本分布情況、Crash 檢測和分布等, 同時提供多維度用戶行為分析,支持日志自主分析,助力開發者實現基于大數據技術的精細 化運營、提升產品質量和體驗、增強用戶黏性。
    
  • 6、實時報表分析

    	實時報表分析是近年來很多公司采用的報表統計方案之一,其中最主要的應用便是實時 大屏展示。利用流式計算實時得出的結果直接被推送到前端應用,實時顯示出重要指標的變 換情況。最典型的案例便是淘寶的雙十一活動,每年雙十一購物節,除瘋狂購物外,最引人 注目的就是天貓雙十一大屏不停跳躍的成交總額。在整個計算鏈路中包括從天貓交易下單購 買到數據采集、數據計算、數據校驗,最終落到雙十一大屏上展現的全鏈路時間壓縮在 5 秒以內,頂峰計算性能高達數三十萬筆訂單/秒,通過多條鏈路流計算備份確保萬無一失。 而在其他行業,企業也在構建自己的實時報表系統,讓企業能夠依托于自身的業務數據,快速提取出更多的數據價值,從而更好地服務于企業運行過程中。
    

📖 3. Flink基本技術棧

在這里插入圖片描述

在flink整個軟件架構體系中。同樣遵循著分層的架構設計理念,在降低系統耦合度的同時,也為上層用戶構建flink應用提供了豐富且友好的接口。

  • API & libraries層
	作為分布式數據處理框架,fink同時提供了支撐流計算和批計算的接口,同時在此基礎之上抽象出不同的應用類型的組件庫。
如:基于流處理的CEP(復雜事件處理庫)、SQL&Table庫、FlinkML(機器學習庫)、Gelly(圖處理庫)
有流式處理API,批處理API。流式處理的支持事件處理,表操作。批處理的,支持機器學習,圖計算,也支持表操作。
  • Runtime核心層
	該層主要負責對上層的接口提供基礎服務,也就是flink分布式計算的核心實現。flink底層的執行引擎。
  • 物理部署層
該層主要涉及到flink的部署模式,目前flink支持多種部署模式:
本地 local
集群 standalone/yarn
云 GCE/EC2  谷歌云、亞馬遜云
kubenetes

📖 4. Flink基本架構

在這里插入圖片描述

? Flink 整個系統主要由兩個組件組成,分別為 JobManager 和 TaskManager,Flink 架構也遵循 Master-Slave 架構設計原則,JobManager 為 Master 節點,TaskManager 為 Worker(Slave)節點。所有組件之間的通信都是借助于 Akka Framework,包括任務的狀態以及 Checkpoint 觸發等信息

  • Client

    	客戶端負責將任務提交到集群,與 JobManager 構建 Akka 連接,然后將任務提交JobManager,通過和 JobManager 之間進行交互獲取任務執行狀態。客戶端提交任務可以采用CLI 方式或者通過使用 Flink WebUI 提交,也可以在應用程序中指定 JobManager 的 RPC 網絡端口構建 ExecutionEnvironment 提交 Flink 應用。
    
  • JobManager

    	JobManager 負責整個 Flink 集群任務的調度以及資源的管理,從客戶端中獲取提交的應用,然后根據集群中 TaskManager 上 TaskSlot 的使用情況,為提交的應用分配相應的 TaskSlot 資源并命令 TaskManager 啟動從客戶端中獲取的應用。JobManager 相當于整個集群的 Master 節點,且整個集群有且只有一個活躍的 JobManager ,負責整個集群的任務管理和資源管理。JobManager 和 TaskManager 之間通過 Actor System 進行通信,獲取任務執行的情況并通過 Actor System 將應用的任務執行情況發送給客戶端。同時在任務執行的過程中,Flink JobManager 會觸發 Checkpoint 操作,每個 TaskManager 節點 收到 Checkpoint 觸發指令后,完成 Checkpoint 操作,所有的 Checkpoint 協調過程都是在 Fink JobManager 中完成。當任務完成后,Flink 會將任務執行的信息反饋給客戶端,并且釋放掉 TaskManager 中的資源以供下一次提交任務使用。
    
  • TaskManager

    	TaskManager 相當于整個集群的 Slave 節點,負責具體的任務執行和對應任務在每個節點上的資源申請和管理。客戶端通過將編寫好的 Flink 應用編譯打包,提交到 JobManager,然后 JobManager 會根據已注冊在 JobManager 中 TaskManager 的資源情況,將任務分配給有資源的 TaskManager節點,然后啟動并運行任務。TaskManager 從 JobManager 接收需要部署的任務,然后使用 Slot 資源啟動 Task,建立數據接入的網絡連接,接收數據并開始數據處理。同時 TaskManager 之間的數據交互都是通過數據流的方式進行的。可以看出,Flink 的任務運行其實是采用多線程的方式,這和 MapReduce 多 JVM 進行的方式有很大的區別,Flink 能夠極大提高 CPU 使用效率,在多個任務和 Task 之間通過 TaskSlot 方式共享系統資源,每個 TaskManager 中通過管理多個 TaskSlot 資源池進行對資源進行有效管理。
    

📖 5. Flink的源碼編譯(了解)

  • 我們可以對flink的源碼進行編譯,方便對我們各種hadoop的版本進行適配

  • 參見:https://blog.csdn.net/h335146502/article/details/96483310

cd /kkb/soft
編譯flink-shaded包
wget  https://github.com/apache/flink-shaded/archive/release-7.0.tar.gz
tar -zxvf flink-shaded-release-7.0.tar.gz -C /kkb/install/
cd /kkb/install/flink-shaded-release-7.0/
mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.14.2編譯flink源碼
wget http://archive.apache.org/dist/flink/flink-1.9.2/flink-1.9.2-src.tgztar -zxf flink-1.9.2-src.tgz -C /kkb/install/
cd /kkb/install/flink-1.9.2/
mvn -T2C clean install -DskipTests -Dfast -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.2

📖 6. Local模式安裝

  • 1、安裝jdk,配置JAVA_HOME,建議使用jdk1.8以上

  • 2、安裝包下載地址:

    https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz

  • 3、直接上傳安裝包到服務器

  • 4、解壓安裝包并配置環境變量

    tar -zxf flink-1.9.2-bin-scala_2.11.tgz -C /kkb/install/配置環境變量
    sudo vim /etc/profile
    export FLINK_HOME=/kkb/install/flink-1.9.2
    export PATH=:$FLINK_HOME/bin:$PATH
    
  • 5、啟動服務

    • local模式,什么配置項都不需要配,直接啟動服務器即可

      cd /kkb/install/flink-1.9.2
      #啟動flink
      bin/start-cluster.sh 
      #停止flink
      bin/stop-cluster.sh 
      
  • 6、Web頁面瀏覽

    • http://node01:8081/#/overview

在這里插入圖片描述

📖 7. Standalone模式安裝

  • (1)集群規劃
主機名JobManagerTaskManager
node01
node02
node03
  • (2)依賴

    • jdk1.8以上,配置JAVA_HOME
    • 主機之間免密碼
  • (3)安裝步驟

node01修改以下配置文件(a) 修改conf/flink-conf.yaml#jobmanager地址
jobmanager.rpc.address: node01
#使用zookeeper搭建高可用
high-availability: zookeeper
##存儲JobManager的元數據到HDFS
high-availability.storageDir: hdfs://node01:8020/flink
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181(b) 修改conf/slaves
node01
node02
node03(c) 修改conf/masters
node01:8081
node02:8081(d)上傳flink-shaded-hadoop-2-uber-2.7.5-10.0.jar到flink的lib目錄下
將flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 這個jar包上傳到flink的安裝目錄的lib下(e) 拷貝到其他節點
scp -r /kkb/install/flink-1.9.2 node02:/kkb/install
scp -r /kkb/install/flink-1.9.2 node03:/kkb/install(f):node01(JobMananger)節點啟動
注意:啟動之前先啟動hadoop和zookeeper集群
cd /kkb/install/flink-1.9.2
bin/start-cluster.sh(g):訪問
http://node01:8081
http://node02:8081(h):關閉flink集群, 在主節點上執行
cd /kkb/install/flink-1.9.2
bin/stop-cluster.sh
  • (4) StandAlone模式需要考慮的參數
jobmanager.heap.mb:     		jobmanager節點可用的內存大小
taskmanager.heap.mb:    		taskmanager節點可用的內存大小
taskmanager.numberOfTaskSlots:   每臺taskmanager節點可用的cpu數量
parallelism.default:             默認情況下任務的并行度
taskmanager.tmp.dirs:            taskmanager的臨時數據存儲目錄

📖 8. Flink on Yarn模式安裝

  1. 首先安裝好Hadoop(yarn)
  2. 上傳一個flink的包,配置好hadoop的環境變量就可以了
  • flink on yarn有兩種方式
8.1 第一種方式
  • 內存集中管理模式(Yarn Session

    • 在Yarn中初始化一個Flink集群,開辟指定的資源,之后我們提交的Flink Jon都在這個Flink yarn-session中,也就是說不管提交多少個job,這些job都會共用開始時在yarn中申請的資源。這個Flink集群會常駐在Yarn集群中,除非手動停止。

在這里插入圖片描述

8.2 第二種方式
  • 內存Job管理模式==【yarn-cluster 推薦使用】==
    • 在Yarn中,每次提交job都會創建一個新的Flink集群,任務之間相互獨立,互不影響并且方便管理。任務執行完成之后創建的集群也會消失。
      在這里插入圖片描述
8.3 不同模式的任務提交
  • 第一種模式

    • 【yarn-session.sh(開辟資源) + flink run(提交任務)】

      • 1、在flink目錄啟動yarn-session

        bin/yarn-session.sh -n 2 -tm 1024 -s 1 -d# -n 表示申請2個容器,
        # -s 表示每個容器啟動多少個slot
        # -tm 表示每個TaskManager申請1024M內存
        # -d 表示以后臺程序方式運行
        
      • 2、使用 flink 腳本提交任務

        bin/flink run examples/batch/WordCount.jar \
        -input  hdfs://node01:8020/words.txt \
        -output hdfs://node01:8020/output/result.txt##說明:如果啟動了很多的yarn-session, 在提交任務的時候可以通過參數 -yid 指定作業提交到哪一個yarn-session中運行
        ##例如:
        bin/flink run \
        -yid application_1597295374041_0008 \
        examples/batch/WordCount.jar \
        -input  hdfs://node01:8020/words.txt \
        -output hdfs://node01:8020/output/result.txt
        • 3、停止任務
      yarn application -kill application_1587024622720_0001
      
  • 第二種模式

    • 【flink run -m yarn-cluster (開辟資源+提交任務)】

      • 1、啟動集群,執行任務

        bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 \
        examples/batch/WordCount.jar \
        -input hdfs://node01:8020/words.txt \
        -output hdfs://node01:8020/output1注意:client端必須要設置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME環境變量,通過這個環境變量來讀取YARN和HDFS的配置信息,否則啟動會失敗。
        
  • help信息

    • yarn-session.sh 腳本參數

      用法:  必選  -n,--container <arg>   分配多少個yarn容器 (=taskmanager的數量)  可選  -D <arg>                        動態屬性  -d,--detached                   獨立運行  -jm,--jobManagerMemory <arg>    JobManager的內存 [in MB]  -nm,--name                     在YARN上為一個自定義的應用設置一個名字  -q,--query                      顯示yarn中可用的資源 (內存, cpu核數)  -qu,--queue <arg>               指定YARN隊列.  -s,--slots <arg>                每個TaskManager使用的slots數量  -tm,--taskManagerMemory <arg>   每個TaskManager的內存 [in MB]  -z,--zookeeperNamespace <arg>   針對HA模式在zookeeper上創建NameSpace -id,--applicationId <yarnAppId> YARN集群上的任務id,附著到一個后臺運行的yarn    session中
      
    • flink run 腳本參數

      run [OPTIONS] <jar-file> <arguments>  "run" 操作參數:  
      -c,--class <classname>  如果沒有在jar包中指定入口類,則需要在這里通過這個參數指定  
      -m,--jobmanager <host:port>  指定需要連接的jobmanager(主節點)地址,使用這個參數可以指定一個不同于配置文件中的jobmanager  
      -p,--parallelism <parallelism>   指定程序的并行度。可以覆蓋配置文件中的默認值。默認查找當前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
      ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1連接指定host和port的jobmanager:
      ./bin/flink run -m node01:6123 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1啟動一個新的yarn-session:
      ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1注意:yarn session命令行的選項也可以使用./bin/flink 工具獲得。它們都有一個y或者yarn的前綴
      例如:flink run -m yarn-cluster -yn 2 examples/batch/WordCount.jar 
8.4 Flink on YARN集群部署
  • (1) flink on yarn運行原理

在這里插入圖片描述

在這里插入圖片描述

  • 其實Flink on YARN部署很簡單,就是只要部署好hadoop集群即可,我們只需要部署一個Flink客戶端,然后從flink客戶端提交Flink任務即可。類似于spark on yarn模式。

📖 9. 入門案例演示

9.1 實時需求分析
實時統計每隔1秒統計最近2秒單詞出現的次數
  • 創建maven工程,添加pom依賴
 <properties><flink.version>1.9.2</flink.version><scala.version>2.11.8</scala.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
9.1.1 實時代碼開發(scala版本,flink完全倒向了java,別用scala了)
  • 代碼開發

    package com.kaikeba.demo1import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time/*** 使用滑動窗口* 每隔1秒鐘統計最近2秒鐘的每個單詞出現的次數*/
    object FlinkStream {def main(args: Array[String]): Unit = {//構建流處理的環境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//從socket獲取數據val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)//導入隱式轉換的包import org.apache.flink.api.scala._//對數據進行處理val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")) //按照空格切分.map(x => (x, 1))           //每個單詞計為1.keyBy(0)                   //按照下標為0的單詞進行分組      .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s處理2s的數據.sum(1)            //按照下標為1累加相同單詞出現的次數//對數據進行打印result.print()//開啟任務env.execute("FlinkStream")}}
  • 發送 socket 數據

    ##在node01上安裝nc服務
    sudo yum -y install nc
    nc -lk 9999
    
  • 打成jar包提交到yarn中運行

    flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c com.kaikeba.demo1.FlinkStream original-flink_study-1.0-SNAPSHOT.jar 
    
9.1.2 實時代碼開發(java版本)
  • 代碼開發

    package com.kaikeba.demo1;import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;/*** java代碼開發實時統計每隔1秒統計最近2秒單詞出現的次數*/
    public class WindowWordCountJava {public static void main(String[] args) throws Exception {//步驟一:獲取流式處理環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//步驟二:獲取socket數據DataStreamSource<String> sourceDstream = env.socketTextStream("node01", 9999);//步驟三:對數據進行處理DataStream<WordCount> wordAndOneStream = sourceDstream.flatMap(new FlatMapFunction<String, WordCount>() {public void flatMap(String line, Collector<WordCount> collector) throws Exception {String[] words = line.split(" ");for (String word : words) {collector.collect(new WordCount(word, 1L));}}});DataStream<WordCount> resultStream = wordAndOneStream.keyBy("word")  //按照單詞分組.timeWindow(Time.seconds(2), Time.seconds(1)) //每隔1s統計2s的數據.sum("count");   //按照count字段累加結果//步驟四:結果打印resultStream.print();//步驟五:任務啟動env.execute("WindowWordCountJava");}public static class WordCount{public String word;public long count;//記得要有這個空構建public WordCount(){}public WordCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "WordCount{" +"word='" + word + '\'' +", count=" + count +'}';}}
    }
  • 發送socket數據

    #在node01上執行命令,發送數據
    nc -lk 9999
    
9.2 離線需求分析
對文件進行單詞計數,統計文件當中每個單詞出現的次數。
9.2.1 離線代碼開發(scala)
package com.kaikeba.demo1import org.apache.flink.api.scala.{ DataSet, ExecutionEnvironment}/*** scala開發flink的批處理程序*/
object FlinkFileCount {def main(args: Array[String]): Unit = {//todo:1、構建Flink的批處理環境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//todo:2、讀取數據文件val fileDataSet: DataSet[String] = env.readTextFile("d:\\words.txt")import org.apache.flink.api.scala._//todo: 3、對數據進行處理val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet.flatMap(x=> x.split(" ")).map(x=>(x,1)).groupBy(0).sum(1)//todo: 4、打印結果resultDataSet.print()//todo: 5、保存結果到文件resultDataSet.writeAsText("d:\\result")env.execute("FlinkFileCount")}
}

📖 10. Flink并行度&Slot&Task

	Flink的每個TaskManager為集群提供solt。每個task slot代表了TaskManager的一個固定大小的資源子集。 solt的數量通常與每個TaskManager節點的可用CPU內核數成比例。一般情況下你的slot數是你每個節點的cpu的核數。

在這里插入圖片描述

10.1 并行度

一個Flink程序由多個任務組成(source、transformation和 sink)。 一個任務由多個并行的實例(線程)來執行, 一個任務的并行實例 (線程) 數目就被稱為該任務的并行度。

10.2 并行度的設置
  • 一個任務的并行度設置可以從多個級別指定
    • Operator Level(算子級別)
    • Execution Environment Level(執行環境級別)
    • Client Level(客戶端級別)
    • System Level(系統級別)
  • 這些并行度的優先級為
    • Operator Level > Execution Environment Level > Client Level > System Level
10.2.1 算子級別

在這里插入圖片描述

10.2.2 執行環境級別

在這里插入圖片描述
?

10.2.3 客戶端級別
  • 并行度可以在客戶端將job提交到Flink時設定,對于CLI客戶端,可以通過-p參數指定并行度

    bin/flink run -p 10 examples/batch/WordCount.jar
    
10.2.4 系統級別
  • 在系統級可以通過設置flink-conf.yaml文件中的parallelism.default屬性來指定所有執行環境的默認并行度

    parallelism.default: 1
    
10.3 并行度操作演示
  • 為了方便在本地測試觀察任務并行度信息,可以在本地工程添加以下依賴

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink.version}</version>
    </dependency>
    
  • 案例

    • 注意獲取程序的執行環境發生變化了
    • val environment=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    package com.kaikeba.demo1import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time/*** 本地調試并行度*/
    object TestParallelism {def main(args: Array[String]): Unit = {//使用createLocalEnvironmentWithWebUI方法,構建本地流式處理環境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()//執行環境級別//environment.setParallelism(4)import org.apache.flink.api.scala._//接受socket數據val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)val countStream: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")).setParallelism(5) //算子級別.map(x => (x, 1)).keyBy(0).timeWindow(Time.seconds(2), Time.seconds(1)).sum(1)countStream.print()environment.execute()}
    }
    • 設置并行度,觀察localhost:8081界面

    在這里插入圖片描述

在這里插入圖片描述

2?? 總結

  • 掌握Flink的編程規范

  • 了解Flink的集群模式

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/71573.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/71573.shtml
英文地址,請注明出處:http://en.pswp.cn/web/71573.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

2025年【高壓電工】報名考試及高壓電工考試總結

隨著電力行業的快速發展&#xff0c;高壓電工成為確保電力系統安全穩定運行的重要一環。為了提高高壓電工的專業技能和安全意識&#xff0c;“安全生產模擬考試一點通”平臺特別整理了2025年高壓電工報名考試的相關信息及考試總結&#xff0c;并提供了一套完整的題庫&#xff0…

網絡HTTP

HTTP Network Request Library A Retrofit-based HTTP network request encapsulation library that provides simple and easy-to-use API interfaces with complete network request functionality. 基于Retrofit的HTTP網絡請求封裝庫&#xff0c;提供簡單易用的API接口和完…

os-copilot安裝和使用體驗測評

簡介&#xff1a; OS Copilot是阿里云基于大模型構建的Linux系統智能助手&#xff0c;支持自然語言問答、命令執行和系統運維調優。本文介紹其產品優勢、功能及使用方法&#xff0c;并分享個人開發者在云服務器資源管理中的實際應用體驗。通過-t/-f/管道功能&#xff0c;OS Cop…

Python Flask框架學習匯編

1、入門級&#xff1a; 《Python Flask Web 框架入門》 這篇博文條理清晰&#xff0c;由簡入繁&#xff0c;案例豐富&#xff0c;分十五節詳細講解了Flask框架&#xff0c;強烈推薦&#xff01; 《python的簡單web框架flask【附例子】》 講解的特別清楚&#xff0c;每一步都…

【HarmonyOS Next之旅】DevEco Studio使用指南(一)

目錄 1 -> 工具簡介 1.1 -> 概述 1.2 -> HarmonyOS應用/服務開發流程 1.2.1 -> 開發準備 1.2.2 -> 開發應用/服務 1.2.3 -> 運行、調試和測試應用/服務 1.2.4 -> 發布應用/服務 2 -> 工程介紹 2.1 -> APP包結構 2.2 -> 切換工程視圖 …

Manus開源平替-開源通用智能體

原文鏈接:https://i68.ltd/notes/posts/250306-opensource-agi-agent/ OWL-比Manus還強的全能開源Agent OWL: Optimized Workforce Learning for General Multi-Agent Assistance in Real-World Task Automation&#xff0c;現實世界中執行自動化任務的通用多代理輔助優化學習…

【3.2-3.8學習周報】

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 摘要Abstract一、方法介紹1.任務適應性持續預訓練&#xff08;TACP&#xff09;2.領域自適應連續預訓練&#xff08;DACP&#xff09;3.ETS-DACP和ETA-DACP 二、實驗…

【Linux】用戶和組

思考 使用useradd在Linux下面創建一個用戶&#xff0c;默認情況下&#xff0c;會自動創建一個同名組&#xff0c;并且加入其中&#xff0c;那么是先創建用戶呢&#xff1f;還是先創建組呢&#xff1f; 很簡單&#xff0c;我們實踐一下不就知道了&#xff0c;如下所示&#xff…

新編大學應用英語綜合教程2 U校園全套參考答案

全套答案獲取&#xff1a; 鏈接&#xff1a;https://pan.quark.cn/s/389618f53143

SAP 顧問的五年職業規劃

SAP 顧問的職業發展受到技術進步、企業需求變化和全球經濟環境的影響&#xff0c;因此制定長遠規劃充滿挑戰。面對 SAP 產品路線圖的不確定性&#xff0c;如向 S/4HANA 和 Business Technology Platform (BTP) 的轉變&#xff0c;顧問必須具備靈活性&#xff0c;以保持競爭力和…

圖像生成-ICCV2019-SinGAN: Learning a Generative Model from a Single Natural Image

圖像生成-ICCV2019-SinGAN: Learning a Generative Model from a Single Natural Image 文章目錄 圖像生成-ICCV2019-SinGAN: Learning a Generative Model from a Single Natural Image主要創新點模型架構圖生成器生成器源碼 判別器判別器源碼 損失函數需要源碼講解的私信我 S…

Networking Based ISAC Hardware Testbed and Performance Evaluation

文章目錄 Applications and Challenges of Networked SensingCooperation Mechanism in Networked SensingChallenges and Key Enabling Technologies 5G NR Frame Structure Based ISAC ApproachSignals Available for Radio SensingMulti-Dimensiona Resource Optimization S…

2025年主流原型工具測評:墨刀、Axure、Figma、Sketch

2025年主流原型工具測評&#xff1a;墨刀、Axure、Figma、Sketch 要說2025年國內產品經理使用的主流原型設計工具&#xff0c;當然是墨刀、Axure、Figma和Sketch了&#xff0c;但是很多剛入行的產品經理不了解自己適合哪些工具&#xff0c;本文將從核心優勢、局限短板、協作能…

我代表中國受邀在亞馬遜云科技全球云計算大會re:Invent中技術演講

大家好我是小李哥&#xff0c;本名叫李少奕&#xff0c;目前在一家金融行業公司擔任首席云計算工程師。去年5月很榮幸在全球千萬名開發者中被選為了全球亞馬遜云科技認證技術專家&#xff08;AWS Hero&#xff09;&#xff0c;是近10年來大陸地區僅有的第9名大陸專家。同時作為…

LeetCode 解題思路 12(Hot 100)

解題思路&#xff1a; 定義三個指針&#xff1a; prev&#xff08;前驅節點&#xff09;、current&#xff08;當前節點&#xff09;、nextNode&#xff08;臨時保存下一個節點&#xff09;遍歷鏈表&#xff1a; 每次將 current.next 指向 prev&#xff0c;移動指針直到 curre…

Ubuntu搭建最簡單WEB服務器

安裝apache2 sudo apt install apache2 檢查狀態 $ sudo systemctl status apache2 ● apache2.service - The Apache HTTP ServerLoaded: loaded (/lib/systemd/system/apache2.service; enabled; vendor prese>Active: active (running) since Thu 2025-03-06 09:51:10…

Linux 軟硬鏈接

目錄 軟硬鏈接 軟鏈接 硬鏈接 軟硬鏈接的區別 硬鏈接場景 軟連接場景 軟硬鏈接 軟鏈接 我們可以通過以下命令創建一個文件的軟連接 ln -s mytest softlink-mytest 通過 ls -i -l 命令我們可以看到&#xff0c;軟鏈接文件的inode號與源文件的inode號是不同的&#xff0c…

不同開發語言之for循環的用法、區別總結

一、Objective-C &#xff08;1&#xff09;標準的c風格 for (int i 0; i < 5; i) {NSLog("i %d", i); } &#xff08;2&#xff09;for in循環。 NSArray *array ["apple", "banana", "orange"]; for (NSString *fruit in …

計算機畢設-基于springboot的物業管理系統的設計與實現(附源碼+lw+ppt+開題報告)

博主介紹&#xff1a;?多個項目實戰經驗、多個大型網購商城開發經驗、在某機構指導學員上千名、專注于本行業領域? 技術范圍&#xff1a;Java實戰項目、Python實戰項目、微信小程序/安卓實戰項目、爬蟲大數據實戰項目、Nodejs實戰項目、PHP實戰項目、.NET實戰項目、Golang實戰…

景聯文科技:以精準數據標注賦能AI進化,構筑智能時代數據基石

在人工智能技術席卷全球的浪潮中&#xff0c;高質量數據已成為驅動AI模型進化的核心燃料。作為全球領先的AI數據服務解決方案提供商&#xff0c;景聯文科技深耕數據標注領域多年&#xff0c;以技術為基、以專業為本&#xff0c;致力于為全球客戶提供全場景、高精度、多模態的數…