當智能體作為獨立服務/容器部署時,它們無法共享進程內狀態。以下是針對分布式部署中動態內存庫的生產就緒解決方案:
1. 基于外部存儲的內存庫
基于 DynamoDB 的共享內存
import json
import os
import time
from typing import Any, Dict

import boto3
from strands import Agent, tool


class DynamoDBMemoryBank:
    """Shared memory bank backed by a DynamoDB table.

    Each item is keyed by ``memory_key`` and stores the JSON-encoded
    payload, the id of the writing agent, a write timestamp, and a
    24-hour TTL so stale entries expire automatically (the table must
    have TTL enabled on the ``ttl`` attribute).
    """

    def __init__(self, table_name: str = "agent-shared-memory"):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def store(self, memory_key: str, data: Dict[str, Any], agent_id: str) -> None:
        """Store ``data`` in shared memory under ``memory_key``."""
        now = int(time.time())
        self.table.put_item(Item={
            'memory_key': memory_key,
            'data': json.dumps(data),
            'agent_id': agent_id,
            'timestamp': now,
            'ttl': now + 86400,  # expire after 24 hours
        })

    def retrieve(self, memory_key: str) -> Dict[str, Any]:
        """Retrieve data from shared memory.

        Returns an empty dict when the key is absent or on any
        read/decoding error (best-effort semantics).
        """
        try:
            response = self.table.get_item(Key={'memory_key': memory_key})
            if 'Item' in response:
                return json.loads(response['Item']['data'])
        except Exception as e:
            # Deliberate best-effort read: log and fall through.
            print(f"檢索內存錯誤: {e}")
        return {}

    def list_keys(self, prefix: str = "") -> list:
        """List all memory keys, optionally filtered by ``prefix``.

        A single DynamoDB ``scan`` returns at most one page (~1 MB), so
        we follow ``LastEvaluatedKey`` to enumerate the whole table.
        """
        keys = []
        scan_kwargs = {}
        while True:
            response = self.table.scan(**scan_kwargs)
            keys.extend(item['memory_key'] for item in response['Items'])
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                break
            scan_kwargs['ExclusiveStartKey'] = last_key
        if prefix:
            keys = [k for k in keys if k.startswith(prefix)]
        return keys


# Shared-memory tools built on the bank above.
memory_bank = DynamoDBMemoryBank()


@tool
def store_shared_data(key: str, data: str, description: str = "") -> str:
    """Store data in the shared memory bank accessible to all agents."""
    memory_bank.store(key, {
        'content': data,
        'description': description,
    }, agent_id=os.getenv('AGENT_ID', 'unknown'))
    return f"存儲數據,鍵為: {key}"


@tool
def retrieve_shared_data(key: str) -> str:
    """Retrieve data from the shared memory bank."""
    data = memory_bank.retrieve(key)
    if data:
        return f"{data.get('description', '')}: {data.get('content', '')}"
    return f"未找到鍵對應的數據: {key}"


@tool
def list_memory_keys(prefix: str = "") -> str:
    """List the available memory keys."""
    keys = memory_bank.list_keys(prefix)
    return f"可用鍵: {', '.join(keys)}"
基于 Redis 的高性能內存
import redis
import json
from strands import Agent, tool


class RedisMemoryBank:
    """High-speed shared memory backed by a Redis instance/cluster."""

    def __init__(self, host: str = "redis-cluster.company.com", port: int = 6379):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            decode_responses=True,
            health_check_interval=30,
        )

    def store(self, key: str, data: dict, ttl: int = 3600):
        """Store JSON-encoded data under ``key`` with a TTL in seconds."""
        payload = json.dumps(data)
        self.redis_client.setex(key, ttl, payload)

    def retrieve(self, key: str) -> dict:
        """Fetch and decode data; empty dict when the key is absent."""
        raw = self.redis_client.get(key)
        if not raw:
            return {}
        return json.loads(raw)

    def publish_event(self, channel: str, message: dict):
        """Publish an event for the other agents on ``channel``."""
        self.redis_client.publish(channel, json.dumps(message))

    def subscribe_to_events(self, channels: list):
        """Return a pubsub handle subscribed to ``channels``."""
        pubsub = self.redis_client.pubsub()
        pubsub.subscribe(channels)
        return pubsub


# Redis-backed memory tools.
redis_memory = RedisMemoryBank()


@tool
def store_fast_memory(key: str, data: str, ttl_minutes: int = 60) -> str:
    """Store data in the high-speed Redis memory bank."""
    redis_memory.store(key, {'content': data}, ttl_minutes * 60)
    return f"存儲在快速內存中: {key}"


@tool
def get_fast_memory(key: str) -> str:
    """Retrieve data from the high-speed memory bank."""
    data = redis_memory.retrieve(key)
    return data.get('content', '未找到')
2. 基于 S3 的已部署智能體會話管理
python
import json
import os
import time

from strands import Agent
from strands.session.s3_session_manager import S3SessionManager


