DataStreamAPI實踐原理——快速上手

引入

通過編程模型,我們知道Flink的編程模型提供了多層級的抽象,越上層的API,其描述性和可閱讀性越強,越下層API,其靈活度高、表達力越強,多數時候上層API能做到的事情,下層API也能做到,反過來未必,不過這些API的底層模型是一致的,可以混合使用。

Flink架構可以處理批和流,Flink?批處理數據需要使用到Flink中的DataSet?API,此API主要是支持
Flink針對批數據進行操作,本質上Flink處理批數據也是看成一種特殊的流處理(有界流),所以沒有必要分成批和流兩套API,從Flink1.12版本往后,Dataset?API?已經標記為Legacy(已過時),已被官方軟棄用,官方建議使用Table?API?或者SQL?來處理批數據,我們也可以使用帶有Batch執行模式的DataStream?API來處理批數據(DataSet和DataStream?API做到了合并),而在后續Flink版本中DataSet?API?也被刪除。

DataStream?API的學習對于理解Flink數據處理流程非常重要,下面我們先從核心API層開始學習,通過基于DataStream?API?的編程實踐,去學習Flink編程方式,處理數據流程以及轉換處理。

現在我們先通過數據處理最經典的WordCount案例,來快速上手Flink的DataStream API開發。

代碼編寫流程

我們知道Flink編程模型主要有數據源、轉換操作和數據輸出三個部分,而實際開發編程的時候,則會多兩個部分:

  1. 初始化上下文環境(Environment)
    Environment是編寫Flink程序的基礎,不同層級API編程中創建的Environment環境不同,如:Dataset?編程中需要創建ExecutionEnvironment,DataStream編程中需要創建
    StreamExecutionEnvironment,在Table和SQL?API中需要創建TableExecutionEnvironment,使用不同語言編程導入的包也不同,在獲取到對應的Environment后我們還可以進行外參數的配置,例如:并行度、容錯機制設置等。
  2. 數據源(DataSource)<可以有多個>
    DataSource部分主要定義了數據接入功能,主要是將外部數據接入到Flink系統中并轉換成DataStream對象供后續的轉換使用。
  3. 轉換操作(Transformation)
    Transformation部分有各種各樣的算子操作可以對DataStream流進行轉換操作,最終將轉換結果數據通過DataSink寫出到外部存儲介質中,例如:文件、數據庫、Kafka消息系統等。
  4. 數據輸出(DataSink)
    經過一系列Transformation轉換操作后,最后一定要調用Sink操作,才會形成一個完整的DataFlow拓撲。只有調用了Sink操作,才會產生最終的計算結果,這些數據可以寫入到的文件、輸出到指定的網絡端口、消息中間件、外部的文件系統或者是打印到控制臺。
  5. 程序觸發(env.execute())
    在DataStream編程中編寫完成DataSink代碼后并不意味著程序結束,由于Flink是基于事件驅動處理的,有一條數據時就會進行處理,所以最后一定要使用Environment.execute()來觸發程序執行。

Flink數據類型

在 Apache Flink 中,為了能夠在分布式計算過程中對數據的類型進行管理和判斷,引入了 TypeInformation 類來對數據類型進行描述。TypeInformation 是 Flink 類型系統的基石,它允許 Flink 在編譯時推斷數據類型,從而為數據的序列化、反序列化、內存管理等操作提供必要的類型信息。以下是 Flink 中常見的數據類型及其對應的 TypeInformation 類型:

1. 基本數據類型

Flink 通過 BasicTypeInfo 支持 Java 的基本數據類型(如 int、double、boolean 等)以及它們的包裝類(如 Integer、Double、Boolean 等),還支持 String 類型。

2. 數組類型

對于數組類型,Flink 提供了 BasicArrayTypeInfo,支持如 int[]、String[] 等數組數據類型。

3. Tuple 類型

Tuple 是 Flink 中一種常用的數據類型,用于表示固定長度的字段集合。Flink 提供了 TupleTypeInfo 來支持 Tuple 類型的數據。

4. POJO 類型

POJO(Plain Old Java Object)類型是 Flink 中非常重要的數據類型,它允許使用普通的 Java 類來表示數據對象。為了使 Flink 能夠正確識別和處理 POJO 類型,需要滿足以下條件:

  • POJO 類必須是公共類(public)且不能是內部類。
  • POJO 類必須包含一個默認的無參構造函數。
  • POJO 類的所有字段必須是公共的,或者提供公共的 getter 和 setter 方法。

當滿足上述條件時,Flink 會自動識別 POJO 類型,并通過 PojoTypeInfo 來描述該類型。

5. Scala Case Class 類型

