引入
通過編程模型,我們知道Flink的編程模型提供了多層級的抽象,越上層的API,其描述性和可閱讀性越強,越下層API,其靈活度高、表達力越強,多數時候上層API能做到的事情,下層API也能做到,反過來未必,不過這些API的底層模型是一致的,可以混合使用。
Flink架構可以處理批和流,Flink?批處理數據需要使用到Flink中的DataSet?API,此API主要是支持
Flink針對批數據進行操作,本質上Flink處理批數據也是看成一種特殊的流處理(有界流),所以沒有必要分成批和流兩套API,從Flink1.12版本往后,Dataset?API?已經標記為Legacy(已過時),已被官方軟棄用,官方建議使用Table?API?或者SQL?來處理批數據,我們也可以使用帶有Batch執行模式的DataStream?API來處理批數據(DataSet和DataStream?API做到了合并),而在后續Flink版本中DataSet?API?也被刪除。
DataStream?API的學習對于理解Flink數據處理流程非常重要,下面我們先從核心API層開始學習,通過基于DataStream?API?的編程實踐,去學習Flink編程方式,處理數據流程以及轉換處理。
現在我們先通過數據處理最經典的WordCount案例,來快速上手Flink的DataStream API開發。
代碼編寫流程
我們知道Flink編程模型主要有數據源、轉換操作和數據輸出三個部分,而實際開發編程的時候,則會多兩個部分:
- 初始化上下文環境(Environment)
Environment是編寫Flink程序的基礎,不同層級API編程中創建的Environment環境不同,如:Dataset?編程中需要創建ExecutionEnvironment,DataStream編程中需要創建
StreamExecutionEnvironment,在Table和SQL?API中需要創建TableExecutionEnvironment,使用不同語言編程導入的包也不同,在獲取到對應的Environment后我們還可以進行外參數的配置,例如:并行度、容錯機制設置等。 - 數據源(DataSource)<可以有多個>
DataSource部分主要定義了數據接入功能,主要是將外部數據接入到Flink系統中并轉換成DataStream對象供后續的轉換使用。 - 轉換操作(Transformation)
Transformation部分有各種各樣的算子操作可以對DataStream流進行轉換操作,最終將轉換結果數據通過DataSink寫出到外部存儲介質中,例如:文件、數據庫、Kafka消息系統等。 - 數據輸出(DataSink)
經過一系列Transformation轉換操作后,最后一定要調用Sink操作,才會形成一個完整的DataFlow拓撲。只有調用了Sink操作,才會產生最終的計算結果,這些數據可以寫入到的文件、輸出到指定的網絡端口、消息中間件、外部的文件系統或者是打印到控制臺。 - 程序觸發(env.execute())
在DataStream編程中編寫完成DataSink代碼后并不意味著程序結束,由于Flink是基于事件驅動處理的,有一條數據時就會進行處理,所以最后一定要使用Environment.execute()來觸發程序執行。
Flink數據類型
在 Apache Flink 中,為了能夠在分布式計算過程中對數據的類型進行管理和判斷,引入了 TypeInformation 類來對數據類型進行描述。TypeInformation 是 Flink 類型系統的基石,它允許 Flink 在編譯時推斷數據類型,從而為數據的序列化、反序列化、內存管理等操作提供必要的類型信息。以下是 Flink 中常見的數據類型及其對應的 TypeInformation 類型:
1. 基本數據類型
Flink 通過 BasicTypeInfo 支持 Java 的基本數據類型(如 int、double、boolean 等)以及它們的包裝類(如 Integer、Double、Boolean 等),還支持 String 類型。
2. 數組類型
對于數組類型,Flink 提供了 BasicArrayTypeInfo,支持如 int[]、String[] 等數組數據類型。
3. Tuple 類型
Tuple 是 Flink 中一種常用的數據類型,用于表示固定長度的字段集合。Flink 提供了 TupleTypeInfo 來支持 Tuple 類型的數據。
4. POJO 類型
POJO(Plain Old Java Object)類型是 Flink 中非常重要的數據類型,它允許使用普通的 Java 類來表示數據對象。為了使 Flink 能夠正確識別和處理 POJO 類型,需要滿足以下條件:
- POJO 類必須是公共類(public)且不能是內部類。
- POJO 類必須包含一個默認的無參構造函數。
- POJO 類的所有字段必須是公共的,或者提供公共的 getter 和 setter 方法。
當滿足上述條件時,Flink 會自動識別 POJO 類型,并通過 PojoTypeInfo 來描述該類型。
5. Scala Case Class 類型
對于使用 Scala 編寫的 Flink 應用,Flink 提供了 CaseClassTypeInfo 來支持 Scala 的 Case Class 類型。Case Class 是 Scala 中一種特殊的類,通常用于表示不可變的數據對象,非常適合在 Flink 中作為數據類型使用。
在使用Java?API開發Flink應用時,通常情況下Flink都能正常進行數據類型推斷進而選擇合適的serializers以及comparators,但是在定義函數時如果使用到了泛型,JVM就會出現類型擦除的問題,Flink就獲取不到對應的類型信息,這就需要借助類型提示(Type?Hints)來告訴系統函數中傳入的參數類型信息和輸出類型,進而對數據類型進行推斷處理。
在使用Scala?API?開發Flink應用時,Scala?API通過使用Manifest和類標簽在編譯器運行時獲取類型信息,即使在函數定義中使用了泛型,也不會像Java?API出現類型擦除問題,但是在使用到Flink已經通過TypeInformation定義的數據類型時,TypeInformation類不會自動創建,需要使用隱式參數的方式引入:import?org.apache.flink.api.scala._,否則在運行代碼過程中會出現“could?not?find?implicit?value?for?evidence?parameter?of?type?TypeInformation”的錯誤。
Flink 序列化機制
在兩個進程進行遠程通信時,它們需要將各種類型的數據以二進制序列的形式在網絡上傳輸,數據發送方需要將對象轉換為字節序列,進行序列化,而接收方則將字節序列恢復為各種對象,進行反序列化。對象的序列化有兩個主要用途:
- 一是將對象的字節序列永久保存到硬盤上,通常存放在文件中;
- 二是在網絡上傳輸對象的字節序列。序列化的好處包括減少數據在內存和硬盤中的占用空間,減少網絡傳輸開銷,精確推算內存使用情況,降低垃圾回收的頻率。
序列化和反序列化是分布式計算框架中的關鍵環節,尤其是在節點之間需要進行數據傳輸時。Flink 的序列化機制負責將數據對象轉換為字節序列以便在網絡上傳輸或在磁盤上存儲,并能夠在需要時將字節序列恢復為原始對象。Flink 提供了多種序列化器,以滿足不同類型的數據序列化需求。高效的序列化和反序列化對于分布式計算框架至關重要,原因如下:
減少數據傳輸開銷:通過將對象轉換為緊湊的字節序列,可以減少網絡傳輸的數據量,提高數據傳輸效率。
降低內存占用:序列化后的數據通常占用更少的內存空間,有助于提高內存利用率,尤其是在處理大規模數據集時。
支持數據持久化:序列化后的數據可以方便地寫入磁盤進行持久化存儲,便于后續的數據恢復和分析。
Flink序列化機制負責在節點之間傳輸數據時對數據對象進行序列化和反序列化,確保數據的正確性和一致性。Flink提供了多種序列化器,包括Kryo、Avro和Java序列化器等,大多數情況下,用戶不用擔心flink的序列化框架,Flink會通過TypeInfomation在數據處理之前推斷數據類型,進而使用對應的序列化器,例如:針對標準類型(int,double,long,string)直接由Flink自帶的序列化器處理,其他類型默認會交給Kryo處理。
但是對于Kryo仍然無法處理的類型,可以采取以下兩種解決方案:
1. 強制使用Avro替代Kryo序列化
//設置flink序列化方式為avro
env.getConfig().enableForceAvro();
2. 自定義注冊Kryo序列化
//注冊kryo?自定義序列化器
env.getConfig().registerTypeWithKryoSerializer(Class<?>?type,?Class<??extends?Serializer>?serializerClass)
單詞統計案例
下面我們通過一個單詞統計的案例,快速上手應用Flink,進行流處理。
引入依賴
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.16.0</flink.version><slf4j.version>1.7.31</slf4j.version><log4j.version>2.17.1</log4j.version><scala.version>2.12.10</scala.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><!-- Flink批和流開發依賴包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Scala包 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><!-- slf4j&log4j 日志相關包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>${log4j.version}</version></dependency></dependencies>
代碼實現
/*** WordCount 類實現了一個簡單的 Flink 流式處理程序,用于統計輸入文本文件中每個單詞的出現次數。*/
public class WordCount {/*** 程序的主入口方法,負責創建 Flink 流式處理環境,讀取輸入文件,進行單詞計數,并輸出結果。* * @param args 命令行參數,在本程序中未使用。* @throws Exception 當執行 Flink 任務時可能拋出異常。*/public static void main(String[] args) throws Exception {// 1. 創建流式處理環境,用于配置和執行 Flink 流式計算任務StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 從指定的文本文件中讀取數據,返回一個 DataStreamSource 對象,其中每個元素是文件中的一行文本DataStreamSource<String> lines = env.readTextFile("./data/words.txt");// 3. 對讀取的每行文本進行處理,將其切分為單詞,并轉換為 <單詞, 1> 的鍵值對形式// flatMap 方法用于將每行文本拆分為多個單詞,并為每個單詞生成一個鍵值對// returns 方法用于指定 flatMap 操作返回的數據類型SingleOutputStreamOperator<Tuple2<String, Long>> kvWordsDS =lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {// 將每行文本按空格分割成單詞數組String[] words = line.split(" ");// 遍歷單詞數組,為每個單詞生成一個 <單詞, 1> 的鍵值對,并收集到 Collector 中for (String word : words) {collector.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 對鍵值對數據進行分組統計,按照單詞(鍵)進行分組,對值(出現次數)進行求和// keyBy 方法用于按照指定的鍵對數據進行分組// sum 方法用于對分組后的數據的指定字段進行求和操作// print 方法用于將統計結果輸出到控制臺kvWordsDS.keyBy(tp -> tp.f0).sum(1).print();// 5. 在流式計算中,需要調用 execute 方法來觸發任務的執行// 該方法會阻塞當前線程,直到任務執行完成或被中斷env.execute();}
}