深入探索Apache Flink:流處理的藝術與實踐

在當今的大數據時代,流處理已成為處理實時數據的關鍵技術。Apache Flink,作為一個開源的流處理框架,以其高吞吐量、低延遲和精確一次(exactly-once)的語義處理能力,在眾多流處理框架中脫穎而出。本文將深入探討如何使用Apache Flink進行流處理,并通過詳細的代碼示例幫助新手快速上手。

1. Apache Flink簡介

Apache Flink是一個分布式處理引擎,支持批處理和流處理。它提供了DataStream API和DataSet API,分別用于處理無界和有界數據集。Flink的核心優勢在于其能夠以事件時間(event-time)處理數據,確保即使在亂序或延遲數據的情況下,也能得到準確的結果。

2. 環境搭建

在開始編寫代碼之前,我們需要搭建Flink的開發環境。以下是步驟:

  1. 下載并安裝Flink

    wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
    tar -xzf flink-1.14.3-bin-scala_2.12.tgz
    cd flink-1.14.3
    

  2. 啟動Flink集群

    ./bin/start-cluster.sh
    

  3. 驗證Flink集群: 打開瀏覽器,訪問http://localhost:8081,確保Flink的Web UI正常運行。

3. 第一個Flink流處理程序

我們將從一個簡單的WordCount程序開始,該程序從一個文本流中讀取數據,并計算每個單詞的出現次數。

3.1 創建Flink項目

使用Maven創建一個新的Flink項目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink \-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.14.3

3.2 編寫WordCount程序

src/main/java目錄下創建一個新的Java類WordCount.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {// 創建執行環境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 從Socket讀取數據DataStream<String> text = env.socketTextStream("localhost", 9999);// 進行單詞計數DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);// 打印結果counts.print();// 執行程序env.execute("Socket WordCount");}// 自定義FlatMapFunction,用于分割單詞public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// 分割單詞String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {out.collect(new Tuple2<>(word, 1));}}}}
}

3.3 運行WordCount程序

  1. 啟動Socket服務器

    nc -lk 9999
    

  2. 運行Flink程序: 在IDE中運行WordCount類,或者使用Maven打包并提交到Flink集群:

    mvn clean package
    ./bin/flink run target/your-project-name-1.0-SNAPSHOT.jar
    

  3. 輸入數據: 在啟動的Socket服務器中輸入一些文本,例如:

    Hello World
    Hello Flink
    

  4. 查看結果: 在Flink的Web UI中查看輸出結果,或者在控制臺中查看打印的輸出。

4. 高級特性與實踐

4.1 事件時間與水印

Flink支持事件時間(event-time)處理,這意味著可以按照事件發生的時間進行處理,而不是數據到達的時間。為了處理亂序數據,Flink引入了水印(watermark)的概念。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTimeWordCount {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, timestamp) -> event.f1)).keyBy(0).timeWindow(Time.seconds(10)).sum(1);counts.print();env.execute("EventTime WordCount");}public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {out.collect(new Tuple2<>(word, 1));}}}}
}

4.2 狀態管理與容錯

Flink提供了強大的狀態管理機制,可以輕松處理有狀態的計算。以下是一個簡單的例子,展示了如何使用Flink的狀態API。

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StatefulWordCount {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> counts = text.flatMap(new StatefulTokenizer());counts.print();env.execute("Stateful WordCount");}public static class StatefulTokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {private transient ValueState<Integer> countState;@Overridepublic void open(Configuration config) {ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("wordCount", // 狀態名稱TypeInformation.of(Integer.class)); // 狀態類型countState = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {Integer currentCount = countState.value();if (currentCount == null) {currentCount = 0;}currentCount += 1;countState.update(currentCount);out.collect(new Tuple2<>(word, currentCount));}}}}
}

4.3 容錯與恢復

Flink通過檢查點(checkpoint)機制實現容錯。以下是一個簡單的例子,展示了如何啟用檢查點。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FaultTolerantWordCount {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 啟用檢查點env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);DataStream<String> text = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);counts.print();env.execute("Fault Tolerant WordCount");}public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {out.collect(new Tuple2<>(word, 1));}}}}
}

5. 總結

本文詳細介紹了如何使用Apache Flink進行流處理,并通過多個代碼示例展示了Flink的基本用法和高級特性。從簡單的WordCount程序到事件時間處理、狀態管理和容錯機制,Flink提供了豐富的功能來應對各種流處理場景。

