1. Flink Deployment
1.1 Java Environment
Add the JDK variables to /etc/profile and reload it:
vi /etc/profile
export JAVA_HOME=/data/flinkcdc/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH
source /etc/profile
Then add the Flink variables to ~/.bash_profile and reload it:
vi ~/.bash_profile
export FLINK_HOME=/data/flinkcdc/flink-1.17.0
export PATH=$PATH:$FLINK_HOME/bin
source ~/.bash_profile
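1.2 Configure Flink
vim conf/flink-conf.yaml
Add the following setting (flink-conf.yaml uses "key: value" syntax):
env.java.home: /data/flinkcdc/jdk1.8.0_181
① Change localhost to the host's IP address
rest.port: 8088
rest.address: 192.168.33.231
② Turn off the firewall
systemctl status firewalld
systemctl stop firewalld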
1.3 Flink CDC JAR
Copy the CDC JAR into the lib directory of the unpacked Flink distribution.
1.4 Start Flink
bin/start-cluster.sh
Flink Web UI:
http://192.168.33.231:8088
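The Web UI address is also where Flink serves its REST API, so the cluster can be checked from code as well as from a browser. The following is a minimal sketch, assuming the rest.address and rest.port values from 1.2. The class and method names (FlinkRestCheck, overviewUrl, isUp) are made up for illustration; /overview is Flink's cluster overview REST path.

```java
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper, not part of Flink: probes the REST endpoint of a running cluster.
class FlinkRestCheck {

    // Builds the URL of Flink's cluster overview endpoint.
    static String overviewUrl(String host, int port) {
        return "http://" + host + ":" + port + "/overview";
    }

    // Returns true if the endpoint answers with HTTP 200 within the timeout.
    static boolean isUp(String url, int timeoutMs) {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setConnectTimeout(timeoutMs);
            conn.setReadTimeout(timeoutMs);
            conn.setRequestMethod("GET");
            int code = conn.getResponseCode();
            conn.disconnect();
            return code == 200;
        } catch (Exception e) {
            // Connection refused, timeout, firewall still on, and so on.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port are the values configured in flink-conf.yaml above.
        String url = overviewUrl("192.168.33.231", 8088);
        System.out.println(url + " reachable: " + isUp(url, 3000));
    }
}
```

If this prints false while the cluster is running, the usual suspects are the firewall from step ② or a rest.address still set to localhost.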
1.5 Start the Flink SQL CLI
bin/sql-client.sh
2. Setting Up the Dameng Database
2.1 DM8 in Docker
docker run -d \
--name dm8 \
--restart=always \
--privileged=true \
-e LD_LIBRARY_PATH=/opt/dmdbms/bin \
-e PAGE_SIZE=16 \
-e EXTENT_SIZE=32 \
-e LOG_SIZE=1024 \
-e CASE_SENSITIVE=0 \
-e UNICODE_FLAG=1 \
-e INSTANCE_NAME=DM8_CDC \
-e SYSDBA_PWD=SYSDBA001 \
-v /docker/dm8_data_cdc:/opt/dmdbms/data \
-p 5236:5236 \
dm8_flinkcdc:dm8
Check that the container is running and inspect the database container:
lsof -i:5236
docker logs -f dm8
docker exec -it dm8 bash
2.2 Enable DM Archive Logging
## Check whether archiving is currently enabled
select arch_mode from v$database;
## List the existing archive logs
SELECT NAME, FIRST_TIME, NEXT_TIME, FIRST_CHANGE#, NEXT_CHANGE# FROM V$ARCHIVED_LOG;
SELECT * FROM V$ARCH_FILE;
## Edit the ARCH_INI parameter in the instance's /dmdata/DAMENG/dm.ini file
vi /dmdata/DAMENG/dm.ini
## Set ARCH_INI to 1, then save and exit
ARCH_INI = 1 #enable archiving
RLOG_APPEND_LOGIC = 1
## Create the file dmarch.ini
vi /dmdata/DAMENG/dmarch.ini
## Add the following content
[ARCHIVE_LOCAL1]
ARCH_TYPE = LOCAL
ARCH_DEST = /dmarch
ARCH_FILE_SIZE = 2048
ARCH_SPACE_LIMIT = 102400
## Finally, restart the database to complete the archive configuration
## Example of a complete dmarch.ini (here the archive directory is under /opt/dmdbms/data, the data path of the container from 2.1)
#DaMeng Database Archive Configuration file
#this is comments
ARCH_WAIT_APPLY = 0
[ARCHIVE_LOCAL1]
ARCH_TYPE = LOCAL
ARCH_DEST = /opt/dmdbms/data/DAMENG/arch
ARCH_FILE_SIZE = 1024
ARCH_SPACE_LIMIT = 51200
ARCH_FLUSH_BUF_SIZE = 0
ARCH_HANG_FLAG = 1
2.3 Restart the DM8 Database
docker restart dm8
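After the restart it is worth confirming that archiving is really on before starting any CDC job. The following is a minimal sketch that runs the arch_mode query from 2.2 over JDBC. It rests on several assumptions: the DM JDBC driver JAR is on the classpath, the driver accepts a URL of the form jdbc:dm://host:port, and ARCH_MODE reports 'Y' when archiving is enabled. The class and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical helper: checks the archive mode of a DM instance over JDBC.
class ArchModeCheck {

    // Assumption: V$DATABASE.ARCH_MODE is 'Y' when archiving is on and 'N' otherwise.
    static boolean isArchiveEnabled(String archMode) {
        return archMode != null && "Y".equalsIgnoreCase(archMode.trim());
    }

    // Runs the same query as in section 2.2 and returns the raw ARCH_MODE value.
    static String queryArchMode(String url, String user, String password) throws Exception {
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("select arch_mode from v$database")) {
            return rs.next() ? rs.getString(1) : null;
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: URL form of the DM JDBC driver; host, port and credentials
        // are the ones used by the docker run command in 2.1.
        String url = "jdbc:dm://192.168.33.231:5236";
        String mode = queryArchMode(url, "SYSDBA", "SYSDBA001");
        System.out.println("ARCH_MODE=" + mode + ", archiving enabled: " + isArchiveEnabled(mode));
    }
}
```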
3. Real-Time Sync Test
-- Dameng (source)
CREATE TABLE t_source_dm (
    id INT,
    name VARCHAR,
    insert_date DATE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'dm',
    'startupOptions' = 'Initial',
    'hostname' = '192.168.33.231',
    'port' = '5236',
    'username' = 'SYSDBA',
    'password' = 'SYSDBA001',
    'database' = 'DM8_CDC',
    'schema' = 'SYSDBA',
    'table' = 'dm_flinkcdc'
);
-- MySQL (sink)
CREATE TABLE sink_mysql_test (
    id int,
    name STRING,
    insert_date date,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.33.231:3306/flinkcdc',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = 'YTP1101102233',
    'table-name' = 'dm_to_mysql'
);
-- Start the continuous sync job
insert into sink_mysql_test
select * from t_source_dm;
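4. Real-Time Data Sync Samples (many comments said this does not work, so here is the proof; if it fails for you, check the Dameng configuration first)
4.1 Code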
/*
 * @(#)FlinkDMCDC.java, 2023-08-05 10:33:17
 *
 * Copyright (c) 2000-2020, Dameng Database Co., Ltd.
 * All rights reserved.
 */
package com.dameng.flinkcdc.dm;

import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkDMCDC {
    public static void main(String[] args) throws Exception {
        // Connector properties passed through to the DM source
        Properties properties = new Properties();
        properties.setProperty("database.tablename.case.insensitive", "false");
        properties.setProperty("log.mining.strategy", "offline_catalog");
        properties.setProperty("log.mining.continuous.mine", "true");
        //properties.setProperty("provide.transaction.metadata", "true");
        properties.setProperty("lob.enabled", "true");

        // Snapshot the table first (initial), then follow the archive log
        JdbcIncrementalSource<String> changeEventSource = new DMSourceBuilder<String>()
                .hostname("localhost")
                .port(15236)
                .databaseList("DAMENG")
                .tableList("FLINK_CDC.CDC_TEST")
                .schemaList("FLINK_CDC")
                .username("SYSDBA")
                .password("SYSDBA112233")
                .startupOptions(StartupOptions.initial())
                .dmProperties(properties)
                .includeSchemaChanges(true)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .sliceSize(20)
                .scanNewlyAddedTableEnabled(true)
                .build();

        Configuration configuration = new Configuration();
        // Checkpoint settings
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.enableCheckpointing(20 * 1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Print every change event as a JSON string
        env.fromSource(changeEventSource, WatermarkStrategy.noWatermarks(), "DmSource")
                .setParallelism(1)
                .print();

        env.execute();
    }
}
4.1.1 Capturing from the beginning: run result
4.1.2 Insert: result
4.1.3 Update: result
4.1.4 Delete: result
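Each record printed by the job is a JSON change event whose op field says what happened to the row; the programs in this section branch on the values r, c, u and d. The sketch below shows that dispatch in isolation. The sample event is hand-written for illustration (it is not captured output), and the regex extraction is a stand-in for a real JSON parser such as Jackson, which the later samples use.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reads the "op" field of a change event and names the operation.
class ChangeEventOp {
    // Matches "op":"x" with a single lowercase letter as value.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    // Returns the op code, or an empty string if the event has none
    // (for example a schema change event).
    static String extractOp(String eventJson) {
        Matcher m = OP.matcher(eventJson);
        return m.find() ? m.group(1) : "";
    }

    // Maps the op code to the kind of change it stands for.
    static String describe(String op) {
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Illustrative event only; field values are invented.
        String event = "{\"before\":null,\"after\":{\"ID\":1,\"NAME\":\"a\"},"
                + "\"source\":{\"schema\":\"FLINK_CDC\",\"table\":\"CDC_TEST\"},"
                + "\"op\":\"c\",\"ts_ms\":1700000000000}";
        System.out.println(describe(extractOp(event)));
    }
}
```

Snapshot rows (r) and inserts (c) both carry only an after image, which is why the SQL-generating samples treat them the same way.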
4.2 Converting Dameng CDC Events to SQL (ready to use)
/*
 * @(#)FlinkDMCDC.java, 2023-08-05 10:33:17
 *
 * Copyright (c) 2000-2020, Dameng Database Co., Ltd.
 * All rights reserved.
 */
package com.dameng.flinkcdc.dm;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * Created by wuxin on 2023-08-05 10:33:17
 */
public class FlinkDMCDCSQL {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("database.tablename.case.insensitive", "false");
        properties.setProperty("log.mining.strategy", "offline_catalog");
        properties.setProperty("log.mining.continuous.mine", "true");
        properties.setProperty("lob.enabled", "true");

        JdbcIncrementalSource<String> changeEventSource = new DMSourceBuilder<String>()
                .hostname("localhost")
                .port(15236)
                .databaseList("DAMENG")
                .tableList("FLINK_CDC.CDC_TEST")
                .schemaList("FLINK_CDC")
                .username("SYSDBA")
                .password("SYSDBA112233")
                .startupOptions(StartupOptions.initial())
                .dmProperties(properties)
                .includeSchemaChanges(true)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .sliceSize(20)
                .scanNewlyAddedTableEnabled(true)
                .build();

        Configuration configuration = new Configuration();
        // Checkpoint settings and checkpoint file location
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.enableCheckpointing(20 * 1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStateBackend(new FsStateBackend("file:///D:/tmp/ck"));

        DataStream<String> sourceStream = env.fromSource(
                changeEventSource, WatermarkStrategy.noWatermarks(), "DmSource");

        // Process the CDC events and emit SQL statements wrapped in JSON
        DataStream<String> jsonStream = sourceStream.map(new MapFunction<String, String>() {
            private transient ObjectMapper objectMapper;
            private final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;

            @Override
            public String map(String value) throws Exception {
                if (objectMapper == null) {
                    objectMapper = new ObjectMapper();
                }
                JsonNode rootNode = objectMapper.readTree(value);
                String op = rootNode.path("op").asText();
                JsonNode sourceNode = rootNode.path("source");
                String schema = sourceNode.path("schema").asText();
                String table = sourceNode.path("table").asText();
                long tsMs = rootNode.path("ts_ms").asLong();

                // Build the output JSON structure
                ObjectNode json = nodeFactory.objectNode();

                // 1. Add the metadata section
                ObjectNode metadata = json.putObject("metadata")
                        .put("schema", schema)
                        .put("table", table)
                        .put("source_timestamp", tsMs);

                // Use the row's TIMESTAMP column as event time when present,
                // otherwise fall back to the event's ts_ms (converted to nanoseconds)
                JsonNode afterNode = rootNode.path("after");
                if (!afterNode.isMissingNode() && afterNode.has("TIMESTAMP")) {
                    long timestampNs = afterNode.path("TIMESTAMP").asLong();
                    metadata.put("event_time", formatTimestamp(timestampNs));
                } else {
                    metadata.put("event_time", formatTimestamp(tsMs * 1000000L));
                }

                // 2. Generate SQL for the operation type and add it to the JSON
                String sql;
                switch (op) {
                    case "r": // snapshot read
                    case "c": // insert
                        sql = generateInsertSQL(schema + "." + table, afterNode);
                        json.putObject("sql").put("insert", sql);
                        break;
                    case "u":
                        JsonNode beforeNode = rootNode.path("before");
                        sql = generateUpdateSQL(schema + "." + table, beforeNode, afterNode);
                        json.putObject("sql").put("update", sql);
                        break;
                    case "d":
                        JsonNode beforeNodeDelete = rootNode.path("before");
                        sql = generateDeleteSQL(schema + "." + table, beforeNodeDelete);
                        json.putObject("sql").put("delete", sql);
                        break;
                    default:
                        json.put("error", "UNKNOWN OPERATION: " + op);
                }
                return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
            }

            private String generateInsertSQL(String tableName, JsonNode afterNode) {
                StringBuilder fields = new StringBuilder();
                StringBuilder values = new StringBuilder();
                afterNode.fieldNames().forEachRemaining(fieldName -> {
                    if (fields.length() > 0) {
                        fields.append(", ");
                        values.append(", ");
                    }
                    fields.append(fieldName);
                    JsonNode valueNode = afterNode.get(fieldName);
                    if (valueNode.isNull()) {
                        values.append("NULL");
                    } else if (valueNode.isTextual()) {
                        values.append("'").append(escapeSQL(valueNode.asText())).append("'");
                    } else {
                        values.append(valueNode.asText());
                    }
                });
                return String.format("INSERT INTO %s (%s) VALUES (%s)",
                        tableName, fields.toString(), values.toString());
            }

            private String generateUpdateSQL(String tableName, JsonNode beforeNode, JsonNode afterNode) {
                StringBuilder setClause = new StringBuilder();
                StringBuilder whereClause = new StringBuilder();

                // Build the SET clause
                afterNode.fieldNames().forEachRemaining(fieldName -> {
                    if (!fieldName.equals("ID")) { // assumes ID is the primary key and is never updated
                        if (setClause.length() > 0) {
                            setClause.append(", ");
                        }
                        JsonNode valueNode = afterNode.get(fieldName);
                        if (valueNode.isNull()) {
                            setClause.append(fieldName).append(" = NULL");
                        } else if (valueNode.isTextual()) {
                            setClause.append(fieldName).append(" = '")
                                    .append(escapeSQL(valueNode.asText())).append("'");
                        } else {
                            setClause.append(fieldName).append(" = ").append(valueNode.asText());
                        }
                    }
                });

                // Build the WHERE clause (all before-image columns are used as conditions for accuracy)
                beforeNode.fieldNames().forEachRemaining(fieldName -> {
                    if (whereClause.length() > 0) {
                        whereClause.append(" AND ");
                    }
                    JsonNode valueNode = beforeNode.get(fieldName);
                    if (valueNode.isNull()) {
                        whereClause.append(fieldName).append(" IS NULL");
                    } else if (valueNode.isTextual()) {
                        whereClause.append(fieldName).append(" = '")
                                .append(escapeSQL(valueNode.asText())).append("'");
                    } else {
                        whereClause.append(fieldName).append(" = ").append(valueNode.asText());
                    }
                });
                return String.format("UPDATE %s SET %s WHERE %s",
                        tableName, setClause.toString(), whereClause.toString());
            }

            private String generateDeleteSQL(String tableName, JsonNode beforeNode) {
                StringBuilder whereClause = new StringBuilder();
                beforeNode.fieldNames().forEachRemaining(fieldName -> {
                    if (whereClause.length() > 0) {
                        whereClause.append(" AND ");
                    }
                    JsonNode valueNode = beforeNode.get(fieldName);
                    if (valueNode.isNull()) {
                        whereClause.append(fieldName).append(" IS NULL");
                    } else if (valueNode.isTextual()) {
                        whereClause.append(fieldName).append(" = '")
                                .append(escapeSQL(valueNode.asText())).append("'");
                    } else {
                        whereClause.append(fieldName).append(" = ").append(valueNode.asText());
                    }
                });
                return String.format("DELETE FROM %s WHERE %s", tableName, whereClause.toString());
            }

            // Doubles single quotes so text values cannot break out of the literal
            private String escapeSQL(String value) {
                return value.replace("'", "''");
            }

            // Formats a nanosecond epoch timestamp as yyyy-MM-dd HH:mm:ss
            private String formatTimestamp(long timestampNs) {
                long timestampMs = timestampNs / 1000000L;
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                return sdf.format(new Date(timestampMs));
            }
        });

        // Write the generated SQL statements to a file
        jsonStream.writeAsText("D:\\tmp\\flink-cdc-sql-output.txt", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);

        env.execute();
    }
}
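The SQL generation in 4.2 comes down to two rules: keep the column order of the row image, and render each value as NULL, a bare number, or a quoted string with single quotes doubled. The sketch below isolates those rules without Flink or Jackson so they can be tried on their own. It uses a plain Map in place of the JsonNode row image, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical standalone version of the INSERT generation idea from 4.2.
class InsertSqlSketch {

    // Doubles single quotes, the standard SQL escape inside a string literal.
    static String escape(String s) {
        return s.replace("'", "''");
    }

    // Renders one value as a SQL literal.
    static String literal(Object v) {
        if (v == null) {
            return "NULL";
        }
        if (v instanceof Number || v instanceof Boolean) {
            return v.toString();
        }
        return "'" + escape(v.toString()) + "'";
    }

    // Builds an INSERT statement; the map's iteration order decides the column order.
    static String insert(String table, Map<String, Object> row) {
        StringJoiner cols = new StringJoiner(", ");
        StringJoiner vals = new StringJoiner(", ");
        for (Map.Entry<String, Object> e : row.entrySet()) {
            cols.add(e.getKey());
            vals.add(literal(e.getValue()));
        }
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + vals + ")";
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ID", 1);
        row.put("NAME", "O'Brien");
        row.put("NOTE", null);
        System.out.println(insert("FLINK_CDC.CDC_TEST", row));
    }
}
```

String concatenation with escaping is acceptable for producing a readable change log like this one. If the statements are later executed against a target database, prepared statements with bound parameters are the safer choice.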
4.3 Pushing Dameng CDC Events to Kafka for Decoupling (ready to use)
/*
 * @(#)FlinkDMCDC.java, 2023-08-05 10:33:17
 *
 * Copyright (c) 2000-2020, Dameng Database Co., Ltd.
 * All rights reserved.
 */
package com.dameng.flinkcdc.dm;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * Created by wuxin on 2023-08-05 10:33:17
 */
public class FlinkDMCDCSQLKafka {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("database.tablename.case.insensitive", "false");
        properties.setProperty("log.mining.strategy", "offline_catalog");
        properties.setProperty("log.mining.continuous.mine", "true");
        properties.setProperty("lob.enabled", "true");

        JdbcIncrementalSource<String> changeEventSource = new DMSourceBuilder<String>()
                .hostname("localhost")
                .port(15236)
                .databaseList("DAMENG")
                .tableList("FLINK_CDC.CDC_TEST")
                .schemaList("FLINK_CDC")
                .username("SYSDBA")
                .password("SYSDBA112233")
                .startupOptions(StartupOptions.initial())
                .dmProperties(properties)
                .includeSchemaChanges(true)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .sliceSize(20)
                .scanNewlyAddedTableEnabled(true)
                .build();

        Configuration configuration = new Configuration();
        // Checkpoint settings and checkpoint file location
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.enableCheckpointing(20 * 1000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6 * 1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStateBackend(new FsStateBackend("file:///D:/tmp/ck"));

        DataStream<String> sourceStream = env.fromSource(
                changeEventSource, WatermarkStrategy.noWatermarks(), "DmSource");

        // Process the CDC events and emit SQL statements wrapped in JSON
        DataStream<String> jsonStream = sourceStream.map(new MapFunction<String, String>() {
            private transient ObjectMapper objectMapper;
            private final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;

            @Override
            public String map(String value) throws Exception {
                if (objectMapper == null) {
                    objectMapper = new ObjectMapper();
                }
                JsonNode rootNode = objectMapper.readTree(value);
                String op = rootNode.path("op").asText();
                JsonNode sourceNode = rootNode.path("source");
                String schema = sourceNode.path("schema").asText();
                String table = sourceNode.path("table").asText();
                long tsMs = rootNode.path("ts_ms").asLong();

                // Build the output JSON structure
                ObjectNode json = nodeFactory.objectNode();

                // 1. Add the metadata section
                ObjectNode metadata = json.putObject("metadata")
                        .put("schema", schema)
                        .put("table", table)
                        .put("source_timestamp", tsMs);

                // Use the row's TIMESTAMP column as event time when present,
                // otherwise fall back to the event's ts_ms (converted to nanoseconds)
                JsonNode afterNode = rootNode.path("after");
                if (!afterNode.isMissingNode() && afterNode.has("TIMESTAMP")) {
                    long timestampNs = afterNode.path("TIMESTAMP").asLong();
                    metadata.put("event_time", formatTimestamp(timestampNs));
                } else {
                    metadata.put("event_time", formatTimestamp(tsMs * 1000000L));
                }

                // 2. Generate SQL for the operation type and add it to the JSON under "dml"
                String sql;
                switch (op) {
                    case "r": // snapshot read
                    case "c": // insert
                        sql = generateInsertSQL(schema + "." + table, afterNode);
                        json.putObject("sql").put("dml", sql);
                        break;
                    case "u":
                        JsonNode beforeNode = rootNode.path("before");
                        sql = generateUpdateSQL(schema + "." + table, beforeNode, afterNode);
                        json.putObject("sql").put("dml", sql);
                        break;
                    case "d":
                        JsonNode beforeNodeDelete = rootNode.path("before");
                        sql = generateDeleteSQL(schema + "." + table, beforeNodeDelete);
                        json.putObject("sql").put("dml", sql);
                        break;
                    default:
                        json.put("error", "UNKNOWN OPERATION: " + op);
                }
                // For compact output (the ObjectMapper default) use instead:
                // String jsonString = objectMapper.writeValueAsString(json);
                return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
            }

            private String generateInsertSQL(String tableName, JsonNode afterNode) {
                StringBuilder fields = new StringBuilder();
                StringBuilder values = new StringBuilder();
                afterNode.fieldNames().forEachRemaining(fieldName -> {
                    if (fields.length() > 0) {
                        fields.append(", ");
                        values.append(", ");
                    }
                    fields.append(fieldName);
                    JsonNode valueNode = afterNode.get(fieldName);
                    if (valueNode.isNull()) {
                        values.append("NULL");
                    } else if (valueNode.isTextual()) {
                        // Text values (escaped)
                        values.append("'").append(escapeSQL(valueNode.asText())).append("'");
                    } else if (valueNode.isLong() && isTimestampField(fieldName)) {
                        // Epoch timestamps are converted to a datetime string
                        values.append("'").append(formatTimestamp(valueNode.asLong())).append("'");
                    } else if (valueNode.isNumber()) {
                        // Other numeric types
                        values.append(valueNode.asText());
                    } else {
                        // Everything else (booleans and so on)
                        values.append(valueNode.asText());
                    }
                });
                return String.format("INSERT INTO %s (%s) VALUES (%s)",
                        tableName, fields.toString(), values.toString());
            }

            private String generateUpdateSQL(String tableName, JsonNode beforeNode, JsonNode afterNode) {
                StringBuilder setClause = new StringBuilder();
                StringBuilder whereClause = new StringBuilder();

                // Build the SET clause
                afterNode.fieldNames().forEachRemaining(fieldName -> {
                    if (!fieldName.equals("ID")) { // assumes ID is the primary key and is never updated
                        if (setClause.length() > 0) {
                            setClause.append(", ");
                        }
                        JsonNode valueNode = afterNode.get(fieldName);
                        if (valueNode.isNull()) {
                            setClause.append(fieldName).append(" = NULL");
                        } else if (valueNode.isTextual()) {
                            setClause.append(fieldName).append(" = '")
                                    .append(escapeSQL(valueNode.asText())).append("'");
                        } else if (valueNode.isLong() && isTimestampField(fieldName)) {
                            // Epoch timestamps are converted to a datetime string
                            setClause.append(fieldName).append(" = '")
                                    .append(formatTimestamp(valueNode.asLong())).append("'");
                        } else {
                            setClause.append(fieldName).append(" = ").append(valueNode.asText());
                        }
                    }
                });

                // Build the WHERE clause (all before-image columns are used as conditions for accuracy)
                beforeNode.fieldNames().forEachRemaining(
                        fieldName -> appendCondition(whereClause, fieldName, beforeNode.get(fieldName)));
                return String.format("UPDATE %s SET %s WHERE %s",
                        tableName, setClause.toString(), whereClause.toString());
            }

            private String generateDeleteSQL(String tableName, JsonNode beforeNode) {
                StringBuilder whereClause = new StringBuilder();
                beforeNode.fieldNames().forEachRemaining(
                        fieldName -> appendCondition(whereClause, fieldName, beforeNode.get(fieldName)));
                return String.format("DELETE FROM %s WHERE %s", tableName, whereClause.toString());
            }

            // Appends one "column = value" condition, joined with AND (shared by UPDATE and DELETE)
            private void appendCondition(StringBuilder whereClause, String fieldName, JsonNode valueNode) {
                if (whereClause.length() > 0) {
                    whereClause.append(" AND ");
                }
                if (valueNode.isNull()) {
                    whereClause.append(fieldName).append(" IS NULL");
                } else if (valueNode.isTextual()) {
                    whereClause.append(fieldName).append(" = '")
                            .append(escapeSQL(valueNode.asText())).append("'");
                } else if (valueNode.isLong() && isTimestampField(fieldName)) {
                    // Epoch timestamps are converted to a datetime string
                    whereClause.append(fieldName).append(" = '")
                            .append(formatTimestamp(valueNode.asLong())).append("'");
                } else {
                    whereClause.append(fieldName).append(" = ").append(valueNode.asText());
                }
            }

            // Decides whether a column holds a timestamp (by naming convention; adjust to your schema)
            private boolean isTimestampField(String fieldName) {
                String lower = fieldName.toLowerCase();
                return lower.contains("time") || lower.contains("date") || lower.contains("ts");
            }

            // Formats an epoch timestamp given in nanoseconds, microseconds, milliseconds or seconds.
            // The unit is guessed from the magnitude. Present-day values are roughly 1.7e18 ns,
            // 1.7e15 us, 1.7e12 ms and 1.7e9 s, so the boundaries sit between those. (The original
            // 1e12 boundary classified current millisecond values as microseconds.)
            private String formatTimestamp(long timestamp) {
                if (timestamp >= 1e17) {
                    timestamp /= 1_000_000; // nanoseconds to milliseconds
                } else if (timestamp >= 1e14) {
                    timestamp /= 1_000; // microseconds to milliseconds
                } else if (timestamp < 1e11) {
                    timestamp *= 1_000; // seconds to milliseconds
                }
                // java.time formatting is thread-safe
                return Instant.ofEpochMilli(timestamp)
                        .atZone(ZoneId.systemDefault())
                        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            }

            // Doubles single quotes so text values cannot break out of the literal
            private String escapeSQL(String input) {
                return input.replace("'", "''");
            }
        });

        // Create the Kafka sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // Kafka broker addresses
                .setBootstrapServers("172.30.139.111:19092,172.30.139.111:29092,172.30.139.111:39092,172.30.139.111:49092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("dm-cdc-topic") // Kafka topic name
                        .setValueSerializationSchema(new SimpleStringSchema()) // serialize values as strings
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // exactly-once delivery
                // Recent connector versions refuse to build an EXACTLY_ONCE sink without a
                // transactional ID prefix; the value here is only an example.
                .setTransactionalIdPrefix("dm-cdc-")
                .setProperty("transaction.timeout.ms", "300000") // 5 minutes (must stay below the broker's 15-minute limit)
                .setProperty("acks", "all") // for high reliability
                .build();

        // Wire the pipeline: send the generated JSON to Kafka
        jsonStream.sinkTo(kafkaSink);

        env.execute();
    }
}
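The formatTimestamp helper in 4.3 guesses the unit of an epoch value from its magnitude, so the boundaries deserve a second look. Present-day values are about 1.7e9 in seconds, 1.7e12 in milliseconds, 1.7e15 in microseconds and 1.7e18 in nanoseconds, which means any boundary at or below 1e12 would misread current millisecond values. The standalone sketch below puts the cut-offs at 1e11, 1e14 and 1e17. The class name and the choice of boundaries are assumptions made for illustration, and the zone is passed in explicitly so the result does not depend on the machine's default.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: normalizes epoch values of unknown unit to milliseconds.
class EpochUnits {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Guesses the unit from the magnitude and converts to epoch milliseconds.
    static long toMillis(long ts) {
        long abs = Math.abs(ts);
        if (abs >= 100_000_000_000_000_000L) { // >= 1e17: nanoseconds
            return ts / 1_000_000;
        }
        if (abs >= 100_000_000_000_000L) {     // >= 1e14: microseconds
            return ts / 1_000;
        }
        if (abs >= 100_000_000_000L) {         // >= 1e11: milliseconds
            return ts;
        }
        return ts * 1_000;                     // otherwise: seconds
    }

    // Formats the value as yyyy-MM-dd HH:mm:ss in the given zone.
    static String format(long ts, ZoneId zone) {
        return Instant.ofEpochMilli(toMillis(ts)).atZone(zone).format(FMT);
    }

    public static void main(String[] args) {
        // The same instant expressed in four units; all four lines print the same time.
        long[] samples = {
            1_700_000_000L,
            1_700_000_000_000L,
            1_700_000_000_000_000L,
            1_700_000_000_000_000_000L
        };
        for (long s : samples) {
            System.out.println(s + " -> " + format(s, ZoneOffset.UTC));
        }
    }
}
```

Magnitude-based guessing only works for dates reasonably close to the present; values near 1970 are ambiguous in any such scheme, so when the column type is known it is better to convert by type than by size.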
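5. Downloads for Flink CDC with the Dameng Database
5.1 Required JARs
5.2 Supports both Java programs and SQL
5.3 Download links for the complete program and documentation
Version 1:
https://download.csdn.net/download/ytp552200ytp/90103896
Version 2 (latest as of 2025-06-03):
https://download.csdn.net/download/ytp552200ytp/91119461
If you really have no CSDN points, message me with your email address and I will send the files once I see it. Sharing is fine.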
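6. Notes
I see many comments saying this does not work or that incremental changes are not captured. The root cause is a DM database that is not configured correctly. Follow the steps above, or the instructions in the documentation, and make sure DM archive logging works; the code above can then be used for testing as it is. Use the archive log queries below to verify.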
1. Query the archive logs with SQL (using the SCN as the marker for incremental queries)
-- List all archive log files
SELECT * FROM SYS.V$ARCHIVED_LOG;
-- Or use this view for details on the archive files
SELECT * FROM SYS.V$ARCH_FILE;
-- Add a log file for analysis
DBMS_LOGMNR.ADD_LOGFILE('./dmarch/ARCHIVE_LOCAL1_0x1873DFE0_EP0_2025-06-26_11-34-09.log');
-- Start analyzing all added log files
DBMS_LOGMNR.START_LOGMNR(OPTIONS => 2130);
-- Query the log details
SELECT *
FROM V$LOGMNR_CONTENTS;
SELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP, SEG_OWNER, TABLE_NAME
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME IS NOT NULL;
-- Operations on a specific table
SELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP, SEG_OWNER, TABLE_NAME
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME = 'YOUR_TABLE_NAME';
-- DDL operations
SELECT SQL_REDO FROM V$LOGMNR_CONTENTS
WHERE SQL_REDO LIKE '%CREATE%' OR SQL_REDO LIKE '%ALTER%' OR SQL_REDO LIKE '%DROP%';
-- End the log analysis
DBMS_LOGMNR.END_LOGMNR();
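Using the SCN as an incremental marker means remembering the highest SCN already processed and asking only for rows above it on the next pass. The following is a rough sketch of that loop body over JDBC, built on the V$LOGMNR_CONTENTS columns queried above. It assumes the log files have already been added and START_LOGMNR has been called on the same connection, and that the DM JDBC driver supports bind parameters on this view. The class and method names are hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical helper: reads log-mining rows above a remembered SCN.
class ScnPoller {
    // Same columns as the manual queries above, restricted to one table and to newer SCNs.
    static final String QUERY =
            "SELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP, SEG_OWNER, TABLE_NAME "
          + "FROM V$LOGMNR_CONTENTS WHERE TABLE_NAME = ? AND SCN > ? ORDER BY SCN";

    // The marker never moves backwards.
    static long advance(long lastScn, long seenScn) {
        return Math.max(lastScn, seenScn);
    }

    // Prints every change above lastScn and returns the new marker.
    // Assumption: log mining was started on this same connection beforehand.
    static long poll(Connection conn, String table, long lastScn) throws SQLException {
        long next = lastScn;
        try (PreparedStatement ps = conn.prepareStatement(QUERY)) {
            ps.setString(1, table);
            ps.setLong(2, lastScn);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    long scn = rs.getLong("SCN");
                    System.out.println(scn + " " + rs.getString("SQL_REDO"));
                    next = advance(next, scn);
                }
            }
        }
        return next;
    }

    public static void main(String[] args) {
        // Without a database at hand, just show the statement that would be run.
        System.out.println(QUERY);
    }
}
```

If a pass returns nothing for a table that is known to have changed, that points back to the archive configuration in 2.2 rather than to the Flink job.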