目錄
- 什么是消費者 Lag
- 舉例說明:
- Lag 的意義:
- Lag 監控和查詢
- kafka-consumer-groups
- 基本語法
- 常用命令示例
- 1. 查看單個消費者組的詳細信息(最常用)
- 2. 列出所有消費者組(只顯示名稱)
- 3. 列出所有消費者組(有詳情信息,可以通過grep過濾topic和消費者組對應信息)
- 4. 查看消費者組成員信息
- Golang 代碼實現 Lag 監控
什么是消費者 Lag
在消息隊列系統(如 Kafka)中,消費者 Lag(也稱為 “滯后量”)
是衡量消費者(或消費者組)處理消息進度的核心指標,它表示尚未被消費的消息數量。
具體來說,Lag 的計算方式是:
Lag = 分區當前最大偏移量(Max Offset) - 消費者已提交的偏移量(Committed Offset)
- 最大偏移量(Max Offset):分區中最新一條消息的位置(即已經生產的消息總量標識)。
- 已提交偏移量(Committed Offset):消費者組已經成功處理并提交的最新消息位置(即已經消費完成的進度標識)。
通常來說,Lag 的單位是消息數,而且我們一般是在主題這個級別上討論 Lag 的,但實際上,Kafka 監控 Lag 的層級是在分區上的。如果要計算主題級別的,你需要手動匯總所有主題分區的 Lag,將它們累加起來,合并成最終的 Lag 值。
舉例說明:
假設某個分區的消息偏移量是從 0 開始遞增的:
- 目前分區中最新的消息偏移量是 100(即已生產了 101 條消息,0~100)。
- 消費者組已提交的偏移量是 80(即已處理完 0~80 的消息)。
此時,Lag = 100 - 80 = 20,意味著還有 20 條消息(81~100)未被消費。
Lag 的意義:
- Lag = 0:表示消費者完全跟上了消息生產速度,沒有未處理的消息。
- Lag 增大:說明消費者處理速度慢于消息生產速度,出現了消息積壓,可能導致業務延遲。
- Lag 長期不為 0:可能是消費者能力不足、邏輯阻塞或系統異常的信號,需要排查。
Lag 監控和查詢
kafka-consumer-groups
kafka-consumer-groups 腳本是 Kafka 為我們提供的最直接的監控消費者消費進度的工具。
kafka-consumer-groups.sh
是 Kafka 自帶的命令行工具,用于管理和查詢消費者組(Consumer Group)的信息,包括消費進度(Lag)、位移(Offset)、成員信息等。它是排查消費問題的常用工具,適用于快速診斷消費者組狀態。
基本語法
kafka-consumer-groups.sh --bootstrap-server <kafka-broker地址> [選項]
核心參數說明:
--bootstrap-server
:指定 Kafka 集群的 broker 地址(如localhost:9092
或多個地址用逗號分隔),必須指定。--group
:指定消費者組名稱(操作單個組時使用)。--all-groups
:操作所有消費者組(如查詢所有組的信息)。--describe
:查看消費者組的詳細信息(包括每個分區的位移和 Lag)。
常用命令示例
1. 查看單個消費者組的詳細信息(最常用)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group
輸出示例及解讀:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-consumer-group test-topic 0 80 100 20 consumer-1-abc123 /192.168.1.1 consumer-1
my-consumer-group test-topic 1 50 50 0 consumer-2-def456 /192.168.1.2 consumer-2
字段含義:
GROUP
:消費者組名稱。TOPIC
:消費的主題名稱。PARTITION
:主題的分區編號。CURRENT-OFFSET
:消費者組已提交的位移(已處理到的位置)。LOG-END-OFFSET
:分區最新的消息位移(已生產的最新位置)。LAG
:未消費的消息數量(LOG-END-OFFSET - CURRENT-OFFSET
)。CONSUMER-ID
/HOST
/CLIENT-ID
:當前消費該分區的消費者信息。
2. 列出所有消費者組(只顯示名稱)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
輸出示例:
my-consumer-group
order-service-group
user-tracking-group
3. 列出所有消費者組(有詳情信息,可以通過grep過濾topic和消費者組對應信息)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.37.10:9092 --all-groups --describe 2>/dev/null | grep -v GROUP |awk '{size[$1" "$2]+=$6} END{for(i in size) if(size[i]>300) {print " 消費.對應Tpic "i,"的積壓數為:"size[i]}}'
實現效果
4. 查看消費者組成員信息
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \--describe --group my-consumer-group \--members
輸出消費者組內的成員列表、分配的分區等信息,用于確認組內消費者是否正常在線。
Golang 代碼實現 Lag 監控
填坑,待完善!