【Flink】DataStream API:源算子、數據類型

目錄

  • 源算子(Source)
    • 從集合中讀取數據
    • 從文件讀取數據
    • 從Socket讀取數據
    • 從Kafka讀取數據
    • 從數據生成器讀取數據
  • Flink支持的數據類型
      • Flink的類型系統
      • Flink支持的數據類型
      • 類型提示(Type Hints)

源算子(Source)

? Flink可以從各種來源獲取數據,然后構建DataStream進行轉換處理。一般將數據的輸入來源稱為數據源(data source),而讀取數據的算子就是源算子(source operator)。所以,source就是我們整個處理程序的輸入端。

在這里插入圖片描述

? 在Flink1.12以前,舊的添加source的方式,是調用執行環境的addSource()方法:
DataStream stream = env.addSource(…);
方法傳入的參數是一個“源函數”(source function),需要實現SourceFunction接口。
? 從Flink1.12開始,主要使用流批統一的新Source架構:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多預實現的接口,此外還有很多外部連接工具也幫我們實現了對應的Source,通常情況下足以應對我們的實際需求。

從集合中讀取數據

? 最簡單的讀取數據的方式,就是在代碼中直接創建一個Java集合,然后調用執行環境的fromCollection方法進行讀取。這相當于將數據臨時存儲到內存中,形成特殊的數據結構后,作為數據源使用,一般用于測試。

在這里插入圖片描述

package env;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class CollectionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 從集合讀
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 22, 3));// 直接填寫元素DataStreamSource<Integer> source = env.fromElements(1, 22, 3);source.print();env.execute();}
}

從文件讀取數據

? 真正的實際應用中,自然不會直接將數據寫在代碼中。通常情況下,我們會從存儲介質中獲取數據,一個比較常見的方式就是讀取日志文件。這也是批處理中最常見的讀取方式。
讀取文件,需要添加文件連接器依賴:

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>

在這里插入圖片描述

package source;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FileSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);FileSource.FileSourceBuilder<String> builder = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt"));FileSource<String> fileSource = builder.build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"filesource").print();env.execute();}
}

說明:

  • 參數可以是目錄,也可以是文件;還可以從HDFS目錄下讀取,使用路徑hdfs://…;
  • 路徑可以是相對路徑,也可以是絕對路徑;
  • 相對路徑是從系統屬性user.dir獲取路徑:idea下是project的根目錄,standalone模式下是集群節點根目錄;

從Socket讀取數據

? 不論從集合還是文件,我們讀取的其實都是有界數據。在流處理的場景中,數據往往是無界的。
我們之前用到的讀取socket文本流,就是流處理場景。但是這種方式由于吞吐量小、穩定性較差,一般也是用于測試。

DataStream<String> stream = env.socketTextStream("localhost", 7777);

從Kafka讀取數據

? Flink官方提供了連接工具flink-connector-kafka,直接幫我們實現了一個消費者FlinkKafkaConsumer,它就是用來讀取Kafka數據的SourceFunction。

? 所以想要以Kafka作為數據源獲取數據,我們只需要引入Kafka連接器的依賴。Flink官方提供的是一個通用的Kafka連接器,它會自動跟蹤最新版本的Kafka客戶端。目前最新版本只支持0.10.0版本以上的Kafka。這里我們需要導入的依賴如下。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>

在這里插入圖片描述

package source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KafkaSourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("master:9092,slave1:9092,slave2:9092").setGroupId("kafkasource").setTopics("topic1").setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"").print();env.execute();}
}

Kafka生產數據

在這里插入圖片描述
Flink消費數據

在這里插入圖片描述

從數據生成器讀取數據

? Flink從1.11開始提供了一個內置的DataGen 連接器,主要是用于生成一些隨機數,用于在沒有數據源的時候,進行流任務的測試以及性能測試等。1.17提供了新的Source寫法,需要導入依賴:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version>
</dependency>

在這里插入圖片描述

