創建 Kafka Topic 的命令
以下是創建 Kafka Topic 的幾種常用方法:
1. 使用 kafka-topics.sh 基礎命令(Kafka 自帶工具)
bin/kafka-topics.sh --create \--bootstrap-server <broker地址:端口> \--topic <topic名稱> \--partitions <分區數> \--replication-factor <副本數>
bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--topic my_new_topic \--partitions 3 \--replication-factor 2
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test01_topic --partitions 2 --replication-factor 2
2. 帶額外配置的創建命令
bin/kafka-topics.sh --create \--bootstrap-server localhost:9092 \--topic my_config_topic \--partitions 5 \--replication-factor 1 \--config retention.ms=172800000 \--config segment.bytes=1073741824
3. 使用 ZooKeeper 的舊版命令(Kafka 2.2 之前版本)
bin/kafka-topics.sh --create \--zookeeper localhost:2181 \--topic legacy_topic \--partitions 2 \--replication-factor 1
4. 使用 Kafka API (Java) 創建 Topic
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); AdminClient admin = AdminClient.create(props);NewTopic newTopic = new NewTopic("my_api_topic", 3, (short) 1); newTopic.configs(Map.of("retention.ms", "86400000"));CreateTopicsResult result = admin.createTopics(Collections.singletonList(newTopic)); result.all().get(); // 等待創建完成
5. 驗證 Topic 是否創建成功
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 bin/kafka-topics.sh --describe --topic my_new_topic --bootstrap-server localhost:9092
常用配置參數
可以在創建時通過?--config
?指定:
-
retention.ms
?- 消息保留時間(毫秒) -
`segment.bytes
查看 Kafka 所有 Topic 的命令
以下是幾種查看 Kafka 中所有 Topic 的常用方法:
1. 使用 kafka-topics.sh 基礎命令(推薦)
bin/kafka-topics.sh --list --bootstrap-server <broker地址:端口> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
2. 查看所有 Topic 的詳細信息(包括分區、副本等)
bin/kafka-topics.sh --describe --bootstrap-server <broker地址:端口> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
3. 使用 ZooKeeper 的舊版命令(Kafka 2.2 之前版本)
bin/kafka-topics.sh --list --zookeeper <zookeeper地址:端口> bin/kafka-topics.sh --list --zookeeper localhost:2181
4. 使用 kafkacat 工具
kafkacat -L -b <broker地址:端口> kafkacat -L -b localhost:9092
5. 使用 Kafka API (Java)
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); AdminClient admin = AdminClient.create(props);ListTopicsResult topics = admin.listTopics(); Set<String> topicNames = topics.names().get();topicNames.forEach(System.out::println);
6. 查看包含內部 Topic 的所有 Topic(如 __consumer_offsets)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --exclude-internal
7. 使用正則表達式過濾 Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | grep "your_pattern"
注意事項
-
新版本 Kafka(2.2+)推薦使用?
--bootstrap-server
?而不是?--zookeeper
-
生產環境中可能需要添加認證參數,如:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --command-config admin.properties
-
對于大型集群,列出所有 Topic 可能需要一些時間
查看 Kafka Topic 分區信息的命令
以下是幾種查看 Kafka Topic 分區信息的常用方法:
1. 使用 kafka-topics.sh 工具(Kafka 自帶)
bin/kafka-topics.sh --describe --topic <topic名稱> --bootstrap-server <broker地址:端口> bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092
2. 查看所有 topic 的分區信息
bin/kafka-topics.sh --list --bootstrap-server <broker地址:端口> bin/kafka-topics.sh --describe --bootstrap-server <broker地址:端口>
bin/kafka-topics.sh --describe --topic my_topic --bootstrap-server localhost:9092
?
3. 使用 kafkacat 工具
kafkacat -L -b <broker地址:端口> -t <topic名稱>
4. 使用 Kafka API (Java)
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); AdminClient admin = AdminClient.create(props);DescribeTopicsResult result = admin.describeTopics(Collections.singletonList("my_topic")); TopicDescription description = result.all().get().get("my_topic");description.partitions().forEach(partition -> {System.out.println("Partition: " + partition.partition());System.out.println("Leader: " + partition.leader());System.out.println("Replicas: " + partition.replicas());System.out.println("ISR: " + partition.isr()); });
輸出信息解釋
Topic: my_topic PartitionCount: 3 ReplicationFactor: 2 Configs:Topic: my_topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2Topic: my_topic Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0Topic: my_topic Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
其中:
-
PartitionCount
: 分區總數 -
ReplicationFactor
: 副本因子數 -
Leader
: 負責該分區讀寫的主 broker -
Replicas
: 該分區的所有副本所在的 broker -
Isr
: 同步中的副本(In-Sync Replicas)