使用Kafka的同學都知道,我們每次創建Kafka主題(Topic)的時候可以指定分區數和副本數等信息,如果將這些屬性配置到server.properties文件中,以后調用Java API生成的主題將使用默認值,先改變需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000顯示的修改,我們也希望將此過程在Producer調用之前通過API的方式進行設定,無需在之前或之后使用腳本進行操作,所以才了這篇文章。查看源碼發現,其實內部所有的實現都是通過TopicCommand的main方法,在此記錄兩種方式:
1、創建主題(Topic)
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
【JAVA API方式】:
String[]?options?=?new?String[]{
"--create",
"--zookeeper",
"zk_host:port/chroot",
"--partitions",
"20",
"--topic",
"my_topic_name",
"--replication-factor",
"3",
"--config",
"x=y"
};
TopicCommand.main(options);
2、查看所有主題
【命令方式】:bin/kafka-topics.sh --list --zookeeper localhost:2181
【JAVA API方式】:
String[]?options?=?new?String[]{
"--list",
"--zookeeper",
"localhost:2181"
};
TopicCommand.main(options);
3、查看指定主題:
【命令方式】:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
【JAVA API方式】:
String[]?options?=?new?String[]{
"--describe",
"--zookeeper",
"localhost:2181",
"--topic",
"my-replicated-topic",
};
TopicCommand.main(options);
4、修改主題:
【命令方式】:bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
【JAVA API方式】:
String[]?options?=?new?String[]{
"--alter",
"--zookeeper",
"zk_host:port/chroot",
"--topic",
"my_topic_name",
"--deleteConfig",
"x"
};
TopicCommand.main(options);
5、刪除出題:
【命令方式】:無
【JAVA API方式】:
String[]?options?=?new?String[]{
"--zookeeper",
"zk_host:port/chroot",
"--topic",
"my_topic_name"
};
DeleteTopicCommand.main(options);