使用Logstash將數據從MySQL同步至Elasticsearch(有坑)

文章目錄

  • 一、準備工作
    • 1、安裝elasticSearch+kibana
    • 2、安裝MySQL
    • 3、安裝Logstash
  • 二、全量同步
    • 1、準備MySQL數據與表
    • 2、上傳mysql-connector-java.jar
    • 3、啟動Logstash
    • 4、修改logstash.conf文件
    • 5、修改full_jdbc.sql文件
    • 6、打開Kibana創建索引和映射
    • 7、重啟logstash進行全量同步
    • 8、踩坑
      • (1)報錯
  • 三、增量同步
    • 1、修改增量配置
    • 2、新建increment_jdbc.sql文件
    • 3、重啟容器
    • 4、測試
    • 5、同步原理

一、準備工作

1、安裝elasticSearch+kibana

我們此處用的es和kibana版本是7.4.0版本的。
docker安裝elasticSearch+kibana

2、安裝MySQL

docker安裝mysql-簡單無坑

3、安裝Logstash

logstash就是一個具備實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以根據自己的需求在inuput --output中間加上濾網,Logstash內置了幾十種插件,可以滿足各種應用場景。

logstash官方插件 logstash-input-jdbc集成在logstash(5.X之后)中,通過配置文件實現mysql與elasticsearch數據同步。
能實現mysql數據全量和增量的數據同步,且能實現定時同步。

在這里插入圖片描述

# 拉取logstach
docker pull logstash:8.5.2

二、全量同步

全量同步是指全部將數據同步到es,通常是剛建立es,第一次同步時使用。

1、準備MySQL數據與表

