flink 偽代碼

?

import java.util.*;
import java.util.concurrent.*;// 核心接口定義
interface StreamOperator {void open();void processElement(Object element);void close();
}interface SourceFunction extends StreamOperator {void run(SourceContext ctx);
}interface SinkFunction extends StreamOperator {void invoke(Object value);
}// 運行時組件
class JobGraph {private List<StreamOperator> operators = new ArrayList<>();public void addOperator(StreamOperator operator) {operators.add(operator);}public List<StreamOperator> getOperators() {return operators;}
}class ExecutionGraph {private List<ExecutionVertex> vertices = new ArrayList<>();public void addVertex(ExecutionVertex vertex) {vertices.add(vertex);}public List<ExecutionVertex> getVertices() {return vertices;}
}class ExecutionVertex {private StreamOperator operator;private int parallelism;public ExecutionVertex(StreamOperator operator, int parallelism) {this.operator = operator;this.parallelism = parallelism;}public StreamOperator getOperator() {return operator;}
}// 主控節點
class JobManager {private ResourceManager resourceManager = new ResourceManager();private Map<String, JobMaster> runningJobs = new ConcurrentHashMap<>();public String submitJob(JobGraph jobGraph) {String jobId = UUID.randomUUID().toString();JobMaster jobMaster = new JobMaster(jobId, jobGraph);runningJobs.put(jobId, jobMaster);jobMaster.start(resourceManager);return jobId;}
}class JobMaster {private String jobId;private JobGraph jobGraph;private CheckpointCoordinator checkpointCoordinator;public JobMaster(String jobId, JobGraph jobGraph) {this.jobId = jobId;this.jobGraph = jobGraph;this.checkpointCoordinator = new CheckpointCoordinator();}public void start(ResourceManager resourceManager) {// 構建執行圖ExecutionGraph executionGraph = buildExecutionGraph(jobGraph);// 申請資源List<TaskSlot> slots = resourceManager.allocateResources(executionGraph);// 部署任務deployTasks(executionGraph, slots);// 啟動檢查點協調器checkpointCoordinator.start(jobId, executionGraph);}private ExecutionGraph buildExecutionGraph(JobGraph jobGraph) {ExecutionGraph executionGraph = new ExecutionGraph();for (StreamOperator operator : jobGraph.getOperators()) {executionGraph.addVertex(new ExecutionVertex(operator, 2)); // 默認并行度2}return executionGraph;}private void deployTasks(ExecutionGraph executionGraph, List<TaskSlot> slots) {int slotIndex = 0;for (ExecutionVertex vertex : executionGraph.getVertices()) {for (int i = 0; i < vertex.getParallelism(); i++) {Task task = new Task(vertex.getOperator());slots.get(slotIndex++ % slots.size()).deployTask(task);}}}
}// 資源管理
class ResourceManager {private List<TaskManager> taskManagers = new ArrayList<>();public ResourceManager() {// 初始化3個TaskManagerfor (int i = 0; i < 3; i++) {taskManagers.add(new TaskManager(i));}}public List<TaskSlot> allocateResources(ExecutionGraph executionGraph) {List<TaskSlot> slots = new ArrayList<>();for (TaskManager tm : taskManagers) {slots.addAll(tm.getAvailableSlots());}return slots.subList(0, Math.min(slots.size(), executionGraph.getVertices().size()));}
}// 工作節點
class TaskManager {private int id;private List<TaskSlot> slots = new ArrayList<>();public TaskManager(int id) {this.id = id;// 每個TaskManager有2個slotslots.add(new TaskSlot(id + "-1"));slots.add(new TaskSlot(id + "-2"));}public List<TaskSlot> getAvailableSlots() {return new ArrayList<>(slots);}
}class TaskSlot {private String id;private Task runningTask;public TaskSlot(String id) {this.id = id;}public void deployTask(Task task) {this.runningTask = task;task.start();}
}// 任務執行
class Task implements Runnable {private StreamOperator operator;private Thread executionThread;public Task(StreamOperator operator) {this.operator = operator;}public void start() {executionThread = new Thread(this);executionThread.start();}@Overridepublic void run() {operator.open();// 模擬數據處理循環while (true) {Object element = fetchNextElement(); // 從上游獲取數據if (element != null) {operator.processElement(element);}}}private Object fetchNextElement() {// 實際從網絡或本地隊列獲取數據return Math.random() > 0.5 ? new Object() : null;}
}// 容錯機制
class CheckpointCoordinator {public void start(String jobId, ExecutionGraph executionGraph) {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {triggerCheckpoint(jobId, executionGraph);}, 0, 10, TimeUnit.SECONDS); // 每10秒觸發檢查點}private void triggerCheckpoint(String jobId, ExecutionGraph executionGraph) {System.out.println("Triggering checkpoint for job: " + jobId);// 1. 通知所有任務開始檢查點for (ExecutionVertex vertex : executionGraph.getVertices()) {// 實際實現中會通過RPC通知TaskManager}// 2. 等待所有任務確認// 3. 持久化檢查點元數據}
}// 示例應用
public class SimpleFlinkDemo {public static void main(String[] args) {// 1. 創建作業圖JobGraph jobGraph = new JobGraph();// 創建數據源SourceFunction source = new SourceFunction() {@Override public void open() {}@Override public void close() {}@Overridepublic void run(SourceContext ctx) {// 實際產生數據流}@Overridepublic void processElement(Object element) {// 源操作符不需要處理元素}};// 創建處理算子StreamOperator mapper = new StreamOperator() {@Override public void open() {}@Override public void close() {}@Overridepublic void processElement(Object element) {System.out.println("Processing: " + element);// 實際處理邏輯}};// 創建輸出算子SinkFunction sink = new SinkFunction() {@Override public void open() {}@Override public void close() {}@Overridepublic void invoke(Object value) {System.out.println("Output: " + value);}@Overridepublic void processElement(Object element) {invoke(element);}};// 構建作業圖jobGraph.addOperator(source);jobGraph.addOperator(mapper);jobGraph.addOperator(sink);// 2. 提交作業JobManager jobManager = new JobManager();String jobId = jobManager.submitJob(jobGraph);System.out.println("Job submitted with ID: " + jobId);// 保持主線程運行try {Thread.sleep(60000);} catch (InterruptedException e) {e.printStackTrace();}}
}

1.創建作業圖list:source數據源,mapper處理算子,sink輸出算子提交

2.加入jobmanager

3.jobmaster 添加一個作業 id:job,里main含有job圖

4.生成執行圖,里面裝的是執行ExecutionVertex

5.給執行圖分配slot

6.部署task

執行檢查

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/921323.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/921323.shtml
英文地址,請注明出處:http://en.pswp.cn/news/921323.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

一招快速識別你的電腦是機械硬盤還是固態硬盤

你是否經常覺得電腦開機慢、軟件打開卡頓&#xff1f;其中一個關鍵原因&#xff0c;可能就在于你使用的是機械硬盤&#xff08;HDD&#xff09;還是固態硬盤&#xff08;SSD&#xff09;。固態硬盤讀寫速度快&#xff0c;能顯著提升系統響應速度&#xff1b;而機械硬盤雖然容量…

52核心52線程,Intel下一代CPU憋了個大的

被逼急了的 Intel&#xff0c;可能正在憋大招&#xff01;如大伙兒所見&#xff0c;Intel 這兩年日子已經不能用「慘」來形容。其過去引以為傲的 PC 處理器&#xff0c;特別是高性能桌面處理器領域&#xff0c;如今算是徹底被 AMD 打懵了。無他&#xff0c;己方產品是連年擺爛&…

【LeetCode 熱題 100】1. 兩數之和——(解法二)哈希表

Problem: 1. 兩數之和 文章目錄整體思路完整代碼時空復雜度時間復雜度&#xff1a;O(N)空間復雜度&#xff1a;O(N)整體思路 這段代碼旨在高效地解決 “兩數之和” 問題。與 O(N^2) 的暴力枚舉法相比&#xff0c;此版本采用了一種經典的 “空間換時間” 策略&#xff0c;利用 …

MySQL主從同步--主從復制進階

MySQL支持一臺主庫同時向多臺從庫進行復制&#xff0c;從庫同時也可以作為其他從服務器的主庫&#xff0c;實現鏈狀復制。1、MySQL支持的binlog二進制日志復制類型- 基于語句&#xff08;statement&#xff09;的復制在主服務器上執行SQL語句&#xff0c;在從服務器上執行同樣的…

WPF外部打開html文件

注意&#xff1a;這是一份提供WPF外部瀏覽器打開html的方法&#xff0c;而不是WPF內部嵌入html 需要通過瀏覽器打開&#xff0c;否則無法使用地址欄拼接參數的形式操作html 下面是打開html的方法↓string localHtmlPath "C:\Users\pangb\Downloads\Help\幫助文檔 - 副本.…

Go初級之十:錯誤處理與程序健壯性

Go初級之十&#xff1a;錯誤處理與程序健壯性為什么選這個主題&#xff1f; 錯誤處理是 Go 語言中一個非常獨特且重要的設計哲學。它體現了 Go 的“顯式錯誤處理”思想&#xff0c;與其它語言&#xff08;如 Java/Python&#xff09;的異常機制不同。在實際開發中&#xff0c;幾…

Xsens解碼人形機器人訓練的語言

隨著人形機器人在現實世界的應用中變得越來越普遍&#xff0c;了解實現其類似人類運動的技術至關重要。在Xsens我們滿懷熱情地探索這一領域&#xff0c;致力于為人形機器人訓練開發最佳的動作捕捉解決方案。為了幫助您更好地理解所遇到的術語&#xff0c;我們創建了一份概述&am…

25年下載chromedriver.140

前提&#xff1a; 因為我需要用seleium模擬瀏覽器獲取數據&#xff0c;需要用到這個chromedriver 驅動。 1.chrome瀏覽器版本號 先檢查你的chrome 的版本號是多少&#xff0c;就下載對應的 chromedriver 【三個點】--->【幫助】------>【關于 Google chrome 】 我的版本…

深度學習玩游戲, 模型玩游戲,大模型+游戲 llm+game, 機器學習玩游戲,人工智能游戲陪伴,模型陪玩游戲

1. 論文地址 Think in Games: Learning to Reason in Games via Reinforcement Learning with Large Language Models 2. 中文&#xff1a; Think in Games&#xff1a;做一個在王者榮耀中會玩和思考的Agent 3. 我記得幾年前&#xff0c;相關文章還是使用dqn算法。玩雅利達小…

并查集|棧

lc1668不能直接跳class Solution { public:int maxRepeating(string sequence, string word) {int k 0, n sequence.size(), wn word.size(), t 0;for (int i 0; i < n - wn; i) {if (sequence.substr(i, wn) word) {t 1;int j i wn;while (j wn < n &&…

問題三ai思路

好的&#xff0c;我把“路線A&#xff1a;分類建模擇時”的代碼按功能分段給出&#xff0c;并為每段配上簡明解釋。你可以將這些段落依次粘貼到已完成清洗后的 df 變量之后直接運行。 0. 依賴導入&#xff08;一次即可&#xff09; 作用&#xff1a;導入所需庫&#xff1b;后續…

Java第十四幕集合啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦啦

集合1 Collection接口1.1 集合概述集合是一個裝對象的容器。集合中只能存放引用數據類型的對象。集合中有一些大小是固定的&#xff0c;有一些是不固定的。有一些是有序的&#xff0c;有些是無序的。有些可以有重復元素&#xff0c;有一些不可以有重復元素1.2 集合常用方法publ…

硬件基礎:串口通信

數據傳輸方式&#xff08;按位傳輸方式&#xff09;并行通信通過多條數據線同時傳輸多個數據位&#xff0c;速度較快但成本高&#xff0c;抗干擾能力弱&#xff0c;適用于短距離通信&#xff0c;如早期的打印機接口。串行通信通過單條或少數數據線逐位傳輸數據&#xff0c;線路…

從Java全棧到云原生:一場技術深度對話

從Java全棧到云原生&#xff1a;一場技術深度對話 面試官與應聘者互動記錄 面試官&#xff1a;你好&#xff0c;歡迎來到我們的面試。先簡單介紹一下你自己吧。 應聘者&#xff1a;您好&#xff0c;我叫李明&#xff0c;28歲&#xff0c;碩士學歷&#xff0c;有5年Java全棧開發…

158-EEMD-HHT算法

158-EEMD-HHT#EMD #希爾伯特變換-&#xff08;Hilbert- Huang Transform&#xff0c;HHT&#xff09;#集合經驗模態分解 EEMD #時頻分析 #邊際譜代碼描述1、利用 集合經驗模態分解&#xff08;EEMD&#xff09;方法對信號進行分解&#xff0c;得到模態分量 IMF&#xff1b;2、計…

C#開發中的 token

C# 開發中的 Token 詳解 C# 開發中的 Token 詳解與示例 1. CancellationToken - 異步取消令牌 示例 1:基礎取消機制 示例 2:Web API 中的請求取消 2. JWT Token - 身份驗證令牌 示例 1:JWT Token 生成與驗證 示例 2:ASP.NET Core JWT 認證配置 3. Access Token - API 訪問令…

旅游安全急救實訓室助力應急處置技能實戰化

隨著旅游行業的快速發展&#xff0c;游客安全需求日益突出&#xff0c;應急處置能力已成為旅游服務人才的核心素養之一。在中職教育旅游服務與管理專業中&#xff0c;旅游安全急救實訓室作為關鍵教學場所&#xff0c;正發揮著不可替代的作用。一、旅游安全急救實訓室的建設背景…

分布式微服務--ZooKeeper的客戶端常用命令 Java API 操作

一、ZooKeeper 客戶端常用命令 1. 啟動與退出 bin/zkCli.sh -server 127.0.0.1:2181 # 連接客戶端 quit # 退出客戶端2. 節點操作 # 查看子節點 ls / ls -s / ls /app# 查看節點詳細信息 ls2 /app stat /app# 創建節點 create /node1 "…

PID控制技術深度剖析:從基礎原理到高級應用(六)

PID 控制技術深度剖析&#xff1a;從基礎原理到高級應用 最近在項目中有要開始進行PID的控制了&#xff0c;隔了很久沒有做PID控制的東西了&#xff0c;所以想正好借這個機會&#xff0c;溫習一下和PID有關的內容。 系列文章目錄 PID控制技術深度剖析&#xff1a;從基礎原理到…

PCL關鍵點提取

1. 核心概念:什么是關鍵點?為什么需要關鍵點? 關鍵詞:信息冗余、計算效率、突出特征 “想象一下,我們有一片密集的點云,包含幾十萬個點。如果我們直接在每個點上都計算像FPFH這樣的局部特征,計算量會非常大,極其耗時,而且很多點所處的區域(比如平坦的墻面)特征非常…