mysql到pg怎么高效_干貨 | Debezium實現Mysql到Elasticsearch高效實時同步(示例代碼)

題記

來自Elasticsearch中文社區的問題——

MySQL中表無唯一遞增字段,也無唯一遞增時間字段,該怎么使用logstash實現MySQL實時增量導數據到es中?

logstash和kafka_connector都僅支持基于自增id或者時間戳更新的方式增量同步數據。

回到問題本身:如果庫表里沒有相關字段,該如何處理呢?

本文給出相關探討和解決方案。

1、 binlog認知

1.1 啥是 binlog?

binlog是Mysql sever層維護的一種二進制日志,與innodb引擎中的redo/undo log是完全不同的日志;其主要是用來記錄對mysql數據更新或潛在發生更新的SQL語句,并以"事務"的形式保存在磁盤中。

作用主要有:

1)復制:達到master-slave數據一致的目的。

2)數據恢復:通過mysqlbinlog工具恢復數據。

3)增量備份。

1.2 阿里的Canal實現了增量Mysql同步

lazy.gif

一圖勝千言,canal是用java開發的基于數據庫增量日志解析、提供增量數據訂閱&消費的中間件。

目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用來處理獲得的相關數據。目的:增量數據訂閱&消費。

綜上,使用binlog可以突破logstash或者kafka-connector沒有自增id或者沒有時間戳字段的限制,實現增量同步。

2、基于binlog的同步方式

1)基于kafka Connect的Debezium 開源工程,地址:. https://debezium.io/

2)不依賴第三方的獨立應用: Maxwell開源項目,地址:http://maxwells-daemon.io/

由于已經部署過conluent(kafka的企業版本,自帶zookeeper、kafka、ksql、kafka-connector等),本文僅針對Debezium展開。

3、Debezium介紹

Debezium是捕獲數據實時動態變化的開源的分布式同步平臺。能實時捕獲到數據源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、刪除(deletes)操作,實時同步到Kafka,穩定性強且速度非常快。

特點:

1)簡單。無需修改應用程序。可對外提供服務。

2)穩定。持續跟蹤每一行的每一處變動。

3)快速。構建于kafka之上,可擴展,經官方驗證可處理大容量的數據。

4、同步架構

lazy.gif

如圖,Mysql到ES的同步策略,采取“曲線救國”機制。

步驟1: 基Debezium的binlog機制,將Mysql數據同步到Kafka。

步驟2: 基于Kafka_connector機制,將kafka數據同步到Elasticsearch。

5、Debezium實現Mysql到ES增刪改實時同步

軟件版本:

confluent:5.1.2;

Debezium:0.9.2_Final;

Mysql:5.7.x.

Elasticsearch:6.6.1

5.1 Debezium安裝

Debezium的安裝只需要把debezium-connector-mysql的壓縮包解壓放到Confluent的解壓后的插件目錄(share/java)中。

MySQL Connector plugin 壓縮包的下載地址:

注意重啟一下confluent,以使得Debezium生效。

5.2 Mysql binlog等相關配置。

Debezium使用MySQL的binlog機制實現數據動態變化監測,所以需要Mysql提前配置binlog。

核心配置如下,在Mysql機器的/etc/my.cnf的mysqld下添加如下配置。

1[mysqld]

2

3server-id = 223344

4log_bin = mysql-bin

5binlog_format = row

6binlog_row_image = full

7expire_logs_days = 10

然后,重啟一下Mysql以使得binlog生效。

1systemctl start mysqld.service

5.3 配置connector連接器。

配置confluent路徑目錄 : /etc

創建文件夾命令 :

1mkdir kafka-connect-debezium

在mysql2kafka_debezium.json存放connector的配置信息 :

