Flink OceanBase CDC 環境配置與驗證

一、OceanBase 數據庫核心配置
1. 環境準備與版本要求
  • 版本要求:OceanBase CE 4.0+ 或 OceanBase EE 2.2+
  • 組件依賴:需部署 LogProxy 服務(社區版/企業版部署方式不同)
  • 兼容模式:支持 MySQL 模式(默認)和 Oracle 模式
2. 創建用戶與權限配置

在 sys 租戶創建管理用戶(社區版示例):

-- 連接 sys 租戶(默認端口 2881)
mysql -h127.0.0.1 -P2881 -uroot@sys -p-- 創建用戶(替換為實際用戶名密碼)
CREATE USER 'ob_cdc_user' IDENTIFIED BY 'Ob@123456';
GRANT ALL PRIVILEGES ON *.* TO 'ob_cdc_user' WITH GRANT OPTION;
FLUSH PRIVILEGES;

在業務租戶創建 CDC 用戶:

-- 切換到業務租戶(如 test_tenant)
USE test_tenant;-- 創建 CDC 數據讀取用戶
CREATE USER 'flink_user' IDENTIFIED BY 'Flink@123';
GRANT SELECT ON test_db.* TO 'flink_user';
FLUSH PRIVILEGES;
3. 獲取關鍵配置信息

社區版獲取 rootserver-list:

-- 連接業務租戶
mysql -h127.0.0.1 -P2881 -uflink_user -p-- 查詢 rootserver 列表(格式:ip:rpc_port:sql_port)
SHOW PARAMETERS LIKE 'rootservice_list';
-- 示例輸出:rootservice_list | 127.0.0.1:2882:2881

企業版獲取 config-url:

SHOW PARAMETERS LIKE 'obconfig_url';
-- 示例輸出:obconfig_url | http://127.0.0.1:8080/services?Action=ObRootServiceInfo&...
4. 部署 LogProxy 服務(社區版快速啟動)
# 下載 LogProxy 二進制(社區版)
wget https://github.com/oceanbase/oblogproxy/releases/download/v2.2.7/oblogproxy-2.2.7.tar.gz
tar -zxvf oblogproxy-2.2.7.tar.gz# 編輯配置文件 oblogproxy.conf
vi oblogproxy/oblogproxy.conf
# 添加以下配置(根據實際情況修改):
[common]
rootservice_list = "127.0.0.1:2882:2881"
logproxy_port = 2983
working_mode = "memory"# 啟動 LogProxy
cd oblogproxy
./oblogproxy -c oblogproxy.conf
二、Flink 環境集成配置
1. 添加Maven依賴
<!-- OceanBase CDC 連接器 -->
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-oceanbase-cdc</artifactId><version>3.0.1</version><scope>provided</scope>
</dependency><!-- 企業版需添加OceanBase JDBC驅動 -->
<dependency><groupId>com.oceanbase</groupId><artifactId>oceanbase-client</artifactId><version>2.4.2</version>
</dependency>
2. SQL Client部署
  1. 下載 CDC 連接器 JAR:
    flink-sql-connector-oceanbase-cdc-3.0.1.jar
  2. 企業版需額外下載 OceanBase JDBC 驅動:
    oceanbase-client-2.4.2.jar
  3. 將 JAR 包放入 $FLINK_HOME/lib/ 后重啟 Flink 集群。