# Each deployed agent persists its session in S3.
class DeployedAgent:
    """A deployed agent whose session state lives in S3.

    Giving every agent its own S3SessionManager means conversation
    state survives container restarts and redeployments.
    """

    def __init__(self, agent_id: str, user_id: str = None):
        self.agent_id = agent_id
        # Session id combines agent and user when a user is supplied.
        session_id = f"{agent_id}_{user_id}" if user_id else agent_id
        self.session_manager = S3SessionManager(
            session_id=session_id,
            bucket=os.getenv('AGENT_MEMORY_BUCKET', 'agent-memory-bucket'),
            prefix=f"agents/{agent_id}/sessions",
        )
        # NOTE(review): ``other_tools`` must be defined at module level
        # with the deployment-specific tools — confirm before shipping.
        self.agent = Agent(
            name=f"已部署智能體 {agent_id}",
            session_manager=self.session_manager,
            tools=[store_shared_data, retrieve_shared_data, *other_tools],
        )

    async def process_request(self, message: str, context: dict = None):
        """Process a request with persistent memory.

        Any caller-supplied context is mirrored into the shared memory
        bank so sibling agents can read it.
        """
        if context:
            # store_shared_data is a plain synchronous @tool function
            # returning str; awaiting it would raise TypeError.
            store_shared_data(
                f"context_{self.agent_id}_{int(time.time())}",
                json.dumps(context),
            )
        # NOTE(review): assumes the strands Agent exposes an awaitable
        # ``run`` — confirm against the installed strands version.
        return await self.agent.run(message)


# Deployment configuration.
deployed_orchestrator = DeployedAgent("orchestrator", "global")
deployed_specialist = DeployedAgent("data_specialist", "global")
3. 帶有共享內存的容器部署
帶有共享服務的 Docker Compose
# docker-compose.yml
version: '3.8'

services:
  # Shared memory services
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  dynamodb-local:
    image: amazon/dynamodb-local
    ports:
      - "8000:8000"
    command: ["-jar", "DynamoDBLocal.jar", "-sharedDb", "-inMemory"]

  # Agent services
  orchestrator:
    build: ./orchestrator
    environment:
      - AGENT_ID=orchestrator
      - REDIS_HOST=redis
      - DYNAMODB_ENDPOINT=http://dynamodb-local:8000
      - AGENT_MEMORY_BUCKET=shared-agent-memory
    ports:
      - "9000:9000"
    depends_on:
      - redis
      - dynamodb-local

  data-specialist:
    build: ./data-specialist
    environment:
      - AGENT_ID=data_specialist
      - REDIS_HOST=redis
      - DYNAMODB_ENDPOINT=http://dynamodb-local:8000
      - AGENT_MEMORY_BUCKET=shared-agent-memory
    ports:
      - "9001:9001"
    depends_on:
      - redis
      - dynamodb-local

  research-agent:
    build: ./research-agent
    environment:
      - AGENT_ID=research_agent
      - REDIS_HOST=redis
      - DYNAMODB_ENDPOINT=http://dynamodb-local:8000
      - AGENT_MEMORY_BUCKET=shared-agent-memory
    ports:
      - "9002:9002"
    depends_on:
      - redis
      - dynamodb-local

volumes:
  redis_data:
4. 帶有共享內存的 AWS ECS/Fargate 部署
# agent_service.py - base service shared by all deployed agents
import asyncio
import json
import os

from strands import Agent
from strands.multiagent.a2a import A2AServer


class ProductionAgentService:
    """Base class for a production agent exposed over A2A.

    Wires the agent to the shared memory layer (DynamoDB + Redis) and
    an S3-backed session manager, and runs a background task that
    reacts to memory events published by sibling agents.
    """

    def __init__(self):
        self.agent_id = os.getenv('AGENT_ID')
        self.memory_bank = DynamoDBMemoryBank()
        self.redis_memory = RedisMemoryBank(
            host=os.getenv('REDIS_HOST', 'redis-cluster.company.com'))
        # Agent with shared-memory tools and a persistent S3 session.
        self.agent = Agent(
            name=f"生產環境智能體 {self.agent_id}",
            session_manager=S3SessionManager(
                session_id=f"prod_{self.agent_id}",
                bucket=os.getenv('AGENT_MEMORY_BUCKET'),
                prefix=f"production/{self.agent_id}",
            ),
            tools=[
                store_shared_data,
                retrieve_shared_data,
                store_fast_memory,
                get_fast_memory,
                *self.get_specialized_tools(),
            ],
        )

    def get_specialized_tools(self):
        """Override in subclasses to provide specialised tools."""
        return []

    async def start_service(self):
        """Start the A2A server and the memory-sync background task."""
        port = int(os.getenv('PORT', 9000))
        server = A2AServer(agent=self.agent, port=port)
        sync_task = asyncio.create_task(self.memory_sync_task())
        try:
            # NOTE(review): serve() appears to be a blocking call; running
            # it directly on the event loop would starve memory_sync_task.
            # Run it in a worker thread — confirm against the A2AServer API.
            await asyncio.to_thread(server.serve)
        finally:
            sync_task.cancel()

    async def memory_sync_task(self):
        """Background task syncing memory and handling agent events."""
        pubsub = self.redis_memory.subscribe_to_events(['agent_events'])
        while True:
            try:
                # get_message blocks for up to `timeout` seconds; run it
                # off the event loop so other coroutines keep running.
                message = await asyncio.to_thread(
                    pubsub.get_message, timeout=1.0)
                if message and message['type'] == 'message':
                    await self.handle_agent_event(json.loads(message['data']))
            except asyncio.CancelledError:
                raise
            except Exception as e:
                print(f"內存同步錯誤: {e}")
                await asyncio.sleep(1)

    async def handle_agent_event(self, event: dict):
        """React to an event published by another agent."""
        if event.get('type') == 'memory_update':
            # Refresh local caches or take action as needed.
            print(f"內存被 {event.get('agent_id')} 更新: {event.get('key')}")