對于使用 Scala 編寫的 Flink 應用,Flink 提供了 CaseClassTypeInfo 來支持 Scala 的 Case Class 類型。Case Class 是 Scala 中一種特殊的類,通常用于表示不可變的數據對象,非常適合在 Flink 中作為數據類型使用。

在使用Java?API開發Flink應用時,通常情況下Flink都能正常進行數據類型推斷進而選擇合適的serializers以及comparators,但是在定義函數時如果使用到了泛型,JVM就會出現類型擦除的問題,Flink就獲取不到對應的類型信息,這就需要借助類型提示(Type?Hints)來告訴系統函數中傳入的參數類型信息和輸出類型,進而對數據類型進行推斷處理。

在使用Scala?API?開發Flink應用時,Scala?API通過使用Manifest和類標簽在編譯器運行時獲取類型信息,即使在函數定義中使用了泛型,也不會像Java?API出現類型擦除問題,但是在使用到Flink已經通過TypeInformation定義的數據類型時,TypeInformation類不會自動創建,需要使用隱式參數的方式引入:import?org.apache.flink.api.scala._,否則在運行代碼過程中會出現“could?not?find?implicit?value?for?evidence?parameter?of?type?TypeInformation”的錯誤。

Flink 序列化機制

在兩個進程進行遠程通信時,它們需要將各種類型的數據以二進制序列的形式在網絡上傳輸,數據發送方需要將對象轉換為字節序列,進行序列化,而接收方則將字節序列恢復為各種對象,進行反序列化。對象的序列化有兩個主要用途:

  • 一是將對象的字節序列永久保存到硬盤上,通常存放在文件中;
  • 二是在網絡上傳輸對象的字節序列。序列化的好處包括減少數據在內存和硬盤中的占用空間,減少網絡傳輸開銷,精確推算內存使用情況,降低垃圾回收的頻率。

序列化和反序列化是分布式計算框架中的關鍵環節,尤其是在節點之間需要進行數據傳輸時。Flink 的序列化機制負責將數據對象轉換為字節序列以便在網絡上傳輸或在磁盤上存儲,并能夠在需要時將字節序列恢復為原始對象。Flink 提供了多種序列化器,以滿足不同類型的數據序列化需求。高效的序列化和反序列化對于分布式計算框架至關重要,原因如下:

  • 減少數據傳輸開銷:通過將對象轉換為緊湊的字節序列,可以減少網絡傳輸的數據量,提高數據傳輸效率。

  • 降低內存占用:序列化后的數據通常占用更少的內存空間,有助于提高內存利用率,尤其是在處理大規模數據集時。

  • 支持數據持久化:序列化后的數據可以方便地寫入磁盤進行持久化存儲,便于后續的數據恢復和分析。

Flink序列化機制負責在節點之間傳輸數據時對數據對象進行序列化和反序列化,確保數據的正確性和一致性。Flink提供了多種序列化器,包括Kryo、Avro和Java序列化器等,大多數情況下,用戶不用擔心flink的序列化框架,Flink會通過TypeInfomation在數據處理之前推斷數據類型,進而使用對應的序列化器,例如:針對標準類型(int,double,long,string)直接由Flink自帶的序列化器處理,其他類型默認會交給Kryo處理。

但是對于Kryo仍然無法處理的類型,可以采取以下兩種解決方案:

1. 強制使用Avro替代Kryo序列化

//設置flink序列化方式為avro
env.getConfig().enableForceAvro();

2. 自定義注冊Kryo序列化

//注冊kryo?自定義序列化器
env.getConfig().registerTypeWithKryoSerializer(Class<?>?type,?Class<??extends?Serializer>?serializerClass)

單詞統計案例

下面我們通過一個單詞統計的案例,快速上手應用Flink,進行流處理。

引入依賴

    <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.16.0</flink.version><slf4j.version>1.7.31</slf4j.version><log4j.version>2.17.1</log4j.version><scala.version>2.12.10</scala.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><!-- Flink批和流開發依賴包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Scala包 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><!-- slf4j&log4j 日志相關包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>${log4j.version}</version></dependency></dependencies>

代碼實現

