Flink DataStream 編程入門

流處理是 Flink 的核心,流處理的數據集用 DataStream 表示。數據流從可以從各種各樣的數據源中創建(消息隊列、Socket 和 文件等),經過 DataStream 的各種 transform 操作,最終輸出文件或者標準輸出。這個過程跟之前文章中介紹的 Flink 程序基本骨架一樣。本篇介紹 DataStream 相關的入門知識。

Flink 101

為了學習 Flink 的朋友能查看到每個例子的源碼,我創建了一個 GitHub 項目:github.com/duma-repo/a… 這里會存放每一篇文章比較重要的示例的源碼,目前支持 Java 和 Scala,仍在不斷完善中。代碼下載后可以在本地運行,也可以打包放在集群上運行。同時,歡迎各位將優質的資源提交到項目中。

簡單示例

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class WindowWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9999).flatMap(new Splitter()).keyBy(0).timeWindow(Time.seconds(5)).sum(1);dataStream.print();env.execute("Window WordCount");}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word: sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1)); //空格分割后,每個單詞轉換成 (word, 1) 二元組輸出}}}}復制代碼

這個例子跟之間介紹 WordCount 的例子類似,這里詳細介紹下涉及的 API 和含義

  • 數據源:socketTextStream 是從 socket 創建的數據流,可以使用 nc -l 9000 創建 socket 客戶端發送數據
  • transform:flatMap 將輸入的數據按照空格分割后,扁平化處理(flat即為扁平的意思);keyBy 會按照指定的 key 進行分組,這里就是將單詞作為 key;timeWindow 指定時間窗口,這里是 5s 處理一次;sum 是聚合函數,將分組好的單詞個數求和
  • 輸出:print 將處理完的數據輸出到標準輸出流中,可以在控制臺看到輸出的結果。調用 execute 方法提交 Job

Data Source

經過以上的介紹,我們知道常見的數據源有 socket、消息隊列和文件等。對于常見的數據源 Flink 已經定義好了讀取函數,接下來一一介紹。

基于文件

  • readTextFile(path):讀文本文件,默認是文件類型是 TextInputFormat,并且返回類型是 String
  • readFile(fileInputFormat, path):讀文件,需要指定輸入文件的格式
  • readFile(fileInputFormat, path, watchType, interval, typeInfo):以上兩個方法內部都會調用這個方法,參數說明:
    • fileInputFormat - 輸入文件的類型
    • path - 輸入文件路徑
    • watchType - 取值為 FileProcessingMode.PROCESS_CONTINUOUSLY 和 FileProcessingMode.PROCESS_ONCE
      • FileProcessingMode.PROCESS_CONTINUOUSLY - 當輸入路徑下有文件被修改,整個路徑下內容將會被重新處理
      • FileProcessingMode.PROCESS_ONCE - 只掃描一次,便退出。因此這種模式下輸入數據只讀取一次
    • interval - 依賴 watchType 參數,對于 FileProcessingMode.PROCESS_CONTINUOUSLY 每隔固定時間(單位:毫秒)檢測路徑下是否有新數據
    • typeInfo - 返回數據的類型

需要注意,在底層 Flink 將讀文件的過程分為兩個子任務 —— 文件監控和數據讀取(reader)。監控任務由 1 個 task 實現,而讀取的任務由多個 task 實現,數量與 Job 的并行度相同。監控任務的作用是掃描輸入路徑(周期性或者只掃描一次,取決于 watchType),當數據可以被處理時,會將數據分割成多個分片,將分片分配給下游的 reader 。一個分片只會被一個 reader 讀取,一個 reader 可以讀取多個分片。

基于 Socket

  • socketTextStream:從 socket 數據流中讀數據

基于 Collection

  • fromCollection(Collection):從 Java.util.Collection 類型的數據中創建輸入流,collection 中的所有元素類型必須相同
  • fromCollection(Iterator, Class):從 iterator (迭代器)中創建輸入流,Class 參數指定從 iterator 中的數據類型
  • fromElements(T ...):從給定的參數中創建輸入流, 所有參數類型必須相同
  • fromParallelCollection(SplittableIterator, Class):從 iterator 中創建并行的輸入流,Class 指定 iterator 中的數據類型
  • generateSequence(from, to):從 from 至 to 之間的數據序列創建并行的數據流

自定義

  • addSource:可以自定義輸入源,通過實現 SourceFunction 接口來自定義非并行的輸入流;也可以實現 ParallelSourceFunction 接口或集成 RichParallelSourceFunction 類來自定義并行輸入流,當然也可以定義好的數據源,如:Kafka,addSource(new FlinkKafkaConsumer08<>(...))

DataStream 的 transform

