目錄
- 一、消息的接收
- 1.1、消息監聽器
- 二、消息監聽容器
- 2.1、 實現方法
- 2.1.1、KafkaMessageListenerContainer
- 2.1.1.1、 基本概念
- 2.1.1.2、如何使用 KafkaMessageListenerContainer
- 2.1.2、ConcurrentMessageListenerContainer
- 三、偏移
- 四、監聽器容器自動啟動
一、消息的接收
消息的接收:可以通過配置MessageListenerContainer并提供消息偵聽器或使用@KafkaListener注釋來接收消息。本章我們主要說明通過配置MessageListenerContainer并提供消息偵聽器的方式接收消息。
1.1、消息監聽器
當使用消息監聽容器時,就必須提供一個監聽器來接收數據。目前有八個支持消息偵聽器的接口:
public interface MessageListener<K, V> { // 當使用自動提交或容器管理的提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的各個 ConsumerRecord 實例。void onMessage(ConsumerRecord<K, V> data);
}public interface AcknowledgingMessageListener<K, V> { // 當使用手動提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的各個 ConsumerRecord 實例。void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { // 當使用自動提交或容器管理的提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的各個 ConsumerRecord 實例。提供對 Consumer 對象的訪問。void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);}public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { //當使用手動提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的各個 ConsumerRecord 實例。提供對 Consumer 對象的訪問。void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);}public interface BatchMessageListener<K, V> { //當使用自動提交或容器管理的提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的所有 ConsumerRecord 實例。使用此接口時不支持 AckMode.RECORD,因為偵聽器會獲得完整的批次。void onMessage(List<ConsumerRecord<K, V>> data);}public interface BatchAcknowledgingMessageListener<K, V> { // 當使用手動提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的所有 ConsumerRecord 實例。void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);}public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { // 當使用自動提交或容器管理的提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的所有 ConsumerRecord 實例。使用此接口時不支持 AckMode.RECORD,因為偵聽器會獲得完整的批次。提供對 Consumer 對象的訪問。void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);}public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { //當使用手動提交方法之一時,使用此接口處理從 Kafka 消費者 poll() 操作接收到的所有 ConsumerRecord 實例。提供對 Consumer 對象的訪問。void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);}
注意:1、 Consumer對象不是線程安全的;2、不應執行任何Consumer<?, ?>影響消費者位置和/或監聽器中已提交偏移量的方法;容器需要管理這些信息。
二、消息監聽容器
2.1、 實現方法
MessageListenerContainer 提供了兩種實現方式 :
1、KafkaMessageListenerContainer,
2、ConcurrentMessageListenerContainer
2.1.1、KafkaMessageListenerContainer
2.1.1.1、 基本概念
KafkaMessageListenerContainer在單個線程上接收來自所有主題或分區的所有消息。委托ConcurrentMessageListenerContainer給一個或多個KafkaMessageListenerContainer實例以提供多線程消費。
- 從2.2.7版本開始,可以添加一個記錄攔截器(RecordInterceptor)監聽器容器;它將在調用偵聽器之前調用,以允許檢查或修改記錄。如果攔截器返回 null,則不會調用偵聽器。
- 從版本 2.7 開始,它具有在偵聽器退出后(通常或通過拋出異常)調用的附加方法。
- 批處理攔截器(BatchInterceptor)為批量監聽器(Batch Listeners)提供類似的功能。
- 此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供對 Consumer<?, ?> 的訪問。 例如,這可以用于訪問攔截器中的消費者指標。
- CompositeRecordInterceptor and CompositeBatchInterceptor可以調用多個攔截器。
- 默認情況下,當使用事務時,攔截器在事務啟動后被調用。從版本 2.3.4 開始,可以設置偵聽器容器的 interceptBeforeTx 屬性在事務開始之前調用攔截器。
- 從版本 2.3.8、2.4.6 開始,當并發大于 1 時 ConcurrentMessageListenerContainer 支持靜態成員資格。 group.instance.id 后綴為 -n ,起始n于1。這與增加 session.timeout.ms 的值 一起可用于減少重新平衡事件,例如,當應用程序實例重新啟動時。
- 靜態成員資格是指在提高流應用程序、消費者組和其他構建在組再平衡協議之上的應用程序的可用性。再平衡協議依賴組協調器為組成員分配實體 ID。這些生成的 ID 是短暫的,并且會在成員重新啟動和重新加入時發生變化。對于基于消費者的應用程序,這種“動態成員資格”可能會導致在管理操作(例如代碼部署、配置更新和定期重新啟動)期間將大部分任務重新分配給不同的實例。對于大型狀態應用程序,洗牌任務在處理之前需要很長時間才能恢復其本地狀態,從而導致應用程序部分或完全不可用。受這一觀察的啟發,Kafka 的組管理協議允許組成員提供持久的實體 ID。根據這些 ID,組成員資格保持不變,因此不會觸發重新平衡。
同樣的,攔截器中不應該執行任何影響消費者的位置和/或提交的偏移量的方法,容器需要管理這些信息。
如果攔截器改變了記錄(通過創建新記錄),則topic、partition和offset必須保持不變,以避免意外的副作用,例如記錄丟失。
2.1.1.2、如何使用 KafkaMessageListenerContainer
-
KafkaMessageListenerContainer 構造函數
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
該構造函數接收接收消費者工廠(ConsumerFactory)有關對象中主題和分區以及其他配置的信息。
-
容器屬性(ContainerProperties)包含3個構造函數,下面我們一個一個介紹它們。
1、以TopicPartitionOffset為參數public ContainerProperties(TopicPartitionOffset... topicPartitions)
該構造函數采用一個主題分區偏移量(TopicPartitionOffset)參數數組來顯式指示容器要使用哪些分區(使用消費者assign()方法)并帶有可選的初始偏移量。默認情況下,正值是絕對偏移量,負值是相對于分區內當前最后一個偏移量。TopicPartitionOffset提供了一個帶有附加參數的構造函,boolean如果是true,則在容器啟動時相對于該消費者的當前位置初始偏移(正或負)。
2、以String為參數public ContainerProperties(String... topics)
該構造函數采用主題數組,Kafka 根據屬性分配分區group.id——在組中分配分區
3、以Pattern為參數public ContainerProperties(Pattern topicPattern)
該構造函數使用正則表達式Pattern來選擇主題。
-
如何將監聽器分配給容器
監聽器有了容器也有了,如何將監聽器分配給容器呢?。要將 MessageListener 分配給容器,可以在創建 Container 時使用 ContainerProps.setMessageListener 方法:ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); containerProps.setMessageListener(new MessageListener<Integer, String>() {... }); DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(consumerProps()); KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps); return container;
要注意的是,在創建 DefaultKafkaConsumerFactory 時,使用僅接受上述屬性的構造函數意味著從配置中選取鍵和值反序列化器類。 或者,反序列化器實例可以傳遞到 DefaultKafkaConsumerFactory 構造函數以獲取鍵和/或值,在這種情況下,所有消費者共享相同的實例。 另一種選擇是提供Supplier(從版本2.3開始),它將用于為每個消費者獲取單獨的Deserializer實例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps); return container;
從版本 2.3.5 開始,引入了一個名為authorizationExceptionRetryInterval 的新容器屬性。 這會導致容器在從 KafkaConsumer 獲取任何 AuthorizationException 后重試獲取消息。 例如,當配置的用戶被拒絕讀取特定主題時,就會發生這種情況。 定義authorizationExceptionRetryInterval應該有助于應用程序在授予適當的權限后立即恢復。
2.1.2、ConcurrentMessageListenerContainer
ConcurrentMessageListenerContainer只有一個構造函數與構造函數類似 KafkaListenerContainer。
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
它有一個concurrency屬性,這個屬性的作用是創建幾個 KafkaMessageListenerContainer 實例。例如:container.setConcurrency(3) 創建三個 KafkaMessageListenerContainer 實例。
當監聽多個主題時,默認的分區分布可能不是我們所期望的。 例如,如果有 3 個主題,每個主題有 5 個分區,并且我們想要使用 concurrency=15,但是我們只會看到 5 個活動使用者,每個使用者從每個主題分配一個分區,而其他 10 個使用者處于空閑狀態。 這是因為默認的 Kafka PartitionAssignor 是 RangeAssignor。 對于這種情況,我們需要考慮使用 RoundRobinAssignor,它將分區分配給所有使用者。 然后,為每個消費者分配一個主題或分區。 我們可以在提供給DefaultKafkaConsumerFactory的屬性中設置partition.assignment.strategy消費者屬性來更改要更改PartitionAssignor。(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。
在springboot中可以這樣:
spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor
當使用 TopicPartitionOffset 配置容器屬性時,ConcurrentMessageListenerContainer 會在委托 KafkaMessageListenerContainer 實例之間分發 TopicPartitionOffset 實例。
假設提供了 6 個 TopicPartitionOffset 實例,并發度為 3; 每個容器有兩個分區。 對于五個
TopicPartitionOffset 實例,兩個容器獲得兩個分區,第三個容器獲得一個分區。 如果并發數大于TopicPartition的數量,則降低并發數,使每個容器獲得一個分區。
三、偏移
spring提供了幾個偏移選項, 如果 enable.auto.commit 消費者屬性為 true,Kafka會根據其配置自動提交偏移量。 如果為 false,則容器支持多種 AckMode 設置。 默認 AckMode 為 BATCH。
從版本 2.3 開始,框架將 enable.auto.commit 設置為 false,除非在配置中明確設置。以前,如果未設置該屬性,則使用 Kafka 默認值 (true)。
消費者 poll() 方法返回一個或多個 ConsumerRecord。 為每條記錄調用 MessageListener。 以下列表描述了容器對每個 AckMode 采取的操作(當未使用事務時):
-
RECORD:當偵聽器處理記錄后返回時提交偏移量。
-
BATCH:當 poll() 返回的所有記錄都已處理完畢時提交偏移量。
-
TIME:當 poll() 返回的所有記錄都處理完畢后,只要超過了自上次提交以來的 ackTime,就提交偏移量。
-
COUNT:當 poll() 返回的所有記錄都已處理完畢時,提交偏移量,只要自上次提交以來已收到 ackCount 條記錄。
-
COUNT_TIME:與 TIME 和 COUNT 類似,但如果任一條件為真,則執行提交。
-
MANUAL:消息偵聽器負責acknowledge() 確認。 之后,應用與 BATCH 相同的語義。
-
MANUAL_IMMEDIATE:當偵聽器調用 Acknowledgment.acknowledge() 方法時立即提交偏移量。
使用事務(transactions)時,偏移量將發送到事務,語義相當于 RECORD 或 BATCH,具體取決于偵聽器類型(記錄或批處理)。MANUAL 和 MANUAL_IMMEDIATE 要求偵聽器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。
根據syncCommits容器屬性,使用消費者上的commitSync()或commitAsync()方法。 默認情況下,syncCommits 為 true。
作者個人建議建議設置:ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 為 false。
從版本 2.3 開始,Acknowledgment 接口增加了兩個方法 nack(long sleep) 和 nack(int index, long sleep)。 第一個與記錄偵聽器一起使用,第二個與批處理偵聽器一起使用。 為偵聽器類型調用錯誤的方法將引發 IllegalStateException。在此之前他是這樣:
public interface Acknowledgment {void acknowledge();}
- 如果要提交部分批次,使用 nack()。
- 使用事務時,將 AckMode 設置為 MANUAL;
- 調用 nack() 會將成功處理的記錄的偏移量發送到事務。
- nack() 只能在調用偵聽器的消費者線程上調用。
- 當調用 nack() 時,將提交所有掛起的偏移量,丟棄上次輪詢的剩余記錄,并在其分區上執行查找,以便在下一次輪詢時重新傳遞失敗的記錄和未處理的記錄( )。
- 通過設置 sleep 參數,消費者線程可以在重新交付之前暫停。 這與在容器配置了 SeekToCurrentErrorHandler 時拋出異常的功能類似。
當通過組管理使用分區分配時,確保 sleep 參數(加上處理先前輪詢的記錄所花費的時間)小于使用者 max.poll.interval.ms屬性,這個非常重要。
四、監聽器容器自動啟動
偵聽器容器實現 SmartLifecycle,并且 autoStartup 默認為 true。 容器在后期啟動 (Integer.MAX-VALUE - 100)。 實現 SmartLifecycle 來處理來自偵聽器的數據的其他組件應在早期階段啟動。 -100 為后續階段留出了空間,使組件能夠在容器之后自動啟動。