使用canal同步分庫分表數據,到 Elasticsearch

作者:小凱
沉淀、分享、成長,讓自己和他人都能有所收獲!

本文的宗旨在于通過簡單干凈實踐的方式教會讀者,配置出一套 Canal 工具服務,來同步分庫分表的數據到 Elasticsearch 文件夾系統中。同時在 SpringBoot 工程中,配置出兩套數據源,一套是 MySQL + MyBatis,一套是 Elasticsearch + MyBatis。【這是非常重要的設計手段】

雖然現在有 TiDB 這樣的分布式數據庫,但對于分庫分表 + 數據同步ES,依然是非常主流的方案。同時也有一部分是把分庫分表的數據同步到 TiDB 使用。

Github:https://github.com/alibaba/canal(opens new window)

#一、組件介紹
在這里插入圖片描述

canal ,譯為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。

早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基于業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。

它的工作原理是,canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議。在 MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal ) 這樣 canal 再解析 binary log (binlog)進行配置分發,同步到 Elasticsearch 等系統中進行使用。

那么有了 canal 就可以把分庫分表的數據同步到 Elasticsearch,提供匯總查詢和聚合操作,也就不需要把輪訓每個分庫分表數據了。

二、測試預期
本文的案例會把MySQL,2庫4表的數據,通過 Sharding 分庫分表寫入數據后,同步到 Elasticsearch。分庫分表如下(環境安裝中會自動安裝數據庫和設置庫表);
在這里插入圖片描述

三、環境安裝
為了讓讀者伙伴更加簡單的學習到這一項方案技能,這里把所需的環境都配置成一整套的 docker compose 腳本文件(ARM、AMD),你只要執行安裝即可。安全前注意,無論是本機還是云服務器都需要安裝 docker-ce(opens new window)

1. 環境腳本
在這里插入圖片描述

  • 打開 xfg-dev-tech-canal 工程,下面就是 docker compose 的執行腳本。
  • mac/windows 如果安裝了 docker 可以直接點擊如圖的三角號安裝。如果是在 Linux 安裝了 docker 可以把 dev-ops 整個文件夾都上傳到云服務器,之后通過腳本;docker-compose -f xfg-dev-tech-canal-docker-compose.yml up -d 進行安裝。

1.1 開啟 binlog
mysql 數據同步需要創建一個 canal 的賬戶,之后還需要開啟 binlog 日志
在這里插入圖片描述

  • 在 mysql 配置文件夾中,設置了初始化授權的賬戶、導入的庫表,以及開啟 mysql-bin 和配置要采集的庫。
  • 如果你有配置自己其他的庫要同步也可以如此配置。

1.2 庫表采集配置
在這里插入圖片描述

  • 本文選擇的是 es 同步方式,所以需要在 canal-adapter 中 es7 文件夾添加同步的庫表 yml 配置。
  • 以及在 application.yml 中配置出需要鏈接的庫表以及同步的目標地址,也就是 es 的地址。【因為本文的案例是在同一個 docker compose 下安裝,所以直接用名稱 elsticsearch 即可訪問】

2. 運行狀態
在這里插入圖片描述

  • 安裝完成后可以進入 portainer 查看各個組件的運行,如果有哪個運行失敗了,可以點擊那個小文件的圖標,它可以查看日志。

3. 創建索引
在 doc/dev-ops/curl 下提供了創建 Elasticsearch 的腳本;你可以點擊執行或者直接復制執行,也可以復制導入到 ApiPost 里執行。

以上這些腳本是為了創建出數據庫表同步到 Elasticsearch 后對應的索引和映射的字段。文章下面會用到。

3.1 創建

curl -X PUT "127.0.0.1:9200/xfg_dev_tech.user_order" -H 'Content-Type: application/json' -d'
{"mappings": {"properties": {"_user_id":{"type": "text"},"_user_name":{"type": "text"},"_order_id":{"type": "text"},"_uuid":{"type": "text"},"_create_time":{"type": "date"},"_update_time":{"type": "date"}}}
}'

