Oracle GoldenGate is a log-based structured data replication product. It captures incremental data changes by parsing the source database's online or archived redo logs, then applies those changes to the target database, keeping the source and target in sync.
0. Source and target configuration used in this article:
| | Version | OGG Version | Host |
|---|---|---|---|
| Source | Oracle 11gR2 | Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64 | Carlota3 |
| Target | kafka_2.12-2.5.0 | Oracle GoldenGate for Big Data 19.1.0.0.1 on Linux x86-64 | Carlota2 |
The source and target installers differ: the target needs Oracle GoldenGate for Big Data, while the source needs Oracle GoldenGate for Oracle!
PS: the source is a machine with Oracle already installed, and the target is a machine with Kafka already installed; the environment variables for both were configured beforehand.
1. Source-side OGG installation
- Create the ogg directory:
mkdir -p /opt/ogg
- Unzip the installer:
unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip
- Unzipping yields a tar archive; extract it into /opt/ogg:
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
- Give the oracle user ownership of the OGG installation directory; some later steps only succeed when run as the oracle user:
chown -R oracle:oinstall /opt/ogg
- Configure the OGG environment variables:
vim /etc/profile
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
export PATH=$OGG_HOME:$PATH
source /etc/profile
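As a quick sanity check (a minimal sketch; it assumes ORACLE_HOME was already exported by the Oracle setup), confirm that the variables resolve and that ggsci's shared libraries can be found:
echo $OGG_HOME
which ggsci
ldd $OGG_HOME/ggsci | grep 'not found'
If the ldd line prints nothing, every library dependency of ggsci was resolved.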
2. Target-side OGG installation
- Create the ogg directory:
mkdir -p /data/ogg
- Unzip the installer:
unzip OGG_BigData_Linux_x64_19.1.0.0.1.zip
- Unzipping yields a tar archive; extract it into /data/ogg:
tar xf OGG_BigData_Linux_x64_19.1.0.0.1.tar -C /data/ogg
- Give the oracle user ownership of the OGG directory; some later steps only succeed when run as the oracle user:
chown -R oracle:oinstall /data/ogg
- Configure the OGG environment variables:
vim /etc/profile
export OGG_HOME=/data/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
source /etc/profile
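OGG for Big Data loads a JVM through libggjava.so (referenced later by the replicat), so it is worth verifying that the LD_LIBRARY_PATH just exported actually resolves the Java libraries; a minimal check, assuming a JDK layout with jre/lib/amd64:
ldd $OGG_HOME/libggjava.so | grep 'not found'
No output means libjvm.so and its companions were found.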
- Launch the OGG command-line interface:
ggsci
- Create the OGG working subdirectories (run inside GGSCI):
create subdirs
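create subdirs generates the dir* working directories (dirprm, dirdat, dirchk, dirdef, dirrpt, etc.) under the OGG home; a quick confirmation:
ls -d /data/ogg/dir*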
3. Enabling archive log mode on the source Oracle database
- Switch to the oracle user:
su - oracle
- Log in to Oracle as SYSDBA:
sqlplus / as sysdba
- Check whether the database is currently in archive log mode (if automatic archival shows Disabled, it must be enabled manually):
archive log list
Database log mode              No Archive Mode
Automatic archival             Disabled
Archive destination            USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     12
Current log sequence           14
- Shut down the database immediately:
shutdown immediate
- Start the instance and mount the database without opening it:
startup mount
- Switch the database to archive log mode:
alter database archivelog;
- Open the database:
alter database open;
- Enable automatic archiving:
alter system archive log start;
- Check again (Enabled means archive log mode was turned on successfully):
archive log list
Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     12
Next log sequence to archive   14
Current log sequence           14
- Check the force logging and supplemental logging status (if NO, enable them with the commands below):
select force_logging, supplemental_log_data_min from v$database;
FORCE_ SUPPLEMENTAL_LOG
NO NO
- Enable force logging and minimal supplemental logging:
alter database force logging;
alter database add supplemental log data;
- Check the status again (both values should now be YES):
select force_logging, supplemental_log_data_min from v$database;
FORCE_ SUPPLEMENTAL_LOG
YES YES
4. Creating the replication user on the source Oracle database
- As root, create the directory for the OGG tablespace datafile and grant ownership to oracle:
mkdir -p /data/oracle/oggdata/orcl
chown -R oracle:oinstall /data/oracle/oggdata/orcl
- Run the following SQL to create the OGG tablespace and user:
SQL> create tablespace oggtbs datafile '/data/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;
Tablespace created.
SQL> create user ogg identified by ogg default tablespace oggtbs;
User created.
SQL> grant dba to ogg;
Grant succeeded.
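Optionally, verify the new user from the same sqlplus session (a simple data-dictionary query, not part of the original procedure):
select username, default_tablespace from dba_users where username = 'OGG';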
- Create a test user and table in Oracle:
create user test_ogg identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int, name varchar(20), primary key(id));
5. Source-side OGG configuration
- Enter GGSCI and create the working subdirectories:
ggsci
create subdirs
- Log in to the source database as the ogg user:
dblogin userid ogg password ogg
- Edit the GLOBALS parameter file and set the OGG schema:
edit param ./globals
oggschema ogg
- Configure the manager (mgr):
edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 3
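If you want to validate the manager parameters right away, it can be started and inspected from GGSCI before the other processes are added:
start mgr
info mgr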
- Enable supplemental logging (trandata) for the replicated table, then verify it:
add trandata test_ogg.test_ogg
info trandata test_ogg.test_ogg
- Configure the extract process (ORACLE_SID must match the Oracle instance's SID):
edit param extkafka
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl11g")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /data/ogg/dirdat/to
table test_ogg.test_ogg;
add extract extkafka,tranlog,begin now
If it reports the error
ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).
run the following command and then add the extract again.
create subdirs
add exttrail /data/ogg/dirdat/to,extract extkafka
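The extract and its trail registration can be confirmed with the standard GGSCI status commands:
info extkafka
info exttrail /data/ogg/dirdat/to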
- Configure the pump process, which pushes the local trail to the target:
edit param pukafka
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost Carlota2 mgrport 7809
rmttrail /data/ogg/dirdat/to
table test_ogg.test_ogg;
add extract pukafka,exttrailsource /data/ogg/dirdat/to
add rmttrail /data/ogg/dirdat/to,extract pukafka
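Because pukafka ships the trail to the manager on Carlota2 over port 7809, it helps to confirm reachability before starting it (assuming telnet is available on the source host):
telnet Carlota2 7809
info pukafka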
- Configure the definitions (defgen) file. Transfers between heterogeneous stores (Oracle and MySQL, or Hadoop-ecosystem targets such as HDFS, Hive, and Kafka) move data across different data types, so the mapping between source and target tables must be defined:
edit param test_ogg
defsfile /data/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;
- Back in the shell, run defgen from the OGG home:
./defgen paramfile dirprm/test_ogg.prm
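defgen writes the definitions file to the path given by defsfile; confirm that it exists before copying it to the target:
ls -l /data/ogg/dirdef/test_ogg.test_ogg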
- Copy the generated /data/ogg/dirdef/test_ogg.test_ogg into the dirdef directory under the target's OGG home:
scp -r /data/ogg/dirdef/test_ogg.test_ogg root@Carlota2:/data/ogg/dirdef/
6. Target-side OGG configuration
- Start the ZooKeeper and Kafka services:
zkServer.sh start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
- Enter the OGG command line:
ggsci
- Configure the manager (mgr):
edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 3
- Configure the checkpoint table:
edit param ./GLOBALS
CHECKPOINTTABLE test_ogg.checkpoint
- Configure the replicat process:
edit param rekafka
REPLICAT rekafka
sourcedefs /data/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;
- Configure kafka.props (strip the // comments below when saving the file; they are explanatory only):
cd /data/ogg/dirprm/
vim kafka.props
gg.handlerlist=kafkahandler //handler type
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //Kafka producer configuration file
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //Kafka topic name; it does not need to be created manually
gg.handler.kafkahandler.format=json //message format; json, xml, etc. are supported
gg.handler.kafkahandler.mode=op //transfer mode in OGG for Big Data: op sends one message per SQL operation, tx one per transaction
gg.classpath=dirprm/:/usr/local/apps/kafka_2.12-2.5.0/libs/*:/data/ogg/:/data/ogg/lib/*
- Configure custom_kafka_producer.properties:
vim custom_kafka_producer.properties
bootstrap.servers=192.168.44.129:9092 //Kafka broker address
acks=1
compression.type=gzip //compression type
reconnect.backoff.ms=1000 //reconnect delay
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
- Register the trail file with the replicat process:
add replicat rekafka exttrail /data/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
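As on the source, the registration can be checked before anything is started:
info rekafka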
7. Testing
Start all processes from the OGG command line on both the source and the target with start [process name].
The startup order is: source mgr, target mgr, source extract, source pump, target replicat.
In every case, run ggsci from the OGG home directory to enter the OGG command line.
On the source, in order:
start mgr
start extkafka
start pukafka
On the target:
start mgr
start rekafka
GGSCI (Carlota2) 1> info all
Program     Status      Group       Lag at Chkpt    Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     REKAFKA     00:00:00        00:00:08

GGSCI (Carlota3) 1> info all
Program     Status      Group       Lag at Chkpt    Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     EXTKAFKA    00:00:00        00:00:00
EXTRACT     RUNNING     PUKAFKA     00:00:00        00:00:10
Now execute some SQL statements on the source:
conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;
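You can also confirm on the source that all three operations were captured, using GGSCI's statistics command:
stats extkafka
It should report one insert, one update, and one delete for TEST_OGG.TEST_OGG.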
Check the trail files on the source:
ls -l /data/ogg/dirdat/to*
Check the trail files on the target:
ls -l /data/ogg/dirdat/to*
Check whether Kafka automatically created the corresponding topic:
kafka-topics.sh --list --zookeeper localhost:2181
If test_ogg appears in the list, the topic was created correctly.
Use a console consumer to check that the changes were synchronized:
kafka-console-consumer.sh --bootstrap-server Carlota2:9092 --topic test_ogg --from-beginning
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-07-31 13:42:33.072327","current_ts":"2020-07-31T13:42:38.928000","pos":"00000000000000001066","after":{"ID":1,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2020-07-31 13:42:46.005763","current_ts":"2020-07-31T13:42:52.201000","pos":"00000000000000001204","before":{},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2020-07-31 13:42:57.079268","current_ts":"2020-07-31T13:43:02.231000","pos":"00000000000000001347","before":{"ID":1}}