kafka 腳本
connect-distributed.sh
connect-mirror-maker.sh
connect-standalone.sh
kafka-acls.sh
kafka-broker-api-versions.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-consumer-perf-test.sh
kafka-delegation-tokens.sh
kafka-delete-records.sh
kafka-dump-log.sh
kafka-leader-election.sh
kafka-log-dirs.sh
kafka-mirror-maker.sh
kafka-preferred-replica-election.sh
kafka-producer-perf-test.sh
kafka-reassign-partitions.sh
kafka-replica-verification.sh
kafka-run-class.sh
kafka-server-start.sh
kafka-server-stop.sh
kafka-streams-application-reset.sh
kafka-topics.sh
kafka-verifiable-consumer.sh
kafka-verifiable-producer.sh
trogdor.sh
windows
zookeeper-security-migration.sh
zookeeper-server-start.sh
zookeeper-server-stop.sh
zookeeper-shell.sh
集群管理
# 啟動zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 停止zookeeper
bin/zookeeper-server-stop.sh
# 前臺啟動broker Ctrl + C 關閉
bin/kafka-server-start.sh <path>/server.properties
# 后臺啟動broker
bin/kafka-server-start.sh -daemon <path>/server.properties
# 關閉broker
bin/kafka-server-stop.sh
topic相關
# 創建topic(4個分區,2個副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
# kafka版本 >= 2.2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# --create:指定創建topic動作
# --topic:指定新建topic的名稱
# --zookeeper:指定kafka連接zk的連接url,該值和server.properties文件中的配置項{zookeeper.connect}一樣
# --partitions:指定當前創建的kafka分區數量,默認為1個
# --replication-factor:指定每個分區的復制因子個數,默認1個
# 分區擴容,注意:分區數量只能增加,不能減少
# kafka版本 < 2.2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
# kafka版本 >= 2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic topic1 --partitions 2
# 刪除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
# 查詢topic列表
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
# 查詢topic列表(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看所有topic的詳細信息
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
# 查詢topic詳情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicname
消費者組
查看consumer group列表有新、舊兩種命令,分別查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要區分指定--bootstrap-server和--zookeeper參數
# 新消費者列表查詢(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
# 消費者列表查詢(支持0.10版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 顯示某個消費組的消費詳情(僅支持offset存儲在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
# 顯示某個新消費組的消費詳情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group my-group
## 顯示某個消費組的消費詳情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
# 重設消費者組位移
# 最早處
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
# 最新處
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
# 某個位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
# 調整到某個時間之后的最早位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000 --execute
# 刪除消費者組
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --delete --group groupname
顯示某個消費組的消費詳情
各字段含義如下
TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
---|---|---|---|---|---|---|---|
topic名字 | 分區id | 當前已消費的條數 | 總條數 | 未消費的條數 | 消費id | 主機ip | 客戶端id |
發送和消費
# 生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 消費者,其中"--from-beginning"為可選參數,表示要從頭消費消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
# 指定groupid
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer-property group.id=old-consumer-group
# 指定分區
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --partition 0
# 新生產者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
# 新消費者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
# kafka-verifiable-consumer.sh(消費者事件,例如:offset提交等)
bin/kafka-verifiable-consumer.sh --broker-list localhost:9092 --topic test --group-id groupName
# 高級點的用法
bin/kafka-simple-consumer-shell.sh --broker-list localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10
切換leader
# kafka版本 <= 2.4
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
# kafka新版本
bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
kafka自帶壓測命令
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
kafka持續發送消息
持續發送消息到指定的topic中,且每條發送的消息都會有響應信息:
kafka-verifiable-producer.sh --broker-list $(hostname -i):9092 --topic test --max-messages 100000
zookeeper-shell.sh
如果kafka集群的zk配置了chroot路徑,那么需要加上/path。
bin/zookeeper-shell.sh localhost:2181[/path]
ls /brokers/ids
get /brokers/ids/0
遷移分區
- 創建規則json
cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":3,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":4,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":5,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":6,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":7,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":8,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":9,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":10,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":11,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":12,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":13,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":14,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":15,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":16,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":17,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":18,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":19,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":20,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":21,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":22,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":23,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":24,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":25,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":26,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":27,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":28,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":29,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":30,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":31,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":32,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":33,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":34,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":35,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":36,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":37,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":38,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":39,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":40,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":41,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":42,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":43,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":44,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":45,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":46,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":47,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":48,"replicas":[0,1]},
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1]}]
}
EOF
- 執行
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
- 驗證
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
MirrorMaker 跨機房災備工具
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist "topicA|topicB"