[Flink]二、Flink1.13

7. 處理函數

        之前所介紹的流處理 API,無論是基本的轉換、聚合,還是更為復雜的窗口操作,其實都是基于 DataStream 進行轉換的;所以可以統稱為 DataStream API ,這也是 Flink 編程的核心。而我們知道,為了讓代碼有更強大的表現力和易用性, Flink 本身提供了多層 API ,DataStream API 只是中間的一環,如圖 7-1 所示:
        
        在更底層,我們可以不定義任何具體的算子(比如 map filter ,或者 window ),而只是提
煉出一個統一的“處理”( process )操作——它是所有轉換算子的一個概括性的表達,可以自
定義處理邏輯,所以這一層接口就被叫作“處理函數”( process function )。
        在處理函數中,我們直面的就是數據流中最基本的元素:數據事件(event )、狀態( state
以及時間( time )。這就相當于對流有了完全的控制權。處理函數比較抽象,沒有具體的操作,
所以對于一些常見的簡單應用(比如求和、開窗口)會顯得有些麻煩;不過正是因為它不限定
具體做什么,所以理論上我們可以做任何事情,實現所有需求。所以可以說,處理函數是我們
進行 Flink 編程的“大招”,輕易不用,一旦放出來必然會掃平一切。
        本章我們就深入底層,討論一下 Flink 中處理函數的使用方法。
7.1 基本處理函數( ProcessFunction
處理函數主要是定義數據流的轉換操作,所以也可以把它歸到轉換算子中。我們知道在
Flink 中幾乎所有轉換算子都提供了對應的函數類接口,處理函數也不例外;它所對應的函數
類,就叫作 ProcessFunction
7.1.1 處理函數的功能和使用
我們之前學習的轉換算子,一般只是針對某種具體操作來定義的,能夠拿到的信息比較有
限。比如 map 算子,我們實現的 MapFunction 中,只能獲取到當前的數據,定義它轉換之后
的形式;而像窗口聚合這樣的復雜操作, AggregateFunction 中除數據外,還可以獲取到當前的
183 狀態(以累加器 Accumulator 形式出現)。另外我們還介紹過富函數類,比如 RichMapFunction
它提供了獲取運行時上下文的方法 getRuntimeContext() ,可以拿到狀態,還有并行度、任務名
稱之類的運行時信息。
但是無論那種算子,如果我們想要訪問事件的時間戳,或者當前的水位線信息,都是完全
做不到的。在定義生成規則之后,水位線會源源不斷地產生,像數據一樣在任務間流動,可我
們卻不能像數據一樣去處理它;跟時間相關的操作,目前我們只會用窗口來處理。而在很多應
用需求中,要求我們對時間有更精細的控制,需要能夠獲取水位線,甚至要 把控時間 、定義
什么時候做什么事,這就不是基本的時間窗口能夠實現的了。
于是必須祭出大招——處理函數( ProcessFunction )了。處理函數提供了一個“定時服務”
TimerService ),我們可以通過它訪問流中的事件( event )、時間戳( timestamp )、水位線
watermark ),甚至可以注冊 定時事件 。而且處理函數繼承了 AbstractRichFunction 抽象類,
所以擁有富函數類的所有特性,同樣可以訪問狀態( state )和其他運行時信息。此外,處理函
數還可以直接將數據輸出到側輸出流( side output )中。所以,處理函數是最為靈活的處理方
法,可以實現各種自定義的業務邏輯;同時也是整個 DataStream API 的底層基礎。
處理函數的使用與基本的轉換操作類似,只需要直接基于 DataStream 調用 .process() 方法
就可以了。方法需要傳入一個 ProcessFunction 作為參數,用來定義處理邏輯。
stream.process(new MyProcessFunction())
這里 ProcessFunction 不是接口,而是一個抽象類,繼承了 AbstractRichFunction
MyProcessFunction 是它的一個具體實現。所以所有的處理函數,都是富函數( RichFunction ),
富函數可以調用的東西這里同樣都可以調用。
下面是一個具體的應用示例:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, 
Collector<String> out) throws Exception {if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}
這里我們在 ProcessFunction 中重寫了 .processElement() 方法,自定義了一種處理邏輯:當
數據的 user 為“ Mary ”時,將其輸出一次;而如果為“ Bob ”時,將 user 輸出兩次。這里的
輸 出 , 是 通 過 調 用 out.collect() 來實現的。另外我們還可以調用
ctx.timerService().currentWatermark() 來 獲 取 當 前 的 水 位 線 打 印 輸 出 。 所 以 可 以 看 到 ,
ProcessFunction 函數有點像 FlatMapFunction 的升級版。可以實現 Map Filter FlatMap 的所
有功能。很明顯,處理函數非常強大,能夠做很多之前做不到的事情。
接下來我們就深入 ProcessFunction 內部來進行詳細了解。
7.1.2 ProcessFunction 解析
在源碼中我們可以看到,抽象類 ProcessFunction 繼承了 AbstractRichFunction ,有兩個泛
型類型參數: I 表示 Input ,也就是輸入的數據類型; O 表示 Output ,也就是處理完成之后輸出
的數據類型。
內部單獨定義了兩個方法:一個是必須要實現的抽象方法 .processElement() ;另一個是非
抽象方法 .onTimer()
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...
public abstract void processElement(I value, Context ctx, Collector<O> out) 
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) 
throws Exception {}
...
}
1. 抽象方法 .processElement()
用于“處理元素”,定義了處理的核心邏輯。這個方法對于流中的每個元素都會調用一次,
參數包括三個:輸入數據值 value ,上下文 ctx ,以及“收集器”( Collector out 。方法沒有返
回值,處理之后的輸出數據是通過收集器 out 來定義的。
? value :當前流中的輸入元素,也就是正在處理的數據,類型與流中數據類
型一致。
? ctx :類型是 ProcessFunction 中定義的內部抽象類 Context ,表示當前運行的
上下文,可以獲取到當前的時間戳,并提供了用于查詢時間和注冊定時器的“定時服
務” (TimerService) ,以及可以將數據發送到“側輸出流”( side output )的方法 .output()
Context 抽象類定義如下:
public abstract class Context {
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
? out :“收集器”(類型為 Collector ),用于返回輸出數據。使用方式與 flatMap
算子中的收集器完全一樣,直接調用 out.collect() 方法就可以向下游發出一個數據。
這個方法可以多次調用,也可以不調用。
通過幾個參數的分析不難發現, ProcessFunction 可以輕松實現 flatMap 這樣的基本轉換功
能(當然 map filter 更不在話下);而通過富函數提供的獲取上下文方法 .getRuntimeContext()
也可以自定義狀態( state )進行處理,這也就能實現聚合操作的功能了。關于自定義狀態的具
體實現,我們會在后續“狀態管理”一章中詳細介紹。
2. 非抽象方法 .onTimer()
用于定義定時觸發的操作,這是一個非常強大、也非常有趣的功能。這個方法只有在注冊
好的定時器觸發的時候才會調用,而定時器是通過“定時服務” TimerService 來注冊的。打個
比方,注冊定時器( timer )就是設了一個鬧鐘,到了設定時間就會響;而 .onTimer() 中定義的,
就是鬧鐘響的時候要做的事。所以它本質上是一個基于時間的“回調”( callback )方法,通過
時間的進展來觸發;在事件時間語義下就是由水位線( watermark )來觸發了。
.processElement() 類似,定時方法 .onTimer() 也有三個參數:時間戳( timestamp ),上下
文( ctx ),以及收集器( out )。這里的 timestamp 是指設定好的觸發時間,事件時間語義下當
186 然就是水位線了。另外這里同樣有上下文和收集器,所以也可以調用定時服務( TimerService ),
以及任意輸出處理之后的數據。
既然有 .onTimer() 方法做定時觸發,我們用 ProcessFunction 也可以自定義數據按照時間分
組、定時觸發計算輸出結果;這其實就實現了窗口( window )的功能。所以說 ProcessFunction
是真正意義上的終極奧義,用它可以實現一切功能。
我們也可以看到,處理函數都是基于事件觸發的。水位線就如同插入流中的一條數據一樣;
只不過處理真正的數據事件調用的是 .processElement() 方法,而處理水位線事件調用的
.onTimer()
這里需要注意的是,上面的 .onTimer() 方法只是定時器觸發時的操作,而定時器( timer
真正的設置需要用到上下文 ctx 中的定時服務。在 Flink 中,只有“按鍵分區流” KeyedStream
才支持設置定時器的操作,所以之前的代碼中我們并沒有使用定時器。所以基于不同類型的流,
可以使用不同的處理函數,它們之間還是有一些微小的區別的。接下來我們就介紹一下處理函
數的分類。
7.1.3 處理函數的分類
Flink 中的處理函數其實是一個大家族, ProcessFunction 只是其中一員。
我們知道, DataStream 在調用一些轉換方法之后,有可能生成新的流類型;例如調
.keyBy() 之后得到 KeyedStream ,進而再調用 .window() 之后得到 WindowedStream 。對于不同
類型的流,其實都可以直接調用 .process() 方法進行自定義處理,這時傳入的參數就都叫作處理
函數。當然,它們盡管本質相同,都是可以訪問狀態和時間信息的底層 API ,可彼此之間也會
有所差異。
Flink 提供了 8 個不同的處理函數:
1 ProcessFunction
最基本的處理函數,基于 DataStream 直接調用 .process() 時作為參數傳入。
2 KeyedProcessFunction
對流按鍵分區后的處理函數,基于 KeyedStream 調用 .process() 時作為參數傳入。要想使用
定時器,比如基于 KeyedStream
3 ProcessWindowFunction
開窗之后的處理函數,也是全窗口函數的代表。基于 WindowedStream 調用 .process() 時作
為參數傳入。
4 ProcessAllWindowFunction
同樣是開窗之后的處理函數,基于 AllWindowedStream 調用 .process() 時作為參數傳入。
5 CoProcessFunction
187 合并( connect )兩條流之后的處理函數,基于 ConnectedStreams 調用 .process() 時作為參
數傳入。關于流的連接合并操作,我們會在后續章節詳細介紹。
6 ProcessJoinFunction
間隔連接( interval join )兩條流之后的處理函數,基于 IntervalJoined 調用 .process() 時作為
參數傳入。
7 BroadcastProcessFunction
廣播連接流處理函數,基于 BroadcastConnectedStream 調用 .process() 時作為參數傳入。這
里的“廣播連接流” BroadcastConnectedStream ,是一個未 keyBy 的普通 DataStream 與一個廣
播流( BroadcastStream )做連接( conncet )之后的產物。關于廣播流的相關操作,我們會在后
續章節詳細介紹。
8 KeyedBroadcastProcessFunction
按鍵分區的廣播連接流處理函數,同樣是基于 BroadcastConnectedStream 調用 .process()
作為參數傳入。與 BroadcastProcessFunction 不同的是,這時的廣播連接流,是一個 KeyedStream
與廣播流( BroadcastStream )做連接之后的產物。
接下來,我們就對 KeyedProcessFunction ProcessWindowFunction 的具體用法展開詳細
說明。
7.2 按鍵分區處理函數( KeyedProcessFunction
Flink 程序中,為了實現數據的聚合統計,或者開窗計算之類的功能,我們一般都要先
keyBy 算子對數據流進行“按鍵分區”,得到一個 KeyedStream 。也就是指定一個鍵( key ),
按照它的哈希值( hash code )將數據分成不同的“組”,然后分配到不同的并行子任務上執行
計算;這相當于做了一個邏輯分流的操作,從而可以充分利用并行計算的優勢實時處理海量數
據。
另外我們在上節中也提到,只有在 KeyedStream 中才支持使用 TimerService 設置定時器的
操作。所以一般情況下,我們都是先做了 keyBy 分區之后,再去定義處理操作;代碼中更加
常見的處理函數是 KeyedProcessFunction ,最基本的 ProcessFunction 反而出鏡率沒那么高。
接下來我們就先從定時服務( TimerService )入手,詳細講解 KeyedProcessFunction 的用
7.2.1 定時器( Timer )和定時服務( TimerService
KeyedProcessFunction 的一個特色,就是可以靈活地使用定時器。
定時器( timers )是處理函數中進行時間相關操作的主要機制。在 .onTimer() 方法中可以實
現定時處理的邏輯,而它能觸發的前提,就是之前曾經注冊過定時器、并且現在已經到了觸發
188 時間。注冊定時器的功能,是通過上下文中提供的“定時服務”( TimerService )來實現的。
定時服務與當前運行的環境有關。前面已經介紹過, ProcessFunction 的上下文( Context
中提供了 .timerService() 方法,可以直接返回一個 TimerService 對象:
public abstract TimerService timerService();
TimerService Flink 關于時間和定時器的基礎服務接口,包含以下六個方法:
// 獲取當前的處理時間
long currentProcessingTime();
// 獲取當前的水位線(事件時間)
long currentWatermark();
// 注冊處理時間定時器,當處理時間超過 time 時觸發
void registerProcessingTimeTimer(long time);
// 注冊事件時間定時器,當水位線超過 time 時觸發
void registerEventTimeTimer(long time);
// 刪除觸發時間為 time 的處理時間定時器
void deleteProcessingTimeTimer(long time);
// 刪除觸發時間為 time 的處理時間定時器
void deleteEventTimeTimer(long time);
六個方法可以分成兩大類:基于處理時間和基于事件時間。而對應的操作主要有三個:獲
取當前時間,注冊定時器,以及刪除定時器。需要注意,盡管處理函數中都可以直接訪問
TimerService ,不過只有基于 KeyedStream 的處理函數,才能去調用注冊和刪除定時器的方法;
未作按鍵分區的 DataStream 不支持定時器操作,只能獲取當前時間。
對于處理時間和事件時間這兩種類型的定時器, TimerService 內部會用一個優先隊列將它
們的時間戳( timestamp )保存起來,排隊等待執行。可以認為,定時器其實是 KeyedStream
上處理算子的一個狀態,它以時間戳作為區分。所以 TimerService 會以鍵( key )和時間戳為
標準,對定時器進行去重;也就是說對于每個 key 和時間戳,最多只有一個定時器,如果注冊
了多次, onTimer() 方法也將只被調用一次。這樣一來,我們在代碼中就方便了很多,可以肆
無忌憚地對一個 key 注冊定時器,而不用擔心重復定義——因為一個時間戳上的定時器只會觸
發一次。
基于 KeyedStream 注冊定時器時,會傳入一個定時器觸發的時間戳,這個時間戳的定時器
對于每個 key 都是有效的。這樣,我們的代碼并不需要做額外的處理,底層就可以直接對不同
key 進行獨立的處理操作了。
利用這個特性,有時我們可以故意降低時間戳的精度,來減少定時器的數量,從而提高處
理性能。比如我們可以在設置定時器時只保留整秒數,那么定時器的觸發頻率就是最多 1 秒一
次。
long coalescedTime = time / 1000 * 1000;
189 ctx.timerService().registerProcessingTimeTimer(coalescedTime);
這里注意定時器的時間戳必須是毫秒數,所以我們得到整秒之后還要乘以 1000 。定時器
默認的區分精度是毫秒。
另外 Flink .onTimer() .processElement() 方法是同步調用的( synchronous ),所以也不會
出現狀態的并發修改。
Flink 的定時器同樣具有容錯性,它和狀態一起都會被保存到一致性檢查點( checkpoint
中。當發生故障時, Flink 會重啟并讀取檢查點中的狀態,恢復定時器。如果是處理時間的定
時器,有可能會出現已經“過期”的情況,這時它們會在重啟時被立刻觸發。關于 Flink 的檢
查點和容錯機制,我們會在后續章節詳細講解。
7.2.2 KeyedProcessFunction 的使用
KeyedProcessFunction 可以說是處理函數中的 嫡系部隊 ,可以認為是 ProcessFunction
一個擴展。我們只要基于 keyBy 之后的 KeyedStream ,直接調用 .process() 方法,這時需要傳入
的參數就是 KeyedProcessFunction 的實現類。
stream.keyBy( t -> t.f0 )
.process(new MyKeyedProcessFunction())
類似地, KeyedProcessFunction 也是繼承自 AbstractRichFunction 的一個抽象類,源碼中定
義如下:
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
{
...
public abstract void processElement(I value, Context ctx, Collector<O> out)
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
throws Exception {}
public abstract class Context {...}
...
}
可以看到與 ProcessFunction 的定義幾乎完全一樣,區別只是在于類型參數多了一個 K
這是當前按鍵分區的 key 的類型。同樣地,我們必須實現一個 .processElement() 抽象方法,用
來處理流中的每一個數據;另外還有一個非抽象方法 .onTimer() ,用來定義定時器觸發時的回
調操作。由于定時器只能在 KeyedStream 上使用,所以到了 KeyedProcessFunction 這里,我們
才真正對時間有了精細的控制,定時方法 .onTimer() 才真正派上了用場。
下面是一個使用處理時間定時器的具體示例:
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class ProcessingTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 處理時間語義,不需要分配時間戳和 watermarkSingleOutputStreamOperator<Event> stream = env.addSource(new 
ClickSource());// 要用定時器,必須基于 KeyedStreamstream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, 
Collector<String> out) throws Exception {Long currTs = ctx.timerService().currentProcessingTime();out.collect("數據到達,到達時間:" + new Timestamp(currTs));// 注冊一個 10 秒后的定時器ctx.timerService().registerProcessingTimeTimer(currTs + 10 
* 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out) throws Exception {out.collect("定時器觸發,觸發時間:" + new Timestamp(timestamp));}}).print();env.execute();}
}
在上面的代碼中,由于定時器只能在 KeyedStream 上使用,所以先要進行 keyBy ;這里
.keyBy(data -> true) 是將所有數據的 key 都指定為了 true ,其實就是所有數據擁有相同的 key
會分配到同一個分區。
之后我們自定義了一個 KeyedProcessFunction ,其中 .processElement() 方法是每來一個數據
都會調用一次,主要是定義了一個 10 秒之后的定時器;而 .onTimer() 方法則會在定時器觸發時
調用。所以我們會看到,程序運行后先在控制臺輸出“數據到達”的信息,等待 10 秒之后,
又會輸出“定時器觸發”的信息,打印出的時間間隔正是 10 秒。
當然,上面的例子是處理時間的定時器,所以我們是真的需要等待 10 秒才會看到結果。
事件時間語義下,又會有什么不同呢?我們可以對上面的代碼略作修改,做一個測試:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
public class EventTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
192env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new 
CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonot
onousTimestamps().withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long 
recordTimestamp) {return element.timestamp;}}));// 基于 KeyedStream 定義事件時間定時器stream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx, 
Collector<String> out) throws Exception {out.collect("數據到達,時間戳為:" + ctx.timestamp());out.collect(" 數據到達,水位線為: " + 
ctx.timerService().currentWatermark() + "\n -------分割線-------");// 注冊一個 10 秒后的定時器ctx.timerService().registerEventTimeTimer(ctx.timestamp() 
+ 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, 
Collector<String> out) throws Exception {out.collect("定時器觸發,觸發時間:" + timestamp);}}).print();env.execute();}// 自定義測試數據源public static class CustomSource implements SourceFunction<Event> {@Overridepublic void run(SourceContext<Event> ctx) throws Exception {// 直接發出測試數據ctx.collect(new Event("Mary", "./home", 1000L));// 為了更加明顯,中間停頓 5 秒鐘Thread.sleep(5000L);// 發出 10 秒后的數據ctx.collect(new Event("Mary", "./home", 11000L));Thread.sleep(5000L);// 發出 10 秒+1ms 后的數據ctx.collect(new Event("Alice", "./cart", 11001L));Thread.sleep(5000L);}@Overridepublic void cancel() { }}
}
由于是事件時間語義,所以我們必須從數據中提取出數據產生的時間戳。這里為了更清楚
地看到程序行為,我們自定義了一個數據源,發出三條測試數據,時間戳分別為 1000 11000
11001 ,并且發出數據后都會停頓 5 秒。
在代碼中,我們依然將所有數據分到同一分區,然后在自定義的 KeyedProcessFunction
使用定時器。同樣地,每來一條數據,我們就將當前的數據時間戳和水位線信息輸出,并注冊
一個 10 秒后(以當前數據時間戳為基準)的事件時間定時器。執行程序結果如下:
        
