? ? ? ? 一、Functions簡介
? ? ? ? 可以把Tools作為依賴于外部服務的插件,Functions就是內部插件,二者都是用來增強open webui的能力的。Functions是輕量的,高度可定制的,并且是用純Python編寫的,所以你可以自由地創建任何東西——從新的人工智能工作流到與你使用的任何東西的集成,比如谷歌搜索或家庭助理。
? ? ? ?在open webui中的Function包括三種類型:Pipe Function、Filter Function和Action Function。
? ? ? ? Pipe類型的Function用于自定義Agent或模型,用戶在對話中可以像普通的模型那樣選擇使用。
? ? ? ? Filter類型的Function用于對往返大模型的數據進行處理,從而可以在不中斷對話的前提下,攔截對話內容并進行修改或其他處理,比如日志。過濾器一般用于輕量級處理,包括:發送數據到監控平臺、記錄日志、修改用戶輸入、阻斷有害消息、翻譯和限流等。
? ? ? ? Action類型的Function用來對聊天界面的按鈕進行定制。這些按鈕出現在單個聊天消息下方,讓您可以方便地一鍵訪問您定義的操作。
? ? ? ? 本文僅對Action類型的Function進行解析。? ? ? ??
? ? ? ? 二、導入一個Function
? ? ? ? 1)進入open webui社區的Functions頁面,選擇一個Function,這里以Save Outputs為例
? ? ? ? 2)點擊Save Outputs,進入如下頁面
? ? ? ? 3)點擊Get,在對話框填寫你的open webui的地址
? ? ? ? 4)點擊Import to WebUI
? ? ? ? 進入open webui頁面,顯示函數代碼,核心代碼為把大模型的輸出寫入本地文件中,完整源碼如下:
"""
title: save_outputs
author: stefanpietrusky
author_url: https://downchurch.studio/
inspiration: add_to_memories_action_button @pad4651
instruction: you need to mount the container folder /app/data with a local folder when creating the container! ?--mount type=bind,source="FOLDER PATH\docker_data",target=/app/data“
icon_url: ?
version: 0.1
"""import os
from pydantic import BaseModel, Field
from typing import Optional
class Action:
? ? class Valves(BaseModel):
? ? ? ? pass? ? class UserValves(BaseModel):
? ? ? ? show_status: bool = Field(
? ? ? ? ? ? default=True, description="Show status of the action."
? ? ? ? )
? ? ? ? pass? ? def __init__(self):
? ? ? ? self.valves = self.Valves()
? ? ? ? pass? ? async def action(
? ? ? ? self,
? ? ? ? body: dict,
? ? ? ? __user__=None,
? ? ? ? __event_emitter__=None,
? ? ? ? __event_call__=None,
? ? ) -> Optional[dict]:
? ? ? ? print(f"action:{__name__}")? ? ? ? user_valves = __user__.get("valves")
? ? ? ? if not user_valves:
? ? ? ? ? ? user_valves = self.UserValves()? ? ? ? if __event_emitter__:
? ? ? ? ? ? last_assistant_message = body["messages"][-1]? ? ? ? ? ? if user_valves.show_status:
? ? ? ? ? ? ? ? await __event_emitter__(
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? "type": "status",
? ? ? ? ? ? ? ? ? ? ? ? "data": {"description": "Saving to file", "done": False},
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? )? ? ? ? ? ? try:
? ? ? ? ? ? ? ? directory = "/app/data"
? ? ? ? ? ? ? ? if not os.path.exists(directory):
? ? ? ? ? ? ? ? ? ? os.makedirs(directory)? ? ? ? ? ? ? ? file_path = os.path.join(directory, "saved_outputs.txt")
? ? ? ? ? ? ? ? with open(file_path, "a") as file:
? ? ? ? ? ? ? ? ? ? file.write(f"{last_assistant_message['content']}\n\n")
? ? ? ? ? ? ? ? print("Output saved to file in the container, accessible on the host.")? ? ? ? ? ? except Exception as e:
? ? ? ? ? ? ? ? print(f"Error saving output to file: {str(e)}")
? ? ? ? ? ? ? ? if user_valves.show_status:
? ? ? ? ? ? ? ? ? ? await __event_emitter__(
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? "type": "status",
? ? ? ? ? ? ? ? ? ? ? ? ? ? "data": {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "description": "Error Saving to File",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "done": True,
? ? ? ? ? ? ? ? ? ? ? ? ? ? },
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? )? ? ? ? ? ? if user_valves.show_status:
? ? ? ? ? ? ? ? await __event_emitter__(
? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? "type": "status",
? ? ? ? ? ? ? ? ? ? ? ? "data": {"description": "Output Saved", "done": True},
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? )
?
? ? ? ? 因為可能是惡意代碼,所以需要閱讀檢查代碼。檢查無誤后,可以保存,該函數便作為插件進入open webui體系中。
? ? ? ? ? 三、具體使用
? ? ? ? 函數生效后,在大模型返回對一個問題的應答后,在工具欄顯示該函數圖標。
? ? ? ? 用戶點擊該鏈接,則保存當前大模型輸出寫入到文件中。
? ? ? ? 三、源碼分析
? ? ? ? 1)數據模型
? ? ? ?Function數據保存在Function表中,表定義如下:
? ? ? ? 其中:
? ? ? ? ? ? ? ? id:函數唯一標識
? ? ? ? ? ? ? ? userid:用戶唯一標識
? ? ? ? ? ? ? ? name:函數名
? ? ? ? ? ? ? ? type:函數類型 filter|pipe|action
? ? ? ? ? ? ? ? content:方法源代碼
? ? ? ? ? ? ? ?meta:元數據
? ? ? ? ? ? ? ? valves:閾值
? ? ? ? ? ? ? ? is_active:是否被激活(激活后才可見)
? ? ? ? ? ? ? ? is_global:全局還是局部(僅某個用戶使用)
? ? ? ? 2)導入函數
? ? ? ? 從open webui社區頁面點擊 Import to WebUI時,瀏覽器啟動一個新頁面,并提交代碼格式化請求到/app/v1/utils/code/format,后端調用black模塊進行嚴格格式化處理,并把格式化后的代碼返回前端。
@router.post("/code/format")
async def format_code(form_data: CodeForm, user=Depends(get_admin_user)):
? ? try:
? ? ? ? formatted_code = black.format_str(form_data.code, mode=black.Mode())
? ? ? ? return {"code": formatted_code}
? ? except black.NothingChanged:
? ? ? ? return {"code": form_data.code}
? ? except Exception as e:
? ? ? ? raise HTTPException(status_code=400, detail=str(e))
? ? ? ? 完成格式化處理后,后端再提交創建Function請求到 /api/v1/functions/create。請求數據為:
{
? ? "id": "save_outputs",
? ? "name": "Save Outputs",
? ? "meta": {
? ? ? ? "description": "Save outputs locally on your computer.",
? ? ? ? "manifest": {
? ? ? ? ? ? "title": "save_outputs",
? ? ? ? ? ? "author": "stefanpietrusky",
? ? ? ? ? ? "author_url": "https://downchurch.studio/",
? ? ? ? ? ? "inspiration": "add_to_memories_action_button @pad4651",
? ? ? ? ? ? "instruction": "you need to mount the container folder /app/data with a local folder when creating the container! ?--mount type=bind,source=\"FOLDER PATH\\docker_data\",target=/app/data\"",
? ? ? ? ? ? "icon_url": "",
? ? ? ? ? ? "version": "0.1"
? ? ? ? },
? ? ? ? "type": "action",
? ? ? ? "user": {
? ? ? ? ? ? "id": "9e4f4854-71d9-429a-99b9-9338a393de9e",
? ? ? ? ? ? "username": "pietrusky",
? ? ? ? ? ? "name": "",
? ? ? ? ? ? "createdAt": 1724186428,
? ? ? ? ? ? "role": null,
? ? ? ? ? ? "verified": false
? ? ? ? },
? ? ? ? "id": "542145b0-59a0-44f2-86f1-dd2f1e64d705"
? ? },? ? #content由注釋和源代碼組成
? ? "content": ?"\"\"\"\ntitle: save_outputs\nauthor: stefanpietrusky\nauthor_url: https://downchurch.studio/\ninspiration: add_to_memories_action_button @pad4651\ninstruction: you need to mount the container folder /app/data with a local folder when creating the container! ?--mount type=bind,source=\"FOLDER PATH\\docker_data\",target=/app/data\"\nicon_url: \nversion: 0.1\n\"\"\"\n\nimport os\nfrom pydantic import BaseModel, Field\nfrom typing import Optional\n\n\nclass Action:\n ? ?class Valves(BaseModel):\n ? ? ? ?pass\n\n ? ?class UserValves(BaseModel):\n ? ? ? ?show_status: bool = Field(\n ? ? ? ? ? ?default=True, description=\"Show status of the action.\"\n ? ? ? ?)\n ? ? ? ?pass\n\n ? ?def __init__(self):\n ? ? ? ?self.valves = self.Valves()\n ? ? ? ?pass\n\n ? ?async def action(\n ? ? ? ?self,\n ? ? ? ?body: dict,\n ? ? ? ?__user__=None,\n ? ? ? ?__event_emitter__=None,\n ? ? ? ?__event_call__=None,\n ? ?) -> Optional[dict]:\n ? ? ? ?print(f\"action:{__name__}\")\n\n ? ? ? ?user_valves = __user__.get(\"valves\")\n ? ? ? ?if not user_valves:\n ? ? ? ? ? ?user_valves = self.UserValves()\n\n ? ? ? ?if __event_emitter__:\n ? ? ? ? ? ?last_assistant_message = body[\"messages\"][-1]\n\n ? ? ? ? ? ?if user_valves.show_status:\n ? ? ? ? ? ? ? ?await __event_emitter__(\n ? ? ? ? ? ? ? ? ? ?{\n ? ? ? ? ? ? ? ? ? ? ? ?\"type\": \"status\",\n ? ? ? ? ? ? ? ? ? ? ? ?\"data\": {\"description\": \"Saving to file\", \"done\": False},\n ? ? ? ? ? ? ? ? ? ?}\n ? ? ? ? ? ? ? ?)\n\n ? ? ? ? ? ?try:\n ? ? ? ? ? ? ? ?directory = \"/app/data\"\n ? ? ? ? ? ? ? ?if not os.path.exists(directory):\n ? ? ? ? ? ? ? ? ? ?os.makedirs(directory)\n\n ? ? ? ? ? ? ? ?file_path = os.path.join(directory, \"saved_outputs.txt\")\n ? ? ? ? ? ? ? ?with open(file_path, \"a\") as file:\n ? ? ? ? ? ? ? ? ? ?file.write(f\"{last_assistant_message['content']}\\n\\n\")\n ? ? ? ? ? ? ? ?print(\"Output saved to file in the container, accessible on the host.\")\n\n ? ? ? ? ? ?except Exception as e:\n ? ? ? ? ? ? ? ?print(f\"Error saving output to file: {str(e)}\")\n ? ? ? ? ? ? ? ?if user_valves.show_status:\n ? ? ? ? ? ? ? ? ? ?await __event_emitter__(\n ? ? ? ? ? ? ? ? ? ? ? ?{\n ? ? ? ? ? ? ? ? ? ? ? ? ? ?\"type\": \"status\",\n ? ? ? ? ? ? ? ? ? ? ? ? ? ?\"data\": {\n ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?\"description\": \"Error Saving to File\",\n ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?\"done\": True,\n ? ? ? ? ? ? ? ? ? ? ? ? ? ?},\n ? ? ? ? ? ? ? ? ? ? ? ?}\n ? ? ? ? ? ? ? ? ? ?)\n\n ? ? ? ? ? ?if user_valves.show_status:\n ? ? ? ? ? ? ? ?await __event_emitter__(\n ? ? ? ? ? ? ? ? ? ?{\n ? ? ? ? ? ? ? ? ? ? ? ?\"type\": \"status\",\n ? ? ? ? ? ? ? ? ? ? ? ?\"data\": {\"description\": \"Output Saved\", \"done\": True},\n ? ? ? ? ? ? ? ? ? ?}\n ? ? ? ? ? ? ? ?)"
}
? ? ? ?對應函數源碼如下:
處理流程如下:
1)防錯處理,判斷函數名是否符合python標識符的命名規則,不符合則報錯
2)對源中import的模塊名進行替換
3)加載源碼成為可使用的模塊
4)把該函數加載到全局FUNCTIONS中,供后繼使用
5)為函數創建緩存目錄
@router.post("/create", response_model=Optional[FunctionResponse])
async def create_new_function(
? ? request: Request, form_data: FunctionForm, user=Depends(get_admin_user)
):
? ? if not form_data.id.isidentifier(): #對id進行校驗
? ? ? ? raise HTTPException(
? ? ? ? ? ? status_code=status.HTTP_400_BAD_REQUEST,
? ? ? ? ? ? detail="Only alphanumeric characters and underscores are allowed in the id",
? ? ? ? )? ? form_data.id = form_data.id.lower()
? ? #從Function表查詢該函數是否已經入庫
? ? function = Functions.get_function_by_id(form_data.id)
? ? if function is None:?
? ? ? ? try:? ? ? ? ? ? #用本地模塊名,替換源碼中的模塊名,比如用from open_webui.utils替換from utils?
? ? ? ? ? ? form_data.content = replace_imports(form_data.content)? ? ? ? ? ? #把函數加載為模塊
? ? ? ? ? ? function_module, function_type, frontmatter = load_function_module_by_id(
? ? ? ? ? ? ? ? form_data.id,
? ? ? ? ? ? ? ? content=form_data.content,
? ? ? ? ? ? )
? ? ? ? ? ? form_data.meta.manifest = frontmatter? ? ? ? ? ? #把Function實例增加到全局FUNCTIONS中
? ? ? ? ? ? FUNCTIONS = request.app.state.FUNCTIONS
? ? ? ? ? ? FUNCTIONS[form_data.id] = function_module? ? ? ? ? ? #把函數數據插入到FUNCTION表中
? ? ? ? ? ? function = Functions.insert_new_function(user.id, function_type, form_data)
? ? ? ? ? ? #為該方法創建目錄/app/backend/data/cache/functions/{函數名}
? ? ? ? ? ? function_cache_dir = CACHE_DIR / "functions" / form_data.id
? ? ? ? ? ? function_cache_dir.mkdir(parents=True, exist_ok=True)? ? ? ? ? ? if function:
? ? ? ? ? ? ? ? return function
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? raise HTTPException(
? ? ? ? ? ? ? ? ? ? status_code=status.HTTP_400_BAD_REQUEST,
? ? ? ? ? ? ? ? ? ? detail=ERROR_MESSAGES.DEFAULT("Error creating function"),
? ? ? ? ? ? ? ? )
? ? ? ? except Exception as e:
? ? ? ? ? ? log.exception(f"Failed to create a new function: {e}")
? ? ? ? ? ? raise HTTPException(
? ? ? ? ? ? ? ? status_code=status.HTTP_400_BAD_REQUEST,
? ? ? ? ? ? ? ? detail=ERROR_MESSAGES.DEFAULT(e),
? ? ? ? ? ? )
? ? else: #如果已經入庫,則報錯
? ? ? ? raise HTTPException(
? ? ? ? ? ? status_code=status.HTTP_400_BAD_REQUEST,
? ? ? ? ? ? detail=ERROR_MESSAGES.ID_TAKEN,
? ? ? ? )
?
? ? ? ? 在該方法中的核心代碼是load_function_module_by_id,load_function_module_by_id實現代碼動態加載,重點分析一下。
def load_function_module_by_id(function_id: str, content: str | None = None):
? ? #如果參數content為None,則從數據庫查詢
? ? if content is None:
? ? ? ? function = Functions.get_function_by_id(function_id)
? ? ? ? if not function:
? ? ? ? ? ? raise Exception(f"Function not found: {function_id}")
? ? ? ? content = function.content? ? ? ? content = replace_imports(content)#替換源碼中的導入的模塊名
? ? ? ? Functions.update_function_by_id(function_id, {"content": content})#更新數據庫content
? ? else:#從content提取元數據
? ? ? ? frontmatter = extract_frontmatter(content)? ? ? ? #安裝依賴模塊
? ? ? ? install_frontmatter_requirements(frontmatter.get("requirements", ""))? ?
? ? module_name = f"function_{function_id}"
? ? #創建function_{function_id}模塊,比如function_save_outputs
? ? module = types.ModuleType(module_name)? ? #加載模塊到sys_modules
? ? sys.modules[module_name] = module? ? # 創建臨時文件,用于存儲函數的源代碼
?? ?temp_file = tempfile.NamedTemporaryFile(delete=False)
? ? temp_file.close()
? ? try:? ? ? ? #把源代碼寫入臨時文件
? ? ? ? with open(temp_file.name, "w", encoding="utf-8") as f:
? ? ? ? ? ? f.write(content)
? ? ? ? module.__dict__["__file__"] = temp_file.name #設置模塊的__file__為臨時文件名? ? ? ? # 在本模塊的命名空間運行源代碼,完成模塊源碼的載入
? ? ? ? exec(content, module.__dict__)
? ? ? ? frontmatter = extract_frontmatter(content)
? ? ? ? log.info(f"Loaded module: {module.__name__}")? ? ? ? # 根據Function類型,返回對應類的實例
? ? ? ? if hasattr(module, "Pipe"):#返回管道實例
? ? ? ? ? ? return module.Pipe(), "pipe", frontmatter
? ? ? ? elif hasattr(module, "Filter"): #返回過濾器實例
? ? ? ? ? ? return module.Filter(), "filter", frontmatter
? ? ? ? elif hasattr(module, "Action"):
? ? ? ? ? ? return module.Action(), "action", frontmatter? #返回Action實例
? ? ? ? else:
? ? ? ? ? ? raise Exception("No Function class found in the module")
? ? except Exception as e:
? ? ? ? log.error(f"Error loading module: {function_id}: {e}")
? ? ? ? # Cleanup by removing the module in case of error
? ? ? ? del sys.modules[module_name]? ? ? ? Functions.update_function_by_id(function_id, {"is_active": False})
? ? ? ? raise e
? ? finally:
? ? ? ? os.unlink(temp_file.name)
? ? ? ? 3)執行函數
? ? ? ? 用戶在對話界面點擊按鈕執行函數時,后端入口為http://{ip:port}/api/chat/actions/{函數名},后端調用該函數執行對應的操作。對應入口函數為chat_action。
該方法和簡潔,主要是調用chat_action_handle。
@app.post("/api/chat/actions/{action_id}")
async def chat_action(
? ? request: Request, action_id: str, form_data: dict, user=Depends(get_verified_user)
):
? ? try:
? ? ? ? model_item = form_data.pop("model_item", {})? ? ? ? if model_item.get("direct", False):
? ? ? ? ? ? request.state.direct = True
? ? ? ? ? ? request.state.model = model_item? ? ? ? return await chat_action_handler(request, action_id, form_data, user)
? ? except Exception as e:
? ? ? ? raise HTTPException(
? ? ? ? ? ? status_code=status.HTTP_400_BAD_REQUEST,
? ? ? ? ? ? detail=str(e),
? ? ? ? )
? ? ? ??chat_action_handle實際對應?open_webui.utils.chat模塊中的chat_action方法,具體源碼如下:
async def chat_action(request: Request, action_id: str, form_data: dict, user: Any):
? ? if "." in action_id: #如果action_id是多層,則用'.'分割
? ? ? ? action_id, sub_action_id = action_id.split(".")
? ? else:
? ? ? ? sub_action_id = None? ? action = Functions.get_function_by_id(action_id)#從數據庫查找Function是否存在
? ? if not action:
? ? ? ? raise Exception(f"Action not found: {action_id}")? ? #以下代碼確定使用的模型
? ? if not request.app.state.MODELS:?
? ? ? ? await get_all_models(request, user=user)? ? if getattr(request.state, "direct", False) and hasattr(request.state, "model"):
? ? ? ? models = {
? ? ? ? ? ? request.state.model["id"]: request.state.model,
? ? ? ? }
? ? else:
? ? ? ? models = request.app.state.MODELS? ? data = form_data
? ? model_id = data["model"]? ? if model_id not in models:
? ? ? ? raise Exception("Model not found")
? ? model = models[model_id]? ? #通過websocket發送數據到前端
? ? __event_emitter__ = get_event_emitter(
? ? ? ? {
? ? ? ? ? ? "chat_id": data["chat_id"],
? ? ? ? ? ? "message_id": data["id"],
? ? ? ? ? ? "session_id": data["session_id"],
? ? ? ? ? ? "user_id": user.id,
? ? ? ? }
? ? )
? ? __event_call__ = get_event_call(
? ? ? ? {
? ? ? ? ? ? "chat_id": data["chat_id"],
? ? ? ? ? ? "message_id": data["id"],
? ? ? ? ? ? "session_id": data["session_id"],
? ? ? ? ? ? "user_id": user.id,
? ? ? ? }
? ? )? ? #根據action_id獲取模塊
? ? function_module, _, _ = get_function_module_from_cache(request, action_id)
? ? #閥門處理
? ? if hasattr(function_module, "valves") and hasattr(function_module, "Valves"):
? ? ? ? valves = Functions.get_function_valves_by_id(action_id)
? ? ? ? function_module.valves = function_module.Valves(**(valves if valves else {}))? ? if hasattr(function_module, "action"):
? ? ? ? try:
? ? ? ? ? ? action = function_module.action#從Action類中獲取action方法? ? ? ? ? ? # 得到函數簽名
? ? ? ? ? ? sig = inspect.signature(action)
? ? ? ? ? ? params = {"body": data}? ? ? ? ? ? # Extra parameters to be passed to the function
? ? ? ? ? ? extra_params = {
? ? ? ? ? ? ? ? "__model__": model,
? ? ? ? ? ? ? ? "__id__": sub_action_id if sub_action_id is not None else action_id,
? ? ? ? ? ? ? ? "__event_emitter__": __event_emitter__,
? ? ? ? ? ? ? ? "__event_call__": __event_call__,
? ? ? ? ? ? ? ? "__request__": request,
? ? ? ? ? ? }? ? ? ? ? ? #把extra_params中的項中與函數簽名中的參數匹配的項加入到params中
? ? ? ? ? ? for key, value in extra_params.items():
? ? ? ? ? ? ? ? if key in sig.parameters:
? ? ? ? ? ? ? ? ? ? params[key] = value? ? ? ? ? ? if "__user__" in sig.parameters:
? ? ? ? ? ? ? ? #如果函數簽名中有__user__,則在調用參數中增加用戶相關閥門設置
? ? ? ? ? ? ? ? __user__ = user.model_dump() if isinstance(user, UserModel) else {}? ? ? ? ? ? ? ? try:
? ? ? ? ? ? ? ? ? ? if hasattr(function_module, "UserValves"):
? ? ? ? ? ? ? ? ? ? ? ? __user__["valves"] = function_module.UserValves(
? ? ? ? ? ? ? ? ? ? ? ? ? ? **Functions.get_user_valves_by_id_and_user_id(
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? action_id, user.id
? ? ? ? ? ? ? ? ? ? ? ? ? ? )
? ? ? ? ? ? ? ? ? ? ? ? )
? ? ? ? ? ? ? ? except Exception as e:
? ? ? ? ? ? ? ? ? ? log.exception(f"Failed to get user values: {e}")? ? ? ? ? ? ? ? params = {**params, "__user__": __user__}
? ? ? ? ? ? if inspect.iscoroutinefunction(action): #如果action方法是協程,則await調用
? ? ? ? ? ? ? ? data = await action(**params)
? ? ? ? ? ? else: #非協程則直接調用
? ? ? ? ? ? ? ? data = action(**params)? ? ? ? except Exception as e:
? ? ? ? ? ? return Exception(f"Error: {e}")? ? return data
?