spring-kafka消費異常處理

默認的消費異常處理

默認情況下,如果程序沒有顯式做任何的異常處理,spring-kafka會提供一個默認的DefaultErrorHandler, 它會使用FixedBackOff做重試,會不間斷的連續重試最多9次,也就是說一個消息最多會被消費10次。如果重試次數耗盡,最終會在控制臺打印異常,并且會提交offset,也就是說這條消息就被丟棄了。
舉個例子:
發消息

@GetMapping("send/{msg}")public String send(@PathVariable("msg")String msg){CompletableFuture future = kafkaTemplate.send("test-topic", msg);try{future.get();log.info("消息發送成功");}catch(Exception e){e.printStackTrace();}return "OK";}

收消息

@Component
public class DemoListener {private static Logger log = LoggerFactory.getLogger(DemoListener.class);@KafkaListener(topics = {"test-topic"})public void onMessage(ConsumerRecord record){Object value = record.value();log.info("收到消息:{}", value);throw new RuntimeException("manually throw");}
}

kafka的配置

spring:kafka:bootstrap-servers: localhost:9092  # Kafka服務器地址consumer:group-id: my-group  # 默認的消費者組IDauto-offset-reset: earliest  # 如果沒有初始偏移量或偏移量已失效,從最早的消息開始讀取key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

現在發一條消息做測試,控制臺輸出如下:

2025-09-14T10:26:27.508+08:00  INFO 5912 --- [nio-8080-exec-1] c.g.xjs.kafka.controller.DemoController  : 消息發送成功
2025-09-14T10:26:27.509+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello
......
2025-09-14T10:26:31.666+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello
2025-09-14T10:26:31.680+08:00  INFO 5912 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 6 for partition test-topic-0
2025-09-14T10:26:31.680+08:00  INFO 5912 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:26:32.174+08:00  INFO 5912 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello
2025-09-14T10:26:32.182+08:00 ERROR 5912 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for test-topic-0@6org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.github.xjs.kafka.listener.DemoListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exceptionat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2996) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1527) ~[spring-kafka-3.3.3.jar:3.3.3]

自定義重試邏輯

我們可以自定義一個DefaultErrorHandler的bean來自定義重試邏輯,比如:

@Bean
public DefaultErrorHandler errorHandler(){ExponentialBackOff backOff = new ExponentialBackOff();// 最大的重試間隔,默認是30秒backOff.setMaxInterval(30000);// 初始的重試間隔,默認是2秒backOff.setInitialInterval(3000);// 間隔倍數,下一次間隔 = 當前間隔 * 間隔倍數,默認是1.5backOff.setMultiplier(3);// 最大重試次數, 默認無限制重試,如果按照默認配置,首次重試隔2秒,下一次隔(2*1.5)3秒,以此類推backOff.setMaxAttempts(2);return new DefaultErrorHandler(null,backOff);
}

現在重新發一條消息,控制臺輸出:

2025-09-14T10:42:32.069+08:00  INFO 1288 --- [nio-8080-exec-1] c.g.xjs.kafka.controller.DemoController  : 消息發送成功
2025-09-14T10:42:32.070+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello
2025-09-14T10:42:35.128+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 8 for partition test-topic-0
2025-09-14T10:42:35.129+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:42:35.131+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello
2025-09-14T10:42:44.193+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 8 for partition test-topic-0
2025-09-14T10:42:44.193+08:00  INFO 1288 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T10:42:44.195+08:00  INFO 1288 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello
2025-09-14T10:42:44.199+08:00 ERROR 1288 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff ExponentialBackOffExecution{currentInterval=9000ms, multiplier=3.0, attempts=2} exhausted for test-topic-0@8org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.github.xjs.kafka.listener.DemoListener.onMessage(org.apache.kafka.clients.consumer.ConsumerRecord)' threw exceptionat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2996) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1527) ~[spring-kafka-3.3.3.jar:3.3.3]

可以看到,消息總共被接受了3次,包含了2次重試,第一次是在3每秒以后,第二次是在9秒以后。
除了ExponentialBackOff 之外,常見的還有ExponentialBackOffWithMaxRetriesFixedBackOff,當然也可以自定義。
ExponentialBackOff 默認無限重試,默認的最大重試間隔是30秒,如果超過了30秒則按30秒算。
ExponentialBackOffWithMaxRetries可以設置最大的重試次數。
FixedBackOff是固定時間間隔,默認是5秒,默認沒有重試次數限制。

