4、Sending Messages

本節介紹如何發送消息。

Using KafkaTemplate

本節介紹如何使用KafkaTemplate發送消息。

Overview

KafkaTemplate封裝了一個生產者,并提供了向Kafka主題發送數據的便利方法。以下列表顯示了KafkaTemplate的相關方法:

CompletableFuture<SendResult<K, V>> sendDefault(V data);CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, V data);CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);CompletableFuture<SendResult<K, V>> send(Message<?> message);Map<MetricName, ? extends Metric> metrics();List<PartitionInfo> partitionsFor(String topic);<T> T execute(ProducerCallback<K, V, T> callback);<T> T executeInTransaction(OperationsCallback<K, V, T> callback);// Flush the producer.
void flush();interface ProducerCallback<K, V, T> {T doInKafka(Producer<K, V> producer);}interface OperationsCallback<K, V, T> {T doInOperations(KafkaOperations<K, V> operations);}

有關更多詳細信息,請參閱Javadoc。

sendDefault API要求已向模板提供默認主題。

API將時間戳作為參數,并將此時間戳存儲在記錄中。如何存儲用戶提供的時間戳取決于Kafka主題上配置的時間戳類型。如果主題配置為使用CREATE_TIME,則記錄用戶指定的時間戳(如果未指定,則生成)。如果主題配置為使用LOG_APPEND_TIME,則用戶指定的時間戳將被忽略,代理將添加本地代理時間。

度量和分區For方法委托給底層Producer上的相同方法。execute方法提供對底層Producer的直接訪問。

要使用該模板,您可以配置生產者工廠并在模板的構造函數中提供它。以下示例顯示了如何執行此操作:

@Bean
public ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());
}@Bean
public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// See https://kafka.apache.org/documentation/#producerconfigs for more propertiesreturn props;
}@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {return new KafkaTemplate<Integer, String>(producerFactory());
}