三、Flink SQL 表定義與參數詳解
1. MySQL 模式建表示例(含元數據)
-- 配置checkpoint
SET 'execution.checkpointing.interval' = '5s';-- 創建OceanBase CDC表(MySQL模式)
CREATE TABLE ob_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),-- 元數據列tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'oceanbase-cdc','scan.startup.mode' = 'initial','username' = 'flink_user@test_tenant#ob_cluster','password' = 'Flink@123','tenant-name' = 'test_tenant','database-name' = 'test_db','table-name' = 'orders','hostname' = '127.0.0.1','port' = '2881','rootserver-list' = '127.0.0.1:2882:2881',  -- 社區版必填'logproxy.host' = '127.0.0.1','logproxy.port' = '2983','working-mode' = 'memory'
);
2. Oracle 模式建表示例
CREATE TABLE ob_orders_oracle (order_id INT,order_date TIMESTAMP(0),customer_name STRING,-- 元數據列tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL
) WITH ('connector' = 'oceanbase-cdc','scan.startup.mode' = 'initial','username' = 'flink_user@test_tenant#ob_cluster','password' = 'Flink@123','tenant-name' = 'test_tenant','database-name' = 'test_db','table-name' = 'orders','hostname' = '127.0.0.1','port' = '2881','compatible-mode' = 'oracle',       -- 關鍵:設置Oracle兼容模式'jdbc.driver' = 'com.oceanbase.jdbc.Driver',  -- 企業版JDBC驅動'config-url' = 'http://127.0.0.1:8080/...',  -- 企業版必填'logproxy.host' = '127.0.0.1','logproxy.port' = '2983'
);
3. 核心參數詳解
參數名必選默認值類型說明
connectorString固定為oceanbase-cdc
scan.startup.modeString啟動模式:initial(快照+日志)、latest-offset(僅最新)、timestamp(指定時間)
tenant-nameString目標租戶名稱(如test_tenant
logproxy.hostStringLogProxy 服務IP
logproxy.portIntegerLogProxy 服務端口(默認2983)
rootserver-list社區版是String社區版rootserver列表(格式ip:rpc_port:sql_port
config-url企業版是String企業版配置服務URL
compatible-modemysqlString兼容模式:mysql(默認)、oracle
jdbc.driver企業版是com.mysql.jdbc.DriverString企業版JDBC驅動類(com.oceanbase.jdbc.Driver
四、環境驗證與測試
1. 準備測試數據(OceanBase MySQL模式)
-- 連接業務租戶
mysql -h127.0.0.1 -P2881 -uflink_user -p test_db-- 創建測試表
CREATE TABLE orders (order_id INT PRIMARY KEY,order_date TIMESTAMP,customer_name VARCHAR(100),price DECIMAL(10, 2)
);-- 插入數據
INSERT INTO orders VALUES 
(1, '2023-01-01 10:00:00', 'Alice', 100.50),
(2, '2023-01-02 11:00:00', 'Bob', 200.75);
COMMIT;
2. Flink SQL 驗證
-- 查詢OceanBase CDC表(首次觸發快照)
SELECT * FROM ob_orders;-- 在OceanBase中更新數據
UPDATE orders SET price = 150.00 WHERE order_id = 1;
COMMIT;-- 觀察Flink輸出:應顯示變更記錄,op_ts為變更時間
3. DataStream API 驗證(社區版)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
import org.apache.flink.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarcharType;public class OceanBaseSourceExample {public static void main(String[] args) throws Exception {// 定義表結構RowType physicalType = RowType.of(RowType.Field.of("order_id", BigIntType.INSTANCE),RowType.Field.of("customer_name", VarcharType.of(100)));InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(physicalType);// 配置OceanBase SourceOceanBaseSource<RowData> source = OceanBaseSource.<RowData>builder().rsList("127.0.0.1:2882:2881")  // 社區版rootserver-list.startupMode(StartupMode.INITIAL).username("flink_user@test_tenant#ob_cluster").password("Flink@123").tenantName("test_tenant").databaseName("test_db").tableName("orders").hostname("127.0.0.1").port(2881).logProxyHost("127.0.0.1").logProxyPort(2983).deserializer(RowDataOceanBaseDeserializationSchema.newBuilder().setPhysicalRowType(physicalType).setResultTypeInfo(typeInfo).build()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(source, null, "OceanBase CDC Source").print();env.execute("OceanBase CDC Test");}
}
五、常見問題與解決方案
  1. LogProxy連接失敗

    ERROR: Failed to connect to LogProxy at 127.0.0.1:2983
    
    • 解決方案:
      1. 確認LogProxy服務已啟動且端口正確(netstat -an | grep 2983
      2. 檢查logproxy.hostlogproxy.port配置是否與LogProxy一致
  2. 權限不足(社區版)

    ERROR: Access denied for user 'flink_user'@'127.0.0.1'
    
    • 解決方案:
      • 確認用戶在業務租戶有SELECT權限
      • 檢查用戶名格式是否正確(user@tenant#cluster
  3. 企業版Oracle模式配置錯誤

    ERROR: incompatible-mode must be set for Oracle mode
    
    • 解決方案:
      • 顯式設置compatible-mode = 'oracle'
      • 確保已添加oceanbase-client依賴并部署JDBC驅動
  4. 時間戳轉換異常

    • 解決方案:顯式設置時區:
      'server-time-zone' = 'Asia/Shanghai'
      
六、生產環境優化建議
  1. LogProxy性能調優

    • 設置working-mode = 'memory'(內存模式,適合高頻變更)
    • 調整obcdc.properties.batch_size(如1024)優化批量處理
  2. 高可用配置

    • 部署多節點LogProxy,Flink配置多個logproxy.host(逗號分隔)
    • 企業版使用config-url自動發現OB集群節點
  3. 監控與清理

    • 定期清理LogProxy內存數據(working-mode = 'memory'時):
      # 重啟LogProxy或通過API清理
      

通過以上步驟,可完成Flink OceanBase CDC的全流程配置與驗證。生產環境中需特別注意社區版與企業版的配置差異、LogProxy服務穩定性及兼容模式的正確設置,以確保數據一致性和系統穩定性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913329.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913329.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913329.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

c++對象池

【設計模式】其它經典模式-對象池模式&#xff08;Object Pool Pattern&#xff09;-CSDN博客 在C中&#xff0c;對象池&#xff08;Object Pool&#xff09;是一種管理對象生命周期的技術&#xff0c;旨在減少對象創建和銷毀的開銷&#xff0c;提高性能。對象池預先分配一定數…

JavaFX:Scene(場景)

簡介 Scene對象是JavaFX場景圖的根(root)。JavaFX 場景中包含所有可視的 JavaFX GUI 組件。JavaFX 場景由javafx.scene.Scene類表示。必須在 Stage(舞臺)上設置 Scene 對象才能使其可見。在本 JavaFX Scene 教程中,將向您展示如何創建 Scene 對象并向其添加 GUI 組件。 創…

vue3.4中的v-model的用法~

1.首先以前我們針對父子組件傳參是不是通過defineProps與defineEmits來實現的&#xff0c;但是這么比較繁瑣&#xff0c;因為他是單向傳參&#xff0c;而不是雙向的&#xff0c;這里我們要介紹的是vue3.4的v-model來實現雙向數據傳遞。 2、代碼示例&#xff1a; //父組件 <…

nvm常用指令匯總

nvm是用來管理nodejs的&#xff0c;可以方便安裝、切換、卸載當前環境的node版本。 以下是常用指令匯總&#xff1a;nvm list 查看本機已經安裝的node版本。*表示當前系統正在使用的node版本nvm install xx.xx.x 后邊加版本號&#xff0c;表示安裝指定的版本nvm use xx.xx.x當前…

洛谷P5021 [NOIP 2018 提高組] 賽道修建【題解】【二分答案+樹上貪心】

P5021 [NOIP 2018 提高組] 賽道修建 題意簡述 給定一棵含 n n n 個點的無向帶權樹&#xff0c;求將其分裂為 m m m 條鏈后&#xff0c;最短的一條鏈的最大長度是多少&#xff1f; 點可以重復使用&#xff0c;邊不可以重復使用。 思路 二分答案貪心判定貌似可以&#xff…

Portal認證過程雜談

Portal認證模型簡介 Portal認證模型通常由這四個設備組成 認證服務器即3A服務器&#xff0c;通常用radius服務器 接入設備通常就是NAC設備&#xff08;網絡接入控制&#xff09; Portal服務器就是Portal認證的認證網站&#xff08;通常叫門戶網站&#xff09; 認證過程簡述…

ZSGuardian ---AI賦能,新一代研發管理守護平臺 -即將上線

一場研發管理的革命 在數字化浪潮奔涌向前的今天&#xff0c;軟件開發與產品研發的節奏不斷加快&#xff0c;市場需求瞬息萬變&#xff0c;技術迭代日新月異。對于研發團隊而言&#xff0c;如何在復雜多變的環境中&#xff0c;高效地管理項目、保障產品質量、確保按時上線&…

小菜狗的云計算之旅,學習了解rsync+sersync實現數據實時同步(詳細操作步驟)

Rsyncsersync實現數據實時同步 目錄 Rsyncsersync實現數據實時同步 一、rsync概述 二、rsync運行原理 三、rsync部署 四、備份測試 五、使用非系統用戶備份數據 5.1 rsync的配置文件介紹 5.2 配置備份目錄 5.3 使用rsync用戶備份測試 5.4 pull拉取數據 六、rsyncse…

牛客周賽Round 99(Go語言)

A題 (A.go) 思路總結: 這道題要求判斷一個整數中是否包含連續的兩個9。 核心思路是將輸入的整數轉換為字符串&#xff0c;然后遍歷這個字符串&#xff0c;檢查是否存在相鄰的兩個字符都是9。如果找到了&#xff0c;就立即停止遍歷并輸出"YES"&#xff1b;如果遍歷完…

紅外圖像小目標檢測熱力圖可視化系統

原創代碼&#xff0c;可以工程修改含界面。

供應鏈管理:指標評估方式分類與詳解

一、指標評估方式分類與詳解 評估維度評估方式核心方法適用場景示例數據來源內部數據評估從企業ERP、MES、CRM等系統提取生產、財務、客戶等數據。成本、效率、質量等內部管理指標評估。生產成本數據&#xff08;MES系統&#xff09;、客戶滿意度&#xff08;CRM系統&#xff…

基于 Rust 的前端工具基本實現

1. Rust 環境安裝 1.1. 安裝 Rust Rust 提供了一個非常方便的安裝工具 rustup,可以通過以下命令安裝 Rust: curl --proto =https --tlsv1.2 -sSf https://sh.rustup.rs | sh 這個命令會安裝 Rust 編譯器 rustc、包管理工具 cargo 以及其他相關工具。 1.2. 配置環境變量 …

大模型關鍵字解釋

&#x1f4a1; 一、模型結構關鍵詞 1. Transformer Transformer 是一種專門用來“理解文字”的神經網絡結構。就像一個聰明的秘書&#xff0c;能同時看懂整段話的所有詞之間的關系&#xff0c;而不是像老式模型那樣一句一句讀。 &#x1f449; 舉例&#xff1a;以前的模型像…

空調和烘干機的使用

開關 制冷 選擇上下掃風 那個就下來了 烘干機 電源鍵 長按3s以上直到菜單顯示 選擇小件 不要快烘 至少1個半小時 才可以烘干

極簡的神經網絡反向傳播例子

我之前一直沒搞清楚&#xff0c;神經網絡為什么要求導&#xff1f;反向傳播又是什么&#xff1f;于是到現在深究回來…… 本質就是擬合一個未知函數。 高中的數理統計就學過最小二乘法這種回歸方法&#xff08;? 代表自己的預測y&#xff0c;這個表達要記住&#xff09;&…

01-什么是強化學習

什么是強化學習 1. 定義 強化學習&#xff08;Reinforcement Learning, RL&#xff09;是一種使智能體&#xff08;Agent&#xff09;通過與環境&#xff08;Environment&#xff09;不斷交互&#xff0c;學習如何在不同情境下采取行動以獲得最大化累積獎勵的機器學習方法。 強…

淘寶直播數字人:音視頻算法工程技術

本專題是我們打造智能數字人的部分實踐總結。我們將探討六大核心環節&#xff1a;LLM文案生產賦予數字人思考和內容生成能力&#xff0c;如同其“大腦”&#xff1b;LLM互動能力則聚焦對話邏輯與擬人化交流&#xff0c;是實現自然交互的關鍵&#xff1b;TTS&#xff08;語音合成…

MySQL回表查詢深度解析:原理、影響與優化實戰

引言 作為后端開發或DBA&#xff0c;你是否遇到過這樣的場景&#xff1a; 明明給字段加了索引&#xff0c;查詢還是慢&#xff1f;EXPLAIN一看&#xff0c;執行計劃里type是ref&#xff0c;但數據量不大卻耗時很久&#xff1f; 這時候&#xff0c;你很可能遇到了MySQL中常見的…

任務管理器看不到的內存占用:RAMMap 深度分析指南

前言&#xff1a;任務管理器看不到的內存真相 在日常使用 Windows 系統時&#xff0c;我們有時會遇到一種令人費解的情況&#xff1a; 剛剛開機&#xff0c;什么軟件都沒運行&#xff0c;系統內存卻已經占用了 7&#xff5e;8 GB。 打開任務管理器一看&#xff0c;前幾個進程加…

從傳統倉庫到智能物流樞紐:艾立泰的自動化蛻變之旅

在物流行業智能化浪潮中&#xff0c;艾立泰從依賴人工的傳統倉庫轉型為智能物流樞紐&#xff0c;其自動化升級路徑為行業提供了典型范本。?曾幾何時&#xff0c;艾立泰倉庫內人工搬運、紙質單據流轉、手工盤點是常態&#xff0c;效率低下、差錯率高、人力成本攀升等問題制約發…