package source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/*** 數據生成器Source,四個參數*  第一個:GeneratorFunction接口,需要實現,重寫map方法,輸入類型固定是Long*  第二個,Long類型,自動生成的數字序列(從0自增)的最大值(小于),達到這個值就停止了*  第三個,限速策略,比如 每秒生成幾條數據*  第四個:返回的類型*/DataGeneratorSource<String> source = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) {return "Number:" + value;}}, Long.MAX_VALUE, RateLimiterStrategy.perSecond(100), Types.STRING);env.fromSource(source,WatermarkStrategy.noWatermarks(),"").print();env.execute();}
}

Flink支持的數據類型

Flink的類型系統

Flink使用“類型信息”(TypeInformation)來統一表示數據類型。TypeInformation類是Flink中所有類型描述符的基類。它涵蓋了類型的一些基本屬性,并為每個數據類型生成特定的序列化器、反序列化器和比較器。

Flink支持的數據類型

對于常見的Java和Scala數據類型,Flink都是支持的。Flink在內部,Flink對支持不同的類型進行了劃分,這些類型可以在Types工具類中找到:

(1)基本類型
所有Java基本類型及其包裝類,再加上Void、String、Date、BigDecimal和BigInteger。

(2)數組類型
包括基本類型數組(PRIMITIVE_ARRAY)和對象數組(OBJECT_ARRAY)。

(3)復合數據類型

  • Java元組類型(TUPLE):這是Flink內置的元組類型,是Java
    API的一部分。最多25個字段,也就是從Tuple0~Tuple25,不支持空字段。
  • Scala 樣例類及Scala元組:不支持空字段。
  • 行類型(ROW):可以認為是具有任意個字段的元組,并支持空字段。
  • POJO:Flink自定義的類似于Java bean模式的類。

(4)輔助類型
Option、Either、List、Map等。

(5)泛型類型(GENERIC)

Flink支持所有的Java類和Scala類。不過如果沒有按照上面POJO類型的要求來定義,就會被Flink當作泛型類來處理。Flink會把泛型類型當作黑盒,無法獲取它們內部的屬性;它們也不是由Flink本身序列化的,而是由Kryo序列化的。
在這些類型中,元組類型和POJO類型最為靈活,因為它們支持創建復雜類型。而相比之下,POJO還支持在鍵(key)的定義中直接使用字段名,這會讓我們的代碼可讀性大大增加。所以,在項目實踐中,往往會將流處理程序中的元素類型定為Flink的POJO類型。
Flink對POJO類型的要求如下:

  1. 類是公有(public)的
  2. 有一個無參的構造方法
  3. 所有屬性都是公有(public)的
  4. 所有屬性的類型都是可以序列化的

類型提示(Type Hints)

Flink還具有一個類型提取系統,可以分析函數的輸入和返回類型,自動獲取類型信息,從而獲得對應的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情況下(比如Lambda表達式中),自動提取的信息是不夠精細的——只告訴Flink當前的元素由“船頭、船身、船尾”構成,根本無法重建出“大船”的模樣;這時就需要顯式地提供類型信息,才能使應用程序正常工作或提高其性能。

為了解決這類問題,Java API提供了專門的“類型提示”(type hints)。
回憶一下之前的word count流處理程序,我們在將String類型的每個詞轉換成(word, count)二元組后,就明確地用returns指定了返回的類型。因為對于map里傳入的Lambda表達式,系統只能推斷出返回的是Tuple2類型,而無法得到Tuple2<String, Long>。只有顯式地告訴系統當前的返回類型,才能正確地解析出完整數據。

.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));

Flink還專門提供了TypeHint類,它可以捕獲泛型的類型信息,并且一直記錄下來,為運行時提供足夠的信息。我們同樣可以通過.returns()方法,明確地指定轉換之后的DataStream里元素的類型。

returns(new TypeHint<Tuple2<Integer, SomeType>>(){})

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/97882.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/97882.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/97882.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux 安裝docker-compose安裝方法(安裝docker compose安裝)

文章目錄**方法一&#xff1a;通過 curl 下載二進制文件&#xff08;推薦&#xff09;**1. 安裝前準備- **確保已安裝 Docker**- **檢查 Docker 是否安裝成功**2. 下載并安裝 Docker Compose- **下載最新版本的 Docker Compose 二進制文件**- **國內加速下載&#xff08;解決 G…