從2.5版本開始,您現在可以覆蓋工廠的ProducerConfig屬性,從同一工廠創建具有不同生產者配置的模板。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {return new KafkaTemplate<>(pf);
}@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {return new KafkaTemplate<>(pf,Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

請注意,ProducerFactory類型的bean<?,?>(如Spring Boot自動配置的)可以用不同的狹義泛型類型引用。

您還可以使用標準的<bean/>定義來配置模板。

然后,要使用該模板,您可以調用它的一個方法。

當您將這些方法用于Message<?>參數,主題、分區、密鑰和時間戳信息在消息頭中提供,消息頭包括以下項目:

卡夫卡標頭。主題

卡夫卡標頭。隔板

卡夫卡標頭。鑰匙

卡夫卡標頭。時間戳

消息有效載荷是數據。

您可以選擇使用ProducerListener配置KafkaTemplate,以獲得包含發送結果(成功或失敗)的異步回調,而不是等待Future完成。以下清單顯示了ProducerListener接口的定義:

public interface ProducerListener<K, V> {default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {}default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {}}

默認情況下,模板配置了LoggingProductListener,它記錄錯誤,在發送成功時不做任何事情。

為了方便起見,如果您只想實現其中一個方法,則提供默認方法實現。

請注意,send方法返回一個CompletableFuture<SendResult>。您可以向偵聽器注冊一個回調,以異步接收發送的結果。以下示例顯示了如何執行此操作:

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {...
});

SendResult有兩個屬性,ProducerRecord和RecordMetadata。有關這些對象的信息,請參閱Kafka API文檔。

Throwable可以轉換為KafkaProducerException;其producerRecord屬性包含失敗的記錄。

如果你想阻止發送線程等待結果,你可以調用future的get()方法;建議使用帶超時的方法。如果您設置了linger.ms,您可能希望在等待之前調用flush(),或者為了方便起見,模板有一個帶有autoFlush參數的構造函數,該參數使模板在每次發送時都會刷新()。只有當您設置了linger.ms生產者屬性并希望立即發送部分批次時,才需要刷新。

Examples

本節展示了向Kafka發送消息的示例:

public void sendToKafka(final MyOutputData data) {final ProducerRecord<String, String> record = createRecord(data);CompletableFuture<SendResult<String, String>> future = template.send(record);future.whenComplete((result, ex) -> {if (ex == null) {handleSuccess(data);}else {handleFailure(data, record, ex);}});
}
public void sendToKafka(final MyOutputData data) {final ProducerRecord<String, String> record = createRecord(data);try {template.send(record).get(10, TimeUnit.SECONDS);handleSuccess(data);}catch (ExecutionException e) {handleFailure(data, record, e.getCause());}catch (TimeoutException | InterruptedException e) {handleFailure(data, record, e);}
}

請注意,ExecutionException的原因是具有producerRecord屬性的KafkaProducerException。

Using RoutingKafkaTemplate

從2.5版本開始,您可以使用RoutingKafkaTemplate在運行時根據目標主題名稱選擇生產者。

路由模板不支持事務、執行、刷新或度量操作,因為這些操作的主題未知。

該模板需要java.util.regex的映射。ProducerFactory<Object,Object>實例的模式。這個映射應該是有序的(例如LinkedHashMap),因為它是按順序遍歷的;您應該在開始時添加更具體的模式。

以下簡單的Spring Boot應用程序提供了一個示例,說明如何使用相同的模板發送到不同的主題,每個主題都使用不同的值序列化器。

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Beanpublic RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,ProducerFactory<Object, Object> pf) {// Clone the PF with a different Serializer, register with Spring for shutdownMap<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();map.put(Pattern.compile("two"), bytesPF);map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializerreturn new RoutingKafkaTemplate(map);}@Beanpublic ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {return args -> {routingTemplate.send("one", "thing1");routingTemplate.send("two", "thing2".getBytes());};}}

此示例的相應@KafkaListeners顯示在注釋屬性中。

有關實現類似結果的另一種技術,但具有向同一主題發送不同類型的附加功能,請參閱委派序列化程序和解序列化程序。

Using DefaultKafkaProducerFactory

如使用KafkaTemplate中所示,ProducerFactory用于創建生產者。

當不使用事務時,默認情況下,DefaultKafkaProducerFactory會創建一個供所有客戶端使用的單例生產者,如KafkaProducierJavaDocs中所建議的。但是,如果在模板上調用flush(),這可能會導致使用同一生成器的其他線程延遲。從2.3版本開始,DefaultKafkaProducerFactory有一個新的屬性producerPerThread。當設置為true時,工廠將為每個線程創建(并緩存)一個單獨的生產者,以避免此問題。

當producerPerThread為true時,當不再需要生產者時,用戶代碼必須在工廠上調用closeThreadBoundProducer()。這將物理關閉生產者并將其從ThreadLocal中刪除。調用reset()或destroy()不會清理這些生產者。

另請參閱KafkaTemplate事務性和非事務性發布。

創建DefaultKafkaProducerFactory時,可以通過調用僅接受屬性映射的構造函數從配置中獲取鍵和/或值序列化器類(請參閱Using KafkaTemplate中的示例),或者可以將序列化器實例傳遞給DefaultKafkaProducerFactory構造函數(在這種情況下,所有生產者共享相同的實例)。或者,您可以提供供應商<Serializer>(從2.3版本開始),用于為每個生產商獲取單獨的序列化程序實例:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

從2.5.10版本開始,您現在可以在創建工廠后更新生產者屬性。例如,如果您必須在憑據更改后更新SSL密鑰/信任存儲位置,這可能很有用。這些更改不會影響現有的生產者實例;調用reset()關閉任何現有的生產者,以便使用新屬性創建新的生產者。

您不能將事務生產者工廠更改為非事務生產者工廠,反之亦然。

現在提供了兩種新方法:

void updateConfigs(Map<String, Object> updates);void removeConfig(String configKey);

從2.8版本開始,如果你提供序列化器作為對象(在構造函數中或通過setter),工廠將調用configure()方法來配置它們的配置屬性。

Using ReplyingKafkaTemplate

2.1.3版本引入了KafkaTemplate的一個子類來提供請求/回復語義。該類名為ReplyingKafkaTemplate,有兩個附加方法;下面顯示了方法簽名:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,Duration replyTimeout);

(另請參閱帶消息的請求/回復)。

