背景
最近由于工作要求需要使用Springboot搭建一個流式響應服務,即客戶端發送一次請求,服務端需要多次響應才能返回完整的數據。使用場景就是與chatGPT對話,你問一個問題,頁面會逐字將結果打印出來。
下面我在SpringBoot中可以簡單的實現一下這種場景需求,即SSE(Server-Sent Events)模式
前端請求實現方式
目前前端的請求實現方式有兩種,一個是采用EventSource實現,這種實現方式不支持自定義的請求頭,也就沒有辦法再請求頭部中增加Token這樣的用戶身份驗證信息。并且該方式只支持GET請求方式。所以這種實現方式只適用于,不需要驗證用戶身份并且請求參數內容少的情況下。
若要傳輸更多的參數信息或者在請求頭中增加自定義內容建議使用AbortController實現
若傳輸過程中鏈接斷開,EventSource可以實現自動重新鏈接,AbortController不能實現自動重新鏈接。
使用EventSource實現
// 建立連接let source = new EventSource('http://localhost:8080/sse/connect/' + userId);/*** 連接一旦建立,就會觸發open事件* 另一種寫法:source.onopen = function (event) {}*/source.addEventListener('open', function (e) {console.log("建立連接。。。");}, false);/*** 客戶端收到服務器發來的數據* 另一種寫法:source.onmessage = function (event) {}*/source.addEventListener('message', function (e) {console.log(e.data);});/*** 如果發生通信錯誤(比如連接中斷),就會觸發error事件* 或者:* 另一種寫法:source.onerror = function (event) {}*/source.addEventListener('error', function (e) {if (e.readyState === EventSource.CLOSED) {console.log("連接關閉");} else {console.log(e);}}, false);
使用AbortController實現
<template><div><input v-model="name" placeholder="Enter your name"><button @click="sendPost">Send POST request</button><button @click="stopGenerating">Stop Generating</button><button @click="restartGenerating">Restart Generating</button><pre>{{ response }}</pre></div>
</template><script>
export default {data() {return {name: '',response: '',controller: new AbortController(),isStopped: false}},methods: {async sendPost() {this.controller = new AbortController()this.response = ''this.isStopped = falseconst response = await fetch('http://127.0.0.1:5000/stream', {method: 'POST',headers: { 'Content-Type': 'application/json' },body: JSON.stringify({ name: this.name }),signal: this.controller.signal})const reader = response.body.getReader()while (true) {if (this.isStopped) breakconst { done, value } = await reader.read()if (done) breakthis.response += new TextDecoder().decode(value)}
},stopGenerating() {this.controller.abort()this.isStopped = true},restartGenerating() {this.controller = new AbortController()this.sendPost()}}
}
</script>
后端響應實現方式
使用SseEmitter實現
@RequestMapping(value = "/talkeAbouttestSseEmitter")public SseEmitter talkeAbouttestSseEmitter(HttpServletResponse response, @RequestBody JSONObject object) throws IOException {SseEmitter emitter = new SseEmitter();logger.info("【prompt內容】:{}", object.getString("prompt"));String str = " 什么是愛而不得? \n" +"東邊日出西邊雨,道是無晴卻有晴。\n" +"他朝若是同淋雪,此生也算共白頭。\n" +"我本將心向明月,奈何明月照溝渠。\n" +"此時相望不相聞,愿逐月華流照君。\n" +"衣帶漸寬終不悔,為伊消得人憔悴。\n" +"此情可待成追憶,只是當時已惘然。\n" +"人生若只如初見,何事西風悲畫扇。\n" +"曾經滄海難為水,除卻巫山不是云。\n" +"何當共剪西窗燭,卻話巴山夜雨時。\n" +"天長地久有時盡,此恨綿綿無絕期。\n" +"\n";response.setHeader("Content-Type", "text/event-stream");response.setContentType("text/event-stream");response.setCharacterEncoding("UTF-8");response.setHeader("Pragma", "no-cache");new Thread(() -> {
// // 響應流try {for (int i = 0; i < str.length(); i++) {// 指定事件標識 event: 這個為固定格式emitter.send(String.valueOf(str.charAt(i)));Thread.sleep(100);}emitter.send("stop");emitter.complete(); // Complete the SSE connection} catch (IOException e) {e.printStackTrace();}}).start();return emitter;}
使用HttpServlet實現
@RequestMapping(value = "/talkeAbouttestEvent")public void talkeAbouttestEvent(HttpServletResponse response, @Param("prompt") String prompt) throws IOException {logger.info("【prompt內容】:{}", prompt);String str = " 什么是愛而不得? \n" +"東邊日出西邊雨,道是無晴卻有晴。\n" +"他朝若是同淋雪,此生也算共白頭。\n" +"我本將心向明月,奈何明月照溝渠。\n" +"此時相望不相聞,愿逐月華流照君。\n" +"衣帶漸寬終不悔,為伊消得人憔悴。\n" +"此情可待成追憶,只是當時已惘然。\n" +"人生若只如初見,何事西風悲畫扇。\n" +"曾經滄海難為水,除卻巫山不是云。\n" +"何當共剪西窗燭,卻話巴山夜雨時。\n" +"天長地久有時盡,此恨綿綿無絕期。\n" +"\n";// 響應流response.setHeader("Content-Type", "text/event-stream");response.setContentType("text/event-stream");response.setCharacterEncoding("UTF-8");response.setHeader("Pragma", "no-cache");try {// 指定事件標識 event: 這個為固定格式response.getWriter().write("event:open\n");response.getWriter().flush();for (int i = 0; i < str.length(); i++) {// 指定事件標識 event: 這個為固定格式
// response.getWriter().write("event:msg\n");// 格式:data: + 數據 + 2個回車response.getWriter().write("data:{\"content\":\""+ String.valueOf(str.charAt(i)).getBytes(StandardCharsets.UTF_8) + "\"}\n\n");response.getWriter().flush();Thread.sleep(100);}// 指定事件標識 event: 這個為固定格式response.getWriter().write("event:error\n");response.getWriter().flush();
// response.getWriter().close();} catch (IOException | InterruptedException e) {e.printStackTrace();} finally {}}
后端請求實現方式
/*** ** @param url* @param json* @return*/public static BufferedReader sendJsonPostResveEventStream(String url, String json) {PrintWriter out = null;BufferedReader in = null;BufferedReader reader = null;try {log.info("sendPost - {}", url);log.info("json - {}", json);URL realUrl = new URL(url);HttpURLConnection conn = (HttpURLConnection) realUrl.openConnection();conn.setRequestMethod("POST");conn.setDoOutput(true);conn.setDoInput(true);conn.setUseCaches(false);conn.setRequestProperty("Connection", "Keep-Alive");conn.setRequestProperty("Charset", "UTF-8");conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");conn.setRequestProperty("accept", "application/json");if (json != null && !json.equals("")) {byte[] writebytes = json.getBytes();conn.setRequestProperty("Content-Length", String.valueOf(writebytes.length));OutputStream outwritestream = conn.getOutputStream();outwritestream.write(json.getBytes());outwritestream.flush();outwritestream.close();conn.getResponseCode();}if (conn.getResponseCode() == 200) {reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));return reader;}} catch (ConnectException e) {log.error("調用HttpUtils.sendPost ConnectException, url=" + url + ",param=" + json, e);} catch (SocketTimeoutException e) {log.error("調用HttpUtils.sendPost SocketTimeoutException, url=" + url + ",param=" + json, e);} catch (IOException e) {log.error("調用HttpUtils.sendPost IOException, url=" + url + ",param=" + json, e);} catch (Exception e) {log.error("調用HttpsUtil.sendPost Exception, url=" + url + ",param=" + json, e);} finally {try {if (out != null) {out.close();}if (in != null) {in.close();}} catch (IOException ex) {log.error("調用in.close Exception, url=" + url + ",param=" + json, ex);}}return null;}
后端請求然后以事件流的方式發送給前端
@PostMapping(value = "/talkeAbout", produces = "text/event-stream")public void talkeAbout(HttpServletResponse response, @RequestBody JSONObject object) throws IOException {response.setHeader("Content-Type", "text/event-stream");response.setContentType("text/event-stream");response.setCharacterEncoding("UTF-8");response.setHeader("Pragma", "no-cache");talkeAboutToXinference(object.getString("prompt"), response);}public void talkeAboutToXinference(String msg, HttpServletResponse response) throws IOException {String json = CHAT_PRARAM.replace("user_talke_about", msg);BufferedReader reader = HttpUtils.sendJsonPostResveEventStream("http://localhost/chat" + CHAT_CHAT_COMPLETIONS, json);if (reader == null) return;String line = "";while ((line = reader.readLine()) != null) {response.getWriter().write(line +"\n");response.getWriter().flush();}response.getWriter().close();}