如果您在應用程序上下文中定義了KafkaAdmin bean,它可以自動向代理添加主題。為此,您可以將每個主題的NewTopic@Bean添加到應用程序上下文中。2.3版本引入了一個新的類TopicBuilder,使創建此類bean更加方便。以下示例顯示了如何執行此操作:
@Bean
public KafkaAdmin admin() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");return new KafkaAdmin(configs);
}@Bean
public NewTopic topic1() {return TopicBuilder.name("thing1").partitions(10).replicas(3).compact().build();
}@Bean
public NewTopic topic2() {return TopicBuilder.name("thing2").partitions(10).replicas(3).config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd").build();
}@Bean
public NewTopic topic3() {return TopicBuilder.name("thing3").assignReplicas(0, List.of(0, 1)).assignReplicas(1, List.of(1, 2)).assignReplicas(2, List.of(2, 0)).config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd").build();
}
從2.6版本開始,您可以省略partitions()和/或replias(),代理默認值將應用于這些屬性。代理版本必須至少為2.4.0才能支持此功能-請參閱KIP-464。
@Bean
public NewTopic topic4() {return TopicBuilder.name("defaultBoth").build();
}@Bean
public NewTopic topic5() {return TopicBuilder.name("defaultPart").replicas(1).build();
}@Bean
public NewTopic topic6() {return TopicBuilder.name("defaultRepl").partitions(3).build();
}
從2.7版本開始,您可以在一個KafkaAdmin中聲明多個NewTopics。NewTopics bean定義:
@Bean
public KafkaAdmin.NewTopics topics456() {return new NewTopics(TopicBuilder.name("defaultBoth").build(),TopicBuilder.name("defaultPart").replicas(1).build(),TopicBuilder.name("defaultRepl").partitions(3).build());
}
使用Spring Boot時,KafkaAdmin bean會自動注冊,因此您只需要NewTopic(和/或NewTopics)@Beans。
默認情況下,如果代理不可用,則會記錄一條消息,但上下文會繼續加載。您可以通過編程調用管理員的initialize()方法,稍后重試。如果您希望將此情況視為致命,請將管理員的fatalIfBrokerNotAvailable屬性設置為true。隨后,上下文初始化失敗。
如果代理支持它(1.0.0或更高版本),如果發現現有主題的分區數少于NewTopic.numPartitions,管理員會增加分區數。
從2.7版本開始,KafkaAdmin提供了在運行時創建和檢查主題的方法。
創建或修改主題
描述主題
對于更高級的功能,您可以直接使用AdminClient。以下示例顯示了如何執行此操作:
@Autowired
private KafkaAdmin admin;...AdminClient client = AdminClient.create(admin.getConfigurationProperties());...client.close();
從2.9.10、3.0.9版本開始,您可以提供一個Predicate<NewTopic>,用于確定是否應考慮創建或修改特定的NewTopic bean。例如,如果您有多個指向不同集群的KafkaAdmin實例,并且希望選擇應由每個管理員創建或修改的主題,則這很有用。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));