文章目錄
- 一、workforce 簡介
- 1.架構設計
- 2.通信機制
- 二、workforce 工作流程圖示例
- 1.用戶角色
- 2.工作流程
- 三、workforce 中重要函數說明
- 1.`__init__`函數
- 2.`add_single_agent_worker` 函數
- 3.`add_role_playing_worker` 函數
- 4.`add_workforce` 函數
- 四、基于workforce實現多智能體協調
- (1)創建一個 Workforce 實例
- (2)worker定義
- (3)添加到Workforce 的工作節點
- (4)創建一個任務
- (5)啟動 Workforce 的任務處理流程
- 完整示例代碼
- 五、遇到問題
一、workforce 簡介
Workforce是CAMEL框架中的一個多智能體協同工作系統。它以一種簡潔的方式讓多個智能體協作完成任務,類似于一個高效的團隊合作系統。
1.架構設計
Workforce采用層級架構設計。一個workforce可以包含多個工作節點(worker nodes)
,每個工作節點可以包含一個或多個智能體作為工作者。工作節點由workforce內部的**協調智能體(coordinator agent)**管理,協調智能體根據工作節點的描述及其工具集來分配任務。
2.通信機制
Workforce內部的通信基于任務通道(task channel)。Workforce初始化時會創建一個所有節點共享的通道。任務會被發布到這個通道中,每個工作節點會監聽通道,接受分配給它的任務并解決。當任務完成后,工作節點會將結果發布回通道,結果會作為其他任務的"依賴項"保留在通道中,供所有工作節點共享。
通過這種機制,workforce能夠以協作和高效的方式解決任務。
二、workforce 工作流程圖示例
如圖,展示了如何通過以下智能體協作完成一個請求,即“創建一個產品的登錄頁面”
1.用戶角色
-
Root Node (Manager):作為系統的管理者,負責接收任務并協調任務的分解和分配。
-
Coordinator Agent (協調智能體) 和 Task Manager Agent (任務管理智能體):管理任務分解、依賴關系、分發任務,以及監控任務完成情況。
-
Leaf Nodes (Workers):執行任務的實際智能體,分別承擔不同的角色(如“內容撰寫者”和“代碼撰寫者”)。
2.工作流程
(a) 用戶需求的接收
用戶發出任務請求(例如“創建一個登錄頁面”,圖中 1)。
Coordinator Agent 接收需求,作為入口點。
(b) 任務分解與定義
Coordinator Agent 通過任務分解策略,將請求拆分為多個子任務(如任務 A.1 和 A.2),并定義:
這些任務被送到 Task Manager Agent 進行分發(圖中 2 和 3)。
? 任務的分配
Task Manager Agent 將任務分發到 Channel(圖中 4),這是一個任務管理中樞。
任務按角色需求分配到 Leaf Nodes (Workers),包括:
(d) Leaf Nodes 執行任務
內容撰寫者 (Content Writer):
代碼撰寫者 (Code Writer):
(e) 結果整合與返回
Coordinator Agent 匯總所有任務結果(如 A.1 和 A.2 的結果)。
將完整的任務結果返回給用戶(圖中 18)。
三、workforce 中重要函數說明
1.__init__
函數
def __init__(self,description: str,children: Optional[List[BaseNode]] = None,coordinator_agent_kwargs: Optional[Dict] = None,task_agent_kwargs: Optional[Dict] = None,new_worker_agent_kwargs: Optional[Dict] = None,) -> None:super().__init__(description)self._child_listening_tasks: Deque[asyncio.Task] = deque()self._children = children or []self.new_worker_agent_kwargs = new_worker_agent_kwargscoord_agent_sys_msg = BaseMessage.make_assistant_message(role_name="Workforce Manager",content="You are coordinating a group of workers. A worker can be ""a group of agents or a single agent. Each worker is ""created to solve a specific kind of task. Your job ""includes assigning tasks to a existing worker, creating ""a new worker for a task, etc.",)self.coordinator_agent = ChatAgent(coord_agent_sys_msg, **(coordinator_agent_kwargs or {}))task_sys_msg = BaseMessage.make_assistant_message(role_name="Task Planner",content="You are going to compose and decompose tasks.",)self.task_agent = ChatAgent(task_sys_msg, **(task_agent_kwargs or {}))# If there is one, will set by the workforce class wrapping thisself._task: Optional[Task] = Noneself._pending_tasks: Deque[Task] = deque()
其中coordinator_agent的本質為ChatAgent
,role_name=“Workforce Manager”,對應的提示詞為:
"You are coordinating a group of workers. A worker can be ""a group of agents or a single agent. Each worker is ""created to solve a specific kind of task. Your job ""includes assigning tasks to a existing worker, creating ""a new worker for a task, etc."
task_agentt的本質也是ChatAgent
,role_name=“Task Planner”,對應的提示詞為:
"You are going to compose and decompose tasks."
2.add_single_agent_worker
函數
@check_if_running(False)def add_single_agent_worker(self, description: str, worker: ChatAgent) -> Workforce:r"""Add a worker node to the workforce that uses a single agent.Args:description (str): Description of the worker node.worker (ChatAgent): The agent to be added.Returns:Workforce: The workforce node itself."""worker_node = SingleAgentWorker(description, worker)self._children.append(worker_node)return self
調用add_single_agent_worker
時會添加單個工作節點worker_node
.
3.add_role_playing_worker
函數
def add_role_playing_worker(self,description: str,assistant_role_name: str,user_role_name: str,assistant_agent_kwargs: Optional[Dict] = None,user_agent_kwargs: Optional[Dict] = None,chat_turn_limit: int = 3,) -> Workforce:r"""Add a worker node to the workforce that uses `RolePlaying` system.Args:description (str): Description of the node.assistant_role_name (str): The role name of the assistant agent.user_role_name (str): The role name of the user agent.assistant_agent_kwargs (Optional[Dict], optional): The keywordarguments to initialize the assistant agent in the roleplaying, like the model name, etc. Defaults to `None`.user_agent_kwargs (Optional[Dict], optional): The keyword argumentsto initialize the user agent in the role playing, like themodel name, etc. Defaults to `None`.chat_turn_limit (int, optional): The maximum number of chat turnsin the role playing. Defaults to 3.Returns:Workforce: The workforce node itself."""worker_node = RolePlayingWorker(description,assistant_role_name,user_role_name,assistant_agent_kwargs,user_agent_kwargs,chat_turn_limit,)self._children.append(worker_node)return self
調用add_role_playing_worker
時會添加RolePlayingWorker
的工作節點.
4.add_workforce
函數
@check_if_running(False)def add_workforce(self, workforce: Workforce) -> Workforce:r"""Add a workforce node to the workforce.Args:workforce (Workforce): The workforce node to be added.Returns:Workforce: The workforce node itself."""self._children.append(workforce)return self
調用add_workforce
時會添加workforce
類型的工作節點.
四、基于workforce實現多智能體協調
關鍵步驟為
(1)創建一個 Workforce 實例
workforce = Workforce(description="旅游攻略制作與評估工作組",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})
(2)worker定義
planner_agent = ChatAgent(system_message="""你是一個專業的旅行規劃師。你的職責是:1. 根據景點分布規劃合理的游覽順序2. 為每天安排適量的景點和活動3. 考慮用餐、休息等時間4. 注意不同季節的特點請確保行程安排合理且具有可行性。""",model=model,output_language='中文')
(3)添加到Workforce 的工作節點
workforce.add_single_agent_worker("負責制定詳細行程規劃",worker=planner_agent
)
(4)創建一個任務
from camel.tasks import Task# 創建一個用于測試的任務
task = Task(content="規劃一個3天的巴黎旅行計劃。",id="0", # id可以是任何標記字符串
)
(5)啟動 Workforce 的任務處理流程
task = workforce.process_task(task)
完整示例代碼
from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import BaseMessage
from camel.societies.workforce import Workforce
from camel.toolkits import SearchToolkit
from camel.tasks import Task
from camel.toolkits import FunctionTool
from camel.configs import SiliconFlowConfig # 關鍵from dotenv import load_dotenv
import osload_dotenv()api_key = os.getenv('Siliconflow_API_KEY')model = ModelFactory.create(model_platform=ModelPlatformType.SILICONFLOW,model_type="Qwen/Qwen2.5-72B-Instruct",model_config_dict=SiliconFlowConfig(temperature=0.2).as_dict(),api_key=api_key
)# 創建一個 Workforce 實例
workforce = Workforce(description="旅游攻略制作與評估工作組",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})search_tool = FunctionTool(SearchToolkit().search_duckduckgo)search_agent = ChatAgent(system_message="""你是一個專業的旅游信息搜索助手。你的職責是:1. 搜索目的地的主要景點信息2. 搜索當地特色美食信息3. 搜索交通和住宿相關信息請確保信息的準確性和實用性。""",model=model,tools=[search_tool],output_language='中文')planner_agent = ChatAgent(system_message="""你是一個專業的旅行規劃師。你的職責是:1. 根據景點分布規劃合理的游覽順序2. 為每天安排適量的景點和活動3. 考慮用餐、休息等時間4. 注意不同季節的特點請確保行程安排合理且具有可行性。""",model=model,output_language='中文')reviewer_agent = ChatAgent(system_message="""你是一個經驗豐富的旅行愛好者。你的職責是:1. 從游客角度評估行程的合理性2. 指出可能的問題和改進建議3. 補充實用的旅行小貼士4. 評估行程的性價比請基于實際旅行經驗給出中肯的建議。""",model=model,output_language='中文'
)# 添加工作節點
workforce.add_single_agent_worker("負責搜索目的地相關信息",worker=search_agent
).add_single_agent_worker("負責制定詳細行程規劃",worker=planner_agent
).add_single_agent_worker("負責從游客角度評估行程",worker=reviewer_agent
)from camel.tasks import Task# 創建一個用于測試的任務
task = Task(content="規劃一個3天的巴黎旅行計劃。",id="0", # id可以是任何標記字符串
)task = workforce.process_task(task)print(task.result)
五、遇到問題
關鍵報錯代碼
If you have a specific format in mind or a particular context for this number, please let me know!
Traceback (most recent call last):File "/home/allyoung/camel_course/03-2-33-Workforce.py", line 84, in <module>task = workforce.process_task(task)File "/home/allyoung/camel/camel/societies/workforce/utils.py", line 69, in wrapperreturn func(self, *args, **kwargs)File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 153, in process_taskasyncio.run(self.start())File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/runners.py", line 44, in runreturn loop.run_until_complete(main)File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/base_events.py", line 641, in run_until_completereturn future.result()File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 469, in startawait self._listen_to_channel()File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 437, in _listen_to_channelawait self._post_ready_tasks()File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 402, in _post_ready_tasksassignee_id = self._find_assignee(task=ready_task)File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 288, in _find_assigneeresult_dict = json.loads(response.msg.content)File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/__init__.py", line 346, in loadsreturn _default_decoder.decode(s)File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 337, in decodeobj, end = self.raw_decode(s, idx=_w(s, 0).end())File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 355, in raw_decoderaise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)