5分鐘從零構建第一個 Apache Flink 應用

為什么80%的碼農都做不了架構師?>>> ??hot3.png

在本文中,我們將從零開始,教您如何構建第一個Apache Flink (以下簡稱Flink)應用程序。

開發環境準備

Flink 可以運行在 Linux, Max OS X, 或者是 Windows 上。為了開發 Flink 應用程序,在本地機器上需要有?Java 8.x?和?maven?環境。

如果有 Java 8 環境,運行下面的命令會輸出如下版本信息:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

如果有 maven 環境,運行下面的命令會輸出如下版本信息:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

另外我們推薦使用 ItelliJ IDEA (社區免費版已夠用)作為 Flink 應用程序的開發 IDE。Eclipse 雖然也可以,但是 Eclipse 在 Scala 和 Java 混合型項目下會有些已知問題,所以不太推薦 Eclipse。下一章節,我們會介紹如何創建一個 Flink 工程并將其導入 ItelliJ IDEA。

創建 Maven 項目

我們將使用 Flink Maven Archetype 來創建我們的項目結構和一些初始的默認依賴。在你的工作目錄下,運行如下命令來創建項目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink \-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.6.1 \-DgroupId=my-flink-project \-DartifactId=my-flink-project \-Dversion=0.1 \-Dpackage=myflink \-DinteractiveMode=false

你可以編輯上面的 groupId, artifactId, package 成你喜歡的路徑。使用上面的參數,Maven 將自動為你創建如下所示的項目結構:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src└── main├── java│   └── myflink│       ├── BatchJob.java│       └── StreamingJob.java└── resources└── log4j.properties

我們的 pom.xml 文件已經包含了所需的 Flink 依賴,并且在 src/main/java 下有幾個示例程序框架。接下來我們將開始編寫第一個 Flink 程序。

編寫 Flink 程序

啟動 IntelliJ IDEA,選擇 "Import Project"(導入項目),選擇 my-flink-project 根目錄下的 pom.xml。根據引導,完成項目導入。

在 src/main/java/myflink 下創建?SocketWindowWordCount.java?文件:

package myflink;public class SocketWindowWordCount {public static void main(String[] args) throws Exception {}
}

現在這程序還很基礎,我們會一步步往里面填代碼。注意下文中我們不會將 import 語句也寫出來,因為 IDE 會自動將他們添加上去。在本節末尾,我會將完整的代碼展示出來,如果你想跳過下面的步驟,可以直接將最后的完整代碼粘到編輯器中。

Flink 程序的第一步是創建一個?StreamExecutionEnvironment?。這是一個入口類,可以用來設置參數和創建數據源以及提交任務。所以讓我們把它添加到 main 函數中:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

下一步我們將創建一個從本地端口號 9000 的 socket 中讀取數據的數據源:

DataStream text = env.socketTextStream("localhost", 9000, "\n");

這創建了一個字符串類型的?DataStreamDataStream?是 Flink 中做流處理的核心 API,上面定義了非常多常見的操作(如,過濾、轉換、聚合、窗口、關聯等)。在本示例中,我們感興趣的是每個單詞在特定時間窗口中出現的次數,比如說5秒窗口。為此,我們首先要將字符串數據解析成單詞和次數(使用Tuple2<String, Integer>表示),第一個字段是單詞,第二個字段是次數,次數初始值都設置成了1。我們實現了一個?flatmap?來做解析的工作,因為一行數據中可能有多個單詞。

DataStream> wordCounts = text.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) {for (String word : value.split("\\s")) {out.collect(Tuple2.of(word, 1));}}});

接著我們將數據流按照單詞字段(即0號索引字段)做分組,這里可以簡單地使用?keyBy(int index)?方法,得到一個以單詞為 key 的Tuple2<String, Integer>數據流。然后我們可以在流上指定想要的窗口,并根據窗口中的數據計算結果。在我們的例子中,我們想要每5秒聚合一次單詞數,每個窗口都是從零開始統計的:。

DataStream> windowCounts = wordCounts.keyBy(0).timeWindow(Time.seconds(5)).sum(1);

第二個調用的?.timeWindow()?指定我們想要5秒的翻滾窗口(Tumble)。第三個調用為每個key每個窗口指定了sum聚合函數,在我們的例子中是按照次數字段(即1號索引字段)相加。得到的結果數據流,將每5秒輸出一次這5秒內每個單詞出現的次數。

最后一件事就是將數據流打印到控制臺,并開始執行:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

最后的?env.execute?調用是啟動實際Flink作業所必需的。所有算子操作(例如創建源、聚合、打印)只是構建了內部算子操作的圖形。只有在execute()被調用時才會在提交到集群上或本地計算機上執行。

下面是完整的代碼,部分代碼經過簡化(代碼在?GitHub?上也能訪問到):

