大模型及agent開發6 OpenAI Assistant API 高階應用 - 流式輸出功能


1.Assistant API 的主要優點:
減少編碼工作量、自動管理上下文窗口、安全的訪問控制、工具和文檔的輕松集成
本節講應用設計和性能

流式輸出:借助流式輸出,可以讓應用程序實時處理和響應用戶輸入。具體來說,這種技術允許數據在生成的同時即刻傳輸和處理,而不必等待整個數據處理過程完成。這樣,用戶無需等待全部數據加載完成即可開始接收到部分結果,從而顯著提高了應用的反應速度和交互流暢性。
eg:對話時回答是逐漸展現的。

在流式輸出的實現方式中,我們需要在調用 client.chat.completions.create() 時添加 stream=True 參數,用以指定啟用流式輸出,從而允許 API 逐步發送生成的文本片段,然后使用 for 循環來迭代 completion 對象
code如下:
from openai import OpenAI
client = OpenAI()

completion = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一位樂于助人的人工智能小助理。"},
{"role": "user", "content": "你好,請你介紹一下你自己。"}
],
stream=True
)

for chunk in completion:
print(chunk.choices[0].delta)


OpenAI 提供了兩種主要的流實現方法:第一種基于 WebSocket 的直接實時通信方法,第二種是適用于本身不支持此類交互的平臺的模擬流。兩種流實現方法各有其特點和應用場景:

基于 WebSocket 的直接實時通信方法:
WebSocket 是一種網絡通信協議,它允許在用戶的瀏覽器和服務器之間建立一個不斷開的連接,通過這個連接可以實現雙向的數據傳輸。

模擬流
是為了在不支持 WebSocket 或其他實時協議的環境中提供類似的功能。這種方法通過定期發送HTTP請求來“模擬”一個持續的數據流。常見的實現方式包括長輪詢和服務器發送事件(Server-Sent Events, SSE)。
在這兩種方法中,模擬流更加常用,因為它依賴于標準的HTTP請求和響應,易于理解和實現。

二.具體實操:
1.Assistant API 如何開啟流式傳輸
要在一個完整的生命周期內啟用流式傳輸,可以分別在Create Run、 Create Thread and Run和 Submit Tool Outputs 這三個 API 端點中添加 "stream": True 參數。
這樣設置后,API 返回的響應將是一個事件流。

首先,我們按照 Assistant API創建對話或代理的標準流程來構建應用實例。如下代碼所示:
from openai import OpenAI

# 構建 OpenAI 客戶端對象的實例
client = OpenAI()

# Step 1. 創建 Assistant 對象
assistant = client.beta.assistants.create(
model="gpt-4o-mini-2024-07-18",
name="Good writer", ?# 優秀的作家
instructions="You are an expert at writing excellent literature" ?# 你是一位善于寫優秀文學作品的專家
)

# Step 2. 創建 Thread 對象
thread = client.beta.threads.create()

# Step 3. 向Thread中添加Message
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="寫一篇關于一個小女孩在森林里遇到一只怪獸的故事。詳細介紹她的所見所聞,并描述她的心里活動"
)
在Create Run的過程中,添加stream=True參數,開啟流媒體傳輸。代碼如下:
run = client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
stream=True ? ? ?# 開啟流式傳輸
)

print(run)

for event in run:
print(event)
其返回的結果是一個 openai.Stream 對象。這個對象表示的是一個流式數據通道,可以用來接收連續傳輸的數據。


2.Assistant API 流式傳輸中的事件流
整個事件流的核心流程包括:當新的運行被創建時,發出 thread.run.created 事件;當運行完成時,發出 thread.run.completed 事件。在運行期間選擇創建消息時,將發出一個 thread.message.created 事件、一個 thread.message.in_progress 事件、多個 thread.message.delta 事件,以及最終的 thread.message.completed 事件。

在處理過程中,任何需要的信息都可以通過訪問數據結構的方式來獲取。
run = client.beta.threads.runs.create(
assistant_id=assistant.id,
thread_id=thread.id,
stream=True ? ? ?# 開啟流媒體傳輸
)

for event in run:
#print(event.event)
if event.event == 'thread.message.delta':
# 提取 text delta 的 value
value = event.data.delta.content[0].text.value
print(value)

按照相同的思路,我們還可以嘗試測試Create Thread and Run和 Submit Tool Outputs兩個端點在啟用流媒體傳輸時的實現方法。

