本系列文章跟隨《MetaGPT多智能體課程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并實踐多智能體系統的開發。
本文為該課程的第四章(多智能體開發)的第一篇筆記。主要記錄下多智能體的運行機制及跟著教程,實現一個簡單的多智能體系統。
系列筆記
- 【AI Agent系列】【MetaGPT多智能體學習】0. 環境準備 - 升級MetaGPT 0.7.2版本及遇到的坑
- 【AI Agent系列】【MetaGPT多智能體學習】1. 再理解 AI Agent - 經典案例和熱門框架綜述
- 【AI Agent系列】【MetaGPT多智能體學習】2. 重溫單智能體開發 - 深入源碼,理解單智能體運行框架
文章目錄
- 系列筆記
- 0. 多智能體間互通的方式 - Enviroment組件
- 0.1 多智能體間協作方式簡介
- 0.2 Environment組件 - 深入源碼
- 0.2.1 參數介紹
- 0.2.2 add_roles函數 - 承載角色
- 0.2.3 publish_message函數 - 接收角色發布的消息 / 環境中的消息被角色得到
- 0.2.4 run函數 - 運行入口
- 1. 開發一個簡單的多智能體系統
- 1.1 多智能體需求描述
- 1.2 代碼實現
- 1.2.1 學生智能體
- 1.2.2 老師智能體
- 1.2.3 創建多智能體交流的環境
- 1.2.4 運行
- 1.2.5 完整代碼
- 2. 總結
0. 多智能體間互通的方式 - Enviroment組件
0.1 多智能體間協作方式簡介
在上次課中,我也曾寫過 多智能體運行機制 的筆記(這篇文章:【AI Agent系列】【MetaGPT】【深入源碼】智能體的運行周期以及多智能體間如何協作)。其中我自己通過看源碼,總結出了MetaGPT多智能體間協作的方式:
(1)每一個Role都在不斷觀察環境中的信息(_observe函數)
(2)當觀察到自己想要的信息后,就會觸發后續相應的動作
(3)如果沒有觀察到想要的信息,則會一直循環觀察
(4)執行完動作后,會將產生的msg放到環境中(publish_message),供其它Role智能體來使用。
現在再結合《MetaGPT多智能體》課程的教案,發現還是理解地有些淺了,我只關注了智能體在關注自己想要的信息,觀察到了之后就開始行動,而忽略了其背后的基礎組件 - Enviroment。
0.2 Environment組件 - 深入源碼
MetaGPT中對Environment的定義:環境,承載一批角色,角色可以向環境發布消息,可以被其他角色觀察到。短短一句話,總結三個功能:
- 承載角色
- 接收角色發布的消息
- 環境中的消息被角色得到
下面我們分開來細說。
0.2.1 參數介紹
首先來看下 Environment 的參數:
class Environment(ExtEnv):"""環境,承載一批角色,角色可以向環境發布消息,可以被其他角色觀察到Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles"""model_config = ConfigDict(arbitrary_types_allowed=True)desc: str = Field(default="") # 環境描述roles: dict[str, SerializeAsAny["Role"]] = Field(default_factory=dict, validate_default=True)member_addrs: Dict["Role", Set] = Field(default_factory=dict, exclude=True)history: str = "" # For debugcontext: Context = Field(default_factory=Context, exclude=True)
- desc:環境的描述
- roles:字典類型,指定當前環境中的角色
- member_addrs:字典類型,表示當前環境中的角色以及他們對應的狀態
- history:記錄環境中發生的消息記錄
- context:當前環境的一些上下文信息
0.2.2 add_roles函數 - 承載角色
def add_roles(self, roles: Iterable["Role"]):"""增加一批在當前環境的角色Add a batch of characters in the current environment"""for role in roles:self.roles[role.profile] = rolefor role in roles: # setup system message with rolesrole.set_env(self)role.context = self.context
從源碼中看,這個函數的功能有兩個:
(1)給 Environment 的 self.roles
參數賦值,上面我們已經知道它是一個字典類型,現在看來,它的 key 為 role 的 profile,值為 role 本身。
疑問: role 的 profile 默認是空(
profile: str = ""
),可以沒有值,那如果使用者懶得寫profile,這里會不會最終只有一個 Role,導致無法實現多智能體?歡迎討論交流。
(2)給添加進來的 role 設置環境信息
Role的 set_env
函數源碼如下,它又給env設置了member_addrs
。有點繞?
def set_env(self, env: "Environment"):"""Set the environment in which the role works. The role can talk to the environment and can also receivemessages by observing."""self.rc.env = envif env:env.set_addresses(self, self.addresses)self.llm.system_prompt = self._get_prefix()self.set_actions(self.actions) # reset actions to update llm and prefix
通過 add_roles 函數,將 role 和 Environment 關聯起來。
0.2.3 publish_message函數 - 接收角色發布的消息 / 環境中的消息被角色得到
Environment 中的 publish_message
函數是 Role 在執行完動作之后,將自身產生的消息發布到環境中調用的接口。Role的調用方式如下:
def publish_message(self, msg):"""If the role belongs to env, then the role's messages will be broadcast to env"""if not msg:returnif not self.rc.env:# If env does not exist, do not publish the messagereturnself.rc.env.publish_message(msg)
通過上面的代碼來看,一個Role只能存在于一個環境中?
然后看下 Environment 中源碼:
def publish_message(self, message: Message, peekable: bool = True) -> bool:"""Distribute the message to the recipients.In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already plannedin RFC 113 for the entire system, the routing information in the Message is only responsible forspecifying the message recipient, without concern for where the message recipient is located. How toroute the message to the message recipient is a problem addressed by the transport framework designedin RFC 113."""logger.debug(f"publish_message: {message.dump()}")found = False# According to the routing feature plan in Chapter 2.2.3.2 of RFC 113for role, addrs in self.member_addrs.items():if is_send_to(message, addrs):role.put_message(message)found = Trueif not found:logger.warning(f"Message no recipients: {message.dump()}")self.history += f"\n{message}" # For debugreturn True
其中重點是這三行代碼,檢查環境中的所有Role是否訂閱了該消息(或該消息是否應該發送給環境中的某個Role),如果訂閱了,則調用Role的put_message
函數,Role的 put_message
函數的作用是將message放到本身的msg_buffer中:
for role, addrs in self.member_addrs.items():if is_send_to(message, addrs):role.put_message(message)
這樣就實現了將環境中的某個Role的Message放到環境中,并通知給環境中其它的Role的機制。
0.2.4 run函數 - 運行入口
run函數的源碼如下:
async def run(self, k=1):"""處理一次所有信息的運行Process all Role runs at once"""for _ in range(k):futures = []for role in self.roles.values():future = role.run()futures.append(future)await asyncio.gather(*futures)logger.debug(f"is idle: {self.is_idle}")
k = 1, 表示處理一次消息?為什么要有這個值,難道還能處理多次?意義是什么?
該函數其實就是遍歷一遍當前環境中的所有Role,然后運行Role的run函數(運行單智能體)。
Role的run里面,就是用該環境觀察信息,行動,發布信息到環境。這樣多個Role的運行就通過 Environment 串起來了,這些之前已經寫過了,不再贅述,見:【AI Agent系列】【MetaGPT】【深入源碼】智能體的運行周期以及多智能體間如何協作。
1. 開發一個簡單的多智能體系統
復現教程中的demo。
1.1 多智能體需求描述
- 兩個智能體:學生 和 老師
- 任務及預期流程:學生寫詩 —> 老師給改進意見 —> 學生根據意見改進 —> 老師給改進意見 —> … n輪 … —> 結束
1.2 代碼實現
1.2.1 學生智能體
學生智能體的主要內容就是根據指定的內容寫詩。
因此,首先定義一個寫詩的Action:WritePoem
。
然后,定義學生智能體,指定它的動作就是寫詩(self.set_actions([WritePoem])
)
那么它什么時候開始動作呢?通過 self._watch([UserRequirement, ReviewPoem])
設置其觀察的信息。當它觀察到環境中有了 UserRequirement
或者 ReviewPoem
產生的信息之后,開始動作。UserRequirement
為用戶的輸入信息類型。
class WritePoem(Action):name: str = "WritePoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Write a poem about the subject provided by human, Return only the content of the generated poem with NO other texts.If the teacher provides suggestions about the poem, revise the student's poem based on the suggestions and return.your poem:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass Student(Role):name: str = "xiaoming"profile: str = "Student"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([WritePoem])self._watch([UserRequirement, ReviewPoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories() # 獲取所有記憶# logger.info(msg)poem_text = await WritePoem().run(msg)logger.info(f'student : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msg
1.2.2 老師智能體
老師的智能體的任務是根據學生寫的詩,給出修改意見。
因此創建一個Action為ReviewPoem
。
然后,創建老師的智能體,指定它的動作為Review(self.set_actions([ReviewPoem])
)
它的觸發實際應該是學生寫完詩以后,因此,加入self._watch([WritePoem])
class ReviewPoem(Action):name: str = "ReviewPoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Check student-created poems about the subject provided by human and give your suggestions for revisions. You prefer poems with elegant sentences and retro style.Return only your comments with NO other texts.your comments:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass Teacher(Role):name: str = "laowang"profile: str = "Teacher"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([ReviewPoem])self._watch([WritePoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories() # 獲取所有記憶poem_text = await ReviewPoem().run(msg)logger.info(f'teacher : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msg
1.2.3 創建多智能體交流的環境
前面我們已經知道了多智能體之間的交互需要一個非常重要的組件 - Environment。
因此,創建一個供多智能體交流的環境,通過 add_roles
加入智能體。
然后,當用戶輸入內容時,將該內容添加到環境中publish_message
,從而觸發相應智能體開始運行。這里cause_by=UserRequirement
代表消息是由用戶產生的,學生智能體關心這類消息,觀察到之后就開始行動了。
classroom = Environment()classroom.add_roles([Student(), Teacher()])classroom.publish_message(Message(role="Human", content=topic, cause_by=UserRequirement,send_to='' or MESSAGE_ROUTE_TO_ALL),peekable=False,
)
1.2.4 運行
加入相應頭文件,指定交互次數,就可以開始跑了。注意交互次數,直接決定了你的程序的效果和你的錢包!
import asynciofrom metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.environment import Environmentfrom metagpt.const import MESSAGE_ROUTE_TO_ALLasync def main(topic: str, n_round=3):while n_round > 0:# self._save()n_round -= 1logger.debug(f"max {n_round=} left.")await classroom.run()return classroom.historyasyncio.run(main(topic='wirte a poem about moon'))
1.2.5 完整代碼
import asynciofrom metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.environment import Environmentfrom metagpt.const import MESSAGE_ROUTE_TO_ALLclassroom = Environment()class WritePoem(Action):name: str = "WritePoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Write a poem about the subject provided by human, Return only the content of the generated poem with NO other texts.If the teacher provides suggestions about the poem, revise the student's poem based on the suggestions and return.your poem:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass ReviewPoem(Action):name: str = "ReviewPoem"PROMPT_TEMPLATE: str = """Here is the historical conversation record : {msg} .Check student-created poems about the subject provided by human and give your suggestions for revisions. You prefer poems with elegant sentences and retro style.Return only your comments with NO other texts.your comments:"""async def run(self, msg: str):prompt = self.PROMPT_TEMPLATE.format(msg = msg)rsp = await self._aask(prompt)return rspclass Student(Role):name: str = "xiaoming"profile: str = "Student"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([WritePoem])self._watch([UserRequirement, ReviewPoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories() # 獲取所有記憶# logger.info(msg)poem_text = await WritePoem().run(msg)logger.info(f'student : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msgclass Teacher(Role):name: str = "laowang"profile: str = "Teacher"def __init__(self, **kwargs):super().__init__(**kwargs)self.set_actions([ReviewPoem])self._watch([WritePoem])async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self.rc.todo}")todo = self.rc.todomsg = self.get_memories() # 獲取所有記憶poem_text = await ReviewPoem().run(msg)logger.info(f'teacher : {poem_text}')msg = Message(content=poem_text, role=self.profile,cause_by=type(todo))return msgasync def main(topic: str, n_round=3):classroom.add_roles([Student(), Teacher()])classroom.publish_message(Message(role="Human", content=topic, cause_by=UserRequirement,send_to='' or MESSAGE_ROUTE_TO_ALL),peekable=False,)while n_round > 0:# self._save()n_round -= 1logger.debug(f"max {n_round=} left.")await classroom.run()return classroom.historyasyncio.run(main(topic='wirte a poem about moon'))
- 運行結果
2. 總結
總結一下整個流程吧,畫了個圖。
從最外圍環境 classroom開始,用戶輸入的信息通過 publish_message
發送到所有Role中,通過Role的 put_message
放到自身的 msg_buffer
中。
當Role運行run
時,_observe
從msg_buffer
中提取信息,然后只過濾自己關心的消息,例如Teacher只過濾出來自WritePoem
的,其它消息雖然在msg_buffer
中,但不處理。同時,msg_buffer
中的消息會存入memory
中。
run完即執行完相應動作后,通過publish_message
將結果消息發布到環境中,環境的publish_message
又將這個消息發送個全部的Role,這時候所有的Role的msg_buffer
中都有了這個消息。完成了智能體間信息的一個交互閉環。
站內文章一覽