【flink番外篇】1、flink的23種常用算子介紹及詳細示例(完整版)

Flink 系列文章

一、Flink 專欄

Flink 專欄系統介紹某一知識點,并輔以具體的示例進行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關基礎內容。

  • 2、Flink基礎系列
    本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。

  • 3、Flik Table API和SQL基礎系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創建庫、表用法、查詢、窗口函數、catalog等等內容。

  • 4、Flik Table API和SQL提高與應用系列
    本部分是table api 和sql的應用部分,和實際的生產應用聯系更為密切,以及有一定開發難度的內容。

  • 5、Flink 監控系列
    本部分和實際的運維、監控工作相關。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會介紹知識點的信息,更多的是提供一個一個可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內容。

兩專欄的所有文章入口點擊:Flink 系列文章匯總索引


文章目錄

  • Flink 系列文章
  • 一、Flink的23種算子說明及示例
    • 1、maven依賴
    • 2、java bean
    • 3、map
    • 4、flatmap
    • 5、Filter
    • 6、KeyBy
    • 7、Reduce
    • 8、Aggregations
    • 9、first、distinct、join、outjoin、cross
    • 10、Window
    • 11、WindowAll
    • 12、Window Apply
    • 13、Window Reduce
    • 14、Aggregations on windows
    • 15、Union
    • 16、Window Join
    • 17、Interval Join
    • 18、Window CoGroup
    • 19、Connect
    • 20、CoMap, CoFlatMap
    • 21、Iterate
    • 22、Cache
    • 23、Split
    • 24、Select
    • 25、Project


本文主要介紹Flink 的23種常用的operator及以具體可運行示例進行說明,如果需要了解更多內容,可以在本人Flink 專欄中了解更新系統的內容。
本文除了maven依賴外,沒有其他依賴。

本專題分為五篇,即:
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(3)-window、distinct、join等
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(完整版)

一、Flink的23種算子說明及示例

1、maven依賴

下文中所有示例都是用該maven依賴,除非有特殊說明的情況。

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.4</version></dependency></dependencies>

2、java bean

下文所依賴的User如下

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {private int id;private String name;private String pwd;private String email;private int age;private double balance;
}

3、map

[DataStream->DataStream]
這是最簡單的轉換之一,其中輸入是一個數據流,輸出的也是一個數據流。
在這里插入圖片描述

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestMapDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// source// transformationmapFunction5(env);// sink// executeenv.execute();}// 構造一個list,然后將list中數字乘以2輸出,內部匿名類實現public static void mapFunction1(StreamExecutionEnvironment env) throws Exception {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer inValue) throws Exception {return inValue * 2;}});sink.print();
//		9> 12
//		4> 2
//		10> 14
//		8> 10
//		13> 20
//		7> 8
//		12> 18
//		11> 16
//		5> 4
//		6> 6}// 構造一個list,然后將list中數字乘以2輸出,lambda實現public static void mapFunction2(StreamExecutionEnvironment env) throws Exception {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);SingleOutputStreamOperator<Integer> sink = source.map(i -> 2 * i);sink.print();
//		3> 4
//		4> 6
//		9> 16
//		7> 12
//		10> 18
//		2> 2
//		6> 10
//		5> 8
//		8> 14
//		11> 20}// 構造User數據源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// lambda實現用戶對象的balance×2和age+5功能public static SingleOutputStreamOperator<User> mapFunction3(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);SingleOutputStreamOperator<User> sink = source.map((MapFunction<User, User>) user -> {User user2 = user;user2.setAge(user.getAge() + 5);user2.setBalance(user.getBalance() * 2);return user2;});sink.print();
//		10> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
//		14> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
//		11> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
//		12> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
//		13> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)return sink;}// lambda實現balance*2和age+5后,balance》=2000和age》=20的數據過濾出來public static SingleOutputStreamOperator<User> mapFunction4(StreamExecutionEnvironment env) throws Exception {SingleOutputStreamOperator<User> sink = mapFunction3(env).filter(user -> user.getBalance() >= 2000 && user.getAge() >= 20);sink.print();
//		15> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
//		1> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
//		16> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
//		3> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
//		2> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
//		1> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)return sink;}// lambda實現balance*2和age+5后,balance》=2000和age》=20的數據過濾出來并通過flatmap收集public static SingleOutputStreamOperator<User> mapFunction5(StreamExecutionEnvironment env) throws Exception {SingleOutputStreamOperator<User> sink = mapFunction4(env).flatMap((FlatMapFunction<User, User>) (user, out) -> {if (user.getBalance() >= 3000) {out.collect(user);}}).returns(User.class);sink.print();
//		8> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
//		7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
//		6> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
//		9> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
//		5> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
//		7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
//		7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)return sink;}}

4、flatmap

[DataStream->DataStream]
FlatMap 采用一條記錄并輸出零個,一個或多個記錄。將集合中的每個元素變成一個或多個元素,并返回扁平化之后的結果。
在這里插入圖片描述

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestFlatMapDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();flatMapFunction3(env);env.execute();}// 構造User數據源public static DataStreamSource<String> source(StreamExecutionEnvironment env) {List<String> info = new ArrayList<>();info.add("i am alanchan");info.add("i like hadoop");info.add("i like flink");info.add("and you ?");DataStreamSource<String> dataSource = env.fromCollection(info);return dataSource;}// 將句子以空格進行分割-內部匿名類實現public static void flatMapFunction1(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] splits = value.split(" ");for (String split : splits) {out.collect(split);}}});sink.print();
//		11> and
//		10> i
//		8> i
//		9> i
//		8> am
//		10> like
//		11> you
//		10> flink
//		8> alanchan
//		9> like
//		11> ?
//		9> hadoop}// lambda實現public static void flatMapFunction2(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap((FlatMapFunction<String, String>) (input, out) -> {String[] splits = input.split(" ");for (String split : splits) {out.collect(split);}}).returns(String.class);sink.print();
//		6> i
//		8> and
//		8> you
//		8> ?
//		5> i
//		7> i
//		5> am
//		5> alanchan
//		6> like
//		7> like
//		6> hadoop
//		7> flink}// lambda實現public static void flatMapFunction3(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap((String input, Collector<String> out) -> Arrays.stream(input.split(" ")).forEach(out::collect)).returns(String.class);sink.print();
//		8> i
//		11> and
//		10> i
//		9> i
//		10> like
//		11> you
//		8> am
//		11> ?
//		10> flink
//		9> like
//		9> hadoop
//		8> alanchan}}

