這次主要選用RabbitMQ消息隊列來對判題服務和題目服務解耦,題目服務只需要向消息隊列發送消息,判題服務從消息隊列中取信息去執行判題,然后異步更新數據庫即可。
五一寶寶請快點跑~~~~~
先回顧一下RabbitMQ
(1)引入依賴
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.2</version>
</dependency>
(2)在yml中引入配置
spring:rabbitmq:host: localhostport: 5672password: guestusername: guest
(3)創建交換機和隊列
/*** 用于創建測試程序用到的交換機和隊列(只用在程序啟動前執行一次)*/
public class MqInitMain {public static void main(String[] args) {try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String EXCHANGE_NAME = "code_exchange";channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 創建隊列,隨機分配一個隊列名稱String queueName = "code_queue";channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "my_routingKey");} catch (Exception e) {}}
}
(4)生產者代碼
@Component
public class MyMessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String routingKey, String message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);}}
(5)消費者代碼
@Component
@Slf4j
public class MyMessageConsumer {// 指定程序監聽的消息隊列和確認機制@SneakyThrows@RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL")public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {log.info("receiveMessage message = {}", message);channel.basicAck(deliveryTag, false);}}
(6)單元測試執行
@SpringBootTest
class MyMessageProducerTest {@Resourceprivate MyMessageProducer myMessageProducer;@Testvoid sendMessage() {myMessageProducer.sendMessage("code_exchange", "my_routingKey", "你好呀");}
}
項目實踐
要傳遞的消息是什么?題目提交 id
題目服務中,把原本的本地異步執行改為向消息隊列發送消息:
// 發送消息
myMessageProducer.sendMessage("code_exchange", "my_routingKey", String.valueOf(questionSubmitId));
// 執行判題服務
// CompletableFuture.runAsync(() -> {
// judgeFeignClient.doJudge(questionSubmitId);
// });
判題服務中,監聽消息,執行判題:
@Component
@Slf4j
public class MyMessageConsumer {@Resourceprivate JudgeService judgeService;// 指定程序監聽的消息隊列和確認機制@SneakyThrows@RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL")public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {log.info("receiveMessage message = {}", message);long questionSubmitId = Long.parseLong(message);try {judgeService.doJudge(questionSubmitId);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, false);}}}
消息確認機制:basicAck
?basicNack
在 RabbitMQ 中,消息從生產者發送到隊列,然后被消費者消費。為了確保消息被正確處理,RabbitMQ 提供了一種機制,讓消費者可以確認消息是否已經被成功處理。這就是 消息確認機制。
basicAck:
作用:消費者告訴 RabbitMQ,我已經成功處理了這條消息。
參數:
(1)deliveryTag
:消息的唯一標識符,RabbitMQ 用它來跟蹤每條消息。
(2)multiple
:布爾值,表示是否確認多個消息。如果為 true
,則確認所有小于等于deliveryTag
的消息;如果為 false
,則只確認當前的 deliveryTag
消息。
?示例:
channel.basicAck(deliveryTag, false);
?這行代碼的意思是:“我成功處理了 deliveryTag
指定的消息,請從隊列中移除它。”
basicNack:
作用:消費者告訴 RabbitMQ,我無法處理這條消息。
參數:
(1)deliveryTag
:消息的唯一標識符。
(2)multiple
:布爾值,表示是否拒絕多個消息。如果為 true
,則拒絕所有小于等于 deliveryTag
的消息;如果為 false
,則只拒絕當前的 deliveryTag
消息。
(3)requeue
:布爾值,表示是否將消息重新放回隊列。如果為 true
,則將消息重新放回隊列,等待其他消費者處理;如果為 false
,則丟棄這條消息。
示例:
channel.basicNack(deliveryTag, false, false);
這行代碼的意思是:“我無法處理 deliveryTag
指定的消息,請丟棄它。”
總結
basicAck
:確認消息已成功處理,從隊列中移除。
basicNack
:確認消息處理失敗,可以選擇重新放回隊列或丟棄。