Springboot集成Debezium監聽postgresql變更

在這里插入圖片描述

1.創建springboot項目引入pom

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-postgres</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>1.4.2.Final</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.43</version></dependency></dependencies>

2.application.properties配置

# Debezium Configuration
debezium.name=my-postgres-connector
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
debezium.offset.flush.interval.ms=60000
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
debezium.database.dbname=db_test
debezium.database.server.id=12345
debezium.database.server.name=customer-postgres-db-server
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat
debezium.table.include.list=public.user
debezium.column.include.list=public.user.id,public.user.name
debezium.publication.autocreate.mode=filtered
debezium.plugin.name=pgoutput
debezium.slot.name=dbz_customerdb_listener

3.配置類:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.io.IOException;@Configuration
public class DebeziumConnectorConfig {@Beanpublic io.debezium.config.Configuration customerConnector(Environment env) throws IOException {return io.debezium.config.Configuration.create().with("name", env.getProperty("debezium.name")).with("connector.class", env.getProperty("debezium.connector.class")).with("offset.storage", env.getProperty("debezium.offset.storage")).with("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename")).with("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms")).with("database.hostname", env.getProperty("debezium.database.hostname")).with("database.port", env.getProperty("debezium.database.port")).with("database.user", env.getProperty("debezium.database.user")).with("database.password", env.getProperty("debezium.database.password")).with("database.dbname", env.getProperty("debezium.database.dbname")).with("database.server.id", env.getProperty("debezium.database.server.id")).with("database.server.name", env.getProperty("debezium.database.server.name"))//.with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory").with("database.history", env.getProperty("debezium.database.history")).with("database.history.file.filename", env.getProperty("debezium.database.history.file.filename")).with("table.include.list", env.getProperty("debezium.table.include.list")) //表名.with("column.include.list", env.getProperty("debezium.column.include.list")) // 表中得哪些字段.with("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode")).with("plugin.name", env.getProperty("debezium.plugin.name")).with("slot.name", env.getProperty("debezium.slot.name")).build();}
}

4.注冊監聽

import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;@Slf4j
@Component
public class DebeziumListener {private final Executor executor = Executors.newSingleThreadExecutor();private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;public DebeziumListener(Configuration customerConnectorConfiguration) {this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(customerConnectorConfiguration.asProperties()).notifying(this::handleChangeEvent).build();}private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());Struct sourceRecordChangeValue= (Struct) sourceRecord.value();//log.info("SourceRecordChangeValue = '{}'",sourceRecordRecordChangeEvent);if (sourceRecordChangeValue != null) {Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));// 處理非讀操作if(operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;Struct struct = (Struct) sourceRecordChangeValue.get(record);Map<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// this.customerService.replicateData(payload, operation);log.info("Updated Data: {} with Operation: {}", payload, operation.name());}}}@PostConstructprivate void start() {this.executor.execute(debeziumEngine);}@PreDestroyprivate void stop() throws IOException {if (Objects.nonNull(this.debeziumEngine)) {this.debeziumEngine.close();}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/74517.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/74517.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/74517.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

報錯 standard_init_linux.go:228: exec user process caused: exec format error

docker logs 容器名 報錯&#xff1a; standard_init_linux.go:228: exec user process caused: exec format error 或者 standard_init_linux.go:228: exec user process caused: input/output error 排查思路 1、檢查源鏡像的框架是否正確&#xff0c;是否amd64&#x…

Go 代理爬蟲

現在注冊&#xff0c;還送15美金注冊獎勵金 --- 亮數據-網絡IP代理及全網數據一站式服務商 使用代理服務器&#xff0c;通過 Colly、Goquery、Selenium 進行網絡爬蟲的基礎示例程序 本倉庫包含兩個分支&#xff1a; basic 分支包含供 Go Proxy Servers 這篇文章改動的基礎代碼…

STM32實現智能溫控系統(暖手寶):PID 算法 + DS18B20+OLED 顯示,[學習 PID 優質項目]

一、項目概述 本文基于 STM32F103C8T6 單片機&#xff0c;設計了一個高精度溫度控制系統。通過 DS18B20 采集溫度&#xff0c;采用位置型 PID 算法控制 PWM 輸出驅動 MOS 管加熱Pi膜&#xff0c;配合 OLED 實時顯示溫度數據。系統可穩定將 PI 膜加熱至 40℃&#xff0c;適用于…

neo4j知識圖譜常用命令

1. 查看所有節點和關系 如果你想查看圖數據庫中的所有節點和關系&#xff0c;可以使用以下查詢&#xff1a; Cypher 深色版本 MATCH (n)-[r]->(m) RETURN n, r, m n 和 m 表示節點。r 表示兩個節點之間的關系。這條命令會返回所有節點及其直接相連的關系。 2. 查看所有節…

從零開始:使用Luatools工具高效燒錄Air780EPM核心板項目的完整指南

本文將深入講解如何使用Luatools工具燒錄一個具體的項目到Air780EPM開發板中。如何使用官方推薦的Luatools工具&#xff08;一款跨平臺、命令行驅動的燒錄利器&#xff09;&#xff0c;通過“環境配置→硬件連接→參數設置→一鍵燒錄”四大步驟&#xff0c;幫助用戶實現Air780E…

2024年認證杯SPSSPRO杯數學建模C題(第二階段)云中的海鹽全過程文檔及程序

2024年認證杯SPSSPRO杯數學建模 C題 云中的海鹽 原題再現&#xff1a; 巴黎氣候協定提出的目標是&#xff1a;在2100年前&#xff0c;把全球平均氣溫相對于工業革命以前的氣溫升幅控制在不超過2攝氏度的水平&#xff0c;并為1.5攝氏度而努力。但事實上&#xff0c;許多之前的…

