4_Flink CEP

Flink CEP

1、何為CEP?

CEP,全稱為復雜事件處理(Complex Event Processing),是一種用于實時監測和分析數據流的技術。

CEP詳細講解

CEP是基于動態環境的事件流的分析技術,事件是狀態變化(持續生成數據)的。通過分析事件間的關系,利用過濾、關聯、聚合等技術,根據事件間的【時序關系和聚合關系制定檢測規則,持續地從事件流中查詢出【符合規則要求】的事件序列,最終分析得到更復雜的復合事件。

簡單概括:CEP就是對于復雜事件制定規則,篩選所需的事件。

復雜事件的定義

復雜事件是由一個或多個簡單事件按照特定的模式、關系和邏輯組合而成的高級事件,比如說“連續登錄失敗”,或者“訂單支付超時”等等【通常會對這些事件進行警報處理】。

在“連續登錄失敗”這個案例中,每個單獨的登錄失敗事件都可以被視為一個簡單事件,而當這些簡單事件在特定時間窗口內連續發生時(例如,連續兩次或三次登錄失敗),它們就構成了一個復雜事件。

2、核心組件

Flink為 CEP 提供了專門的Flink CEP library,其中主要包含了以下的核心組件:Pattern Event StreamPattern Definition(模式定義)Pattern Detection(模式檢測)生成Alert

1. 模式定義【規則的具體化過程】

根據業務需求,在Flink的DataStream API上定義出復雜事件的模式條件。

2. 模式檢測【規則的執行】

一旦模式條件被定義并應用到DataStream上,Flink CEP引擎就會開始對這些數據流(Event Stream)中的事件進行實時檢測。引擎會監聽輸入流中的事件,并根據之前定義的模式條件來匹配和識別復雜事件

3. 生成警告(或其他處理邏輯)

當Flink CEP引擎**【檢測到】符合模式條件的事件序列時,它會根據定義的【處理邏輯】來執行相應的動作**。其中通常會進行警告處理,如”連續登錄失敗“這個事件,考慮到會是黑客入侵等操作,從而生成警報(Alter Generation),并通知(Notification)相關人員進行處理。當然也會進行其他處理邏輯,如數據聚合、統計計算、事件轉換等。

3、CEP基本流程

目標:從有序的簡單事件流中發現一些高階特征。

復雜事件處理(CEP)的流程可以分成三個步驟:
(1) 定義一個匹配規則
(2) 將匹配規則應用到事件流(由一個或多個簡單事件構成)上,檢測滿足規則的復雜事件
(3) 對檢測到的復雜事件進行處理,得到結果進行輸出

1.目標:從有序的簡單事件流中發現一些高階特征。
2.輸入:一個或多個由簡單事件構成的事件流。
3.處理:識別簡單事件之間的內在聯系,多個符合一定規則的簡單事件構成復雜事件。
4.輸出:滿足規則的復雜事件。

如圖所示,輸入是不同形狀的事件流,我們可以定義一個匹配規則:連續兩個圓。那么將這個規則應用到輸入流上,就可以檢測到兩組匹配的復雜事件。它們構成了一個新的“復雜事件流”,流中的數據就變成了一組一組的復雜事件,每個數據都包含了兩個圓。接下來,我們就可以針對檢測到的復雜事件,處理之后輸出一個提示或報警信息了。

4、CEP相關依賴

<properties><flink.version>1.13.2</flink.version>
</properties><!-- flink-CEP(復雜事件規則引擎):制定規則,篩選數據 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>

5、Pattern API(模擬API)

實際操作:一般使用Java來編寫CEP規則,因為CEP對Scala編寫不友好。

模式API可以讓你定義想從輸入流中抽取的復雜模式序列

1.1. 基本含義

含義處理事件的【規則】通常由【多個相互關聯的模式(Pattern)組成】。Flink CEP提供了Pattern API用于對輸入流數據進行復雜事件規則【制定規則】的定義,用來提取符合規則的事件序列。

一般模式大致分為三類:單個模式組合模式模式組

1.2. 三種模式講解

1)子事件
/**UserAction 為主事件UserLogin 繼承自 UserAction 為子事件
*/
Pattern.<UserAction>begin(...).subType(UserLogin.class).where(new SimpleCondition<UserLogin>() {@Overridepublic boolean filter(UserLogin userAction) throws Exception {return userAction.actType().equals(UserAction.LOGIN());}})...
2)個體模式(Individual Patterns)

“個體模式”實際上是單一對象(單一規則)或簡單模式

定義:個體模式就是組成復雜規則的每一個單獨的模式

特點:個體模式可以是單例模式或者循環模式

  • 單例模式:只接受一個事件
  • 循環模式:可以接受多個事件,并通過指定循環次數來定義接收事件的數量

個體模式默認為單例模式,可以使用【量詞】轉換成【循環模式】,每個模式可以有一個或者多個條件來決定它接受哪些事件。

一:量詞