??首先來看Create Thread and Run方法。這個方法很容易理解,它所做的事情就是把創建Thread對象實例、向Thread對象實例中追加Message信息以及構建Run狀態這三個單獨的步驟合并在了一個.beta.threads.create_and_run方法中,所以調用方法也發生了一定的變化,代碼如下:
run = client.beta.threads.create_and_run(
assistant_id=assistant.id,
thread={
"messages": [
{"role": "user", "content": "寫一篇歌頌中國的文章"}
]
},
stream=True,
)

for event in run:
print(event)

3.如何在函數調用中啟用流式傳輸
這里我們定義一個外部函數get_current_weather,用來獲取指定地點的當前天氣狀況。
import json

def get_current_weather(location, unit="celsius"):
"""Get the current weather in a given location in China"""
if "beijing" in location.lower():
return json.dumps({"location": "Beijing", "temperature": "15", "unit": unit})
elif "shanghai" in location.lower():
return json.dumps({"location": "Shanghai", "temperature": "20", "unit": unit})
elif "guangzhou" in location.lower():
return json.dumps({"location": "Guangzhou", "temperature": "25", "unit": unit})
else:
return json.dumps({"location": location, "temperature": "unknown"})

接下來,需要為 get_current_weather 函數編寫一個 JSON Schema的表示。這個 Json Schema的描述將用于指導大模型何時以及如何調用這個外部函數。通過定義必要的參數、預期的數據格式和響應類型,可以確保大模型能夠正確并有效地利用這個功用這個功能。定義如下:
get_weather_desc = {
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g.beijing",
},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["location"],
},
},
}

然后,定義一個available_functions 字典,用來映射函數名到具體的函數對象,因為在大模型識別到需要調用外部函數的時候,我們需要動態地調用函數并獲取到函數的執行結果。
available_functions = {
"get_current_weather": get_current_weather,
}

在準備好外部函數后,我們在定義 Assistant 對象時需要使用 tools 參數。此參數是一個列表的數據類型,我們要將可用的外部工具的 JSON Schema 描述添加進去,從而讓 Assistant 可以正確識別這些工具,代碼如下:
from openai import OpenAI
client = OpenAI()


# Step 1. 創建一個新的 assistant 對象實例
assistant = client.beta.assistants.create(
name="你是一個實時天氣小助理", ??
instructions="你可以調用工具,獲取到當前的實時天氣,再給出最終的回復",?
model="gpt-4o-mini-2024-07-18",
tools=[get_weather_desc],
)?

# Step 2. 創建一個新的 thread 對象實例
thread = client.beta.threads.create()
然后,還是按照Assistant API的標準流程,將Messages追加到Thread中,并執行Run運行狀態。代碼如下:

# Step 3. 將消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京現在的天氣怎么樣?"
)


# Step 4. 執行 運行
run = client.beta.threads.runs.create(
thread_id=thread.id,?
assistant_id=assistant.id,
stream=True
)

for event in run:
print(event)


從上述的輸出中可以分析出:**與直接生成回復的流式過程相比,明顯的區別體現在 `thread.run.step.delta` 事件中。**在這一事件中,增量更新的是大模型輸出的參數`arguments`,這個參數中的內容是用于執行外部函數的,而非對“北京現在的天氣怎么樣?”這個提問的回答。此外,流程會暫停在 `thread.run.requires_action` 事件處,整個過程不會自動生成最終的回復,而是等待外部操作的完成。
因此要在操作中識別這兩點
這里我們仍然可以通過直接提取數據結構的方式來獲取到想要的信息。
# Step 2. 創建一個新的 thread 對象實例
thread = client.beta.threads.create()

# Step 3. 將消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京現在的天氣怎么樣?"
)


# Step 4. 執行 運行
run = client.beta.threads.runs.create(
thread_id=thread.id,?
assistant_id=assistant.id,
stream=True
)


for event in run:
if event.event == 'thread.run.step.delta':
# 打印大模型輸出的參數
print("Delta Event Arguments:")
print(event.data.delta.step_details.tool_calls[0].function.arguments)
print("--------------------------------------------------") ?# 添加分隔符

? ? if event.event == 'thread.run.requires_action':
# 打印需要采取行動的詳細信息
print("Requires Action Data:")
print(event.data)
print("--------------------------------------------------") ?# 添加分隔符

