FlinkCDC 達夢數據庫實時同步

一、Flink部署

1.1、JAVA環境

vi /etc/profile
export JAVA_HOME=/data/flinkcdc/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATHsource /etc/profilevi ~/.bash_profileexport FLINK_HOME=/data/flinkcdc/flink-1.17.0
export PATH=$PATH:$FLINK_HOME/binsource ~/.bash_profile

1.2、配置Flink

vim conf/flink-conf.yaml
添加配置:env.java.home=/data/flinkcdc/jdk1.8.0_181①、localhost  修改為IP地址
rest.port: 8088
rest.address: 192.168.33.231
②、關閉防火墻
systemctl status firewalld
systemctl stop firewalld

1.3、Flink CDC Jar包

CDC jar放到Flink安裝包解壓之后的lib目錄

1.4、啟動flink

bin/start-cluster.shFlink Web-UI
http://192.168.33.231:8088

1.5、啟動 Flink SQL CLI

bin/sql-client.sh

二、達夢數據庫搭建

2.1、docker dm8

docker run -d \
--name dm8 \
--restart=always \
--privileged=true \
-e LD_LIBRARY_PATH=/opt/dmdbms/bin \
-e PAGE_SIZE=16 \
-e EXTENT_SIZE=32 \
-e LOG_SIZE=1024 \
-e CASE_SENSITIVE=0 \
-e UNICODE_FLAG=1 \
-e INSTANCE_NAME=DM8_CDC \
-e SYSDBA_PWD=SYSDBA001 \
-v /docker/dm8_data_cdc:/opt/dmdbms/data \
-p 5236:5236 \
dm8_flinkcdc:dm8

查看容器運行情況

查看數據庫容器
lsof -i:5236
docker logs -f  dm8
docker exec -it dm8 bash

2.2、開啟達夢日志歸檔

##查看當前數據庫是否開啟歸檔
select arch_mode from v$database;
##查詢有哪些歸檔日志
SELECT NAME , FIRST_TIME , NEXT_TIME , FIRST_CHANGE# , NEXT_CHANGE# FROM V$ARCHIVED_LOG; 
SELECT * FROM V$ARCH_FILE##修改數據庫實例的 /dmdata/DAMEGN/dm.ini文件中 ARCH_INI 參數值
vi /dmdata/DAMENG/dm.ini
##將 ARCH_INI 值改為 1,保存后退出
ARCH_INI = 1 #開啟歸檔功能
RLOG_APPEND_LOGIC = 1##新增文件dmarch.ini
vi /dmdata/DAMENG/dmarch.ini
##新增如下內容
[ARCHIVE_LOCAL1]
ARCH_TYPE = LOCAL
ARCH_DEST = /dmarch
ARCH_FILE_SIZE = 2048
ARCH_SPACE_LIMIT = 102400##最后重啟數據庫完成歸檔配置#DaMeng Database Archive Configuration file
#this is commentsARCH_WAIT_APPLY      = 0[ARCHIVE_LOCAL1]ARCH_TYPE            = LOCALARCH_DEST            = /opt/dmdbms/data/DAMENG/archARCH_FILE_SIZE       = 1024ARCH_SPACE_LIMIT     = 51200ARCH_FLUSH_BUF_SIZE  = 0ARCH_HANG_FLAG       = 1

?2.3、重啟dm8數據庫

docker restart dm8

三、實時同步測試

##達夢
CREATE TABLE t_source_dm (id INT,name VARCHAR,insert_date DATE,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'dm','startupOptions' = 'Initial','hostname' = '192.168.33.231','port' = '5236','username' = 'SYSDBA','password' = 'SYSDBA001','database' = 'DM8_CDC','schema' = 'SYSDBA','table' = 'dm_flinkcdc'
);##MYSQL
CREATE TABLE sink_mysql_test (id int,name STRING,insert_date date,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.33.231:3306/flinkcdc','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = 'YTP1101102233','table-name' = 'dm_to_mysql'
);insert into sink_mysql_test 
select * from t_source_dm;