每來一條數據,都會輸出兩行“數據到達”的信息,并以分割線隔開;兩條數據到達的時
間間隔為 5 秒。當第三條數據到達后,隨后立即輸出一條定時器觸發的信息;再過 5 秒之后,
剩余兩條定時器信息輸出,程序運行結束。
我們可以發現,數據到來之后,當前的水位線與時間戳并不是一致的。當第一條數據到來,
時間戳為 1000 ,可水位線的生成是周期性的(默認 200ms 一次),不會立即發生改變,所以依
然是最小值 Long.MIN_VALUE ;隨后只要到了水位線生成的時間點( 200ms 到了),就會依據
當前的最大時間戳 1000 來生成水位線了。這里我們沒有設置水位線延遲,默認需要減去 1
秒,所以水位線推進到了 999 。而當時間戳為 11000 的第二條數據到來之后,水位線同樣沒有
立即改變,仍然是 999 ,就好像總是“滯后”數據一樣。
這樣程序的行為就可以得到合理解釋了。事件時間語義下,定時器觸發的條件就是水位線
推進到設定的時間。第一條數據到來后,設定的定時器時間為 1000 + 10 * 1000 = 11000 ;而當
時間戳為 11000 的第二條數據到來,水位線還處在 999 的位置,當然不會立即觸發定時器;而
之后水位線會推進到 10999 ,同樣是無法觸發定時器的。必須等到第三條數據到來,將水位線
真正推進到 11000 ,就可以觸發第一個定時器了。第三條數據發出后再過 5 秒,沒有更多的數
據生成了,整個程序運行結束將要退出,此時 Flink 會自動將水位線推進到長整型的最大值
Long.MAX_VALUE )。于是所有尚未觸發的定時器這時就統一觸發了,我們就在控制臺看到
了后兩個定時器的觸發信息。
7.3 窗口處理函數
除 了 KeyedProcessFunction , 另 外 一 大 類 常 用 的 處 理 函 數 , 就 是 基 于 窗 口 的
ProcessWindowFunction ProcessAllWindowFunction 了。如果看了前面的章節,會發現我們
195 之前已經簡單地使用過窗口處理函數了。
7.3.1 窗口處理函數的使用
進行窗口計算,我們可以直接調用現成的簡單聚合方法( sum/max/min , 也可以通過調
.reduce() .aggregate() 來自定義一般的增量聚合函數( ReduceFunction/AggregateFucntion );
而對于更加復雜、需要窗口信息和額外狀態的一些場景,我們還可以直接使用全窗口函數、把
數據全部收集保存在窗口內,等到觸發窗口計算時再統一處理。窗口處理函數就是一種典型的
全窗口函數。
窗 口 處 理 函 數 ProcessWindowFunction 的 使 用 與 其 他 窗 口 函 數 類 似 , 也 是 基 于
WindowedStream 直接調用方法就可以,只不過這時調用的是 .process()
stream.keyBy( t -> t.f0 )
.window( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessWindowFunction())
7.3.2 ProcessWindowFunction 解析
ProcessWindowFunction 既是處理函數又是全窗口函數。從名字上也可以推測出,它的本
質似乎更傾向于“窗口函數”一些。事實上它的用法也確實跟其他處理函數有很大不同。我們
可以從源碼中的定義看到這一點:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
extends AbstractRichFunction {
...
public abstract void process(
KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws
Exception;
public void clear(Context context) throws Exception {}
public abstract class Context implements java.io.Serializable {...}
}
ProcessWindowFunction 依然是一個繼承了 AbstractRichFunction 的抽象類,它有四個類型
參數:
? IN input ,數據流中窗口任務的輸入數據類型。
? OUT output ,窗口任務進行計算之后的輸出數據類型。
? KEY :數據中鍵 key 的類型。
196 197
? W :窗口的類型,是 Window 的子類型。一般情況下我們定義時間窗口, W
就是 TimeWindow
而內部定義的方法,跟我們之前熟悉的處理函數就有所區別了。因為全窗口函數不是逐個
處理元素的,所以處理數據的方法在這里并不是 .processElement() ,而是改成了 .process() 。方
法包含四個參數。
? key :窗口做統計計算基于的鍵,也就是之前 keyBy 用來分區的字段。
? context :當前窗口進行計算的上下文,它的類型就是 ProcessWindowFunction
內部定義的抽象類 Context
? elements :窗口收集到用來計算的所有數據,這是一個可迭代的集合類型。
? out :用來發送數據輸出計算結果的收集器,類型為 Collector
可以明顯看出,這里的參數不再是一個輸入數據,而是窗口中所有數據的集合。而上下文
context 所包含的內容也跟其他處理函數有所差別:
public abstract class Context implements java.io.Serializable {
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
除了可以通過 .output() 方法定義側輸出流不變外,其他部分都有所變化。這里不再持有
TimerService 對象,只能通過 currentProcessingTime() currentWatermark() 來獲取當前時間,所
以失去了設置定時器的功能;另外由于當前不是只處理一個數據,所以也不再提供 .timestamp()
方法。與此同時,也增加了一些獲取其他信息的方法:比如可以通過 .window() 直接獲取到當
前的窗口對象,也可以通過 .windowState() .globalState() 獲取到當前自定義的窗口狀態和全局
狀態。注意這里的“窗口狀態”是自定義的,不包括窗口本身已經有的狀態,針對當前 key
當前窗口有效;而“全局狀態”同樣是自定義的狀態,針對當前 key 的所有窗口有效。
所以我們會發現, ProcessWindowFunction 中除了 .process() 方法外,并沒有 .onTimer() 方法,
而是多出了一個 .clear() 方法。從名字就可以看出,這主要是方便我們進行窗口的清理工作。如
果我們自定義了窗口狀態,那么必須在 .clear() 方法中進行顯式地清除,避免內存溢出。
這里有一個問題:沒有了定時器,那窗口處理函數就失去了一個最給力的武器,如果我們
希望有一些定時操作又該怎么做呢?其實仔細思考會發現,對于窗口而言,它本身的定義就包
含了一個觸發計算的時間點,其實一般情況下是沒有必要再去做定時操作的。如果非要這么干,
Flink 也提供了另外的途徑——使用窗口觸發器( Trigger )。在觸發器中也有一個 TriggerContext
它可以起到類似 TimerService 的作用:獲取當前時間、注冊和刪除定時器,另外還可以獲取當 前的狀態。這樣設計無疑會讓處理流程更加清晰——定時操作也是一種“觸發”,所以我們就
讓所有的觸發操作歸觸發器管,而所有處理數據的操作則歸窗口函數管。
至于另一種窗口處理函數 ProcessAllWindowFunction ,它的用法非常類似。區別在于它基
于的是 AllWindowedStream ,相當于對沒有 keyBy 的數據流直接開窗并調用 .process() 方法 :
stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessAllWindowFunction())
7.4 應用案例—— Top N
窗口的計算處理,在實際應用中非常常見。對于一些比較復雜的需求,如果增量聚合函數
無法滿足,我們就需要考慮使用窗口處理函數這樣的“大招”了。
網站中一個非常經典的例子,就是實時統計一段時間內的熱門 url 。例如,需要統計最近
10 秒鐘內最熱門的兩個 url 鏈接,并且每 5 秒鐘更新一次。我們知道,這可以用一個滑動窗口
來實現,而“熱門度”一般可以直接用訪問量來表示。于是就需要開滑動窗口收集 url 的訪問
數據,按照不同的 url 進行統計,而后匯總排序并最終輸出前兩名。這其實就是著名的“ Top N
問題。
很顯然,簡單的增量聚合可以得到 url 鏈接的訪問量,但是后續的排序輸出 Top N 就很難
實現了。所以接下來我們用窗口處理函數進行實現。
7.4.1 使用 ProcessAllWindowFunction
一種最簡單的想法是,我們干脆不區分 url 鏈接,而是將所有訪問數據都收集起來,統一
進行統計計算。所以可以不做 keyBy ,直接基于 DataStream 開窗,然后使用全窗口函數
ProcessAllWindowFunction 來進行處理。
在窗口中可以用一個 HashMap 來保存每個 url 的訪問次數,只要遍歷窗口中的所有數據,
自然就能得到所有 url 的熱門度。最后把 HashMap 轉成一個列表 ArrayList ,然后進行排序、
取出前兩名輸出就可以了。
代碼具體實現如下:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import 
198
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
public class ProcessAllWindowTopN {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> eventStream = env.addSource(new 
ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long 
recordTimestamp) {return element.timestamp;}}));// 只需要 url 就可以統計數量,所以轉換成 String 直接開窗統計SingleOutputStreamOperator<String> result = eventStream.map(new MapFunction<Event, String>() {@Overridepublic String map(Event value) throws Exception {return value.url;}})
199
200.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), 
Time.seconds(5))) // 開滑動窗口.process(new ProcessAllWindowFunction<String, String, TimeWindow>() 
{@Overridepublic void process(Context context, Iterable<String> elements, 
Collector<String> out) throws Exception {HashMap<String, Long> urlCountMap = new HashMap<>();// 遍歷窗口中數據,將瀏覽量保存到一個 HashMap 中for (String url : elements) {if (urlCountMap.containsKey(url)) {long count = urlCountMap.get(url);urlCountMap.put(url, count + 1L);} else {urlCountMap.put(url, 1L);}}ArrayList<Tuple2<String, Long>> mapList = new 
ArrayList<Tuple2<String, Long>>();// 將瀏覽量數據放入 ArrayList,進行排序for (String key : urlCountMap.keySet()) {mapList.add(Tuple2.of(key, urlCountMap.get(key)));}mapList.sort(new Comparator<Tuple2<String, Long>>() {@Overridepublic int compare(Tuple2<String, Long> o1, Tuple2<String, 
Long> o2) {return o2.f1.intValue() - o1.f1.intValue();}});// 取排序后的前兩名,構建輸出結果StringBuilder result = new StringBuilder();result.append("========================================\n");for (int i = 0; i < 2; i++) {Tuple2<String, Long> temp = mapList.get(i);String info = "瀏覽量 No." + (i + 1) +" url:" + temp.f0 +" 瀏覽量:" + temp.f1 +" 窗 口 結 束 時 間 : " + new 
Timestamp(context.window().getEnd()) + "\n";result.append(info);}result.append("========================================\n");out.collect(result.toString());}});result.print();env.execute();}
}
運行結果如下所示:
7.4.2 使用 KeyedProcessFunction
在上一小節的實現過程中,我們沒有進行按鍵分區,直接將所有數據放在一個分區上進行
了開窗操作。這相當于將并行度強行設置為 1 ,在實際應用中是要盡量避免的,所以 Flink
方也并不推薦使用 AllWindowedStream 進行處理。另外,我們在全窗口函數中定義了 HashMap
來統計 url 鏈接的瀏覽量,計算過程是要先收集齊所有數據、然后再逐一遍歷更新 HashMap
這顯然不夠高效。如果我們可以利用增量聚合函數的特性,每來一條數據就更新一次對應 url
的瀏覽量,那么到窗口觸發計算時只需要做排序輸出就可以了。
基于這樣的想法,我們可以從兩個方面去做優化:一是對數據進行按鍵分區,分別統計瀏
覽量;二是進行增量聚合,得到結果最后再做排序輸出。所以,我們可以使用增量聚合函數
AggregateFunction 進行瀏覽量的統計,然后結合 ProcessWindowFunction 排序輸出來實現 Top N
的需求。
具體實現思路就是,先按照 url 對數據進行 keyBy 分區,然后開窗進行增量聚合。這里就
會發現一個問題:我們進行按鍵分區之后,窗口的計算就會只針對當前 key 有效了;也就是說,
每個窗口的統計結果中,只會有一個 url 的瀏覽量,這是無法直接用 ProcessWindowFunction
進行排序的。所以我們只能分成兩步:先對每個 url 鏈接統計出瀏覽量,然后再將統計結果收
集起來,排序輸出最終結果。因為最后的排序還是基于每個時間窗口的,所以為了讓輸出的統
計結果中包含窗口信息,我們可以借用第六章中定義的 POJO UrlViewCount 來表示,它包
201 含了 url 、瀏覽量( count )以及窗口的起始結束時間。之后對 UrlViewCount 的處理,可以先按
窗口分區,然后用 KeyedProcessFunction 來實現。
總結處理流程如下:
1 )讀取數據源;
2 )篩選瀏覽行為( pv );
3 )提取時間戳并生成水位線;
4 )按照 url 進行 keyBy 分區操作;
5 )開長度為 1 小時、步長為 5 分鐘的事件時間滑動窗口;
6 )使用增量聚合函數 AggregateFunction ,并結合全窗口函數 WindowFunction 進行窗口
聚合,得到每個 url 、在每個統計窗口內的瀏覽量,包裝成 UrlViewCount
7 )按照窗口進行 keyBy 分區操作;
8 )對同一窗口的統計結果數據,使用 KeyedProcessFunction 進行收集并排序輸出。
糟糕的是,這里又會帶來另一個問題。最后我們用 KeyedProcessFunction 來收集數據做排
序,這時面對的就是窗口聚合之后的數據流,而窗口已經不存在了;那到底什么時候會收集齊
所有數據呢?這問題聽起來似乎有些沒道理。我們統計瀏覽量的窗口已經關閉,就說明了當前
已經到了要輸出結果的時候,直接輸出不就行了嗎?
沒有這么簡單。因為數據流中的元素是逐個到來的,所以即使理論上我們應該“同時”收
到很多 url 的瀏覽量統計結果,實際也是有先后的、只能一條一條處理。下游任務(就是我們
定義的 KeyedProcessFunction )看到一個 url 的統計結果,并不能保證這個時間段的統計數據
不會再來了,所以也不能貿然進行排序輸出。解決的辦法,自然就是要等所有數據到齊了——
這很容易讓我們聯想起水位線設置延遲時間的方法。這里我們也可以“多等一會兒”,等到水
位線真正超過了窗口結束時間,要統計的數據就肯定到齊了。
具體實現上,可以采用一個延遲觸發的事件時間定時器。基于窗口的結束時間來設定延遲,
其實并不需要等太久——因為我們是靠水位線的推進來觸發定時器,而水位線的含義就是“之
前的數據都到齊了”。所以我們只需要設置 1 毫秒的延遲,就一定可以保證這一點。
而在等待過程中,之前已經到達的數據應該緩存起來,我們這里用一個自定義的“列表狀
態”( ListState )來進行存儲,如圖 7-2 所示。這個狀態需要使用富函數類的 getRuntimeContext()
方法獲取運行時上下文來定義,我們一般把它放在 open() 生命周期方法中。之后每來一個
UrlViewCount ,就把它添加到當前的列表狀態中,并注冊一個觸發時間為窗口結束時間加 1
毫秒( windowEnd + 1 )的定時器。待到水位線到達這個時間,定時器觸發,我們可以保證當
前窗口所有 url 的統計結果 UrlViewCount 都到齊了;于是從狀態中取出進行排序輸出。
        
