Flink Stream API - 源碼開發需求描述

概述

本文介紹如何基于Flink源碼進行二次開發,實現一個動態規則引擎系統。通過自定義算子和算子協調器,實現數據流的動態規則計算和協調管理。以此更好理解前面介紹的源碼相關文章

項目需求

核心功能

實現一個動態規則引擎,具備以下特性:

  • 數據源產生兩類數據:數據本身運算表達式
  • 按照運算表達式對數據進行運算并輸出結果
  • 運算表達式可以動態更新
  • 支持多并行度的運算任務

架構設計

在這里插入圖片描述

具體例子說明

場景:實時溫度監控系統

假設我們有一個實時溫度監控系統,需要對傳感器數據進行動態計算:

數據源輸入示例:
時間線:
T1: {"type": "rule", "expression": "temperature * 1.8 + 32"}  // 攝氏度轉華氏度
T2: {"type": "data", "sensorId": "001", "temperature": 25.0}
T3: {"type": "data", "sensorId": "002", "temperature": 30.0}
T4: {"type": "data", "sensorId": "003", "temperature": 20.0}
T5: {"type": "rule", "expression": "temperature + 273.15"}   // 攝氏度轉開爾文
T6: {"type": "data", "sensorId": "004", "temperature": 35.0}
T7: {"type": "data", "sensorId": "005", "temperature": 28.0}
期望的處理結果:
T2數據: 25.0 * 1.8 + 32 = 77.0°F    (使用第一個規則)
T3數據: 30.0 * 1.8 + 32 = 86.0°F    (使用第一個規則)
T4數據: 20.0 * 1.8 + 32 = 68.0°F    (使用第一個規則)
--- 規則切換點 ---
T6數據: 35.0 + 273.15 = 308.15K      (使用第二個規則)
T7數據: 28.0 + 273.15 = 301.15K      (使用第二個規則)
關鍵挑戰:
  1. 數據一致性:T4的數據必須用第一個規則計算完成后,T6的數據才能開始用第二個規則計算
  2. 并行處理:如果有多個Calc Operator并行處理,需要確保它們都完成了舊規則的計算
  3. 無數據丟失:規則切換過程中不能丟失任何數據

處理流程詳解:

當T5時刻新規則到達時:
1. Expression Operator收到新規則↓
2. 通知Coordinator更新規則: "temperature + 273.15"↓
3. 向所有Calc Operator廣播: "請完成當前批次計算"↓
4. 阻塞數據流: T6、T7數據暫時不向下游發送↓
5. 等待所有Calc Operator匯報: "我已完成T4及之前的數據計算"↓
6. Coordinator確認所有Task完成后,通知Expression Operator: "可以繼續"↓
7. 恢復數據流: T6、T7數據開始使用新規則處理

多并行度場景:

假設有3個Calc Operator并行處理:Calc-1: 正在處理T2數據 (25.0°C)
Calc-2: 正在處理T3數據 (30.0°C)
Calc-3: 正在處理T4數據 (20.0°C)當T5新規則到達時:
- 所有Calc都必須完成當前計算并匯報
- 只有收到3個完成匯報后,才能開始處理T6、T7數據

為什么需要Operator Coordinator?

問題:Flink的Task之間只能傳遞數據,無法傳遞控制信號
解決:通過Job Master中的Coordinator實現:
- Expression Operator → Coordinator: "新規則來了"
- Coordinator → 所有Calc Operator: "完成當前批次"
- 所有Calc Operator → Coordinator: "我完成了"
- Coordinator → Expression Operator: "可以繼續了"

時序圖示例:

在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/95978.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/95978.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/95978.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

「 CentOS7 安裝部署k8s」

一、Linux系統部署K8s還是非常便利的,只需要掌握Linux常用命令,便可以迅速部署,一起來學習一下吧1、運行以下命令更新系統并安裝必要工具:yum update -y yum install -y yum-utils device-mapper-persistent-data lvm22、安裝Dock…

Disbursement on Quarantine Policy(概率、逆元計算期望)

題目描述There is a train with n rows, and there are m seats per row. All seats are occupied. For some passengers, we know they are being infected with COVID-19 or not. However, for other passengers, we are not sure about their status, and we assume each of…

AI 在金融領域的落地案例

目錄 引言 一、信貸風控:基于 LoRA 的 Qwen-7B 模型微調(適配城商行審批場景) 場景背景 核心代碼 1. 環境依賴安裝 2. 金融數據集加載與預處理(城商行信貸數據) 3. LoRA 微調 Qwen-7B 模型 4. 模型推理&#xf…

平衡二叉樹的調整

平衡二叉樹的定義平衡二叉樹(balanced binary tree),又稱AVL樹(Adelson-Velskii and Landis)。 一棵平衡二叉樹或者是空樹,或者是具有下列性質的二叉排序樹:① 左子樹與右子樹的高度之差的絕對值小于等于1;…

深入解析:如何設計靈活且可維護的自定義消息機制

深入解析:如何設計靈活且可維護的自定義消息機制 引言 在現代軟件開發中,組件間的通信機制至關重要。無論是前端框架中的組件交互,還是后端服務間的消息傳遞,一個良好的消息機制能顯著提升代碼的可維護性和擴展性。本文將深入探討…

PostgreSQL——用戶管理

