Flink CEP是什么?

Apache Flink 的 CEP(Complex Event Processing,復雜事件處理) 是 Flink 提供的一個庫,用于在無界數據流中檢測符合特定模式的事件組合。


🎯 一、什么是 CEP?

? 定義:

CEP 是一種從連續的數據流中識別出符合預設模式(Pattern)的事件組合的技術。

它可以用來實現:

  • 用戶行為分析(如“登錄 → 加入購物車 → 放棄支付”)
  • 異常檢測(如“連續失敗請求超過3次”)
  • 風控規則匹配(如“短時間內多次轉賬”)

🧠 二、CEP 的核心概念

概念描述
Pattern定義你想要匹配的事件序列規則
PatternStream表示匹配到的事件流
Event Stream原始輸入的數據流
Time Limit設置模式匹配的時間窗口(例如:10秒內完成一系列操作)
Quantifier控制事件出現的次數(如 oneOrMore, times(n), within() 等)

🔍 三、Flink CEP 的工作流程圖解

原始事件流↓
[ Pattern API ] → 定義模式(如 A → B → C)↓
PatternStream → 匹配成功的事件組合↓
處理邏輯(如報警、記錄日志等)

📦 四、Flink CEP 核心組件

1. Pattern<Event, ?>

定義事件匹配規則,例如:

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {@Overridebegin方法詳解public boolean filter(Event event) {return event.getType().equals("登錄");}}).next("middle").where(new SimpleCondition<Event>() {public boolean filter(Event event) {return event.getType().equals("加入購物車");}}).within(Time.seconds(10)); // 在10秒內完成整個流程

2. PatternStream<Event>

將原始流與 Pattern 關聯,得到匹配結果:

PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);

3. select / process 操作

對匹配成功的事件進行處理:

patternStream.select(new PatternSelectFunction<Event, String>() {@Overridepublic String select(Map<String, List<Event>> patternMap) throws Exception {Event start = patternMap.get("start").get(0);Event middle = patternMap.get("middle").get(0);return "用戶行為路徑匹配: " + start + " -> " + middle;}
}).print();

🧪 五、Java 示例代碼演示

示例目標:

檢測“連續三次登錄失敗”的用戶行為

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;public class FlinkCEPExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模擬輸入事件流DataStream<Event> eventStream = env.fromElements(new Event("userA", "登錄失敗", 1000L),new Event("userB", "登錄成功", 1500L),new Event("userA", "登錄失敗", 2000L),new Event("userA", "登錄失敗", 3000L),new Event("userA", "登錄成功", 4000L));// 定義 CEP 模式:連續3次登錄失敗(時間窗口為10秒)Pattern<Event, ?> pattern = Pattern.<Event>begin("first").where(new SimpleCondition<Event>() {@Overridepublic boolean filter(Event event) {return event.getType().equals("登錄失敗");}}).times(3).within(Time.seconds(10));// 將模式應用到事件流上PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);// 輸出匹配到的事件patternStream.select(new PatternSelectFunction<Event, String>() {@Overridepublic String select(Map<String, List<Event>> patternMap) throws Exception {List<Event> events = patternMap.get("first");return "發現異常行為!用戶 [" + events.get(0).userId + "] 連續3次登錄失敗";}}).print();env.execute("Flink CEP Example");}// 事件類public static class Event {public String userId;public String type;public long timestamp;public Event(String userId, String type, long timestamp) {this.userId = userId;this.type = type;this.timestamp = timestamp;}public String getType() {return type;}public String getUserId() {return userId;}@Overridepublic String toString() {return "{" + "\"userId\":\"" + userId + "\", \"type\":\"" + type + "\", \"timestamp\":" + timestamp + "}";}}
}

📈 六、運行結果示例

發現異常行為!用戶 [userA] 連續3次登錄失敗

表示 userA 在 10 秒內連續出現了 3 次 “登錄失敗” 的行為,觸發了 CEP 規則。


?? 七、常用 Pattern 條件和匹配方式

方法描述
.begin("name")開始一個新的模式
.where(condition)添加一個條件
.times(n)匹配 n 次
.oneOrMore()匹配至少一次
.greedy()貪婪匹配(盡可能多匹配)
.followedBy("name")非嚴格近鄰(允許中間有其他事件)
.notFollowedBy("name")排除某個事件
.within(Time.time)設置模式匹配的最大時間窗口

🧩 八、CEP 的應用場景

場景描述
風控系統檢測欺詐行為、異常交易
用戶行為分析識別漏斗轉化率、用戶流失路徑
IoT 設備監控檢測設備故障前的行為序列
運維監控檢測服務調用鏈中的異常順序
安全審計檢測非法操作組合(如“登錄失敗→嘗試訪問敏感資源”)