結果是一個CompletableFuture,它異步填充了結果(或超時的異常)。結果還有一個sendFuture屬性,這是調用KafkaTemplate.send()的結果。您可以使用此未來來確定發送操作的結果。

如果使用第一種方法,或者replyTimeout參數為null,則使用模板的defaultReplyTimeout屬性(默認為5秒)。

從2.8.8版本開始,模板有一個新方法waitForAssignment。如果回復容器配置了auto.coffset.reset=latest,這將非常有用,以避免在容器初始化之前發送請求和回復。

使用手動分區分配(無組管理)時,等待時間必須大于容器的pollTimeout屬性,因為在第一次輪詢完成之前不會發送通知。

以下Spring Boot應用程序顯示了如何使用該功能的示例:

@SpringBootApplication
public class KRequestingApplication {public static void main(String[] args) {SpringApplication.run(KRequestingApplication.class, args).close();}@Beanpublic ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {return args -> {if (!template.waitForAssignment(Duration.ofSeconds(10))) {throw new IllegalStateException("Reply container did not initialize");}ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);System.out.println("Sent ok: " + sendResult.getRecordMetadata());ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);System.out.println("Return value: " + consumerRecord.value());};}@Beanpublic ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf,ConcurrentMessageListenerContainer<String, String> repliesContainer) {return new ReplyingKafkaTemplate<>(pf, repliesContainer);}@Beanpublic ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {ConcurrentMessageListenerContainer<String, String> repliesContainer =containerFactory.createContainer("kReplies");repliesContainer.getContainerProperties().setGroupId("repliesGroup");repliesContainer.setAutoStartup(false);return repliesContainer;}@Beanpublic NewTopic kRequests() {return TopicBuilder.name("kRequests").partitions(10).replicas(2).build();}@Beanpublic NewTopic kReplies() {return TopicBuilder.name("kReplies").partitions(10).replicas(2).build();}}

請注意,我們可以使用Boot的自動配置容器工廠來創建回復容器。

如果將非平凡的反序列化器用于回復,請考慮使用ErrorHandlingDeserializer,該反序列化器委托給您配置的反序列化程序。如此配置后,RequestReplyFuture將異常完成,您可以捕獲ExecutionException,并在其cause屬性中包含反序列化Exception。

從2.6.7版本開始,除了檢測反序列化異常外,模板還將調用replyErrorChecker函數(如果提供)。如果它返回異常,則未來將異常完成。

以下是一個示例:

template.setReplyErrorChecker(record -> {Header error = record.headers().lastHeader("serverSentAnError");if (error != null) {return new MyException(new String(error.value()));}else {return null;}
});...RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {future.getSendFuture().get(10, TimeUnit.SECONDS); // send okConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);...
}
catch (InterruptedException e) {...
}
catch (ExecutionException e) {if (e.getCause() instanceof MyException) {...}
}
catch (TimeoutException e) {...
}

模板設置了一個標頭(默認情況下名為KafkaHeaders.CORRELATION_ID),該標頭必須由服務器端回顯。

在這種情況下,以下@KafkaListener應用程序會響應:

@SpringBootApplication
public class KReplyingApplication {public static void main(String[] args) {SpringApplication.run(KReplyingApplication.class, args);}@KafkaListener(id="server", topics = "kRequests")@SendTo // use default replyTo expressionpublic String listen(String in) {System.out.println("Server received: " + in);return in.toUpperCase();}@Beanpublic NewTopic kRequests() {return TopicBuilder.name("kRequests").partitions(10).replicas(2).build();}@Bean // not required if Jackson is on the classpathpublic MessagingMessageConverter simpleMapperConverter() {MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());return messagingMessageConverter;}}

@KafkaListener基礎結構響應關聯ID并確定回復主題。

有關發送回復的更多信息,請參閱使用@SendTo轉發偵聽器結果。該模板使用默認標頭KafKaHeaders。REPLY_TOPIC表示回復所針對的主題。

從2.2版本開始,模板嘗試從配置的回復容器中檢測回復主題或分區。如果容器配置為監聽單個主題或單個TopicPartitionOffset,則用于設置回復標頭。如果容器配置為其他方式,則用戶必須設置回復標頭。在這種情況下,初始化期間會寫入INFO日志消息。以下示例使用KafkaHeaders。REPLY_TOPIC:

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

當您使用單個回復TopicPartitionOffset進行配置時,您可以對多個模板使用相同的回復主題,只要每個實例都在不同的分區上偵聽。當配置單個回復主題時,每個實例必須使用不同的group.id。在這種情況下,所有實例都會收到每個回復,但只有發送請求的實例才能找到相關id。這可能有助于自動擴展,但會產生額外的網絡流量開銷,丟棄每個不需要的回復的成本很低。使用此設置時,我們建議您將模板的sharedReplyTopic設置為true,這會降低對DEBUG的意外回復的日志記錄級別,而不是默認的ERROR。

以下是一個配置回復容器以使用相同共享回復主題的示例:

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // uniqueProperties props = new Properties();props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old repliescontainer.getContainerProperties().setKafkaConsumerProperties(props);return container;
}

