Spring AI Multi-Model Collaborative Workflow Implementation Guide
Overview
This guide shows how to integrate several mainstream large language models (OpenAI ChatGPT, Deepseek, and Alibaba Cloud Tongyi Qianwen) on top of the Spring AI framework in a Spring Boot application, with complete examples covering environment configuration, model invocation, prompt templates, and exception handling. The examples target Spring AI 0.8.1, which requires Spring Boot 3.1 or later. The tutorial is aimed at Java engineers with some LLM application-development experience, and can also serve as a reference for enterprise multi-model integration and management.
1. System Architecture Overview
This solution builds a multi-model collaborative workflow on the Spring AI framework. By combining the complementary strengths of ChatGPT, Deepseek, and Tongyi Qianwen, it implements a complete pipeline from raw input to polished, high-quality output.
The workflow architecture diagram is written in Mermaid syntax; you can paste it directly into any Mermaid-capable Markdown editor (such as Typora or a VS Code plugin) to render it.
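A minimal version of the diagram, reconstructed from the flow described in this section (node labels are illustrative):

```mermaid
flowchart TD
    A[User input] --> B[ChatGPT: keyword extraction]
    B --> C{Parse JSON result}
    C -- success --> D[Deepseek: content generation]
    C -- failure --> E[Fall back to default keywords]
    E --> D
    D --> F[Tongyi Qianwen: text polishing]
    F --> G[Final output]
```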
Diagram notes:

Flowchart elements:
- Rectangles: processing steps
- Diamonds: decision/parsing nodes
- Subgraphs: mark the different processing stages
- Styling: distinguishes input and output nodes

Key paths:
- Normal flow: user input → ChatGPT extraction → JSON parsing → Deepseek generation → Tongyi Qianwen polishing → output
- Failure flow: JSON parsing fails → fall back to default keywords → continue the remaining stages

Component responsibilities:
- ChatGPT: structured understanding of the input text
- Deepseek: content generation from the structured data
- Tongyi Qianwen: polishing the final wording
2. Environment Setup
2.1 Maven Dependencies
```xml
<dependencies>
    <!-- Spring Boot web starter (Spring AI 0.8.1 requires Spring Boot 3.1+) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>3.2.5</version>
    </dependency>
    <!-- Spring AI core -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-core</artifactId>
        <version>0.8.1</version>
    </dependency>
    <!-- OpenAI starter -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
        <version>0.8.1</version>
    </dependency>
    <!-- Alibaba Cloud Tongyi Qianwen -->
    <dependency>
        <groupId>com.alibaba.cloud.ai</groupId>
        <artifactId>spring-ai-alibaba-starter</artifactId>
        <version>1.0.0-M2</version>
    </dependency>
    <!-- JSON handling -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- org.json, used by the coordinator to parse the extraction result -->
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20240303</version>
    </dependency>
    <!-- Lombok, used by @Data/@Slf4j in the examples -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Spring Retry, required by @Retryable in the coordinator -->
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
    </dependency>
    <!-- Logging (already pulled in transitively by spring-boot-starter-web) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
</dependencies>
```
2.2 Application Configuration (application.yml)

```yaml
spring:
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      model: gpt-4-turbo
      temperature: 0.7
      max-tokens: 1000
    alibaba:
      dashscope:
        api-key: ${TONGYI_API_KEY}
        model: qwen-max
        temperature: 0.8

# Deepseek is called through a hand-rolled RestClient, so its settings
# live under a custom top-level key rather than spring.ai.
deepseek:
  api-key: ${DEEPSEEK_API_KEY}
  base-url: https://api.deepseek.com/v1
  model: deepseek-chat

app:
  workflow:
    max-retries: 3
    retry-delay: 1000   # ms
    timeout: 30000      # ms
```
3. Core Components
3.1 Model Task Definition
```java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ModelTask {

    private String taskId;
    private TaskType type;
    private String input;
    private Map<String, Object> params;

    public enum TaskType {
        KEYWORD_EXTRACTION,  // keyword extraction
        CONTENT_GENERATION,  // content generation
        TEXT_POLISHING       // text polishing
    }
}
```
3.2 Model Executor Interface
```java
public interface ModelExecutor {

    /**
     * Execute a model task.
     * @param task the model task
     * @return the execution result
     * @throws ModelExecutionException if the model call fails
     */
    String execute(ModelTask task) throws ModelExecutionException;

    /**
     * The task type this executor supports.
     */
    ModelTask.TaskType supportedType();
}
```
3.3 ChatGPT Executor (Keyword Extraction)
```java
@Service
@Slf4j
public class ChatGPTExecutor implements ModelExecutor {

    @Autowired
    private OpenAiChatClient chatClient;

    @Override
    public String execute(ModelTask task) throws ModelExecutionException {
        try {
            String prompt = buildExtractionPrompt(task.getInput());
            log.info("Executing ChatGPT task with prompt: {}", prompt);
            String response = chatClient.call(prompt);
            log.info("ChatGPT response: {}", response);
            return parseJsonResponse(response);
        } catch (Exception e) {
            log.error("ChatGPT execution failed", e);
            throw new ModelExecutionException("ChatGPT processing failed", e);
        }
    }

    private String buildExtractionPrompt(String input) {
        return """
                Extract structured information from the following text and return it as JSON:
                {
                    "keywords": ["keyword1", "keyword2", ...],    // 5-10 core keywords
                    "intent": "description of the user's intent", // the user's main goal
                    "sentiment": "positive/neutral/negative"      // sentiment
                }
                Input text:
                """ + input;
    }

    private String parseJsonResponse(String response) {
        // Simplified JSON extraction; a real project should use Jackson or similar.
        if (response.startsWith("{") && response.endsWith("}")) {
            return response;
        }
        // Pull the first {...} block out of a non-standard response.
        // (?s) enables DOTALL so the match can span line breaks.
        return response.replaceFirst("(?s).*?(\\{.*\\}).*", "$1");
    }

    @Override
    public ModelTask.TaskType supportedType() {
        return ModelTask.TaskType.KEYWORD_EXTRACTION;
    }
}
```
3.4 Deepseek Executor (Content Generation)
```java
@Service
@Slf4j
public class DeepseekExecutor implements ModelExecutor {

    @Value("${deepseek.api-key}")
    private String apiKey;

    @Value("${deepseek.base-url}")
    private String baseUrl;

    @Value("${deepseek.model}")
    private String model;

    private RestClient restClient;

    // Build the client after property injection: @Value fields are still
    // null inside the constructor, so the client cannot be created there.
    @PostConstruct
    public void init() {
        this.restClient = RestClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader("Authorization", "Bearer " + apiKey)
                .build();
    }

    @Override
    public String execute(ModelTask task) throws ModelExecutionException {
        try {
            Map<String, Object> requestBody = buildRequest(task);
            log.info("Sending request to Deepseek: {}", requestBody);
            ResponseEntity<Map> response = restClient.post()
                    .uri("/chat/completions")
                    .body(requestBody)
                    .retrieve()
                    .toEntity(Map.class);
            return extractContent(response.getBody());
        } catch (Exception e) {
            log.error("Deepseek API call failed", e);
            throw new ModelExecutionException("Deepseek processing failed", e);
        }
    }

    private Map<String, Object> buildRequest(ModelTask task) {
        return Map.of(
                "model", model,
                "messages", List.of(
                        Map.of("role", "system", "content", "You are a professional content-generation assistant"),
                        Map.of("role", "user", "content", task.getInput())),
                "temperature", 0.7,
                "max_tokens", 2000);
    }

    @SuppressWarnings("unchecked")
    private String extractContent(Map<String, Object> response) {
        // Simplified parsing; a real project needs more robust handling.
        Map<String, Object> firstChoice =
                (Map<String, Object>) ((List<?>) response.get("choices")).get(0);
        Map<String, Object> message = (Map<String, Object>) firstChoice.get("message");
        return (String) message.get("content");
    }

    @Override
    public ModelTask.TaskType supportedType() {
        return ModelTask.TaskType.CONTENT_GENERATION;
    }
}
```
3.5 Tongyi Qianwen Executor (Text Polishing)
```java
@Service
@Slf4j
public class TongyiExecutor implements ModelExecutor {

    @Autowired
    private TongyiChatClient chatClient;

    @Override
    public String execute(ModelTask task) throws ModelExecutionException {
        try {
            String polishPrompt = buildPolishPrompt(task.getInput());
            log.info("Executing Tongyi polishing with prompt: {}", polishPrompt);
            String response = chatClient.call(polishPrompt);
            log.info("Tongyi polished response: {}", response);
            return response;
        } catch (Exception e) {
            log.error("Tongyi polishing failed", e);
            throw new ModelExecutionException("Tongyi polishing failed", e);
        }
    }

    private String buildPolishPrompt(String input) {
        return """
                Polish the following text professionally. Requirements:
                1. Preserve the original meaning
                2. Improve fluency
                3. Use more professional vocabulary
                4. Adjust sentence structure where appropriate
                5. Ensure correct grammar
                Text to polish:
                """ + input;
    }

    @Override
    public ModelTask.TaskType supportedType() {
        return ModelTask.TaskType.TEXT_POLISHING;
    }
}
```
4. Workflow Coordinator
```java
@Service
@Slf4j
public class WorkflowCoordinator {

    @Autowired
    private List<ModelExecutor> executors;

    @Value("${app.workflow.max-retries}")
    private int maxRetries;

    @Value("${app.workflow.retry-delay}")
    private long retryDelay;

    @Value("${app.workflow.timeout}")
    private long timeout;

    private final Map<ModelTask.TaskType, ModelExecutor> executorMap = new HashMap<>();

    @PostConstruct
    public void init() {
        executors.forEach(executor -> executorMap.put(executor.supportedType(), executor));
    }

    // Requires @EnableRetry on a configuration class and the spring-retry dependency.
    @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000))
    public String processWorkflow(String userInput) throws WorkflowException {
        long startTime = System.currentTimeMillis();
        String taskId = UUID.randomUUID().toString();
        try {
            // Stage 1: keyword extraction
            ModelTask extractionTask = new ModelTask(
                    taskId,
                    ModelTask.TaskType.KEYWORD_EXTRACTION,
                    userInput,
                    Map.of("format", "json"));
            String extractionResult = executeWithTimeout(
                    executorMap.get(ModelTask.TaskType.KEYWORD_EXTRACTION),
                    extractionTask);

            // Stage 2: content generation
            String generationPrompt = buildGenerationPrompt(extractionResult);
            ModelTask generationTask = new ModelTask(
                    taskId,
                    ModelTask.TaskType.CONTENT_GENERATION,
                    generationPrompt,
                    Map.of("length", "medium"));
            String generatedContent = executeWithTimeout(
                    executorMap.get(ModelTask.TaskType.CONTENT_GENERATION),
                    generationTask);

            // Stage 3: text polishing
            ModelTask polishingTask = new ModelTask(
                    taskId,
                    ModelTask.TaskType.TEXT_POLISHING,
                    generatedContent,
                    Map.of("style", "professional"));
            String finalResult = executeWithTimeout(
                    executorMap.get(ModelTask.TaskType.TEXT_POLISHING),
                    polishingTask);

            log.info("Workflow completed in {} ms", System.currentTimeMillis() - startTime);
            return finalResult;
        } catch (TimeoutException e) {
            throw new WorkflowException("Workflow timed out", e);
        } catch (Exception e) {
            throw new WorkflowException("Workflow execution failed", e);
        }
    }

    private String executeWithTimeout(ModelExecutor executor, ModelTask task) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(() -> executor.execute(task));
        try {
            return future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the hung model call
            throw e;
        } finally {
            executorService.shutdown();
        }
    }

    private String buildGenerationPrompt(String extractionResult) {
        try {
            JSONObject json = new JSONObject(extractionResult);
            // toList() yields List<Object>, so map each element to String
            // before joining (String.join would not compile here).
            String keywords = json.getJSONArray("keywords").toList().stream()
                    .map(Object::toString)
                    .collect(Collectors.joining(", "));
            String intent = json.getString("intent");
            return String.format(
                    "Generate professional content from the following keywords and intent:%nKeywords: %s%nIntent: %s",
                    keywords, intent);
        } catch (Exception e) {
            // Fallback path: parsing failed, continue with the raw extraction output.
            log.warn("Failed to parse extraction result, using raw input");
            return "Generate professional content from the following information:\n" + extractionResult;
        }
    }
}
```
5. REST Controller
```java
@RestController
@RequestMapping("/api/ai-workflow")
@Slf4j
public class WorkflowController {

    @Autowired
    private WorkflowCoordinator workflowCoordinator;

    @PostMapping("/process")
    public ResponseEntity<?> processInput(@RequestBody String userInput) {
        try {
            log.info("Received processing request: {}", userInput);
            String result = workflowCoordinator.processWorkflow(userInput);
            return ResponseEntity.ok(result);
        } catch (WorkflowException e) {
            log.error("Workflow processing error", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(Map.of(
                            "error", "Processing failed",
                            "message", e.getMessage()));
        }
    }

    @GetMapping("/status")
    public ResponseEntity<?> getStatus() {
        return ResponseEntity.ok(Map.of(
                "status", "operational",
                "models", List.of("ChatGPT", "Deepseek", "Tongyi")));
    }
}
```
6. Exception Handling
```java
@ControllerAdvice
@Slf4j
public class GlobalExceptionHandler {

    @ExceptionHandler(WorkflowException.class)
    public ResponseEntity<?> handleWorkflowException(WorkflowException ex) {
        log.error("Workflow error occurred: {}", ex.getMessage());
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body(Map.of(
                        "error", "AI Workflow Error",
                        "message", ex.getMessage(),
                        "timestamp", Instant.now()));
    }

    @ExceptionHandler(ModelExecutionException.class)
    public ResponseEntity<?> handleModelException(ModelExecutionException ex) {
        log.error("Model execution failed: {}", ex.getMessage());
        return ResponseEntity.status(HttpStatus.BAD_GATEWAY)
                .body(Map.of(
                        "error", "Model Execution Error",
                        "message", "One of the AI models failed to process the request",
                        "timestamp", Instant.now()));
    }
}
```
7. Test Cases
```java
@SpringBootTest
@ActiveProfiles("test")
@Slf4j
public class WorkflowIntegrationTest {

    @Autowired
    private WorkflowCoordinator workflowCoordinator;

    @Test
    public void testFullWorkflow() {
        String input = "User feedback: the new version's voice recognition is very accurate, "
                + "but battery drain is high; please optimize battery efficiency";
        try {
            String result = workflowCoordinator.processWorkflow(input);
            log.info("Workflow result:\n{}", result);
            assertNotNull(result);
            assertTrue(result.length() > 50);
            assertTrue(result.contains("voice recognition") || result.contains("battery"));
        } catch (WorkflowException e) {
            fail("Workflow execution failed: " + e.getMessage());
        }
    }

    @Test
    public void testTimeoutHandling() {
        // Placeholder: a real timeout test should stub a slow executor or
        // set app.workflow.timeout very low in the test profile.
        assertThrows(WorkflowException.class, () -> {
            workflowCoordinator.processWorkflow("test timeout");
        });
    }
}
```
8. Deployment and Monitoring Recommendations
8.1 Prometheus Monitoring Configuration

```yaml
# Add to application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: ai-workflow
```
8.2 Key Metrics to Monitor
- Workflow execution time
- Per-model call success rate
- API response time
- Error rate
- Token usage
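As a sketch of how the per-model success rate and latency above could be tracked without extra dependencies, the `ModelCallStats` class below is a hypothetical helper (in production, Micrometer counters and timers exposed through the Prometheus endpoint are the idiomatic choice):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper: tracks call counts, failures, and total latency per model.
class ModelCallStats {
    private final Map<String, LongAdder> calls = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> failures = new ConcurrentHashMap<>();
    private final Map<String, LongAdder> totalMillis = new ConcurrentHashMap<>();

    public void record(String model, long elapsedMillis, boolean success) {
        calls.computeIfAbsent(model, k -> new LongAdder()).increment();
        totalMillis.computeIfAbsent(model, k -> new LongAdder()).add(elapsedMillis);
        if (!success) {
            failures.computeIfAbsent(model, k -> new LongAdder()).increment();
        }
    }

    public double successRate(String model) {
        long total = calls.getOrDefault(model, new LongAdder()).sum();
        long failed = failures.getOrDefault(model, new LongAdder()).sum();
        return total == 0 ? 1.0 : (double) (total - failed) / total;
    }

    public double averageLatencyMillis(String model) {
        long total = calls.getOrDefault(model, new LongAdder()).sum();
        return total == 0 ? 0.0
                : (double) totalMillis.getOrDefault(model, new LongAdder()).sum() / total;
    }
}
```

Each executor would call `record(...)` around its model invocation; the aggregated values can then be logged or exported.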
9. Performance Optimization Suggestions

- Caching: cache results for frequently repeated queries (note that `#userInput` itself is a safer cache key than its `hashCode()`, which can collide)

```java
@Cacheable(value = "aiResponses", key = "#userInput.hashCode()")
public String processWorkflow(String userInput) { ... }
```

- Batch processing: support processing multiple inputs per request
- Async processing: expose an asynchronous API for long-running tasks

```java
@Async
public CompletableFuture<String> processAsync(String input) { ... }
```

- Connection pool tuning: optimize the HTTP connection pool

```yaml
spring:
  ai:
    openai:
      rest:
        connection-timeout: 5000
        read-timeout: 30000
        max-connections: 100
```
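The batch and async items above can be sketched with a plain `ExecutorService`; the `BatchProcessor` class and the `Function` it accepts are illustrative, not part of the coordinator shown earlier:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical batch helper: runs one workflow call per input in parallel
// and preserves input order in the result list.
class BatchProcessor {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    List<String> processBatch(List<String> inputs, Function<String, String> workflow)
            throws InterruptedException, ExecutionException {
        // Submit all inputs first so they run concurrently...
        List<Future<String>> futures = inputs.stream()
                .map(in -> pool.submit(() -> workflow.apply(in)))
                .collect(Collectors.toList());
        // ...then collect in submission order, propagating the first failure.
        List<String> results = new ArrayList<>();
        for (Future<String> f : futures) {
            results.add(f.get());
        }
        return results;
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

In the application, `workflow` would be `workflowCoordinator::processWorkflow`, and the batch endpoint would accept a JSON array of inputs.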
10. Security Recommendations

- API key rotation: rotate each model's API key regularly
- Input filtering: guard against prompt-injection attacks

```java
public String sanitizeInput(String input) {
    return input.replaceAll("[<>\"']", "");
}
```

- Access control: authenticate API callers
- Rate limiting: throttle requests to prevent abuse
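For the rate-limiting item, a minimal fixed-window sketch is shown below; the `SimpleRateLimiter` name and window policy are illustrative, and in production a library such as Bucket4j or Resilience4j is a better fit:

```java
// Hypothetical fixed-window limiter: allows up to `limit` requests per window.
class SimpleRateLimiter {
    private final int limit;
    private final long windowMillis;
    private long windowStart;
    private int count;

    SimpleRateLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
        this.windowStart = System.currentTimeMillis();
    }

    synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMillis) {
            windowStart = now; // start a new window
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }
}
```

A servlet filter or interceptor would call `tryAcquire()` per client and return HTTP 429 when it yields false.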
Summary
This implementation shows how to build a multi-model collaborative workflow on the Spring AI framework, with the following characteristics:
- Modular design: the model executors are independent of each other and easy to extend
- Resilience: thorough error handling and retry mechanisms
- Controlled performance: timeout management and async support
- Observability: comprehensive logging and monitoring support
- Production readiness: security, caching, and other production-environment concerns
With this architecture you can flexibly swap in or add new AI models while keeping the business logic consistent, providing a reliable foundation for enterprise-grade AI applications.