5、Filter

DataStream → DataStream
Filter 函數根據條件判斷出結果。按照指定的條件對集合中的元素進行過濾,過濾出返回true/符合條件的元素。
在這里插入圖片描述

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestFilterDemo {// 構造User數據源public static DataStreamSource<User> sourceUser(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// 構造User數據源public static DataStreamSource<Integer> sourceList(StreamExecutionEnvironment env) {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);return source;}// 過濾出大于5的數字,內部匿名類public static void filterFunction1(StreamExecutionEnvironment env) throws Exception {DataStream<Integer> source = sourceList(env);SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {public Integer map(Integer value) throws Exception {return value + 1;}}).filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value > 5;}});sink.print();
//		1> 10
//		14> 7
//		16> 9
//		13> 6
//		2> 11
//		15> 8}// lambda實現public static void filterFunction2(StreamExecutionEnvironment env) throws Exception {DataStream<Integer> source = sourceList(env);SingleOutputStreamOperator<Integer> sink = source.map(i -> i + 1).filter(value -> value > 5);sink.print();
//		12> 7
//		15> 10
//		11> 6
//		13> 8
//		14> 9
//		16> 11}// 查詢user id大于3的記錄public static void filterFunction3(StreamExecutionEnvironment env) throws Exception {DataStream<User> source = sourceUser(env);SingleOutputStreamOperator<User> sink = source.filter(user -> user.getId() > 3);sink.print();
//		14> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)
//		15> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)}/*** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();filterFunction3(env);env.execute();}}

6、KeyBy

DataStream → KeyedStream
按照指定的key來對流中的數據進行分組
在這里插入圖片描述
KeyBy 在邏輯上是基于 key 對流進行分區。在內部,它使用 hash 函數對流進行分區。它返回 KeyedDataStream 數據流。將同一Key的數據放到同一個分區。

分區結果和KeyBy下游算子的并行度強相關。如下游算子只有一個并行度,不管怎么分,都會分到一起。
對于POJO類型,KeyBy可以通過keyBy(fieldName)指定字段進行分區。
對于Tuple類型,KeyBy可以通過keyBy(fieldPosition)指定字段進行分區。
對于一般類型,如上,KeyBy可以通過keyBy(new KeySelector {…})指定字段進行分區。

import java.util.Arrays;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestKeyByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//		env.setParallelism(4);// 設置數據分區數量keyByFunction6(env);env.execute();}// 構造User數據源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// 按照name進行keyby 對于POJO類型,KeyBy可以通過keyBy(fieldName)指定字段進行分區public static void keyByFunction1(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> sink = source.keyBy(new KeySelector<User, String>() {@Overridepublic String getKey(User value) throws Exception {return value.getName();}});sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());return user;});sink.print();}// lambda 對于POJO類型,KeyBy可以通過keyBy(fieldName)指定字段進行分區public static void keyByFunction2(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> sink = source.keyBy(user -> user.getName());// 演示keyby后的數據輸出sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());return user;});sink.print();}// 對于Tuple類型,KeyBy可以通過keyBy(fieldPosition)指定字段進行分區。lambdapublic static void keyByFunction3(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);SingleOutputStreamOperator<Tuple2<String, User>> userTemp = source.map((MapFunction<User, Tuple2<String, User>>) user -> {return new Tuple2<String, User>(user.getName(), user);}).returns(Types.TUPLE(Types.STRING, Types.POJO(User.class)));KeyedStream<Tuple2<String, User>, Tuple> sink = userTemp.keyBy(0);// 演示keyby后的數據輸出sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.f1.toString());return user.f1;});sink.print();}// 對于Tuple類型,KeyBy可以通過keyBy(fieldPosition)指定字段進行分區。public static void keyByFunction4(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);SingleOutputStreamOperator<Tuple2<String, User>> userTemp = source.map(new MapFunction<User, Tuple2<String, User>>() {@Overridepublic Tuple2<String, User> map(User value) throws Exception {return new Tuple2<String, User>(value.getName(), value);}});KeyedStream<Tuple2<String, User>, String> sink = userTemp.keyBy(new KeySelector<Tuple2<String, User>, String>() {@Overridepublic String getKey(Tuple2<String, User> value) throws Exception {return value.f0;}});// 演示keyby后的數據輸出sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.f1.toString());return user.f1;});//		sink.map(new MapFunction<Tuple2<String, User>, String>() {
//
//			@Override
//			public String map(Tuple2<String, User> value) throws Exception {
//				System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + value.f1.toString());
//				return null;
//			}});sink.print();}// 對于一般類型,如上,KeyBy可以通過keyBy(new KeySelector {...})指定字段進行分區。// 按照name的前4位進行keybypublic static void keyByFunction5(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> sink = source.keyBy(new KeySelector<User, String>() {@Overridepublic String getKey(User value) throws Exception {
//				String temp = value.getName().substring(0, 4);return value.getName().substring(0, 4);}});sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());return user;});sink.print();}// 對于一般類型,如上,KeyBy可以通過keyBy(new KeySelector {...})指定字段進行分區。 lambda// 按照name的前4位進行keybypublic static void keyByFunction6(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> sink = source.keyBy(user -> user.getName().substring(0, 4));sink.map(user -> {System.out.println("當前線程ID:" + Thread.currentThread().getId() + ",user:" + user.toString());return user;});sink.print();}}

7、Reduce

KeyedStream → DataStream
對集合中的元素進行聚合。Reduce 返回單個的結果值,并且 reduce 操作每處理一個元素總是創建一個新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可實現。基于ReduceFunction進行滾動聚合,并向下游算子輸出每次滾動聚合后的結果。
注意: Reduce會輸出每一次滾動聚合的結果。
在這里插入圖片描述

import java.util.Arrays;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//		env.setParallelism(4);// 設置數據分區數量reduceFunction2(env);env.execute();}// 構造User數據源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// 按照name進行balance進行sumpublic static void reduceFunction1(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> keyedStream = source.keyBy(user -> user.getName());SingleOutputStreamOperator<User> sink = keyedStream.reduce(new ReduceFunction<User>() {@Overridepublic User reduce(User value1, User value2) throws Exception {double balance = value1.getBalance() + value2.getBalance();return new User(value1.getId(), value1.getName(), "", "", 0, balance);}});//sink.print();}// 按照name進行balance進行sum lambdapublic static void reduceFunction2(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> userKeyBy = source.keyBy(user -> user.getName());SingleOutputStreamOperator<User> sink = userKeyBy.reduce((user1, user2) -> {User user = user1;user.setBalance(user1.getBalance() + user2.getBalance());return user;});sink.print();}}

8、Aggregations

KeyedStream → DataStream
DataStream API 支持各種聚合,例如 min,max,sum 等。 這些函數可以應用于 KeyedStream 以獲得 Aggregations 聚合。
Aggregate 對KeyedStream按指定字段滾動聚合并輸出每一次滾動聚合后的結果。默認的聚合函數有:sum、min、minBy、max、maxBy。
注意:
max(field)與maxBy(field)的區別: maxBy返回field最大的那條數據;而max則是將最大的field的值賦值給第一條數據并返回第一條數據。同理,min與minBy。
Aggregate聚合算子會滾動輸出每一次聚合后的結果
max 和 maxBy 之間的區別在于 max 返回流中的最大值,但 maxBy 返回具有最大值的鍵, min 和 minBy 同理。
max以第一個比較對象的比較列值進行替換,maxBy是以整個比較對象進行替換。


import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestAggregationsDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();aggregationsFunction2(env);env.execute();}// 構造User數據源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}//分組統計sum、max、min、maxby、minbypublic static void aggregationsFunction(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);KeyedStream<User, String> userTemp=	source.keyBy(user->user.getName());DataStream sink = null;//1、根據name進行分區統計balance之和 alan1----2500/alan2----600
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=2500.0)
//		1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=600.0)
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=3000.0)sink = userTemp.sum("balance");//2、根據name進行分區統計balance的max alan1----1500/alan2----400
//		 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1500.0)
//		 1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=400.0)
//		 16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1500.0)sink = userTemp.max("balance");//1@1.com-3000 --  2@2.com-300//3、根據name進行分區統計balance的min  alan1----500/alan2---200
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=500.0)
//		1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)sink = userTemp.min("balance");//4、根據name進行分區統計balance的maxBy alan2----400/alan1----1500
//		1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		1> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		16> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)
//		16> User(id=3, name=alan1, pwd=3, email=3@3.com, age=28, balance=1500.0)sink = userTemp.maxBy("balance");//5、根據name進行分區統計balance的minBy alan2----200/alan1----500
//		1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		1> User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		16> User(id=1, name=alan1, pwd=1, email=1@1.com, age=12, balance=1000.0)
//		16> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)sink = userTemp.minBy("balance");sink.print();}public static void aggregationsFunction2(StreamExecutionEnvironment env) throws Exception {List list = new ArrayList<Tuple3<Integer, Integer, Integer>>();list.add(new Tuple3<>(0,3,6));list.add(new Tuple3<>(0,2,5));list.add(new Tuple3<>(0,1,6));list.add(new Tuple3<>(0,4,3));list.add(new Tuple3<>(1,1,9));list.add(new Tuple3<>(1,2,8));list.add(new Tuple3<>(1,3,10));list.add(new Tuple3<>(1,2,9));list.add(new Tuple3<>(1,5,7));DataStreamSource<Tuple3<Integer, Integer, Integer>> source = env.fromCollection(list);KeyedStream<Tuple3<Integer, Integer, Integer>, Integer> tTemp=  source.keyBy(t->t.f0);DataStream<Tuple3<Integer, Integer, Integer>> sink =null;//按照分區,以第一個Tuple3的元素為基礎進行第三列值比較,如果第三列值小于第一個tuple3的第三列值,則進行第三列值替換,其他的不變
//        12> (0,3,6)
//        11> (1,1,9)
//        11> (1,1,8)
//        12> (0,3,5)
//        11> (1,1,8)
//        12> (0,3,5)
//        11> (1,1,8)
//        12> (0,3,3)
//        11> (1,1,7)  sink =  tTemp.min(2);//     按照數據分區,以第一個tuple3的元素為基礎進行第三列值比較,如果第三列值小于第一個tuple3的第三列值,則進行整個tuple3的替換
//     12> (0,3,6)
//     11> (1,1,9)
//     12> (0,2,5)
//     11> (1,2,8)
//     12> (0,2,5)
//     11> (1,2,8)
//     12> (0,4,3)
//     11> (1,2,8)
//     11> (1,5,7)sink = tTemp.minBy(2);sink.print();}}

