Flink 系列之十五 - 高級概念 - 窗口

之前做過數據平臺,對于實時數據采集,使用了Flink。現在想想,在數據開發平臺中,Flink的身影幾乎無處不在,由于之前是邊用邊學,總體有點混亂,借此空隙,整理一下Flink的內容,算是一個知識積累,同時也分享給大家。

注意由于框架不同版本改造會有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多種語言,這里的所有代碼都是使用java,JDK版本使用的是19
代碼參考:https://github.com/forever1986/flink-study.git

目錄

  • 1 窗口定義及分類
    • 1.1 定義
    • 1.2 窗口分類
    • 1.3 計算函數
  • 2 代碼演示
    • 2.1 reduce、aggregate&process演示
      • 2.1.1 reduce方法演示
      • 2.1.2 aggregate方法演示
      • 2.1.3 process方法演示
      • 2.1.4 結合使用
    • 2.2 時間窗口演示
      • 2.2.3 時間滾動窗口
      • 2.2.2 時間滑動窗口
      • 2.2.3 時間會話窗口
    • 2.3 計數窗口演示
      • 2.3.1 計數滾動窗口
      • 2.3.2 計數滑動窗口

前面幾章對Flink從數據輸入到中間計算,最后到數據輸出的整個流程講了一遍。但是這些只不過是Flink最基本的內容。接下來需要更為深入的了解Flink的特性,這些特性就是體現Flink的優勢。本章先來了解第一個高級概念:“窗口

1 窗口定義及分類

1.1 定義

根據《官方文檔》的描述:窗口是Flink處理無界流的核心。窗口將流分成有限大小的“”,可以對桶內的數據進行定制化的計算。這么說可能會比較難理解,下面通過圖解來說明一下窗口的概念:

在這里插入圖片描述

  • 管道里的數據不斷的流過來,Flink會根據規則生成一個桶來接住這些數據,桶的生成規則如下:
    1)基于時間規則:一定范圍時間內的數據放到一個桶
    2)基于計數規則:一定數量的數據放到一個桶
  • 當一開始沒有任何數據過來時,Flink是不會生成桶的,而是第一條數據過來才會生成一個桶。也就是事件觸發類型的。
  • Flink也不是觸發了就一下子生成很多桶,而是到來的那一條數據符合某些條件(時間或者數量條件)時,才會生成新的桶

1.2 窗口分類

通過前面的描述可以了解到關于窗口簡單理解為一個按照一定規則接受數據的桶,可以根據不同規則、不同窗口分配器以及是否做KeyBy的維度進行分類:

1)根據是否KeyBy可分為:

  • 進行KeyBy:則會將數據的key相同的分配到同一個子任務,并在子任務進行窗口計算,可以是由多個子任務計算不同key的數據。
  • 不進行KeyBy:不會對數據進行分開,全部都由一個子任務進行計算。

2)根據不同規則可分為:

  • 時間窗口:根據設定的的時間跨度進行劃分窗口,比如設定5秒鐘,則會5秒鐘一個桶。
  • 計數窗口:根據設定的計數總量進行劃分窗口,比如設定100條數據,則會100條一個桶。

3)根據窗口分配器可分為:

  • 滾動窗口:將數據按照時間或者計數方式劃分為不重疊的窗口。每個窗口都有固定時間或者計數。
    在這里插入圖片描述

  • 滑動窗口:將數據按照設定的時間或者計數+步長的方式劃分窗口,每個窗口都有固定時間或者計數,但是窗口跟窗口之間存在一定重疊數據。
    在這里插入圖片描述

注意:這里需要注意2個點。
1)當輸入前2條數據時,就已經會有一個窗口生成,因為前2條數據會與之前2條(假設的數據)構成一個窗口。那么也就意味著滑動窗口是會按照步長每2條數據創建一個窗口。
2)滑動的窗口與窗口之間是會重疊2條數據,也就是有2條數據會被共享。滾動窗口經常用于計算最近1個小時、1天的數據量這種場景
3)細心的朋友還發現,其實滾動窗口就是滑動窗口一個特例,當滑動窗口的步長等于窗口長度時,就是滾動窗口

  • 會話窗口:將數據按照規定多少時間內沒有其它數據再進來,那么之前進來的數據歸為一個窗口。每個窗口都沒有固定時間,窗口與窗口之間數據不重復。但是這種方式只支持時間規則不支持計數規則

