Debezium嵌入式連接postgresql封裝服務

文章目錄

      • 1.項目結構:
      • 2.依賴:
      • 3.application.properties
      • 4.DebeziumConnectorConfig類
      • 5.TableEnum類
      • 6.TableHandler接口(表處理抽象)
      • 7.DefaultTableHandler默認實現類
      • 8.UserTableHandler處理類
      • 9.TableHandlerFactory工廠
      • 10.DebeziumListener 監聽事件
      • 11.測試

環境:JDK8,Debezium1.94,postgresql12

1.項目結構:

在這里插入圖片描述

2.依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.7.18</version></parent><groupId>com.linging</groupId><artifactId>springboot-debezium-server</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><debezium.version>1.9.4.Final</debezium.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${debezium.version}</version><exclusions><exclusion><artifactId>slf4j-reload4j</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-postgres</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.43</version></dependency></dependencies></project>

3.application.properties

# Debezium Configuration
#連接器基本信息
#指定 Debezium 連接器的名稱,唯一標識,用于在 Kafka Connect 中區分不同的連接器。
debezium.name=my-postgres-connector
#指定連接器的類名,這里是連接postgresql
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector#偏移量相關
#指定偏移量存儲的實現類,這里使用的是文件存儲,將偏移量存儲在本地文件中。
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
#處理數據的偏移量存儲路徑
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
#處理數據的偏移量提交時間間隔,單位毫秒,設置為 0 表示每次處理完一批記錄后立即提交偏移量,這可以減少數據丟失的風險,但會增加系統開銷
#以上一次提交時間開始計算
debezium.offset.flush.interval.ms=10000#數據庫連接信息
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
#捕獲變更的數據庫名稱
debezium.database.dbname=db_test
#指定邏輯服務器的唯一標識,用于區分不同的數據庫實例
debezium.database.server.id=postgresql_0
#指定邏輯服務器的名稱,用于在 Kafka 主題中區分不同的數據庫實例
debezium.database.server.name=customer_postgres_db_server#數據庫歷史記錄
#指定數據庫模式記錄的實現類,這里使用的是文件存儲,將數據庫歷史記錄存儲在本地文件中
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
#指定數據庫模式記錄文件的路徑
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat#表和字段過濾
#debezium.table.include.list=public.user
#debezium.column.include.list=public.user.id,public.user.name#其他配置
#指定是否自動創建 PostgreSQL 的邏輯復制槽,設置為 filtered 表示只捕獲配置中指定的表和字段的變更
debezium.publication.autocreate.mode=filtered
#指定 PostgreSQL 的邏輯復制插件名稱,pgoutput是 PostgreSQL 的邏輯復制插件名稱,用于捕獲變更
debezium.plugin.name=pgoutput
#指定邏輯復制槽的名稱
debezium.slot.name=dbz_customerdb_listener
#不執行初始快照,直接捕獲變更數據,取值:never、initial、when_needed
debezium.snapshot.mode=never
#批量提交條數
debezium.max.batch.size=100logging.level.root=INFO
logging.level.io.debezium.postgres.BinlogReader=INFO
logging.level.io.davidarhcanjo=DEBUG
logging.level.io.debezium=INFO

4.DebeziumConnectorConfig類

@Configuration
public class DebeziumConnectorConfig {@Beanpublic Properties customerConnector(Environment env) {Properties props = new Properties();props.setProperty("name", env.getProperty("debezium.name"));props.setProperty("connector.class", env.getProperty("debezium.connector.class"));props.setProperty("offset.storage", env.getProperty("debezium.offset.storage"));props.setProperty("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename"));props.setProperty("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms"));props.setProperty("database.hostname", env.getProperty("debezium.database.hostname"));props.setProperty("database.port", env.getProperty("debezium.database.port"));props.setProperty("database.user", env.getProperty("debezium.database.user"));props.setProperty("database.password", env.getProperty("debezium.database.password"));props.setProperty("database.dbname", env.getProperty("debezium.database.dbname"));props.setProperty("database.server.id", env.getProperty("debezium.database.server.id"));props.setProperty("database.server.name", env.getProperty("debezium.database.server.name"));props.setProperty("database.history", env.getProperty("debezium.database.history"));props.setProperty("database.history.file.filename", env.getProperty("debezium.database.history.file.filename"));props.setProperty("table.include.list", TableEnum.getTableNames()); //表名props.setProperty("column.include.list", TableEnum.getColumns()); // 表中得哪些字段props.setProperty("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode"));props.setProperty("plugin.name", env.getProperty("debezium.plugin.name"));props.setProperty("slot.name", env.getProperty("debezium.slot.name"));props.setProperty("snapshot.mode", env.getProperty("debezium.snapshot.mode"));props.setProperty("max.batch.size", env.getProperty("debezium.max.batch.size"));return props;}
}

