概述
本文介紹如何基于Flink源碼進行二次開發,實現一個動態規則引擎系統。通過自定義算子和算子協調器,實現數據流的動態規則計算和協調管理。以此更好理解前面介紹的源碼相關文章
項目需求
核心功能
實現一個動態規則引擎,具備以下特性:
- 數據源產生兩類數據:數據本身 和 運算表達式
- 按照運算表達式對數據進行運算并輸出結果
- 運算表達式可以動態更新
- 支持多并行度的運算任務
架構設計
具體例子說明
場景:實時溫度監控系統
假設我們有一個實時溫度監控系統,需要對傳感器數據進行動態計算:
數據源輸入示例:
時間線:
T1: {"type": "rule", "expression": "temperature * 1.8 + 32"} // 攝氏度轉華氏度
T2: {"type": "data", "sensorId": "001", "temperature": 25.0}
T3: {"type": "data", "sensorId": "002", "temperature": 30.0}
T4: {"type": "data", "sensorId": "003", "temperature": 20.0}
T5: {"type": "rule", "expression": "temperature + 273.15"} // 攝氏度轉開爾文
T6: {"type": "data", "sensorId": "004", "temperature": 35.0}
T7: {"type": "data", "sensorId": "005", "temperature": 28.0}
期望的處理結果:
T2數據: 25.0 * 1.8 + 32 = 77.0°F (使用第一個規則)
T3數據: 30.0 * 1.8 + 32 = 86.0°F (使用第一個規則)
T4數據: 20.0 * 1.8 + 32 = 68.0°F (使用第一個規則)
--- 規則切換點 ---
T6數據: 35.0 + 273.15 = 308.15K (使用第二個規則)
T7數據: 28.0 + 273.15 = 301.15K (使用第二個規則)
關鍵挑戰:
- 數據一致性:T4的數據必須用第一個規則計算完成后,T6的數據才能開始用第二個規則計算
- 并行處理:如果有多個Calc Operator并行處理,需要確保它們都完成了舊規則的計算
- 無數據丟失:規則切換過程中不能丟失任何數據
處理流程詳解:
當T5時刻新規則到達時:
1. Expression Operator收到新規則↓
2. 通知Coordinator更新規則: "temperature + 273.15"↓
3. 向所有Calc Operator廣播: "請完成當前批次計算"↓
4. 阻塞數據流: T6、T7數據暫時不向下游發送↓
5. 等待所有Calc Operator匯報: "我已完成T4及之前的數據計算"↓
6. Coordinator確認所有Task完成后,通知Expression Operator: "可以繼續"↓
7. 恢復數據流: T6、T7數據開始使用新規則處理
多并行度場景:
假設有3個Calc Operator并行處理:Calc-1: 正在處理T2數據 (25.0°C)
Calc-2: 正在處理T3數據 (30.0°C)
Calc-3: 正在處理T4數據 (20.0°C)當T5新規則到達時:
- 所有Calc都必須完成當前計算并匯報
- 只有收到3個完成匯報后,才能開始處理T6、T7數據
為什么需要Operator Coordinator?
問題:Flink的Task之間只能傳遞數據,無法傳遞控制信號
解決:通過Job Master中的Coordinator實現:
- Expression Operator → Coordinator: "新規則來了"
- Coordinator → 所有Calc Operator: "完成當前批次"
- 所有Calc Operator → Coordinator: "我完成了"
- Coordinator → Expression Operator: "可以繼續了"