含義:量詞是給定符合特定條件的事件出現的次數。比如:定義個體模式為“匹配形狀為三角形的事件”,再讓它循環多次,就變成了“匹配連續多個三角形的事件”。注意這里的“連續”,只要保證前后順序即可,中間可以有其他事件,所以是“寬松近鄰”關系。

在Flink CEP中,你可以通過這些方法指定循環模式(有以下四種基本形式):

  • pattern.times(#ofTimes),指定期望一個給定事件出現特定次數的模式;
  • pattern.times(#fromTimes, #toTimes),指定期望一個給定事件出現次數在一個最小值和最大值間;
  • pattern.oneOrMore(),指定期望一個給定事件出現一次或者多次的模式【無上限】;
  • pattern.timesOrMore(#ofTimes),指定一個給定事件出現特定次數或多次的模式;

應用范圍:量詞(如 .oneOrMore())是應用于整個由 begin() 開始到某個結束條件(如果有的話,比如 .until())之間的事件序列的。如:當你看到 oneOrMore() 應用于某個 next 定義的模式時,它意味著該模式(即 next 指向的事件類型)可以在其前面的模式(可能是 begin 或另一個 next)之后重復出現一次或多次。

下面以 start 為名字的模式對象,解釋所涉及到的量詞意義【舉例】。

greedy():指定期望一個給定事件出現盡可能多次的模式;

optional():指定期望一個給定事件出現0或1次的模式;

// 期望出現4次
start.times(4)// 期望出現0次或者4次
start.times(4).optional()// 期望出現2次、3次或者4次
start.times(2, 4)// 期望出現2次、3次或者4次,并且盡可能多【但不得超過4次】
start.times(2, 4).greedy()// 期望出現0次、2次、3次或者4次
start.times(2, 4).optional()// 期望出現0、2、3或者4次,并且盡可能多【但不得超過4次】
start.times(2, 4).optional().greedy()// 期望出現1到多次
start.oneOrMore()// 期望出現1到多次,并且盡可能多【無上限】
start.oneOrMore().greedy()// 期望出現0到多次
start.oneOrMore().optional()// 期望出現0到多次,并且盡可能多【無上限】
start.oneOrMore().optional().greedy()// 期望出現2到多次
start.timesOrMore(2)// 期望出現2到多次,并且盡可能多【無上限】
start.timesOrMore(2).greedy()// 期望出現0、2或多次
start.timesOrMore(2).optional()// 期望出現0、2或多次,并且盡可能多【無上限】
start.timesOrMore(2).optional().greedy()
二:條件

對每個模式你可以指定一個條件來決定一個進來的事件是否被接受進入這個模式,指定判斷事件屬性的條件可以通過pattern.where()【與】、pattern.or()【或】或者pattern.until()【終止】方法。

  • 迭代條件【默認】

    含義:這是最普遍的條件類型。使用它可以指定一個基于前面已經被接受的事件【上一個模式流入此處的事件】的屬性,或者它們的一個子集的統計數據來決定是否接受時間序列的條件。

    應用場景:將當前事件跟之前的事件做對比,才能判斷出要不要接受當前事件。

    分析:迭代條件能夠獲取已經匹配的事件,如果自身又是循環模式(比如量詞oneOrMore),那么兩者結合就可以捕獲自身之前接收的數據,據此來判斷是否接受當前事件。這個功能非常強大,我們可以由此實現更加復雜的需求,比如可以要求“只有大于之前數據的平均值,才接受當前事件”。

    案例講解:模式名稱為“middle”的循環模式,其可以接受事件發生一次或多次。因此在下面的迭代條件中,我們通過ctx.getEventsForPattern(“middle”)獲取當前模式已經接受的事件,計算它們的總價(getMoney())之和;再加上當前事件中的數量,如果總和小于100,就接受當前事件,否則就不匹配。最終我們的匹配規則就是:循環匹配的所有事件價格(getMoney())之和并且總和必須小于 100。

    middle.oneOrMore().where(new IterativeCondition<Event>() { @Overridepublic boolean filter(Event value, Context<Event> ctx) throws Exception {int sum = value.getMoney();// 獲取當前模式之前已經匹配的事件,求所有事件 amount 之和// middle為已定義的模式名稱for (Event event : ctx.getEventsForPattern("middle")) { sum += event.getMoney();}// 在總數量小于 100 時,當前事件滿足匹配規則,可以匹配成功return sum < 100;}});
    
  • 簡單條件

    這種類型的條件擴展了前面提到的IterativeCondition類

    它決定是否接受一個事件只取決于事件自身的屬性。

    // 規則定義【簡單條件】:2~4個的溫度都在40度以上的事件
    Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0") // 模式 + 鎖定泛型  => 一個事件.times(2,4) // 量詞:2~4個.where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});
    
  • 組合條件

    定義:對同一個事件而言,可將當前條件和其他的條件(where()or()until())結合起來使用。

    注意:這適用于任何條件,你可以通過依次調用where() 來組合條件,最終的結果是每個單一條件的結果的邏輯and。如果想使用or來組合條件,你可以像下面這樣使用or()方法

    // 組合條件:2個[溫度都在大于等于40度 or 溫度小于等于20度]的事件【非連續】
    // or:其中兩個判斷條件滿足其一即可
    Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0") // 模式 + 鎖定泛型  => 一個事件.times(2) // 量詞:2個.where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40; // 判斷條件:溫度都在大于等于40度}}).or(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature()<=20; // 判斷條件:溫度小于等于20度}});
    
  • 停止條件

    如果使用循環模式oneOrMore()oneOrMore().optional()),可以指定一個停止條件,建議使用.until()作為停止條件,以便清理狀態。

    until在Flink CEP中的工作方式:一旦until條件被滿足,則當前匹配的模式序列將結束,并且不會包含導致until條件滿足的那個事件。之后,CEP引擎會開始嘗試匹配新的模式序列。

    // 終止條件:一旦出現滿足溫度小于20的事件,則之前所有連續的溫度事件的序列結束,開始匹配新的模式序列。
    // 注意:不包括滿足條件的哪個事件本身。
    Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0")	// 模式 + 鎖定泛型  => 一個事件.oneOrMore().until(new SimpleCondition<TemperatureEvent>() {/*until(condition); 為循環模式指定一個停止條件。意思是滿足了給定的條件的事件出現后,就不會再有事件被接受進入模式了。只適用于和oneOrMore()同時使用。另外:在基于事件的條件中,它可用于清理對應模式的狀態。*/@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() < 20;}});
    
3)組合模式(Combining Patterns)

“組合模式”實際是多對象(多規則)組合

組合模式由一個初始模式作為開頭,如下所示:

val start : Pattern[Event, _] = Pattern.begin("start")
嚴格連續(嚴格緊鄰)

next():期望所有匹配的事件嚴格的一個接一個出現,中間沒有任何不匹配的事件。

// 嚴格連續: 【連續】3個溫度在40度以上的事件
Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).next("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).next("stage-2").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});
松散連續(寬松近鄰)

followedBy():忽略匹配的事件之間的不匹配的事件。

理解:當且僅當數據為a,c,b,b時,對于followedBy模式而言命中的為{a,b}

// 模擬數據
new TemperatureEvent("江寧1", 33.0, 1723626529138L),
new TemperatureEvent("江寧2", 36.0, 1723626529569L),
new TemperatureEvent("江寧3", 43.0, 1723626531234L),
new TemperatureEvent("江寧4", 18.0, 1723626532685L),
new TemperatureEvent("江寧5", 18.0, 1723626532996L),// 松散連續: 兩個都大于40度的溫度【非連續,允許忽略某些元素】
Pattern<TemperatureEvent, TemperatureEvent> rule3 = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).followedBy("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});最終結果:
--------------------
江寧3 江寧4
--------------------
非確定的松散連續(非確定性寬松近鄰)

followedByAny():更進一步的松散連續,允許忽略掉一些匹配事件的附加匹配。

理解:當且僅當數據為a,c,b,b時,對于followedByAny而言會有兩次命中{a,b},{a,b}

// 模擬數據
new TemperatureEvent("江寧1", 33.0, 1723626529138L),
new TemperatureEvent("江寧2", 36.0, 1723626529569L),
new TemperatureEvent("江寧3", 43.0, 1723626531234L),
new TemperatureEvent("江寧4", 18.0, 1723626532685L),
new TemperatureEvent("江寧5", 18.0, 1723626532996L),// 不確定的松散連續: 兩個都大于40度的溫度【非連續,允許忽略某些元素】
Pattern<TemperatureEvent, TemperatureEvent> rule3 = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).followedByAny("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() == 18;}});最終結果:
--------------------
江寧3 江寧4
江寧3 江寧5
--------------------

除了以上模式序列外,還可以定義“不希望出現某種近鄰關系”:

  • notNext():希望后面不緊接著是某個特定事件。
  • notFollowedBy():不希望某個特定事件發生在兩個事件之間的任何地方。

【溫馨提示】①所有模式序列必須以.begin()開始;②模式序列不能以.notFollowedBy()結束;③“not”類型的模式不能被optional所修飾;④可以為模式指定時間約束,用來要求在多長時間內匹配有效。

3)模式組(Pattern Groups)

使用模式組的目的:復用性。

也可以定義一個模式序列作為begin,followedBy,followedByAny和next的條件。這個模式序列在邏輯上會被當作匹配的條件, 并且返回一個GroupPattern,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。

// start
val start: Pattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)// 嚴格連續:通過start生成strict
val strict: Pattern[Event, _] = start.next(Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)// 松散連續:通過start生成relaxed
val relaxed: Pattern[Event, _] = start.followedBy(Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()// 不確定松散連續:通過start生成nonDetermin
val nonDetermin: Pattern[Event, _] = start.followedByAny(Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()

1.3. 匹配后跳過策略

對于一個給定的模式,同一個事件可能會分配到多個成功的匹配上。為了控制一個事件會分配到多少個匹配上,你需要指定跳過策略AfterMatchSkipStrategy。 有五種跳過策略,如下:

  • NO_SKIP: 每個成功的匹配都會被輸出,不跳過任何環節【默認】。

  • SKIP_TO_NEXT: 保留相同事件開始的第一條數據。

    目的:避免重復處理相同起點的多個匹配,從而節省計算資源和提高處理效率。

    具體體現:如果檢測到一個匹配,并且該匹配以某個特定事件(事件A)為起始點,那么系統將會丟棄(即不再考慮)所有其他以該事件(事件A)開始但尚未完成或正在進行的其他部分匹配。

    舉例:在(A1,A2,A3,C1,A4,A5,C2,C3,C4)中,條件是兩個連續A事件,后面跟隨C事件[followedByAny()]。若不使用SKIP_TO_NEXT跳過策略,輸出結果為:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2),(A4,A5,C3),(A4,A5,C4)。若使用SKIP_TO_NEXT跳過策略,輸出結果為:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2)。

  • SKIP_PAST_LAST_EVENT: 只保留第一條數據。

    目的:減少冗余匹配,明確匹配邊界。

    具體體現:在成功匹配一個復雜事件模式后,系統將丟棄(不再考慮)當前匹配的開始事件結束事件之間的所有其他部分匹配或未完成的匹配序列

    舉例:在(A1,A2,A3,C1,A4,A5,C2,C3,C4)中,條件是兩個連續A事件,后面跟隨C事件[followedByAny()]。若不使用SKIP_PAST_LAST_EVENT跳過策略,則輸出的結果為:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2),(A4,A5,C3),(A4,A5,C4)。若使用SKIP_PAST_LAST_EVENT跳過策略,則輸出的結果為:(A1,A2,C1),(A4,A5,C2)。

  • SKIP_TO_FIRST: 當匹配到某個模式的開始事件時,如果在匹配過程中遇到無法匹配的事件,它會跳到模式中第一個匹配事件之后繼續處理。

    目的:處理和過濾事件流,修改起始點

    具體體現:一旦模式匹配到定義了SKIP_TO_FIRST的相應階段,就會跳過之前的階段,直接從指定的模式處(a2)開始處理后續事件,但已經匹配到的序列仍然被認為是有效的

    舉例

    假設事件序列是 (a1 a2 a3 b),并且你使用 SKIP_TO_FIRST 策略,以 a2SKIP_TO_FIRST指定的開始事件。若中途遇到無法匹配的,它會跳到模式中第一個匹配事件(也就是 a2)之后繼續處理,相當于只留下 a2 開始的匹配【初始時,已經匹配到的序列(a1 a2 a3 b)中a1仍然被認為是有效的。遇到無法匹配后,起始點從a1變為a2,此后就從a2開始,前面的a1就不在考慮了】。最終得到的匹配順序是(a1 a2 a3 b),(a2 a3 b),(a2 b)。若未遇到不匹配現象,則順序為(a1 a2 a3 b)。

  • SKIP_TO_LAST: 跳過模式的開始事件的所有匹配,從模式的最后一個事件之后的事件開始新的匹配。

    目的:避免重復匹配

    具體體現:跳過所有從模式開始的事件,而從模式中的最后一個事件之后的位置繼續處理后續事件。

    舉例

    假設事件序列是 (a1 a2 a3 b),并且你使用 SKIP_TO_LAST 策略,找到 a1 開始的匹配(a1 a2 a3 b)后,跳過所有 a1、a2 開始的匹配,跳到以最后一個 a(也就是 a3)為開始的匹配。最終得到(a1 a2 a3 b),(a3 b)。其中若未出現不匹配現象,則最終得到(a1 a2 a3 b)。

【溫馨提示】當使用SKIP_TO_FIRSTSKIP_TO_LAST策略時,需要指定一個合法的PatternName。

應用基本格式

// 定義跳過策略
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("stage-2");Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0",skipStrategy) // 添加跳過策略.where(...)

1.4. 有效時間約束(within)

可以為模式定義一個有效時間約束。 例如,你可以通過pattern.within()方法指定一個模式應該在10秒內發生。 這種時間模式支持處理時間和事件時間。

【溫馨提示】一個模式序列只能有一個時間限制。如果限制了多個時間在不同的單個模式上,會使用最小的那個時間限制。

next.within(Time.seconds(10))

錯誤總結

Option not applicable to NOT pattern : 量詞不和not連用

Optional pattern cannot be preceded by greedy pattern :greedy 之后不能接 optional

NotFollowedBy is not supported as a last part of a Pattern:notFollowedBy不能是規則的最后部分

2、模式的檢測處理

一、Pattern檢測(將模式應用于流上)

含義:將匹配規則應用到事件流(由一個或多個簡單事件構成)上,檢測滿足規則的復雜事件

為了在事件流上運行所創建的模式(rule),需要創建一個patStream。 給定一個輸入流input,一個模式rule, 可以通過調用如下方法來創建patStream:

PatternStream<UserAction> patStream = CEP.pattern(stream, rule);

二、處理匹配事件

含義:創建 patStream 之后,就可以應用 select 或者 flatselect 方法,從檢測到的事件序列中提取事件。

  • select() 方法需要輸入一個 PatternSelectFunction 作為參數,返回一個結果。
  • flatselect() 方法需要輸入一個 PatternFlatSelectFunction 作為參數,返回任意多個結果。

select() 以一個 Map[String,Iterable [IN]] 來接收匹配到的事件序列,其中 【key】 就是每個【模式名稱】,而 【value】 就是所有接收到的【所有事件】的 Iterable 類型

patStream.select(new PatternSelectFunction<TemperatureEvent, String>() {// key:模式;value:事件@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {List<TemperatureEvent> events = pattern.get("stage-0"); // 根據key來獲取value// 業務:獲取事件的第一條和最后一條List<TemperatureEvent> sortedEvents = events.stream().sorted((a, b) -> (int) (a.getTimestamp() - b.getTimestamp())) // 排序.collect(Collectors.toList());TemperatureEvent first = sortedEvents.get(0); // 第一條事件TemperatureEvent last = sortedEvents.get(sortedEvents.size() - 1); // 最后一條事件double diff = (last.getTimestamp() - first.getTimestamp()) / 1000.0;return String.format("%s %s %.2f",first.getDistrict(),last.getDistrict(),diff);}
})

三、超時事件提取

應用原因:當一個模式通過within關鍵字定義了檢測窗口時間時,部分事件序列可能因為超過窗口長度被丟棄,為了能夠處理這些超時的部分匹配,select和flatSelectAPI調用允許指定超時處理程序。

Pattern<UserLoginEvent, ?> pattern = Pattern.<UserLoginEvent>begin("start")  .where(event -> !event.isSuccess()) // 第一個事件是登錄失敗  .next("middle").where(new SimpleCondition<UserLoginEvent>() {  @Override  public boolean filter(UserLoginEvent event) {  return !event.isSuccess(); // 中間事件也是登錄失敗  }  })  .oneOrMore() // 表示“middle”模式可以重復一次或多次  .consecutive() // 強調這些事件必須是連續的  .next("end").where(new SimpleCondition<UserLoginEvent>() {  @Override  public boolean filter(UserLoginEvent event) {  return !event.isSuccess(); // 最后一個事件也是登錄失敗  }  })  .within(Time.seconds(10)); // 所有這些事件必須在10秒內發生(時間的限制)

解決方案:超時處理程序會被接收到目前為止由模式匹配到的所有事件,由一個OutputTag定義接收到的超時事件序列(超時事件會放到另外【側輸出流】中)。

// Pattern檢測
PatternStream<Event> patStream = CEP.pattern(stream, rule);// 側輸出流
OutputTag<String> outputTag = new OutputTag<String>("late-data"){};// 處理模式流并捕獲遲到數據
SingleOutputStreamOperator<ComplexEvent> result = patStream.sideOutputLateData(outputTag).select(new PatternSelectFunction<Event, ComplexEvent>() {...});// 獲取側輸出流
DataStream<String> lateData = result.getSideOutput(outputTag);

3、實際應用

實例1:監測某地的溫度并告警

監測某地的溫度在一小時內連續三次大于設定溫度,則進行告警處理。

步驟一:定義事件:溫度數據
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;@Data
@AllArgsConstructor
@ToString
// 模擬天氣數據
public class TemperatureEvent implements Serializable {private String mechineName; // 區域private Double temperature; // 溫度private Long timestamp; // 數據生成時間
}
步驟二:創建Flink環境并配置CEP
import modules.env.Environments;
import modules.time.Timer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;import java.util.Arrays;
import java.util.List;
import java.util.Map;// 監測某地的溫度在一小時內連續三次大于設定溫度,則進行告警處理
public class TempCEP {public static void main(String[] args) throws Exception {// 創建環境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 模擬事件SingleOutputStreamOperator<TemperatureEvent> stream = see.fromCollection(Arrays.asList(new TemperatureEvent("江寧1", 33.0, 1723626529138L),new TemperatureEvent("江寧2", 36.0, 1723626529569L),new TemperatureEvent("江寧3", 43.0, 1723626531234L),new TemperatureEvent("江寧4", 18.0, 1723626532996L),new TemperatureEvent("江寧5", 39.0, 1723626533247L),new TemperatureEvent("江寧6", 41.0, 1723626533888L),new TemperatureEvent("江寧7", 41.0, 1723626534096L),new TemperatureEvent("江寧8", 42.0, 1723626535011L),new TemperatureEvent("江寧9", 42.0, 1723626535011L),new TemperatureEvent("江寧10", 41.0, 1723626535011L),new TemperatureEvent("江寧11", 52.0, 1723626535555L))).assignTimestampsAndWatermarks(// 生成單調遞增的水印,其中10 表示最大允許的延遲時間(即水印的間隔時間)【毫秒】Timer.monotonous(60));// 設定預警溫度double TEMPERATURE_SETTING = 40;// 定義CEP模式Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).next("stage-2").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).next("stage-3").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).within(Time.hours(1));// 在模式匹配時執行操作PatternStream<TemperatureEvent> patStream = CEP.pattern(stream, rule);patStream.select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {String district1 = pattern.get("stage-1").get(0).getDistrict();String district2 = pattern.get("stage-2").get(0).getDistrict();String district3 = pattern.get("stage-3").get(0).getDistrict();Double first = pattern.get("stage-1").get(0).getTemperature();Double second = pattern.get("stage-2").get(0).getTemperature();Double third = pattern.get("stage-3").get(0).getTemperature();return String.format("告警: 地點 %s -> %s -> %s,溫度在一小時內連續三次超過 40 度!,分別是%.2f度,%.2f度,%.2f度",district1,district2,district3,first,second,third);}}).print();see.execute("CEP_TEMPERATURE");}
}

結果展示

案例2:異常檢測 - 機箱溫度檢測

需求:同一個機箱連續兩次溫度超標,報警
拓展需求:鍋爐房溫度檢測;信用卡反欺詐:連續大額消費;反作弊:同一個用戶短時間內連續登陸失敗

  • flink cep
  • pattern定義
  • pattern匹配
  • select 選出匹配到的事件,報警
步驟一:定義事件:機箱數據
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;import java.io.Serializable;@Data
@AllArgsConstructor
@ToString
// 模擬機箱數據
public class TemperatureEvent implements Serializable {private String chassis; // 機箱private Double temperature; // 溫度
}
步驟二:具體情況分析
1)連續三次高于26度
package cases.test05;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Detection {public static void main(String[] args) throws Exception {// 環境配置StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模擬機箱的數據DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),new TemperatureEvent("xyz", 20.1),new TemperatureEvent("xyz", 21.1),new TemperatureEvent("xyz", 22.2),new TemperatureEvent("xyz", 22.1),new TemperatureEvent("xyz", 26.3),new TemperatureEvent("xyz", 28.1),new TemperatureEvent("xyz", 27.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 26.7),new TemperatureEvent("xyz", 27.0))// 設置水位線.assignTimestampsAndWatermarks(WatermarkStrategy.<TemperatureEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TemperatureEvent>() {@Overridepublic long extractTimestamp(TemperatureEvent temperatureEvent, long l) {return System.currentTimeMillis();}}));// 連續三次高于26度Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("fgt261").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}}).next("fgt262").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}}).next("fgt263").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}});DataStream<String> patternStream = CEP.pattern(inputEventStream,warningPattern).select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {Double first = pattern.get("fgt261").get(0).getTemperature();Double second = pattern.get("fgt262").get(0).getTemperature();Double third = pattern.get("fgt263").get(0).getTemperature();return String.format("告警: 機箱連續三次高于26度,分別是%.2f度,%.2f度,%.2f度",first,second,third);}});patternStream.print();env.execute("Detection");}
}

結果展示

2)3秒內3次以上平均溫度超過26度
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Detection {public static void main(String[] args) throws Exception {// 環境配置StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模擬機箱的數據DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),new TemperatureEvent("xyz", 20.1),new TemperatureEvent("xyz", 21.1),new TemperatureEvent("xyz", 22.2),new TemperatureEvent("xyz", 22.1),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 25.1),new TemperatureEvent("xyz", 27.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 26.7),new TemperatureEvent("xyz", 27.0),new TemperatureEvent("xyz", 24.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 21.0))// 設置水位線.assignTimestampsAndWatermarks(WatermarkStrategy.<TemperatureEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TemperatureEvent>() {@Overridepublic long extractTimestamp(TemperatureEvent temperatureEvent, long l) {return System.currentTimeMillis();}}));// 3秒內3次以上平均溫度超過26度Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("fgt261") // 3秒內至少3次.timesOrMore(3).where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent temperatureEvent) throws Exception {return temperatureEvent.getTemperature()>26;}}).within(Time.seconds(3)).next("fgt262") // 平均溫度超過26度.where(new IterativeCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {int count = 0;double avg = 0;for (TemperatureEvent te : ctx.getEventsForPattern("fgt261")) {count++;avg += te.getTemperature();}return count>=3 && avg/count>=26;}});DataStream<String> patternStream = CEP.pattern(inputEventStream,warningPattern).select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {return pattern.toString();}});patternStream.print();env.execute("Detection");}
}

結果展示

案例3:登錄事件異常檢測

同一個用戶連續兩次登陸失敗,報警

  • flink cep
  • pattern定義
  • pattern匹配
  • select輸出報警事件
//需求: 如果同一個userid在三秒之內連續兩次登陸失敗,報警。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 這里mock了事件流,這個事件流一般從Kafka過來
DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList(  //自定義一個pojo類:userId、ip、typenew LoginEvent("1", "192.168.0.1", "fail"),new LoginEvent("1", "192.168.0.2", "fail"),new LoginEvent("1", "192.168.0.3", "fail"),new LoginEvent("2", "192.168.10.10", "success")
));Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("start")//泛型類或泛型接口上的泛型形參是不能用于靜態成員的,那么當靜態方法需要用到泛型時,只能用泛型方法。.where(new IterativeCondition<LoginEvent>() { // 模式開始事件的匹配條件為事件類型為fail, 為迭代條件@Overridepublic boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {return loginEvent.getType().equals("fail");}}).next("next").where(new IterativeCondition<LoginEvent>() { // 事件的匹配條件為事件類型為fail@Overridepublic boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {return loginEvent.getType().equals("fail");}}).within(Time.seconds(3));// 要求緊鄰的兩個事件發生的時間間隔不能超過3秒鐘// 以userid分組,形成keyedStream,然后進行模式匹配   ::方法引用
PatternStream<LoginEvent> patternStream = 	CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> {List<LoginEvent> first = pattern.get("start");List<LoginEvent> second = pattern.get("next");return new LoginWarning(first.get(0).getUserId(), first.get(0).getIp(), first.get(0).getType());}
);loginFailDataStream.print();
env.execute();

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/87107.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/87107.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/87107.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

