基礎模塊
? ? ? ? 1. 郵箱發送功能
? ? ? ?最初設計的接口 (雛形)
public interface EmailService {/*** 發送驗證碼郵件** @param email 目標郵箱* @return 發送的code* @throws RuntimeException 如果發送郵件失敗,將拋出異常*/String sendVerificationCode(String email);/*** 校驗驗證碼是否正確** @param email 郵箱地址* @param code 用戶輸入的驗證碼* @return true 表示校驗通過,false 為不通過*/boolean checkVerificationCode(String email, String code);/*** 判斷郵箱當前是否處于驗證碼限流狀態** @param email 郵箱地址* @return true 表示當前已限流,不可發送,false 表示未限流,可以發送*/boolean isVerificationCodeRateLimited(String email);
}
EmailServiceImpl 實現類? ?( 具體實現)
@Service
public class EmailServiceImpl implements EmailService {@Autowiredprivate ObjectMapper objectMapper;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Value("${mail.verify-code.limit-expire-seconds}")private int limitExpireSeconds;@Overridepublic String sendVerificationCode(String email) {// ...}@Overridepublic boolean checkVerificationCode(String email, String code) {// ...}@Overridepublic boolean isVerificationCodeRateLimited(String email) {// ...}
}
分析
先來分析實現類中的三個注入的對象的用途:
redisTemplate:操作redis的接口,可以類比JDBC接口
limitExpireSeconds:application.yaml中的配置,表示一個email在發送一條郵件后,會被限流多長時間,才能發送第二條郵件,配置文件中是60秒,可以按照自己的需求修改
objectMapper:Jackson的接口對象,用于JSON和對象的相互轉換
sendVerificationCode方法和"EmailTaskConsumer"方法
到這里,我們可以開始實現一個"基于Redis消息隊列"的輕量級異步任務機制了。
消息隊列有兩個最基礎的部分:
1.生產者
2.消費者
從代碼層面來說,sendVerificationCode就是所謂的生產者。
生產者的工作步驟:
1.接收email參數,生成一個email對應的code
2.將email、code、以及當前的時間戳封裝為一個對象(EmailTask對象)
3.將EmailTask對象序列化為JSON字符串(Redis只支持存儲字符串)
4.將JSON字符串寫入到消息隊列中
5.防止重復請求,給郵箱設置一個限流鍵(這步可以不包含在sendVerificationCode中)
消費者的工作步驟:
1.每隔一段時間輪詢Redis,查看消息隊列中是否存在任務
2.將任務的JSON字符串從消息隊列中取出,將其反序列化為EmailTask對象
3.根據EmailTask對象中的字段,填充SimpleEmailMessage對象
4.JavaMailSender發送SimpleEmailMessage對象(調用第三方服務,發郵件給用戶)
到這里,生產者和消費者需要完成的步驟已經規劃完畢,接下來我們看具體代碼實現。
生產者部分:
首先是EmailTask對象,擁有三個字段
1.email字段:用戶的email
2.code:驗證碼
3.timestamp:時間戳
@Data
public class EmailTask {private String email;private String code;private long timestamp;
}
接下來是?sendVerificationCode 的實現
@Override
public String sendVerificationCode(String email) {// 檢查發送頻率if (isVerificationCodeRateLimited(email)) {throw new RuntimeException("驗證碼發送太頻繁,請 60 秒后重試");}// 生成6位隨機驗證碼String verificationCode = RandomCodeUtil.generateNumberCode(6);// 實現異步發送郵件的邏輯try {// 創建郵件任務EmailTask emailTask = new EmailTask();// 初始化郵件任務內容// 1. 郵件目的郵箱// 2. 驗證碼// 3. 時間戳emailTask.setEmail(email);emailTask.setCode(verificationCode);emailTask.setTimestamp(System.currentTimeMillis());// 將郵件任務存入消息隊列// 1. 將任務對象轉成 JSON 字符串// 2. 將 JSON 字符串保存到 Redis 模擬的消息隊列中String emailTaskJson = objectMapper.writeValueAsString(emailTask);String queueKey = RedisKey.emailTaskQueue();redisTemplate.opsForList().leftPush(queueKey, emailTaskJson);// 設置 email 發送注冊驗證碼的限制String emailLimitKey = RedisKey.registerVerificationLimitCode(email);redisTemplate.opsForValue().set(emailLimitKey, "1", limitExpireSeconds, TimeUnit.SECONDS);return verificationCode;} catch (Exception e) {log.error("發送驗證碼郵件失敗", e);throw new RuntimeException("發送驗證碼失敗,請稍后重試");}
}
消費者部分
EmailTaskConsumer? 的實現
@Component
public class EmailTaskConsumer {@Autowiredprivate JavaMailSender mailSender;@Autowiredprivate ObjectMapper objectMapper;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Value("${spring.mail.username}")private String from;// 每 3 秒輪詢一次 redis,查看是否有待發的郵件任務@Scheduled(fixedDelay = 3000)public void resume() throws JsonProcessingException {String emailQueueKey = RedisKey.emailTaskQueue();// 從隊列中取任務對象while (true) {// 獲取任務對象String emailTaskJson = redisTemplate.opsForList().rightPop(emailQueueKey);if (emailTaskJson == null) { // 隊列中沒有任務對象,退出本次執行break;}// 將 redis 中的 JSON 字符串轉成 emailTask 對象EmailTask emailTask = objectMapper.readValue(emailTaskJson, EmailTask.class);String email = emailTask.getEmail();String verificationCode = emailTask.getCode();// 根據 emailTask 對象中的信息// 填充 SimpleMailMessage 對象,然后使用 JavaMailSender 發送郵件SimpleMailMessage mailMessage = new SimpleMailMessage();mailMessage.setFrom(from);mailMessage.setTo(email);mailMessage.setSubject("卡碼筆記- 驗證碼");mailMessage.setText("您的驗證碼是:" + verificationCode + ",有效期" + 5 + "分鐘,請勿泄露給他人。");mailSender.send(mailMessage);// 保存驗證碼到 Redis// 有效時間為 5 分鐘redisTemplate.opsForValue().set(RedisKey.registerVerificationCode(email), verificationCode, 5, TimeUnit.MINUTES);}}
}
checkVerificationCode? 的實現
@Override
public boolean checkVerificationCode(String email, String code) {String redisKey = RedisKey.registerVerificationCode(email);String verificationCode = redisTemplate.opsForValue().get(redisKey);if (verificationCode != null && verificationCode.equals(code)) {redisTemplate.delete(redisKey);return true;}return false;
}
checkVerificationCode的實現邏輯很簡單,根據用戶的email查詢出redis種存儲的code,和用戶輸入的code,比較是否相等。在驗證成功后需要將Redis中的記錄刪除。
isVerificationCodeRateLimited? ?實現
@Override
public boolean isVerificationCodeRateLimited(String email) {String redisKey = RedisKey.registerVerificationLimitCode(email);return redisTemplate.opsForValue().get(redisKey) != null;
}
檢查Redis中是否存在email對應的限流鍵,存在則返回true表示已被限流
2. 排行榜
Spring + Mysql + Redis? 異步更新
- 1.? 用戶提交筆記時,同步寫業務數據后異步發送一條消息到 Redis 列表(當作 MQ),消息是 JSON(包含 userId、score?增量、messageId、timestamp、noteId 等)。
- 2.? 后臺消費進程用 BRPOP/阻塞彈出消息,解析后對 Redis 的 ZSET?做增量更新(ZINCRBY),ZSET 用于排行榜查詢。
- 3.? 定時任務每小時把 Redis 中的?ZSET 批量持久化到 MySQL(通過 MyBatis?批量 upsert),以確保最終一致性和歷史存檔。
?
controller.java?
接收用戶提交并返回消息
@RestController
@RequestMapping("/notes")
public class NoteController {private final ProducerService producer;@PostMappingpublic ResponseEntity<?> submitNote(@RequestBody NoteSubmitDto dto){// 1. 處理業務寫入筆記表(略)// 2. 發送異步消息NoteMessage msg = new NoteMessage(...);producer.sendMessage(msg);return ResponseEntity.ok().build();}
}
Producer
(將消息 push 到 Redis 列表)
@Service
public class ProducerService {private static final String QUEUE = "mq:note:queue";private final RedisTemplate<String, String> redis;private final ObjectMapper mapper = new ObjectMapper();public ProducerService(RedisTemplate<String, String> redis){ this.redis = redis; }public void sendMessage(NoteMessage msg){try {String json = mapper.writeValueAsString(msg);redis.opsForList().leftPush(QUEUE, json);} catch (Exception e) {// 記錄失敗/監控}}
}
Consumer
(后臺阻塞消費,更新 ZSET)
@Service
public class RedisConsumerService {private static final String QUEUE = "mq:note:queue";private static final String DLQ = "mq:note:dlq";private final RedisTemplate<String, String> redis;private final LeaderboardService leaderboard;private final ObjectMapper mapper = new ObjectMapper();private volatile boolean running = true;public RedisConsumerService(RedisTemplate<String, String> redis, LeaderboardService leaderboard) {this.redis = redis; this.leaderboard = leaderboard;}@PostConstructpublic void start() {Thread t = new Thread(this::loop, "redis-mq-consumer");t.setDaemon(true);t.start();}private void loop() {while (running) {try {// 阻塞彈出(0 表示一直阻塞)String json = redis.opsForList().rightPop(QUEUE, 0, TimeUnit.SECONDS);if (json == null) continue;NoteMessage msg = mapper.readValue(json, NoteMessage.class);// 可做冪等校驗(基于 messageId)leaderboard.incrementScore(msg.getUserId(), msg.getDelta());} catch (Exception e) {// 解析或處理失敗 -> 推到死信隊列并記錄redis.opsForList().leftPush(DLQ, /* 原始消息 */);}}}@PreDestroypublic void stop() { running = false; }
}
LeaderboardService
(封裝 ZSET 操作)
@Service
public class LeaderboardService {private static final String ZKEY = "leaderboard:zset";private final RedisTemplate<String, String> redis;public LeaderboardService(RedisTemplate<String, String> redis){ this.redis = redis; }public void incrementScore(Long userId, double delta){redis.opsForZSet().incrementScore(ZKEY, userId.toString(), delta);}public Set<ZSetOperations.TypedTuple<String>> topN(int n){return redis.opsForZSet().reverseRangeWithScores(ZKEY, 0, n - 1);}public Set<ZSetOperations.TypedTuple<String>> rangeAllWithScores(){return redis.opsForZSet().rangeWithScores(ZKEY, 0, -1);}
}
DB 定時持久化
(每小時)
@Component
public class DbPersistScheduler {private final LeaderboardService leaderboardService;private final LeaderboardMapper mapper;public DbPersistScheduler(LeaderboardService leaderboardService, LeaderboardMapper mapper){this.leaderboardService = leaderboardService; this.mapper = mapper;}// 每小時執行一次(整點)@Scheduled(cron = "0 0 * * * ?")public void persistHourly() {// 讀取全部或 top K(注意數據量)Set<ZSetOperations.TypedTuple<String>> entries = leaderboardService.rangeAllWithScores();if (entries.isEmpty()) return;List<Leaderboard> list = entries.stream().map(t -> new Leaderboard(Long.valueOf(t.getValue()), t.getScore().longValue(), new Date())).collect(Collectors.toList());// 批量 upsert(MyBatis 實現)mapper.batchUpsert(list);}
}
?MyBatis Mapper?+ SQL
Mapper 接口:
Apply to HallScene.ts
public interface LeaderboardMapper {void batchUpsert(@Param("list") List<Leaderboard> list);// 可加查詢方法
}mapper XML(LeaderboardMapper.xml)<insert id="batchUpsert" parameterType="map">INSERT INTO leaderboard (user_id, score, updated_at)VALUES<foreach collection="list" item="item" separator=",">(#{item.userId}, #{item.score}, #{item.updatedAt})</foreach>ON DUPLICATE KEY UPDATEscore = VALUES(score),updated_at = VALUES(updated_at)
</insert>
- 冪等與重復消息:消息包含?messageId,消費端在?Redis/DB 中做冪等檢查(如?SETNX messageId:processed 或 Redis?Hash?記錄已處理?id)。
- 死信與重試:消費失敗推到?mq:note:dlq,人工或單獨重試流程處理。
- 批量與限流:ZSET?的全量持久化當數據量大時會耗時,建議:
- 只持久化 Top N(按需求),或分批(分頁?ZRANGE)。
- 按用戶分區并并行持久化。
- 持久化策略:可以每小時寫入 DB?覆蓋(ON DUPLICATE KEY),也可寫入增量表(記錄歷史)。
- 數據一致性:在關機或部署前優雅停止消費線程并將?Redis?中的變更 flush 到 DB。
- 性能:消費端更新 ZSET?為單條 Redis 請求,若高并發可用 pipeline 或本地批隊列合并多條消息后一起?ZINCRBY(減少網絡往返)。
- 監控:記錄隊列長度(LLEN)、consumer 錯誤率、持久化耗時,設置報警。