kafka安裝先安裝zookeeper,jdk
確保jdk版本與kafka版本匹配:
?
先啟動zookeeper:
? ? # 啟動獨立安裝的zookeeper
? ? ./zkServer.sh start
? ??
? ? # 也可以自動kafka自帶的zookerper
? ? ./zookeeper-server-start.sh ../config/zookeeper.properties
再啟動kafka:
? ? cd /usr/local/kafka_2.12-3.1.0/bin
? ??
? ? ./kafka-server-start.sh -daemon ../config/server.properties
關閉kafka:
? ? # 關閉kafka
? ? ./kafka-server-stop.sh
? ??
? ? # 關閉zookeeper
? ? ./zkServer.sh stop
生產者:
創建一個topic:test20250604
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test20250604
查看創建的topic列表:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
查詢topic的詳細信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_kafka_topic
說明:如果未指定 topic 則輸出所有 topic 的信息
增加topic的partition數:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_kafka_topic --partitions 5?
?
查看 topic 指定分區 offset 的最大值或最小值
time 為 -1 時表示最大值,為 -2 時表示最小值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test_kafka_topic --time -1 --broker-list 127.0.0.1:9092 --partitions 0?
刪除topic:
刪除名為 test_kafka_topic 的 Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 ?--topic test_kafka_topic
說明:在${KAFKA_HOME}/config/server.properties中配置 delete.topic.enable 為 true,這樣才能生效,刪除指定的 topic主題
消費者:
消費消息(從頭開始)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test20250604 --from-beginning
--from-beginning為限制消費從頭開始消費
?? ?--from-beginning #為可選參數,表示要從頭消費消息
?? ?--from-earliest #從最早的消息開始消費(待驗證)
?? ?--from-latest #從最新的消息開始消費
?? ?--指定offset #從指定的位置開始消費
消費消息(從尾開始)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest
?
消費消息(從尾開始制定分區)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest --partition 0
?
消費消息(指定分區和偏移量)
–partition 指定起始偏移量消費–offset:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic ?--partition 0 --offset 100?
?
消費者消費消息(指定分組)
注意給客戶端命名之后,如果之前有過消費,那么–from-beginning就不會再從頭消費了
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 ?--from-beginning --topic test_kafka_topic --group t1
?
取指定個數
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_kafka_topic --offset latest --partition 0 --max-messages 1
?
消費者group:
指定分組從頭開始消費消息(應該會指定偏移量)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning
消費者group列表:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
?
查看group詳情:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe
輸出日志:
Consumer group 'test_group' has no active members.
TOPIC ? ? ? ? ? PARTITION ?CURRENT-OFFSET ?LOG-END-OFFSET ?LAG ? ? ? ? ? ? CONSUMER-ID ? ? HOST ? ? ? ? ? ?CLIENT-ID
test ? ? ? ? ? ?0 ? ? ? ? ?5 ? ? ? ? ? ? ? 5 ? ? ? ? ? ? ? 0 ? ? ? ? ? ? ? - ? ? ? ? ? ? ? - ? ? ? ? ? ? ? -
# CURRENT-OFFSET: 當前消費者群組最近提交的 offset,也就是消費者分區里讀取的當前位置
# LOG-END-OFFSET: 當前最高水位偏移量,也就是最近一個讀取消息的偏移量,同時也是最近一個提交到集群的偏移量
# LAG:消費者的 CURRENT-OFFSET 與 broker 的 LOG-END-OFFSET 之間的差距
?
刪除group中topic:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete
刪除group:
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete
?
補充:
平衡leader:
bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
?
自帶壓測工具:
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092?
?