9、first、distinct、join、outjoin、cross

具體事例詳見例子及結果。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestFirst_Join_Distinct_OutJoin_CrossDemo {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();joinFunction(env);env.execute();}public static void unionFunction(StreamExecutionEnvironment env) throws Exception {List<String> info1 = new ArrayList<>();info1.add("team A");info1.add("team B");List<String> info2 = new ArrayList<>();info2.add("team C");info2.add("team D");List<String> info3 = new ArrayList<>();info3.add("team E");info3.add("team F");List<String> info4 = new ArrayList<>();info4.add("team G");info4.add("team H");DataStream<String> source1 = env.fromCollection(info1);DataStream<String> source2 = env.fromCollection(info2);DataStream<String> source3 = env.fromCollection(info3);DataStream<String> source4 = env.fromCollection(info4);source1.union(source2).union(source3).union(source4).print();
//        team A
//        team C
//        team E
//        team G
//        team B
//        team D
//        team F
//        team H}public static void crossFunction(ExecutionEnvironment env) throws Exception {// cross,求兩個集合的笛卡爾積,得到的結果數為:集合1的條數 乘以 集合2的條數List<String> info1 = new ArrayList<>();info1.add("team A");info1.add("team B");List<Tuple2<String, Integer>> info2 = new ArrayList<>();info2.add(new Tuple2("W", 3));info2.add(new Tuple2("D", 1));info2.add(new Tuple2("L", 0));DataSource<String> data1 = env.fromCollection(info1);DataSource<Tuple2<String, Integer>> data2 = env.fromCollection(info2);data1.cross(data2).print();
//        (team A,(W,3))
//        (team A,(D,1))
//        (team A,(L,0))
//        (team B,(W,3))
//        (team B,(D,1))
//        (team B,(L,0))}public static void outerJoinFunction(ExecutionEnvironment env) throws Exception {// Outjoin,跟sql語句中的left join,right join,full join意思一樣// leftOuterJoin,跟join一樣,但是左邊集合的沒有關聯上的結果也會取出來,沒關聯上的右邊為null// rightOuterJoin,跟join一樣,但是右邊集合的沒有關聯上的結果也會取出來,沒關聯上的左邊為null// fullOuterJoin,跟join一樣,但是兩個集合沒有關聯上的結果也會取出來,沒關聯上的一邊為nullList<Tuple2<Integer, String>> info1 = new ArrayList<>();info1.add(new Tuple2<>(1, "shenzhen"));info1.add(new Tuple2<>(2, "guangzhou"));info1.add(new Tuple2<>(3, "shanghai"));info1.add(new Tuple2<>(4, "chengdu"));List<Tuple2<Integer, String>> info2 = new ArrayList<>();info2.add(new Tuple2<>(1, "深圳"));info2.add(new Tuple2<>(2, "廣州"));info2.add(new Tuple2<>(3, "上海"));info2.add(new Tuple2<>(5, "杭州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);// left join
//        eft join:7> (1,shenzhen,深圳)
//        left join:2> (3,shanghai,上海)
//        left join:8> (4,chengdu,未知)
//        left join:16> (2,guangzhou,廣州)data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (second == null) {tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField("未知", 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("left join");// right join
//        right join:2> (3,shanghai,上海)
//        right join:7> (1,shenzhen,深圳)
//        right join:15> (5,--,杭州)
//        right join:16> (2,guangzhou,廣州)data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (first == null) {tuple.setField(second.f0, 0);tuple.setField("--", 1);tuple.setField(second.f1, 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("right join");// fullOuterJoin
//        fullOuterJoin:2> (3,shanghai,上海)
//        fullOuterJoin:8> (4,chengdu,--)
//        fullOuterJoin:15> (5,--,杭州)
//        fullOuterJoin:16> (2,guangzhou,廣州)
//        fullOuterJoin:7> (1,shenzhen,深圳)data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (second == null) {tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField("--", 2);} else if (first == null) {tuple.setField(second.f0, 0);tuple.setField("--", 1);tuple.setField(second.f1, 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("fullOuterJoin");}public static void joinFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info1 = new ArrayList<>();info1.add(new Tuple2<>(1, "shenzhen"));info1.add(new Tuple2<>(2, "guangzhou"));info1.add(new Tuple2<>(3, "shanghai"));info1.add(new Tuple2<>(4, "chengdu"));List<Tuple2<Integer, String>> info2 = new ArrayList<>();info2.add(new Tuple2<>(1, "深圳"));info2.add(new Tuple2<>(2, "廣州"));info2.add(new Tuple2<>(3, "上海"));info2.add(new Tuple2<>(5, "杭州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);////        join:2> ((3,shanghai),(3,上海))
//        join:16> ((2,guangzhou),(2,廣州))
//        join:7> ((1,shenzhen),(1,深圳))data1.join(data2).where(0).equalTo(0).print("join");//        join2:2> (3,上海,shanghai)
//        join2:7> (1,深圳,shenzhen)
//        join2:16> (2,廣州,guangzhou)DataSet<Tuple3<Integer, String, String>> data3 = data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {return new Tuple3<Integer, String, String>(first.f0, second.f1, first.f1);}});data3.print("join2");}public static void firstFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info = new ArrayList<>();info.add(new Tuple2(1, "Hadoop"));info.add(new Tuple2(1, "Spark"));info.add(new Tuple2(1, "Flink"));info.add(new Tuple2(2, "Scala"));info.add(new Tuple2(2, "Java"));info.add(new Tuple2(2, "Python"));info.add(new Tuple2(3, "Linux"));info.add(new Tuple2(3, "Window"));info.add(new Tuple2(3, "MacOS"));DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(info);// 前幾個
//	        dataSet.first(4).print();
//	        (1,Hadoop)
//	        (1,Spark)
//	        (1,Flink)
//	        (2,Scala)// 按照tuple2的第一個元素進行分組,查出每組的前2個
//	        dataSet.groupBy(0).first(2).print();
//	        (3,Linux)
//	        (3,Window)
//	        (1,Hadoop)
//	        (1,Spark)
//	        (2,Scala)
//	        (2,Java)// 按照tpule2的第一個元素進行分組,并按照倒序排列,查出每組的前2個dataSet.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
//	        (3,Window)
//	        (3,MacOS)
//	        (1,Spark)
//	        (1,Hadoop)
//	        (2,Scala)
//	        (2,Python)}public static void distinctFunction(ExecutionEnvironment env) throws Exception {List list = new ArrayList<Tuple3<Integer, Integer, Integer>>();list.add(new Tuple3<>(0, 3, 6));list.add(new Tuple3<>(0, 2, 5));list.add(new Tuple3<>(0, 3, 6));list.add(new Tuple3<>(1, 1, 9));list.add(new Tuple3<>(1, 2, 8));list.add(new Tuple3<>(1, 2, 8));list.add(new Tuple3<>(1, 3, 9));DataSet<Tuple3<Integer, Integer, Integer>> source = env.fromCollection(list);// 去除tuple3中元素完全一樣的source.distinct().print();
//		(1,3,9)
//		(0,3,6)
//		(1,1,9)
//		(1,2,8)
//		(0,2,5)// 去除tuple3中第一個元素一樣的,只保留第一個// source.distinct(0).print();
//		(1,1,9)
//		(0,3,6)// 去除tuple3中第一個和第三個相同的元素,只保留第一個// source.distinct(0,2).print();
//		(0,3,6)
//		(1,1,9)
//		(1,2,8)
//		(0,2,5)}public static void distinctFunction2(ExecutionEnvironment env) throws Exception {DataSet<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 18, 3000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 18, 1000), new User(5, "alan1", "5", "5@5.com", 28, 1500), new User(4, "alan2", "4", "4@4.com", 20, 300)));//		source.distinct("name").print();
//		User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=3000.0)source.distinct("name", "age").print();
//		User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=3000.0)
//		User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		User(id=5, name=alan1, pwd=5, email=5@5.com, age=28, balance=1500.0)
//		User(id=4, name=alan2, pwd=4, email=4@4.com, age=20, balance=300.0)}public static void distinctFunction3(ExecutionEnvironment env) throws Exception {DataSet<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 18, -1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 18, -1000), new User(5, "alan1", "5", "5@5.com", 28, 1500), new User(4, "alan2", "4", "4@4.com", 20, -300)));// 針對balance增加絕對值去重source.distinct(new KeySelector<User, Double>() {@Overridepublic Double getKey(User value) throws Exception {return Math.abs(value.getBalance());}}).print();
//		User(id=5, name=alan1, pwd=5, email=5@5.com, age=28, balance=1500.0)
//		User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
//		User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=-1000.0)
//		User(id=4, name=alan2, pwd=4, email=4@4.com, age=20, balance=-300.0)}public static void distinctFunction4(ExecutionEnvironment env) throws Exception {List<String> info = new ArrayList<>();info.add("Hadoop,Spark");info.add("Spark,Flink");info.add("Hadoop,Flink");info.add("Hadoop,Flink");DataSet<String> source = env.fromCollection(info);source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {System.err.print("come in ");for (String token : value.split(",")) {out.collect(token);}}});source.distinct().print();}}

10、Window

KeyedStream → WindowedStream
Window 函數允許按時間或其他條件對現有 KeyedStream 進行分組。 以下是以 10 秒的時間窗口聚合:

inputStream.keyBy(0).window(Time.seconds(10));

Flink 定義數據片段以便(可能)處理無限數據流。 這些切片稱為窗口。 此切片有助于通過應用轉換處理數據塊。 要對流進行窗口化,需要分配一個可以進行分發的鍵和一個描述要對窗口化流執行哪些轉換的函數。要將流切片到窗口,可以使用 Flink 自帶的窗口分配器。 我們有選項,如 tumbling windows, sliding windows, global 和 session windows。
具體參考系列文章
6、Flink四大基石之Window詳解與詳細示例(一)
6、Flink四大基石之Window詳解與詳細示例(二)
7、Flink四大基石之Time和WaterMaker詳解與詳細示例(watermaker基本使用、kafka作為數據源的watermaker使用示例以及超出最大允許延遲數據的接收實現)

11、WindowAll

DataStream → AllWindowedStream
windowAll 函數允許對常規數據流進行分組。 通常,這是非并行數據轉換,因為它在非分區數據流上運行。
與常規數據流功能類似,也有窗口數據流功能。 唯一的區別是它們處理窗口數據流。 所以窗口縮小就像 Reduce 函數一樣,Window fold 就像 Fold 函數一樣,并且還有聚合。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

這適用于非并行轉換的大多數場景。所有記錄都將收集到 windowAll 算子對應的一個任務中。

具體參考系列文章
6、Flink四大基石之Window詳解與詳細示例(一)
6、Flink四大基石之Window詳解與詳細示例(二)
7、Flink四大基石之Time和WaterMaker詳解與詳細示例(watermaker基本使用、kafka作為數據源的watermaker使用示例以及超出最大允許延遲數據的接收實現)

12、Window Apply

WindowedStream → DataStream
AllWindowedStream → DataStream
將通用 function 應用于整個窗口。下面是一個手動對窗口內元素求和的 function。

如果你使用 windowAll 轉換,則需要改用 AllWindowFunction。

windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {public void apply (Tuple tuple,Window window,Iterable<Tuple2<String, Integer>> values,Collector<Integer> out) throws Exception {int sum = 0;for (value t: values) {sum += t.f1;}out.collect (new Integer(sum));}
});// 在 non-keyed 窗口流上應用 AllWindowFunction
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {public void apply (Window window,Iterable<Tuple2<String, Integer>> values,Collector<Integer> out) throws Exception {int sum = 0;for (value t: values) {sum += t.f1;}out.collect (new Integer(sum));}
});