? 九、CEP 使用建議

建議說明
時間窗口設置合理太大會影響性能,太小可能漏掉有效模式
合理使用 greedy 模式避免重復匹配或遺漏
與 Watermark 結合使用確保事件時間語義正確
限制狀態大小防止狀態無限增長(可使用 withStateCleaning(true)
使用側輸出處理未匹配事件可選,用于調試或補救機制

📌 十、總結

特性描述
名稱Flink CEP
功能流式數據中識別事件模式
輸入無界流
輸出匹配到的事件組合
適用場景用戶行為分析、風控、安全審計等
依賴庫flink-cepflink-cep-java

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/81844.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/81844.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/81844.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ARM (Attention Refinement Module)

ARM模塊【來源于BiSeNet】&#xff1a;細化特征圖的注意力&#xff0c;增強重要特征并抑制不重要的特征。 Attention Refinement Module (ARM) 詳解 ARM (Attention Refinement Module) 是 BiSeNet 中用于增強特征表示的關鍵模塊&#xff0c;它通過注意力機制來細化特征圖&…

AR0144CSSC20SUKA0-CRBR——1/4英寸 1.0 MP 高性能CMOS圖像傳感器解析

產品概述&#xff1a; AR0144CSSC20SUKA0-CRBR 是一款1/4 英寸&#xff0c;1.0 Mp CMOS 數字圖像傳感器&#xff0c;帶有 1280H x 800V 有效像素陣列 全局快門CMOS數字圖像傳感器&#xff0c;它結合了新型的創新全局快門像素設計&#xff0c;適用于準確快速的移動場景捕捉。該…

深入理解遞歸算法:Go語言實現指南

深入理解遞歸算法&#xff1a;Go語言實現指南 引言 遞歸是編程中一種優雅而強大的算法思想&#xff0c;通過函數自我調用的方式解決復雜問題。本文將使用Go語言演示遞歸的核心原理&#xff0c;并通過典型示例幫助開發者掌握這一重要技術。 一、遞歸基礎概念 1.1 遞歸定義 遞歸…

vue2實現【瀑布流布局】

瀑布流 1. 解釋2. 形成結構和樣式3. 自定義指令 1. 解釋 瀑布流特征&#xff1a; 等寬不等高&#xff1a;元素寬度固定&#xff0c;高度根據內容自適應。錯落排列&#xff1a;元素像瀑布一樣從上到下依次填充&#xff0c;自動尋找最短列插入 體現&#xff1a;圖中第一排1&…

CSS display有幾種屬性值

在 CSS 中&#xff0c;display 屬性是控制元素布局和渲染方式的核心屬性之一。它有多種屬性值&#xff0c;每個值都決定了元素在文檔流中的表現形式。以下是 display 的主要屬性值分類及說明&#xff1a; 1. 塊級和行內布局 塊級元素 (block) 特性&#xff1a;獨占一行&…

基于Java實現可靠傳輸

實現可靠傳輸 1. 結合代碼和 LOG 文件分析針對每個項目舉例說明解決效果。 RDT1.0 對應 Log 日志&#xff1a;Log 1.0.txt&#xff0c;接收文件 recvData 1.0.txt RDT1.0 版本是在可靠信道上進行可靠的數據傳輸&#xff0c;因此沒有過多的內容需要說明&#xff0c;發送方 L…

機器學習10-隨機森林

隨機森林學習筆記 一、隨機森林簡介 隨機森林&#xff08;Random Forest&#xff09;是一種集成學習算法&#xff0c;基于決策樹構建模型。它通過組合多個決策樹的結果來提高模型的準確性和穩定性。隨機森林的核心思想是利用“集成”的方式&#xff0c;將多個弱學習器組合成一…

LeetCode 438. 找到字符串中所有字母異位詞 | 滑動窗口與字符計數數組解法

文章目錄 問題描述核心思路&#xff1a;滑動窗口 字符計數數組1. 字符計數數組2. 滑動窗口 算法步驟完整代碼實現復雜度分析關鍵點總結類似問題 問題描述 給定兩個字符串 s 和 p&#xff0c;要求找到 s 中所有是 p 的**字母異位詞&#xff08;Anagram&#xff09;**的子串的起…

idea中,git的cherry-pick怎么用

背景: A同學在A分支進行開發, B同學在B分支進行開發,B同學開發過程中發現,A同學在A分支上面的某次提交,例如某次提交了一個工具類,B同學也用的到這個工具類,但是B又不想mergeA分支的代碼,此時就可以用到git的chery pick能力.

深入解析:如何基于開源OpENer開發EtherNet/IP從站服務

一、EtherNet/IP協議概述 EtherNet/IP(Industrial Protocol)是一種基于以太網的工業自動化通信協議,它將CIP(Common Industrial Protocol)封裝在標準以太網幀中,通過TCP/IP和UDP/IP實現工業設備間的通信。作為ODVA(Open DeviceNet Vendors Association)組織的核心協議…

當 PyIceberg 和 DuckDB 遇見 AWS S3 Tables:打造 Serverless 數據湖“開源夢幻組合”

引言 在一些大數據分析場景比如電商大數據營銷中&#xff0c;我們需要快速分析存儲海量用戶行為數據&#xff08;如瀏覽、加購、下單&#xff09;&#xff0c;以進行用戶行為分析&#xff0c;優化營銷策略。傳統方法依賴 Spark/Presto 集群或 Redshift 查詢 S3 上的 Parquet/O…

流復備機斷檔處理

文章目錄 環境癥狀問題原因解決方案 環境 系統平臺&#xff1a;UOS&#xff08;海光&#xff09;,UOS &#xff08;飛騰&#xff09;,UOS&#xff08;鯤鵬&#xff09;,UOS&#xff08;龍芯&#xff09;,UOS &#xff08;申威&#xff09;,銀河麒麟svs&#xff08;X86_64&…

【藍橋杯真題精講】第 16 屆 Python A 組(省賽)

文章目錄 T1 偏藍 (5/5)T2 IPv6 (0/5)T3 2025 圖形 (10/10)T4 最大數字 (10/10)T5 倒水 (15/15)T6 拼好數 (0/15)T7 登山 (20/20)T8 原料采購 (20/20) 更好的閱讀體驗 高速訪問&#xff1a;https://wiki.dwj601.cn/ds-and-algo/lan-qiao-cup/16th-python-a/永久鏈接&#xff1…

SpringBoot+Dubbo+Zookeeper實現分布式系統步驟

SpringBootDubboZookeeper實現分布式系統 一、分布式系統通俗解釋二、環境準備&#xff08;詳細版&#xff09;1. 軟件版本2. 安裝Zookeeper&#xff08;單機模式&#xff09; 三、完整項目結構&#xff08;帶詳細注釋&#xff09;四、手把手代碼實現步驟1&#xff1a;創建父工…

Spring的業務層,持久層,控制層的關系

在 Spring 框架中&#xff0c;控制層&#xff08;Controller&#xff09;、業務層&#xff08;Service&#xff09; 和 持久層&#xff08;Repository/Mapper&#xff09; 是分層架構的核心組成部分&#xff0c;職責分離明確&#xff0c;通過依賴注入&#xff08;DI&#xff09…

css實現不確定內容的高度過渡

實現效果&#xff1a;鼠標懸浮按鈕&#xff0c;高度過渡出現如圖所示文本框 代碼&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-widt…

計算機視覺與深度學習 | matlab實現ARIMA-WOA-CNN-LSTM時間序列預測(完整源碼和數據)

以下是一個基于MATLAB的ARIMA-WOA-CNN-LSTM時間序列預測框架。由于完整代碼較長,此處提供核心模塊和實現思路,完整源碼和數據可通過文末方式獲取。 1. 數據準備(示例數據) 使用MATLAB內置的航空乘客數據集: % 加載數據 data = readtable(airline-passengers.csv); data …

在 Excel 中使用東方仙盟軟件————仙盟創夢IDE

安裝插件 用仙盟創夢編寫插件代碼 源碼 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using ExcelDna.Integration;namespace 東方仙盟.仙盟創夢IDE_招標系統 {public static class 仙盟創夢_招標專…

Sql刷題日志(day9)

一、筆試 1、limit offset&#xff1a;分頁查詢 SELECT column1, column2, ... FROM table_name LIMIT number_of_rows OFFSET start_row; --跳過前 start_row 行&#xff0c;返回接下來的 number_of_rows 行。 2、lag、lead&#xff1a;查詢前后行數據 --lag函數用于訪問當…

C++面試3——const關鍵字的核心概念、典型場景和易錯陷阱

const關鍵字的核心概念、典型場景和易錯陷阱 一、const本質&#xff1a;類型系統的守護者 1. 與#define的本質差異 維度#defineconst編譯階段預處理替換編譯器類型檢查作用域無作用域&#xff08;全局污染&#xff09;遵循塊作用域調試可見性符號消失保留符號信息類型安全無類…