前言
最近需要使用uniapp開發一個智能對話頁面,其中就需要使用SSE進行通信。
本文介紹下在uniapp中如何基于uni.request實現SSE流式處理。
在線體驗
#小程序:yinuosnowball
SSE傳輸格式
返回輸出的流式塊:
-
Content-Type為
text/event-stream
-
每個流式塊均為 data: 開頭,塊之間以 \n\n 即兩個換行符分隔, 如下所示:
-
后端接口定義的數據如下: - event為message,開始接收數據,answer為返回的結果 - event為message_end結束
接口數據已約定完成,下一步使用uniapp開始接收處理數據。
uniapp處理數據
客戶端實現在微信小程序中接收 SSE 流式響應,需要以下步驟:
- 配置 HTTP 請求:設置適當的請求頭和參數,以確保服務器返回流式響應。
- 處理分塊數據:由于SSE是分塊傳輸的,我們需要監聽每個數據塊,并解析它們。
- 錯誤處理:當每一次返回的最后出現不是完整的響應時,需要進行特殊處理。
- 完成時:可以進行追問等額外處理
下面使用 uni.request實現SSE的案例:
基本框架:
const requestTask = uni.request({url,method: 'POST',header: {Accept: 'text/event-stream',Authorization,},data,enableChunked: true,responseType: 'arraybuffer',success: (res) => {console.log('Data received 數據接受完畢:', res.data)},fail: (error) => {console.log('打印***error 錯誤處理', error)},complete: (complete) => {console.log('打印***complete 完成接收', complete)}
})requestTask.onChunkReceived((res)=>{// 處理數據
})
通過對requestTask
的onChunkReceived
監聽就可以得到數據塊,通過打印我們可以看到數據返回是ArrayBuffer,我們需要進行處理。
const uint8Array = new Uint8Array(res.data);let text = String.fromCharCode.apply(null, uint8Array);
解析后得到以data:data:
返回的格式,
此處需要注意解析后是data: 還是 data:data:格式
進一步處理:由于返回的數據塊不是一段一段,而是很多段都返回,因此我們需要進行\n\n
進行拆分,然后逐個解析:
const arr = text.split('\n\n').filter(Boolean)
arr.forEach(msg => {const jsonStr = msg.substring(11); // 去掉 'data:data: ' 前綴const data = JSON.parse(jsonStr);switch (data.event) {case 'message': {// 拼接返回文本this.dialogueList[existingMessageIndex].answer += data.answer;break;}case 'message_end':// 消息結束break;}
});
至此,我們就可以接收到消息,如果就這樣那就最好,但對接的過程發現,每一次返回的文本最后一段不是完整的,導致解析出現失敗,如下圖所示:
解決方案是: 定義一個變量,當解析出現失敗時,肯定是最后一段,進行存儲,下一次接收到數據將上一次存儲的進行拼接,然后解析:
具體代碼:
// 每次發送存儲數據
const msgObj = {query,answer: "",conversationId: null,isDone: false
}
this.dialogueList.push(msgObj)
// 保存上一次失敗的text
let lastText = ''
requestTask.onChunkReceived((res) => {// 第一步:獲取 字符串 數組const uint8Array = new Uint8Array(res.data);let text = lastText + String.fromCharCode.apply(null, uint8Array);lastText = '';let arr = text.split('\n\n').filter(Boolean)let lastIndex = arr.length - 1// 第二步:是否可以直接進行解析try {// 判斷是否可以全部解析完成arr.every(item => JSON.parse(item.substring(11)))} catch (error) {// 如果報錯截取最后一項lastText = arr[lastIndex]arr = arr.filter((_, i) => i !== lastIndex)}// 處理數據塊if (arr.length) {try {arr.forEach(msg => {const jsonStr = msg.substring(11); // 去掉 'data: ' 前綴const data = JSON.parse(jsonStr);const existingMessageIndex = this.dialogueList.findIndex(item => item === msgObj);switch (data.event) {case 'message': {// 查找是否存在相同ID的消息this.dialogueList[existingMessageIndex].answer += data.answer;break;}case 'message_end':// 消息結束break;}});} catch (error) {console.error('解析數據失敗:', error);}}
});
這樣就完成了對不連續返回的錯誤處理。
如果需要直接結束請求,可以直接使用requestTask.abort()
。
完整代碼
function sendMsg(query) {const msgObj = {query, // 問題answer: "", // 回答的結果conversationId: null,feedback: null,isDone: false // 自定義格式,用于加載處理}this.dialogueList.push(msgObj)// 請求參數const data = {}this.requestTask = uni.request({url,method: 'POST',header: {Accept: 'text/event-stream',Authorization: getStorage(tokenKeyEnum.zhonglv),},data,enableChunked: true,responseType: 'arraybuffer',success: (res) => {console.log('Data received 數據接受完畢:', res.data)},fail: (error) => {console.log('打印***error 錯誤處理', error)},complete: (complete) => { console.log('打印***complete 完成接收', complete)}})let lastText = ''this.requestTask.onChunkReceived((res) => {// 第一步:獲取 字符串 數組const uint8Array = new Uint8Array(res.data);let text = lastText + String.fromCharCode.apply(null, uint8Array);lastText = '';let arr = text.split('\n\n').filter(Boolean)let lastIndex = arr.length - 1// 第二步:是否可以直接進行解析try {let isCanResolve = arr.every(item => JSON.parse(item.substring(11)))console.log('打印***isCanResolve', isCanResolve)} catch (error) {// 如果報錯截取最后一項lastText = arr[lastIndex]arr = arr.filter((_, i) => i !== lastIndex)console.log('打印***error', error)}// 處理數據塊if (arr.length) {try {arr.forEach(msg => {const jsonStr = msg.substring(11); // 去掉 'data:data: ' 前綴const data = JSON.parse(jsonStr);const existingMessageIndex = this.dialogueList.findIndex(item => item === msgObj);switch (data.event) {case 'message': {// 查找是否存在相同ID的消息this.dialogueList[existingMessageIndex].answer += data.answer;break;}case 'message_end':// 消息結束this.dialogueList[existingMessageIndex].isDone = true;break;}});} catch (error) {console.error('解析數據失敗:', error);}}});
}
總結
最后總結一下,在uniapp中使用uni.request處理流式響應,主要步驟有:
- 開啟:
enableChunked: true
- 設置請求Header:
Accept: 'text/event-stream'
- 注冊數據接收響應函數:
requestTask.onChunkReceived(onChunkReceived)
- 分塊數據解析
String.fromCharCode
- 處理不連續返回問題
- 結束:
requestTask.abort()
希望對你有所幫助,如有錯誤,請指正 O^O!
參考文檔
- uni.request文檔: uniapp.dcloud.net.cn/api/request…