它們之間可以進行組合,并產生不同的作用。當然并非所有的都能組合,本來有223=12種,但是由于計數與會話窗口不能組合,因此只有下面10種組合結果:

類型作用子任務數是否固定窗口之間是否重疊數據
不進行KeyBy+時間+滾動按照設定的時間間隔,劃分窗口1個時間固定,數量不固定不重疊
不進行KeyBy+時間+滑動按照設定的時間間隔+步長,劃分窗口1個時間固定,數量不固定重疊
不進行KeyBy+時間+會話按照設定的時間間隔,超過該時間間隔沒有數據進入,
則開啟新的窗口
1個時間不固定,數量不固定不重疊
不進行KeyBy+計數+滾動按照設定的計數,劃分窗口1個數量固定,數據不固定不重疊
不進行KeyBy+計數+滑動按照設定的計數+步長,劃分窗口1個固定計數重疊
進行KeyBy+時間+滾動按照設定的時間間隔,劃分窗口1或n個時間固定,數量不固定不重疊
進行KeyBy+時間+滑動按照設定的時間間隔+步長,劃分窗口1或n個時間固定,數量不固定重疊
進行KeyBy+時間+會話按照設定的時間間隔,超過該時間間隔沒有數據進入,
則開啟新的窗口
1或n個時間不固定,數量不固定不重疊
進行KeyBy+計數+滾動按照設定的計數,劃分窗口1或n個數量固定,數據不固定不重疊
進行KeyBy+計數+滑動按照設定的計數+步長,劃分窗口1或n個固定計數重疊

1.3 計算函數

如果要進行窗口計算,需要先通過windowAll或window方法進行開窗操作,開窗之后就可以使用其計算函數。通過拿AllWindowedStream類來舉例子,可以看到有很多種計算方式,包括max、min、reduce、aggregate、process等方法。但其實只需要關注其中reduce、aggregate、process三個計算函數即可,因為其它方法都是基于這三個方法中的函數進行擴展的,下面說明一下這三個函數的內容:

1)reduceReduceFunction函數,輸入、累加器以及輸出的數據類型必須一致

reduce方法需要實現ReduceFunction函數:

/*** 輸入、累加器以及輸出的數據類型必須一致*/
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {/*** 進行累加的reduce方法,輸入、累加器以及輸出的數據類型必須一致* @param value1  上一次累加的結果* @param value2  這一次進來的數據* @return 返回相同數據類型的數據*/T reduce(T value1, T value2) throws Exception;
}

2)aggregateAggregateFunction函數,輸入、累加器以及輸出的數據類型可以不一致

aggregate:AggregateFunction函數:

/*** 進行累加的reduce方法,輸入、累加器以及輸出的數據類型必須一致* IN  輸入的數據類型* ACC  累加器的數據類型* OUT  返回的數據類型*/
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {/*** 創建一個累加器,也就是初始化最初開始累積的值。比如你是一個計算總數的,你初始累加器就可以是0。*/ACC createAccumulator();/*** 進行累加操作,也就是計算操作。(不要被add的名稱誤導了,它就是寫你自己的聚合邏輯)* @param value 這一次進來的數據* @param accumulator 上一次累加的結果* @return 返回累加的數據*/ACC add(IN value, ACC accumulator);/*** 輸出的最終值給下游算子*/OUT getResult(ACC accumulator);/*** 一般在會話窗口中使用,用于合并窗口與窗口結果*/ACC merge(ACC a, ACC b);
}

3)processProcessWindowFunctionProcessAllWindowFunction函數,這個是自定義計算函數

