從 PostgreSQL 到 DolphinDB:數據實時同步一站式解決方案

隨著越來越多的用戶使用 DolphinDB,各種不同的應用的場景也對 DolphinDB 的數據接入提出了不同的要求。部分用戶需要將 PostgreSQL 的數據實時同步到 DolphinDB 中來,以滿足在 DolphinDB 中使用數據的實時性需求。本篇教程將介紹使用 Debezium 來實時捕獲和發布 PostgreSQL 的數據庫更改事件,并完成 PostgreSQL 到 DolphinDB 的實時數據同步的完整的解決方案。

1. Debezium 同步方案概述

Debezium 是一個開源的分布式平臺,用于實時捕獲和發布數據庫更改事件。它可以將關系型數據庫(如MySQL、PostgreSQL、Oracle 等)的變更事件轉化為可觀察的流數據,以供其他應用程序實時消費和處理。本教程中將采用 Debezium 與 Kafka 組合的方式來實現從 PosgreSQL12 到 DolphinDB 的數據同步。

Kafka +Debezium 的數據同步方案需要部署 4 個服務,如下所示:

  • ZooKeeper:kafka 的依賴部署。
  • Kafka:數據存儲。
  • Kafka-Connect:用于接入數據插件 source-connetor, sink-connector 的框架,可以提供高可用,也可以部署單實例版本。
  • Schema-Registry?:提供實時同步的數據的元數據注冊功能 ,支持數據序列化。

基于 Debezium 同步 PostgreSQL 數據到 DolphinDB 的架構圖如下:

圖 1-1 同步架構圖

接下來,本教程將逐一介紹這些服務的下載、安裝,以及配置數據同步任務。

2. 部署 Kafka 單實例實時數據存儲

Kafka 單實例實時數據存儲部署方案在 MySQL 和 Oracle 數據同步教程中均有有介紹,可以查看:Debezium+Kafka 實時同步 Oracle 11g 數據到 DolphinDB 教程,部署好 ZooKeeper,Kafka,Kafka-Connect , Schema-Registry 的數據同步、存儲運行環境。

3. 從 PostgreSQL 到 Kafka 的數據同步:部署程序與配置任務

在部署好 Kafka 和 Kafka Connect 環境后,即可配置 PostgreSQL 的實時數據同步任務。

注意:PostgreSQL 的數據實時同步僅支持 UTF-8 字符集。

本節涉及的操作,若未作特殊說明,均使用操作系統用戶 kafka 執行。kafka 用戶的權限等配置,詳見章節 2 部署 Kafka 單實例實時數據存儲 部分。

3.1 配置 PostgreSQL 數據庫

對于 Source 數據庫是 PostgresSQL,本教程采用 PostgreSQL 自帶的邏輯復制流插件 pgoutput 插件來捕獲實時數據變化。pgoutput 插件是 PostgreSQL 在版本 PostgreSQL 10+ 推出并內置的插件,無須額外配置。所以本教程介紹的 PostgresSQL 到 DolphinDB 的實時數據同步方案 需要 PostgreSQL 10+ 以上版本。

本教程中使用的數據庫是配置于 CentOS 7 下的 PostgresSQL 12 版本。

第一步: 配置邏輯解碼和數據庫訪問權限

PostgreSQL 的實時變更數據捕獲是通過配置 replication slots ,開啟邏輯復制流實現。配置以下參數,可以管理當前數據庫的邏輯解碼進程。

通過 vim 命令打開 /var/lib/pgsql/12/data/postgresql.conf 文件,添加或者調整以下參數。wal_level 參數必須設置。

# REPLICATION
wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4             # max number of walsender processes (change requires restart)
#wal_keep_segments = 4          # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s       # in milliseconds; 0 disables
max_replication_slots = 4       # max number of replication slots (change requires restart)

PostgreSQL 中的數據庫訪問權限需要在 /var/lib/pgsql/12/data/pg_hba.conf 中進行配置。可以如下配置 IP4 的所有主機的 md5 加密訪問權限。

host all all 0.0.0.0/0  md5

修改配置文件需要重啟數據庫生效。

第二步: 創建數據庫和用戶

在使用數據庫時,通常會為業務數據創建專門的用戶和數據庫進行數據管理。這里我們創建業務數據的存儲數據庫和業務用戶,再創建一個專門進行邏輯復制的用戶。

使用超級用戶 postgres ,創建業務數據用戶和數據庫,并授予權限。

CREATE USER factoruser WITH PASSWORD '111111';
CREATE DATABASE factordb;
GRANT ALL PRIVILEGES ON DATABASE factordb TO factoruser ;

創建邏輯復制用戶,并授予該用戶邏輯復制權限和連接業務數據庫權限。

CREATE USER datasyn WITH PASSWORD '111111';
ALTER USER datasyn WITH REPLICATION;
GRANT CONNECT ON DATABASE factordb TO datasyn;

接下來我們創建業務數據庫的 schema 并授予邏輯復制用戶對業務數據庫的數據訪問權限,這需要登錄數據庫,并切換到業務數據庫(database)后執行。切換 database,使用 psql 連接可以按 圖 3-1 操作。

圖 3-1 PostgreSQL 切換 database 操作

創建業務數據 schema,需要當前數據庫是 factordb。

create schema factorsch authorization factoruser;

授予 datasyn 用戶對 業務數據庫 factordb 的 factorsch 的具體使用權限。

GRANT USAGE ON SCHEMA factorsch TO datasyn;
GRANT SELECT ON ALL TABLES IN SCHEMA factorsch TO datasyn;

配置邏輯復制用戶的邏輯復制訪問權限,修改文件 /var/lib/pgsql/12/data/pg_hba.conf。