如果您有多個客戶端實例,并且沒有如前一段所述對其進行配置,則每個實例都需要一個專門的回復主題。另一種方法是設置KafkaHeaders。REPLY_PARTITION并為每個實例使用專用分區。Header包含一個四字節的int(大端序)。服務器必須使用此標頭將回復路由到正確的分區(@KafkaListener執行此操作)。然而,在這種情況下,回復容器不得使用Kafka的組管理功能,并且必須配置為在固定分區上偵聽(通過在其ContainerProperties構造函數中使用TopicPartitionOffset)。

DefaultKafkaHeaderMapper要求Jackson在類路徑上(對于@KafkaListener)。如果它不可用,則消息轉換器沒有標頭映射器,因此您必須使用SimpleKafkaHeaderMapper配置MessagingMessageConverter,如前所示。

默認情況下,使用3個標頭:

卡夫卡標頭。CORRELATION_ID-用于將回復與請求相關聯

卡夫卡標頭。REPLY_TOPIC-用于告訴服務器在哪里回復

卡夫卡標頭。REPLY_PARTITION-(可選)用于告訴服務器要回復哪個分區

@KafkaListener基礎結構使用這些標頭名稱來路由回復。

從2.3版本開始,您可以自定義標頭名稱-該模板有3個屬性,分別是HeaderName、replyTopicHeaderName和replyPartitionHeaderName。如果您的服務器不是Spring應用程序(或不使用@KafkaListener),這很有用。

相反,如果請求的應用程序不是spring應用程序,并且將相關性信息放在不同的標頭中,從3.0版本開始,您可以在偵聽器容器工廠上配置自定義correlationHeaderName,該標頭將被回顯。以前,偵聽器必須回顯自定義相關性標頭。

Request/Reply with Message<?>s

2.7版本在ReplyingKafkaTemplate中添加了發送和接收春季消息的方法?>抽象:

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,ParameterizedTypeReference<P> returnType);

這些將使用模板的默認replyTimeout,也有重載版本可以在方法調用中超時。

如果消費者的反序列化器或模板的MessageConverter可以通過配置或回復消息中的類型元數據轉換有效載荷,而無需任何額外信息,則使用第一種方法。

如果需要為返回類型提供類型信息,請使用第二種方法來幫助消息轉換器。這也允許同一模板接收不同的類型,即使回復中沒有類型元數據,例如當服務器端不是Spring應用程序時。以下是后者的一個例子:

@Bean
ReplyingKafkaTemplate<String, String, String> template(ProducerFactory<String, String> pf,ConcurrentKafkaListenerContainerFactory<String, String> factory) {ConcurrentMessageListenerContainer<String, String> replyContainer =factory.createContainer("replies");replyContainer.getContainerProperties().setGroupId("request.replies");ReplyingKafkaTemplate<String, String, String> template =new ReplyingKafkaTemplate<>(pf, replyContainer);template.setMessageConverter(new ByteArrayJsonMessageConverter());template.setDefaultTopic("requests");return template;
}
RequestReplyTypedMessageFuture<String, String, Thing> future1 =template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));

Reply Type Message<?>

