02. Flink Quick Start
1. Create the project and import dependencies
pom file:
<properties>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2. Requirement
Basic batch-processing approach: read the text file line by line, then split each line into words on spaces, and finally count how often each word occurs.
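Before wiring this into Flink, the read, split, count pipeline can be sketched in plain Java (class name is illustrative; the sample lines mirror the words.txt prepared below):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {
    public static void main(String[] args) {
        // Stand-in for the lines read from words.txt
        List<String> lines = List.of("hello world", "hello flink", "hello java");
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            // Split each line on spaces, then bump the per-word count
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        counts.forEach((word, n) -> System.out.println(word + " -> " + n));
    }
}
```

This prints each word with its count (HashMap iteration order is not guaranteed); the Flink versions below distribute exactly this logic across parallel subtasks.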
(1) Data preparation
Create a folder named input under the project directory, and inside it a text file words.txt.
File contents:
hello world
hello flink
hello java
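The input file can be created from the shell; the one-day-flink module directory below is assumed to match the path used in the code:

```shell
# Create the input directory and the sample words.txt used by the examples
mkdir -p one-day-flink/input
cat > one-day-flink/input/words.txt <<'EOF'
hello world
hello flink
hello java
EOF
```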
2.1 Batch Processing
Code (implemented with the DataSet API; note that the DataSet API has been soft-deprecated since Flink 1.12, and new batch jobs are usually written with the DataStream API in batch execution mode):
package com.company.onedayflink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkBatchWords {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read data from the file
        DataSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");
        // 3. Split and transform
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            /**
             * @param value one input line
             * @param out   collector for the results; Tuple2 is a 2-tuple of (word, count)
             * @throws Exception
             */
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // 3.1 Split the line
                for (String s : value.split(" ")) {
                    // 3.2 Turn each word into a 2-tuple
                    Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                    // 3.3 Emit the 2-tuple downstream
                    out.collect(tuple);
                }
            }
        });
        // 4. Group by word
        UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroup = wordAndOne.groupBy(0); // 0 = index of the word (the String) in the 2-tuple
        // 5. Aggregate within each group
        AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroup.sum(1); // 1 = index of the count in the 2-tuple
        // 6. Print the result
        sum.print();
    }
}
Run result: the four word counts, (hello,3), (world,1), (flink,1), (java,1), printed in no guaranteed order.
2.2 Stream Processing
2.2.1 Bounded Streams
Code (implemented with the DataStream API; reading from a file is a bounded stream):
package com.company.onedayflink.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

@Slf4j
public class FlinkStreamWords {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read data from the file
        DataStreamSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");
        // 3. Process the data (split, transform, group, aggregate)
        // 3.1 Split and transform
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String s : value.split(" ")) {
                    // Build the 2-tuple
                    Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                    // Emit it downstream through the collector
                    out.collect(tuple);
                }
            }
        });
        // 3.2 Group: in KeySelector<IN, KEY>, IN is the input type and KEY is the key type
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne.keyBy(
                (KeySelector<Tuple2<String, Integer>, String>) value -> value.f0); // value.f0 is the first element of the 2-tuple
        // 3.3 Aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1); // 1 = index of the count in the 2-tuple
        // 4. Print the result
        sum.print();
        // 5. Execute
        env.execute();
    }
}
Execution result:
2> (java,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
6> (world,1)
8> (flink,1)
The number before each ">" is the index of the parallel subtask (thread) that emitted the record; in a local environment the parallelism defaults to the number of CPU cores.
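If the interleaved, prefixed output is distracting, the job can be pinned to a single subtask. This is a sketch of a configuration tweak to the program above, not a complete program:

```java
// Sketch: force the whole job to run with one parallel subtask.
// With a single subtask, print() drops the "N>" prefix and records
// are emitted from one thread in arrival order.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
```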
2.2.2 Unbounded Streams
(1) Use netcat to listen on port 7777 and create the stream
Install netcat:
brew install netcat
Listen on port 7777 (-l enables listen mode, -k keeps listening after a client disconnects):
nc -lk 7777
(2) Code (implemented with the DataStream API; reading from a socket is an unbounded stream)
package com.company.onedayflink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkSteamSocketWords {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read data (hostname is the host to connect to; on a Mac you can check it with the hostname command)
        DataStreamSource<String> socketDS = env.socketTextStream("zgyMacBook-Pro.local", 7777);
        // 3. Process the data (split, transform, group, aggregate)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    // 3.1 Split the line
                    for (String s : value.split(" ")) {
                        // 3.2 Turn each word into a 2-tuple
                        Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                        // 3.3 Emit the 2-tuple downstream
                        out.collect(tuple);
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);
        // 4. Print the result
        sum.print();
        // 5. Execute
        env.execute();
    }
}
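The .returns(Types.TUPLE(Types.STRING, Types.INT)) call is needed here because Java erases a lambda's generic type arguments at runtime, so Flink cannot infer the Tuple2<String, Integer> output type; the anonymous-class versions in the earlier examples keep their generic supertype in the class file and need no hint. A small Flink-free sketch of that difference (class name is illustrative):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {
    public static void main(String[] args) {
        // A lambda: the JVM-generated class implements the raw Function
        // interface, so the String/Integer type arguments are gone at runtime.
        Function<String, Integer> lambda = Integer::parseInt;
        Type lambdaIface = lambda.getClass().getGenericInterfaces()[0];
        System.out.println(lambdaIface instanceof ParameterizedType); // false

        // An anonymous inner class records Function<String, Integer> as its
        // generic supertype, which frameworks can read back via reflection.
        Function<String, Integer> anon = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return Integer.parseInt(s);
            }
        };
        Type anonIface = anon.getClass().getGenericInterfaces()[0];
        System.out.println(anonIface instanceof ParameterizedType); // true
    }
}
```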
(3) Test
Send messages from the terminal:
hello flink
hello world
Watch the program's console output:
8> (flink,1)
3> (hello,1)
6> (world,1)
3> (hello,2)