01 引言
大模型時代,尤其會話模型為了提高用戶的使用體驗,它不會將所有的數據加載完成一次響應給客戶端,而是通過數據流,一點點的將數據慢慢呈現出來。
正是這種有趣的交互方式一次次將SSE(Server Sent Event)
技術推到大眾視野。之前的文章已經介紹過SSE推送技術
以及SSE替代WebSocket實現直播間實時評論功能
的文章,這里就不在贅述。
恰巧,有位粉絲朋友咨詢怎么講流式數據傳給前端,前端怎么接收?小編為此做了相關的測試,整理在這里,分享給大家。
02 思維定式
我們經常編寫的是一次請求,一次響應這樣標準的http
請求。如:
@RestController
public class FooController {@GetMapping("/foo")public String foo() {return "success";}
}
而流式數據該怎么處理呢?一次請求,有源源不斷的數據加載進來。如果按照思維定式處理,只能能到所有的數據全部加載完成,再響應給前端。這樣帶來的結果有:
- 響應時間過長,體驗感很差
- 響應超時,前端無法獲取到數據
流式數據返回在響應式編程里面非常普遍,如reactor.core.publisher.Flux
、io.reactivex.Flowable
等。我們姑且以Flux
和Flowable
作為案例測試。
03 流式響應
3.1 Flux<T>
Flux
是spring-boot-starter-webflux
管理下的reactor-core.jar
包。具體的Maven
:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
案例
@GetMapping("/stream")
public Flux<String> streamData() {return Flux.interval(Duration.ofSeconds(1)).map(sequence -> {// 為了方便:拼接隨機數方便觀察String a = "flux_data: " + sequence + ":" +new Random().nextFloat();System.out.println(a);return a;});
}
瀏覽器直接訪問
我們可以看到正常的Http請求,只有等到所有的流數據處理完成之后才會一起展示出來。
3.2 Flowable<T>
Flowable
是位于rxjava.jar
包下。具體的Maven
:
<dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId><version>2.2.21</version>
</dependency>
案例
@GetMapping("/flowable")
public Flowable<String> flowable() {// 每隔一秒發送一條數據return Flowable.interval(1, TimeUnit.SECONDS).map(item -> {// 為了方便觀察,增加隨機數String data ="flowable_test:"+ item + "_" + new Random().nextFloat();System.out.println(data);return data;});
}
瀏覽器直接訪問
斷開服務器
3.3 前端接收
上面的案例均使用了瀏覽器直接訪問,類似使用了ajax
、fetch
等請求。我們以fetch
為例:
$("button[name='send']").click(function (){fetch("/stream").then(resp => resp.text()).then(data => console.info(data));
});
結果
第一張圖中并沒有打印數據,而是等全部流數據完成之后才會打印展示。前端處理的話,同樣等到數據處理完成才能渲染。
這種常規的請求已然不能滿足我們的業務場景了。只能請出我們的大殺器SSE
了。
04 前端處理流式請求
關于SSE
這里不多介紹,詳細可以看詳細文檔說明:https://javascript.info/server-sent-events
4.1 前端直接處理流式數據
服務端只要返回流式數據即可,如Flux<T>
、Flowable<T>
等。為了方便演示,我們通過按鈕出發接收流式數據。服務端和上面的案例相同。
JS代碼
$("button[name='send']").click(function (){let eventSource = new EventSource("/stream");eventSource.onmessage = function (event) {console.log("數據打印:" + event.data);}
});
結果
我們可以看到數據是一條一條返回給前端的。
4.2 服務端推數據給前端
不使用響應式編程的情況下,如何將數據逐步推給前端呢?那就通過org.springframework.web.servlet.mvc.method.annotation.SseEmitter
,手動推送給前端。
前端的JS
代碼只需要訂閱指定的路徑,然后監聽消息即可。寫法同4.1
。
JS代碼
$("button[name='send']").click(function (){let eventSource = new EventSource("/subscription");eventSource.onmessage = function (event) {console.log("數據打印:" + event.data);}
});
服務端
SseEmitter sseEmitter;@GetMapping("/subscription")
public SseEmitter sseEmitter() {sseEmitter = new SseEmitter();return sseEmitter;
}@PostConstruct
public void timer() {new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (sseEmitter != null) {try {sseEmitter.send("sseEmitter_" + new Random().nextFloat());} catch (IOException e) {throw new RuntimeException(e);}}}}, 1000, 1000);
}
通過/subscription
訂閱,并生成對應的客戶端。服務端推送消息,都是通過這個客戶端推送的。timer()
定時器模擬數據自動推送。
我們直接看前端的效果:
當然我們對接業務中需要考慮連接超時的問題,以及頁面多開客戶端區分的問題。這部分內容可以參考之前的文檔:SSE 推送技術
05 小結
流式數據對于大數據量的處理展示比較常見,包括Mybaits
也支持流式查詢,防止頻繁的數據庫連接池的打開和關閉。流式處理一般都是長連接,長連接勢必帶來資源的占用。用不好,可能造成連接池沾滿、內存溢出等意想不到的問題。