具體代碼實現如下:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import 
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
public class KeyedProcessTopN {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從自定義數據源讀取數據SingleOutputStreamOperator<Event> eventStream = env.addSource(new 
ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonot
onousTimestamps().withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long 
recordTimestamp) {return element.timestamp;}}));// 需要按照 url 分組,求出每個 url 的訪問量SingleOutputStreamOperator<UrlViewCount> urlCountStream =eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10), 
Time.seconds(5))).aggregate(new UrlViewCountAgg(),new UrlViewCountResult());// 對結果中同一個窗口的統計數據,進行排序處理SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> 
data.windowEnd).process(new TopN(2));result.print("result");env.execute();}// 自定義增量聚合public static class UrlViewCountAgg implements AggregateFunction<Event, Long, 
204
Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}// 自定義全窗口函數,只需要包裝窗口信息public static class UrlViewCountResult extends ProcessWindowFunction<Long, 
UrlViewCount, String, TimeWindow> {@Overridepublic void process(String url, Context context, Iterable<Long> elements, 
Collector<UrlViewCount> out) throws Exception {// 結合窗口信息,包裝輸出內容Long start = context.window().getStart();Long end = context.window().getEnd();out.collect(new UrlViewCount(url, elements.iterator().next(), start, 
end));}}// 自定義處理函數,排序取 top npublic static class TopN extends KeyedProcessFunction<Long, UrlViewCount, 
String>{
205// 將 n 作為屬性private Integer n;// 定義一個列表狀態private ListState<UrlViewCount> urlViewCountListState;public TopN(Integer n) {this.n = n;}@Overridepublic void open(Configuration parameters) throws Exception {// 從環境中獲取列表狀態句柄urlViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<UrlViewCount>("url-view-count-list",Types.POJO(UrlViewCount.class)));}@Overridepublic void processElement(UrlViewCount value, Context ctx, 
Collector<String> out) throws Exception {// 將 count 數據添加到列表狀態中,保存起來urlViewCountListState.add(value);// 注冊 window end + 1ms 后的定時器,等待所有數據到齊開始排序ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> 
out) throws Exception {// 將數據從列表狀態變量中取出,放入 ArrayList,方便排序ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();for (UrlViewCount urlViewCount : urlViewCountListState.get()) {urlViewCountArrayList.add(urlViewCount);}// 清空狀態,釋放資源urlViewCountListState.clear();// 排序urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {@Override
206
207public int compare(UrlViewCount o1, UrlViewCount o2) {return o2.count.intValue() - o1.count.intValue();}});// 取前兩名,構建輸出結果StringBuilder result = new StringBuilder();
result.append("========================================\n");result.append("窗口結束時間:" + new Timestamp(timestamp - 1) + "\n");for (int i = 0; i < this.n; i++) {UrlViewCount UrlViewCount = urlViewCountArrayList.get(i);String info = "No." + (i + 1) + " "+ "url:" + UrlViewCount.url + " "+ "瀏覽量:" + UrlViewCount.count + "\n";result.append(info);}result.append("========================================\n");out.collect(result.toString());}}
}
代碼中,我們還利用了定時器的特性:針對同一 key 、同一時間戳會進行去重。所以對于
同一個窗口而言,我們接到統計結果數據后設定的 windowEnd + 1 的定時器都是一樣的,最終
只會觸發一次計算。而對于不同的 key (這里 key windowEnd ),定時器和狀態都是獨立的,
所以我們也不用擔心不同窗口間數據的干擾。
我們在上面的代碼中使用了后面要講解的 ListState 。這里可以先簡單說明一下。我們先聲
明一個列表狀態變量 :
private ListState<Event> UrlViewCountListState;
然后在 open 方法中初始化了列表狀態變量,我們初始化的時候使用了 ListStateDescriptor
描述符,這個描述符用來告訴 Flink 列表狀態變量的名字和類型。列表狀態變量是單例,也就
是說只會被實例化一次。這個列表狀態變量的作用域是當前 key 所對應的邏輯分區。我們使用
add 方法向列表狀態變量中添加數據,使用 get 方法讀取列表狀態變量中的所有元素。
另外,根據水位線的定義,我們這里的延遲時間設為 0 事實上也是可以保證數據都到齊的。
感興趣的讀者可以自行修改代碼進行測試。 208
7.5 側輸出流( Side Output
處理函數還有另外一個特有功能,就是將自定義的數據放入“側輸出流”( side output )輸
出。這個概念我們并不陌生,之前在講到窗口處理遲到數據時,最后一招就是輸出到側輸出流。
而這種處理方式的本質,其實就是處理函數的側輸出流功能。
我們之前講到的絕大多數轉換算子,輸出的都是單一流,流里的數據類型只能有一種。而
側輸出流可以認為是“主流”上分叉出的“支流”,所以可以由一條流產生出多條流,而且這
些流中的數據類型還可以不一樣。利用這個功能可以很容易地實現“分流”操作。
具體應用時,只要在處理函數的 .processElement() 或者 .onTimer() 方法中,調用上下文
.output() 方法就可以了。
DataStream<Integer> stream = env.addSource(...);
SingleOutputStreamOperator<Long> longStream =
stream.process(new
ProcessFunction<Integer, Long>() {
@Override
public void processElement( Integer value, Context ctx, Collector<Integer>
out) throws Exception {
// 轉換成 Long ,輸出到主流中
out.collect(Long.valueOf(value));
// 轉換成 String ,輸出到側輸出流中
ctx.output(outputTag, "side-output: " + String.valueOf(value));
}
});
這里 output() 方法需要傳入兩個參數,第一個是一個“輸出標簽” OutputTag ,用來標識側
輸出流,一般會在外部統一聲明;第二個就是要輸出的數據。
我們可以在外部先將 OutputTag 聲明出來:
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
如果想要獲取這個側輸出流,可以基于處理之后的 DataStream 直接調用 .getSideOutput()
方法,傳入對應的 OutputTag ,這個方式與窗口 API 中獲取側輸出流是完全一樣的。
DataStream<String> stringStream = longStream.getSideOutput(outputTag);
7.6 本章總結
Flink 擁有非常豐富的多層 API ,而底層的處理函數可以說是最為強大、最為靈活的一種。
廣義上來講,處理函數也可以認為是 DataStream API 中的一部分,它的調用方式與其他轉換
算子完全一致。處理函數可以訪問時間、狀態,定義定時操作,它可以直接觸及流處理最為本
質的組成部分。所以處理函數不僅是我們處理復雜需求時兜底的“大招”,更是理解流處理本
質的重要一環。
在本章中,我們詳細介紹了處理函數的功能和底層的結構,重點講解了最為常用的
KeyedProcessFunction ProcessWindowFunction ,并實現了電商應用中 Top N 的經典案例,另
外還介紹了側輸出流的用法。而關于合并兩條流之后的處理函數,以及廣播連接流
BroadcastConnectedStream )的處理操作,調用方法和原理都非常類似,我們會在后續章節繼
續展開。

