查看kafka服務版本
[root@localhost eicar]# kafka-server-start.sh --version
[2025-06-23 11:10:54,106] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
3.3.2 (Commit:b66af662e61082cb)
[root@localhost eicar]#
查看消費者組信息
[root@localhost eicar]# kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
python-consumer-group
[root@localhost eicar]#
查看指定消費者組詳細信息
一個組可以有多個消費者。
主題中的消息只能被同一個組中的一個消費者消費。
一個主題可以被多個消費者組消費。
[root@localhost opt]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group python-consumer-group --describeGROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
python-consumer-group test-topic 0 39 39 0 rdkafka-76d00c65-de7c-4b57-9213-7af52aa7cdd8 /172.16.1.108 rdkafka
偏移量就是消費者消費到哪個消息了,kafka集群有記錄的該信息。
CURRENT-OFFSET: 消費者當前消費到的偏移量
LOG-END-OFFSET: 分區最新消息的偏移量(該第39了)
LAG: 消費滯后量(未消費的消息數)
創建topic
kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
列出所有topic
[root@localhost eicar]# kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic
[root@localhost eicar]#
啟動kafka生產者
kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
啟動kafka消費者
kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
python kafka消費者實現過程:
- 初始化消費者服務
- 訂閱主題
- 檢測消息
- 解析消息