#作者:任少近
文章目錄
- 需求描述
- 系統目標
- 系統組件
- Fluent Bit
- Fluentd
- Kafka
- 數據流與處理流程
- 日志采集
- 日志轉發到 Fluentd
- 日志處理與轉發到 Kafka
- Kafka 作為消息隊列
- 具體配置
- Fluent-Bit的CM配置
- Fluent-Bit的DS配置
- Fluentd的CM配置
- Fluentd的DS配置
- Kafka查詢結果
需求描述
Fluent Bit 將日志傳輸到 Fluentd,Fluentd 再將日志寫入 Kafka
背景: 隨著系統日志量的增加,尤其是在微服務架構下,日志的收集、處理和傳輸變得愈加復雜。為了實現高效的日志收集和處理,需要一個可靠且可擴展的日志管道。Fluent Bit、Fluentd 和 Kafka 的結合可以為這一需求提供強大的支持。
系統目標
Fluent Bit 作為日志收集器,從不同來源(如容器、應用程序、系統日志文件等)收集日志。
將收集到的日志實時轉發到 Fluentd,Fluentd 對日志進行進一步的處理(如解析、過濾、增強等)。
Fluentd 將處理后的日志通過 Kafka 傳輸,Kafka 作為消息隊列,提供日志的高吞吐量傳輸和存儲。
系統組件
Fluent Bit
功能:負責日志的采集和轉發,能夠高效地從各種日志源收集日志數據,并將其發送到 Fluentd。
特點:輕量級、高效、低資源消耗,適用于邊緣設備和容器環境。
Fluentd
功能:接收來自 Fluent Bit 的日志數據,對日志進行進一步的處理,如過濾、格式轉換、增強等。
特點:支持豐富的插件生態系統,能夠靈活地擴展和配置,適用于復雜的日志處理和存儲需求。
Kafka
功能:作為日志數據的消息隊列,提供高吞吐量、可靠的日志傳輸機制。Fluentd 將日志數據發送到 Kafka,Kafka 作為緩沖區存儲和傳遞日志數據,確保日志的可靠性和可擴展性。
特點:高吞吐量、可擴展、容錯能力強。
數據流與處理流程
日志采集
Fluent Bit 部署在日志源所在的節點或容器中,實時監控指定的日志文件(如 /var/log/test.log 等)。
Fluent Bit 使用 tail 插件采集日志,并將其轉換為指定的格式(如 JSON)。
日志轉發到 Fluentd
Fluent Bit 使用 forward 輸出插件將日志數據轉發到 Fluentd,通過指定 Fluentd 的 IP 地址和端口進行連接。
日志處理與轉發到 Kafka
Fluentd 接收到日志后,可以進行各種處理(如過濾、解析、增強、格式轉換等)。
處理后的日志通過 Kafka 輸出插件將日志發送到指定的 Kafka 集群。
Kafka 將日志存儲在其主題中,以便進行后續的分析、查詢和處理。
Kafka 作為消息隊列
Kafka 將日志數據持久化到其分區中,提供可靠的消息存儲和高吞吐量的數據傳輸。
消費者 可以從 Kafka 中讀取數據進行進一步處理或存儲到其他系統(如 Elasticsearch、OpenSearch、數據庫等)。
具體配置
Fluent-Bit的CM配置
fluent-bit.conf: |[SERVICE]Flush 1Parsers_File parsers.confHTTP_Server OnHTTP_Listen 0.0.0.0HTTP_PORT 3302[INPUT]Name tailTag regex-fluentDB /var/log/regex-fluent.dbRead_from_Head truePath /var/log/test.logPath_Key pod_log_path[OUTPUT]Name forwardMatch *Host fluentd-service.logging.svcPort 24224Retry_Limit 5
Fluent-Bit的DS配置
注意:日志目錄的掛載
apiVersion: apps/v1
kind: DaemonSet
metadata:name: fluent-bitnamespace: logginglabels:k8s-app: fluent-bit-loggingversion: v1kubernetes.io/cluster-service: "true"
spec:selector:matchLabels:k8s-app: fluent-bit-loggingtemplate:metadata:labels:k8s-app: fluent-bit-loggingversion: v1kubernetes.io/cluster-service: "true"annotations:prometheus.io/scrape: "true"prometheus.io/port: "2020"prometheus.io/path: /api/v1/metrics/prometheusspec:nodeSelector:zk-app: appcontainers:- name: fluent-bitimage: registry.cn-hangzhou.aliyuncs.com/ali_cloud_images/fluent-bit:1.9imagePullPolicy: Neverports:- containerPort: 2020volumeMounts:- name: varlogmountPath: /var/log- name: varlibdockercontainersmountPath: /var/lib/docker/containersreadOnly: true- name: fluent-bit-configmountPath: /fluent-bit/etc/- name: dbmountPath: /tail-db/terminationGracePeriodSeconds: 10volumes:- name: varloghostPath:path: /var/log- name: varlibdockercontainershostPath:path: /var/lib/docker/containers- name: fluent-bit-configconfigMap:name: fluent-bit-config- name: dbhostPath:path: /home/chb/hundun/fluent-bittype: DirectoryserviceAccountName: fluent-bittolerations:- key: node-role.kubernetes.io/masteroperator: Existseffect: NoSchedule- operator: "Exists"effect: "NoExecute"- operator: "Exists"effect: "NoSchedule"
Fluentd的CM配置
fluent.conf: |-<source>@type forwardport 24224bind 0.0.0.0tag test</source><match test>@type kafka2brokers 192.168.123.100:9092 # Kafka broker 地址topic fluentd_topic<format>@type json</format></match>
Fluentd的DS配置
注意:端口的暴露
apiVersion: apps/v1
kind: DaemonSet
metadata:name: fluentd-kafkanamespace: logginglabels:k8s-app: fluentd-kafkakubernetes.io/cluster-service: "true"addonmanager.kubernetes.io/mode: Reconcile
spec:selector:matchLabels:k8s-app: fluentd-kafkatemplate:metadata:labels:k8s-app: fluentd-kafkakubernetes.io/cluster-service: "true"# 此注釋確保如果節點被驅逐,fluentd不會被驅逐,支持關鍵的基于 pod 注釋的優先級方案。annotations:scheduler.alpha.kubernetes.io/critical-pod: ''spec:initContainers:- name: init-permissionimage: busyboximagePullPolicy: Nevercommand: ["sh", "-c", "mkdir -p /var/log/td-agent && chown -R 1000:1000 /var/log/td-agent"]nodeSelector:zk-app: appserviceAccountName: fluentd-kafkacontainers:- name: fluentd-kafkaimage: fluentd-kafka:latestimagePullPolicy: NeversecurityContext:runAsUser: 0ports: - containerPort: 24224name: forward-portvolumeMounts:- name: fluentd-config-volumemountPath: /opt/bitnami/fluentd/conf/fluentd.confsubPath: fluent.conf- name: varlogmountPath: /var/log- name: posmountPath: /var/log/td-agentvolumes:- name: fluentd-config-volumeconfigMap:name: fluentd-config- name: varloghostPath:path: /home/chb/test- name: poshostPath:path: /var/log/td-agentimagePullSecrets:- name: default-secrettolerations:- operator: ExiststerminationGracePeriodSeconds: 30
---
apiVersion: v1
kind: Service
metadata:name: fluentd-servicenamespace: logging
spec:selector:k8s-app: fluentd-kafkaports:- protocol: TCPport: 24224targetPort: 24224clusterIP: None