CREATE TABLE `product` (`id` int NOT NULL COMMENT 'id',`name` varchar(255) DEFAULT NULL,`price` decimal(10,2) DEFAULT NULL,`create_at` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (1, '小米手機', 33.00, '1');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (2, '長虹手機', 2222.00, '2');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (3, '華為電腦', 3333.00, '3');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (4, '小米電腦', 333.30, '4');

2、上傳mysql-connector-java.jar

把mysql-connector-java-8.0.21.jar上傳到logstach服務器,

3、啟動Logstash

# 編輯logstash.yml
vi /usr/local/logstash/config/logstash.yml# 內容,需要修改es地址
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.17.0.3:9200" ]
# 自定義網絡(可以解決網絡不一致的問題)
#docker network create --subnet=172.188.0.0/16  czbkNetwork
# 啟動 logstash
docker run --name logstash -v /usr/local/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml -v /usr/local/logstash/config/conf.d/:/usr/share/logstash/pipeline/   -v /root/mysql-connector-java-8.0.21.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar -d  d7102f8c625d# 查看日志
docker logs -f --tail=200 c1d20ebf76c3

4、修改logstash.conf文件

cd /usr/local/logstash/config/conf.d
vi logstash.conf

stdin從標準輸入讀取事件。

默認情況下,每一行讀取為一個事件

input { stdin {}#使用jdbc插件jdbc {# mysql數據庫驅動#jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"# mysql數據庫鏈接,數據庫名jdbc_connection_string => "jdbc:mysql://172.17.0.2:3306/shop?allowMultiQueries=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"# mysql數據庫用戶名,密碼jdbc_user => "root"jdbc_password => "root"# 分頁jdbc_paging_enabled => "true"# 分頁大小jdbc_page_size => "50000"# sql語句執行文件,也可直接使用 statement => 'select * from t'statement_filepath => "/usr/share/logstash/pipeline/sql/full_jdbc.sql"#statement => " select *  from product  where id  <=100 "}}# 過濾部分(不是必須項)
filter {json {source => "message"remove_field => ["message"]}
}# 輸出部分
output {elasticsearch {# elasticsearch索引名index => "product"# elasticsearch的ip和端口號hosts => ["172.17.0.3:9200"]# 同步mysql中數據id作為elasticsearch中文檔iddocument_id => "%{id}"}stdout {codec => json_lines}
}

5、修改full_jdbc.sql文件

mkdir /usr/local/logstash/config/conf.d/sql
cd /usr/local/logstash/config/conf.d/sql
vi full_jdbc.sql

full_jdbc.sql內容如下

SELECTid,TRIM( REPLACE ( name, ' ', '' ) ) AS productname,price
FROM product

6、打開Kibana創建索引和映射

注意!mysql—>logstash—>es
如果創建的映射是有大寫的時候,es會自動轉成小寫
而且查看映射數據結構的時候會出現兩個相同的字段(productname和productName)
這樣就導致我們自己定義的映射無法使用,而有數據的是es自動生成的那個小寫

PUT product
{"settings": {"number_of_shards": 1,"number_of_replicas": 1},"mappings": {"properties": {"productname": {"type": "text"},"price": {"type": "double"} }}
}

如果對映射沒有硬性要求,可以忽略當前步驟,會自動創建索引。

# 當前在es中是沒有數據的
GET product/_search

7、重啟logstash進行全量同步

# 重啟
docker restart c1d20ebf76c3
# 查看日志
docker logs -f --tail=200 c1d20ebf76c3

發現mysql中的數據已經同步至logstash中了。

8、踩坑

(1)報錯

Error response from daemon: Cannot restart container 3849f947e115: driver failed programming external connectivity on endpoint logstash (60f5d9678218dc8d19bc8858fb1a195f4ebee294cff23d499a28612019a0ff78): (iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 4567 -j DNAT --to-destination 172.188.0.77:4567 ! -i br-413b460a0fc8: iptables: No chain/target/match by that name.

原因為:在啟動firewalld之后,iptables被激活,
此時沒有docker chain,重啟docker后被加入到iptable里面

解決方案:
systemctl restart docker

三、增量同步

1、修改增量配置

修改上面的logstash.conf文件

input { stdin {}#使用jdbc插件jdbc {# mysql數據庫驅動#jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"# mysql數據庫鏈接,數據庫名jdbc_connection_string => "jdbc:mysql://172.188.0.15:3306/shop?characterEncoding=UTF-8&useSSL=false"# mysql數據庫用戶名,密碼jdbc_user => "root"jdbc_password => "root"# 設置監聽間隔  各字段含義(分、時、天、月、年),全部為*默認含義為每分鐘更新一次# /2* * * *表示每隔2分鐘執行一次,依次類推schedule => "* * * * *"# 分頁jdbc_paging_enabled => "true"# 分頁大小jdbc_page_size => "50000"# sql語句執行文件,也可直接使用 statement => 'select * from t'statement_filepath => "/usr/share/logstash/pipeline/sql/increment_jdbc.sql"#上一個sql_last_value值的存放文件路徑, 必須要在文件中指定字段的初始值#last_run_metadata_path => "./config/station_parameter.txt"#設置時區,此處更新sql_last_value查詢的時區,sql_last_value還是默認UTCjdbc_default_timezone => "Asia/Shanghai"#使用其它字段追蹤,而不是用時間#use_column_value => true#追蹤的字段#tracking_column => idtracking_column_type => "timestamp"}}# 過濾部分(不是必須項)
filter {json {source => "message"remove_field => ["message"]}
}# 輸出部分
output {elasticsearch {# elasticsearch索引名index => "product"# elasticsearch的ip和端口號hosts => ["172.188.0.88:9200"]# 同步mysql中數據id作為elasticsearch中文檔iddocument_id => "%{id}"}stdout {codec => json_lines}
}

2、新建increment_jdbc.sql文件

/usr/local/logstash/config/conf.d/sql目錄下新建increment_jdbc.sql文件

cd /usr/local/logstash/config/conf.d/sql
vi increment_jdbc.sql

increment_jdbc.sql內容如下:

此處sql盡量保持與全量一致Select后的

SELECTid,TRIM( REPLACE ( product_name, ' ', '' ) ) AS productname,priceFROM product where update_time > :sql_last_value

3、重啟容器

# 啟動
docker restart 容器id

4、測試

數據庫插入一條數據之后,會自動同步至es

5、同步原理

#進入容器
docker exec -it 4f95a47f12de /bin/bash
#查看記錄點
cat /usr/share/logstash/.logstash_jdbc_last_run

last_run_metadata_path=>“/usr/share/logstash/.logstash_jdbc_last_run”

在容器里面的/usr/share/logstash/路徑下的隱藏文件.logstash_jdbc_last_run中記錄了全量同步的UTC時間

每次同步完成記錄該時間(重要)
在這里插入圖片描述
注意
logstash_jdbc_last_run默認是沒有的,執行增量后創建
文件也是可以刪除的
容器重啟會自動創建

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/38671.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/38671.shtml
英文地址,請注明出處:http://en.pswp.cn/news/38671.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

TCP/IP協議追層分析物理層(第三十九課)

TCP/IP協議追層分析物理層(第三十九課) 1 物理層:建立、維護、斷開物理連接,定義了接口及介質,實現了比特流的傳輸。 1、傳輸介質分類 有線介質:網線(雙絞線)、光纖 無線介質:無線電 微波 激光 紅外線 2、雙絞線分類: 五類cat5: 適用于100Mbps 超五類cat5e:適用于…

Qt掃盲- Graphics View框架理論綜述

Graphics View框架理論綜述 一、概述二、Graphics View 體系結構1. The Scene2. The View3. 圖元 Item 三、圖形視圖坐標系統1. 圖元Item的坐標2. Scene Scene坐標3. View 視圖坐標4. 坐標映射 四、關鍵特性1. 縮放和旋轉2. 打印3. 拖放4. 鼠標指針和 提示5. 動畫6. OpenGL渲染…

【100天精通python】Day35:一文掌握GUI界面編程基本操作

目錄 專欄導讀 1 GUI 編程概述 1.1 為什么需要GUI&#xff1f; 1.2 常見的GUI編程工具和庫 1.3 GUI應用程序的組成和架構 2 使用Tkinter 庫 進行GUI編程 2.1 使用Tkinter庫進行GUI編程的基本流程 2.2 使用Tkinter庫進行GUI編程 2.2.1 導入Tkinter庫 2.2.2 添加標簽和…

繪制世界地圖or中國地圖

寫在前面 在8月初,自己需要使用中國地圖的圖形,自己就此也查詢相關的教程,自己也做一下小小總結,希望對自己和同學們有所幫助。 最終圖形 這個系列從2022年開始,一直更新使用R語言分析數據及繪制精美圖形。小杜的生信筆記主要分享小杜學習日常!如果,你對此感興趣可以加…

Flutter Engine編譯環境安裝

前言 根據設置引擎開發環境的描述&#xff0c;確保有以下可用依賴項&#xff1a; Linux、macOS 或 Windows。 Linux 支持 Android 和 Fuchsia 的交叉編譯工件&#xff0c;但不支持 iOS。macOS 支持 Android 和 iOS 的交叉編譯工件。Windows 不支持任何 Android、Fuchsia 或 i…

MySQL存儲結構及索引

文章目錄 MySQL結構1.2存儲引擎介紹1.3存儲引擎特點InnoDB邏輯存儲結構 MyISAMMemory區別及特點存儲引擎選擇 索引索引概述索引結構BTreeHash索引分類聚集索引&二級索引索引語法SQL性能分析索引優化最左前綴法則范圍查詢字符串不加引號模糊查詢or連接條件數據分布影響覆蓋索…

達夢數據庫dbms_stats包的操作實踐記錄

索引的統計信息收集 GATHER_INDEX_STATSindex_stats_show 根據模式名&#xff0c;索引名獲得該索引的統計信息。用于經過 GATHER_TABLE_STATS、GATHER_INDEX_STATS 或 GATHER_SCHEMA_STATS 收集之后展示。返回兩個結果集&#xff1a;一個是索引的統計信息&#xff1b;另一個是…

Kotlin優點及為什么使用Kotlin

文章目錄 一 Hello Kotlin二 Kotlin優點三 團隊為什么采用 Kotlin 一 Hello Kotlin Kotlin和Andriod 二 Kotlin優點 三 團隊為什么采用 Kotlin

如何從PHP 獲取絕對路徑、文檔根目錄、基本 URL

根據您的服務器配置,獲取正確的路徑信息可能具有挑戰性。例如,PHP 并沒有直接提供一個變量來返回站點基本 URL。以下是一些代碼片段,可以幫助您獲取絕對路徑、文檔根目錄和基本 URL。 獲取絕對路徑 如果您的腳本位于 /path/directory/ 目錄中,您可以使用以下代碼片段來獲…

Mendix 基礎審計模塊介紹

一、前言 作為售前顧問&#xff0c;幫助客戶選型低代碼產品是日常工作。考察一家低代碼產品的好壞&#xff0c;其中一個維度就是產品的成熟度。產品成熟度直接影響產品在使用中的穩定性和用戶體驗&#xff0c;對于新工具導入和可持續運用至關重要。 那怎么考察一個產品是否成…

【校招VIP】java語言考點之ConcurrentHashMap1.7和1.8

考點介紹&#xff1a; ConcurrentHashMap是JAVA校招面試的熱門考點&#xff0c;主要集中在1.7和1.8的底層結構和相關的性能提高。 理解這個考點要從map本身的并發問題出發&#xff0c;再到hashTable的低性能并發安全&#xff0c;引申到ConcurrentHashMap的分塊處理。同時要理解…

JAVA工具類Collections

// 【Collections】:集合的工具類 對集合進行排序 主要針對類類型 // 使用sort方法 // 1. 在需要排序的實體類中實現 Comparable接口 重寫compareTo方法 // 1.動態綁定 向下轉型 // 2.基本數據類型 this.id-s1.id 升序 // 3.類類型 this.n…

【C++】做一個飛機空戰小游戲(八)——生成敵方炮彈(rand()和srand()函數應用)

[導讀]本系列博文內容鏈接如下&#xff1a; 【C】做一個飛機空戰小游戲(一)——使用getch()函數獲得鍵盤碼值 【C】做一個飛機空戰小游戲(二)——利用getch()函數實現鍵盤控制單個字符移動【C】做一個飛機空戰小游戲(三)——getch()函數控制任意造型飛機圖標移動 【C】做一個飛…

SpringBoot中的可擴展接口

目錄 # 背景 # 可擴展的接口啟動調用順序圖 # ApplicationContextInitializer # BeanDefinitionRegistryPostProcessor # BeanFactoryPostProcessor # InstantiationAwareBeanPostProcessor # SmartInstantiationAwareBeanPostProcessor # BeanFactoryAware # Applicati…

炬芯科技發布全新第二代智能手表芯片,引領腕上新趨勢!

2023年7月&#xff0c;炬芯科技宣布全新第二代智能手表芯片正式發布。自2021年底炬芯科技推出第一代的智能手表芯片開始便快速獲得了市場廣泛認可和品牌客戶的普遍好評。隨著技術的不斷創新和突破&#xff0c;為了更加精準地滿足市場多元化的變幻和用戶日益增長的體驗需求&…

Jmeter-壓力測試工具

文章目錄 Jmeter快速入門1.1.下載1.2.解壓1.3.運行 2.快速入門2.1.設置中文語言2.2.基本用法 Jmeter快速入門 1s內發送大量請求&#xff0c;模擬高QPS&#xff0c;用以測試網站能承受的壓力有多大 Jmeter依賴于JDK&#xff0c;所以必須確保當前計算機上已經安裝了JDK&#xff0…

Android Shape 的使用

目錄 什么是Shape? shape屬性 子標簽屬性 corners &#xff08;圓角&#xff09; solid &#xff08;填充色&#xff09; gradient &#xff08;漸變&#xff09; stroke &#xff08;描邊&#xff09; padding &#xff08;內邊距&#xff09; size &#xff08;大小…

CentOS系統環境搭建(三)——Centos7安裝DockerDocker Compose

centos系統環境搭建專欄&#x1f517;點擊跳轉 Centos7安裝Docker&Docker Compose 使用 yum 安裝Docker 內核 [rootVM-4-17-centos ~]# uname -r 3.10.0-1160.88.1.el7.x86_64Docker 要求 CentOS 系統的內核版本高于 3.10 更新 yum yum update安裝需要的軟件包&#x…

在Windows Server 2008上啟用自動文件夾備份

要在Windows Server 2008上啟用自動文件夾備份&#xff0c;您可以使用內置的Windows備份功能。下面是如何設置它的方法&#xff1a; 1. 點擊“開始”按鈕并選擇“服務器管理器”&#xff0c;打開“服務器管理器”。 2. 在“服務器管理器”窗口中&#xff0c;單擊左側窗格中的“…

Python學習筆記_基礎篇(六)_Set集合,函數,深入拷貝,淺入拷貝,文件處理

1、Set基本數據類型 a、set集合&#xff0c;是一個無序且不重復的元素集合 class set(object):"""set() -> new empty set objectset(iterable) -> new set objectBuild an unordered collection of unique elements."""def add(self, *a…