host replication datasyn 0.0.0.0/0 md5

重啟數據庫,并測試邏輯解碼插件。

--創建復制槽
SELECT * FROM pg_create_logical_replication_slot('pgoutput_demo', 'pgoutput');
--查看相應復制槽是否存在
SELECT * FROM pg_replication_slots;
--刪除復制槽
SELECT * FROM pg_drop_replication_slot('pgoutput_demo');

3.2 安裝 Debezium-PostgreSQL 連接器插件

PostgreSQL 的 Debezium 數據同步插件需要安裝在 Kafka Connect 程序部署文件路徑下。

配置啟動 Debezium-PostgreSQL 連接器,需要以下兩步:

  • 下載 Debezium-PostgreSQL-Connector 插件,將插件解壓并放到 Kafka Connect 的插件路徑下

  • 重新啟動 Kafka Connect 程序,以加載插件

第一步: 下載安裝 Debezium-PostgreSQL 插件

前往官方網站?Debezium,選擇 2.5.1.Final 版本進行下載,程序名為 debezium-connector-postgres-2.5.1.Final-plugin.tar.gz,本教程測試時采用的此版本,也可以使用 2.5 系列的最新版本。如果想使用 Debezium 的最新穩定版本,需要仔細閱讀對應的版本需求,并進行詳細測試。

圖 3-2 Debezium 官網下載鏈接

在 confluent 的安裝路徑下創建插件路徑,在此路徑下解壓 Debezium 的 PostgreSQL 插件包,請確保 kafka 用戶對此路徑具有讀權限,如果下面代碼中的下載鏈接失效,請按 圖 3-2 官網示例位置下載。

sudo mkdir -p /opt/confluent/share/java/plugin
cd /opt/confluent/share/java/plugin
sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.5.1.Final/debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sudo tar -xvf ./debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sudo rm ./debezium-connector-postgres-2.5.1.Final-plugin.tar.gz

第二步: 配置 Kafka Connect 加載插件

修改 Kafka Connect 的配置文件,添加插件路徑配置。若已配置插件路徑,則跳過該步驟。

cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties

添加或修改參數 plugin.path 如下。

plugin.path=/opt/confluent/share/java/plugin

重新啟動 Kafka Connect。

sudo systemctl restart kafka-connect

查看 kafka connect 的日志輸出,能查詢到信息則說明插件加載成功。

cat /KFDATA/kafka-connect/logs/connect.log | grep PostgresConnector

圖 3-3 Debezium-PostgreSQL 插件加載成功信息

3.3 配置 PostgreSQL 數據同步連接任務

數據庫基礎配置和 PostgreSQL 同步插件安裝好,我們就可以開始配置 PostgreSQL 的數據同步任務。配置同步任務及檢查的很多命令都要帶上 url 等參數。為了操作快捷,本教程封裝了一些加載配置文件的操作腳本在 kafka-tools.tar 包中,詳情參見附錄。下載當前包,解壓縮到 /KFDATA 目錄下。后續的很多操作,包括檢查 Kafka 的 topic、查看數據和配置同步任務等都會使用 kafka-tools.tar 包中的腳本。包中的腳本在無參數運行時會輸出 help 文檔。

cd /KFDATA
sudo tar -xvf kafka-tools.tar
sudo chown kafka:kafka kafka-tools
rm ./kafka-tools.tar

修改 /KFDATA/kafka-tools/config/config.properties 配置參數。

按照本機的路徑、IP 等對應修改 Kafka 和 Kafka Connect 的啟動 IP 地址,以及安裝目錄。

示例如下:

#kafka parameters
kafka_home=/opt/kafka
confluent_home=/opt/confluent
bootstrap_server=192.168.189.130:9092#kafka-connect parameters
connect_rest_url=192.168.1.178:8083
#rest_pd  means restful request  password,This is not necessary
#rest_pd=appsdba:passwd
schema_ip=192.168.189.130
schema_port=8081

第一步:準備 PostgreSQL 數據庫表

本教程的同步方案支持 DolphinDB 的 TSDB 和 OLAP 兩種存儲引擎的數據同步。其中 TSDB 引擎對于單字段主鍵和多字段復合主鍵有不同的處理方式。所以這里我們創建三張表來展示不同的情況的配置操作。

  • 單主鍵同步到 TSDB 引擎 : factorsch.stock_example。

  • 復合主鍵同步到 TSDB 引擎 : factorsch.index_example_tsdb。

  • 復合主鍵同步到 OLAP 引擎 : factorsch.index_example_olap。

創建業務數據庫表、數據,均使用 factoruser ,登錄 factordb。

創建表 factorsch.stock_example。

create table factorsch.stock_example (id bigint,ts_code varchar(20),symbol_id varchar(20),name varchar(20),area varchar(20),industry varchar(20),list_date date,primary key (id)
);

插入數據。

insert into factorsch.stock_example(id,ts_code,symbol_id,name,area,industry,list_date)
values (1,'000001.SZ','000001','平安銀行','深圳','銀行','1991-04-03'),
(2,'000002.SZ','000002','萬科A','深圳','地產','1991-01-29'),
(3,'000004.SZ','000004','ST國華','深圳','軟件服務','1991-01-14');

創建表 factorsch.index_example_tsdb。

create table factorsch.index_example_tsdb (trade_date date,stock_code varchar(20),effDate timestamp,indexShortName varchar(20),indexCode varchar(20),secShortName varchar(50),exchangeCD varchar(10),weight decimal(26,6),tm_stamp timestamp,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);

插入數據。

