【Flink銀行反欺詐系統設計方案】1.短時間內多次大額交易場景的flink與cep的實現

【flink應用系列】1.Flink銀行反欺詐系統設計方案

  • 1. 經典案例:短時間內多次大額交易
    • 1.1 場景描述
    • 1.2 風險判定邏輯
  • 2. 使用Flink實現
    • 2.1 實現思路
    • 2.2 代碼實現
    • 2.3 使用Flink流處理
  • 3. 使用Flink CEP實現
    • 3.1 實現思路
    • 3.2 代碼實現
  • 4. 總結

1. 經典案例:短時間內多次大額交易

1.1 場景描述

規則1:單筆交易金額超過10,000元。

規則2:同一用戶在10分鐘內進行了3次或更多次交易。

風險行為:同時滿足規則1和規則2的交易行為。

1.2 風險判定邏輯

檢測每筆交易是否滿足“單筆交易金額超過10,000元”。

對同一用戶,統計10分鐘內的交易次數。

如果交易次數達到3次或更多,則判定為風險行為。

2. 使用Flink實現

2.1 實現思路

使用Flink的KeyedStream按用戶分組。

使用ProcessFunction實現自定義窗口邏輯,統計10分鐘內的交易次數。

結合規則1和規則2,判斷是否為風險行為。

2.2 代碼實現

// 定義交易數據POJO
public class Transaction {private String transactionId;private String userId;private Double amount;private Long timestamp;// getters and setters
}// 定義風控結果POJO
public class RiskResult {private String userId;private String transactionId;private String riskLevel;private String actionTaken;private Long createTime;// getters and setters
}// 實現風控邏輯
public class FraudDetectionProcessFunction extends KeyedProcessFunction<String, Transaction, RiskResult> {private transient ValueState<Integer> transactionCountState;private transient ValueState<Long> timerState;@Overridepublic void open(Configuration parameters) {// 初始化狀態ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>("transactionCount", Types.INT);transactionCountState = getRuntimeContext().getState(countDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);timerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Transaction transaction,Context ctx,Collector<RiskResult> out) throws Exception {// 規則1:單筆交易金額超過10,000元if (transaction.getAmount() > 10000) {// 更新交易次數Integer count = transactionCountState.value();if (count == null) {count = 0;}count += 1;transactionCountState.update(count);// 如果是第一次滿足規則1,設置10分鐘的定時器if (count == 1) {long timer = ctx.timestamp() + 10 * 60 * 1000; // 10分鐘ctx.timerService().registerEventTimeTimer(timer);timerState.update(timer);}// 規則2:10分鐘內交易次數達到3次if (count >= 3) {RiskResult result = new RiskResult();result.setUserId(transaction.getUserId());result.setTransactionId(transaction.getTransactionId());result.setRiskLevel("HIGH");result.setActionTaken("ALERT");result.setCreateTime(System.currentTimeMillis());out.collect(result);}}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<RiskResult> out) throws Exception {// 定時器觸發時,重置狀態transactionCountState.clear();timerState.clear();}
}

2.3 使用Flink流處理

java

DataStream<Transaction> transactionStream = env.addSource(transactionSource);DataStream<RiskResult> riskResultStream = transactionStream.keyBy(Transaction::getUserId).process(new FraudDetectionProcessFunction());riskResultStream.addSink(new AlertSink());

3. 使用Flink CEP實現

Flink CEP(Complex Event Processing)是Flink提供的復雜事件處理庫,適合處理基于時間序列的模式匹配。以下是使用Flink CEP實現上述風控規則的示例。

3.1 實現思路

定義模式:檢測10分鐘內3次或更多次大額交易。

使用Flink CEP的模式匹配功能,匹配符合條件的事件序列。

3.2 代碼實現

java

// 定義交易數據POJO
public class Transaction {private String transactionId;private String userId;private Double amount;private Long timestamp;// getters and setters
}// 定義風控結果POJO
public class RiskResult {private String userId;private List<String> transactionIds;private String riskLevel;private String actionTaken;private Long createTime;// getters and setters
}// 實現風控邏輯
public class FraudDetectionCEP {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 交易數據流DataStream<Transaction> transactionStream = env.addSource(transactionSource).assignTimestampsAndWatermarks(WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, timestamp) -> event.getTimestamp()));// 按用戶分組KeyedStream<Transaction, String> keyedStream = transactionStream.keyBy(Transaction::getUserId);// 定義CEP模式:10分鐘內3次或更多次大額交易Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).next("second").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).next("third").where(new SimpleCondition<Transaction>() {@Overridepublic boolean filter(Transaction transaction) {return transaction.getAmount() > 10000;}}).within(Time.minutes(10));// 應用模式PatternStream<Transaction> patternStream = CEP.pattern(keyedStream, pattern);// 生成風控結果DataStream<RiskResult> riskResultStream = patternStream.process(new PatternProcessFunction<Transaction, RiskResult>() {@Overridepublic void processMatch(Map<String, List<Transaction>> match,Context ctx,Collector<RiskResult> out) throws Exception {RiskResult result = new RiskResult();result.setUserId(match.get("first").get(0).getUserId());result.setTransactionIds(match.values().stream().flatMap(List::stream).map(Transaction::getTransactionId).collect(Collectors.toList()));result.setRiskLevel("HIGH");result.setActionTaken("ALERT");result.setCreateTime(System.currentTimeMillis());out.collect(result);}});// 輸出結果riskResultStream.addSink(new AlertSink());env.execute("Fraud Detection with Flink CEP");}
}