容器基礎知識2-K8s 和 Docker 的關系與管理邏輯詳解

K8s 和 Docker 的關系與管理邏輯詳解 一、先搞懂&#xff1a;Docker 和 K8s 分別是做什么的&#xff1f; Docker&#xff08;容器工具&#xff09;&#xff1a;好比「集裝箱工廠」&#xff0c;負責把應用和依賴打包成標準化容器&#xff08;類似集裝箱&#xff09;&#xff0…

QT MaintenanceTool 登錄無法找到 QtAccount 憑據

親測有效&#xff1a;QT6 Maintenance Tool 登錄問題_qt6 maintenancetool-CSDN博客 將ini這個配置文件移出文件夾后&#xff0c;在切換自己賬戶登錄即可

華為云Flexus+DeepSeek征文|利用華為云一鍵部署 Dify 平臺并接入 DeepSeek 大模型,構建長篇文章生成助手

目錄 前言 1 華為云一鍵部署 Dify 平臺 1.1 華為云 Dify 平臺介紹 1.2 部署過程介紹 1.3 登錄 Dify 平臺 2 接入華為云 ModelArts Studio 中的 DeepSeek 大模型 3 構建長篇文章生成助手 3.1 簡要介紹長篇文章生成助手 3.2 開始節點 3.3 生成標題和大綱&#xff08;LL…