13、Window Reduce

WindowedStream → DataStream
對窗口應用 reduce function 并返回 reduce 后的值。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);}
});

14、Aggregations on windows

WindowedStream → DataStream
聚合窗口的內容。min和minBy之間的區別在于,min返回最小值,而minBy返回該字段中具有最小值的元素(max和maxBy相同)。

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");

15、Union

Union 函數將兩個或多個數據流結合在一起。 這樣就可以并行地組合數據流。 如果我們將一個流與自身組合,那么它會輸出每個記錄兩次。

public static void unionFunction(ExecutionEnvironment env) throws Exception {//Produces the union of two DataSets, which have to be of the same type. A union of more than two DataSets can be implemented with multiple union callsList<String> info1 = new ArrayList<>();info1.add("team A");info1.add("team B");List<String> info2 = new ArrayList<>();info2.add("team C");info2.add("team D");List<String> info3 = new ArrayList<>();info3.add("team E");info3.add("team F");List<String> info4 = new ArrayList<>();info4.add("team G");info4.add("team H");DataSet<String> source1 = env.fromCollection(info1);DataSet<String> source2 = env.fromCollection(info2);DataSet<String> source3 = env.fromCollection(info3);DataSet<String> source4 = env.fromCollection(info4);source1.union(source2).union(source3).union(source4).print();
//        team A
//        team C
//        team E
//        team G
//        team B
//        team D
//        team F
//        team H} 

16、Window Join

DataStream,DataStream → DataStream
可以通過一些 key 將同一個 window 的兩個數據流 join 起來。
在 5 秒的窗口中連接兩個流,其中第一個流的第一個屬性的連接條件等于另一個流的第二個屬性

inputStream.join(inputStream1).where(0).equalTo(1).window(Time.seconds(5))     .apply (new JoinFunction () {...});inputStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new JoinFunction () {...});