ProcessWindowFunctionProcessAllWindowFunction函數之前在《系列之十一 - Data Stream API的中間算子的底層原理及其自定義》就有講過,但是沒有代碼演示,這一章會進行代碼演示。ProcessWindowFunctionProcessAllWindowFunction函數其實就是自定義聚合方式,兩者區別在于是否使用過keyBy操作。他們有一個process方法要實現,該方法只有窗口結束時,才會被執行。

ProcessWindowFunction函數:

/*** 進行累加的reduce方法,輸入、累加器以及輸出的數據類型必須一致* IN  輸入的數據類型* OUT  返回的數據類型* KEY  key的類型,如果是ProcessAllWindowFunction,則沒有該泛型* W  開窗類型*/
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>extends AbstractRichFunction {/*** 處理聚合操作的方法* */public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
}

2 代碼演示

前面講了很多概念,可能通過字面還是很難理解,現在通過demo的方式來說明。可能就能夠理解。由于做KeyBy和不做KeyBy唯一的區別就是有多少個子任務來處理開窗操作。所以這里只演示做KeyBy的示例:

以下所有代碼參考lesson08子模塊

2.1 reduce、aggregate&process演示

這里先對reduce、aggregate和process三個方法進行演示,本次演示大家要關注的是看看三個方法的差異

方法實現函數關鍵函數方法特性執行頻率是否有rich函數
reduceReduceFunctionreduce輸入、輸出和累加器的數據類型必須一致每個窗口除了第一條數據之外,每來一條數據都會執行
aggregateAggregateFunctionadd輸入、輸出和累加器的數據類型可以不一致每個窗口每來一條數據都會執行
processProcessWindowFunction或
者ProcessAllWindowFunction
process輸入和輸出的數據類型可以不一致每個窗口結束時,執行一次過期(非Rich函數已經能夠獲取上下文)

2.1.1 reduce方法演示

示例說明:輸入的cpu值在同一個窗口中累加起來。看看reduce如何被調用

ReduceDemo類:

import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<ServerInfo>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1="+ value1 + "value2=" + value2);value1.setCpu(value1.getCpu()+value2.getCpu());return value1;}});// 7. 打印reduce.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:前2條數據快速輸入,第3條等一段時間后再輸入
在這里插入圖片描述

輸出:
在這里插入圖片描述

知識點:總共輸入了3條數據,其中前2條數據快速輸入,第3條等一段時間后再輸入,結果是:reduce只打印1次,print打印2次。原因:
1)本示例中使用滾動窗口,設置時間間隔為10秒,因此第一條和第二條在第一個時間窗口,第三條在第二個時間窗口,因此print打印2次
2)reduce只輸出1次,這個在《系列之八 - Data Stream API的中間算子:轉換和聚合》中講聚合類算子講過,reduce第一條數據是不會被調用的。因此第一條數據進來時,不調用reduce;第二條數據進來時,調用reduce;第三條數據進來時,由于是新的窗口,因此它算是該窗口的第一條數據,因此也不調用reduce。

2.1.2 aggregate方法演示

示例說明:通過aggregate實現求同一個窗口下的平均cpu值

AggregateDemo類:

import com.demo.lesson08.model.ServerAvgInfo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.time.Duration;/*** 演示Aggregate方法*/
public class AggregateDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<String> reduce = windowStream.aggregate(new AggregateFunction<ServerInfo, ServerAvgInfo, String>() {@Overridepublic ServerAvgInfo createAccumulator() {// 初始值System.out.println("====createAccumulator====");return new ServerAvgInfo();}@Overridepublic ServerAvgInfo add(ServerInfo value, ServerAvgInfo accumulator) {System.out.println("====add====");// 累積cpu值以及條數accumulator.setServerId(value.getServerId());accumulator.setNum(accumulator.getNum()==null?1:accumulator.getNum()+1);accumulator.setCpuTotal(accumulator.getCpuTotal()==null?value.getCpu():accumulator.getCpuTotal()+ value.getCpu());return accumulator;}@Overridepublic String getResult(ServerAvgInfo accumulator) {System.out.println("====getResult====");// 平均cpu值return "平均cpu值: "+(accumulator.getCpuTotal()/accumulator.getNum());}@Overridepublic ServerAvgInfo merge(ServerAvgInfo a, ServerAvgInfo b) {System.out.println("====merge====");return null;}});// 7. 打印reduce.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:前3條數據快速輸入,第4條等一段時間(10秒)后再輸入
在這里插入圖片描述

輸出:
在這里插入圖片描述

知識點:總共輸入了4條數據,其中前3條數據快速輸入,第4條等一段時間(10秒)后再輸入。結果是:createAccumulator打印2次,add打印4次,getResult打印2次,merge打印0次,print打印2次。原因:
1)本示例使用滾動窗口,設置時間間隔為10秒,因此第一條、第二條和第三條在第一個時間窗口,第四條在第二個時間窗口,因此2個窗口print打印2次。createAccumulator和getResult也是打印2次,因此說明他們是在窗口創建和關閉時分別被調用
2)add輸出4次,說明每一次數據輸入,都會執行add方法
3)支持輸入、累加器以及輸出都是不一樣的數據類型