之前已經介紹了一些 transfrom 函數,如:map、flatMap 和 filter 等。同時還有窗口函數:window、timeWindow 等,聚合函數:sum、reduce 等。更多的 transform 函數以及使用將會單獨寫一篇文章介紹。

Data Sink

Data Sink 便是數據的輸出。同 Data Source 類似, Flink 也內置了一些輸出函數,如下:

  • writeAsText(path) / TextOutputFormat:將數據作為 String 類型輸出到指定文件
  • writeAsCsv(...) / CsvOutputFormat:將 Tuple 類型輸出到 ',' 分隔的 csv 類型的文件。行和列的分隔符可以通過參數配置,默認的為 '\n' 和 ','
  • print() / printToErr():將數據打印到標準輸出流或者標準錯誤流,可以指定打印的前綴。
  • writeUsingOutputFormat() / FileOutputFormat:輸出到 OutputFormat 類型指定的文件,支持對象到字節的轉換。
  • writeToSocket:根據 SerializationSchema 將數據輸出到 socket
  • addSink:自定義輸出函數,如:自定義將數據輸出到 Kafka

小結

本篇文章主要介紹了 Flink Streaming 編程的基本骨架。詳細介紹了 Streaming 內置的 Data Source 和 DataSink 。下篇將繼續介紹 Flink Streaming 編程涉及的基本概念。

代碼地址: github.com/duma-repo/a…

歡迎關注公眾號「渡碼」


轉載于:https://juejin.im/post/5d09814651882528fd530789

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/387544.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/387544.shtml
英文地址,請注明出處:http://en.pswp.cn/news/387544.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

騰訊手游如何提早揭露游戲外掛風險?

目前騰訊SR手游安全測試限期開放免費專家預約&#xff01;點擊鏈接&#xff1a;手游安全測試立即預約&#xff01; 作者&#xff1a;sheldon&#xff0c;騰訊高級安全工程師 商業轉載請聯系騰訊WeTest獲得授權&#xff0c;非商業轉載請注明出處。 文中動圖無法顯示&#xff0c…

基于ARM Cortex-M0+ 的Bootloader 參考

源&#xff1a; 基于ARM Cortex-M0內核的bootloader程序升級原理及代碼解析轉載于:https://www.cnblogs.com/LittleTiger/p/10312784.html

小猿圈web前端之網站性能優化方案

現在前端不僅要能做出一個網站頁面&#xff0c;還要把這個頁面做的炫酷&#xff0c;那需要很大程度的優化&#xff0c;那么怎么優化才更好呢&#xff1f;小猿圈總結了一下自己優化的方案&#xff0c;感興趣的朋友可以看一下。一般網站優化都是優化后臺&#xff0c;如接口的響應…

下面介紹一個開源的OCR引擎Tesseract2。值得慶幸的是雖然是開源的但是它的識別率較高,并不比其他引擎差勁。網上介紹Tessnet2也是當時時間排名第三的識別引擎,只是后來慢慢不維護了,目前是G

下面介紹一個開源的OCR引擎Tesseract2。值得慶幸的是雖然是開源的但是它的識別率較高&#xff0c;并不比其他引擎差勁。網上介紹Tessnet2也是當時時間排名第三的識別引擎&#xff0c;只是后來慢慢不維護了&#xff0c;目前是Google在維護&#xff0c;大家都知道Google 在搞電子…

js 更改json的 key

let t data.map(item > {return{fee: item[費用],companyName1: item.companyName,remark1: item.remark,beginTime1: item.beginTime,endTime1: item.endTime}})console.log(t) 源地址&#xff1a;https://www.cnblogs.com/Marydon20170307/p/8676611.html轉載于:https:/…

1.4版本上線(第八次會議)

在小組成員連夜趕工的奮斗下&#xff0c;終于在昨天深夜成功實現了UI界面功能 至此&#xff0c;我們的系統終于真正可實用而不是局限在命令行進行互動了 由于python嵌入數據庫功能實現難度較大&#xff0c;迫于時間的局限性&#xff0c;我們選擇了用json文件與txt文件進行替代&…

分UV教程

第一步 首先&#xff0c;打開一個練習場景“空中預警機1.max”&#xff08;這事小弟平時的練習做的不好獻丑了&#xff09;。&#xff08;圖01&#xff09; 圖01 第二步 這里我們拿機翼來舉例子&#xff0c;隱藏除機翼意外的其他模型。&#xff08;圖02&#xff09; 圖02 第三步…

k8s系列--- dashboard認證及分級授權

http://blog.itpub.net/28916011/viewspace-2215214/ 因版本不一樣&#xff0c;略有改動 Dashboard官方地址&#xff1a; https://github.com/kubernetes/dashboard dashbord是作為一個pod來運行&#xff0c;需要serviceaccount賬號來登錄。 先給dashboad創建一個專用的認證信息…

JAVA項目開發

