使用Flink CDC實現 Oracle數據庫數據同步(非SQL)

文章目錄

  • 前言
  • 一、開啟歸檔日志
  • 二、創建flinkcdc專屬用戶
    • 2.1 對于Oracle 非CDB數據庫,執行如下sql
    • 2.2 對于Oracle CDB數據庫,執行如下sql
  • 三、指定oracle表、庫級啟用
  • 四、使用flink-connector-oracle-cdc實現數據庫同步
    • 4.1 引入pom依賴
    • 4.1 Java主代碼
    • 4.1 json轉換為row


前言

Flink CDC 是一個基于流的數據集成工具,旨在為用戶提供一套功能更加全面的編程接口(API)。 該工具使得用戶能夠以 YAML 配置文件的形式實現數據庫同步,同時也提供了Flink CDC Source Connector API。 Flink CDC 在任務提交過程中進行了優化,并且增加了一些高級特性,如表結構變更自動同步(Schema Evolution)、數據轉換(Data Transformation)、整庫同步(Full Database Synchronization)以及 精確一次(Exactly-once)語義。
本文通過flink-connector-oracle-cdc來實現Oracle數據庫的數據同步。


一、開啟歸檔日志

1)數據庫服務器終端,使用sysdba角色連接數據庫

 sqlplus / as sysdba
或
sqlplus /nolog
CONNECT sys/password AS SYSDBA;

2)檢查歸檔日志是否開啟

archive log list;

(“Database log mode: No Archive Mode”,日志歸檔未開啟)
(“Database log mode: Archive Mode”,日志歸檔已開啟)
3)啟用歸檔日志

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

注意:
啟用歸檔日志需要重啟數據庫。
歸檔日志會占用大量的磁盤空間,應定期清除過期的日志文件
4)啟動完成后重新執行 archive log list; 查看歸檔打開狀態

二、創建flinkcdc專屬用戶

2.1 對于Oracle 非CDB數據庫,執行如下sql

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;GRANT CREATE SESSION TO flinkuser;GRANT SET CONTAINER TO flinkuser;GRANT SELECT ON V_$DATABASE to flinkuser;GRANT FLASHBACK ANY TABLE TO flinkuser;GRANT SELECT ANY TABLE TO flinkuser;GRANT SELECT_CATALOG_ROLE TO flinkuser;GRANT EXECUTE_CATALOG_ROLE TO flinkuser;GRANT SELECT ANY TRANSACTION TO flinkuser;GRANT LOGMINING TO flinkuser;GRANT ANALYZE ANY TO flinkuser;GRANT CREATE TABLE TO flinkuser;-- need not to execute if set scan.incremental.snapshot.enabled=true(default)GRANT LOCK ANY TABLE TO flinkuser;GRANT ALTER ANY TABLE TO flinkuser;GRANT CREATE SEQUENCE TO flinkuser;GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;GRANT SELECT ON V_$LOG TO flinkuser;GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;GRANT SELECT ON V_$LOGFILE TO flinkuser;GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

2.2 對于Oracle CDB數據庫,執行如下sql

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;GRANT LOGMINING TO flinkuser CONTAINER=ALL;GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;-- need not to execute if set scan.incremental.snapshot.enabled=true(default)GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;

三、指定oracle表、庫級啟用

-- 指定表啟用補充日志記錄:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;-- 為數據庫的所有表啟用
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;-- 指定數據庫啟用補充日志記錄
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

四、使用flink-connector-oracle-cdc實現數據庫同步

4.1 引入pom依賴

 <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-oracle-cdc</artifactId><version>2.4.0</version></dependency>

4.1 Java主代碼

