KafkaAdmin-請參閱配置主題
ProducerFactory-請參閱發送消息
ConsumerFactory-請參閱接收消息
從2.5版本開始,每個版本都擴展了KafkaResourceFactory。這允許在運行時通過向引導服務器的配置中添加Supplier<String>來更改引導服務器:setBootstrapServersSupplier(()->…)。所有新連接都將調用此命令以獲取服務器列表。消費者和生產者通常壽命較長。要關閉現有的生產者,請在DefaultKafkaProducerFactory上調用reset()。要關閉現有的Consumers,請在KafkaListenerEndpointRegistry上調用stop()(然后調用start())和/或在任何其他偵聽器容器bean上調用stop()和start()。
為了方便起見,該框架還提供了一個支持兩組引導服務器的ABSwitch集群;其中一個在任何時候都是活動的。通過調用setBootstrapServersSupplier()配置ABSwitchCluster并將其添加到生產者和消費者工廠以及KafkaAdmin。當你想切換時,在生產者工廠上調用primary()或secondary()并調用reset()來建立新的連接;對于消費者,stop()和start()都是監聽器容器。使用@KafkaListeners時,停止()和啟動()KafkaListenerEndpointRegistry bean。
有關更多信息,請參閱Javadocs。
Factory Listeners
從2.5版本開始,DefaultKafkaProducerFactory和DefaultKafkaConsumerFactory可以配置一個監聽器,以便在創建或關閉生產者或消費者時接收通知。
interface Listener<K, V> {default void producerAdded(String id, Producer<K, V> producer) {}default void producerRemoved(String id, Producer<K, V> producer) {}}
interface Listener<K, V> {default void consumerAdded(String id, Consumer<K, V> consumer) {}default void consumerRemoved(String id, Consumer<K, V> consumer) {}}
在每種情況下,id都是通過將客戶端id屬性(在創建后從metrics()中獲得)附加到工廠beanName屬性中創建的,用..分隔。。
例如,這些監聽器可用于在創建新客戶端時創建和綁定Micrometer KafkaClientMetrics實例(并在客戶端關閉時關閉它)。
該框架提供了正是這樣做的監聽器;請參見千分尺本地度量。
Default client ID prefixes
從版本3.2開始,對于使用Spring.application.name屬性定義應用程序名稱的Spring Boot應用程序,此名稱現在用作這些客戶端類型的自動生成客戶端ID的默認前綴:
不使用消費者群體的消費者客戶
生產商客戶
管理員客戶端
這使得在服務器端更容易識別這些客戶端,以便進行故障排除或應用配額。
Client Type | Without application name | With application name |
---|
consumer without consumer group | consumer-null-1 | myapp-consumer-1 |
consumer with consumer group "mygroup" | consumer-mygroup-1 | consumer-mygroup-1 |
producer | producer-1 | myapp-producer-1 |
admin | adminclient-1 | myapp-admin-1 |