四、數據實時同步樣本(很多留言說沒有用,這里證實一下,不成功重點檢查達夢配置)?

4.1、代碼

/** @(#)FlinkDMCDC.java, 2023年8月5日 上午10:33:17** Copyright (c) 2000-2020, 達夢數據庫有限公司.* All rights reserved.*/
package com.dameng.flinkcdc.dm;import java.io.File;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkDMCDC
{public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.setProperty("database.tablename.case.insensitive", "false");properties.setProperty("log.mining.strategy", "offline_catalog");properties.setProperty("log.mining.continuous.mine", "true");//properties.setProperty("provide.transaction.metadata", "true");properties.setProperty("lob.enabled", "true");JdbcIncrementalSource<String> changeEventSource = new DMSourceBuilder<String>().hostname("localhost").port(15236).databaseList("DAMENG").tableList("FLINK_CDC.CDC_TEST").schemaList("FLINK_CDC").username("SYSDBA").password("SYSDBA112233").startupOptions(StartupOptions.initial()).dmProperties(properties).includeSchemaChanges(true).deserializer(new JsonDebeziumDeserializationSchema()).sliceSize(20).scanNewlyAddedTableEnabled(true).build();Configuration configuration = new Configuration();//檢查點文件StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.enableCheckpointing(20 * 1000);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6*1000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.fromSource(changeEventSource, WatermarkStrategy.noWatermarks(), "DmSource").setParallelism(1).print()env.execute();}
}

4.1.1、從頭開始采集數據,運行結果

4.1.2、數據插入 結果:

4.1.3、數據更新 結果:

4.1.4、數據刪除 結果:

4.2、達夢實時同步CDC 轉換為SQL代碼(直接拿去用)