3.2 添加

curl -X PUT "127.0.0.1:9200/xfg_dev_tech.user_order/_mapping" -H 'Content-Type: application/json' -d'
{"properties": {"_sku_name": {"type": "text"}}
}'

3.3 刪除

curl -X DELETE "127.0.0.1:9200/xfg_dev_tech.user_order"

4. 創建索引(Kibana)
4.1 索引管理
地址:http://127.0.0.1:5601/app/management/kibana/indexPatterns(opens new window)
在這里插入圖片描述

  • 填寫完,點擊創建索引模式即可。

4.2 數據頁面
地址:http://127.0.0.1:5601/app/discover(opens new window)
在這里插入圖片描述

  • 等后面同步數據了以后,直接在這里點刷新就可以看到了。

5. 許可證
kibana 提供了免費30天的試用許可,安裝后可以使用 x-pack-sql-jdbc。它的好處是可以讓我們通過 MyBatis 的方式查詢 Elasticsearch 數據。

地址:http://127.0.0.1:5601/app/management/stack/license_management(opens new window)
在這里插入圖片描述

Elasticsearch 提供了 x-pack-sql-jdbc,讓對 Elasticsearch 的查詢也可以像使用 MySQL 數據庫一樣通過 MyBatis 進行查詢。但這個 x-pack-sql-jdbc 是付費的,免費可以使用 30 天。之后你可以選擇使用重新安裝,破解,或者使用 Elasticsearch 的查詢方式。還可以自己開發一個 Elasticsearch JDBC,GitHub 上也有類似的組件。

使用時需要引入 POM 配置;

<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/x-pack-sql-jdbc -->
<dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>x-pack-sql-jdbc</artifactId><version>7.17.14</version>
</dependency>

三、工程配置
本節涉及到了簡明教程中所講解的 Sharding 分庫分表 (opens new window)的使用,因為我們需要把分庫分表的數據通過 canal 同步到 Elasticsearch。(也可以使用其他分庫分表組件)

在工程中配置一套 Sharding 分庫分表映射的 MyBatis MyBatis,在配置一套 Elasticsearch x-pack-sql-jdbc 數據源映射的 MyBatis Mapper。這樣可以讀寫分別走自己設定好的 Mapper 對象了。
在這里插入圖片描述

1. 創建數據源