1[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json

2{

3 "name" : "debezium-mysql-source-0223",

4 "config":

5 {

6 "connector.class" : "io.debezium.connector.mysql.MySqlConnector",

7 "database.hostname" : "192.168.1.22",

8 "database.port" : "3306",

9 "database.user" : "root",

10 "database.password" : "XXXXXX",

11 "database.whitelist" : "kafka_base_db",

12 "table.whitlelist" : "accounts",

13 "database.server.id" : "223344",

14 "database.server.name" : "full",

15 "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",

16 "database.history.kafka.topic" : "account_topic",

17 "include.schema.changes" : "true" ,

18 "incrementing.column.name" : "id",

19 "database.history.skip.unparseable.ddl" : "true",

20 "transforms": "unwrap,changetopic",

21 "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",

22 "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",

23 "transforms.changetopic.regex":"(.*)",

24 "transforms.changetopic.replacement":"$1-smt"

25 }

26}

注意如下配置:

"database.server.id",對應Mysql中的server-id的配置。

"database.whitelist" : 待同步的Mysql數據庫名。

"table.whitlelist" :待同步的Mysq表名。

重要:“database.history.kafka.topic”:存儲數據庫的Shcema的記錄信息,而非寫入數據的topic、

"database.server.name":邏輯名稱,每個connector確保唯一,作為寫入數據的kafka topic的前綴名稱。

坑一:transforms相關5行配置作用是寫入數據格式轉換。

如果沒有,輸入數據會包含:before、after記錄修改前對比信息以及元數據信息(source,op,ts_ms等)。

這些信息在后續數據寫入Elasticsearch是不需要的。(注意結合自己業務場景)。

5.4 啟動connector

1curl -X POST -H "Content-Type:application/json"

2--data @mysql2kafka_debezium.json.json

3http://192.168.1.22:18083/connectors | jq

5.5 驗證寫入是否成功。

5.5.1 查看kafka-topic

1 kafka-topics --list --zookeeper localhost:2181

此處會看到寫入數據topic的信息。

注意新寫入數據topic的格式:database.schema.table-smt 三部分組成。

本示例topic名稱:

full.kafka_base_db.account-smt

5.5.2 消費數據驗證寫入是否正常

1./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

至此,Debezium實現mysql同步kafka完成。

6、kafka-connector實現kafka同步Elasticsearch

6.1、Kafka-connector介紹

Kafka Connect是一個用于連接Kafka與外部系統(如數據庫,鍵值存儲,檢索系統索引和文件系統)的框架。

連接器實現公共數據源數據(如Mysql、Mongo、Pgsql等)寫入Kafka,或者Kafka數據寫入目標數據庫,也可以自己開發連接器。

6.2、kafka到ES connector同步配置

配置路徑:

1/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

配置內容:

1"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

2"tasks.max": "1",

3"topics": "full.kafka_base_db.account-smt",

4"key.ignore": "true",

5"connection.url": "http://192.168.1.22:9200",

6"type.name": "_doc",

7"name": "elasticsearch-sink-test"

6.3 kafka到ES啟動connector

啟動命令

1confluent load elasticsearch-sink-test

2-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

6.4 Kafka-connctor RESTFul API查看

Mysql2kafka,kafka2ES的connector詳情信息可以借助postman或者瀏覽器或者命令行查看。

1curl -X GET http://localhost:8083/connectors

7、坑復盤。

坑2: 同步的過程中可能出現錯誤,比如:kafka topic沒法消費到數據。

排解思路如下:

1)確認消費的topic是否是寫入數據的topic;

2)確認同步的過程中沒有出錯。可以借助connector如下命令查看。

1curl -X GET http://localhost:8083/connectors-xxx/status

坑3: Mysql2ES出現日期格式不能識別。

是Mysql jar包的問題,解決方案:在my.cnf中配置時區信息即可。

坑4: kafka2ES,ES沒有寫入數據。

排解思路:

1)建議:先創建同topic名稱一致的索引,注意:Mapping靜態自定義,不要動態識別生成。

2)通過connetor/status排查出錯原因,一步步分析。

