使用ogg實現oracle到kafka的增量數據實時同步

Oracle Golden Gate軟件是一種基于日志的結構化數據復制備份軟件,它通過解析源數據庫在線日志或歸檔日志獲得數據的增量變化,再將這些變化應用到目標數據庫,從而實現源數據庫與目標數據庫同步。

0、本篇中源端和目標端的一些配置信息:

-版本OGG版本id地址
源端Oracle11gR2Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64Carlota3
目標端kafka_2.12-2.5.0Linux x86-64上的Oracle GoldenGate for Big Data 19.1.0.0.1Carlota2

源端和目標端的文件不一樣,目標端需要下載Oracle GoldenGate for Big Data,源端需要下載Oracle GoldenGate for Oracle!

PS:源端是安裝好了Oracle的機器,目標端是安裝好了Kafka的機器,二者環境變量之前都配置好了。

1、源端OGG安裝

  • 先建立ogg目錄

    mkdir -p /opt/ogg
    
  • 解壓zip文件

    unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip
    
  • 解壓后得到一個tar包,再解壓這個tar

    tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
    
  • 使oracle用戶有ogg的權限,后面有些需要在oracle用戶下執行才能成功

    chown -R oracle:oinstall /data/ogg 
    
  • 配置OGG環境變量

    vim /etc/profile
    

    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=ORACLEHOME/lib:/usr/libexportPATH=ORACLE_HOME/lib:/usr/lib export PATH=ORACLEH?OME/lib:/usr/libexportPATH=OGG_HOME:$PATH

  • source /etc/profile
    

2、目標端OGG安裝

  • 先建立ogg目錄

    mkdir -p /data/ogg
    
  • 解壓zip文件

    unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip
    
  • 解壓后得到一個tar包,再解壓這個tar

    tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar
    
  • 使oracle用戶有ogg的權限,后面有些需要在oracle用戶下執行才能成功

    chown -R oracle:oinstall /data/ogg 
    
  • 配置OGG環境變量

    vim /etc/profile
    

    export OGG_HOME=/opt/ogg
    export LD_LIBRARY_PATH=JAVAHOME/jre/lib/amd64:JAVA_HOME/jre/lib/amd64:JAVAH?OME/jre/lib/amd64:JAVA_HOME/jre/lib/amd64/server:JAVAHOME/jre/lib/amd64/libjsig.so:JAVA_HOME/jre/lib/amd64/libjsig.so:JAVAH?OME/jre/lib/amd64/libjsig.so:JAVA_HOME/jre/lib/amd64/server/libjvm.so:OGGHOME/libexportPATH=OGG_HOME/lib export PATH=OGGH?OME/libexportPATH=OGG_HOME:$PATH

  • source /etc/profile
    
  • ggsci
    
  • create subdirs
    

3、源端Oracle歸檔模式設置

  • 登陸Oracle用戶

    su - oracle
    
  • 登陸Oracle

    sqlplus / as sysdba
    
  • 查看當前是否為歸檔模式(若為Disabled,則需手動打開)

    archive log list 
    

    Database log mode

    No Archive Mode Automatic archival

    Disabled Archive destination

    USE_DB_RECOVERY_FILE_DEST Oldest online log sequence 12

    Current log sequence 14

  • 立即關閉數據庫

    shutdown immediate 
    
  • 啟動實例并加載數據庫,但不打開

    startup mount 
    
  • 更改數據庫為歸檔模式

    alter database archivelog; 
    
  • 打開數據庫

    alter database open;
    
  • 啟用自動歸檔

    alter system archive log start; 
    
  • 再次查看當前是否為歸檔模式(看到為Enabled,則成功打開歸檔模式。)

    archive log list 
    

    Database log mode Archive Mode
    Automatic archival Enabled
    Archive destination USE_DB_RECOVERY_FILE_DEST
    Oldest online log sequence 12
    Next log sequence to archive 14
    Current log sequence 14

  • 查看輔助日志狀態(若為NO,則需要通過命令修改)

    select force_logging, supplemental_log_data_min from v$database;
    

    FORCE_ SUPPLEMENTAL_LOG


    NO NO

  • alter database force logging;
    
  • alter database add supplemental log data;
    
  • 再次查看輔助日志狀態(為YES即可)

    select force_logging, supplemental_log_data_min from v$database;
    

    FORCE_ SUPPLEMENTAL_LOG


    YES YES