具體介紹參考文章:
【flink番外篇】2、flink的18種算子window join 和interval join 介紹及詳細示例

17、Interval Join

KeyedStream,KeyedStream → DataStream
根據 key 相等并且滿足指定的時間范圍內(e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound)的條件將分別屬于兩個 keyed stream 的元素 e1 和 e2 Join 在一起。

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound.upperBoundExclusive(true) // optional.lowerBoundExclusive(true) // optional.process(new IntervalJoinFunction() {...});

具體介紹參考文章:
【flink番外篇】2、flink的18種算子window join 和interval join 介紹及詳細示例

18、Window CoGroup

DataStream,DataStream → DataStream
根據指定的 key 和窗口將兩個數據流組合在一起。

dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply (new CoGroupFunction () {...});

19、Connect

DataStream,DataStream → ConnectedStreams
connect提供了和union類似的功能,用來連接兩個數據流,它與union的區別在于:

  • connect只能連接兩個數據流,union可以連接多個數據流。
  • connect所連接的兩個數據流的數據類型可以不一致,union所連接的兩個數據流的數據類型必須一致。
    兩個DataStream經過connect之后被轉化為ConnectedStreams,ConnectedStreams會對兩個流的數據應用不同的處理方法,且雙流之間可以共享狀態。
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;/*** @author alanchan
*         union算子可以合并多個同類型的數據流,并生成同類型的數據流,即可以將多個DataStream[T]合并為一個新的DataStream[T]。數據將按照先進先出(First
*         In First Out)的模式合并,且不去重。 connect只能連接兩個數據流,union可以連接多個數據流。
*         connect所連接的兩個數據流的數據類型可以不一致,union所連接的兩個數據流的數據類型必須一致。
*         兩個DataStream經過connect之后被轉化為ConnectedStreams,ConnectedStreams會對兩個流的數據應用不同的處理方法,且雙流之間可以共享狀態。**/
public class TestConnectDemo {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// sourceDataStream<String> ds1 = env.fromElements("i", "am", "alanchan");DataStream<String> ds2 = env.fromElements("i", "like", "flink");DataStream<Long> ds3 = env.fromElements(10L, 20L, 30L);// transformation// 注意union能合并同類型DataStream<String> result1 = ds1.union(ds2);// union不可以合并不同類,直接出錯
//		ds1.union(ds3);// connet可以合并同類型ConnectedStreams<String, String> result2 = ds1.connect(ds2);// connet可以合并不同類型ConnectedStreams<String, Long> result3 = ds1.connect(ds3);/** public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable { * 		OUT map1(IN1 value) throws Exception; *		OUT map2(IN2 value) throws Exception;* }*/DataStream<String> result = result3.map(new CoMapFunction<String, Long, String>() {private static final long serialVersionUID = 1L;@Overridepublic String map1(String value) throws Exception {return value + "String";}@Overridepublic String map2(Long value) throws Exception {return value * 2 + "_Long";}});// sinkresult1.print();// connect之后需要做其他的處理,不能直接輸出// result2.print();// result3.print();result.print();// executeenv.execute();}
}

