7. 處理函數
之前所介紹的流處理 API,無論是基本的轉換、聚合,還是更為復雜的窗口操作,其實都是基于 DataStream 進行轉換的;所以可以統稱為 DataStream API ,這也是 Flink 編程的核心。而我們知道,為了讓代碼有更強大的表現力和易用性, Flink 本身提供了多層 API ,DataStream API 只是中間的一環,如圖 7-1 所示:

在更底層,我們可以不定義任何具體的算子(比如 map , filter ,或者 window ),而只是提
煉出一個統一的“處理”( process )操作——它是所有轉換算子的一個概括性的表達,可以自
定義處理邏輯,所以這一層接口就被叫作“處理函數”( process function )。
在處理函數中,我們直面的就是數據流中最基本的元素:數據事件(event )、狀態( state )
以及時間( time )。這就相當于對流有了完全的控制權。處理函數比較抽象,沒有具體的操作,
所以對于一些常見的簡單應用(比如求和、開窗口)會顯得有些麻煩;不過正是因為它不限定
具體做什么,所以理論上我們可以做任何事情,實現所有需求。所以可以說,處理函數是我們
進行 Flink 編程的“大招”,輕易不用,一旦放出來必然會掃平一切。
本章我們就深入底層,討論一下 Flink 中處理函數的使用方法。
7.1 基本處理函數( ProcessFunction )
處理函數主要是定義數據流的轉換操作,所以也可以把它歸到轉換算子中。我們知道在
Flink 中幾乎所有轉換算子都提供了對應的函數類接口,處理函數也不例外;它所對應的函數
類,就叫作 ProcessFunction 。
7.1.1 處理函數的功能和使用
我們之前學習的轉換算子,一般只是針對某種具體操作來定義的,能夠拿到的信息比較有
限。比如 map 算子,我們實現的 MapFunction 中,只能獲取到當前的數據,定義它轉換之后
的形式;而像窗口聚合這樣的復雜操作, AggregateFunction 中除數據外,還可以獲取到當前的
183 狀態(以累加器 Accumulator 形式出現)。另外我們還介紹過富函數類,比如 RichMapFunction ,
它提供了獲取運行時上下文的方法 getRuntimeContext() ,可以拿到狀態,還有并行度、任務名
稱之類的運行時信息。
但是無論那種算子,如果我們想要訪問事件的時間戳,或者當前的水位線信息,都是完全
做不到的。在定義生成規則之后,水位線會源源不斷地產生,像數據一樣在任務間流動,可我
們卻不能像數據一樣去處理它;跟時間相關的操作,目前我們只會用窗口來處理。而在很多應
用需求中,要求我們對時間有更精細的控制,需要能夠獲取水位線,甚至要 “ 把控時間 ” 、定義
什么時候做什么事,這就不是基本的時間窗口能夠實現的了。
于是必須祭出大招——處理函數( ProcessFunction )了。處理函數提供了一個“定時服務”
( TimerService ),我們可以通過它訪問流中的事件( event )、時間戳( timestamp )、水位線
( watermark ),甚至可以注冊 “ 定時事件 ” 。而且處理函數繼承了 AbstractRichFunction 抽象類,
所以擁有富函數類的所有特性,同樣可以訪問狀態( state )和其他運行時信息。此外,處理函
數還可以直接將數據輸出到側輸出流( side output )中。所以,處理函數是最為靈活的處理方
法,可以實現各種自定義的業務邏輯;同時也是整個 DataStream API 的底層基礎。
處理函數的使用與基本的轉換操作類似,只需要直接基于 DataStream 調用 .process() 方法
就可以了。方法需要傳入一個 ProcessFunction 作為參數,用來定義處理邏輯。
stream.process(new MyProcessFunction())
這里 ProcessFunction 不是接口,而是一個抽象類,繼承了 AbstractRichFunction ;
MyProcessFunction 是它的一個具體實現。所以所有的處理函數,都是富函數( RichFunction ),
富函數可以調用的東西這里同樣都可以調用。
下面是一個具體的應用示例:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class ProcessFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event event, long l) {return event.timestamp;}})).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx,
Collector<String> out) throws Exception {if (value.user.equals("Mary")) {out.collect(value.user);} else if (value.user.equals("Bob")) {out.collect(value.user);out.collect(value.user);}System.out.println(ctx.timerService().currentWatermark());}}).print();env.execute();}
}
這里我們在 ProcessFunction 中重寫了 .processElement() 方法,自定義了一種處理邏輯:當
數據的 user 為“ Mary ”時,將其輸出一次;而如果為“ Bob ”時,將 user 輸出兩次。這里的
輸 出 , 是 通 過 調 用 out.collect() 來實現的。另外我們還可以調用
ctx.timerService().currentWatermark() 來 獲 取 當 前 的 水 位 線 打 印 輸 出 。 所 以 可 以 看 到 ,
ProcessFunction 函數有點像 FlatMapFunction 的升級版。可以實現 Map 、 Filter 、 FlatMap 的所
有功能。很明顯,處理函數非常強大,能夠做很多之前做不到的事情。
接下來我們就深入 ProcessFunction 內部來進行詳細了解。
7.1.2 ProcessFunction 解析
在源碼中我們可以看到,抽象類 ProcessFunction 繼承了 AbstractRichFunction ,有兩個泛
型類型參數: I 表示 Input ,也就是輸入的數據類型; O 表示 Output ,也就是處理完成之后輸出
的數據類型。
內部單獨定義了兩個方法:一個是必須要實現的抽象方法 .processElement() ;另一個是非
抽象方法 .onTimer() 。
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...
public abstract void processElement(I value, Context ctx, Collector<O> out)
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
throws Exception {}
...
}
1. 抽象方法 .processElement()
用于“處理元素”,定義了處理的核心邏輯。這個方法對于流中的每個元素都會調用一次,
參數包括三個:輸入數據值 value ,上下文 ctx ,以及“收集器”( Collector ) out 。方法沒有返
回值,處理之后的輸出數據是通過收集器 out 來定義的。
? value :當前流中的輸入元素,也就是正在處理的數據,類型與流中數據類
型一致。
? ctx :類型是 ProcessFunction 中定義的內部抽象類 Context ,表示當前運行的
上下文,可以獲取到當前的時間戳,并提供了用于查詢時間和注冊定時器的“定時服
務” (TimerService) ,以及可以將數據發送到“側輸出流”( side output )的方法 .output() 。
Context 抽象類定義如下:
public abstract class Context {
public abstract Long timestamp();
public abstract TimerService timerService();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
? out :“收集器”(類型為 Collector ),用于返回輸出數據。使用方式與 flatMap
算子中的收集器完全一樣,直接調用 out.collect() 方法就可以向下游發出一個數據。
這個方法可以多次調用,也可以不調用。
通過幾個參數的分析不難發現, ProcessFunction 可以輕松實現 flatMap 這樣的基本轉換功
能(當然 map 、 filter 更不在話下);而通過富函數提供的獲取上下文方法 .getRuntimeContext() ,
也可以自定義狀態( state )進行處理,這也就能實現聚合操作的功能了。關于自定義狀態的具
體實現,我們會在后續“狀態管理”一章中詳細介紹。
2. 非抽象方法 .onTimer()
用于定義定時觸發的操作,這是一個非常強大、也非常有趣的功能。這個方法只有在注冊
好的定時器觸發的時候才會調用,而定時器是通過“定時服務” TimerService 來注冊的。打個
比方,注冊定時器( timer )就是設了一個鬧鐘,到了設定時間就會響;而 .onTimer() 中定義的,
就是鬧鐘響的時候要做的事。所以它本質上是一個基于時間的“回調”( callback )方法,通過
時間的進展來觸發;在事件時間語義下就是由水位線( watermark )來觸發了。
與 .processElement() 類似,定時方法 .onTimer() 也有三個參數:時間戳( timestamp ),上下
文( ctx ),以及收集器( out )。這里的 timestamp 是指設定好的觸發時間,事件時間語義下當
186 然就是水位線了。另外這里同樣有上下文和收集器,所以也可以調用定時服務( TimerService ),
以及任意輸出處理之后的數據。
既然有 .onTimer() 方法做定時觸發,我們用 ProcessFunction 也可以自定義數據按照時間分
組、定時觸發計算輸出結果;這其實就實現了窗口( window )的功能。所以說 ProcessFunction
是真正意義上的終極奧義,用它可以實現一切功能。
我們也可以看到,處理函數都是基于事件觸發的。水位線就如同插入流中的一條數據一樣;
只不過處理真正的數據事件調用的是 .processElement() 方法,而處理水位線事件調用的
是 .onTimer() 。
這里需要注意的是,上面的 .onTimer() 方法只是定時器觸發時的操作,而定時器( timer )
真正的設置需要用到上下文 ctx 中的定時服務。在 Flink 中,只有“按鍵分區流” KeyedStream
才支持設置定時器的操作,所以之前的代碼中我們并沒有使用定時器。所以基于不同類型的流,
可以使用不同的處理函數,它們之間還是有一些微小的區別的。接下來我們就介紹一下處理函
數的分類。
7.1.3 處理函數的分類
Flink 中的處理函數其實是一個大家族, ProcessFunction 只是其中一員。
我們知道, DataStream 在調用一些轉換方法之后,有可能生成新的流類型;例如調
用 .keyBy() 之后得到 KeyedStream ,進而再調用 .window() 之后得到 WindowedStream 。對于不同
類型的流,其實都可以直接調用 .process() 方法進行自定義處理,這時傳入的參數就都叫作處理
函數。當然,它們盡管本質相同,都是可以訪問狀態和時間信息的底層 API ,可彼此之間也會
有所差異。
Flink 提供了 8 個不同的處理函數:
( 1 ) ProcessFunction
最基本的處理函數,基于 DataStream 直接調用 .process() 時作為參數傳入。
( 2 ) KeyedProcessFunction
對流按鍵分區后的處理函數,基于 KeyedStream 調用 .process() 時作為參數傳入。要想使用
定時器,比如基于 KeyedStream 。
( 3 ) ProcessWindowFunction
開窗之后的處理函數,也是全窗口函數的代表。基于 WindowedStream 調用 .process() 時作
為參數傳入。
( 4 ) ProcessAllWindowFunction
同樣是開窗之后的處理函數,基于 AllWindowedStream 調用 .process() 時作為參數傳入。
( 5 ) CoProcessFunction
187 合并( connect )兩條流之后的處理函數,基于 ConnectedStreams 調用 .process() 時作為參
數傳入。關于流的連接合并操作,我們會在后續章節詳細介紹。
( 6 ) ProcessJoinFunction
間隔連接( interval join )兩條流之后的處理函數,基于 IntervalJoined 調用 .process() 時作為
參數傳入。
( 7 ) BroadcastProcessFunction
廣播連接流處理函數,基于 BroadcastConnectedStream 調用 .process() 時作為參數傳入。這
里的“廣播連接流” BroadcastConnectedStream ,是一個未 keyBy 的普通 DataStream 與一個廣
播流( BroadcastStream )做連接( conncet )之后的產物。關于廣播流的相關操作,我們會在后
續章節詳細介紹。
( 8 ) KeyedBroadcastProcessFunction
按鍵分區的廣播連接流處理函數,同樣是基于 BroadcastConnectedStream 調用 .process() 時
作為參數傳入。與 BroadcastProcessFunction 不同的是,這時的廣播連接流,是一個 KeyedStream
與廣播流( BroadcastStream )做連接之后的產物。
接下來,我們就對 KeyedProcessFunction 和 ProcessWindowFunction 的具體用法展開詳細
說明。
7.2 按鍵分區處理函數( KeyedProcessFunction )
在 Flink 程序中,為了實現數據的聚合統計,或者開窗計算之類的功能,我們一般都要先
用 keyBy 算子對數據流進行“按鍵分區”,得到一個 KeyedStream 。也就是指定一個鍵( key ),
按照它的哈希值( hash code )將數據分成不同的“組”,然后分配到不同的并行子任務上執行
計算;這相當于做了一個邏輯分流的操作,從而可以充分利用并行計算的優勢實時處理海量數
據。
另外我們在上節中也提到,只有在 KeyedStream 中才支持使用 TimerService 設置定時器的
操作。所以一般情況下,我們都是先做了 keyBy 分區之后,再去定義處理操作;代碼中更加
常見的處理函數是 KeyedProcessFunction ,最基本的 ProcessFunction 反而出鏡率沒那么高。
接下來我們就先從定時服務( TimerService )入手,詳細講解 KeyedProcessFunction 的用
法
7.2.1 定時器( Timer )和定時服務( TimerService )
KeyedProcessFunction 的一個特色,就是可以靈活地使用定時器。
定時器( timers )是處理函數中進行時間相關操作的主要機制。在 .onTimer() 方法中可以實
現定時處理的邏輯,而它能觸發的前提,就是之前曾經注冊過定時器、并且現在已經到了觸發
188 時間。注冊定時器的功能,是通過上下文中提供的“定時服務”( TimerService )來實現的。
定時服務與當前運行的環境有關。前面已經介紹過, ProcessFunction 的上下文( Context )
中提供了 .timerService() 方法,可以直接返回一個 TimerService 對象:
public abstract TimerService timerService();
TimerService 是 Flink 關于時間和定時器的基礎服務接口,包含以下六個方法:
// 獲取當前的處理時間
long currentProcessingTime();
// 獲取當前的水位線(事件時間)
long currentWatermark();
// 注冊處理時間定時器,當處理時間超過 time 時觸發
void registerProcessingTimeTimer(long time);
// 注冊事件時間定時器,當水位線超過 time 時觸發
void registerEventTimeTimer(long time);
// 刪除觸發時間為 time 的處理時間定時器
void deleteProcessingTimeTimer(long time);
// 刪除觸發時間為 time 的處理時間定時器
void deleteEventTimeTimer(long time);
六個方法可以分成兩大類:基于處理時間和基于事件時間。而對應的操作主要有三個:獲
取當前時間,注冊定時器,以及刪除定時器。需要注意,盡管處理函數中都可以直接訪問
TimerService ,不過只有基于 KeyedStream 的處理函數,才能去調用注冊和刪除定時器的方法;
未作按鍵分區的 DataStream 不支持定時器操作,只能獲取當前時間。
對于處理時間和事件時間這兩種類型的定時器, TimerService 內部會用一個優先隊列將它
們的時間戳( timestamp )保存起來,排隊等待執行。可以認為,定時器其實是 KeyedStream
上處理算子的一個狀態,它以時間戳作為區分。所以 TimerService 會以鍵( key )和時間戳為
標準,對定時器進行去重;也就是說對于每個 key 和時間戳,最多只有一個定時器,如果注冊
了多次, onTimer() 方法也將只被調用一次。這樣一來,我們在代碼中就方便了很多,可以肆
無忌憚地對一個 key 注冊定時器,而不用擔心重復定義——因為一個時間戳上的定時器只會觸
發一次。
基于 KeyedStream 注冊定時器時,會傳入一個定時器觸發的時間戳,這個時間戳的定時器
對于每個 key 都是有效的。這樣,我們的代碼并不需要做額外的處理,底層就可以直接對不同
key 進行獨立的處理操作了。
利用這個特性,有時我們可以故意降低時間戳的精度,來減少定時器的數量,從而提高處
理性能。比如我們可以在設置定時器時只保留整秒數,那么定時器的觸發頻率就是最多 1 秒一
次。
long coalescedTime = time / 1000 * 1000;
189 ctx.timerService().registerProcessingTimeTimer(coalescedTime);
這里注意定時器的時間戳必須是毫秒數,所以我們得到整秒之后還要乘以 1000 。定時器
默認的區分精度是毫秒。
另外 Flink 對 .onTimer() 和 .processElement() 方法是同步調用的( synchronous ),所以也不會
出現狀態的并發修改。
Flink 的定時器同樣具有容錯性,它和狀態一起都會被保存到一致性檢查點( checkpoint )
中。當發生故障時, Flink 會重啟并讀取檢查點中的狀態,恢復定時器。如果是處理時間的定
時器,有可能會出現已經“過期”的情況,這時它們會在重啟時被立刻觸發。關于 Flink 的檢
查點和容錯機制,我們會在后續章節詳細講解。
7.2.2 KeyedProcessFunction 的使用
KeyedProcessFunction 可以說是處理函數中的 “ 嫡系部隊 ” ,可以認為是 ProcessFunction 的
一個擴展。我們只要基于 keyBy 之后的 KeyedStream ,直接調用 .process() 方法,這時需要傳入
的參數就是 KeyedProcessFunction 的實現類。
stream.keyBy( t -> t.f0 )
.process(new MyKeyedProcessFunction())
類似地, KeyedProcessFunction 也是繼承自 AbstractRichFunction 的一個抽象類,源碼中定
義如下:
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
{
...
public abstract void processElement(I value, Context ctx, Collector<O> out)
throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
throws Exception {}
public abstract class Context {...}
...
}
可以看到與 ProcessFunction 的定義幾乎完全一樣,區別只是在于類型參數多了一個 K ,
這是當前按鍵分區的 key 的類型。同樣地,我們必須實現一個 .processElement() 抽象方法,用
來處理流中的每一個數據;另外還有一個非抽象方法 .onTimer() ,用來定義定時器觸發時的回
調操作。由于定時器只能在 KeyedStream 上使用,所以到了 KeyedProcessFunction 這里,我們
才真正對時間有了精細的控制,定時方法 .onTimer() 才真正派上了用場。
下面是一個使用處理時間定時器的具體示例:
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
public class ProcessingTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 處理時間語義,不需要分配時間戳和 watermarkSingleOutputStreamOperator<Event> stream = env.addSource(new
ClickSource());// 要用定時器,必須基于 KeyedStreamstream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx,
Collector<String> out) throws Exception {Long currTs = ctx.timerService().currentProcessingTime();out.collect("數據到達,到達時間:" + new Timestamp(currTs));// 注冊一個 10 秒后的定時器ctx.timerService().registerProcessingTimeTimer(currTs + 10
* 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out) throws Exception {out.collect("定時器觸發,觸發時間:" + new Timestamp(timestamp));}}).print();env.execute();}
}
在上面的代碼中,由于定時器只能在 KeyedStream 上使用,所以先要進行 keyBy ;這里
的 .keyBy(data -> true) 是將所有數據的 key 都指定為了 true ,其實就是所有數據擁有相同的 key ,
會分配到同一個分區。
之后我們自定義了一個 KeyedProcessFunction ,其中 .processElement() 方法是每來一個數據
都會調用一次,主要是定義了一個 10 秒之后的定時器;而 .onTimer() 方法則會在定時器觸發時
調用。所以我們會看到,程序運行后先在控制臺輸出“數據到達”的信息,等待 10 秒之后,
又會輸出“定時器觸發”的信息,打印出的時間間隔正是 10 秒。
當然,上面的例子是處理時間的定時器,所以我們是真的需要等待 10 秒才會看到結果。
事件時間語義下,又會有什么不同呢?我們可以對上面的代碼略作修改,做一個測試:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
public class EventTimeTimerTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
192env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new
CustomSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonot
onousTimestamps().withTimestampAssigner(new
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long
recordTimestamp) {return element.timestamp;}}));// 基于 KeyedStream 定義事件時間定時器stream.keyBy(data -> true).process(new KeyedProcessFunction<Boolean, Event, String>() {@Overridepublic void processElement(Event value, Context ctx,
Collector<String> out) throws Exception {out.collect("數據到達,時間戳為:" + ctx.timestamp());out.collect(" 數據到達,水位線為: " +
ctx.timerService().currentWatermark() + "\n -------分割線-------");// 注冊一個 10 秒后的定時器ctx.timerService().registerEventTimeTimer(ctx.timestamp()
+ 10 * 1000L);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx,
Collector<String> out) throws Exception {out.collect("定時器觸發,觸發時間:" + timestamp);}}).print();env.execute();}// 自定義測試數據源public static class CustomSource implements SourceFunction<Event> {@Overridepublic void run(SourceContext<Event> ctx) throws Exception {// 直接發出測試數據ctx.collect(new Event("Mary", "./home", 1000L));// 為了更加明顯,中間停頓 5 秒鐘Thread.sleep(5000L);// 發出 10 秒后的數據ctx.collect(new Event("Mary", "./home", 11000L));Thread.sleep(5000L);// 發出 10 秒+1ms 后的數據ctx.collect(new Event("Alice", "./cart", 11001L));Thread.sleep(5000L);}@Overridepublic void cancel() { }}
}
由于是事件時間語義,所以我們必須從數據中提取出數據產生的時間戳。這里為了更清楚
地看到程序行為,我們自定義了一個數據源,發出三條測試數據,時間戳分別為 1000 、 11000
和 11001 ,并且發出數據后都會停頓 5 秒。
在代碼中,我們依然將所有數據分到同一分區,然后在自定義的 KeyedProcessFunction 中
使用定時器。同樣地,每來一條數據,我們就將當前的數據時間戳和水位線信息輸出,并注冊
一個 10 秒后(以當前數據時間戳為基準)的事件時間定時器。執行程序結果如下:

