今天我們來做一個典型的消費力度能達到萬級別的并發場景,老師點名-學生簽到
正常情況
正常情況來說是不同班級下的老師發布不同的點名--然后不同班級下的很多學生同一時間進行簽到,簽到成功就去修改數據庫,簽到失敗就返回,但是這樣的話 簽到的學生一多,數據庫修改每一行的內容,都會加上行鎖,那么改的多了,數據庫很可能出現卡頓的情況,導致學生明明在規定時間內簽到了,但是卻出現簽到結束的情況,或者說出現其他的冗余簽到的情況,這樣顯然是不希望我們看到的,也不希望學生看到
并發級處理
怎么解決前面的那種簽到錯誤的場景呢?
那么當然就是傳統級別的 面對并發情況下的重拳三連了哈哈哈
mysql-redis-rabbitMq
首先 我們這個業務需要怎么寫?
redis的key怎么選擇,學生的key怎么選都是一個問題,下面我們來一一的進行分析
MySQL表的業務數據關聯
因為我們是測試demo,所以我們只做出了關鍵的表結構關聯,像老師表我們是沒有做的
看上圖,首先我們最頂部有一個課程表,寫的有一個課程id和名稱,還有還有學生表,學生表和課程表之間有一個中間的表關聯,叫學生課程表(student-courses),然后我們老師點名的時候是屬于課堂活動表,里面記錄的課堂的活動,比如點名和提問,這個表(class_activities)與課程表關聯,最后的是每一個學生在該課程下的做出的課堂活動,也就是學生活動表(student-activities),她關聯了學生表,課堂活動表和課程表。
主要流程
老師發布點名,然后課堂互動表記錄一條會過期的課堂活動,狀態是進行中,然后學生簽到,簽到之后,找到該課程下的該簽到過的學生,像學生活動表中添加一條簽到過的數據
Redis業務
在redis方面,我們主要做的就是對學生簽到數據的存儲,對老師發布的簽到數據的存儲
我們知道 redis的string的數據類型是比較占用空間的,所以對于我們單個的老師發布的簽到數據,我們可以用string類型,對于不同班級下的多個學生的簽到情況,我們可以用hash結構 ,因為對于ihash結構,我們的數據一般是使用ziplist壓縮,更省空間
RabbitMQ業務
我們mq主要做的就是讀取redis中的簽到過的學生數據,然后把學生數據做一個異步寫入mysql,這樣減緩簽到高峰時段mysql的壓力
我們mq首先從redis中查到簽到過的學生數據,然后跟該課程下的學生數據做對比,如果該課程下學生有數據,redis中學生簽到無數據,那么該學生就是未簽到
如果簽到,就把簽到數據存入數據庫
總體代碼
老師點名-學生簽到
package com.example.tabledemo.controller;import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.tabledemo.config.RabbitConfig;
import com.example.tabledemo.generator.service.ClassActivitiesService;
import com.example.tabledemo.generator.service.CourseService;
import com.example.tabledemo.pojo.Result;
import com.example.tabledemo.pojo.entity.ClassActivitiesEntity;
import com.example.tabledemo.pojo.entity.CourseEntity;
import com.example.tabledemo.pojo.request.ClassActivitiesRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Objects;import static cn.hutool.core.date.DateTime.now;/*** @Author: wyz* @Date: 2025-04-08-16:17* @Description:課堂活動*/
@Slf4j
@RequiredArgsConstructor
@RestController
@RequestMapping("/class/activities")
public class ClassActivitiesController {private final ClassActivitiesService classActivitiesService;private final CourseService courseService;private final StringRedisTemplate redisTemplate;private final RabbitTemplate rabbitTemplate;/*** 老師點名*/@PostMapping("/teacher/rollCall")public Result teacherRollCall(@RequestBody ClassActivitiesRequest.TeacherRollCall teacherRollCall) {//判斷是否有課程CourseEntity course = courseService.getById(teacherRollCall.getCourseId());if (Objects.isNull(course)) {return Result.fail("沒有該課程");}//查看該課程下是否有點名活動LambdaQueryWrapper<ClassActivitiesEntity> eq = Wrappers.lambdaQuery(ClassActivitiesEntity.class).eq(ClassActivitiesEntity::getCourseId, teacherRollCall.getCourseId()).eq(ClassActivitiesEntity::getActiveType, 1).eq(ClassActivitiesEntity::getActiveStatus, 0);ClassActivitiesEntity one = classActivitiesService.getOne(eq);if(!Objects.isNull(one)){return Result.fail("該課程已存在點名,請勿重復點名");}//生成簽到碼//// String signCode = RandomUtil.randomNumbers(4);String signCode = "1234";ClassActivitiesEntity classActivitiesEntity = new ClassActivitiesEntity();classActivitiesEntity.setCourseId(teacherRollCall.getCourseId());// 獲取當前時間DateTime now = now();classActivitiesEntity.setStartTime(now);// 使用Calendar計算未來時間Calendar calendar = Calendar.getInstance();calendar.setTime(now);calendar.add(Calendar.SECOND, teacherRollCall.getSignSeconds());Date endTime = calendar.getTime();classActivitiesEntity.setEndTime(endTime);classActivitiesEntity.setActiveType(1);classActivitiesEntity.setActiveStatus(0);//課堂活動存入數據庫boolean save = classActivitiesService.save(classActivitiesEntity);//redis中生成簽到碼的keyString signCodeKey = "sign_" + teacherRollCall.getCourseId() + "_" + signCode;redisTemplate.opsForValue().set(signCodeKey, signCode);//發給rabbitmq 延遲隊列 讓延遲隊列處理 最終的簽到情況//1. 學生查看課堂的活動的信息 應該在 課堂活動表中查看//2. 延遲隊列處理 簽到結束后的情況HashMap<Object, Object> map = new HashMap<>();map.put("course_id", teacherRollCall.getCourseId());map.put("class_activities_id", classActivitiesEntity.getId());map.put("sign_code", signCode);rabbitTemplate.convertAndSend(RabbitConfig.ROLL_CALL_DEAD_EXCHANGE, RabbitConfig.ROLL_CALL_DEAD_ROUTING_KEY, map, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(teacherRollCall.getSignSeconds()*1000);return message;}});return Result.success("發布簽到成功",signCode);}/*** 學生簽到*/@PostMapping("/student/sign")public Result studentSign(@RequestBody ClassActivitiesRequest.StudentSign studentSign) {//判斷該學生是否在班級當中//這里我們不判斷 知道就行String signCodeKey = "sign_" + studentSign.getCourseId() + "_" + studentSign.getSignCode();//不為空 證明有該簽到String signCode = redisTemplate.opsForValue().get(signCodeKey);if (!Objects.isNull(signCode)) {if (!signCode.equals(studentSign.getSignCode())) {return Result.fail("簽到碼錯誤,簽到失敗");}//學生簽到keyString studentSignKey="student_sign_"+studentSign.getStudentId();if(redisTemplate.opsForHash().hasKey("h"+signCodeKey,studentSignKey)){return Result.fail("您已經簽到成功,請勿重復簽到");}//value正常應該是 簽到時間 我們換成簽到碼redisTemplate.opsForHash().put("h"+signCodeKey,studentSignKey,signCode);return Result.success("簽到成功");} else {return Result.fail("簽到已過期或已被刪除");}}
}
mq配置
package com.example.tabledemo.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author: wyz* @Date: 2025-04-08-17:19* @Description:*/
@Configuration
public class RabbitConfig {@Beanpublic MessageConverter messageConverter() {// 定義消息轉換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();return jackson2JsonMessageConverter;}// //點名延遲交換機
// public static final String ROLL_CALL_EXCHANGE = "roll_call_exchange";
// //點名延遲隊列
// public static final String ROLL_CALL_QUEUE = "roll_call_queue";//點名死信交換機public static final String ROLL_CALL_DEAD_EXCHANGE = "roll_call_dead_exchange";//點名死信隊列public static final String ROLL_CALL_DEAD_QUEUE = "roll_call_dead_queue";public static final String ROLL_CALL_DEAD_ROUTING_KEY = "roll_call";/*** 綁定 點名消息隊列 -> 點名私信交換機->點名私信隊列** @return*/
// @Bean
// public Queue bindMsgDeadQueue() {
// return QueueBuilder.durable(ROLL_CALL_QUEUE)
// .deadLetterExchange(ROLL_CALL_DEAD_EXCHANGE)
// .deadLetterRoutingKey(ROLL_CALL_DEAD_ROUTING_KEY)
//
// .build();
// }
//
//
//
//
// /**
// * 聲明點名交換機
// */
// @Bean
// Exchange rollCallExchange() {
// return ExchangeBuilder.directExchange(ROLL_CALL_EXCHANGE)
// .durable(true)
// .build();
// }
//
// /**
// * 綁定 點名 交換機隊列
// */
// @Bean
// Binding bingingRollCallExchangeQueue() {
// return BindingBuilder.bind(bindMsgDeadQueue())
// .to(rollCallExchange())
// .with(ROLL_CALL_DEAD_ROUTING_KEY).noargs();
// }/*** 聲明點名死信隊列*/@BeanQueue rollCallDeadQueue() {return QueueBuilder.durable(ROLL_CALL_DEAD_QUEUE).build();}/*** 聲明點名 死信交換機*/@BeanExchange rollCallDeadExchange() {return ExchangeBuilder.directExchange(ROLL_CALL_DEAD_EXCHANGE).delayed().durable(true).build();}/*** 綁定點名 私信交換機隊列*/@BeanBinding bindingRollCallExchangeQueue() {return BindingBuilder.bind(rollCallDeadQueue()).to(rollCallDeadExchange()).with(ROLL_CALL_DEAD_ROUTING_KEY).noargs();}}
消費者配置
package com.example.tabledemo.consumer;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.tabledemo.config.RabbitConfig;
import com.example.tabledemo.generator.service.ClassActivitiesService;
import com.example.tabledemo.generator.service.StudentActivitiesService;
import com.example.tabledemo.pojo.entity.ClassActivitiesEntity;
import com.example.tabledemo.pojo.entity.StudentActivitiesEntity;
import com.example.tabledemo.student.StudentCoursesEntity;
import com.example.tabledemo.student.service.StudentCoursesService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;import static java.time.LocalTime.now;/*** @Author: wyz* @Date: 2025-04-08-20:40* @Description:處理學生簽到的消費者*/
@Component
@Slf4j
public class SignConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate StudentCoursesService studentCoursesService;@Autowiredprivate ClassActivitiesService classActivitiesService;@Autowiredprivate StudentActivitiesService studentActivitiesService;@RabbitListener(queues = RabbitConfig.ROLL_CALL_DEAD_QUEUE)@RabbitHandler// 直接引用隊列名public void studentSignConsumer(HashMap<Object, Object> map, Channel channel, Message message) throws IOException {try {log.info(now() + "----------老師點名延遲消息處理開始----------");//解析消息Integer courseId = (Integer) map.get("course_id");Integer classActivitiesId = (Integer) map.get("class_activities_id");String signCode = (String) map.get("sign_code");//業務冪等性判斷ClassActivitiesEntity byId = classActivitiesService.getById(classActivitiesId);//證明已經消費過了 本來是額外存的這里 只用狀態判斷if(byId.getActiveStatus()==1){return;}//拿到redis中的學生簽到數據String signCodeKey = "sign_" + courseId + "_" + signCode;Map<Object, Object> studentSignMap = redisTemplate.opsForHash().entries("h" + signCodeKey);//課堂活動狀態改為已經結束LambdaUpdateWrapper<ClassActivitiesEntity> eq1 = Wrappers.lambdaUpdate(ClassActivitiesEntity.class).set(ClassActivitiesEntity::getActiveStatus, 1).eq(ClassActivitiesEntity::getId, classActivitiesId);classActivitiesService.update(eq1);//學生簽到key//String studentSignKey="student_sign_"+studentSign.getStudentId();List<Integer> studentSignIdList = studentSignMap.entrySet().stream().map(i -> {String studentSignKey = (String) i.getKey();log.info("學生信息為{}", studentSignKey);Integer studentId = Integer.valueOf(studentSignKey.split("_")[2]);log.info("學生id為{}", studentId);return studentId;}).collect(Collectors.toList());//查出該課程下 的所有學生idLambdaQueryWrapper<StudentCoursesEntity> eq = Wrappers.lambdaQuery(StudentCoursesEntity.class).eq(StudentCoursesEntity::getCourseId, courseId);List<StudentCoursesEntity> list = studentCoursesService.list(eq);List<Integer> studentIds = list.stream().map(i -> i.getStudentId()).collect(Collectors.toList());//正常是 會有課程狀態 課程結課什么的 ,這里我們模擬 不做處理ArrayList<StudentActivitiesEntity> studentActivitiesEntities = new ArrayList<>();studentIds.stream().forEach(studentId -> {StudentActivitiesEntity studentActivitiesEntity = new StudentActivitiesEntity();studentActivitiesEntity.setStudentId(studentId);studentActivitiesEntity.setClassActivitiesId(classActivitiesId);studentActivitiesEntity.setCourseId(courseId);studentActivitiesEntity.setStudentActivitiesStatus(0);if (studentSignIdList.contains(studentId)) {log.info("有學生簽到了");studentActivitiesEntity.setStudentActivitiesStatus(1);}studentActivitiesEntities.add(studentActivitiesEntity);});//構建學生活動表的數據studentActivitiesService.saveBatch(studentActivitiesEntities);//刪除redis數據redisTemplate.delete(signCodeKey);redisTemplate.delete("h" + signCodeKey);//true 和false 代表著 是否 確認該條消息之前的 true 是確認 false 不確認// 假設隊列中有消息 deliveryTag=5,6,7 現在是6// 結果:僅消息6被確認刪除,消息5和7仍在隊列中channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info(now() + "----------老師點名延遲消息處理結束----------");} catch (Exception e) {Boolean redelivered = message.getMessageProperties().getRedelivered();if (redelivered) {log.info(now() + "----------老師點名延遲消息處理異常,已被重新投遞,丟棄消息----------");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.info(now() + "----------老師點名延遲消息處理異常,消息重新投遞----------");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}throw e;}}
}
測試流程
接口測試
jmeter 壓測
數據庫數據查看?
可見 已經測試成功了?