8. 多流轉換

無論是基本的簡單轉換和聚合,還是基于窗口的計算,我們都是針對一條流上的數據進行
處理的。而在實際應用中,可能需要將不同來源的數據連接合并在一起處理,也有可能需要將
一條流拆分開,所以經常會有對多條流進行處理的場景。本章我們就來討論 Flink 中對多條流
進行轉換的操作。
簡單劃分的話,多流轉換可以分為“分流”和“合流”兩大類。目前分流的操作一般是通
過側輸出流( side output )來實現,而合流的算子比較豐富,根據不同的需求可以調用 union
connect join 以及 coGroup 等接口進行連接合并操作。下面我們就進行具體的講解。
8.1 分流
所謂“分流”,就是將一條數據流拆分成完全獨立的兩條、甚至多條流。也就是基于一個
DataStream ,得到完全平等的多個子 DataStream ,如圖 8-1 所示。一般來說,我們會定義一些
篩選條件,將符合條件的數據揀選出來放到對應的流里。
        
8.1.1 簡單實現
其實根據條件篩選數據的需求,本身非常容易實現:只要針對同一條流多次獨立調
.filter() 方法進行篩選,就可以得到拆分之后的流了。
例如,我們可以將電商網站收集到的用戶行為數據進行一個拆分,根據類型( type )的不
同,分為“ Mary ”的瀏覽數據、“ Bob ”的瀏覽數據等等。那么代碼就可以這樣實現:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 篩選 Mary 的瀏覽行為放入 MaryStream 流中DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() 
{@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Mary");}});// 篩選 Bob 的購買行為放入 BobStream 流中DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
210@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Bob");}});// 篩選其他人的瀏覽行為放入 elseStream 流中DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() 
{@Overridepublic boolean filter(Event value) throws Exception {return !value.user.equals("Mary") && !value.user.equals("Bob") ;}});MaryStream.print("Mary pv");BobStream.print("Bob pv");elseStream.print("else pv");env.execute();}}
輸出結果是:
Bob pv> Event{user='Bob', url='./home', timestamp=2021-06-23 17:30:57.388}
else pv> Event{user='Alice', url='./home', timestamp=2021-06-23 17:30:58.399}
else pv> Event{user='Alice', url='./home', timestamp=2021-06-23 17:30:59.409}
Bob pv> Event{user='Bob', url='./home', timestamp=2021-06-23 17:31:00.424}
else pv> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23 
17:31:01.441}
else pv> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23 
17:31:02.449}
Mary pv> Event{user='Mary', url='./home', timestamp=2021-06-23 17:31:03.465}
這種實現非常簡單,但代碼顯得有些冗余——我們的處理邏輯對拆分出的三條流其實是一
樣的,卻重復寫了三次。而且這段代碼背后的含義,是將原始數據流 stream 復制三份,然后
對每一份分別做篩選;這明顯是不夠高效的。我們自然想到,能不能不用復制流,直接用一個
算子就把它們都拆分開呢?
在早期的版本中, DataStream API 中提供了一個 .split() 方法,專門用來將一條流“切分”
成多個。它的基本思路其實就是按照給定的篩選條件,給數據分類“蓋戳”;然后基于這條蓋
戳之后的流,分別揀選想要的“戳”就可以得到拆分后的流。這樣我們就不必再對流進行復制
了。不過這種方法有一個缺陷:因為只是“蓋戳”揀選,所以無法對數據進行轉換,分流后的
數據類型必須跟原始流保持一致。這就極大地限制了分流操作的應用場景。現在 split 方法已
經淘汰掉了,我們以后分流只使用下面要講的側輸出流。
8.1.2 使用側輸出流
Flink 1.13 版本中,已經棄用了 .split() 方法,取而代之的是直接用處理函數( process
function )的側輸出流( side output )。
我們知道,處理函數本身可以認為是一個轉換算子,它的輸出類型是單一的,處理之后得
到的仍然是一個 DataStream ;而側輸出流則不受限制,可以任意自定義輸出數據,它們就像從
“主流”上分叉出的“支流”。盡管看起來主流和支流有所區別,不過實際上它們都是某種類型
DataStream ,所以本質上還是平等的。利用側輸出流就可以很方便地實現分流操作,而且得
到的多條 DataStream 類型可以不同,這就給我們的應用帶來了極大的便利。
關于處理函數中側輸出流的用法,我們已經在 7.5 節做了詳細介紹。簡單來說,只需要調
用上下文 ctx .output() 方法,就可以輸出任意類型的數據了。而側輸出流的標記和提取,都
離不開一個“輸出標簽”( OutputTag ),它就相當于 split() 分流時的“戳”,指定了側輸出流的
id 和類型。
我們可以使用側輸出流將上一小節的分流代碼改寫如下:
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class SplitStreamByOutputTag {// 定義輸出標簽,側輸出流的數據類型為三元組(user, url, timestamp)private static OutputTag<Tuple3<String, String, Long>> MaryTag = new 
OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};private static OutputTag<Tuple3<String, String, Long>> BobTag = new 
OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());SingleOutputStreamOperator<Event> processedStream = stream.process(new 
ProcessFunction<Event, Event>() {@Overridepublic void processElement(Event value, Context ctx, Collector<Event> 
out) throws Exception {if (value.user.equals("Mary")){ctx.output(MaryTag, new Tuple3<>(value.user, value.url, 
value.timestamp));} else if (value.user.equals("Bob")){
212ctx.output(BobTag, new Tuple3<>(value.user, value.url, 
value.timestamp));} else {out.collect(value);}}});processedStream.getSideOutput(MaryTag).print("Mary pv");processedStream.getSideOutput(BobTag).print("Bob pv");processedStream.print("else");env.execute();}
}
輸出結果是:
Bob pv> (Bob,./prod?id=1,1624442886645)
Mary pv> (Mary,./prod?id=1,1624442887664)
Bob pv> (Bob,./home,1624442888673)
Mary pv> (Mary,./prod?id=1,1624442889676)
else> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23 18:08:10.693}
else> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23 18:08:11.697}
else> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23 18:08:12.702}
Mary pv> (Mary,./cart,1624442893705)
Bob pv> (Bob,./cart,1624442894710)
else> Event{user='Alice', url='./cart', timestamp=2021-06-23 18:08:15.722}
Mary pv> (Mary,./prod?id=1,1624442896725)
這里我們定義了兩個側輸出流,分別揀選 Mary 的瀏覽事件和 Bob 的瀏覽事件;由于類型
已經確定,我們可以只保留 ( 用戶 id, url, 時間戳 ) 這樣一個三元組。而剩余的事件則直接輸出
到主流,類型依然保留 Event ,就相當于之前的 elseStream 。這樣的實現方式顯然更簡潔,也
更加靈活。
8.2 基本合流操作
既然一條流可以分開,自然多條流就可以合并。在實際應用中,我們經常會遇到來源不同
的多條流,需要將它們的數據進行聯合處理。所以 Flink 中合流的操作會更加普遍,對應的
API 也更加豐富。
8.2.1 聯合( Union
最簡單的合流操作,就是直接將多條流合在一起,叫作流的“聯合”( union ),如圖 8-2
所示。聯合操作要求必須流中的數據類型必須相同,合并之后的新流會包括所有流中的元素,
數據類型不變。這種合流方式非常簡單粗暴,就像公路上多個車道匯在一起一樣。
        
        
在代碼中,我們只要基于 DataStream 直接調用 .union() 方法,傳入其他 DataStream 作為參
數,就可以實現流的聯合了;得到的依然是一個 DataStream
stream1.union(stream2, stream3, ...)
注意: union() 的參數可以是多個 DataStream ,所以聯合操作可以實現多條流的合并。
這里需要考慮一個問題。在事件時間語義下,水位線是時間的進度標志;不同的流中可能
水位線的進展快慢完全不同,如果它們合并在一起,水位線又該以哪個為準呢?
還以要考慮水位線的本質含義,是“之前的所有數據已經到齊了”;所以對于合流之后的
水位線,也是要以最小的那個為準,這樣才可以保證所有流都不會再傳來之前的數據。換句話
說,多流合并時處理的時效性是以最慢的那個流為準的。我們自然可以想到,這與之前介紹的
并行任務水位線傳遞的規則是完全一致的;多條流的合并,某種意義上也可以看作是多個并行
任務向同一個下游任務匯合的過程。
我們可以用下面的代碼做一個簡單測試:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
214SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 
7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(), 
Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBound
edOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long 
recordTimestamp) {return element.timestamp;}}));stream1.print("stream1");SingleOutputStreamOperator<Event> stream2 = 
env.socketTextStream("hadoop103", 7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(), 
Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBound
edOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new 
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long 
recordTimestamp) {return element.timestamp;}}));stream2.print("stream2");// 合并兩條流stream1.union(stream2).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, 
Collector<String> out) throws Exception {out.collect(" 水 位 線 : " + 
ctx.timerService().currentWatermark());}}).print();env.execute();}
}
這里為了更清晰地看到水位線的進展,我們創建了兩條流來讀取 socket

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/43207.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/43207.shtml
英文地址,請注明出處:http://en.pswp.cn/web/43207.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

