文章目錄
一、準備工作
1、安裝elasticSearch+kibana
我們此處用的es和kibana版本是7.4.0
版本的。
docker安裝elasticSearch+kibana
2、安裝MySQL
docker安裝mysql-簡單無坑
3、安裝Logstash
logstash就是一個具備實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以根據自己的需求在inuput --output中間加上濾網,Logstash內置了幾十種插件,可以滿足各種應用場景。
logstash官方插件 logstash-input-jdbc
集成在logstash(5.X之后)中,通過配置文件實現mysql與elasticsearch數據同步。
能實現mysql數據全量和增量的數據同步,且能實現定時同步。
# 拉取logstach
docker pull logstash:8.5.2
二、全量同步
全量同步是指全部將數據同步到es,通常是剛建立es,第一次同步時使用。
1、準備MySQL數據與表
CREATE TABLE `product` (`id` int NOT NULL COMMENT 'id',`name` varchar(255) DEFAULT NULL,`price` decimal(10,2) DEFAULT NULL,`create_at` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (1, '小米手機', 33.00, '1');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (2, '長虹手機', 2222.00, '2');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (3, '華為電腦', 3333.00, '3');
INSERT INTO `shop`.`product`(`id`, `name`, `price`, `create_at`) VALUES (4, '小米電腦', 333.30, '4');
2、上傳mysql-connector-java.jar
把mysql-connector-java-8.0.21.jar上傳到logstach服務器,
3、啟動Logstash
# 編輯logstash.yml
vi /usr/local/logstash/config/logstash.yml# 內容,需要修改es地址
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.17.0.3:9200" ]
# 自定義網絡(可以解決網絡不一致的問題)
#docker network create --subnet=172.188.0.0/16 czbkNetwork
# 啟動 logstash
docker run --name logstash -v /usr/local/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml -v /usr/local/logstash/config/conf.d/:/usr/share/logstash/pipeline/ -v /root/mysql-connector-java-8.0.21.jar:/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar -d d7102f8c625d# 查看日志
docker logs -f --tail=200 c1d20ebf76c3
4、修改logstash.conf文件
cd /usr/local/logstash/config/conf.d
vi logstash.conf
stdin從標準輸入讀取事件。
默認情況下,每一行讀取為一個事件
input { stdin {}#使用jdbc插件jdbc {# mysql數據庫驅動#jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"# mysql數據庫鏈接,數據庫名jdbc_connection_string => "jdbc:mysql://172.17.0.2:3306/shop?allowMultiQueries=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true"# mysql數據庫用戶名,密碼jdbc_user => "root"jdbc_password => "root"# 分頁jdbc_paging_enabled => "true"# 分頁大小jdbc_page_size => "50000"# sql語句執行文件,也可直接使用 statement => 'select * from t'statement_filepath => "/usr/share/logstash/pipeline/sql/full_jdbc.sql"#statement => " select * from product where id <=100 "}}# 過濾部分(不是必須項)
filter {json {source => "message"remove_field => ["message"]}
}# 輸出部分
output {elasticsearch {# elasticsearch索引名index => "product"# elasticsearch的ip和端口號hosts => ["172.17.0.3:9200"]# 同步mysql中數據id作為elasticsearch中文檔iddocument_id => "%{id}"}stdout {codec => json_lines}
}
5、修改full_jdbc.sql文件
mkdir /usr/local/logstash/config/conf.d/sql
cd /usr/local/logstash/config/conf.d/sql
vi full_jdbc.sql
full_jdbc.sql內容如下
SELECTid,TRIM( REPLACE ( name, ' ', '' ) ) AS productname,price
FROM product
6、打開Kibana創建索引和映射
注意!mysql—>logstash—>es
如果創建的映射是有大寫的時候,es會自動轉成小寫
而且查看映射數據結構的時候會出現兩個相同的字段(productname和productName)
這樣就導致我們自己定義的映射無法使用,而有數據的是es自動生成的那個小寫
PUT product
{"settings": {"number_of_shards": 1,"number_of_replicas": 1},"mappings": {"properties": {"productname": {"type": "text"},"price": {"type": "double"} }}
}
如果對映射沒有硬性要求,可以忽略當前步驟,會自動創建索引。
# 當前在es中是沒有數據的
GET product/_search
7、重啟logstash進行全量同步
# 重啟
docker restart c1d20ebf76c3
# 查看日志
docker logs -f --tail=200 c1d20ebf76c3
發現mysql中的數據已經同步至logstash中了。
8、踩坑
(1)報錯
Error response from daemon: Cannot restart container 3849f947e115: driver failed programming external connectivity on endpoint logstash (60f5d9678218dc8d19bc8858fb1a195f4ebee294cff23d499a28612019a0ff78): (iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 4567 -j DNAT --to-destination 172.188.0.77:4567 ! -i br-413b460a0fc8: iptables: No chain/target/match by that name.
原因為:在啟動firewalld之后,iptables被激活,
此時沒有docker chain,重啟docker后被加入到iptable里面
解決方案:
systemctl restart docker
三、增量同步
1、修改增量配置
修改上面的logstash.conf文件
input { stdin {}#使用jdbc插件jdbc {# mysql數據庫驅動#jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.48.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"# mysql數據庫鏈接,數據庫名jdbc_connection_string => "jdbc:mysql://172.188.0.15:3306/shop?characterEncoding=UTF-8&useSSL=false"# mysql數據庫用戶名,密碼jdbc_user => "root"jdbc_password => "root"# 設置監聽間隔 各字段含義(分、時、天、月、年),全部為*默認含義為每分鐘更新一次# /2* * * *表示每隔2分鐘執行一次,依次類推schedule => "* * * * *"# 分頁jdbc_paging_enabled => "true"# 分頁大小jdbc_page_size => "50000"# sql語句執行文件,也可直接使用 statement => 'select * from t'statement_filepath => "/usr/share/logstash/pipeline/sql/increment_jdbc.sql"#上一個sql_last_value值的存放文件路徑, 必須要在文件中指定字段的初始值#last_run_metadata_path => "./config/station_parameter.txt"#設置時區,此處更新sql_last_value查詢的時區,sql_last_value還是默認UTCjdbc_default_timezone => "Asia/Shanghai"#使用其它字段追蹤,而不是用時間#use_column_value => true#追蹤的字段#tracking_column => idtracking_column_type => "timestamp"}}# 過濾部分(不是必須項)
filter {json {source => "message"remove_field => ["message"]}
}# 輸出部分
output {elasticsearch {# elasticsearch索引名index => "product"# elasticsearch的ip和端口號hosts => ["172.188.0.88:9200"]# 同步mysql中數據id作為elasticsearch中文檔iddocument_id => "%{id}"}stdout {codec => json_lines}
}
2、新建increment_jdbc.sql文件
/usr/local/logstash/config/conf.d/sql目錄下新建increment_jdbc.sql文件
cd /usr/local/logstash/config/conf.d/sql
vi increment_jdbc.sql
increment_jdbc.sql內容如下:
此處sql盡量保持與全量一致Select后的
SELECTid,TRIM( REPLACE ( product_name, ' ', '' ) ) AS productname,priceFROM product where update_time > :sql_last_value
3、重啟容器
# 啟動
docker restart 容器id
4、測試
數據庫插入一條數據之后,會自動同步至es
5、同步原理
#進入容器
docker exec -it 4f95a47f12de /bin/bash
#查看記錄點
cat /usr/share/logstash/.logstash_jdbc_last_run
last_run_metadata_path=>“/usr/share/logstash/.logstash_jdbc_last_run”
在容器里面的/usr/share/logstash/路徑下的隱藏文件.logstash_jdbc_last_run中記錄了全量同步的UTC時間
每次同步完成記錄該時間(重要)
注意
logstash_jdbc_last_run默認是沒有的,執行增量后創建
文件也是可以刪除的
容器重啟會自動創建