當@KafkaListener返回消息時<?>對于2.5之前的版本,有必要填充回復主題和相關性id標頭。在這個例子中,我們使用請求中的回復主題頭:

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {return MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.TOPIC, replyTo).setHeader(KafkaHeaders.KEY, 42).setHeader(KafkaHeaders.CORRELATION_ID, correlation).build();
}

這也顯示了如何在回復記錄上設置密鑰。

從2.5版本開始,框架將檢測這些標頭是否丟失,并用主題填充它們——要么是根據@SendTo值確定的主題,要么是傳入的KafkaHeaders。REPLY_TOPIC標頭(如果存在)。它還將對傳入的KafkaHeaders進行回聲處理。CORRELATION_ID和KafkaHeaders。REPLY_PARTITION(如果存在)。

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {return MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.KEY, 42).build();
}

Original Record Key in Reply

從3.3版本開始,傳入請求中的Kafka記錄鍵(如果存在)將保留在回復記錄中。這僅適用于單記錄請求/回復場景。當偵聽器是批處理或返回類型是集合時,由應用程序通過將回復記錄包裝在消息類型中來指定要使用的鍵。

Aggregating Multiple Replies

Using ReplyingKafkaTemplate中的模板僅適用于單個請求/回復場景。對于單個消息的多個接收者返回回復的情況,您可以使用AggregatingReplyingKafka template。這是Scatter-Garger企業集成模式客戶端的實現。

與ReplyingKafkaTemplate一樣,AggregatingReplyingKafka Template構造函數需要一個生產者工廠和一個監聽器容器來接收回復;它有第三個參數BiPredicate<List<ConsumerRecord<K,R>,Boolean>releaseStrategy,每次收到回復時都會參考該參數;當謂詞返回true時,ConsumerRecords的集合用于完成sendAndReceive方法返回的Future。

還有一個額外的屬性returnPartialOnTimeout(默認值為false)。當此設置為true時,不使用KafkaReplyTimeoutException完成未來,而是使用部分結果正常完成未來(只要至少收到一條回復記錄)。

從2.3.5版本開始,謂詞也會在超時后調用(如果returnPartialOnTimeout為true)。第一個參數是當前記錄列表;如果此調用是由于超時引起的,則第二個為真。謂詞可以修改記錄列表。