# Specialised agent implementations.
class OrchestratorService(ProductionAgentService):
    """Orchestrator variant exposing orchestration tools."""

    def get_specialized_tools(self):
        return [orchestration_tools]


class DataSpecialistService(ProductionAgentService):
    """Data-specialist variant exposing data-analysis tools."""

    def get_specialized_tools(self):
        return [data_analysis_tools]


# Main entry point: pick the service class from AGENT_TYPE.
if __name__ == "__main__":
    agent_type = os.getenv('AGENT_TYPE', 'orchestrator')
    if agent_type == 'orchestrator':
        service = OrchestratorService()
    elif agent_type == 'data_specialist':
        service = DataSpecialistService()
    else:
        service = ProductionAgentService()
    asyncio.run(service.start_service())
5. 事件驅動的內存更新
# Event-driven memory synchronisation
import asyncio
import json
import os
import time

import boto3


class EventDrivenMemoryBank:
    """Writes to both memory tiers and broadcasts the change.

    The durable copy goes to DynamoDB, a hot copy to Redis, and an
    update event is published on SNS (cross-region/service) and on
    Redis pub/sub (real-time).
    """

    def __init__(self):
        self.dynamodb_memory = DynamoDBMemoryBank()
        self.redis_memory = RedisMemoryBank()
        self.sns = boto3.client('sns')
        self.topic_arn = os.getenv('AGENT_EVENTS_TOPIC_ARN')

    async def store_with_notification(self, key: str, data: dict, agent_id: str):
        """Store ``data`` and notify the other agents."""
        # Durable store.
        self.dynamodb_memory.store(key, data, agent_id)
        # Fast cache with a 1-hour TTL.
        self.redis_memory.store(key, data, ttl=3600)
        event = {
            'type': 'memory_update',
            'key': key,
            'agent_id': agent_id,
            'timestamp': time.time(),
        }
        # boto3 clients are synchronous — awaiting sns.publish() would
        # raise TypeError; run it in a thread so the loop isn't blocked.
        await asyncio.to_thread(
            self.sns.publish,
            TopicArn=self.topic_arn,
            Message=json.dumps(event),
        )
        # Real-time fan-out via Redis pub/sub.
        self.redis_memory.publish_event('agent_events', event)


@tool
def store_with_broadcast(key: str, data: str, description: str = "") -> str:
    """Store data and broadcast the update to all agents."""
    event_memory = EventDrivenMemoryBank()
    coro = event_memory.store_with_notification(
        key,
        {'content': data, 'description': description},
        os.getenv('AGENT_ID'),
    )
    try:
        # Schedule on the running loop when called from async code...
        asyncio.get_running_loop().create_task(coro)
    except RuntimeError:
        # ...otherwise (no running loop) run it to completion here.
        asyncio.run(coro)
    return f"已存儲并廣播: {key}"
此架構的主要優勢
可擴展性:智能體可以獨立部署并根據需求擴展
持久性:內存可以在容器重啟和部署后保留
實時同步:Redis 提供快速內存訪問和事件通知
耐用性:DynamoDB/S3 提供持久化存儲
事件驅動:智能體可以實時響應內存變化
多區域:可以在 AWS 區域和可用區之間工作
生產部署模式
┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│     協調者      │  │    數據專家     │  │   研究智能體    │
│   (ECS 任務)    │  │   (ECS 任務)    │  │   (ECS 任務)    │
└────────┬────────┘  └────────┬────────┘  └────────┬────────┘
         │                    │                    │
         └────────────────────┼────────────────────┘
                              │
                ┌─────────────┴─────────────┐
                │        共享內存層         │
                │                           │
                │  ┌─────────┐ ┌─────────┐  │
                │  │  Redis  │ │DynamoDB │  │
                │  │ (快速)  │ │ (持久)  │  │
                │  └─────────┘ └─────────┘  │
                │                           │
                │  ┌─────────┐ ┌─────────┐  │
                │  │   S3    │ │   SNS   │  │
                │  │ (會話)  │ │ (事件)  │  │
                │  └─────────┘ └─────────┘  │
                └───────────────────────────┘
這種架構確保您的已部署智能體可以共享動態內存庫,同時保持分布式、可擴展部署的優勢。