@Configuration
public class DataSourceConfig {@Configuration@MapperScan(basePackages = "cn.bugstack.xfg.dev.tech.infrastructure.dao.elasticsearch", sqlSessionFactoryRef = "elasticsearchSqlSessionFactory")static class ElasticsearchMyBatisConfig {@Bean("elasticsearchDataSource")@ConfigurationProperties(prefix = "spring.elasticsearch.datasource")public DataSource igniteDataSource(Environment environment) {return new EsDataSource();}@Bean("elasticsearchSqlSessionFactory")public SqlSessionFactory elasticsearchSqlSessionFactory(DataSource elasticsearchDataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(elasticsearchDataSource);factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/elasticsearch/*.xml"));return factoryBean.getObject();}}@Configuration@MapperScan(basePackages = "cn.bugstack.xfg.dev.tech.infrastructure.dao.mysql", sqlSessionFactoryRef = "mysqlSqlSessionFactory")static class MysqlMyBatisConfig {@Bean("mysqlDataSource")@ConfigurationProperties(prefix = "spring.mysql.datasource")public DataSource mysqlDataSource(Environment environment) {return DataSourceBuilder.create().url(environment.getProperty("spring.mysql.datasource.url")).driverClassName(environment.getProperty("spring.mysql.datasource.driver-class-name")).build();}@Bean("mysqlSqlSessionFactory")public SqlSessionFactory mysqlSqlSessionFactory(DataSource mysqlDataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(mysqlDataSource);factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/mysql/*.xml"));return factoryBean.getObject();}}}
  • ElasticsearchMyBatisConfig 使用 EsDataSource 創建數據源和映射 MyBatis Mapper 文件。
  • MysqlMyBatisConfig 使用 DataSourceBuilder 創建 Sharding 提供的數據源和映射 MyBatis Mapper 文件。

2. Mapper 映射

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.bugstack.xfg.dev.tech.infrastructure.dao.elasticsearch.IElasticsearchUserOrderDao"><resultMap id="dataMap" type="cn.bugstack.xfg.dev.tech.infrastructure.po.UserOrderPO"><result column="_user_id" property="userId"/><result column="_user_name" property="userName"/><result column="_order_id" property="orderId"/><result column="_sku_name" property="skuName"/><result column="_update_time" property="updateTime"/><result column="_create_time" property="createTime"/></resultMap><select id="selectByUserId" resultMap="dataMap">select _user_id, _user_name, _order_id, _sku_namefrom "xfg_dev_tech.user_order"order by _update_timelimit 10</select></mapper>
  • 這個是 Elasticsearch 映射的 Mapper 文件,映射的字段就是前面安裝環境的時候設置的索引和字段。現在你使用 Elasticsearch 就不用在工程中硬編碼查詢語句了,變得非常方便。

四、工程測試
1. 寫入數據

@Test
public void test_insert() throws InterruptedException {for (int i = 0; i < 3; i++) {UserOrderPO userOrderPO = UserOrderPO.builder().userName("小哥哥").userId("xfg_" + RandomStringUtils.randomAlphabetic(6)).userMobile("+86 13521408***").sku("13811216").skuName("《手寫MyBatis:漸進式源碼實踐》").orderId(RandomStringUtils.randomNumeric(11)).quantity(1).unitPrice(BigDecimal.valueOf(128)).discountAmount(BigDecimal.valueOf(50)).tax(BigDecimal.ZERO).totalAmount(BigDecimal.valueOf(78)).orderDate(new Date()).orderStatus(0).isDelete(0).uuid(UUID.randomUUID().toString().replace("-", "")).ipv4("127.0.0.1").ipv6("2001:0db8:85a3:0000:0000:8a2e:0370:7334".getBytes()).extData("{\"device\": {\"machine\": \"IPhone 14 Pro\", \"location\": \"shanghai\"}}").build();userOrderDao.insert(userOrderPO);Thread.sleep(100);}
}
  • 循環插入3條數據,按需你可以設置更多條數據。
  • 這里的用戶編號 userId 是隨機的,也是切分鍵的 ID,所以會在不同的庫表寫入數據。

2. 數據驗證
MySQL:http://127.0.0.1:8899/ (opens new window)docker compose 配置的管理后臺,可以 root/123456 登錄
Kibana:http://127.0.0.1:5601/app/discover (opens new window)查詢寫入的數據。
在這里插入圖片描述

3. 查詢數據

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserOrderDaoTest {@Resourceprivate IElasticsearchUserOrderDao userOrderDao;@Testpublic void test() {List<UserOrderPO> userOrderPOS = userOrderDao.selectByUserId();log.info("測試結果:{}", JSON.toJSONString(userOrderPOS));}}

在這里插入圖片描述

  • 通過 Elasticsearch 走 x-pack-sql-jdbc 的方式再把數據查詢出來

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/914787.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/914787.shtml
英文地址,請注明出處:http://en.pswp.cn/news/914787.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

氣候為何愈演愈“炙” — 未來五年高溫趨勢與 AI 氣象大模型的突破性價值

早、更準 代表性模型 主要特征 應用進展 GraphCast(DeepMind) 10 天全球預報;0.25 分辨率;< 1 min 推理 90 % 指標超 ECMWF HRES,已用于極端風暴提前鎖定Google DeepMind MetNet-3(Google Research) 1–4 km 分辨率;2 min 時序;24 h 區域精細預報 美東、歐洲已在 G…

LVS四種模式及部署NAT、DR模式集群

1、lvs簡介LVS:Linux Virtual Server&#xff0c;負載調度器&#xff0c;內核集成&#xff0c;章文嵩&#xff0c;阿里四層SLB(ServerLoadBalance)是基于LVSkeepalived實現LVS 官網: http://www.linuxvirtualserver.org/LVS 相關術語VS: Virtual Server&#xff0c;負責調度RS:…

【Linux】Ubuntu22.04安裝zabbix

官方文檔&#xff1a;zabbix安裝文檔 環境如下 環境版本nginx1.26.3zabbix7.0.16mysql8.0.41 安裝nginx和mysql 一鍵部署腳本 部署zabbix #!/bin/bash wget https://repo.zabbix.com/zabbix/7.0/ubuntu/pool/main/z/zabbix-release/zabbix-release_latest_7.0ubuntu22.04_…

C++ - 仿 RabbitMQ 實現消息隊列--sqlite與gtest快速上手

目錄 SQLite 什么是 SQLite 為什么要用 SQLite SQLite3 C/C API 介紹 SQLite3 C/C API 使用 GTest GTest 是什么 GTest 使用 TEST 宏 斷言 事件機制 全局事件 TestSuite 事件 SQLite 什么是 SQLite SQLite 是一個進程內的輕量級數據庫&#xff0c;它實現了自給自足…

Web3.0 學習方案

Web3.0 學習方案 一、學習方案 &#xff08;一&#xff09;入門階段 1. 了解 Web3.0 基礎概念 學習內容&#xff1a; Web3.0 的起源、愿景、與 Web2.0 的區別區塊鏈的基本概念&#xff1a;分布式賬本、哈希、公鑰/私鑰、共識機制&#xff08;PoW、PoS、DPoS、PBFT 等&#xff0…

springboot3.5.3依賴學習

springboot3.5.3依賴學習 ? Spring Boot BOM&#xff08;spring-boot-dependencies&#xff09;是 Spring 官方維護的超級依賴清單&#xff0c;覆蓋了 Spring 生態中幾乎所有核心庫、常用工具庫及第三方依賴。其作用是統一管理這些依賴的版本&#xff0c;確保它們相互兼容。以…

制作一款打飛機游戲80:道具碰撞

目前我們仍然無法拾取這些物品&#xff0c;它們只是簡單地掉落在地上。因此&#xff0c;我們需要對這些功能進行增強。目標?彈射物品?&#xff1a;當物品生成時&#xff0c;我們希望它們能以一定的力量彈出&#xff0c;而不是無力地掉落。?添加不同類型的物品?&#xff1a;…

Python編程基礎(六)| 用戶輸入和while循環

引言 很久沒有寫 Python 了&#xff0c;有一點生疏。這是學習《Python 編程&#xff1a;從入門到實踐&#xff08;第3版&#xff09;》的課后練習記錄&#xff0c;主要目的是快速回顧基礎知識。 練習1&#xff1a;汽車租賃 編寫一個程序&#xff0c;詢問用戶要租什么樣的汽車&a…

【華為機試】HJ52 計算字符串的編輯距離

文章目錄HJ52 計算字符串的編輯距離描述輸入描述輸出描述示例1HJ52 計算字符串的編輯距離描述輸入描述輸出描述示例1解題思路算法分析動態規劃狀態轉移狀態轉移方程算法流程圖DP表格示例三種操作詳解代碼實現思路時間復雜度分析關鍵優化技巧實際應用場景算法擴展面試考點完整題…

15.手動實現BatchNorm(BN)

15.1 BatchNorm操作手動實現 import torch from torch import nndef batch_norm(X,gamma,beta,moving_mean,moving_var,eps,momentum):if not torch.is_grad_enabled():#這個是推理模式X_hat(X-moving_mean)/torch.sqrt(moving_vareps)else:assert len(X.shape) in (2,4)if le…

【項目實踐】SMBMS(Javaweb版)匯總版

文章目錄前期準備工作數據庫、數據表創建web項目創建項目文件目錄配置Tomcat&#xff0c;導入依賴建立實體類編寫基礎公共方法類導入基礎資源登錄功能登錄頁面持久層dao層的用戶登錄及接口實現dao層接口實現所需的方法業務層sevice層的接口的實現接口實現相關的業務邏輯編寫ser…

隱藏源IP的核心方案與高防實踐

一、源IP暴露的風險 直接DDoS攻擊&#xff1a;2025年Q2全球DDoS攻擊峰值達3.8Tbps&#xff08;來源&#xff1a;Cloudflare報告&#xff09;漏洞利用&#xff1a;暴露的SSH端口平均每天遭受12,000暴力破解嘗試數據泄露&#xff1a;直接連接數據庫風險提升300% 二、4種有效隱藏方…

深度學習圖像分類數據集—五種電器識別分類

該數據集為圖像分類數據集&#xff0c;適用于ResNet、VGG等卷積神經網絡&#xff0c;SENet、CBAM等注意力機制相關算法&#xff0c;Vision Transformer等Transformer相關算法。 數據集信息介紹&#xff1a;五種電器識別分類&#xff1a;[notebook, phone, powerbank, tablet, w…

Windows11家庭版配置frigate 嵌入自研算法(基于Yolov8)-【2】

使用 YOLOv8 的 results.xyxy 結構&#xff0c;下面是一個完整的 MQTT 推送腳本&#xff0c;用于把識別到的目標&#xff08;比如突涌水、水漬、障礙物等&#xff09;發送到 Frigate 的 MQTT 接口。? 前提假設 YOLOv8 推理代碼已經運行并生成 results.xyxy。每一行是 [x1, y1,…

安裝llama-factory報錯 error: subprocess-exited-with-error

報錯信息如下 Using cached https://mirrors.aliyun.com/pypi/packages/17/89/940a509ee7e9449f0c877fa984b37b7cc485546035cc67bbc353f2ac20f3/av-15.0.0.tar.gz (3.8 MB)Preparing metadata (pyproject.toml) ... errorerror: subprocess-exited-with-error Preparing metad…

QT 多線程 管理串口

記錄一下自己使用多線程進行串口管理和數據讀取的過程。如果有問題的話可以發消息給我。背景在使用QT制作一個串口數據讀取處理的小軟件的時候&#xff0c;發現了存在界面卡頓的情況&#xff0c;感覺性能太低&#xff0c;于是考慮把串口數據的讀取和處理都放到子線程的緩沖區中…

在虛擬環境中復現論文(環境配置)

前提&#xff1a;已經下載condawinR&#xff0c;輸入cmd進入命令行conda create -n PPT python3.8.3 pytorch1.7.0conda create -n PPT(虛擬環境名) python3.8.3(包名) pytorch1.7.0(包名)安裝完畢&#xff0c;激活虛擬環境&#xff1a;conda activate PPT根據論文readme要求安…

Flutter Web 的發展歷程:Dart、Flutter 與 WasmGC

Flutter Web 應該是 Flutter 開發者里最不“受寵”的平臺了&#xff0c;但是其實 Flutter 和 Dart 團隊對于 Web 的投入一直沒有減少&#xff0c;這也和 Flutter 還有 Dart 的"出生"有關系&#xff0c;今天就借著 Dart 團隊的 mer A?acan 和 Martin Kustermann 在油…

c#方法關鍵字,ref、out、int

在 C# 中&#xff0c;ref、out 和 in 是用于方法參數傳遞的關鍵字&#xff0c;它們控制參數如何在方法和調用者之間傳遞數據。以下是對這三個關鍵字的詳細分析&#xff1a;1. ref 關鍵字&#xff08;引用傳遞&#xff09;作用允許方法修改調用者的變量&#xff1a;通過引用傳遞…

設計模式—初識設計模式

1.設計模式經典面試題分析幾個常見的設計模式對應的面試題。1.1原型設計模式1.使用UML類圖畫出原型模式核心角色&#xff08;意思就是使用會考察使用UML畫出設計模式中關鍵角色和關系圖等&#xff09;2.原型設計模式的深拷貝和淺拷貝是什么&#xff0c;寫出深拷貝的兩種方式的源…