5.TableEnum類

package com.linging.enums;import java.util.Arrays;
import java.util.stream.Collectors;/*** 監聽的表及字段配置* @author Linging* @version 1.0.0* @since 1.0*/
public enum TableEnum {DEFAULT("default", "defaultTableHandler", null),USER("public.user", "userTableHandler", "public.user.id,public.user.name"),;// 表名稱private final String tableName;// 表處理類的名稱private final String handlerName;// 表的字段名稱,多個用逗號隔開public final String columnName;TableEnum(String tableName, String handlerName, String columnName) {this.tableName = tableName;this.handlerName = handlerName;this.columnName = columnName;}public String getTableName() {return tableName;}public String getHandlerName() {return handlerName;}public String getColumnName() {return columnName;}public static String getTableNames(){return Arrays.stream(TableEnum.values()).map(TableEnum::getTableName).filter(name -> !"default".equals(name)).distinct().collect(Collectors.joining(","));}public static String getColumns(){return Arrays.stream(TableEnum.values()).filter(e -> !"default".equals(e.getTableName()) && e.getColumnName() != null).map(TableEnum::getColumnName).distinct().collect(Collectors.joining(","));}
}

6.TableHandler接口(表處理抽象)

public interface TableHandler {void handle(SourceRecord sourceRecord);void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer);
}

7.DefaultTableHandler默認實現類

/*** 默認處理類*/
@Component("defaultTableHandler")
public class DefaultTableHandler implements TableHandler {private static final Logger log = LoggerFactory.getLogger(DefaultTableHandler.class);@Overridepublic void handle(SourceRecord sourceRecord) {log.info("Handling default table: {}", sourceRecord.topic());log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());}@Overridepublic void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) {log.info("Handling batch default table: {}", recordChangeEvents.size());}
}

8.UserTableHandler處理類

/*** user表變更處理類*/
@Component("userTableHandler")
public class UserTableHandler implements TableHandler {private static final Logger log = LoggerFactory.getLogger(UserTableHandler.class);@Overridepublic void handle(SourceRecord sourceRecord) {log.info("Handling user table: {}", sourceRecord.topic());log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());// 添加具體的處理邏輯Struct sourceRecordChangeValue= (Struct) sourceRecord.value();if (sourceRecordChangeValue != null) {Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));// 處理非讀操作if(operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;Struct struct = (Struct) sourceRecordChangeValue.get(record);Map<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// TODO 處理邏輯(保存數據庫,發送MQ等操作,需要保證冪等)log.info("Updated Data: {} with Operation: {}", payload, operation.name());}}}@Overridepublic void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) {for (RecordChangeEvent<SourceRecord> recordChangeEvent : recordChangeEvents) {try {SourceRecord sourceRecord = recordChangeEvent.record();// TODO 處理邏輯(保存數據庫,發送MQ等操作,需要保證冪等)this.handle(sourceRecord);// 標記已處理committer.markProcessed(recordChangeEvent);} catch (InterruptedException e) {log.error("處理異常:", e);}}}
}

9.TableHandlerFactory工廠

@Component
public class TableHandlerFactory implements ApplicationContextAware {@Value("${debezium.database.server.name}")private String prefixServerName;private ApplicationContext context;private final Map<String, TableHandler> handlers = new HashMap<>();@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context = applicationContext;}@PostConstructpublic void init(){String name = null;for (TableEnum tableEnum : TableEnum.values()) {if(TableEnum.DEFAULT.equals(tableEnum)){name = tableEnum.getTableName();}else{name = getTableName(tableEnum.getTableName());}handlers.putIfAbsent(name,context.getBean(tableEnum.getHandlerName(), TableHandler.class));}}public TableHandler getHandler(String tableName) {return handlers.getOrDefault(tableName, handlers.get(TableEnum.DEFAULT.getTableName()));}public String getTableName(String name){return prefixServerName + "." + name;}}

10.DebeziumListener 監聽事件