2.1.3 process方法演示

示例說明:通過process實現求同一個窗口下的平均cpu值

ProcessDemo類:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;public class ProcessDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<String> reduce = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {System.out.println("該窗口的時間:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的條數=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印reduce.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:前3條數據快速輸入,等待控制臺輸出process,再輸入第4條數據
在這里插入圖片描述

輸出:
在這里插入圖片描述

知識點:總共輸入了4條數據,其中前3條數據快速輸入,第4條等一段時間后再輸入。結果是:process打印2次,print打印2次,原因:
1)本示例使用滾動窗口,設置時間間隔為10秒,因此第一條、第二條和第三條在第一個時間窗口,第四條在第二個時間窗口,因此2個窗口print打印2次。
2)process打印2次,說明每個窗口在最后的時候process才會調用一次,并且將本次窗口的數據條數都一次性計算,并沒有每條數據計算。
3)支持輸入和輸出都是不一樣的數據類型

2.1.4 結合使用

細心的朋友還能看到reduce或者aggregate方法還有另外重載的方法,可以傳入2個參數。比如reduce可以傳入ReduceFunctionProcessWindowFunction,aggregate可以傳入AggregateFunctionProcessWindowFunction。其實這是由于它們各自存在一定的缺點,為了彌補缺點,可以結合使用,各取有點。

注意:使用時是會將ReduceFunctionAggregateFunction最終的數據給ProcessWindowFunction,也就是每個窗口,ProcessWindowFunctionprocess方法的elements只會拿到ReduceFunctionAggregateFunction累加器最后計算得到的一條數據

示例說明:在ReduceFunction中累積CPU,然后在ProcessWindowFunction計算平均值

ReduceAndProcessDemo 類:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;/*** 演示reduce方法*/
public class ReduceAndProcessDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<ServerInfo> reduce = windowStream.reduce(new ReduceFunction<>() {@Overridepublic ServerInfo reduce(ServerInfo value1, ServerInfo value2) throws Exception {System.out.println("==reduce: value1=" + value1 + "value2=" + value2);value1.setCpu(value1.getCpu() + value2.getCpu());return value1;}}, new ProcessWindowFunction<>() {@Overridepublic void process(String key, ProcessWindowFunction<ServerInfo, ServerInfo, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<ServerInfo> out) throws Exception {// 平均cpu值long num = elements.spliterator().estimateSize();ServerInfo serverInfo = elements.iterator().next();System.out.println("服務器id=" + key + "在窗口的時間:" + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " + DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+ " 的條數=" + elements.spliterator().estimateSize()+ " 錯誤平均cpu值: " + (serverInfo.getCpu() / num));out.collect(serverInfo);}});// 7. 打印reduce.print("最終結果");// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:快速輸入3條數據,保證3條數據在一個窗口
在這里插入圖片描述

輸出:
在這里插入圖片描述

知識點:從上面輸出結果可以看到,reduce被調用2次,這是因為總共輸入3條數據。process被調用一次,而且可以看到平均值計算錯誤,在process中打印的結果顯示獲取到的條數是1條。由此可以看出process獲取到的是reduce最后一條數據。

2.2 時間窗口演示

時間窗口的演示,只需要關注的是不同窗口的時間劃分

2.2.3 時間滾動窗口

示例說明:通過process實現求同一個窗口下的平均cpu值

TumblingTimeWindowsDemo 類:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 時間滾動窗口示例*/
public class TumblingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(10)));// 6. 計算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的開始時間和結束時間System.out.println("該窗口的時間:"+DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的條數=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:前3條數據快速輸入,等待控制臺輸出數據之后,再輸入第4條
在這里插入圖片描述

輸出:
在這里插入圖片描述

知識點:總共輸入了4條數據,其中前3條數據快速輸入,第4條等一段時間后再輸入。結果是:process打印2次,說明被分為2個窗口:
1)注意窗口的開始時間和結束時間,第一個窗口是從0秒-10秒,第二個窗口是從10秒-20秒
2)這說明滾動窗口,窗口之間是不會重疊的。
3)另外注意的點是,窗口時間范圍是左閉右開的,也就是0秒是屬于第一個窗口,但10秒不屬于第一個窗口,而是屬于第二個窗口

