本篇為spring-ai-alibaba學習系列第四十篇
前面介紹?ParalellExecutorNode 會為后續的 m?個 CoderNode 分配任務
現在來看一下處理型任務的處理節點 coder_{i}
該類節點主要負責執行一些操作,例如執行python代碼、調用mcp等
提示詞
以下是該文檔的中文翻譯:---
當前時間: {{ CURRENT_TIME }}
---你是一個由 `supervisor` 代理管理的 `coder` 代理。
你是一名專業的軟件工程師,精通 Python 腳本編寫。你的任務是分析需求、使用 Python 實現高效的解決方案,并清晰地記錄你的方法和結果。# 步驟1. **需求分析**:仔細審查任務描述,理解目標、約束條件和預期成果。
2. **制定解決方案**:確定任務是否需要 Python。概述實現解決方案所需的步驟。
3. **實現解決方案**:- 使用 Python 進行數據分析、算法實現或問題解決。- 在 Python 中使用 `print(...)` 打印輸出以顯示結果或調試值。
4. **列出所有必需的第三方依賴項**:以 `requirements.txt` 的格式列出此 Python 代碼所需的所有第三方依賴項,例如 `numpy==2.2.6`。如果沒有第三方依賴項,則跳過此步驟。
5. **測試解決方案**:驗證實現以確保其滿足要求并處理邊緣情況。
6. **記錄方法論**:提供對你方法的清晰解釋,包括你選擇背后的原因和所做的任何假設。
7. **展示結果**:清楚地顯示最終輸出和任何必要的中間結果。# 注意事項- 始終確保解決方案高效且遵循最佳實踐。
- 優雅地處理邊緣情況,例如空文件或缺失輸入。
- 在代碼中使用注釋以提高可讀性和可維護性。
- 如果你想查看某個值的輸出,必須使用 `print(...)` 將其打印出來。
- 始終且僅使用 Python 進行數學運算。
- 始終使用 `yfinance` 獲取金融市場數據:- 使用 `yf.download()` 獲取歷史數據- 使用 `Ticker` 對象訪問公司信息- 使用適當的時間范圍進行數據檢索
- 所需的 Python 包未預安裝,你應該提供 `requirements.txt`:- `pandas` 用于數據操作- `numpy` 用于數值運算- `yfinance` 用于金融市場數據
- 始終以 **{{ locale }}** 語言環境進行輸出。
- 如果代碼執行失敗,在最多重試 3 次后必須中止。
使用方法
spring.ai.alibaba.deepresearch.python-coder 相關配置
Coder節點的Python執行器跑在Docker容器中,需要額外為其配置Docker信息
在配置文件的spring.ai.alibaba.deepreserch.python-coder.docker-host
字段中設置DockerHost,默認為unix:///var/run/docker.sock
。
本項目需要使用python:3-slim
鏡像創建臨時容器,也可以自己定制包含一些常用的第三方庫的鏡像,第三方庫需要安裝在鏡像的/app/dependency
文件夾里,在配置文件中設置spring.ai.alibaba.deepreserch.python-coder.image-name
的值指定鏡像名稱
節點產出
coder_content_{i}:碼農節點大模型返回的響應數據
源碼跟蹤
跟蹤:在 DeepResearchConfiguration 中,會根據用戶配置的碼農節點個數生成 m?個coder_{i} 節點實例,類型為 CoderNode,每個節點有唯一的節點 id;
添加從 paralell_executor 到 coder_{i} 的邊和從 coder_{i} 到 research_team 的邊,這意味著 m?個 coder_{i} 節點會并行處理
創建時需要4個參數 coderAgent, String.valueOf(i), reflectionProcessor,?mcpProviderFactory
i 為當前節點編號
coderAgent:ChatClient類型,添加了 coder 提示詞,并注冊了一個可以執行 python 代碼的工具
reflectionProcessor:反思處理器
mcpProviderFactory:mcp提供者工廠
研究:CoderNode 的 apply 方法整體流程如下:
1)首先獲取研究計劃中屬于當前節點需要處理的步驟
2)若開啟反思且步驟狀態為待反思,則進入反思處理邏輯,反思通過則修改狀態為完成,否則修改狀態為待處理
3)修改步驟狀態為處理中
4)若開啟mcp,將mcp注冊進 coderAgent,然后將當前步驟內容及反思內容傳入 coderAgent 獲取響應
5)根據是否開啟反思,將步驟狀態修改為待反思或完成
附 apply 方法源碼
public Map<String, Object> apply(OverAllState state) throws Exception {logger.info("coder node {} is running for thread: {}", executorNodeId, state.value("thread_id", "__default__"));Plan currentPlan = StateUtil.getPlan(state);Map<String, Object> updated = new HashMap<>();Plan.Step assignedStep = findAssignedStep(currentPlan);if (assignedStep == null) {logger.info("No remaining steps to be executed by {}", nodeName);return updated;}// Handle reflection logicif (reflectionProcessor != null) {ReflectionProcessor.ReflectionHandleResult reflectionResult = reflectionProcessor.handleReflection(assignedStep, nodeName, "coder");if (!ReflectionUtil.shouldContinueAfterReflection(reflectionResult)) {logger.debug("Step {} reflection processing completed, skipping execution", assignedStep.getTitle());return updated;}}// Mark step as processingassignedStep.setExecutionStatus(StateUtil.EXECUTION_STATUS_PROCESSING_PREFIX + nodeName);try {// Build task messagesList<Message> messages = List.of(new UserMessage(buildTaskMessageWithReflectionHistory(assignedStep, state.value("locale", "en-US"))));logger.debug("{} Node message: {}", nodeName, messages);// 調用agentvar requestSpec = coderAgent.prompt().messages(messages);// 使用MCP工廠創建MCP客戶端AsyncMcpToolCallbackProvider mcpProvider = mcpFactory != null? mcpFactory.createProvider(state, "coderAgent") : null;if (mcpProvider != null) {requestSpec = requestSpec.toolCallbacks(mcpProvider.getToolCallbacks());}// Create stream with error handlingvar streamResult = requestSpec.stream().chatResponse().doOnError(error -> StateUtil.handleStepError(assignedStep, nodeName, error, logger));// Add step titleboolean isReflectionNode = assignedStep.getReflectionHistory() != null&& !assignedStep.getReflectionHistory().isEmpty();String prefix = isReflectionNode ? StreamNodePrefixEnum.CODER_REFLECT_LLM_STREAM.getPrefix(): StreamNodePrefixEnum.CODER_LLM_STREAM.getPrefix();String nodeNum = NodeStepTitleUtil.registerStepTitle(state, isReflectionNode, executorNodeId, "Coder",assignedStep.getTitle(), prefix);logger.info("CoderNode {} starting streaming with key: {}", executorNodeId, nodeNum);var generator = StreamingChatGenerator.builder().startingNode(nodeNum).startingState(state).mapResult(response -> {// Only handle successful responses - errors are handled in doOnErrorString coderContent = response.getResult().getOutput().getText();assignedStep.setExecutionStatus(ReflectionUtil.getCompletionStatus(reflectionProcessor != null, nodeName));assignedStep.setExecutionRes(Objects.requireNonNull(coderContent));logger.info("{} completed, content: {}", nodeName, coderContent);updated.put("coder_content_" + executorNodeId, coderContent);return updated;}).buildWithChatResponse(streamResult);updated.put("coder_content_" + executorNodeId, generator);return updated;}catch (Exception e) {// Handle any exception that occurs before or during stream setupStateUtil.handleStepError(assignedStep, nodeName, e, logger);return updated;}}