每一篇文章都短小精悍,不啰嗦。
今天我們來深入剖析Role
類的代碼實現。在多智能體協作系統中,Role
(角色)就像現實世界中的 "員工",是執行具體任務、參與協作的基本單位。這段代碼是 MetaGPT 框架的核心,它定義了一個角色從 "接收信息" 到 "做出決策" 再到 "執行任務" 的完整生命周期。
一、類的整體結構與核心定位
1. 繼承關系:能力的組合
class Role(BaseRole, SerializationMixin, ContextMixin, BaseModel):
- BaseRole:抽象基類,定義了角色的核心接口(
think
/act
/react
等),強制實現基礎能力; - SerializationMixin:提供序列化能力,支持角色狀態的保存與恢復(斷點續跑);
- ContextMixin:整合全局上下文(配置、成本管理器等),方便訪問全局資源;
- BaseModel(Pydantic):提供數據校驗、屬性管理(如
model_config
),簡化參數處理。
設計意圖:通過多繼承將不同維度的能力(接口規范、序列化、上下文訪問、數據管理)分離,符合 "單一職責原則"。
2. 核心屬性:角色的 "身份與配置"
屬性 | 作用 | 類比現實 |
---|---|---|
name /profile | 角色名稱(如 "張三")與身份(如 "產品經理") | 員工的姓名和職位 |
goal /constraints | 工作目標(如 "設計需求文檔")與約束(如 "用中文輸出") | 崗位目標和工作規范 |
actions | 角色可執行的動作(如WriteCode /AnalyzeRequirement ) | 員工的技能列表 |
rc (RoleContext) | 運行時上下文(消息緩沖區、記憶、當前狀態等) | 員工的 "工作記憶"(待辦、郵件、歷史記錄) |
react_mode | 決策模式(react /by_order /plan_and_act ) | 工作方式(靈活應變 / 按流程 / 先規劃后執行) |
二、關鍵函數實現剖析
模塊 1:初始化與基礎配置
1.?__init__
與_process_role_extra
:角色的 "入職配置"
@model_validator(mode="after")
def validate_role_extra(self):self._process_role_extra()return selfdef _process_role_extra(self):kwargs = self.model_extra or {}if self.is_human:self.llm = HumanProvider(None) # 人類角色用人類交互接口self._check_actions() # 初始化動作列表self.llm.system_prompt = self._get_prefix() # 設置大模型提示詞前綴self.llm.cost_manager = self.context.cost_manager # 綁定成本管理器if not self.observe_all_msg_from_buffer:self._watch(kwargs.pop("watch", [UserRequirement])) # 默認關注用戶需求if self.latest_observed_msg:self.recovered = True # 標記為恢復的角色
- 作用:完成角色初始化的收尾工作,包括 LLM 配置、動作校驗、消息關注設置等。
- 關鍵步驟:
- 若為人類角色,綁定
HumanProvider
(接收人類輸入); - 通過
_check_actions
初始化動作列表(確保每個動作正確綁定上下文和 LLM); - 設置 LLM 的系統提示詞(
_get_prefix
生成),定義角色的 "人設"; - 默認關注
UserRequirement
(用戶需求),確保角色能接收核心指令。
- 若為人類角色,綁定
2.?_get_prefix
:角色的 "人設說明書"
def _get_prefix(self):if self.desc:return self.desc# 基礎人設:身份、名稱、目標prefix = PREFIX_TEMPLATE.format(profile=self.profile, name=self.name, goal=self.goal)# 約束條件(如"只能用中文")if self.constraints:prefix += CONSTRAINT_TEMPLATE.format(constraints=self.constraints)# 環境信息(所在團隊的其他角色)if self.rc.env and self.rc.env.desc:all_roles = self.rc.env.role_names()other_role_names = ", ".join([r for r in all_roles if r != self.name])prefix += f"You are in {self.rc.env.desc} with roles({other_role_names})."return prefix
- 作用:生成 LLM 的系統提示詞,定義角色的核心身份、目標和約束,是角色 "行為準則" 的基礎。
- 設計亮點:動態整合環境信息(如團隊中的其他角色),讓角色知道 "自己在和誰協作",提升協作合理性。
模塊 2:感知環境 ——_observe
:角色的 "讀郵件"
async def _observe(self) -> int:# 1. 獲取新消息(從緩沖區或恢復狀態)news = []if self.recovered and self.latest_observed_msg:# 從恢復狀態獲取最近消息news = self.rc.memory.find_news(observed=[self.latest_observed_msg], k=10)if not news:# 從消息緩沖區取所有未處理消息news = self.rc.msg_buffer.pop_all()# 2. 過濾消息(只保留感興趣的)old_messages = [] if not self.enable_memory else self.rc.memory.get() # 已處理的舊消息self.rc.news = [n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) # 關注的動作或發給自己的消息and n not in old_messages # 排除已處理的]# 3. 存入記憶(避免重復處理)if self.observe_all_msg_from_buffer:self.rc.memory.add_batch(news) # 全量存入(無狀態角色可能需要)else:self.rc.memory.add_batch(self.rc.news) # 只存感興趣的# 4. 記錄最新消息(用于斷點恢復)self.latest_observed_msg = self.rc.news[-1] if self.rc.news else Nonelogger.debug(f"{self._setting} observed: {[f'{i.role}: {i.content[:20]}...' for i in self.rc.news]}")return len(self.rc.news)
- 作用:從消息緩沖區獲取并處理新消息,是角色 "感知世界" 的入口。
- 核心邏輯:"取消息→過濾→存記憶",確保角色只關注與自己相關的信息(避免信息過載)。
- 設計亮點:
- 支持斷點恢復(從
latest_observed_msg
繼續處理); - 靈活的消息過濾機制(通過
rc.watch
和send_to
判斷相關性); - 可配置是否全量存入記憶(
observe_all_msg_from_buffer
),適應不同角色需求(如管理者可能需要了解全局)。
- 支持斷點恢復(從
模塊 3:決策系統 ——_think
:角色的 "思考下一步"
_think
是角色的 "決策核心",根據react_mode
(反應模式)決定下一個動作,支持三種策略:
1. 單動作場景(只有一個Action
)
if len(self.actions) == 1:self._set_state(0) # 直接執行唯一動作return True
- 場景:如 "數據采集員" 只有
CollectData
一個動作,無需復雜決策。
2. 按順序執行(by_order
模式)
if self.rc.react_mode == RoleReactMode.BY_ORDER:self._set_state(self.rc.state + 1) # 每次切換到下一個動作return self.rc.state >= 0 and self.rc.state < len(self.actions)
- 場景:流程固定的任務(如 "需求→設計→開發" 的線性流程),按預設順序執行動作。
- 示例:
ProductManager
先執行WritePRD
,再執行ReviewPRD
。
3. 動態決策(react
模式):LLM 驅動
# 構建提示詞:包含歷史記錄、當前狀態、可選動作
prompt = self._get_prefix() + STATE_TEMPLATE.format(history=self.rc.history,states="\n".join(self.states), # 可選動作列表(如"0. 寫需求 1. 評審需求")n_states=len(self.states) - 1,previous_state=self.rc.state
)# 調用LLM選擇下一個狀態(動作索引)
next_state = await self.llm.aask(prompt)
next_state = extract_state_value_from_output(next_state) # 提取純數字結果# 校驗并更新狀態
if next_state not in range(-1, len(self.states)):next_state = -1 # 無效狀態則終止
self._set_state(next_state) # 更新狀態并綁定對應動作
- 場景:復雜、動態的任務(如應對用戶頻繁變更的需求),需要 LLM 根據歷史對話動態判斷。
- 設計亮點:
- 用
STATE_TEMPLATE
嚴格約束 LLM 輸出格式(只返回數字),避免解析錯誤; - 失敗處理(無效輸出時設為 - 1 終止),保證系統健壯性。
- 用
4. 規劃后執行(plan_and_act
模式)
由_plan_and_act
實現,先通過Planner
生成任務計劃,再按計劃執行動作(后續詳解)。
模塊 4:執行系統 ——_act
:角色的 "動手做事"
async def _act(self) -> Message:logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")# 1. 執行當前待辦動作(如WriteCode.run())response = await self.rc.todo.run(self.rc.history) # 傳入歷史信息供參考# 2. 將結果封裝為消息(便于協作)if isinstance(response, (ActionOutput, ActionNode)):# 動作輸出→AIMessage(帶結構化內容)msg = AIMessage(content=response.content,instruct_content=response.instruct_content, # 結構化指令內容(如JSON)cause_by=self.rc.todo, # 標記由哪個動作產生sent_from=self # 標記發送者)elif isinstance(response, Message):msg = response # 已是消息格式,直接使用else:# 其他類型→簡單文本消息msg = AIMessage(content=response or "", cause_by=self.rc.todo, sent_from=self)# 3. 存入記憶(記錄自己的執行結果)self.rc.memory.add(msg)return msg
- 作用:執行
_think
決策的動作,并將結果封裝為消息(供其他角色接收)。 - 核心流程:"執行動作→封裝結果→記錄記憶",是角色產生價值的核心步驟。
- 設計亮點:
- 兼容多種輸出類型(
ActionOutput
/Message
/ 文本),靈活適配不同Action
的實現; - 消息中記錄
cause_by
和sent_from
,便于追蹤 "動作來源" 和 "發送者",支持協作追溯。
- 兼容多種輸出類型(
模塊 5:主流程 ——run
與react
:角色的 "工作循環"
1.?run
:完整工作流程
async def run(self, with_message=None) -> Message | None:# 1. 處理輸入消息(如有外部消息,先放入緩沖區)if with_message:self.put_message(with_message)# 2. 感知新消息(無消息則等待)if not await self._observe():logger.debug(f"{self.name}:無新消息,等待中")return# 3. 反應(根據模式執行思考-行動循環)rsp = await self.react()# 4. 發送結果(廣播給其他角色)self.publish_message(rsp)return rsp
- 作用:串聯 "感知→決策→行動→反饋" 的完整閉環,是角色對外提供服務的入口。
- 類比:員工的 "工作日流程"—— 看郵件→想方案→做事情→發結果。
2.?react
:反應策略分發
async def react(self) -> Message:if self.rc.react_mode in [RoleReactMode.REACT, RoleReactMode.BY_ORDER]:rsp = await self._react() # 執行思考-行動循環elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:rsp = await self._plan_and_act() # 先規劃再執行self._set_state(-1) # 重置狀態return rsp
- 作用:根據
react_mode
調用對應的反應策略,是 "策略模式" 的典型應用。
3.?_react
:react
/by_order
模式的執行循環
async def _react(self) -> Message:actions_taken = 0rsp = AIMessage(content="No actions taken yet", cause_by=Action)while actions_taken < self.rc.max_react_loop: # 限制最大循環次數(防無限執行)# 思考下一步has_todo = await self._think()if not has_todo:break# 執行動作rsp = await self._act()actions_taken += 1return rsp # 返回最后一個動作的結果
- 作用:實現 "思考→行動" 的循環,支持多輪決策(如先分析需求,再修改方案)。
- 安全機制:
max_react_loop
限制最大輪次,避免因 LLM 決策失誤導致無限循環。
模塊 6:消息協作 ——publish_message
與put_message
角色通過消息與其他角色協作,這兩個方法是 "溝通工具":
1.?put_message
:接收私信
def put_message(self, message):if not message:returnself.rc.msg_buffer.push(message) # 放入私有消息緩沖區
- 作用:接收發給自己的消息(如 "@產品經理 請補充需求"),存入私有緩沖區,供
_observe
處理。
2.?publish_message
:廣播消息
def publish_message(self, msg):if not msg:return# 處理"發給自己"的標記if MESSAGE_ROUTE_TO_SELF in msg.send_to:msg.send_to.add(any_to_str(self))msg.send_to.remove(MESSAGE_ROUTE_TO_SELF)# 發給自己的消息直接放入緩沖區if all(to in {any_to_str(self), self.name} for to in msg.send_to):self.put_message(msg)return# 否則通過環境廣播(所有訂閱者可見)if self.rc.env:self.rc.env.publish_message(msg)
- 作用:將消息廣播到環境中,供其他角色接收(如 "產品經理發布需求文檔,供架構師參考")。
- 設計亮點:
- 支持 "發給自己" 的特殊標記(
MESSAGE_ROUTE_TO_SELF
),方便角色自我記錄; - 消息路由由環境(
env
)處理,符合 "單一職責原則"(角色不關心誰接收,只負責發送)。
- 支持 "發給自己" 的特殊標記(
模塊 7:其他關鍵功能
1. 動作管理:set_actions
/set_action
def set_actions(self, actions: list[Union[Action, Type[Action]]]):self._reset() # 清空現有動作for action in actions:# 實例化動作(如果傳入的是類)if not isinstance(action, Action):i = action(context=self.context)else:i = action# 初始化動作(綁定上下文、LLM、前綴)self._init_action(i)self.actions.append(i)self.states.append(f"{len(self.actions)-1}. {action}") # 記錄動作狀態描述
- 作用:為角色添加可執行的動作(如給
Engineer
添加WriteCode
和TestCode
)。 - 設計亮點:支持傳入動作類或實例,自動初始化并綁定上下文,簡化角色配置。
2. 記憶管理:get_memories
def get_memories(self, k=0) -> list[Message]:return self.rc.memory.get(k=k) # 返回最近k條記憶(k=0返回全部)
- 作用:獲取歷史消息(記憶),供決策和動作執行參考(如
_think
需要歷史對話,_act
需要上下文信息)。
3. 規劃執行:_plan_and_act
(plan_and_act
模式)
async def _plan_and_act(self) -> Message:# 1. 生成計劃(基于目標和歷史)if not self.planner.plan.goal:goal = self.rc.memory.get()[-1].content # 取最新需求作為目標await self.planner.update_plan(goal=goal) # LLM生成計劃# 2. 按計劃執行任務while self.planner.current_task:task = self.planner.current_tasktask_result = await self._act_on_task(task) # 執行任務await self.planner.process_task_result(task_result) # 處理結果(更新計劃)# 3. 返回最終結果rsp = self.planner.get_useful_memories()[0]self.rc.memory.add(rsp)return rsp
- 作用:適用于復雜任務(如 "開發一個電商網站"),先通過
Planner
拆分任務,再逐個執行。 - 設計亮點:計劃與執行分離,支持動態調整計劃(如某任務失敗后重新規劃)。
三、設計亮點總結
- 策略模式:通過
react_mode
支持三種決策策略,適配不同場景; - 模塊化設計:
_observe
/_think
/_act
分離,便于單獨擴展(如優化決策邏輯只需改_think
); - 靈活性:動作可動態添加、記憶可配置、消息過濾可定制;
- 健壯性:無效輸入處理(如 LLM 輸出錯誤)、斷點恢復(
recovered
狀態); - 現實映射:設計貼近人類協作模式(記憶、消息、決策流程),降低理解成本。
四、學習要點
- 角色是多智能體系統的 "細胞":所有協作都通過角色的
run
方法串聯,理解角色的生命周期是掌握多智能體系統的關鍵; - 設計模式的應用:策略模式(
react_mode
)、模板方法(run
定義流程)、觀察者模式(消息訂閱)等,是代碼靈活性的核心; - 工程化細節:異常處理、狀態持久化、配置驅動等,保證系統在復雜場景下的可靠性。
希望通過今天的剖析,大家能理解Role
類如何將 "智能體" 的抽象概念轉化為可執行的代碼,以及每個函數在其中的作用。后續可以嘗試擴展角色(如實現一個Designer
角色),加深理解。