PostgreSQL用戶管理一、組角色管理1.1、創建組角色1.2、查看和修改組角色1.3、刪除組角色二、角色的各種權限2.1、LOGIN(登錄)2.2、SUPERUSER(超級用戶)3.3、CREATEDB(創建數據庫)3.4、CREATEROLE&#xff…

東軟8位MCU使用問題總結

簡介用的單片機為ES7P7021,采用8位RISC內核,2KB的FLASH,128bit的RAM。編譯器使用東軟提供的iDesigner,開發過程中編譯器和單片機有一些地方使用時需要注意下。1.RAMclear()函數注意問題/****************************************…

深度學習在訂單簿分析與短期價格預測中的應用探索

一、訂單簿數據特性及預處理 1.1 訂單簿數據結構解析 在金融交易領域,訂單簿是市場微觀結構的集中體現,它記錄了不同價格水平的買賣訂單信息。一個典型的訂單簿由多個層級組成,每個層級包含特定價格上的買單和賣單數量。例如,在某…

Hashmap源碼

目錄 HashMap底層原理 JDK1.8及以后底層結構為:數組鏈表紅黑樹 默認參數 擴容機制 數組 鏈表 紅黑樹 HashMap為什么用紅黑樹不用B樹 HashMap什么時候擴容 HashMap的長度為什么是 2的 N 次方 HashMap底層原理 JDK1.8及以后底層結構為:數組鏈表紅…

【JAVA 字符串常量池、new String的存儲機制、==與equals的區別,以及字符串重新賦值時的指向變化】

系列文章目錄 提示:這里可以添加系列文章的所有文章的目錄,目錄需要自己手動添加 提示:寫完文章后,目錄可以自動生成,如何生成可參考右邊的幫助文檔 文章目錄系列文章目錄代碼原理解錯誤邏輯理解理解與修正&#xff1a…

博客項目 Spring + Redis + Mysql

基礎模塊1. 郵箱發送功能最初設計的接口 (雛形)public interface EmailService {/*** 發送驗證碼郵件** param email 目標郵箱* return 發送的code* throws RuntimeException 如果發送郵件失敗,將拋出異常*/String sendVerificationCode(Stri…

前端處理導出PDF。Vue導出pdf

前言:該篇主要是解決一些簡單的頁面內容導出為PDF1.安裝依賴使用到兩個依賴,項目目錄下運行這兩個//頁面轉換成圖片 npm install --save html2canvas //圖片轉換成pdf npm install jspdf --save 2.創建通用工具類exportPdf.js文件可以保存在工具類目錄下…

【GM3568JHF】FPGA+ARM異構開發板燒錄指南

1. Windows燒錄說明 SDK 提供 Windows 燒寫工具(工具版本需要 V3.31或以上),工具位于工程根目錄: tools/ ├── windows/RKDevTool 如下圖,編譯生成相應的固件后,設備燒寫需要進入 MASKROM 或 LOADER 燒寫模式,準備…

C++ 多進程編程深度解析【C++進階每日一學】

文章目錄一、引言二、核心概念:進程 (Process)功能與作用三、C 多進程的實現方式四、核心函數詳解1. fork() - 創建子進程函數原型功能說明返回值完整使用格式2. wait() 和 waitpid() - 等待子進程結束函數原型參數與返回值詳解3. exec 系列函數 - 執行新程序函數族…

一周學會Matplotlib3 Python 數據可視化-繪制面積圖(Area)

鋒哥原創的Matplotlib3 Python數據可視化視頻教程: 2026版 Matplotlib3 Python 數據可視化 視頻教程(無廢話版) 玩命更新中~_嗶哩嗶哩_bilibili 課程介紹 本課程講解利用python進行數據可視化 科研繪圖-Matplotlib,學習Matplotlib圖形參數基本設置&…

北京JAVA基礎面試30天打卡11

1.索引創建注意事項 適合的場景 1.頻繁使用where語句查詢的字段 2.關聯字段需要建立索 3.如果不創建索引,那么在連接的過程中,每個值都會進行一次全表掃描 4.分組和排序字段可以建立索引因為索引天生就是有序的,在分組和排序時優勢不言而喻 5…

vscode無法檢測到typescript環境解決辦法

有一個vitereacttypescript項目,在工作電腦上一切正常。但是,在我家里的電腦運行,始終無法檢測到typescript環境。即使出現錯誤的ts語法,也不會有報錯提示,效果如下:我故意將一個string類型,傳入…

【MCP開發】Nodejs+Typescript+pnpm+Studio搭建Mcp服務

MCP服務支持兩種協議,Studio和SSE/HTTP,目前官方提供的SDK有各種語言。 開發方式有以下幾種: 編程語言MCP命令協議發布方式PythonuvxSTUDIOpypiPython遠程調用SSE服務器部署NodejspnpmSTUDIOpnpmNodejs遠程調用SSE服務器部署… 一、初始化項…

vscode使用keil5出現變量跳轉不了和搜索全局不了

vscode使用keil5出現變量跳轉不了,或者未包含文件,或者未全局檢索; 參考如下文章后還會出現; 為什么vscode搜索欄只搜索已經打開的文件_vscode全局搜索只能搜當前文件-CSDN博客 在機緣巧合之下發現如下解決方式: 下載…

命名空間——網絡(net)

命名空間——網絡(net) 一、網絡命名空間:每個都是獨立的“網絡房間” 想象你的電腦是一棟大樓,每個網絡命名空間就是大樓里的一個“獨立房間”: 每個房間里有自己的“網線接口”(網卡)、“門牌…