Kafka消費者是消息系統的關鍵組成部分,掌握/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh工具的使用對于調試、測試和監控都至關重要。本文將全面介紹該工具的各種用法,幫助您高效地從Kafka消費消息。
1 基礎消費模式
1.1 從最新位置消費
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic
參數解析:
- --bootstrap-server: 指定Kafka集群地址
- --topic: 指定消費的主題名稱
特點:
- 從該消費者組最后提交的offset開始消費
- 如果沒有提交記錄,則從最新消息開始
1.2 從最早位置消費
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--from-beginning
關鍵參數:
- --from-beginning: 從主題最早的消息開始消費
應用場景:
2 消息元數據展示
2.1 顯示消息Key
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.key=true \--property key.separator=":"
參數說明:
- print.key=true: 顯示消息key
- key.separator: 指定key/value分隔符
2.2 顯示完整元數據
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.key=true \--property print.value=true \--property print.partition=true \--property print.offset=true \--property print.timestamp=true \--property key.separator=":" \--property line.separator="\n"
元數據參數:
- print.partition: 顯示分區號
- print.offset: 顯示消息offset
- print.timestamp: 顯示時間戳
3 精準消費控制
3.1 指定分區消費
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--partition 1
參數說明:
3.2 指定Offset消費
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--partition 0 \--offset 1000
參數說明:
- --offset: 指定開始消費的offset位置
3.3 消費超時設置
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--timeout-ms 10000
參數說明:
- --timeout-ms: 設置無消息時的超時時間(毫秒)
4 消費者組管理
4.1 使用消費者組
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--group mygroup
參數說明:
特點:
- 支持offset自動提交
- 支持消費者組rebalance
4.2 手動控制offset提交
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--group mygroup \--property enable.auto.commit=false
參數說明:
- enable.auto.commit=false: 關閉自動提交
5 高級消費配置
5.1 限制消費消息數
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--max-messages 100
參數說明:
5.2 過濾消費
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property filter.key.regex="test.*"
參數說明:
- filter.key.regex: 按key正則過濾
5.3 消費速率控制
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property fetch.min.bytes=1024 \--property fetch.max.wait.ms=500
參數說明:
- fetch.min.bytes: 最小拉取字節數
- fetch.max.wait.ms: 最大等待時間
6 常用命令擴展
6.1 消費多個主題
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--whitelist "topic1|topic2"
6.2 顯示消息頭信息
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.headers=true
6.3 消費特定時間后的消息
/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property offsets.storage=kafka \--property timestamp=1625097600000