Logstash開啟定時任務增量同步mysql數據到es的時區問題

本文使用修改時間modify_date作為增量同步檢測字段,可檢測新增和修改,檢測不到刪除,檢測刪除請使用canal查詢binlog日志同步數據

檢測修改時間字段為varchar的時候可以先創建索引,并設置對應的mapping為(可以無視時區問題)

...
"time": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"}...

檢測修改時間字段為datetime的時候需要注意時區問題

前提:正常存儲時間,mysql是UTC+8,Logstash 和ES則都使用UTC

下面幾個測試得出的結論

使用測試4,不需要考慮客戶端連接es的時區問題(直觀上忽略es存儲的時間格式為UTC,把其當作UTC+8來用)

使用測試1,需要注意es中存儲UTC,使用客戶端client連接時需要轉換對應的事件UTC+8轉為UTC

SearchRequest searchRequest = new SearchRequest("stu_sign");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
LocalDateTime localDateTime1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
LocalDateTime localDateTime2 = LocalDateTime.of(2023, 3, 1, 8, 0, 0);
ZonedDateTime zonedDateTime1 = localDateTime1.atZone(ZoneId.of("UTC+8"));
ZonedDateTime zonedDateTime2 = localDateTime2.atZone(ZoneId.of("UTC+8"));// 將ZonedDateTime轉換為UTC時區
ZonedDateTime utcZonedDateTime1 = zonedDateTime1.withZoneSameInstant(ZoneId.of("UTC"));
ZonedDateTime utcZonedDateTime2 = zonedDateTime2.withZoneSameInstant(ZoneId.of("UTC"));searchSourceBuilder.query(QueryBuilders.rangeQuery("sing_date").gte(utcZonedDateTime1).lte(utcZonedDateTime2));
searchRequest.source(searchSourceBuilder);
try {SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);System.out.println(searchResponse.getHits().getHits());
} catch (IOException e) {e.printStackTrace();
}
測試1:

不創建mapping

設置時區都為Asia/Shanghai,ES中存儲時間均為UTC,比正常少八個小時,

追蹤日志記錄時間為UTC:

— !ruby/object:DateTime ‘2024-12-31 02:13:30.000000000 Z’

定時任務執行正常

SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > ‘2024-12-31 10:13:30’ AND modify_date < NOW() ) AS t1 LIMIT 1

配置文件為:

input {jdbc {# 配置數據庫信息jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_user => "root"jdbc_password => "123456"jdbc_paging_enabled => "true"jdbc_page_size => "50000"jdbc_default_timezone => "Asia/Shanghai"# mysql驅動所在位置jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"#sql執行語句statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "use_column_value => truetracking_column => "modify_date"tracking_column_type => "timestamp"last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"schedule => "*/3 * * * * Asia/Shanghai"lowercase_column_names => false}
}output {elasticsearch {hosts => ["127.0.0.1:9200"]index => "stu_sign_test"# document_id => "%{id}"}stdout {codec => json_lines}
}
測試2:

使用mapping,將自動映射的date轉為其他允許的時間格式

"modify_date": {"type": "date","format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"}

與測試1結果完全一致

測試3:

不使用mapping,時間日期調整

mysql中時間為"modify_date":“2024-12-31 10:13:30”

ES記錄時間多8個小時:“modify_date”:“2024-12-31T18:13:30.000Z”

{"id":20120,"appeal_time":null,"teacher_name":"黃雅平","stu_id":736,"stu_name":"鄭欣欣","modify_date":"2024-12-31T18:13:30.000Z","sing_date":"2024-12-31T18:13:30.000Z","teach_time":"2024-12-31T08:00:00.000Z","teach_time_str":"20241231","stu_sign_status":0,"@timestamp":"2025-04-01T01:54:25.599Z","teacher_id":731,"@version":"1","class_num_end":null,"leave_status":null,"sign_status":0,"stu_num":"20233003","course_name":"數字系統基礎","appeal_msg":null,"course_sched_id":186641,"grade_id":"DE44AF38ADB74D9BBF42A6C8285B8285","appeal_status":null,"academy_id":471,"classroom_id":440428,"roll_call_status":0,"roll_call_date":"2024-12-31T18:13:30.000Z","classroom":"304","class_end_time":"2024-12-31 18:30:00","create_date":"2024-12-31T18:13:30.000Z","course_id":1373,"academy_superior_id":6,"late_status":null,"sign_type":null,"class_id":5407,"class_num_begin":null,"class_begin_time":"2024-12-31 16:35:00","semester_id":1,"leave_statusersss":null,"leave_statuser":null}

追蹤日志記錄為— !ruby/object:DateTime ‘2024-12-31 18:13:30.000000000 Z’

執行sql為:SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > ‘2024-12-31 18:13:30’ AND modify_date < NOW() ) AS t1 LIMIT 1

對應的配置文件為:

input {jdbc {# 配置數據庫信息jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_user => "root"jdbc_password => "123456"jdbc_paging_enabled => "true"jdbc_page_size => "50000"jdbc_default_timezone => "UTC"# mysql驅動所在位置jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"#sql執行語句statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "use_column_value => truetracking_column => "modify_date"tracking_column_type => "timestamp"last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"schedule => "*/3 * * * * Asia/Shanghai"lowercase_column_names => false}
}
測試4:

不使用mapping,時間日期調整

mysql中時間為"modify_date":“2024-12-31 10:13:30”

ES記錄時間:“modify_date”:“2024-12-31T10:13:30.000Z”

{"teacher_id":731,"class_num_end":null,"stu_sign_status":0,"roll_call_status":0,"semester_id":1,"stu_id":749,"leave_statusersss":null,"classroom_id":440428,"course_sched_id":186641,"teacher_name":"黃雅平","create_date":"2024-12-31T10:13:30.000Z","appeal_msg":null,"appeal_status":null,"class_end_time":"2024-12-31 18:30:00","modify_date":"2024-12-31T10:13:30.000Z","class_begin_time":"2024-12-31 16:35:00","leave_statuser":null,"teach_time":"2024-12-31T00:00:00.000Z","appeal_time":null,"academy_superior_id":6,"sign_type":null,"roll_call_date":"2024-12-31T10:13:30.000Z","late_status":null,"classroom":"304","@timestamp":"2025-04-01T02:03:36.223Z","class_id":5407,"stu_num":"2024008","@version":"1","id":20107,"teach_time_str":"20241231","sign_status":0,"sing_date":"2024-12-31T10:13:30.000Z","course_name":"數字系統基礎","course_id":1373,"stu_name":"張恒","class_num_begin":null,"academy_id":471,"leave_status":null,"grade_id":"DE44AF38ADB74D9BBF42A6C8285B8285"}

追蹤日志記錄為— !ruby/object:DateTime ‘2024-12-31 10:13:30.000000000 Z’

執行sql為:SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > ‘2024-12-31 10:13:30’ AND modify_date < NOW() ) AS t1 LIMIT 1

對應的配置文件為:

input {jdbc {# 配置數據庫信息jdbc_connection_string => "jdbc:mysql://188.18.66.185:3306/eschool?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_user => "root"jdbc_password => "123456"jdbc_paging_enabled => "true"jdbc_page_size => "50000"jdbc_default_timezone => "UTC"# mysql驅動所在位置jdbc_driver_library => "D:\environment\apache-maven-3.8.8\maven_repository\mysql\mysql-connector-java\8.0.27\mysql-connector-java-8.0.27.jar"#sql執行語句statement => "SELECT * FROM `stu_sign_2023202401` WHERE modify_date > :sql_last_value AND modify_date < NOW() "use_column_value => truetracking_column => "modify_date"tracking_column_type => "timestamp"last_run_metadata_path => "E:\software\logstash\last_run_stu_login.txt"schedule => "*/3 * * * * Asia/Shanghai"lowercase_column_names => false}
}

時間配置正確符合預期需求

其中定時任務配置
schedule => “/3 * * * * Asia/Shanghai"
schedule => "
/3 * * * * UTC”
schedule => "*/3 * * * * "
效果一致

測試5:
不使用mapping,時間日期調整
與測試1相反,時間都調整為UTC的時候,es儲存的時間均多出八個小時
如下:
mysql中時間為"modify_date":“2024-12-31 18:13:30”
ES記錄時間:“modify_date”:“2024-12-31T18:13:30.000Z”
追蹤日志記錄為— !ruby/object:DateTime ‘2024-12-31 18:13:30.000000000 Z’
執行sql為:SELECT count(*) AS count FROM (SELECT * FROM stu_sign_2023202401 WHERE modify_date > ‘2024-12-31 18:13:30’ AND modify_date < NOW() ) AS t1 LIMIT 1

可自行得出結論:
jdbc_connection_string和jdbc_default_timezone需要配合使用,效果會疊加。具體為什么會這樣?懂得兄弟可以在評論區解釋一下,博主也學習學習

本文環境相關:
elasticsearch-7.16.3
kibana-7.16.3-windows-x86_64
logstash-7.16.3
mysql5.7.38

客戶端:RestHighLevelClient

 <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.16.3</version></dependency>

啟動命令:

logstash.bat -f E:\software\logstash\logstash-7.16.3\conf\stu_sign.conf

最后附上配置文件解釋
Logstash配置文件conf介紹:

input {jdbc {# mysql 數據庫鏈接jdbc_connection_string => "jdbc:mysql:localhost/database?characterEncoding=utf8"# 用戶名和密碼jdbc_user => "xxx"jdbc_password => "xxxx"# 驅動jdbc_driver_library => "D:/xx/xx/logstash-6.2.4/config/mysql-connector-java-8.0.18.jar"# 驅動類名jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_paging_enabled => "true"jdbc_page_size => "50000"# 執行的sql 文件路徑+名稱#statement_filepath => ""parameters => { "sql_last_value" => "UpdateTime" }statement => "SELECT * FROM (SELECT * FROM table1 ) t WHERE t.updatetime > :sql_last_value"# 設置監聽間隔 各字段含義(由左至右)分、時、天、月、年,全部為*默認含義為每分鐘都更新schedule => "* * * * *"# 索引類型#type => "article"# 防止自動將大小轉為小寫lowercase_column_names => false# 記錄上一次運行記錄record_last_run => true# 使用字段值use_column_value => true# 追蹤字段名tracking_column => "updatetime"# 字段類型tracking_column_type => "timestamp"# 上一次運行元數據保存路徑last_run_metadata_path => "./logstash_last_id"# 是否刪除記錄的數據clean_run => false}
}
filter {json {source => "message"remove_field => ["message"]}
}
output {elasticsearch {hosts => "http://localhost:9200/"index => "indexname"document_type => "articles"document_id => "%{articleid}"template_overwrite => true}# 這里輸出調試,正式運行時可以注釋掉stdout {codec => json_lines}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/76354.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/76354.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/76354.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何使用 FastAPI 構建 MCP 服務器

哎呀&#xff0c;各位算法界的小伙伴們&#xff01;今天咱們要聊聊一個超酷的話題——MCP 協議&#xff01;你可能已經聽說了&#xff0c;Anthropic 推出了這個新玩意兒&#xff0c;目的是讓 AI 代理和你的應用程序之間的對話變得更順暢、更清晰。不過別擔心&#xff0c;為你的…

【Git】-- 處理 Git 提交到錯誤分支的問題

如果你不小心把本應提交到 test 分支的代碼提交到了 master 分支&#xff08;但尚未 push&#xff09;&#xff0c;可以按照以下步驟解決&#xff1a; 方法一&#xff08;推薦&#xff09;&#xff1a;使用 git reset 和 git stash 首先&#xff0c;確保你在 master 分支&…

通用目標檢測技術選型分析報告--截止2025年4月

前言 本文撰寫了一份關于通用目標檢測&#xff08;General Object Detection&#xff09;的技術選型分析報告&#xff0c;覆蓋2000至2025年技術演進歷程&#xff0c;重點納入YOLO-World、RT-DETR、Grounding DINO等2024-2025年的最新模型。 報告將包括技術定義、行業現狀、技…

鏈路追蹤Skywalking

一、什么是Skywalking 分布式鏈路追蹤的一種方式&#xff1a;Spring Cloud SleuthZipKin&#xff0c;這種方案目前也是有很多企業在用&#xff0c;但是作為程序員要的追逐一些新奇的技術&#xff0c;Skywalking作為后起之秀也是值得大家去學習的。 Skywalking是一個優秀的國產…

websocket獲取客服端真實ip

在websocket建立連接時,獲取訪問客戶端的真實ip 1. websocket建立連接過程 2. pom依賴 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>3. 添加配置,握…

NSSCTF(MISC)—[justCTF 2020]pdf

相應的做題地址&#xff1a;https://www.nssctf.cn/problem/920 binwalk分離 解壓文件2AE59A.zip mutool 得到一張圖片 B5F31內容 B5FFD內容 轉換成圖片 justCTF{BytesAreNotRealWakeUpSheeple}

部分國產服務器CPU及內存性能測試情況

近日對部分國產服務器進行了CPU和內存的性能測試&#xff0c; 服務器包括華錕振宇、新華三和中興三家&#xff0c;CPU包括鯤鵬、海光和Intel&#xff0c;初步測試結果如下&#xff1a; 服務器廠商四川華錕振宇新華三中興中興服務器HuaKun TG225 B1R4930 G5R5930 G2R5300 G4操作…

【無標題】Scala函數基礎

函數和方法的區別 1&#xff09; 核心概念 &#xff08;1&#xff09; 為完成某一功能的程序語句的集合&#xff0c;稱為函數。 &#xff08;2&#xff09; 類中的函數稱之方法。 2&#xff09; 案例實操 &#xff08;1&#xff09; Scala 語言可以在任何的語法結構中聲明…

uniapp -- 列表垂直方向拖拽drag組件

背景 需要在小程序中實現拖拽排序功能,所以就用到了m-drag拖拽組件,在開發的過程中,發現該組件在特殊的場景下會有些問題,并對其進行了拓展。 效果 組件代碼 <template><!-- 創建一個垂直滾動視圖,類名為m-drag --><scroll

conda安裝python 遇到 pip is configured with locations that require TLS/SSL問題本質解決方案

以前寫了一篇文章&#xff0c;不過不是專門為了解決這個問題的&#xff0c;但是不能訪問pip install 不能安裝來自https 協議的包問題幾乎每次都出現&#xff0c;之前解決方案只是治標不治本 https://blog.csdn.net/wangsenling/article/details/130194456???????https…

【初階數據結構】隊列

文章目錄 目錄 一、概念與結構 二、隊列的實現 隊列的定義 1.初始化 2.入隊列 3.判斷隊列是否為空 4.出隊列 5.取隊頭數據 6.取隊尾數據 7.隊列有效個數 8.銷毀隊列 三.完整源碼 總結 一、概念與結構 概念&#xff1a;只允許在一端進行插入數據操作&#xff0c;在另一端進行刪除…

Apache Shiro 全面指南:從入門到高級應用

一、Shiro 概述與核心架構 1.1 什么是 Shiro&#xff1f; Apache Shiro 是一個強大且易用的 Java 安全框架&#xff0c;它提供了認證&#xff08;Authentication&#xff09;、授權&#xff08;Authorization&#xff09;、加密&#xff08;Cryptography&#xff09;和會話管…

es 3期 第28節-深入掌握集群組建與集群設置

#### 1.Elasticsearch是數據庫&#xff0c;不是普通的Java應用程序&#xff0c;傳統數據庫需要的硬件資源同樣需要&#xff0c;提升性能最有效的就是升級硬件。 #### 2.Elasticsearch是文檔型數據庫&#xff0c;不是關系型數據庫&#xff0c;不具備嚴格的ACID事務特性&#xff…

Android學習總結之通信篇

一、Binder跨進程通信的底層實現細節&#xff08;掛科率35%&#xff09; 高頻問題&#xff1a;“Binder如何實現一次跨進程方法調用&#xff1f;”   候選人常見錯誤&#xff1a;   僅回答“通過Binder驅動傳輸數據”&#xff0c;缺乏對內存映射和線程調度的描述混淆Binde…

數據結構C語言練習(兩個棧實現隊列)

一、引言 在數據結構的學習中&#xff0c;我們經常會遇到一些有趣的問題&#xff0c;比如如何用一種數據結構去實現另一種數據結構的功能。本文將深入探討 “用棧實現隊列” 這一經典問題&#xff0c;詳細解析解題思路、代碼實現以及每個函數的作用&#xff0c;幫助讀者更好地…

前端如何導入谷歌字體庫

#谷歌字體庫內容豐富&#xff0c;涵蓋上千種多語言支持的字體&#xff0c;學習導入谷歌字體庫來增加網站的閱讀性&#xff0c;是必不可少的一項技能# 1&#xff0c;前往谷歌字體網站 要會魔法&#xff0c;裸連很卡 2&#xff0c; 尋找心儀字體 Googles Fonts下面的filters可…

SnapdragonCamera驍龍相機源碼解析

驍龍相機是高通開發的一個測試系統攝像頭的demo&#xff0c;代碼完善&#xff0c;功能強大。可以配合Camera驅動進行功能聯調。 很多邏輯代碼在CaptureModule.java里。 CaptureModule有8000多行&#xff0c;包羅萬象。 涉及到界面顯示要結合CaptureUI.java 一起來實現。 Ca…

多線程猜數問題

題目&#xff1a;線程 A 生成隨機數&#xff0c;另外兩個線程來猜數&#xff0c;線程 A 可以告訴猜的結果是大還是小&#xff0c;兩個線程都猜對后&#xff0c;游戲結束&#xff0c;編寫代碼完成。 一、Semaphore 多個線程可以同時操作同一信號量&#xff0c;由此實現線程同步…

seq2seq

理解 transformer 中的 encoder decoder 詳細的 transformer 教程見&#xff1a;【極速版 – 大模型入門到進階】Transformer 文章目錄 &#x1f30a; Encoder: 給一排向量輸出另外一排向量&#x1f30a; Encoder vs. Decoder: multi-head attention vs. masked multi-head at…

Proxmox pct 部署ubuntu

pct 前言 PCT(Proxmox Container Tool)是 PVE 中用于管理 Linux 容器(LXC)的命令行工具。通過 PCT,用戶可以執行各種容器管理任務,例如創建新的容器、啟動和停止容器、更新容器、安裝軟件包、導出和導入容器等。PCT 提供了與 Web 界面相同的功能,但通過命令行進行操作,…