前言
在當今數字化時代,實時數據處理對于企業的決策和運營至關重要。許多業務場景需要及時響應數據庫中的數據變化,例如電商平臺實時更新庫存、金融系統實時監控交易數據等。
本文將詳細介紹如何通過Debezium
捕獲數據庫變更事件,并利用Server - Sent Events(SSE)
將這些變更實時推送給前端應用。
技術背景
+----------------+ ? ? ? ? ?+----------------+ ? ? ? ? ?+----------------+ ?
| ? MySQL 數據庫 ?| 監聽變更 ?| ?SpringBoot 服務 ?| ?推送變更 ?| ? ?Web 前端 ? ? | ?
| ?(Binlog 模式) ?| ------> ?| (Debezium CDC) | ------> ?| (EventSource) ?| ?
+----------------+ ? ? ? ? ?+----------------+ ? ? ? ? ?+----------------+ ?
-
Debezium
?是一個開源的分布式平臺,它能夠監控數據庫的變化,并將這些變化以事件流的形式發送出去。它支持多種數據庫,如?MySQL、PostgreSQL
?等,通過模擬數據庫的復制協議來實現對數據庫變更的實時捕獲。 -
Server - Sent Events(SSE)
是一種允許網頁自動獲取服務器推送更新的技術。它基于?HTTP
?協議,通過一個單向的連接,服務器可以持續向客戶端發送事件流數據,非常適合實時數據推送的場景。
環境準備
MySQL 配置
-- 啟用 Binlog(ROW 模式) ?
SET GLOBAL log_bin = ON; ?
SET GLOBAL binlog_format =?'ROW'; ?-- 創建 CDC 用戶(需 REPLICATION 權限) ?
CREATE USER?'cdc_user'?IDENTIFIED BY?'cdc_pass'; ?
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO?'cdc_user'; ?
?配置要點:確保 Binlog 記錄行級變更?
引入依賴
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.6.0.Final</version>
</dependency>
核心代碼實現
Debezium 監聽服務
@Slf4j
@Component
public class BinlogListener {@Autowiredprivate SseService sseService;@PostConstructpublic void?start() {Configuration config = Configuration.create().with("name",?"mysql-connector-1").with("connector.class",?"io.debezium.connector.mysql.MySqlConnector").with("offset.storage",?"org.apache.kafka.connect.storage.FileOffsetBackingStore").with("offset.storage.file.filename",?"D:\\usr\\debezium\\mysql-offsets.dat").with("offset.flush.interval.ms",?"10000").with("database.server.name",?"mysql-connector-1").with("database.hostname",?"localhost").with("database.port",?"3306").with("database.user",?"root").with("database.password",?"root").with("database.server.id",?"1").with("database.include.list",?"scf").with("table.include.list",?"scf.user").with("include.schema.changes",?"false").with("snapshot.mode",?"initial").with("database.history.skip.unparseable.ddl",?"true") // 忽略解析錯誤.with("database.connection.attempts",?"5") // 最大重試次數.with("database.connection.backoff.ms",?"10000") // 重試間隔 10s.with("database.history",?"io.debezium.relational.history.FileDatabaseHistory").with("database.history.file.filename",?"D:\\usr\\debezium\\mysql-history.dat").build();EmbeddedEngine engine = EmbeddedEngine.create().using(config).notifying(this::handleEvent).build();Executors.newSingleThreadExecutor().execute(engine::run);}private void handleEvent(SourceRecord record) {Struct value = (Struct) record.value();Struct after = value.getStruct("after");// 轉換為 Map 并序列化Map<String, Object> dataMap = new HashMap<>();dataMap.put("id", after.getString("id"));dataMap.put("name", after.getString("name"));dataMap.put("age", after.getInt32("age"));sseService.broadcast(JSON.toJSONString(dataMap));}
}
SSE 推送服務
@Service ?
public class SseService { ?private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet(); ?public SseEmitter?subscribe() { ?SseEmitter emitter = new SseEmitter(60_000L); ?emitter.onCompletion(() -> emitters.remove(emitter)); ?emitters.add(emitter); ?return?emitter; ?} ?public void broadcast(String data) { ?emitters.forEach(emitter -> { ?try { ?emitter.send(SseEmitter.event() ?.data(data) ?.id(UUID.randomUUID().toString())); ?} catch (IOException e) { ?emitter.completeWithError(e); ?} ?}); ?} ?
} ?
控制器層
@RestController ?
@RequestMapping("/sse") ?
public class SseController { ?@Autowired ?private SseService sseService; ?@GetMapping(value =?"/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) ?public SseEmitter?stream() { ?return?sseService.subscribe(); ?} ?
} ?
前端實現
<html lang="en">
<head><meta charset="UTF-8"><title>實時數據推送測試</title>
</head>
<body>
<div id="updates"></div>
<script>const eventSource = new EventSource('/sse/stream');eventSource.onmessage = e => {const data = JSON.parse(e.data);document.getElementById('updates').innerHTML +=`<p>用戶變更: ID=${data.id}, 姓名=${data.name}</p>`;};eventSource.onerror = e => console.error("SSE 錯誤:", e);
</script>
</body>
</html>?