Springboot整合SSE實現實時消息推送

SSE詳細介紹傳送門:SSE實時消息推送

簡單描述一下SSE推送在實際項目中應用的常見場景

1,項目頁面中有消息通知板塊,當信息有變化時,只有手動刷新頁面,才會看到最新的數據,這里可以采用SSE技術實時推送最新消息
.
2,大屏數據,這種場景是可以用SSE進行推送,但是需要注意的是SSE是單向的服務端向前端推數據,一般要求的是大屏基本沒有查詢框條件這種,比較合適。

注意點:如果對于實時數據要求很高并且連接要求做到安全穩定,這里推薦用WebSocket,一般來說對于數據量小,并發連接不是很高要求的情況下,SSE足夠,用而且SSE的配置對于前后端都比較簡單,但是WebSocket的配置對于后端來說需要花費比較多的時間去完善,而且WebSocket是比較消耗服務器資源和網絡帶寬資源的,另外一個,如果項目中運維配置了代理服務器的話,可能代理服務器也要配置一些支持WebSocket的屬性,總體來說WebSocket配置的位置比較多,容易出現各種坑bug,這里注意一下即可。

話不多說,總結一下Springboot整合SSE需要的步驟如下:

1,編寫SSE的服務類:主要包括建立連接、關閉連接、異常連接、心跳檢測、推送消息等
.
2,controller層寫入SSE連接和關閉接口
.
3,在所需要的業務模塊中直接調用SSE服務類中推送消息功能即可

SSE步驟簡單,無需導入maven依賴,踩坑bug少,主要是SSE內部支持斷線重連,爽爽爽

1,SSE服務類

