目錄
1. Kafka 簡介
1.1 Kafka 核心概念
(1)消息系統 vs. 流處理平臺
(2)核心組件
1.2 Kafka 核心特性
(1)高吞吐 & 低延遲
(2)持久化存儲
(3)分布式 & 高可用
(4)水平擴展
(5)流處理能力
1.3 Kafka 典型應用場景
1.4 Kafka 架構示例
數據流示例(訂單處理)
1.5 Kafka vs 其他消息隊列
2. kafka部署
2.1 創建Namespace
2.2 創建ConfigMap
2.3 創建Headless Service
2.4 創建Statefulset
2.5 部署所有資源
2.6 檢查kafka Pod狀態
1. Kafka 簡介
Apache Kafka 是一個 分布式流處理平臺,主要用于構建 高吞吐量、低延遲、可擴展 的實時數據管道和流式應用程序。它最初由 LinkedIn 開發,后成為 Apache 頂級開源項目,廣泛應用于大數據、日志聚合、事件驅動架構等領域。
1.1 Kafka 核心概念
(1)消息系統 vs. 流處理平臺
-
傳統消息隊列(如 RabbitMQ):主要用于解耦生產者和消費者,保證消息可靠傳遞。
-
Kafka:
-
不僅是一個消息隊列,還是一個 分布式流存儲系統,支持持久化存儲和流式計算。
-
適用于 高吞吐、大規模數據流 場景(如日志、指標、事件數據)。
-
(2)核心組件
組件 | 說明 |
---|---|
Producer(生產者) | 向 Kafka 發送消息(如日志、交易數據)。 |
Consumer(消費者) | 從 Kafka 讀取并處理消息。 |
Broker(代理) | Kafka 服務器,負責存儲和轉發消息。 |
Topic(主題) | 消息的分類(類似數據庫表),如 orders 、logs 。 |
Partition(分區) | 每個 Topic 可分成多個 Partition,提高并行處理能力。 |
Offset(偏移量) | 每條消息在 Partition 中的唯一 ID(類似數據庫主鍵)。 |
Consumer Group(消費者組) | 多個消費者共同消費一個 Topic,實現負載均衡。 |
ZooKeeper | 管理 Kafka 集群元數據(新版本 Kafka 已逐步移除依賴)。 |
1.2 Kafka 核心特性
(1)高吞吐 & 低延遲
-
支持每秒百萬級消息處理(取決于硬件配置)。
-
采用 順序 I/O(相比隨機 I/O 更快)和 零拷貝 技術優化性能。
(2)持久化存儲
-
消息默認持久化到磁盤(可配置保留時間),支持 重放(replay) 數據。
-
適用于 事件溯源(Event Sourcing) 和 審計日志。
(3)分布式 & 高可用
-
支持 多副本(Replication),防止數據丟失。
-
自動故障轉移(Leader/Follower 機制)。
(4)水平擴展
-
可動態增加 Broker 和 Partition,提升吞吐量。
(5)流處理能力
-
配合 Kafka Streams 或 ksqlDB 可實現實時流計算(如聚合、窗口計算)。
1.3 Kafka 典型應用場景
場景 | 說明 |
---|---|
日志聚合 | 收集應用日志(替代 ELK 中的 Logstash)。 |
消息隊列 | 解耦微服務,如訂單系統 → 庫存系統。 |
實時數據處理 | 結合 Flink/Spark Streaming 做實時分析。 |
事件驅動架構 | 如用戶行為追蹤、IoT 設備數據采集。 |
Commit Log(提交日志) | 數據庫變更捕獲(CDC),如 Debezium + Kafka。 |
1.4 Kafka 架構示例
生產者(Producer) → Kafka Cluster(Broker1, Broker2...)↓ 消費者(Consumer Group)→ 實時處理(Flink/Spark)↓存儲(HDFS/DB)
數據流示例(訂單處理)
-
訂單服務(Producer)發送消息到
orders
Topic。 -
庫存服務(Consumer)讀取
orders
消息,扣減庫存。 -
分析服務(Consumer)統計實時銷售額。
1.5 Kafka vs 其他消息隊列
特性 | Kafka | RabbitMQ | Pulsar |
---|---|---|---|
吞吐量 | ????? | ?? | ???? |
延遲 | ??? | ???? | ??? |
持久化 | 支持(磁盤) | 可選(內存/磁盤) | 支持 |
流處理 | 原生支持(Kafka Streams) | 不支持 | 支持(Pulsar Functions) |
適用場景 | 大數據、日志 | 任務隊列、RPC | 多租戶、云原生 |
? 適用 Kafka 的場景:
-
需要高吞吐、持久化存儲的實時數據流(如日志、事件)。
-
流處理(如實時分析、監控)。
? 不適用 Kafka 的場景:
-
需要復雜路由(RabbitMQ 更合適)。
-
低延遲任務隊列(Redis Streams/RabbitMQ 更好)。
Kafka 已成為現代數據架構的核心組件,廣泛應用于大數據、微服務、實時計算等領域。
2. kafka部署
2.1 創建Namespace
kubectl create namespace elk
2.2 創建ConfigMap
vim kafka-configmap.yaml
apiVersion: v1 kind: ConfigMap metadata:name: ldc-kafka-scriptsnamespace: elk data:setup.sh: |- #啟動腳本#!/bin/bashexport KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
2.3 創建Headless Service
vim kafka-headless.yaml
apiVersion: v1 kind: Service metadata:name: kafka-headlessnamespace: elk spec:clusterIP: Noneselector:app: kafkaports:- name: brokerport: 9092- name: controllerport: 9093
2.4 創建Statefulset
vim kafka-statefulset.yaml
apiVersion: apps/v1 kind: StatefulSet metadata:name: kafkanamespace: elklabels:app: kafka spec:selector:matchLabels:app: kafkaserviceName: kafka-headlesspodManagementPolicy: Parallelreplicas: 1 #根據資源情況設置實例數,推薦3個副本updateStrategy:type: RollingUpdatetemplate:metadata:labels:app: kafkaspec:affinity:nodeAffinity: #這里做了節點親和性調度到master節點requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: node-role.kubernetes.io/control-planeoperator: Exists#values:#- mastertolerations:- key: "node-role.kubernetes.io/control-plane"operator: "Exists"effect: "NoSchedule"containers:- name: kafkaimage: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.4.0imagePullPolicy: "IfNotPresent"command:- /opt/leaderchain/setup.shenv:- name: BITNAMI_DEBUGvalue: "true" #詳細日志# KRaft settings - name: MY_POD_NAME # 用于生成KAFKA_CFG_NODE_IDvalueFrom:fieldRef:fieldPath: metadata.name ? ? ? ? ? ?- name: KAFKA_CFG_PROCESS_ROLESvalue: "controller,broker"- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: "0@kafka-0.kafka-headless:9093" #修改實例數時要更新- name: KAFKA_KRAFT_CLUSTER_IDvalue: "Jc7hwCMorEyPprSI1Iw4sW" ?# Listeners ? ? ? ? ? ?- name: KAFKA_CFG_LISTENERSvalue: "PLAINTEXT://:9092,CONTROLLER://:9093"- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: "PLAINTEXT://:9092"- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: "CONTROLLER"- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: "PLAINTEXT"- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"ports:- containerPort: 9092name: broker- containerPort: 9093name: controllerprotocol: TCP ? ? ? ? ? ? ? ? ? ? volumeMounts:- mountPath: /bitnami/kafkaname: kafka-data- mountPath: /opt/leaderchain/setup.shname: scriptssubPath: setup.shreadOnly: true ? ? ?securityContext:fsGroup: 1001runAsUser: 1001volumes: ? ?- configMap:defaultMode: 493name: ldc-kafka-scripts #ConfigMap的名字name: scripts ? ? ? ? ? ? ? ? ? volumeClaimTemplates:- apiVersion: v1kind: PersistentVolumeClaimmetadata:name: kafka-dataspec:accessModes: [ "ReadWriteOnce" ] storageClassName: nfs-client #存儲類的名稱resources:requests:storage: 1Gi
2.5 部署所有資源
[root@master1 Kafka]# ls kafka-configmap.yaml kafka-headless.yaml kafka-statefulset.yaml [root@master1 Kafka]# kubectl apply -f ./ configmap/ldc-kafka-scripts created service/kafka-headless created statefulset.apps/kafka created
2.6 檢查kafka Pod狀態
[root@master1 Kafka]# kubectl get pod -n elk NAME ? ? ? ? ? ? READY ? STATUS ? RESTARTS ? AGE filebeat-6db9l ? 1/1 ? ? Running ? 0 ? ? ? ? 62m filebeat-qllxg ? 1/1 ? ? Running ? 0 ? ? ? ? 62m filebeat-r5hw7 ? 1/1 ? ? Running ? 0 ? ? ? ? 62m kafka-0 ? ? ? ? 1/1 ? ? Running ? 0 ? ? ? ? 2m2s