一文入門【NestJs】Controllers 控制器

Nest學習系列 ??一文帶你入門【NestJS】 ??前言 流程圖 Controllers 控制器主要負責處理傳入請求&#xff0c;并向客戶端返回響應&#xff0c;控制器可以通過路由機制來控制接收那些請求&#xff0c;通常一個Controllers種會有多個匹配路由&#xff0c;不同的路由可以知…

Spring源碼二十一:Bean實例化流程四

上一篇Spring源碼二十&#xff1a;Bean實例化流程三中&#xff0c;我們主要討論了單例Bean創建對象的主要方法getSingleton的內部方法createBean&#xff0c;createBean方法中的resolveBeanClase方法與prepareMethodOverrides方法處理了lookup-method屬性與repliace-method配置…

MT3046 憤怒的象棚

思路&#xff1a; a[]存憤怒值&#xff1b;b[i]存以i結尾的&#xff0c;窗口里的最大值&#xff1b;c[i]存以i結尾的&#xff0c;窗口里面包含?的最大值。 &#xff08;?為新大象的位置&#xff09; 例&#xff1a;1 2 3 4 ? 5 6 7 8 9 則ans的計算公式b3b4c4c5c6b7b8b9…

三代測序結構變異分析 - 單樣本Germline SV calling和多樣本SV Calling