/** @(#)FlinkDMCDC.java, 2023年8月5日 上午10:33:17** Copyright (c) 2000-2020, 達夢數據庫有限公司.* All rights reserved.*/
package com.dameng.flinkcdc.dm;import java.io.File;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
/**** Created by wuxin on 2023年8月5日 上午10:33:17*/
public class FlinkDMCDCSQL
{public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.setProperty("database.tablename.case.insensitive", "false");properties.setProperty("log.mining.strategy", "offline_catalog");properties.setProperty("log.mining.continuous.mine", "true");properties.setProperty("lob.enabled", "true");JdbcIncrementalSource<String> changeEventSource =new DMSourceBuilder<String>().hostname("localhost").port(15236).databaseList("DAMENG").tableList("FLINK_CDC.CDC_TEST").schemaList("FLINK_CDC").username("SYSDBA").password("SYSDBA112233").startupOptions(StartupOptions.initial()).dmProperties(properties).includeSchemaChanges(true).deserializer(new JsonDebeziumDeserializationSchema()).sliceSize(20).scanNewlyAddedTableEnabled(true).build();Configuration configuration = new Configuration();//檢查點文件StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.enableCheckpointing(20 * 1000);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6*1000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.setStateBackend(new FsStateBackend("file:///D:/tmp/ck"));DataStream<String> sourceStream = env.fromSource(changeEventSource,WatermarkStrategy.noWatermarks(),"DmSource");// 處理CDC數據并生成JSON格式的SQL語句DataStream<String> jsonStream = sourceStream.map(new MapFunction<String, String>() {private transient ObjectMapper objectMapper;private final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;@Overridepublic String map(String value) throws Exception {if (objectMapper == null) {objectMapper = new ObjectMapper();}JsonNode rootNode = objectMapper.readTree(value);String op = rootNode.path("op").asText();JsonNode sourceNode = rootNode.path("source");String schema = sourceNode.path("schema").asText();String table = sourceNode.path("table").asText();long tsMs = rootNode.path("ts_ms").asLong();// 創建JSON結構ObjectNode json = nodeFactory.objectNode();// 1. 添加metadata部分ObjectNode metadata = json.putObject("metadata").put("schema", schema).put("table", table).put("source_timestamp", tsMs);// 處理時間戳字段JsonNode afterNode = rootNode.path("after");if (!afterNode.isMissingNode() && afterNode.has("TIMESTAMP")) {long timestampNs = afterNode.path("TIMESTAMP").asLong();metadata.put("event_time", formatTimestamp(timestampNs));} else {metadata.put("event_time", formatTimestamp(tsMs * 1000000L));}// 2. 根據操作類型生成SQL并添加到JSONString sql = "";switch (op) {case "r":sql = generateInsertSQL(schema + "." + table, afterNode);json.putObject("sql").put("insert", sql);break;case "c":sql = generateInsertSQL(schema + "." + table, afterNode);json.putObject("sql").put("insert", sql);break;case "u":JsonNode beforeNode = rootNode.path("before");sql = generateUpdateSQL(schema + "." + table, beforeNode, afterNode);json.putObject("sql").put("update", sql);break;case "d":JsonNode beforeNodeDelete = rootNode.path("before");sql = generateDeleteSQL(schema + "." + table, beforeNodeDelete);json.putObject("sql").put("delete", sql);break;default:json.put("error", "UNKNOWN OPERATION: " + op);}return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);}private String generateInsertSQL(String tableName, JsonNode afterNode) {StringBuilder fields = new StringBuilder();StringBuilder values = new StringBuilder();afterNode.fieldNames().forEachRemaining(fieldName -> {if (fields.length() > 0) {fields.append(", ");values.append(", ");}fields.append(fieldName);JsonNode valueNode = afterNode.get(fieldName);if (valueNode.isNull()) {values.append("NULL");} else if (valueNode.isTextual()) {values.append("'").append(escapeSQL(valueNode.asText())).append("'");} else {values.append(valueNode.asText());}});return String.format("INSERT INTO %s (%s) VALUES (%s)",tableName, fields.toString(), values.toString());}private String generateUpdateSQL(String tableName, JsonNode beforeNode, JsonNode afterNode) {StringBuilder setClause = new StringBuilder();StringBuilder whereClause = new StringBuilder();// 構建SET部分afterNode.fieldNames().forEachRemaining(fieldName -> {if (!fieldName.equals("ID")) { // 假設ID是主鍵,不更新if (setClause.length() > 0) {setClause.append(", ");}JsonNode valueNode = afterNode.get(fieldName);if (valueNode.isNull()) {setClause.append(fieldName).append(" = NULL");} else if (valueNode.isTextual()) {setClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else {setClause.append(fieldName).append(" = ").append(valueNode.asText());}}});// 構建WHERE部分(使用所有字段作為條件以確保準確性)beforeNode.fieldNames().forEachRemaining(fieldName -> {if (whereClause.length() > 0) {whereClause.append(" AND ");}JsonNode valueNode = beforeNode.get(fieldName);if (valueNode.isNull()) {whereClause.append(fieldName).append(" IS NULL");} else if (valueNode.isTextual()) {whereClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else {whereClause.append(fieldName).append(" = ").append(valueNode.asText());}});return String.format("UPDATE %s SET %s WHERE %s",tableName, setClause.toString(), whereClause.toString());}private String generateDeleteSQL(String tableName, JsonNode beforeNode) {StringBuilder whereClause = new StringBuilder();beforeNode.fieldNames().forEachRemaining(fieldName -> {if (whereClause.length() > 0) {whereClause.append(" AND ");}JsonNode valueNode = beforeNode.get(fieldName);if (valueNode.isNull()) {whereClause.append(fieldName).append(" IS NULL");} else if (valueNode.isTextual()) {whereClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else {whereClause.append(fieldName).append(" = ").append(valueNode.asText());}});return String.format("DELETE FROM %s WHERE %s", tableName, whereClause.toString());}private String escapeSQL(String value) {return value.replace("'", "''");}// 時間戳格式化方法private String formatTimestamp(long timestampNs) {long timestampMs = timestampNs / 1000000L;SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");return sdf.format(new Date(timestampMs));}});// 將SQL語句寫入文件jsonStream.writeAsText("D:\\tmp\\flink-cdc-sql-output.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);env.execute();}
}