隊頭阻塞與消息丟失問題

上面介紹的異常處理方式存在2個非常嚴重的問題,一個是隊頭阻塞問題,另一個是消息丟失問題。所謂的隊頭阻塞問題,就是說當一條消息在進行重試的時候,就算topic中有了新的消息,消費者也無法消費到,因為消費者線程會以阻塞的方式進行重試,重試結束以后才可以繼續后面消息的消費,如果重試時間很長就會導致后面的消息長時間得不到消費。消息丟失就很好理解了,重試次數耗盡以后,僅僅是打印一條錯誤的日志,更好的處理辦法是把錯誤的消息發送給死信Topic,然后交由人工進行后續處理。接下來先來處理下消息丟失的問題。

死信Topic

在構造DefaultErrorHandler的時候,還有一個參數是ConsumerRecordRecoverer,如果我們提供了這個recover,那么重試次數耗盡以后,消息會被傳遞給這個recover,我們就可以把消費失敗的消息重新投遞到DLT中。
幸運的是,spring-kafka已經提供了一個DeadLetterPublishingRecoverer就可以實現這個功能。
下面我們重寫下DefaultErrorHandler :

@Beanpublic DefaultErrorHandler errorHandler(KafkaTemplate kafkaTemplate){var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,(cr, e)-> new TopicPartition(cr.topic()+".DLT", cr.partition()));ExponentialBackOff backOff = new ExponentialBackOff();backOff.setMaxInterval(30000);backOff.setInitialInterval(3000);backOff.setMultiplier(3);backOff.setMaxAttempts(2);return new DefaultErrorHandler(recoverer,backOff);}

在構造DeadLetterPublishingRecoverer的時候,需要用到KafkaTemplate ,同時我們需要設置DLT的topic和partition。
現在我們重新發一個消息,控制臺的日志:

2025-09-14T11:17:48.532+08:00  INFO 9804 --- [nio-8080-exec-4] c.g.xjs.kafka.controller.DemoController  : 消息發送成功
2025-09-14T11:17:48.533+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello
2025-09-14T11:17:51.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 10 for partition test-topic-0
2025-09-14T11:17:51.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:17:51.611+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello
2025-09-14T11:18:00.708+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 10 for partition test-topic-0
2025-09-14T11:18:00.708+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:18:00.710+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:hello

這次就沒有異常拋出,而且我們可以從DLT中看到消息:

D:\kafka_2.12-3.9.1> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic test-topic.DLT 
hello

非阻塞重試

還是使用上面的代碼,我們連續發送2條消息,控制臺輸出如下:

2025-09-14T11:24:02.837+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:1111
2025-09-14T11:24:03.869+08:00  INFO 9804 --- [nio-8080-exec-8] c.g.xjs.kafka.controller.DemoController  : 消息發送成功
2025-09-14T11:24:05.914+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 11 for partition test-topic-0
2025-09-14T11:24:05.914+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:05.915+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:1111
2025-09-14T11:24:14.963+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 11 for partition test-topic-0
2025-09-14T11:24:14.963+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:14.965+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:1111
2025-09-14T11:24:15.470+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:15.473+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:2222
2025-09-14T11:24:18.553+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:18.553+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:18.554+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:2222
2025-09-14T11:24:27.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 12 for partition test-topic-0
2025-09-14T11:24:27.609+08:00  INFO 9804 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:24:27.611+08:00  INFO 9804 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:2222
2025-09-14T11:24:28.635+08:00  INFO 9804 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Node -1 disconnected.
2025-09-14T11:24:58.128+08:00  INFO 9804 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.

可以看出來,雖然消息是同時發出的,但是第一條消息重試期間,第二條消息是無法得到消費的。只有第一條消息的重試次數耗盡以后,第二條消息才有機會被消費。如果重試時間間隔和次數都比較大,這種阻塞式的重試就不合適了。

下面我們來看下如何使用非阻塞重試:

@Configuration
@EnableKafkaRetryTopic //non-blocking:1
public class KafkaConfiguration {// non-blocking:2@BeanTaskScheduler scheduler() {return new ThreadPoolTaskScheduler();}// non-blocking:3@Beanpublic RetryTopicConfiguration myRetryConfiguration(KafkaTemplate<String, String> template) {return RetryTopicConfigurationBuilder.newInstance().exponentialBackoff(3000, 10, Long.MAX_VALUE).maxAttempts(3).dltSuffix(".DLT").create(template);}}
  • 首先添加@EnableKafkaRetryTopic 注解
  • 然后提供一個TaskScheduler 的實例
  • 最后提供RetryTopicConfiguration 的實例

現在重啟應用,連續發送2個消息再次觀察控制臺輸出:

2025-09-14T11:44:40.303+08:00  INFO 5380 --- [nio-8080-exec-8] c.g.xjs.kafka.controller.DemoController  : 消息發送成功
2025-09-14T11:44:40.304+08:00  INFO 5380 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:3333
2025-09-14T11:44:40.817+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 3 for partition test-topic-retry-3000-0
2025-09-14T11:44:40.817+08:00  INFO 5380 --- [etry-3000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:41.284+08:00  INFO 5380 --- [nio-8080-exec-5] c.g.xjs.kafka.controller.DemoController  : 消息發送成功
2025-09-14T11:44:41.284+08:00  INFO 5380 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:4444
2025-09-14T11:44:43.316+08:00  INFO 5380 --- [etry-3000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:3333
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 4 for partition test-topic-retry-3000-0
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [try-30000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-1, groupId=my-group] Seeking to offset 3 for partition test-topic-retry-30000-0
2025-09-14T11:44:43.826+08:00  INFO 5380 --- [try-30000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:43.828+08:00  INFO 5380 --- [etry-3000-0-C-1] o.a.k.c.c.internals.LegacyKafkaConsumer  : [Consumer clientId=consumer-my-group-3, groupId=my-group] Seeking to offset 4 for partition test-topic-retry-3000-0
2025-09-14T11:44:43.828+08:00  INFO 5380 --- [etry-3000-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Record in retry and not yet recovered
2025-09-14T11:44:44.332+08:00  INFO 5380 --- [etry-3000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:4444
2025-09-14T11:45:13.334+08:00  INFO 5380 --- [try-30000-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:3333
2025-09-14T11:45:13.334+08:00 ERROR 5380 --- [try-30000-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = test-topic-retry-30000, partition = 0, offset = 3, main topic = test-topic threw an error at topic test-topic-retry-30000 and won't be retried. Sending to DLT with name test-topic.DLT.org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failedat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:3000) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2903) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2867) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2779) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2616) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2505) ~[spring-kafka-3.3.3.jar:3.3.3]at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2151) ~[spring-kafka-3.3.3.jar:3.3.3]at

可以看到再也不會存在隊頭阻塞問題,并且消息也成功投遞到了DLT中:

D:\kafka_2.12-3.9.1> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic test-topic.DLT 
3333
4444

非阻塞重試的原理

我們查看下kafka中的topic列表:

D:\kafka_2.12-3.9.1> .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic
test-topic-retry-3000
test-topic-retry-30000
test-topic.DLT

此時會發現多出來2個帶retry的topic:test-topic-retry-3000 和 test-topic-retry-30000。

如果消息處理失敗,該消息會被轉發到一個retry的topic。消費者會檢查時間戳,如果尚未到達重試時間,則會暫停該主題分區的消費。當到達重試時間時,分區消費會恢復,消息會被再次消費。這也是為啥我們要配置一個TaskScheduler的原因。如果消息處理再次失敗,消息將被轉發到下一個重試主題,重復此模式直到處理成功,或者重試次數用盡,最后消息被發送到DLT。

以我們的案例來說,采用初始3秒的指數退避策略,乘數為10,最大重試3-1=2次,系統將自動創建test-topic-retry-3000和test-topic-retry-30000和test-topic.DLT。

參考:https://docs.spring.io/spring-kafka/reference/index.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/923577.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/923577.shtml
英文地址,請注明出處:http://en.pswp.cn/news/923577.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

leecode73 矩陣置零

我的思路 這個題目不難&#xff0c;就是一句話&#xff0c;遍歷這個矩陣的時候&#xff0c;當遇到0的時候就把該行該列改為0&#xff0c;同時為了不影響后續的遍歷&#xff0c;我們可以將這個遍歷和修改分為兩個數組。使用mn的輔助空間 class Solution {public void setZeroe…

Spring Boot 與前端文件上傳跨域問題:Multipart、CORS 與網關配置

前言在前后端分離架構下&#xff0c;文件上傳是一個常見功能。但在 Spring Boot 項目中&#xff0c;我們經常會遇到前端調用接口上傳文件時出現 跨域問題&#xff0c;表現為&#xff1a;瀏覽器控制臺報錯&#xff1a;Access-Control-Allow-Origin 缺失或不匹配。使用 FormData …

快速解決云服務器的數據庫PhpMyAdmin登錄問題

打開PhpMyAdmin數據庫管理器登錄頁面賬號密碼就是你的用戶名&#xff08;如YiXun&#xff09;和密碼注意&#xff1a;root賬戶的密碼&#xff0c;點擊下面的“root密碼”即能看到或修改PhpMyAdmin無法打開如果打不開&#xff1a;在數據庫&#xff0c;點擊PHPMyAdmin&#xff0c…

vite+vue3中使用FFmpeg@0.12.15實現視頻編輯功能,不依賴SharedArrayBuffer!!!

FFmpeg0.12.15完全不依賴SharedArrayBuffer!!!強烈推薦使用 本文章主要是在vitevue3項目中使用FFmpeg&#xff0c;只展示了如何在項目中引入和基礎的使用 更多詳細參數可參照 ffmpeg官網https://ffmpeg.org/ 一、安裝FFmpeg 可通過npm直接安裝 npm install ffmpeg/core0.12.10…

構網型5MW中壓儲能變流升壓一體機技術方案

1 構網型儲能背景概述1.1 新型電力系統亟需構網支撐眾所周知&#xff0c;新型電力系統具有兩高特征&#xff1a;高比例新能源大規模并網、高比例電力電子大范圍接入。近年來風光裝機占比越來越高&#xff0c;而傳統火電裝機占比越來越低&#xff0c;并在2023年首次降至50%以下…

SRE 系列(七)| 從技術架構到團隊組織

目錄SRE落地與組織架構實踐技術架構與組織架構的匹配技術架構示例運維職責分工技術保障體系SRE 多角色團隊總結SRE落地與組織架構實踐 在落地 SRE 時&#xff0c;很多團隊最關心的問題之一就是組織架構&#xff1a;我們究竟需要怎樣的團隊形態&#xff0c;才能支撐微服務和分…

香港期權市場的主要參與者有哪些?

本文主要介紹香港期權市場的主要參與者有哪些&#xff1f;香港期權市場作為全球重要的金融衍生品市場&#xff0c;其參與者結構呈現多元化、專業化的特征&#xff0c;主要涵蓋以下核心群體。香港期權市場的主要參與者有哪些&#xff1f;1. 機構投資者&#xff08;主導力量&…

搜維爾科技:全身可穿戴Teslasuit動捕服的功能,自立式FES裝置

功能性電刺激 (FES) 設備廣泛應用于康復和醫療實踐。其底層技術利用低能量電脈沖&#xff0c;在中風、脊髓損傷、多發性硬化癥、腦癱等各種疾病患者中人工產生身體運動。一般來說&#xff0c;FES系統可以分為三類&#xff1a;開環、有限狀態控制和閉環方法。這三種方法描述了 F…

【深度學習新浪潮】MoE是什么技術?

混合專家模型(Mixture of Experts,MoE)是大模型時代提升計算效率與模型能力的核心技術之一。其核心思想是將復雜任務分解為多個子任務,通過動態路由機制激活特定專家網絡處理輸入數據,從而在保持模型容量的同時大幅降低計算成本。以下是技術細節與實際應用的深度解析: 一…

Java進階教程,全面剖析Java多線程編程,實現Callable接口實現多線程,筆記05

Java進階教程&#xff0c;全面剖析Java多線程編程&#xff0c;實現Callable接口實現多線程&#xff0c;筆記05 參考資料 多線程&JUC-05-多線程的第三種實現方式一、實現Callable接口實現多線程 二、三種方式對比 優點缺點繼承Thread類編程比較簡單&#xff0c;可以直接使…

軌道交通絕緣監測—軌道交通安全的隱形防線

軌道交通絕緣監測作為保障行車安全的核心環節&#xff0c;正面臨多重技術與環境挑戰。復雜運營環境是首要痛點&#xff0c;隧道內高濕度&#xff08;月均濕度達95%&#xff09;會增大鋼軌表面電導率&#xff0c;霧氣中的鹽分更會加速扣件絕緣性能下降&#xff0c;導致過渡電阻驟…

tar-符號連接(軟連接)

1.符號連接是什么符號鏈接&#xff08;symbolic link&#xff0c;也叫軟鏈接&#xff09;本質上是一個 指向路徑的特殊文件。例如&#xff1a;ln -s /etc/passwd passwd_link這會創建一個叫 passwd_link 的文件&#xff0c;但它本身不存放 /etc/passwd 的內容&#xff0c;而是存…

ffmpeg切割音頻

ffmpeg切割音頻 我希望對指定音頻切割&#xff0c;按照開始時間&#xff0c;結束時間&#xff0c;切割成新文件&#xff0c;自動保存&#xff0c;非常好用 step1: from pydub import AudioSegment import os# 配置FFmpeg路徑&#xff08;確保路徑正確&#xff09; ffmpeg_path …

Python 批量處理:Markdown 與 HTML 格式相互轉換

文章目錄引言與同類工具的優勢對比Python 將 Markdown 轉換為 HTMLPython 將 HTML 轉換為 Markdown批量轉換與自動化處理引言 在多平臺內容分發與管理的場景中&#xff0c;文檔格式轉換已成為內容生態系統中的關鍵環節。Markdown 作為輕量級標記語言&#xff0c;以其語法簡潔、…

御控物聯網遠程控制水泵啟停智能自控解決方案

在農業灌溉、城市排水、工業供水等場景中&#xff0c;水泵作為核心設備&#xff0c;長期面臨以下難題&#xff1a;人工依賴度高&#xff1a;需24小時值守&#xff0c;暴雨或干旱時響應滯后&#xff1b; 能耗浪費嚴重&#xff1a;空轉、過載運行導致電費居高不下&#xff1b; …

RedisI/O多路復用:單線程網絡模型epoll工作流程

epoll1. 在內核創建eventpoll結構體&#xff0c;返回句柄epfd&#xff08;唯一標識&#xff09;eventpoll包含存放被監聽的fd的紅黑樹&#xff0c;和存放已就緒的fd的鏈表2. 將要監聽的fd加入到epoll紅黑樹中&#xff0c;并設置callback回調函數callback觸發時&#xff0c;就將…

SmartBear API Hub助力MCP開發,無縫、安全的連接AI與外部工具

人工智能&#xff08;AI&#xff09;技術的應用場景日益廣泛&#xff0c;如何讓不同的AI系統之間實現高效、無縫的交互&#xff0c;成為了業界的重要課題。隨著人工智能技術的不斷進步&#xff0c;模型上下文協議&#xff08;MCP&#xff09;應運而生。MCP為不同AI系統之間提供…

如何選擇高性價比的iOS簽名服務?關鍵因素與價格區間

作為一名摸爬滾打多年的開發者&#xff0c;我來和你聊聊怎么挑一個靠譜又不坑的iOS簽名服務。這玩意兒選不好&#xff0c;輕則測試團隊干瞪眼&#xff0c;重則App下架&#xff0c;用戶投訴&#xff0c;簡直是我們開發者的噩夢。別光看價格&#xff01;先想清楚你的核心需求在選…

MoonBit 正式加入 WebAssembly Component Model 官方文檔 !

我們非常高興地宣布&#xff0c;MoonBit 已正式收錄在 WebAssembly Component Model 的官方文檔中。這不僅是對 MoonBit 技術路線的一次肯定&#xff0c;也讓我們有機會和 Rust、Go、C# 等語言一起&#xff0c;出現在開發者查閱組件模型的入口頁面中。一、 關于 WebAssembly Co…

Python快速入門專業版(三十二):匿名函數:lambda表達式的簡潔用法(結合filter/map)

目錄引一、lambda表達式的基本語法&#xff1a;一行代碼定義函數示例1&#xff1a;lambda表達式與普通函數的對比二、lambda表達式的應用場景&#xff1a;臨時與靈活1. 臨時使用&#xff1a;無需定義函數名的簡單功能2. 作為參數傳遞給高階函數三、結合filter()&#xff1a;篩選…