8、小結

binlog的實現突破了字段的限制,實際上業界的go-mysql-elasticsearch已經實現。

對比:logstash、kafka-connector,雖然Debezium“曲線救國”兩步實現了實時同步,但穩定性+實時性能相對不錯。

推薦大家使用。大家有好的同步方式也歡迎留言討論交流。

推薦閱讀:

重磅 | 死磕Elasticsearch方法論認知清單(2019春節更新版)

lazy.gif

Elasticsearch基礎、進階、實戰第一公眾號

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/533062.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/533062.shtml
英文地址,請注明出處:http://en.pswp.cn/news/533062.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

mysql怎么復制信息_mysql關于復制的一些信息參考

1.主庫的復制用戶密碼修改后,在備庫修改復制:stop slave;change master to master_user‘username‘, master_password‘password‘;start slave;2.創建復制子用戶及其授權:GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘repl‘‘%…

java swing web_Java-JFrame-swing嵌套瀏覽器步驟

Java-JFrame-swing嵌套瀏覽器步驟一、使用swing嵌套瀏覽器要實現的功能:通過java的swing實現在一個窗體中嵌套一個瀏覽器,可以在這個瀏覽器中將另一個項目的內容顯示出來,只需要回去另一個項目首頁的url即可,這樣另一個項目就可以…

java thread safe_Java 線程安全 Thread-Safety

在 Java 的線程安全是老生常談的問題。經常是各種寫法說法一大堆,感覺很多的來源都是在面試的時候,很多考官都喜歡問線程安全的問題。起源這個問題的起源就是 Java 是支持多線程的。如果對進程和線程是什么不太清楚的話,可以惡補下大學課程《…

java 對象復制字段_利用Java反射機制實現對象相同字段的復制

一。如何實現不同類型對象之間的復制問題?1、為什么會有這個問題?近來在進行一個項目開發的時候,為了隱藏后端數據庫表結構、同時也為了配合給前端一個更友好的API接口文檔(swagger API文檔),我采用POJO來對應數據表結構&#xff…

java 類確定運行時間_java回調函數實例:實現一個測試函數運行時間的工具類

