緩存原理
本文的業務就是redis的經典應用,標準的操作方式就是查詢數據庫之前先查詢緩存,如果緩存數據存在,則直接從緩存中返回,如果緩存數據不存在,再查詢數據庫,然后將數據存入redis。
緩存更新策略
根據id查詢店鋪時,如果緩存未命中,則查詢數據庫,將數據庫結果寫入緩存,并設置超時時間
根據id修改店鋪時,先修改數據庫,再刪除緩存
/*** 根據id查詢商鋪數據(查詢時,重建緩存)** @param id* @return*/@Overridepublic Result queryById(Long id) {String key = CACHE_SHOP_KEY + id;// 1、從Redis中查詢店鋪數據String shopJson = stringRedisTemplate.opsForValue().get(key);Shop shop = null;// 2、判斷緩存是否命中if (StrUtil.isNotBlank(shopJson)) {// 2.1 緩存命中,直接返回店鋪數據shop = JSONUtil.toBean(shopJson, Shop.class);return Result.ok(shop);}// 2.2 緩存未命中,從數據庫中查詢店鋪數據shop = this.getById(id);// 4、判斷數據庫是否存在店鋪數據if (Objects.isNull(shop)) {// 4.1 數據庫中不存在,返回失敗信息return Result.fail("店鋪不存在");}// 4.2 數據庫中存在,重建緩存,并返回店鋪數據stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);return Result.ok(shop);}/*** 更新商鋪數據(更新時,更新數據庫,刪除緩存)** @param shop* @return*/@Transactional@Overridepublic Result updateShop(Shop shop) {// 參數校驗, 略// 1、更新數據庫中的店鋪數據boolean f = this.updateById(shop);if (!f){// 緩存更新失敗,拋出異常,事務回滾throw new RuntimeException("數據庫更新失敗");}// 2、刪除緩存f = stringRedisTemplate.delete(CACHE_SHOP_KEY + shop.getId());if (!f){// 緩存刪除失敗,拋出異常,事務回滾throw new RuntimeException("緩存刪除失敗");}return Result.ok();}
緩存穿透
緩存穿透 :緩存穿透是指客戶端請求的數據在緩存中和數據庫中都不存在,這樣緩存永遠不會生效,這些請求都會打到數據庫。
簡單的解決方案是哪怕這個數據在數據庫中也不存在,我們也把這個數據存入到redis中去,下次用戶過來訪問這個不存在的數據,那么在redis中也能找到這個數據就不會進入到數據庫了。
布隆過濾器采用的是哈希思想來解決這個問題,通過一個龐大的二進制數組判斷當前這個要查詢的這個數據是否存在,如果布隆過濾器判斷存在,則放行,這個請求會去訪問redis,哪怕此時redis中的數據過期了,但是數據庫中一定存在這個數據,在數據庫中查詢出來這個數據后,再將其放入到redis中,假設布隆過濾器判斷這個數據不存在,則直接返回。
這種方式優點在于節約內存空間,存在誤判,誤判原因在于:布隆過濾器走的是哈希思想,只要哈希思想,就可能存在哈希沖突。
商品查詢的緩存穿透解決
這里采用上述第一種方法:
在原來的邏輯中,我們如果發現這個數據在mysql中不存在,直接就返回404了,這樣是會存在緩存穿透問題的
現在的邏輯中:如果這個數據不存在,我們不會返回404 ,還是會把這個數據寫入到Redis中,并且將value設置為空,歐當再次發起查詢時,我們如果發現命中之后,判斷這個value是否是null,如果是null,則是之前寫入的數據,證明是緩存穿透數據,如果不是,則直接返回數據。
緩存雪崩
緩存雪崩是指在同一時段大量的緩存key同時失效或者Redis服務宕機,導致大量請求到達數據庫,帶來巨大壓力。
緩存擊穿問題也叫熱點Key問題,就是一個被高并發訪問并且緩存重建業務較復雜的key突然失效了,無數的請求訪問會在瞬間給數據庫帶來巨大的沖擊。
具體場景中,假設線程1在查詢緩存之后,本來應該去查詢數據庫,然后把這個數據重新加載到緩存的,此時只要線程1走完這個邏輯,其他線程就都能從緩存中加載這些數據了,但是假設在線程1沒有走完的時候,后續的線程2,線程3,線程4同時過來訪問當前這個方法, 那么這些線程都不能從緩存中查詢到數據,那么他們就會同一時刻來訪問查詢緩存,都沒查到,接著同一時間去訪問數據庫,同時的去執行數據庫代碼,對數據庫訪問壓力過大
因為鎖能實現互斥性。假設線程過來,只能一個人一個人的來訪問數據庫,從而避免對于數據庫訪問壓力過大,但這也會影響查詢的性能,因為此時會讓查詢的性能從并行變成了串行,我們可以采用tryLock方法 + double check來解決這樣的問題。
假設現在線程1過來訪問,他查詢緩存沒有命中,但是此時他獲得到了鎖的資源,那么線程1就會一個人去執行邏輯,假設現在線程2過來,線程2在執行過程中,并沒有獲得到鎖,那么線程2就可以進行到休眠,直到線程1把鎖釋放后,線程2獲得到鎖,然后再來執行邏輯,此時就能夠從緩存中拿到數據了。
我們之所以會出現這個緩存擊穿問題,主要原因是在于我們對key設置了過期時間,假設我們不設置過期時間,其實就不會有緩存擊穿的問題,但是不設置過期時間,這樣數據不就一直占用內存了,我們可以采用邏輯過期方案。
我們把過期時間設置在 redis的value中,注意:這個過期時間并不會直接作用于redis,而是我們后續通過邏輯去處理。假設線程1去查詢緩存,然后從value中判斷出來當前的數據已經過期了,此時線程1去獲得互斥鎖,那么其他線程會進行阻塞,獲得了鎖的線程他會開啟一個 線程去進行 以前的重構數據的邏輯,直到新開的線程完成這個邏輯后,才釋放鎖, 而線程1直接進行返回,假設現在線程3過來訪問,由于線程線程2持有著鎖,所以線程3無法獲得鎖,線程3也直接返回數據,只有等到新開的線程2把重建數據構建完后,其他線程才能走返回正確的數據。
商品查詢的緩存雪崩解決
相較于原來從緩存中查詢不到數據后直接查詢數據庫而言,現在的方案是進行查詢之后,如果從緩存沒有查詢到數據,則進行互斥鎖的獲取,獲取互斥鎖后,判斷是否獲得到了鎖,如果沒有獲得到,則休眠,過一會再進行嘗試,直到獲取到鎖為止,才能進行查詢。如果獲取到了鎖的線程,再去進行查詢,查詢后將數據寫入redis,再釋放鎖,返回數據,利用互斥鎖就能保證只有一個線程去執行操作數據庫的邏輯,防止緩存擊穿。
public Shop queryWithMutex(Long id) {String key = CACHE_SHOP_KEY + id;// 1、從redis中查詢商鋪緩存String shopJson = stringRedisTemplate.opsForValue().get("key");// 2、判斷是否存在if (StrUtil.isNotBlank(shopJson)) {// 存在,直接返回return JSONUtil.toBean(shopJson, Shop.class);}//判斷命中的值是否是空值if (shopJson != null) {//返回一個錯誤信息return null;}// 4.實現緩存重構//4.1 獲取互斥鎖String lockKey = "lock:shop:" + id;Shop shop = null;try {boolean isLock = tryLock(lockKey);// 4.2 判斷否獲取成功if(!isLock){//4.3 失敗,則休眠重試Thread.sleep(50);return queryWithMutex(id);}//4.4 成功,根據id查詢數據庫shop = getById(id);// 5.不存在,返回錯誤if(shop == null){//將空值寫入redisstringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);//返回錯誤信息return null;}//6.寫入redisstringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_NULL_TTL,TimeUnit.MINUTES);}catch (Exception e){throw new RuntimeException(e);}finally {//7.釋放互斥鎖unlock(lockKey);}return shop;}
當用戶開始查詢redis時,判斷是否命中,如果沒有命中則直接返回空數據,不查詢數據庫,而一旦命中后,將value取出,判斷value中的過期時間是否滿足,如果沒有過期,則直接返回redis中的數據,如果過期,則在開啟獨立線程后直接返回之前的數據,獨立線程去重構數據,重構完成后釋放互斥鎖。
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire( Long id ) {String key = CACHE_SHOP_KEY + id;// 1.從redis查詢商鋪緩存String json = stringRedisTemplate.opsForValue().get(key);// 2.判斷是否存在if (StrUtil.isBlank(json)) {// 3.存在,直接返回return null;}// 4.命中,需要先把json反序列化為對象RedisData redisData = JSONUtil.toBean(json, RedisData.class);Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);LocalDateTime expireTime = redisData.getExpireTime();// 5.判斷是否過期if(expireTime.isAfter(LocalDateTime.now())) {// 5.1.未過期,直接返回店鋪信息return shop;}// 5.2.已過期,需要緩存重建// 6.緩存重建// 6.1.獲取互斥鎖String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);// 6.2.判斷是否獲取鎖成功if (isLock){CACHE_REBUILD_EXECUTOR.submit( ()->{try{//重建緩存this.saveShop2Redis(id,20L);}catch (Exception e){throw new RuntimeException(e);}finally {unlock(lockKey);}});}// 6.4.返回過期的商鋪信息return shop;
}
包裝處緩存類
基于上面兩種問題,可以包裝下原生的StringRedisTemplate:
@Component
public class CacheClient {private final StringRedisTemplate stringRedisTemplate;private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public void set(String key, Object value, Long time, TimeUnit unit) {stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);}public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit) {// 設置邏輯過期RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));// 寫入RedisstringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));}public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit){String key = keyPrefix + id;// 1.從redis查詢商鋪緩存String json = stringRedisTemplate.opsForValue().get(key);// 2.判斷是否存在if (StrUtil.isNotBlank(json)) {// 3.存在,直接返回return JSONUtil.toBean(json, type);}// 判斷命中的是否是空值if (json != null) {// 返回一個錯誤信息return null;}// 4.不存在,根據id查詢數據庫R r = dbFallback.apply(id);// 5.不存在,返回錯誤if (r == null) {// 將空值寫入redisstringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);// 返回錯誤信息return null;}// 6.存在,寫入redisthis.set(key, r, time, unit);return r;}public <R, ID> R queryWithLogicalExpire(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {String key = keyPrefix + id;// 1.從redis查詢商鋪緩存String json = stringRedisTemplate.opsForValue().get(key);// 2.判斷是否存在if (StrUtil.isBlank(json)) {// 3.存在,直接返回return null;}// 4.命中,需要先把json反序列化為對象RedisData redisData = JSONUtil.toBean(json, RedisData.class);R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);LocalDateTime expireTime = redisData.getExpireTime();// 5.判斷是否過期if(expireTime.isAfter(LocalDateTime.now())) {// 5.1.未過期,直接返回店鋪信息return r;}// 5.2.已過期,需要緩存重建// 6.緩存重建// 6.1.獲取互斥鎖String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);// 6.2.判斷是否獲取鎖成功if (isLock){// 6.3.成功,開啟獨立線程,實現緩存重建CACHE_REBUILD_EXECUTOR.submit(() -> {try {// 查詢數據庫R newR = dbFallback.apply(id);// 重建緩存this.setWithLogicalExpire(key, newR, time, unit);} catch (Exception e) {throw new RuntimeException(e);}finally {// 釋放鎖unlock(lockKey);}});}// 6.4.返回過期的商鋪信息return r;}public <R, ID> R queryWithMutex(String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) {String key = keyPrefix + id;// 1.從redis查詢商鋪緩存String shopJson = stringRedisTemplate.opsForValue().get(key);// 2.判斷是否存在if (StrUtil.isNotBlank(shopJson)) {// 3.存在,直接返回return JSONUtil.toBean(shopJson, type);}// 判斷命中的是否是空值if (shopJson != null) {// 返回一個錯誤信息return null;}// 4.實現緩存重建// 4.1.獲取互斥鎖String lockKey = LOCK_SHOP_KEY + id;R r = null;try {boolean isLock = tryLock(lockKey);// 4.2.判斷是否獲取成功if (!isLock) {// 4.3.獲取鎖失敗,休眠并重試Thread.sleep(50);return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);}// 4.4.獲取鎖成功,根據id查詢數據庫r = dbFallback.apply(id);// 5.不存在,返回錯誤if (r == null) {// 將空值寫入redisstringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);// 返回錯誤信息return null;}// 6.存在,寫入redisthis.set(key, r, time, unit);} catch (InterruptedException e) {throw new RuntimeException(e);}finally {// 7.釋放鎖unlock(lockKey);}// 8.返回return r;}private boolean tryLock(String key) {Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(flag);}private void unlock(String key) {stringRedisTemplate.delete(key);}
}
總結
這一部分主要在查詢商戶的場景下分析了緩存的更新、穿透和雪崩的問題,最后給出一個實際場景中的實用類