提供了兩個MessageListenerContainer實現:
KafkaMessageListenerContainer
ConcurrentMessageListener容器
KafkaMessageListenerContainer在單個線程上接收來自所有主題或分區的所有消息。ConcurrentMessageListenerContainer委托給一個或多個KafkaMessageListenerCcontainer實例,以提供多線程消費。
從2.2.7版本開始,您可以將RecordInterceptor添加到偵聽器容器中;它將在調用偵聽器之前被調用,允許檢查或修改記錄。如果攔截器返回null,則不調用偵聽器。從2.7版本開始,它具有在偵聽器退出后調用的其他方法(通常,或通過拋出異常)。此外,從2.7版本開始,現在有一個BatchInterceptor,為Batch Listeners提供類似的功能。此外,ConsumerwareRecordInterceptor(和BatchInterceptor)還提供對Consumer的訪問?, ?>.例如,這可用于訪問攔截器中的消費者指標。
您不應該執行任何影響消費者在這些攔截器中的位置和/或已提交偏移量的方法;容器需要管理這些信息。
如果攔截器更改了記錄(通過創建新記錄),則主題、分區和偏移量必須保持不變,以避免記錄丟失等意外副作用。
CompositeRecordInterceptor和CompositeBatchInterceptor可用于調用多個攔截器。
默認情況下,從2.8版本開始,當使用事務時,攔截器會在事務開始之前被調用。您可以將偵聽器容器的interceptBeforeTx屬性設置為false,以便在事務開始后調用偵聽器。從2.9版本開始,這將適用于任何事務管理器,而不僅僅是KafkaAwareTransactionManagers。例如,這允許攔截器參與由容器啟動的JDBC事務。
從2.3.8、2.4.6版本開始,當并發性大于1時,ConcurrentMessageListenerContainer現在支持靜態成員資格。group.instance.id的后綴是-n,n從1開始。這與增加的session.timeout.ms一起可用于減少重新平衡事件,例如在重新啟動應用程序實例時。
Using KafkaMessageListenerContainer
以下構造函數可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
它在ContainerProperties對象中接收ConsumerFactory以及有關主題和分區以及其他配置的信息。ContainerProperties具有以下構造函數:
public ContainerProperties(TopicPartitionOffset... topicPartitions)public ContainerProperties(String... topics)public ContainerProperties(Pattern topicPattern)
第一個構造函數接受TopicPartitionOffset參數數組,以顯式指示容器使用哪些分區(使用消費者assign()方法),并具有可選的初始偏移量。默認情況下,正值是絕對偏移量。默認情況下,負值相對于分區內當前最后一個偏移量。提供了一個接受額外布爾參數的TopicPartitionOffset構造函數。如果這是真的,則初始偏移(正或負)相對于該消費者的當前位置。容器啟動時應用偏移量。第二個是一個主題數組,Kafka根據group.id屬性分配分區?—?在整個組中分布分區。第三種使用正則表達式模式來選擇主題。
要將MessageListener分配給容器,可以在創建容器時使用ContainerProps.setMessageListener方法。以下示例顯示了如何執行此操作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {...
});
DefaultKafkaConsumerFactory<Integer, String> cf =new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
請注意,在創建DefaultKafkaConsumerFactory時,使用僅接受上述屬性的構造函數意味著從配置中獲取鍵和值反序列化器類。或者,反序列化器實例可以傳遞給DefaultKafkaConsumerFactory構造函數的鍵和/或值,在這種情況下,所有消費者共享相同的實例。另一種選擇是提供Supplier<Deserializer>(從2.3版本開始),用于為每個Consumer獲取單獨的Deserializer實例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有關可以設置的各種屬性的更多信息,請參閱ContainerProperties的Javadoc。
自2.1.1版本起,一個名為logContainerConfig的新屬性可用。當啟用true和INFO日志記錄時,每個偵聽器容器都會寫入一條日志消息,總結其配置屬性。
默認情況下,在DEBUG日志級別執行主題偏移提交的日志記錄。從2.1.2版本開始,ContainerProperties中名為commitLogLevel的屬性允許您指定這些消息的日志級別。例如,要將日志級別更改為INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO);。
從2.2版本開始,添加了一個名為missingTopicsFatal的新容器屬性(默認值:自2.3.4以來為false)。如果代理上不存在任何配置的主題,這將阻止容器啟動。如果容器配置為偵聽主題模式(正則表達式),則不適用。以前,容器線程在consumer.poll()方法中循環,在記錄許多消息時等待主題出現。除了日志,沒有跡象表明存在問題。
自2.8版本起,引入了新的容器屬性authExceptionRetryInterval。這會導致容器在從KafkaConsumer獲取任何AuthenticationException或AuthorizationException后重試獲取消息。例如,當配置的用戶被拒絕讀取某個主題或憑據不正確時,就會發生這種情況。定義authExceptionRetryInterval允許容器在授予適當權限時恢復。
默認情況下,不配置間隔-身份驗證和授權錯誤被認為是致命的,這會導致容器停止。
從2.8版本開始,在創建消費者工廠時,如果您將反序列化器作為對象提供(在構造函數中或通過setter),工廠將調用configure()方法,使用配置屬性對其進行配置。
Using ConcurrentMessageListenerContainer
單個構造函數類似于KafkaListenerContainer構造函數。以下列表顯示了構造函數的簽名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,ContainerProperties containerProperties)
它還具有并發屬性。例如,container.setCourrency(3)創建了三個KafkaMessageListenerContainer實例。
如果容器屬性是為主題(或主題模式)配置的,Kafka將使用其組管理功能在消費者之間分配分區。
在聽多個主題時,默認的分區分布可能不是您所期望的。例如,如果你有三個主題,每個主題有五個分區,并且你想使用并發=15,你只會看到五個活動的消費者,每個消費者從每個主題分配一個分區,其他10個消費者處于空閑狀態。這是因為默認的Kafka ConsumerPartitionAssignor是RangeAssignor(請參閱其Javadoc)。對于這種情況,您可能需要考慮使用RoundRobinAssignor,它將分區分配給所有消費者。然后,為每個消費者分配一個主題或分區。要更改ConsumerPartitionAssignor,您可以在提供給DefaultKafkaConsumerFactory的屬性中設置partition.assignment.strategy消費者屬性(ConsumerConfig.partition_assignment_STRATY_CONFIG)。
使用Spring Boot時,您可以按如下方式分配和設置策略:
spring.kafka.consumer.properties.分區.分配.策略=\
org.apache.kafka.clients.consumer。RoundRobin分配器
當容器屬性配置了TopicPartitionOffset時,ConcurrentMessageListenerContainer會將TopicPartitionOffset實例分發到代理KafkaMessageListenerCcontainer實例中。
假設提供了六個TopicPartitionOffset實例,并發性為3;每個容器有兩個分區。對于五個TopicPartitionOffset實例,兩個容器獲得兩個分區,第三個容器獲得一個分區。如果并發性大于TopicPartitions的數量,則會降低并發性,使每個容器都有一個分區。
client.id屬性(如果設置)附加了-n,其中n是與并發性對應的消費者實例。這是在啟用JMX時為MBean提供唯一名稱所必需的。
從1.3版本開始,MessageListenerContainer提供了對底層KafkaConsumer指標的訪問。在ConcurrentMessageListenerContainer的情況下,metrics()方法返回所有目標KafkaMessageListenerCcontainer實例的度量。這些指標被分組到Map<MetricName?根據為底層KafkaConsumer提供的客戶端id擴展Metric>。
從2.3版本開始,ContainerProperties提供了一個idleBetweenPolls選項,讓偵聽器容器中的主循環在KafkaConsumer.poll()調用之間休眠。從提供的選項和max.poll.interval.ms消費者配置與當前記錄批處理時間之間的差值中選擇實際睡眠間隔作為最小值。
Committing Offsets
提供了幾種用于提交抵消的選項。如果enable.auto.commit消費者屬性為true,Kafka會根據其配置自動提交偏移量。如果為false,則容器支持多個AckMode設置(如下表所述)。默認的確認模式為批處理。從2.3版本開始,除非在配置中明確設置,否則框架將enable.auto.commit設置為false。以前,如果未設置屬性,則使用Kafka默認值(true)。
Consumerpoll()方法返回一個或多個ConsumerRecords。每條記錄都會調用MessageListener。以下列表描述了容器對每個AckMode(未使用事務時)采取的操作:
RECORD:在監聽器處理記錄后返回時提交偏移量。
批處理:當poll()返回的所有記錄都已處理完畢時,提交偏移量。
TIME:在poll()返回的所有記錄都已處理完畢時提交偏移量,只要超過了自上次提交以來的ackTime。
COUNT:在poll()返回的所有記錄都已處理完畢時提交偏移量,只要自上次提交以來已收到ackCount記錄即可。
COUNT_TIME:類似于TIME和COUNT,但如果任一條件為真,則執行提交。
手冊:消息監聽器負責確認()確認。之后,應用與BATCH相同的語義。
MANUAL_IMMEDIATE:當偵聽器調用Acknowledgment.reacknowe()方法時,立即提交偏移量。
使用事務時,偏移量被發送到事務,語義等效于RECORD或BATCH,具體取決于偵聽器類型(記錄或批處理)。
MANUAL和MANUAL_IMMEDIATE要求偵聽器是AcknowledgingMessageListener或BatchAcknowledingMessageListener。請參閱消息偵聽器。
根據syncCommits容器屬性,使用消費者上的commitSync()或commitSync(()方法。syncCommits默認為true;另請參見setSyncCommitTimeout。查看setCommitCallback以獲取異步提交的結果;默認回調是LoggingCommitCallback,它記錄錯誤(以及調試級別的成功)。
因為監聽器容器有自己的偏移提交機制,所以它更喜歡Kafka ConsumerConfig。ENABLE_AUTO_COMIT_CONFIG為false。從2.3版本開始,它無條件地將其設置為false,除非在消費者工廠中特別設置或容器的消費者屬性重寫。
確認書有以下方法:
public interface Acknowledgment {void acknowledge();}
此方法使偵聽器可以控制何時提交偏移量。
從2.3版本開始,Acknowledgment接口有兩個額外的方法nack(長睡眠)和nack(int index,長睡眠)。第一個用于記錄偵聽器,第二個用于批處理偵聽器。為您的偵聽器類型調用錯誤的方法將引發IllegalStateException。
如果你想使用nack()提交部分批處理,在使用事務時,將AckMode設置為MANUAL;調用nack()將成功處理的記錄的偏移量發送給事務。
nack()只能在調用監聽器的消費者線程上調用。
使用無序提交時不允許使用nack()。
使用記錄偵聽器,當調用nack()時,任何掛起的偏移都會被提交,最后一次輪詢的剩余記錄會被丟棄,并在它們的分區上執行查找,以便在下一次輪詢()時重新傳遞失敗的記錄和未處理的記錄。通過設置sleep參數,可以在重新交付之前暫停消費者。這與在容器配置了DefaultErrorHandler時拋出異常的功能類似。
nack()在指定的睡眠持續時間內暫停整個偵聽器,包括所有分配的分區。
使用批偵聽器時,可以指定發生故障的批中的索引。當調用nack()時,在對失敗和丟棄的記錄的分區執行索引和查找之前,將為記錄提交偏移量,以便在下一次poll()時重新傳遞它們。
有關更多信息,請參閱容器錯誤處理程序。
消費者在睡眠期間暫停,以便我們繼續輪詢經紀人以保持消費者的活力。實際睡眠時間及其分辨率取決于容器的pollTimeout,默認值為5秒。最小睡眠時間等于pollTimeout,所有睡眠時間都是它的倍數。對于較小的睡眠時間,或者為了提高其準確性,可以考慮減少容器的pollTimetime。
從3.0.10版本開始,批處理監聽器可以使用Acknowledgment參數上的confirm(index)來提交批處理部分的偏移量。調用此方法時,將提交索引處記錄的偏移量(以及所有以前的記錄)。在執行部分批處理提交后調用confirmate()將提交批處理剩余部分的偏移量。以下限制適用:
確認模式。需要立即手動
必須在偵聽器線程上調用該方法
偵聽器必須使用List而不是原始ConsumerRecords
索引必須在列表元素的范圍內
索引必須大于前一次調用中使用的索引
這些限制被強制執行,該方法將根據違規情況拋出IllegalArgumentException或IllegalStateException。
Listener Container Auto Startup
偵聽器容器實現SmartLifecycle,默認情況下autoStartup為true。容器在后期階段(Integer.MAX-VALUE-100)啟動。實現SmartLifecycle以處理來自偵聽器的數據的其他組件應在早期階段啟動。-100為后續階段留出了空間,使組件能夠在容器后自動啟動。