本節介紹如何發送消息。
Using KafkaTemplate
本節介紹如何使用KafkaTemplate發送消息。
Overview
KafkaTemplate封裝了一個生產者,并提供了向Kafka主題發送數據的便利方法。以下列表顯示了KafkaTemplate的相關方法:
CompletableFuture<SendResult<K, V>> sendDefault(V data);CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, V data);CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);CompletableFuture<SendResult<K, V>> send(Message<?> message);Map<MetricName, ? extends Metric> metrics();List<PartitionInfo> partitionsFor(String topic);<T> T execute(ProducerCallback<K, V, T> callback);<T> T executeInTransaction(OperationsCallback<K, V, T> callback);// Flush the producer.
void flush();interface ProducerCallback<K, V, T> {T doInKafka(Producer<K, V> producer);}interface OperationsCallback<K, V, T> {T doInOperations(KafkaOperations<K, V> operations);}
有關更多詳細信息,請參閱Javadoc。
sendDefault API要求已向模板提供默認主題。
API將時間戳作為參數,并將此時間戳存儲在記錄中。如何存儲用戶提供的時間戳取決于Kafka主題上配置的時間戳類型。如果主題配置為使用CREATE_TIME,則記錄用戶指定的時間戳(如果未指定,則生成)。如果主題配置為使用LOG_APPEND_TIME,則用戶指定的時間戳將被忽略,代理將添加本地代理時間。
度量和分區For方法委托給底層Producer上的相同方法。execute方法提供對底層Producer的直接訪問。
要使用該模板,您可以配置生產者工廠并在模板的構造函數中提供它。以下示例顯示了如何執行此操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());
}@Bean
public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// See https://kafka.apache.org/documentation/#producerconfigs for more propertiesreturn props;
}@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {return new KafkaTemplate<Integer, String>(producerFactory());
}
從2.5版本開始,您現在可以覆蓋工廠的ProducerConfig屬性,從同一工廠創建具有不同生產者配置的模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {return new KafkaTemplate<>(pf);
}@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {return new KafkaTemplate<>(pf,Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
請注意,ProducerFactory類型的bean<?,?>(如Spring Boot自動配置的)可以用不同的狹義泛型類型引用。
您還可以使用標準的<bean/>定義來配置模板。
然后,要使用該模板,您可以調用它的一個方法。
當您將這些方法用于Message<?>參數,主題、分區、密鑰和時間戳信息在消息頭中提供,消息頭包括以下項目:
卡夫卡標頭。主題
卡夫卡標頭。隔板
卡夫卡標頭。鑰匙
卡夫卡標頭。時間戳
消息有效載荷是數據。
您可以選擇使用ProducerListener配置KafkaTemplate,以獲得包含發送結果(成功或失敗)的異步回調,而不是等待Future完成。以下清單顯示了ProducerListener接口的定義:
public interface ProducerListener<K, V> {default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {}default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {}}
默認情況下,模板配置了LoggingProductListener,它記錄錯誤,在發送成功時不做任何事情。
為了方便起見,如果您只想實現其中一個方法,則提供默認方法實現。
請注意,send方法返回一個CompletableFuture<SendResult>。您可以向偵聽器注冊一個回調,以異步接收發送的結果。以下示例顯示了如何執行此操作:
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {...
});
SendResult有兩個屬性,ProducerRecord和RecordMetadata。有關這些對象的信息,請參閱Kafka API文檔。
Throwable可以轉換為KafkaProducerException;其producerRecord屬性包含失敗的記錄。
如果你想阻止發送線程等待結果,你可以調用future的get()方法;建議使用帶超時的方法。如果您設置了linger.ms,您可能希望在等待之前調用flush(),或者為了方便起見,模板有一個帶有autoFlush參數的構造函數,該參數使模板在每次發送時都會刷新()。只有當您設置了linger.ms生產者屬性并希望立即發送部分批次時,才需要刷新。
Examples
本節展示了向Kafka發送消息的示例:
public void sendToKafka(final MyOutputData data) {final ProducerRecord<String, String> record = createRecord(data);CompletableFuture<SendResult<String, String>> future = template.send(record);future.whenComplete((result, ex) -> {if (ex == null) {handleSuccess(data);}else {handleFailure(data, record, ex);}});
}
public void sendToKafka(final MyOutputData data) {final ProducerRecord<String, String> record = createRecord(data);try {template.send(record).get(10, TimeUnit.SECONDS);handleSuccess(data);}catch (ExecutionException e) {handleFailure(data, record, e.getCause());}catch (TimeoutException | InterruptedException e) {handleFailure(data, record, e);}
}
請注意,ExecutionException的原因是具有producerRecord屬性的KafkaProducerException。
Using RoutingKafkaTemplate
從2.5版本開始,您可以使用RoutingKafkaTemplate在運行時根據目標主題名稱選擇生產者。
路由模板不支持事務、執行、刷新或度量操作,因為這些操作的主題未知。
該模板需要java.util.regex的映射。ProducerFactory<Object,Object>實例的模式。這個映射應該是有序的(例如LinkedHashMap),因為它是按順序遍歷的;您應該在開始時添加更具體的模式。
以下簡單的Spring Boot應用程序提供了一個示例,說明如何使用相同的模板發送到不同的主題,每個主題都使用不同的值序列化器。
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Beanpublic RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,ProducerFactory<Object, Object> pf) {// Clone the PF with a different Serializer, register with Spring for shutdownMap<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();map.put(Pattern.compile("two"), bytesPF);map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializerreturn new RoutingKafkaTemplate(map);}@Beanpublic ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {return args -> {routingTemplate.send("one", "thing1");routingTemplate.send("two", "thing2".getBytes());};}}
此示例的相應@KafkaListeners顯示在注釋屬性中。
有關實現類似結果的另一種技術,但具有向同一主題發送不同類型的附加功能,請參閱委派序列化程序和解序列化程序。
Using DefaultKafkaProducerFactory
如使用KafkaTemplate中所示,ProducerFactory用于創建生產者。
當不使用事務時,默認情況下,DefaultKafkaProducerFactory會創建一個供所有客戶端使用的單例生產者,如KafkaProducierJavaDocs中所建議的。但是,如果在模板上調用flush(),這可能會導致使用同一生成器的其他線程延遲。從2.3版本開始,DefaultKafkaProducerFactory有一個新的屬性producerPerThread。當設置為true時,工廠將為每個線程創建(并緩存)一個單獨的生產者,以避免此問題。
當producerPerThread為true時,當不再需要生產者時,用戶代碼必須在工廠上調用closeThreadBoundProducer()。這將物理關閉生產者并將其從ThreadLocal中刪除。調用reset()或destroy()不會清理這些生產者。
另請參閱KafkaTemplate事務性和非事務性發布。
創建DefaultKafkaProducerFactory時,可以通過調用僅接受屬性映射的構造函數從配置中獲取鍵和/或值序列化器類(請參閱Using KafkaTemplate中的示例),或者可以將序列化器實例傳遞給DefaultKafkaProducerFactory構造函數(在這種情況下,所有生產者共享相同的實例)。或者,您可以提供供應商<Serializer>(從2.3版本開始),用于為每個生產商獲取單獨的序列化程序實例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
從2.5.10版本開始,您現在可以在創建工廠后更新生產者屬性。例如,如果您必須在憑據更改后更新SSL密鑰/信任存儲位置,這可能很有用。這些更改不會影響現有的生產者實例;調用reset()關閉任何現有的生產者,以便使用新屬性創建新的生產者。
您不能將事務生產者工廠更改為非事務生產者工廠,反之亦然。
現在提供了兩種新方法:
void updateConfigs(Map<String, Object> updates);void removeConfig(String configKey);
從2.8版本開始,如果你提供序列化器作為對象(在構造函數中或通過setter),工廠將調用configure()方法來配置它們的配置屬性。
Using ReplyingKafkaTemplate
2.1.3版本引入了KafkaTemplate的一個子類來提供請求/回復語義。該類名為ReplyingKafkaTemplate,有兩個附加方法;下面顯示了方法簽名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,Duration replyTimeout);
(另請參閱帶消息的請求/回復)。
結果是一個CompletableFuture,它異步填充了結果(或超時的異常)。結果還有一個sendFuture屬性,這是調用KafkaTemplate.send()的結果。您可以使用此未來來確定發送操作的結果。
如果使用第一種方法,或者replyTimeout參數為null,則使用模板的defaultReplyTimeout屬性(默認為5秒)。
從2.8.8版本開始,模板有一個新方法waitForAssignment。如果回復容器配置了auto.coffset.reset=latest,這將非常有用,以避免在容器初始化之前發送請求和回復。
使用手動分區分配(無組管理)時,等待時間必須大于容器的pollTimeout屬性,因為在第一次輪詢完成之前不會發送通知。
以下Spring Boot應用程序顯示了如何使用該功能的示例:
@SpringBootApplication
public class KRequestingApplication {public static void main(String[] args) {SpringApplication.run(KRequestingApplication.class, args).close();}@Beanpublic ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {return args -> {if (!template.waitForAssignment(Duration.ofSeconds(10))) {throw new IllegalStateException("Reply container did not initialize");}ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);System.out.println("Sent ok: " + sendResult.getRecordMetadata());ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);System.out.println("Return value: " + consumerRecord.value());};}@Beanpublic ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf,ConcurrentMessageListenerContainer<String, String> repliesContainer) {return new ReplyingKafkaTemplate<>(pf, repliesContainer);}@Beanpublic ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {ConcurrentMessageListenerContainer<String, String> repliesContainer =containerFactory.createContainer("kReplies");repliesContainer.getContainerProperties().setGroupId("repliesGroup");repliesContainer.setAutoStartup(false);return repliesContainer;}@Beanpublic NewTopic kRequests() {return TopicBuilder.name("kRequests").partitions(10).replicas(2).build();}@Beanpublic NewTopic kReplies() {return TopicBuilder.name("kReplies").partitions(10).replicas(2).build();}}
請注意,我們可以使用Boot的自動配置容器工廠來創建回復容器。
如果將非平凡的反序列化器用于回復,請考慮使用ErrorHandlingDeserializer,該反序列化器委托給您配置的反序列化程序。如此配置后,RequestReplyFuture將異常完成,您可以捕獲ExecutionException,并在其cause屬性中包含反序列化Exception。
從2.6.7版本開始,除了檢測反序列化異常外,模板還將調用replyErrorChecker函數(如果提供)。如果它返回異常,則未來將異常完成。
以下是一個示例:
template.setReplyErrorChecker(record -> {Header error = record.headers().lastHeader("serverSentAnError");if (error != null) {return new MyException(new String(error.value()));}else {return null;}
});...RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {future.getSendFuture().get(10, TimeUnit.SECONDS); // send okConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);...
}
catch (InterruptedException e) {...
}
catch (ExecutionException e) {if (e.getCause() instanceof MyException) {...}
}
catch (TimeoutException e) {...
}
模板設置了一個標頭(默認情況下名為KafkaHeaders.CORRELATION_ID),該標頭必須由服務器端回顯。
在這種情況下,以下@KafkaListener應用程序會響應:
@SpringBootApplication
public class KReplyingApplication {public static void main(String[] args) {SpringApplication.run(KReplyingApplication.class, args);}@KafkaListener(id="server", topics = "kRequests")@SendTo // use default replyTo expressionpublic String listen(String in) {System.out.println("Server received: " + in);return in.toUpperCase();}@Beanpublic NewTopic kRequests() {return TopicBuilder.name("kRequests").partitions(10).replicas(2).build();}@Bean // not required if Jackson is on the classpathpublic MessagingMessageConverter simpleMapperConverter() {MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());return messagingMessageConverter;}}
@KafkaListener基礎結構響應關聯ID并確定回復主題。
有關發送回復的更多信息,請參閱使用@SendTo轉發偵聽器結果。該模板使用默認標頭KafKaHeaders。REPLY_TOPIC表示回復所針對的主題。
從2.2版本開始,模板嘗試從配置的回復容器中檢測回復主題或分區。如果容器配置為監聽單個主題或單個TopicPartitionOffset,則用于設置回復標頭。如果容器配置為其他方式,則用戶必須設置回復標頭。在這種情況下,初始化期間會寫入INFO日志消息。以下示例使用KafkaHeaders。REPLY_TOPIC:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
當您使用單個回復TopicPartitionOffset進行配置時,您可以對多個模板使用相同的回復主題,只要每個實例都在不同的分區上偵聽。當配置單個回復主題時,每個實例必須使用不同的group.id。在這種情況下,所有實例都會收到每個回復,但只有發送請求的實例才能找到相關id。這可能有助于自動擴展,但會產生額外的網絡流量開銷,丟棄每個不需要的回復的成本很低。使用此設置時,我們建議您將模板的sharedReplyTopic設置為true,這會降低對DEBUG的意外回復的日志記錄級別,而不是默認的ERROR。
以下是一個配置回復容器以使用相同共享回復主題的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // uniqueProperties props = new Properties();props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old repliescontainer.getContainerProperties().setKafkaConsumerProperties(props);return container;
}
如果您有多個客戶端實例,并且沒有如前一段所述對其進行配置,則每個實例都需要一個專門的回復主題。另一種方法是設置KafkaHeaders。REPLY_PARTITION并為每個實例使用專用分區。Header包含一個四字節的int(大端序)。服務器必須使用此標頭將回復路由到正確的分區(@KafkaListener執行此操作)。然而,在這種情況下,回復容器不得使用Kafka的組管理功能,并且必須配置為在固定分區上偵聽(通過在其ContainerProperties構造函數中使用TopicPartitionOffset)。
DefaultKafkaHeaderMapper要求Jackson在類路徑上(對于@KafkaListener)。如果它不可用,則消息轉換器沒有標頭映射器,因此您必須使用SimpleKafkaHeaderMapper配置MessagingMessageConverter,如前所示。
默認情況下,使用3個標頭:
卡夫卡標頭。CORRELATION_ID-用于將回復與請求相關聯
卡夫卡標頭。REPLY_TOPIC-用于告訴服務器在哪里回復
卡夫卡標頭。REPLY_PARTITION-(可選)用于告訴服務器要回復哪個分區
@KafkaListener基礎結構使用這些標頭名稱來路由回復。
從2.3版本開始,您可以自定義標頭名稱-該模板有3個屬性,分別是HeaderName、replyTopicHeaderName和replyPartitionHeaderName。如果您的服務器不是Spring應用程序(或不使用@KafkaListener),這很有用。
相反,如果請求的應用程序不是spring應用程序,并且將相關性信息放在不同的標頭中,從3.0版本開始,您可以在偵聽器容器工廠上配置自定義correlationHeaderName,該標頭將被回顯。以前,偵聽器必須回顯自定義相關性標頭。
Request/Reply with Message<?>s
2.7版本在ReplyingKafkaTemplate中添加了發送和接收春季消息的方法?>抽象:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,ParameterizedTypeReference<P> returnType);
這些將使用模板的默認replyTimeout,也有重載版本可以在方法調用中超時。
如果消費者的反序列化器或模板的MessageConverter可以通過配置或回復消息中的類型元數據轉換有效載荷,而無需任何額外信息,則使用第一種方法。
如果需要為返回類型提供類型信息,請使用第二種方法來幫助消息轉換器。這也允許同一模板接收不同的類型,即使回復中沒有類型元數據,例如當服務器端不是Spring應用程序時。以下是后者的一個例子:
@Bean
ReplyingKafkaTemplate<String, String, String> template(ProducerFactory<String, String> pf,ConcurrentKafkaListenerContainerFactory<String, String> factory) {ConcurrentMessageListenerContainer<String, String> replyContainer =factory.createContainer("replies");replyContainer.getContainerProperties().setGroupId("request.replies");ReplyingKafkaTemplate<String, String, String> template =new ReplyingKafkaTemplate<>(pf, replyContainer);template.setMessageConverter(new ByteArrayJsonMessageConverter());template.setDefaultTopic("requests");return template;
}
RequestReplyTypedMessageFuture<String, String, Thing> future1 =template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
Reply Type Message<?>
當@KafkaListener返回消息時<?>對于2.5之前的版本,有必要填充回復主題和相關性id標頭。在這個例子中,我們使用請求中的回復主題頭:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {return MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.TOPIC, replyTo).setHeader(KafkaHeaders.KEY, 42).setHeader(KafkaHeaders.CORRELATION_ID, correlation).build();
}
這也顯示了如何在回復記錄上設置密鑰。
從2.5版本開始,框架將檢測這些標頭是否丟失,并用主題填充它們——要么是根據@SendTo值確定的主題,要么是傳入的KafkaHeaders。REPLY_TOPIC標頭(如果存在)。它還將對傳入的KafkaHeaders進行回聲處理。CORRELATION_ID和KafkaHeaders。REPLY_PARTITION(如果存在)。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {return MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.KEY, 42).build();
}
Original Record Key in Reply
從3.3版本開始,傳入請求中的Kafka記錄鍵(如果存在)將保留在回復記錄中。這僅適用于單記錄請求/回復場景。當偵聽器是批處理或返回類型是集合時,由應用程序通過將回復記錄包裝在消息類型中來指定要使用的鍵。
Aggregating Multiple Replies
Using ReplyingKafkaTemplate中的模板僅適用于單個請求/回復場景。對于單個消息的多個接收者返回回復的情況,您可以使用AggregatingReplyingKafka template。這是Scatter-Garger企業集成模式客戶端的實現。
與ReplyingKafkaTemplate一樣,AggregatingReplyingKafka Template構造函數需要一個生產者工廠和一個監聽器容器來接收回復;它有第三個參數BiPredicate<List<ConsumerRecord<K,R>,Boolean>releaseStrategy,每次收到回復時都會參考該參數;當謂詞返回true時,ConsumerRecords的集合用于完成sendAndReceive方法返回的Future。
還有一個額外的屬性returnPartialOnTimeout(默認值為false)。當此設置為true時,不使用KafkaReplyTimeoutException完成未來,而是使用部分結果正常完成未來(只要至少收到一條回復記錄)。
從2.3.5版本開始,謂詞也會在超時后調用(如果returnPartialOnTimeout為true)。第一個參數是當前記錄列表;如果此調用是由于超時引起的,則第二個為真。謂詞可以修改記錄列表。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =new AggregatingReplyingKafkaTemplate<>(producerFactory, container,coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =future.get(30, TimeUnit.SECONDS);
請注意,返回類型是ConsumerRecord,其值是ConsumerRecords的集合。“外部”ConsumerRecord不是“真實”記錄,它是由模板合成的,作為請求收到的實際回復記錄的持有者。當正常發布發生時(發布策略返回true),主題設置為aggregatedResults;如果returnPartialOnTimeout為true,并且發生超時(并且至少收到一條回復記錄),則主題設置為partialResultsAfterTimeout。該模板為這些“主題”名稱提供了恒定的靜態變量:
/*** Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated* results in its value after a normal release by the release strategy.*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";/*** Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated* results in its value after a timeout.*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
集合中的真實ConsumerRecords包含收到回復的實際主題。
答復的偵聽器容器必須配置為AckMode。手動或確認模式。手動即時;消費者屬性enable.auto.commit必須為false(自2.3版本以來的默認值)。為了避免丟失消息的任何可能性,模板僅在有零個未完成的請求時提交偏移量,即當發布策略釋放最后一個未完成請求時。重新平衡后,可能會出現重復的回復交付;對于任何飛行中的請求,這些都將被忽略;當收到已發布回復的重復回復時,您可能會看到錯誤日志消息。
如果將ErrorHandlingDeserializer與此聚合模板一起使用,則框架將不會自動檢測到反序列化異常。相反,記錄(具有null值)將原封不動地返回,頭中有反序列化異常。建議應用程序調用實用程序方法ReplyingKafkaTemplate.checkDeserialize()方法,以確定是否發生了反序列化異常。有關更多信息,請參閱其JavaDocs。此聚合模板也不需要replyErrorChecker;您應該對回復的每個元素進行檢查。