SpringBoot + SSE 實時異步流式推送

前言

在當今數字化時代,實時數據處理對于企業的決策和運營至關重要。許多業務場景需要及時響應數據庫中的數據變化,例如電商平臺實時更新庫存、金融系統實時監控交易數據等。

本文將詳細介紹如何通過Debezium捕獲數據庫變更事件,并利用Server - Sent Events(SSE)將這些變更實時推送給前端應用。

技術背景

+----------------+ ? ? ? ? ?+----------------+ ? ? ? ? ?+----------------+ ?
| ? MySQL 數據庫 ?| 監聽變更 ?| ?SpringBoot 服務 ?| ?推送變更 ?| ? ?Web 前端 ? ? | ?
| ?(Binlog 模式) ?| ------> ?| (Debezium CDC) | ------> ?| (EventSource) ?| ?
+----------------+ ? ? ? ? ?+----------------+ ? ? ? ? ?+----------------+ ?
  • Debezium?是一個開源的分布式平臺,它能夠監控數據庫的變化,并將這些變化以事件流的形式發送出去。它支持多種數據庫,如?MySQL、PostgreSQL?等,通過模擬數據庫的復制協議來實現對數據庫變更的實時捕獲。

  • Server - Sent Events(SSE)是一種允許網頁自動獲取服務器推送更新的技術。它基于?HTTP?協議,通過一個單向的連接,服務器可以持續向客戶端發送事件流數據,非常適合實時數據推送的場景。

環境準備

MySQL 配置
-- 啟用 Binlog(ROW 模式) ?
SET GLOBAL log_bin = ON; ?
SET GLOBAL binlog_format =?'ROW'; ?-- 創建 CDC 用戶(需 REPLICATION 權限) ?
CREATE USER?'cdc_user'?IDENTIFIED BY?'cdc_pass'; ?
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO?'cdc_user'; ?
?

配置要點:確保 Binlog 記錄行級變更?

引入依賴
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.6.0.Final</version>
</dependency>

核心代碼實現