20、CoMap, CoFlatMap

ConnectedStreams → DataStream
類似于在連接的數據流上進行 map 和 flatMap。

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {@Overridepublic Boolean map1(Integer value) {return true;}@Overridepublic Boolean map2(String value) {return false;}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {@Overridepublic void flatMap1(Integer value, Collector<String> out) {out.collect(value.toString());}@Overridepublic void flatMap2(String value, Collector<String> out) {for (String word: value.split(" ")) {out.collect(word);}}
});

21、Iterate

DataStream → IterativeStream → ConnectedStream
通過將一個算子的輸出重定向到某個之前的算子來在流中創建“反饋”循環。這對于定義持續更新模型的算法特別有用。下面的代碼從一個流開始,并不斷地應用迭代自身。大于 0 的元素被發送回反饋通道,其余元素被轉發到下游。

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){@Overridepublic boolean filter(Long value) throws Exception {return value > 0;}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){@Overridepublic boolean filter(Long value) throws Exception {return value <= 0;}
});

22、Cache

DataStream → CachedDataStream
把算子的結果緩存起來。目前只支持批執行模式下運行的作業。算子的結果在算子第一次執行的時候會被緩存起來,之后的 作業中會復用該算子緩存的結果。如果算子的結果丟失了,它會被原來的算子重新計算并緩存。

DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache();
cachedDataStream.print(); // Do anything with the cachedDataStream
...
env.execute(); // Execute and create cache.cachedDataStream.print(); // Consume cached result.
env.execute();

23、Split

此功能根據條件將流拆分為兩個或多個流。 當獲得混合流并且可能希望單獨處理每個數據流時,可以使用此方法。新版本使用OutputTag替代。

SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {@Overridepublic Iterable<String> select(Integer value) {List<String> output = new ArrayList<String>(); if (value % 2 == 0) {output.add("even");}else {output.add("odd");}return output;}
});