?4.3、達夢實時同步CDC 推送Kafka,解耦代碼(直接拿去用)

/** @(#)FlinkDMCDC.java, 2023年8月5日 上午10:33:17** Copyright (c) 2000-2020, 達夢數據庫有限公司.* All rights reserved.*/
package com.dameng.flinkcdc.dm;import java.io.File;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;/**** Created by wuxin on 2023年8月5日 上午10:33:17*/
public class FlinkDMCDCSQLKafka
{public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.setProperty("database.tablename.case.insensitive", "false");properties.setProperty("log.mining.strategy", "offline_catalog");properties.setProperty("log.mining.continuous.mine", "true");properties.setProperty("lob.enabled", "true");JdbcIncrementalSource<String> changeEventSource =new DMSourceBuilder<String>().hostname("localhost").port(15236).databaseList("DAMENG").tableList("FLINK_CDC.CDC_TEST").schemaList("FLINK_CDC").username("SYSDBA").password("SYSDBA112233").startupOptions(StartupOptions.initial()).dmProperties(properties).includeSchemaChanges(true).deserializer(new JsonDebeziumDeserializationSchema()).sliceSize(20).scanNewlyAddedTableEnabled(true).build();Configuration configuration = new Configuration();//檢查點文件StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.enableCheckpointing(20 * 1000);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6*1000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.setStateBackend(new FsStateBackend("file:///D:/tmp/ck"));DataStream<String> sourceStream = env.fromSource(changeEventSource,WatermarkStrategy.noWatermarks(),"DmSource");// 處理CDC數據并生成JSON格式的SQL語句DataStream<String> jsonStream = sourceStream.map(new MapFunction<String, String>() {private transient ObjectMapper objectMapper;private final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;@Overridepublic String map(String value) throws Exception {if (objectMapper == null) {objectMapper = new ObjectMapper();}JsonNode rootNode = objectMapper.readTree(value);String op = rootNode.path("op").asText();JsonNode sourceNode = rootNode.path("source");String schema = sourceNode.path("schema").asText();String table = sourceNode.path("table").asText();long tsMs = rootNode.path("ts_ms").asLong();// 創建JSON結構ObjectNode json = nodeFactory.objectNode();// 1. 添加metadata部分ObjectNode metadata = json.putObject("metadata").put("schema", schema).put("table", table).put("source_timestamp", tsMs);// 處理時間戳字段JsonNode afterNode = rootNode.path("after");if (!afterNode.isMissingNode() && afterNode.has("TIMESTAMP")) {long timestampNs = afterNode.path("TIMESTAMP").asLong();metadata.put("event_time", formatTimestamp(timestampNs));} else {metadata.put("event_time", formatTimestamp(tsMs * 1000000L));}// 2. 根據操作類型生成SQL并添加到JSONString sql = "";switch (op) {case "r":sql = generateInsertSQL(schema + "." + table, afterNode);json.putObject("sql").put("dml", sql);break;case "c":sql = generateInsertSQL(schema + "." + table, afterNode);json.putObject("sql").put("dml", sql);break;case "u":JsonNode beforeNode = rootNode.path("before");sql = generateUpdateSQL(schema + "." + table, beforeNode, afterNode);json.putObject("sql").put("dml", sql);break;case "d":JsonNode beforeNodeDelete = rootNode.path("before");sql = generateDeleteSQL(schema + "." + table, beforeNodeDelete);json.putObject("sql").put("dml", sql);break;default:json.put("error", "UNKNOWN OPERATION: " + op);}// 配置ObjectMapper禁用美化打印(默認即為緊湊格式)
//                ObjectMapper mapper = new ObjectMapper();
//                String jsonString = mapper.writeValueAsString(json);return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);}private String generateInsertSQL(String tableName, JsonNode afterNode) {StringBuilder fields = new StringBuilder();StringBuilder values = new StringBuilder();afterNode.fieldNames().forEachRemaining(fieldName -> {if (fields.length() > 0) {fields.append(", ");values.append(", ");}fields.append(fieldName);JsonNode valueNode = afterNode.get(fieldName);if (valueNode.isNull()) {values.append("NULL");} else if (valueNode.isTextual()) {// 處理文本類型(含轉義)values.append("'").append(escapeSQL(valueNode.asText())).append("'");} else if (valueNode.isLong() && isTimestampField(fieldName)) {// 處理時間戳轉換(納秒/毫秒級)values.append("'").append(formatTimestamp(valueNode.asLong())).append("'");} else if (valueNode.isNumber()) {// 處理其他數字類型values.append(valueNode.asText());} else {// 默認處理(如布爾值等)values.append(valueNode.asText());}});return String.format("INSERT INTO %s (%s) VALUES (%s)",tableName, fields.toString(), values.toString());}private String generateUpdateSQL(String tableName, JsonNode beforeNode, JsonNode afterNode) {StringBuilder setClause = new StringBuilder();StringBuilder whereClause = new StringBuilder();// 構建SET部分afterNode.fieldNames().forEachRemaining(fieldName -> {if (!fieldName.equals("ID")) { // 假設ID是主鍵,不更新if (setClause.length() > 0) {setClause.append(", ");}JsonNode valueNode = afterNode.get(fieldName);if (valueNode.isNull()) {setClause.append(fieldName).append(" = NULL");} else if (valueNode.isTextual()) {setClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else if (valueNode.isLong() && isTimestampField(fieldName)) {// 處理時間戳轉換(納秒/毫秒級)setClause.append(fieldName).append(" = '").append(formatTimestamp(valueNode.asLong())).append("'");} else {setClause.append(fieldName).append(" = ").append(valueNode.asText());}}});// 構建WHERE部分(使用所有字段作為條件以確保準確性)beforeNode.fieldNames().forEachRemaining(fieldName -> {if (whereClause.length() > 0) {whereClause.append(" AND ");}JsonNode valueNode = beforeNode.get(fieldName);if (valueNode.isNull()) {whereClause.append(fieldName).append(" IS NULL");} else if (valueNode.isTextual()) {whereClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else if (valueNode.isLong() && isTimestampField(fieldName)) {// 處理時間戳轉換(納秒/毫秒級)whereClause.append(fieldName).append(" = '").append(formatTimestamp(valueNode.asLong())).append("'");} else {whereClause.append(fieldName).append(" = ").append(valueNode.asText());}});return String.format("UPDATE %s SET %s WHERE %s",tableName, setClause.toString(), whereClause.toString());}private String generateDeleteSQL(String tableName, JsonNode beforeNode) {StringBuilder whereClause = new StringBuilder();beforeNode.fieldNames().forEachRemaining(fieldName -> {if (whereClause.length() > 0) {whereClause.append(" AND ");}JsonNode valueNode = beforeNode.get(fieldName);if (valueNode.isNull()) {whereClause.append(fieldName).append(" IS NULL");} else if (valueNode.isTextual()) {whereClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else if (valueNode.isLong() && isTimestampField(fieldName)) {// 處理時間戳轉換(納秒/毫秒級)whereClause.append(fieldName).append(" = '").append(formatTimestamp(valueNode.asLong())).append("'");} else {whereClause.append(fieldName).append(" = ").append(valueNode.asText());}});return String.format("DELETE FROM %s WHERE %s", tableName, whereClause.toString());}// 判斷字段是否為時間戳字段(根據命名約定或業務邏輯)private boolean isTimestampField(String fieldName) {return fieldName.toLowerCase().contains("time") ||fieldName.toLowerCase().contains("date") ||fieldName.toLowerCase().contains("ts");}// 格式化時間戳(支持納秒/毫秒/秒級)private String formatTimestamp(long timestamp) {// 判斷時間戳精度(假設大于1e16為納秒,大于1e12為微秒,其余為毫秒/秒)if (timestamp > 1e16) {timestamp /= 1_000_000; // 納秒轉毫秒} else if (timestamp > 1e12) {timestamp /= 1_000; // 微秒轉毫秒} else if (timestamp < 1e10) {timestamp *= 1_000; // 秒轉毫秒}// 使用Java 8時間API格式化(線程安全)return Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));}// SQL特殊字符轉義(防止注入)private String escapeSQL(String input) {return input.replace("'", "''");}});// 創建Kafka SinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("172.30.139.111:19092,172.30.139.111:29092,172.30.139.111:39092,172.30.139.111:49092") // Kafka broker地址.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("dm-cdc-topic") // Kafka主題名稱.setValueSerializationSchema(new SimpleStringSchema()) // 使用字符串序列化.build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 精確一次交付.setProperty("transaction.timeout.ms", "300000") // 設置為5分鐘(需小于Broker的15分鐘限制).setProperty("acks", "all") // 確保高可靠性.build();// 構建數據處理管道jsonStream.sinkTo(kafkaSink); // 將數據發送到Kafkaenv.execute();}
}