AggregatingReplyingKafkaTemplate<Integer, String, String> template =new AggregatingReplyingKafkaTemplate<>(producerFactory, container,coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =future.get(30, TimeUnit.SECONDS);

請注意,返回類型是ConsumerRecord,其值是ConsumerRecords的集合。“外部”ConsumerRecord不是“真實”記錄,它是由模板合成的,作為請求收到的實際回復記錄的持有者。當正常發布發生時(發布策略返回true),主題設置為aggregatedResults;如果returnPartialOnTimeout為true,并且發生超時(并且至少收到一條回復記錄),則主題設置為partialResultsAfterTimeout。該模板為這些“主題”名稱提供了恒定的靜態變量:

/*** Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated* results in its value after a normal release by the release strategy.*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";/*** Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated* results in its value after a timeout.*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

集合中的真實ConsumerRecords包含收到回復的實際主題。

答復的偵聽器容器必須配置為AckMode。手動或確認模式。手動即時;消費者屬性enable.auto.commit必須為false(自2.3版本以來的默認值)。為了避免丟失消息的任何可能性,模板僅在有零個未完成的請求時提交偏移量,即當發布策略釋放最后一個未完成請求時。重新平衡后,可能會出現重復的回復交付;對于任何飛行中的請求,這些都將被忽略;當收到已發布回復的重復回復時,您可能會看到錯誤日志消息。

如果將ErrorHandlingDeserializer與此聚合模板一起使用,則框架將不會自動檢測到反序列化異常。相反,記錄(具有null值)將原封不動地返回,頭中有反序列化異常。建議應用程序調用實用程序方法ReplyingKafkaTemplate.checkDeserialize()方法,以確定是否發生了反序列化異常。有關更多信息,請參閱其JavaDocs。此聚合模板也不需要replyErrorChecker;您應該對回復的每個元素進行檢查。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/88202.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/88202.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/88202.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

CSS長度單位問題

在 CSS 中&#xff0c;100px 的邏輯長度在不同分辨率的手機屏幕上是否表現一致&#xff0c;取決于 設備的像素密度&#xff08;devicePixelRatio&#xff09; 和 視口&#xff08;viewport&#xff09;的縮放設置。以下是詳細分析&#xff1a;1. 核心概念 CSS 像素&#xff08;…

基于Java+SpringBoot的圖書管理系統

源碼編號&#xff1a;S606源碼名稱&#xff1a;基于SpringBoot的圖書管理系統用戶類型&#xff1a;雙角色&#xff0c;用戶、管理員數據庫表數量&#xff1a;12 張表主要技術&#xff1a;Java、Vue、ElementUl 、SpringBoot、Maven運行環境&#xff1a;Windows/Mac、JDK1.8及以…

XTOM工業級藍光三維掃描儀用于筆記本電腦背板模具全尺寸檢測

鎂合金具有密度小、強度高、耐腐蝕性好等優點&#xff0c;成為筆記本電腦外殼主流材料。沖壓模具作為批量生產筆記本電腦鎂合金背板的核心工具&#xff0c;其精度直接決定了產品的尺寸一致性、結構可靠性與外觀品質。微米級模具誤差可能在沖壓過程中被放大至毫米級&#xff08;…

運維打鐵: MongoDB 數據庫集群搭建與管理

文章目錄思維導圖一、集群基礎概念1. 分片集群2. 副本集二、集群搭建1. 環境準備2. 配置副本集步驟 1&#xff1a;修改配置文件步驟 2&#xff1a;啟動 MongoDB 服務步驟 3&#xff1a;初始化副本集3. 配置分片集群步驟 1&#xff1a;配置配置服務器副本集步驟 2&#xff1a;啟…

HCIP-Datacom Core Technology V1.0_5 OSPF特殊區域及其他特性

在前面的章節中&#xff0c;OSPF可以劃分區域&#xff0c;減輕單區域里面LSDB的規模&#xff0c;從而減輕路由器的負荷&#xff0c;雖然OSPF能夠劃分區域&#xff0c;但是依舊需要維護域間路由和外部路由&#xff0c;這樣隨著網絡規模的不斷擴大&#xff0c;路由器所維護的LSDB…

實時開發IDE部署指南

&#x1f525;&#x1f525; AllData大數據產品是可定義數據中臺&#xff0c;以數據平臺為底座&#xff0c;以數據中臺為橋梁&#xff0c;以機器學習平臺為中層框架&#xff0c;以大模型應用為上游產品&#xff0c;提供全鏈路數字化解決方案。 ?杭州奧零數據科技官網&#xff…

深入解析 RAGFlow:文件上傳到知識庫的完整流程

在 RAGFlow 這樣的檢索增強生成&#xff08;RAG&#xff09;系統中&#xff0c;知識庫是其核心。用戶上傳的文檔如何高效、可靠地轉化為可檢索的知識&#xff0c;是系統穩定運行的關鍵。今天&#xff0c;我們就來深入探討 RAGFlow 中文件上傳到知識庫的完整流程&#xff0c;揭秘…

cad_recognition 筆記

Hubch/cad_recognition | DeepWiki https://github.com/Hubch/cad_recognition winget install python.python.3.10 python -m venv venv micromamba activate ./venv pip install paddleocr2.9.0 pip install poetry pip install moviepy1.0.3 下次要用conda建環境 或者…

基于odoo17的設計模式詳解---構建模式

大家好&#xff0c;我是你的Odoo技術伙伴。在Odoo開發中&#xff0c;創建一個簡單的記錄可能只需要一行 self.env[res.partner].create({name: New Partner})。但如果我們要創建一個復雜的對象&#xff0c;比如一個包含了特定上下文、具有多個可選配置、并且需要執行一系列關聯…

暑假算法日記第四天

目標?&#xff1a;刷完靈神專題訓練算法題單 階段目標&#x1f4cc;&#xff1a;【算法題單】滑動窗口與雙指針 LeetCode題目:2953. 統計完全子字符串1016. 子串能表示從 1 到 N 數字的二進制串其他: 今日總結 往期打卡 2953. 統計完全子字符串 跳轉: 2953. 統計完全子字符串…

Linux 常用命令大全(2025簡明版)

&#x1f9ed; 一、文件和目錄操作命令說明ls列出目錄內容ls -l以列表形式顯示&#xff08;含權限&#xff09;cd /path切換目錄pwd顯示當前路徑mkdir dir創建目錄mkdir -p dir/subdir遞歸創建目錄rm file刪除文件rm -r dir刪除目錄&#xff08;遞歸&#xff09;rm -rf dir強制…

React Ref 指南:原理、實現與實踐

前言 React Ref&#xff08;引用&#xff09;是React中一個強大而重要的概念&#xff0c;它為我們提供了直接訪問DOM元素或組件實例的能力。雖然React推崇聲明式編程和數據驅動的理念&#xff0c;但在某些場景下&#xff0c;我們仍需要直接操作DOM或訪問組件實例。本文將深入探…

4.權重衰減(weight decay)

4.1 手動實現權重衰減 import torch from torch import nn from torch.utils.data import TensorDataset,DataLoader import matplotlib.pyplot as plt def synthetic_data(w,b,num_inputs):Xtorch.normal(0,1,size(num_inputs,w.shape[0]))yXwbytorch.normal(0,0.1,sizey.shap…

OpenCV開發-初始概念

第一章 OpenCV核心架構解析1.1 計算機視覺的基石OpenCV&#xff08;Open Source Computer Vision Library&#xff09;作為跨平臺計算機視覺庫&#xff0c;自1999年由Intel發起&#xff0c;已成為圖像處理領域的標準工具。其核心價值體現在&#xff1a;跨平臺性&#xff1a;支持…

LeetCode 930.和相同的二元子數組

給你一個二元數組 nums &#xff0c;和一個整數 goal &#xff0c;請你統計并返回有多少個和為 goal 的 非空 子數組。 子數組 是數組的一段連續部分。 示例 1&#xff1a; 輸入&#xff1a;nums [1,0,1,0,1], goal 2 輸出&#xff1a;4 解釋&#xff1a; 有 4 個滿足題目要求…

【論文解讀】Referring Camouflaged Object Detection

論文信息 論文題目&#xff1a;Referring Camouflaged Object Detection 論文鏈接&#xff1a;https://arxiv.org/pdf/2306.07532 代碼鏈接&#xff1a;https://github.com/zhangxuying1004/RefCOD 錄用期刊&#xff1a;TPAMI 2025 論文單位&#xff1a;南開大學 ps&#xff1a…

Spring中過濾器和攔截器的區別及具體實現

在 Spring 框架中&#xff0c;過濾器&#xff08;Filter&#xff09; 和 攔截器&#xff08;Interceptor&#xff09; 都是用于處理 HTTP 請求的中間件&#xff0c;但它們在作用范圍、實現方式和生命周期上有顯著區別。以下是詳細對比和實現方式&#xff1a;核心區別特性過濾器…

CANFD 數據記錄儀在新能源汽車售后維修中的應用

一、前言隨著新能源汽車市場如火如荼和新能源汽車電子系統的日益復雜&#xff0c;傳統維修手段在面對復雜和偶發故障時往往捉襟見肘&#xff0c;CANFD 數據記錄儀則憑借其獨特優勢&#xff0c;為售后維修帶來新的解決方案。二、 詳細介紹在新能源汽車領域&#xff0c;CANFD 數據…

某當CRM XlsFileUpload存在任意文件上傳(CNVD-2025-10982)

免責聲明 本文檔所述漏洞詳情及復現方法僅限用于合法授權的安全研究和學術教育用途。任何個人或組織不得利用本文內容從事未經許可的滲透測試、網絡攻擊或其他違法行為。使用者應確保其行為符合相關法律法規,并取得目標系統的明確授權。 前言: 我們建立了一個更多,更全的…

自然語言處理與實踐

文章目錄Lesson1&#xff1a;Introduction to NLP、NLP 基礎與文本預處理1.教材2.自然語言處理概述(1)NLP 的定義、發展歷程與應用場景(2)NLP 的主要任務&#xff1a;分詞、詞性標注、命名實體識別、句法分析等2.文本預處理3.文本表示方法&#xff1a;詞向量表示/詞表征Lesson2…