下面使用java回調函數來實現一個測試函數運行時間的工具類:如果我們要測試一個類的方法的執行時間,通常我們會這樣做:public class TestObject {/*** 一個用來被測試的方法,進行了一個比較耗時的循環*/public static void testMet…

java socket調用接口_Java中socket接口調用

最近一個項目中接口通訊這一塊主要是調用銀聯系統的socket接口,我方是客戶端,即發送請求接收返回報文的一方。在貼代碼之前,還是要了解一下關于socket的基礎知識。Socket的基本概念1.建立連接當需要建立網絡連接時,必須…

protobuf java 編譯_Maven項目中,編譯proto文件成Java類

新建Maven項目新建一個 Maven 項目:pom定義了最小的maven2元素,即:groupId,artifactId,version。 groupId:項目或者組織的唯一標志,并且配置時生成的路徑也是由此生成,如org.codehaus.mojo生成的相對路徑為&#xff1a…

java 結構體數組初始化_C數組結構體聯合體快速初始化

背景C89標準規定初始化語句的元素以固定順序出現,該順序即待初始化數組或結構體元素的定義順序。C99標準新增指定初始化(Designated Initializer),即可按照任意順序對數組某些元素或結構體某些成員進行選擇性初始化,只需指明它們所對應的數組…

java override 訪問權限_java基礎之——訪問修飾符(private/default/protected/public)

1. 訪問修飾符介紹java中的訪問修飾符包含了四種:private、default(沒有對應的保留字)、protected和public。它們的含義如下:private:如果一個元素聲明為private,那么只有同一個類下的元素才可以訪問它。default:如果一…

python中scrapy可以爬取多少數據_python中scrapy框架爬取攜程景點數據

———————————————————————————————[版權申明:本文系作者原創,轉載請注明出處]文章出處:https://blog.csdn.net/sdksdk0/article/details/82381198作者:朱培 ID:sdksdk0——————…

python灰色關聯度分析代碼_灰色關聯分析法步驟 - osc_uwnmtz9n的個人空間 - OSCHINA - 中文開源技術交流社區...

https://wenku.baidu.com/view/dc356290af1ffc4fff47ac0d.html?rec_flagdefault&sxts1538121950212利用灰色關聯分析的步驟是:1.根據分析目的確定分析指標體系,收集分析數據。設n個數據序列形成如下矩陣:其中m為指標的個數&a…

aio 系統原理 Java_Java新一代網絡編程模型AIO原理及Linux系統AIO介紹

從JDK 7版本開始,Java新加入的文件和網絡io特性稱為nio2(new io 2, 因為jdk1.4中已經有過一個nio了),包含了眾多性能和功能上的改進,其中最重要的部分,就是對異步io的支持,稱為Java AIO(asynchronous IO)。因為AIO的實…

centos mysql 5.5 art_Linux?CentOS6.5下編譯安裝MySQL?5.5.51''''

一、編譯安裝MySQL前的準備工作安裝編譯源碼所需的工具和庫yum install gcc gcc-c ncurses-devel perl安裝cmake,從http://www.cmake.org下載源碼并編譯安裝wget http://www.cmake.org/files/v2.8/cmake-2.8.10.2.tar.gztar -xzvf cmake-2.8.10.2.tar.gzcd cmake-2.…

java修改默認字符編碼_設置默認的Java字符編碼?

如何以編程方式正確設置JVM(1.5.x)使用??的默認字符編碼?我已經讀過-Dfile.encoding 以前是以往的方式去為舊的JVM …我沒有那么奢侈的原因,我不會進入。我努力了:System.setProperty("file.encoding", "UTF-8");并且屬…

java api 第一個類是_JAVA常用API:String 類的常用方法

字符串是一個對象,有很多方法可以使用1. length();返回字符串的長度String str "abcd";int len str.length();2. isEmpty(); 僅當當length()為0時返回true,否則返回falseboolean b str.isEmpty();3. getBytes();返回字符串中每個字符的ASCII碼(使用平臺…

關于java內容_關于java一些概念性的內容

PO:persistant object持久對象最形象的理解就是一個PO就是數據庫中的一條記錄。好處是可以把一條記錄作為一個對象處理,可以方便的轉為其它對象。--------------------------------------------------------------------------------BO:busin…

java訂單類_基于Java創建一個訂單類代碼實例

這篇文章主要介紹了基于Java創建一個訂單類代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下需求描述定義一個類,描述訂單信息訂單id訂單所屬用戶(用戶對象)訂單所包含的商品(不定數量個商品對…

java請假審批怎么實現_java實現請假時間判斷

筆記:需求分析:每周上班6天夏季早上8:30-12:00下午14:00-17:30冬季早上8:30-12:00下午14:30-18:00請假最低為半天按照上午8:00-12:00,下午14:00-18:00計算,包括了夏季和冬季時間,規律分布如下public String getDouble(HttpServletRequest request) throws ParseException {//參…

mariadb mysql 5.6_MySQL / MariaDB 5.5 升級到 MySQL 5.6

RHEL 及 CentOS 7 默認的資庫系統是 MariaDB 5.5 (等同 MySQL 5.5), 雖然現時 MySQL 最新版是 5.7, 但一般上升級都建議一級一級上, 而 MySQL 5.6 比 5.5 也提高了效能及提供更多功能, 以下是在 RHEL 及 CentOS 從原來的 MySQL 5.5 或 MariaDB 5.5, 升級到 MySQL 5.6 的步驟。1…

iText報表Java_(例)Java生成PDF報表 iText

// 導入IO庫類import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import java.util.ArrayList;// 導入 PO&#x…