OCR 發票識別與驗真接口:助力電子化發票新時代

自 2025 年 10 月 1 日起&#xff0c;紙質火車票徹底告別歷史舞臺&#xff0c;全面數字化的電子發票取而代之&#xff0c;這一變革標志著票務領域的重大革新&#xff0c;也讓電子化發票處理的需求呈井噴式增長。在此背景下&#xff0c;OCR 發票識別和發票驗真接口技術挺身而出&…

設計模式:抽象工廠模式(Abstract Factory Pattern)

文章目錄一、概念二、實例分析三、完整示例一、概念 抽象工廠模式是一種創建型設計模式。 提供一個接口用于創建一系列相關或相互依賴的對象&#xff0c;而無需指定它們的具體類。 相比于工廠方法模式&#xff0c;抽象工廠模式不僅僅是創建單一產品&#xff0c;而是一族產品&am…

輕量級注意力模型HOTSPOT-YOLO:無人機光伏熱異常檢測新SOTA,mAP高達90.8%

【導讀】 無人機光伏巡檢如何更智能、更高效&#xff1f;HOTSPOT-YOLO模型給出了亮眼答案&#xff01;給AI裝上“熱成像鷹眼”&#xff0c;能精準鎖定光伏板上的細微熱斑缺陷。它不僅將檢測精度&#xff08;mAP&#xff09;提升至90.8%&#xff0c;更在保持實時性的前提下大幅…

CHT共軛傳熱: 導熱系數差異如何影響矩陣系數

文章目錄 一、導熱系數差異如何影響矩陣系數&#xff1f;二、如何處理系數差異以加速收斂&#xff1f;1. **變量重縮放&#xff08;Scaling of Variables&#xff09;**2. **使用物理型預條件子&#xff08;Physics-based Preconditioning&#xff09;**3. **區域分解法&#x…

Vue Vapor 事件機制深潛:從設計動機到源碼解析

基于 vue3.6&#xff08;alpha 階段&#xff09;及 Vapor 的最新進展撰寫&#xff1b;Vapor 仍在演進中&#xff0c;部分實現可能繼續優化。TL;DR&#xff08;速覽&#xff09; 傳統&#xff08;≤3.5&#xff09;&#xff1a;事件以元素為中心綁定&#xff1b;每個元素用 el._…

Day 01(01): Hadoop與大數據基石

目標&#xff1a;建立對大數據生態的整體認知&#xff0c;理解HDFS和MapReduce的核心思想。 8:00-9:30&#xff1a;【視頻學習】在B站搜索“Hadoop入門”或“三小時入門大數據”&#xff0c;觀看1-2個高播放量的簡介視頻&#xff0c;了解大數據面臨的問題和Hadoop的解決方案。 …

開源 + 免費!谷歌推出 Gemini CLI,Claude Code 的強勁對手

在如今飛速發展的 AI 工具生態中&#xff0c;命令行界面&#xff08;CLI&#xff09;這一開發者與計算機交互的傳統方式&#xff0c;正悄然發生著一場顛覆性的變革。2025 年 6 月 25 日&#xff0c;谷歌正式發布開源的 Gemini CLI&#xff0c;這一舉措標志著谷歌 Gemini AI 能力…

MacOS - 記錄MacOS發燙的好幾天 - 幕后黑手竟然是

MacOS - 記錄MacOS發燙的好幾天 - 幕后黑手竟然是 Mac是不可能出bug的&#xff0c;一定是世界出bug了。 前言 幾天前Mac突然開始燙燙的&#xff0c;就這么一燙燙了好幾天。這可不行&#xff0c;所以看了下“活動監視器”&#xff0c;發現了一個Code Helper(Plugin)占據200%上下…

Vue基礎知識-Vue中:class與:style動態綁定樣式

完整源碼<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><script src&quo…

終于趕在考試券過期前把Oracle OCP證書考下來了!

&#x1f6a9; 今天終于能松口氣了——Oracle OCP證書到手&#xff01; 差點白白浪費一次考試機會&#xff08;1700&#xff09;&#xff01;3月底報名了Oracle OCP&#xff0c;擺爛了大半年&#xff0c;終于是逼著自己在考試券過期前考完了082和083科目&#xff0c;目前已經順…

