引言
在數字化時代,數據傳輸的效率和穩定性是推動技術進步的關鍵。MCP(Model Context Protocol)作為AI生態系統中的重要一環,通過引入Streamable HTTP傳輸機制,為數據交互帶來了革命性的變化。本文將深入解讀MCP協議的Streamable HTTP,從會話協商到正式通信傳輸數據的全過程,探討其技術架構、協議內容、實現方式以及對AI應用的影響。
技術架構
MCP協議采用客戶端-服務器架構,其核心組件包括:
- MCP主機(MCP Host):包含MCP客戶端的應用程序,如Claude Desktop、Cursor IDE等AI工具。
- MCP客戶端(MCP Client):在主機內部與服務器保持1:1連接的協議實現。
- MCP服務器(MCP Server):輕量級程序,通過標準化協議暴露特定功能,可以是本地Node.js/Python程序或遠程服務。
MCP服務器提供三類標準能力:
- 資源(Resources):如文件讀取、API數據獲取。
- 工具(Tools):第三方服務或功能函數,如Git操作、瀏覽器控制。
- 提示詞(Prompts):預定義的任務模板,增強模型特定場景表現。
Streamable HTTP的協議內容
Streamable HTTP作為MCP協議的一項重大更新,旨在解決傳統HTTP+SSE方案的局限性,同時保留其優勢。其核心內容包括以下幾個方面:
- 統一消息入口:所有客戶端到服務器的消息都通過
/message
端點發送,不再需要專門的SSE端點。 - 動態升級SSE流:服務器可以根據需要將客戶端發往
/message
的請求升級為SSE連接,用于推送通知或請求。 - 會話管理機制:客戶端通過請求頭中的
Mcp-Session-Id
與服務器建立會話,服務器可選擇是否維護會話狀態。 - 支持無狀態服務器:服務器可以選擇完全無狀態運行,不再需要維持長期連接。
實現方式
從會話協商到正式通信傳輸數據
1. 會話協商
會話協商是Streamable HTTP通信的初始階段,客戶端與服務器通過以下步驟建立會話:
- 客戶端發送初始化請求:客戶端通過HTTP POST向MCP服務器的
/message
端點發送一個InitializeRequest
消息,攜帶協議版本和客戶端能力信息。 - 服務器響應初始化:服務器收到請求后,返回一個
InitializeResult
消息,包含服務器支持的協議版本、服務器能力以及會話ID(Mcp-Session-Id
)。 - 客戶端發送已初始化通知:客戶端收到服務器的響應后,發送一個
Initialized
通知,告知服務器初始化已完成。
示例:客戶端發送初始化請求
POST /message HTTP/1.1
Host: mcp.example.com
Content-Type: application/json
Accept: text/event-stream, application/json{"jsonrpc": "2.0", "method": "initialize", "params": {"clientInfo": {"name": "MCP Client", "version": "1.0"}, "capabilities": {}}}
示例:服務器響應初始化
HTTP/1.1 200 OK
Content-Type: application/json
Mcp-Session-Id: 12345{"jsonrpc": "2.0", "id": 1, "result": {"serverInfo": {"name": "MCP Server", "version": "1.0"}, "capabilities": {}}}
示例:客戶端發送已初始化通知
POST /message HTTP/1.1
Host: mcp.example.com
Content-Type: application/json
Accept: text/event-stream, application/json
Mcp-Session-Id: 12345{"jsonrpc": "2.0", "method": "initialized", "params": {}}
2. 正式通信傳輸數據
會話建立后,客戶端和服務器可以通過以下步驟進行正式通信:
- 客戶端發送消息:客戶端通過HTTP POST向MCP服務器的
/message
端點發送JSON-RPC消息,攜帶會話標識Mcp-Session-Id
。 - 服務器處理請求并響應:服務器根據請求內容處理消息,并通過SSE流或JSON對象返回響應。
- 動態升級為SSE流:如果需要實時推送消息,服務器可以將連接升級為SSE流。
- 斷線重連與數據恢復:如果網絡波動導致連接中斷,客戶端可以攜帶
Last-Event-ID
重新連接,服務器根據該ID重放未發送的消息。
示例:客戶端發送消息
POST /message HTTP/1.1
Host: mcp.example.com
Content-Type: application/json
Accept: text/event-stream, application/json
Mcp-Session-Id: 12345{"jsonrpc": "2.0", "id": 1, "method": "get_file", "params": {"path": "/example.txt"}}
示例:服務器響應并升級為SSE流
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
Mcp-Session-Id: 12345data: {"jsonrpc": "2.0", "id": 1, "result": "File content here"}
示例:客戶端斷線重連
GET /message HTTP/1.1
Host: mcp.example.com
Accept: text/event-stream
Last-Event-ID: 12345
Mcp-Session-Id: 12345
示例:服務器重放未發送的消息
data: {"jsonrpc": "2.0", "id": 2, "result": "Continued content here"}
服務端代碼實現
from datetime import datetime
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route
from starlette.responses import JSONResponse, StreamingResponse
import json
import uuid
from starlette.middleware.cors import CORSMiddleware
import asyncio
from typing import Dict, Any
import aiofiles
import random# 存儲會話ID和對應的任務隊列
sessions: Dict[str, Dict[str, Any]] = {}# 添加CORS支持
app = Starlette()
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],expose_headers=["Mcp-Session-Id"],
)@app.route('/message', methods=["POST", "GET"])
async def handle_message(request: Request):"""處理POST和GET請求。"""session_id = request.headers.get("Mcp-Session-Id") or request.query_params.get("Mcp-Session-Id")if request.method == "POST":try:data = await request.json()if data.get("method") == "initialize":# 初始化會話session_id = str(uuid.uuid4())sessions[session_id] = {"initialized": True,"task_queue": asyncio.Queue()}response = JSONResponse(content={"jsonrpc": "2.0","id": data.get("id"),"result": {"serverInfo": {"name": "MCP Server", "version": "1.0"},"capabilities": {},},})response.headers["Mcp-Session-Id"] = session_idreturn responseelif session_id and sessions.get(session_id, {}).get("initialized"):# 處理已初始化的請求if data.get("method") == "get_file":try:# 異步讀取文件內容content = await async_read_file(data.get("params", {}).get("path", ""))return JSONResponse(content={"jsonrpc": "2.0","id": data.get("id"),"result": content,})except Exception as e:return JSONResponse(content={"jsonrpc": "2.0","id": data.get("id"),"error": f"Error reading file: {str(e)}",})else:return JSONResponse(content={"error": "Unknown method"})else:return JSONResponse(content={"error": "Session not initialized"}, status_code=400)except Exception as e:return JSONResponse(content={"error": f"Internal server error: {str(e)}"}, status_code=500)elif request.method == "GET":# 處理SSE流請求if not session_id or session_id not in sessions:return JSONResponse(content={"error": "Session not found"}, status_code=404)async def event_generator(session_id):while True:try:message = await asyncio.wait_for(sessions[session_id]["task_queue"].get(), timeout=10) # 超時時間10秒yield f"data: {json.dumps(message)}\n\n"except asyncio.TimeoutError as e:yield f"data: {e}\n\n" # 發送空數據作為心跳包,防止超時斷開return StreamingResponse(event_generator(session_id), media_type="text/event-stream")async def async_read_file(path: str) -> str:"""異步讀取文件內容。"""try:async with aiofiles.open(path, "r") as file:content = await file.read()return contentexcept Exception as e:raise Exception(f"Error reading file: {str(e)}")async def background_task(session_id: str, task: Dict[str, Any]):"""后臺任務處理。"""# 模擬耗時操作await asyncio.sleep(1)# 將結果放入任務隊列sessions[session_id]["task_queue"].put_nowait(task)@app.on_event("startup")
async def startup_event():async def push_test_messages():while True:sp = random.randint(1, 3)await asyncio.sleep(sp) # 每5秒推送一個消息for session_id in sessions.keys():if sessions[session_id]["initialized"]:sessions[session_id]["task_queue"].put_nowait({"message": f"Hello from server!", "sleep": sp,"datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S")})asyncio.create_task(push_test_messages()) # 創建后臺任務if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)
客戶端代碼實現
import httpx
import json
import asyncio
import aiofilesclass MCPClient:def __init__(self, server_url: str):self.server_url = server_urlself.session_id = Noneself.headers = {"Content-Type": "application/json","Accept": "text/event-stream, application/json"}async def initialize(self):"""初始化會話。"""async with httpx.AsyncClient() as client:try:response = await client.post(f"{self.server_url}/message",headers=self.headers,json={"jsonrpc": "2.0","method": "initialize","params": {"clientInfo": {"name": "MCP Client", "version": "1.0"},"capabilities": {},},},)response.raise_for_status()self.session_id = response.headers.get("Mcp-Session-Id")print(f"Session ID: {self.session_id}")return self.session_idexcept Exception as e:print(f"Failed to initialize session: {e}")return Noneasync def send_message(self, method: str, params: dict = None):"""發送消息。"""if not self.session_id:await self.initialize()async with httpx.AsyncClient() as client:try:response = await client.post(f"{self.server_url}/message",headers={"Mcp-Session-Id": self.session_id, **self.headers},json={"jsonrpc": "2.0","id": 1,"method": method,"params": params or {},},)response.raise_for_status()return response.json()except Exception as e:print(f"Failed to send message: {e}")return Noneasync def listen_sse(self):if not self.session_id:await self.initialize()async with httpx.AsyncClient(timeout=None) as client: # 取消超時限制try:async with client.stream("GET",f"{self.server_url}/message",headers={"Mcp-Session-Id": self.session_id, **self.headers},) as response:async for line in response.aiter_lines():if line.strip(): # 避免空行print(f"SSE Message: {line}")except Exception as e:print(f"Failed to listen SSE: {e}")await self.reconnect()async def reconnect(self):"""斷線重連。"""print("Attempting to reconnect...")await asyncio.sleep(5) # 等待5秒后重試await self.initialize()await self.listen_sse()async def main():client = MCPClient("http://localhost:8000")await client.initialize()response = await client.send_message("get_file", {"path": "/Users/houjie/PycharmProjects/python-sdk/examples/mcp-server/example.txt"})print(f"Response: {response}")await client.listen_sse()if __name__ == "__main__":asyncio.run(main())
前端頁面代碼實現
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>MCP Streamable HTTP Demo</title><style>body {font-family: Arial, sans-serif;margin: 0;padding: 20px;background-color: #f5f5f5;}.container {max-width: 800px;margin: 0 auto;background-color: white;padding: 20px;border-radius: 8px;box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);}h1 {text-align: center;color: #333;}.message-area {margin-top: 20px;}.message {padding: 10px;margin-bottom: 10px;border-radius: 4px;background-color: #e9f7fe;border-left: 4px solid #0099cc;}.sse-message {padding: 10px;margin-bottom: 10px;border-radius: 4px;background-color: #f0f9ff;border-left: 4px solid #0077cc;}button {background-color: #0099cc;color: white;border: none;padding: 10px 15px;border-radius: 4px;cursor: pointer;font-size: 14px;}button:hover {background-color: #0077cc;}input[type="text"] {width: 100%;padding: 10px;margin-bottom: 10px;border: 1px solid #ddd;border-radius: 4px;font-size: 14px;}</style>
</head>
<body><div class="container"><h1>MCP Streamable HTTP Demo</h1><div><input type="text" id="serverUrl" placeholder="Enter server URL" value="http://localhost:8000"><button id="initBtn">Initialize Session</button></div><div id="sessionId"></div><div><input type="text" id="filePath" placeholder="Enter file path"><button id="sendBtn">Send Message</button></div><div class="message-area" id="messages"></div></div><script>let client = null;let sessionInitialized = false;document.getElementById('initBtn').addEventListener('click', async () => {const serverUrl = document.getElementById('serverUrl').value;client = new MCPClient(serverUrl);await client.initialize();sessionInitialized = true;document.getElementById('sessionId').textContent = `Session ID: ${client.session_id}`;});document.getElementById('sendBtn').addEventListener('click', async () => {if (!sessionInitialized) {alert('Please initialize the session first.');return;}const filePath = document.getElementById('filePath').value;const response = await client.send_message('get_file', { path: filePath });addMessage(`Response: ${JSON.stringify(response)}`);});class MCPClient {constructor(serverUrl) {this.serverUrl = serverUrl;this.session_id = null;this.headers = {'Content-Type': 'application/json','Accept': 'text/event-stream, application/json'};}async initialize() {try {const response = await fetch(`${this.serverUrl}/message`, {method: 'POST',headers: this.headers,body: JSON.stringify({jsonrpc: '2.0',method: 'initialize',params: {clientInfo: { name: 'MCP Client', version: '1.0' },capabilities: {}}})});if (!response.ok) {throw new Error(`HTTP error! status: ${response.status}`);}this.session_id = response.headers.get('Mcp-Session-Id');addMessage(`Session ID: ${this.session_id}`);this.listen_sse();} catch (error) {addMessage(`Failed to initialize session: ${error}`);}}async send_message(method, params) {if (!this.session_id) {await this.initialize();}try {const response = await fetch(`${this.serverUrl}/message`, {method: 'POST',headers: { 'Mcp-Session-Id': this.session_id, ...this.headers },body: JSON.stringify({jsonrpc: '2.0',id: 1,method: method,params: params || {}})});if (!response.ok) {throw new Error(`HTTP error! status: ${response.status}`);}return await response.json();} catch (error) {addMessage(`Failed to send message: ${error}`);return null;}}listen_sse() {if (!this.session_id) {return;}const eventSource = new EventSource(`${this.serverUrl}/message?Mcp-Session-Id=${this.session_id}`, {headers: { 'Mcp-Session-Id': this.session_id }});eventSource.onmessage = (event) => {addSSEMessage(event.data);};eventSource.onerror = (error) => {addMessage(`Failed to listen SSE: ${error}`);this.reconnect();};}async reconnect() {addMessage('Attempting to reconnect...');await new Promise(resolve => setTimeout(resolve, 5000));await this.initialize();this.listen_sse();}}function addMessage(message) {const messagesDiv = document.getElementById('messages');const messageDiv = document.createElement('div');messageDiv.className = 'message';messageDiv.textContent = message;messagesDiv.appendChild(messageDiv);messagesDiv.scrollTop = messagesDiv.scrollHeight;}function addSSEMessage(message) {const messagesDiv = document.getElementById('messages');const messageDiv = document.createElement('div');messageDiv.className = 'sse-message';messageDiv.textContent = `SSE Message: ${message}`;messagesDiv.appendChild(messageDiv);messagesDiv.scrollTop = messagesDiv.scrollHeight;}</script>
</body>
</html>
運行步驟
-
安裝依賴:
確保安裝了所需的庫:pip install starlette uvicorn httpx aiofiles
-
啟動服務器:
將服務端代碼保存為server.py
,然后運行以下命令啟動服務器:uvicorn server:app --reload
-
運行客戶端:
將客戶端代碼保存為client.py
,然后運行以下命令啟動客戶端:python client.py
-
打開前端頁面:
將前端頁面代碼保存為index.html
,然后在瀏覽器中打開該文件。
示例運行效果
客戶端輸出
Session ID: 587bb6ad-08f5-4102-8b27-4c276e9d7815
Response: {'jsonrpc': '2.0', 'id': 1, 'result': 'File content here'}
Listening for SSE messages...
SSE Message: data: {"message": "Hello from server!", "sleep": 1, "datetime": "2024-01-01 12:00:00"}
SSE Message: data: {"message": "Hello from server!", "sleep": 2, "datetime": "2024-01-01 12:00:02"}
...
服務器輸出
INFO: Started server process [12345]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: 127.0.0.1:51487 - "POST /message HTTP/1.1" 200 OK
前端頁面效果
前端頁面將顯示會話ID、發送的消息以及接收到的SSE流消息。
調試
- 檢查服務器日志:查看服務器日志,確認是否生成了
Mcp-Session-Id
并返回給客戶端。 - 檢查網絡請求:使用瀏覽器開發者工具(F12),查看網絡請求的響應頭,確認是否包含
Mcp-Session-Id
。 - 檢查跨域問題:確保服務器正確配置了 CORS,允許前端頁面的域名和端口。
希望這些信息能夠幫助你成功實現基于MCP協議的Streamable HTTP服務端、客戶端和前端頁面。