五、FlinkCDC 達夢數據庫 所需文件下載

5.1、所需Jar包

5.2、支持JAVA程序和SQL

?

5.3、完成程序和說明文檔下載地址

版本1:

https://download.csdn.net/download/ytp552200ytp/90103896

版本2(最新版本截止:2025-06-03)?

https://download.csdn.net/download/ytp552200ytp/91119461

如果實在沒有CSDN積分,后臺聯系我留下郵箱,我看到后私信發到郵箱,支持共享。

6、情況說明

我看很多說沒用,不增量啥的,核心原因是DM數據庫沒有配置好,按照上面的步驟去配置或查看文檔中的說明操作,保證DM數據庫歸檔日志正常,上面的代碼直接可以使用拿去測試吧。歸檔日志查詢SQL,查查核驗一下!!!

1、通過開啟歸檔日志  SQL查詢處理 (SCN 作為標記增量查詢)-- 查看所有歸檔日志文件信息
SELECT * FROM SYS.V$ARCHIVED_LOG;-- 或使用以下視圖查看歸檔文件詳細信息
SELECT * FROM SYS.V$ARCH_FILE;--添加文件
DBMS_LOGMNR.ADD_LOGFILE('./dmarch/ARCHIVE_LOCAL1_0x1873DFE0_EP0_2025-06-26_11-34-09.log')-- 或使用默認參數分析所有添加的日志
DBMS_LOGMNR.START_LOGMNR(OPTIONS => 2130);--查詢 日志明細
SELECT *
FROM V$LOGMNR_CONTENTSSELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP,SEG_OWNER, TABLE_NAME 
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME IS NOT NULL-- 查看特定表的操作
SELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP, SEG_OWNER, TABLE_NAME 
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME = 'YOUR_TABLE_NAME';-- 查看DDL操作
SELECT SQL_REDO FROM V$LOGMNR_CONTENTS 
WHERE SQL_REDO LIKE '%CREATE%' OR SQL_REDO LIKE '%ALTER%' OR SQL_REDO LIKE '%DROP%';--結束日志分析
DBMS_LOGMNR.END_LOGMNR();

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/922077.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/922077.shtml
英文地址,請注明出處:http://en.pswp.cn/news/922077.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Eip開源主站EIPScanner在Linux上的調試記錄(二 多生產者連接)