在 event.event == 'thread.run.requires_action' 事件中,當識別到需要執行的外部函數及其參數后,我們可以根據Function Calling中介紹的方法來執行這些函數。在流媒體的事件流中我們只要適當的進行修改,就可以完成這個過程。代碼如下:

# Step 2. 創建一個新的 thread 對象實例
thread = client.beta.threads.create()

# Step 3. 將消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京現在的天氣怎么樣?"
)


# Step 4. 執行 運行
run = client.beta.threads.runs.create(
thread_id=thread.id,?
assistant_id=assistant.id,
stream=True
)

tool_outputs = []
for event in run:
# print(f"event:{event}")
if event.event == 'thread.run.requires_action':
# 先拿到thread.run.requires_action事件的全部信息
function_info = event.data
print("--------------------------------------------------")
print(f"Function Info: {function_info}")

required_action = function_info.required_action
print("--------------------------------------------------")
print(f"Required Action: {required_action}")

tool_calls = required_action.submit_tool_outputs.tool_calls
print("--------------------------------------------------")
print(f"Tool Calls: {tool_calls}")

? ? ? ? # 執行外部函數,使用循環是因為`Assistant API` 可以并行執行多個函數
for tool_call in tool_calls:
tool_id = tool_call.id
function = tool_call.function
function_name = function.name
function_args = json.loads(function.arguments)
function_result = available_functions[function_name](**function_args)
print("--------------------------------------------------")
print(f"Function Result for {function_name}: {function_result}")

? ? ? ? ? ? tool_outputs.append({"tool_call_id": tool_id, "output":function_result})

print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")

?當獲取到外部函數的執行結果后,需要將這些結果再次追加到 Thread 中,使其再次進入到隊列中,以繼續回答北京現在的天氣怎么樣?這個原始的問題。正如我們在上節課中介紹的,使用.beta.threads.runs.submit_tool_outputs方法用于提交外部工具的輸出,此方法也支持流媒體輸出,如果在這個階段需要開啟流媒體傳輸,我們就需要使用 stream=True 參數來明確指定。代碼如下:

# Step 2. 創建一個新的 thread 對象實例
thread = client.beta.threads.create()

# Step 3. 將消息追加到 Thread 中
message = client.beta.threads.messages.create(
thread_id=thread.id,
role="user",
content="北京現在的天氣怎么樣?"
)


# Step 4. 執行 運行
run = client.beta.threads.runs.create(
thread_id=thread.id,?
assistant_id=assistant.id,
stream=True
)

tool_outputs = []
for event in run:
# print(f"event:{event}")
if event.event == 'thread.run.requires_action':
# 先拿到thread.run.requires_action事件的全部信息
function_info = event.data
print("--------------------------------------------------")
print(f"Function Info: {function_info}")

required_action = function_info.required_action
print("--------------------------------------------------")
print(f"Required Action: {required_action}")

tool_calls = required_action.submit_tool_outputs.tool_calls
print("--------------------------------------------------")
print(f"Tool Calls: {tool_calls}")

? ? ? ? # 執行外部函數,使用循環是因為`Assistant API` 可以并行執行多個函數
for tool_call in tool_calls:
tool_id = tool_call.id
function = tool_call.function
function_name = function.name
function_args = json.loads(function.arguments)
function_result = available_functions[function_name](**function_args)
print("--------------------------------------------------")
print(f"Function Result for {function_name}: {function_result}")

? ? ? ? ? ? tool_outputs.append({"tool_call_id": tool_id, "output":function_result})

print("--------------------------------------------------")
print(f"Tool Outputs: {tool_outputs}")


run_tools = client.beta.threads.runs.submit_tool_outputs(
thread_id=thread.id,?
run_id=function_info.id,
tool_outputs=tool_outputs,
stream=True)

for event_tool in run_tools:
# print(f"event_tool:{event_tool}")
if event_tool.event == 'thread.message.delta':
# 提取 text delta 的 value
text = event_tool.data.delta.content[0].text.value
print("Delta Text Output:", text)
print("--------------------------------------------------") ?# 添加分隔符

if event_tool.event == 'thread.message.completed':
# 提取 text delta 的 value
full_text = event_tool.data.content[0].text.value
print("Completed Message Text:", full_text)
print("--------------------------------------------------") ?# 添加分隔符