4、源端oracle創建復制用戶

  • root用戶建立相關文件夾,并賦予權限

    mkdir -p /data/oracle/oggdata/orcl
    
    chown -R oracle:oinstall /data/oracle/oggdata/orcl
    
  • 執行下面sql

    SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;Tablespace created.SQL>  create user ogg identified by ogg default tablespace oggtbs;User created.SQL> grant dba to ogg;Grant succeeded.
  • Oracle創建測試表

    create user test_ogg  identified by test_ogg default tablespace users;
    grant dba to test_ogg;
    conn test_ogg/test_ogg;
    create table test_ogg(id int ,name varchar(20),primary key(id));
    

5、OGG源端配置

  • ggsci
    
  • create subdirs
    
  • dblogin userid ogg password ogg
    
  • edit param ./globals
    

    oggschema ogg

  • 配置管理器mgr

    edit param mgr
    

    PORT 7809

    DYNAMICPORTLIST 7810-7909

    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 *

    PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

  • 添加復制表

    add trandata test_ogg.test_ogginfo trandata test_ogg.test_ogg
    
  • 配置extract進程(ORACLE_SID與Orcale中的相同)

    edit param extkafka
    

    extract extkafka

    dynamicresolution

    SETENV (ORACLE_SID = “orcl11g”)

    SETENV (NLS_LANG = “american_america.AL32UTF8”)

    userid ogg,password ogg

    exttrail /da ta/ogg/dirdat/to

    table test_ogg.test_ogg;

    add extract extkafka,tranlog,begin now
    

    若報錯

    ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).

    執行下面的命令再重新添加即可。

    create subdirs
    
    add exttrail /data/ogg/dirdat/to,extract extkafka
    
  • 配置pump進程

    edit param pukafka
    

    extract pukafka

    passthru

    dynamicresolution

    userid ogg,password ogg

    rmthost Carlota2 mgrport 7809

    rmttrail /data/ogg/dirdat/to

    table test_ogg.test_ogg;

    add extract pukafka,exttrailsource /data/ogg/dirdat/to
    
    add rmttrail /data/ogg/dirdat/to,extract pukafka
    
  • 配置define文件(Oracle與MySQL,Hadoop集群(HDFS,Hive,kafka等)等之間數據傳輸可以定義為異構數據類型的傳輸,故需要定義表之間的關系映射,)

    edit param test_ogg
    

    defsfile /data/ogg/dirdef/test_ogg.test_ogg

    userid ogg,password ogg

    table test_ogg.test_ogg;

  • 返回終端執行

    ./defgen paramfile dirprm/test_ogg.prm
    
  • 將生成的/data/ogg/dirdef/test_ogg.test_ogg發送的目標端ogg目錄下的dirdef里:

    scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/opt/ogg/dirdef/ 
    

6、OGG目標端配置

  • 開啟kafka服務

    zkServer.sh startkafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties 
    
  • ggsic
    
  • 配置管理器mgr

    edit param mgr
    

    PORT 7809

    DYNAMICPORTLIST 7810-7909

    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 *

    PURGEOLDEXTRACTS ./dirdat/,usecheckpoints, minkeepdays 3

  • 配置checkpoint

    edit  param  ./GLOBALS
    

    CHECKPOINTTABLE test_ogg.checkpoint

  • 配置replicate進程

    edit param rekafka
    

    REPLICAT rekafka

    sourcedefs /data/ogg/dirdef/test_ogg.test_ogg

    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

    REPORTCOUNT EVERY 1 MINUTES, RATE

    GROUPTRANSOPS 10000

    MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

  • 配置kafka.props(去掉注釋)

    cd /opt/ogg/dirprm/
    vim kafka.props
    

    gg.handlerlist=kafkahandler //handler類型
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相關配置
    gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名稱,無需手動創建
    gg.handler.kafkahandler.format=json //傳輸文件的格式,支持json,xml等
    gg.handler.kafkahandler.mode=op //OGG for Big Data中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務傳輸一次
    gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/:/opt/ogg/:/opt/ogg/lib/

    vim custom_kafka_producer.properties
    

    bootstrap.servers=192.168.44.129:9092 //kafkabroker的地址
    acks=1
    compression.type=gzip //壓縮類型
    reconnect.backoff.ms=1000 //重連延時
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    batch.size=102400
    linger.ms=10000

  • 添加trail文件到replicate進程

    add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
    

7、測試

在源端和目標端的OGG命令行下使用start [進程名]的形式啟動所有進程。
啟動順序按照源mgr——目標mgr——源extract——源pump——目標replicate來完成。
全部需要在ogg目錄下執行ggsci目錄進入ogg命令行。
源端依次是

start mgr
start extkafka
start pukafka

目標端

start mgr
start rekafka

GGSCI (Carlota2) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

REPLICAT RUNNING REKAFKA 00:00:00 00:00:08