2.2.2 時間滑動窗口

示例說明:通過process實現求同一個窗口下的平均cpu值

SlidingTimeWindowsDemo 類:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 時間滑動窗口示例*/
public class SlidingTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗 - 滑動窗口,間隔為10秒,步長為5秒WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(SlidingProcessingTimeWindows.of(Duration.ofSeconds(10),Duration.ofSeconds(5)));// 6. 計算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的開始時間和結束時間System.out.println("該窗口的時間:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的條數=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:先快速輸入前三條,等待控制臺輸出之后,再輸入第4條數據
在這里插入圖片描述

輸出:
在這里插入圖片描述

知識點:從控制臺可以看到process被執行了2次,說明開了2個窗口。
1)注意其每個窗口的開始時間和結束時間,比如第一個窗口是從15秒-25秒,第二個窗口是從20秒-30秒,說明在20秒-25秒是兩個窗口重疊之處,也就是代碼中設置的步長5秒
2)前三條數據分別被第一個窗口和第二個窗口給計算了,因此說明前3條數據輸入時間落在了20秒-25秒之間。因此說明兩個窗口是會重疊的。

2.2.3 時間會話窗口

示例說明:通過process實現求同一個窗口下的平均cpu值

SessionTimeWindowsDemo類:

import com.demo.lesson08.model.ServerInfo;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Iterator;/*** 時間會話窗口演示*/
public class SessionTimeWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new KeySelectorFunction());// 5. 開窗WindowedStream<ServerInfo, String, TimeWindow> windowStream = kyStream.window(// 固定間隔時間ProcessingTimeSessionWindows.withGap(Duration.ofSeconds(10))// 動態間隔時間
//                ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ServerInfo>(){
//                    @Override
//                    public long extract(ServerInfo element) {
//                        return 0;
//                    }
//                }));// 6. 計算SingleOutputStreamOperator<String> process = windowStream.process(new ProcessWindowFunction<ServerInfo, String, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, TimeWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {// 打印窗口的開始時間和結束時間System.out.println("該窗口的時間:"+ DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getStart())+ " - " +DateFormatUtils.ISO_8601_EXTENDED_DATETIME_TIME_ZONE_FORMAT.format(context.window().getEnd())+" 的條數=" + elements.spliterator().estimateSize());// 平均cpu值Iterator<ServerInfo> iterator = elements.iterator();double cpu = 0l;long num = 0;while (iterator.hasNext()){cpu = cpu + iterator.next().getCpu();num ++;}String result = "平均cpu值: "+(cpu/num);out.collect(result);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:前面2條數據在10秒鐘之內輸入,第2條數據輸入后的10秒鐘之后,再輸入第3條數據
在這里插入圖片描述

輸出:
在這里插入圖片描述

