SSE詳細介紹傳送門:SSE實時消息推送
簡單描述一下SSE推送在實際項目中應用的常見場景
1,項目頁面中有消息通知板塊,當信息有變化時,只有手動刷新頁面,才會看到最新的數據,這里可以采用SSE技術
實時推送最新消息
.
2,大屏數據,這種場景是可以用SSE進行推送,但是需要注意的是SSE是單向的服務端向前端推數據,一般要求的是大屏基本沒有查詢框條件這種,比較合適。
注意點:如果對于實時數據要求很高并且連接要求做到安全穩定,這里推薦用WebSocket
,一般來說對于數據量小,并發連接不是很高要求的情況下,SSE足夠
,用而且SSE的配置對于前后端都比較簡單,但是WebSocket的配置對于后端來說需要花費比較多的時間去完善,而且WebSocket是比較消耗服務器資源和網絡帶寬資源的,另外一個,如果項目中運維配置了代理服務器的話,可能代理服務器也要配置一些支持WebSocket的屬性,
總體來說WebSocket配置的位置比較多,容易出現各種坑bug,這里注意一下即可。
話不多說,總結一下Springboot整合SSE需要的步驟如下:
1,編寫SSE的服務類:主要包括建立連接、關閉連接、異常連接、心跳檢測、推送消息等
.
2,controller層寫入SSE連接和關閉接口
.
3,在所需要的業務模塊中直接調用SSE服務類中推送消息功能即可
SSE步驟簡單,無需導入maven依賴,踩坑bug少,主要是SSE內部支持斷線重連,爽爽爽
1,SSE服務類
package com.bosera.salesioc.home.sse;
import com.alibaba.fastjson.JSONObject;
import com.bosera.salesioc.domain.home.vo.MessageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;@Slf4j
@Component
public class SseEmitterServer{private static final ConcurrentHashMap<String, Map<String,SseEmitter>> sseEmitterPool = new ConcurrentHashMap<>();private static final ConcurrentHashMap<String, Timer> headerPool = new ConcurrentHashMap<>();public static ConcurrentHashMap<String, Map<String, SseEmitter>> getSseEmitterPool(){return sseEmitterPool;}/*** 建立連接*/public SseEmitter connect(String userCode, String userId){log.info("******************開始建立連接*****************");//設置超時時間,0表示不過期,默認是30秒,超過時間未完成會拋出異常SseEmitter sseemitter = new SseEmitter(0L);//注冊回調sseemitter.onCompletion(completionCallBack(userCode,userId));sseemitter.onError(errorCallBack(userCode,userId));sseemitter.onTimeout(timeoutCallBack(userCode,userId));sseEmitterPool.computeIfAbsent(userCode, k -> new ConcurrentHashMap<>()).put(userId, sseemitter);// 開啟心跳活躍startHeartbeat(sseemitter,userId);return sseemitter;}/*** 關閉當前連接*/public void complete(String userCode, String userId){Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null)map.get(userId).complete();}/*** 關閉所有連接*/public void completeAll(){if(!sseEmitterPool.isEmpty()){for (Map.Entry<String, Map<String, SseEmitter>> entry : sseEmitterPool.entrySet()) {Map<String, SseEmitter> userIdMap = entry.getValue();if(!userIdMap.isEmpty()){for (Map.Entry<String, SseEmitter> userIdEntry : userIdMap.entrySet()) {userIdEntry.getValue().complete();}}}sseEmitterPool.clear();}}private Runnable completionCallBack(String userCode, String userId) {return () -> {removeUser(userCode,userId);log.info("{}結束連接:{}",userCode,userId);};}private Runnable timeoutCallBack(String userCode, String userId){return ()->{removeUser(userCode,userId);log.error("{}連接超時:{}",userCode,userId);};}private Consumer<Throwable> errorCallBack(String userCode, String userId){return throwable -> {log.error("{}連接異常:{}",userCode,userId);stopHeartbeat(userId);};}/*** 推送消息*/public void sendMessage(String userCode, MessageVO message){Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null) {for (Map.Entry<String, SseEmitter> entry : map.entrySet()) {try {// 發送事件entry.getValue().send(JSONObject.toJSONString(message));}catch (Exception e){log.error("{}連接信息:{}, 錯誤消息:{}",userCode,entry.getKey(),e.getMessage());}}}}private void removeUser(String userCode, String userId){try {Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null) {map.remove(userId);// 如果該用戶的所有會話都已關閉,則移除整個映射if (map.isEmpty())sseEmitterPool.remove(userCode);}// 關閉心跳stopHeartbeat(userId);}catch (Exception e){log.error("關閉連接異常{}",e.getMessage());}}/*** 開啟心跳*/public void startHeartbeat(SseEmitter sseemitter, String userId) {Timer heartbeatTimer = new Timer();headerPool.put(userId,heartbeatTimer);heartbeatTimer.schedule(new TimerTask() {@Overridepublic void run() {if (Objects.nonNull(headerPool.get(userId))) {// 發送心跳:保持長連接try {sseemitter.send("connect active");} catch (Exception e) {log.error("connect active error");}}}}, 25000, 25000);}/*** 關閉心跳* @param userId*/public void stopHeartbeat(String userId) {Timer timer = headerPool.get(userId);if (timer!= null)timer.cancel();headerPool.remove(userId);}
}
推送的消息可以統一定義一個類來封裝信息
2,消息推送響應體
/*** @Author xiaozq* @Date 2024/2/21* @Description: 消息推送響應體*/
public class MessageVO<T> {// 主題:不同位置推送的內容不同private String topic;// 推送消息private T data;public void setTopic(String topic) {this.topic = topic;}public void setData(T data) {this.data = data;}public String getTopic() {return topic;}public T getData() {return data;}
}
3,controller層編寫連接和關閉接口
@RestController
@RequestMapping("/sse")
@Slf4j
public class SSEController{@Autowiredprivate SseEmitterServer sseEmitterServer;/*** 用于創建連接*/@GetMapping(value = "/connect/{userCode}/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@PathVariable("userCode") String userCode, @PathVariable("userId") String userId){return sseEmitterServer.connect(userCode, userId);}/*** 關閉連接*/@GetMapping(value = "/close/{userCode}/{userId}")public void close(@PathVariable("userCode") String userCode,@PathVariable("userId") String userId ) {sseEmitterServer.complete(userCode, userId);}}
4,業務中實際應用:推送消息
@Autowired
SseInfoService sseInfoService;
private void handlerMessageInform() {ConcurrentHashMap<String, Map<String, SseEmitter>> sessionPool = SseEmitterServer.getSseEmitterPool();for (Map.Entry<String, Map<String, SseEmitter>> entry : sessionPool.entrySet()) {// 封裝消息MessageVO<List<MessageNotificationVO>> messageVO = new MessageVO();messageVO.setTopic(TopicTypeEnum.MESSAGE_INFORM.getTopic());messageVO.setData(messageService.getMessageList(request));// 推送消息sseEmitterServer.sendMessage(entry.getKey(), messageVO);}}
在實踐過程中存在的問題:
1,報錯504 gateway timeout:這里主要是原項目中配置了響應超時時間,不支持長連接,這里的做法是心跳活躍,保證連接不會被掐斷,可以寫一個定時任務,每天晚上定時去關閉所有連接,第二天用新的連接,這樣可以盡量保證內存的連接數不會過多占用內存,因為夜深人靜的時候誰還會打開web項目工作啊,哈哈太卷了吧,所以把時間定在晚上最好。
.
如果項目是集群模式的話,上述代碼就得改造了,建議是把消息推送這塊單獨抽出一個微服務模塊來,這樣子保證所有的連接統一走單獨的一個服務,因為SSE不是雙向的,既然是單項連接,與后端集群下的其中一個服務建立連接產生的IO流這是只屬于當前服務的本地IO,關閉IO只能連接對應的這臺服務去關閉,否則關閉失效。總之,考慮的點還有很多,一般情況下,SSE夠用啦
總體來說,應用是比較簡單的,涉及到消息實時推送相關的業務,可以嘗試SSE