GGSCI (Carlota3) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:00

EXTRACT RUNNING PUKAFKA 00:00:00 00:00:10

現在源端執行sql語句

conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

查看源端trail文件狀態

ls -l /data/ogg/dirdat/to*

查看目標端trail文件狀態

ls -l /data/ogg/dirdat/to*

查看kafka是否自動建立對應的主題

kafka-topics.sh --list --zookeeper localhost:2181

在列表中顯示有test_ogg則表示沒問題
通過消費者看是否有同步消息

kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“I”,“op_ts”:“2020-07-31 13:42:33.072327”,“current_ts”:“2020-07-31T13:42:38.928000”,“pos”:“00000000000000001066”,“after”:{“ID”:1,“NAME”:“test”}}

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“U”,“op_ts”:“2020-07-31 13:42:46.005763”,“current_ts”:“2020-07-31T13:42:52.201000”,“pos”:“00000000000000001204”,“before”:{},“after”:{“ID”:1,“NAME”:“zhangsan”}}

{“table”:“TEST_OGG.TEST_OGG”,“op_type”:“D”,“op_ts”:“2020-07-31 13:42:57.079268”,“current_ts”:“2020-07-31T13:43:02.231000”,“pos”:“00000000000000001347”,“before”:{“ID”:1}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/535796.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/535796.shtml
英文地址,請注明出處:http://en.pswp.cn/news/535796.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

轉載:35歲前成功的12條黃金法則

習慣的力量是驚人的。習慣能載著你走向成功,也能馱著你滑向失敗。如何選擇,完全取決于你自己。 1.習慣的力量:35歲以前養成好習慣 你想成功嗎?那就及早培養有利于成功的好習慣。 習慣的力量是驚人的,35歲…

JDK源碼解析之 Java.lang.Object

Object類是Java中其他所有類的祖先,沒有Object類Java面向對象無從談起。作為其他所有類的基類,Object具有哪些屬性和行為,是Java語言設計背后的思維體現。 Object類位于java.lang包中,java.lang包包含著Java最基礎和核心的類&…

將z-blog改成英文blog所遇到的問題

1.將z-blog中文章日期中的“年,月,日”改成英文 相關模板:b_article-multi.htmlb_article-single.html默認用的時間標簽是<#article/posttime/longdate#> 即 "2007年1月13日" 這樣的形式你可以換成 <#article/posttime/shortdate#>即 "2…

JDK源碼解析之 Java.lang.String

String 類代表字符串。Java 程序中的所有字符串字面值&#xff08;如 “abc” &#xff09;都作為此類的實例實現。 字符串是常量&#xff1b;它們的值在創建之后不能更改。字符串緩沖區支持可變的字符串。因為 String 對象是不可變的&#xff0c;所以可以共享。 一、類定義 p…

看到一個blog的標語,有意思!

"上世紀80年代勇氣&#xff0c;90年代靠關系&#xff0c;現在必須靠知識能力&#xff01;掙錢靠1、興趣廣泛&#xff1b; 2、感覺敏銳&#xff1b; 3、集中力強&#xff1b; 4、個性不脆弱&#xff08;堅韌性&#xff09;&#xff1b; 5、能在瞬間了解因果關系&#xff1b…

JDK源碼解析之 Java.lang.AbstractStringBuilder

這個抽象類是StringBuilder和StringBuffer的直接父類&#xff0c;而且定義了很多方法&#xff0c;因此在學習這兩個類之間建議先學習 AbstractStringBuilder抽象類 該類在源碼中注釋是以JDK1.5開始作為前兩個類的父類存在的&#xff0c;可是直到JDK1.8的API中&#xff0c;關于S…

RHEL下安裝配置基于2臺服務器的MYSQL集群

一、介紹這篇文檔旨在介紹如何在RHEL下安裝配置基于2臺服務器的MySQL集群。并且實現任意一臺服務器出現問題或宕機時MySQL依然能夠繼續運行。 注意&#xff01;雖然這是基于2臺服務器的MySQL集群&#xff0c;但也必須有額外的第三臺服務器作為管理節點&#xff0c;但這臺服務器…

JDK源碼解析之 Java.lang.StringBuffer

StringBuffer類表示一個可變的字符序列。StringBuffer的API與StringBuilder互相兼容&#xff0c;但是StringBuffer是線程安全的。在可能的情況下&#xff0c;建議優先使用StringBuilder&#xff0c;因為在大多數實現中它比StringBuffer更快。 一、類定義 public final class S…

redo和undo

這是在網上看到的對redo和undo的探討&#xff1a; 1. redo 記錄所有做過的事情&#xff0c;用于恢復 undo 記錄事務的前鏡相&#xff0c;用于回滾2. redo&#xff0c;恢復數據庫時&#xff0c;按照重做日志文件來恢復你之前的操作 undo&#xff0c;撤消你做過的操作&#xff0…

JDK源碼解析之 Java.lang.StringBuilder

StringBuilder類表示一個可變的字符序列。StringBuilder的API與StringBuffer互相兼容&#xff0c;但是StringBuilder是非線程安全的&#xff0c;在大多數實現中它比StringBuffer更快。 一、類定義 public final class StringBufferextends AbstractStringBuilderimplements ja…

從映射觀點看索引

信息檢索主要有“檢”與“索&#xff08;辦手續&#xff09;”兩個動作。在圖書館借書時&#xff0c;一般而言&#xff0c; 找書的時間比辦理手續的時間長得多&#xff0c;因而縮短檢查時間是提高效率的關鍵。數據庫中檢 索信息也與此類似。 在沒有索引文件時&#xff0c;DBM…

JDK源碼解析之 Java.lang.Boolean

Boolean 類是將 boolean 基本類型進行包裝。類型為 Boolean 的對象包含一個單一屬性 value&#xff0c;其類型為 boolean。 此外還提供了許多將 boolean 轉換為 String、String 轉換為 boolean&#xff0c;以及其他一些方法。 一、類定義 public final class Boolean implemen…

MYSQL的集群的安裝與配置(mysql-5.1.21)

具體安裝與配置&#xff1a;1&#xff09;準備工作&#xff1a;6臺機器&#xff0c;IP地址分別為192.168.0.&#xff08;231-236&#xff09;MGM節點&#xff1a;192.168.0.231(232)SQL 節點&#xff1a;192.168.0.233-234NDBD 節點&#xff1a;192.168.0.235-236系統都是REDHA…

JDK源碼解析之 Java.lang.Byte

byte&#xff0c;即字節&#xff0c;由8位的二進制組成。在Java中&#xff0c;byte類型的數據是8位帶符號的二進制數,以二進制補碼表示的整數 取值范圍&#xff1a;默認值為0&#xff0c;最小值為-128&#xff08;-27&#xff09;;最大值是127&#xff08;27-1&#xff09; Byt…

在命令行模式下管理SELinux

作者&#xff1a; Oslad.com (原創&#xff01;轉載請注明出處) 2006-07-14 在 GUI 圖形界面模式下&#xff0c;要更改 SELinux 的策略使用方式&#xff0c;只需依次點擊“應用程序”&#xff0c;“系統設置”&#xff0c;“安全級別”&#xff1b;然后在“安全級別配置”對…

JDK源碼解析之 Java.lang.Double

Double類是原始類型double的包裝類&#xff0c;它包含若干有效處理double值的方法&#xff0c;如將其轉換為字符串表示形式&#xff0c;反之亦然。Double類的對象可以包含一個double值。 Double類包裝原始類型的值 double中的對象。類型的對象 Double包含一個類型為的字段 doub…

網頁搜索幫助-禁止搜索引擎收錄的方法

什么是robots.txt文件?搜索引擎使用spider程序自動訪問互聯網上的網頁并獲取網頁信息。spider在訪問一個網站時&#xff0c;會首先會檢查該網站的根域下是否有一個叫做robots.txt的純文本文件。您可以在您的網站中創建一個純文本文件robots.txt&#xff0c;在文件中聲明該網站…

JDK源碼解析之 Java.lang.Float

Float類是原始類型float的包裝類&#xff0c;它包含若干有效處理浮點值的方法&#xff0c;如將其轉換為字符串表示形式&#xff0c;反之亦然。Float類的一個對象可以包含一個浮點值 一、類定義 public final class Float extends Number implements Comparable<Float> {…

FTP兩種工作模式:主動模式(Active FTP)和被動模式(Passive FTP)

在主動模式下&#xff0c;FTP客戶端隨機開啟一個大于1024的端口N向服務器的21號端口發起連接&#xff0c;然后開放N1號端口進行監聽&#xff0c;并向服務器發出PORT N 1命令。服務器接收到命令后&#xff0c;會用其本地的FTP數據端口&#xff08;通常是20&#xff09;來連接客戶…

JDK源碼解析之 java.lang.Integer

teger 基本數據類型int 的包裝類 Integer 類型的對象包含一個 int 類型的字段 一、類定義 public final class Integer extends Number implements Comparable<Integer>{}類被聲明為final的,表示不能被繼承;繼承了Number抽象類,可以用于數字類型的一系列轉換;實現了Comp…