SSE(Server-Send-Event)服務端推送數據技術
大家是否遇到過服務端需要主動傳輸數據到客戶端的情況,目前有三種解決方案。
- 客戶端輪詢更新數據。
- 服務端與客戶端建立 Socket 連接雙向通信
- 服務端與客戶建立 SSE 連接單向通信
幾種方案的比較:
-
輪詢:
客戶端通過頻繁請求向服務端請求數據,達到類似實時更新的效果。輪詢的優點是實現簡單,但是會給服務端和網絡帶來額外的壓力,且延遲較高。
-
WebSocket連接:
服務端與客戶端建立Socket連接進行數據傳輸,Socket的傳輸方式是全雙工的。WebSocket是基于 TCP 的長連接,和HTTP 協議相比,它能實現輕量級的、低延遲的數據傳輸,非常適合實時通信場景,主要用于交互性強的雙向通信。
-
SSE推送:
SSE(Server-Sent Events)是一種基于 HTTP 協議的推送技術,只允許單向通訊。相較于 WebSocket,SSE 更簡單、更輕量級。
下面是SpringBoot使用SSE的步驟和示例代碼
-
配置依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency>
SSE已經集成到spring-web中,所以可以直接使用。
-
后端代碼
import com.wry.wry_test.service.SseService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank; import java.util.concurrent.CompletableFuture;@RestController @RequestMapping("/sse") @Slf4j @Validated public class SseTestController {@Autowiredprivate SseService service;@GetMapping("/testSse")public SseEmitter testSse(@RequestParam("clientId") @NotBlank(message = "客戶端id不能為空") String clientId) {final SseEmitter emitter = service.getConn(clientId);CompletableFuture.runAsync(() -> {try {service.send(clientId);log.info("建立連接成功!clientId = {}", clientId);} catch (Exception e) {log.error("推送數據異常");}});return emitter;}@GetMapping("/sseConection")public SseEmitter createConnection(@RequestParam("clientId") @NotBlank(message = "客戶端id不能為空") String clientId) {return service.getConn(clientId);}@GetMapping("/sendMsg")public void sendMsg(@RequestParam("clientId") String clientId) {try {// 異步發送消息CompletableFuture.runAsync(() -> {try {service.send(clientId);} catch (Exception e) {log.error("推送數據異常");}});} catch (Exception e) {e.printStackTrace();}}@GetMapping("/sendMsgToAll")public void sendMsgToAll() {try {//異步發送消息CompletableFuture.runAsync(() -> {try {service.sendToAll();} catch (Exception e) {e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@GetMapping("closeConn/{clientId}")public String closeConn(@PathVariable("clientId") @NotBlank(message = "客戶端id不能為空") String clientId) {service.closeConn(clientId);return "連接已關閉";}}
package com.wry.wry_test.service;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank;public interface SseService {/*** 獲取連接* @param clientId 客戶端id* @return*/SseEmitter getConn(String clientId);/*** 發送消息到指定客戶端* @param clientId 客戶端id* @throws Exception*/void send(String clientId);/*** 發送消息到所有SSE客戶端* @throws Exception*/void sendToAll() throws Exception;/*** 關閉指定客戶端的連接* @param clientId 客戶端id*/void closeConn(String clientId); }
package com.wry.wry_test.service.impl;import com.wry.wry_test.service.SseService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.validation.constraints.NotBlank; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;@Service @Slf4j public class SseServiceImpl implements SseService {private static final Map<String, SseEmitter> SSE_CACHE = new ConcurrentHashMap<>();@Overridepublic SseEmitter getConn(@NotBlank String clientId) {final SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {return sseEmitter;} else {// 設置連接超時時間,需要配合配置項 spring.mvc.async.request-timeout: 600000 一起使用final SseEmitter emitter = new SseEmitter(600_000L);// 注冊超時回調,超時后觸發emitter.onTimeout(() -> {log.info("連接已超時,正準備關閉,clientId = {}", clientId);SSE_CACHE.remove(clientId);});// 注冊完成回調,調用 emitter.complete() 觸發emitter.onCompletion(() -> {log.info("連接已關閉,正準備釋放,clientId = {}", clientId);SSE_CACHE.remove(clientId);log.info("連接已釋放,clientId = {}", clientId);});// 注冊異常回調,調用 emitter.completeWithError() 觸發emitter.onError(throwable -> {log.error("連接已異常,正準備關閉,clientId = {}", clientId, throwable);SSE_CACHE.remove(clientId);});SSE_CACHE.put(clientId, emitter);log.info("建立連接成功!clientId = {}", clientId);return emitter;}}/*** 模擬類似于 chatGPT 的流式推送回答** @param clientId 客戶端 id* @throws IOException 異常*/@Overridepublic void send(@NotBlank String clientId) {final SseEmitter emitter = SSE_CACHE.get(clientId);if (emitter == null) return;// 開始推送數據// todo 模擬推送數據for (int i = 0; i < 10000000; i++) {String msg = "SSE 測試數據";try {this.sseSend(emitter, msg, clientId);Thread.sleep(1000);} catch (Exception e) {log.error("推送數據異常", e);break;}}log.info("推送數據結束,clientId = {}", clientId);// 結束推流emitter.complete();}/*** 發送數據給所有連接*/public void sendToAll() {List<SseEmitter> emitters = new ArrayList<>(SSE_CACHE.values());for (int i = 0; i < 10000000; i++) {String msg = "SSE 測試數據";this.sseSend(emitters, msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void closeConn(@NotBlank String clientId) {final SseEmitter sseEmitter = SSE_CACHE.get(clientId);if (sseEmitter != null) {sseEmitter.complete();}}/*** 推送數據封裝** @param emitter sse長連接* @param data 發送數據* @param clientId 客戶端id*/private void sseSend(SseEmitter emitter, Object data, String clientId) {try {emitter.send(data);log.info("推送數據成功,clientId = {}", clientId);} catch (Exception e) {log.error("推送數據異常", e);throw new RuntimeException("推送數據異常");}}/*** 推送數據封裝** @param emitter sse長連接* @param data 發送數據*/private void sseSend(List<SseEmitter> emitter, Object data) {emitter.forEach(e -> {try {e.send(data);} catch (IOException ioException) {log.error("推送數據異常", ioException);}});log.info("推送數據成功");}}
實現效果如下:服務端不斷推送數據到前端,前端可以也可以調用接口主動關閉連接。
適用場景:SSE由于是服務端單向通訊,所以適合那種需要單向持久的連接。比如:
- ChatGPT這種實時加載會話數據
- 文件下載,通過SSE異步下載文件
- 服務端實時數據推送