js的一些基礎概念總結

1.變量聲明 首先js變量聲明有三種&#xff0c;var&#xff0c;const&#xff0c;let&#xff0c;這三種變量聲明中我們第一優先使用const&#xff0c;需要改變這個值的時候我們用ley&#xff0c;var是盡量不去使用。 那么我們現在來總結一下三種聲明變量的區別。首先是var let …

防 XSS和CSRF 過濾器(Filter)

會話管理存在問題&#xff1a; 1.服務集群部署或者是分布式服務如何實現會話共享 2.會話的不同存儲地方的安全性問題 答&#xff1a; 會話共享 可以使用后端集中管理(redis)或者客戶端管理 &#xff08;jwt&#xff09;&#xff1b; 存儲安全性 這個還真的沒有太好的方式&…

鴻蒙容器組件 WaterFlow、FlowItem解析:動態瀑布流布局實踐

一、引言&#xff1a;不規則布局的智能化解決方案 在圖片社交、電商導購、資訊聚合等現代應用場景中&#xff0c;瀑布流布局以其靈活的空間利用率和自然的視覺流動感成為界面設計的重要選擇。鴻蒙提供的 WaterFlow 與 FlowItem 組件&#xff0c;通過智能布局算法與聲明式語法&…

概率密度基本概念

