之前做過數據平臺,對于實時數據采集,使用了Flink。現在想想,在數據開發平臺中,Flink的身影幾乎無處不在,由于之前是邊用邊學,總體有點混亂,借此空隙,整理一下Flink的內容,算是一個知識積累,同時也分享給大家。
注意:由于框架不同版本改造會有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多種語言,這里的所有代碼都是使用java,JDK版本使用的是19。
代碼參考:https://github.com/forever1986/flink-study.git
目錄
- 1 窗口定義及分類
- 1.1 定義
- 1.2 窗口分類
- 1.3 計算函數
- 2 代碼演示
- 2.1 reduce、aggregate&process演示
- 2.1.1 reduce方法演示
- 2.1.2 aggregate方法演示
- 2.1.3 process方法演示
- 2.1.4 結合使用
- 2.2 時間窗口演示
- 2.2.3 時間滾動窗口
- 2.2.2 時間滑動窗口
- 2.2.3 時間會話窗口
- 2.3 計數窗口演示
- 2.3.1 計數滾動窗口
- 2.3.2 計數滑動窗口
前面幾章對Flink從數據輸入到中間計算,最后到數據輸出的整個流程講了一遍。但是這些只不過是Flink最基本的內容。接下來需要更為深入的了解Flink的特性,這些特性就是體現Flink的優勢。本章先來了解第一個高級概念:“窗口”
1 窗口定義及分類
1.1 定義
根據《官方文檔》的描述:窗口是Flink處理無界流的核心。窗口將流分成有限大小的“桶”,可以對桶內的數據進行定制化的計算。這么說可能會比較難理解,下面通過圖解來說明一下窗口的概念:
- 管道里的數據不斷的流過來,Flink會根據規則生成一個桶來接住這些數據,桶的生成規則如下:
1)基于時間規則:一定范圍時間內的數據放到一個桶
2)基于計數規則:一定數量的數據放到一個桶 - 當一開始沒有任何數據過來時,Flink是不會生成桶的,而是第一條數據過來才會生成一個桶。也就是事件觸發類型的。
- Flink也不是觸發了就一下子生成很多桶,而是到來的那一條數據符合某些條件(時間或者數量條件)時,才會生成新的桶
1.2 窗口分類
通過前面的描述可以了解到關于窗口簡單理解為一個按照一定規則接受數據的桶,可以根據不同規則、不同窗口分配器以及是否做KeyBy的維度進行分類:
1)根據是否KeyBy可分為:
- 進行KeyBy:則會將數據的key相同的分配到同一個子任務,并在子任務進行窗口計算,可以是由多個子任務計算不同key的數據。
- 不進行KeyBy:不會對數據進行分開,全部都由一個子任務進行計算。
2)根據不同規則可分為:
- 時間窗口:根據設定的的時間跨度進行劃分窗口,比如設定5秒鐘,則會5秒鐘一個桶。
- 計數窗口:根據設定的計數總量進行劃分窗口,比如設定100條數據,則會100條一個桶。
3)根據窗口分配器可分為:
-
滾動窗口:將數據按照時間或者計數方式劃分為不重疊的窗口。每個窗口都有固定時間或者計數。
-
滑動窗口:將數據按照設定的時間或者計數+步長的方式劃分窗口,每個窗口都有固定時間或者計數,但是窗口跟窗口之間存在一定重疊數據。
注意:這里需要注意2個點。
1)當輸入前2條數據時,就已經會有一個窗口生成,因為前2條數據會與之前2條(假設的數據)構成一個窗口。那么也就意味著滑動窗口是會按照步長每2條數據創建一個窗口。
2)滑動的窗口與窗口之間是會重疊2條數據,也就是有2條數據會被共享。滾動窗口經常用于計算最近1個小時、1天的數據量這種場景
3)細心的朋友還發現,其實滾動窗口就是滑動窗口一個特例,當滑動窗口的步長等于窗口長度時,就是滾動窗口。
- 會話窗口:將數據按照規定多少時間內沒有其它數據再進來,那么之前進來的數據歸為一個窗口。每個窗口都沒有固定時間,窗口與窗口之間數據不重復。但是這種方式只支持時間規則,不支持計數規則。
它們之間可以進行組合,并產生不同的作用。當然并非所有的都能組合,本來有223=12種,但是由于計數與會話窗口不能組合,因此只有下面10種組合結果:
類型 | 作用 | 子任務數 | 是否固定 | 窗口之間是否重疊數據 |
---|---|---|---|---|
不進行KeyBy+時間+滾動 | 按照設定的時間間隔,劃分窗口 | 1個 | 時間固定,數量不固定 | 不重疊 |
不進行KeyBy+時間+滑動 | 按照設定的時間間隔+步長,劃分窗口 | 1個 | 時間固定,數量不固定 | 重疊 |
不進行KeyBy+時間+會話 | 按照設定的時間間隔,超過該時間間隔沒有數據進入, 則開啟新的窗口 | 1個 | 時間不固定,數量不固定 | 不重疊 |
不進行KeyBy+計數+滾動 | 按照設定的計數,劃分窗口 | 1個 | 數量固定,數據不固定 | 不重疊 |
不進行KeyBy+計數+滑動 | 按照設定的計數+步長,劃分窗口 | 1個 | 固定計數 | 重疊 |
進行KeyBy+時間+滾動 | 按照設定的時間間隔,劃分窗口 | 1或n個 | 時間固定,數量不固定 | 不重疊 |
進行KeyBy+時間+滑動 | 按照設定的時間間隔+步長,劃分窗口 | 1或n個 | 時間固定,數量不固定 | 重疊 |
進行KeyBy+時間+會話 | 按照設定的時間間隔,超過該時間間隔沒有數據進入, 則開啟新的窗口 | 1或n個 | 時間不固定,數量不固定 | 不重疊 |
進行KeyBy+計數+滾動 | 按照設定的計數,劃分窗口 | 1或n個 | 數量固定,數據不固定 | 不重疊 |
進行KeyBy+計數+滑動 | 按照設定的計數+步長,劃分窗口 | 1或n個 | 固定計數 | 重疊 |
1.3 計算函數
如果要進行窗口計算,需要先通過windowAll或window方法進行開窗操作,開窗之后就可以使用其計算函數。通過拿AllWindowedStream類來舉例子,可以看到有很多種計算方式,包括max、min、reduce、aggregate、process等方法。但其實只需要關注其中reduce、aggregate、process三個計算函數即可,因為其它方法都是基于這三個方法中的函數進行擴展的,下面說明一下這三個函數的內容:
1)reduce:ReduceFunction函數,輸入、累加器以及輸出的數據類型必須一致
reduce方法需要實現ReduceFunction函數:
/*** 輸入、累加器以及輸出的數據類型必須一致*/
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {/*** 進行累加的reduce方法,輸入、累加器以及輸出的數據類型必須一致* @param value1 上一次累加的結果* @param value2 這一次進來的數據* @return 返回相同數據類型的數據*/T reduce(T value1, T value2) throws Exception;
}
2)aggregate:AggregateFunction函數,輸入、累加器以及輸出的數據類型可以不一致
aggregate:AggregateFunction函數:
/*** 進行累加的reduce方法,輸入、累加器以及輸出的數據類型必須一致* IN 輸入的數據類型* ACC 累加器的數據類型* OUT 返回的數據類型*/
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {/*** 創建一個累加器,也就是初始化最初開始累積的值。比如你是一個計算總數的,你初始累加器就可以是0。*/ACC createAccumulator();/*** 進行累加操作,也就是計算操作。(不要被add的名稱誤導了,它就是寫你自己的聚合邏輯)* @param value 這一次進來的數據* @param accumulator 上一次累加的結果* @return 返回累加的數據*/ACC add(IN value, ACC accumulator);/*** 輸出的最終值給下游算子*/OUT getResult(ACC accumulator);/*** 一般在會話窗口中使用,用于合并窗口與窗口結果*/ACC merge(ACC a, ACC b);
}
3)process:ProcessWindowFunction和ProcessAllWindowFunction函數,這個是自定義計算函數
ProcessWindowFunction和ProcessAllWindowFunction函數之前在《系列之十一 - Data Stream API的中間算子的底層原理及其自定義》就有講過,但是沒有代碼演示,這一章會進行代碼演示。ProcessWindowFunction和ProcessAllWindowFunction函數其實就是自定義聚合方式,兩者區別在于是否使用過keyBy操作。他們有一個process方法要實現,該方法只有窗口結束時,才會被執行。
ProcessWindowFunction函數:
/*** 進行累加的reduce方法,輸入、累加器以及輸出的數據類型必須一致* IN 輸入的數據類型* OUT 返回的數據類型* KEY key的類型,如果是ProcessAllWindowFunction,則沒有該泛型* W 開窗類型*/
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {/*** 處理聚合操作的方法* */public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
}
2 代碼演示
前面講了很多概念,可能通過字面還是很難理解,現在通過demo的方式來說明。可能就能夠理解。由于做KeyBy和不做KeyBy唯一的區別就是有多少個子任務來處理開窗操作。所以這里只演示做KeyBy的示例:
以下所有代碼參考lesson08子模塊
2.1 reduce、aggregate&process演示
這里先對reduce、aggregate和process三個方法進行演示,本次演示大家要關注的是看看三個方法的差異:
方法 | 實現函數 | 關鍵函數方法 | 特性 | 執行頻率 | 是否有rich函數 |
---|---|---|---|---|---|
reduce | ReduceFunction | reduce | 輸入、輸出和累加器的數據類型必須一致 | 每個窗口除了第一條數據之外,每來一條數據都會執行 | 有 |
aggregate | AggregateFunction | add | 輸入、輸出和累加器的數據類型可以不一致 | 每個窗口每來一條數據都會執行 | 有 |
process | ProcessWindowFunction或 者ProcessAllWindowFunction | process | 輸入和輸出的數據類型可以不一致 | 每個窗口結束時,執行一次 | 過期(非Rich函數已經能夠獲取上下文) |
2.1.1 reduce方法演示
示例說明:輸入的cpu值在同一個窗口中累加起來。看看reduce如何被調用
ReduceDemo類:
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<ServerInfo>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1="+ value1 + "value2=" + value2);value1.setCpu(value1.getCpu()+value2.getCpu());return value1;}});// 7. 打印reduce.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:前2條數據快速輸入,第3條等一段時間后再輸入
輸出:
知識點:總共輸入了3條數據,其中前2條數據快速輸入,第3條等一段時間后再輸入,結果是:reduce只打印1次,print打印2次。原因:
1)本示例中使用滾動窗口,設置時間間隔為10秒,因此第一條和第二條在第一個時間窗口,第三條在第二個時間窗口,因此print打印2次
2)reduce只輸出1次,這個在《系列之八 - Data Stream API的中間算子:轉換和聚合》中講聚合類算子講過,reduce第一條數據是不會被調用的。因此第一條數據進來時,不調用reduce;第二條數據進來時,調用reduce;第三條數據進來時,由于是新的窗口,因此它算是該窗口的第一條數據,因此也不調用reduce。
2.1.2 aggregate方法演示
示例說明:通過aggregate實現求同一個窗口下的平均cpu值
AggregateDemo類:
import com.demo.lesson08.model.ServerAvgInfo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示Aggregate方法*/
public class AggregateDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<String> reduce = windowStream.aggregate(new AggregateFunction<ServerInfo, ServerAvgInfo, String>() {@Overridepublic ServerAvgInfo createAccumulator() {// 初始值System.out.println("====createAccumulator====");return new ServerAvgInfo();}@Overridepublic ServerAvgInfo add(ServerInfo value, ServerAvgInfo accumulator) {System.out.println("====add====");// 累積cpu值以及條數accumulator.setServerId(value.getServerId());accumulator.setNum(accumulator.getNum()==null?1:accumulator.getNum()+1);accumulator.setCpuTotal(accumulator.getCpuTotal()==null?value.getCpu():accumulator.getCpuTotal()+ value.getCpu());return accumulator;}@Overridepublic String getResult(ServerAvgInfo accumulator) {System.out.println("====getResult====");// 平均cpu值return "平均cpu值: "+(accumulator.getCpuTotal()/accumulator.getNum());}@Overridepublic ServerAvgInfo merge(ServerAvgInfo a, ServerAvgInfo b) {System.out.println("====merge====");return null;}});// 7. 打印reduce.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:前3條數據快速輸入,第4條等一段時間(10秒)后再輸入
輸出:
知識點:總共輸入了4條數據,其中前3條數據快速輸入,第4條等一段時間(10秒)后再輸入。結果是:createAccumulator打印2次,add打印4次,getResult打印2次,merge打印0次,print打印2次。原因:
1)本示例使用滾動窗口,設置時間間隔為10秒,因此第一條、第二條和第三條在第一個時間窗口,第四條在第二個時間窗口,因此2個窗口print打印2次。createAccumulator和getResult也是打印2次,因此說明他們是在窗口創建和關閉時分別被調用
2)add輸出4次,說明每一次數據輸入,都會執行add方法。
3)支持輸入、累加器以及輸出都是不一樣的數據類型
2.1.3 process方法演示
示例說明:通過process實現求同一個窗口下的平均cpu值
ProcessDemo類:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;public class ProcessDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<String> reduce = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {System.out.println("該窗口的時間:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的條數=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印reduce.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:前3條數據快速輸入,等待控制臺輸出process,再輸入第4條數據
輸出:
知識點:總共輸入了4條數據,其中前3條數據快速輸入,第4條等一段時間后再輸入。結果是:process打印2次,print打印2次,原因:
1)本示例使用滾動窗口,設置時間間隔為10秒,因此第一條、第二條和第三條在第一個時間窗口,第四條在第二個時間窗口,因此2個窗口print打印2次。
2)process打印2次,說明每個窗口在最后的時候process才會調用一次,并且將本次窗口的數據條數都一次性計算,并沒有每條數據計算。
3)支持輸入和輸出都是不一樣的數據類型
2.1.4 結合使用
細心的朋友還能看到reduce或者aggregate方法還有另外重載的方法,可以傳入2個參數。比如reduce可以傳入ReduceFunction和ProcessWindowFunction,aggregate可以傳入AggregateFunction和ProcessWindowFunction。其實這是由于它們各自存在一定的缺點,為了彌補缺點,可以結合使用,各取有點。
注意:使用時是會將ReduceFunction或AggregateFunction最終的數據給ProcessWindowFunction,也就是每個窗口,ProcessWindowFunction的process方法的elements只會拿到ReduceFunction或AggregateFunction累加器最后計算得到的一條數據。
示例說明:在ReduceFunction中累積CPU,然后在ProcessWindowFunction計算平均值
ReduceAndProcessDemo 類:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceAndProcessDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1=" + value1 + "value2=" + value2);value1.setCpu(value1.getCpu() + value2.getCpu());return value1;}}, new ProcessWindowFunction<>() {@Overridepublic void process(String key, ProcessWindowFunction<ServerInfo, ServerInfo, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<ServerInfo> out) throws Exception {// 平均cpu值long num = elements.spliterator().estimateSize();ServerInfo serverInfo = elements.iterator().next();System.out.println("服務器id=" + key + "在窗口的時間:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+ " 的條數=" + elements.spliterator().estimateSize()+ " 錯誤平均cpu值: " + (serverInfo.getCpu() / num));out.collect(serverInfo);}});// 7. 打印reduce.print("最終結果");// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:快速輸入3條數據,保證3條數據在一個窗口
輸出:
知識點:從上面輸出結果可以看到,reduce被調用2次,這是因為總共輸入3條數據。process被調用一次,而且可以看到平均值計算錯誤,在process中打印的結果顯示獲取到的條數是1條。由此可以看出process獲取到的是reduce最后一條數據。
2.2 時間窗口演示
時間窗口的演示,只需要關注的是不同窗口的時間劃分
2.2.3 時間滾動窗口
示例說明:通過process實現求同一個窗口下的平均cpu值
TumblingTimeWindowsDemo 類:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 時間滾動窗口示例*/
public class TumblingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的開始時間和結束時間System.out.println("該窗口的時間:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的條數=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:前3條數據快速輸入,等待控制臺輸出數據之后,再輸入第4條
輸出:
知識點:總共輸入了4條數據,其中前3條數據快速輸入,第4條等一段時間后再輸入。結果是:process打印2次,說明被分為2個窗口:
1)注意窗口的開始時間和結束時間,第一個窗口是從0秒-10秒,第二個窗口是從10秒-20秒
2)這說明滾動窗口,窗口之間是不會重疊的。
3)另外注意的點是,窗口時間范圍是左閉右開的,也就是0秒是屬于第一個窗口,但10秒不屬于第一個窗口,而是屬于第二個窗口
2.2.2 時間滑動窗口
示例說明:通過process實現求同一個窗口下的平均cpu值
SlidingTimeWindowsDemo 類:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 時間滑動窗口示例*/
public class SlidingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗 - 滑動窗口,間隔為10秒,步長為5秒WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(SlidingProcessingTimeWindows.of(Duration.ofSeconds(10),Duration.ofSeconds(5)));// 6. 計算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的開始時間和結束時間System.out.println("該窗口的時間:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的條數=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:先快速輸入前三條,等待控制臺輸出之后,再輸入第4條數據
輸出:
知識點:從控制臺可以看到process被執行了2次,說明開了2個窗口。
1)注意其每個窗口的開始時間和結束時間,比如第一個窗口是從15秒-25秒,第二個窗口是從20秒-30秒,說明在20秒-25秒是兩個窗口重疊之處,也就是代碼中設置的步長5秒
2)前三條數據分別被第一個窗口和第二個窗口給計算了,因此說明前3條數據輸入時間落在了20秒-25秒之間。因此說明兩個窗口是會重疊的。
2.2.3 時間會話窗口
示例說明:通過process實現求同一個窗口下的平均cpu值
SessionTimeWindowsDemo類:
import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 時間會話窗口演示*/
public class SessionTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(// 固定間隔時間ProcessingTimeSessionWindows.withGap(Duration.ofSeconds(10))// 動態間隔時間
// ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ServerInfo>(){
// @Override
// public long extract(ServerInfo element) {
// return 0;
// }
// }));// 6. 計算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的開始時間和結束時間System.out.println("該窗口的時間:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的條數=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:前面2條數據在10秒鐘之內輸入,第2條數據輸入后的10秒鐘之后,再輸入第3條數據
輸出:
知識點:從控制臺可以看到process被執行2次,因此它是開了2個窗口
1)前面兩條數據被合并為一個窗口,因此平均值是2.5,第三條數據在第二個窗口,因此平均值2.8。
2)注意一下每個窗口的開始和結束時間,都不想之前滾動或者滑動窗口一樣,都是固定的
3)會話窗口還支持動態規定間隔時間,你可以使用withDynamicGap方法,實現函數接口,在輸入的數據中設置某個屬性為間隔時間,實現動態的會話窗口。
2.3 計數窗口演示
2.3.1 計數滾動窗口
示例說明:通過process實現求同一個窗口下的平均cpu值
TumblingCountWindowsDemo 類:
import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 計數滾動窗口示例*/
public class TumblingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 開窗 - 活動窗口,間隔為3條數據WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(3);// 6. 計算 - 計算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {ServerInfo next = iterator.next();sum = sum + next.getCpu();}out.collect("cpu平均值=" + (sum / num) + " 條數=" + num);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:輸入前3條,再輸入后3條
輸出:(平均值由于精度問題不用管)
知識點:示例中設置滾動窗口,每個窗口3條數據。輸入了6條數據,process調用了2次
1)這樣就知道滾動窗口是每3條會新建一個窗口,窗口之間不重疊,與代碼示例中設置的一樣
2.3.2 計數滑動窗口
示例說明:通過process實現求同一個窗口下的平均cpu值
SlidingCountWindowsDemo 類:
import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 計數滑動窗口示例*/
public class SlidingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 開窗 - 活動窗口,間隔為3條數據WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(4,2);// 6. 計算 - 計算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {sum = sum + iterator.next().getCpu();}out.collect("cpu平均值=" + (sum / num) + " 條數=" + num);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}
輸入:先輸入2條,等控制臺輸出后再輸入第3條和第4條,等控制臺輸出后再輸入第5條和第6條
輸出:
知識點:總共輸入6條數據,先輸入2條,等一下再輸入第3條和第4條,等一下再輸入第5條和第6條。process調用了3次
1)可以看出它是會按照步長來調用process,這是因為第1條和第2條會是前一個窗口與當前窗口重疊
結語:本章說明了不同的窗口類型,還通過代碼演示,更深刻了解窗口的工作原理。接下來一章將接觸更底層的Flink如何實現不同窗口的。