1、分布式鎖
1.1、超賣問題
/*** 存在庫存超賣的不安全問題*/private void deductStock() {int stockTotal = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stockTotal > 0) { // 這里存在多個線程、進程同時判斷通過,然后超買的問題int realStock = stockTotal - 1;stringRedisTemplate.opsForValue().set("stock", realStock + "");System.out.println("扣減成功,剩余庫存:" + realStock);} else {System.out.println("扣減失敗,庫存不足");}}
1.2、線程鎖
/*** 單進程線程安全,但是多進程不安全(分布式不安全)*/private void threadStock() {synchronized (this) { // 同一進程內串行執行,因此僅在同一個進程內線程安全int stockTotal = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stockTotal > 0) {int realStock = stockTotal - 1;stringRedisTemplate.opsForValue().set("stock", realStock + "");System.out.println("扣減成功,剩余庫存:" + realStock);} else {System.out.println("扣減失敗,庫存不足");}}}
1.3、分布式鎖
1.3.1、基礎版
借用setnx(set if not exists)實現分布式鎖
setnx的作用是:當目標key不存在時才會設置成功,否則設置失敗。這樣的話當多個進程同時嘗試set同一個key時,同一時刻只能有一個成功。
/*** 一般添加分布式鎖的方式*/private void stockLock1() {// id為10086的商品String lockKey = "stock:10086";// setIfAbsent其實就是setnx的實現Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "locked");// 設置成功才會返回trueif (!result) {throw new RuntimeException("該商品正在被其他買家扣減庫存,請稍后再試~");}try {int stockTotal = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));if (stockTotal > 0) {int realStock = stockTotal - 1;stringRedisTemplate.opsForValue().set(lockKey, realStock + "");System.out.println("扣減成功,剩余庫存:" + realStock);} else {System.out.println("扣減失敗,庫存不足");}} finally {// 釋放鎖stringRedisTemplate.delete(lockKey);}}
這種方式存在一個死鎖問題,如果進程還沒執行到finally代碼塊中釋放分布式鎖的邏輯時就宕機了,會導致分布式鎖一直無法釋放。
1.3.2、帶過期時間
/*** 帶超時時間的分布式鎖的方式*/private void stockLock2() {// id為10086的商品String lockKey = "stock:10086";// 設置10秒的超時時間,10秒后自動釋放鎖// 這里需要注意,這里是通過一行命令執行的加鎖 + 超時,而不是2行。 因為如果放開為2行也會有剛加上鎖,還沒來得及設置超時就宕機成為死鎖的情況。Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 10, TimeUnit.SECONDS);// 設置成功才會返回trueif (!result) {throw new RuntimeException("該商品正在被其他買家扣減庫存,請稍后再試~");}try {int stockTotal = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));if (stockTotal > 0) {int realStock = stockTotal - 1;stringRedisTemplate.opsForValue().set(lockKey, realStock + "");System.out.println("扣減成功,剩余庫存:" + realStock);} else {System.out.println("扣減失敗,庫存不足");}} finally {// 釋放鎖stringRedisTemplate.delete(lockKey);}}
這里也有一個問題,如果業務在10s內沒執行完,鎖也釋放了。這樣也會出現不安全的問題。
1.3.3、鎖續命(watch dog)
針對上面的情況,可以在加鎖成功后,進程單獨開啟一個線程,每隔幾秒的時間檢測一下鎖是否還持有,如果持有就去將鎖的過期時間重置
。這樣就不會出現進程還持有鎖,但是Redis中這個鎖已經過期被強制被動釋放掉的情況。
偽代碼
/*** 帶鎖續命的分布式鎖的方式*/private void stockLock3() {// id為10086的商品String lockKey = "stock:10086";// 設置10秒的超時時間,10秒后自動釋放鎖// 這里需要注意,這里是通過一行命令執行的加鎖 + 超時,而不是2行。 因為如果放開為2行也會有剛加上鎖,還沒來得及設置超時就宕機成為死鎖的情況。Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 10, TimeUnit.SECONDS);// 設置成功才會返回trueif (!result) {throw new RuntimeException("該商品正在被其他買家扣減庫存,請稍后再試~");}// 開啟一個線程(watch dog)專門去掃描鎖是否還持有new Thread(watch dog).start();try {int stockTotal = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));if (stockTotal > 0) {int realStock = stockTotal - 1;stringRedisTemplate.opsForValue().set(lockKey, realStock + "");System.out.println("扣減成功,剩余庫存:" + realStock);} else {System.out.println("扣減失敗,庫存不足");}} finally {// 釋放鎖stringRedisTemplate.delete(lockKey);}}
watch dog的邏輯實現比較復雜,現在已經有現成的開源框架實現,直接用即可,不需要重復造比較復雜的輪子,因為容易出錯。
1.3.4、redisson
這是一個類似jedis的客戶端,它就實現了帶watch dog很完善的分布式鎖功能。
pom
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.6.5</version>
</dependency>
參考代碼
/*** redisson*/private void stockLock4() {// id為10086的商品String lockKey = "stock:10086";//獲取鎖對象RLock redissonLock = redisson.getLock(lockKey);//加分布式鎖redissonLock.lock(); // .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);try {int stockTotal = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));if (stockTotal > 0) {int realStock = stockTotal - 1;stringRedisTemplate.opsForValue().set(lockKey, realStock + "");System.out.println("扣減成功,剩余庫存:" + realStock);} else {System.out.println("扣減失敗,庫存不足");}} finally {// 釋放鎖redissonLock.unlock();}}
1.3.4.1、核心加鎖流程
這里補充一下,鎖超時時間默認30s
1.3.4.2、核心源碼
加鎖
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
核心是使用lua腳本依靠hset進行加鎖 和 支持重入鎖
這里提一下,lua腳本其中某個指令執行失敗時不會回滾數據,但是會中斷后續指令執行。 – 因此如果要回滾數據,需要lua自身保證
redis.call('set', 'key1', 'value1') -- 成功
redis.call('incr', 'key1') -- 失敗(key1為字符串)
redis.call('set', 'key2', 'value2') -- 不會執行
鎖續命
private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itself 這里就是循環續命邏輯scheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}}
核心lua
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",
如果沒過期,直接重置超時時間。
加鎖失敗時自旋重試
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for messageif (ttl >= 0) {// 等待上一把鎖超時,這里會阻塞ttl超時的時間,這里使用的Semaphore,會阻塞時會讓出CPU時間片// 這里有一個巧妙的地方,如果目標鎖提前釋放,這里就會提前退出阻塞。不會固定等ttl時間getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);}
// get(lockAsync(leaseTime, unit));}
鎖釋放
本質上是用到了Redis的發布訂閱功能,也就是Stream流類型。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +// 鎖不存在時,會發布一個鎖釋放的Stream事件, 這樣Java客戶端就可以感知到,走釋放邏輯,提前通知其它線程可以嘗試獲取鎖了"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +// 使用線程ID判斷是否鎖是自己線程加的,如果不是則不進行后續處理// 防止其它線程因為代碼錯誤或者惡意直接釋放代碼// 這里需要注意,getLockName(threadId) 實際上是uuid + 線程id,能保證唯一"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +// 重入鎖邏輯,釋放一重鎖,依舊會重置超時時間"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));}
1.3.5、鎖優化
上面的方式本質上就是將請求串行化,這樣的話并發會一點起不來。為了保證數據安全的同時,又要提供并發,可以從鎖粒度
上面去思考,例如一個商品100個庫存,是否可以將這個商品拆成5個鎖,每個鎖管20個庫存,這樣的話就可以支持五個并發了(分段鎖
)
2、緩存設計
2.1、需要注意的問題
2.1.1、緩存穿透
一般情況下,會將從數據庫中查詢到的數據放到Redis緩存中,然后下次查詢時在緩存中能查到該條數據,就直接返回。
但是這里存在一個特殊的情況,如果對應數據確實在Redis緩存和數據庫中都不存在,那么每次空查詢都會打到數據庫中,導致數據庫壓力劇增。 - 這種情況,Redis緩存就失去了保護作用,即緩存穿透。
造成這種情況的原因:
1、業務代碼本身問題,導致正常的數據一直查不到數據,或者沒有放入緩存。
2、惡意攻擊,或者爬蟲,產生大量惡意空查詢。
解決方案
1、將空對象也緩存到本地緩存之中,然后設置緩存淘汰策略。
public static String get(String key) {// 從緩存中獲取數據String cacheValue = cache.get(key);// 緩存為空if (StringUtils.isBlank(cacheValue)) {// 從存儲中獲取String storageValue = storage.get(key);cache.set(key, storageValue);// 如果存儲數據為空, 需要設置一個過期時間(300秒)if (storageValue == null) {cache.expire(key, 60 * 5);}return storageValue;} else {// 緩存非空return cacheValue;}}
本地緩存比Redis緩存還快,加上設置淘汰策略,能更好的屏蔽掉無效請求。
2、使用布隆過濾器
布隆過濾器作用:如果認為一個請求存在,那么這個請求可能不存在。但是如果認為一個請求存在,那么這個請求必然不存在
。
布隆過濾器本身就是一個很大的位數組 + 一個無偏的hash函數構成。能夠將對應key值通過函數分攤到位數組中的不同位。 這樣僅需要檢查對應key是否在位數組中存在重合,就可以判定是否存在。
使用布隆過濾器的場景:數據量大(實時性低)、數據命中率不高、數據相對固定(沒有頻繁增量數據)的場景。另外值得一提的是,布隆過濾器屬于CPU換空間,占用的內存很低
。
注意:已經加入布隆過濾器的數據不能移除,除非重構布隆過濾器。
示例(基于Redisson)
public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://localhost:6379");//構造RedissonRedissonClient redisson = Redisson.create(config);RBloomFilter<String> bloomFilter = redisson.getBloomFilter("nameList");//初始化布隆過濾器:預計元素為100000000L,誤差率為3%,根據這兩個參數會計算出底層的bit數組大小bloomFilter.tryInit(100000L, 0.03);//將zhuge插入到布隆過濾器中bloomFilter.add("wangxiaoyi");bloomFilter.add("hh");//判斷下面號碼是否在布隆過濾器中System.out.println(bloomFilter.contains("wangxiaoyi"));//trueSystem.out.println(bloomFilter.contains("3"));//false}
即使使用布隆過濾器,也有插入數據需要刪除布隆過濾器中某個key的場景。好像可以使用內存增加幾倍的變種過濾器。
2.1.2、緩存擊穿(失效)
一般將數據放到緩存中時會設置過期時間,這里存在一個問題,如果大量的緩存在一個集中的時間被加載,那么也會存在同一時刻大量的失效。緩存失效就意味著大量的請求會直接打到數據庫,這也會導致數據庫瞬間壓力劇增。 這就是緩存失效造成的擊穿緩存。
解決方案的話,可以給緩存的過期時間設置一個波動區間,這樣的話在同一時間失效的數據量就會小很多。
另外就是,基本上熱點數據相對冷數據較少,也可以在每次查詢時,給對應key進行過期時間續期,這樣也能更快的分離冷熱數據。
示例
public static String get(String key) {// 從緩存中獲取數據String cacheValue = cache.get(key);// 緩存為空if (StringUtils.isBlank(cacheValue)) {// 從存儲中獲取String storageValue = storage.get(key);cache.set(key, storageValue); //設置一個過期時間(300到600之間的一個隨機數)int expireTime = new Random().nextInt(300) + 300;if (storageValue == null) {cache.expire(key, expireTime);}return storageValue;} else {// 緩存非空return cacheValue;}}
2.1.3、緩存雪崩
有一句話:雪崩之下,沒有一篇雪花是無辜的。
在這里,對應的也就是,當redis中存在某個bigkey,或者網絡波動導致響應變慢。這樣就會導致第一個請求拉慢第二個請求,第二個拉慢第三個。 最終導致整體響應變慢,最后Redis被拉垮(或者說Redis直接蹦掉),服務調用鏈一層影響一層,最終整個系統不再響應,最終觸發OOM崩掉。 – 這就是緩存雪崩,由一片超時雪花
造成。
解決方向
1、保證Redis的高可用,例如主從哨兵、集群。
2、支持服務限流、熔斷。比如使用Sentinel或Hystrix限流降級組件。
如果是非核心業務,例如用戶信息,則可以暫時直接返回友好的提示。 如果是核心業務,則考慮限流,允許小水細流的請求直接查詢數據庫。
3、把本地緩存也使用起來,先查本地緩存,再查Redis,這樣也可以過濾一波大流量。這里需要對本地緩存的淘汰策略有嚴格控制。
2.1.4、熱點緩存重建
查詢數據都是先查詢緩存,再查詢數據庫。 如果突然除了一個熱點key,被大量的請求同時訪問,這時就會出現大量的請求都沒有被緩存擋住(緩存還沒被及時加載到緩存中),那么這些大量的請求就直接打到數據庫中了,這樣也會導致數據庫壓力突增。 – 這里要做的就是在key從數據庫加載到緩存的操作需要控制在第一個到達的請求來做,其它請求要么阻塞,要么直接拒接。 這就是熱點緩存重建策略。
示例
public static String get(String key) {// 從Redis中獲取數據String value = redis.get(key);// 如果value為空, 則開始重構緩存if (value == null) {// 只允許一個線程重建緩存, 使用nx, 并設置過期時間exString mutexKey = "mutext:key:" + key;if (redis.set(mutexKey, "1", "ex 180", "nx")) {// 從數據源獲取數據value = db.get(key);// 回寫Redis, 并設置過期時間redis.setex(key, timeout, value);// 刪除key_mutexredis.delete(mutexKey);}// 其他線程休息50毫秒后重試else {Thread.sleep(50);get(key);}}return value;}
2.1.5、緩存與數據庫雙寫不一致
出現雙寫不一致的原因:先來的請求,在給Redis發請求時,不一定就先執行,存在被其他Redis請求加塞
的情況,因為網絡、CPU等因素都是不確定的,會造成寫丟失。
解決方案
1、對于并發度很小的數據(比如個人維度的購物車,訂單),可以不用考慮這個問題,按照常規的設置緩存過期時間,到時失效后,就會重新寫緩存最新的數據。
2、就算并發度很高,有些數據也是能容忍短暫的數據不一致的,例如商品名稱。
3、如果必須要保證一致性,就需要加Redis讀寫鎖。
4、也可以使用阿里開源的canal
通過監聽數據庫binlog的方式,及時去修改緩存,但是這樣的話就引入了新的中間件,增加了系統的復雜度,同時在讀的時候存在一定時間的延遲。
2.1.6、總結
1、使用Redis緩存的應該主要是讀多寫少的場景。
2、如果是寫多讀也多的場景,并且還有保證數據強一致性就沒有必要十余年緩存了,直接使用數據庫。
3、如果確實數據庫抗不住,就只能將Redis作為讀寫主存,異步將數據同步給數據庫,這時的數據庫是作為一個備份的角色。 總之就是同步扛不住就只能轉異步(例如RabbitMQ),或者限流。
4、總之,Redis是用來緩存,解決那些能容忍一點時間不一致的數據查詢壓力的,如果為了其它一致性過度設計就得不償失了。