MCP項目開發-一個簡單的RAG示例
前言
前言
- 客戶端是基于官網的例子改的,模型改成了openai庫連接
- 僅僅使用基礎的RAG流程作為一個演示,包含了以下步驟
- query改寫
- 搜索:使用google serper
- 重排序:使用硅基流動的api
- 大模型api也使用的硅基流動
- 調用速度還可以
官方文檔
-
mcp官方文檔
https://modelcontextprotocol.io/introduction
使用到的api注冊
-
檢索:google serper
https://serper.dev
有2500 Credits,具體也沒搞明白,好像是每次檢索返回的文本數量超過10則花費2Credits,不過這個不重要,反正自己用肯定夠,api在這
?
連接示例
curl --location 'https://google.serper.dev/search' \ --header 'X-API-KEY: 你的api-key' \ --header 'Content-Type: application/json' \ --data '{"q": "你的問題","num": 2 }'
其中q表示要檢索的問題,然后num為返回的文本數,返回示例如下
{"searchParameters": {"q": "中國首都是哪","type": "search","num": 2,"engine": "google"},"organic": [{"title": "中華人民共和國首都 - 維基百科","link": "https://zh.wikipedia.org/zh-hans/%E4%B8%AD%E5%8D%8E%E4%BA%BA%E6%B0%91%E5%85%B1%E5%92%8C%E5%9B%BD%E9%A6%96%E9%83%BD","snippet": "中華人民共和國首都位于北京市,新中國成立前夕的舊稱為北平,是中共中央及中央人民政府所在地,中央四個直轄市之一,全國政治、文化、國際交往和科技創新中心,中國古都、 ...","position": 1},{"title": "中華人民共和國首都_中華人民共和國中央人民政府門戶網站","link": "https://www.gov.cn/guoqing/2005-05/24/content_2615214.htm","snippet": "1949年9月27日,中國人民政治協商會議第一屆全體會議一致通過中華人民共和國的國都定于北平,即日起北平改名北京。 北京,簡稱京,是中國共產黨中央委員會、中華人民共和國 ...","position": 2}],"relatedSearches": [{"query": "中國首都是北京還是上海"},{"query": "中國首都為什么是北京"},{"query": "中國首都上海"},{"query": "中國以前的首都"},{"query": "新中國首都選址"},{"query": "中國歷代首都"},{"query": "中國首都候選"},{"query": "中國首都英文"}],"credits": 1 }
主要的就是在
organic
中,如果我們的num為2,則organic中返回兩個dict,我們取的是沒個dict中的snippet字段 -
聊天大模型與重排序模型:硅基流動
https://cloud.siliconflow.cn/
注冊可以看我的另一篇
https://blog.csdn.net/hbkybkzw/article/details/145558234
api-key如圖
?
服務器
環境搭建
-
官方推薦使用uv來管理環境,先下載uv
pip install uv#或者macos linux curl -LsSf https://astral.sh/uv/install.sh | sh#或者windows powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
之后重啟終端
-
初始化項目。名稱為rag_sample
uv init rag_sample
-
創建虛擬環境并激活
cd rag_sample uv venv# windows .venv\Scripts\activate# linux source .venv/bin/activate
-
安裝依賴
openai庫是客戶端用的,為了方便,客戶端就不初始化了,直接和服務器用同一個環境
# windows uv add mcp[cli] aiohttp dotenv openai rich# linux uv add "mcp[cli]" aiohttp dotenv openai rich
構建服務器
-
在rag_sample目錄下,創建 .env 文件保存為環境變量
GOOGLE_SERPER_URL=https://google.serper.dev/search GOOGLE_SERPER_API= SILICONFLOW_RERANKING_BASE_URL=https://api.siliconflow.cn/v1/rerank SILICONFLOW_CHAT_BASE_URL=https://api.siliconflow.cn/v1/chat/completions SILICONFLOW_RERANKING_API_KEY= RERANKING_MODEL=BAAI/bge-reranker-v2-m3 OPENAI_CHAT_MODEL=Qwen/QwQ-32B OPENAI_BASE_URL=https://api.siliconflow.cn/v1
將
GOOGLE_SERPER_API
和SILICONFLOW_RERANKING_API_KEY
填為自己的 -
rag_client.py
import mcp import aiohttp import asyncio from mcp.server.fastmcp import FastMCP from utils import query_rewrite,search_with_serper,reranking_by_bgemcp = FastMCP("rag")@mcp.tool() async def rag_sample(query:str,doc_num:int=10,top_n:int=4) -> list:"""rag_sample: 輸入query,輸出檢索重排序后的文檔Parameters:-------------------------------------------------------------------------------------------------------------------query: type: str desc: 用戶問題 doc_num: type: int desc: 檢索文檔數 top_n: type: top_n desc: 重排序后返回top_n Returns:-------------------------------------------------------------------------------------------------------------------reranked_doc: type: list desc: top_n檢索文檔"""rewrite_query = await query_rewrite(query=query)documents = await search_with_serper(query=rewrite_query,document_num=doc_num)reranked_doc = await reranking_by_bge(query=query,documents=documents,top_n=top_n)return reranked_docasync def test():query = "MCP協議介紹"reranked_doc = await rag_sample(query=query)importrich.print(reranked_doc)if __name__ == "__main__":# asyncio.run(test())mcp.run(transport="stdio")
test函數是用來測試的
?
我們將主要的代碼放在
rag_client.py
,輔助函數都放在utils.py
中主要的流程就是
query_rewrite
: 傳入用戶問題,返回改寫后的問題search_with_serper
: 傳入改寫后的問題和檢索數量,返回檢索到的文檔信息reranking_by_bge
:傳入用戶問題和檢索到的信息,返回相似度高的top_n文檔信息
代碼見下面的utils.py
-
utils.py
import asyncio import aiohttp from dotenv import load_dotenv import os#從.env文件加載環境變量 load_dotenv()# google serper google_serper_url = os.getenv("GOOGLE_SERPER_URL") google_serper_api = os.getenv("GOOGLE_SERPER_API")# siliconflow api chat_base_url = os.getenv("SILICONFLOW_CHAT_BASE_URL") reranking_base_url = os.getenv("SILICONFLOW_RERANKING_BASE_URL") reranking_api_key = os.getenv("SILICONFLOW_RERANKING_API_KEY") reranking_model = os.getenv("RERANKING_MODEL")# query rewrite prompt query_rewrite_prompt = """ 請根據以下規則優化用戶的問題: 1. 首先判斷用戶query是否為問候或打招呼(例如:'你好'、'在嗎'、'您好'等)。如果是問候,直接返回原文query。 2. 如果不是,對query進行優化改寫,請保持原意并使改寫后的query更精準更簡潔,限制在15個字以內 示例:- 輸入:'你好,能幫個忙嗎?'輸出:'你好,能幫個忙嗎?'- 輸入:'怎么重寫句子結構?'輸出:'句子結構優化步驟示例'輸出時無需解釋,僅返回最終結果。 """async def fetch_search_results(url, headers, payload):"""發動請求并獲取響應"""async with aiohttp.ClientSession() as session:async with session.post(url, headers=headers, json=payload) as response:assert response.status == 200response_json = await response.json()return response_json # 返回響應jsonasync def query_rewrite(query:str):"""一個簡單的query改寫,直接用aiohttp了,沒有使用openai"""url = chat_base_urlheaders = {'Authorization': f'Bearer {reranking_api_key}','Content-Type': 'application/json'}payload={"model": "Qwen/Qwen2.5-72B-Instruct","messages": [{"role": "system","content": query_rewrite_prompt},{"role": "user","content": query}],"max_tokens": 512,"stream":False}response_data = await fetch_search_results( url=url,headers=headers,payload=payload)rewrite_query = response_data['choices'][0]['message']['content']return rewrite_queryasync def search_with_serper(query:str,document_num:int=10):"""google serper 搜索并返回documents,形式如下[{"title1":"","link1":"","snippet1":"","position1":""},{"title2":"","link2":"","snippet2":"","position2":""},...]"""url = google_serper_urlheaders = {'X-API-KEY': google_serper_api,'Content-Type': 'application/json'}payload = {"q": query, # 要搜索的問題"num": document_num # 要所搜的文檔數量}response_data = await fetch_search_results( url=url,headers=headers,payload=payload)documents = response_data['organic']return documents async def reranking_by_bge(query:str,documents:list[dict],top_n:int=4):"""調用排序模型,返回排序后的文檔"""doc_to_rerank = [doc["snippet"] for doc in documents]url = reranking_base_urlheaders = {'Authorization': f'Bearer {reranking_api_key}','Content-Type': 'application/json'}payload = {"model": reranking_model,"query": query,"documents": doc_to_rerank,"top_n": top_n,"return_documents": True, # 如果是false則只返回索引,麻煩"max_chunks_per_doc": 1024,"overlap_tokens": 80}reranked_info = await fetch_search_results( url=url,headers=headers,payload=payload)reranked_doc =[item['document']['text'] for item in reranked_info['results']]return reranked_docasync def test():import richquery = "流水不爭先"rewrite_query = await query_rewrite(query=query)rich.print('rewrite_query',rewrite_query)documents = await search_with_serper(query=rewrite_query,document_num=4)rich.print('documents',documents)reranked_doc = await reranking_by_bge(query=query,documents=documents,top_n=2)rich.print("reranked_doc",reranked_doc)if __name__ == "__main__":asyncio.run(test())
test函數是用來測試的
?
-
運行 python rag_client.py以確認一切正常
app端使用(cherrystudio為例)
-
安裝cherrystudio,并配置好模型
https://cherry-ai.com/
-
在app端添加單個weather服務器,下面的json文件需要復制到cherrystudio中
{"mcpServers": {"rag": {"command": "uv","args": ["--directory","C:\\Users\\孟智超\\Desktop\\mac_backups\\MCP\\rag_sample","run","rag_client.py"]}} }
意思是:
- 有一個名為“天氣”的 MCP 服務器
- 通過運行
uv --directory C:\\Users\\孟智超\\Desktop\\mac_backups\\MCP\\rag_sample run rag_client.py
來啟動它(項目的地址)
-
將上面的json文本復制到cherrystudio
?
-
選擇模型,因為mcp也借用了functioncalling,所以需要選用的模型必須支持functioncalling(tools)
在cherrystudio中的表現就是,有綠色的小扳手
?
-
開始聊天之前開啟mcp
?
-
看看使用前后的對比
問題:什么是MCP?
未開啟前,列舉了很多,但是都不是我們想要的
?
開啟后,雖然在思考過程中也提到了其他無關的,但是最后還是決定調用工具rag_sample,最后得到的是我們想要的正確答案。
?
客戶端
代碼
-
和服務器共用一個環境(不推薦,一般重新開一個環境)
-
代碼
rag_client.py
import os import sys import json import asyncio from typing import Optional from contextlib import AsyncExitStackfrom mcp import ClientSession,StdioServerParameters from mcp.client.stdio import stdio_clientimport openai from openai import OpenAI from dotenv import load_dotenv import traceback# 從.env文件加載環境變量 load_dotenv() # siliconflow api openai.base_url = os.getenv("OPENAI_BASE_URL") openai.api_key = os.getenv("SILICONFLOW_RERANKING_API_KEY") openai_chat_model = os.getenv("OPENAI_CHAT_MODEL")class MCPClient:def __init__(self):# 初始化會話和客戶端對象self.session: Optional[ClientSession] = None # 保存mcp客戶端會話self.exit_stack = AsyncExitStack()self.model = openai_chat_modelself.openai = OpenAI(base_url=openai.base_url,api_key=openai.api_key)async def connect_to_server(self, server_script_path:str):"""connect_to_server: 連接到mcp服務器 Parameters:-------------------------------------------------------------------------------------------------------------------server_script_path: type:str desc: server腳本地址(.py or .js) """is_python = server_script_path.endswith('.py') # python腳本?is_js = server_script_path.endswith('.js') # node 腳本?if not (is_python or is_js):raise ValueError("Server script must be a .py or .js file")command = "python" if is_python else "node"server_params = StdioServerParameters(command=command,args=[server_script_path],env=None) # 告訴mcp客戶端如何啟動服務器,比如當前的 python rag_server.pystdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params) # 啟動服務器進程,建立標準I/O通信管道)self.stdio,self.write = stdio_transport # 拿到讀寫流self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio,self.write) # 創建MCP客戶端會話,與服務器交互)await self.session.initialize() # 發送初始化消息給服務器,等待服務器就緒。# 列出 mcp 服務器上可用的工具response = await self.session.list_tools() # 向 mcp 服務器請求所有已注冊的工具(即@mcp.tool)tools = response.toolsprint("\nConnected to server with tools:", [tool.name for tool in tools])async def process_query(self,query:str) -> str:"""process_query: 使用大模型處理查詢,調用已經注冊的MCP工具 Parameters:-------------------------------------------------------------------------------------------------------------------query: type:str desc:用戶問題 """messages = [{"role":"user","content": query}]response = await self.session.list_tools() # 向 mcp 服務器請求所有已注冊的工具(即@mcp.tool)# 轉換為functional calling 的available_tools格式available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"input_schema": tool.inputSchema}} for tool in response.tools]# print(f"available_tools:\n{available_tools}")# 初始化 大模型連接(OpenAI)response = self.openai.chat.completions.create(model=self.model,max_tokens=1000,messages=messages,tools=available_tools)content = response.choices[0]print(f"content.finish_reason : {content.finish_reason}")if content.finish_reason == "tool_calls": print(f"進入functional calling")# content.finish_reason == "tool_calls"表示要調用工具# 會在content.message.tool_calls 列表中聲明要用哪個函數、參數是什么tool_call = content.message.tool_calls[0]tool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)# 取出工具名 tool_name 和參數 tool_args,執行mcp工具result = await self.session.call_tool(tool_name, tool_args)print(f"\n[Calling tool {tool_name} with args {tool_args}]\n\n")# print(f"\n\ncall_tool_result: {result}")# print(f"\n\ncall_tool_result.content[0].text:{result.content[0].text}")# print(f"i.text for i in result.content:{[i.text for i in result.content]}")# 將模型返回的調用哪個工具數據和工具執行完成后的數據都存入messages中messages.append(content.message.model_dump())messages.append({"role": "tool","content": f"{[i.text for i in result.content]}","tool_call_id": tool_call.id,})# 將上面的結果再次返回給大模型并產生最終結果response = self.openai.chat.completions.create(model=self.model,max_tokens=1000,messages=messages,)return response.choices[0].message.content# 如果沒有調用工具則直接返回當前結果return content.message.contentasync def chat_loop(self):"""運行一個交互式聊天循環"""print("MCP 服務器已經啟動!")print("輸入你的問題(輸入'quit'退出).")while True:try:query = input("\n你的問題:\n").strip()if query.lower() == "quit":breakresponse = await self.process_query(query)print(f"模型回答:\n{response}")except Exception as e:tb = traceback.extract_tb(e.__traceback__)for frame in tb:print("File:", frame.filename)print("Line:", frame.lineno)print("Function:", frame.name)print("Code:", frame.line)async def cleanup(self):"""清理資源"""# 異步地關閉所有在 exit_stack 中注冊的資源(包括 MCP 會話)。await self.exit_stack.aclose()async def main():"""main: 主執行邏輯,以現在的rag為例子標準IO啟動需要服務器的代碼作為客戶端的子進程啟動服務器代碼:rag_server.py客戶端代碼:rag_client.py因為都在同一個項目下,所以啟動直接為python rag_client.py rag_server.py"""if len(sys.argv) < 2: # print("Usage: python client.py <path_to_server_script>")sys.exit(1)client = MCPClient()try:print(sys.argv[1])# python rag_client.py rag_server.py ,則argv[1] = rag_client.py 為客戶端代碼await client.connect_to_server(sys.argv[1]) await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":asyncio.run(main())
-
connect_to_server
:連接到MCP服務器 -
process_query
: 使用大模型處理查詢,將MCP工具轉換為OpenAI兼容的格式,允許模型決定是否調用工具,如果是,則調用已經注冊的MCP工具 -
chat_loop
: 交互式聊天(只有單輪) -
main
:主函數因為在客戶端腳本中,啟動方式是"stdio"
if __name__ == "__main__":mcp.run(transport="stdio")
這種適合在客戶端和服務器在同一個環境中的情況下,是將服務器作為客戶端的子進程
所以其中這個客戶端的命令如下
python rag_client.py rag_server.py
-
測試運行
-
啟動
python rag_client.py rag_server.py
-
測試下
(模型不太好觸發這個tool,所以我直接加了“請調用rag_sample”)
問題:
請調用rag_sample,解釋下最近很火的mcp
?