4. 總結

Flink實現:通過KeyedProcessFunction和狀態管理實現多規則匹配。

Flink CEP實現:通過定義復雜事件模式,簡化多規則匹配的邏輯。

適用場景:

Flink適合需要自定義邏輯的場景。

Flink CEP適合基于時間序列的模式匹配場景。

通過以上實現,可以高效檢測銀行交易中的風險行為,并根據需要擴展更多規則

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/72605.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/72605.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/72605.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C語言——鏈表

大神文獻&#xff1a;https://blog.csdn.net/weixin_73588765/article/details/128356985 目錄 一、鏈表概念 1. 什么是鏈表&#xff1f; 1.1 鏈表的構成 2. 鏈表和數組的區別 數組的特點&#xff1a; 鏈表的特點&#xff1a; 二者對比&#xff1a; 二…

Spring框架自帶的定時任務:Spring Task詳解

文章目錄 一、基本使用1、配置&#xff1a;EnableScheduling2、觸發器&#xff1a;Scheduled 二、拓展1、修改默認的線程池2、springboot配置 三、源碼分析參考資料 一、基本使用 1、配置&#xff1a;EnableScheduling import org.springframework.context.annotation.Config…

數據庫事務、樂觀鎖及悲觀鎖

參考&#xff1a;node支付寶支付及同步、異步通知、主動查詢支付寶訂單狀態 以下容結合上述鏈接查看 1. 什么是數據庫事務&#xff1f; 1.1. 連續執行數據庫操作 在支付成功后&#xff0c;我們在自定義的paidSuccess里&#xff0c;依次更新了訂單狀態和用戶信息。也就說這里…

Android 創建一個全局通用的ViewModel

&#xff08;推薦&#xff09;使用ViewModelStore 代碼示例&#xff1a; class MyApplication : Application(), ViewModelStoreOwner {private val mViewModelStore ViewModelStore()override fun onCreate() {super.onCreate()}override val viewModelStore: ViewModelSto…

SCI期刊推薦 | 免版面費 | 計算機領域:信息系統、軟件工程、自動化和控制

在學術研究領域&#xff0c;選擇合適的SCI期刊對科研成果的傳播與認可至關重要。了解SCI期刊的研究領域和方向是基礎&#xff0c;確保投稿內容與期刊主題相符。同時&#xff0c;要關注期刊的影響因子和評估標準&#xff0c;選擇具有較高影響力和學術認可度的期刊。閱讀期刊的投…

解鎖Android RemoteViews:跨進程UI更新的奧秘

一、RemoteViews 簡介 在 Android 開發的廣闊領域中&#xff0c;RemoteViews 是一個獨特且重要的概念&#xff0c;它為開發者提供了一種在其他進程中顯示視圖結構的有效方式。從本質上講&#xff0c;RemoteViews 并非傳統意義上在當前應用進程內直接渲染和操作的 View&#xf…

常見webshell工具的流量特征

1、蟻劍 1.1、蟻劍webshell靜態特征 蟻劍中php使用assert、eval執行&#xff1b;asp只有eval執行&#xff1b;在jsp使用的是Java類加載&#xff08;ClassLoader&#xff09;&#xff0c;同時會帶有base64編碼解碼等字符特征。 1.2、蟻劍webshell動態特征 查看流量分析會發現…

爬蟲系列之【數據解析之bs4】《四》

目錄 前言 一、用法詳解 1.1 獲取標簽內容 1.2 獲取標簽屬性 1.3 獲取標簽包裹的文本內容 1.4 獲取標簽列表 1.5 css 選擇器&#xff1a;select 二、實戰案例 完整代碼 前言 HTML數據解析 1、正則 2、xpath&#xff08;居多&#xff09; 3、css 選擇器&#xff08;bs…

Java-實現PDF合同模板填寫內容并導出PDF文件

可用于公司用戶合同導出pdf文件 效果圖 一、導入所需要jar包 <!--生成PDF--><dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><version>5.5.11</version></dependency><dependency&…

【人工智能】GPT-4 vs DeepSeek-R1:誰主導了2025年的AI技術競爭?

