目錄
- ??介紹
- ??設置環境
- ??創建項目
- ??代理技能
- ??代理卡片
- ??A2A服務器
- ??與A2A服務器交互
- ??添加代理功能
- ??使用本地Ollama模型
- ??后續步驟
介紹
在本教程中,您將使用Python構建一個簡單的echo A2A服務器。這個基礎實現將向您展示A2A提供的所有功能。完成本教程后,您將能夠使用Ollama或Google的Agent Development Kit添加代理功能。
您將學習:
- ? A2A背后的基本概念
- ? 如何用Python創建A2A服務器
- ? 與A2A服務器交互
- ? 添加訓練模型作為代理
設置環境
您需要的工具
- ? 代碼編輯器,如Visual Studio Code (VS Code)
- ? 命令提示符,如Terminal (Linux)、iTerm (Mac) 或VS Code中的Terminal
Python環境
我們將使用uv作為包管理器并設置項目。
我們將使用的A2A庫需要python >= 3.12
,如果您還沒有匹配的版本,uv可以安裝。我們將使用python 3.12。
檢查
運行以下命令,確保您已準備好進入下一步:
echo?'import?sys;?print(sys.version)'?|?uv?run?-
如果您看到類似以下內容,說明您已準備就緒!
3.12.3?(main,?Feb?4?2025,?14:48:35)?[GCC?13.3.0]
創建項目
首先使用uv
創建一個項目。我們將添加--package
標志,以便您以后可以添加測試或發布項目:
uv?init?--package?my-project
cd?my-project
使用虛擬環境
我們為這個項目創建一個虛擬環境。這只需要做一次:
uv?venv?.venv
對于這個和將來打開的任何終端窗口,您需要激活這個虛擬環境:
source?.venv/bin/activate
如果您使用的是VS Code等代碼編輯器,您需要設置Python解釋器以便代碼補全。在VS Code中,按下Ctrl-Shift-P
并選擇Python: Select Interpreter
。然后選擇您的項目my-project
,接著選擇正確的Python解釋器Python 3.12.3 ('.venv':venv) ./.venv/bin/python
現在源代碼結構應該類似于:
tree?.
.
├──?pyproject.toml
├──?README.md
├──?src
│???└──?my-project
│???????├──?__init__.py
添加Google-A2A Python庫
接下來我們將添加來自Google的A2A Python示例庫:
uv?add?git+https://github.com/google/A2A#subdirectory=samples/python
設置項目結構
現在創建一些我們稍后將使用的文件:
touch?src/my_project/agent.py
touch?src/my_project/task_manager.py
測試運行
如果一切設置正確,您現在應該能夠運行您的應用程序:
uv?run?my-project
輸出應該類似于:
Hello?from?my-project!
代理技能
代理技能是代理可以執行的一組功能。下面是我們echo代理的技能示例:
{id:?"my-project-echo-skill"name:?"Echo?Tool",description:?"Echos?the?input?given",tags:?["echo",?"repeater"],examples:?["I?will?see?this?echoed?back?to?me"],inputModes:?["text"],outputModes:?["text"]
}
這符合代理卡片的技能部分:
{id:?string;?//?代理技能的唯一標識符name:?string;?//?技能的人類可讀名稱//?技能描述?-?將被客戶端或人類用作提示,以理解這個技能的作用description:?string;//?描述這個特定技能功能類別的標簽詞集合//?(例如"cooking"、"customer?support"、"billing")tags:?string[];//?該技能可以執行的示例場景集合//?將被客戶端用作提示,以了解如何使用該技能//?(例如"I?need?a?recipe?for?bread")examples?:?string[];?//?任務提示示例//?該技能支持的交互模式集合//?(如果與默認值不同)inputModes?:?string[];?//?支持的輸入MIME類型outputModes?:?string[];?//?支持的輸出MIME類型
}
實現
讓我們用代碼創建這個代理技能。打開src/my-project/__init__.py
并用以下代碼替換內容:
import?google_a2a
from?google_a2a.common.types?import?AgentSkilldef?main():skill?=?AgentSkill(id="my-project-echo-skill",name="Echo?Tool",description="Echos?the?input?given",tags=["echo",?"repeater"],examples=["I?will?see?this?echoed?back?to?me"],inputModes=["text"],outputModes=["text"],)print(skill)if?__name__?==?"__main__":main()
測試運行
讓我們運行一下:
uv?run?my-project
輸出應該類似于:
id='my-project-echo-skill'?name='Echo?Tool'?description='Echos?the?input?given'?tags=['echo',?'repeater']?examples=['I?will?see?this?echoed?back?to?me']?inputModes=['text']?outputModes=['text']
代理卡片
現在我們已經定義了技能,可以創建代理卡片了。
遠程代理需要以JSON格式發布代理卡片,描述代理的能力和技能,以及認證機制。換句話說,這讓世界了解您的代理及如何與之交互。
實現
首先添加一些解析命令行參數的輔助工具。這對稍后啟動服務器很有幫助:
uv?add?click
然后更新我們的代碼:
import?loggingimport?click
from?dotenv?import?load_dotenv
import?google_a2a
from?google_a2a.common.types?import?AgentSkill,?AgentCapabilities,?AgentCardlogging.basicConfig(level=logging.INFO)
logger?=?logging.getLogger(__name__)@click.command()
@click.option("--host",?default="localhost")
@click.option("--port",?default=10002)
def?main(host,?port):skill?=?AgentSkill(id="my-project-echo-skill",name="Echo?Tool",description="Echos?the?input?given",tags=["echo",?"repeater"],examples=["I?will?see?this?echoed?back?to?me"],inputModes=["text"],outputModes=["text"],)logging.info(skill)if?__name__?==?"__main__":main()
接下來添加我們的代理卡片:
#?...
def?main(host,?port):#?...capabilities?=?AgentCapabilities()agent_card?=?AgentCard(name="Echo?Agent",description="This?agent?echos?the?input?given",url=f"http://{host}:{port}/",version="0.1.0",defaultInputModes=["text"],defaultOutputModes=["text"],capabilities=capabilities,skills=[skill])logging.info(agent_card)if?__name__?==?"__main__":main()
測試運行
讓我們運行一下:
uv?run?my-project
輸出應該類似于:
INFO:root:name='Echo?Agent'?description='This?agent?echos?the?input?given'?url='http://localhost:10002/'?provider=None?version='0.1.0'?documentationUrl=None?capabilities=AgentCapabilities(streaming=False,?pushNotifications=False,?stateTransitionHistory=False)?authentication=None?defaultInputModes=['text']?defaultOutputModes=['text']?skills=[AgentSkill(id='my-project-echo-skill',?name='Echo?Tool',?description='Echos?the?input?given',?tags=['echo',?'repeater'],?examples=['I?will?see?this?echoed?back?to?me'],?inputModes=['text'],?outputModes=['text'])]
A2A服務器
我們幾乎準備好啟動服務器了!我們將使用Google-A2A
中的A2AServer
類,它在底層啟動一個uvicorn服務器。
任務管理器
在創建服務器之前,我們需要一個任務管理器來處理傳入的請求。
我們將實現InMemoryTaskManager接口,需要實現兩個方法:
async?def?on_send_task(self,request:?SendTaskRequest
)?->?SendTaskResponse:"""該方法查詢或創建代理的任務。調用者將收到恰好一個響應。"""passasync?def?on_send_task_subscribe(self,request:?SendTaskStreamingRequest
)?->?AsyncIterable[SendTaskStreamingResponse]?|?JSONRPCResponse:"""該方法使調用者訂閱有關任務的未來更新。調用者將收到一個響應,并通過客戶端和服務器之間建立的會話接收訂閱更新"""pass
打開src/my_project/task_manager.py
并添加以下代碼。我們將簡單地返回直接回顯響應,并立即將任務標記為完成,不需要任何會話或訂閱:
from?typing?import?AsyncIterableimport?google_a2a
from?google_a2a.common.server.task_manager?import?InMemoryTaskManager
from?google_a2a.common.types?import?(Artifact,JSONRPCResponse,Message,SendTaskRequest,SendTaskResponse,SendTaskStreamingRequest,SendTaskStreamingResponse,Task,TaskState,TaskStatus,TaskStatusUpdateEvent,
)class?MyAgentTaskManager(InMemoryTaskManager):def?__init__(self):super().__init__()async?def?on_send_task(self,?request:?SendTaskRequest)?->?SendTaskResponse:#?更新由InMemoryTaskManager存儲的任務await?self.upsert_task(request.params)task_id?=?request.params.id#?我們的自定義邏輯,簡單地將任務標記為完成#?并返回echo文本received_text?=?request.params.message.parts[0].texttask?=?await?self._update_task(task_id=task_id,task_state=TaskState.COMPLETED,response_text=f"on_send_task?received:?{received_text}")#?發送響應return?SendTaskResponse(id=request.id,?result=task)async?def?on_send_task_subscribe(self,request:?SendTaskStreamingRequest)?->?AsyncIterable[SendTaskStreamingResponse]?|?JSONRPCResponse:passasync?def?_update_task(self,task_id:?str,task_state:?TaskState,response_text:?str,)?->?Task:task?=?self.tasks[task_id]agent_response_parts?=?[{"type":?"text","text":?response_text,}]task.status?=?TaskStatus(state=task_state,message=Message(role="agent",parts=agent_response_parts,))task.artifacts?=?[Artifact(parts=agent_response_parts,)]return?task
A2A服務器
有了任務管理器,我們現在可以創建服務器了。
打開src/my_project/__init__.py
并添加以下代碼:
#?...
from?google_a2a.common.server?import?A2AServer
from?my_project.task_manager?import?MyAgentTaskManager
#?...
def?main(host,?port):#?...task_manager?=?MyAgentTaskManager()server?=?A2AServer(agent_card=agent_card,task_manager=task_manager,host=host,port=port,)server.start()
測試運行
讓我們運行一下:
uv?run?my-project
輸出應該類似于:
INFO:?????Started?server?process?[20506]
INFO:?????Waiting?for?application?startup.
INFO:?????Application?startup?complete.
INFO:?????Uvicorn?running?on?http://localhost:10002?(Press?CTRL+C?to?quit)
恭喜!您的A2A服務器現在正在運行!
與A2A服務器交互
首先我們將使用Google-A2A的命令行工具向我們的A2A服務器發送請求。嘗試之后,我們將編寫自己的基本客戶端,了解底層工作原理。
使用Google-A2A的命令行工具
在上一步中,您的A2A服務器已經在運行:
#?這應該已經在您的終端中運行
$?uv?run?my-project
INFO:?????Started?server?process?[20538]
INFO:?????Waiting?for?application?startup.
INFO:?????Application?startup?complete.
INFO:?????Uvicorn?running?on?http://localhost:10002?(Press?CTRL+C?to?quit)
在同一目錄中打開新終端:
source?.venv/bin/activate
uv?run?google-a2a-cli?--agent?http://localhost:10002
注意:這只有在您安裝了來自此PR的google-a2a時才有效,因為之前CLI并未公開。
否則,您必須直接檢出Google/A2A倉庫,導航到samples/python
目錄并直接運行CLI。
然后通過輸入并按Enter發送消息到服務器:
=========??starting?a?new?task?========What?do?you?want?to?send?to?the?agent??(:q?or?quit?to?exit):?Hello!
如果一切正常,您將在響應中看到:
"message":{"role":"agent","parts":[{"type":"text","text":"on_send_task?received:?Hello!"}]}
要退出,輸入:q
并按Enter。
添加代理功能
現在我們有了一個基本的A2A服務器,讓我們添加更多功能。我們將探索A2A如何異步工作和流式響應。
流式傳輸
這允許客戶端訂閱服務器并接收多個更新,而不是單個響應。這對于長時間運行的代理任務或需要向客戶端流式傳輸多個Artifacts的情況很有用。
首先聲明我們的代理已準備好流式傳輸。打開src/my_project/__init__.py
并更新AgentCapabilities:
#?...
def?main(host,?port):#?...capabilities?=?AgentCapabilities(streaming=True)#?...
現在在src/my_project/task_manager.py
中,我們需要實現on_send_task_subscribe
:
import?asyncio
#?...
class?MyAgentTaskManager(InMemoryTaskManager):#?...async?def?_stream_3_messages(self,?request:?SendTaskStreamingRequest):task_id?=?request.params.idreceived_text?=?request.params.message.parts[0].texttext_messages?=?["one",?"two",?"three"]for?text?in?text_messages:parts?=?[{"type":?"text","text":?f"{received_text}:?{text}",}]message?=?Message(role="agent",?parts=parts)is_last?=?text?==?text_messages[-1]task_state?=?TaskState.COMPLETED?if?is_last?else?TaskState.WORKINGtask_status?=?TaskStatus(state=task_state,message=message)task_update_event?=?TaskStatusUpdateEvent(id=request.params.id,status=task_status,final=is_last,)await?self.enqueue_events_for_sse(request.params.id,task_update_event)async?def?on_send_task_subscribe(self,request:?SendTaskStreamingRequest)?->?AsyncIterable[SendTaskStreamingResponse]?|?JSONRPCResponse:#?更新由InMemoryTaskManager存儲的任務await?self.upsert_task(request.params)task_id?=?request.params.id#?為此任務創建工作隊列sse_event_queue?=?await?self.setup_sse_consumer(task_id=task_id)#?開始為此任務異步工作asyncio.create_task(self._stream_3_messages(request))#?告訴客戶端期待未來的流式響應return?self.dequeue_events_for_sse(request_id=request.id,task_id=task_id,sse_event_queue=sse_event_queue,)
重啟A2A服務器以應用新更改,然后重新運行CLI:
$?uv?run?google-a2a-cli?--agent?http://localhost:10002
=========??starting?a?new?task?========What?do?you?want?to?send?to?the?agent??(:q?or?quit?to?exit):?Streaming?"status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?:?one"}]}
"status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?:?two"}]}
"status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?:?three"}]}
有時代理可能需要額外輸入。例如,代理可能會詢問客戶是否希望繼續重復3條消息。在這種情況下,代理將以TaskState.INPUT_REQUIRED
響應,客戶端然后會用相同的task_id
和session_id
但更新的消息重新發送send_task_streaming
,提供代理所需的輸入。在服務器端,我們將更新on_send_task_subscribe
以處理這種情況:
#?...class?MyAgentTaskManager(InMemoryTaskManager):#?...async?def?_stream_3_messages(self,?request:?SendTaskStreamingRequest):#?...async?for?message?in?messages:#?...#?is_last?=?message?==?messages[-1]?#?刪除此行task_state?=?TaskState.WORKING#?...task_update_event?=?TaskStatusUpdateEvent(id=request.params.id,status=task_status,final=False,)#?...ask_message?=?Message(role="agent",parts=[{"type":?"text","text":?"Would?you?like?more?messages??(Y/N)"}])task_update_event?=?TaskStatusUpdateEvent(id=request.params.id,status=TaskStatus(state=TaskState.INPUT_REQUIRED,message=ask_message),final=True,)await?self.enqueue_events_for_sse(request.params.id,task_update_event)#?...async?def?on_send_task_subscribe(self,request:?SendTaskStreamingRequest)?->?AsyncIterable[SendTaskStreamingResponse]?|?JSONRPCResponse:task_id?=?request.params.idis_new_task?=?task_id?in?self.tasks#?更新由InMemoryTaskManager存儲的任務await?self.upsert_task(request.params)received_text?=?request.params.message.parts[0].textsse_event_queue?=?await?self.setup_sse_consumer(task_id=task_id)if?not?is_new_task?and?received_text?==?"N":task_update_event?=?TaskStatusUpdateEvent(id=request.params.id,status=TaskStatus(state=TaskState.COMPLETED,message=Message(role="agent",parts=[{"type":?"text","text":?"All?done!"}])),final=True,)await?self.enqueue_events_for_sse(request.params.id,task_update_event,)else:asyncio.create_task(self._stream_3_messages(request))return?self.dequeue_events_for_sse(request_id=request.id,task_id=task_id,sse_event_queue=sse_event_queue,)
重啟服務器并運行CLI后,我們可以看到任務將繼續運行,直到我們告訴代理N
:
$?uv?run?google-a2a-cli?--agent?http://localhost:10002
=========??starting?a?new?task?========What?do?you?want?to?send?to?the?agent??(:q?or?quit?to?exit):?Streaming?"status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?:?one"}]}
"status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?:?two"}]}
"status":{"state":"working","message":{"role":"agent","parts":[{"type":"text","text":"Streaming?:?three"}]}
"status":{"state":"input-required","message":{"role":"agent","parts":[{"type":"text","text":"Would?you?like?more?messages??(Y/N)"}]}What?do?you?want?to?send?to?the?agent??(:q?or?quit?to?exit):?N"status":{"state":"completed","message":{"role":"agent","parts":[{"type":"text","text":"All?done!"}]}
恭喜!您現在有了一個能夠異步執行工作并在需要時向用戶請求輸入的代理。
使用本地Ollama模型
現在到了激動人心的部分。我們將為A2A服務器添加AI功能。
在本教程中,我們將設置本地Ollama模型并將其與A2A服務器集成。
要求
我們將安裝ollama
、langchain
,并下載支持MCP工具的ollama模型(用于未來教程)。
- 1. 下載ollama
- 2. 運行ollama服務器:
#?注意:如果ollama已在運行,您可能會收到錯誤,如
#?Error:?listen?tcp?127.0.0.1:11434:?bind:?address?already?in?use
#?在Linux上可以運行systemctl?stop?ollama停止ollama
ollama?serve
- 3. 從此列表下載模型。我們將使用
qwq
,因為它支持tools
(如其標簽所示)并在24GB顯卡上運行:
ollama?pull?qwq
- 4. 安裝
langchain
:
uv?add?langchain?langchain-ollama?langgraph
現在ollama設置好了,我們可以開始將其集成到A2A服務器中。
將Ollama集成到A2A服務器
首先打開src/my_project/__init__.py
:
#?...@click.command()
@click.option("--host",?default="localhost")
@click.option("--port",?default=10002)
@click.option("--ollama-host",?default="http://127.0.0.1:11434")
@click.option("--ollama-model",?default=None)
def?main(host,?port,?ollama_host,?ollama_model):#?...capabilities?=?AgentCapabilities(streaming=False?#?我們將流式功能作為讀者的練習)#?...task_manager?=?MyAgentTaskManager(ollama_host=ollama_host,ollama_model=ollama_model,)#?..
現在在src/my_project/agent.py
中添加AI功能:
from?langchain_ollama?import?ChatOllama
from?langgraph.prebuilt?import?create_react_agent
from?langgraph.graph.graph?import?CompiledGraphdef?create_ollama_agent(ollama_base_url:?str,?ollama_model:?str):ollama_chat_llm?=?ChatOllama(base_url=ollama_base_url,model=ollama_model,temperature=0.2)agent?=?create_react_agent(ollama_chat_llm,?tools=[])return?agentasync?def?run_ollama(ollama_agent:?CompiledGraph,?prompt:?str):agent_response?=?await?ollama_agent.ainvoke({"messages":?prompt?})message?=?agent_response["messages"][-1].contentreturn?str(message)
最后從src/my_project/task_manager.py
調用我們的ollama代理:
#?...
from?my_project.agent?import?create_ollama_agent,?run_ollamaclass?MyAgentTaskManager(InMemoryTaskManager):def?__init__(self,ollama_host:?str,ollama_model:?typing.Union[None,?str]):super().__init__()if?ollama_model?is?not?None:self.ollama_agent?=?create_ollama_agent(ollama_base_url=ollama_host,ollama_model=ollama_model)else:self.ollama_agent?=?Noneasync?def?on_send_task(self,?request:?SendTaskRequest)?->?SendTaskResponse:#?...received_text?=?request.params.message.parts[0].textresponse_text?=?f"on_send_task?received:?{received_text}"if?self.ollama_agent?is?not?None:response_text?=?await?run_ollama(ollama_agent=self.ollama_agent,?prompt=received_text)task?=?await?self._update_task(task_id=task_id,task_state=TaskState.COMPLETED,response_text=response_text)#?發送響應return?SendTaskResponse(id=request.id,?result=task)#?...
讓我們測試一下!
首先重新運行A2A服務器,將qwq
替換為您下載的ollama模型:
uv?run?my-project?--ollama-host?http://127.0.0.1:11434?--ollama-model?qwq
然后重新運行CLI:
uv?run?google-a2a-cli?--agent?http://localhost:10002
注意,如果您使用大模型,加載可能需要一段時間。CLI可能會超時。在這種情況下,一旦ollama服務器完成模型加載,請重新運行CLI。
您應該看到類似于以下內容:
=========??starting?a?new?task?========What?do?you?want?to?send?to?the?agent??(:q?or?quit?to?exit):?hey"message":{"role":"agent","parts":[{"type":"text","text":"<think>\nOkay,?the?user?said?\"hey\".?That's?pretty?casual.?I?should?respond?in?a?friendly?way.?Maybe?ask?how?I?can?help?them?today.?Keep?it?open-ended?so?they?feel?comfortable?sharing?what?they?need.?Let?me?make?sure?my?tone?is?positive?and?approachable.?Alright,?something?like,?\"Hey?there!?How?can?I?assist?you?today?\"?Yeah,?that?sounds?good.\n</think>\n\nHey?there!?How?can?I?assist?you?today??😊"}]}
恭喜!您現在有了一個使用AI模型生成響應的A2A服務器!
了解更多:https://a2aprotocol.ai/blog/python-a2a-tutorial