目錄 一、背景 二、可行性驗證 三、開發調試 一、背景 在一般場景下&#xff0c;只需一路IO連接&#xff0c;但稍微復雜的場景&#xff0c;就需要不同通訊周期的連接&#xff0c;這就需要有多組IO連接。 而大于一組的連接調試方法是一樣的&#xff0c;因此主要解決2組連接的…

Oracle APEX 利用卡片實現翻轉(方法二)

目錄 0. 以 Oracle 的標準示例表 EMP 為例&#xff0c;實現卡片翻轉 1. 創建卡片區域 (Cards Region) 2. 定義卡片的 HTML 結構 3. 添加 CSS 實現樣式和翻轉動畫 4. 創建動態操作觸發翻轉 5. 運行效果 0. 以 Oracle 的標準示例表 EMP 為例&#xff0c;實現卡片翻轉 目標如…

低代碼拖拽實現與bpmn-js詳解

低代碼平臺中的可視化拖拽功能是其核心魅力所在&#xff0c;它讓構建應用變得像搭積木一樣直觀。下面我將為你梳理其實現原理&#xff0c;并詳細介紹 vue-draggable 這個常用工具。 &#x1f9f1; 一、核心架構&#xff1a;三大區域與數據驅動 低代碼編輯器界面通常分為三個核心…