前言 2025年&#xff0c;人工智能技術將迎來更加激烈的競爭。隨著OpenAI的GPT-4和中國初創公司DeepSeek的DeepSeek-R1在全球范圍內嶄露頭角&#xff0c;AI技術的競爭格局開始發生變化。這篇文章將詳細對比這兩款AI模型&#xff0c;從技術背景、應用領域、性能、成本效益等多個方…

前端開發10大框架深度解析

摘要 在現代前端開發中&#xff0c;框架的選擇對項目的成功至關重要。本文旨在為開發者提供一份全面的前端框架指南&#xff0c;涵蓋 React、Vue.js、Angular、Svelte、Ember.js、Preact、Backbone.js、Next.js、Nuxt.js 和 Gatsby。我們將從 簡介、優缺點、適用場景 以及 實際…

【MySQL】索引(頁目錄、B+樹)

文章目錄 1. 引入索引2. MySQL與磁盤交互的基本單位3. 索引的理解3.1 頁目錄3.2 B樹 4. 聚簇索引、非聚簇索引5. 索引的操作5.1 索引的創建5.1.1 創建主鍵索引5.1.2 創建唯一索引5.1.3 普通索引的創建5.1.4 全文索引的創建 5.2 索引的查詢5.3 刪除索引 1. 引入索引 索引&#…

python-串口助手(OV7670圖傳)

代碼 主python文件 import serial import serial.tools.list_ports import time import tkinter as tk from tkinter import ttk import numpy as np from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure import threadi…

筑牢網絡安全防線:守護您的數據安全

在數字化時代&#xff0c;數據安全已成為企業和個人不容忽視的重要議題。近日印尼國家數據中心遭黑客襲擊的事件&#xff0c;不僅擾亂了機場的移民檢查&#xff0c;還影響了眾多機構的服務運行。黑客利用惡意軟件對數據中心進行攻擊&#xff0c;索要巨額贖金&#xff0c;給印尼…

Vue 3 整合 WangEditor 富文本編輯器:從基礎到高級實踐

本文將詳細介紹如何在 Vue 3 項目中集成 WangEditor 富文本編輯器&#xff0c;實現圖文混排、自定義擴展等高階功能。 一、為什么選擇 WangEditor&#xff1f; 作為國內流行的開源富文本編輯器&#xff0c;WangEditor 具有以下優勢&#xff1a; 輕量高效&#xff1a;壓縮后僅…

FastGPT 引申:信息抽取到知識圖譜的銜接流程

文章目錄 信息抽取到知識圖譜的銜接流程步驟1&#xff1a;原始信息抽取結果步驟2&#xff1a;數據標準化處理&#xff08;Python示例&#xff09;步驟3&#xff1a;Cypher代碼動態生成&#xff08;Python驅動&#xff09; 關鍵銜接邏輯說明1. 唯一標識符生成規則2. 數據映射策略…

Webshell 入侵與防御全攻略

Webshell&#xff0c;是指攻擊者上傳到網站的遠程控制后門&#xff0c;允許黑客像管理員一樣遠程控制網站&#xff0c;執行惡意命令&#xff0c;甚至完全接管網站。本文將帶你深入了解 Webshell 的入侵方式以及相應的防御措施&#xff0c;幫助你加固自己的網站防線。 什么是 W…

NL2SQL-基于Dify+阿里通義千問大模型,實現自然語音自動生產SQL語句

本文基于Dify阿里通義千問大模型&#xff0c;實現自然語音自動生產SQL語句功能&#xff0c;話不多說直接上效果圖 我們可以試著問他幾個問題 查詢每個部門的員工數量SELECT d.dept_name, COUNT(e.emp_no) AS employee_count FROM employees e JOIN dept_emp de ON e.emp_no d…

雙鏈路提升網絡傳輸的可靠性擴展可用帶寬

為了提升網絡傳輸的可靠性或增加網絡可用帶寬&#xff0c; 通常使用雙鏈路冗余備份或者雙鏈路聚合的方式。 本文介紹幾種雙鏈路網絡通信的案例。 5GWiFi冗余傳輸 雙Socket綁定不同網絡接口&#xff1a;通過Android的ConnectivityManager綁定5G蜂窩網絡和WiFi的Socket連接&…

Ubuntu22.04安裝Ollama部署DeepSeek-R1:32B模型

一、環境準備 1.硬件要求 GPU: 至少 NVIDIA A30/A100 (顯存 ≥ 24GB)內存: ≥ 64GB RAM存儲: ≥ 100GB 可用空間 (模型文件約 60GB)2.軟件依賴 # 驗證NVIDIA驅動 nvidia-smi二、Ollama安裝 方法 1:install.sh安裝 運行一下安裝命令: curl -fsSL https://ollama.com/inst…