概率密度&#xff08;Probability Density&#xff09;是概率論中用于描述隨機變量分布的一種方式&#xff0c;特別適用于連續隨機變量。它并不是一個概率值&#xff0c;而是表示單位范圍內的概率大小或“濃度”。更具體地說&#xff0c;概率密度表示在某個特定值附近&#xff…

10-1 MySQL 索引優化與查詢優化

10-1 MySQL 索引優化與查詢優化 文章目錄 10-1 MySQL 索引優化與查詢優化1. 數據準備2. 索引失效案例2.1 索引字段&#xff1a;全值匹配最優2.2 索引字段&#xff1a;最佳左前綴法則2.3 主鍵插入順序2.4 索引字段進行了&#xff1a;計算、函數、類型轉換(自動或手動)導致索引失…

基于目標驅動的分布式敏捷開發

研究結論 風險對項目目標的影響 時間目標&#xff1a;需求管理不當&#xff08;如需求優先級不明確、多產品負責人需求沖突&#xff09;、架構變更導致的返工、跨站點協調問題&#xff08;如第三方依賴、通信基礎設施不足&#xff09;是影響項目時間的主要風險因素。質量目標&…

高通手機跑AI系列之——穿衣試裝算法

環境準備 手機 測試手機型號&#xff1a;Redmi K60 Pro 處理器&#xff1a;第二代驍龍8移動--8gen2 運行內存&#xff1a;8.0GB &#xff0c;LPDDR5X-8400&#xff0c;67.0 GB/s 攝像頭&#xff1a;前置16MP后置50MP8MP2MP AI算力&#xff1a;NPU 48Tops INT8 &&…