通過深入學習和實踐,你將能夠更好地利用Flink處理實時數據,構建高效、可靠的流處理應用。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/44142.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/44142.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/44142.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在樹莓派設備上導出系統鏡像

鏡像導出 前提條件&#xff1a; 已獲取可以正常使用的設備。已獲取鼠標、鍵盤和電源適配器。已將設備接入可正常使用的網絡。 操作步驟&#xff1a; 連接適配器給設備上電&#xff0c;正常啟動設備&#xff0c;連接鼠標和鍵盤。在終端命令窗格執行如下命令&#xff0c;安裝…

數據模型-ER圖在數據模型設計中的應用

ER圖在數據模型設計中的應用 1. ER圖概述&#xff1a;起源與發展? 實體-關系圖&#xff08;Entity Relationship Diagram&#xff0c;簡稱ER圖&#xff09;起源于1970年代&#xff0c;由Peter Chen首次提出&#xff0c;作為描述數據和信息間關系的圖形化語言。隨著數據庫技術…

[PM]流程與結構設計

流程圖 流程就是為了達到特定目標, 進行的一系列有邏輯性的操作步驟, 由兩個及已上的步驟, 完成一個完整的行為過程, 即可稱為流程, 流程圖就是對這個過程的圖形化展示 分類 業務流程圖 概念: 描述業務流程的一種圖, 通過特定符號和連線表示具體某個業務的處理步驟和過程作…

MyBatis與JDBC相比,有哪些優勢

MyBatis與JDBC&#xff08;Java Database Connectivity&#xff09;相比&#xff0c;在多個方面展現出顯著的優勢。這些優勢使得MyBatis在現代軟件開發中成為一個非常受歡迎的選擇&#xff0c;特別是在處理數據庫交互時。以下是MyBatis相比JDBC的主要優勢&#xff1a; 1. 簡化…

極狐GitLab亮相世界人工智能大會,開啟開源大模型賦能軟件研發新時代

GitLab 是一個全球知名的一體化 DevOps 平臺&#xff0c;很多人都通過私有化部署 GitLab 來進行源代碼托管。極狐GitLab &#xff1a;https://gitlab.cn/install?channelcontent&utm_sourcecsdn 是 GitLab 在中國的發行版&#xff0c;專門為中國程序員服務。可以一鍵式部署…

285個地級市-胡煥庸線數據

全國285個地級市-胡煥庸線數據.zip資源-CSDN文庫 胡煥庸線&#xff1a;中國人口與生態的分界線 胡煥庸線&#xff0c;一條在中國地理學界具有劃時代意義的分界線&#xff0c;由著名地理學家胡煥庸于1935年提出。這條線從黑龍江省的璦琿&#xff08;現黑河市&#xff09;延伸至…

json-server總結

Json-server 是一個專門用于模擬 RESTful API 的工具&#xff0c;它允許前端開發人員在不依賴后端 API 的情況下進行開發&#xff0c;通過本地搭建一個 JSON 服務來快速生成 REST API 風格的后端服務。 一、主要特點與功能 快速搭建&#xff1a;Json-server 使用 JSON 文件作…

HippoRAG如何從大腦獲取線索以改進LLM檢索

知識存儲和檢索正在成為大型語言模型(LLM)應用的重要組成部分。雖然檢索增強生成(RAG)在該領域取得了巨大進步&#xff0c;但一些局限性仍然沒有克服。 俄亥俄州立大學和斯坦福大學的研究團隊推出了HippoRAG&#xff0c;這是一種創新性的檢索框架&#xff0c;其設計理念源于人類…

數學建模美賽論文文檔

目錄 1. 摘要&#xff1a;1.1 閱讀并理解題目1.2 背景介紹1.3 問題提出 2. 目錄&#xff1a;2.1 引言&#xff08;Introduction&#xff09;2.2 假設與合理性說明&#xff08;Assumptions and Justifications&#xff09;2.3 符號說明&#xff08;Notations&#xff09;2.4 模型…

2.Date類型的請求參數

前端 <el-form-item label"結束日期" prop"endTime"><el-date-pickerv-model"dataForm.endTime"type"date"value-format"yyyy-MM-dd HH:mm:ss"placeholder"選擇日期"></el-date-picker></el…