適用于三代PacBio HiFi / ONT 長reads數據的結構變異分析。 1. sniffles2安裝 sniffles2需要Python >= 3.10環境,因此用conda創建安裝好3.10的環境。 sniffles2安裝要求: Python >= 3.10pysam >= 0.21.0edlib >=1.3.9psutil>=5.9.4# 創建conda環境 conda c…

【記錄】LaTex|LaTex 代碼片段 Listings 添加帶圓圈數字標號的箭頭(又名 LaTex Tikz 庫畫箭頭的簡要介紹)

文章目錄 前言注意事項1 Tikz 的調用方法&#xff1a;newcommand2 標號圓圈數字的添加方式&#xff1a;\large{\textcircled{\small{1}}}\normalsize3 快速掌握 Tikz 箭頭寫法&#xff1a;插入點相對位移標號node3.1 第一張圖&#xff1a;插入點相對位移3.2 第二張圖&#xff1…

【MindSpore學習打卡】應用實踐-LLM原理和實踐-基于MindSpore實現BERT對話情緒識別

在當今的自然語言處理&#xff08;NLP&#xff09;領域&#xff0c;情緒識別是一個非常重要的應用場景。無論是在智能客服、社交媒體分析&#xff0c;還是在情感計算領域&#xff0c;準確地識別用戶的情緒都能夠極大地提升用戶體驗和系統的智能化水平。BERT&#xff08;Bidirec…

