Flink Series Articles
I. Flink Column
The Flink column systematically introduces individual knowledge points, each supplemented with concrete examples.
1. Flink Deployment Series
Covers the basics of Flink deployment and configuration.
2. Flink Fundamentals Series
Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL Basics Series
Covers basic usage of the Flink Table API and SQL, such as creating databases and tables with the Table API and SQL, queries, window functions, catalogs, and more.
4. Flink Table API and SQL Advanced and Application Series
Covers applied Table API and SQL topics, which are more closely tied to real production use and involve somewhat more development complexity.
5. Flink Monitoring Series
Covers topics related to day-to-day operations and monitoring.
II. Flink Examples Column
The Flink Examples column supplements the Flink column. It generally does not explain knowledge points; instead, it provides concrete, ready-to-use examples. This column is not divided into subdirectories; the content of each article is clear from its link.
For the entry point to all articles in both columns, see: Flink Series Articles Index
Table of Contents
- Flink Series Articles
- I. Description and Examples of Flink's 23 Operators
- 1. Maven dependencies
- 2. Java bean
- 3. map
- 4. flatmap
- 5. Filter
- 6. KeyBy
- 7. Reduce
- 8. Aggregations
- 9. first, distinct, join, outjoin, cross
- 10. Window
- 11. WindowAll
- 12. Window Apply
- 13. Window Reduce
- 14. Aggregations on windows
- 15. Union
- 16. Window Join
- 17. Interval Join
- 18. Window CoGroup
- 19. Connect
- 20. CoMap, CoFlatMap
- 21. Iterate
- 22. Cache
- 23. Split
- 24. Select
- 25. Project
This article introduces Flink's 23 commonly used operators, each illustrated with a concrete, runnable example. For more systematic and up-to-date material, see the author's Flink column.
Apart from the Maven dependencies listed below, this article requires no other dependencies.
This topic is split into five parts:
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (1) - map, flatmap, and filter
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (2) - keyby, reduce, and Aggregations
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (3) - window, distinct, join, etc.
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (4) - union, window join, connect, outputtag, cache, iterator, project
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (complete edition)
I. Description and Examples of Flink's 23 Operators
1. Maven dependencies
Unless otherwise noted, all examples below use the following Maven dependencies.
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.4</version>
    </dependency>
</dependencies>
2. Java bean
The User bean used throughout the examples below is as follows:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private int id;
    private String name;
    private String pwd;
    private String email;
    private int age;
    private double balance;
}
3. map
[DataStream -> DataStream]
This is one of the simplest transformations: the input is a data stream and the output is also a data stream. map applies a function to each element and emits exactly one result per input element.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestMapDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// source// transformationmapFunction5(env);// sink// executeenv.execute();}// 構造一個list,然后將list中數字乘以2輸出,內部匿名類實現public static void mapFunction1(StreamExecutionEnvironment env) throws Exception {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer inValue) throws Exception {return inValue * 2;}});sink.print();
// 9> 12
// 4> 2
// 10> 14
// 8> 10
// 13> 20
// 7> 8
// 12> 18
// 11> 16
// 5> 4
// 6> 6}// 構造一個list,然后將list中數字乘以2輸出,lambda實現public static void mapFunction2(StreamExecutionEnvironment env) throws Exception {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);SingleOutputStreamOperator<Integer> sink = source.map(i -> 2 * i);sink.print();
// 3> 4
// 4> 6
// 9> 16
// 7> 12
// 10> 18
// 2> 2
// 6> 10
// 5> 8
// 8> 14
// 11> 20}// 構造User數據源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// lambda實現用戶對象的balance×2和age+5功能public static SingleOutputStreamOperator<User> mapFunction3(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);SingleOutputStreamOperator<User> sink = source.map((MapFunction<User, User>) user -> {User user2 = user;user2.setAge(user.getAge() + 5);user2.setBalance(user.getBalance() * 2);return user2;});sink.print();
// 10> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
// 14> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
// 11> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
// 12> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
// 13> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)return sink;}// lambda實現balance*2和age+5后,balance》=2000和age》=20的數據過濾出來public static SingleOutputStreamOperator<User> mapFunction4(StreamExecutionEnvironment env) throws Exception {SingleOutputStreamOperator<User> sink = mapFunction3(env).filter(user -> user.getBalance() >= 2000 && user.getAge() >= 20);sink.print();
// 15> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
// 1> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
// 16> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
// 3> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
// 2> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
// 1> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)return sink;}// lambda實現balance*2和age+5后,balance》=2000和age》=20的數據過濾出來并通過flatmap收集public static SingleOutputStreamOperator<User> mapFunction5(StreamExecutionEnvironment env) throws Exception {SingleOutputStreamOperator<User> sink = mapFunction4(env).flatMap((FlatMapFunction<User, User>) (user, out) -> {if (user.getBalance() >= 3000) {out.collect(user);}}).returns(User.class);sink.print();
// 8> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
// 7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
// 6> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
// 9> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
// 5> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
// 7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
// 7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)return sink;}}
4. flatmap
[DataStream -> DataStream]
FlatMap takes one record and emits zero, one, or more records: each element of the input can be turned into one or more elements, and the result is the flattened stream.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestFlatMapDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();flatMapFunction3(env);env.execute();}// 構造User數據源public static DataStreamSource<String> source(StreamExecutionEnvironment env) {List<String> info = new ArrayList<>();info.add("i am alanchan");info.add("i like hadoop");info.add("i like flink");info.add("and you ?");DataStreamSource<String> dataSource = env.fromCollection(info);return dataSource;}// 將句子以空格進行分割-內部匿名類實現public static void flatMapFunction1(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] splits = value.split(" ");for (String split : splits) {out.collect(split);}}});sink.print();
// 11> and
// 10> i
// 8> i
// 9> i
// 8> am
// 10> like
// 11> you
// 10> flink
// 8> alanchan
// 9> like
// 11> ?
// 9> hadoop}// lambda實現public static void flatMapFunction2(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap((FlatMapFunction<String, String>) (input, out) -> {String[] splits = input.split(" ");for (String split : splits) {out.collect(split);}}).returns(String.class);sink.print();
// 6> i
// 8> and
// 8> you
// 8> ?
// 5> i
// 7> i
// 5> am
// 5> alanchan
// 6> like
// 7> like
// 6> hadoop
// 7> flink}// lambda實現public static void flatMapFunction3(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap((String input, Collector<String> out) -> Arrays.stream(input.split(" ")).forEach(out::collect)).returns(String.class);sink.print();
// 8> i
// 11> and
// 10> i
// 9> i
// 10> like
// 11> you
// 8> am
// 11> ?
// 10> flink
// 9> like
// 9> hadoop
// 8> alanchan}}
5. Filter
DataStream → DataStream
The Filter function evaluates a condition on each element of the stream and keeps only the elements for which the condition returns true.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestFilterDemo {// 構造User數據源public static DataStreamSource<User> sourceUser(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// 構造User數據源public static DataStreamSource<Integer> sourceList(StreamExecutionEnvironment env) {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);return source;}// 過濾出大于5的數字,內部匿名類public static void filterFunction1(StreamExecutionEnvironment env) throws Exception {DataStream<Integer> source = sourceList(env);SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {public Integer map(Integer value) throws Exception {return value + 1;}}).filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value > 5;}});sink.print();
// 1> 10
// 14> 7
// 16> 9
// 13> 6
// 2> 11
// 15> 8}// lambda實現public static void filterFunction2(StreamExecutionEnvironment env) throws Exception {DataStream<Integer> source = sourceList(env);SingleOutputStreamOperator<Integer> sink = source.map(i -> i + 1).filter(value -> value > 5);sink.print();
// 12> 7
// 15> 10
// 11> 6
// 13> 8
// 14> 9
// 16> 11}// 查詢user id大于3的記錄public static void filterFunction3(StreamExecutionEnvironment env) throws Exception {DataStream<User> source = sourceUser(env);SingleOutputStreamOperator<User> sink = source.filter(user -> user.getId() > 3);sink.print();
// 14> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)
// 15> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)}/*** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();filterFunction3(env);env.execute();}}
6. KeyBy
DataStream → KeyedStream
Groups the records of a stream by the specified key.
KeyBy logically partitions the stream by key. Internally it uses hash partitioning and returns a KeyedStream; records with the same key are placed in the same partition.
The partitioning result is strongly tied to the parallelism of the operator downstream of KeyBy: if the downstream operator has a parallelism of 1, all records end up together no matter how the stream is keyed.
For POJO types, KeyBy can partition on a field name via keyBy(fieldName).
For Tuple types, KeyBy can partition on a field position via keyBy(fieldPosition).
For other types, KeyBy can partition via keyBy(new KeySelector {...}). A compact sketch of these forms is shown below, followed by the full demo.
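The following is a minimal, hedged sketch of the common keyBy variants. The stream variables (userStream, tupleStream) are illustrative, and the User POJO is the bean defined above; the fieldName/fieldPosition overloads still exist but are deprecated in recent versions, so key selectors are used here. For generic key types, an explicit KeySelector (or a returns() hint) may be needed instead of a lambda.
// Sketch: three ways to key a stream (variable names are illustrative).
KeyedStream<User, String> byName =
        userStream.keyBy(user -> user.getName());              // key selector on a POJO field

KeyedStream<Tuple2<String, Integer>, String> byFirstField =
        tupleStream.keyBy(t -> t.f0);                          // key selector on a tuple field

KeyedStream<User, String> byPrefix =
        userStream.keyBy(new KeySelector<User, String>() {     // explicit KeySelector for arbitrary logic
            @Override
            public String getKey(User value) throws Exception {
                return value.getName().substring(0, 4);        // key by the first 4 characters of the name
            }
        });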
import java.util.Arrays;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestKeyByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(4);// 設置數據分區數量keyByFunction6(env);env.execute();}// 構造User數據源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// 按照name進行keyby 對于POJO類型,KeyBy可以通過keyBy(fieldName)指定字段進行分區public static void keyByFunction1(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> sink = source.keyBy(new KeySelector<User, String>() {@Overridepublic String getKey(User value) throws Exception {return value.getName();}});sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());return user;});sink.print();}// lambda 對于POJO類型,KeyBy可以通過keyBy(fieldName)指定字段進行分區public static void keyByFunction2(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> sink = source.keyBy(user -> user.getName());// 演示keyby后的數據輸出sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());return user;});sink.print();}// 對于Tuple類型,KeyBy可以通過keyBy(fieldPosition)指定字段進行分區。lambdapublic static void keyByFunction3(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);SingleOutputStreamOperator<Tuple2<String, User>> userTemp = source.map((MapFunction<User, Tuple2<String, User>>) user -> {return new Tuple2<String, User>(user.getName(), user);}).returns(Types.TUPLE(Types.STRING, Types.POJO(User.class)));KeyedStream<Tuple2<String, User>, Tuple> sink = userTemp.keyBy(0);// 演示keyby后的數據輸出sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.f1.toString());return user.f1;});sink.print();}// 對于Tuple類型,KeyBy可以通過keyBy(fieldPosition)指定字段進行分區。public static void keyByFunction4(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);SingleOutputStreamOperator<Tuple2<String, User>> userTemp = source.map(new MapFunction<User, Tuple2<String, User>>() {@Overridepublic Tuple2<String, User> map(User value) throws Exception {return new Tuple2<String, User>(value.getName(), value);}});KeyedStream<Tuple2<String, User>, String> sink = userTemp.keyBy(new KeySelector<Tuple2<String, User>, String>() {@Overridepublic String getKey(Tuple2<String, User> value) throws Exception {return value.f0;}});// 演示keyby后的數據輸出sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.f1.toString());return user.f1;});// sink.map(new MapFunction<Tuple2<String, User>, String>() {
//
// @Override
// public String map(Tuple2<String, User> value) throws Exception {
// System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + value.f1.toString());
// return null;
// }});sink.print();}// 對于一般類型,如上,KeyBy可以通過keyBy(new KeySelector {...})指定字段進行分區。// 按照name的前4位進行keybypublic static void keyByFunction5(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> sink = source.keyBy(new KeySelector<User, String>() {@Overridepublic String getKey(User value) throws Exception {
// String temp = value.getName().substring(0, 4);return value.getName().substring(0, 4);}});sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());return user;});sink.print();}// 對于一般類型,如上,KeyBy可以通過keyBy(new KeySelector {...})指定字段進行分區。 lambda// 按照name的前4位進行keybypublic static void keyByFunction6(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> sink = source.keyBy(user -> user.getName().substring(0, 4));sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());return user;});sink.print();}}
7. Reduce
KeyedStream → DataStream
Aggregates the elements of a keyed stream. Reduce produces a single rolling value and creates a new value every time it processes an element; common aggregations such as average, sum, min, max, and count can all be expressed with reduce. It performs a rolling aggregation based on a ReduceFunction and emits each rolling result downstream.
Note: Reduce emits the result of every rolling aggregation, not just the final value. For example, in the demo below the key "alan1" receives balances 1000.0, 1500.0, and 500.0, so the sum-by-reduce emits 1000.0, then 2500.0, then 3000.0.
import java.util.Arrays;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(4);// 設置數據分區數量reduceFunction2(env);env.execute();}// 構造User數據源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// 按照name進行balance進行sumpublic static void reduceFunction1(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> keyedStream = source.keyBy(user -> user.getName());SingleOutputStreamOperator<User> sink = keyedStream.reduce(new ReduceFunction<User>() {@Overridepublic User reduce(User value1, User value2) throws Exception {double balance = value1.getBalance() + value2.getBalance();return new User(value1.getId(), value1.getName(), "", "", 0, balance);}});//sink.print();}// 按照name進行balance進行sum lambdapublic static void reduceFunction2(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> userKeyBy = source.keyBy(user -> user.getName());SingleOutputStreamOperator<User> sink = userKeyBy.reduce((user1, user2) -> {User user = user1;user.setBalance(user1.getBalance() + user2.getBalance());return user;});sink.print();}}
8. Aggregations
KeyedStream → DataStream
The DataStream API supports various aggregations such as min, max, and sum. These functions can be applied to a KeyedStream to obtain rolling aggregations.
Aggregations roll up a KeyedStream on the specified field and emit the result after every rolling aggregation. The built-in aggregation functions are sum, min, minBy, max, and maxBy.
Note:
The difference between max(field) and maxBy(field): maxBy returns the whole record that holds the maximum value of the field, whereas max keeps the first record and only replaces that field's value with the maximum seen so far (min and minBy behave analogously). In other words, max replaces just the compared field of the first element, while maxBy replaces the entire element. For example, keyed by name, with (id=1, balance=1000) arriving before (id=3, balance=1500), maxBy("balance") eventually emits the id=3 record, while max("balance") emits the id=1 record with its balance set to 1500 (see the output in the demo below).
Like reduce, these aggregation operators emit the result of every rolling aggregation.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestAggregationsDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();aggregationsFunction2(env);env.execute();}// 構造User數據源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}//分組統計sum、max、min、maxby、minbypublic static void aggregationsFunction(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> userTemp= source.keyBy(user->user.getName());DataStream sink = null;//1、根據name進行分區統計balance之和 alan1----2500/alan2----600
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=2500.0)
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=600.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=3000.0)sink = userTemp.sum("balance");//2、根據name進行分區統計balance的max alan1----1500/alan2----400
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1500.0)
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=400.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1500.0)sink = userTemp.max("balance");//1@1.com-3000 -- 2@2.com-300//3、根據name進行分區統計balance的min alan1----500/alan2---200
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=500.0)
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)sink = userTemp.min("balance");//4、根據name進行分區統計balance的maxBy alan2----400/alan1----1500
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// 1> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
// 16> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)
// 16> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)sink = userTemp.maxBy("balance");//5、根據name進行分區統計balance的minBy alan2----200/alan1----500
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
// 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
// 16> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)sink = userTemp.minBy("balance");sink.print();}public static void aggregationsFunction2(StreamExecutionEnvironment env) throws Exception {List list = new ArrayList<Tuple3<Integer, Integer, Integer>>();list.add(new Tuple3<>(0,3,6));list.add(new Tuple3<>(0,2,5));list.add(new Tuple3<>(0,1,6));list.add(new Tuple3<>(0,4,3));list.add(new Tuple3<>(1,1,9));list.add(new Tuple3<>(1,2,8));list.add(new Tuple3<>(1,3,10));list.add(new Tuple3<>(1,2,9));list.add(new Tuple3<>(1,5,7));DataStreamSource<Tuple3<Integer, Integer, Integer>> source = env.fromCollection(list);KeyedStream<Tuple3<Integer, Integer, Integer>, Integer> tTemp= source.keyBy(t->t.f0);DataStream<Tuple3<Integer, Integer, Integer>> sink =null;//按照分區,以第一個Tuple3的元素為基礎進行第三列值比較,如果第三列值小于第一個tuple3的第三列值,則進行第三列值替換,其他的不變
// 12> (0,3,6)
// 11> (1,1,9)
// 11> (1,1,8)
// 12> (0,3,5)
// 11> (1,1,8)
// 12> (0,3,5)
// 11> (1,1,8)
// 12> (0,3,3)
// 11> (1,1,7) sink = tTemp.min(2);// 按照數據分區,以第一個tuple3的元素為基礎進行第三列值比較,如果第三列值小于第一個tuple3的第三列值,則進行整個tuple3的替換
// 12> (0,3,6)
// 11> (1,1,9)
// 12> (0,2,5)
// 11> (1,2,8)
// 12> (0,2,5)
// 11> (1,2,8)
// 12> (0,4,3)
// 11> (1,2,8)
// 11> (1,5,7)sink = tTemp.minBy(2);sink.print();}}
9. first, distinct, join, outjoin, cross
These operators are shown with the DataSet (batch) API's ExecutionEnvironment; see the example below and the output in its comments for the details of each.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestFirst_Join_Distinct_OutJoin_CrossDemo {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();joinFunction(env);env.execute();}public static void unionFunction(StreamExecutionEnvironment env) throws Exception {List<String> info1 = new ArrayList<>();info1.add("team A");info1.add("team B");List<String> info2 = new ArrayList<>();info2.add("team C");info2.add("team D");List<String> info3 = new ArrayList<>();info3.add("team E");info3.add("team F");List<String> info4 = new ArrayList<>();info4.add("team G");info4.add("team H");DataStream<String> source1 = env.fromCollection(info1);DataStream<String> source2 = env.fromCollection(info2);DataStream<String> source3 = env.fromCollection(info3);DataStream<String> source4 = env.fromCollection(info4);source1.union(source2).union(source3).union(source4).print();
// team A
// team C
// team E
// team G
// team B
// team D
// team F
// team H}public static void crossFunction(ExecutionEnvironment env) throws Exception {// cross,求兩個集合的笛卡爾積,得到的結果數為:集合1的條數 乘以 集合2的條數List<String> info1 = new ArrayList<>();info1.add("team A");info1.add("team B");List<Tuple2<String, Integer>> info2 = new ArrayList<>();info2.add(new Tuple2("W", 3));info2.add(new Tuple2("D", 1));info2.add(new Tuple2("L", 0));DataSource<String> data1 = env.fromCollection(info1);DataSource<Tuple2<String, Integer>> data2 = env.fromCollection(info2);data1.cross(data2).print();
// (team A,(W,3))
// (team A,(D,1))
// (team A,(L,0))
// (team B,(W,3))
// (team B,(D,1))
// (team B,(L,0))}public static void outerJoinFunction(ExecutionEnvironment env) throws Exception {// Outjoin,跟sql語句中的left join,right join,full join意思一樣// leftOuterJoin,跟join一樣,但是左邊集合的沒有關聯上的結果也會取出來,沒關聯上的右邊為null// rightOuterJoin,跟join一樣,但是右邊集合的沒有關聯上的結果也會取出來,沒關聯上的左邊為null// fullOuterJoin,跟join一樣,但是兩個集合沒有關聯上的結果也會取出來,沒關聯上的一邊為nullList<Tuple2<Integer, String>> info1 = new ArrayList<>();info1.add(new Tuple2<>(1, "shenzhen"));info1.add(new Tuple2<>(2, "guangzhou"));info1.add(new Tuple2<>(3, "shanghai"));info1.add(new Tuple2<>(4, "chengdu"));List<Tuple2<Integer, String>> info2 = new ArrayList<>();info2.add(new Tuple2<>(1, "深圳"));info2.add(new Tuple2<>(2, "廣州"));info2.add(new Tuple2<>(3, "上海"));info2.add(new Tuple2<>(5, "杭州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);// left join
// eft join:7> (1,shenzhen,深圳)
// left join:2> (3,shanghai,上海)
// left join:8> (4,chengdu,未知)
// left join:16> (2,guangzhou,廣州)data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (second == null) {tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField("未知", 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("left join");// right join
// right join:2> (3,shanghai,上海)
// right join:7> (1,shenzhen,深圳)
// right join:15> (5,--,杭州)
// right join:16> (2,guangzhou,廣州)data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (first == null) {tuple.setField(second.f0, 0);tuple.setField("--", 1);tuple.setField(second.f1, 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("right join");// fullOuterJoin
// fullOuterJoin:2> (3,shanghai,上海)
// fullOuterJoin:8> (4,chengdu,--)
// fullOuterJoin:15> (5,--,杭州)
// fullOuterJoin:16> (2,guangzhou,廣州)
// fullOuterJoin:7> (1,shenzhen,深圳)data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (second == null) {tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField("--", 2);} else if (first == null) {tuple.setField(second.f0, 0);tuple.setField("--", 1);tuple.setField(second.f1, 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("fullOuterJoin");}public static void joinFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info1 = new ArrayList<>();info1.add(new Tuple2<>(1, "shenzhen"));info1.add(new Tuple2<>(2, "guangzhou"));info1.add(new Tuple2<>(3, "shanghai"));info1.add(new Tuple2<>(4, "chengdu"));List<Tuple2<Integer, String>> info2 = new ArrayList<>();info2.add(new Tuple2<>(1, "深圳"));info2.add(new Tuple2<>(2, "廣州"));info2.add(new Tuple2<>(3, "上海"));info2.add(new Tuple2<>(5, "杭州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);//// join:2> ((3,shanghai),(3,上海))
// join:16> ((2,guangzhou),(2,廣州))
// join:7> ((1,shenzhen),(1,深圳))data1.join(data2).where(0).equalTo(0).print("join");// join2:2> (3,上海,shanghai)
// join2:7> (1,深圳,shenzhen)
// join2:16> (2,廣州,guangzhou)DataSet<Tuple3<Integer, String, String>> data3 = data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {return new Tuple3<Integer, String, String>(first.f0, second.f1, first.f1);}});data3.print("join2");}public static void firstFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info = new ArrayList<>();info.add(new Tuple2(1, "Hadoop"));info.add(new Tuple2(1, "Spark"));info.add(new Tuple2(1, "Flink"));info.add(new Tuple2(2, "Scala"));info.add(new Tuple2(2, "Java"));info.add(new Tuple2(2, "Python"));info.add(new Tuple2(3, "Linux"));info.add(new Tuple2(3, "Window"));info.add(new Tuple2(3, "MacOS"));DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(info);// 前幾個
// dataSet.first(4).print();
// (1,Hadoop)
// (1,Spark)
// (1,Flink)
// (2,Scala)// 按照tuple2的第一個元素進行分組,查出每組的前2個
// dataSet.groupBy(0).first(2).print();
// (3,Linux)
// (3,Window)
// (1,Hadoop)
// (1,Spark)
// (2,Scala)
// (2,Java)// 按照tpule2的第一個元素進行分組,并按照倒序排列,查出每組的前2個dataSet.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
// (3,Window)
// (3,MacOS)
// (1,Spark)
// (1,Hadoop)
// (2,Scala)
// (2,Python)}public static void distinctFunction(ExecutionEnvironment env) throws Exception {List list = new ArrayList<Tuple3<Integer, Integer, Integer>>();list.add(new Tuple3<>(0, 3, 6));list.add(new Tuple3<>(0, 2, 5));list.add(new Tuple3<>(0, 3, 6));list.add(new Tuple3<>(1, 1, 9));list.add(new Tuple3<>(1, 2, 8));list.add(new Tuple3<>(1, 2, 8));list.add(new Tuple3<>(1, 3, 9));DataSet<Tuple3<Integer, Integer, Integer>> source = env.fromCollection(list);// 去除tuple3中元素完全一樣的source.distinct().print();
// (1,3,9)
// (0,3,6)
// (1,1,9)
// (1,2,8)
// (0,2,5)// 去除tuple3中第一個元素一樣的,只保留第一個// source.distinct(0).print();
// (1,1,9)
// (0,3,6)// 去除tuple3中第一個和第三個相同的元素,只保留第一個// source.distinct(0,2).print();
// (0,3,6)
// (1,1,9)
// (1,2,8)
// (0,2,5)}public static void distinctFunction2(ExecutionEnvironment env) throws Exception {DataSet<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 18, 3000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 18, 1000), new User(5, "alan1", "5", "5@5.com", 28, 1500), new User(4, "alan2", "4", "4@4.com", 20, 300)));// source.distinct("name").print();
// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=3000.0)source.distinct("name", "age").print();
// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=3000.0)
// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// User(id=5, name=alan1, pwd=5, email=5@5.com, age=28, balance=1500.0)
// User(id=4, name=alan2, pwd=4, email=4@4.com, age=20, balance=300.0)}public static void distinctFunction3(ExecutionEnvironment env) throws Exception {DataSet<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 18, -1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 18, -1000), new User(5, "alan1", "5", "5@5.com", 28, 1500), new User(4, "alan2", "4", "4@4.com", 20, -300)));// 針對balance增加絕對值去重source.distinct(new KeySelector<User, Double>() {@Overridepublic Double getKey(User value) throws Exception {return Math.abs(value.getBalance());}}).print();
// User(id=5, name=alan1, pwd=5, email=5@5.com, age=28, balance=1500.0)
// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=-1000.0)
// User(id=4, name=alan2, pwd=4, email=4@4.com, age=20, balance=-300.0)}public static void distinctFunction4(ExecutionEnvironment env) throws Exception {List<String> info = new ArrayList<>();info.add("Hadoop,Spark");info.add("Spark,Flink");info.add("Hadoop,Flink");info.add("Hadoop,Flink");DataSet<String> source = env.fromCollection(info);source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {System.err.print("come in ");for (String token : value.split(",")) {out.collect(token);}}});source.distinct().print();}}
10. Window
KeyedStream → WindowedStream
The window operation groups an existing KeyedStream by time or other criteria. The following puts a keyed stream into 10-second tumbling windows:
inputStream.keyBy(0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
Flink defines slices of the (potentially unbounded) stream called windows; slicing makes it possible to process the data in chunks by applying transformations per window. To window a stream you need a key to partition on and a function describing the transformation to apply to the windowed stream. Flink ships with window assigners for this, including tumbling windows, sliding windows, global windows, and session windows; a short sketch of the common assigners follows the reference list below.
For details, see these articles in this series:
6. Flink's Four Cornerstones - Window, explained with detailed examples (Part 1)
6. Flink's Four Cornerstones - Window, explained with detailed examples (Part 2)
7. Flink's Four Cornerstones - Time and Watermark, explained with detailed examples (basic watermark usage, a watermark example with Kafka as the source, and handling data later than the maximum allowed lateness)
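A minimal, hedged sketch of applying the built-in window assigners to a keyed stream; keyedStream and the sum(1) aggregation are illustrative, while the assigner classes come from org.apache.flink.streaming.api.windowing.assigners:
// Tumbling window: fixed, non-overlapping 10-second slices.
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum(1);

// Sliding window: 10-second windows evaluated every 5 seconds (an element can fall into two windows).
keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);

// Session window: a window closes after a 30-second gap without elements for the key.
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))).sum(1);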
11. WindowAll
DataStream → AllWindowedStream
The windowAll function groups a regular (non-keyed) data stream. It is normally a non-parallel transformation because it runs on the non-partitioned stream.
Just like the regular stream functions, there are windowed counterparts that work on windowed streams: a window reduce works like the Reduce function, a window fold like the Fold function, and there are window aggregations as well.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
This fits most scenarios that need a non-parallel transformation: all records are collected into the single task that runs the windowAll operator. A minimal sketch follows the reference list below.
For details, see these articles in this series:
6. Flink's Four Cornerstones - Window, explained with detailed examples (Part 1)
6. Flink's Four Cornerstones - Window, explained with detailed examples (Part 2)
7. Flink's Four Cornerstones - Time and Watermark, explained with detailed examples (basic watermark usage, a watermark example with Kafka as the source, and handling data later than the maximum allowed lateness)
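A minimal, hedged sketch of a non-keyed window aggregation; the element type Tuple2<String, Integer> and the reduce logic are illustrative:
// Sum the second field over 5-second processing-time windows across the whole stream;
// this runs as a single, non-parallel task.
dataStream
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                return Tuple2.of(a.f0, a.f1 + b.f1);
            }
        })
        .print();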
12. Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
Applies a generic function to the window as a whole. Below is a function that manually sums the elements of a window.
If you are working on a windowAll stream, use AllWindowFunction instead.
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() {
    public void apply(Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(new Integer(sum));
    }
});

// applying an AllWindowFunction on a non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() {
    public void apply(Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(new Integer(sum));
    }
});
13. Window Reduce
WindowedStream → DataStream
Applies a reduce function to the window and returns the reduced value.
windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
});
14. Aggregations on windows
WindowedStream → DataStream
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in the given field (max and maxBy behave the same way).
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
15. Union
The Union function combines two or more data streams so they can be processed together in parallel. If a stream is unioned with itself, every record appears twice in the output.
public static void unionFunction(ExecutionEnvironment env) throws Exception {//Produces the union of two DataSets, which have to be of the same type. A union of more than two DataSets can be implemented with multiple union callsList<String> info1 = new ArrayList<>();info1.add("team A");info1.add("team B");List<String> info2 = new ArrayList<>();info2.add("team C");info2.add("team D");List<String> info3 = new ArrayList<>();info3.add("team E");info3.add("team F");List<String> info4 = new ArrayList<>();info4.add("team G");info4.add("team H");DataSet<String> source1 = env.fromCollection(info1);DataSet<String> source2 = env.fromCollection(info2);DataSet<String> source3 = env.fromCollection(info3);DataSet<String> source4 = env.fromCollection(info4);source1.union(source2).union(source3).union(source4).print();
// team A
// team C
// team E
// team G
// team B
// team D
// team F
// team H}
16. Window Join
DataStream,DataStream → DataStream
Two data streams can be joined on a key within a shared window.
The snippets below join two streams over a 5-second window (the join condition being that the first attribute of the first stream equals the second attribute of the other stream) and over a 3-second tumbling event-time window with key selectors; a typed sketch follows the reference link below.
inputStream.join(inputStream1)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new JoinFunction() {...});

inputStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new JoinFunction() {...});
For details, see:
[Flink Extras] 2. Flink's window join and interval join operators, with detailed examples
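As a hedged, typed sketch (stream variables, element types, key selectors, and the output format are all illustrative; with lambdas as key selectors you may need explicit KeySelector instances for generic key types):
// Sketch: join two Tuple2<String, Integer> streams on their String key
// within 5-second tumbling event-time windows.
DataStream<String> joined = leftStream
        .join(rightStream)
        .where(left -> left.f0)
        .equalTo(right -> right.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
            @Override
            public String join(Tuple2<String, Integer> left, Tuple2<String, Integer> right) {
                // emit the shared key together with both values
                return left.f0 + ": " + left.f1 + "," + right.f1;
            }
        });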
17. Interval Join
KeyedStream,KeyedStream → DataStream
Joins elements e1 and e2 of two keyed streams when their keys are equal and e2's timestamp lies in a range relative to e1's timestamp, i.e. e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound.
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
        .upperBoundExclusive(true) // optional
        .lowerBoundExclusive(true) // optional
        .process(new IntervalJoinFunction() {...});
For details (a typed interval-join sketch also follows below), see:
[Flink Extras] 2. Flink's window join and interval join operators, with detailed examples
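The processing function of an interval join is a ProcessJoinFunction; here is a hedged sketch with illustrative stream variables and element types:
// Sketch: interval join of two keyed Tuple2<String, Integer> streams.
// A right-side element joins a left-side element when it arrives no more than
// 2 seconds before and no more than 2 seconds after the left element (event time).
leftKeyedStream
        .intervalJoin(rightKeyedStream)
        .between(Time.seconds(-2), Time.seconds(2))
        .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
            @Override
            public void processElement(Tuple2<String, Integer> left,
                                       Tuple2<String, Integer> right,
                                       Context ctx,
                                       Collector<String> out) {
                out.collect(left.f0 + ": " + left.f1 + "," + right.f1);
            }
        });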
18. Window CoGroup
DataStream,DataStream → DataStream
Combines two data streams on a key within a shared window. Unlike a window join, coGroup hands the user function all elements of both sides for the same key and window, so keys that appear on only one side are visible too; a hedged sketch follows the snippet below.
dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new CoGroupFunction() {...});
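A minimal, hedged sketch (element types, key selectors, and the output are illustrative):
// Sketch: coGroup two Tuple2<String, Integer> streams on their String key
// within 3-second tumbling event-time windows; elements without a match on the
// other side still show up (one of the iterables is simply empty).
dataStream.coGroup(otherStream)
        .where(left -> left.f0)
        .equalTo(right -> right.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
            @Override
            public void coGroup(Iterable<Tuple2<String, Integer>> first,
                                Iterable<Tuple2<String, Integer>> second,
                                Collector<String> out) {
                int leftCount = 0, rightCount = 0;
                for (Tuple2<String, Integer> ignored : first) { leftCount++; }
                for (Tuple2<String, Integer> ignored : second) { rightCount++; }
                out.collect("left=" + leftCount + ", right=" + rightCount);
            }
        });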
19. Connect
DataStream,DataStream → ConnectedStreams
connect offers functionality similar to union for combining two data streams, but it differs from union as follows:
- connect can only connect two data streams, while union can combine more than two.
- the two streams passed to connect may have different element types, while the streams passed to union must have the same element type.
After connect, the two DataStreams become ConnectedStreams; ConnectedStreams apply separate processing functions to the two streams, and the two streams can share state.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;/*** @author alanchan
* union算子可以合并多個同類型的數據流,并生成同類型的數據流,即可以將多個DataStream[T]合并為一個新的DataStream[T]。數據將按照先進先出(First
* In First Out)的模式合并,且不去重。 connect只能連接兩個數據流,union可以連接多個數據流。
* connect所連接的兩個數據流的數據類型可以不一致,union所連接的兩個數據流的數據類型必須一致。
* 兩個DataStream經過connect之后被轉化為ConnectedStreams,ConnectedStreams會對兩個流的數據應用不同的處理方法,且雙流之間可以共享狀態。**/
public class TestConnectDemo {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// sourceDataStream<String> ds1 = env.fromElements("i", "am", "alanchan");DataStream<String> ds2 = env.fromElements("i", "like", "flink");DataStream<Long> ds3 = env.fromElements(10L, 20L, 30L);// transformation// 注意union能合并同類型DataStream<String> result1 = ds1.union(ds2);// union不可以合并不同類,直接出錯
// ds1.union(ds3);// connet可以合并同類型ConnectedStreams<String, String> result2 = ds1.connect(ds2);// connet可以合并不同類型ConnectedStreams<String, Long> result3 = ds1.connect(ds3);/** public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable { * OUT map1(IN1 value) throws Exception; * OUT map2(IN2 value) throws Exception;* }*/DataStream<String> result = result3.map(new CoMapFunction<String, Long, String>() {private static final long serialVersionUID = 1L;@Overridepublic String map1(String value) throws Exception {return value + "String";}@Overridepublic String map2(Long value) throws Exception {return value * 2 + "_Long";}});// sinkresult1.print();// connect之后需要做其他的處理,不能直接輸出// result2.print();// result3.print();result.print();// executeenv.execute();}
}
20. CoMap, CoFlatMap
ConnectedStreams → DataStream
The equivalents of map and flatMap, applied to connected streams.
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});

connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
21. Iterate
DataStream → IterativeStream → DataStream
Creates a "feedback" loop in the stream by redirecting the output of an operator back to an earlier operator. This is especially useful for algorithms that continuously update a model. The code below starts with a stream and applies the iteration body repeatedly: elements greater than 0 are sent back through the feedback channel, and the remaining elements are forwarded downstream.
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
22. Cache
DataStream → CachedDataStream
Caches the intermediate result of an operator. This is currently only supported for jobs running in batch execution mode. The result is cached the first time the operator runs, and subsequent jobs reuse the cached result; if the cache is lost, the original operator recomputes and re-caches it.
DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache();
cachedDataStream.print(); // Do anything with the cachedDataStream
...
env.execute(); // Execute and create cache.
cachedDataStream.print(); // Consume cached result.
env.execute();
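A minimal end-to-end sketch, assuming batch execution mode; the source data and the second transformation are illustrative:
// cache() is only supported in batch execution mode; the cached result is reused
// by the next job submitted from the same environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

CachedDataStream<Integer> cached = env.fromElements(1, 2, 3, 4, 5)
        .map(i -> i * 2)
        .cache();

cached.print();                 // first use: computes the result and populates the cache
env.execute();                  // execute and create the cache

cached.map(i -> i + 1).print(); // second use: served from the cache instead of recomputing
env.execute();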
23. Split
Splits a stream into two or more streams based on a condition; this is useful when you receive a mixed stream and want to process each sub-stream separately. In newer Flink versions, split has been replaced by OutputTag side outputs.
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
An OutputTag example follows.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @author alanchan**/
public class TestOutpuTagAndProcessDemo {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// SourceDataStreamSource<String> ds = env.fromElements("alanchanchn is my vx", "i like flink", "alanchanchn is my name", "i like kafka too", "alanchanchn is my true vx");// transformation// 對流中的數據按照alanchanchn拆分并選擇OutputTag<String> nameTag = new OutputTag<>("alanchanchn", TypeInformation.of(String.class));OutputTag<String> frameworkTag = new OutputTag<>("framework", TypeInformation.of(String.class));// public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
//
// private static final long serialVersionUID = 1L;
//
// public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
//
// public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
//
// public abstract class Context {
//
// public abstract Long timestamp();
//
// public abstract TimerService timerService();
//
// public abstract <X> void output(OutputTag<X> outputTag, X value);
// }
//
// public abstract class OnTimerContext extends Context {
// public abstract TimeDomain timeDomain();
// }
//
// }SingleOutputStreamOperator<String> result = ds.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String inValue, Context ctx, Collector<String> outValue) throws Exception {// out收集完的還是放在一起的,ctx可以將數據放到不同的OutputTagif (inValue.startsWith("alanchanchn")) {ctx.output(nameTag, inValue);} else {ctx.output(frameworkTag, inValue);}}});DataStream<String> nameResult = result.getSideOutput(nameTag);DataStream<String> frameworkResult = result.getSideOutput(frameworkTag);// .sinkSystem.out.println(nameTag);// OutputTag(Integer, 奇數)System.out.println(frameworkTag);// OutputTag(Integer, 偶數)nameResult.print("name->");frameworkResult.print("framework->");
// OutputTag(String, alanchanchn)
// OutputTag(String, framework)
// framework->> alanchanchn is my vx
// name->> alanchanchn is my name
// framework->> i like flink
// name->> alanchanchn is my true vx
// framework->> i like kafka too// executeenv.execute();}
}
24. Select
Lets you select a specific stream from a split stream. In newer versions it has been replaced by OutputTag side outputs.
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
See the OutputTag example in the Split section above.
25. Project
The Project function selects a subset of fields from the event stream and forwards only the selected fields to the next processing stage.
DataStream<Tuple4<Integer, Double, String, String>> in = // [...]
DataStream<Tuple2<String, String>> out = in.project(3,2);
The call above selects field positions 3 and 2 (zero-based) from each record. Example input and output records:
(1,10.0,A,B) => (B,A)
(2,20.0,C,D) => (D,C)
- Complete example
import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class TestprojectDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple5<Integer, String, Integer, String, Double>> in = env.fromCollection(Arrays.asList(
                Tuple5.of(1, "alan", 17, "alan.chan.chn@163.com", 20d),
                Tuple5.of(2, "alanchan", 18, "alan.chan.chn@163.com", 25d),
                Tuple5.of(3, "alanchanchn", 19, "alan.chan.chn@163.com", 30d),
                Tuple5.of(4, "alan_chan", 18, "alan.chan.chn@163.com", 25d),
                Tuple5.of(5, "alan_chan_chn", 20, "alan.chan.chn@163.com", 30d)));

        DataStream<Tuple3<String, Integer, Double>> out = in.project(1, 2, 4);
        out.print();
        // 8> (alan,17,20.0)
        // 11> (alan_chan,18,25.0)
        // 12> (alan_chan_chn,20,30.0)
        // 10> (alanchanchn,19,30.0)
        // 9> (alanchan,18,25.0)

        env.execute();
    }
}
In summary, this article introduced Flink's 23 commonly used operators, each illustrated with a concrete, runnable example. For more systematic and up-to-date material, see the author's Flink column.
This topic is split into five parts:
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (1) - map, flatmap, and filter
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (2) - keyby, reduce, and Aggregations
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (3) - window, distinct, join, etc.
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (4) - union, window join, connect, outputtag, cache, iterator, project
[Flink Extras] 1. Flink's 23 common operators, with detailed examples (complete edition)