每來一條數據,都會輸出兩行“數據到達”的信息,并以分割線隔開;兩條數據到達的時
間間隔為 5 秒。當第三條數據到達后,隨后立即輸出一條定時器觸發的信息;再過 5 秒之后,
剩余兩條定時器信息輸出,程序運行結束。
我們可以發現,數據到來之后,當前的水位線與時間戳并不是一致的。當第一條數據到來,
時間戳為 1000 ,可水位線的生成是周期性的(默認 200ms 一次),不會立即發生改變,所以依
然是最小值 Long.MIN_VALUE ;隨后只要到了水位線生成的時間點( 200ms 到了),就會依據
當前的最大時間戳 1000 來生成水位線了。這里我們沒有設置水位線延遲,默認需要減去 1 毫
秒,所以水位線推進到了 999 。而當時間戳為 11000 的第二條數據到來之后,水位線同樣沒有
立即改變,仍然是 999 ,就好像總是“滯后”數據一樣。
這樣程序的行為就可以得到合理解釋了。事件時間語義下,定時器觸發的條件就是水位線
推進到設定的時間。第一條數據到來后,設定的定時器時間為 1000 + 10 * 1000 = 11000 ;而當
時間戳為 11000 的第二條數據到來,水位線還處在 999 的位置,當然不會立即觸發定時器;而
之后水位線會推進到 10999 ,同樣是無法觸發定時器的。必須等到第三條數據到來,將水位線
真正推進到 11000 ,就可以觸發第一個定時器了。第三條數據發出后再過 5 秒,沒有更多的數
據生成了,整個程序運行結束將要退出,此時 Flink 會自動將水位線推進到長整型的最大值
( Long.MAX_VALUE )。于是所有尚未觸發的定時器這時就統一觸發了,我們就在控制臺看到
了后兩個定時器的觸發信息。
7.3 窗口處理函數
除 了 KeyedProcessFunction , 另 外 一 大 類 常 用 的 處 理 函 數 , 就 是 基 于 窗 口 的
ProcessWindowFunction 和 ProcessAllWindowFunction 了。如果看了前面的章節,會發現我們
195 之前已經簡單地使用過窗口處理函數了。
7.3.1 窗口處理函數的使用
進行窗口計算,我們可以直接調用現成的簡單聚合方法( sum/max/min ) , 也可以通過調
用 .reduce() 或 .aggregate() 來自定義一般的增量聚合函數( ReduceFunction/AggregateFucntion );
而對于更加復雜、需要窗口信息和額外狀態的一些場景,我們還可以直接使用全窗口函數、把
數據全部收集保存在窗口內,等到觸發窗口計算時再統一處理。窗口處理函數就是一種典型的
全窗口函數。
窗 口 處 理 函 數 ProcessWindowFunction 的 使 用 與 其 他 窗 口 函 數 類 似 , 也 是 基 于
WindowedStream 直接調用方法就可以,只不過這時調用的是 .process() 。
stream.keyBy( t -> t.f0 )
.window( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessWindowFunction())
7.3.2 ProcessWindowFunction 解析
ProcessWindowFunction 既是處理函數又是全窗口函數。從名字上也可以推測出,它的本
質似乎更傾向于“窗口函數”一些。事實上它的用法也確實跟其他處理函數有很大不同。我們
可以從源碼中的定義看到這一點:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
extends AbstractRichFunction {
...
public abstract void process(
KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws
Exception;
public void clear(Context context) throws Exception {}
public abstract class Context implements java.io.Serializable {...}
}
ProcessWindowFunction 依然是一個繼承了 AbstractRichFunction 的抽象類,它有四個類型
參數:
? IN : input ,數據流中窗口任務的輸入數據類型。
? OUT : output ,窗口任務進行計算之后的輸出數據類型。
? KEY :數據中鍵 key 的類型。
196 197
? W :窗口的類型,是 Window 的子類型。一般情況下我們定義時間窗口, W
就是 TimeWindow 。
而內部定義的方法,跟我們之前熟悉的處理函數就有所區別了。因為全窗口函數不是逐個
處理元素的,所以處理數據的方法在這里并不是 .processElement() ,而是改成了 .process() 。方
法包含四個參數。
? key :窗口做統計計算基于的鍵,也就是之前 keyBy 用來分區的字段。
? context :當前窗口進行計算的上下文,它的類型就是 ProcessWindowFunction
內部定義的抽象類 Context 。
? elements :窗口收集到用來計算的所有數據,這是一個可迭代的集合類型。
? out :用來發送數據輸出計算結果的收集器,類型為 Collector 。
可以明顯看出,這里的參數不再是一個輸入數據,而是窗口中所有數據的集合。而上下文
context 所包含的內容也跟其他處理函數有所差別:
public abstract class Context implements java.io.Serializable {
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
除了可以通過 .output() 方法定義側輸出流不變外,其他部分都有所變化。這里不再持有
TimerService 對象,只能通過 currentProcessingTime() 和 currentWatermark() 來獲取當前時間,所
以失去了設置定時器的功能;另外由于當前不是只處理一個數據,所以也不再提供 .timestamp()
方法。與此同時,也增加了一些獲取其他信息的方法:比如可以通過 .window() 直接獲取到當
前的窗口對象,也可以通過 .windowState() 和 .globalState() 獲取到當前自定義的窗口狀態和全局
狀態。注意這里的“窗口狀態”是自定義的,不包括窗口本身已經有的狀態,針對當前 key 、
當前窗口有效;而“全局狀態”同樣是自定義的狀態,針對當前 key 的所有窗口有效。
所以我們會發現, ProcessWindowFunction 中除了 .process() 方法外,并沒有 .onTimer() 方法,
而是多出了一個 .clear() 方法。從名字就可以看出,這主要是方便我們進行窗口的清理工作。如
果我們自定義了窗口狀態,那么必須在 .clear() 方法中進行顯式地清除,避免內存溢出。
這里有一個問題:沒有了定時器,那窗口處理函數就失去了一個最給力的武器,如果我們
希望有一些定時操作又該怎么做呢?其實仔細思考會發現,對于窗口而言,它本身的定義就包
含了一個觸發計算的時間點,其實一般情況下是沒有必要再去做定時操作的。如果非要這么干,
Flink 也提供了另外的途徑——使用窗口觸發器( Trigger )。在觸發器中也有一個 TriggerContext ,
它可以起到類似 TimerService 的作用:獲取當前時間、注冊和刪除定時器,另外還可以獲取當 前的狀態。這樣設計無疑會讓處理流程更加清晰——定時操作也是一種“觸發”,所以我們就
讓所有的觸發操作歸觸發器管,而所有處理數據的操作則歸窗口函數管。
至于另一種窗口處理函數 ProcessAllWindowFunction ,它的用法非常類似。區別在于它基
于的是 AllWindowedStream ,相當于對沒有 keyBy 的數據流直接開窗并調用 .process() 方法 :
stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessAllWindowFunction())
7.4 應用案例—— Top N
窗口的計算處理,在實際應用中非常常見。對于一些比較復雜的需求,如果增量聚合函數
無法滿足,我們就需要考慮使用窗口處理函數這樣的“大招”了。
網站中一個非常經典的例子,就是實時統計一段時間內的熱門 url 。例如,需要統計最近
10 秒鐘內最熱門的兩個 url 鏈接,并且每 5 秒鐘更新一次。我們知道,這可以用一個滑動窗口
來實現,而“熱門度”一般可以直接用訪問量來表示。于是就需要開滑動窗口收集 url 的訪問
數據,按照不同的 url 進行統計,而后匯總排序并最終輸出前兩名。這其實就是著名的“ Top N ”
問題。
很顯然,簡單的增量聚合可以得到 url 鏈接的訪問量,但是后續的排序輸出 Top N 就很難
實現了。所以接下來我們用窗口處理函數進行實現。
7.4.1 使用 ProcessAllWindowFunction
一種最簡單的想法是,我們干脆不區分 url 鏈接,而是將所有訪問數據都收集起來,統一
進行統計計算。所以可以不做 keyBy ,直接基于 DataStream 開窗,然后使用全窗口函數
ProcessAllWindowFunction 來進行處理。
在窗口中可以用一個 HashMap 來保存每個 url 的訪問次數,只要遍歷窗口中的所有數據,
自然就能得到所有 url 的熱門度。最后把 HashMap 轉成一個列表 ArrayList ,然后進行排序、
取出前兩名輸出就可以了。
代碼具體實現如下:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import
198
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
public class ProcessAllWindowTopN {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> eventStream = env.addSource(new
ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long
recordTimestamp) {return element.timestamp;}}));// 只需要 url 就可以統計數量,所以轉換成 String 直接開窗統計SingleOutputStreamOperator<String> result = eventStream.map(new MapFunction<Event, String>() {@Overridepublic String map(Event value) throws Exception {return value.url;}})
199
200.windowAll(SlidingEventTimeWindows.of(Time.seconds(10),
Time.seconds(5))) // 開滑動窗口.process(new ProcessAllWindowFunction<String, String, TimeWindow>()
{@Overridepublic void process(Context context, Iterable<String> elements,
Collector<String> out) throws Exception {HashMap<String, Long> urlCountMap = new HashMap<>();// 遍歷窗口中數據,將瀏覽量保存到一個 HashMap 中for (String url : elements) {if (urlCountMap.containsKey(url)) {long count = urlCountMap.get(url);urlCountMap.put(url, count + 1L);} else {urlCountMap.put(url, 1L);}}ArrayList<Tuple2<String, Long>> mapList = new
ArrayList<Tuple2<String, Long>>();// 將瀏覽量數據放入 ArrayList,進行排序for (String key : urlCountMap.keySet()) {mapList.add(Tuple2.of(key, urlCountMap.get(key)));}mapList.sort(new Comparator<Tuple2<String, Long>>() {@Overridepublic int compare(Tuple2<String, Long> o1, Tuple2<String,
Long> o2) {return o2.f1.intValue() - o1.f1.intValue();}});// 取排序后的前兩名,構建輸出結果StringBuilder result = new StringBuilder();result.append("========================================\n");for (int i = 0; i < 2; i++) {Tuple2<String, Long> temp = mapList.get(i);String info = "瀏覽量 No." + (i + 1) +" url:" + temp.f0 +" 瀏覽量:" + temp.f1 +" 窗 口 結 束 時 間 : " + new
Timestamp(context.window().getEnd()) + "\n";result.append(info);}result.append("========================================\n");out.collect(result.toString());}});result.print();env.execute();}
}
運行結果如下所示:

