文章目錄
- 前言
- 一、本地緩存與分布式緩存
- 1.1 使用緩存
- 1.2 本地緩存
- 1.3 本地模式在分布式下的問題
- 1.4 分布式緩存
- 二、整合redis測試
- 2.1 引入依賴
- 2.2 配置信息
- 2.3 測試
- 三、改造三級分類業務
- 3.1 代碼改造
- 四、高并發下緩存失效問題
- 4.1 緩存穿透
- 4.2 緩存雪崩
- 4.3 緩存擊穿
- 五、分布式下加鎖
- 5.1 分布式鎖示意圖
- 5.2 鎖的時序問題
前言
本文繼續記錄B站谷粒商城項目視頻 P151-157 的內容,做到知識點的梳理和總結的作用。
一、本地緩存與分布式緩存
1.1 使用緩存
為了系統性能的提升,我們一般都會將部分數據放入緩存中,加速訪問。而 db 承擔數據落盤工作。
哪些數據適合放入緩存?
- 即時性、數據一致性要求不高的
- 訪問量大且更新頻率不高的數據(讀多,寫少)
舉例:電商類應用,商品分類,商品列表等適合緩存并加一個失效時間(根據數據更新頻率來定),后臺如果發布一個商品,買家需要 5 分鐘才能看到新的商品一般還是可以接受的。
偽代碼
data = redisTemplate.opsForValue().get(redisKey);//從緩存加載數據
If(data == null){//緩存中沒有則從數據庫加載數據data = db.getDataFromDB(id);//保存到 cache 中redisTemplate.opsForValue().set(redisKey,data);
}
return data;
1.2 本地緩存
在單體項目中,我們可以使用 Map 集合存儲數據作為項目的本地緩存,因為 Map 數據是存儲與內存的,相比于數據庫查詢要從磁盤加載到內存有著更高的效率。
1.3 本地模式在分布式下的問題
但是在分布式情況下這種情況就不再適用了,每個微服務可能部署在多臺機器上,每個機器上有各自的緩存 Map 對象,會導致數據不一致的問題。
1.4 分布式緩存
所以應該將數據緩存在同一個緩存中間件中,才能保證數據一致性問題
二、整合redis測試
2.1 引入依賴
<!-- 緩存中間件redis依賴-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.2 配置信息
spring:redis:host: 192.168.57.129port: 6379
2.3 測試
@Autowired
StringRedisTemplate redisTemplate;@Test
public void testRedis() {//存儲redisTemplate.opsForValue().set("HELLO_REDIS", "SpringBoot!");//獲取String value = redisTemplate.opsForValue().get("HELLO_REDIS");System.out.println(value);
}
三、改造三級分類業務
3.1 代碼改造
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {//給緩存中放json字符串,拿出的json字符串,還用逆轉為能用的對象類型:【序列化與反序列化】/*** 1、空結果緩存:解決緩存穿透* 2、設置過期時間(加隨機值):解決緩存雪崩* 3、加鎖:解決緩存擊穿*///1、加入緩存邏輯,緩存中存的數據是json字符串。//JSON跨語言,跨平臺兼容。String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");if (StringUtils.isEmpty(catalogJSON)) {//2、緩存中沒有,查詢數據庫//保證數據庫查詢完成以后,將數據放在redis中,這是一個原子操作。log.info("緩存不命中....將要查詢數據庫...");Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDB();String result = JSON.toJSONString(catalogJsonFromDb);redisTemplate.opsForValue().set("catalogJSON",result);}log.info("緩存命中....直接返回....");//轉為我們指定的對象。return JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
}
四、高并發下緩存失效問題
4.1 緩存穿透
解決方案1:null 結果放入緩存,并加入短暫的過期時間
偽代碼
//從緩存加載數據
data = redisTemplate.opsForValue().get(redisKey);
If(data == null){//緩存中沒有則從數據庫加載數據data = db.getDataFromDB(id);if(data == null) {//空結果保存到 cache 中redisTemplate.opsForValue().set(redisKey,null,300,TimeUnit.SECONDS);}else {//保存到 cache 中redisTemplate.opsForValue().set(redisKey,data);}
}
return data;
解決方案2:使用布隆過濾器
這種技術在緩存之前再加一層屏障,里面存儲目前數據庫中存在的所有key。當業務系統有查詢請求的時候,首先去BloomFilter中查詢該key是否存在。若不存在,則說明數據庫中也不存在該數據,因此緩存都不要查了,直接返回null。若存在,則繼續執行后續的流程,先前往緩存中查詢,緩存中沒有的話再前往數據庫中的查詢。偽代碼如下:
String get(String key) {String value = redis.get(key); if (value == null) {if(!bloomfilter.mightContain(key)){//不存在則返回return null; }else{//可能存在則查數據庫value = db.get(key); redis.set(key, value); } }return value;
}
布隆過濾器示意圖
4.2 緩存雪崩
4.3 緩存擊穿
五、分布式下加鎖
5.1 分布式鎖示意圖
本地鎖,只能鎖住當前進程,所以我們需要分布式鎖。
5.2 鎖的時序問題
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {//給緩存中放json字符串,拿出的json字符串,還用逆轉為能用的對象類型:【序列化與反序列化】/*** 1、空結果緩存:解決緩存穿透* 2、設置過期時間(加隨機值):解決緩存雪崩* 3、加鎖:解決緩存擊穿*///1、加入緩存邏輯,緩存中存的數據是json字符串。//JSON跨語言,跨平臺兼容。String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");if (StringUtils.isEmpty(catalogJSON)) {//2、緩存中沒有,查詢數據庫//保證數據庫查詢完成以后,將數據放在redis中,這是一個原子操作。log.info("緩存不命中....將要查詢數據庫...");Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDB();return catalogJsonFromDb;}log.info("緩存命中....直接返回....");//轉為我們指定的對象。return JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
}
查詢數據庫后將結果放入緩存,保證這是一個原子性操作,防止多個線程查詢數據庫而導致日志輸出多個查詢了數據庫…
//從數據庫查詢并封裝分類數據
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDB() {//只要同一把鎖,就能鎖住需要這個鎖的所有線程//synchronized (this):springBoot所有組件在容器中都是單實例的//TODO 本地鎖:synchronized JUC(Lock) 在分布式情況下只能使用分布式鎖才能鎖住資源synchronized (this) {//得到鎖以后,我們應該再去緩存中確定一次,如果沒有才需要繼續查詢String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");if (!StringUtils.isEmpty(catalogJSON)) {return JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});}log.info("查詢了數據庫....");//1、將數據庫的多次查詢變為一次,查詢所有分類信息List<CategoryEntity> selectList = baseMapper.selectList(null);//1、查出所有1級分類List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L);//2、封裝數據Map<String, List<Catelog2Vo>> parent_cid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {//1、每一個的一級分類,查到這個一級分類的二級分類List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());//2、封裝上面的結果List<Catelog2Vo> catelog2Vos = null;if (categoryEntities != null) {catelog2Vos = categoryEntities.stream().map(l2 -> {Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName());//1、找當前二級分類的三級分類封裝成voList<CategoryEntity> level3Catelog = getParent_cid(selectList, l2.getCatId());if (level3Catelog != null) {List<Catelog2Vo.Catelog3Vo> collect = level3Catelog.stream().map(l3 -> {//2、封裝成指定格式Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());return catelog3Vo;}).collect(Collectors.toList());catelog2Vo.setCatalog3List(collect);}return catelog2Vo;}).collect(Collectors.toList());}return catelog2Vos;}));String result = JSON.toJSONString(parent_cid);redisTemplate.opsForValue().set("catalogJSON",result,1,TimeUnit.DAYS);return parent_cid;}
}
壓力測試結果:日志只輸出一個查詢了數據庫…,表面只有一個線程查詢了數據庫。