文章目錄
- 問題
- 示例
- 異常
- 原因
- nack方法
- Acknowledgment接口
- 實現類:ConsumerAcknowledgment
- 實現類:ConsumerBatchAcknowledgment
- 解決方案
- 1 批量消費指定index
- 示例
- 2 單條消費
- 示例
問題
使用BatchAcknowledgingMessageListener 批量消費Kafka消息,成功則手動提交offset,失敗則重試。消費成功的情況下沒有問題,但消費失敗情況下,調用nack方法重試時則報異常。
示例
public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {private MessageHandler messageHandler;public BatchCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(List data, Acknowledgment acknowledgment) {try {messageHandler.handle(data); // 處理多條消息acknowledgment.acknowledge(); // 成功處理后提交偏移量} catch (Exception e) {// 消息處理失敗,30min后重試// nack作用:將會在指定sleep時間后,重新消費消息。在sleep期間內,不會消費新消息。acknowledgment.nack(30 * 60 * 1000); // 這里報了異常}}
}
上邊的代碼乍一看沒啥問題,編譯,啟動也都沒報錯。但是在執行nack的時候進到了Acknowledgment接口默認nack(sleep) 方法里邊,并拋出異常。
nack(sleep) is not supported by this Acknowledgment
異常
原因
Acknowledgment接口有兩個nack方法:nack(long sleep) 和nack(int index, long sleep), 以及兩個實現類ConsumerAcknowledgment和ConsumerBatchAcknowledgment。ConsumerAcknowledgment僅實現了nack(long sleep),而ConsumerBatchAcknowledgment僅實現了nack(int index, long sleep)。
nack方法
注意:調用nack方法后,將會在指定sleep時間后,重新消費消息。在sleep期間內,不會消費新消息。
Acknowledgment接口
實現類:ConsumerAcknowledgment
實現類:ConsumerBatchAcknowledgment
這樣的設計也很好理解。。
當BatchAcknowledgingMessageListener批量消費消息時, 使用的是ConsumerBatchAcknowledgment,重試時需要告訴ConsumerBatchAcknowledgment要從這批量消息中的哪條開始重試消費,即要指定index值。我的例子中調用的是nack(long sleep),沒有指定index,所以進到了默認方法里,拋了異常。
而使用AcknowledgingMessageListener消費單條消息時,使用的是ConsumerAcknowledgment,重試時它知道重試當前的消息,因為就這一條,所以只需要指定重試時間就可以了。
也就是說批量消費時,重試要調用nack(int index, long sleep),單條消費時,重試要調用nack(long sleep),二者不搭配,就會拋不支持該方法的異常。
解決方案
1 批量消費指定index
示例
public class BatchCustomMessageListener implements BatchAcknowledgingMessageListener {private MessageHandler messageHandler;public BatchCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(List data, Acknowledgment acknowledgment) {int index = 0;try {for (; index < data.size(); index++) {messageHandler.handle(data.get(index)); // 處理單條消息}// 成功處理后提交偏移量acknowledgment.acknowledge();} catch (Exception e) {// 消息處理失敗,30min后重試index及index之后的消息acknowledgment.nack(index, 30 * 60 * 1000);}}}
2 單條消費
改成單條消費消息,調用nack(long sleep)
示例
public class SingleCustomMessageListener implements AcknowledgingMessageListener {private MessageHandler messageHandler;public SingleCustomMessageListener(MessageHandler messageHandler) {this.messageHandler = messageHandler;}@Overridepublic void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {try {messageHandler.handle(data); // 處理單條消息acknowledgment.acknowledge();} catch (Exception e) {acknowledgment.nack(30 * 60 * 1000);}}