線下線上游戲電競陪伴APP小程序H5同城線下約玩APP開發,語聊約玩平臺搭建游戲陪玩APP源碼

開發一款線下陪玩約玩APP的實際意義和在生活中的應用場景 1、滿足社交需求:現代社會人們的社交圈往往受到時間、地點和其他限制的影響。線下陪玩約玩APP可以提供一個平臺&#xff0c;讓用戶通過約玩的方式結識新朋友、擴大社交圈 2、解決孤獨感:有些人由于工作忙碌、居住環境單…

論文閱讀2-《Dynamic Multimodal Fusion》

摘要 &#xff08;DynMM&#xff09;&#xff0c;一種新的方法&#xff0c;自適應融合多模態數據和 d在推理過程中生成依賴于數據的前向路徑。為此&#xff0c;我們提出了一種門控功能來提供基于多模態特征和一個的模態級或融合級決策提高計算效率的源感知損失函數。 細節 模…

185240-00G 同軸連接器

型號簡介 185240-00G是Southwest Microwave的2.92 mm連接器。該連接器采用鈹銅合金、工具鋼和不銹鋼等優質材料&#xff0c;并經過金鍍層和鈍化處理&#xff0c;確保其穩定可靠&#xff0c;經久耐用。它還兼容歐盟 RoHS 和 WEEE 指令&#xff0c;是一位環保使者&#xff0c;致力…

AI繪畫Midjourney從入門到實戰應用

大家好&#xff0c;我是愛編程的喵喵。雙985碩士畢業&#xff0c;現擔任全棧工程師一職&#xff0c;熱衷于將數據思維應用到工作與生活中。從事機器學習以及相關的前后端開發工作。曾在阿里云、科大訊飛、CCF等比賽獲得多次Top名次。現為CSDN博客專家、人工智能領域優質創作者。…

概率論習題

泊松分布習題 假設你在醫院值班&#xff0c;每天需要安保人員出動的次數N~P(1),則關于任一天安保人員出動次數&#xff1a; A&#xff1a;出動一次的概率是多少 B&#xff1a;出動次數小于等于一次的概率為 C&#xff1a;出動次數小于一次的概率為 D&#xff1a;若隨機事件發生…

C# 裝飾器模式(Decorator Pattern)

裝飾器模式動態地給一個對象添加一些額外的職責。就增加功能來說&#xff0c;裝飾器模式相比生成子類更為靈活。 // 組件接口 public interface IComponent { void Operation(); } // 具體組件 public class ConcreteComponent : IComponent { public void Opera…

AI推薦系統落地的實現與應用

目錄 一、推薦系統的基礎二、推薦系統的設計與實現三、推薦系統落地的挑戰四、推薦系統的成功案例五、結語 AI推薦系統近年來在各個領域得到了廣泛應用&#xff0c;從電子商務到娛樂&#xff0c;再到個性化學習平臺。它們通過分析用戶行為、偏好和歷史數據&#xff0c;為用戶提…

【NOI-題解】1108 - 正整數N轉換成一個二進制數1290 - 二進制轉換十進制1386 - 小麗找半個回文數1405 - 小麗找潛在的素數?

文章目錄 一、前言二、問題問題&#xff1a;1108 - 正整數N轉換成一個二進制數問題&#xff1a;1290 - 二進制轉換十進制問題&#xff1a;1386 - 小麗找半個回文數問題&#xff1a;1405 - 小麗找潛在的素數&#xff1f; 三、感謝 一、前言 本章節主要對進制轉換的題目進行講解…

ubuntu下aarch64-linux-gnu(交叉編譯) gdb/gdbserver

ubuntu下aarch64-linux-gnu(交叉編譯) gdb/gdbserver gdb是一款開源的、強大的、跨平臺的程序調試工具。主要用于在程序運行時對程序進行控制和檢查&#xff0c;如設置斷點、單步執行、查看變量值、修改內存數據等&#xff0c;從而幫助開發者定位和修復代碼中的錯誤。 gdbserve…

密態計算,大模型商用數據瓶頸的新解法?

大數據產業創新服務媒體 ——聚焦數據 改變商業 大模型邁向產業的深度應用&#xff0c;首要挑戰是高質量數據供給和安全流通。正如在今年的世界人工智能大會上&#xff0c;產學研屆多位專家達成的共識是&#xff0c;數據決定了AI能力的上限。 在實踐中&#xff0c;行業大模型難…