opencv入門(5)圖像像素的讀寫操作和算術運算

文章目錄 1 圖像遍歷與修改1.1 使用數組1.2 使用指針 2 圖像的算術運算2.1 一般算術操作2.2 算術API 1 圖像遍歷與修改 C中支持 數組遍歷 和 指針方式遍歷 1.1 使用數組 訪問使用 image.at(row,col) 進行訪問 如果是單通道灰度圖&#xff0c;就使用image.at進行讀取 如果是三…

Stable Diffusion入門-ControlNet 深入理解-第三課:結構類模型大揭秘——深度、分割與法線貼圖

大家好,歡迎回到Stable Diffusion入門-ControlNet 深入理解系列的第三課! 在上一課中,我們深入探討了 ControlNet 文件的命名規則,以及線條類 ControlNet模型的控制方法。如果你還沒有看過第二篇,趕緊點這里補課:Stable Diffusion入門-ControlNet 深入理解 第二課:Contr…

噴油嘴深凹槽內輪廓測量的方法探究 —— 激光頻率梳 3D 輪廓測量

引言 噴油嘴作為燃油噴射系統核心部件&#xff0c;其深凹槽內輪廓精度直接影響燃油霧化效果與發動機排放性能。噴油嘴深凹槽具有深徑比大&#xff08;可達 30:1&#xff09;、孔徑小&#xff08;φ0.5 - 2mm&#xff09;、表面質量要求高&#xff08;Ra≤0.2μm&#xff09;等…