@Component
public class DebeziumListener {private static final Logger log = LoggerFactory.getLogger(DebeziumListener.class);private final ExecutorService executor = Executors.newSingleThreadExecutor();private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;private final TableHandlerFactory tableHandlerFactory;@Autowiredpublic DebeziumListener(Properties customerConnector, TableHandlerFactory tableHandlerFactory) {this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(customerConnector).using(OffsetCommitPolicy.periodic(customerConnector)).notifying(this::handleChangeEventBatch).build();this.tableHandlerFactory = tableHandlerFactory;}/*** 批量記錄處理* @param recordChangeEvents* @param committer*/private void handleChangeEventBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer){// 根據表分組Map<String, List<RecordChangeEvent<SourceRecord>>> tableName2List = recordChangeEvents.stream().collect(Collectors.groupingBy(event -> event.record().topic()));tableName2List.forEach((tableName, recordChangeEventList) -> {TableHandler handler = tableHandlerFactory.getHandler(tableName);handler.handleBatch(recordChangeEventList, committer);});try {// 觸發提交策略committer.markBatchFinished();} catch (InterruptedException e) {log.error("提交異常:", e);}}/*** 單條記錄處理* @param sourceRecordChangeEvent*/private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordChangeEvent) {SourceRecord sourceRecord = sourceRecordChangeEvent.record();log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());String tableName = sourceRecord.topic();// 獲取對應的表處理類TableHandler handler = tableHandlerFactory.getHandler(tableName);handler.handle(sourceRecord);}@PostConstructprivate void start() {this.executor.execute(debeziumEngine);}@PreDestroyprivate void stop() {if (this.debeziumEngine != null) {try {this.debeziumEngine.close();} catch (IOException e) {log.error("關閉debeziumEngine異常:", e);}}this.executor.shutdown();}
}

11.測試

啟動服務,修改數據庫user表數據:
在這里插入圖片描述
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/75881.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/75881.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/75881.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

ER-圖,詳情和畫法

一、E-R圖的核心元素 1.實體 表示現實中對象或概念&#xff0c;用矩形表示 示例&#xff1a;用戶、老師、學生 2.屬性 描述實體的特征&#xff0c;用橢圓表示。 分為主鍵&#xff08;用戶id&#xff09; 和非主鍵&#xff08;用戶昵稱&#xff09; 3.關系 表示實體間的…

Windows Flip PDF Plus Corporate PDF翻頁工具

軟件介紹 Flip PDF Plus Corporate是一款功能強大的PDF翻頁工具&#xff0c;也被稱為名編輯電子雜志大師。這款軟件能夠迅速將PDF文件轉換為具有翻頁動畫效果的電子書&#xff0c;同時保留原始的超鏈接和書簽。無論是相冊、視頻、音頻&#xff0c;還是Flash、視頻和鏈接&#…

Linux文件系統中的Page Cache和內存管理中的Page之間的關系

Linux文件系統中的Page Cache和內存管理中的Page之間有密切的關聯&#xff0c;兩者在底層機制上緊密結合&#xff0c;共同實現高效的內存和文件系統管理。以下是它們的關系和關鍵點&#xff1a; 核心關系 Page Cache的底層是內存Page Page Cache是由內存管理中的物理內存頁&…

每日一個小病毒(C++)EnumChildWindows+shellcode

這里寫目錄標題 1. `EnumChildWindows` 的基本用法2. 如何利用 `EnumChildWindows` 執行 Shellcode?關鍵點:完整 Shellcode 執行示例3. 為什么 `EnumChildWindows` 能執行 Shellcode?4. 防御方法5. 總結EnumChildWindows 是 Windows API 中的一個函數,通常用于枚舉所有子窗…

AI爬蟲?爬!

1.你是否還在為大模型的key而感到憂傷和囊中羞澀&#xff0c;openrouter.ai&#xff0c;目前可免費白嫖多個大模型&#xff0c;代碼如下 from openai import OpenAIclient OpenAI(base_url"https://openrouter.ai/api/v1",api_key"", )completion clien…

洛谷題單3-P5720 【深基4.例4】一尺之棰-python-流程圖重構

題目描述 《莊子》中說到&#xff0c;“一尺之棰&#xff0c;日取其半&#xff0c;萬世不竭”。第一天有一根長度為 a a a 的木棍&#xff0c;從第二天開始&#xff0c;每天都要將這根木棍鋸掉一半&#xff08;每次除 2 2 2&#xff0c;向下取整&#xff09;。第幾天的時候木…

c++中的auto關鍵字

在 C 中&#xff0c;auto 是一個類型推斷關鍵字&#xff08;C11 引入&#xff09;&#xff0c;允許編譯器根據變量的初始化表達式自動推導其類型。它極大地簡化了代碼編寫&#xff0c;尤其在涉及復雜類型或模板的場景中。以下是 auto 的詳細說明&#xff1a; 1. 基本用法 1.1 …

開發指南111-關閉所有打開的子窗口

門戶系統是通過window.open通過單點登錄的模式打開子系統的&#xff0c;這就要求門戶系統退出時&#xff0c;關閉所有打開的子系統。 平臺處理這一問題的核心原理如下&#xff1a; 主窗口定義&#xff1a; allChildWindows:[], //所有子窗口 pushChildWindow(childWindow){ …

Kotlin語言進階:協程、Flow、Channel詳解(二)

Kotlin語言進階:協程、Flow、Channel詳解(二) 一、Flow基礎 1.1 什么是Flow Flow是Kotlin提供的用于處理異步數據流的解決方案,它建立在協程之上,具有以下特點: 冷流特性:只有在收集時才會開始發射數據背壓處理:自動處理生產者和消費者速度不匹配的問題組合操作:提…

