Logstash數據遷移之es-to-kafka.conf詳細配置

在 Logstash 中配置從 Elasticsearch (ES) 讀取數據并輸出到 Kafka 是一個相對高級但強大的用法,通常用于數據遷移、重新索引、或構建新的數據管道。

下面我將詳細解釋配置文件的各個部分和細節。

核心配置文件結構 (es-to-kafka.conf)

一個完整的配置文件主要包含三個部分:input, filter (可選), 和 output

input {elasticsearch {# 輸入配置:告訴Logstash如何從ES讀取數據}
}filter {# 過濾配置(可選):對從ES讀取的數據進行加工、清洗、轉換
}output {kafka {# 輸出配置:告訴Logstash如何將數據寫入Kafka}
}

1. Input (Elasticsearch) 插件配置詳解

用以定義數據來源

input {elasticsearch {# 【必需】ES集群的地址列表hosts => ["http://localhost:9200", "http://node2:9200"] # 【必需】要查詢的索引。支持通配符(如`my-index-*`)和逗號分隔。index => "source-index-*" # 【強烈建議】查詢語句。默認是 `{ "query": { "match_all": {} } }`,即查詢所有。# 你可以根據需要添加時間范圍過濾等,避免全量同步。query => '{"query": {"range": {"@timestamp": {"gte": "now-1h/d",  # 例如:只拉取過去1小時的數據"lte": "now/d"}}}}'# 【必需】分頁大小。控制一次從ES拉取多少條數據。根據文檔大小和JVM堆內存調整。size => 1000 # 【必需】滾動API的保持時間。每次滾動查詢的上下文保持時間,應大于處理一批數據所需的時間。scroll => "5m" # 【可選】認證信息。如果ES有安全認證user => "your_elasticsearch_user"password => "your_password"# 【可選】SSL/TLS配置(如果ES開啟了HTTPS)ssl => truecacert => "/path/to/your/ca.crt" # 或使用 `ssl_certificate_verification => false` (不推薦,僅測試用)# 【可選】調度計劃。默認只運行一次。# 如果希望持續從ES拉取新數據,可以使用cron表達式,但這通常不是好主意,容易重復消費。# schedule => "* * * * *" # 每分鐘運行一次(謹慎使用!)# 【可選】用于排序的字段。建議使用唯一且遞增的字段,如`@timestamp`或自增ID,與`docinfo`配合實現斷點續傳。sort => "@timestamp:asc" # 【高級可選】啟用文檔元數據獲取。可以將ES文檔的_id, _index等信息也添加到Logstash event中。docinfo => true docinfo_target => "[@metadata][elasticsearch]" docinfo_fields => ["_index", "_type", "_id"] # 【高級可選】設置請求重試次數retry_max_attempts => 3}
}

2. Filter 插件配置(可選)

從 ES 獲取的數據已經是 JSON 格式,通常不需要復雜解析,但常用來進行一些調整。

filter {# 1. 移除不必要的字段。例如,從docinfo中獲取的元數據可能不需要發送到Kafka。mutate {remove_field => ["@version", "@timestamp", "[@metadata][elasticsearch]"]}# 2. 添加Kafka消息所需的Key或Header(在output中可以使用)# 例如,使用文檔的_id作為Kafka消息的Key,保證同一文檔始終進入同一分區。mutate {add_field => {"[@metadata][kafka_key]" => "%{[@metadata][elasticsearch][_id]}"}}mutate {rename => {"舊的字段名" => "新的字段名"# 可以同時重命名多個字段"另一個舊字段" => "另一個新字段"}}# 3. 轉換數據格式或內容# json {#   source => "message" # 如果ES里的某個字段是JSON字符串,可以在這里解析它# }# date {#   match => ["timestamp", "UNIX_MS"] # 格式化時間字段#   target => "timestamp"# }
}

重要提示@metadata 字段中的內容不會在輸出中顯示,非常適合用來做流程中的臨時變量(比如上面的 kafka_key)。

3. Output (Kafka) 插件配置詳解

用以定義數據的目的地。

output {kafka {# 【必需】Kafka集群的broker列表bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"# 【必需】目標Topic的名稱topic_id => "target-topic-name"# 【可選】消息的Key。常用于分區選擇。這里使用filter階段設置的metadata。# 如果沒有key,Kafka會使用輪詢策略分配分區。codec => "json" # 非常重要!指定消息的序列化格式為JSON。# 【可選】消息格式序列化器。`json` codec已經幫我們處理了,所以不需要單獨設置。# value_serializer => "org.apache.kafka.common.serialization.StringSerializer"# 【可選】壓縮算法,可以有效減少網絡傳輸量和存儲空間。compression_type => "snappy" # 可選 "gzip", "lz4", "snappy"# 【可選】生產者ACK機制,關系到數據可靠性。acks => "1" # "0"(不等待), "1"(等待Leader確認), "all"(等待所有ISR確認)# 【可選】批量發送設置,提高吞吐量。batch_size => 16384linger_ms => 1000 # 發送前等待更多消息加入batch的時間(毫秒)# 【可選】SSL/SASL認證(如果Kafka集群需要)ssl_truststore_location => "/path/to/kafka.client.truststore.jks"ssl_truststore_password => "password"sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';"sasl_mechanism => "PLAIN"security_protocol => "SASL_SSL"# 【可選】遇到錯誤(如Topic不存在)時重試次數retries => 3}# 強烈建議添加一個備用輸出(如stdout),用于調試和查看錯誤信息。stdout {codec => rubydebug}
}

完整配置示例

假設我們將 app-logs-* 索引中過去 15 分鐘的數據,遷移到名為 logstash-migration-topic 的 Kafka Topic 中,并使用文檔 ID 作為 Kafka Message Key。

input {elasticsearch {hosts => ["http://es-node1:9200"]index => "app-logs-*"query => '{"query": {"range": {"@timestamp": {"gte": "now-15m","lte": "now"}}}}'size => 500scroll => "5m"docinfo => truedocinfo_target => "[@metadata][es_doc]"schedule => "*/5 * * * *" # 每5分鐘運行一次(謹慎!可能導致數據重復)}
}filter {# 使用文檔的_id作為Kafka消息的Keymutate {add_field => {"[@metadata][kafka_key]" => "%{[@metadata][es_doc][_id]}"}}# 移除一些不必要的系統字段mutate {remove_field => ["@version", "@timestamp", "[@metadata][es_doc]"]}
}output {kafka {bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"topic_id => "logstash-migration-topic"codec => "json"compression_type => "lz4"acks => "all" # 追求高可靠性}stdout {}
}

運行命令

將上述配置保存為 es-to-kafka.conf 文件,然后使用以下命令運行 Logstash:

bin/logstash -f /path/to/your/es-to-kafka.conf --config.test_and_exit # 測試配置文件語法
bin/logstash -f /path/to/your/es-to-kafka.conf # 啟動運行

重要注意事項

  1. 性能與資源:這種操作對 ES 和 Logstash 都是 資源密集型 的。務必調整 size 參數,監控 JVM 內存和 CPU 使用率。
  2. 重復數據:默認情況下,每次運行 input 都會重新查詢。使用 schedule 會導致數據重復。要實現增量遷移,必須在 query 中使用嚴格的時間范圍或自增 ID,并記錄上次獲取的位置。
  3. 數據類型:ES 輸入插件會將整個文檔作為一個 Logstash event,message 字段就是原始的 JSON 文檔。使用 json codec 輸出可以保持其結構。
  4. 錯誤處理:網絡中斷、Kafka Topic 不存在等都可能導致任務失敗。建議在測試環境充分測試,并配置好 retriesretry_max_attempts
  5. 版本兼容性:確保你的 Logstash 版本與 ES 和 Kafka 集群版本兼容。插件可能因版本不同而參數略有差異。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/95303.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/95303.shtml
英文地址,請注明出處:http://en.pswp.cn/web/95303.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

在OracleLinux9.4系統上靜默滾動打補丁安裝Oracle19c

OracleLinux9.4系統 安裝Oracle19c 文章目錄OracleLinux9.4系統 安裝Oracle19c一、安裝準備1、yum安裝預檢查需要的包2、系統資源二、滾動安裝一、安裝準備 1、yum安裝預檢查需要的包 yum install libnsl yum install -y oracle-database-preinstall-19c # 最新的unzip yum i…

Android原生HttpURLConnection上傳圖片方案

創建上傳方法object FormUploader {private val BOUNDARY "Boundary-" System.currentTimeMillis()private const val LINE_FEED "\r\n"Throws(IOException::class)fun uploadImage(url: String, imageFile: File, params: MutableMap<String?, Str…

落葉清掃機器人cad+三維圖+設計說明書

摘 要 城市公共場所、校園等環境中&#xff0c;落葉的清掃一直是一個繁瑣而耗時的任務。傳統的人工清掃方式不僅效率低下&#xff0c;還存在人力浪費和安全隱患等問題。因此&#xff0c;研發一款能夠自主完成落葉清掃任務的機器人成為了當今研究的熱點之一。隨著科技的不斷進…

國別域名的SEO優勢:是否更利于在當地搜索引擎排名?

當你盯著搜索引擎結果頁發呆時&#xff0c;有沒有想過——憑什么那個.jp域名的網站能排在.ca前面&#xff1f;別扯什么內容質量&#xff0c;上周幫客戶優化新加坡市場時&#xff0c;親眼見著兩個內容相似度90%的頁面&#xff0c;.sg域名比.com.au在Google Singapore上高出3個排…

動態配置最佳實踐:Spring Boot 十種落地方式與回滾審計指南(含實操與避坑)

作為一名Spring Boot開發者&#xff0c;正在運維一個高可用微服務系統&#xff1a;業務需求變化頻繁&#xff0c;需要實時調整配置如數據庫連接或日志級別&#xff0c;但每次修改都得重啟應用&#xff0c;造成服務中斷和用戶投訴。這不是小麻煩&#xff0c;而是配置管理的痛點—…

vue社區網格化管理系統(代碼+數據庫+LW)

摘要 隨著城市化進程的加快&#xff0c;社區管理的復雜性逐漸增大&#xff0c;傳統的管理模式已無法滿足現代社區管理的需求。社區網格化管理系統作為一種新的管理模式&#xff0c;通過將社區劃分為多個網格單元&#xff0c;使得管理更加精細化、智能化和高效化。本論文基于Sp…

使用EasyExcel實現Excel單元格保護:自由鎖定表頭和數據行

使用EasyExcel實現Excel單元格保護&#xff1a;鎖定表頭和第二行數據 前言 在日常開發中&#xff0c;我們經常需要導出Excel文件&#xff0c;有時還需要對Excel中的某些單元格進行保護&#xff0c;防止用戶誤修改。本文將介紹如何使用EasyExcel 4.0.3實現鎖定Excel表頭和第二行…

dify docker知識庫topk最大值參數配置

1 問題說明 dify構建RAG知識庫過程中&#xff0c;通過會遇到一些默認配置不能解決的問題。 比如topk&#xff0c;topk默認最大10&#xff0c;對語義模糊的檢索&#xff0c;目標文檔可能沒進前10&#xff0c;出現在10-30區間。 所以&#xff0c;需要調整topk最大值參數。 # T…

SRE命令行兵器譜之一:精通top/htop - 從性能“體檢”到瓶頸“解剖”

SRE命令行兵器譜之一:精通top/htop - 從性能“體檢”到瓶頸“解剖” SRE的“戰場”:真實故障場景 下午三點,監控系統告警:“核心API服務響應時間(P99)飆升至5秒”。用戶已經開始在群里抱怨接口超時。這是一個典型的線上性能問題,每一秒的延遲都在影響用戶體驗和公司收…

一、Git與Gitee常見問題解答

Git與Gitee常見問題解答 Git相關問題 Q1: 什么是Git&#xff1f; A: Git是一個分布式版本控制系統&#xff0c;由Linux之父Linus Torvalds開發。它能夠跟蹤文件的變更歷史&#xff0c;支持多人協作開發&#xff0c;是現代軟件開發中不可或缺的工具。 Q2: Git的三個區域是什么&a…

kubernetes服務質量之QoS類

一、QoS類 Kubernetes的QoS&#xff08;Quality of Service&#xff09;類別允許您指定可用于應用程序的可用資源數量&#xff0c;以便更好地控制應用程序的可用性。它還允許您限制特定應用程序的資源使用率&#xff0c;以幫助保護系統的穩定性和性能。 Kubernetes 創建 Pod 時…

Redis--Lua腳本以及在SpringBoot中的使用

前言、為什么要用 Lua&#xff1f;多步操作合并為一步&#xff0c;保證原子性。減少網絡通信次數。下推邏輯到 Redis&#xff0c;提高性能。一、Redis 使用 Lua 腳本的兩種方式方式一&#xff1a;使用 --eval 執行腳本文件這種方式 需要先寫一個 Lua 文件。&#x1f4cc; 示例&…

基于 C 語言的網絡單詞查詢系統設計與實現(客戶端 + 服務器端)

一、項目概述本文將介紹一個基于 C 語言開發的網絡單詞查詢系統&#xff0c;該系統包含客戶端和服務器端兩部分&#xff0c;支持用戶注冊、登錄、單詞查詢及歷史記錄查詢等功能。系統采用 TCP socket 實現網絡通信&#xff0c;使用 SQLite 數據庫存儲用戶信息、單詞數據及查詢記…

《JAVA EE企業級應用開發》第一課筆記

《JAVA EE企業級應用開發》第一課筆記 文章目錄《JAVA EE企業級應用開發》第一課筆記課程主題&#xff1a;三層架構與SSM框架概述一、核心架構&#xff1a;三層架構 (MVC)1. 表現層 (Presentation Layer)2. 業務邏輯層 (Business Logic Layer)3. 數據持久層 (Data Persistence …

RT-DETR網絡結構

1.前言 本章主要來介紹下RT-DETR的網絡結構,參考的依舊是ultralytics實現的RT-DETR-L,代碼如下: ultralytics/ultralytics: Ultralytics YOLO ?? 首先談談我對RT-DETR的淺顯認識,他不像是YOLOv8這種純CNN實現的網絡,也不像是Vit這種以Transformer實現的網絡,他是前一…

Python 文件復制實戰指南:從基礎操作到高效自動化的最佳實踐

Python 文件復制實戰指南:從基礎操作到高效自動化的最佳實踐 1. 引言:文件復制為何是自動化的核心能力? 在日常開發與運維工作中,文件復制是一項基礎卻至關重要的操作。無論是備份日志、同步配置、部署代碼,還是批量遷移數據,都離不開對文件的精準復制與路徑管理。而 Py…

WebSocket的基本使用方法

一. 與HTTP對比WebSocket 是一種在單個 TCP 連接上實現全雙工&#xff08;雙向&#xff09;通信的網絡協議&#xff0c;它解決了傳統 HTTP 協議 “請求 - 響應” 模式的局限性&#xff0c;讓客戶端&#xff08;如瀏覽器&#xff09;和服務器能建立持久連接&#xff0c;實現實時…

架構選型:為何用對象存儲替代HDFS構建現代數據湖

在過去十余年的大數據浪潮中&#xff0c;Hadoop及其核心組件HDFS&#xff08;Hadoop分布式文件系統&#xff09;無疑是整個技術生態的基石。它開創性地解決了海量數據的分布式存儲難題&#xff0c;支撐了無數企業從數據中挖掘價值。然而&#xff0c;隨著數據規模的指數級增長以…

智能養花誰更優?WebIDE PLOY技術與裝置的結合及實踐價值 —— 精準養護的賦能路徑

一、WebIDEPLOY 技術支撐下的智能養花系統核心構成在 WebIDEPLOY 技術的框架下&#xff0c;智能養花裝置形成了一套精準協同的閉環系統&#xff0c;其核心在于通過技術整合實現 “監測 - 決策 - 執行 - 遠程交互” 的無縫銜接&#xff0c;讓植物養護更貼合城市居民的生活節奏。…

基于llama.cpp在CPU環境部署Qwen3

大家好,我是奇文王語,NLP愛好者,長期分享大模型實戰技巧,歡迎關注交流。 最近兩天在研究如何使用小規模參數的模型在CPU環境上進行落地應用,比如模型Qwen3-0.6B。開始使用Transformers庫能夠正常把模型服務進行部署起來,但是通過測試速度比較慢,用戶的體驗會比較差。 …