/*** WordCount 類實現了一個簡單的 Flink 流式處理程序,用于統計輸入文本文件中每個單詞的出現次數。*/
public class WordCount {/*** 程序的主入口方法,負責創建 Flink 流式處理環境,讀取輸入文件,進行單詞計數,并輸出結果。* * @param args 命令行參數,在本程序中未使用。* @throws Exception 當執行 Flink 任務時可能拋出異常。*/public static void main(String[] args) throws Exception {// 1. 創建流式處理環境,用于配置和執行 Flink 流式計算任務StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 從指定的文本文件中讀取數據,返回一個 DataStreamSource 對象,其中每個元素是文件中的一行文本DataStreamSource<String> lines = env.readTextFile("./data/words.txt");// 3. 對讀取的每行文本進行處理,將其切分為單詞,并轉換為 <單詞, 1> 的鍵值對形式// flatMap 方法用于將每行文本拆分為多個單詞,并為每個單詞生成一個鍵值對// returns 方法用于指定 flatMap 操作返回的數據類型SingleOutputStreamOperator<Tuple2<String, Long>> kvWordsDS =lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {// 將每行文本按空格分割成單詞數組String[] words = line.split(" ");// 遍歷單詞數組,為每個單詞生成一個 <單詞, 1> 的鍵值對,并收集到 Collector 中for (String word : words) {collector.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 對鍵值對數據進行分組統計,按照單詞(鍵)進行分組,對值(出現次數)進行求和// keyBy 方法用于按照指定的鍵對數據進行分組// sum 方法用于對分組后的數據的指定字段進行求和操作// print 方法用于將統計結果輸出到控制臺kvWordsDS.keyBy(tp -> tp.f0).sum(1).print();// 5. 在流式計算中,需要調用 execute 方法來觸發任務的執行// 該方法會阻塞當前線程,直到任務執行完成或被中斷env.execute();}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/77510.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/77510.shtml
英文地址,請注明出處:http://en.pswp.cn/web/77510.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

WPF 圖片文本按鈕 自定義按鈕

效果 上面圖片,下面文本 樣式 <!-- 圖片文本按鈕樣式 --> <Style x:Key="ImageTextButtonStyle" TargetType="Button"><Setter Property="Background" Value="Transparent"/><Setter Property="BorderTh…

驅動開發硬核特訓 · Day 22(上篇): 電源管理體系完整梳理:I2C、Regulator、PMIC與Power-Domain框架

&#x1f4d8; 一、電源子系統總覽 在現代Linux內核中&#xff0c;電源管理不僅是系統穩定性的保障&#xff0c;也是實現高效能與低功耗運行的核心機制。 系統中涉及電源管理的關鍵子系統包括&#xff1a; I2C子系統&#xff1a;硬件通信基礎Regulator子系統&#xff1a;電源…

設計模式全解析:23種經典設計模式及其應用

創建型模式 1. 單例模式&#xff08;Singleton Pattern&#xff09; 核心思想&#xff1a;確保一個類只有一個實例&#xff0c;并提供一個全局訪問點。適用場景&#xff1a;需要共享資源的場景&#xff0c;如配置管理、日志記錄等。 public class Singleton {// 靜態變量保存…

力扣熱題100題解(c++)—矩陣

73.矩陣置零 給定一個 m x n 的矩陣&#xff0c;如果一個元素為 0 &#xff0c;則將其所在行和列的所有元素都設為 0 。請使用 原地 算法。 int m matrix.size(); // 行數int n matrix[0].size(); // 列數bool firstRowZero false; // 標記第一行是否包含 0bool f…

本地部署DeepSeek-R1(Dify升級最新版本、新增插件功能、過濾推理思考過程)

下載最新版本Dify Dify1.0版本之前不支持插件功能&#xff0c;先升級DIfy 下載最新版本&#xff0c;目前1.0.1 Git地址&#xff1a;https://github.com/langgenius/dify/releases/tag/1.0.1 我這里下載到老版本同一個目錄并解壓 拷貝老數據 需先停用老版本Dify PS D:\D…

PostSwigger Web 安全學習:CSRF漏洞3

CSRF 漏洞學習網站&#xff1a;What is CSRF (Cross-site request forgery)? Tutorial & Examples | Web Security Academy CSRF Token 基本原理 CSRF Token 是服務端生成的唯一、隨機且不可預測的字符串&#xff0c;用于驗證客戶端合法校驗。 作用&#xff1a;防止攻擊…

用 Nodemon 解決 npm run serve 頻繁重啟服務

Nodemon 是一個基于 Node.js 構建的開發工具&#xff0c;專為幫助開發者自動監控項目文件的更改而設計。每當文件發生變更時&#xff0c;Nodemon 會自動重啟 Node.js 服務器&#xff0c;無需手動停止并重啟。這對于提升開發速度、減少人工操作非常有幫助&#xff0c;尤其適用于…

django admin 中更新表數據 之后再將數據返回管理界面

在Django中&#xff0c;更新數據庫中的數據并將其重新顯示在Django Admin界面上通常涉及到幾個步驟。這里我將詳細說明如何在Django Admin中更新表數據&#xff0c;并確保更新后的數據能夠立即在管理界面上顯示。 定義模型 首先&#xff0c;確保你的模型&#xff08;Model&…

真.從“零”搞 VSCode+STM32CubeMx+C <1>構建

目錄 前言 準備工作 創建STM32CubeMx項目 VSCode導入項目&配置 構建錯誤調試 后記 前言 去年10月開始接觸單片機&#xff0c;一直在用樹莓派的Pico&#xff0c;之前一直用Micropython&#xff0c;玩的不亦樂乎&#xff0c;試錯階段優勢明顯&#xff0c;很快就能鼓搗一…

C語言學習之結構體

在C語言中&#xff0c;我們已經學了好幾種類型的數據。比如整型int、char、short等&#xff0c;浮點型double、float等。但是這些都是基本數據類型&#xff0c;而這些數據類型應用在實際編程里顯然是不夠用的。比如我們沒有辦法用一旦數據類型來定義一個”人“的屬性。因此這里…

架構-計算機系統基礎

計算機系統基礎 一、計算機系統組成 &#xff08;一&#xff09;計算機系統層次結構 硬件組成 主機&#xff1a;包含CPU&#xff08;運算器控制器&#xff09;、主存儲器&#xff08;內存&#xff09;。外設&#xff1a;輸入設備、輸出設備、輔助存儲器&#xff08;外存&…

【計算機網絡性能優化】從基礎理論到實戰調優

目錄 前言技術背景與價值當前技術痛點解決方案概述目標讀者說明 一、技術原理剖析核心概念圖解核心作用講解關鍵技術模塊說明技術選型對比 二、實戰演示環境配置要求核心代碼實現案例1&#xff1a;iPerf3帶寬測試案例2&#xff1a;TCP窗口優化案例3&#xff1a;QoS流量整形 運行…

Python 自動化辦公:Excel 數據處理的“秘密武器”

引言 在日常的 IT 辦公場景里&#xff0c;Excel 是數據處理與分析的 “常勝將軍”。無論是財務人員整理賬目、銷售團隊統計業績&#xff0c;還是運營人員分析用戶數據&#xff0c;Excel 都發揮著關鍵作用。但面對海量數據&#xff0c;手動操作 Excel 不僅效率低下&#xff0c;還…

緩存集群技術深度解析:從原理到實戰

緩存集群技術深度解析&#xff1a;從原理到實戰 一、緩存集群核心定位與架構選型 1. 集群模式核心價值 緩存集群通過數據分片、高可用保障、水平擴展解決單節點瓶頸&#xff0c;核心能力包括&#xff1a; 數據分片&#xff1a;將數據分散到多個節點&#xff0c;突破單節點內…

CSDN編輯文章時如何自動生成目錄

如何自動生成目錄 前置條件1. 插入目錄標識符2. 編寫標題層級 前置條件 需要使用markdown編輯&#xff0c;并且只有按照markdown語法編寫不同的標題級別&#xff0c;才能使用這個方法自動生成對應的目錄。 1. 插入目錄標識符 在文章的頂部添加以下代碼&#xff1a; [TOC](文…

產品經理對于電商接口的梳理||電商接口文檔梳理與接入

接口梳理7個注意點總結 ①注意要測試環境和生產環境。生產上線時候要提醒研發換到生產環境調用。 ②注意必輸字段和選輸字段&#xff0c;要傳入字段的含義和校驗。枚舉值不清楚含義的要詢問對方含義&#xff0c;比如說單據類型字段枚舉值是B2C發貨單&#xff0c;BBC發貨單&am…

更快的圖像局部修改與可控生成:Flex.2-preview

Flex.2-preview 文本生成圖像擴散模型介紹 一、模型簡介 Flex.2-preview 是一種 開源的 80 億參數文本生成圖像擴散模型&#xff0c;具備通用控制和修復支持功能&#xff0c;是 Flex.1alpha 的下一代版本。該模型由社區開發并為社區服務&#xff0c;采用 Apache 2.0 許可證&a…

【Castle-X機器人】一、模塊安裝與調試:機器人底盤

持續更新。。。。。。。。。。。。。。。 【ROS機器人】模塊安裝 一、Castle-X機器人底盤1.1 結構概述1.2 驅動執行結構1.3 環境傳感器1.4 電氣系統1.5 Castle-x機器人底盤測試激光雷達傳感器測試及數據可視化超聲波傳感器實時數據獲取防跌落傳感器測試陀螺儀測試鍵盤控制測試…

條件、列表渲染.

#### v-for 1. 渲染列表 vue <template> <ul v-for"(item,index) in list" > <li>{{ item }}</li> </ul> </template> <script setup> import { ref } from vue; let list ref([蘋果, 香蕉, 橙子]) </script>…

node20的安裝和vue的入門準備

一、node20的安裝 直接下載路徑&#xff1a;https://nodejs.org/download/release/v20.11.0/node-v20.11.0-x64.msi 安裝&#xff0c;雙擊msi文件 點擊同意協議 更改下載路徑 什么也不用選&#xff0c;點擊next進行下一步 什么也不用選&#xff0c;點擊next進行下一步 點擊安…