大疆上云api介紹

概述 目前對于 DJI 無人機接入第三方云平臺,主要是基于 MSDK 開發定制 App,然后自己定義私有上云通信協議連接到云平臺中。這樣對于核心業務是開發云平臺,無人機只是其中一個接入硬件設備的開發者來說,重新基于 MSDK 開發 App 工作量大、成本高,同時還需要花很多精力在無人…

云原生之開源遙測框架OpenTelemetry(在 Gin 框架中使用 OpenTelemetry 進行分布式追蹤和監控)

文章目錄 云原生之開源遙測框架OpenTelemetry背景什么是可觀測性&#xff1f; 什么是 OpenTelemetry&#xff1f;Opentelemetry的主要優勢有以下幾點&#xff1a;理解分布式鏈路日志Spans分布式鏈路 在 Gin 框架中使用 OpenTelemetry 進行分布式追蹤和監控0. 整體思路1. 初始化…

【藍橋杯速成】| 11.回溯 之 子集問題

題目一&#xff1a;子集 問題描述 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 給你一個整數數組 nums &#xff0c;數組中的元素 互不相同 。返回該數組所有可能的子集&#xff08;冪集&#xff09;。 解集 不能 包含重復的子集。你可以按 任意順序 返回解集。 示例…

Nginx目錄結構

Nginx目錄結構 ? Nginx 的安裝目錄結構可能會因安裝方式&#xff08;如使用包管理器、源碼編譯等&#xff09;和操作系統的不同而有所差異。以下是通過在線安裝時&#xff0c;Nginx 默認的目錄結構&#xff0c;以及各目錄和文件的作用。 yum install nginx查詢nginx [rootRo…

2.(vue3.x+vite)使用vue-router

前端技術社區總目錄(訂閱之前請先查看該博客) 效果預覽 路由配置的“/”與“helloWorld”都可以訪問到以下內容 http://10.11.0.87:4000/#/ http://10.11.0.87:4000/#/helloWorld 1:安裝vue-router npm i vue-router 2:創建router文件 在src的目錄下創建router文件夾…

后端返回了 xlsx 文件流,前端怎么下載處理

當后端返回一個 .xlsx 文件流時&#xff0c;前端可以通過 JavaScript 處理這個文件流并觸發瀏覽器下載。 實現步驟 發送請求獲取文件流&#xff1a; 使用 fetch 或 axios 等工具向后端發送請求&#xff0c;確保響應類型設置為 blob&#xff08;二進制數據流&#xff09;。 創建…

HTML5拖拽功能教程

HTML5拖拽功能教程 簡介 HTML5引入了原生拖放(Drag and Drop)API&#xff0c;使開發者能夠輕松實現網頁中的拖拽功能&#xff0c;無需依賴第三方庫。拖拽功能可以大大提升用戶體驗&#xff0c;適用于文件上傳、列表排序、看板系統等多種交互場景。本教程將帶您全面了解HTML拖…

VUE3 路由配置

1.下載 VueRouter 模塊 在命令行中輸入 yarn add vue-router 2.導?相關函數 在自己創建的router/index.js 文件中 import { createRouter, createWebHashHistory } from vue-router 3.創建路由實例 在自己創建的router/index.js 文件中 const theFirstRouter ()>{return…

歷史序列影像 Esri的World Imagery Wayback簡介

Esri的World Imagery Wayback是一個專注于提供歷史衛星影像的在線平臺&#xff0c;由全球領先的地理信息系統&#xff08;GIS&#xff09;技術提供商Esri開發。該平臺整合了多源衛星影像數據&#xff0c;允許用戶回溯特定區域在不同時間點的影像變化&#xff0c;支持時間序列分…

golang結構體與指針類型

結構體與指針類型 指針類型字段 具名字段 舉例 package struct_knowledgeimport "fmt"//結構體字段為指針類型 func StructWithPoint(){type Student struct{name *string}var lisa Studentfmt.Printf("賦值前,Student的實例的值%#v\n",lisa)//錯誤的賦…

NetMizer-日志管理系統-遠程命令執行漏洞挖掘

漏洞描述&#xff1a;NetMizer 日志管理系統 cmd.php中存在遠程命令執行漏洞&#xff0c;攻擊者通過傳入 cmd參數即可命令執行 1.fofa搜素語句 title"NetMizer 日志管理系統" 2.漏洞驗證 網站頁面 驗證POC /data/manage/cmd.php?cmdid

Contactile三軸觸覺傳感器:多維力感賦能機器人抓取

在非結構化環境中&#xff0c;機器人對物體的精準抓取與操作始終面臨巨大挑戰。傳統傳感器因無法全面感知觸覺參數&#xff08;如三維力、位移、摩擦&#xff09;&#xff0c;難以適應復雜多變的場景。Contactile推出的三軸觸覺力傳感器&#xff0c;通過仿生設計與創新光學技術…

OpenCV三維解算常用方法C++

如果標定過程是通過OpenCV張正友標定法實現的&#xff0c;得到的內參外參保存在.txt文件中是這樣的形式&#xff1a; ① 內參intrinsics.txt&#xff1a; ② 外參extrinsics.txt&#xff1a; 那么可以通過如下方法讀取.txt文件獲取左右相機內外參&#xff0c;主要包括三維解算…

棧和隊列相關知識題目

棧的底層原理 棧&#xff08;Stack&#xff09;是一種后進先出&#xff08;LIFO&#xff09;?的線性數據結構&#xff0c;所有操作&#xff08;如插入、刪除&#xff09;僅在棧頂進行。它的底層實現可以是數組或鏈表&#xff0c;具體取決于編程語言和應用場景。 1.基于數組實…