insert into factorsch.index_example_tsdb
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000905', '中百集團', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000906', '中百集團', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values(to_date('2006-11-30','YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上證180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);

創建表 factorsch.index_example_olap。

create table factorsch.index_example_olap (trade_date date,stock_code varchar(20),effDate timestamp,indexShortName varchar(20),indexCode varchar(20),secShortName varchar(50),exchangeCD varchar(10),weight decimal(26,6),tm_stamp timestamp,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);

插入數據。

insert into factorsch.index_example_olap
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000905', '中百集團', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_olap
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000906', '中百集團', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_olap
values(to_date('2006-11-30','YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上證180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);

第二步:配置訂閱發布

使用 PostgreSQL 的 pgoutput 插件進行邏輯復制,需要配置 publication 來定義哪些表的數據變更需要被邏輯復制。配置 publication 在 PostgreSQL 的不同版本支持的腳本寫法不盡相同,本教程以 PostgreSQL 12 版本為例介紹通用的兩種配置方式。

方式一:普通用戶配置(推薦)

factoruser 是普通的業務數據用戶,不具備超級用戶權限。配置發布時需要逐一配置每張表,也可以對單一指定表進行取消配置,當有新增表時需要進行添加。這里我們將上文中創建的三張表全部配置。

創建一個發布,先發布兩張表 factorsch.index_example_tsdb 和 factorsch.stock_example。

CREATE PUBLICATION factordb_publication FOR TABLE factorsch.index_example_tsdb,factorsch.stock_example

查看發布和發布的具體表。

select * from pg_publication
select * from pg_publication_tables

圖 3-4 PostgreSQL 中發布列表查詢

圖 3-5 PostgreSQL 中的發布表明細查詢

對指定 publication ,添加一張表 factorsch.index_example_olap。

ALTER PUBLICATION factordb_publication ADD TABLE factorsch.index_example_olap;

圖 3-6 查看增加的發布表

對指定 publication ,刪除一張表。

ALTER PUBLICATION factordb_publication DROP TABLE factorsch.index_example_olap;

刪除發布。.

drop publication factordb_publication

方式二:超級用戶配置

需要先提升 factoruser 權限為超級用戶,可以使用超級用戶 postgres 操作。

ALTER USER factoruser WITH SUPERUSER;

使用超級用戶 factoruser 創建全表發布。

CREATE PUBLICATION all_tables_publication FOR ALL TABLES;

如 圖 3-7 所示,所有的表都會被發布,增加新表也會直接變成發布表。

圖 3-7 全表訂閱發布表列表查看

對于以上兩種方式,推薦使用方式一,可以精確進行邏輯復制,控制資源使用,控制用戶權限。只有在確實需要整庫同步,且表的數據量眾多的場景可以使用方式二。

第三步:準備連接器配置文件,并啟動連接任務

創建連接 PostgreSQL 的 source 連接器配置文件。

mkdir -p /KFDATA/datasyn-config
vim /KFDATA/datasyn-config/source-postgres.json

錄入以下配置,hostname 和 kafka 啟動地址需對應修改。注意這里的 publication.name 參數要和配置發布名一致,且要提前配置好。

{"name": "postgresTask","config": {"connector.class" : "io.debezium.connector.postgresql.PostgresConnector","snapshot.mode" : "initial","tasks.max" : "1","topic.prefix" : "pg_factorDB","database.hostname" : "192.168.189.130","database.port" : 5432,"database.user" : "datasyn","database.password" : "111111","database.dbname" : "factordb","table.include.list" : "factorsch.index_example_tsdb,factorsch.index_example_olap,factorsch.stock_example","decimal.handling.mode": "string","plugin.name": "pgoutput","publication.name": "factordb_publication","slot.name": "datasyn_slot","heartbeat.interval.ms":"20000"}
}

重要參數說明:

表 3-1 source 連接器重要參數說明

更多詳細參數說明可以參看?Debezium 2.5,不同Debezium版本的參數配置不同,若使用其他版本的Debezium,需找到對應文檔做修改。

第三步: 啟動 PostgreSQL 的數據同步任務

通過 REST API 啟動 PostgreSQL 的 source 連接器。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.189.130:8083/connectors/ -d @/KFDATA/datasyn-config/source-postgres.json

也可以通過 kafka-tools 中的腳本啟動。

cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/source-postgres.json

圖 3-8 source 連接器啟動成功信息

第四步:查看 PostgreSQL 數據同步任務狀態

查看同步任務列表。list 參數展示任務名列表,showall 參數會顯示全部同步任務狀態。

cd /KFDATA/kafka-tools/bin
./rest.sh showall

當前只有一個同步任務,如下圖所示。

圖 3-9 查看全部同步任務狀態信息

查看 kafka 中的 topic 列表:

./kafka.sh tplist

圖 3-10 當前 kafka 中的 topic 列表

查看 表 stock_example、index_example_tsdb、index_example_olap 目前進入到 kafka 中的數據條數:

./kafka.sh get_offsets pg_factorDB.factorsch.stock_example
./kafka.sh get_offsets pg_factorDB.factorsch.index_example_tsdb
./kafka.sh get_offsets pg_factorDB.factorsch.index_example_olap

結果如圖所示,當前配置的三張數據數據表的歷史數據都已經寫入了 kafka 中的對應 topic 中。

圖 3-11 對應 topic 中數據條數信息

4. 從Kafka 到 DolphinDB 的數據同步:部署程序與配置任務

4.1 安裝 Kafka-DolphinDB 數據同步連接器插件

配置啟動 Kafka-DolphinDB 連接器,需要以下兩步:

  • 下載 Kafka-DolphinDB-Connector 插件,將插件解壓并放到 Kafka Connect 的插件路徑下。

  • 重新啟動 Kafka Connect 程序,以加載插件。

第一步:下載 Kafka-DolphinDB 插件

  • jdbc-1.30.22.5-CDC.jar :該包為 DolphinDB JDBC 包為數據同步做了一些專門修改,為特殊版本。

  • kafka-connect-jdbc-4.00.jar:是基于kafka-connect-jdbc-10.7.4 開發的 DolphinDB 連接器,后續會進行代碼開源。

創建插件路徑,在此路徑下放置 Kafka-DolphinDB 插件包,將上述兩個 jar 包放在此目錄下。請確保 kafka 用戶包含對這兩個文件的讀權限。(文件包見附件

sudo mkdir -p /opt/confluent/share/java/plugin/kafka-connect-jdbc
sudo cp ~/jdbc-1.30.22.5-CDC.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/
sudo cp ~/kafka-connect-jdbc-10.7.4-ddb1.04.OLAP.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/

如果上面的操作碰到權限問題,則可以使用以下命令賦予權限。

sudo chmod o+rx /opt/confluent/share/java/plugin/kafka-connect-jdbc/*

第二步: 重啟 kafka-connect

sudo systemctl restart kafka-connect

查看 kafka-connect 路徑的日志輸出

cat /KFDATA/kafka-connect/logs/connect.log | grep JdbcSinkConnector

如下圖所示,則插件加載成功。

圖 4 -1 Kafka-DolphinDB 插件加載成功信息

4.2 配置 DolphinDB 的數據同步連接任務

第一步:創建同步的 DolphinDB 庫、表

根據 PostgreSQL 表結構,創建與 PostgreSQL 表結構一致的表,PostgreSQL 數據類型轉換為 DolphinDB 數據類型對照表可以參考5.2節。

創建單主鍵表 stock_example 的 DolphinDB 對應表:

//創建 dfs://stock_data 數據庫
if (existsDatabase("dfs://stock_data"))dropDatabase("dfs://stock_data")
dbName = "dfs://stock_data"
db=database(directory=dbName, partitionType=HASH, partitionScheme=[LONG, 3], engine="TSDB", atomic="CHUNK")
//創建 stock_example 表
tbName = "stock_example"
colNames = `id`ts_code`symbol_id`name`area`industry`list_date`dummySortKey__
colTypes = `LONG`SYMBOL`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DATE`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`id, sortColumns=`id`dummySortKey__, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,100}], softDelete=true)

創建復合主鍵表 index_example_olap 的 DolphinDB 對應表:

//創建 dfs://index_data_olap 數據庫
if (existsDatabase("dfs://index_data_olap"))dropDatabase("dfs://index_data_olap")
dbName = "dfs://index_data_olap"
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12,  atomic="CHUNK")
//創建 olap 引擎數據庫下的 index_example
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date)

創建復合主鍵表 index_example_tsdb 的 DolphinDB 對應表:

//創建 dfs://index_data_tsdb 數據庫
if (existsDatabase("dfs://index_data_tsdb"))dropDatabase("dfs://index_data_tsdb")
dbName = "dfs://index_data_tsdb"
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12, engine="TSDB", atomic="CHUNK")
//創建 tsdb 引擎數據庫下的 index_example
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date, sortColumns=`stock_code`indexCode`flag`trade_date, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,10},hashBucket{,10},hashBucket{,1}], softDelete=true)

注:建表時的軟刪除功能,即 softDelete 選項需要 DolphinDB 2.00.11 及以上的版本。舊版本 DolphinDB 建表時可以去除該選項。

第二步: 配置同步配置表

在DolphinDB 中創建一張配置表,記錄 kafka topic 和 DolphinDB 庫表之間的映射關系。配置表的庫表名可以自行調整,并在 DolphinDB 的同步任務中設置相應的庫表名稱。配置表中字段名是固定的,需和示例保持一致。

數據庫名:dfs://ddb_sync_config

表名:sync_config

db = database("dfs://ddb_sync_config", HASH, [SYMBOL, 2])
t = table(1:0, `connector_name`topic_name`target_db`target_tab`add_sortcol_flag`primary_key,
[SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL])
db.createTable(t, "sync_config")

kafka topic 名可以通過之前介紹的 ./kafka.sh tplist 的命令查看,當前配置的三張表對應的 topic 如下:

  • 單主鍵同步到 TSDB 引擎 : stock_example -> pg_factorDB.factorsch.stock_example。

  • 復合主鍵同步到 TSDB 引擎 : index_example_tsdb->pg_factorDB.factorsch.index_example_tsdb

  • 復合主鍵同步到 OLAP 引擎 : index_example_olap ->pg_factorDB.factorsch.index_example_olap。

插入配置信息表,將 kafka 中的 topic 和 DolphinDB 庫表名稱一一對應。

def addSyncConfig(connector_name, topic_name, dbname, tbname, add_sortcol_flag="0",primary_key=NULL) {loadTable("dfs://ddb_sync_config", "sync_config").append!(table([connector_name] as col1, [topic_name] as col2, [dbname] as col3,[tbname] as col4, [add_sortcol_flag] as col5,[primary_key] as col6))
}addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.stock_example", "dfs://stock_data", "stock_example", "1", "")
addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.index_example_tsdb", "dfs://index_data_tsdb", "index_example", "0", "")
addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.index_example_olap", "dfs://index_data_olap", "index_example", "0", "stock_code,indexCode,flag,trade_date")

以下是配置表的各個字段說明:

表 4-1 同步配置表字段說明

第三步: 準備連接器配置文件,并啟動連接任務

創建 DolphinDB 數據同步任務配置文件。

cd /KFDATA/datasyn-config
vim ddb-sink-postgres.json

配置如下

{"name": "ddb-sink-postgres","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "pg_factorDB.factorsch.stock_example,pg_factorDB.factorsch.index_example_tsdb,pg_factorDB.factorsch.index_example_olap","connection.url": "jdbc:dolphindb://192.168.189.130:8848?user=admin&password=123456","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","auto.evolve": "false","insert.mode": "upsert","delete.enabled": "true","batch.size":"10000","pk.mode": "record_key","ddbsync.config.table":"dfs://ddb_sync_config,sync_config","ddbsync.addSortColFlag":"true","ddbsync.config.engineTypes" :"TSDB,OLAP"}
}-

表 4-2 sink 連接器重要參數說明

參數說明:以上參數項為同步 DolphinDB 所需參數。如果對 Confluent 的JDBC Sink Connect 有經驗可適當調節。

通過 REST API 啟動source連接器:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://183.134.101.144:8083/connectors/ -d @ddb-sink-postgres.json

也可以通過 kafka-tools 中的腳本啟動:

cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/ddb-sink-postgres.json

查看同步任務狀態, ddb-sink-postgres 是 DolphinDB 的數據同步任務,可以看到現在我們有兩個同步任務,這樣構成了從 PostgreSQL 到 DolphinDB 的數據同步鏈。

./rest.sh showall

圖 4-2 同步任務狀態信息

第四步: 查看表初始數據同步進度

在設置 PostgreSQL 同步任務時,將 snapshot.mode 選項值設置為 ”initial” ,該選項意味著 PostgreSQL 會同步表的初始數據到 Kafka 中,設置完下游的 DolphinDB 任務后,可以檢查初始數據的同步情況。

通過 kafka.sh 腳本查看消費者列表:

圖 4-3 Kafka 消費者列表信息

查看 DolphinDB 同步任務對應的 Kafka 消費組中的每一個 consumer 的消費進度,通過此命令可以查看同步程序中每一張的表同步進度。 Lag 為 0 則表示 Kafka 中 topic 當前沒有未消費的數據,即 Kafka 中的數據與對應表的數據是一致的。

./kafka.sh cm_detail connect-ddb-sink-postgres

圖 4-4 connect-ddb-sink 中每張表同步進度

如 圖 4-4 顯示,數據已被 DolphinDB 同步任務消費完畢,此時在 DolphinDB 的 web 界面查看表中數據,表數據和 PostgreSQL 表中數據是一致的。

圖 4-5 DolphinDB中 index_example 表數據

圖 4-6 DolphinDB 中 stock_example表數據

4.3 實時同步驗證

第一步:插入數據

向 PostgreSQL 中 factorsch.index_example_tsdb 表中插入兩條新數據:

insert into factorsch.index_example_tsdb
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600054', 
to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH:MI:SS'), '上證180', '000010', '三一重工', 
'XXXB', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600055
', 
to_date('2018-06-30 06:48:02', 'YYYY-MM-DD HH:MI:SS'), '滬深300', '000300', '三一重工', 
'XSHG', 0.0029, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);

查看 DolphinDB 對應的表數據:

select * from loadTable("dfs://index_data_tsdb","index_example")

可以看到新數據已寫入:

圖4-7 數據寫入成功

第二步:更新數據

更新 PostgreSQL 中 factorsch.index_example_olap 表的 tmp_stamp 和 secshortname 的值。

update factorsch.index_example_olap 
set tm_stamp = to_date('2025-02-28 16:00:00', 'YYYY-MM-DD HH24:MI:SS'),
secshortname ='測試修改名稱'
where stock_code='000759'

查看 DolphinDB 中數據,數據已被修改:

select * from loadTable("dfs://index_data_olap","index_example")

圖4-8 數據更新成功

第三步:刪除數據

在 PostgreSQL 中的 factorsch.stock_example 刪除一條數據:

delete from factorsch.stock_example where ts_code='000004.SZ'

再查看 DolphinDB 中數據,數據已被刪除:

select * from loadTable("dfs://index_data_tsdb","index_example")

圖4-9 數據刪除成功

5. 部署注意事項

5.1 實時同步須知

DolphinDB 是一款支持海量數據的分布式時序數據庫。針對不同的數據處理需求,在底層架構上天然上與通常的關系型數據庫不同,所以需要有以下限制:

  • DolphinDB 的表沒有主鍵設計,若使用 TSDB 引擎,需將主鍵設置為 sortColumn 字段,并設置 keepDuplicates=LAST 來進行去重,以確保數據唯一性。TSDB 引擎的 sortColumn 是分區內去重,如果使用的是分區表,需要至少將其中一個主鍵列設置為分區列。若使用 OLAP 引擎,需在同步配置表中設置目標庫的主鍵。

  • PostgreSQL 表的主鍵可能不滿足 TSDB 引擎的 sortColumn 設置規則,有以下三種情況:

?1. PostgreSQL 表中有兩個及以上的主鍵,其中一個主鍵為整數類型或時間類型,但末尾列不是整數類型或時間類型:

  • 該情況需要調整 sortColumn 設置的順序,將整數類型或時間類型的主鍵移動到末尾。

2. PostgreSQL 表中只有一個主鍵,或者 PostgreSQL 表中的主鍵的數據類型均不包含整數類型或時間類型:

  • 該情況需要建表時在末尾補充一個 dummySortKey__ 列,值均設置為0,對應同步程序的配置表中需要將 add_sortcol_flag 列的值設置為“1”,并將 DolphinDB 同步連接任務中“ddbsync.addSortColFlag” 設置為 “true”。若使用 DataX 等工具進行初始全表同步,則需要做數據轉換,提供該字段對應初始值。

3.?PostgreSQL 表中的主鍵類型包含 DolphinDB 不支持的類型。

  • DolphinDB TSDB 引擎的 sortColumns 支持整數、日期或時間、字符串類型,暫時不支持小數類型,但在后續的版本里可能提供支持,請關注版本更新。

DDL 語句相關:

  • 當前不支持 DDL 語句同步。

  • 若表結構發生更改,需進行單表修復,具體操作后續會在實時同步的運維手冊文檔中給出。

其他:

  • 表字段命名時,請盡量規避一些簡單的名字,比如 code, timestamp 等,這種命名與 DolphinDB 內關鍵字重復,可能會導致無法正確同步。

5.2 PostgreSQL-DolphinDB 數據類型對應表

以下的類型對應表為推薦設置的 DolphinDB 類型,注意兩者數據類型表示的精度范圍,確保 DolphinDB 數據類型的精度可以覆蓋原 PostgreSQL 類型。

表 5-1 PostgreSQL-DolphinDB 數據類型對應表

在浮點數數據處理上,PostgreSQL 的 DECIMAL 類型是精確值,DolphinDB 的 DOUBLE 類型的精度為15-16位有效數字,如果轉換成 DolphinDB 的 DOUBLE 類型,會存在浮點數精度丟失問題。因此推薦用戶轉換成 DolphinDB 的 DECIMAL 類型,確保浮點數精度。

在時間類型轉換上,請參照表中的類型映射,以保證 DolphinDB 中的時間類型字段在精度上可以覆蓋 PostgreSQL 中時間類型字段的精度。對于帶有時區信息的時間類型,DolphinDB 統一以 UTC 時區存儲。

6. 同步性能測試

6.1 性能測試配置

建表語句

PostgreSQL 建表,并生成測試數據代碼:

DROP TABLE if EXISTS factorsch.performance_test1;
CREATE TABLE factorsch.performance_test1 (dt date,id varchar(20),str1 char(10),val DECIMAL,qty varchar(20),tm TIMESTAMP
);
-- 生成100w行數據,每天1000行
INSERT INTO factorsch.performance_test1 (dt,id,str1,val,qty,tm)
SELECT ('2020-01-01'::date + (gs - 1) / 1000), ((gs-1)%1000+1),'aa', 1.234, 1000, '2024-01-01 15:00:00'::timestamp
FROM generate_series(1, 1000000) gs;ALTER TABLE factorsch.performance_test1
ADD CONSTRAINT pk_performance_test1 PRIMARY KEY (id, dt);DROP TABLE if EXISTS factorsch.performance_test2;
CREATE TABLE debezium.performance_test2 (dt date,id varchar(20),str1 char(10),val DECIMAL,qty varchar(20),tm TIMESTAMP
);
-- 生成1億行數據,每天100000行
INSERT INTO factorsch.performance_test2 (dt,id,str1,val,qty,tm)
SELECT ('2020-01-01'::date + (gs - 1) / 100000), ((gs-1)%100000+1),'aa', 1.234, 1000, '2024-01-01 15:00:00'::timestamp
FROM generate_series(1, 100000000) gs;ALTER TABLE factorsch.performance_test2
ADD CONSTRAINT pk_performance_test2 PRIMARY KEY (id, dt);grant select on factorsch.performance_test1 to datasyn;
grant select on factorsch.performance_test2 to datasyn;

DolphinDB 建表代碼:

dbName = "dfs://performance_test1"
tbName = "performance_test1"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
db = database(dbName, HASH, [SYMBOL, 2], , 'TSDB', 'CHUNK')
db.createTable(t, tbName, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)dbName = "dfs://performance_test2"
tbName = "performance_test2"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
partitionCols = `dt`id
db1 = database(, RANGE, date(datetimeAdd(1990.01M, 0..100*12, 'M')))
db2 = database(, HASH, [SYMBOL, 50])
db = database(dbName, COMPO, [db1, db2], , `TSDB, `CHUNK)
db.createPartitionedTable(t, tbName, partitionColumns=partitionCols, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)

6.2 性能測試結果

性能測試結果如下表所示,其中總耗時等于 DolphinDB 更新完成時間減去 PostgreSQL 更新完成時間,因此總耗時包含了以下數據同步的完整鏈路:

  • Debezium 挖掘 PostgreSQL 日志到 Kafka

  • Kafka 推送數據給相應 topic 的消費者

  • 下游的 DolphinDB Connector 消費 Kafka 中數據,解析為相應的 DolphinDB 更新語句,并執行寫入 DolphinDB 完成

Kafka 每次推送的變更數據在 3000-4000 條,具體條數和 Kafka 的日志大小配置相關。對于 insert 和 update 類型的操作,DolphinDB 的處理效率很高。對于 delete 類型操作,由于 delete 操作涉及數據查找, DolphinDB 的處理效率和具體表的數據行數、分區方式相關。

表 6-1 使用 pgoutput 插件進行數據同步性能測試結果

7. 常見問題解答(FAQ)

7.1 創建同步任務時報錯

(1)json 文件格式錯誤

圖 7-1 json文件格式錯誤報錯信息

造成上述問題的原因可能是多了逗號、少了逗號或者括號不正確,需要檢查并修訂 json 文件。

(2)Failed to find any class that implements Connector

圖 7-2 PostgreSQL 數據庫無法正常連接報錯信息

該報錯提示意味著 PostgreSQL 數據庫無法正常連接。造成上述問題的可能原因:

  • 未將 debezium-connector-postgres-2.5.1.Final.jar 包放置到插件目錄下

  • Kafka 用戶對 debezium-connector-postgres-2.5.1.Final.jar 文件沒有讀權限

(3)Can’t find JdbcSinkConnector

查看日志提示沒有 JdbcSinkConnector 包的加載。JdbcSinkConnector 是包含在 kafka-connect-jdbc-4.00.jar 包內,需要確認該 jar 包是否放置在 kafka connect 的插件路徑下,確認 kafka 對該文件的讀權限。再通過 java --version 查看 Java 版本是否是17,Java 版本較低時,可能無法正確加載插件。目前已知使用 Java 8 時無法正確加載該插件。

7.2 數據未同步或者未正確同步

當數據未同步或者未正確同步時,請先按以下兩步進行檢查。然后對照后面的提供的錯誤列表進行參考調整。

step1 查看同步任務狀態

先查看同步任務是否報錯:

cd /KFDATA/kafka-tools/bin
./rest.sh showall

再看 kafka connect 的日志中是否出現 ERROR:

cd /KFDATA/kafka-connect/logs
cat connect.log | grep ERROR

如果有出現 ERROR,看 ERROR 顯示的日志是 PostgreSQL 報錯還是 ddb-sink 報錯,查看具體的報錯信息。如果同步任務未報錯,也沒有 ERROR,再通過以下方式排查。

step2 查看 PostgreSQL 數據是否同步到 Kafka

查看 Kafka 所有的 topic:

cd /KFDATA/kafka-tools/bin
./kafka.sh tplist

?圖 7-3 kafka 中 topic 信息

在查看該 topic 對應的數據條數:

./kafka.sh get_offsets postgres_service.debezium.index_example
  • 一張表出現兩個 topic 名字

這說明 PostgreSQL source 任務的 topic.prefix 或 DolphinDB sink 任務的 topics 配置項拼寫有誤,請檢查這兩項。DolphinDB sink 任務的 topics 必須為 {topic.prefix}.{schema}.{tablename} 的格式。創建 sink 任務時,如果 topic 不存在,則會自動創建 topic,因此拼寫錯誤會導致出現兩個 topic 。

  • 沒有表對應的 topic / 有對應的 topic,但數據條數為0

這說明 PostgreSQL 數據未正常同步到 Kafka 中,請在同步任務的 table.include.list 中檢查 PostgreSQL 表名的拼寫,或者檢查用戶是否擁有 REPLICATION 和 LOGIN 權限和對對應表的 SELECT 權限。

  • 有對應的 topic,有數據條數,但 DolphinDB 未同步

檢查 DolphinDB Sink 任務中 topics 配置項中的拼寫,檢查同步任務配置表中是否有相同的條數。

查看 Kafka 中數據是否與 PostgreSQL 變更數據一致:

./tpconsumer.sh --op=2 --topic=postgres_service.debezium.index_example --offset=0 --max-messages=20

在顯示的結果中,初始數據同步的消息數據 op = r,新插入數據 op = c,更新數據 op = u

7.3 同步任務運行報錯

  • Java.lang.OutOfMemoryError

Kafka Connect 的默認內存為 1 GB,當數據更新量較大時會出現 JVM 內存不足,需要調整 JVM 大小。根據之前配置的安裝路徑,修改 kafka connect 的配置文件:

vim /KFDATA/kafka-connect/etc/kafka-connect.env

在末尾加入 JVM 選項,內存大小根據實際需要調整:

KAFKA_HEAP_OPTS="-Xms10G -Xmx10G"
  • permission denied for database postgres at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection

使用 pgoutput 插件同步時未預先創建發布,會報此錯誤。

CREATE PUBLICATION dbz_publication FOR TABLE debezium.index_example,debezium.stock_example;
  • Creation of replication slot failed; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each

每個 Debezium 連接器需要使用唯一的復制槽名稱。如果多個連接器使用相同的復制槽名稱,就會導致沖突。可以為每個連接器使用不同的復制槽名稱或者刪除原先復制槽。

SELECT * FROM pg_drop_replication_slot('your_slot_name');

8. 附錄

  • DolphinDB 的 Kafka-Connect 插件包:kafka-connect-jdbc-4.00.jar
  • DolphinDB 的 JDBC 包:jdbc-1.30.22.5-CDC.jar
  • 運維腳本包 kafka-tools:kafka-tools.tar

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913023.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913023.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913023.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

關于聯詠(Novatek )白平衡色溫坐標系探究

目錄 一、疑問 二、結論 三、分析 四、釋疑 五、仿真模擬 一、疑問 為什么Novatek的白平衡色溫坐標系是這個樣子的呢?各條直線和曲線分別代表什么含義呢?色溫坐標系中所標定的參數代表什么含義呢?如何標定新增一些特殊的光源呢?二、結論

Protein FID:AI蛋白質結構生成模型評估新指標

一、引言:蛋白質生成模型面臨的評估挑戰 近年來,AI驅動的蛋白質結構生成模型取得了令人矚目的進展,但如何有效評估這些模型的質量卻一直是一個懸而未決的問題。雖然實驗驗證仍然是金標準,但計算機模擬評估對于快速開發和比較機器…

Vim 高效編輯指南:從基礎操作到塊編輯的進階之路

文章目錄?? 一、基礎編輯命令(生存必備)? 二、進階操作:可視化塊模式 (Ctrl+v)典型應用場景?? 三、效率提升技巧?? 四、配置建議(~/.vimrc)結語作為開發者最強大的文本編輯器之一,Vim 的高效操作離不開其命令模式(Normal Mode)。本文將系統性地介紹 Vim 的核心編…

docker學習第一天框架學習以及在redhat7.9安裝操作

一.docker是什么。 Docker 是一個開源的容器化平臺,通過將應用程序及其依賴項(如代碼、運行時環境、系統工具等)打包到輕量級、可移植的容器中,實現「一次構建,處處運行」的現代化開發模式。它利用了 Linux 內核特性來…

QT控件 使用Font Awesome開源圖標庫修改QWidget和QML兩種界面框架的控件圖標

又一個月快要結束了,在這里總結下分別在QWidget和QML兩種界面設計模式中應用Font Awesome開源圖標庫,修改界面的顯示圖標效果, AriaNg是aria2的可視化web界面工具,其中的圖標大都是Font AWesome中的字體圖標,某位曾經嘗試將AriaNg…

Qt Quick 與 QML(四)qml中的Delegate系列委托組件

一、概念 在QML中,Delegate是一種非常重要的組件,特別是在使用ListView、GridView、PathView等視圖組件時。Delegate用于定義每個列表或網格中的項目是如何展示的。通過自定義Delegate,你可以控制每個項目的外觀和行為。 Delegate通常是一個…

android圖片優化

在 Android 中加載大圖時,如果不進行優化處理,很容易導致內存溢出(OOM)和應用卡頓。以下是幾種高效處理大圖加載的方法和最佳實踐: 1. 使用圖片加載庫(推薦) 成熟的第三方庫已經處理了內存管理…

【機器人】復現 DOV-SG 機器人導航 | 動態開放詞匯 | 3D 場景圖

DOV-SG 建了動態 3D 場景圖,并使用LLM大型語言模型進行任務分解,從而能夠在交互式探索過程中對 3D 場景圖進行局部更新。 來自RA-L 2025,適合長時間的 語言引導移動操作,動態開放詞匯 3D 場景圖。 論文地址:Dynamic …

mongodb 中dbs 時,local代表的是什么

在 MongoDB 中,local 是一個內置的系統數據庫,用于存儲當前 MongoDB 實例(或副本集節點)的元數據和內部數據,與其他數據庫不同,local 數據庫的數據不會被復制到副本集的其他成員。 local 數據庫的核心作用 …

Spring Cloud(微服務部署與監控)

📌 摘要 在微服務架構中,隨著服務數量的增長和部署復雜度的提升,如何高效部署、持續監控、快速定位問題并實現自動化運維成為保障系統穩定性的關鍵。 本文將圍繞 Spring Cloud 微服務的部署與監控 展開,深入講解: 微…

音頻動態壓縮算法曲線實現

Juce實現動態壓縮曲線繪制 動態范圍壓縮算法(Dynamic Range Compression,DRC)是將音頻信號的動態范圍映射到一個較小的范圍內的過程,即降低較高的峰值的信號電平,而不處理較安靜的部分。DRC被廣泛用于音頻錄制、制作工…

技術視界 | OpenLoong 控制框架:打造通用人形機器人智能系統的中樞基座

在人形機器人向通用性、智能化方向加速演進的當下,控制系統的角色正在發生根本變化:它不再只是底層驅動的接口適配層,也不只是策略調用的轉譯引擎,而是成為連接具身模型、異構本體與多樣化任務的“中樞神經系統”。 在 2025 年張…

IOS 藍牙連接

最近做一個硬件設備,寫IOS相應的數據連接/分析代碼時;發現一個問題,如果是開機,每次都能連接上。連接斷開后,發現再也掃描不到了。通過第三方工具LightBlue,發現信號是-127。 此時進入設置查看藍牙設備&am…

【硬核數學 · LLM篇】3.1 Transformer之心:自注意力機制的線性代數解構《從零構建機器學習、深度學習到LLM的數學認知》

我們已經完成了對機器學習和深度學習核心數學理論的全面探索。我們從第一階段的經典機器學習理論,走到了第二階段的深度學習“黑盒”內部,用線性代數、微積分、概率論、優化理論等一系列數學工具,將神經網絡的每一個部件都拆解得淋漓盡致。 …

flutter封裝vlcplayer的控制器

import dart:async;import package:flutter_vlc_player/flutter_vlc_player.dart; import package:flutter/material.dart;class GlobalVlcController extends ChangeNotifier {//設置單例/*static final GlobalVlcController _instance GlobalVlcController._internal();fact…

SEO-濫用元機器人、規范或 hreflang 標簽

&#x1f9f1; 一、濫用 Meta Robots 標簽 ? 常見問題&#xff1a; 問題描述設置了 noindex 不該屏蔽的頁面比如產品頁、分類頁被意外 noindex&#xff0c;導致不被收錄設置 nofollow 導致內鏈失效所有鏈接都被 nofollow&#xff0c;影響爬蟲抓取路徑在 <meta> 標簽和…

笨方法學python -練習14

程序&#xff1a; from sys import argv script, user_name argv prompt > print(f"Hi {user_name}, Im the {script} script.") print("Id like to ask you a few questions.") print(f"Do you like me {user_name}?") likes in…

Frida:配置自動補全 in VSCode

1. 前言 編寫 frida JavaScript 腳本是一件 very 普遍的事情在 Android Reverse 中。為了方便編寫&#xff0c;配置相關的環境使其能夠自動補全是很關鍵的&#xff0c;即通過類名就能夠獲取該類的所有對外接口信息&#xff0c;這是面向對象編程的核心優勢&#xff0c;可惜我沒…

FPGA矩陣算法實現

簡介 現如今設計上對速度的要求越來越高&#xff0c;而矩陣相乘含有大量的乘法和加法計算&#xff0c;造成計算時間長從而影響性能&#xff0c;本章節利用FPGA實現浮點型矩陣運算&#xff0c;可在極短時間內完成矩陣運算。 知識介紹 矩陣計算公式如下&#xff1a; 需要保證A的…

C#可空類型詳解:從基礎到高級應用

C#可空類型詳解&#xff1a;從基礎到高級應用 在C#編程中&#xff0c;可空類型是一個非常重要的概念&#xff0c;它允許我們為值類型&#xff08;如int、bool、DateTime等&#xff09;分配null值&#xff0c;從而增強了代碼的表達能力和靈活性。本文將詳細介紹C#中可空類型的各…