文章目錄
- 一、日志收集所解決的問題
- 二、`Elastic Stack` 組件介紹
- 2.1 `Elasticsearch`
- 2.2 `Logstash`
- 2.3 `Kibana`
- 2.4 `Filebeat` beats
- 三、`ELK Stack`集群安裝
- 3.1 安裝`JAVA`環境(所有`ES`節點)
- 3.2 安裝`ES`集群
- 3.2.1 `ES`單節點部署
- 3.2.2 `ES` `JAVA`調優:堆(`heap`)內存大小
- 3.2.3 `ES`使用自定義`JAVA`環境配置
- 3.2.3 部署`ES`高可用集群
- 3.3 `Kibana`安裝(`elk03`節點)
- 3.4 `Logstash`安裝(`elk02`)
- 3.5 `Filebeat`安裝(`elk01`)
- 3.5.1 `Filebeat`架構
- 四、`Filebeat`日志收集
- 4.1 `Filebeat`標準輸入和輸出
- 4.2.2 `Filebeat `基于`Log`類型進行輸入并且自定義`Tags`
- 4.2.8 `Filebeat`輸出數據至`Elasticsearch`集群
- 4.2.12 `FIlebeat` 收集`JSON`格式的`nginx`訪問日志
- 五、`Logstash` 日志收集
- 5.1 `Logstash`安裝
- 5.3 `Logstash`基于`File`的形式進行`input`
- 5.4 `Logstash`基于`Filebeat`形式進行`input`
- 5.5 Logstash基于ES形式進行output
- 六、`Logstash grok`插件使用
- 6.1 `Logstash` 內置正則使用
- 6.4 `Logstash`刪除指定字段
- 八、`Logstash useragent`插件分析用戶客戶端類型
- 九、`Logstash`多`if`分支
- 十、`Kibana`操作使用
- 10.1 `Kibana`手動創建索引模板
- 九、`kafka`簡介和使用
- 9.1`kafka`相關概念
- 9.1.1 `Kafka`介紹
- 9.1.2 `Kafka`相關專業術語
- 9.2 `Zookeeper`使用和配置
- 9.2.1 簡介
- 9.2.2 `Zookeeper`集群部署:三節點😀
- 9.2.3 `Zookeeper`角色劃分
- 9.2.4 `zookeeper`配置內存堆棧
- 9.3 `Kafka`集群安裝:三節點😀
- 9.4 `Kafka Topic`日常操作
- 9.4.1 查看`Topic`相關信息
- 9.4.2 創建`Topic`
- 9.5 `Filebeat`收集日志至`Kafka`
- 9.6 `Logstash`收集`Kafka Topic`日志
- 十、`Elasticsearch Restful`風格`API`實戰
- 10.1 `ES`集群狀態
- 10.2 `Elasticsearch`術語
一、日志收集所解決的問題
運維工程師首要職責維護服務的正常運行,什么是正常運行狀態?服務的正常運行是萬千報錯中的一種“特例”,維護這個特例?
- 生產環境出現問題后,需要查看各種?志進行分析排錯;
- 日常運維工作,日志文件分散,運維工作繁瑣。可以實現日志聚合;
- 開發人員沒有登陸服務器的權限。開發人員可以通過web界面查看日志;
- 面對大量的訪問日志,可以統計出各項指標,比如PV UV。
二、Elastic Stack
組件介紹
Elasticsearch 9部署文檔:https://www.elastic.co/docs/deploy-manage/deploy/self-managed/install-elasticsearch-from-archive-on-linux-macos
2.1 Elasticsearch
Elasticsearch
(簡稱ES
)是一個分布式、**RESTful
**風格的開源搜索和數據分析引擎,專為全文搜索、結構化搜索、分析、和數據可視化而設計。支持幾乎實時的搜索,并且具有高擴展性,能夠處理PB
級別的數據。
Index
(索引): 索引類似于關系型數據庫中的表,它存儲著相似結構的數據。在Elasticsearch
中,每個索引包含一個或多個文檔(Document
)。Document
(文檔): 文檔是Elasticsearch
的基本數據單位,相當于關系型數據庫中的一行記錄。文檔以JSON
格式存儲,包含字段和值的對。Shards
(分片)和Replicas
(副本):Elasticsearch
將數據分為多個分片存儲,每個索引可以包含一個或多個分片。分片可以提高并行查詢的性能。副本是分片的備份,保證了數據的高可用性和容錯性。Cluster
(集群):Elasticsearch
集群由一個或多個節點(Node
)組成。每個節點負責存儲部分數據,并參與集群中的索引和搜索操作。集群有一個主節點,用于管理集群狀態。Node
(節點): 節點是Elasticsearch
的運行實例,每個節點屬于某個集群。節點可以充當主節點、數據節點。- 使用場景:
- 日志和事件數據分析:
Elasticsearch
常用于集中化日志管理和分析,幫助企業在海量日志中快速定位問題。 - 電商網站的搜索功能:
Elasticsearch
被廣泛用于電商網站的產品搜索,提供快速、相關性高的搜索結果。 - 數據存儲與檢索:
Elasticsearch
能有效存儲和檢索結構化或非結構化數據,適用于大規模數據的管理。 JSON key=values
- 日志和事件數據分析:
2.2 Logstash
? Logstash
是一個開源的數據收集、處理和傳輸引擎,主要用于實時的數據管道管理。是 Elastic Stack
的重要組成部分,負責將各種來源的數據收集起來,進行過濾和格式化處理,并最終將其輸出到指定的目標,如 Elasticsearch
、文件或其他存儲系統。
2.3 Kibana
? Kibana
提供強大的數據可視化能力,使用戶可以在瀏覽器中實時查看、搜索和分析存儲在 Elasticsearch
中的數據。它提供豐富的可視化選項,如圖表、地圖和表格,支持構建交互式儀表板。
2.4 Filebeat
beats
Filebeat
是一個輕量級的日志收集和傳輸工具。它通常用于從各種數據源收集日志,并將這些日志傳輸到Logstash
或 Elasticsearch
進行進一步處理和分析。Filebeat
是一個邊車代理,安裝在需要監控的服務器上,用于高效地讀取和轉發日志數據。它旨在減少資源消耗,提供可靠的日志傳輸,并確保在出現故障時不會丟失日志數據。
三、ELK Stack
集群安裝
3.1 安裝JAVA
環境(所有ES
節點)
#>>> 上傳jdk1.8安裝包
$ ll jdk-8u381-linux-x64.tar.gz
-rw-r--r--. 1 root root 139273048 7月 31 10:23 jdk-8u381-linux-x64.tar.gz#>>> 解壓jdk到指定目錄
$ tar xf jdk-8u381-linux-x64.tar.gz -C /usr/local/#>>> 創建軟鏈接
$ cd /usr/local/
$ ln -s jdk1.8.0_381/ java#>>> 聲明Java環境變量
$ cat >> /etc/profile.d/jdk.sh <<-EOF
#!/bin/bash
export JAVA_HOME=/usr/local/java
export PATH=\$PATH:\$JAVA_HOME/bin
EOF#>>> 重新加載配置文件
$ source /etc/profile.d/jdk.sh #>>> 測試java
$ java -version
java version "1.8.0_381"
Java(TM) SE Runtime Environment (build 1.8.0_381-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.381-b09, mixed mode)
3.2 安裝ES
集群
3.2.1 ES
單節點部署
#>>> 解壓es安裝包到指定目錄
$ tar xf elasticsearch-7.17.11-linux-x86_64.tar.gz -C /usr/local#>>> 創建es軟鏈接
$ cd /usr/local/
$ ln -s elasticsearch-7.17.11/ es#>>> 聲明ES環境變量
$ cat >> /etc/profile.d/es.sh <<-EOF
#! /bin/bash
export ES_HOME=/usr/local/es
export PATH=\$PATH:\$ES_HOME/bin
EOF#>>> 重新加載環境變量
$ source /etc/profile.d/es.sh #>>> 啟動es實例
$ elasticsearch
此時啟動會有報錯信息,記住詳細看報錯信息喲!報錯信息如下:此服務不能以超級管理員的身份運行,需要單獨創建
es
普通用戶。
解決方法如下操作:
#>>> 創建用于啟動ES的用戶
$ useradd es#>>> 查看用戶是否創建
$ id es
uid=1000(es) gid=1000(es) 組=1000(es)#>>> 修改elasticsearch屬主和數組
$ chown -R es.es /usr/local/elasticsearch-7.17.11/
#>>> 查看是否修改成功
$ ll /usr/local/elasticsearch-7.17.11/
#>>> 切換es工作目錄
$ cd /usr/local/elasticsearch-7.17.11/config/#>>> 備份es配置文件
$ cp elasticsearch.yml{,.bak}#>>> 修改文件相關參數
$ egrep -v "^(#|$)" elasticsearch.yml
···
# es集群名稱
cluster.name: study-elk-cluster
# 節點名稱(一般以當前主機名一致)
node.name: elk01
# 監聽地址,,配置IP則知允許基于IP地址進行訪問
network.host: 0.0.0.0
# 自動發現節點IP
discovery.seed_hosts: ["192.168.100.160"]
# 初始化master節點
cluster.initial_master_nodes: ["192.168.100.160"]
# 添加以下參數關閉geoip數據庫的更新,減少對帶寬和存儲的消耗,禁用自動更新。GeoIP根據IP地址確定地理位置的技術,用于日志分析、用戶活動追蹤等場景。
ingest.geoip.downloader.enabled: false
···#>>> 測試啟動ES
$ su -c "elasticsearch" es
雖然
es
能夠正常啟動,但是啟動日志會出下報錯信息,需要我們修改啟動的內核參數,第一個為修改系統最大的文件描述符,第二個修改系統的虛擬內存。
#>>> 修改es需要的limits參數(重新連接會話框才能成功加載參數)
$ cat >>/etc/security/limits.d/elk.conf <<EOF
* soft nofile 65535
* hard nofile 131070
EOF# 參數解釋:soft nofile 65535表示將文件描述符的軟限制設置為65535。hard nofile 131070表示將文件描述符的硬限制設置為131070。#>>> 查看limits參數是否加載
$ ulimit -Sn
65535
$ ulimit -Hn
131070#>>> 修改內核參數
$ cat >> /etc/sysctl.d/elk.conf <<'EOF'
vm.max_map_count = 262144
EOF# 參數解釋vm.max_map_count = 262144 用于設置單個進程可以擁有的內存映射的最大數量。#>>> 加載內核參數
$ sysctl -f /etc/sysctl.d/elk.conf
vm.max_map_count = 262144
$ sysctl -q vm.max_map_count
vm.max_map_count = 262144#>>> 測試啟動ES
$ su -c "elasticsearch" es
#>>> 或者使用一下的參數實現后臺運行
$ su -c "elasticsearch -d" es#>>> 測試節點
$ curl 192.168.100.160:9200
{"name" : "elk01", # 節點的名稱,一般以當前主機名命名。"cluster_name" : "study-elk-cluster", # Elasticsearch集群的名稱,多個節點可以組成一個集群,共同提供搜索和存儲功能。"cluster_uuid" : "EyOTgfNNSHOvUsQNLQ9f7Q", # 集群的唯一標識符(UUID)。每個Elasticsearch集群都有一個唯一的cluster_uuid,用于在集群內識別和管理節點之間的關系。"version" : {"number" : "7.17.11", # Elasticsearch 實例的版本"build_flavor" : "default", # 示這個版本沒有特殊的自定義修改或變化。"build_type" : "tar", # Elasticsearch 是通過 tar 包安裝的。"build_hash" : "eeedb98c60326ea3d46caef960fb4c77958fb885", # Git提交哈希。"build_date" : "2023-06-23T05:33:12.261262042Z", # 版本的構建日期和時間。"build_snapshot" : false, # 此版本適用于生產"lucene_version" : "8.11.1", # Lucene版本"minimum_wire_compatibility_version" : "6.8.0", # elasticsearch實例能夠與運行 6.8.0 及以上版本的其他節點進行通訊。"minimum_index_compatibility_version" : "6.0.0-beta1" # Elasticsearch兼容的最低索引版本。},"tagline" : "You Know, for Search" # Elasticsearch的標語或宣傳口號。
}
端口解釋:
9200
對外暴露端口。使用的HTTP
協議,Elasticsearch
的REST API
服務的默認端口。開發者和應用程序通過HTTP
協議與Elasticsearch
進行交互,發送查詢、索引文檔、管理集群等操作。9300
集群內部通訊端口。使用TCP
協議。 用于集群內的節點間同步和協調。節點之間通過這個端口交換數據,包括集群管理信息、索引和搜索操作等。
3.2.2 ES
JAVA
調優:堆(heap
)內存大小
#>>> 查看運行的JAVA程序
$ jps
14917 Jps
14713 Elasticsearch#>>> 查看ES堆內存大小
$ jmap -heap 14713(pid)
#>>> 修改堆內存大小(最大設置為32G,要么內存的一半)
$ vim /usr/local/elasticsearch-7.17.11/config/jvm.options
···
-Xms256m
-Xmx256m
···#>>> 刪除數據、日志
$ rm -rf /usr/local/es/{data,logs}/*
#>>> 刪除緩存數據
$ rm -rf /tmp/*#>>> 準備ES啟動腳本并啟動(加入systemd)
$ cat >> /usr/lib/systemd/system/es.service <<EOF
[Unit]
# 描述服務的簡短說明,這里描述為 "ELK"。
Description=ELK
# 指定服務的啟動順序。表示該服務在網絡目標 (network.target) 之后啟動,確保網絡服務已啟動并可用。
After=network.target[Service]
# 指定服務的啟動類型為 forking,表示服務啟動后會產生一個子進程,并且主進程會在啟動完成后退出。這通常用于后臺運行的守護進程。
Type=forking
# 指定啟動服務的命令
ExecStart=/usr/local/es/bin/elasticsearch -d
# 指定服務在退出后不重啟。可以根據需要將其更改為 always 或 on-failure,以確保服務在失敗后自動重啟。
Restart=no
# 指定以 es 用戶的身份運行服務。
User=es
# 指定服務所屬的組為 es。
Group=es
# 設置進程打開文件的最大數量(文件描述符限制)
LimitNOFILE=131070[Install]
# 指定服務的目標,表示該服務在多用戶模式下可用。
WantedBy=multi-user.target
EOF#>>> 重新加載systemd配置文件
$ systemctl daemon-reload#>>> 啟動es
$ systemctl enable --now es
3.2.3 ES
使用自定義JAVA
環境配置
由于默認情況啟動ES
時,使用的ES
家目錄下的JAVA
環境。如果需要自定義JAVA
環境啟動ES
。請配置一下參數:
# Step 1:切換至ES命令存放目錄
$ cd /usr/local/es/bin/# Step 2:修改ES默認啟動腳本
$ vim elasticsearch# 腳本內添加以下參數:# 設置 Java 路徑export JAVA_HOME=/usr/local/javaexport PATH=$JAVA_HOME/bin:$PATH# Java 選項:配置堆棧大小export JVM_OPTIONS="-Xms1g -Xmx4g"

# Step 3:重啟ES
$ systemctl restart es# Step 4:查看es進程信息
$ ps -aux | grep java
3.2.3 部署ES
高可用集群
#>>> 創建用于啟動ES的用戶
$ useradd es
$ id es
uid=1000(elasticsearch) gid=1000(elasticsearch) 組=1000(elasticsearch)#>>> 創建ES數據目錄和日志目錄存放目錄
$ mkdir -p /opt/{data,logs}
$ install -d /opt/{data,logs}/es -o es -g es#>>> 解壓es安裝包到指定目錄
$ tar xf elasticsearch-7.17.11-linux-x86_64.tar.gz -C /opt/#>>> 更改目錄名
$ cd /opt/ && mv elasticsearch-7.17.11 es#>>> 創建ES環境變量
$ vim >> /etc/profile.d/es.sh <<-EOF
#! /bin/bash
export ES_HOME=/opt/es
export PATH=\$PATH:\$ES_HOME/bin
EOF#>>> 重新加載環境變量
$ source /etc/profile.d/es.sh #>>> 修改elasticsearch屬主和數組
$ chown -R es,es /opt/es#>>> 修改es需要的limits參數(重新連接會話框才能成功加載參數)
$ cat >> /etc/security/limits.d/elk.conf <<-EOF
* soft nofile 65535
* hard nofile 131070
EOF#>>> 查看limits參數是否加載
$ ulimit -Sn
65535
$ ulimit -Hn
131070#>>> 修改內核參數
$ cat > /etc/sysctl.d/elk.conf <<EOF
vm.max_map_count = 262144
EOF#>>> 加載內核參數
$ sysctl -f /etc/sysctl.d/elk.conf
vm.max_map_count = 262144
$ sysctl -q vm.max_map_count
vm.max_map_count = 262144#>>> 修改堆內存大小(最大設置為32G,要么內存的一半)
$ vim /opt/elasticsearch-7.17.11/config/jvm.options
···
-Xms256m
-Xmx256m
···
#>>> elk01修改配置文件
$ egrep -v "^(#|$)" /opt/es/config/elasticsearch.yml
cluster.name: study-elk-cluster
node.name: elk01
path.data: /opt/data/es # 指定數據目錄
path.logs: /opt/logs/es # 指定日志目錄
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> elk02修改配置文件
$ egrep -v "^(#|$)" /opt/es/config/elasticsearch.yml
cluster.name: study-elk-cluster
node.name: elk02
path.data: /opt/data/es
path.logs: /opt/logs/es
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> elk03修改配置文件
$ egrep -v "^#|^$" /opt/es/config/elasticsearch.yml
cluster.name: study-elk-cluster
node.name: elk03
path.data: /opt/data/es
path.logs: /opt/logs/es
network.host: 0.0.0.0
discovery.seed_hosts: ["192.168.100.160","192.168.100.161","192.168.100.162"]
cluster.initial_master_nodes: ["192.168.100.160","192.168.100.161","192.168.100.162"]
ingest.geoip.downloader.enabled: false
#>>> 所有節點添加elk啟動腳本
$ cat > /usr/lib/systemd/system/es.service <<EOF
[Unit]
Description=ELK
After=network.target
[Service]
Type=forking
ExecStart=/opt/es/bin/elasticsearch -d
Restart=no
User=es
Group=es
LimitNOFILE=131070
[Install]
WantedBy=multi-user.target
EOF#>>> 所有節點重新加載并啟動
$ systemctl daemon-reload
$ systemctl restart es#>>> 測試集群
[root@elk01 ~]# curl 192.168.100.160:9200/_cat/nodes
192.168.100.160 40 59 8 0.14 0.10 0.06 cdfhilmrstw * elk01
192.168.100.161 54 30 11 0.55 0.29 0.11 cdfhilmrstw - elk02
192.168.100.162 48 28 8 0.29 0.16 0.06 cdfhilmrstw - elk03# 參數解釋:第一列:每個節點的IP地址;第二列:每個節點的CPU使用率;第三列:每個節點的內存的使用率;第四列:每個節點的活躍分片數量;第五列:每個節點的1分鐘、5分鐘、15分鐘平均負載;第六列:每個節點在集群中的角色;第七列:*代表主節點;主節點負責管理集群的元數據、分片分配和集群狀態等任務。第八列:節點的名稱
3.3 Kibana
安裝(elk03
節點)
#>>> 安裝kibana
$ yum localinstall -y kibana-7.17.11-x86_64.rpm #>>> 備份配置文件
$ cd /etc/kibana/
$ cp kibana.yml kibana.yml.bak#>>> 修改kibana配置文件
$ egrep -v "^(#|$)" kibana.yml
# 服務端口
server.port: 5601
# 主機地址或者主機名
server.host: "0.0.0.0"
# 服務名稱
server.name: "study-elk-kibana"
# ES主機組地址
elasticsearch.hosts: ["http://192.168.100.160:9200","http://192.168.100.161:9200","http://192.168.100.162:9200"]
# 修改語言
i18n.locale: "zh-CN"#>>> 啟動Kibana
$ systemctl enable --now kibana#>>> 游覽器IP+5601訪問
3.4 Logstash
安裝(elk02
)
#>>> 安裝Logstash
$ yum localinstall -y logstash-7.17.11-x86_64.rpm #>>> 創建軟連接
$ ln -s /usr/share/logstash/bin/logstash /usr/bin/logstash#>>> 編寫測試文件
$ mkdir conf-logstash
$ cat conf-logstash/01-stdin-to-stdout.conf
input {stdin {}
}
output{stdout {}
}#>>> 測試Logstash
$ logstash -f ~/conf-logstash/01-stdin-to-stdout.conf
3.5 Filebeat
安裝(elk01
)
邊車代理,需要安裝在每臺服務器上,并且通過tail -f
讀取指定目錄的日志文件。“收割機” Beats
? 幫助文檔:https://www.elastic.co/guide/en/beats/filebeat/7.17/index.html
# Step 1:elk01主機安裝Filebeat
$ yum localinstall -y filebeat-7.17.11-x86_64.rpm # Step 2:elk01主機測試Filebeat
$ mkdir /root/filebeat-config
$ cp /etc/filebeat/filebeat.yml /root/filebeat-config/01-test.yml
$ cat /root/filebeat-config/filebeat-config/01-test.yml
filebeat.inputs:
- type: stdinenabled: trueoutput.console:pretty: true # 啟動美觀格式輸出# Step 2:elk01主機啟動Filebeat實例
$ filebeat -e -c /opt/filebeat/filebeat-config/01-test.yml

3.5.1 Filebeat
架構
架構圖:
Input
(數據源)收集方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuration-filebeat-options.html
Output
(輸出地)推送方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/configuring-output.html
四、Filebeat
日志收集
? Input
方式:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-stdin.html
4.1 Filebeat
標準輸入和輸出
? 標準輸入官方文檔:https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-stdin.html
? 標準輸出官方文檔:https://www.elastic.co/guide/en/beats/filebeat/7.17/console-output.html
? 簡介:使用終端輸入從標準輸入讀取事件。
# Step 1:elk01主機創建Filebeat配置文件測試目錄
$ mkdir filebeat-config# Step 2:elk01主機創建配置文件
$ vim filebeat-config/01-stdin-on-stdout.yml
# 指定輸入類型
filebeat.inputs:# 終端標準輸入
- type: stdin
# 標準輸出類型:終端輸出output.console:#如果設置為 true,則寫入 stdout 的事件將采用良好的格式。默認值為 false。pretty: true# Step 3:elk01主機測試
$ filebeat -e -c /root/filebeat-config/01-stdin-on-stdout.yml

4.2.2 Filebeat
基于Log
類型進行輸入并且自定義Tags
$ vim filebeat-config/02-log-on-stdout.yml
filebeat.inputs:
- type: logpaths:- /var/log/messages# 對當前的輸入日志文件,指定獨有的標簽tags: ["system logs"]fields:log_type: system- type: logpaths:- /var/log/nginx/access.log# 對當前的輸入日志文件,指定獨有的標簽 tags: ["nginx access_log"]fields:log_type: nginx_accessoutput.console:pretty: true#>>> 清除偏移量
$ > /var/lib/filebeat/registry/filebeat/log.json #>>> 啟動Filebeat實例
$ filebeat -e -c /root/filebeat-config/02-log-on-stdout.yml

對于
Tags
的設定,將Filebeat
收集到的日志轉發至ES
集群時,可以針對不同的Tag
設置不同的索引,方便查閱。
4.2.8 Filebeat
輸出數據至Elasticsearch
集群
? 官方鏈接:https://www.elastic.co/guide/en/beats/filebeat/7.17/elasticsearch-output.html#elasticsearch-output
? 簡介:Elasticsearch
輸出使用Elasticsearch HTTP API
將事件直接發送到Elasticearch
。
#>>> 編寫Filebeat配置文件
$ vim filebeat-config/06-log-nginx-access-es.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["nginx_access"]fields:log_type: nginx_accessfields_under_root: true# 輸出至ES集群
output.elasticsearch:# ES集群地址hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]#>>> 啟動Filebeat實例
$ filebeat -e -c /root/filebeat-config/06-log-nginx-access-es.yml
Kibana
查看ES
數據


通過索引模式來匹配索引
![]()
4.2.12 FIlebeat
收集JSON
格式的nginx
訪問日志
#>>> 安裝nginx
$ vim /etc/yum.repos.d/nginx.repo
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=0
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true$ yum install -y nginx#>>> 啟動nginx
$ systemctl enable --now nginx#>>> 修改nginx訪問日志格式
$ vim /etc/nginx/nginx.conf
···log_format test_nginx_json '{"@timestamp":"$time_iso8601",''"host":"$server_addr",''"clientip":"$remote_addr",''"SendBytes":$body_bytes_sent,''"responsetime":$request_time,''"upstreamtime":"$upstream_response_time",''"upstreamhost":"$upstream_addr",''"http_host":"$host",''"uri":"$uri",''"domain":"$host",''"xff":"$http_x_forwarded_for",''"referer":"$http_referer",''"tcp_xff":"$proxy_protocol_addr",''"http_user_agent":"$http_user_agent",''"status":"$status"}';access_log /var/log/nginx/access.log test_nginx_json;
···#>>> 檢查語法格式
$ nginx -t#>>> 重啟nginx
$ systemctl reload nginx#>>> 測試
$ curl localhost
$ cat /var/log/nginx/access.log
#>>> 修改Filebeat配置(識別json格式日志,生成相關字段)
$ vim filebeat-config/10-log-nginx-log-shard-json-es.yml
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/nginx/access.log*tags: ["access"]
# 啟動json格式 json.keys_under_root: trueoutput.elasticsearch:hosts: ["http://192.168.174.140:9200","http://192.168.174.141:9200","http://192.168.174.142:9200"]#>>> 清除filebeat的指針偏移,用于從第一行重新讀取日志文件
$ rm -rf /var/lib/filebeat/*#>>> 清楚以前的ES nginx索引,防止出現臟數據
略#>>> 重啟filebeat
$ filebeat -e -c ~/filebeat-config/10-log-nginx-log-shard-json-es.yml
五、Logstash
日志收集
? 官網:https://www.elastic.co/guide/en/logstash/7.17/
? 簡介:Logstash
是一個具有實時流水線功能的開源數據收集引擎。Logstash
可以動態地統一來自不同來源的數據,并將數據規范化為您選擇的目的地。為各種高級下游分析和可視化用例清理和民主化您的所有數據。雖然Logstash
最初推動了日志收集的創新,但其功能遠遠超出了該用例。任何類型的事件都可以通過廣泛的輸入、過濾和輸出插件進行豐富和轉換。
5.1 Logstash
安裝
#>>> 安裝
$ yum install -y logstash-7.17.11-x86_64.rpm #>>> 聲明軟連接
$ ln -s /usr/share/logstash/bin/logstash /sbin/#>>> 創建Logstash 測試文件目錄
$ mkdir logstash-config
5.3 Logstash
基于File
的形式進行input
? 官網:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-file.html
#>>> 編寫配置文件
$ vim 02-file-input.conf
input {file {# 指定日志文件收集的路徑path => ["/tmp/*.txt"]# 從日志文件的第一行進行讀取,只會在第一次啟動時從頭讀取start_position => "beginning" # 從日志文件的尾行讀取日志,且偏移指針文件對原日志文件偏移量未記錄# start_position => "end" }
}output {stdout {}
}#>>> 準備測試日志文件
$ echo 1111 > /tmp/1.txt#>>> 啟動Logstash實例
$ logstash -f /root/stduy-logstash-config/02_input_file.conf

#>>> 查看偏移量
$ cat /usr/share/logstash/data/plugins/inputs/file/.sincedb_820ddbbd098cfece4b56f4fcbf67a9bb
16789540 0 64768 4 1717082577.147625 /tmp/1.txt#>>> 查看日志文件
$ ll -i /tmp/1.txt
16789540 -rw-r--r-- 1 root root 4 5月 30 23:20 /tmp/1.txt
5.4 Logstash
基于Filebeat
形式進行input
? Filebeat output Logstash
官網:https://www.elastic.co/guide/en/beats/filebeat/7.17/logstash-output.html
? Logstash input Fliebeat
官網:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-beats.html
#>>> 修改Filebeat配置文件
$ vim /root/filebeat-config/output-logstash.yml filebeat.inputs:
- type: logpaths:- /tmp/test.logtags: ["test"]fields:log_type: testfields_under_root: trueoutput.logstash:# Logstash主機地址hosts: ["192.168.174.142:5044"]#>>> 啟動Filebeat實例
$ filebeat -e -c /root/filebeat-conifg/output-logstash.yml
#>>> 修改Logstash配置文件
$ vim input-beats.conf
input {beats {# 通訊端口號,默認5044port => 5044}
}output {stdout {}
}#>>> 啟動Logstash
$ logstash -rf /root/logstash-config/input-beats.conf
5.5 Logstash基于ES形式進行output
? 官網:https://www.elastic.co/guide/en/logstash/7.17/plugins-outputs-elasticsearch.html
#>>> 修改Logstash配置文件
$ cat 08_output_es.conf
input {beats {# 通訊端口號,默認5044port => 5044}
}output {elasticsearch {# ES集群IPhosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] # 索引模式名稱index => "test-log-%{+yyyy.MM.dd}"}
}#>>> 啟動Logstash
$ logstash -f /root/stduy-logstash-config/08_output_es.conf
六、Logstash grok
插件使用
? 官網: https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
? 簡介:解析任意文本并對其進行結構化。Grok
是將非結構化日志數據解析為結構化和可查詢內容的好方法。該工具非常適合系統日志,apache
和其他Web
服務器日志,mysql
日志,以及通常為人類編寫的任何日志格式。
6.1 Logstash
內置正則使用
? 簡介:將非結構化的日志格式通過Grok
內置的正則轉化為結構化。
#>>> 日志格式
192.168.174.1 - - [01/Jun/2024:10:37:16 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0" "-"
192.168.174.1 - - [01/Jun/2024:10:37:16 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0" "-"#>>> 編寫Filebeat配置文件
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]
output.logstash:hosts: ["192.168.174.142:5044"]#>>> 清除偏移量
$ rm -rf /var/lib/filebeat/*
#>>> 啟動Filebeat實例
$ filebeat -e -c ~/filebeat-out-logstash.yml #>>> 編寫Logstash配置文件
$ vim filter_grok.yml
input {beats {port => 5044}
}filter {grok {match => {"message" => "%{HTTPD_COMMONLOG}" }}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] index => "test-%{+yyyy.MM.dd}"}
}#>>> 啟動Logstash
$ logstash -rf ~/filter_grok.yml
6.4 Logstash
刪除指定字段
? 簡介:如果此篩選器成功,請從此事件中刪除任意字段。
? 未刪除字段終端打印如下:
#>>> Filebeat配置文件編寫
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]
output.logstash:hosts: ["192.168.174.142:5044"]#>>> 刪除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 啟動Filebeat實例
$ filebeat -e -c ~/filebeat-out-logstash.yml #>>> Logstash配置文件編寫
$ vim filter_grok.yml
input {beats {port => 5044}
}filter {grok {match => {"message" => "%{HTTPD_COMMONLOG}"}# You can also remove multiple fields at once: remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "test-%{+yyyy.MM.dd}"}
}#>>> 啟動Logstash實例
$ logstash -rf ~/filter_grok.yml
刪除指定字段后終端數據顯示如下:
八、Logstash useragent
插件分析用戶客戶端類型
? 官網:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-useragent.html
#>>> Filebeat配置文件編寫
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logtags: ["access"]json.keys_under_root: true output.logstash:hosts: ["192.168.174.142:5044"]#>>> 刪除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 啟動Filebeat實例
$ filebeat -e -c ~/filebeat-out-logstash.yml #>>> Logstash配置文件編寫
$ vim filter_grok.yml
input {beats {port => 5044}
}filter {grok { remove_field => [ "type","offset","ecs","input","@version","log","agent","tags" ]}useragent {source => "http_user_agent"target => "test_user_agent"remove_field => [ "agent","@version","tags","ecs","log","offset","type" ]}
}output {stdout {}elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "test-%{+yyyy.MM.dd}"}
}#>>> 啟動Logstash實例
$ logstash -rf ~/filter_grok.yml
九、Logstash
多if
分支
? 官網:https://www.elastic.co/guide/en/logstash/7.17/event-dependent-configuration.html#metadata
? 簡介: 針對不同的日志類型。filter
去做不同的處理
#>>> Filebeat配置文件
$ vim filebeat-out-logstash.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logfields:log_type: nginx_accessfields_under_root: truejson.keys_under_root: true - type: logpaths:- /var/log/nginx/error.logfields:log_type: nginx_errorfields_under_root: true- type: logpaths:- /var/log/messagesfields:log_type: system_logfields_under_root: trueoutput.logstash:hosts: ["192.168.174.142:5044"]#>>> 刪除偏移量
$ rm -rf /var/lib/filebeat/*#>>> 啟動Filebeat實例
$ filebeat -e -c ~/filebeat-out-logstash.yml #>>> Logstash 配置文件準備
$ vim filter_grok.yml
input {beats {port => 5044}
}filter {mutate {remove_field => ["agent","ephemeral_id","ecs","@version","tags","input","log","offest"] }useragent {source => "http_user_agent" target => "study_user_agent"}
}output {stdout {}if [log_type] == "nginx_access" { elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"] index => "web-access-%{+yyyy.MM.dd}"} } else if [log_type] == "nginx_error" {elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "web-error-%{+yyyy.MM.dd}" }} else if [log_type] == "system_log" {elasticsearch {hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]index => "system-log-%{+yyyy.MM.dd}" } }
}#>>> 啟動Logstash
$ logstash -rf ~/filter_grok.yml
十、Kibana
操作使用
10.1 Kibana
手動創建索引模板
{
"number_of_replicas":3,
"number_of_shards":3
}
九、kafka
簡介和使用
9.1kafka
相關概念
? 官網:https://kafka.apache.org/
9.1.1 Kafka
介紹
? Apache Kafka
是一個開源的分布式事件流平臺,為數以千計的公司提供高性能數據管道、流分析、 數據集成和任務關鍵型應用程序。
-
高吞吐量:使用延遲低至
2ms
的機器集群以網絡有限的吞吐量傳遞消息。 -
可伸縮:將生產集群擴展到
1000
個代理、每天數萬億條消息、PB的數據和數十萬個分區。彈性擴展和收縮存儲和處理。 -
永久存儲器:將數據流安全地存儲在分布式、持久、容錯的集群中。
-
高可用性:在可用性區域上高效地擴展集群,或者跨地理區域連接單獨的集群。
9.1.2 Kafka
相關專業術語
-
主題(
Topic
):主題是Kafka
中用于對消息進行分類的邏輯分組,每個主題可以看作是消息的分類器。主題是多訂閱者模式,即一個主題可以有多個消費者訂閱。 -
分區(
Partition
):每個主題被分成一個或多個分區。分區是消息存儲的基本單元。分區內的消息是有序的,但不同分區之間無序。每個分區可以分布在不同的Kafka
服務器上,從而實現水平擴展。leader partition
負責對kafka
集群的讀寫操作,和客戶端進行交互。follower partition
負責去leader partition
同步數據,不可以和客戶端進行交互。
-
偏移量(
Offset
):偏移量是分區中每條消息的唯一標識符。它是一個遞增的整數,記錄了消息在分區中的位置。消費者使用偏移量來跟蹤讀取消息的位置。 -
生產者(
Producer
):生產者是負責向Kafka
主題發布消息的客戶端應用程序。生產者將消息發送到指定的主題和分區。 -
消費者(
Consumer
):消費者是負責從Kafka
主題讀取消息的客戶端應用程序。消費者通過訂閱一個或多個主題來讀取消息,并使用偏移量來跟蹤讀取進度。 -
消費者組(
Consumer Group
):消費者組是一組消費者實例,共同消費一個或多個主題的消息。 -
代理(
Broker
):代理是Kafka
集群中的一個服務器節點,負責存儲和傳輸消息數據。一個Kafka
集群由多個代理組成,每個代理可以處理多個分區。 -
復制(
Replication
):Kafka
中的復制機制將分區數據復制到多個代理上,以確保數據的高可用性和容錯性。每個分區有一個領導副本(Leader
)和若干個跟隨副本(Follower
)。所有的讀寫操作都由領導副本處理,跟隨副本只需同步領導副本的數據。但是Leader
和Follwer
都屬于副本。創建時副本數不能為0
。
9.2 Zookeeper
使用和配置
? 官網鏈接:https://dlcdn.apache.org/zookeeper/
? 下載鏈接:https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
9.2.1 簡介
? Zookeeper
是 Apache
旗下的一個開源分布式協調服務,用于管理和協調分布式應用程序中的各種服務和組件,它提供了一系列高效且可靠的分布式數據一致性和協調機制。
9.2.2 Zookeeper
集群部署:三節點😀
#>>> 解壓安裝包到指定目錄
$ tar xf apache-zookeeper-3.8.2-bin.tar.gz -C /opt/ && cd /opt/#>>> 更改目錄名稱
$ mv apache-zookeeper-3.8.2-bin/ zookeeper#>>> 配置zk環境變量
$ cat >> /etc/profile.d/zookeeper.sh <<-EOF
#!/bin/bash
export ZK_HOME=/opt/zookeeper
export PATH=\$PATH:\$ZK_HOME/bin
EOF#>>> 重新加載環境變量
$ source /etc/profile.d/zookeeper.sh# 創建zk數據存放目錄
$ mkdir -p /opt/data/zk#>>> 修改zk的配置文件(三節點)
$ cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg$ egrep -v "^(#|$)" /opt/zookeeper/conf/zoo.cfg # Zookeeper的心跳間隔時間。服務器和客戶端會通過心跳包維持連接狀態。
tickTime=2000# 從節點最多可以等待10 個tickTime(即 10 * 2000 毫秒 = 20 秒)來與領導者同步數據狀態。否則它將被認為是不可用的。
initLimit=10# 定義Zookeeper領導者和從節點之間的心跳和同步請求的最大允許延遲時間。領導者與從節點之間的響應時間如果超過了這個限制,從節點將被認為失去同步狀態。(即 5 * 2000 毫秒 = 10 秒)。
syncLimit=5# 定義zk數據目錄
dataDir=/opt/data/zk# 定義Zookeeper集群的客戶端連接端口
clientPort=2181
server.140=192.168.174.140:2888:3888
server.141=192.168.174.141:2888:3888
server.142=192.168.174.142:2888:3888# 參數解釋:# dataDir zk數據目錄# clientPort 端口號# server.140 zk節點唯一標識# 192.168.100.160 zk節點主機地址# 2888 集群內部通訊端口# 3888 leader選舉端口# 創建ID文件(位置存放在zk的數據存放路徑下)101節點
$ echo "140" > /opt/data/zk/myid102節點
$ echo "141" > /opt/data/zk/myid103節點
$ echo "142" > /opt/data/zk/myid#>>> 所有節點啟動zk
$ zkServer.sh start#>>> 查看zk狀態
$ zkServer.sh status
myid
文件:在Zookeeper
集群中,myid
文件是每個Zookeeper
服務器節點的重要配置文件之一,用于唯一標識集群中的每個服務器。
9.2.3 Zookeeper
角色劃分
在Zookeeper
集群中,不同的服務器節點可以承擔不同的角色,以確保集群的高可用性、數據一致性和故障恢復能力。主要角色包括:server.140=192.168.174.140:2888:3888:角色
-
領導者(
Leader
)-
處理所有寫操作(如創建、更新、刪除節點)并確保數據的一致性。
-
管理并協調集群中其他節點的活動。
-
負責為客戶端請求生成唯一的事務
ID
(zxid
)。 -
在集群啟動或領導者失效時,會進行領導者選舉,選出新的領導者。
-
定期發送心跳給跟隨者,確保自己處于活動狀態。
-
-
跟隨者(
Follower
)- 處理所有讀取操作(如讀取節點數據)。
- 接收并轉發客戶端的寫請求給領導者進行處理。
- 將領導者的事務日志同步到本地,確保數據一致性。
- 參與領導者選舉。
- 與領導者保持同步,接收并應用領導者的事務。
-
觀察者(
Observer
)- 處理讀取操作,減輕領導者和跟隨者的負擔。
- 不參與領導者選舉和事務投票,只同步領導者的事務日志。
- 提高集群的讀取擴展能力,適合需要高讀取吞吐量的場景。
-
客戶端(
Client
)- 連接到集群中的任一節點進行讀取或寫入操作。
- 自動處理節點故障并重新連接到其他可用節點。
- 通過客戶端
API
與Zookeeper
集群進行交互。
9.2.4 zookeeper
配置內存堆棧
#>>> 查看zk進程
$ jps
1367 Elasticsearch
5400 QuorumPeerMain
5593 Jps#>>> 查看zk的堆棧大小
$ jmap -heap 5400
zookeeper
默認堆內存大小為1GB
,一般設置為2GB
或者4GB
#>>> 調節zookeeper推內存大小為256MB
$ vim /opt/zookeeper/conf/java.env
#! /bin/bash # 指定JDK安裝路徑
export JAVA_HOME=/usr/local/java # 指定zookeeper的堆內存大小
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS" #>>> 同步文件
$ scp /opt/zookeeper/conf/java.env elk02:/opt/zookeeper/conf/
$ scp /opt/zookeeper/conf/java.env elk03:/opt/zookeeper/conf/#>>> 所有節點重啟zk
$ zkServer.sh restart#>>> 驗證堆內存
$ jmap -heap `jps | grep QuorumPeerMain | awk '{print $1}'`
9.3 Kafka
集群安裝:三節點😀
? 下載鏈接:https://kafka.apache.org/downloads
? 官網:https://kafka.apache.org/
#>>> 解壓安裝包到指定目錄
$ tar xf kafka_2.13-3.2.1.tgz -C /opt/ && cd /opt/#>>> 修改目錄名稱
$ mv kafka_2.13-3.2.1/ kafka#>>> 配置kafka環境變量
$ cat >> /etc/profile.d/kafka.sh <<-EOF
#!/bin/bash
export KAFKA_HOME=/opt/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF#>>> 重新加載環境變量
$ source /etc/profile.d/kafka.sh #>>> 創建數據目錄
$ mkdir /opt/data/kafka -p#>>> 修改kafka配置文件# 1.kafka101節點配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties
broker.id=140
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka# 2.kafka102節點配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties
broker.id=141
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka # /kafka為zk中的znode# 3.kafka103節點配置文件
$ egrep -v "^(#|$)" kafka/config/server.properties
broker.id=142
log.dirs=/opt/data/kafka/
zookeeper.connect=192.168.174.140:2181,192.168.174.141:2181,192.168.174.142:2181/kafka#>>> 所有節點啟動kafka
$ kafka-server-start.sh -daemon /opt/kafka/config/server.properties #>>> zk節點查看kafka注冊信息
$ zkCli.sh ls /kafka/brokers/ids | grep "^\["
[140, 141, 142 #>>> kafka停止腳本
$ kafka-server-stop.sh
9.4 Kafka Topic
日常操作
9.4.1 查看Topic
相關信息
#>>> 查看集群中所有Topic
$ kafka-topics.sh \--bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--list#>>> 查看所有Topic詳細信息
$ kafka-topics.sh \--bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--describe#>>> 查看某個Topic信息
$ kafka-topics.sh \--bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--describe --topic Topice名稱
9.4.2 創建Topic
#>>> 創建test-elk Topic
$ kafka-topics.sh \--bootstrap-server 192.168.174.140:9092 \--create --topic test-elk#>>> 創建Topic,并指定副本數量
$ kafka-topics.sh \--bootstrap-server 192.168.174.140:9092 \--create --topic test-elk-02 \--partitions 10 \--replication-factor 1#>>> 查看Topic的詳細信息
$ kafka-topics.sh \--bootstrap-server 192.168.174.140:9092 \--describe --topic test-elk-02
#>>> 創建Topic,并指定副本數量
$ kafka-topics.sh \--bootstrap-server 192.168.174.140:909 \--create --topic test-elk-03 \--partitions 10 \--replication-factor 2 $ kafka-topics.sh \--bootstrap-server 192.168.174.140:9092 \--describe --topic test-elk-03
注意:副本數量不能大于Broker的數量
9.5 Filebeat
收集日志至Kafka
官網:https://www.elastic.co/guide/en/beats/filebeat/7.17/kafka-output.html
$ cat filbeat-out-kafka.yml
filebeat.inputs:
- type: logpaths:- /var/log/nginx/access.logjson.keys_under_root: true #- type: log
# paths:
# - /var/log/nginx/error.logoutput.kafka:# initial brokers for reading cluster metadatahosts: ["192.168.174.140:9092", "192.168.174.141:9092", "192.168.174.142:9092"]# message topic selection + partitioningtopic: "test-kafka-topic"$ filebeat -e -c ~/filbeat-out-kafka.yml #>>> kafka測試拉取數據
$ kafka-console-consumer.sh \--topic test-kafka-topic \--bootstrap-server 192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092 \--from-beginning
9.6 Logstash
收集Kafka Topic
日志
? 官網:https://www.elastic.co/guide/en/logstash/7.17/plugins-inputs-kafka.html
input {kafka {bootstrap_servers => "192.168.174.140:9092,192.168.174.141:9092,192.168.174.142:9092" # Kafka集群IPtopics => ["test-kafka-topic"] # 指定Topic進行消費數據group_id => "test-kafka" # 指定消費者組codec => json { # 指定消費的數據是JSON格式。charset => "UTF-8"}}
}filter {mutate {remove_field => ["agent","ephemeral_id","ecs","@version","tags","input","log","offest"] }if [log_type] == "nginx_access" {geoip {source => "clientip"}useragent {source => "http_user_agent" target => "study_user_agent"}
}
}output {stdout {}# if [log_type] == "nginx_access" {
# elasticsearch {
# hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
# index => "web-access-%{+yyyy.MM.dd}"
# }
# } else if [log_type] == "nginx_error" {
# elasticsearch {
# hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
# index => "web-error-%{+yyyy.MM.dd}"
# }
# } else if [log_type] == "system_log" {
# elasticsearch {
# hosts => ["192.168.174.140:9200","192.168.174.141:9200","192.168.174.142:9200"]
# index => "system-log-%{+yyyy.MM.dd}"
# }
# }
}#>>> 啟動Logstash實例
$ logstash -rf ~/filter_grok.yml
十、Elasticsearch Restful
風格API
實戰
10.1 ES
集群狀態
-
綠色(
Green
)- 含義:集群健康狀態正常,所有的主分片和副本分片都已分配。
- 解釋:綠色狀態表示集群中的所有數據都可以訪問,所有分片(包括主分片和副本分片)都分配到集群中的節點上。
- 示例:假設集群有3個主分片,每個主分片有1個副本分片,那么綠色狀態下這6個分片都已成功分配且正常運行。
-
黃色(
Yellow
)- 含義:集群健康狀態部分正常,所有的主分片都已分配,但有一個或多個副本分片未分配。
- 解釋:黃色狀態表示集群中的所有主分片都可以訪問,但一些副本分片由于某種原因(例如節點故障或資源不足)未能分配。這意味著數據是安全的,但沒有高可用性,因為如果某個節點失敗,它可能會導致數據無法訪問。
- 示例:假設集群有3個主分片和每個主分片1個副本分片,如果有3個主分片和2個副本分片已分配,但1個副本分片未能分配,則集群為黃色狀態。
-
紅色(
Red
)-
含義:集群健康狀態不正常,有一個或多個主分片未分配。
-
解釋:紅色狀態表示集群中有一些數據不可訪問,因為主分片未能分配。此時,可能存在數據丟失的風險,需要立即采取措施來修復問題。
-
示例:假設集群有3個主分片和每個主分片1個副本分片,如果有2個主分片和所有副本分片未能分配,則集群為紅色狀態。
-
10.2 Elasticsearch
術語
Document文檔,用戶存儲在ES的數據,ES最小單元,文檔不可被拆分。文檔使用JSON的對象存儲類型。filed相當于數據表的字段,對文檔數據根據不同屬性進行分類標識。index索引,一個索引就是擁有相似特征文檔的集合。shard分片,存儲數據的地方,每個底層對應的使一個Lucene庫,一個索引至少有一個或多個分片。replica副本,一個分片可以有0個或者多個副本。作用是對數據進行備份,一旦副本數量不為0,就會引入主分片(primary shard)和副本分片(replica shard)的概念。主分片(primary shard)實現數據的讀寫操作。副本分片(replica shard)可以實現數據的讀操作,需要主分片同步數據,當主分片掛掉時,副本分片會變為主分片。Allocation分配。將分片分配給某個節點的過程,包括主分片和副本分片。如果副本分片,還包含從主分片復制數據的過程,此過程由Master節點調度完成。
- 上家公司每天所產生的日志量(5GB左右)
- ELastic stack技術棧有哪些組件 (beats Logstash elasticsearch Kibana)
- elasticsearch 使用場景?(日志收集場景:elk技術棧。搜索引擎(IK分詞器))
- elasticsearch 數據遷移?