由此可見,當流式傳輸中涉及到函數調用的中間過程時,實際上也是發生了兩次`Run`操作。為了使第二個`Run`能夠接續第一個`Run`的輸出,關鍵在于兩者之間需要共享第一個`Run`的`Run id`。此外,在第二個`Run`的運行狀態中需要設置`stream=True`以啟用流式傳輸,確保兩個`Run`狀態的輸出能夠有效鏈接,形成一次連貫的響應。整個過程的事件流非常多,需要大家仔細體會。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/88361.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/88361.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/88361.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

React Native安卓劉海屏適配終極方案:僅需修改 AndroidManifest.xml!

📌 問題背景在 React Native 開發中,我們經常會遇到安卓設備劉海屏(Notch)適配問題。即使正確使用了 react-native-safe-area-context 和 react-navigation,在一些安卓設備(如小米、華為、OPPO 等&#xff…

Spring Boot整合MyBatis+MySQL實戰指南(Java 1.8 + 單元測試)

一、環境準備 開發工具&#xff1a;IntelliJ IDEA 2023.1 JDK 1.8.0_382 Maven3.6.3數據庫&#xff1a;MySQL 8.0.21依賴版本&#xff1a;<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifact…

游戲開發日記

如何用數據表來儲存&#xff0c;位置坐標&#xff08;XYZ&#xff09;&#xff1a;決定了對象在世界中的擺放資源ID / 圖片URL&#xff1a;決定了使用什么模型或貼圖事件ID / 特效&#xff1a;是否觸發某些事件&#xff08;例如點擊、交互&#xff09;邏輯索引&#xff08;Grid…

如何使用xmind編寫測試用例

如何使用xmind編寫測試用例為什么要使用xmind&#xff1f;使用xmind編寫測試用例是為了梳理我們的思路。使用xmind編寫測試用例的思路是什么&#xff1f;先進行分析再提取測試用例。 例如下面的注冊功能的測試用例的分析&#xff1a; 分析&#xff1a; 先提取出需要測試的功能點…

使用LLaMA-Factory微調Qwen2.5-VL-3B 的目標檢測任務-數據集格式轉換(voc 轉 ShareGPT)

一、LLaMA-Factory Qwen2.5-VL ShareGPT 格式要求ShareGPT 格式就是多輪對話的 list&#xff0c;每條數據如下&#xff1a;[{"conversations": [{"from": "user", "value": "<image>\n請標注圖片中的所有目標及其類別和位…

【SkyWalking】服務端部署與微服務無侵入接入實戰指南

【SkyWalking】服務端部署與微服務無侵入接入實戰指南 &#x1f4a1; SkyWalking 系列總引導 在微服務架構快速演進的今天&#xff0c;如何有效實現服務鏈路追蹤、性能分析、日志采集與自動化告警&#xff0c;成為系統穩定性的關鍵保障手段。 SkyWalking&#xff0c;作為 Apa…

LVDS系列20:Xilinx 7系ISERDESE2原語(一)

Xilinx 7系FPGA bank的io單元如下&#xff1a;Hr bank比hp bank少odelaye2組件&#xff0c;兩者的idelaye2組件后面&#xff0c;都有iserdese2組件&#xff1b; iserdese2組件是一種專用的串并轉換器或稱解串器&#xff0c;用于高速源同步應用&#xff0c;如大部分LVDS信號解析…

【U-Boot】Shell指令

目錄 U-Boot 三個Shell U-Boot Shell Linux Shell shell腳本 總結 U-Boot Shell命令 幫助命令 部分命令分類與功能說明 一、基礎操作與信息查詢 二、內存操作 三、啟動管理 四、文件系統操作 五、設備與分區管理 六、環境變量 七、診斷與調試 八、特殊功能 九…

《Revisiting Generative Replay for Class Incremental Object Detection》閱讀筆記

摘要Abstract部分 原文 Generative replay has gained significant attention in class-incremental learning; however, its application to Class Incremental Object Detection (CIOD) remains limited due to the challenges in generating complex images with precise …

Mysql: Bin log原理以及三種格式

目錄 一、什么是 Binlog&#xff1f; 二、Binlog 的應用場景與案例 1. 數據恢復 (Point-in-Time Recovery) 2. 主從復制 (Master-Slave Replication) 3. 數據審計 三、Binlog 的三種格式 1. STATEMENT 模式 (Statement-Based Logging - SBL) 2. ROW 模式 (Row-Based Log…

LiteHub之文件下載與視頻播放

文件下載 前端請求 箭頭函數 //這個箭頭函數可以形象理解為&#xff0c;x流入&#xff08;>&#xff09;x*x, //自然而然>前面的就是傳入參數,>表示函數體 x > x * x//相當于 function (x) {return x * x; }//如果參數不是一個&#xff0c;就需要用括號()括起來…

QT5使用cmakelists引入Qt5Xlsx庫并使用

1、首先需要已經有了Qt5Xlsx的頭文件和庫&#xff0c;并拷貝到程序exe路徑下&#xff08;以xxx.exe/3rdparty/qtxlsx路徑為例&#xff0c;Qt5Xlsx版本為0.3.0&#xff09;&#xff1b; 2、cmakelist中&#xff1a; # 設置 QtXlsx 路徑 set(QTXLSX_ROOT_DIR ${CMAKE_CURRENT_SOU…

醋酸鐠:閃亮的稀土寶藏,掀開科技應用新篇章

一、什么是醋酸鐠醋酸鐠是一種鐠的有機鹽&#xff0c;鐠是稀土金屬元素之一。作為一種重要的稀土化合物&#xff0c;醋酸鐠通常以水合物的形式存在&#xff0c;呈現淡黃色或無色結晶。鐠元素本身因其獨特的物理化學特性&#xff0c;在工業和科技領域有著廣泛應用&#xff0c;而…

深入解析JVM內存結構與垃圾回收機制

java是強類型高級語言JVM&#xff08;Java Virtual Machine&#xff0c;Java虛擬機&#xff09;是Java平臺的核心組件&#xff0c;它是一個虛擬的計算機&#xff0c;能夠執行Java字節碼&#xff08;bytecode&#xff09;。1、區域劃分JVM對Java內存的管理也是分區分塊進行&…

Java 流程控制詳解:從順序執行到跳轉語句,掌握程序邏輯設計

作為一名Java開發工程師&#xff0c;你一定知道&#xff0c;流程控制&#xff08;Flow Control&#xff09; 是編寫任何程序的核心。它決定了代碼的執行路徑、分支走向和循環次數。本文將帶你系統梳理 Java中的所有常用流程控制結構&#xff0c;包括&#xff1a;順序結構分支結…

面試150 環形鏈表

思路 采用雙指針法,slow指針每次走一步,fast指針每次走兩步&#xff0c;如果相遇的情況下&#xff0c;slow指針回到開始的位置,此時快慢指針各走一步&#xff0c;當相遇的時候也就是說明鏈表中有環。 # Definition for singly-linked list. # class ListNode: # def __init…

AI技術正在深度重構全球產業格局,其影響已超越工具屬性,演變為推動行業變革的核心引擎。

一、AI如何重塑AI的工作與行業&#xff08;AI助手領域&#xff09;能力升級理解與生成&#xff1a;基于LLM&#xff08;大語言模型&#xff09;&#xff0c;AI能處理開放式問題、撰寫報告、翻譯代碼&#xff0c;替代部分人類知識工作。個性化交互&#xff1a;通過用戶歷史對話分…

Kafka的無消息丟失配置怎么實現

那 Kafka 到底在什么情況下才能保證消息不丟失呢&#xff1f; Kafka 只對“已提交”的消息&#xff08;committed message&#xff09;做有限度的持久化保證。 第一個核心要素是“已提交的消息”。什么是已提交的消息&#xff1f;當 Kafka 的若干個 Broker 成 功地接收到一條…

集成CommitLInt+ESLint+Prettier+StyleLint+LintStaged

代碼可讀性低代碼 代碼規范落地難代碼格式難統一代碼質量低下 配置 ESLint ESLint 是一個用來識別 ECMAScript 并且按照規則給出報告的代碼檢測工具&#xff0c;使用它可以避免低級錯誤和統一代碼的風格。它擁有以下功能&#xff1a; 查出 JavaScript 代碼語法問題。根據配置…

尋找兩個正序數組的中位數(C++)

給定兩個大小分別為 m 和 n 的正序&#xff08;從小到大&#xff09;數組 nums1 和 nums2。請你找出并返回這兩個正序數組的 中位數 。算法的時間復雜度應該為 O(log (mn)) 。示例 1&#xff1a;輸入&#xff1a;nums1 [1,3], nums2 [2] 輸出&#xff1a;2.00000 解釋&#x…