Power BI學習筆記-周報銷售數據分析

Power BI學習筆記-周報銷售數據分析 簡介 來自B站的Power BI學習視頻的學習筆記。 記錄來自B站的Power BI教學視頻&#xff0c;由“高級財務BP-Ni”發布&#xff0c;視頻發布者主要發布財務類相關的PBI視頻&#xff0c;視頻長度30分鐘左右。 視頻鏈接&#xff1a; 【powerbi周報…

Oracle 數據庫與操作系統兼容性指南

前言 作為一個在 Oracle 坑里摸爬滾打多年的老 DBA&#xff0c;最怕聽到的就是"這個版本能不能裝在這個系統上&#xff1f;"這種問題。昨天又有朋友來問我 Oracle 數據庫和操作系統的兼容性&#xff0c;索性把這些年積累的官方兼容性列表整理出來&#xff0c;省得大家…

pytorch初級

本文章是本人通過讀《Pytorch實用教程》第二版做的學習筆記&#xff0c;深度學習的核心部分&#xff1a;數據準備 ?? 模型構建 ?? 模型訓練 ?? 模型評估與應用。根據上面的思路&#xff0c;我們分為幾個部分&#xff1a; 第一部分&#xff1a;PyTorch 基礎 - 涵蓋了從基本…

UniApp 混合開發:Plus API 從基礎到7大核心場景實戰的完整指南

在 UniApp 混合開發中&#xff0c;plus API 是連接前端代碼與原生設備能力的核心橋梁。基于 5 Runtime&#xff0c;它封裝了設備硬件、系統交互、網絡通信等近百種原生能力&#xff0c;解決了 UniApp 跨端 API 覆蓋不足的問題。但直接使用 plus API 常面臨兼容性復雜、回調嵌套…

本周難點問題詳細總結

&#x1f4cb; 本周技術問題總結 &#x1f534; 1. 表單校驗與用戶體驗 1.1 表單錯誤提示不規范 問題&#xff1a;校驗失敗時缺少頁面標識位置&#xff1a;SupplierForm.vue:375代碼示例&#xff1a;message.error([基本信息] 表單校驗失敗&#xff0c;請檢查必填字段)影響&…

下一代自動駕駛汽車系統XIL驗證方法

摘要自動駕駛汽車測試仍是一個新興且尚未成熟的過程&#xff0c;全球統一的測試流程尚需時日。實車測試對資源要求極高&#xff0c;因此開發并提升基于虛擬環境的測試方法的效率至關重要。有鑒于此&#xff0c;本文提出一種新穎的 X-in-the-Loop&#xff08;XIL&#xff0c;X 代…

視頻數據如何聯網共享?

視頻數據如何聯網共享&#xff1f; 視頻聯網共享系統&#xff0c;實現前端設備的接入管理以及接入數據的獲取。前端設備包括視頻設備、卡口設備、Wifi數據采集設備、移動采集設備以及GPS/北斗數據采集設備等。系統實現海量視頻數據的快速檢索&#xff0c;并為上層數據應用提供視…

Django項目開發全鏈路:數據庫操作、多環境配置、windows/linux項目部署一站式指南

Django項目開發全鏈路:數據庫操作、多環境配置、windows/linux項目部署一站式指南 一、項目初始化 二、創建第一個應用 三、數據庫與數據模型的應用 四、創建管理后臺用戶 五、數據模型與數據庫交互之添加 六、數據模型與數據庫交互之修改 七、數據模型與數據庫交互之查詢 八、…

GLib多線程編程實踐:從數據結構到線程池的完整指南

引言 GLib是一個功能豐富、跨平臺的C程序庫,提供了大量高效且經過充分測試的數據結構與算法接口。本文將通過一個完整的實踐案例,介紹如何使用GLib實現動態數組、鏈表、平衡二叉樹和線程池,并分享在實際開發中遇到的常見問題及解決方案。 一、GLib核心數據結構實踐 1.1 動…