上證ETF50期權交易規則一文詳解

50ETF期權&#xff0c;首先這是期權交易&#xff0c;所以50ETF期權有期權交易的所有特征&#xff0c;其次&#xff0c;50ETF期權的標的對象是上證50&#xff0c;所以50ETF&#xff08;認購看漲&#xff09;期權的走勢和上證50的走勢是一樣的。 行權時間&#xff1a; 在行權日當…

Oracle獲取執行計劃之10046 技術詳解

Oracle 的 10046 事件是性能調優中最常用的工具之一&#xff0c;通過跟蹤會話的 SQL 執行細節&#xff0c;生成包含執行計劃、等待事件、綁定變量等信息的跟蹤文件&#xff0c;幫助定位性能瓶頸。以下是技術詳解&#xff1a; 一、10046 事件基礎 10046 是 Oracle 內部事件&…

Linux 日志監控工具對比:從 syslog 到 ELK 實戰指南

更多云服務器知識&#xff0c;盡在hostol.com 你有沒有被 Linux 上滿屏飛滾的日志整崩潰過&#xff1f;看著 /var/log 目錄越來越肥&#xff0c;關鍵日志像大海撈針一樣藏在里面&#xff0c;每次出故障就像拆盲盒&#xff0c;賭你能不能第一眼看出問題。 日志系統&#xff0c…

