一、OceanBase 數據庫核心配置
1. 環境準備與版本要求
- 版本要求:OceanBase CE 4.0+ 或 OceanBase EE 2.2+
- 組件依賴:需部署 LogProxy 服務(社區版/企業版部署方式不同)
- 兼容模式:支持 MySQL 模式(默認)和 Oracle 模式
2. 創建用戶與權限配置
在 sys 租戶創建管理用戶(社區版示例):
-- 連接 sys 租戶(默認端口 2881)
mysql -h127.0.0.1 -P2881 -uroot@sys -p-- 創建用戶(替換為實際用戶名密碼)
CREATE USER 'ob_cdc_user' IDENTIFIED BY 'Ob@123456';
GRANT ALL PRIVILEGES ON *.* TO 'ob_cdc_user' WITH GRANT OPTION;
FLUSH PRIVILEGES;
在業務租戶創建 CDC 用戶:
-- 切換到業務租戶(如 test_tenant)
USE test_tenant;-- 創建 CDC 數據讀取用戶
CREATE USER 'flink_user' IDENTIFIED BY 'Flink@123';
GRANT SELECT ON test_db.* TO 'flink_user';
FLUSH PRIVILEGES;
3. 獲取關鍵配置信息
社區版獲取 rootserver-list:
-- 連接業務租戶
mysql -h127.0.0.1 -P2881 -uflink_user -p-- 查詢 rootserver 列表(格式:ip:rpc_port:sql_port)
SHOW PARAMETERS LIKE 'rootservice_list';
-- 示例輸出:rootservice_list | 127.0.0.1:2882:2881
企業版獲取 config-url:
SHOW PARAMETERS LIKE 'obconfig_url';
-- 示例輸出:obconfig_url | http://127.0.0.1:8080/services?Action=ObRootServiceInfo&...
4. 部署 LogProxy 服務(社區版快速啟動)
# 下載 LogProxy 二進制(社區版)
wget https://github.com/oceanbase/oblogproxy/releases/download/v2.2.7/oblogproxy-2.2.7.tar.gz
tar -zxvf oblogproxy-2.2.7.tar.gz# 編輯配置文件 oblogproxy.conf
vi oblogproxy/oblogproxy.conf
# 添加以下配置(根據實際情況修改):
[common]
rootservice_list = "127.0.0.1:2882:2881"
logproxy_port = 2983
working_mode = "memory"# 啟動 LogProxy
cd oblogproxy
./oblogproxy -c oblogproxy.conf
二、Flink 環境集成配置
1. 添加Maven依賴
<!-- OceanBase CDC 連接器 -->
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-oceanbase-cdc</artifactId><version>3.0.1</version><scope>provided</scope>
</dependency><!-- 企業版需添加OceanBase JDBC驅動 -->
<dependency><groupId>com.oceanbase</groupId><artifactId>oceanbase-client</artifactId><version>2.4.2</version>
</dependency>
2. SQL Client部署
- 下載 CDC 連接器 JAR:
flink-sql-connector-oceanbase-cdc-3.0.1.jar - 企業版需額外下載 OceanBase JDBC 驅動:
oceanbase-client-2.4.2.jar - 將 JAR 包放入
$FLINK_HOME/lib/
后重啟 Flink 集群。
三、Flink SQL 表定義與參數詳解
1. MySQL 模式建表示例(含元數據)
-- 配置checkpoint
SET 'execution.checkpointing.interval' = '5s';-- 創建OceanBase CDC表(MySQL模式)
CREATE TABLE ob_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),-- 元數據列tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'oceanbase-cdc','scan.startup.mode' = 'initial','username' = 'flink_user@test_tenant#ob_cluster','password' = 'Flink@123','tenant-name' = 'test_tenant','database-name' = 'test_db','table-name' = 'orders','hostname' = '127.0.0.1','port' = '2881','rootserver-list' = '127.0.0.1:2882:2881', -- 社區版必填'logproxy.host' = '127.0.0.1','logproxy.port' = '2983','working-mode' = 'memory'
);
2. Oracle 模式建表示例
CREATE TABLE ob_orders_oracle (order_id INT,order_date TIMESTAMP(0),customer_name STRING,-- 元數據列tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL
) WITH ('connector' = 'oceanbase-cdc','scan.startup.mode' = 'initial','username' = 'flink_user@test_tenant#ob_cluster','password' = 'Flink@123','tenant-name' = 'test_tenant','database-name' = 'test_db','table-name' = 'orders','hostname' = '127.0.0.1','port' = '2881','compatible-mode' = 'oracle', -- 關鍵:設置Oracle兼容模式'jdbc.driver' = 'com.oceanbase.jdbc.Driver', -- 企業版JDBC驅動'config-url' = 'http://127.0.0.1:8080/...', -- 企業版必填'logproxy.host' = '127.0.0.1','logproxy.port' = '2983'
);
3. 核心參數詳解
參數名 | 必選 | 默認值 | 類型 | 說明 |
---|---|---|---|---|
connector | 是 | 無 | String | 固定為oceanbase-cdc |
scan.startup.mode | 是 | 無 | String | 啟動模式:initial (快照+日志)、latest-offset (僅最新)、timestamp (指定時間) |
tenant-name | 是 | 無 | String | 目標租戶名稱(如test_tenant ) |
logproxy.host | 是 | 無 | String | LogProxy 服務IP |
logproxy.port | 是 | 無 | Integer | LogProxy 服務端口(默認2983) |
rootserver-list | 社區版是 | 無 | String | 社區版rootserver列表(格式ip:rpc_port:sql_port ) |
config-url | 企業版是 | 無 | String | 企業版配置服務URL |
compatible-mode | 否 | mysql | String | 兼容模式:mysql (默認)、oracle |
jdbc.driver | 企業版是 | com.mysql.jdbc.Driver | String | 企業版JDBC驅動類(com.oceanbase.jdbc.Driver ) |
四、環境驗證與測試
1. 準備測試數據(OceanBase MySQL模式)
-- 連接業務租戶
mysql -h127.0.0.1 -P2881 -uflink_user -p test_db-- 創建測試表
CREATE TABLE orders (order_id INT PRIMARY KEY,order_date TIMESTAMP,customer_name VARCHAR(100),price DECIMAL(10, 2)
);-- 插入數據
INSERT INTO orders VALUES
(1, '2023-01-01 10:00:00', 'Alice', 100.50),
(2, '2023-01-02 11:00:00', 'Bob', 200.75);
COMMIT;
2. Flink SQL 驗證
-- 查詢OceanBase CDC表(首次觸發快照)
SELECT * FROM ob_orders;-- 在OceanBase中更新數據
UPDATE orders SET price = 150.00 WHERE order_id = 1;
COMMIT;-- 觀察Flink輸出:應顯示變更記錄,op_ts為變更時間
3. DataStream API 驗證(社區版)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
import org.apache.flink.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarcharType;public class OceanBaseSourceExample {public static void main(String[] args) throws Exception {// 定義表結構RowType physicalType = RowType.of(RowType.Field.of("order_id", BigIntType.INSTANCE),RowType.Field.of("customer_name", VarcharType.of(100)));InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(physicalType);// 配置OceanBase SourceOceanBaseSource<RowData> source = OceanBaseSource.<RowData>builder().rsList("127.0.0.1:2882:2881") // 社區版rootserver-list.startupMode(StartupMode.INITIAL).username("flink_user@test_tenant#ob_cluster").password("Flink@123").tenantName("test_tenant").databaseName("test_db").tableName("orders").hostname("127.0.0.1").port(2881).logProxyHost("127.0.0.1").logProxyPort(2983).deserializer(RowDataOceanBaseDeserializationSchema.newBuilder().setPhysicalRowType(physicalType).setResultTypeInfo(typeInfo).build()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(source, null, "OceanBase CDC Source").print();env.execute("OceanBase CDC Test");}
}
五、常見問題與解決方案
-
LogProxy連接失敗
ERROR: Failed to connect to LogProxy at 127.0.0.1:2983
- 解決方案:
- 確認LogProxy服務已啟動且端口正確(
netstat -an | grep 2983
) - 檢查
logproxy.host
和logproxy.port
配置是否與LogProxy一致
- 確認LogProxy服務已啟動且端口正確(
- 解決方案:
-
權限不足(社區版)
ERROR: Access denied for user 'flink_user'@'127.0.0.1'
- 解決方案:
- 確認用戶在業務租戶有
SELECT
權限 - 檢查用戶名格式是否正確(
user@tenant#cluster
)
- 確認用戶在業務租戶有
- 解決方案:
-
企業版Oracle模式配置錯誤
ERROR: incompatible-mode must be set for Oracle mode
- 解決方案:
- 顯式設置
compatible-mode = 'oracle'
- 確保已添加
oceanbase-client
依賴并部署JDBC驅動
- 顯式設置
- 解決方案:
-
時間戳轉換異常
- 解決方案:顯式設置時區:
'server-time-zone' = 'Asia/Shanghai'
- 解決方案:顯式設置時區:
六、生產環境優化建議
-
LogProxy性能調優
- 設置
working-mode = 'memory'
(內存模式,適合高頻變更) - 調整
obcdc.properties.batch_size
(如1024
)優化批量處理
- 設置
-
高可用配置
- 部署多節點LogProxy,Flink配置多個
logproxy.host
(逗號分隔) - 企業版使用
config-url
自動發現OB集群節點
- 部署多節點LogProxy,Flink配置多個
-
監控與清理
- 定期清理LogProxy內存數據(
working-mode = 'memory'
時):# 重啟LogProxy或通過API清理
- 定期清理LogProxy內存數據(
通過以上步驟,可完成Flink OceanBase CDC的全流程配置與驗證。生產環境中需特別注意社區版與企業版的配置差異、LogProxy服務穩定性及兼容模式的正確設置,以確保數據一致性和系統穩定性。