Flink CEP
1、何為CEP?
CEP,全稱為復雜事件處理(Complex Event Processing),是一種用于實時監測和分析數據流的技術。
CEP詳細講解:
CEP是基于動態環境的事件流的分析技術,事件是狀態變化(持續生成數據)的。通過分析事件間的關系,利用過濾、關聯、聚合等技術,根據事件間的【時序關系和聚合關系】制定檢測規則,持續地從事件流中查詢出【符合規則要求】的事件序列,最終分析得到更復雜的復合事件。
簡單概括:CEP就是對于復雜事件制定規則,篩選所需的事件。
復雜事件的定義:
復雜事件是由一個或多個簡單事件按照特定的模式、關系和邏輯組合而成的高級事件,比如說“連續登錄失敗”,或者“訂單支付超時”等等【通常會對這些事件進行警報處理】。
在“連續登錄失敗”這個案例中,每個單獨的登錄失敗事件都可以被視為一個簡單事件,而當這些簡單事件在特定時間窗口內連續發生時(例如,連續兩次或三次登錄失敗),它們就構成了一個復雜事件。
2、核心組件
Flink為 CEP 提供了專門的Flink CEP library,其中主要包含了以下的核心組件:Pattern Event Stream
、Pattern Definition(模式定義)
、Pattern Detection(模式檢測)
和生成Alert
。
1. 模式定義【規則的具體化過程】
根據業務需求,在Flink的DataStream API上定義出復雜事件的模式條件。
2. 模式檢測【規則的執行】
一旦模式條件被定義并應用到DataStream上,Flink CEP引擎就會開始對這些數據流(Event Stream)中的事件進行實時檢測。引擎會監聽輸入流中的事件,并根據之前定義的模式條件來匹配和識別復雜事件。
3. 生成警告(或其他處理邏輯)
當Flink CEP引擎**【檢測到】符合模式條件的事件序列時,它會根據定義的【處理邏輯】來執行相應的動作**。其中通常會進行警告處理,如”連續登錄失敗“這個事件,考慮到會是黑客入侵等操作,從而生成警報(Alter Generation),并通知(Notification)相關人員進行處理。當然也會進行其他處理邏輯,如數據聚合、統計計算、事件轉換等。
3、CEP基本流程
目標:從有序的簡單事件流中發現一些高階特征。
復雜事件處理(CEP)的流程可以分成三個步驟:
(1) 定義一個匹配規則
(2) 將匹配規則應用到事件流(由一個或多個簡單事件構成)上,檢測滿足規則的復雜事件
(3) 對檢測到的復雜事件進行處理,得到結果進行輸出
1.目標:從有序的簡單事件流中發現一些高階特征。
2.輸入:一個或多個由簡單事件構成的事件流。
3.處理:識別簡單事件之間的內在聯系,多個符合一定規則的簡單事件構成復雜事件。
4.輸出:滿足規則的復雜事件。
如圖所示,輸入是不同形狀的事件流,我們可以定義一個匹配規則:連續兩個圓。那么將這個規則應用到輸入流上,就可以檢測到兩組匹配的復雜事件。它們構成了一個新的“復雜事件流”,流中的數據就變成了一組一組的復雜事件,每個數據都包含了兩個圓。接下來,我們就可以針對檢測到的復雜事件,處理之后輸出一個提示或報警信息了。
4、CEP相關依賴
<properties><flink.version>1.13.2</flink.version>
</properties><!-- flink-CEP(復雜事件規則引擎):制定規則,篩選數據 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
5、Pattern API(模擬API)
實際操作:一般使用Java來編寫CEP規則,因為CEP對Scala編寫不友好。
模式API可以讓你定義想從輸入流中抽取的復雜模式序列。
1.1. 基本含義
含義:處理事件的【規則】通常由【多個相互關聯的模式(Pattern)組成】。Flink CEP提供了Pattern API用于對輸入流數據進行復雜事件規則【制定規則】的定義,用來提取符合規則的事件序列。
一般模式大致分為三類:單個模式,組合模式,模式組。
1.2. 三種模式講解
1)子事件
/**UserAction 為主事件UserLogin 繼承自 UserAction 為子事件
*/
Pattern.<UserAction>begin(...).subType(UserLogin.class).where(new SimpleCondition<UserLogin>() {@Overridepublic boolean filter(UserLogin userAction) throws Exception {return userAction.actType().equals(UserAction.LOGIN());}})...
2)個體模式(Individual Patterns)
“個體模式”實際上是單一對象(單一規則)或簡單模式。
定義:個體模式就是組成復雜規則的每一個單獨的模式。
特點:個體模式可以是單例模式或者循環模式。
- 單例模式:只接受一個事件。
- 循環模式:可以接受多個事件,并通過指定循環次數來定義接收事件的數量。
個體模式默認為單例模式,可以使用【量詞】轉換成【循環模式】,每個模式可以有一個或者多個條件來決定它接受哪些事件。
一:量詞
含義:量詞是給定符合特定條件的事件出現的次數。比如:定義個體模式為“匹配形狀為三角形的事件”,再讓它循環多次,就變成了“匹配連續多個三角形的事件”。注意這里的“連續”,只要保證前后順序即可,中間可以有其他事件,所以是“寬松近鄰”關系。
在Flink CEP中,你可以通過這些方法指定循環模式(有以下四種基本形式):
pattern.times(#ofTimes)
,指定期望一個給定事件出現特定次數的模式;pattern.times(#fromTimes, #toTimes)
,指定期望一個給定事件出現次數在一個最小值和最大值間;pattern.oneOrMore()
,指定期望一個給定事件出現一次或者多次的模式【無上限】;pattern.timesOrMore(#ofTimes)
,指定一個給定事件出現特定次數或多次的模式;應用范圍:量詞(如
.oneOrMore()
)是應用于整個由begin()
開始到某個結束條件(如果有的話,比如.until()
)之間的事件序列的。如:當你看到oneOrMore()
應用于某個next
定義的模式時,它意味著該模式(即next
指向的事件類型)可以在其前面的模式(可能是begin
或另一個next
)之后重復出現一次或多次。
下面以 start 為名字的模式對象,解釋所涉及到的量詞意義【舉例】。
greedy():指定期望一個給定事件出現盡可能多次的模式;
optional():指定期望一個給定事件出現0或1次的模式;
// 期望出現4次
start.times(4)// 期望出現0次或者4次
start.times(4).optional()// 期望出現2次、3次或者4次
start.times(2, 4)// 期望出現2次、3次或者4次,并且盡可能多【但不得超過4次】
start.times(2, 4).greedy()// 期望出現0次、2次、3次或者4次
start.times(2, 4).optional()// 期望出現0、2、3或者4次,并且盡可能多【但不得超過4次】
start.times(2, 4).optional().greedy()// 期望出現1到多次
start.oneOrMore()// 期望出現1到多次,并且盡可能多【無上限】
start.oneOrMore().greedy()// 期望出現0到多次
start.oneOrMore().optional()// 期望出現0到多次,并且盡可能多【無上限】
start.oneOrMore().optional().greedy()// 期望出現2到多次
start.timesOrMore(2)// 期望出現2到多次,并且盡可能多【無上限】
start.timesOrMore(2).greedy()// 期望出現0、2或多次
start.timesOrMore(2).optional()// 期望出現0、2或多次,并且盡可能多【無上限】
start.timesOrMore(2).optional().greedy()
二:條件
對每個模式你可以指定一個條件來決定一個進來的事件是否被接受進入這個模式,指定判斷事件屬性的條件可以通過
pattern.where()
【與】、pattern.or()
【或】或者pattern.until()
【終止】方法。
-
迭代條件【默認】
含義:這是最普遍的條件類型。使用它可以指定一個基于前面已經被接受的事件【上一個模式流入此處的事件】的屬性,或者它們的一個子集的統計數據來決定是否接受時間序列的條件。
應用場景:將當前事件跟之前的事件做對比,才能判斷出要不要接受當前事件。
分析:迭代條件能夠獲取已經匹配的事件,如果自身又是循環模式(比如量詞oneOrMore),那么兩者結合就可以捕獲自身之前接收的數據,據此來判斷是否接受當前事件。這個功能非常強大,我們可以由此實現更加復雜的需求,比如可以要求“只有大于之前數據的平均值,才接受當前事件”。
案例講解:模式名稱為“middle”的循環模式,其可以接受事件發生一次或多次。因此在下面的迭代條件中,我們通過
ctx.getEventsForPattern(“middle”)
獲取當前模式已經接受的事件,計算它們的總價(getMoney()
)之和;再加上當前事件中的數量,如果總和小于100,就接受當前事件,否則就不匹配。最終我們的匹配規則就是:循環匹配的所有事件價格(getMoney()
)之和并且總和必須小于 100。middle.oneOrMore().where(new IterativeCondition<Event>() { @Overridepublic boolean filter(Event value, Context<Event> ctx) throws Exception {int sum = value.getMoney();// 獲取當前模式之前已經匹配的事件,求所有事件 amount 之和// middle為已定義的模式名稱for (Event event : ctx.getEventsForPattern("middle")) { sum += event.getMoney();}// 在總數量小于 100 時,當前事件滿足匹配規則,可以匹配成功return sum < 100;}});
-
簡單條件
這種類型的條件擴展了前面提到的IterativeCondition類
它決定是否接受一個事件只取決于事件自身的屬性。
// 規則定義【簡單條件】:2~4個的溫度都在40度以上的事件 Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0") // 模式 + 鎖定泛型 => 一個事件.times(2,4) // 量詞:2~4個.where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});
-
組合條件
定義:對同一個事件而言,可將當前條件和其他的條件(
where()
,or()
,until()
)結合起來使用。注意:這適用于任何條件,你可以通過依次調用
where()
來組合條件,最終的結果是每個單一條件的結果的邏輯and。如果想使用or來組合條件,你可以像下面這樣使用or()方法。// 組合條件:2個[溫度都在大于等于40度 or 溫度小于等于20度]的事件【非連續】 // or:其中兩個判斷條件滿足其一即可 Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0") // 模式 + 鎖定泛型 => 一個事件.times(2) // 量詞:2個.where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40; // 判斷條件:溫度都在大于等于40度}}).or(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature()<=20; // 判斷條件:溫度小于等于20度}});
-
停止條件
如果使用循環模式(
oneOrMore()
和oneOrMore().optional()
),可以指定一個停止條件,建議使用.until()
作為停止條件,以便清理狀態。until在Flink CEP中的工作方式:一旦until條件被滿足,則當前匹配的模式序列將結束,并且不會包含導致until條件滿足的那個事件。之后,CEP引擎會開始嘗試匹配新的模式序列。
// 終止條件:一旦出現滿足溫度小于20的事件,則之前所有連續的溫度事件的序列結束,開始匹配新的模式序列。 // 注意:不包括滿足條件的哪個事件本身。 Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0") // 模式 + 鎖定泛型 => 一個事件.oneOrMore().until(new SimpleCondition<TemperatureEvent>() {/*until(condition); 為循環模式指定一個停止條件。意思是滿足了給定的條件的事件出現后,就不會再有事件被接受進入模式了。只適用于和oneOrMore()同時使用。另外:在基于事件的條件中,它可用于清理對應模式的狀態。*/@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() < 20;}});
3)組合模式(Combining Patterns)
“組合模式”實際是多對象(多規則)組合。
組合模式由一個初始模式作為開頭,如下所示:
val start : Pattern[Event, _] = Pattern.begin("start")
嚴格連續(嚴格緊鄰)
next()
:期望所有匹配的事件嚴格的一個接一個出現,中間沒有任何不匹配的事件。
// 嚴格連續: 【連續】3個溫度在40度以上的事件
Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).next("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).next("stage-2").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});
松散連續(寬松近鄰)
followedBy()
:忽略匹配的事件之間的不匹配的事件。理解:當且僅當數據為a,c,b,b時,對于followedBy模式而言命中的為{a,b}
// 模擬數據
new TemperatureEvent("江寧1", 33.0, 1723626529138L),
new TemperatureEvent("江寧2", 36.0, 1723626529569L),
new TemperatureEvent("江寧3", 43.0, 1723626531234L),
new TemperatureEvent("江寧4", 18.0, 1723626532685L),
new TemperatureEvent("江寧5", 18.0, 1723626532996L),// 松散連續: 兩個都大于40度的溫度【非連續,允許忽略某些元素】
Pattern<TemperatureEvent, TemperatureEvent> rule3 = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).followedBy("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});最終結果:
--------------------
江寧3 江寧4
--------------------
非確定的松散連續(非確定性寬松近鄰)
followedByAny()
:更進一步的松散連續,允許忽略掉一些匹配事件的附加匹配。理解:當且僅當數據為a,c,b,b時,對于followedByAny而言會有兩次命中{a,b},{a,b}
// 模擬數據
new TemperatureEvent("江寧1", 33.0, 1723626529138L),
new TemperatureEvent("江寧2", 36.0, 1723626529569L),
new TemperatureEvent("江寧3", 43.0, 1723626531234L),
new TemperatureEvent("江寧4", 18.0, 1723626532685L),
new TemperatureEvent("江寧5", 18.0, 1723626532996L),// 不確定的松散連續: 兩個都大于40度的溫度【非連續,允許忽略某些元素】
Pattern<TemperatureEvent, TemperatureEvent> rule3 = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).followedByAny("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() == 18;}});最終結果:
--------------------
江寧3 江寧4
江寧3 江寧5
--------------------
除了以上模式序列外,還可以定義“不希望出現某種近鄰關系”:
notNext()
:希望后面不緊接著是某個特定事件。notFollowedBy()
:不希望某個特定事件發生在兩個事件之間的任何地方。
【溫馨提示】①所有模式序列必須以.begin()開始;②模式序列不能以.notFollowedBy()結束;③“not”類型的模式不能被optional所修飾;④可以為模式指定時間約束,用來要求在多長時間內匹配有效。
3)模式組(Pattern Groups)
使用模式組的目的:復用性。
也可以定義一個模式序列作為begin,followedBy,followedByAny和next的條件。這個模式序列在邏輯上會被當作匹配的條件, 并且返回一個
GroupPattern
,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。
// start
val start: Pattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)// 嚴格連續:通過start生成strict
val strict: Pattern[Event, _] = start.next(Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)// 松散連續:通過start生成relaxed
val relaxed: Pattern[Event, _] = start.followedBy(Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()// 不確定松散連續:通過start生成nonDetermin
val nonDetermin: Pattern[Event, _] = start.followedByAny(Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()
1.3. 匹配后跳過策略
對于一個給定的模式,同一個事件可能會分配到多個成功的匹配上。為了控制一個事件會分配到多少個匹配上,你需要指定跳過策略AfterMatchSkipStrategy。 有五種跳過策略,如下:
-
NO_SKIP
: 每個成功的匹配都會被輸出,不跳過任何環節【默認】。 -
SKIP_TO_NEXT
: 保留相同事件開始的第一條數據。目的:避免重復處理相同起點的多個匹配,從而節省計算資源和提高處理效率。
具體體現:如果檢測到一個匹配,并且該匹配以某個特定事件(事件A)為起始點,那么系統將會丟棄(即不再考慮)所有其他以該事件(事件A)開始但尚未完成或正在進行的其他部分匹配。
舉例:在(A1,A2,A3,C1,A4,A5,C2,C3,C4)中,條件是兩個連續A事件,后面跟隨C事件[followedByAny()]。若不使用
SKIP_TO_NEXT
跳過策略,輸出結果為:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2),(A4,A5,C3),(A4,A5,C4)。若使用SKIP_TO_NEXT
跳過策略,輸出結果為:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2)。 -
SKIP_PAST_LAST_EVENT
: 只保留第一條數據。目的:減少冗余匹配,明確匹配邊界。
具體體現:在成功匹配一個復雜事件模式后,系統將丟棄(不再考慮)從當前匹配的開始事件到結束事件之間的所有其他部分匹配或未完成的匹配序列。
舉例:在(A1,A2,A3,C1,A4,A5,C2,C3,C4)中,條件是兩個連續A事件,后面跟隨C事件[followedByAny()]。若不使用
SKIP_PAST_LAST_EVENT
跳過策略,則輸出的結果為:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2),(A4,A5,C3),(A4,A5,C4)。若使用SKIP_PAST_LAST_EVENT
跳過策略,則輸出的結果為:(A1,A2,C1),(A4,A5,C2)。 -
SKIP_TO_FIRST
: 當匹配到某個模式的開始事件時,如果在匹配過程中遇到無法匹配的事件,它會跳到模式中第一個匹配事件之后繼續處理。目的:處理和過濾事件流,修改起始點
具體體現:一旦模式匹配到定義了
SKIP_TO_FIRST
的相應階段,就會跳過之前的階段,直接從指定的模式處(a2
)開始處理后續事件,但已經匹配到的序列仍然被認為是有效的。舉例:
假設事件序列是
(a1 a2 a3 b)
,并且你使用SKIP_TO_FIRST
策略,以a2
為SKIP_TO_FIRST
指定的開始事件。若中途遇到無法匹配的,它會跳到模式中第一個匹配事件(也就是a2
)之后繼續處理,相當于只留下 a2 開始的匹配【初始時,已經匹配到的序列(a1 a2 a3 b)中a1仍然被認為是有效的。遇到無法匹配后,起始點從a1
變為a2
,此后就從a2
開始,前面的a1
就不在考慮了】。最終得到的匹配順序是(a1 a2 a3 b),(a2 a3 b),(a2 b)。若未遇到不匹配現象,則順序為(a1 a2 a3 b)。 -
SKIP_TO_LAST
: 跳過模式的開始事件的所有匹配,從模式的最后一個事件之后的事件開始新的匹配。目的:避免重復匹配
具體體現:跳過所有從模式開始的事件,而從模式中的最后一個事件之后的位置繼續處理后續事件。
舉例:
假設事件序列是
(a1 a2 a3 b)
,并且你使用SKIP_TO_LAST
策略,找到 a1 開始的匹配(a1 a2 a3 b)后,跳過所有 a1、a2 開始的匹配,跳到以最后一個 a(也就是 a3)為開始的匹配。最終得到(a1 a2 a3 b),(a3 b)。其中若未出現不匹配現象,則最終得到(a1 a2 a3 b)。
【溫馨提示】當使用SKIP_TO_FIRST
和SKIP_TO_LAST
策略時,需要指定一個合法的PatternName。
應用基本格式:
// 定義跳過策略
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("stage-2");Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0",skipStrategy) // 添加跳過策略.where(...)
1.4. 有效時間約束(within)
可以為模式定義一個有效時間約束。 例如,你可以通過pattern.within()
方法指定一個模式應該在10秒內發生。 這種時間模式支持處理時間和事件時間。
【溫馨提示】一個模式序列只能有一個時間限制。如果限制了多個時間在不同的單個模式上,會使用最小的那個時間限制。
next.within(Time.seconds(10))
錯誤總結
①
Option not applicable to NOT pattern
: 量詞不和not連用②
Optional pattern cannot be preceded by greedy pattern
:greedy 之后不能接 optional③
NotFollowedBy is not supported as a last part of a Pattern
:notFollowedBy不能是規則的最后部分
2、模式的檢測處理
一、Pattern檢測(將模式應用于流上)
含義:將匹配規則應用到事件流(由一個或多個簡單事件構成)上,檢測滿足規則的復雜事件
為了在事件流上運行所創建的模式(rule),需要創建一個patStream。 給定一個輸入流input,一個模式rule, 可以通過調用如下方法來創建patStream:
PatternStream<UserAction> patStream = CEP.pattern(stream, rule);
二、處理匹配事件
含義:創建 patStream 之后,就可以應用 select 或者 flatselect 方法,從檢測到的事件序列中提取事件。
- select() 方法需要輸入一個
PatternSelectFunction
作為參數,返回一個結果。 - flatselect() 方法需要輸入一個
PatternFlatSelectFunction
作為參數,返回任意多個結果。
select() 以一個 Map[String,Iterable [IN]] 來接收匹配到的事件序列,其中 【key】 就是每個【模式名稱】,而 【value】 就是所有接收到的【所有事件】的 Iterable 類型。
patStream.select(new PatternSelectFunction<TemperatureEvent, String>() {// key:模式;value:事件@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {List<TemperatureEvent> events = pattern.get("stage-0"); // 根據key來獲取value// 業務:獲取事件的第一條和最后一條List<TemperatureEvent> sortedEvents = events.stream().sorted((a, b) -> (int) (a.getTimestamp() - b.getTimestamp())) // 排序.collect(Collectors.toList());TemperatureEvent first = sortedEvents.get(0); // 第一條事件TemperatureEvent last = sortedEvents.get(sortedEvents.size() - 1); // 最后一條事件double diff = (last.getTimestamp() - first.getTimestamp()) / 1000.0;return String.format("%s %s %.2f",first.getDistrict(),last.getDistrict(),diff);}
})
三、超時事件提取
應用原因:當一個模式通過within關鍵字定義了檢測窗口時間時,部分事件序列可能因為超過窗口長度被丟棄,為了能夠處理這些超時的部分匹配,select和flatSelectAPI調用允許指定超時處理程序。
Pattern<UserLoginEvent, ?> pattern = Pattern.<UserLoginEvent>begin("start") .where(event -> !event.isSuccess()) // 第一個事件是登錄失敗 .next("middle").where(new SimpleCondition<UserLoginEvent>() { @Override public boolean filter(UserLoginEvent event) { return !event.isSuccess(); // 中間事件也是登錄失敗 } }) .oneOrMore() // 表示“middle”模式可以重復一次或多次 .consecutive() // 強調這些事件必須是連續的 .next("end").where(new SimpleCondition<UserLoginEvent>() { @Override public boolean filter(UserLoginEvent event) { return !event.isSuccess(); // 最后一個事件也是登錄失敗 } }) .within(Time.seconds(10)); // 所有這些事件必須在10秒內發生(時間的限制)
解決方案:超時處理程序會被接收到目前為止由模式匹配到的所有事件,由一個OutputTag定義接收到的超時事件序列(超時事件會放到另外【側輸出流】中)。
// Pattern檢測
PatternStream<Event> patStream = CEP.pattern(stream, rule);// 側輸出流
OutputTag<String> outputTag = new OutputTag<String>("late-data"){};// 處理模式流并捕獲遲到數據
SingleOutputStreamOperator<ComplexEvent> result = patStream.sideOutputLateData(outputTag).select(new PatternSelectFunction<Event, ComplexEvent>() {...});// 獲取側輸出流
DataStream<String> lateData = result.getSideOutput(outputTag);
3、實際應用
實例1:監測某地的溫度并告警
監測某地的溫度在一小時內連續三次大于設定溫度,則進行告警處理。
步驟一:定義事件:溫度數據
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;@Data
@AllArgsConstructor
@ToString
// 模擬天氣數據
public class TemperatureEvent implements Serializable {private String mechineName; // 區域private Double temperature; // 溫度private Long timestamp; // 數據生成時間
}
步驟二:創建Flink環境并配置CEP
import modules.env.Environments;
import modules.time.Timer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;import java.util.Arrays;
import java.util.List;
import java.util.Map;// 監測某地的溫度在一小時內連續三次大于設定溫度,則進行告警處理
public class TempCEP {public static void main(String[] args) throws Exception {// 創建環境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 模擬事件SingleOutputStreamOperator<TemperatureEvent> stream = see.fromCollection(Arrays.asList(new TemperatureEvent("江寧1", 33.0, 1723626529138L),new TemperatureEvent("江寧2", 36.0, 1723626529569L),new TemperatureEvent("江寧3", 43.0, 1723626531234L),new TemperatureEvent("江寧4", 18.0, 1723626532996L),new TemperatureEvent("江寧5", 39.0, 1723626533247L),new TemperatureEvent("江寧6", 41.0, 1723626533888L),new TemperatureEvent("江寧7", 41.0, 1723626534096L),new TemperatureEvent("江寧8", 42.0, 1723626535011L),new TemperatureEvent("江寧9", 42.0, 1723626535011L),new TemperatureEvent("江寧10", 41.0, 1723626535011L),new TemperatureEvent("江寧11", 52.0, 1723626535555L))).assignTimestampsAndWatermarks(// 生成單調遞增的水印,其中10 表示最大允許的延遲時間(即水印的間隔時間)【毫秒】Timer.monotonous(60));// 設定預警溫度double TEMPERATURE_SETTING = 40;// 定義CEP模式Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).next("stage-2").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).next("stage-3").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).within(Time.hours(1));// 在模式匹配時執行操作PatternStream<TemperatureEvent> patStream = CEP.pattern(stream, rule);patStream.select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {String district1 = pattern.get("stage-1").get(0).getDistrict();String district2 = pattern.get("stage-2").get(0).getDistrict();String district3 = pattern.get("stage-3").get(0).getDistrict();Double first = pattern.get("stage-1").get(0).getTemperature();Double second = pattern.get("stage-2").get(0).getTemperature();Double third = pattern.get("stage-3").get(0).getTemperature();return String.format("告警: 地點 %s -> %s -> %s,溫度在一小時內連續三次超過 40 度!,分別是%.2f度,%.2f度,%.2f度",district1,district2,district3,first,second,third);}}).print();see.execute("CEP_TEMPERATURE");}
}
結果展示:
案例2:異常檢測 - 機箱溫度檢測
需求:同一個機箱連續兩次溫度超標,報警
拓展需求:鍋爐房溫度檢測;信用卡反欺詐:連續大額消費;反作弊:同一個用戶短時間內連續登陸失敗
- flink cep
- pattern定義
- pattern匹配
- select 選出匹配到的事件,報警
步驟一:定義事件:機箱數據
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;import java.io.Serializable;@Data
@AllArgsConstructor
@ToString
// 模擬機箱數據
public class TemperatureEvent implements Serializable {private String chassis; // 機箱private Double temperature; // 溫度
}
步驟二:具體情況分析
1)連續三次高于26度
package cases.test05;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Detection {public static void main(String[] args) throws Exception {// 環境配置StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模擬機箱的數據DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),new TemperatureEvent("xyz", 20.1),new TemperatureEvent("xyz", 21.1),new TemperatureEvent("xyz", 22.2),new TemperatureEvent("xyz", 22.1),new TemperatureEvent("xyz", 26.3),new TemperatureEvent("xyz", 28.1),new TemperatureEvent("xyz", 27.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 26.7),new TemperatureEvent("xyz", 27.0))// 設置水位線.assignTimestampsAndWatermarks(WatermarkStrategy.<TemperatureEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TemperatureEvent>() {@Overridepublic long extractTimestamp(TemperatureEvent temperatureEvent, long l) {return System.currentTimeMillis();}}));// 連續三次高于26度Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("fgt261").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}}).next("fgt262").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}}).next("fgt263").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}});DataStream<String> patternStream = CEP.pattern(inputEventStream,warningPattern).select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {Double first = pattern.get("fgt261").get(0).getTemperature();Double second = pattern.get("fgt262").get(0).getTemperature();Double third = pattern.get("fgt263").get(0).getTemperature();return String.format("告警: 機箱連續三次高于26度,分別是%.2f度,%.2f度,%.2f度",first,second,third);}});patternStream.print();env.execute("Detection");}
}
結果展示:
2)3秒內3次以上平均溫度超過26度
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Detection {public static void main(String[] args) throws Exception {// 環境配置StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模擬機箱的數據DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),new TemperatureEvent("xyz", 20.1),new TemperatureEvent("xyz", 21.1),new TemperatureEvent("xyz", 22.2),new TemperatureEvent("xyz", 22.1),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 25.1),new TemperatureEvent("xyz", 27.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 26.7),new TemperatureEvent("xyz", 27.0),new TemperatureEvent("xyz", 24.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 21.0))// 設置水位線.assignTimestampsAndWatermarks(WatermarkStrategy.<TemperatureEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TemperatureEvent>() {@Overridepublic long extractTimestamp(TemperatureEvent temperatureEvent, long l) {return System.currentTimeMillis();}}));// 3秒內3次以上平均溫度超過26度Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("fgt261") // 3秒內至少3次.timesOrMore(3).where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent temperatureEvent) throws Exception {return temperatureEvent.getTemperature()>26;}}).within(Time.seconds(3)).next("fgt262") // 平均溫度超過26度.where(new IterativeCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {int count = 0;double avg = 0;for (TemperatureEvent te : ctx.getEventsForPattern("fgt261")) {count++;avg += te.getTemperature();}return count>=3 && avg/count>=26;}});DataStream<String> patternStream = CEP.pattern(inputEventStream,warningPattern).select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {return pattern.toString();}});patternStream.print();env.execute("Detection");}
}
結果展示:
案例3:登錄事件異常檢測
同一個用戶連續兩次登陸失敗,報警
- flink cep
- pattern定義
- pattern匹配
- select輸出報警事件
//需求: 如果同一個userid在三秒之內連續兩次登陸失敗,報警。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 這里mock了事件流,這個事件流一般從Kafka過來
DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList( //自定義一個pojo類:userId、ip、typenew LoginEvent("1", "192.168.0.1", "fail"),new LoginEvent("1", "192.168.0.2", "fail"),new LoginEvent("1", "192.168.0.3", "fail"),new LoginEvent("2", "192.168.10.10", "success")
));Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("start")//泛型類或泛型接口上的泛型形參是不能用于靜態成員的,那么當靜態方法需要用到泛型時,只能用泛型方法。.where(new IterativeCondition<LoginEvent>() { // 模式開始事件的匹配條件為事件類型為fail, 為迭代條件@Overridepublic boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {return loginEvent.getType().equals("fail");}}).next("next").where(new IterativeCondition<LoginEvent>() { // 事件的匹配條件為事件類型為fail@Overridepublic boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {return loginEvent.getType().equals("fail");}}).within(Time.seconds(3));// 要求緊鄰的兩個事件發生的時間間隔不能超過3秒鐘// 以userid分組,形成keyedStream,然后進行模式匹配 ::方法引用
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> {List<LoginEvent> first = pattern.get("start");List<LoginEvent> second = pattern.get("next");return new LoginWarning(first.get(0).getUserId(), first.get(0).getIp(), first.get(0).getType());}
);loginFailDataStream.print();
env.execute();