Flink 系列文章
一、Flink 專欄
Flink 專欄系統介紹某一知識點,并輔以具體的示例進行說明。
-
1、Flink 部署系列
本部分介紹Flink的部署、配置相關基礎內容。 -
2、Flink基礎系列
本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內容。 -
3、Flik Table API和SQL基礎系列
本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創建庫、表用法、查詢、窗口函數、catalog等等內容。 -
4、Flik Table API和SQL提高與應用系列
本部分是table api 和sql的應用部分,和實際的生產應用聯系更為密切,以及有一定開發難度的內容。 -
5、Flink 監控系列
本部分和實際的運維、監控工作相關。
二、Flink 示例專欄
Flink 示例專欄是 Flink 專欄的輔助說明,一般不會介紹知識點的信息,更多的是提供一個一個可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內容。
兩專欄的所有文章入口點擊:Flink 系列文章匯總索引
文章目錄
- Flink 系列文章
- 一、Flink的23種算子說明及示例
- 9、first、distinct、join、outjoin、cross
- 10、Window
- 11、WindowAll
- 12、Window Apply
- 13、Window Reduce
- 14、Aggregations on windows
本文主要介紹Flink 的10種常用的operator(window、distinct、join等)及以具體可運行示例進行說明.
如果需要了解更多內容,可以在本人Flink 專欄中了解更新系統的內容。
本文除了maven依賴外,沒有其他依賴。
本專題分為五篇,即:
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(3)-window、distinct、join等
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(完整版)
一、Flink的23種算子說明及示例
本文示例中使用的maven依賴和java bean 參考本專題的第一篇中的maven和java bean。
9、first、distinct、join、outjoin、cross
具體事例詳見例子及結果。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestFirst_Join_Distinct_OutJoin_CrossDemo {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();joinFunction(env);env.execute();}public static void unionFunction(StreamExecutionEnvironment env) throws Exception {List<String> info1 = new ArrayList<>();info1.add("team A");info1.add("team B");List<String> info2 = new ArrayList<>();info2.add("team C");info2.add("team D");List<String> info3 = new ArrayList<>();info3.add("team E");info3.add("team F");List<String> info4 = new ArrayList<>();info4.add("team G");info4.add("team H");DataStream<String> source1 = env.fromCollection(info1);DataStream<String> source2 = env.fromCollection(info2);DataStream<String> source3 = env.fromCollection(info3);DataStream<String> source4 = env.fromCollection(info4);source1.union(source2).union(source3).union(source4).print();
// team A
// team C
// team E
// team G
// team B
// team D
// team F
// team H}public static void crossFunction(ExecutionEnvironment env) throws Exception {// cross,求兩個集合的笛卡爾積,得到的結果數為:集合1的條數 乘以 集合2的條數List<String> info1 = new ArrayList<>();info1.add("team A");info1.add("team B");List<Tuple2<String, Integer>> info2 = new ArrayList<>();info2.add(new Tuple2("W", 3));info2.add(new Tuple2("D", 1));info2.add(new Tuple2("L", 0));DataSource<String> data1 = env.fromCollection(info1);DataSource<Tuple2<String, Integer>> data2 = env.fromCollection(info2);data1.cross(data2).print();
// (team A,(W,3))
// (team A,(D,1))
// (team A,(L,0))
// (team B,(W,3))
// (team B,(D,1))
// (team B,(L,0))}public static void outerJoinFunction(ExecutionEnvironment env) throws Exception {// Outjoin,跟sql語句中的left join,right join,full join意思一樣// leftOuterJoin,跟join一樣,但是左邊集合的沒有關聯上的結果也會取出來,沒關聯上的右邊為null// rightOuterJoin,跟join一樣,但是右邊集合的沒有關聯上的結果也會取出來,沒關聯上的左邊為null// fullOuterJoin,跟join一樣,但是兩個集合沒有關聯上的結果也會取出來,沒關聯上的一邊為nullList<Tuple2<Integer, String>> info1 = new ArrayList<>();info1.add(new Tuple2<>(1, "shenzhen"));info1.add(new Tuple2<>(2, "guangzhou"));info1.add(new Tuple2<>(3, "shanghai"));info1.add(new Tuple2<>(4, "chengdu"));List<Tuple2<Integer, String>> info2 = new ArrayList<>();info2.add(new Tuple2<>(1, "深圳"));info2.add(new Tuple2<>(2, "廣州"));info2.add(new Tuple2<>(3, "上海"));info2.add(new Tuple2<>(5, "杭州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);// left join
// eft join:7> (1,shenzhen,深圳)
// left join:2> (3,shanghai,上海)
// left join:8> (4,chengdu,未知)
// left join:16> (2,guangzhou,廣州)data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (second == null) {tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField("未知", 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("left join");// right join
// right join:2> (3,shanghai,上海)
// right join:7> (1,shenzhen,深圳)
// right join:15> (5,--,杭州)
// right join:16> (2,guangzhou,廣州)data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (first == null) {tuple.setField(second.f0, 0);tuple.setField("--", 1);tuple.setField(second.f1, 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("right join");// fullOuterJoin
// fullOuterJoin:2> (3,shanghai,上海)
// fullOuterJoin:8> (4,chengdu,--)
// fullOuterJoin:15> (5,--,杭州)
// fullOuterJoin:16> (2,guangzhou,廣州)
// fullOuterJoin:7> (1,shenzhen,深圳)data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {Tuple3<Integer, String, String> tuple = new Tuple3();if (second == null) {tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField("--", 2);} else if (first == null) {tuple.setField(second.f0, 0);tuple.setField("--", 1);tuple.setField(second.f1, 2);} else {// 另外一種賦值方式,和直接用構造函數賦值相同tuple.setField(first.f0, 0);tuple.setField(first.f1, 1);tuple.setField(second.f1, 2);}return tuple;}}).print("fullOuterJoin");}public static void joinFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info1 = new ArrayList<>();info1.add(new Tuple2<>(1, "shenzhen"));info1.add(new Tuple2<>(2, "guangzhou"));info1.add(new Tuple2<>(3, "shanghai"));info1.add(new Tuple2<>(4, "chengdu"));List<Tuple2<Integer, String>> info2 = new ArrayList<>();info2.add(new Tuple2<>(1, "深圳"));info2.add(new Tuple2<>(2, "廣州"));info2.add(new Tuple2<>(3, "上海"));info2.add(new Tuple2<>(5, "杭州"));DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);//// join:2> ((3,shanghai),(3,上海))
// join:16> ((2,guangzhou),(2,廣州))
// join:7> ((1,shenzhen),(1,深圳))data1.join(data2).where(0).equalTo(0).print("join");// join2:2> (3,上海,shanghai)
// join2:7> (1,深圳,shenzhen)
// join2:16> (2,廣州,guangzhou)DataSet<Tuple3<Integer, String, String>> data3 = data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {@Overridepublic Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {return new Tuple3<Integer, String, String>(first.f0, second.f1, first.f1);}});data3.print("join2");}public static void firstFunction(ExecutionEnvironment env) throws Exception {List<Tuple2<Integer, String>> info = new ArrayList<>();info.add(new Tuple2(1, "Hadoop"));info.add(new Tuple2(1, "Spark"));info.add(new Tuple2(1, "Flink"));info.add(new Tuple2(2, "Scala"));info.add(new Tuple2(2, "Java"));info.add(new Tuple2(2, "Python"));info.add(new Tuple2(3, "Linux"));info.add(new Tuple2(3, "Window"));info.add(new Tuple2(3, "MacOS"));DataSet<Tuple2<Integer, String>> dataSet = env.fromCollection(info);// 前幾個
// dataSet.first(4).print();
// (1,Hadoop)
// (1,Spark)
// (1,Flink)
// (2,Scala)// 按照tuple2的第一個元素進行分組,查出每組的前2個
// dataSet.groupBy(0).first(2).print();
// (3,Linux)
// (3,Window)
// (1,Hadoop)
// (1,Spark)
// (2,Scala)
// (2,Java)// 按照tpule2的第一個元素進行分組,并按照倒序排列,查出每組的前2個dataSet.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
// (3,Window)
// (3,MacOS)
// (1,Spark)
// (1,Hadoop)
// (2,Scala)
// (2,Python)}public static void distinctFunction(ExecutionEnvironment env) throws Exception {List list = new ArrayList<Tuple3<Integer, Integer, Integer>>();list.add(new Tuple3<>(0, 3, 6));list.add(new Tuple3<>(0, 2, 5));list.add(new Tuple3<>(0, 3, 6));list.add(new Tuple3<>(1, 1, 9));list.add(new Tuple3<>(1, 2, 8));list.add(new Tuple3<>(1, 2, 8));list.add(new Tuple3<>(1, 3, 9));DataSet<Tuple3<Integer, Integer, Integer>> source = env.fromCollection(list);// 去除tuple3中元素完全一樣的source.distinct().print();
// (1,3,9)
// (0,3,6)
// (1,1,9)
// (1,2,8)
// (0,2,5)// 去除tuple3中第一個元素一樣的,只保留第一個// source.distinct(0).print();
// (1,1,9)
// (0,3,6)// 去除tuple3中第一個和第三個相同的元素,只保留第一個// source.distinct(0,2).print();
// (0,3,6)
// (1,1,9)
// (1,2,8)
// (0,2,5)}public static void distinctFunction2(ExecutionEnvironment env) throws Exception {DataSet<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 18, 3000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 18, 1000), new User(5, "alan1", "5", "5@5.com", 28, 1500), new User(4, "alan2", "4", "4@4.com", 20, 300)));// source.distinct("name").print();
// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=3000.0)source.distinct("name", "age").print();
// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=3000.0)
// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// User(id=5, name=alan1, pwd=5, email=5@5.com, age=28, balance=1500.0)
// User(id=4, name=alan2, pwd=4, email=4@4.com, age=20, balance=300.0)}public static void distinctFunction3(ExecutionEnvironment env) throws Exception {DataSet<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 18, -1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 18, -1000), new User(5, "alan1", "5", "5@5.com", 28, 1500), new User(4, "alan2", "4", "4@4.com", 20, -300)));// 針對balance增加絕對值去重source.distinct(new KeySelector<User, Double>() {@Overridepublic Double getKey(User value) throws Exception {return Math.abs(value.getBalance());}}).print();
// User(id=5, name=alan1, pwd=5, email=5@5.com, age=28, balance=1500.0)
// User(id=2, name=alan2, pwd=2, email=2@2.com, age=19, balance=200.0)
// User(id=1, name=alan1, pwd=1, email=1@1.com, age=18, balance=-1000.0)
// User(id=4, name=alan2, pwd=4, email=4@4.com, age=20, balance=-300.0)}public static void distinctFunction4(ExecutionEnvironment env) throws Exception {List<String> info = new ArrayList<>();info.add("Hadoop,Spark");info.add("Spark,Flink");info.add("Hadoop,Flink");info.add("Hadoop,Flink");DataSet<String> source = env.fromCollection(info);source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {System.err.print("come in ");for (String token : value.split(",")) {out.collect(token);}}});source.distinct().print();}}
10、Window
KeyedStream → WindowedStream
Window 函數允許按時間或其他條件對現有 KeyedStream 進行分組。 以下是以 10 秒的時間窗口聚合:
inputStream.keyBy(0).window(Time.seconds(10));
Flink 定義數據片段以便(可能)處理無限數據流。 這些切片稱為窗口。 此切片有助于通過應用轉換處理數據塊。 要對流進行窗口化,需要分配一個可以進行分發的鍵和一個描述要對窗口化流執行哪些轉換的函數。要將流切片到窗口,可以使用 Flink 自帶的窗口分配器。 我們有選項,如 tumbling windows, sliding windows, global 和 session windows。
具體參考系列文章
6、Flink四大基石之Window詳解與詳細示例(一)
6、Flink四大基石之Window詳解與詳細示例(二)
7、Flink四大基石之Time和WaterMaker詳解與詳細示例(watermaker基本使用、kafka作為數據源的watermaker使用示例以及超出最大允許延遲數據的接收實現)
11、WindowAll
DataStream → AllWindowedStream
windowAll 函數允許對常規數據流進行分組。 通常,這是非并行數據轉換,因為它在非分區數據流上運行。
與常規數據流功能類似,也有窗口數據流功能。 唯一的區別是它們處理窗口數據流。 所以窗口縮小就像 Reduce 函數一樣,Window fold 就像 Fold 函數一樣,并且還有聚合。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
這適用于非并行轉換的大多數場景。所有記錄都將收集到 windowAll 算子對應的一個任務中。
具體參考系列文章
6、Flink四大基石之Window詳解與詳細示例(一)
6、Flink四大基石之Window詳解與詳細示例(二)
7、Flink四大基石之Time和WaterMaker詳解與詳細示例(watermaker基本使用、kafka作為數據源的watermaker使用示例以及超出最大允許延遲數據的接收實現)
12、Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
將通用 function 應用于整個窗口。下面是一個手動對窗口內元素求和的 function。
如果你使用 windowAll 轉換,則需要改用 AllWindowFunction。
windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {public void apply (Tuple tuple,Window window,Iterable<Tuple2<String, Integer>> values,Collector<Integer> out) throws Exception {int sum = 0;for (value t: values) {sum += t.f1;}out.collect (new Integer(sum));}
});// 在 non-keyed 窗口流上應用 AllWindowFunction
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {public void apply (Window window,Iterable<Tuple2<String, Integer>> values,Collector<Integer> out) throws Exception {int sum = 0;for (value t: values) {sum += t.f1;}out.collect (new Integer(sum));}
});
13、Window Reduce
WindowedStream → DataStream
對窗口應用 reduce function 并返回 reduce 后的值。
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);}
});
14、Aggregations on windows
WindowedStream → DataStream
聚合窗口的內容。min和minBy之間的區別在于,min返回最小值,而minBy返回該字段中具有最小值的元素(max和maxBy相同)。
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
以上,本文主要介紹Flink 的10種常用的operator(window、distinct、join等)及以具體可運行示例進行說明.
如果需要了解更多內容,可以在本人Flink 專欄中了解更新系統的內容。
本專題分為五篇,即:
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(3)-window、distinct、join等
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23種常用算子介紹及詳細示例(完整版)