Debezium 監聽服務
@Slf4j
@Component
public class BinlogListener {@Autowiredprivate SseService sseService;@PostConstructpublic void?start() {Configuration config = Configuration.create().with("name",?"mysql-connector-1").with("connector.class",?"io.debezium.connector.mysql.MySqlConnector").with("offset.storage",?"org.apache.kafka.connect.storage.FileOffsetBackingStore").with("offset.storage.file.filename",?"D:\\usr\\debezium\\mysql-offsets.dat").with("offset.flush.interval.ms",?"10000").with("database.server.name",?"mysql-connector-1").with("database.hostname",?"localhost").with("database.port",?"3306").with("database.user",?"root").with("database.password",?"root").with("database.server.id",?"1").with("database.include.list",?"scf").with("table.include.list",?"scf.user").with("include.schema.changes",?"false").with("snapshot.mode",?"initial").with("database.history.skip.unparseable.ddl",?"true") // 忽略解析錯誤.with("database.connection.attempts",?"5") // 最大重試次數.with("database.connection.backoff.ms",?"10000") // 重試間隔 10s.with("database.history",?"io.debezium.relational.history.FileDatabaseHistory").with("database.history.file.filename",?"D:\\usr\\debezium\\mysql-history.dat").build();EmbeddedEngine engine = EmbeddedEngine.create().using(config).notifying(this::handleEvent).build();Executors.newSingleThreadExecutor().execute(engine::run);}private void handleEvent(SourceRecord record) {Struct value = (Struct) record.value();Struct after = value.getStruct("after");// 轉換為 Map 并序列化Map<String, Object> dataMap = new HashMap<>();dataMap.put("id", after.getString("id"));dataMap.put("name", after.getString("name"));dataMap.put("age", after.getInt32("age"));sseService.broadcast(JSON.toJSONString(dataMap));}
}
SSE 推送服務
@Service ?
public class SseService { ?private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet(); ?public SseEmitter?subscribe() { ?SseEmitter emitter = new SseEmitter(60_000L); ?emitter.onCompletion(() -> emitters.remove(emitter)); ?emitters.add(emitter); ?return?emitter; ?} ?public void broadcast(String data) { ?emitters.forEach(emitter -> { ?try { ?emitter.send(SseEmitter.event() ?.data(data) ?.id(UUID.randomUUID().toString())); ?} catch (IOException e) { ?emitter.completeWithError(e); ?} ?}); ?} ?
} ?
控制器層
@RestController ?
@RequestMapping("/sse") ?
public class SseController { ?@Autowired ?private SseService sseService; ?@GetMapping(value =?"/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) ?public SseEmitter?stream() { ?return?sseService.subscribe(); ?} ?
} ?

前端實現

<html lang="en">
<head><meta charset="UTF-8"><title>實時數據推送測試</title>
</head>
<body>
<div id="updates"></div>
<script>const eventSource = new EventSource('/sse/stream');eventSource.onmessage = e => {const data = JSON.parse(e.data);document.getElementById('updates').innerHTML +=`<p>用戶變更: ID=${data.id}, 姓名=${data.name}</p>`;};eventSource.onerror = e => console.error("SSE 錯誤:", e);
</script>
</body>
</html>?

測試數據變更

圖片

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/81042.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/81042.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/81042.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ADS1299模擬前端(AFE)代替芯片——LHE7909

在現代醫療科技的飛速發展中&#xff0c;精確的生物電勢測量設備變得越來越重要。領慧立芯推出的LHE7909&#xff0c;是一款專為心電圖&#xff08;ECG&#xff09;和其他生物電勢測量設計的低噪聲24位模數轉換器&#xff08;ADC&#xff09;&#xff0c;為醫療設備制造商提供了…

如何實現Redis和Mysql中數據雙寫一致性

一、引言 今天我們來聊聊一個在分布式系統中非常常見但又十分棘手的問題——Redis與MySQL之間的雙寫一致性。我們在項目中多多少少都遇到過類似的困擾&#xff0c;緩存是用Redis&#xff0c;數據庫是用MySQL&#xff0c;但如何確保兩者之間的數據一致性呢&#xff1f;接下來我…

面試篇 - Transformer前饋神經網絡(FFN)使用什么激活函數?

1. FFN結構分解 原始Transformer的FFN層 FFN(x) max(0, xW? b?)W? b? # 原始論文公式 輸入&#xff1a;自注意力層的輸出 x&#xff08;維度 d_model512&#xff09; 擴展層&#xff1a;xW? b?&#xff08;擴展為 d_ff2048&#xff09; 激活函數&#xff1a;Re…

基于Python Flask的深度學習電影評論情感分析可視化系統(2.0升級版,附源碼)

博主介紹&#xff1a;?IT徐師兄、7年大廠程序員經歷。全網粉絲15W、csdn博客專家、掘金/華為云//InfoQ等平臺優質作者、專注于Java技術領域和畢業項目實戰? &#x1f345;文末獲取源碼聯系&#x1f345; &#x1f447;&#x1f3fb; 精彩專欄推薦訂閱&#x1f447;&#x1f3…

前端vue2修改echarts字體為思源黑體-避免侵權-可以更換為任意字體統一管理

1.下載字體 npm install fontsource/noto-sans-sc 不知道為什么我從github上面下載的不好使&#xff0c;所以就用了npm的 2.引用字體 import fontsource/noto-sans-sc; 在入口文件-main.js中引用 3.設置echats模板樣式 import * as echarts from echarts; // 在import的后…

51c自動駕駛~合集37

我自己的原文哦~ https://blog.51cto.com/whaosoft/13878933 #DETR->DETR3D->Sparse4D 走向長時序稀疏3D目標檢測 一、DETR 圖1 DETR架構 DETR是第一篇將Transformer應用到目標檢測方向的算法。DETR是一個經典的Encoder-Decoder結構的算法&#xff0c;它的骨干網…

【MongoDB篇】MongoDB的集合操作!

目錄 引言第一節&#xff1a;集合的“誕生”——自動出現還是手動打造&#xff1f;&#x1f914;第二節&#xff1a;集合的“查閱”——看看這個數據庫里有哪些柜子&#xff1f;&#x1f4c2;&#x1f440;第三節&#xff1a;集合的“重命名”——給文件柜換個名字&#xff01;…

Goland終端PowerShell命令失效

Goland終端Terminal的PowerShell不能使用&#xff0c;明明windows上升級了PowerShell 7設置了配置文件&#xff0c;但是只能在windows終端下使用&#xff0c;goland終端下直接失效報錯&#xff0c;安裝升級PowerShell請看Windows11終端升級PowerShell7 - HashFlag - 博客園 問…

簡單分析自動駕駛發展現狀與挑戰

一、技術進展與市場滲透 技術分級與滲透率 當前量產乘用車的自動駕駛等級以L2為主&#xff08;滲透率約51%&#xff09;&#xff0c;L3級處于初步落地階段&#xff08;滲透率約20%&#xff09;&#xff0c;而L4級仍處于測試和示范運營階段&#xff08;滲透率約11%&#xff09;2…

【C++類和數據抽象】消息處理示例(1):從設計模式到實戰應用

目錄 一、數據抽象概述 二、消息處理的核心概念 2.1 什么是消息處理&#xff1f; 2.2 消息處理的核心目標 三、基于設計模式的消息處理實現 3.1 觀察者模式&#xff08;Observer Pattern&#xff09; 3.2 命令模式&#xff08;Command Pattern&#xff09; 四、實戰場景…

【統計方法】交叉驗證:Resampling, nested 交叉驗證等策略 【含R語言】

Resampling (重采樣方法) 重采樣方法是從訓練數據中反復抽取樣本&#xff0c;并在每個&#xff08;重新&#xff09;樣本上重新調整模型&#xff0c;以獲得關于擬合模型的附加信息的技術。 兩種主要的重采樣方法 Cross-Validation (CV) 交叉驗證 &#xff1a; 用于估計測試誤…

常見的 CSS 知識點整理

1. 盒模型&#xff08;Box Model&#xff09;是什么&#xff1f;標準盒模型和 IE 盒模型的區別&#xff1f; 答案&#xff1a; CSS 盒模型將元素視為一個盒子&#xff0c;由內容&#xff08;content&#xff09;、內邊距&#xff08;padding&#xff09;、邊框&#xff08;bor…

Educational Codeforces Round 178 div2(題解ABCDE)

A. Three Decks #1.由于最后三個數會相等&#xff0c;提前算出來和&#xff0c;%3判斷&#xff0c;再判前兩個數是否大于 #include<iostream> #include<vector> #include<stdio.h> #include<map> #include<string> #include<algorithm> #…

如何創建一個導入模板?全流程圖文解析

先去找到系統內可以上傳東西的按鈕 把你的模板上傳上去,找到對應的fileName 圖里的文字寫錯了,是復制粘貼"filePath"到URL才能下載

通信原理第七版與第六版區別附pdf

介紹 我用夸克網盤分享了「通信原理 第7版》樊昌信」&#xff0c;鏈接&#xff1a;https://pan.quark.cn/s/be7c5af4cdce 《通信原理&#xff08;第7版&#xff09;》是在第6版的基礎上&#xff0c;為了適應當前通信技術發展和教學需求&#xff0c;并吸取了數十所院校教師的反…

Mysql唯一性約束

唯一性約束&#xff08;Unique Constraint&#xff09;是數據庫設計中用于保證表中某一列或多列組合的值具有唯一性的一種規則。它可以防止在指定列中插入重復的數據&#xff0c;有助于維護數據的完整性和準確性。下面從幾個方面為你詳細解釋 作用 確保數據準確性&#xff1a…

測試基礎筆記第十六天

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 一、UI自動化介紹1.認識UI自動化測試2.實施UI自動化測試前置條件3.UI自動化測試執行時機4.UI自動化測試核心作用和劣勢 二、認識Web自動化測試工具-Selenium021.Sel…

PaddleX的安裝

參考&#xff1a;安裝PaddlePaddle - PaddleX 文檔 1、安裝PaddlePaddle 查看 docker 版本 docker --version 若您通過 Docker 安裝&#xff0c;請參考下述命令&#xff0c;使用飛槳框架官方 Docker 鏡像&#xff0c;創建一個名為 paddlex 的容器&#xff0c;并將當前工作目…

長效住宅IP是什么?如何獲取長效住宅IP?

在當今的互聯網世界里&#xff0c;IP地址作為連接用戶與網站之間的橋梁&#xff0c;其重要性不言而喻。對于跨境電商、社交媒體運營以及數據采集等領域的專業人士而言&#xff0c;普通的IP地址已無法滿足日益復雜的需求。他們更需要一種穩定、安全且持久的長效住宅IP來完成各類…

02 業務流程架構

業務流程架構提供了自上而下的組織鳥瞰圖&#xff0c;是業務流程的全景圖。根據所采用的方法不同&#xff0c;有時被稱為流程全景圖或高層級流程圖&#xff0c;提供了業務運營中所有業務流程的整體視圖。 這樣有助于理解企業內部各個業務流程之間的相互關系以及它們如何共同工…