mysql中my.cnf權限不能過大。否則無法生效

mysql 報錯 World-writable config file ‘/etc/my.cnf‘ is ignored. /etc/my.cnf 配置文件, 或著docker 掛載的配置文件(宿主機中的配置文件),權限過大 如是二進制啟動 chmod 644 /etc/my.cnf 如是docker啟動 chmod 644 /opt/docker-data/mysql/conf/my.cnf 重啟服務,就可…

Spring 中的 @Autowired 和 @Resource

&#x1f9e9; 一、Autowired 和 Resource 的基本作用 注解來源作用AutowiredSpring 提供&#xff08;org.springframework.beans.factory.annotation.Autowired&#xff09;按類型 自動注入ResourceJDK 提供&#xff08;javax.annotation.Resource&#xff09;默認按名稱 注入…

anomalib—2—輸入圖像大小調整

三個地方 第一&#xff1a;在定義model時&#xff0c;要在pre_processor里面去定義一個前處理&#xff0c;前處理就一個功能&#xff0c;定義圖像的大小 pre_processor0 Patchcore.configure_pre_processor( image_size (128, 128)) model Patchcore( backbone"wide_r…

JSX、支持HTML標簽、Ref的使用、虛擬DOM的使用

&#x1f90d; 前端開發工程師、技術日更博主、已過CET6 &#x1f368; 阿珊和她的貓_CSDN博客專家、23年度博客之星前端領域TOP1 &#x1f560; 牛客高級專題作者、打造專欄《前端面試必備》 、《2024面試高頻手撕題》、《前端求職突破計劃》 &#x1f35a; 藍橋云課簽約作者、…

JVM 學習計劃表(2025 版)

JVM 學習計劃表&#xff08;2025 版&#xff09; &#x1f4da; 基礎階段&#xff08;2 周&#xff09; 1. JVM 核心概念 ?JVM 作用與體系結構 理解 JVM 在 Java 跨平臺運行中的核心作用&#xff0c;掌握類加載子系統、運行時數據區、執行引擎的交互流程?內存結構與數據存…

進程內存分布--之理論知識

一個由C/C編譯的程序占用的內存分為以下幾個部分 &#xff1a; 1、棧區&#xff08;stack&#xff09;&#xff1a;由編譯器自動分配釋放 &#xff0c;存放函數調用函數的參數值&#xff0c;局部變量的值等。其操作方式類似于數據結構中的棧。 2、堆區&#xff08;heap&#xf…

WEB安全--內網滲透--LMNTLM基礎

一、前言 LM Hash和NTLM Hash是Windows系統中的兩種加密算法&#xff0c;不過LM Hash加密算法存在缺陷&#xff0c;在Windows Vista 和 Windows Server 2008開始&#xff0c;默認情況下只存儲NTLM Hash&#xff0c;LM Hash將不再存在。所以我們會著重分析NTLM Hash。 在我們內…

時尚優雅奢華品牌包裝徽標設計襯線英文字體安裝包 Kagea – Luxury Women Ligature Font

Kagea 是一種復雜的襯線字體&#xff0c;有常規和壓縮兩種樣式&#xff0c;具有 50 連字和替代字體&#xff0c;并支持多種語言。其精致的比例和別致的字體使其成為高端品牌、編輯布局、高檔包裝、時尚品牌和奢侈品設計的理想選擇。 包含內容&#xff1a; TTF、OTF 和 WOFF 格…

cut命令用法

cut 是 Linux/Unix 系統中一個用于按列提取文本內容的命令&#xff0c;常用于處理結構化文本&#xff08;如 CSV、日志、配置文件等&#xff09;。它通過分隔符、字符位置或字節位置來切割文本&#xff0c;提取指定部分。 核心功能 按字段&#xff08;列&#xff09;提取&#…

美國mlb與韓國mlb的關系·棒球9號位

MLB&#xff08;Major League Baseball&#xff0c;美國職業棒球大聯盟&#xff09;作為全球最高水平的職業棒球聯賽&#xff0c;與韓國市場流行的“MLB”時尚品牌之間存在著授權合作關系&#xff0c;但兩者在業務范疇和品牌定位上存在顯著差異。 一、品牌授權背景&#xff1a;…

從吉卜力漫畫到藝術創造:GPT-4o多種風格繪圖Prompt大全

在3月底&#xff0c;GPT-4o掀起了一陣吉卜力繪圖浪潮&#xff0c;大家紛紛輸入一張圖片&#xff0c;讓4o模型進行風格化遷移&#xff0c;其中吉卜力風格的漫畫在社交媒體上最為火熱。在大家爭議4o的訓練數據是否侵權和4o背后的技術原理的時候&#xff0c;我們先來玩一玩&#x…