【科研繪圖系列】R語言繪制模型預測與數據可視化

禁止商業或二改轉載,僅供自學使用,侵權必究,如需截取部分內容請后臺聯系作者! 文章目錄 介紹 加載R包 數據下載 函數 導入數據 數據預處理 畫圖 總結 系統信息 介紹 本文介紹了一種利用R語言進行海洋微生物群落動態分析的方法,該方法通過構建多個統計模型來預測不同環境…

TODO的面試(dw三面、sqb二面、ks二面)

得物的前端三面&#xff08;通常是技術終面&#xff09;會深入考察你的技術深度、項目經驗、解決問題的思路以及職業素養。下面我結合搜索結果&#xff0c;為你梳理一份得物前端三面的常問問題及詳解&#xff0c;希望能助你一臂之力。 &#x1f9e0; 得物前端三面常問問題及詳解…

開發 PHP 擴展新途徑 通過 FrankenPHP 用 Go 語言編寫 PHP 擴展

通過 FrankenPHP 用 Go 語言編寫 PHP 擴展 在 PHPVerse 2025 大會上&#xff08;JetBrains 為紀念 PHP 語言 30 周年而組織的會議&#xff09;&#xff0c;FrankenPHP 開發者 Kvin Dunglas 做了一個開創性的宣布&#xff1a;通過 FrankenPHP&#xff0c;可以使用 Go 語言創建 …

完美解決:應用版本更新,增加字段導致 Redis 舊數據反序列化報錯

完美解決&#xff1a;應用版本更新&#xff0c;增加字段導致 Redis 舊數據反序列化報錯 前言 在敏捷開發和快速迭代的今天&#xff0c;我們經常需要為現有的業務模型增加新的字段。但一個看似簡單的操作&#xff0c;卻可能給正在穩定運行的系統埋下“地雷”。 一個典型的場景是…

66-python中的文件操作

1. 文件的編碼 UTF-8 GBK GB2312 Big5 GB18030 2. 文件讀取 文件操作步驟: 打開文件 讀\寫文件 關閉文件 open(name,mode,encoding) name:文件名字符串 “D:/haha.txt” mode: 只讀、寫入、追加 r:以只讀方式打開 w: 只用于寫 a :用于追加 encoding:編碼方式 # -*- coding: utf…

FPGA實例源代碼集錦:27個實戰項目

本文還有配套的精品資源&#xff0c;點擊獲取 簡介&#xff1a;FPGA是一種可編程邏輯器件&#xff0c;允許用戶根據需求配置硬件功能。本壓縮包提供27個不同的FPGA應用實例源代碼&#xff0c;旨在幫助初學者深入學習FPGA設計&#xff0c;并為專業工程師提供靈感。內容涵蓋了…

基于 Vue+Mapbox 的智慧礦山可視化功能的技術拆解

01、項目背景 在全球礦業加速向 “高端化、智能化、綠色化” 轉型的浪潮下&#xff0c;傳統礦業面臨的深地開采難題、效率瓶頸與安全隱患日益凸顯。 在礦業轉型的迫切需求與政策、技術支撐的背景下依托 GIS 技術&#xff0c;開展了 “中國智礦” GIS 開發項目&#xff0c;旨在…

進程狀態(Linux)

