前言
最近突然冒出一個想法:能不能用SpringBoot自己實現一個類似AWS Lambda或阿里云函數計算的執行引擎?
說干就干,于是從零開始設計了一套基于SpringBoot的Serverless執行框架。
這套框架支持函數動態加載、按需執行、資源隔離,甚至還實現了簡單的冷啟動優化。
今天分享給大家,看看如何用SpringBoot的強大能力,打造一個屬于自己的Serverless引擎。
設計思路
核心特性
我們要實現的Serverless引擎包含以下特性:
動態函數加載:支持運行時加載新的函數代碼
函數隔離執行:每個函數在獨立的上下文中運行
生命周期管理:自動管理函數的創建、執行和銷毀
資源限制:控制函數的執行時間
函數調用:支持HTTP、定時器等多種觸發方式
監控統計:記錄函數執行次數、耗時、成功率等指標
架構設計
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Function API │ │ Event Trigger │ │ Management UI │
└─────────┬───────┘ └─────────┬───────┘ └─────────┬───────┘│ │ │└──────────────────────┼──────────────────────┘│┌────────────┴──────────────┐│ Serverless Engine │└────────────┬──────────────┘│┌────────────────────────┼──────────────────────┐│ │ │
┌───────▼───────┐ ┌───────────▼─────────┐ ┌───────▼───────┐
│ Function Pool │ │ Execution Manager │ │ Resource Pool │
└───────────────┘ └─────────────────────┘ └───────────────┘
核心實現
項目結構
src/
├── main/
│ ├── java/
│ │ └── com/
│ │ └── example/
│ │ ├── ServerlessEngine.java
│ │ ├── core/
│ │ │ ├── FunctionManager.java
│ │ │ ├── ExecutionEngine.java
│ │ │ ├── ResourceManager.java
│ │ │ └── EventDispatcher.java
│ │ ├── model/
│ │ │ ├── ServerlessFunction.java
│ │ │ ├── ExecutionContext.java
│ │ │ ├── ExecutionResult.java
│ │ │ └── FunctionMetrics.java
│ │ ├── executor/
│ │ │ ├── FunctionExecutor.java
│ │ │ └── IsolatedClassLoader.java
│ │ ├── trigger/
│ │ │ ├── HttpTrigger.java
│ │ │ ├── TimerTrigger.java
│ │ │ └── EventTrigger.java
│ │ ├── api/
│ │ │ └── ServerlessController.java
│ └── resources/
│ ├── application.yml
│ └── functions/
│ ├── demo-function.jar
│ └── user-function.jar
函數接口定義
package com.example.model;import java.util.Map;/*** Serverless函數接口* 所有用戶函數都需要實現這個接口*/
@FunctionalInterface
public interface ServerlessFunction {/*** 函數執行入口* @param input 輸入參數* @param context 執行上下文* @return 執行結果*/Object handle(Map<String, Object> input, ExecutionContext context) throws Exception;
}
執行上下文
package com.example.model;import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 函數執行上下文*/
public class ExecutionContext {private String requestId;private String functionName;private String functionVersion;private LocalDateTime startTime;private long timeoutMs;private Map<String, Object> environment;private Map<String, Object> attributes;public ExecutionContext(String requestId, String functionName) {this.requestId = requestId;this.functionName = functionName;this.functionVersion = "1.0";this.startTime = LocalDateTime.now();this.timeoutMs = 30000; // 默認30秒超時this.environment = new ConcurrentHashMap<>();this.attributes = new ConcurrentHashMap<>();}// 獲取剩余執行時間public long getRemainingTimeMs() {long elapsed = System.currentTimeMillis() - java.sql.Timestamp.valueOf(startTime).getTime();return Math.max(0, timeoutMs - elapsed);}@Overridepublic String toString() {return "ExecutionContext{" +"requestId='" + requestId + ''' +", functionName='" + functionName + ''' +", functionVersion='" + functionVersion + ''' +", startTime=" + startTime +", timeoutMs=" + timeoutMs +'}';}
}
執行結果
package com.example.model;import java.time.LocalDateTime;/*** 函數執行結果*/
public class ExecutionResult {private String requestId;private String functionName;private boolean success;private Object result;private String errorMessage;private String errorType;private LocalDateTime startTime;private LocalDateTime endTime;private long executionTime;public ExecutionResult(String requestId, String functionName) {this.requestId = requestId;this.functionName = functionName;this.startTime = LocalDateTime.now();}// 標記執行成功public void markSuccess(Object result) {this.success = true;this.result = result;this.endTime = LocalDateTime.now();this.executionTime = calculateExecutionTime();}// 標記執行失敗public void markFailure(String errorType, String errorMessage) {this.success = false;this.errorType = errorType;this.errorMessage = errorMessage;this.endTime = LocalDateTime.now();this.executionTime = calculateExecutionTime();}// 計算執行時間private long calculateExecutionTime() {if (startTime != null && endTime != null) {return java.sql.Timestamp.valueOf(endTime).getTime() - java.sql.Timestamp.valueOf(startTime).getTime();}return 0;}// Getter和Setter方法省略@Overridepublic String toString() {return "ExecutionResult{" +"requestId='" + requestId + ''' +", functionName='" + functionName + ''' +", success=" + success +", executionTime=" + executionTime +'}';}
}
函數指標統計
package com.example.model;import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;/*** 函數執行指標*/
public class FunctionMetrics {private String functionName;private AtomicLong invocationCount = new AtomicLong(0);private AtomicLong successCount = new AtomicLong(0);private AtomicLong errorCount = new AtomicLong(0);private AtomicLong totalExecutionTime = new AtomicLong(0);private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);private AtomicLong maxExecutionTime = new AtomicLong(0);private AtomicReference<LocalDateTime> lastInvocation = new AtomicReference<>();private AtomicReference<LocalDateTime> createTime = new AtomicReference<>(LocalDateTime.now());public FunctionMetrics(String functionName) {this.functionName = functionName;}// 記錄函數調用public void recordInvocation(ExecutionResult result) {invocationCount.incrementAndGet();lastInvocation.set(LocalDateTime.now());if (result.isSuccess()) {successCount.incrementAndGet();} else {errorCount.incrementAndGet();}long executionTime = result.getExecutionTime();totalExecutionTime.addAndGet(executionTime);// 更新最小執行時間minExecutionTime.updateAndGet(current -> Math.min(current, executionTime));// 更新最大執行時間maxExecutionTime.updateAndGet(current -> Math.max(current, executionTime));}// 獲取平均執行時間public double getAvgExecutionTime() {long count = invocationCount.get();if (count == 0) {return 0.0;}return (double) totalExecutionTime.get() / count;}// 獲取成功率public double getSuccessRate() {long total = invocationCount.get();if (total == 0) {return 0.0;}return (double) successCount.get() / total * 100;}// 獲取錯誤率public double getErrorRate() {return 100.0 - getSuccessRate();}@Overridepublic String toString() {return "FunctionMetrics{" +"functionName='" + functionName + ''' +", invocationCount=" + invocationCount.get() +", successCount=" + successCount.get() +", errorCount=" + errorCount.get() +", avgExecutionTime=" + String.format("%.2f", getAvgExecutionTime()) +", successRate=" + String.format("%.2f", getSuccessRate()) + "%" +'}';}
}
隔離類加載器
package com.example.executor;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;/*** 隔離類加載器* 為每個函數提供獨立的類加載環境*/
public class IsolatedClassLoader extends URLClassLoader {private final String functionName;private final Map<String, Class<?>> loadedClasses = new HashMap<>();private final ClassLoader parentClassLoader;public IsolatedClassLoader(String functionName, URL[] urls, ClassLoader parent) {super(urls, parent);this.functionName = functionName;this.parentClassLoader = parent;}@Overrideprotected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {// 檢查是否已經加載過Class<?> loadedClass = loadedClasses.get(name);if (loadedClass != null) {return loadedClass;}// 對于Java系統類,使用父類加載器if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("sun.") || name.startsWith("com.sun.")) {return super.loadClass(name, resolve);}// 對于Spring相關類,使用父類加載器if (name.startsWith("org.springframework.") || name.startsWith("org.apache.") ||name.startsWith("com.fasterxml.")) {return super.loadClass(name, resolve);}try {// 嘗試自己加載類Class<?> clazz = findClass(name);loadedClasses.put(name, clazz);if (resolve) {resolveClass(clazz);}return clazz;} catch (ClassNotFoundException e) {// 如果找不到,使用父類加載器return super.loadClass(name, resolve);}}@Overrideprotected Class<?> findClass(String name) throws ClassNotFoundException {try {String path = name.replace('.', '/') + ".class";InputStream is = getResourceAsStream(path);if (is == null) {throw new ClassNotFoundException(name);}byte[] classData = readClassData(is);return defineClass(name, classData, 0, classData.length);} catch (IOException e) {throw new ClassNotFoundException(name, e);}}private byte[] readClassData(InputStream is) throws IOException {ByteArrayOutputStream buffer = new ByteArrayOutputStream();byte[] data = new byte[1024];int bytesRead;while ((bytesRead = is.read(data)) != -1) {buffer.write(data, 0, bytesRead);}return buffer.toByteArray();}public String getFunctionName() {return functionName;}public int getLoadedClassCount() {return loadedClasses.size();}@Overridepublic void close() throws IOException {loadedClasses.clear();super.close();}
}
函數執行器
package com.example.executor;import com.example.model.ExecutionContext;
import com.example.model.ExecutionResult;
import com.example.model.ServerlessFunction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.*;/*** 函數執行器* 負責在隔離環境中執行函數*/
@Component
@Slf4j
public class FunctionExecutor {@Autowiredprivate ClassLoaderPool classLoaderPool;private final ExecutorService executorService;public FunctionExecutor() {// 創建線程池用于執行函數this.executorService = Executors.newCachedThreadPool(r -> {Thread t = new Thread(r);t.setName("function-executor-" + System.currentTimeMillis());t.setDaemon(true);return t;});}/*** 執行函數*/public ExecutionResult execute(String functionName, String jarPath, String className, Map<String, Object> input, ExecutionContext context) {ExecutionResult result = new ExecutionResult(context.getRequestId(), functionName);Future<Object> future = executorService.submit(() -> {// 從池中獲取ClassLoader(不需要每次創建)IsolatedClassLoader classLoader = classLoaderPool.getClassLoader(functionName, jarPath, className);// 加載函數類Class<?> functionClass = classLoader.loadClass(className);Object functionInstance = functionClass.getDeclaredConstructor().newInstance();// 檢查是否實現了ServerlessFunction接口if (!(functionInstance instanceof ServerlessFunction)) {throw new IllegalArgumentException("Function class must implement ServerlessFunction interface");}ServerlessFunction function = (ServerlessFunction) functionInstance;// 執行函數return function.handle(input, context);});try {// 等待執行結果,支持超時Object functionResult = future.get(context.getTimeoutMs(), TimeUnit.MILLISECONDS);result.markSuccess(functionResult);} catch (TimeoutException e) {future.cancel(true);result.markFailure("TIMEOUT", "Function execution timeout");} catch (ExecutionException e) {Throwable cause = e.getCause();log.error(cause.getMessage(),cause);result.markFailure(cause.getClass().getSimpleName(), cause.getMessage());} catch (Exception e) {result.markFailure(e.getClass().getSimpleName(), e.getMessage());}return result;}/*** 關閉執行器*/public void shutdown() {executorService.shutdown();try {if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();Thread.currentThread().interrupt();}}
}
函數管理器
package com.example.core;import com.example.model.FunctionMetrics;
import org.springframework.stereotype.Component;import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;/*** 函數管理器* 負責函數的注冊、查找、生命周期管理*/
@Component
public class FunctionManager {// 函數注冊表private final Map<String, FunctionDefinition> functions = new ConcurrentHashMap<>();// 函數指標private final Map<String, FunctionMetrics> metrics = new ConcurrentHashMap<>();/*** 函數定義*/public static class FunctionDefinition {private String name;private String description;private String jarPath;private String className;private long timeoutMs;private Map<String, Object> environment;private Date createTime;private Date updateTime;public FunctionDefinition(String name, String jarPath, String className) {this.name = name;this.jarPath = jarPath;this.className = className;this.timeoutMs = 30000; // 默認30秒this.environment = new HashMap<>();this.createTime = new Date();this.updateTime = new Date();}// Getter和Setter方法public String getName() { return name; }public void setName(String name) { this.name = name; }public String getDescription() { return description; }public void setDescription(String description) { this.description = description; }public String getJarPath() { return jarPath; }public void setJarPath(String jarPath) { this.jarPath = jarPath; }public String getClassName() { return className; }public void setClassName(String className) { this.className = className; }public long getTimeoutMs() { return timeoutMs; }public void setTimeoutMs(long timeoutMs) { this.timeoutMs = timeoutMs; }public Map<String, Object> getEnvironment() { return environment; }public void setEnvironment(Map<String, Object> environment) { this.environment = environment; }public Date getCreateTime() { return createTime; }public Date getUpdateTime() { return updateTime; }public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; }}/*** 注冊函數*/public void registerFunction(String name, String jarPath, String className) {// 驗證jar文件是否存在File jarFile = new File(jarPath);if (!jarFile.exists()) {throw new IllegalArgumentException("JAR file not found: " + jarPath);}FunctionDefinition definition = new FunctionDefinition(name, jarPath, className);functions.put(name, definition);// 初始化指標metrics.put(name, new FunctionMetrics(name));System.out.println("Function registered: " + name + " -> " + className);}/*** 注冊函數(帶配置)*/public void registerFunction(String name, String jarPath, String className, long timeoutMs, Map<String, Object> environment) {registerFunction(name, jarPath, className);FunctionDefinition definition = functions.get(name);definition.setTimeoutMs(timeoutMs);if (environment != null) {definition.setEnvironment(new HashMap<>(environment));}}/*** 獲取函數定義*/public FunctionDefinition getFunction(String name) {return functions.get(name);}/*** 檢查函數是否存在*/public boolean functionExists(String name) {return functions.containsKey(name);}/*** 獲取所有函數名稱*/public Set<String> getAllFunctionNames() {return new HashSet<>(functions.keySet());}/*** 獲取所有函數定義*/public Collection<FunctionDefinition> getAllFunctions() {return new ArrayList<>(functions.values());}/*** 更新函數*/public void updateFunction(String name, String jarPath, String className) {if (!functionExists(name)) {throw new IllegalArgumentException("Function not found: " + name);}FunctionDefinition definition = functions.get(name);definition.setJarPath(jarPath);definition.setClassName(className);definition.setUpdateTime(new Date());System.out.println("Function updated: " + name);}/*** 刪除函數*/public void removeFunction(String name) {if (functions.remove(name) != null) {metrics.remove(name);System.out.println("Function removed: " + name);}}/*** 獲取函數指標*/public FunctionMetrics getFunctionMetrics(String name) {return metrics.get(name);}/*** 獲取所有函數指標*/public Collection<FunctionMetrics> getAllMetrics() {return new ArrayList<>(metrics.values());}/*** 清理所有函數*/public void clear() {functions.clear();metrics.clear();}/*** 獲取函數數量*/public int getFunctionCount() {return functions.size();}
}
執行引擎
package com.example;import cn.hutool.core.io.FileUtil;
import com.example.core.FunctionManager;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;import java.util.HashMap;
import java.util.Map;/*** Serverless引擎啟動類*/
@SpringBootApplication
@EnableScheduling
public class ServerlessEngine implements CommandLineRunner {@Autowiredprivate FunctionManager functionManager;@Autowiredprivate TimerTrigger timerTrigger;public static void main(String[] args) {FileUtil.writeBytes("123".getBytes(),"functions/function.txt");SpringApplication.run(ServerlessEngine.class, args);}@Overridepublic void run(String... args) throws Exception {System.out.println("=== Serverless Engine Started ===");// 注冊示例函數registerDemoFunctions();// 注冊示例定時任務registerDemoTimerTasks();System.out.println("=== Demo Functions and Tasks Registered ===");System.out.println("API available at: http://localhost:8080/serverless");}/*** 注冊演示函數*/private void registerDemoFunctions() {// 注冊Hello World函數functionManager.registerFunction("hello-world","functions/demo-function.jar","com.example.functions.HelloWorldFunction");// 注冊用戶服務函數Map<String, Object> userEnv = new HashMap<>();userEnv.put("DB_URL", "jdbc:h2:mem:testdb");userEnv.put("MAX_USERS", "1000");functionManager.registerFunction("user-service","functions/user-function.jar","com.example.functions.UserServiceFunction",60000, // 60秒超時userEnv);}/*** 注冊演示定時任務*/private void registerDemoTimerTasks() {// 注冊清理任務timerTrigger.registerTimerTask("cleanup-task","user-service","0 0 2 * * ?" // 每天凌晨2點執行);// 注冊健康檢查任務timerTrigger.registerTimerTask("health-check","hello-world","0/10 * * * * ?" // 每10秒執行一次);}
}
HTTP觸發器
package com.example.trigger;import com.example.core.ExecutionEngine;
import com.example.model.ExecutionResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;/*** HTTP觸發器* 處理HTTP請求觸發的函數調用*/
@Component
public class HttpTrigger {@Autowiredprivate ExecutionEngine executionEngine;/*** 處理HTTP請求*/public ExecutionResult handleRequest(String functionName, HttpServletRequest request, Map<String, Object> body) {// 構建輸入參數Map<String, Object> input = new HashMap<>();// 添加HTTP相關信息Map<String, Object> httpInfo = new HashMap<>();httpInfo.put("method", request.getMethod());httpInfo.put("path", request.getRequestURI());httpInfo.put("queryString", request.getQueryString());httpInfo.put("remoteAddr", request.getRemoteAddr());httpInfo.put("userAgent", request.getHeader("User-Agent"));// 添加請求頭Map<String, String> headers = new HashMap<>();Enumeration<String> headerNames = request.getHeaderNames();if (headerNames != null) {while (headerNames.hasMoreElements()) {String headerName = headerNames.nextElement();headers.put(headerName, request.getHeader(headerName));}}httpInfo.put("headers", headers);// 添加查詢參數Map<String, String[]> queryParams = request.getParameterMap();Map<String, Object> params = new HashMap<>();queryParams.forEach((key, values) -> {if (values.length == 1) {params.put(key, values[0]);} else {params.put(key, values);}});httpInfo.put("queryParams", params);input.put("http", httpInfo);// 添加請求體if (body != null) {input.put("body", body);}// 調用函數return executionEngine.invoke(functionName, input);}/*** 簡化的GET請求處理*/public ExecutionResult handleGetRequest(String functionName, HttpServletRequest request) {return handleRequest(functionName, request, null);}/*** 簡化的POST請求處理*/public ExecutionResult handlePostRequest(String functionName, HttpServletRequest request, Map<String, Object> body) {return handleRequest(functionName, request, body);}
}
定時觸發器
package com.example.trigger;import com.example.core.ExecutionEngine;
import com.example.model.ExecutionResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 定時觸發器* 支持cron表達式定時觸發函數*/
@Component
public class TimerTrigger {@Autowiredprivate ExecutionEngine executionEngine;// 定時任務注冊表private final Map<String, TimerTask> timerTasks = new ConcurrentHashMap<>();/*** 定時任務定義*/public static class TimerTask {private String name;private String functionName;private String cronExpression;private boolean enabled;private LocalDateTime lastExecution;private LocalDateTime nextExecution;private long executionCount;public TimerTask(String name, String functionName, String cronExpression) {this.name = name;this.functionName = functionName;this.cronExpression = cronExpression;this.enabled = true;this.executionCount = 0;}// Getter和Setter方法public String getName() { return name; }public String getFunctionName() { return functionName; }public String getCronExpression() { return cronExpression; }public boolean isEnabled() { return enabled; }public void setEnabled(boolean enabled) { this.enabled = enabled; }public LocalDateTime getLastExecution() { return lastExecution; }public void setLastExecution(LocalDateTime lastExecution) { this.lastExecution = lastExecution; }public LocalDateTime getNextExecution() { return nextExecution; }public void setNextExecution(LocalDateTime nextExecution) { this.nextExecution = nextExecution; }public long getExecutionCount() { return executionCount; }public void incrementExecutionCount() { this.executionCount++; }}/*** 注冊定時任務*/public void registerTimerTask(String taskName, String functionName, String cronExpression) {TimerTask task = new TimerTask(taskName, functionName, cronExpression);timerTasks.put(taskName, task);System.out.println("Timer task registered: " + taskName + " -> " + functionName + " (" + cronExpression + ")");}/*** 移除定時任務*/public void removeTimerTask(String taskName) {if (timerTasks.remove(taskName) != null) {System.out.println("Timer task removed: " + taskName);}}/*** 啟用/禁用定時任務*/public void setTimerTaskEnabled(String taskName, boolean enabled) {TimerTask task = timerTasks.get(taskName);if (task != null) {task.setEnabled(enabled);System.out.println("Timer task " + taskName + " " + (enabled ? "enabled" : "disabled"));}}/*** 獲取所有定時任務*/public Map<String, TimerTask> getAllTimerTasks() {return new HashMap<>(timerTasks);}/*** 手動執行定時任務*/public ExecutionResult executeTimerTask(String taskName) {TimerTask task = timerTasks.get(taskName);if (task == null) {throw new IllegalArgumentException("Timer task not found: " + taskName);}return executeTask(task);}/*** 定時執行 - 每分鐘檢查一次*/@Scheduled(fixedRate = 60000) // 每分鐘執行一次public void checkAndExecuteTimerTasks() {LocalDateTime now = LocalDateTime.now();timerTasks.values().stream().filter(TimerTask::isEnabled).forEach(task -> {// 這里簡化處理,實際應該解析cron表達式// 為了演示,我們每5分鐘執行一次if (task.getLastExecution() == null || task.getLastExecution().isBefore(now.minusMinutes(5))) {executeTask(task);}});}/*** 執行定時任務*/private ExecutionResult executeTask(TimerTask task) {// 構建輸入參數Map<String, Object> input = new HashMap<>();Map<String, Object> timerInfo = new HashMap<>();timerInfo.put("taskName", task.getName());timerInfo.put("cronExpression", task.getCronExpression());timerInfo.put("executionTime", LocalDateTime.now().toString());timerInfo.put("executionCount", task.getExecutionCount());input.put("timer", timerInfo);// 執行函數ExecutionResult result = executionEngine.invoke(task.getFunctionName(), input);// 更新任務信息task.setLastExecution(LocalDateTime.now());task.incrementExecutionCount();System.out.println("Timer task executed: " + task.getName() + " -> " + task.getFunctionName() + ", success: " + result.isSuccess());return result;}
}
Serverless控制器
package com.example.api;import com.example.core.ExecutionEngine;
import com.example.core.FunctionManager;
import com.example.model.ExecutionResult;
import com.example.model.FunctionMetrics;
import com.example.trigger.HttpTrigger;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import javax.servlet.http.HttpServletRequest;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;/*** Serverless API控制器*/
@RestController
@RequestMapping("/serverless")
public class ServerlessController {@Autowiredprivate FunctionManager functionManager;@Autowiredprivate ExecutionEngine executionEngine;@Autowiredprivate HttpTrigger httpTrigger;@Autowiredprivate TimerTrigger timerTrigger;/*** 調用函數*/@PostMapping("/functions/{functionName}/invoke")public ResponseEntity<Map<String, Object>> invokeFunction(@PathVariable String functionName,@RequestBody(required = false) Map<String, Object> input,HttpServletRequest request) {ExecutionResult result = httpTrigger.handlePostRequest(functionName, request, input);Map<String, Object> response = new HashMap<>();response.put("requestId", result.getRequestId());response.put("functionName", result.getFunctionName());response.put("success", result.isSuccess());response.put("executionTime", result.getExecutionTime());response.put("memoryUsed", result.getMemoryUsed());if (result.isSuccess()) {response.put("result", result.getResult());} else {response.put("errorType", result.getErrorType());response.put("errorMessage", result.getErrorMessage());}return ResponseEntity.ok(response);}/*** GET方式調用函數*/@GetMapping("/functions/{functionName}/invoke")public ResponseEntity<Map<String, Object>> invokeFunctionGet(@PathVariable String functionName,HttpServletRequest request) {ExecutionResult result = httpTrigger.handleGetRequest(functionName, request);Map<String, Object> response = new HashMap<>();response.put("requestId", result.getRequestId());response.put("functionName", result.getFunctionName());response.put("success", result.isSuccess());response.put("executionTime", result.getExecutionTime());if (result.isSuccess()) {response.put("result", result.getResult());} else {response.put("errorType", result.getErrorType());response.put("errorMessage", result.getErrorMessage());}return ResponseEntity.ok(response);}/*** 注冊函數*/@PostMapping("/functions/{functionName}")public ResponseEntity<Map<String, String>> registerFunction(@PathVariable String functionName,@RequestBody Map<String, Object> config) {String jarPath = (String) config.get("jarPath");String className = (String) config.get("className");Long timeoutMs = config.containsKey("timeoutMs") ? ((Number) config.get("timeoutMs")).longValue() : 30000L;Long maxMemory = config.containsKey("maxMemory") ? ((Number) config.get("maxMemory")).longValue() : 128 * 1024 * 1024L;@SuppressWarnings("unchecked")Map<String, Object> environment = (Map<String, Object>) config.get("environment");functionManager.registerFunction(functionName, jarPath, className, timeoutMs, maxMemory, environment);Map<String, String> response = new HashMap<>();response.put("message", "Function registered successfully");response.put("functionName", functionName);return ResponseEntity.ok(response);}/*** 獲取所有函數列表*/@GetMapping("/functions")public ResponseEntity<Map<String, Object>> getAllFunctions() {Collection<FunctionManager.FunctionDefinition> functions = functionManager.getAllFunctions();Map<String, Object> response = new HashMap<>();response.put("functions", functions);response.put("count", functions.size());return ResponseEntity.ok(response);}/*** 獲取函數詳情*/@GetMapping("/functions/{functionName}")public ResponseEntity<FunctionManager.FunctionDefinition> getFunctionDetail(@PathVariable String functionName) {FunctionManager.FunctionDefinition function = functionManager.getFunction(functionName);if (function == null) {return ResponseEntity.notFound().build();}return ResponseEntity.ok(function);}/*** 刪除函數*/@DeleteMapping("/functions/{functionName}")public ResponseEntity<Map<String, String>> deleteFunction(@PathVariable String functionName) {functionManager.removeFunction(functionName);Map<String, String> response = new HashMap<>();response.put("message", "Function deleted successfully");response.put("functionName", functionName);return ResponseEntity.ok(response);}/*** 獲取函數指標*/@GetMapping("/functions/{functionName}/metrics")public ResponseEntity<FunctionMetrics> getFunctionMetrics(@PathVariable String functionName) {FunctionMetrics metrics = functionManager.getFunctionMetrics(functionName);if (metrics == null) {return ResponseEntity.notFound().build();}return ResponseEntity.ok(metrics);}/*** 獲取所有函數指標*/@GetMapping("/metrics")public ResponseEntity<Map<String, Object>> getAllMetrics() {Collection<FunctionMetrics> metrics = functionManager.getAllMetrics();Map<String, Object> response = new HashMap<>();response.put("metrics", metrics);response.put("count", metrics.size());return ResponseEntity.ok(response);}/*** 注冊定時任務*/@PostMapping("/timer-tasks/{taskName}")public ResponseEntity<Map<String, String>> registerTimerTask(@PathVariable String taskName,@RequestBody Map<String, String> config) {String functionName = config.get("functionName");String cronExpression = config.get("cronExpression");timerTrigger.registerTimerTask(taskName, functionName, cronExpression);Map<String, String> response = new HashMap<>();response.put("message", "Timer task registered successfully");response.put("taskName", taskName);return ResponseEntity.ok(response);}/*** 獲取所有定時任務*/@GetMapping("/timer-tasks")public ResponseEntity<Map<String, Object>> getAllTimerTasks() {Map<String, TimerTrigger.TimerTask> tasks = timerTrigger.getAllTimerTasks();Map<String, Object> response = new HashMap<>();response.put("tasks", tasks);response.put("count", tasks.size());return ResponseEntity.ok(response);}/*** 手動執行定時任務*/@PostMapping("/timer-tasks/{taskName}/execute")public ResponseEntity<Map<String, Object>> executeTimerTask(@PathVariable String taskName) {ExecutionResult result = timerTrigger.executeTimerTask(taskName);Map<String, Object> response = new HashMap<>();response.put("requestId", result.getRequestId());response.put("success", result.isSuccess());response.put("executionTime", result.getExecutionTime());if (result.isSuccess()) {response.put("result", result.getResult());} else {response.put("errorType", result.getErrorType());response.put("errorMessage", result.getErrorMessage());}return ResponseEntity.ok(response);}/*** 系統狀態*/@GetMapping("/status")public ResponseEntity<Map<String, Object>> getSystemStatus() {Map<String, Object> status = new HashMap<>();// 系統信息Runtime runtime = Runtime.getRuntime();status.put("totalMemory", runtime.totalMemory());status.put("freeMemory", runtime.freeMemory());status.put("usedMemory", runtime.totalMemory() - runtime.freeMemory());status.put("maxMemory", runtime.maxMemory());status.put("availableProcessors", runtime.availableProcessors());// 函數統計status.put("functionCount", functionManager.getFunctionCount());status.put("timerTaskCount", timerTrigger.getAllTimerTasks().size());// 總執行次數long totalInvocations = functionManager.getAllMetrics().stream().mapToLong(FunctionMetrics::getInvocationCount).sum();status.put("totalInvocations", totalInvocations);return ResponseEntity.ok(status);}
}
主啟動類
package com.example;import cn.hutool.core.io.FileUtil;
import com.example.core.FunctionManager;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;import java.util.HashMap;
import java.util.Map;/*** Serverless引擎啟動類*/
@SpringBootApplication
@EnableScheduling
public class ServerlessEngine implements CommandLineRunner {@Autowiredprivate FunctionManager functionManager;@Autowiredprivate TimerTrigger timerTrigger;public static void main(String[] args) {FileUtil.writeBytes("123".getBytes(),"functions/function.txt");SpringApplication.run(ServerlessEngine.class, args);}@Overridepublic void run(String... args) throws Exception {System.out.println("=== Serverless Engine Started ===");// 注冊示例函數registerDemoFunctions();// 注冊示例定時任務registerDemoTimerTasks();System.out.println("=== Demo Functions and Tasks Registered ===");System.out.println("API available at: http://localhost:8080/serverless");}/*** 注冊演示函數*/private void registerDemoFunctions() {// 注冊Hello World函數functionManager.registerFunction("hello-world","functions/demo-function.jar","com.example.functions.HelloWorldFunction");// 注冊用戶服務函數Map<String, Object> userEnv = new HashMap<>();userEnv.put("DB_URL", "jdbc:h2:mem:testdb");userEnv.put("MAX_USERS", "1000");functionManager.registerFunction("user-service","functions/user-function.jar","com.example.functions.UserServiceFunction",60000, // 60秒超時userEnv);}/*** 注冊演示定時任務*/private void registerDemoTimerTasks() {// 注冊清理任務timerTrigger.registerTimerTask("cleanup-task","user-service","0 0 2 * * ?" // 每天凌晨2點執行);// 注冊健康檢查任務timerTrigger.registerTimerTask("health-check","hello-world","0/10 * * * * ?" // 每10秒執行一次);}
}
配置文件
# application.yml
server:port: 8080spring:application:name: serverless-enginejackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null# Serverless引擎配置
serverless:function:# 函數存儲目錄function-dir: ./functions/# 默認超時時間(毫秒)default-timeout: 30000# 最大并發執行數max-concurrent-executions: 100executor:# 核心線程數core-pool-size: 10# 最大線程數max-pool-size: 50# 線程存活時間(秒)keep-alive-time: 60# 隊列容量queue-capacity: 1000logging:level:com.example: DEBUGpattern:console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"management:endpoints:web:exposure:include: health,info,metrics,envendpoint:health:show-details: always
示例函數
Hello World函數
package com.example.functions;import com.example.model.ExecutionContext;
import com.example.model.ServerlessFunction;import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;/*** Hello World示例函數*/
public class HelloWorldFunction implements ServerlessFunction {@Overridepublic Object handle(Map<String, Object> input, ExecutionContext context) throws Exception {Map<String, Object> result = new HashMap<>();result.put("message", "Hello from Serverless Engine!");result.put("timestamp", LocalDateTime.now().toString());result.put("requestId", context.getRequestId());result.put("functionName", context.getFunctionName());result.put("input", input);// 模擬一些處理時間Thread.sleep(100);return result;}
}
用戶服務函數
package com.example.functions;import com.example.model.ExecutionContext;
import com.example.model.ServerlessFunction;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;/*** 用戶服務示例函數*/
public class UserServiceFunction implements ServerlessFunction {// 模擬用戶存儲private static final Map<Long, Map<String, Object>> users = new ConcurrentHashMap<>();private static final AtomicLong idGenerator = new AtomicLong(1);static {// 初始化一些測試數據Map<String, Object> user1 = new HashMap<>();user1.put("id", 1L);user1.put("name", "John Doe");user1.put("email", "john@example.com");users.put(1L, user1);Map<String, Object> user2 = new HashMap<>();user2.put("id", 2L);user2.put("name", "Jane Smith");user2.put("email", "jane@example.com");users.put(2L, user2);idGenerator.set(3);}@Overridepublic Object handle(Map<String, Object> input, ExecutionContext context) throws Exception {String action = (String) ((Map)input.get("body")).get("action");if (action == null) {action = "list";}Map<String, Object> result = new HashMap<>();switch (action.toLowerCase()) {case "list":result.put("users", users.values());result.put("count", users.size());break;case "get":Long userId = Long.valueOf(input.get("userId").toString());Map<String, Object> user = users.get(userId);if (user != null) {result.put("user", user);} else {result.put("error", "User not found");}break;case "create":@SuppressWarnings("unchecked")Map<String, Object> userData = (Map<String, Object>) ((Map)input.get("body")).get("user");Long newId = idGenerator.getAndIncrement();userData.put("id", newId);users.put(newId, userData);result.put("user", userData);result.put("message", "User created successfully");break;case "delete":Long deleteId = Long.valueOf(input.get("userId").toString());Map<String, Object> deletedUser = users.remove(deleteId);if (deletedUser != null) {result.put("message", "User deleted successfully");} else {result.put("error", "User not found");}break;default:result.put("error", "Unknown action: " + action);}result.put("action", action);result.put("timestamp", System.currentTimeMillis());return result;}
}
功能測試
#!/bin/bash
# test-serverless-engine.shBASE_URL="http://localhost:8080/serverless"echo "=== Testing Serverless Engine ==="# 1. 獲取系統狀態
echo "1. Getting system status..."
curl -s "${BASE_URL}/status" | jq '.'
echo# 2. 獲取所有函數
echo "2. Getting all functions..."
curl -s "${BASE_URL}/functions" | jq '.'
echo# 3. 調用Hello World函數
echo "3. Invoking hello-world function..."
curl -s -X POST "${BASE_URL}/functions/hello-world/invoke" \-H "Content-Type: application/json" \-d '{"name": "Serverless Test"}' | jq '.'
echo# 4. 調用用戶服務函數 - 列出用戶
echo "4. Invoking user-service function - list users..."
curl -s -X POST "${BASE_URL}/functions/user-service/invoke" \-H "Content-Type: application/json" \-d '{"action": "list"}' | jq '.'
echo# 5. 調用用戶服務函數 - 創建用戶
echo "5. Invoking user-service function - create user..."
curl -s -X POST "${BASE_URL}/functions/user-service/invoke" \-H "Content-Type: application/json" \-d '{"action": "create","user": {"name": "Bob Wilson","email": "bob@example.com"}}' | jq '.'
echo# 6. 獲取函數指標
echo "6. Getting function metrics..."
curl -s "${BASE_URL}/metrics" | jq '.'
echo# 7. 獲取定時任務
echo "7. Getting timer tasks..."
curl -s "${BASE_URL}/timer-tasks" | jq '.'
echoecho "=== Test Completed ==="
Maven配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>serverless-engine</artifactId><version>1.0.0</version><packaging>jar</packaging><name>SpringBoot Serverless Engine</name><description>A serverless execution engine built with SpringBoot</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version><relativePath/></parent><properties><java.version>11</java.version></properties><dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter Actuator --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!-- Jackson for JSON processing --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency></dependencies></project>
總結
通過SpringBoot,我們成功實現了一個功能完整的Serverless執行引擎。這個引擎具備了以下核心能力:
核心特性
函數隔離:每個函數在獨立的類加載器中運行
生命周期管理:自動管理函數的創建、執行和銷毀
多種觸發方式:支持HTTP和定時器觸發
監控統計:完整的執行指標和性能統計
RESTful API:完整的管理和調用接口
技術亮點
動態類加載:使用自定義ClassLoader實現函數隔離
異步執行:基于線程池的并發執行機制
資源控制:支持超時和內存限制
指標收集:實時統計函數執行情況
這套自研的Serverless引擎展示了SpringBoot強大的擴展能力,不僅能快速構建業務應用,還能打造底層基礎設施。
希望這個實現能給大家一些啟發,在技術架構設計上有所幫助。