本文討論了達夢數據實時同步軟件DMHS的相關內容,包括概念總結、環境模擬及部署實現從達夢數據庫到Kafka隊列的同步復制。關鍵要點包括:
1.DMHS系統概述:
達夢公司推出的異構環境高性能數據庫實時同步系統,可應用于應急、容災等多領域,能避免傳統備份問題,降低性能影響,解決主備系統局限。
2.系統組件與功能:
由源端和目標端數據庫及服務組成,源端含裝載、日志捕獲分析等模塊,目標端含執行和管理服務模塊,支持秒級同步、主備同步及數據一致性。
3.環境模擬:
在[192.168.58.3](192.168.58.3)機器創建達夢數據庫環境及數據,在[192.168.58.5](192.168.58.5)機器安裝單機Kafka并測試。
4.部署同步:
源端檢查并開啟歸檔和邏輯日志,兩端安裝DMHS服務,配置dmhs.hs等文件,進行同步測試,驗證數據能否從源端同步到Kafka隊列。
一、概念總結
達夢數據實時同步軟件 DMHS 是達夢公司推出的新一代支持異構環境的高性能、高可靠和高可擴展的數據庫實時同步系統。
DMHS 的基礎實現原理如下圖所示:
1、這是一個描述數據從源數據庫(SOURCE DB)遷移到目標數據庫(TARGET DB)的過程。以下是詳細解析:
1.1、源端數據庫(SOURCE DB):
-
- 包含兩個主要部分:
tables
和logs
。 tables
表示數據庫中的表結構和數據。logs
表示數據庫的操作日志。
- 包含兩個主要部分:
1.2、加載(LOAD):
-
- 步驟①:源數據庫的表數據被加載到一個臨時存儲區域或緩沖區中,準備進行傳輸。
1.3、變更數據捕獲(CPT):
-
- 步驟③:源數據庫的日志信息通過變更數據捕獲(CPT)處理,提取出需要同步的數據變更信息。
1.4、管理器(MGR):
-
- 步驟②和步驟④:管理和協調數據的傳輸過程。它接收來自LOAD和CPT的數據,并通過網絡發送給目標端的管理器。
1.5、網絡傳輸:
-
- 數據通過網絡從源端的管理器傳輸到目標端的管理器。
1.6、消息操作(Message operation):
-
- 目標端接收到數據后,進行消息操作,將數據分配給多個線程(Thread 1, Thread 2, ..., Thread n)進行并行處理。
1.7、執行(EXEC):
-
- 各個線程處理完數據后,將結果寫入目標數據庫(TARGET DB)。
整個流程是一個典型的數據庫遷移或同步過程,涉及數據加載、變更數據捕獲、網絡傳輸、多線程處理和最終寫入目標數據庫等步驟。
DMHS 的組成原理框圖中包含源端數據庫、目標端數據庫、源端 DMHS 服務以及目標端 DMHS 服務,
源端 DMHS 服務主要由裝載模塊(LOAD)、日志捕獲分析模塊(CPT)以及管理服務模塊(MGR)組成;
目標端 DMHS 服務則由執行模塊(EXEC)和管理服務模塊 (MGR)組成。
在源端,DMHS 的 CPT 模塊采用優化的日志掃描算法實現增量日志數據的快速捕獲分析,并將分析完成后的日志數據轉換為內部的消息格式,然后通過網絡將消息發送至目標端DMHS 服務;
在目標端, DMHS 服務接收到源端的日志消息后,對消息進行處理,通過多線程并行執行的方式將同步數據應用至目標端數據庫,實現數據實時同步。
2、系統概述與組件描述
1. 系統概述與組件
- DMHS 是高性能、高可靠性的數據庫實時同步系統。
- 系統組件包括:
-
- MGR(管理模塊):啟動框架,負責加載和啟動其他模塊。
- CPT(捕獲模塊):捕獲源數據庫的增量日志。
- LOAD(裝載模塊):負責數據的初始裝載和離線字典管理。
- NET(傳輸模塊):負責數據傳輸,包括發送和接收子模塊。
- EXEC(執行模塊):在目標端執行數據入庫。
2. 關鍵概念
- 同步:DMHS 支持秒級實時數據同步。
- 主備同步:支持雙活數據庫,實現業務連續性。
- 數據一致性:確保事務級的數據完整性和一致性。
3. 配置與管理
- 配置文件使用 XML 格式,詳細定義了各模塊的參數。
- MGR 模塊配置包括站點號、管理端口號等。
- CPT 模塊配置涉及數據庫連接信息、日志文件清理策略等。
4. 日志捕獲與分析
- CPT 模塊使用優化算法快速捕獲和分析增量日志。
- 支持基于觸發器和數據庫日志的 DDL 操作捕獲。
5. 數據裝載
- 初始裝載確保源端和目標端數據庫數據一致性。
- 支持直接數據裝載和備份文件裝載兩種方式。
6. 數據傳輸
- NET 模塊負責數據的傳輸,支持 TCP/IP 網絡傳輸和文件傳輸。
- 可以配置數據過濾和映射,以適應不同的同步需求。
7. 數據執行
- EXEC 模塊負責在目標端執行數據入庫。
- 支持多線程并行執行,提高同步效率。
8. 高級功能
- CVT 模塊:提供數據清洗和轉換功能。
- 復雜同步場景:支持雙向同步、級聯同步、環狀同步等。
- ETL 支持:與達夢 ETL 工具集成,提供數據抽取、清洗、轉換和裝載。
9. 二次開發
- 提供 C 語言接口和數據結構,方便第三方應用程序進行二次開發。
二、環境模擬
操作環境:VMware Workstation Pro 17 | ||||
機器ip | 主機名 | 操作系統 | 資源配置 | 實例名 |
192.168.52.10 | source | kylin-v10 | 4核4G,磁盤20g | SOURCE |
192.168.52.11 | kafka | kylin-v10 | 4核4G,磁盤20g |
|
機器ip | 實例名 | 達夢軟件安裝目錄 | 數據存儲目錄 |
192.168.52.10 | SOURCE | /dm8/dminstall | /dm8/data |
192.168.52.11 | / | /dm8/dminstall | /dm8/data |
三、安裝達夢數據庫并數據準備
需求:192.168.52.10機器上有一個SOURCE的數據庫,數據庫里面有個APP的模式,里面存放著項目的表數據,現在需要把這些表數據實時同步到192.168.52.11機器上的kafka數據庫里面的APP模式。
192.168.52.10(源數據庫)的SOURCE數據庫中的APP模式---->>192.168.52.11(目的數據庫)的kafka數據庫中的APP模式
下面的操作模擬環境:
1、模擬192.168.52.10(源環境)
1.1、創建達夢數據庫對應的用戶和組
groupadd dinstall
useradd -g dinstall dmdba
echo "Dameng123" |passwd --stdin dmdba
1.2、創建達夢數據庫安裝目錄
mkdir -p /dm8/{dminstall,dmdata,dmarch,dmback}
chown -R dmdba:dinstall /dm8
chmod -R 755 /dm8
1.3、調整系統資源限制
vim /etc/security/limits.conf
dmdba soft nofile 65536
dmdba hard nofile 65536
dmdba soft nproc 65536
dmdba hard nproc 65536
###soft軟連接,hard硬連接,nofile打開文件,nproc打開的進程
1.4、關閉防火墻:
systemctl stop firewalld
systemctl disable firewalld
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config
1.5、下載鏡像
達夢數據庫鏡像,官方下載:
產品下載 | 達夢在線服務平臺
1.6、掛載鏡像:
mount -o loop dm8_20230925_x86_rh6_64.iso /mnt/
1.7、切換dmdba用戶,安裝數據庫
su - dmdba
/mnt/DMInstall.bin -i
root用戶下執行命令
/dm8/dminstall/script/root/root_installer.sh
root用戶下執行命令 (關閉自啟服務)
systemctl disable DmAPService.service --now
systemctl status DmAPService.service
數據庫創建完畢!!!
1.8、初始化實例
su - dmdba
/dm8/dminstall/bin/dminit path=/dm8/dmdata db_name=SOURCE instance_name=SOURCE port_num=5236
1.9、編寫啟動腳本
cd /dm8/dminstall/bin
cp service_template/DmService DmServiceSOURCE
vim DmServiceSOURCE
1.10、修改內容如下
INI_PATH=%INI_PATH% 修改為 INI_PATH=/dm8/dmdata/DEST/dm.ini
1.11、啟動數據庫實例
./DmServiceSOURCE start
1.12、創建數據
切換到dmdba用戶執行
su - dmdba
cd /dm8/dminstall/bin
./disql SYSDBA/SYSDBA@192.168.52.10:5236
執行下面的命令創建表空間、用戶
CREATE TABLESPACE TEST DATAFILE 'TEST.DBF' SIZE 256;
CREATE USER APP IDENTIFIED BY "Dameng123" DEFAULT TABLESPACE TEST;
grant "DBA" to "APP";
exit
1.13、使用新建的APP用戶登錄數據庫,創建表和插入數據
./disql APP/Dameng123@192.168.52.10:5236
CREATE TABLE STUDENTS(STUDENT_ID INTEGER PRIMARY KEY IDENTITY(1,1),NAME VARCHAR(50) NOT NULL,BIRTH_DATE DATE NOT NULL,GENDER CHAR(1) CHECK (GENDER IN ('M','F')) NOT NULL,EMAIL VARCHAR(100) UNIQUE NOT NULL,PHONE_NUMBER VARCHAR(15));
INSERT INTO STUDENTS (NAME, BIRTH_DATE, GENDER, EMAIL, PHONE_NUMBER) VALUES ('張三', '2000-03-01', 'M', 'zhangsan@djl.com', '13243253257'),('李四', '1999-05-21', 'F', 'lisi@djl.com', '13312345678'),('王五', '2001-07-11', 'M', 'wangwu@djl.com', '13423456789'),('趙六', '1998-08-15', 'F', 'zhaoliu@djl.com', '13534567890'),('錢七', '2002-12-12', 'M', 'qianqi@djl.com', '13645678901'),('孫八', '2000-10-10', 'F', 'sunba@djl.com', '13756789012'),('周九', '1997-11-22', 'M', 'zhoujiu@djl.com', '13867890123'),('吳十', '2001-04-05', 'F', 'wushi@djl.com', '13978901234'),('鄭十一', '1999-06-18', 'M', 'zhengshiyi@djl.com', '14089012345'),('王十二', '1998-09-09', 'F', 'wangshier@djl.com', '14190123456');
commit;
SELECT * from STUDENTS;
exit
2、模擬192.168.52.11(目的庫)環境,安裝單機kafka
1.1、安裝數據庫不再占用篇幅,參考上方安裝數據庫
1.2、安裝java環境(root用戶)
java -version
在銀河麒麟V10sp2系統中,系統中已經帶有java,這里就不進行安裝了
1.3、安裝zookeeper(root用戶)
由于kafka依賴于ZooKeeper,需要安裝部署zookeeper并啟動,這里使用的是3.9.3版本
可以上阿里云里下載:apache-zookeeper-zookeeper-3.9.3安裝包下載_開源鏡像站-阿里云
1.3.1、上傳壓縮包并解壓
tar -zxvf apache-zookeeper-3.9.3-bin.tar.gz
mv apache-zookeeper-3.9.3-bin /usr/local/zookeeper
1.3.2、復制配置文件zoo_sample.cfg 為zoo.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
1.3.3、修改 zoo.cfg 中dataDir的路徑
vim /usr/local/zookeeper/conf/zoo.cfg
:/dataDir 或者 :12
dataDir=/usr/local/zookeeper/data
1.3.4、創建數據庫存放目錄
mkdir -p /usr/local/zookeeper/data
1.3.5、配置環境變量(文件最小面追加)
vim /etc/profile
#zookeeper
export ZK_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZK_HOME/bin#kafka
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
1.3.6、加載環境變量
source /etc/profile
1.3.7、啟動zookeeper
zkServer.sh start
1.3.8、查看狀態和端口
zkServer.sh status
1.3.9、查看zkServer.sh 狀態內容解析:
輸出解析:
/usr/bin/java
-
- 這表示腳本使用了系統的
java
命令來啟動 ZooKeeper。 - 確保系統中安裝的 Java 版本是兼容的(ZooKeeper 通常需要 Java 8 或更高版本)。
- 這表示腳本使用了系統的
ZooKeeper JMX enabled by default
-
- 表示 ZooKeeper 默認啟用了 JMX(Java Management Extensions),這是一個用于監控和管理 Java 應用程序的功能。
- 如果不需要 JMX,可以通過配置禁用它,但這不是錯誤。
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
-
- 這表明腳本正在使用
/usr/local/zookeeper/conf/zoo.cfg
配置文件。 - 確保
zoo.cfg
文件存在并且配置正確(例如dataDir
和clientPort
是否設置正確)。
- 這表明腳本正在使用
Client port found: 2181. Client address: localhost. Client SSL: false.
-
- 客戶端連接端口為
2181
,這是 ZooKeeper 的默認端口。 - 客戶端地址為
localhost
,表示 ZooKeeper 只監聽本地連接。 - SSL 被禁用,這通常是正常的,除非你需要啟用 SSL 加密。
- 客戶端連接端口為
Mode: standalone
-
- 表示 ZooKeeper 當前運行在 單機模式(standalone mode)。
- 單機模式適用于開發和測試環境。如果你需要高可用性或分布式部署,則需要配置 ZooKeeper 集群(即多節點模式)。
總結內容:
- ZooKeeper 成功加載了配置文件。
- 它正在監聽默認的客戶端端口
2181
。 - 它以單機模式運行,狀態正常。
1.3.10、查看與端口號 2181 相關的所有網絡連接信息
ss -auntpl |grep 2181
結果解析:
- tcp:表示這是 TCP 協議的連接。
- LISTEN:表示該套接字處于監聽狀態,等待客戶端連接。
- 0 和 50:分別表示接收隊列和發送隊列的大小。當前接收隊列為空(0),最大允許的未完成連接數為 50。
- *:2181:表示服務器在所有可用網絡接口上監聽 2181 端口。
*:* 表示任何遠程地址和端口都可以連接到這個監聽端口。
- users:(("java",pid=3436,fd=73)):表示使用該套接字的進程是 Java 進程,其進程 ID (PID) 為 3436,文件描述符 (FD) 為 73。
總結
執行 ss -auntpl | grep 2181 可幫助檢查端口 2181 上的網絡活動,尤其是當 ZooKeeper 正在運行并監聽此端口時。
1.4、安裝kafka(root用戶)
使用的版本是3.90
https://dlcdn.apache.org/kafka/3.9.0/kafka_2.12-3.9.0.tgz
1.4.1、將下載的文件上傳到服務器解壓
tar -zxvf kafka_2.13-3.9.0.tgz
mv kafka_2.13-3.9.0 /usr/local/kafka
1.4.2、修改 /usr/local/kafka/config/server.properties 文件三個地方
vim /usr/local/kafka/config/server.properties
# ip寫自己機器的ip ##34行
listeners=PLAINTEXT://192.168.52.11:9092
##62行
log.dirs=/usr/local/kafka/data/kafka-logs
# 連接zookeeper ##125行
zookeeper.connect=192.168.58.5:2181
1.4.3、啟動kafka
cd /usr/local/kafka/bin
kafka-server-start.sh ../config/server.properties &
查看kafka版本
kafka-server-start.sh --version
1.4.4、測試kafka(root用戶)
1、創建 topic
cd /usr/local/kafka/bin
kafka-topics.sh --create --bootstrap-server 192.168.52.11:9092 --replication-factor 1 --partitions 1 --topic test
1.kafka-topics.sh
- 這是一個 Kafka 提供的腳本,用于管理 Kafka 主題(Topic),包括創建、刪除、查看和修改主題。
- 它通常位于 Kafka 安裝目錄的
bin
文件夾中。
2. --create
- 表示要創建一個新的主題。
3. --bootstrap-server 192.168.52.11:9092
- 指定 Kafka 集群的地址和端口。
192.168.52.11
是 Kafka Broker 的 IP 地址。9092
是 Kafka 默認的監聽端口。- 如果有多個 Broker,可以指定多個地址,用逗號分隔。例如:
4. --replication-factor 1
- 指定主題的副本因子(Replication Factor)。
- 副本因子決定了每個分區(Partition)的副本數量。
- 在這個例子中,副本因子為
1
,表示每個分區只有一個副本(即沒有額外的備份)。 - 注意:副本因子不能超過 Kafka 集群中可用 Broker 的數量。
5. --partitions 1
- 指定主題的分區數量。
- 分區(Partition)是 Kafka 中并行處理的基本單位。
- 在這個例子中,分區數量為
1
,表示該主題只有一個分區。
6. --topic test
- 指定要創建的主題名稱。
- 在這個例子中,主題名稱為
test
。
2、查看topic
kafka-topics.sh --list --bootstrap-server 192.168.52.11:9092
1. kafka-topics.sh
- 這是一個 Kafka 提供的腳本,用于管理 Kafka 主題(Topic),包括創建、刪除、查看和修改主題。
- 它通常位于 Kafka 安裝目錄的
bin
文件夾中。
2. --list
- 表示要列出 Kafka 集群中的所有主題。
3. --bootstrap-server 192.168.52.11:9092
- 指定 Kafka 集群的地址和端口。
192.168.52.11
是 Kafka Broker 的 IP 地址。9092
是 Kafka 默認的監聽端口。- 如果有多個 Broker,可以指定多個地址,用逗號分隔。例如:
- --bootstrap-server 192.168.52.11:9092,192.168.52.12:9092
這條命令的作用是:
- 連接到
192.168.52.11:9092
上的 Kafka Broker。 - 列出 Kafka 集群中所有的主題名稱。
3、返回上面創建的 test # 查看topic描述
kafka-topics.sh --describe --bootstrap-server 192.168.52.11:9092 --topic test
1. kafka-topics.sh
- 這是一個 Kafka 提供的腳本,用于管理 Kafka 主題(Topic),包括創建、刪除、查看和修改主題。
- 它通常位于 Kafka 安裝目錄的
bin
文件夾中。
2. --describe
- 表示要查看指定主題的詳細信息。
3. --bootstrap-server 192.168.52.11:9092
- 指定 Kafka 集群的地址和端口。
192.168.52.11
是 Kafka Broker 的 IP 地址。9092
是 Kafka 默認的監聽端口。
4.--topic test
- 指定要查看的主題名稱。
- 在這個例子中,主題名稱為
test
。
這條命令的作用是:
- 連接到
192.168.52.11:9092
上的 Kafka Broker。 - 查看主題
test
的詳細信息,包括分區、副本、領導者(Leader)、同步副本集(ISR)等。
4、啟動生產者(保留該窗口,別關閉)
cd /usr/local/kafka/bin
kafka-console-producer.sh --broker-list 192.168.52.11:9092 --topic test
- kafka-console-producer.sh
這是一個 Kafka 提供的腳本,用于啟動控制臺生產者。
它允許用戶通過命令行向 Kafka 主題發送消息。
腳本通常位于 Kafka 安裝目錄的 bin 文件夾中。 - --broker-list 192.168.52.11:9092
指定 Kafka 集群的地址和端口。
192.168.52.11 是 Kafka Broker 的 IP 地址。
9092 是 Kafka 默認的監聽端口。
如果有多個 Broker,可以指定多個地址,用逗號分隔。例如:
Bash
深色版本
--broker-list 192.168.52.11:9092,192.168.52.12:9092 - --topic test
指定要發送消息的目標主題名稱。
在這個例子中,目標主題為 test。
這條命令的作用是:
啟動一個 Kafka 控制臺生產者。
連接到 192.168.52.11:9092 上的 Kafka Broker。
將用戶從終端輸入的消息發送到主題 test。
運行過程:
執行命令后,終端會進入交互模式。
用戶可以直接在終端中輸入消息并按回車鍵發送。
每次按下回車鍵,輸入的內容會被當作一條消息發送到 Kafka 主題 test。
生產者會持續運行,直到手動終止(通常是通過按下 Ctrl+C)。
5、啟動消費者
cd /usr/local/kafka/bin
kafka-console-consumer.sh --bootstrap-server 192.168.52.11:9092 --topic test --from-beginning
1.--topic test
指定要消費的目標主題名稱。
在這個例子中,目標主題為 test。
2. --from-beginning
表示從主題的最早消息開始消費。
如果沒有這個選項,消費者會從當前最新的偏移量(Offset)開始消費,只會接收新發送的消息。
使用 --from-beginning 后,消費者會讀取該主題中存儲的所有歷史消息。
這條命令的作用是:
啟動一個 Kafka 控制臺消費者。
連接到 192.168.52.11:9092 上的 Kafka Broker。
從主題 test 中讀取消息,并從最早的消息開始顯示。
6、生產和消費的模擬
生產者生產消息:
消費者查看消息:
四、開啟歸檔和日志
1、環境檢查(源端)
需要檢查源端的數據庫是否開啟了歸檔日志和邏輯日志,如果有開啟,忽略下面的操作
1.1、檢查歸檔日志和邏輯日志是否開啟
su - dmdba
cd /dm8/dminstall/bin
./disql APP/Dameng123@192.168.52.10:5236
1.2、檢查歸檔配置的正確性,如果能查詢到結果就是開啟了
SELECT ARCH_DEST, ARCH_FILE_SIZE FROM SYS.V$DM_ARCH_INI WHERE ARCH_TYPE='LOCAL' AND ARCH_IS_VALID='Y';
1.3、檢查邏輯日志配置的正確性,查詢出來的結果是1或者2是開啟了
SELECT PARA_VALUE FROM SYS.V$DM_INI WHERE PARA_NAME = 'RLOG_APPEND_LOGIC';
exit
已經開啟的示例:
2、如果上面的查詢結果顯示沒有開啟,執行下面的操作開啟
2.1、編輯文件dm.ini
vim /dm8/dmdata/SOURCE/dm.ini
##ARCH_INI和RLOG_APPEND_LOGIC設置成1,表示開啟
ARCH_INI = 1
RLOG_APPEND_LOGIC = 1
2.2、添加歸檔配置文件
vim /dm8/dmdata/SOURCE/dmarch.ini
# 內容如下:
[ARCHIVE_LOCAL1]
ARCH_TYPE = LOCAL
ARCH_DEST = /dm8/dmarch #歸檔目錄
ARCH_FILE_SIZE = 128 #歸檔文件大小,單位 MB
ARCH_SPACE_LIMIT = 0 #空間大小限制,0 表示不限制
2.3、重啟數據庫
cd /dm8/dminstall/bin
./DmServiceSOURCE restart
五、安裝dmhs服務(源端和目的端都操作)(dmdba用戶)
1、關閉防火墻
systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld
2、上傳dmhs的bin、key文件
把下載好的bin、key文件上傳到服務器的/opt目錄下,并給bin文件授予執行權限。
chmod +x /opt/dmhs_V4.3.20_dm8_rev140201_rh6_64_20230916.bin
3、開始安裝dmhs
su - dmdba
/opt/dmhs_V4.3.20_dm8_rev140201_rh6_64_20230916.bin -i
大部分配置使用默認的就行,直接回車
如果不用Key文件到后方會報錯
這里 的ip需要照應
配置依賴庫路徑就填這個
/dm8/dminstall/bin:/dm8/dmhs/bin
安裝完成!!
六、配置dmhs.hs
1、源端配置
1.1、文件的復制
su - dmdba
cd /dm8/dmhs
復制一份bin目錄,單獨配置,當同一個服務上需要部署多個dmhs,可以方便區分
cp -r bin cpt01
cd cpt01
如果是測試dmhs,在使用的過程中可能會缺少libdmoci.so動態庫文件報錯,執行下面的命令復制一份到數據庫的bin目錄下,如果是生產中,需要去達夢官方申請
cp -r stat/libdmoci.so /dm8/dminstall/bin
1.2、編輯配置文件dmhs.hs
cd /dm8/dmhs/cpt01
vim dmhs.hs
# 內容如下:
<?xml version="1.0" encoding="GB2312" standalone="no"?>
<dmhs>
<base><lang>en</lang><mgr_port>5345</mgr_port><ckpt_interval>60</ckpt_interval><siteid>1</siteid><version>2.0</version>
</base>
<cpt><db_type>DM8</db_type><db_server>192.168.52.10</db_server><db_user>APP</db_user><db_pwd>Dameng123</db_pwd><db_port>5236</db_port><idle_time>10</idle_time><ddl_mask>0</ddl_mask><cpt_mask>PARSE:POST:REG_OP2</cpt_mask><n2c>0</n2c><update_fill_flag>3</update_fill_flag><set_heartbeat>1</set_heartbeat><arch><clear_interval>600</clear_interval><clear_flag>0</clear_flag></arch>
<send><ip>192.168.52.11</ip><mgr_port>5345</mgr_port><data_port>5346</data_port><net_pack_size>256</net_pack_size><net_turns>0</net_turns><crc_check>0</crc_check><trigger>0</trigger><constraint>0</constraint><identity>0</identity><filter><enable><item>APP.*</item></enable></filter><map><item>APP.* == APP.*</item></map>
</send>
</cpt>
</dmhs>
2、配置目的端:
su - dmdba
cd /dm8/dmhs
2.1、復制一份bin目錄,單獨配置,當同一個服務上需要部署多個dmhs,可以方便區分
cp -r bin exec01
cd exec01
2.2、目的端(52.11)配置文件dmhs.hs
vim dmhs.hs
<?xml version="1.0" encoding="GB2312" standalone="no"?>
<dmhs><base><lang>en</lang><mgr_port>5345</mgr_port><ckpt_interval>60</ckpt_interval><siteid>2</siteid><version>2.0</version></base><exec><recv><data_port>5346</data_port></recv><exec_thr>1</exec_thr><exec_sql>1024</exec_sql><exec_policy>2</exec_policy><is_kafka>1</is_kafka><max_packet_size>16</max_packet_size><enable_ddl>1</enable_ddl></exec>
</dmhs>
七、配置文件dmhs_kafka.properties
vim dmhs_kafka.properties
# 內容如下:
# DMHS config file path
dmhs.conf.path=/dm8/dmhs/exec01/dmhs.hs
# kafka broker list,such as ip1:port1,ip2:port2,...
bootstrap.servers=192.168.52.11:9092
# kafka topic name
kafka.topic.name=test
#dmhs.sendKey.parse.format=schema:source:tableName
#dmhs.sendKey.parse.format=primary_keys_values
#dmhs.sendTopic.parse.format=schema:source:tableName
#topic.map.conf.path=/dmhs_kafka/bin_0329/tableTopicMap.properties
# whether to enable JSON format check
json.format.check=1
# How many messages print cost time
print.message.num=1000
# How many messages batch to get
dmhs.min.batch.size=100
# kafka serializer class
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka partitioner config
partitioner.class=com.dameng.dmhs.dmga.service.impl.OnePartitioner
# kafka request acks config
acks=-1
max.request.size=5024000
batch.size=1048576
linger.ms=3
buffer.memory=134217728
retries=3
#enable.idempotence=true
compression.type=none
max.in.flight.requests.per.connection=1
send.buffer.bytes=1048576
metadata.max.age.ms=300000
八、配置文件start_dmhs_kafka.sh
vim start_dmhs_kafka.sh
# 內容如下:
export LANG=en_US.UTF-8
java -Djava.ext.dirs="/usr/local/kafka/libs:." com.dameng.dmhs.dmga.service.impl.ExecDMHSKafkaService dmhs_kafka.properties
賦可執行權限
chmod +x start_dmhs_kafka.sh
九、同步測試
1、啟動目的端
su - dmdba
cd /dm8/dmhs/exec01
./start_dmhs_kafka.sh啟動后則表示目的端已經以前臺方式啟動增量同步。
成功示例:
2、啟動源端:
su - dmdba
cd /dm8/dmhs/cpt01
./dmhs_server
成功示例:
DMHS>copy 0 "SCH.NAME='APP'" DICT
DMHS>copy 0 "SCH.NAME='APP'" INSERT
DMHS>clear exec lsn
DMHS>start cpt
3、查看kafka隊列(topic)test中是否有數據
kafka-console-consumer.sh --bootstrap-server 192.168.52.11:9092 --topic test --from-beginning
4、配置好服務并啟動后,再去源端新增一條數據,然后到目的端kafka看看是否同步成功
# 源端添加一條數據
su - dmdba
cd /dm8/dminstall/bin
./disql APP/Dameng123@192.168.52.10:5236
INSERT INTO APP.STUDENTS (NAME, BIRTH_DATE, GENDER, EMAIL, PHONE_NUMBER) VALUES ('美女', '2002-4-25', 'F', 'meinv@djl.com', '13243253549');
COMMIT;
達夢數據庫社區地址:達夢數據庫 - 新一代大型通用關系型數據庫 | 達夢在線服務平臺