進程狀態Linux進程狀態Linux進程狀態進程描述R運行狀態S睡眠狀態D磁盤休眠狀態T停止狀態t被追蹤狀態(調試狀態)X死亡狀態Z僵死狀態其實大致也就可以分為三種運行&#xff0c;阻塞&#xff0c;掛起。運行狀態每個cpu里都有一個運行隊列&#xff0c;進程在運行隊列里&#xff0c;…

物聯網領域中PHP框架的最佳選擇有哪些?

物聯網&#xff08;IoT&#xff09;作為近年來快速發展的技術領域&#xff0c;已經滲透到智能家居、工業自動化、智慧城市等方方面面。作為Web開發中廣泛使用的語言&#xff0c;PHP憑借其易學易用、開發效率高和生態豐富的特點&#xff0c;也在物聯網領域找到了用武之地。 本文…

java反射(詳細教程)

我們平常創建類的實例并調用類中成員需要建立在一個前提下&#xff0c;就是已經知道類名和類中成員的信息&#xff0c;靈活性大大降低。甚至在一些項目中還需要修改源碼來滿足使用條件&#xff0c;大大降低了操作的靈活性。Java 反射&#xff08;Reflection&#xff09;是 Java…

消息隊列-初識kafka

優缺點 消息隊列的優點&#xff1a; 實現系統解耦&#xff1a; :::color5 系統解耦解釋 有 MQ 時是 “服務 A 發消息到隊列&#xff0c;其他服務從隊列拿消息&#xff0c;新增服務接隊列就行”&#xff1b;無 MQ 時是 “服務 A 直接調其他服務的接口 / 依賴&#xff0c;新增 / …

實踐《數字圖像處理》之Canny邊緣檢測、霍夫變換與主動二值化處理在短線段清除應用中的實踐

在最近的圖像處理項目中&#xff0c;其中一個環節&#xff1a;圖片中大量短線&#xff08;不是噪聲&#xff09;&#xff0c;需要在下一步處理前進行清除。在確定具體實現時&#xff0c;碰到了Canny邊緣檢測、霍夫變換與主動二值化處理的辯證使用&#xff0c;相關邏輯從圖片灰度…

vue3與ue5通信-工具類

工具 ue5-simple.js /*** UE5 通信工具* 兩個核心方法&#xff1a;發送消息和接收消息*/// 確保全局對象存在 if (typeof window ! undefined) {window.ue window.ue || {};window.ue.interface window.ue.interface || {}; }/*** 生成 UUID*/ function generateUUID() {retu…

在kotlin中如何使用像java中的static

在 Kotlin 中&#xff0c;沒有直接的 static 關鍵字&#xff0c;但有幾種等效的方式來實現 Java 中靜態成員的功能&#xff1a; 1. 伴生對象 (Companion Object) - 最常用 class MyClass {companion object {// 靜態常量const val STATIC_CONSTANT "constant value"…

如何在 Spring Boot 中指定不同的配置文件?

介紹 Spring Boot 提供了多種方式來管理和加載配置文件&#xff0c;特別是在多環境配置下&#xff0c;比如開發、測試和生產環境。通過指定不同的配置文件&#xff0c;可以靈活地調整應用程序的行為&#xff0c;以適應不同的需求。本文將介紹在 Spring Boot 中如何指定使用不同…

在centOS源碼編譯方式安裝MySQL5.7

一、前言 在生產環境中部署數據庫時&#xff0c;很多人會選擇直接使用 yum/apt 包管理器 安裝 MySQL&#xff0c;這樣簡單快速&#xff0c;但缺點是版本受限&#xff0c;靈活性不足。對于需要指定版本、啟用特定編譯參數或優化的場景&#xff0c;源碼編譯安裝 MySQL 就顯得非常…

探討Hyperband 等主要機器學習調優方法的機制和權衡

本篇文章Master Hyperband — An Efficient Hyperparameter Tuning Method in Machine Learning深入探討了Hyperband這一高效的超參數調優方法。文章的技術亮點在于其結合了多臂老虎機策略和逐次減半算法&#xff0c;能夠在大搜索空間中快速剔除表現不佳的配置&#xff0c;從而…