1.Assistant API 的主要優點:
減少編碼工作量、自動管理上下文窗口、安全的訪問控制、工具和文檔的輕松集成
本節講應用設計和性能
流式輸出:借助流式輸出,可以讓應用程序實時處理和響應用戶輸入。具體來說,這種技術允許數據在生成的同時即刻傳輸和處理,而不必等待整個數據處理過程完成。這樣,用戶無需等待全部數據加載完成即可開始接收到部分結果,從而顯著提高了應用的反應速度和交互流暢性。
eg:對話時回答是逐漸展現的。
在流式輸出的實現方式中,我們需要在調用 client.chat.completions.create() 時添加 stream=True 參數,用以指定啟用流式輸出,從而允許 API 逐步發送生成的文本片段,然后使用 for 循環來迭代 completion 對象
code如下:
from openai import OpenAI
client = OpenAI()
completion = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一位樂于助人的人工智能小助理。"},
{"role": "user", "content": "你好,請你介紹一下你自己。"}
],
stream=True
)
for chunk in completion:
print(chunk.choices[0].delta)
OpenAI 提供了兩種主要的流實現方法:第一種基于 WebSocket 的直接實時通信方法,第二種是適用于本身不支持此類交互的平臺的模擬流。兩種流實現方法各有其特點和應用場景:
基于 WebSocket 的直接實時通信方法:
WebSocket 是一種網絡通信協議,它允許在用戶的瀏覽器和服務器之間建立一個不斷開的連接,通過這個連接可以實現雙向的數據傳輸。
模擬流
是為了在不支持 WebSocket 或其他實時協議的環境中提供類似的功能。這種方法通過定期發送HTTP請求來“模擬”一個持續的數據流。常見的實現方式包括長輪詢和服務器發送事件(Server-Sent Events, SSE)。
在這兩種方法中,模擬流更加常用,因為它依賴于標準的HTTP請求和響應,易于理解和實現。
二.具體實操:
1.Assistant API 如何開啟流式傳輸
要在一個完整的生命周期內啟用流式傳輸,可以分別在Create Run、 Create Thread and Run和 Submit Tool Outputs 這三個 API 端點中添加 "stream": True 參數。
這樣設置后,API 返回的響應將是一個事件流。
首先,我們按照 Assistant API創建對話或代理的標準流程來構建應用實例。如下代碼所示:
from openai import OpenAI
# 構建 OpenAI 客戶端對象的實例
client = OpenAI()
# Step 1. 創建 Assistant 對象
assistant = client.beta.assistants.create(
model="gpt-4o-mini-2024-07-18",
name="Good writer", ?# 優秀的作家
instructions="You are an expert at writing excellent literature" ?# 你是一位善于寫優秀文學作品的專家
)
# Step 2. 創建 Thread 對象
thread = client.beta.threads.create()
# Step 3. 向Thread中添加Message
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="寫一篇關于一個小女孩在森林里遇到一只怪獸的故事。詳細介紹她的所見所聞,并描述她的心里活動"
)
在Create Run的過程中,添加stream=True參數,開啟流媒體傳輸。代碼如下:
run = client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
stream=True ? ? ?# 開啟流式傳輸
)
print(run)
for event in run:
print(event)
其返回的結果是一個 openai.Stream 對象。這個對象表示的是一個流式數據通道,可以用來接收連續傳輸的數據。
2.Assistant API 流式傳輸中的事件流
整個事件流的核心流程包括:當新的運行被創建時,發出 thread.run.created 事件;當運行完成時,發出 thread.run.completed 事件。在運行期間選擇創建消息時,將發出一個 thread.message.created 事件、一個 thread.message.in_progress 事件、多個 thread.message.delta 事件,以及最終的 thread.message.completed 事件。
在處理過程中,任何需要的信息都可以通過訪問數據結構的方式來獲取。
run = client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
stream=True ? ? ?# 開啟流媒體傳輸
)
for event in run:
#print(event.event)
if event.event == 'thread.message.delta':
# 提取 text delta 的 value
value = event.data.delta.content[0].text.value
print(value)
按照相同的思路,我們還可以嘗試測試Create Thread and Run和 Submit Tool Outputs兩個端點在啟用流媒體傳輸時的實現方法。
??首先來看Create Thread and Run方法。這個方法很容易理解,它所做的事情就是把創建Thread對象實例、向Thread對象實例中追加Message信息以及構建Run狀態這三個單獨的步驟合并在了一個.beta.threads.create_and_run方法中,所以調用方法也發生了一定的變化,代碼如下:
run = client.beta.threads.create_and_run(
assistant_id=assistant.id,
thread={
"messages": [
{"role": "user", "content": "寫一篇歌頌中國的文章"}
]
},
stream=True,
)
for event in run:
print(event)
3.如何在函數調用中啟用流式傳輸
這里我們定義一個外部函數get_current_weather,用來獲取指定地點的當前天氣狀況。
import json
def get_current_weather(location, unit="celsius"):
"""Get the current weather in a given location in China"""
if "beijing" in location.lower():
return json.dumps({"location": "Beijing", "temperature": "15", "unit": unit})
elif "shanghai" in location.lower():
return json.dumps({"location": "Shanghai", "temperature": "20", "unit": unit})
elif "guangzhou" in location.lower():
return json.dumps({"location": "Guangzhou", "temperature": "25", "unit": unit})
else:
return json.dumps({"location": location, "temperature": "unknown"})
接下來,需要為 get_current_weather 函數編寫一個 JSON Schema的表示。這個 Json Schema的描述將用于指導大模型何時以及如何調用這個外部函數。通過定義必要的參數、預期的數據格式和響應類型,可以確保大模型能夠正確并有效地利用這個功用這個功能。定義如下:
get_weather_desc = {
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g.beijing",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
},
}
然后,定義一個available_functions 字典,用來映射函數名到具體的函數對象,因為在大模型識別到需要調用外部函數的時候,我們需要動態地調用函數并獲取到函數的執行結果。
available_functions = {
"get_current_weather": get_current_weather,
}
在準備好外部函數后,我們在定義 Assistant 對象時需要使用 tools 參數。此參數是一個列表的數據類型,我們要將可用的外部工具的 JSON Schema 描述添加進去,從而讓 Assistant 可以正確識別這些工具,代碼如下:
from openai import OpenAI
client = OpenAI()
# Step 1. 創建一個新的 assistant 對象實例
assistant = client.beta.assistants.create(
name="你是一個實時天氣小助理", ??
instructions="你可以調用工具,獲取到當前的實時天氣,再給出最終的回復",?
model="gpt-4o-mini-2024-07-18",
tools=[get_weather_desc],
)?
# Step 2. 創建一個新的 thread 對象實例
thread = client.beta.threads.create()
然后,還是按照Assistant API的標準流程,將Messages追加到Thread中,并執行Run運行狀態。代碼如下:
# Step 3. 將消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京現在的天氣怎么樣?"
)
# Step 4. 執行 運行
run = client.beta.threads.runs.create(
thread_id=thread.id,?
assistant_id=assistant.id,
stream=True
)
for event in run:
print(event)
從上述的輸出中可以分析出:**與直接生成回復的流式過程相比,明顯的區別體現在 `thread.run.step.delta` 事件中。**在這一事件中,增量更新的是大模型輸出的參數`arguments`,這個參數中的內容是用于執行外部函數的,而非對“北京現在的天氣怎么樣?”這個提問的回答。此外,流程會暫停在 `thread.run.requires_action` 事件處,整個過程不會自動生成最終的回復,而是等待外部操作的完成。
因此要在操作中識別這兩點
這里我們仍然可以通過直接提取數據結構的方式來獲取到想要的信息。
# Step 2. 創建一個新的 thread 對象實例
thread = client.beta.threads.create()
# Step 3. 將消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京現在的天氣怎么樣?"
)
# Step 4. 執行 運行
run = client.beta.threads.runs.create(
thread_id=thread.id,?
assistant_id=assistant.id,
stream=True
)
for event in run:
if event.event == 'thread.run.step.delta':
# 打印大模型輸出的參數
print("Delta Event Arguments:")
print(event.data.delta.step_details.tool_calls[0].function.arguments)
print("--------------------------------------------------") ?# 添加分隔符
? ? if event.event == 'thread.run.requires_action':
# 打印需要采取行動的詳細信息
print("Requires Action Data:")
print(event.data)
print("--------------------------------------------------") ?# 添加分隔符
在 event.event == 'thread.run.requires_action' 事件中,當識別到需要執行的外部函數及其參數后,我們可以根據Function Calling中介紹的方法來執行這些函數。在流媒體的事件流中我們只要適當的進行修改,就可以完成這個過程。代碼如下:
# Step 2. 創建一個新的 thread 對象實例
thread = client.beta.threads.create()
# Step 3. 將消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京現在的天氣怎么樣?"
)
# Step 4. 執行 運行
run = client.beta.threads.runs.create(
thread_id=thread.id,?
assistant_id=assistant.id,
stream=True
)
tool_outputs = []
for event in run:
# print(f"event:{event}")
if event.event == 'thread.run.requires_action':
# 先拿到thread.run.requires_action事件的全部信息
function_info = event.data
print("--------------------------------------------------")
print(f"Function Info: {function_info}")
required_action = function_info.required_action
print("--------------------------------------------------")
print(f"Required Action: {required_action}")
tool_calls = required_action.submit_tool_outputs.tool_calls
print("--------------------------------------------------")
print(f"Tool Calls: {tool_calls}")
? ? ? ? # 執行外部函數,使用循環是因為`Assistant API` 可以并行執行多個函數
for tool_call in tool_calls:
tool_id = tool_call.id
function = tool_call.function
function_name = function.name
function_args = json.loads(function.arguments)
function_result = available_functions[function_name](**function_args)
print("--------------------------------------------------")
print(f"Function Result for {function_name}: {function_result}")
? ? ? ? ? ? tool_outputs.append({"tool_call_id": tool_id, "output":function_result})
print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")
?當獲取到外部函數的執行結果后,需要將這些結果再次追加到 Thread 中,使其再次進入到隊列中,以繼續回答北京現在的天氣怎么樣?這個原始的問題。正如我們在上節課中介紹的,使用.beta.threads.runs.submit_tool_outputs方法用于提交外部工具的輸出,此方法也支持流媒體輸出,如果在這個階段需要開啟流媒體傳輸,我們就需要使用 stream=True 參數來明確指定。代碼如下:
# Step 2. 創建一個新的 thread 對象實例
thread = client.beta.threads.create()
# Step 3. 將消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京現在的天氣怎么樣?"
)
# Step 4. 執行 運行
run = client.beta.threads.runs.create(
thread_id=thread.id,?
assistant_id=assistant.id,
stream=True
)
tool_outputs = []
for event in run:
# print(f"event:{event}")
if event.event == 'thread.run.requires_action':
# 先拿到thread.run.requires_action事件的全部信息
function_info = event.data
print("--------------------------------------------------")
print(f"Function Info: {function_info}")
required_action = function_info.required_action
print("--------------------------------------------------")
print(f"Required Action: {required_action}")
tool_calls = required_action.submit_tool_outputs.tool_calls
print("--------------------------------------------------")
print(f"Tool Calls: {tool_calls}")
? ? ? ? # 執行外部函數,使用循環是因為`Assistant API` 可以并行執行多個函數
for tool_call in tool_calls:
tool_id = tool_call.id
function = tool_call.function
function_name = function.name
function_args = json.loads(function.arguments)
function_result = available_functions[function_name](**function_args)
print("--------------------------------------------------")
print(f"Function Result for {function_name}: {function_result}")
? ? ? ? ? ? tool_outputs.append({"tool_call_id": tool_id, "output":function_result})
print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")
run_tools = client.beta.threads.runs.submit_tool_outputs(
thread_id=thread.id,?
run_id=function_info.id,
tool_outputs=tool_outputs,
stream=True)
for event_tool in run_tools:
# print(f"event_tool:{event_tool}")
if event_tool.event == 'thread.message.delta':
# 提取 text delta 的 value
text = event_tool.data.delta.content[0].text.value
print("Delta Text Output:", text)
print("--------------------------------------------------") ?# 添加分隔符
if event_tool.event == 'thread.message.completed':
# 提取 text delta 的 value
full_text = event_tool.data.content[0].text.value
print("Completed Message Text:", full_text)
print("--------------------------------------------------") ?# 添加分隔符
由此可見,當流式傳輸中涉及到函數調用的中間過程時,實際上也是發生了兩次`Run`操作。為了使第二個`Run`能夠接續第一個`Run`的輸出,關鍵在于兩者之間需要共享第一個`Run`的`Run id`。此外,在第二個`Run`的運行狀態中需要設置`stream=True`以啟用流式傳輸,確保兩個`Run`狀態的輸出能夠有效鏈接,形成一次連貫的響應。整個過程的事件流非常多,需要大家仔細體會。