在 Logstash 中配置從 Elasticsearch (ES) 讀取數據并輸出到 Kafka 是一個相對高級但強大的用法,通常用于數據遷移、重新索引、或構建新的數據管道。
下面我將詳細解釋配置文件的各個部分和細節。
核心配置文件結構 (es-to-kafka.conf
)
一個完整的配置文件主要包含三個部分:input
, filter
(可選), 和 output
。
input {elasticsearch {# 輸入配置:告訴Logstash如何從ES讀取數據}
}filter {# 過濾配置(可選):對從ES讀取的數據進行加工、清洗、轉換
}output {kafka {# 輸出配置:告訴Logstash如何將數據寫入Kafka}
}
1. Input (Elasticsearch) 插件配置詳解
用以定義數據來源
input {elasticsearch {# 【必需】ES集群的地址列表hosts => ["http://localhost:9200", "http://node2:9200"] # 【必需】要查詢的索引。支持通配符(如`my-index-*`)和逗號分隔。index => "source-index-*" # 【強烈建議】查詢語句。默認是 `{ "query": { "match_all": {} } }`,即查詢所有。# 你可以根據需要添加時間范圍過濾等,避免全量同步。query => '{"query": {"range": {"@timestamp": {"gte": "now-1h/d", # 例如:只拉取過去1小時的數據"lte": "now/d"}}}}'# 【必需】分頁大小。控制一次從ES拉取多少條數據。根據文檔大小和JVM堆內存調整。size => 1000 # 【必需】滾動API的保持時間。每次滾動查詢的上下文保持時間,應大于處理一批數據所需的時間。scroll => "5m" # 【可選】認證信息。如果ES有安全認證user => "your_elasticsearch_user"password => "your_password"# 【可選】SSL/TLS配置(如果ES開啟了HTTPS)ssl => truecacert => "/path/to/your/ca.crt" # 或使用 `ssl_certificate_verification => false` (不推薦,僅測試用)# 【可選】調度計劃。默認只運行一次。# 如果希望持續從ES拉取新數據,可以使用cron表達式,但這通常不是好主意,容易重復消費。# schedule => "* * * * *" # 每分鐘運行一次(謹慎使用!)# 【可選】用于排序的字段。建議使用唯一且遞增的字段,如`@timestamp`或自增ID,與`docinfo`配合實現斷點續傳。sort => "@timestamp:asc" # 【高級可選】啟用文檔元數據獲取。可以將ES文檔的_id, _index等信息也添加到Logstash event中。docinfo => true docinfo_target => "[@metadata][elasticsearch]" docinfo_fields => ["_index", "_type", "_id"] # 【高級可選】設置請求重試次數retry_max_attempts => 3}
}
2. Filter 插件配置(可選)
從 ES 獲取的數據已經是 JSON 格式,通常不需要復雜解析,但常用來進行一些調整。
filter {# 1. 移除不必要的字段。例如,從docinfo中獲取的元數據可能不需要發送到Kafka。mutate {remove_field => ["@version", "@timestamp", "[@metadata][elasticsearch]"]}# 2. 添加Kafka消息所需的Key或Header(在output中可以使用)# 例如,使用文檔的_id作為Kafka消息的Key,保證同一文檔始終進入同一分區。mutate {add_field => {"[@metadata][kafka_key]" => "%{[@metadata][elasticsearch][_id]}"}}mutate {rename => {"舊的字段名" => "新的字段名"# 可以同時重命名多個字段"另一個舊字段" => "另一個新字段"}}# 3. 轉換數據格式或內容# json {# source => "message" # 如果ES里的某個字段是JSON字符串,可以在這里解析它# }# date {# match => ["timestamp", "UNIX_MS"] # 格式化時間字段# target => "timestamp"# }
}
重要提示:@metadata
字段中的內容不會在輸出中顯示,非常適合用來做流程中的臨時變量(比如上面的 kafka_key
)。
3. Output (Kafka) 插件配置詳解
用以定義數據的目的地。
output {kafka {# 【必需】Kafka集群的broker列表bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"# 【必需】目標Topic的名稱topic_id => "target-topic-name"# 【可選】消息的Key。常用于分區選擇。這里使用filter階段設置的metadata。# 如果沒有key,Kafka會使用輪詢策略分配分區。codec => "json" # 非常重要!指定消息的序列化格式為JSON。# 【可選】消息格式序列化器。`json` codec已經幫我們處理了,所以不需要單獨設置。# value_serializer => "org.apache.kafka.common.serialization.StringSerializer"# 【可選】壓縮算法,可以有效減少網絡傳輸量和存儲空間。compression_type => "snappy" # 可選 "gzip", "lz4", "snappy"# 【可選】生產者ACK機制,關系到數據可靠性。acks => "1" # "0"(不等待), "1"(等待Leader確認), "all"(等待所有ISR確認)# 【可選】批量發送設置,提高吞吐量。batch_size => 16384linger_ms => 1000 # 發送前等待更多消息加入batch的時間(毫秒)# 【可選】SSL/SASL認證(如果Kafka集群需要)ssl_truststore_location => "/path/to/kafka.client.truststore.jks"ssl_truststore_password => "password"sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';"sasl_mechanism => "PLAIN"security_protocol => "SASL_SSL"# 【可選】遇到錯誤(如Topic不存在)時重試次數retries => 3}# 強烈建議添加一個備用輸出(如stdout),用于調試和查看錯誤信息。stdout {codec => rubydebug}
}
完整配置示例
假設我們將 app-logs-*
索引中過去 15 分鐘的數據,遷移到名為 logstash-migration-topic
的 Kafka Topic 中,并使用文檔 ID 作為 Kafka Message Key。
input {elasticsearch {hosts => ["http://es-node1:9200"]index => "app-logs-*"query => '{"query": {"range": {"@timestamp": {"gte": "now-15m","lte": "now"}}}}'size => 500scroll => "5m"docinfo => truedocinfo_target => "[@metadata][es_doc]"schedule => "*/5 * * * *" # 每5分鐘運行一次(謹慎!可能導致數據重復)}
}filter {# 使用文檔的_id作為Kafka消息的Keymutate {add_field => {"[@metadata][kafka_key]" => "%{[@metadata][es_doc][_id]}"}}# 移除一些不必要的系統字段mutate {remove_field => ["@version", "@timestamp", "[@metadata][es_doc]"]}
}output {kafka {bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"topic_id => "logstash-migration-topic"codec => "json"compression_type => "lz4"acks => "all" # 追求高可靠性}stdout {}
}
運行命令
將上述配置保存為 es-to-kafka.conf
文件,然后使用以下命令運行 Logstash:
bin/logstash -f /path/to/your/es-to-kafka.conf --config.test_and_exit # 測試配置文件語法
bin/logstash -f /path/to/your/es-to-kafka.conf # 啟動運行
重要注意事項
- 性能與資源:這種操作對 ES 和 Logstash 都是 資源密集型 的。務必調整
size
參數,監控 JVM 內存和 CPU 使用率。 - 重復數據:默認情況下,每次運行
input
都會重新查詢。使用schedule
會導致數據重復。要實現增量遷移,必須在query
中使用嚴格的時間范圍或自增 ID,并記錄上次獲取的位置。 - 數據類型:ES 輸入插件會將整個文檔作為一個 Logstash event,
message
字段就是原始的 JSON 文檔。使用json
codec 輸出可以保持其結構。 - 錯誤處理:網絡中斷、Kafka Topic 不存在等都可能導致任務失敗。建議在測試環境充分測試,并配置好
retries
和retry_max_attempts
。 - 版本兼容性:確保你的 Logstash 版本與 ES 和 Kafka 集群版本兼容。插件可能因版本不同而參數略有差異。