imx6ull/linux應用編程學習(12)CAN應用編程基礎

關于裸機的can通信&#xff0c;會在其他文章發&#xff0c;這里主要講講linux上的can通信。 與I2C,SPI等同步通訊方式不同&#xff0c;CAN通訊是異步通訊&#xff0c;也就是沒有時鐘信號線來保持信號接收同步&#xff0c;也就是所說的半雙工&#xff0c;無法同時發送與接收&…

【Java 注解,自定義注解,元注解,注解本質,注解解析】

文章目錄 什么是注解&#xff1f;Java內置注解自定義注解元注解注解的本質注解解析 什么是注解&#xff1f; 注解是Java編程語言中的一種元數據&#xff0c;提供了有關程序的額外信息。注解以符號開始&#xff0c;緊跟著注解的名稱和一對括號&#xff0c;括號內包含注解的參數…

C++基礎篇(1)

目錄 前言 1.第一個C程序 2.命名空間 2.1概念理解 2.2namespace 的價值 2.3 namespace的定義 3.命名空間的使用 4.C的輸入輸出 結束語 前言 本節我們將正式進入C基礎的學習&#xff0c;話不多說&#xff0c;直接上貨&#xff01;&#xff01;&#xff01; 1.第一個C程…

【Linux進階】文件系統8——硬鏈接和符號連接:ln