package com.bosera.salesioc.home.sse;
import com.alibaba.fastjson.JSONObject;
import com.bosera.salesioc.domain.home.vo.MessageVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;@Slf4j
@Component
public class SseEmitterServer{private static final ConcurrentHashMap<String, Map<String,SseEmitter>> sseEmitterPool = new ConcurrentHashMap<>();private static final ConcurrentHashMap<String, Timer>  headerPool = new ConcurrentHashMap<>();public  static ConcurrentHashMap<String, Map<String, SseEmitter>> getSseEmitterPool(){return sseEmitterPool;}/*** 建立連接*/public  SseEmitter connect(String  userCode, String userId){log.info("******************開始建立連接*****************");//設置超時時間,0表示不過期,默認是30秒,超過時間未完成會拋出異常SseEmitter sseemitter = new SseEmitter(0L);//注冊回調sseemitter.onCompletion(completionCallBack(userCode,userId));sseemitter.onError(errorCallBack(userCode,userId));sseemitter.onTimeout(timeoutCallBack(userCode,userId));sseEmitterPool.computeIfAbsent(userCode, k -> new ConcurrentHashMap<>()).put(userId, sseemitter);// 開啟心跳活躍startHeartbeat(sseemitter,userId);return sseemitter;}/*** 關閉當前連接*/public void complete(String userCode, String userId){Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null)map.get(userId).complete();}/*** 關閉所有連接*/public void completeAll(){if(!sseEmitterPool.isEmpty()){for (Map.Entry<String, Map<String, SseEmitter>> entry : sseEmitterPool.entrySet()) {Map<String, SseEmitter> userIdMap = entry.getValue();if(!userIdMap.isEmpty()){for (Map.Entry<String, SseEmitter> userIdEntry : userIdMap.entrySet()) {userIdEntry.getValue().complete();}}}sseEmitterPool.clear();}}private  Runnable completionCallBack(String userCode, String userId) {return () -> {removeUser(userCode,userId);log.info("{}結束連接:{}",userCode,userId);};}private  Runnable timeoutCallBack(String userCode, String userId){return ()->{removeUser(userCode,userId);log.error("{}連接超時:{}",userCode,userId);};}private  Consumer<Throwable> errorCallBack(String userCode, String userId){return throwable -> {log.error("{}連接異常:{}",userCode,userId);stopHeartbeat(userId);};}/*** 推送消息*/public  void sendMessage(String userCode, MessageVO message){Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null) {for (Map.Entry<String, SseEmitter> entry : map.entrySet()) {try {// 發送事件entry.getValue().send(JSONObject.toJSONString(message));}catch (Exception e){log.error("{}連接信息:{}, 錯誤消息:{}",userCode,entry.getKey(),e.getMessage());}}}}private void removeUser(String userCode, String userId){try {Map<String, SseEmitter> map = sseEmitterPool.get(userCode);if (map != null) {map.remove(userId);// 如果該用戶的所有會話都已關閉,則移除整個映射if (map.isEmpty())sseEmitterPool.remove(userCode);}// 關閉心跳stopHeartbeat(userId);}catch (Exception e){log.error("關閉連接異常{}",e.getMessage());}}/*** 開啟心跳*/public void startHeartbeat(SseEmitter sseemitter, String userId) {Timer heartbeatTimer = new Timer();headerPool.put(userId,heartbeatTimer);heartbeatTimer.schedule(new TimerTask() {@Overridepublic void run() {if (Objects.nonNull(headerPool.get(userId))) {// 發送心跳:保持長連接try {sseemitter.send("connect active");} catch (Exception e) {log.error("connect active error");}}}}, 25000, 25000);}/*** 關閉心跳* @param userId*/public void stopHeartbeat(String userId) {Timer timer = headerPool.get(userId);if (timer!= null)timer.cancel();headerPool.remove(userId);}
}

推送的消息可以統一定義一個類來封裝信息
2,消息推送響應體

/*** @Author xiaozq* @Date 2024/2/21* @Description: 消息推送響應體*/
public class MessageVO<T> {// 主題:不同位置推送的內容不同private String topic;// 推送消息private T data;public void setTopic(String topic) {this.topic = topic;}public void setData(T data) {this.data = data;}public String getTopic() {return topic;}public T getData() {return data;}
}

3,controller層編寫連接和關閉接口

@RestController
@RequestMapping("/sse")
@Slf4j
public class SSEController{@Autowiredprivate SseEmitterServer sseEmitterServer;/*** 用于創建連接*/@GetMapping(value = "/connect/{userCode}/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter connect(@PathVariable("userCode") String userCode, @PathVariable("userId") String userId){return sseEmitterServer.connect(userCode, userId);}/*** 關閉連接*/@GetMapping(value = "/close/{userCode}/{userId}")public void close(@PathVariable("userCode") String userCode,@PathVariable("userId") String userId ) {sseEmitterServer.complete(userCode, userId);}}

4,業務中實際應用:推送消息

@Autowired
SseInfoService  sseInfoService;
private void handlerMessageInform() {ConcurrentHashMap<String, Map<String, SseEmitter>> sessionPool = SseEmitterServer.getSseEmitterPool();for (Map.Entry<String, Map<String, SseEmitter>> entry : sessionPool.entrySet()) {// 封裝消息MessageVO<List<MessageNotificationVO>> messageVO = new MessageVO();messageVO.setTopic(TopicTypeEnum.MESSAGE_INFORM.getTopic());messageVO.setData(messageService.getMessageList(request));// 推送消息sseEmitterServer.sendMessage(entry.getKey(), messageVO);}}

在實踐過程中存在的問題:

1,報錯504 gateway timeout:這里主要是原項目中配置了響應超時時間,不支持長連接,這里的做法是心跳活躍,保證連接不會被掐斷,可以寫一個定時任務,每天晚上定時去關閉所有連接,第二天用新的連接,這樣可以盡量保證內存的連接數不會過多占用內存,因為夜深人靜的時候誰還會打開web項目工作啊,哈哈太卷了吧,所以把時間定在晚上最好。
.
如果項目是集群模式的話,上述代碼就得改造了,建議是把消息推送這塊單獨抽出一個微服務模塊來,這樣子保證所有的連接統一走單獨的一個服務,因為SSE不是雙向的,既然是單項連接,與后端集群下的其中一個服務建立連接產生的IO流這是只屬于當前服務的本地IO,關閉IO只能連接對應的這臺服務去關閉,否則關閉失效。總之,考慮的點還有很多,一般情況下,SSE夠用啦

總體來說,應用是比較簡單的,涉及到消息實時推送相關的業務,可以嘗試SSE

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/711361.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/711361.shtml
英文地址,請注明出處:http://en.pswp.cn/news/711361.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker技術概論(1):Docker與虛擬化技術比較

Docker技術概論&#xff08;1&#xff09; Docker與虛擬化技術比較 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https:…

深入解析Android-AutoLayout,2024安卓開發面試題及答案

前言 如果你也學習Android&#xff0c;那么你大概率會看過我的文章。經常有讀者給我留言&#xff1a;“該怎么學習Android&#xff1f;”、“日常學習Android的方法是什么”。 所以&#xff0c;今天&#xff0c;我將獻上一份《Android知識圖譜》&#xff0c;以自身的經驗 &…

ABAP 發送帶EXCEL郵件

前言 沒啥特殊需求&#xff0c;就是有個庫齡報表用戶想整郵件發送 實現 用的最簡單的XLS文件作為excel附件發送出去 觀察XLS文件的純文本格式&#xff0c;每列之間用TAB制表符分隔&#xff0c;每行之間用回車符分隔 思路也比較明確&#xff0c;在SAP中實現這種格式&#xf…

.Net利用Microsoft.Extensions.DependencyInjection配置依賴注入

一、概述 為了讓接口程序更加模塊化和可測試,采用依賴注入的方式調用接口方法。 二、安裝Microsoft.Extensions.DependencyInjection 在NuGet里面搜索Microsoft.Extensions.DependencyInjection,并進行安裝。 三、代碼編寫 3.1 創建Service 實現類 /*****************…

【跨境電商須知】FP獨立站的特點和痛點有哪些?

無論是做獨立站&#xff0c;還是做亞馬遜&#xff0c;都有各自的難點。自己做獨立站若要在跨境行業長足發展&#xff0c;既要知道FP獨立站有什么特點&#xff0c;要清楚FP獨立站的痛點并一一克服。 一、FP獨立站的特點 與依賴第三方平臺相比&#xff0c;擁有自己的域名、服務器…

Doccano 修復 spacy.gold 的bug

引言 最初只是想把Doccano標注的數據集轉換成BIO(類似conll2003數據集)的標注格式&#xff1b; 摘要 可先閱讀一下教程&#xff1a;【已解決】關于如何將Doccano標注的文本轉換成NER模型可以直接處理的CoNLL 2003格式 裝包:pip install doccano-transformer 報錯信息 運行…

Adam優化算法

Adam算法&#xff08;Adaptive Moment Estimation&#xff09;是一種用于深度學習模型優化的算法&#xff0c;它結合了動量&#xff08;Momentum&#xff09;和RMSprop&#xff08;Root Mean Square Propagation&#xff09;的概念。Adam算法自2015年提出以來&#xff0c;因其高…

【前端素材】推薦優質后臺管理系統DAdmin平臺模板(附源碼)

一、需求分析 1、系統定義 后臺管理系統是一種用于管理網站、應用程序或系統的管理界面&#xff0c;通常由管理員和工作人員使用。它提供了訪問和控制網站或應用程序后臺功能的工具和界面&#xff0c;使其能夠管理用戶、內容、數據和其他各種功能。 2、功能需求 后臺管理系…

FreeCAD|讀取STEP、創建平面、相交、瓶子

FreeCAD是一個基于OpenCASCADE的開源CAD/CAE工具。OpenCASCADE是一套開源的CAD/CAM/CAE幾何模型核心&#xff0c;來自法國Matra Datavision公司&#xff0c;是著名的CAD軟件EUCLID的開發平臺。FreeCAD可運行于Windows以及Linux系統環境下&#xff0c;是一種通用的3D CAD建模工具…

記錄 關于navicat連接數據庫報錯1045的問題

重裝數據庫之后就連接不上了 報錯1045 而網上的解決方案大都是更改數據庫密碼&#xff0c;但是我在第一步就被卡住無法更改密碼&#xff0c;輸入指令也報錯&#xff0c;檢查的環境變量也沒錯&#xff0c;經過長時間的試錯終于找到解決了辦法 解決辦法 刪除data文件夾 如果無法…

積累:Qt 多種數據類型之間的轉換方法

前言 開發時經常涉及到數據類型的轉換&#xff0c;為方便溫故知新、提升開發效率&#xff0c;現將 Qt 開發部分常用的數據類型轉換方式形成工具文檔供查詢、參考。 1. int 轉 QString 1&#xff09;函數&#xff1a;QString::number 2&#xff09;函數原型 //將數字&#xff0…

LD: 利用Plink軟件進行連鎖不平衡計算和繪圖

輸入文件詳解 PLINK主要使用以下三種文件格式: .ped文件:文本文件,列出所有樣本的基因型數據。每行代表一個樣本,包含個體和家系信息,以及其對應的基因型數據。.map文件:文本文件,與.ped文件配合使用,列出了基因型數據中所有SNP的位置信息。每行代表一個SNP,包含染色…

Python:練習:輸出int值a占b的百分之幾。例如:輸入1和4,輸出:25%。

案例&#xff1a; 輸出int值a占b的百分之幾。例如&#xff1a;輸入1和4&#xff0c;輸出&#xff1a;25%。 思考&#xff1a; 所有的一步步思考&#xff0c;最后綜合起來。 首先&#xff0c;確定 輸出&#xff0c;那么就用input&#xff0c;而且是int值&#xff0c;所以肯定…

springboot2.6.5 下配置ForkJoinPool線程池大小

從java1.7開始&#xff0c;引入了parallelStream的方式使用ForkJoinPool多線程處理數據的方式&#xff0c;ForkJoinPool默認線程池大小是cpu內核數-1&#xff0c;并且可以通過以下方式配置線程池大小&#xff1a; System.setProperty("java.util.concurrent.ForkJoinPool…

C++設計模式_創建型模式_工廠方法模式

目錄 C設計模式_創建型模式_工廠方法模式 一、簡單工廠模式 1.1 簡單工廠模式引入 1.2 簡單工廠模式 1.3 簡單工廠模式利弊分析 1.4 簡單工廠模式的UML圖 二、工廠方法模式 2.1 工廠模式和簡單工廠模式比較 2.2 工廠模式代碼實現 2.3 工廠模式UML 三、抽象工廠模式 3.1 戰斗場景…

MDS300-16-ASEMI整流模塊MDS300-16參數、封裝、尺寸

編輯&#xff1a;ll MDS300-16-ASEMI整流模塊MDS300-16參數、封裝、尺寸 型號&#xff1a;MDS300-16 品牌&#xff1a;ASEMI 封裝&#xff1a;M25 最大重復峰值反向電壓&#xff1a;1600V 最大正向平均整流電流(Vdss)&#xff1a;300A 功率(Pd)&#xff1a;大功率 芯片…

centos 安裝 glibc2.25

在 CentOS 7 系統上安裝 glibc 2.25 需要非常謹慎&#xff0c;因為 glibc 是系統核心庫之一&#xff0c;升級它可能導致與系統其他組件的兼容性問題。CentOS 7 自帶的 glibc 版本較低&#xff0c;直接替換為高版本可能會導致依賴于舊版 glibc 的系統軟件崩潰。 以下是一般情況…

Flink——芒果TV的實時數倉建設實踐

目錄 一、芒果TV實時數倉建設歷程 1.1 階段一&#xff1a;Storm/Flink JavaSpark SQL 1.2 階段二&#xff1a;Flink SQLSpark SQL 1.3 階段三&#xff1a;Flink SQLStarRocks 二、自研Flink實時計算調度平臺介紹 2.1 現有痛點 2.2 平臺架構設計 三、Flink SQL實時數倉分…

面試筆記系列三之spring基礎知識點整理及常見面試題

目錄 如何實現一個IOC容器? 說說你對Spring 的理解&#xff1f; 你覺得Spring的核心是什么&#xff1f; 說一下使用spring的優勢&#xff1f; Spring是如何簡化開發的&#xff1f; IOC 運行時序 prepareRefresh() 初始化上下文環境 obtainFreshBeanFactory() 創建并…

Linux系統加固:如何有效管理系統賬號

Linux系統加固&#xff1a;如何有效管理系統賬號 1.1 口令重復次數限制1.2 避免系統存在uid相同的賬號1.3 空密碼的帳戶1.4 口令復雜度1.5 口令生存期1.6 登錄失敗次數鎖定策略 &#x1f496;The Begin&#x1f496;點點關注&#xff0c;收藏不迷路&#x1f496; 在Linux系統中…