知識點:從控制臺可以看到process被執行2次,因此它是開了2個窗口
1)前面兩條數據被合并為一個窗口,因此平均值是2.5,第三條數據在第二個窗口,因此平均值2.8。
2)注意一下每個窗口的開始和結束時間,都不想之前滾動或者滑動窗口一樣,都是固定的
3)會話窗口還支持動態規定間隔時間,你可以使用withDynamicGap方法,實現函數接口,在輸入的數據中設置某個屬性為間隔時間,實現動態的會話窗口。

2.3 計數窗口演示

2.3.1 計數滾動窗口

示例說明:通過process實現求同一個窗口下的平均cpu值

TumblingCountWindowsDemo 類:

import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 計數滾動窗口示例*/
public class TumblingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 開窗 - 活動窗口,間隔為3條數據WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(3);// 6. 計算 - 計算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {ServerInfo next = iterator.next();sum = sum + next.getCpu();}out.collect("cpu平均值=" + (sum / num) + " 條數=" + num);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:輸入前3條,再輸入后3條
在這里插入圖片描述

輸出:(平均值由于精度問題不用管)
在這里插入圖片描述

知識點:示例中設置滾動窗口,每個窗口3條數據。輸入了6條數據,process調用了2次
1)這樣就知道滾動窗口是每3條會新建一個窗口,窗口之間不重疊,與代碼示例中設置的一樣

2.3.2 計數滑動窗口

示例說明:通過process實現求同一個窗口下的平均cpu值

SlidingCountWindowsDemo 類:

import com.demo.lesson08.method.ReduceDemo;
import com.demo.lesson08.model.ServerInfo;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;/*** 計數滑動窗口示例*/
public class SlidingCountWindowsDemo {public static void main(String[] args) throws Exception {// 1. 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 讀取數據DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);// 3. map做類型轉換SingleOutputStreamOperator<ServerInfo> map = text.map(new ReduceDemo.ServerInfoMapFunction());// 4. 做keyByKeyedStream<ServerInfo, String> kyStream = map.keyBy(new ReduceDemo.KeySelectorFunction());// 5. 開窗 - 活動窗口,間隔為3條數據WindowedStream<ServerInfo, String, GlobalWindow> windowedStream = kyStream.countWindow(4,2);// 6. 計算 - 計算平均值SingleOutputStreamOperator<String> process = windowedStream.process(new ProcessWindowFunction<>() {@Overridepublic void process(String s, ProcessWindowFunction<ServerInfo, String, String, GlobalWindow>.Context context, Iterable<ServerInfo> elements, Collector<String> out) throws Exception {long num = elements.spliterator().estimateSize();Iterator<ServerInfo> iterator = elements.iterator();double sum = 0l;while (iterator.hasNext()) {sum = sum + iterator.next().getCpu();}out.collect("cpu平均值=" + (sum / num) + " 條數=" + num);}});// 7. 打印process.print();// 執行env.execute();}public static class ServerInfoMapFunction implements MapFunction<String, ServerInfo> {@Overridepublic ServerInfo map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new ServerInfo(value1,value2,value3);}}public static class KeySelectorFunction implements KeySelector<ServerInfo, String> {@Overridepublic String getKey(ServerInfo value) throws Exception {// 返回第一個值,作為keyBy的分類return value.getServerId();}}
}

輸入:先輸入2條,等控制臺輸出后再輸入第3條和第4條,等控制臺輸出后再輸入第5條和第6條
在這里插入圖片描述

輸出:
在這里插入圖片描述

知識點:總共輸入6條數據,先輸入2條,等一下再輸入第3條和第4條,等一下再輸入第5條和第6條。process調用了3次
1)可以看出它是會按照步長來調用process,這是因為第1條和第2條會是前一個窗口與當前窗口重疊

結語:本章說明了不同的窗口類型,還通過代碼演示,更深刻了解窗口的工作原理。接下來一章將接觸更底層的Flink如何實現不同窗口的。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/80578.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/80578.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/80578.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

大疆卓馭嵌入式面經及參考答案

FreeRTOS 有哪 5 種內存管理方式&#xff1f; heap_1.c&#xff1a;這種方式簡單地在編譯時分配一塊固定大小的內存&#xff0c;在整個運行期間不會進行內存的動態分配和釋放。它適用于那些對內存使用需求非常明確且固定&#xff0c;不需要動態分配內存的場景&#xff0c;優點是…

