#作者:張桐瑞
文章目錄
- 1 問題概述
- 2 修改方案
- 2.1修改參數
- 2.2配置示例
- 3 消費者組均分腳本
- 3.1使用說明
- 3.2腳本內容
- 3.3實現原理說明
- 4 KAFKA-EXPORTER流程代碼
- 4.1KAFKA-EXPORTER拉取數據流程
1 問題概述
由于kafka-exporter獲取kafka指標時間過長,無法通過curl kafka-exporter:9308/metrics 獲取指標,通過查看kafka-exporter日志,發現
time=“2025-07-22T02:18:00Z” level=error msg=“Cannot get offset of group xxxx: kafka: broker not connected” source=“kafka_exporter.go:396”
的報錯信息,發現kafka-exporter在查詢消費者組時出現超時,通過命令
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
同樣出現超時信息。
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeConsumerGroups, deadlineMs=1753170317381, tries=1, nextAllowedTryMs=1753170317482) timed out at 1753170317382 after 1 attempt(s)
通過針對具體消費者組進行查看,發現返回信息較快,故計劃過濾一部分groups來減少數據量。
2 修改方案
當前環境中,Kafka與Kafka Exporter共同部署在同一Pod 內,且各個Kafka Exporter未配置過濾規則,導致采集數據存在大量重復。為提升采集效率和準確性,計劃通過以下措施進行優化:
- 基于不同的 ReplicaSet(RC)配置差異化過濾規則;
針對每個RC單獨設置專屬的消費者組過濾規則,避免重復采集。 - 利用消費者均分腳本生成過濾正則表達式;
通過腳本將所有消費者組按規則均分,生成適用于Kafka Exporter的正則表達式,保證每個 Exporter 只采集其分配范圍內的消費者組。 - 將生成的消費者組過濾正則應用于不同 Exporter 配置中;
將對應的正則表達式配置到各自 Exporter 實例中,實現消費組采集的隔離和均衡分布
2.1修改參數
group.filter .* Regex that determines which consumer groups to collect
group.filter # 正則表達式,用于確定要收集哪些消費者組(Consumer Groups)的指標。
2.2配置示例
只需為每個Kafka Exporter添加對應的YAML配置段落即可。
2.2.1KAFKA_EXPORTER_1:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^(A|a|B).*
2.2.2KAFKA_EXPORTER_2:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^(E|b).*
2.2.3KAFKA_EXPORTER_3:
- args:
- –kafka.server=localhost:9092
- –web.listen-address=:9308
- –group.filter=^?.*
3 消費者組均分腳本
3.1使用說明
配置參數:
- BOOTSTRAP_SERVER: 指定 Kafka 服務地址;
- EXPORTER_COUNT: 指定需要生成多少個 Exporter 的過濾規則。
# Bash group-distributor.sh使用消費者組均分腳本進行消費者組均勻分布,腳本執行結果如下:
bash-5.0$ bash /bitnami/kafka/data/topic_to_consumer_group.sh================ Hash by 首字母 (區分大小寫) =================
Exporter 1: --group.filter=^(A|a|B).* [3 groups]
Exporter 2: --group.filter=^(E|b).* [2 groups]
Exporter 3: --group.filter=^(C).* [1 groups]
可參考對應的exporter過濾正則結果。
3.2腳本內容
3.2.1Hash分組腳本
#!/bin/bashBOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3# 獲取所有消費者組
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)# 初始化數組
for ((i=0; i<EXPORTER_COUNT; i++)); doprefix_sets[$i]=""count[$i]=0
donedeclare -A seen_prefix# 遍歷組名,按首字母 hash 均分
while IFS= read -r group; doprefix=$(echo "$group" | cut -c1)[[ -z "$prefix" ]] && continue# 計算哈希分配位置hash_hex=$(echo -n "$prefix" | md5sum | awk '{print $1}' | cut -c1-8)hash_dec=$((16#$hash_hex))idx=$((hash_dec % EXPORTER_COUNT))# 記錄唯一首字母,用于生成正則if [[ -z "${seen_prefix[$prefix]}" ]]; thenseen_prefix[$prefix]=1if [[ -z "${prefix_sets[$idx]}" ]]; thenprefix_sets[$idx]="$prefix"elseprefix_sets[$idx]+="|$prefix"fifi# 每個實際組都會計入對應 exporter 的數量count[$idx]=$((count[$idx] + 1))
done <<< "$all_groups"# 輸出結果
echo "================ Hash by 首字母 (區分大小寫) ================="
for ((i=0; i<EXPORTER_COUNT; i++)); doif [[ -n "${prefix_sets[$i]}" ]]; thenecho "Exporter $((i+1)): --group.filter='^(${prefix_sets[$i]}).*' [${count[$i]} groups]"elseecho "Exporter $((i+1)): --group.filter='^()' [${count[$i]} groups]"fi
done
3.2.2 排序分組腳本
#!/bin/bashBOOTSTRAP_SERVER="localhost:9092"
EXPORTER_COUNT=3# 獲取所有消費組
all_groups=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" --list)# 按首字母分組(區分大小寫)
declare -A initial_to_groups
declare -A initial_countswhile IFS= read -r group; do[[ -z "$group" ]] && continuefirst_char=${group:0:1}initial_to_groups["$first_char"]+="$group"$'\n'initial_counts["$first_char"]=$((initial_counts["$first_char"] + 1))
done <<< "$all_groups"# 將首字母按消費組數量降序排序
sorted_initials=$(for k in "${!initial_counts[@]}"; doecho -e "${initial_counts[$k]}\t$k"
done | sort -rn | awk '{print $2}')# 初始化每個 exporter 的負載、過濾項和組列表
for ((i=0; i<EXPORTER_COUNT; i++)); doexporter_filters[$i]=""exporter_loads[$i]=0exporter_groups[$i]=""
done# 分配首字母到 exporter,按消費組數量均衡
for initial in $sorted_initials; docount=${initial_counts[$initial]}# 找當前負載最小的 exportermin_index=0min_load=${exporter_loads[0]}for ((i=1; i<EXPORTER_COUNT; i++)); doif (( exporter_loads[i] < min_load )); thenmin_index=$imin_load=${exporter_loads[i]}fidone# 添加到對應 exporterif [[ -z "${exporter_filters[$min_index]}" ]]; thenexporter_filters[$min_index]="$initial"elseexporter_filters[$min_index]+="|$initial"fiexporter_loads[$min_index]=$((exporter_loads[$min_index] + count))exporter_groups[$min_index]+="${initial_to_groups[$initial]}"
done# 輸出格式
echo "================ kafka-exporter group.filter 正則分片 ================="
for ((i=0; i<EXPORTER_COUNT; i++)); dogroup_count=$(echo -n "${exporter_groups[$i]}" | grep -c '^')echo "Exporter $((i+1)): ($group_count groups)"echo " --group.filter='^(${exporter_filters[$i]}).*'"echo
done
3.3實現原理說明
3.3.1Hash分組腳本說明
該腳本通過以下核心邏輯實現消費者組的分配與正則表達式生成:
- 獲取消費者組列表:
利用 kafka-consumer-groups.sh --list 獲取當前 Kafka 所有消費者組名。 - 提取首字母并去重:
對每個消費者組名提取首字母(區分大小寫),使用關聯數組 seen_prefix 確保相同首字母只處理一次。 - Hash 均分邏輯:
1)使用 md5sum 對首字母進行 Hash 處理;
2)將 Hash 值轉為十進制,再通過取模操作將其平均分配到 N 個 Exporter 中;
3)每個 Exporter 收集分配到的首字母集合,組合成正則表達式。 - 生成過濾規則:
每個Exporter輸出一條形如–group.filter=^(A|B|C).* 的正則規則,僅匹配以指定前綴開頭的消費者組。
3.3.2排序分鐘腳本說明
在首字母提取之后,腳本還會對首字母列表進行 排序處理,其作用是: - 確保穩定性
通過排序(sort 命令),可以保證即使 Kafka 集群中的消費者組順序發生變化,腳本對首字母的處理順序仍保持一致,從而保證正則分配的穩定性。 - 提升分配公平性
排序后的首字母經過統一的 Hash 計算和取模分配,有助于在 Exporter間更公平地分攤消費者組,提高采集性能和均衡性。 - 簡化調試和查看
將字母排序輸出后,便于在查看分配結果時快速定位具體字母屬于哪個Exporter,也有助于問題定位和正則表達式核查。
4 KAFKA-EXPORTER流程代碼
4.1KAFKA-EXPORTER拉取數據流程
4.1.1拉取所有消費者組
在getConsumerGroupMetrics函數中,首先通過broker.ListGroups(&sarama.ListGroupsRequest{})向Kafka broker拉取所有的消費者組(group)列表。
這一步會返回集群中所有存在的group名稱。
groupIds := make([]string, 0)for groupId := range groups.Groups {if e.groupFilter.MatchString(groupId) {groupIds = append(groupIds, groupId)}}
4.1.2 過濾消費者組
拉取到所有group后,遍歷每個groupId,用e.groupFilter.MatchString(groupId)判斷該group是否匹配過濾條件。
只有匹配的groupId才會被加入到后續的處理流程。
groupIds := make([]string, 0)for groupId := range groups.Groups {if e.groupFilter.MatchString(groupId) {groupIds = append(groupIds, groupId)}}
4.1.3 DESCRIBE消費者組
只對通過過濾的group進行DescribeGroups、FetchOffset等詳細指標采集。
describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})if err != nil {glog.Errorf("Cannot get describe groups: %v", err)return}for _, group := range describeGroups.Groups {offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}if e.offsetShowAll {for topic, partitions := range offset {for partition := range partitions {offsetFetchRequest.AddPartition(topic, partition)}}