OutputTag示例如下

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @author alanchan**/
public class TestOutpuTagAndProcessDemo {public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// SourceDataStreamSource<String> ds = env.fromElements("alanchanchn is my vx", "i like flink", "alanchanchn is my name", "i like kafka too", "alanchanchn is my true vx");// transformation// 對流中的數據按照alanchanchn拆分并選擇OutputTag<String> nameTag = new OutputTag<>("alanchanchn", TypeInformation.of(String.class));OutputTag<String> frameworkTag = new OutputTag<>("framework", TypeInformation.of(String.class));//		public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
//
//			private static final long serialVersionUID = 1L;
//
//			public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
//
//			public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
//
//			public abstract class Context {
//
//				public abstract Long timestamp();
//
//				public abstract TimerService timerService();
//
//				public abstract <X> void output(OutputTag<X> outputTag, X value);
//			}
//
//			public abstract class OnTimerContext extends Context {
//				public abstract TimeDomain timeDomain();
//			}
//
//		}SingleOutputStreamOperator<String> result = ds.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String inValue, Context ctx, Collector<String> outValue) throws Exception {// out收集完的還是放在一起的,ctx可以將數據放到不同的OutputTagif (inValue.startsWith("alanchanchn")) {ctx.output(nameTag, inValue);} else {ctx.output(frameworkTag, inValue);}}});DataStream<String> nameResult = result.getSideOutput(nameTag);DataStream<String> frameworkResult = result.getSideOutput(frameworkTag);// .sinkSystem.out.println(nameTag);// OutputTag(Integer, 奇數)System.out.println(frameworkTag);// OutputTag(Integer, 偶數)nameResult.print("name->");frameworkResult.print("framework->");
//		OutputTag(String, alanchanchn)
//		OutputTag(String, framework)
//		framework->> alanchanchn is my vx
//		name->> alanchanchn is my name
//		framework->> i like flink
//		name->> alanchanchn is my true vx
//		framework->> i like kafka too// executeenv.execute();}
}

24、Select

此功能允許您從拆分流中選擇特定流。新版本使用OutputTag替代。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even"); 
DataStream<Integer> odd = split.select("odd"); 
DataStream<Integer> all = split.select("even","odd");

參考上文中spilt中的outputtag示例。

25、Project

Project 函數允許從事件流中選擇屬性子集,并僅將所選元素發送到下一個處理流。

DataStream<Tuple4<Integer, Double, String, String>> in = // [...] 
DataStream<Tuple2<String, String>> out = in.project(3,2);

上述函數從給定記錄中選擇屬性號 2 和 3。 以下是示例輸入和輸出記錄:

(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)
  • 完整示例
import java.util.Arrays;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author alanchan**/
public class TestprojectDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple5<Integer, String, Integer, String,Double>> in = env.fromCollection(Arrays.asList(Tuple5.of(1, "alan", 17, "alan.chan.chn@163.com", 20d),Tuple5.of(2, "alanchan", 18, "alan.chan.chn@163.com", 25d),Tuple5.of(3, "alanchanchn", 19, "alan.chan.chn@163.com", 30d),Tuple5.of(4, "alan_chan", 18, "alan.chan.chn@163.com", 25d),Tuple5.of(5, "alan_chan_chn", 20, "alan.chan.chn@163.com", 30d)));DataStream<Tuple3<String, Integer,Double>> out = in.project(1, 2,4);out.print();
//		8> (alan,17,20.0)
//		11> (alan_chan,18,25.0)
//		12> (alan_chan_chn,20,30.0)
//		10> (alanchanchn,19,30.0)
//		9> (alanchan,18,25.0)env.execute();}}

以上,本文主要介紹Flink 的23種常用的operator及以具體可運行示例進行說明,如果需要了解更多內容,可以在本人Flink 專欄中了解更新系統的內容。

本專題分為五篇,即:
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(3)-window、distinct、join等
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(完整版)

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/208342.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/208342.shtml
英文地址,請注明出處:http://en.pswp.cn/news/208342.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

小白學java棧的經典算法問題——第四關白銀挑戰

內容1.括號匹配問題2.最小棧3.最大棧 1.括號匹配問題 棧的典型題目還是非常明顯的&#xff0c;括號匹配、表達式計算等等幾乎都少不了棧&#xff0c;本小節我們就看兩個最經典的問題 首先是LeetCode20,鏈接 本道題還是比較簡單的&#xff0c;其中比較麻煩的是如何判斷兩個符…

力扣面試題 08.12. 八皇后(java回溯解法)

Problem: 面試題 08.12. 八皇后 文章目錄 題目描述思路解題方法復雜度Code 題目描述 思路 八皇后問題的性質可以利用回溯來解決&#xff0c;將大問題具體分解成如下待解決問題&#xff1a; 1.以棋盤的每一行為回溯的決策階段&#xff0c;判斷當前棋盤位置能否放置棋子 2.如何判…

hbuilder + uniapp +vue3 開發微信云小程序

1、創建項目&#xff1a; 2、創建項目完成的默認目錄結構&#xff1a; 3、在根目錄新建一個文件夾cloudFns&#xff08;文件名字隨便&#xff09;&#xff0c;存放云函數源碼&#xff1a; 4、修改manifest.json文件&#xff1a;添加 小程序 appid和cloudfunctionRoot&#xff0…

python的websocket方法教程

WebSocket是一種網絡通信協議&#xff0c;它在單個TCP連接上提供全雙工的通信信道。在本篇文章中&#xff0c;我們將探討如何在Python中使用WebSocket實現實時通信。 websockets是Python中最常用的網絡庫之一&#xff0c;也是websocket協議的Python實現。它不僅作為基礎組件在…

pyside/qt03——人機協同的編程教學—直接面向chatGPT實戰開發(做中學,事上練)

先大概有個草圖框架&#xff0c;一點點豐富 我糾結好久&#xff0c;直接用Python寫UI代碼 還是用designer做UI 再轉Python呢&#xff0c; 因為不管怎么樣都要轉成Python代碼&#xff0c; 想了想還是學一下designer吧&#xff0c;有個中介&#xff0c;有直觀理解。 直接這樣也可…

智能優化算法應用:基于食肉植物算法無線傳感器網絡(WSN)覆蓋優化 - 附代碼

智能優化算法應用&#xff1a;基于食肉植物算法無線傳感器網絡(WSN)覆蓋優化 - 附代碼 文章目錄 智能優化算法應用&#xff1a;基于食肉植物算法無線傳感器網絡(WSN)覆蓋優化 - 附代碼1.無線傳感網絡節點模型2.覆蓋數學模型及分析3.食肉植物算法4.實驗參數設定5.算法結果6.參考…