Java 線程池原理

Java 線程池是一種管理和復用線程的機制&#xff0c;其原理如下&#xff1a; 核心概念 線程池的初始化 &#xff1a;在創建線程池時&#xff0c;需要設置一些關鍵參數&#xff0c;如核心線程數&#xff08;corePoolSize&#xff09;、最大線程數&#xff08;maximumPoolSize&am…

大模型都有哪些超參數

大模型的超參數是影響其訓練效果、性能和泛化能力的關鍵設置,可分為以下幾大類別并結合實際應用進行詳細說明: 一、訓練過程相關超參數 學習率(Learning Rate) 作用:控制參數更新的步長,直接影響收斂速度和穩定性。過高會導致震蕩或過擬合,過低則收斂緩慢。調整策略:初…

路由器斷流排查終極指南:從Ping測試到Wireshark抓包5步定位法

測試路由器是否出現“斷流”&#xff08;網絡連接間歇性中斷&#xff09;&#xff0c;需通過多維度排查硬件、軟件及外部干擾因素。以下是詳細步驟指南&#xff1a; 一、基礎環境準備 設備連接 有線測試&#xff1a;用網線將電腦直接連接路由器LAN口&#xff0c;排除WiFi干擾。…

低代碼開發:開啟軟件開發的新篇章

摘要 低代碼開發作為一種新興的軟件開發方式&#xff0c;正在迅速改變傳統軟件開發的模式和效率。它通過可視化界面和預設的模板&#xff0c;使非專業開發者也能夠快速構建應用程序&#xff0c;極大地降低了開發門檻和成本。本文將深入探討低代碼開發的定義、優勢、應用場景以及…

基于Django汽車數據分析大屏可視化系統項目

基于Django汽車數據分析大屏可視化系統項目 一、項目概述 本項目是一個基于 Python 的汽車數據分析大屏可視化系統&#xff0c;旨在通過直觀的可視化界面展示汽車相關數據&#xff0c;幫助用戶更好地理解和分析汽車市場動態、車輛性能等信息。系統采用前后端分離的架構&#…

WebRTC通信原理與流程

1、服務器與協議相關 1.1 STUN服務器 圖1.1.1 STUN服務器在通信中的位置圖 1.1.1 STUN服務簡介 STUN&#xff08;Session Traversal Utilities for NAT&#xff0c;NAT會話穿越應用程序&#xff09;是一種網絡協議&#xff0c;它允許位于NAT&#xff08;或多重 NAT&#xff09;…

Beta分布--貝葉斯建模概率或比例常用分布

Beta分布是一種定義在區間 ([0, 1]) 上的連續概率分布&#xff0c;常用于描述比例或概率的不確定性。它的形狀由兩個正參數 (\alpha)&#xff08;alpha&#xff09;和 (\beta)&#xff08;beta&#xff09;控制&#xff0c;能夠呈現多種形態&#xff08;如對稱、偏態、U型等&am…

深度學習算法:開啟智能時代的鑰匙

引言 深度學習作為機器學習的一個分支&#xff0c;近年來在圖像識別、自然語言處理、語音識別等多個領域取得了革命性的進展。它的核心在于構建多層的神經網絡&#xff0c;通過模仿人腦處理信息的方式&#xff0c;讓機器能夠從數據中學習復雜的模式。 深度學習算法的基本原理…

深入了解linux系統—— 自定義shell

shell的原理 我們知道&#xff0c;我們程序啟動時創建的進程&#xff0c;它的父進程都是bash也就是shell命令行解釋器&#xff1b; 那bash都做了哪些工作呢&#xff1f; 根據已有的知識&#xff0c;我們可以簡單理解為&#xff1a; 輸出命令行提示符獲取并解析我們輸入的指令…

Redux和Vuex