package test.datastream.cdc.oracle;import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;import java.util.Properties;public class OracleCdcExample {public static void main(String[] args) throws Exception {Properties properties = new Properties();//數字類型數據 轉換為字符properties.setProperty("decimal.handling.mode", "string");SourceFunction<String> sourceFunction = OracleSource.<String>builder()
//                .startupOptions(StartupOptions.latest()) // 從最晚位點啟動.url("jdbc:oracle:thin:@localhost:1521:orcl").port(1521).database("ORCL") // monitor XE database.schemaList("c##flink_user") // monitor inventory schema.tableList("c##flink_user.TEST2") // monitor products table.username("c##flink_user").password("flinkpw").debeziumProperties(properties).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);// use parallelism 1 for sink to keep message orderingSingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new CacheDataAllWindowFunction());//批量同步winStream.addSink(new DbCdcSinkFunction(null));env.execute();}
}

4.1 json轉換為row

package test.datastream.cdc.oracle.function;import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;/*** @desc cdc json解析,并轉換為Row*/
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {private Map<String,Integer> columnMap =new HashMap<>();@Overridepublic void open(Configuration parameters) throws Exception {columnMap.put("ID",0);columnMap.put("NAME",1);columnMap.put("DESCRIPTION",2);columnMap.put("AGE",3);columnMap.put("CREATE_TIME",4);columnMap.put("SCORE",5);columnMap.put("C_1",6);columnMap.put("B_1",7);}@Overridepublic void flatMap(String s, Collector<Row> collector) throws Exception {System.out.println("receive: "+s);VsConfiguration conf=VsConfiguration.from(s);String op = conf.getString(CdcConstants.K_OP);VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);Row row =null;if(CdcConstants.OP_C.equals(op)){//插入,使用after數據row = convertToRow(after);row.setKind(RowKind.INSERT);}else if(CdcConstants.OP_U.equals(op)){//更新,使用after數據row = convertToRow(after);row.setKind(RowKind.UPDATE_AFTER);}else if(CdcConstants.OP_D.equals(op)){//刪除,使用before數據row = convertToRow(before);row.setKind(RowKind.DELETE);}else {//r 操作,使用after數據row = convertToRow(after);row.setKind(RowKind.INSERT);}collector.collect(row);}private Row convertToRow(VsConfiguration data){Set<String> keys = data.getKeys();int size = keys.size();Row row=new Row(8);int i=0;for (String key:keys) {Integer index = this.columnMap.get(key);Object value=data.get(key);if(key.equals("CREATE_TIME")){//long日期轉timestampvalue=long2Timestamp((Long)value);}row.setField(index,value);}return row;}private static  java.sql.Timestamp long2Timestamp(Long time){Timestamp timestamp = new Timestamp(time/1000);System.out.println(timestamp);return timestamp;}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/36965.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/36965.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/36965.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Docker Desktop 簡易操作指南 (Windows, macOS, Linux)

1. 下載最新版本 Docker Desktop https://www.docker.com/products/docker-desktop/ 2.啟動 Docker Desktop 3.常用命令&#xff08;在 cmd 或 Terminal 中執行&#xff09; #列出所有鏡像&#xff08;Images&#xff09; docker images #列出所有容器&#xff08;Containers&…

OpenSSL/3.3.0: error:0A00018A:SSL routines::dh key too small

php curl解決辦法: curl_setopt($ch, CURLOPT_SSL_CIPHER_LIST, ‘DEFAULTSECLEVEL1’); python 解決辦法: from twisted.internet.ssl import AcceptableCiphers from scrapy.core.downloader import contextfactory contextfactory.DEFAULT_CIPHERS AcceptableCiphers.from…

CSS 核心知識點 - grid

思維導圖 參考網址: https://developer.mozilla.org/zh-CN/docs/Web/CSS/CSS_grid_layout 一、什么是 grid&#xff1f; CSS Grid布局是在CSS3規范中引入的一種新的布局方式&#xff0c;旨在解決傳統布局方法&#xff08;如浮動、定位、表格布局&#xff09;存在的許多問題。C…

Spring Boot 集成 MyBatis-Plus 總結

Spring Boot 集成 MyBatis-Plus 總結 大家好&#xff0c;我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編&#xff0c;也是冬天不穿秋褲&#xff0c;天冷也要風度的程序猿&#xff01; 在Java開發中&#xff0c;Spring Boot以其簡潔和高效的特點&#xff0c;…

Oh My Zsh Git 插件

以下是一些常見的別名和它們對應的 Git 命令&#xff1a; g: gitga: git addgaa: git add --allgapa: git add --patchgau: git add --updategb: git branchgba: git branch -agbd: git branch -dgbda: git branch --no-color --merged | command grep -vE “^(||*|\s*(main|m…

第十九站:Java鈦藍——區塊鏈技術的新探索

在區塊鏈技術的新探索中&#xff0c;Java作為一門成熟的編程語言&#xff0c;正在通過Hyperledger Fabric和Web3j等技術實現其在區塊鏈領域的應用。以下是對這些技術的簡要介紹和如何使用Java源代碼與它們進行交互的講解。 Hyperledger Fabric Hyperledger Fabric是一個由Lin…

React.js 全面解析:從基礎到實戰案例

引言&#xff1a; React.js&#xff0c;由Facebook推出并維護的開源JavaScript庫&#xff0c;以其組件化思想、虛擬DOM技術和聲明式編程風格&#xff0c;成為構建用戶界面的首選工具之一。本文將系統性地介紹React的基礎概念、核心特性&#xff0c;并通過實際案例展示基礎屬性…

DataWhale-吃瓜教程學習筆記(四)

學習視頻&#xff1a;第3章-二分類線性判別分析_嗶哩嗶哩_bilibili 西瓜書對應章節&#xff1a; 3.4 文章目錄 - 算法原理- 損失函數推導-- 異類樣本中心盡可能遠-- 同類樣本方差盡可能小-- 綜合 知識點補充 - 二范數二范數&#xff08;2-norm&#xff09;詳解定義幾何意義性質…

vue3中省市區聯動在同一個el-form-item中咋么設置rules驗證都不為空的效果

在開發中出現如下情況&#xff0c;在同一個el-form-item設置了省市區三級聯動的效果 <el-form-item label"地區" prop"extraProperties.Province"><el-row :gutter"20"><el-col :span"12"><el-select v-model&qu…

OpenHarmony開發實戰:HDF驅動開發流程

概述 HDF&#xff08;Hardware Driver Foundation&#xff09;驅動框架&#xff0c;為驅動開發者提供驅動框架能力&#xff0c;包括驅動加載、驅動服務管理、驅動消息機制和配置管理。并以組件化驅動模型作為核心設計思路&#xff0c;讓驅動開發和部署更加規范&#xff0c;旨在…

Unity3D Excel表格數據處理模塊詳解

一、引言 在Unity3D開發中&#xff0c;我們經常需要處理大量的數據&#xff0c;這些數據可能是游戲配置、角色屬性、道具信息等。Excel表格作為一種常見的數據存儲方式&#xff0c;具有結構清晰、易于編輯的特點&#xff0c;因此被廣泛應用于游戲開發中。本文將詳細介紹如何在…

四川赤橙宏海商務信息咨詢有限公司抖音開店靠譜嗎?

在數字化浪潮席卷全球的今天&#xff0c;電商行業正以前所未有的速度發展。而在這個大潮中&#xff0c;四川赤橙宏海商務信息咨詢有限公司憑借其專業的團隊和前瞻性的戰略眼光&#xff0c;專注于抖音電商服務&#xff0c;為廣大商家提供了一站式解決方案&#xff0c;成為了行業…

面經-常用框架

1.Spring 1.1什么是Spring框架&#xff1f; Spring 是?種輕量級開發框架&#xff0c;旨在提?開發?員的開發效率以及系統的可維護性。 Spring 的 6 個特征:核?技術&#xff0c;測試&#xff0c;數據訪問&#xff0c;Web?持&#xff0c;集成&#xff0c;語? 1.2列舉?些重…

Ubuntu20.04安裝LibTorch并完成高斯濺射環境搭建

0. 簡介 最近受到優刻得的使用邀請&#xff0c;正好解決了我在大模型和自動駕駛行業對GPU的使用需求。UCloud云計算旗下的Compshare的GPU算力云平臺。他們提供高性價比的4090 GPU&#xff0c;按時收費每卡2.6元&#xff0c;月卡只需要1.7元每小時&#xff0c;并附帶200G的免費…

接口自動化測試-項目實戰

什么是接口自動化測試&#xff1a;使用工具或代碼代替人對接口進行測試 測試項目結構&#xff08;python包&#xff09; 1、接口api包 2、script:業務腳本 3、data:數據 4、config.py :配置文件 5、reporter:報告 錯誤問題&#xff1a; 1、未打印任何東西。添加pip ins…

走馬燈封裝

走馬燈功能需求&#xff1a; 支持定時切換&#xff1b;支持左右按鈕切換&#xff08;根據鼠標是否在切換組件內展示和隱藏左右切換按鈕&#xff09;&#xff1b;支持底部標識切換&#xff1b; 走馬燈 完整代碼如下&#xff1a; /*** class 走馬燈*/import react, { Compone…

C語言 指針——緩沖區溢出與緩沖區溢出攻擊

目錄 緩沖區溢出攻擊 緩沖區溢出攻擊實例 字符串的安全輸入方法?編輯 防止緩沖區溢出的兩個要點 緩沖區溢出攻擊 網絡黑客常針對系統和程序自身存在的漏洞&#xff0c;編寫相應的攻擊程序 ? 對緩沖區溢出漏洞的攻擊 —— 最常見 ? 幾乎占到了網絡攻擊次數的一半以上…

Android (已解決)Gradle 編譯失敗 Unsupported class file major version 61

文章目錄 一、報錯原因二、解決方法 一、報錯原因 新版本的 Android Studio 默認使用的是 Java 17 LTS&#xff0c;而這個歷史項目的 Gradle 版本很低&#xff0c;不支持高版本的 Java。 具體原因&#xff1a;Java 17 (major version 61) 編譯的 class 文件&#xff0c;如果在…

逆向學習匯編篇:指令的操作

本節課在線學習視頻&#xff08;網盤地址&#xff0c;保存后即可免費觀看&#xff09;&#xff1a; ??https://pan.quark.cn/s/660c759dea95?? 在逆向工程中&#xff0c;深入理解匯編語言的指令操作是至關重要的。匯編指令是計算機硬件與軟件之間的橋梁&#xff0c;它們直…

DevEco Studio有時會多出來.js和.map文件,導致項目不能運行

1、問題 在使用DevEco的時候有時候會出現啥都沒干&#xff0c;但是在項目的目錄下會自動生成和文件同名的.js和.js.map文件&#xff0c;至于為什么會生成目前我也不知道&#xff0c;如果想要更深了解可以到論壇討論&#xff1a;華為開發者論壇。生成.js和.js.map文件優…