在Linux下面的鏈接文件有兩種&#xff0c; 一種是類似Windows的快捷方式功能的文件&#xff0c;可以讓你快速地鏈接到目標文件&#xff08;或目錄)&#xff1b;另一種則是通過文件系統的inode 鏈接來產生新文件名&#xff0c;而不是產生新文件&#xff0c;這種稱為硬鏈接&…

base SAS programming學習筆記10(combine data)

1.一對一合并 基本格式如下&#xff1a; data output-data-set; set data-set1; set data-set2;(data-set1和data-set2可以是相同的數據集&#xff0c;可以添加多個set 語句來實現上述的一對一合并) run; 輸出數據集結果如下&#xff1a; a.會包含所有輸入數據的變量名&#x…

小米手機永久刪除的照片怎么找回?這兩個方法千萬不要錯過!

小米手機永久刪除的照片怎么找回&#xff1f;身為米粉發燒黨的小編又雙叒叕手殘了&#xff01;本來想在手機回收站中恢復一張照片&#xff0c;結果一個稀里糊涂就把照片點成了“永久刪除”。于是乎難得的休班假期&#xff0c;就變成了小編恢復永久刪除照片的漫漫之路。以下是小…

org.springframework.boot.autoconfigure.EnableAutoConfiguration=XXXXX的作用是什么?

org.springframework.boot.autoconfigure.EnableAutoConfigurationXXXXXXX 這一配置項在 Spring Boot 項目中的作用如下&#xff1a; 自動配置類的指定&#xff1a; 這一配置將 EnableAutoConfiguration 設置為 cn.geek.javadatamanage.config.DataManageAutoConfiguration&…

【2024_CUMCM】TOPSIS法(優劣解距離法)

目錄 引入 層次分析法的局限性 簡介 例子 想法1 想法2 運用實際分數進行處理 想法3 問題 擴展問題&#xff1a;增加指標個數 極大型指標與極小型指標 統一指標類型-指標正向化 標準化處理 計算公式 計算得分 對原公式進行變化 升級到m個指標和n個對象 代碼 …

系統分析師-基礎知識

基礎知識 一、計算機組成與結構1、計算機系統基礎知識1.1 計算機硬件組成1.2 中央處理單元&#xff08;CPU&#xff09;1.3 數據表示1.3.1 R進制轉十進制&#xff1a;1.3.2 十進制轉R進制&#xff1a; 1.4 校驗碼&#xff08;3種校驗碼&#xff09;1.4.1 基本知識1.4.2 奇偶校驗…

D-DPCC: Deep Dynamic Point Cloud Compression via 3D Motion Prediction

1. 論文基本信息 發布于&#xff1a; 2022 2. 創新點 首先提出了一種端到端深度動態點云壓縮框架(D-DPCC)&#xff0c;用于運動估計、運動補償、運動壓縮和殘差壓縮的聯合優化。提出了一種新的多尺度運動融合(MMF)模塊用于點云幀間預測&#xff0c;該模塊提取和融合不同運動流…

首屆UTON區塊鏈開發者計劃大會在馬來西亞圓滿落幕

7月9日&#xff0c;首屆UTON區塊鏈開發者計劃大會在馬來西亞吉隆坡成功舉辦&#xff01; 來自全球頂尖的行業領袖、技術精英和眾多區塊鏈愛好者參與了此次盛會&#xff0c;也標志著UTON區塊鏈生態進入了一個全新的發展階段。 會上&#xff0c;UTON區塊鏈創始人之一唐毅先生以“…

Python 中什么是遞歸函數,如何編寫遞歸函數?

遞歸是計算機科學中的一種基本概念&#xff0c;它指的是函數調用自身的編程技巧。在Python中&#xff0c;遞歸函數是一種通過調用自身來解決問題的函數。這種方法常用于解決可以被分解為較小相同問題的場景&#xff0c;例如階乘計算、斐波那契數列、全排列生成等。 一、遞歸的…

TCP 握手數據流

這張圖詳細描述了 TCP 握手過程中&#xff0c;從客戶端發送 SYN 包到服務器最終建立連接的整個數據流轉過程&#xff0c;包括網卡、內核、進程中的各個環節。下面對每個步驟進行詳細解釋&#xff1a; 客戶端到服務器的初始連接請求 客戶端發送 SYN 包&#xff1a; 客戶端發起…

添加點擊跳轉頁面,優化登錄和注冊頁路由

一、給注銷按鈕添加點擊跳轉至登錄頁 1、在路由中添加登錄頁路由 2、自定義登錄頁面 3、在app.vue頁面找到下拉框組件&#xff0c;添加點擊事件 4、使用vue-router中的useRoute和useRouter 點擊后可以跳轉&#xff0c;但是還存在問題&#xff0c;路徑這里如果我們需要更改登錄…