package myflink;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class SocketWindowWordCount {public static void main(String[] args) throws Exception {// 創建 execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 通過連接 socket 獲取輸入數據,這里連接到本地9000端口,如果9000端口已被占用,請換一個端口DataStream text = env.socketTextStream("localhost", 9000, "\n");// 解析數據,按 word 分組,開窗,聚合DataStream> windowCounts = text.flatMap(new FlatMapFunction>() {@Overridepublic void flatMap(String value, Collector> out) {for (String word : value.split("\\s")) {out.collect(Tuple2.of(word, 1));}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);// 將結果打印到控制臺,注意這里使用的是單線程打印,而非多線程windowCounts.print().setParallelism(1);env.execute("Socket Window WordCount");}
}

運行程序

要運行示例程序,首先我們在終端啟動 netcat 獲得輸入流:

nc -lk 9000

如果是 Windows 平臺,可以通過?https://nmap.org/ncat/?安裝 ncat 然后運行:

ncat -lk 9000

然后直接運行SocketWindowWordCount的 main 方法。

只需要在 netcat 控制臺輸入單詞,就能在?SocketWindowWordCount?的輸出控制臺看到每個單詞的詞頻統計。如果想看到大于1的計數,請在5秒內反復鍵入相同的單詞。

作者:伍翀

原文鏈接?

本文為云棲社區原創內容,未經允許不得轉載。

轉載于:https://my.oschina.net/yunqi/blog/3047427

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/251797.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/251797.shtml
英文地址,請注明出處:http://en.pswp.cn/news/251797.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

WinForm窗體中如何在一個窗體中取到另一個窗體的值

例如我們定義兩窗體&#xff0c;Form1和Form2&#xff0c;如何在Form2中取到Form1中的一個值呢&#xff1f; 解決方法1&#xff1a; 在Form1 中定義一個成員變量&#xff0c;例如public string a “ ”: 然后給這個成員變量賦值&#xff0c;例如 a lblname.text; 在Form2中我…

Android6.0------權限申請RxPermissions

前面寫了Android6.0權限介紹和權限單個&#xff0c;多個申請&#xff0c;用的是純Java代碼&#xff0c;本文主要說的是借助第三方庫來實現權限申請。 借助第三方庫 RxPermissions來申請6.0權限。 RxPermissions庫地址&#xff1a;https://github.com/tbruyelle/RxPermissions …

如何給 mongodb 設置密碼

言簡意賅&#xff0c;步驟如下&#xff1a; 連接mongo mongo進入admin數據庫 use admin  創建管理員賬戶db.createUser({ user: "adminName", pwd: "adminPassword", roles: [{ role: "userAdminAnyDatabase", db: "admin&qu…

while和do-while循環結構

while(循環條件){ 循環操作 i; } 1.聲明并初始化循環變量。 2.判斷循環條件是否滿足&#xff0c;如果滿足則執行循環操作&#xff1b;否則退出循環。 3.執行完循環操作后&#xff0c;再次判斷循環條件&#xff0c;決定繼續執行循環或退出循環。 *while循環的特點&#xff1a;先…

Thread線程類及多線程

1.進程、線程、并發、并行是什么&#xff1f; 1)進程&#xff1a;操作系統中可以運行多個任務(程序)&#xff0c;這些運行的任務(程序)被稱為進程。程序的運行產生進程(內存空間、程序執行的堆棧)&#xff0c;可以這樣說&#xff0c;進程是作為操作系統分配資源的基本單位。 2)…

絳河 初識WCF5

然后我們在<Client>中添加一個終結點&#xff0c;這個是客戶端的終結點&#xff0c;我們前面曾經提過&#xff0c;通信實際上發生在兩個終結點間&#xff0c;客戶端也有個終結點&#xff0c;然而請求總是從客戶端首先發起&#xff0c;所以終結點地址應該填寫為服務端終結…

python修煉第四天

今天換了師傅。江湖人稱景女神^o^。 女師傅講的比較細&#xff0c;原理的比較多。初學者來說有些難。但是基本功是必須要打牢的。努力&#xff01; 迭代器 迭代器&#xff0c;迭代的工具1 什么是迭代&#xff0c;指的是一個重復的過程&#xff0c;每一次重復稱為一次迭代&#…

尷尬的存儲過程

最近在給一個已沉淀了多年的系統框架進行優化&#xff0c;發現大部分的基礎業務&#xff08;比如增刪改&#xff09;的實現都是通過存儲過程來實現。這讓我糾結了很久&#xff0c;看了下代碼格式我猜應該都是使用了代碼生成器。這無疑為系統的擴展留下了一個難以彌補的大坑。 首…

java虛擬機06-內存分區/新生代、老年代

1.原因 JVM在程序運行過程當中&#xff0c;會創建大量的對象&#xff0c;這些對象&#xff0c;大部分是短周期的對象&#xff0c;小部分是長周期的對象&#xff0c;對于短周期的對象&#xff0c;需要頻繁地進行垃圾回收以保證無用對象盡早被釋放掉&#xff0c;對于長周期對象&a…

博客作業04--樹

1.學習總結(2分) 1.1樹結構思維導圖 1.2 樹結構學習體會 樹這一章節比較復雜&#xff0c;知識點繁多&#xff0c;結合了遞歸的知識所以代碼閱讀起來會有障礙&#xff0c;難以理解&#xff0c;所以學起來比較吃力&#xff0c;而且很多經典的算法理解的不是很透徹解決pta上的問題…

Centos 配置多個虛擬IP

Centos 配置多個虛擬IP 臨時設置 ifconfig enp2s0:3 192.168.3.152 netmask 255.255.255.0 up 復制代碼永久生效 TYPEEthernet BOOTPROTOnone NAMEenp2s0 DEVICEenp2s0 HWADDR40:8d:5c:bc:f4:d8 ONBOOTyes IPADDR0192.168.3.200 PREFIX024 GATEWAY0192.168.3.254 IPADDR1192.16…

[轉]MySQL日志——Undo | Redo

本文是介紹MySQL數據庫InnoDB存儲引擎重做日志漫游 00 – Undo LogUndo Log 是為了實現事務的原子性&#xff0c;在MySQL數據庫InnoDB存儲引擎中&#xff0c;還用Undo Log來實現多版本并發控制(簡稱&#xff1a;MVCC)。 - 事務的原子性(Atomicity) 事務中的所有操作&#xff0…

Vim操作指南

vim具有6種基本模式和5種派生模式。 基本模式 普通模式 插入模式 可視模式 選擇模式 命令行模式 Ex模式 派生模式 操作符等待模式 插入普通模式 插入可視模式 插入選擇模式 替換模式 1.移動光標&#xff08;普通模式下&#xff09; h&#xff1a;左 j&#xff1a;下 …

[DP/單調隊列]BZOJ 2059 [Usaco2010 Nov]Buying Feed 購買飼料

首先我想吐槽的是題目并沒有表明數據范圍。。。 這個題目 DP方程并不難表示。 dp[i][j]表示前i個地點攜帶了j個貨物的最小花費 dp[i][j] dp[i-1][k] (j-k) * cost j*j*(leng[i]-leng[i-1]) 如果你這樣直接提交上去&#xff0c;恭喜你超時&#xff01;&#xff01;&#xff0…

十天沖刺09

今天&#xff0c;和小伙伴在做密保功能的開發&#xff0c;而且通過密保可以找回用戶密碼。轉載于:https://www.cnblogs.com/Excusezuo/p/10923690.html

hdu 6168 Numbers

zk has n numbers a1,a2,...,an. For each (i,j) satisfying 1≤i<j≤n, zk generates a new number (aiaj). These new numbers could make up a new sequence b1&#xff0c;b2,...,bn(n?1)/2 . LsF wants to make some trouble. While zk is sleeping, Lsf mixed up seq…

039_MySQL_多表查詢

#創建部門 CREATE TABLE IF NOT EXISTS dept (did int not null auto_increment PRIMARY KEY,dname VARCHAR(50) not null COMMENT 部門名稱 )ENGINEINNODB DEFAULT charset utf8;#添加部門數據 INSERT INTO dept VALUES (1, 教學部); INSERT INTO dept VALUES (2, 銷售部); IN…

sqlserver 創建對某個存儲過程執行情況的跟蹤

有時候需要抓取執行存儲過程時某個參數的值&#xff0c;有時候程序調用存儲過程執行后結果不太對&#xff0c;不確定是程序的問題還是存儲過程的問題&#xff0c;需要單獨執行存儲過程看結果 即可用下面的方法 -- --創建對某個存儲過程的執行情況的跟蹤 --注意修改路徑 和 obje…

5.7 彈性盒子

彈性盒子定義彈性盒子 display&#xff1a;flex定義子元素排列方式 flex-diection定義子元素換行方式 flxe-wrap定義子元素對齊方式橫向對齊 justify-content縱向對齊 align-items 媒體查詢 media screen and (max-width:最大寬度)and &#xff08;min-width&#xff1a;最小…

4.navicat11激活教程,親測可用哦!

原文地址&#xff1a;http://blog.csdn.net/sanbingyutuoniao123/article/details/52589678Navicat是一款數據庫管理工具, 用于簡化, 開發和管理MySQL, SQL Server, SQLite, Oracle 和 PostgreSQL 的數據庫&#xff1b;Navicat數據模型工具以圖形化方式創建關聯式數據庫&#x…