?
import java.util.*;
import java.util.concurrent.*;// 核心接口定義
interface StreamOperator {void open();void processElement(Object element);void close();
}interface SourceFunction extends StreamOperator {void run(SourceContext ctx);
}interface SinkFunction extends StreamOperator {void invoke(Object value);
}// 運行時組件
class JobGraph {private List<StreamOperator> operators = new ArrayList<>();public void addOperator(StreamOperator operator) {operators.add(operator);}public List<StreamOperator> getOperators() {return operators;}
}class ExecutionGraph {private List<ExecutionVertex> vertices = new ArrayList<>();public void addVertex(ExecutionVertex vertex) {vertices.add(vertex);}public List<ExecutionVertex> getVertices() {return vertices;}
}class ExecutionVertex {private StreamOperator operator;private int parallelism;public ExecutionVertex(StreamOperator operator, int parallelism) {this.operator = operator;this.parallelism = parallelism;}public StreamOperator getOperator() {return operator;}
}// 主控節點
class JobManager {private ResourceManager resourceManager = new ResourceManager();private Map<String, JobMaster> runningJobs = new ConcurrentHashMap<>();public String submitJob(JobGraph jobGraph) {String jobId = UUID.randomUUID().toString();JobMaster jobMaster = new JobMaster(jobId, jobGraph);runningJobs.put(jobId, jobMaster);jobMaster.start(resourceManager);return jobId;}
}class JobMaster {private String jobId;private JobGraph jobGraph;private CheckpointCoordinator checkpointCoordinator;public JobMaster(String jobId, JobGraph jobGraph) {this.jobId = jobId;this.jobGraph = jobGraph;this.checkpointCoordinator = new CheckpointCoordinator();}public void start(ResourceManager resourceManager) {// 構建執行圖ExecutionGraph executionGraph = buildExecutionGraph(jobGraph);// 申請資源List<TaskSlot> slots = resourceManager.allocateResources(executionGraph);// 部署任務deployTasks(executionGraph, slots);// 啟動檢查點協調器checkpointCoordinator.start(jobId, executionGraph);}private ExecutionGraph buildExecutionGraph(JobGraph jobGraph) {ExecutionGraph executionGraph = new ExecutionGraph();for (StreamOperator operator : jobGraph.getOperators()) {executionGraph.addVertex(new ExecutionVertex(operator, 2)); // 默認并行度2}return executionGraph;}private void deployTasks(ExecutionGraph executionGraph, List<TaskSlot> slots) {int slotIndex = 0;for (ExecutionVertex vertex : executionGraph.getVertices()) {for (int i = 0; i < vertex.getParallelism(); i++) {Task task = new Task(vertex.getOperator());slots.get(slotIndex++ % slots.size()).deployTask(task);}}}
}// 資源管理
class ResourceManager {private List<TaskManager> taskManagers = new ArrayList<>();public ResourceManager() {// 初始化3個TaskManagerfor (int i = 0; i < 3; i++) {taskManagers.add(new TaskManager(i));}}public List<TaskSlot> allocateResources(ExecutionGraph executionGraph) {List<TaskSlot> slots = new ArrayList<>();for (TaskManager tm : taskManagers) {slots.addAll(tm.getAvailableSlots());}return slots.subList(0, Math.min(slots.size(), executionGraph.getVertices().size()));}
}// 工作節點
class TaskManager {private int id;private List<TaskSlot> slots = new ArrayList<>();public TaskManager(int id) {this.id = id;// 每個TaskManager有2個slotslots.add(new TaskSlot(id + "-1"));slots.add(new TaskSlot(id + "-2"));}public List<TaskSlot> getAvailableSlots() {return new ArrayList<>(slots);}
}class TaskSlot {private String id;private Task runningTask;public TaskSlot(String id) {this.id = id;}public void deployTask(Task task) {this.runningTask = task;task.start();}
}// 任務執行
class Task implements Runnable {private StreamOperator operator;private Thread executionThread;public Task(StreamOperator operator) {this.operator = operator;}public void start() {executionThread = new Thread(this);executionThread.start();}@Overridepublic void run() {operator.open();// 模擬數據處理循環while (true) {Object element = fetchNextElement(); // 從上游獲取數據if (element != null) {operator.processElement(element);}}}private Object fetchNextElement() {// 實際從網絡或本地隊列獲取數據return Math.random() > 0.5 ? new Object() : null;}
}// 容錯機制
class CheckpointCoordinator {public void start(String jobId, ExecutionGraph executionGraph) {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {triggerCheckpoint(jobId, executionGraph);}, 0, 10, TimeUnit.SECONDS); // 每10秒觸發檢查點}private void triggerCheckpoint(String jobId, ExecutionGraph executionGraph) {System.out.println("Triggering checkpoint for job: " + jobId);// 1. 通知所有任務開始檢查點for (ExecutionVertex vertex : executionGraph.getVertices()) {// 實際實現中會通過RPC通知TaskManager}// 2. 等待所有任務確認// 3. 持久化檢查點元數據}
}// 示例應用
public class SimpleFlinkDemo {public static void main(String[] args) {// 1. 創建作業圖JobGraph jobGraph = new JobGraph();// 創建數據源SourceFunction source = new SourceFunction() {@Override public void open() {}@Override public void close() {}@Overridepublic void run(SourceContext ctx) {// 實際產生數據流}@Overridepublic void processElement(Object element) {// 源操作符不需要處理元素}};// 創建處理算子StreamOperator mapper = new StreamOperator() {@Override public void open() {}@Override public void close() {}@Overridepublic void processElement(Object element) {System.out.println("Processing: " + element);// 實際處理邏輯}};// 創建輸出算子SinkFunction sink = new SinkFunction() {@Override public void open() {}@Override public void close() {}@Overridepublic void invoke(Object value) {System.out.println("Output: " + value);}@Overridepublic void processElement(Object element) {invoke(element);}};// 構建作業圖jobGraph.addOperator(source);jobGraph.addOperator(mapper);jobGraph.addOperator(sink);// 2. 提交作業JobManager jobManager = new JobManager();String jobId = jobManager.submitJob(jobGraph);System.out.println("Job submitted with ID: " + jobId);// 保持主線程運行try {Thread.sleep(60000);} catch (InterruptedException e) {e.printStackTrace();}}
}
1.創建作業圖list:source數據源,mapper處理算子,sink輸出算子提交
2.加入jobmanager
3.jobmaster 添加一個作業 id:job,里main含有job圖
4.生成執行圖,里面裝的是執行ExecutionVertex
5.給執行圖分配slot
6.部署task
執行檢查