7.4.2 使用 KeyedProcessFunction
在上一小節的實現過程中,我們沒有進行按鍵分區,直接將所有數據放在一個分區上進行
了開窗操作。這相當于將并行度強行設置為 1 ,在實際應用中是要盡量避免的,所以 Flink 官
方也并不推薦使用 AllWindowedStream 進行處理。另外,我們在全窗口函數中定義了 HashMap
來統計 url 鏈接的瀏覽量,計算過程是要先收集齊所有數據、然后再逐一遍歷更新 HashMap ,
這顯然不夠高效。如果我們可以利用增量聚合函數的特性,每來一條數據就更新一次對應 url
的瀏覽量,那么到窗口觸發計算時只需要做排序輸出就可以了。
基于這樣的想法,我們可以從兩個方面去做優化:一是對數據進行按鍵分區,分別統計瀏
覽量;二是進行增量聚合,得到結果最后再做排序輸出。所以,我們可以使用增量聚合函數
AggregateFunction 進行瀏覽量的統計,然后結合 ProcessWindowFunction 排序輸出來實現 Top N
的需求。
具體實現思路就是,先按照 url 對數據進行 keyBy 分區,然后開窗進行增量聚合。這里就
會發現一個問題:我們進行按鍵分區之后,窗口的計算就會只針對當前 key 有效了;也就是說,
每個窗口的統計結果中,只會有一個 url 的瀏覽量,這是無法直接用 ProcessWindowFunction
進行排序的。所以我們只能分成兩步:先對每個 url 鏈接統計出瀏覽量,然后再將統計結果收
集起來,排序輸出最終結果。因為最后的排序還是基于每個時間窗口的,所以為了讓輸出的統
計結果中包含窗口信息,我們可以借用第六章中定義的 POJO 類 UrlViewCount 來表示,它包
201 含了 url 、瀏覽量( count )以及窗口的起始結束時間。之后對 UrlViewCount 的處理,可以先按
窗口分區,然后用 KeyedProcessFunction 來實現。
總結處理流程如下:
( 1 )讀取數據源;
( 2 )篩選瀏覽行為( pv );
( 3 )提取時間戳并生成水位線;
( 4 )按照 url 進行 keyBy 分區操作;
( 5 )開長度為 1 小時、步長為 5 分鐘的事件時間滑動窗口;
( 6 )使用增量聚合函數 AggregateFunction ,并結合全窗口函數 WindowFunction 進行窗口
聚合,得到每個 url 、在每個統計窗口內的瀏覽量,包裝成 UrlViewCount ;
( 7 )按照窗口進行 keyBy 分區操作;
( 8 )對同一窗口的統計結果數據,使用 KeyedProcessFunction 進行收集并排序輸出。
糟糕的是,這里又會帶來另一個問題。最后我們用 KeyedProcessFunction 來收集數據做排
序,這時面對的就是窗口聚合之后的數據流,而窗口已經不存在了;那到底什么時候會收集齊
所有數據呢?這問題聽起來似乎有些沒道理。我們統計瀏覽量的窗口已經關閉,就說明了當前
已經到了要輸出結果的時候,直接輸出不就行了嗎?
沒有這么簡單。因為數據流中的元素是逐個到來的,所以即使理論上我們應該“同時”收
到很多 url 的瀏覽量統計結果,實際也是有先后的、只能一條一條處理。下游任務(就是我們
定義的 KeyedProcessFunction )看到一個 url 的統計結果,并不能保證這個時間段的統計數據
不會再來了,所以也不能貿然進行排序輸出。解決的辦法,自然就是要等所有數據到齊了——
這很容易讓我們聯想起水位線設置延遲時間的方法。這里我們也可以“多等一會兒”,等到水
位線真正超過了窗口結束時間,要統計的數據就肯定到齊了。
具體實現上,可以采用一個延遲觸發的事件時間定時器。基于窗口的結束時間來設定延遲,
其實并不需要等太久——因為我們是靠水位線的推進來觸發定時器,而水位線的含義就是“之
前的數據都到齊了”。所以我們只需要設置 1 毫秒的延遲,就一定可以保證這一點。
而在等待過程中,之前已經到達的數據應該緩存起來,我們這里用一個自定義的“列表狀
態”( ListState )來進行存儲,如圖 7-2 所示。這個狀態需要使用富函數類的 getRuntimeContext()
方法獲取運行時上下文來定義,我們一般把它放在 open() 生命周期方法中。之后每來一個
UrlViewCount ,就把它添加到當前的列表狀態中,并注冊一個觸發時間為窗口結束時間加 1
毫秒( windowEnd + 1 )的定時器。待到水位線到達這個時間,定時器觸發,我們可以保證當
前窗口所有 url 的統計結果 UrlViewCount 都到齊了;于是從狀態中取出進行排序輸出。