設計并實現一個多線程圖書館管理系統,涉及數據庫操作

沒有實現全部功能&#xff0c;希望路過的大佬&#xff0c;可以實現全部功能&#xff0c;在評論區聊聊 創建數據庫library-demo CREATE DATABASE library-demo創建圖書表book CREATE TABLE book (bookId int(11) NOT NULL AUTO_INCREMENT COMMENT 圖書ID,bookName varchar(15)…

QUIC協議對比TCP網絡性能測試模擬弱網測試

QUIC正常外網壓測數據---時延diff/ms如下圖&#xff1a; QUIC弱網外網壓測數據 TCP正常外網壓測數據 TCP弱網外網壓測數據 結論&#xff1a; 在弱網情況下&#xff0c;TCP和QUIC協議的表現會有所不同。下面是它們在弱網環境中的性能對比&#xff1a; 連接建立&#xff1a;…

HarmonyOS創建JavaScript(類 Web開發模式)項目

上文 HarmonyOS帶大家創建自己的第一個Page頁面并實現路由跳轉(ArkTS)帶大家創建了我們項目中第一個自己創建的page 并完成了一個跳轉邏輯的編寫 上文的開發模式是 ArkTS 的 也被稱為 聲明式開發范式 還有一種 javaScript的 類Web開發模式 這種方式就類似于我們傳統的前端開發模…

基于微群機器人的二次開發

請求URL&#xff1a; http://域名地址/modifyGroupName 請求方式&#xff1a; POST 請求頭Headers&#xff1a; Content-Type&#xff1a;application/jsonAuthorization&#xff1a;login接口返回 參數&#xff1a; 參數名必選類型說明wId是String登錄實例標識chatRoom…

讀書筆記-《數據結構與算法》-摘要2[冒泡排序]

冒泡排序 核心&#xff1a;冒泡&#xff0c;持續比較相鄰元素&#xff0c;大的挪到后面&#xff0c;因此大的會逐步往后挪&#xff0c;故稱之為冒泡。 public class BubbleSort {public static void main(String[] args) {int unsortedArray[] new int[]{6, 5, 3, 1, 8, 7, 2…

Leetcode每日一題學習訓練——Python3版(到達首都的最少油耗)

版本說明 當前版本號[20231205]。 版本修改說明20231205初版 目錄 文章目錄 版本說明目錄到達首都的最少油耗理解題目代碼思路參考代碼 原題可以點擊此 2477. 到達首都的最少油耗 前去練習。 到達首都的最少油耗 ? 給你一棵 n 個節點的樹&#xff08;一個無向、連通、無環…

倒計時模塊復習

經典回顧倒計時 倒計時的基本布局介紹。 一個內容區域和一個輸入區域&#xff0c;內容區域進行劃分 直接使用flex布局會更快一點。 js代碼 我們利用一下模塊化思想&#xff0c;直接把獲得時間這個功能寫成一個函數。方便后續的調用 function getTime() {const date new Date…

MES管理系統通過哪些方面提升產品質量管理水平

在當今高度競爭的市場環境中&#xff0c;質量成為了企業生存和發展的關鍵因素。工廠作為生產產品的核心場所&#xff0c;其質量管理水平直接影響到產品的質量和企業的聲譽。為了應對這一挑戰&#xff0c;許多工廠引入了MES管理系統解決方案。本文將探討MES管理系統如何幫助工廠…

【UE5】監控攝像頭效果(上)

目錄 效果 步驟 一、視角切換 二、攝像頭畫面后期處理 三、在場景中顯示攝像頭畫面 效果 步驟 一、視角切換 1. 新建一個Basic關卡&#xff0c;添加第三人稱游戲資源到項目瀏覽器 2. 新建一個Actor藍圖&#xff0c;這里命名為“BP_SecurityCamera” 打開“BP_Securit…

模電筆記。。。。

模電 2.8 蜂鳴器 按照蜂鳴器驅動方式分為有源蜂鳴器和無源蜂鳴器 有源的有自己的震蕩電路&#xff0c;無源的要寫代碼控制。 里面有個線圈&#xff0c;相當于電感&#xff0c;儲能&#xff0c;通直隔交。 蜂鳴器的參數&#xff1a;額定電壓&#xff0c;工作電壓&#xff0…

【CCF-B】1/2區,錄用見刊極快!2個月錄用!

計算機類 ? 好刊解讀 今天小編帶來Taylor and Francis旗下計算機領域快刊&#xff0c;CCF-B類推薦的期刊解讀&#xff0c;期刊審稿周期短&#xff0c;投稿友好&#xff0c;如您有投稿需求&#xff0c;可作為重點關注&#xff01;后文有相關領域真實發表案例&#xff0c;供您投…

防水,也不怕水。Mate X5是如何做到讓你濕手濕屏也不影響操作的?

相信不少人都碰到過當手機屏幕存在小水珠時&#xff0c;觸控變得不靈敏&#xff0c;或者出現“幽靈觸屏”&#xff0c;指東打西的情況。 尤其是在洗澡、做飯&#xff0c;或者在戶外遇到下雨天氣時&#xff0c;如果打濕的手機收到重要聊天消息或者電話&#xff0c;卻因為濕屏導…

TS學習——面向對象

面向對象是程序中一個非常重要的思想&#xff0c;它被很多同學理解成了一個比較難&#xff0c;比較深奧的問題&#xff0c;其實不然。面向對象很簡單&#xff0c;簡而言之就是程序之中所有的操作都需要通過對象來完成。 舉例來說&#xff1a; 操作瀏覽器要使用window對象操作網…

生成fip.bin在Milkv-duo上跑rtthread的相關嘗試,及其問題分析

前言 &#xff08;1&#xff09;PLCT實驗室實習生長期招聘&#xff1a;招聘信息鏈接 &#xff08;2&#xff09;本來是想在Milkv-duo上跑rtthread的&#xff0c;做了很多努力&#xff0c;一直沒有結果。雖然不知道最終能不能成功做出來&#xff0c;還是把自己的相關努力分享出來…