MetaGPT-DataInterpreter源碼解讀
MetaGPT 是一種多智能體框架,其利用SOP(Standard Operating Procedures)來協調多智能體系統。即:多智能體=智能體+環境+標準流程(SOP)+通信+經濟
DataInterpreter :簡單三行代碼,即可完成用戶requirement
任務
from metagpt.roles.di.data_interpreter import DataInterpreter
mi = DataInterpreter(use_reflection=True, tools=["<all>"])
mi.run(requirement)
machine_learning
from metagpt.roles.di.data_interpreter import DataInterpreter
DataInterpreter
mi = DataInterpreter() # 實例化方法
DataInterpreter(private_context=None,
private_config=None,
private_llm=<metagpt.provider.openai_api.OpenAILLM object at 0x7fa432f2cc40>,name='David', profile='DataInterpreter', goal='', constraints='', desc='', is_human=False, role_id='', states = ["0. <class 'metagpt.actions.di.write_analysis_code.WriteAnalysisCode'>"],
actions= [WriteAnalysisCode], rc = RoleContext(env=None, msg_buffer=MessageQueue(),memory=Memory(storage=[], index=defaultdict(<class 'list'>, {}), ignore_id=False), working_memory=Memory(storage=[], index=defaultdict(<class 'list'>, {}), ignore_id=False), state=0, todo=WriteAnalysisCode, watch={'metagpt.actions.add_requirement.UserRequirement'},news=[], react_mode='plan_and_act',max_react_loop=1), addresses={'David', 'metagpt.roles.di.data_interpreter.DataInterpreter'},planner=Planner(plan=Plan( goal='', context='', tasks=[], task_map={}, current_task_id=''),working_memory=Memory(storage=[], index=defaultdict(<class 'list'>, {}), ignore_id=False),auto_run=True),recovered=False, latest_observed_msg=None, auto_run=True, use_plan=True, use_reflection=False, execute_code=ExecuteNbCode,
tools=[],
tool_recommender=None, react_mode='plan_and_act', max_react_loop=1
)
上述展示 DataInterpreter
組成,首先補充下pydantic
基礎知識點:
@model_validator(mode=“wrap”):
- 驗證器方法需要接受兩個參數:
cls
(類本身)和value
(要驗證的值) - 可以在自定義邏輯中決定是否調用默認的
handler
方法來繼續驗證過程 - 允許在默認驗證之前和之后執行自定義邏輯
@model_validator(mode=“after”):
- 驗證器方法只需要接受一個參數:
value
(已經通過默認驗證的值) - 驗證器方法通常以
self
作為第一個參數,表示模型實例本身 - 這種模式下的驗證器會在 Pydantic 的默認字段驗證邏輯之后執行。
@model_validator(mode=“before”):
-
這種模式下的驗證器會在 Pydantic 的默認字段驗證邏輯之前執行
-
驗證器方法通常以
cls
作為第一個參數,表示模型類本身 -
這種模式適用于類方法,因為它們在類級別上操作,可以在創建實例之前對類進行操作
-
要點1:
SerializationMixin(BaseModel, extra="forbid")
@model_validator(mode="wrap")@classmethoddef __convert_to_real_type__(cls, value: Any, handler):# ... 方法實現 ...
@model_validator(mode="wrap")
裝飾器用于自定義 Pydantic 模型的驗證和設置過程。- 這個類方法用于在反序列化過程中將字典轉換回正確的子類實例。
- 如果輸入值不是一個字典,或者不包含
__module_class_name
字段,它會使用默認的處理程序來處理值。 - 如果
__module_class_name
存在,它會查找這個名稱對應的類類型,并使用這個類來實例化對象。
**不是很理解:**這段代碼通過自定義序列化和反序列化過程,實現了對多態類的支持。在序列化時,它會將類類型信息添加到輸出中;在反序列化時,它使用這些信息來創建正確的子類實例;
-
要點2:
DataInterpreter(Role)
class DataInterpreter(Role):name: str = "David"profile: str = "DataInterpreter"auto_run: bool = Trueuse_plan: bool = Trueuse_reflection: bool = Falseexecute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True)tools: list[str] = [] # Use special symbol ["<all>"] to indicate use of all registered toolstool_recommender: ToolRecommender = Nonereact_mode: Literal["plan_and_act", "react"] = "plan_and_act"max_react_loop: int = 10 # used for react mode@model_validator(mode="after")def set_plan_and_tool(self) -> "Interpreter":self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run)self.use_plan = (self.react_mode == "plan_and_act") # create a flag for convenience, overwrite any passed-in valueif self.tools and not self.tool_recommender:self.tool_recommender = BM25ToolRecommender(tools=self.tools)self.set_actions([WriteAnalysisCode])self._set_state(0)return self
如果采用
plan_and_act
模式,引入規劃器planner
self.planner = Planner(goal=self.goal, working_memory=self.rc.working_memory, auto_run=auto_run)
Planner
: 3 個字段分別是plan
、working_memory
、auto_run
class Planner(BaseModel):plan: Planworking_memory: Memory = Field(default_factory=Memory) # memory for working on each task, discarded each time a task is doneauto_run: bool = Falsedef __init__(self, goal: str = "", plan: Plan = None, **kwargs):plan = plan or Plan(goal=goal)super().__init__(plan=plan, **kwargs)
plan
字段也是個類實例,具有的字段如下:class Plan(BaseModel):goal: strcontext: str = ""tasks: list[Task] = []task_map: dict[str, Task] = {}current_task_id: str = ""
在
Role
類set_actions
中有段代碼如下:for action in actions:if not isinstance(action, Action):i = action(context=self.context)else:'''pass'''self._init_action(i)
self.context
是ContextMixin
類中一個方法,返回是Context()
對象實例; -
要點3:
Action(SerializationMixin, ContextMixin, BaseModel)
class Action(SerializationMixin, ContextMixin, BaseModel):model_config = ConfigDict(arbitrary_types_allowed=True)name: str = ""i_context: Union[dict, CodingContext, CodeSummarizeContext, TestingContext, RunCodeContext, CodePlanAndChangeContext, str, None ] = ""prefix: str = "" # aask*時會加上prefix,作為system_messagedesc: str = "" # for skill managernode: ActionNode = Field(default=None, exclude=True)
**字段驗證:**先執行
Action
里字段驗證,mode=“before” 字段從后向前驗證;@model_validator(mode="before")@classmethoddef set_name_if_empty(cls, values):if "name" not in values or not values["name"]:values["name"] = cls.__name__return values@model_validator(mode="before")@classmethoddef _init_with_instruction(cls, values):if "instruction" in values:name = values["name"]i = values.pop("instruction")values["node"] = ActionNode(key=name, expected_type=str, instruction=i, example="", schema="raw")return values
這里驗證
values
值到底是什么?其實校驗就是上述的self.context
**字段驗證:**后執行
ContextMixin
里字段驗證,mode=“after” 字段從前向后驗證;@model_validator(mode="after")def validate_context_mixin_extra(self):self._process_context_mixin_extra()return selfdef _process_context_mixin_extra(self):"""Process the extra field"""kwargs = self.model_extra or {}self.set_context(kwargs.pop("context", None))self.set_config(kwargs.pop("config", None))self.set_llm(kwargs.pop("llm", None))def set(self, k, v, override=False):"""Set attribute"""if override or not self.__dict__.get(k):self.__dict__[k] = v
打印下
self.__dict__
{'private_context': Context(kwargs=AttrD...': 0.0}})), 'private_config': None, 'private_llm': None, 'name': 'WriteAnalysisCode', 'i_context': '', 'prefix': '', 'desc': '', 'node': None}
_init_action
中 追加Action
字段的private_llm
、prefix
run 方法
:
mi.run(requirement)
-
要點1:
run
函數使用異步編程并被role_raise_decorator
裝飾(用于處理在異步函數執行過程中可能出現的異常);@role_raise_decorator async def run(self, with_message=None) -> Message | None:"""Observe, and think and act based on the results of the observation"""
def role_raise_decorator(func):async def wrapper(self, *args, **kwargs):try:return await func(self, *args, **kwargs)except KeyboardInterrupt as kbi:logger.error(f"KeyboardInterrupt: {kbi} occurs, start to serialize the project")if self.latest_observed_msg:self.rc.memory.delete(self.latest_observed_msg)# raise again to make it captured outsideraise Exception(format_trackback_info(limit=None))except Exception as e:if self.latest_observed_msg:logger.warning("There is a exception in role's execution, in order to resume, ""we delete the newest role communication message in the role's memory.")# remove role newest observed msg to make it observed againself.rc.memory.delete(self.latest_observed_msg)# raise again to make it captured outsideif isinstance(e, RetryError):last_error = e.last_attempt._exceptionname = any_to_str(last_error)if re.match(r"^openai\.", name) or re.match(r"^httpx\.", name):raise last_errorraise Exception(format_trackback_info(limit=None))return wrapper
這段代碼的結構和各個部分的功能如下:
- 在
try
塊中,調用原始的異步函數func
,并使用await
來等待其完成。 - 如果在執行
func
時捕獲到KeyboardInterrupt
異常(通常由用戶按Ctrl+C觸發),代碼將記錄一個錯誤日志,并刪除角色的最新觀察消息self.latest_observed_msg
,然后重新拋出一個異常。調用的是traceback.format_exc(limit=limit)
,這是一個用于獲取當前異常的堆棧跟蹤信息的函數。 - 如果異常是
RetryError
,它將檢查最后的錯誤類型,如果錯誤與openai
或httpx
相關,則拋出這個最后的錯誤。如果異常不是RetryError
或者最后的錯誤與openai
或httpx
無關,代碼將重新拋出一個異常,并包含完整的堆棧跟蹤信息。
- 在
-
要點2:
Message
解讀:class Message(BaseModel):"""list[<role>: <content>]"""id: str = Field(default="", validate_default=True) # According to Section 2.2.3.1.1 of RFC 135content: strinstruct_content: Optional[BaseModel] = Field(default=None, validate_default=True)role: str = "user" # system / user / assistantcause_by: str = Field(default="", validate_default=True)sent_from: str = Field(default="", validate_default=True)send_to: set[str] = Field(default={MESSAGE_ROUTE_TO_ALL}, validate_default=True)
Message
類具有多個字段,包括id
、content
、instruct_content
、role
、cause_by
、sent_from
和send_to
, 使用 Pydantic 的功能來提供類型注解和驗證。Field(default=None, validate_default=True)
: 默認值為None
,并且即使它是默認值,也會進行驗證。def __init__(self, content: str = "", **data: Any):data["content"] = data.get("content", content)super().__init__(**data)
一旦基類初始化完成,Pydantic 會根據
@field_validator
裝飾器指定的順序執行校驗函數。@field_validator("id", mode="before")@classmethoddef check_id(cls, id: str) -> str:return id if id else uuid.uuid4().hex@field_validator("instruct_content", mode="before")@classmethoddef check_instruct_content(cls, ic: Any) -> BaseModel: pass
在
mode="before"
的情況下,這些校驗函數會在字段值被最終賦值給實例屬性之前執行,這樣可以確保所有的校驗邏輯都在屬性被設置之前完成。user: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy. cause_by: 'metagpt.actions.add_requirement.UserRequirement' content: 'Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.' id: '39defc4ed634430e9e87893a60557bc3' instruct_content: None role: 'user' send_to: {'<all>'} sent_from: ''
-
要點3:
RoleContext
解讀:class RoleContext(BaseModel):"""Role Runtime Context"""model_config = ConfigDict(arbitrary_types_allowed=True)# # env exclude=True to avoid `RecursionError: maximum recursion depth exceeded in comparison`env: "Environment" = Field(default=None, exclude=True) # # avoid circular import# TODO judge if ser&desermsg_buffer: MessageQueue = Field(default_factory=MessageQueue, exclude=True) # Message Buffer with Asynchronous Updatesmemory: Memory = Field(default_factory=Memory)# long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory)working_memory: Memory = Field(default_factory=Memory)state: int = Field(default=-1) # -1 indicates initial or termination state where todo is Nonetodo: Action = Field(default=None, exclude=True)watch: set[str] = Field(default_factory=set)news: list[Type[Message]] = Field(default=[], exclude=True) # TODO not usedreact_mode: RoleReactMode = (RoleReactMode.REACT) # see `Role._set_react_mode` for definitions of the following two attributesmax_react_loop: int = 1
-
RoleContext
類是一個用于創建具有多個屬性的數據模型,包括環境、消息緩沖區、記憶、狀態、待辦動作、監聽、新消息、回應模式和最大反應循環次數。 -
exclude=True
參數是Field
裝飾器的一個選項,用于控制模型序列化時的行為。當一個字段被標記為exclude=True
時,它在默認情況下不會出現在模型的 JSON 序列化結果中。這可以用于隱藏敏感信息或不需要傳輸的字段。例如:rc = RoleContext() print(rc.json) # 設置 exclude 參數的字段不會被序列化
-
arbitrary_types_allowed=True
是model_config
中的一個配置選項,它允許模型中使用任意類型的對象作為屬性值。設置arbitrary_types_allowed=True
,那么 Pydantic 將不會對未知類型的對象進行校驗,而是直接允許它們作為模型屬性的值。 -
msg_buffer
: 這是一個MessageQueue
類型的字段,用于存儲消息緩沖區;class MessageQueue(BaseModel):"""Message queue which supports asynchronous updates."""model_config = ConfigDict(arbitrary_types_allowed=True)_queue: Queue = PrivateAttr(default_factory=Queue)
該類提供多種方法:
push
、pop
、pop_all
、dump
、load
、empty
方法; -
memory: Memory = Field(default_factory=Memory)
這是一個Memory
類型的字段,用于存儲角色的記憶。class Memory(BaseModel):"""The most basic memory: super-memory"""storage: list[SerializeAsAny[Message]] = []index: DefaultDict[str, list[SerializeAsAny[Message]]] = Field(default_factory=lambda: defaultdict(list))ignore_id: bool = False
-
storage: list[SerializeAsAny[Message]] = []
這是一個列表類型的字段,用于存儲Message
類型的實例。SerializeAsAny
可能是一個自定義的類型轉換器,它允許Message
類型的實例在序列化時被轉換為一個可序列化的形式。 -
index: DefaultDict[str, list[SerializeAsAny[Message]]] = Field(default_factory=lambda: defaultdict(list))
這是一個DefaultDict
類型的字段,用于存儲消息的索引。DefaultDict
是一個特殊的字典,當訪問一個不存在的鍵時,它會自動創建一個默認值。在這個例子中,默認值是一個空列表。這個索引字典的鍵是字符串類型,值是一個SerializeAsAny[Message]
類型的列表,用于存儲與特定鍵相關聯的消息。這個字段使用了Field
裝飾器,并且使用了一個 lambda 函數作為默認工廠,這個 lambda 函數返回一個空的DefaultDict
。 -
ignore_id: bool = False
這是一個布爾類型的字段,用于表示是否忽略消息的 ID。默認值為False
,意味著消息的 ID 會被考慮在內該類提供多種方法:
add
、add_batch
、get_by_role
、get_by_content
、delete_newest
、delete
、clear
、count
、try_remember
、get
、find_news
、get_by_action
、get_by_actions
方法;
-
-
default_factory
:
使用
default_factory
可以確保每次創建模型實例時,可選字段都會被賦予一個新創建的默認實例,而不是共享同一個實例。這對于可變數據類型(如列表、字典)尤其重要;-
news: list[Type[Message]] = Field(default=[], exclude=True)
這是一個Message
類型的列表字段,用于存儲角色的新消息;從消息緩沖區
msg_buffer
拿到所有未處理的消息Message
, 后放置在特定Role
的memory
里, 根據規則過濾感興趣的消息,放在news
里;(這些消息要么是由self.rc.watch
中的某些內容導致的,要么是發送給self.name
的。此外,這些消息不能在old_messages
中找到,以避免重復處理)
-
-
要點4:
Role
解讀:class Role(SerializationMixin, ContextMixin, BaseModel):"""Role/Agent"""model_config = ConfigDict(arbitrary_types_allowed=True, extra="allow")name: str = ""profile: str = ""goal: str = ""constraints: str = ""desc: str = ""is_human: bool = Falserole_id: str = ""states: list[str] = []# scenarios to set action system_prompt:# 1. `__init__` while using Role(actions=[...])# 2. add action to role while using `role.set_action(action)`# 3. set_todo while using `role.set_todo(action)`# 4. when role.system_prompt is being updated (e.g. by `role.system_prompt = "..."`)# Additional, if llm is not set, we will use role's llmactions: list[SerializeAsAny[Action]] = Field(default=[], validate_default=True)rc: RoleContext = Field(default_factory=RoleContext)addresses: set[str] = set()planner: Planner = Field(default_factory=Planner)# builtin variablesrecovered: bool = False # to tag if a recovered rolelatest_observed_msg: Optional[Message] = None # record the latest observed message when interrupted__hash__ = object.__hash__ # support Role as hashable type in `Environment.members`
實例化對字段進行后校驗:
@model_validator(mode="after")def validate_role_extra(self):self._process_role_extra()return selfdef _process_role_extra(self):kwargs = self.model_extra or {}if self.is_human:self.llm = HumanProvider(None)# Check actions and set llm and prefix for each action.self._check_actions() # 'You are a DataInterpreter, named David, your goal is . 'self.llm.system_prompt = self._get_prefix()self.llm.cost_manager = self.context.cost_managerself._watch(kwargs.pop("watch", [UserRequirement]))if self.latest_observed_msg:self.recovered = True
-
要點5:
react
方法解讀:該方法根據角色當前的回應模式選擇執行不同的策略。如果反應模式是
RoleReactMode.REACT
或RoleReactMode.BY_ORDER
,則執行self._react()
方法;如果是RoleReactMode.PLAN_AND_ACT
,則執行self._plan_and_act()
方法。self._plan_and_act()
:直觀簡潔,很容易理解
async def _plan_and_act(self) -> Message:"""first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically."""# create initial plan and update it until confirmationgoal = self.rc.memory.get()[-1].content # retreive latest user requirementawait self.planner.update_plan(goal=goal)# take on tasks until all finishedwhile self.planner.current_task:task = self.planner.current_tasklogger.info(f"ready to take on task {task}")# take on current tasktask_result = await self._act_on_task(task)# process the result, such as reviewing, confirming, plan updatingawait self.planner.process_task_result(task_result)rsp = self.planner.get_useful_memories()[0] # return the completed plan as a responseself.rc.memory.add(rsp) # add to persistent memoryreturn rsp
-
要點6:
llm
大語言模型怎么調用:-
_aask
方法:async def _aask(self, prompt: str, system_msgs: Optional[list[str]] = None) -> str:"""Append default prefix"""return await self.llm.aask(prompt, system_msgs)
實例化
self.llm
:@propertydef llm(self) -> BaseLLM:"""Role llm: if not existed, init from role.config"""if not self.private_llm:self.private_llm = self.context.llm_with_cost_manager_from_llm_config(self.config.llm)return self.private_llm
llm
如何回應:async def _achat_completion_stream(self, messages: list[dict], timeout=USE_CONFIG_TIMEOUT) -> str:response: AsyncStream[ChatCompletionChunk] = await self.aclient.chat.completions.create(**self._cons_kwargs(messages, timeout=self.get_timeout(timeout)), stream=True)
self.aclient
實例化通過self.aclient = AsyncOpenAI(**kwargs)
,kwargs
是{'api_key': 'sk-', 'base_url': 'http://10.9.xx.xx:8000/v1'}
輸入信息
self._cons_kwargs(messages, timeout=self.get_timeout(timeout))
是'messages':[{'role': 'system', 'content': 'As a data scientist,... function.'}, {'role': 'user', 'content': '\n# User Requirement\n... code\n```\n'}]'max_tokens':4096 'temperature':0.0 'model':'glm4' 'timeout':600
-