具體代碼實現如下:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
public class KeyedProcessTopN {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從自定義數據源讀取數據SingleOutputStreamOperator<Event> eventStream = env.addSource(new
ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonot
onousTimestamps().withTimestampAssigner(new
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long
recordTimestamp) {return element.timestamp;}}));// 需要按照 url 分組,求出每個 url 的訪問量SingleOutputStreamOperator<UrlViewCount> urlCountStream =eventStream.keyBy(data -> data.url).window(SlidingEventTimeWindows.of(Time.seconds(10),
Time.seconds(5))).aggregate(new UrlViewCountAgg(),new UrlViewCountResult());// 對結果中同一個窗口的統計數據,進行排序處理SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data ->
data.windowEnd).process(new TopN(2));result.print("result");env.execute();}// 自定義增量聚合public static class UrlViewCountAgg implements AggregateFunction<Event, Long,
204
Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}// 自定義全窗口函數,只需要包裝窗口信息public static class UrlViewCountResult extends ProcessWindowFunction<Long,
UrlViewCount, String, TimeWindow> {@Overridepublic void process(String url, Context context, Iterable<Long> elements,
Collector<UrlViewCount> out) throws Exception {// 結合窗口信息,包裝輸出內容Long start = context.window().getStart();Long end = context.window().getEnd();out.collect(new UrlViewCount(url, elements.iterator().next(), start,
end));}}// 自定義處理函數,排序取 top npublic static class TopN extends KeyedProcessFunction<Long, UrlViewCount,
String>{
205// 將 n 作為屬性private Integer n;// 定義一個列表狀態private ListState<UrlViewCount> urlViewCountListState;public TopN(Integer n) {this.n = n;}@Overridepublic void open(Configuration parameters) throws Exception {// 從環境中獲取列表狀態句柄urlViewCountListState = getRuntimeContext().getListState(new ListStateDescriptor<UrlViewCount>("url-view-count-list",Types.POJO(UrlViewCount.class)));}@Overridepublic void processElement(UrlViewCount value, Context ctx,
Collector<String> out) throws Exception {// 將 count 數據添加到列表狀態中,保存起來urlViewCountListState.add(value);// 注冊 window end + 1ms 后的定時器,等待所有數據到齊開始排序ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String>
out) throws Exception {// 將數據從列表狀態變量中取出,放入 ArrayList,方便排序ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();for (UrlViewCount urlViewCount : urlViewCountListState.get()) {urlViewCountArrayList.add(urlViewCount);}// 清空狀態,釋放資源urlViewCountListState.clear();// 排序urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {@Override
206
207public int compare(UrlViewCount o1, UrlViewCount o2) {return o2.count.intValue() - o1.count.intValue();}});// 取前兩名,構建輸出結果StringBuilder result = new StringBuilder();
result.append("========================================\n");result.append("窗口結束時間:" + new Timestamp(timestamp - 1) + "\n");for (int i = 0; i < this.n; i++) {UrlViewCount UrlViewCount = urlViewCountArrayList.get(i);String info = "No." + (i + 1) + " "+ "url:" + UrlViewCount.url + " "+ "瀏覽量:" + UrlViewCount.count + "\n";result.append(info);}result.append("========================================\n");out.collect(result.toString());}}
}
代碼中,我們還利用了定時器的特性:針對同一 key 、同一時間戳會進行去重。所以對于
同一個窗口而言,我們接到統計結果數據后設定的 windowEnd + 1 的定時器都是一樣的,最終
只會觸發一次計算。而對于不同的 key (這里 key 是 windowEnd ),定時器和狀態都是獨立的,
所以我們也不用擔心不同窗口間數據的干擾。
我們在上面的代碼中使用了后面要講解的 ListState 。這里可以先簡單說明一下。我們先聲
明一個列表狀態變量 :
private ListState<Event> UrlViewCountListState;
然后在 open 方法中初始化了列表狀態變量,我們初始化的時候使用了 ListStateDescriptor
描述符,這個描述符用來告訴 Flink 列表狀態變量的名字和類型。列表狀態變量是單例,也就
是說只會被實例化一次。這個列表狀態變量的作用域是當前 key 所對應的邏輯分區。我們使用
add 方法向列表狀態變量中添加數據,使用 get 方法讀取列表狀態變量中的所有元素。
另外,根據水位線的定義,我們這里的延遲時間設為 0 事實上也是可以保證數據都到齊的。
感興趣的讀者可以自行修改代碼進行測試。 208
7.5 側輸出流( Side Output )
處理函數還有另外一個特有功能,就是將自定義的數據放入“側輸出流”( side output )輸
出。這個概念我們并不陌生,之前在講到窗口處理遲到數據時,最后一招就是輸出到側輸出流。
而這種處理方式的本質,其實就是處理函數的側輸出流功能。
我們之前講到的絕大多數轉換算子,輸出的都是單一流,流里的數據類型只能有一種。而
側輸出流可以認為是“主流”上分叉出的“支流”,所以可以由一條流產生出多條流,而且這
些流中的數據類型還可以不一樣。利用這個功能可以很容易地實現“分流”操作。
具體應用時,只要在處理函數的 .processElement() 或者 .onTimer() 方法中,調用上下文
的 .output() 方法就可以了。
DataStream<Integer> stream = env.addSource(...);
SingleOutputStreamOperator<Long> longStream =
stream.process(new
ProcessFunction<Integer, Long>() {
@Override
public void processElement( Integer value, Context ctx, Collector<Integer>
out) throws Exception {
// 轉換成 Long ,輸出到主流中
out.collect(Long.valueOf(value));
// 轉換成 String ,輸出到側輸出流中
ctx.output(outputTag, "side-output: " + String.valueOf(value));
}
});
這里 output() 方法需要傳入兩個參數,第一個是一個“輸出標簽” OutputTag ,用來標識側
輸出流,一般會在外部統一聲明;第二個就是要輸出的數據。
我們可以在外部先將 OutputTag 聲明出來:
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
如果想要獲取這個側輸出流,可以基于處理之后的 DataStream 直接調用 .getSideOutput()
方法,傳入對應的 OutputTag ,這個方式與窗口 API 中獲取側輸出流是完全一樣的。
DataStream<String> stringStream = longStream.getSideOutput(outputTag);
7.6 本章總結
Flink 擁有非常豐富的多層 API ,而底層的處理函數可以說是最為強大、最為靈活的一種。
廣義上來講,處理函數也可以認為是 DataStream API 中的一部分,它的調用方式與其他轉換
算子完全一致。處理函數可以訪問時間、狀態,定義定時操作,它可以直接觸及流處理最為本
質的組成部分。所以處理函數不僅是我們處理復雜需求時兜底的“大招”,更是理解流處理本
質的重要一環。
在本章中,我們詳細介紹了處理函數的功能和底層的結構,重點講解了最為常用的
KeyedProcessFunction 和 ProcessWindowFunction ,并實現了電商應用中 Top N 的經典案例,另
外還介紹了側輸出流的用法。而關于合并兩條流之后的處理函數,以及廣播連接流
( BroadcastConnectedStream )的處理操作,調用方法和原理都非常類似,我們會在后續章節繼
續展開。
8. 多流轉換
無論是基本的簡單轉換和聚合,還是基于窗口的計算,我們都是針對一條流上的數據進行
處理的。而在實際應用中,可能需要將不同來源的數據連接合并在一起處理,也有可能需要將
一條流拆分開,所以經常會有對多條流進行處理的場景。本章我們就來討論 Flink 中對多條流
進行轉換的操作。
簡單劃分的話,多流轉換可以分為“分流”和“合流”兩大類。目前分流的操作一般是通
過側輸出流( side output )來實現,而合流的算子比較豐富,根據不同的需求可以調用 union 、
connect 、 join 以及 coGroup 等接口進行連接合并操作。下面我們就進行具體的講解。
8.1 分流
所謂“分流”,就是將一條數據流拆分成完全獨立的兩條、甚至多條流。也就是基于一個
DataStream ,得到完全平等的多個子 DataStream ,如圖 8-1 所示。一般來說,我們會定義一些
篩選條件,將符合條件的數據揀選出來放到對應的流里。

