目錄
RabbitMQ保證消息可靠性
生產者丟失消息
MQ丟失消息
消費端丟失了數據
Kakfa的消息可靠性
生產者的消息可靠性
Kakfa的消息可靠性
消費者的消息可靠性
RabbitMQ保證消息可靠性
生產者丟失消息
1.事務消息保證
生產者在發送消息之前,開啟事務消息隨后生產者發送消息,消息發送之后,如果消息沒有被MQ接收到的話,生產者會收到異常報錯,生產者回滾事務,然后重試消息,如果收到了消息,就能提交事務了
@Autowired
private RabbitTemplate rabbitTemplate;public void sendTransactionalMessage() {ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();Channel channel = connectionFactory.createConnection().createChannel(false);try {channel.txSelect(); // 開啟事務channel.basicPublish("exchange", "routing.key", null, "message".getBytes());channel.txCommit(); // 提交事務} catch (Exception e) {channel.txRollback(); // 出錯回滾}
}
2.使用confirm機制
- 普通confirm機制,就是發送消息之后,等待服務器confirm之后再發送下一個消息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功發送到Broker");} else {System.out.println("消息發送失敗,原因:" + cause);}
});rabbitTemplate.convertAndSend("exchange", "routing.key", "message");
- 批量confirm機制,每發送一批消息之后,等待服務器confirm
Channel channel = connection.createChannel(false);
channel.confirmSelect();for (int i = 0; i < 100; i++) {channel.basicPublish("exchange", "routing.key", null, ("msg" + i).getBytes());
}
channel.waitForConfirms(); // 等待所有消息確認
- 異步confirm機制,服務器confirm一個或者多個消息之后,客戶端(生產者)能夠通過回調函數來確定消息是否被confirm(推薦)
SortedSet<Long> pendingSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleAck(long tag, boolean multiple) {if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}public void handleNack(long tag, boolean multiple) {System.err.println("未確認消息:" + tag);if (multiple) pendingSet.headSet(tag + 1).clear();else pendingSet.remove(tag);}
});while (true) {long seq = channel.getNextPublishSeqNo();channel.basicPublish("demo.exchange", "demo.key",MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes());pendingSet.add(seq);
}
MQ丟失消息
防止MQ的丟失數據的話,方法就是開啟RabbitMQ的持久化,消息寫入之后(也就是到了MQ之后)就直接持久化到磁盤中,即使Rabbimq自己掛了之后,會恢復數據。
設置持久化步驟
- 創建queue的時候直接設置持久化,此時就能持久化queue的元數據(不是消息)
@Bean
public Queue durableQueue() {return new Queue("myQueue", true); // true 表示持久化
}
- 發送消息的時候指定消息為deliveryMode設置為2,也就是設置消息為持久化,此時消息可以持久化磁盤上
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("content".getBytes(), props);
rabbitTemplate.send("exchange", "routing.key", message);
極端情況:
消息寫到RabbitMQ之后,但是還沒有持久化到磁盤之后直接掛了,導致內存中消息丟失。
解決方法:持久化與生產者的confirm機制配合,當且僅當持久化了消息之后,再confirm,避免數據與消息丟失,此時生產者收不到ack,也是可以自己重發
消費端丟失了數據
意思就是消息已經拉取到了信息,還沒有處理(注意這是已經告訴MQ我拉取到數據了),結果進程掛了,重啟之后繼續消費下一條消息,導致中間的這一條沒有消費到,此時數據丟失了。
利用ack機制處理
取消RabbiMQ的自動ack,也就是一個api,可以在消費端消費完了消息之后再調用api告訴MQ我們收到并且處理了該消息。如果沒有返回ack,RabbitMQ會把該消息分配給其他的consumer處理,消息不會丟失。通過配置處理
spring:rabbitmq:listener:simple:acknowledge-mode: manual
Kakfa的消息可靠性
生產者的消息可靠性
在kafka中,可以在producer(生產段)設置一個參數,也就是ack=all,要求每個數據,必須寫入所有的replica(也就是所有該分區的副本),才認為是接收成功。該參數設置的是你的leader接收到消息后,所有的follower都同步到消息后才認為寫成功
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 等待所有副本確認
props.put("retries", Integer.MAX_VALUE); // 無限重試
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");producer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("發送成功:" + metadata.offset());} else {exception.printStackTrace();}
});
producer.close();
Kakfa的消息可靠性
kafka默認是會將消息持久化到磁盤上的,但是還是有情況會導致丟失數據
kafka某個broker宕機,隨后重新選舉partition的leader。倘若在該broker中的partition中的leader副本中的消息,還沒有被其他broker中的follower同步,此時同步缺失的數據就丟失了,也就是少了一些數據
解決方法:
- 給 topic 設置
replication.factor
參數:這個值必須大于 1,要求每個 partition 必須有至少 2 個副本。 - 在 Kafka 服務端設置
min.insync.replicas
參數:這個值必須大于 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟自己保持聯系。 - 在 producer 端設置
acks=all
:這個是要求每條數據,必須是寫入所有 replica 之后,才能認為是寫成功了。 - 在 producer 端設置
retries=MAX
(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這里了。
按照上面的配置之后,leader的切換就不會導致數據缺失了。
消費者的消息可靠性
唯一可能也是類似于RabbitMQ中的,也就是說你消費到該消息的時候,消費者自動提交offset,讓kafka以為你消費好了該消息,但是自己還沒處理就宕機后,會導致重啟后沒有消費該消息。
解決方法:
關閉kafka默認的自動提交offset,通過消費端業務邏輯處理完消息后,再手動提交offset,當然這里就是會導致重復消費了,這里就是冪等性的問題了。比如你剛處理完,還沒提交 offset,結果自己掛了,此時肯定會重復消費一次
手動提交api:consumer.commitSync();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 處理消息System.out.println("處理消息:" + record.value());}// 手動提交 offsetconsumer.commitSync();
}