一、集群管理
-
前臺啟動Broker
bin/kafka-server-start.sh <path>/server.properties
- 關閉方式:
Ctrl + C
- 關閉方式:
-
后臺啟動Broker
bin/kafka-server-start.sh -daemon <path>/server.properties
-
關閉Broker
bin/kafka-server-stop.sh
二、Topic管理
操作 | 命令 |
---|---|
創建Topic | bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic <topicname> |
刪除Topic | bin/kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic <topicname> |
查詢Topic列表 | bin/kafka-topics.sh --bootstrap-server localhost:9092 --list |
查詢Topic詳情 | bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic <topicname> |
增加分區數 | bin/kafka-topics.sh --alter --bootstrap-server localhost:9092 --partitions 6 --topic <topicname> |
注意:新版本推薦使用
--bootstrap-server
替代--zookeeper
三、Consumer Groups管理
操作 | 命令 |
---|---|
查詢消費組列表 | bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list |
查詢消費組詳情 | bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <groupname> |
刪除消費組 | bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group <groupname> |
- 重設消費位移:
# 重置到最早位移 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <groupname> --reset-offsets --to-earliest --execute# 重置到最新位移 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <groupname> --reset-offsets --to-latest --execute# 重置到指定位移 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <groupname> --reset-offsets --to-offset 2000 --execute
四、常用運維工具
-
生產者控制臺
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topicname>
- 高級參數:
--compression-codec lz4
(壓縮)
--request-required-acks all
(ACK機制)
--message-send-max-retries 10
(重試次數)
- 高級參數:
-
消費者控制臺
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topicname> --group <groupname> --from-beginning
-
性能測試工具:
# 生產者性能測試 bin/kafka-producer-perf-test.sh --topic <topic> --num-records 1000000 --record-size 1000# 消費者性能測試 bin/kafka-consumer-perf-test.sh --topic <topic> --messages 1000000
-
獲取Topic消息數
# 最新位移(總消息數) bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic> --time -1# 最早位移 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic <topic> --time -2
五、分區管理
-
Preferred Leader選舉
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --topic <topic> --partition 0
-
分區重分配:
# 生成遷移計劃 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --generate --topics-to-move-json-file plan.json# 執行遷移 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file reassign.json# 驗證進度 bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file reassign.json
六、高級運維
-
查看日志段元數據
bin/kafka-dump-log.sh --files 000000000000.log --print-data-log
-
跨集群復制(MirrorMaker)
bin/kafka-mirror-maker.sh --consumer.config consumer.conf --producer.config producer.conf --whitelist ".*"
-
查看__consumer_offsets
bin/kafka-console-consumer.sh --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
關鍵注意事項:
-
版本兼容性:
- ≥ Kafka 2.2.x 推薦使用
--bootstrap-server
替代--zookeeper
- 刪除Topic需確保
delete.topic.enable=true
- ≥ Kafka 2.2.x 推薦使用
-
生產環境建議:
- 分區重分配避免高峰時段操作
- 重設位移前先停止消費者組
- MirrorMaker需監控復制延遲
-
常用診斷技巧:
# 檢查消息積壓(Lag) bin/kafka-consumer-groups.sh --describe --group <group> | awk '{print $6}'# 檢查Controller狀態 bin/kafka-metadata-quorum.sh --status
完整文檔參考:Apache Kafka Operations