文章目錄
- 簡介
- OkHttp 流式獲取 GPT 響應
- 通過 SSE 流式推送前端
- 后端代碼
- 消息實體
- 接口
- 接口實現
- 數據推送給前端
- 前端代碼
- 創建 `sseClient.js`
- vue3代碼
- 優化后端代碼
簡介
用過 ChatGPT 的伙伴應該想過自己通過調用ChatGPT官網提供的接口來實現一個自己的問答機器人,但是在調用的時候發現,請求總是以傳統的HTTP請求/響應模式進行,這意味著我們沒發送一個請求后需要等待 ChatGPT 服務器返回完整的響應。這種方式在生成文本時并不不是我們理想的,因為用戶體驗不夠流暢。
為了提供更好的用戶體驗,我們可以使用Server-Sent Events(SSE)技術來實現流式接收。這樣,當ChatGPT 服務器可以在生成響應的同時逐步將內容推送給我們,我們在通過 SSE 流式推送到前端頁面,讓用戶能夠實時看到生成的內容。我將詳細介紹如何在Java中實現這一功能。
OkHttp 流式獲取 GPT 響應
其實市面上已經有很多現成的框架支持,但我們這里使用 okHttp 這個輕量級的HTTP客戶端庫來實現。
需要先引用相關maven:
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId></dependency><dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp-sse</artifactId></dependency>
構建請求體,必須加上參數 stream
值為true
//構建發送內容String messageStr = StrUtil.format(prompt, params);// 創建一個Message對象,該對象表示一個消息,并設置其屬性Message message = new Message(Message.Role.USER.getRole(), messageStr);// 創建一個ChatCompletion對象,表示聊天完成請求,并將剛創建的消息添加到其中ChatCompletionRequest request = ChatCompletionRequest.builder().model(ChatCompletionRequest.Model.GPT_3_5_TURBO.getName()).messages(Arrays.asList(message)).stream(true).build();
// 定義see接口
Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions").header("Authorization","xxx").post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString())).build();
OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.MINUTES).readTimeout(10, TimeUnit.MINUTES)//這邊需要將超時顯示設置長一點,不然剛連上就斷開,之前以為調用方式錯誤被坑了半天.build();// 實例化EventSource,注冊EventSource監聽器
RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {log.info("onOpen");}@SneakyThrows@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {// log.info("onEvent");// 在實際應用中,你可以在這里將數據推送給前端log.info(data);//請求到的數據}@Overridepublic void onClosed(EventSource eventSource) {log.info("onClosed");
// emitter.complete();}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {log.error("onFailure 出現異常,response={}", response, t);//這邊可以監聽并重新打開
// emitter.complete();}
});
realEventSource.connect(okHttpClient);//真正開始請求的一步
通過 SSE 流式推送前端
sse(Server Sent Event),直譯為服務器發送事件,顧名思義,也就是客戶端可以獲取到服務器發送的事件
我們常見的 http 交互方式是客戶端發起請求,服務端響應,然后一次請求完畢;但是在 sse 的場景下,客戶端發起請求,連接一直保持,服務端有數據就可以返回數據給客戶端,這個返回可以是多次間隔的方式
原理是先建立鏈接,然后不斷發消息就可以
我們利用 springboot
封裝的 SseEmitter
來完成推送,需要用到以下依賴:
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.16</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId>
</dependency>
后端代碼
消息實體
其中客戶端 ID 是每個 SSE 鏈接的唯一標識,拿到 ID 可以精準的給唯一的用戶推送消息,消息通過字符串的方式進行傳遞
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 消息體*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageVo {/*** 客戶端id*/private String clientId;/*** 傳輸數據體(json)*/private String data;
}
接口
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;public interface SseEmitterService {/*** 創建連接** @param clientId 客戶端ID*/SseEmitter createConnect(String clientId);/*** 根據客戶端id獲取SseEmitter對象** @param clientId 客戶端ID*/SseEmitter getSseEmitterByClientId(String clientId);/*** 發送消息給所有客戶端** @param msg 消息內容*/void sendMessageToAllClient(String msg);/*** 給指定客戶端發送消息** @param clientId 客戶端ID* @param msg 消息內容*/void sendMessageToOneClient(String clientId, String msg);/*** 關閉連接** @param clientId 客戶端ID*/void closeConnect(String clientId);
}
接口實現
@Slf4j
@Service
public class SseEmitterServiceImpl implements SseEmitterService {/*** 容器,保存連接,用于輸出返回 ;可使用其他方法實現*/private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();/*** 根據客戶端id獲取SseEmitter對象** @param clientId 客戶端ID*/@Overridepublic SseEmitter getSseEmitterByClientId(String clientId) {return sseCache.get(clientId);}/*** 創建連接** @param clientId 客戶端ID*/@Overridepublic SseEmitter createConnect(String clientId) {// 設置超時時間,0表示不過期。默認30秒,超過時間未完成會拋出異常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 是否需要給客戶端推送IDif (StrUtil.isBlank(clientId)) {clientId = IdUtil.simpleUUID();}// 注冊回調sseEmitter.onCompletion(completionCallBack(clientId)); // 長鏈接完成后回調接口(即關閉連接時調用)sseEmitter.onTimeout(timeoutCallBack(clientId)); // 連接超時回調sseEmitter.onError(errorCallBack(clientId)); // 推送消息異常時,回調方法sseCache.put(clientId, sseEmitter);log.info("創建新的sse連接,當前用戶:{} 累計用戶:{}", clientId, sseCache.size());try {// 注冊成功返回用戶信息sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON));} catch (IOException e) {log.error("創建長鏈接異常,客戶端ID:{} 異常信息:{}", clientId, e.getMessage());}return sseEmitter;}/*** 發送消息給所有客戶端** @param msg 消息內容*/@Overridepublic void sendMessageToAllClient(String msg) {if (MapUtil.isEmpty(sseCache)) {return;}// 判斷發送的消息是否為空for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {MessageVo messageVo = new MessageVo();messageVo.setClientId(entry.getKey());messageVo.setData(msg);sendMsgToClientByClientId(entry.getKey(), messageVo, entry.getValue());}}/*** 給指定客戶端發送消息** @param clientId 客戶端ID* @param msg 消息內容*/@Overridepublic void sendMessageToOneClient(String clientId, String msg) {MessageVo messageVo = new MessageVo(clientId, msg);sendMsgToClientByClientId(clientId, messageVo, sseCache.get(clientId));}/*** 關閉連接** @param clientId 客戶端ID*/@Overridepublic void closeConnect(String clientId) {SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter != null) {sseEmitter.complete();removeUser(clientId);}}/*** 推送消息到客戶端* 此處做了推送失敗后,重試推送機制,可根據自己業務進行修改** @param clientId 客戶端ID* @param messageVo 推送信息,此處結合具體業務,定義自己的返回值即可**/private void sendMsgToClientByClientId(String clientId, MessageVo messageVo, SseEmitter sseEmitter) {if (sseEmitter == null) {log.error("推送消息失敗:客戶端{}未創建長鏈接,失敗消息:{}",clientId, messageVo.toString());return;}SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK)).data(messageVo, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);} catch (IOException e) {// 推送消息失敗,記錄錯誤日志,進行重推log.error("推送消息失敗:{},嘗試進行重推", messageVo.toString());boolean isSuccess = true;// 推送消息失敗后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {log.error("{}的第{}次消息重推失敗,未創建長鏈接", clientId, i + 1);continue;}sseEmitter.send(sendData);} catch (Exception ex) {log.error("{}的第{}次消息重推失敗", clientId, i + 1, ex);continue;}log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, messageVo.toString());return;}}}/*** 長鏈接完成后回調接口(即關閉連接時調用)** @param clientId 客戶端ID**/private Runnable completionCallBack(String clientId) {return () -> {log.info("結束連接:{}", clientId);removeUser(clientId);};}/*** 連接超時時調用** @param clientId 客戶端ID**/private Runnable timeoutCallBack(String clientId) {return () -> {log.info("連接超時:{}", clientId);removeUser(clientId);};}/*** 推送消息異常時,回調方法** @param clientId 客戶端ID**/private Consumer<Throwable> errorCallBack(String clientId) {return throwable -> {log.error("SseEmitterServiceImpl[errorCallBack]:連接異常,客戶端ID:{}", clientId);// 推送消息失敗后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失敗,未獲取到 {} 對應的長鏈接", i + 1, clientId);continue;}sseEmitter.send("失敗后重新推送");} catch (Exception e) {e.printStackTrace();}}};}/*** 移除用戶連接** @param clientId 客戶端ID**/private void removeUser(String clientId) {sseCache.remove(clientId);log.info("SseEmitterServiceImpl[removeUser]:移除用戶:{}", clientId);}
}
數據推送給前端
在 onEvent 回調中添加代碼,每接收到消息后就推送到前端
// 定義see接口
Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions").header("Authorization","xxx").post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString())).build();
OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.MINUTES).readTimeout(10, TimeUnit.MINUTES)//這邊需要將超時顯示設置長一點,不然剛連上就斷開,之前以為調用方式錯誤被坑了半天.build();// 實例化EventSource,注冊EventSource監聽器
RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {log.info("onOpen");}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {if ("[DONE]".equals(data)) {System.out.println("收到 [DONE] 信號");return;}ChatCompletionResp chatCompletionResp = JSON.parseObject(data, ChatCompletionResp.class);// 獲得生成的文章內容if (CollUtil.isEmpty(chatCompletionResp.getChoices())){return;}Message delta = chatCompletionResp.getChoices().get(0).getDelta();if (delta == null || delta.getContent() == null){return;}sseEmitterService.sendMessageToOneClient(clientId , delta.getContent());log.info(data);//請求到的數據}@Overridepublic void onClosed(EventSource eventSource) {log.info("onClosed");
// emitter.complete();}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {log.error("onFailure 出現異常,response={}", response, t);//這邊可以監聽并重新打開
// emitter.complete();}
});
realEventSource.connect(okHttpClient);//真正開始請求的一步
前端代碼
由于 EventSource 不允許直接配置請求頭,普通的 EventSource 如果需要攜帶token請求,那就需要引入一個插件
安裝 EventSourcePolyfill
你可以通過npm
安裝 event-source-polyfill
:
npm install event-source-polyfill
引入 EventSourcePolyfill 后,它會自動替換瀏覽器中的原生 EventSource,其用法與原生的 API 一致。你可以像使用 EventSource 一樣使用它:
創建 sseClient.js
封裝一下, sse 最佳實踐,
// utils/sseClient.js
import { EventSourcePolyfill } from 'event-source-polyfill'
import { baseURL } from '../config';// 封裝一個創建 SSE 連接的方法
export function newEventSource({ clientId = '', headers = {}, onMessage, onError, onOpen }) {const token = sessionStorage.getItem('token') || ''const es = new EventSourcePolyfill(baseURL + 'p/sse/createConnect?clientId=' + clientId , {headers: {'Authorization': `Bearer ${token}`...headers},heartbeatTimeout: 60 * 1000, // 心跳超時(可選)})es.onopen = (event) => {console.log('SSE 連接已開啟')onOpen && onOpen(event)}es.onmessage = (event) => {//前端:在接收到結束標識后立即銷毀if (event.data === '[DONE]') {console.log('SSE 連接已關閉')es.close()}onMessage && onMessage(event)}es.onerror = (event) => {console.error('SSE 錯誤:', event)onError && onError(event)es.close() // 出錯時自動關閉}return es // 返回實例,方便外部主動關閉
}
vue3代碼
import { newEventSource } from '@/utils/sseClient.js'const createSseConnection = () => {return newEventSource({clientId: 'xxx',onMessage: (event) => {console.log('Received SSE message:', event.data);}});
};
優化后端代碼
按需建立連接并及時關閉 是非常關鍵的實踐策略,每一個 SseEmitter 在服務端都是一個線程或者任務掛起的狀態,太多不關閉會導致資源消耗(線程、連接、內存等);
如果每個用戶長時間掛一個 SSE,不及時關閉,可能造成內存泄露或線程池耗盡,所以我們優化一下后端代碼,在完成輸出后及時關閉連接.
在關閉和異常的回調方法中添加:
sseEmitterService.sendMessageToOneClient(clientId, "[DONE]");
sseEmitterService.closeConnect(clientId);
修改后:
// 定義see接口
Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions").header("Authorization","xxx").post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString())).build();
OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.MINUTES).readTimeout(10, TimeUnit.MINUTES)//這邊需要將超時顯示設置長一點,不然剛連上就斷開,之前以為調用方式錯誤被坑了半天.build();// 實例化EventSource,注冊EventSource監聽器
RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {log.info("onOpen");}@SneakyThrows@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {// log.info("onEvent");// 在實際應用中,你可以在這里將數據推送給前端log.info(data);//請求到的數據}@Overridepublic void onClosed(EventSource eventSource) {log.info("onClosed");sseEmitterService.sendMessageToOneClient(clientId, "[DONE]");sseEmitterService.closeConnect(clientId);
// emitter.complete();}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {log.error("onFailure 出現異常,response={}", response, t);//這邊可以監聽并重新打開sseEmitterService.sendMessageToOneClient(clientId, "[DONE]");sseEmitterService.closeConnect(clientId);
// emitter.complete();}
});
realEventSource.connect(okHttpClient);//真正開始請求的一步
輸出效果如下:
參考文章:
java模擬GPT流式問答
Springboot 集成 SSE 向前端推送消息