本地服務器部署后外網怎么訪問不了?內網地址映射互聯網上無法連接問題的排查

我的網站部署搭建在本地服務器上的&#xff0c;在內網可以正常訪問&#xff0c;但是外網無法訪問&#xff0c;該怎么排查&#xff1f;局域網內部經過路由器的&#xff0c;有設置了虛擬服務器轉發規則&#xff0c;在互聯網公網上還是無法訪問服務器怎么辦&#xff1f;相信很多人…

如何免費正確安裝微軟的office全家桶

記錄一下如何正確安裝微軟的office全家桶 找到安裝包傻瓜式安裝 找到安裝包 安裝包在附件&#xff0c;大家可以自行進行下載 傻瓜式安裝 操作一目了然&#xff0c;點你需要的就行了

論文閱讀:BLIPv1 2022.2

文章目錄 一、研究背景與問題現有方法的局限性研究目標 二、核心方法與創新點多模態編碼器 - 解碼器混合架構&#xff08;MED&#xff09;標題生成與過濾&#xff08;CapFilt&#xff09;數據自舉方法 三、實驗與結果數據集與訓練配置關鍵實驗發現與 state-of-the-art 方法的對…

630,百度文心大模型4.5系列開源!真香

2025年被普遍認為是AI Agent商業化的關鍵之年&#xff0c;而大模型正是Agent能力的核心支撐。 當開發成本大幅降低&#xff0c;我們很可能看到各種垂直領域的Agent應用如雨后春筍般涌現。 技術普惠的現實意義對于廣大AI創業者和開發者來說&#xff0c;這無疑是個好消息。 之…