16年java軟件開發經驗&#xff0c;全職項目開發&#xff0c;項目可簽合同、開普票和專票。 主要承接項目&#xff1a; 1、網站開發項目 自主開發千帆CMS動態發布系統&#xff0c;基于java/springboot2/jpa/easyui開發&#xff0c;簡單易用&#xff0c;后臺與前端分離&#xff0…

3dmax基本操作

1、基本操作平移視圖&#xff08;你所說的移動&#xff09;&#xff1a;CTRLP&#xff0c;或者用&#xff0c;滾輪。按住鼠標滾輪不放拖動&#xff0c;就行了。旋轉&#xff1a; ALT滾輪。按住ALT鍵不放&#xff0c;利用滾輪的移動&#xff08;滾輪也要按著不放&#xff09…

padding影響整個div的實際寬度

padding影響整個div的實際寬度 1.不讓padding影響整個div的實際寬度 所以要設置css屬性&#xff1a; box-sizing:box-sizingposted on 2019-01-25 16:58 玉貔貅 閱讀(...) 評論(...) 編輯 收藏 轉載于:https://www.cnblogs.com/yupixiu/p/10320564.html

unity3d 任務頭上的血條

人物的名稱與血條的繪制方法很簡單&#xff0c;但是我們需要解決的問題是如何在3D世界中尋找合適的坐標。因為3D世界中的人物是會移動的&#xff0c;它是在3D世界中移動&#xff0c;并不是在2D平面中移動&#xff0c;但是我們需要將3D的人物坐標換算成2D平面中的坐標&#xff0…

如何在C#中使用Win32和其他庫之三

具有內嵌字符數組的結構 某些函數接受具有內嵌字符數組的結構。例如&#xff0c;GetTimeZoneInformation() 函數接受指向以下結構的指針&#xff1a; typedef struct _TIME_ZONE_INFORMATION { LONG Bias; WCHAR StandardName[ 32 ]; SYSTEMTIME Standa…

unity3d 預制體

首先要說明一下什么是預制體&#xff1f; 在Unity3D里面我們叫它Prefab&#xff1b;我們也可以這樣理解&#xff1a;當制作好了游戲組件&#xff08;場景中的任意一個gameobject &#xff09;,我們希望將它制作成一個組件模版&#xff0c;用于批量的套用工作&#xff0c;例如說…

Python小數據池,代碼塊

今日內容一些小的干貨 一. id is 二. 代碼塊三. 小數據池四. 總結python小數據池&#xff0c;代碼塊的最詳細、深入剖析 一. id is 二. 代碼塊三. 小數據池四. 總結一&#xff0c;id&#xff0c;is&#xff0c; 在Python中&#xff0c;id是什么&#xff1f;id是內存地址…

【Wax】使用Wax (framework方式,XCode 4.6)

前情提示&#xff1a;【Wax】使用Wax &#xff08;非framework方式&#xff0c;XCode 4.6&#xff09; 這次&#xff0c;將以framework的方式來使用Wax 那么&#xff0c;讓我們開始吧&#xff01;&#xff01;&#xff01; 準備工作&#xff1a; 下載wax.framework&#xff1a;…

unity3d 簡單動畫

1&#xff0c;動畫系統配置 創建游戲對象并添加Animation組件&#xff0c;然后將動畫文件拖入組件。 進入動畫文件的Debug屬性面板 選中Legacy屬性 選中游戲對象&#xff0c;打開Animation編輯窗口 添加動畫變化屬性 需改關鍵幀的屬性值 配置完成后運行即可得到動畫效果 2&…

人月神話閱讀筆記(二)

今天對人月神話的正文部分進行了閱讀&#xff0c;從人月神話這一部分中了解到缺乏合理的時間進度控制是造成滯后的主要原因&#xff0c;比其他任何事情影響的和還大&#xff0c;書中也對造成這種這種普遍災難的原因進行了并進行了詳細列舉。 首先&#xff0c;我們對估算技術缺乏…

3dmax導出到unity3d下分割動畫

1、在3dmax 導出時候&#xff0c;要導出FBX文件&#xff0c;同時包含動畫&#xff0c;骨骼&#xff0c;皮膚等內容 2、把FBX文件導入到Unity3d后會默認有一個超長的大動畫&#xff0c;就是一個整體的動畫&#xff0c;如圖Take001&#xff0c;這個時候要分割哪部分是跑&#xf…

華碩首款平板電腦周五開售

新浪科技訊北京時間3月21日晚間消息&#xff0c;華碩周一宣布&#xff0c;將于本周開售首款平板電腦EeePadTransformer。本周五&#xff0c;臺灣地區用戶將可以率先預定這款平板電腦&#xff0c;隨后還將在全球其他國家和地區推出,悠語yoryu化妝品玻尿酸水潤彈力面膜120ml補水保…