8.1.1 簡單實現
其實根據條件篩選數據的需求,本身非常容易實現:只要針對同一條流多次獨立調
用 .filter() 方法進行篩選,就可以得到拆分之后的流了。
例如,我們可以將電商網站收集到的用戶行為數據進行一個拆分,根據類型( type )的不
同,分為“ Mary ”的瀏覽數據、“ Bob ”的瀏覽數據等等。那么代碼就可以這樣實現:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 篩選 Mary 的瀏覽行為放入 MaryStream 流中DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>()
{@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Mary");}});// 篩選 Bob 的購買行為放入 BobStream 流中DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
210@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Bob");}});// 篩選其他人的瀏覽行為放入 elseStream 流中DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>()
{@Overridepublic boolean filter(Event value) throws Exception {return !value.user.equals("Mary") && !value.user.equals("Bob") ;}});MaryStream.print("Mary pv");BobStream.print("Bob pv");elseStream.print("else pv");env.execute();}}
輸出結果是:
Bob pv> Event{user='Bob', url='./home', timestamp=2021-06-23 17:30:57.388}
else pv> Event{user='Alice', url='./home', timestamp=2021-06-23 17:30:58.399}
else pv> Event{user='Alice', url='./home', timestamp=2021-06-23 17:30:59.409}
Bob pv> Event{user='Bob', url='./home', timestamp=2021-06-23 17:31:00.424}
else pv> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23
17:31:01.441}
else pv> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23
17:31:02.449}
Mary pv> Event{user='Mary', url='./home', timestamp=2021-06-23 17:31:03.465}
這種實現非常簡單,但代碼顯得有些冗余——我們的處理邏輯對拆分出的三條流其實是一
樣的,卻重復寫了三次。而且這段代碼背后的含義,是將原始數據流 stream 復制三份,然后
對每一份分別做篩選;這明顯是不夠高效的。我們自然想到,能不能不用復制流,直接用一個
算子就把它們都拆分開呢?
在早期的版本中, DataStream API 中提供了一個 .split() 方法,專門用來將一條流“切分”
成多個。它的基本思路其實就是按照給定的篩選條件,給數據分類“蓋戳”;然后基于這條蓋
戳之后的流,分別揀選想要的“戳”就可以得到拆分后的流。這樣我們就不必再對流進行復制
了。不過這種方法有一個缺陷:因為只是“蓋戳”揀選,所以無法對數據進行轉換,分流后的
數據類型必須跟原始流保持一致。這就極大地限制了分流操作的應用場景。現在 split 方法已
經淘汰掉了,我們以后分流只使用下面要講的側輸出流。
8.1.2 使用側輸出流
在 Flink 1.13 版本中,已經棄用了 .split() 方法,取而代之的是直接用處理函數( process
function )的側輸出流( side output )。
我們知道,處理函數本身可以認為是一個轉換算子,它的輸出類型是單一的,處理之后得
到的仍然是一個 DataStream ;而側輸出流則不受限制,可以任意自定義輸出數據,它們就像從
“主流”上分叉出的“支流”。盡管看起來主流和支流有所區別,不過實際上它們都是某種類型
的 DataStream ,所以本質上還是平等的。利用側輸出流就可以很方便地實現分流操作,而且得
到的多條 DataStream 類型可以不同,這就給我們的應用帶來了極大的便利。
關于處理函數中側輸出流的用法,我們已經在 7.5 節做了詳細介紹。簡單來說,只需要調
用上下文 ctx 的 .output() 方法,就可以輸出任意類型的數據了。而側輸出流的標記和提取,都
離不開一個“輸出標簽”( OutputTag ),它就相當于 split() 分流時的“戳”,指定了側輸出流的
id 和類型。
我們可以使用側輸出流將上一小節的分流代碼改寫如下:
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class SplitStreamByOutputTag {// 定義輸出標簽,側輸出流的數據類型為三元組(user, url, timestamp)private static OutputTag<Tuple3<String, String, Long>> MaryTag = new
OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};private static OutputTag<Tuple3<String, String, Long>> BobTag = new
OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());SingleOutputStreamOperator<Event> processedStream = stream.process(new
ProcessFunction<Event, Event>() {@Overridepublic void processElement(Event value, Context ctx, Collector<Event>
out) throws Exception {if (value.user.equals("Mary")){ctx.output(MaryTag, new Tuple3<>(value.user, value.url,
value.timestamp));} else if (value.user.equals("Bob")){
212ctx.output(BobTag, new Tuple3<>(value.user, value.url,
value.timestamp));} else {out.collect(value);}}});processedStream.getSideOutput(MaryTag).print("Mary pv");processedStream.getSideOutput(BobTag).print("Bob pv");processedStream.print("else");env.execute();}
}
輸出結果是:
Bob pv> (Bob,./prod?id=1,1624442886645)
Mary pv> (Mary,./prod?id=1,1624442887664)
Bob pv> (Bob,./home,1624442888673)
Mary pv> (Mary,./prod?id=1,1624442889676)
else> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23 18:08:10.693}
else> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23 18:08:11.697}
else> Event{user='Alice', url='./prod?id=1', timestamp=2021-06-23 18:08:12.702}
Mary pv> (Mary,./cart,1624442893705)
Bob pv> (Bob,./cart,1624442894710)
else> Event{user='Alice', url='./cart', timestamp=2021-06-23 18:08:15.722}
Mary pv> (Mary,./prod?id=1,1624442896725)
這里我們定義了兩個側輸出流,分別揀選 Mary 的瀏覽事件和 Bob 的瀏覽事件;由于類型
已經確定,我們可以只保留 ( 用戶 id, url, 時間戳 ) 這樣一個三元組。而剩余的事件則直接輸出
到主流,類型依然保留 Event ,就相當于之前的 elseStream 。這樣的實現方式顯然更簡潔,也
更加靈活。
8.2 基本合流操作
既然一條流可以分開,自然多條流就可以合并。在實際應用中,我們經常會遇到來源不同
的多條流,需要將它們的數據進行聯合處理。所以 Flink 中合流的操作會更加普遍,對應的
API 也更加豐富。
8.2.1 聯合( Union )
最簡單的合流操作,就是直接將多條流合在一起,叫作流的“聯合”( union ),如圖 8-2
所示。聯合操作要求必須流中的數據類型必須相同,合并之后的新流會包括所有流中的元素,
數據類型不變。這種合流方式非常簡單粗暴,就像公路上多個車道匯在一起一樣。