為什么React和Vue需要Redux和Vuex 狀態管理需求的演變 #mermaid-svg-GaKl3pkZ82yc1m8E {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GaKl3pkZ82yc1m8E .error-icon{fill:#552222;}#mermaid-svg-GaKl3pkZ82yc1m8E…

Kubernetes排錯(十三):Pod間偶發超時問題排查

在微服務架構中&#xff0c;Pod間偶發的通信超時是最令人頭疼的問題之一。本文將通過生產環境中的真實案例&#xff0c;手把手教你定位這類"幽靈問題"。 一、快速定位問題方向&#xff08;5分鐘縮小范圍&#xff09; 1. 基礎檢查三板斧 # 檢查Service與Endpoint映…

Nginx 源碼安裝成服務

一、環境準備 一臺裝有 CentOS 7.9 的虛擬機&#xff08;IP: 192.168.40.81&#xff09;nginx-1.21.6.tar.gz 安裝包一個&#xff08;版本隨意&#xff09; 二、安裝 1&#xff09;解壓 nginx-1.21.6.tar.gz tar -xzvf nginx-1.21.6.tar.gz -tar&#xff1a;這是一個在 Linu…

L51.【LeetCode題解】438. 找到字符串中所有字母異位詞(四種方法)

目錄 1.題目 2.分析 暴力解法 方法1:排序(超時) 方法2:哈希表(險過) ★判斷兩個哈希表是否相同算法(通用方法,必須掌握) 能相等的前提:兩個哈希表的大小相等 哈希表有迭代器,可以使用范圍for從頭到尾遍歷 提交結果 優化方法:定長滑動窗口 提交結果 使用哈希數組更快…

Qt模塊化架構設計教程 -- 輕松上手插件開發

概述 在軟件開發領域,隨著項目的增長和需求的變化,保持代碼的可維護性和擴展性變得尤為重要。一個有效的解決方案是采用模塊化架構,尤其是利用插件系統來增強應用的功能性和靈活性。Qt框架提供了一套強大的插件機制,可以幫助開發者輕松實現這種架構。 模塊化與插件系統 模…

深入理解 HashMap 的索引計算:右移與異或的作用

在 Java 中&#xff0c;HashMap 是一種高效的數據結構&#xff0c;它通過將鍵映射到數組中的索引位置來實現快速的插入和查找。但之前看源碼總是理解到它要hash之后散列到數組中某一個位置&#xff0c;但卻從未深究它究竟怎么散列的&#xff0c;如果不夠散那就意味著hash沖突增…

overleaf較高級的細節指令

換行命令 原來代碼是將三個矩陣表達式在同一行顯示&#xff0c;使用aligned環境&#xff08;需引入amsmath宏包&#xff0c;一般文檔導言區默認會引入&#xff09;&#xff0c;把三個矩陣的定義分別放在不同行&#xff0c;可通過\\換行。 對齊命令 &放在等號前&#xff0…

LiteLLM:統一API接口,讓多種LLM模型調用如臂使指

在人工智能迅猛發展的今天,各種大語言模型(LLM)層出不窮。對開發者而言,如何高效集成和管理這些模型成為一個棘手問題。LiteLLM應運而生,它提供了一個統一的API接口,讓開發者可以輕松調用包括OpenAI、Anthropic、Cohere等在內的多種LLM模型。本文將深入介紹LiteLLM的特性、…

Google語法整理

以下是從整理出的 Google 語法&#xff1a; site&#xff1a;指定域名&#xff0c;如 “apache site:bbs.xuegod.cn”&#xff0c;可查詢網站的收錄情況 。 inurl&#xff1a;限定在 url 中搜索&#xff0c;如 “inurl:qq.txt”&#xff0c;可搜索 url 中包含特定內容的頁面&a…

python 寫一個工作 簡單 番茄鐘

1、圖 2、需求 番茄鐘&#xff08;Pomodoro Technique&#xff09;是一種時間管理方法&#xff0c;由弗朗西斯科西里洛&#xff08;Francesco Cirillo&#xff09;在 20 世紀 80 年代創立。“Pomodoro”在意大利語中意為“番茄”&#xff0c;這個名字來源于西里洛最初使用的一個…