1、ES原理
原理 使?filebeat來上傳?志數據,logstash進??志收集與處理,elasticsearch作為?志存儲與搜索引擎,最后使?kibana展現?志的可視化輸出。所以不難發現,?志解析主要還 是logstash做的事情
從上圖中可以看到,logstash主要包含三?模塊:
1、INPUTS: 收集所有數據源的?志數據([源有file、redis、beats等,filebeat就是使?了beats源*);
2、FILTERS: 負責數據處理與轉換、解析、整理?志數據(常?:grok、mutate、drop、clone、geoip)
3、OUTPUTS: 將解析的?志數據輸出?存儲器([elasticseach、file、syslog等);
通過配置Logstash的管道(pipeline),你可以定義數據的收集、處理和輸出過程。每個管道由輸入插件、過濾器插件和輸出插件組成,它們一起協作來實現特定的數據流轉
filters常用的過濾器插件如下:
-
grok 過濾器: 場景:解析包含時間戳、日志級別和消息的日志行。
?rubyCopy codegrok {match => {"message" => "\[%{TIMESTAMP_ISO8601:time}\] \[%{WORD:level}\] %{GREEDYDATA:msg}"}}
-
mutate 過濾器: 場景:清理字段,將 IP 地址字段重命名為 "client_ip"。
?rubyCopy codemutate {rename => { "ip" => "client_ip" }}
-
date 過濾器: 場景:將時間戳字段轉換為可操作的日期類型。
?rubyCopy codedate {match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]target => "log_date"}
-
json 過濾器: 場景:解析包含嵌套 JSON 數據的日志消息。
?rubyCopy codejson {source => "message"target => "parsed_json"}
-
kv 過濾器: 場景:解析 HTTP 查詢字符串中的參數。
?rubyCopy codekv {field_split => "&"value_split => "="source => "query_string"}
-
xml 過濾器: 場景:解析包含 XML 數據的日志消息。
?rubyCopy codexml {source => "message"store_xml => falsexpath => ["//user/name/text()", "username","//user/age/text()", "user_age"]}
-
translate 過濾器: 場景:將日志中的狀態碼映射為更可讀的狀態描述。
?rubyCopy codetranslate {field => "status_code"dictionary => ["200", "OK","404", "Not Found","500", "Internal Server Error"]}
-
useragent 過濾器: 場景:解析用戶代理字符串,提取瀏覽器和操作系統信息。
?rubyCopy codeuseragent {source => "user_agent"target => "user_agent_info"}
-
geoip 過濾器: 場景:將 IP 地址解析為地理位置信息。
?rubyCopy codegeoip {source => "client_ip"target => "geoip"}
-
multiline 過濾器: 場景:合并多行堆棧跟蹤日志成單個事件。
?rubyCopy codemultiline {pattern => "^\s"negate => truewhat => "previous"}
2.Logstash性能優化主要體現在以下幾個方面:
1.多pipeline配置
多pipeline配置。可以將不同的輸入分割到不同的pipeline,每個pipeline有獨立的過濾器和輸出,這可以提高處理效率。pipeline之間的數據交互可以通過隊列實現。
?ruby# 管道1:接收日志輸入input { stdin { } } ??filter { grok { } }output { stdout { } }?# 管道2:從Kafka讀取數據input { kafka { } }?filter { json { } } output { elasticsearch { } }
2.Grok過濾器配置
?rubyfilter {grok {match => { "message" => "%{COMBINEDAPACHELOG}" }add_field => { "timestamp" => "%{DATE:timestamp}" }}}
3.Elasticsearch輸出配置
?ruby ?output {elasticsearch { hosts => ["http://localhost:9200"]index => "logstash-%{+YYYY.MM.dd}"}}
4.Redis隊列配置
?rubyoutput {redis { host => "127.0.0.1"port => 6379db ? => 0key => "logstash" }}
5.batching編輯模式配置
?rubyinput {file {path => "/var/log/messages"start_position => "beginning" sincedb_path => "/dev/null"codec => "json"mode => "batch" ? # 配置batching模式batch_size => 1000 # 每1000條記錄批量讀取}}
6.調整JVM內存配置在 jvm.options
文件中配置,例如:
?-Xms2g-Xmx2g
7.并行處理配置 在 logstash.yml
中配置:
?yml
pipeline.workers: 2 ?#配置工作線程數為2
pipeline.output.workers: 2 #輸出線程也配置為2
logstash 配置文件
logstash 配置文件配置
vim ?logstash.conf?
input {
? kafka {
? ? bootstrap_servers => ["192.168.190.159:9092"]
? ? topics_pattern ?=> ["hwb\.test|ywyth-sc|zj_test"]
? ? consumer_threads => 5
? ? codec => json
? ? auto_offset_reset => latest
? ? group_id => "hwb"
? }
}filter {
? ? ? ? ruby {
? ? ? ? code => "event.timestamp.time.localtime"
? ? ? ? }
? ? ? ? mutate {
? ? ? ? remove_field => ["beat"]
? ? ? ? }
? ? ? ? mutate {
? ? ? ? ? ? ? ? split => ["message"," "]
? ? ? ? ? ? ? ? add_field => { "level" => "%{[message][3]}" }
? ? ? ? }
? ? ? ? mutate {
?? ??? ?add_field => {
?? ??? ? ? ?"index_name" => "hwb.test,%{[ywyth-sc]}"
? ? ? ? ? ? ? ? }
? ? ? ? }
? ? ? ? grok {
? ? ? ? ? ? ?match => {"message" => "\[(?<time>\d+-\d+-\d+\s\d+:\d+:\d+)\] \[(?<level>\w+)\] (?<thread>[\w|-]+) (?<class>[\w|\.]+) (?<lineNum>\d+):(?<msg>.+)"
? ? ? ? ? ? ?}
? ? ? ? }
}output {
? ?elasticsearch {
? ? ? ? ?hosts => ["192.168.190.161:9200"]
? ? ? ? ?index => "%{[fields][log_topic]}"
? ? ? ? ?codec => "json"
? ?}
}
?
啟動logstash
nohup ./logstash -f ?../config/logstash.conf &?