在代碼中,我們只要基于 DataStream 直接調用 .union() 方法,傳入其他 DataStream 作為參
數,就可以實現流的聯合了;得到的依然是一個 DataStream :
stream1.union(stream2, stream3, ...)
注意: union() 的參數可以是多個 DataStream ,所以聯合操作可以實現多條流的合并。
這里需要考慮一個問題。在事件時間語義下,水位線是時間的進度標志;不同的流中可能
水位線的進展快慢完全不同,如果它們合并在一起,水位線又該以哪個為準呢?
還以要考慮水位線的本質含義,是“之前的所有數據已經到齊了”;所以對于合流之后的
水位線,也是要以最小的那個為準,這樣才可以保證所有流都不會再傳來之前的數據。換句話
說,多流合并時處理的時效性是以最慢的那個流為準的。我們自然可以想到,這與之前介紹的
并行任務水位線傳遞的規則是完全一致的;多條流的合并,某種意義上也可以看作是多個并行
任務向同一個下游任務匯合的過程。
我們可以用下面的代碼做一個簡單測試:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
214SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102",
7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(),
Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBound
edOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long
recordTimestamp) {return element.timestamp;}}));stream1.print("stream1");SingleOutputStreamOperator<Event> stream2 =
env.socketTextStream("hadoop103", 7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(),
Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBound
edOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new
SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long
recordTimestamp) {return element.timestamp;}}));stream2.print("stream2");// 合并兩條流stream1.union(stream2).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx,
Collector<String> out) throws Exception {out.collect(" 水 位 線 : " +
ctx.timerService().currentWatermark());}}).print();env.execute();}
}
這里為了更清晰地看到水位線的進展,我們創建了兩條流來讀取 socket