微服務實戰項目-學成在線-項目優化(redis緩存優化)
1 優化需求
視頻播放頁面用戶未登錄也可以訪問,當用戶觀看試學課程時需要請求服務端查詢數據,接口如下:
1、根據課程id查詢課程信息。
2、根據文件id查詢視頻信息。
這些接口在用戶未認證狀態下也可以訪問,如果接口的性能不高,當高并發到來很可能耗盡整個系統的資源,將整個系統壓垮,所以特別需要對這些暴露在外邊的接口進行優化。
下邊對 根據課程id查詢課程信息
接口進行優化,下邊的內容將此接口簡稱為課程查詢接口。
接口地址:http://www.51xuecheng.cn/open/content/course/whole/{courseId}
2 壓力測試
2.1 性能指標
對接口進行優化之前需要對接口進行壓力測試,不僅接口需要壓力測試,整個微服務在發布前也是需要經歷壓力測試的,因為壓力測試可以暴露功能測試所發現不了的問題。
功能測試即是對系統的功能按用戶需求進行測試,比如:添加一門課程,根據需求文檔先準備測試數據,再通過前端界面將一門課程添加到系統,測試是否可以操作成功。整個過程就是測試軟件是否可以實現用戶的需求。
壓力測試是通過測試工具制造大規模的并發請求去訪問系統,測試系統是否經受住壓力。
比如:一個在線學習網站,上線要求該網站可以支持1萬用戶同時在線,此時就需要模擬1萬并發請求去訪問網站的關鍵業務流程,比如:測試點播學習流程,測試系統是否可以抗住1萬并發請求。
一些功能測試時無法發現的問題在壓力測試時就會發現,比如:內存泄露、線程安全、IO異常等問題。
壓力測試常用的性能指標如下:
1、吞吐量
吞吐量是系統每秒可以處理的事務數,也稱為TPS(Transaction Per Second)。
比如:一次點播流程,從請求進入系統到視頻畫圖顯示出來這整個流程就是一次事務。
所以吞吐量并不是一次數據庫事務,它是完成一次業務的整體流程。
2、響應時間
響應時間是指客戶端請求服務端,從請求進入系統到客戶端拿到響應結果所經歷的時間。響應時間包括:最大響應時間、最小響應時間、平均響應時間。
3、每秒查詢數
每秒查詢數即QPS(Queries-per-second),它是衡量查詢接口的性能指標,比如:商品信息查詢,
一秒可以請求該接口查詢商品信息的次數就是QPS。
拿查詢接口舉例,一次查詢請求內部不會再去請求其它接口,此時 QPS=TPS
如果一次查詢請求內容需要遠程調用另一個接口查詢數據,此時 QPS=2 * TPS
4、錯誤率
錯誤率 是一批請求發生錯誤的請求占全部請求的比例。
不同的指標其要求不同,比如現在進行接口優化,優化后的接口響應時間應該越來越小,吞吐量越來越大,以及QPS值也是越大越好,錯誤率要保持在一個很小的范圍。
另外除了關注這些性能指標以外還要關注系統的負載情況:
1、CPU使用率,不高于85%
2、內存利用率,不高于 85%
3、網絡利用率,不高于 80%
4、磁盤IO
磁盤IO的性能指標是IOPS (Input/Output Per
Second)即每秒的輸入輸出量(或讀寫次數)。
如果過大說明IO操作密集,IO過大也會影響性能指標。
2.2 安裝Jmeter
Apache JMeter 是 Apache 組織基于 Java
開發的壓力測試工具,用于對軟件做壓力測試。
下載Jmeter
https://jmeter.apache.org/download_jmeter.cgi
下載,解壓,進入bin目錄修改jmeter.properties,設置中文和字體
language=zh_CN
jmeter.hidpi.mode=true
jmeter.hidpi.scale.factor=1.8
jsyntaxtextarea.font.family= Hack
jsyntaxtextarea.font.size=25
jmeter.toolbar.icons.size=32x32
雙擊運行bin目錄下的jmeter.bat文件。
界面如下圖:
2.3 壓力測試
樣本數:200個線程,每個線程請求100次,共20000次
壓力機:通常壓力機是單獨的客戶端。
測試gateway+content
吞吐量180左右
測試content
吞吐量300左右
2.4 優化日志
內容管理日志級別改為info級別.
單獨請求內容管理測試,吞吐量達到1500左右
3 緩存優化
3.1 redis緩存
測試用例是根據id查詢課程信息,這里不存在復雜的SQL,也不存在數據庫連接不釋放的問題,暫時不考慮數據庫方面的優化。
課程發布信息的特點的是查詢較多,修改很少,這里考慮將課程發布信息進行緩存。
課程信息緩存的流程如下:
在nacos配置redis-dev.yaml(group=xuecheng-plus-common)
spring: redis:host: 192.168.101.65port: 6379password: redisdatabase: 0lettuce:pool:max-active: 20max-idle: 10min-idle: 0timeout: 10000
在content-api微服務加載redis-dev.yaml
shared-configs:- data-id: redis-${spring.profiles.active}.yamlgroup: xuecheng-plus-commonrefresh: true
在content-service微服務中添加依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.2</version>
</dependency>
定義查詢緩存接口:
/*** @description 查詢緩存中的課程信息* @param courseId * @return com.xuecheng.content.model.po.CoursePublish* @author Mr.M* @date 2022/10/22 16:15
*/
public CoursePublish getCoursePublishCache(Long courseId);
接口實現如下:
public CoursePublish getCoursePublishCache(Long courseId){//查詢緩存Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);if(jsonObj!=null){String jsonString = jsonObj.toString();System.out.println("=================從緩存查=================");CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);return coursePublish;} else {System.out.println("從數據庫查詢...");//從數據庫查詢CoursePublish coursePublish = getCoursePublish(courseId);if(coursePublish!=null){redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish));}return coursePublish;}
}
}
修改controller接口調用代碼
@ApiOperation("獲取課程發布信息")@ResponseBody@GetMapping("/course/whole/{courseId}")public CoursePreviewDto getCoursePublish(@PathVariable("courseId") Long courseId) {//查詢課程發布信息CoursePublish coursePublish = coursePublishService.getCoursePublishCache(courseId);
// CoursePublish coursePublish = coursePublishService.getCoursePublish(courseId);if(coursePublish==null){return new CoursePreviewDto();}//課程基本信息CourseBaseInfoDto courseBase = new CourseBaseInfoDto();BeanUtils.copyProperties(coursePublish, courseBase);//課程計劃List<TeachplanDto> teachplans = JSON.parseArray(coursePublish.getTeachplan(), TeachplanDto.class);CoursePreviewDto coursePreviewInfo = new CoursePreviewDto();coursePreviewInfo.setCourseBase(courseBase);coursePreviewInfo.setTeachplans(teachplans);return coursePreviewInfo;}
重新測試請求內容管理服務課程查詢接口。
吞吐量達到2700左右,增加了近一倍。
3.2 緩存穿透問題
3.2.1 什么是緩存穿透
使用緩存后代碼的性能有了很大的提高,雖然性能有很大的提升但是控制臺打出了很多"從數據庫查詢"的日志,明明判斷了如果緩存存在課程信息則從緩存查詢,為什么要有這么多從數據庫查詢的請求的?
這是因為并發數高,很多線程會同時到達查詢數據庫代碼處去執行。
我們分析下代碼:
如果存在惡意攻擊的可能,如果有大量并發去查詢一個不存在的課程信息會出現什么問題呢?
比如去請求/content/course/whole/181,查詢181號課程,該課程并不在課程發布表中。
進行壓力測試發現會去請求數據庫。
大量并發去訪問一個數據庫不存在的數據,由于緩存中沒有該數據導致大量并發查詢數據庫,這個現象要緩存穿透。
緩存穿透可以造成數據庫瞬間壓力過大,連接數等資源用完,最終數據庫拒絕連接不可用。
3.2.2 解決緩存穿透
如何解決緩存穿透?
1、對請求增加校驗機制
比如:課程Id是長整型,如果發來的不是長整型則直接返回。
2、使用布隆過濾器
什么是布隆過濾器,以下摘自百度百科:
布隆過濾器可以用于檢索一個元素是否在一個集合中。如果想要判斷一個元素是不是在一個集合里,一般想到的是將所有元素保存起來,然后通過比較確定。鏈表,樹等等數據結構都是這種思路.
但是隨著集合中元素的增加,我們需要的存儲空間越來越大,檢索速度也越來越慢(O(n),O(logn))。不過世界上還有一種叫作散列表(又叫哈希表,Hash
table)的數據結構。它可以通過一個Hash函數將一個元素映射成一個位陣列(Bit
array)中的一個點。這樣一來,我們只要看看這個點是不是1就可以知道集合中有沒有它了。這就是布隆過濾器的基本思想。
布隆過濾器的特點是,高效地插入和查詢,占用空間少;查詢結果有不確定性,如果查詢結果是存在則元素不一定存在,如果不存在則一定不存在;另外它只能添加元素不能刪除元素,因為刪除元素會增加誤判率。
比如:將商品id寫入布隆過濾器,如果分3次hash此時在布隆過濾器有3個點,當從布隆過濾器查詢該商品id,通過hash找到了該商品id在過濾器中的點,此時返回1,如果找不到一定會返回0。
所以,為了避免緩存穿透我們需要緩存預熱將要查詢的課程或商品信息的id提前存入布隆過濾器,添加數據時將信息的id也存入過濾器,當去查詢一個數據時先在布隆過濾器中找一下如果沒有到到就說明不存在,此時直接返回。
實現方法有:
Google工具包Guava實現。
redisson 。
2、緩存空值或特殊值
請求通過了第一步的校驗,查詢數據庫得到的數據不存在,此時我們仍然去緩存數據,緩存一個空值或一個特殊值的數據。
但是要注意:如果緩存了空值或特殊值要設置一個短暫的過期時間。
public CoursePublish getCoursePublishCache(Long courseId) {//查詢緩存Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);if(jsonObj!=null){String jsonString = jsonObj.toString();if(jsonString.equals("null"))return null;CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);return coursePublish;} else {//從數據庫查詢System.out.println("從數據庫查詢數據...");CoursePublish coursePublish = getCoursePublish(courseId);//設置過期時間300秒redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish),30, TimeUnit.SECONDS);return coursePublish;}
}
再測試,雖然還存在個別請求去查詢數據庫,但不是所有請求都去查詢數據庫,基本上都命中緩存。
3.3 緩存雪崩
3.3.1 什么是緩存雪崩
緩存雪崩是緩存中大量key失效后當高并發到來時導致大量請求到數據庫,瞬間耗盡數據庫資源,導致數據庫無法使用。
造成緩存雪崩問題的原因是是大量key擁有了相同的過期時間,比如對課程信息設置緩存過期時間為10分鐘,在大量請求同時查詢大量的課程信息時,此時就會有大量的課程存在相同的過期時間,一旦失效將同時失效,造成雪崩問題。
3.3.2 解決緩存雪崩
如何解決緩存雪崩?
1、使用同步鎖控制查詢數據庫的線程
使用同步鎖控制查詢數據庫的線程,只允許有一個線程去查詢數據庫,查詢得到數據后存入緩存。
synchronized(obj){//查詢數據庫//存入緩存
}
2、對同一類型信息的key設置不同的過期時間
通常對一類信息的key設置的過期時間是相同的,這里可以在原有固定時間的基礎上加上一個隨機時間使它們的過期時間都不相同。
示例代碼如下:
//設置過期時間300秒redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish),300+new Random().nextInt(100), TimeUnit.SECONDS);
3、緩存預熱
不用等到請求到來再去查詢數據庫存入緩存,可以提前將數據存入緩存。使用緩存預熱機制通常有專門的后臺程序去將數據庫的數據同步到緩存。
3.4 緩存擊穿
3.4.1 什么是緩存擊穿
緩存擊穿是指大量并發訪問同一個熱點數據,當熱點數據失效后同時去請求數據庫,瞬間耗盡數據庫資源,導致數據庫無法使用。
比如某手機新品發布,當緩存失效時有大量并發到來導致同時去訪問數據庫。
3.4.2 解決緩存擊穿
如何解決緩存擊穿?
1、使用同步鎖控制查詢數據庫的線程
使用同步鎖控制查詢數據庫的代碼,只允許有一個線程去查詢數據庫,查詢得到數據庫存入緩存。
synchronized(obj){//查詢數據庫//存入緩存
}
2、熱點數據不過期
可以由后臺程序提前將熱點數據加入緩存,緩存過期時間不過期,由后臺程序做好緩存同步。
下邊使用synchronized對代碼加鎖。
public CoursePublish getCoursePublishCache(Long courseId){synchronized(this){//查詢緩存String jsonString = (String) redisTemplate.opsForValue().get("course:" + courseId);if(StringUtils.isNotEmpty(jsonString)){if(jsonString.equals("null"))return null;CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);return coursePublish;}else{System.out.println("=========從數據庫查詢==========");//從數據庫查詢CoursePublish coursePublish = getCoursePublish(courseId);//設置過期時間300秒redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish),300, TimeUnit.SECONDS);return coursePublish;}}}
測試,吞吐量有1300左右
對上邊的代碼進行優化,對查詢緩存的代碼不用synchronized加鎖控制,只對查詢數據庫進行加鎖,如下:
public CoursePublish getCoursePublishCache(Long courseId){//查詢緩存Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);if(jsonObj!=null){String jsonString = jsonObj.toString();CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);return coursePublish;}else{synchronized(this){Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);if(jsonObj!=null){String jsonString = jsonObj.toString();CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);return coursePublish;}System.out.println("=========從數據庫查詢==========");//從數據庫查詢CoursePublish coursePublish = getCoursePublish(courseId);//設置過期時間300秒redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish),300, TimeUnit.SECONDS);return coursePublish;}}}
測試,查詢數據庫只發生一次,整個測試過程的吞吐量在3800左右。
3.4.3 小結
1)緩存穿透:
去訪問一個數據庫不存在的數據無法將數據進行緩存,導致查詢數據庫,當并發較大就會對數據庫造成壓力。緩存穿透可以造成數據庫瞬間壓力過大,連接數等資源用完,最終數據庫拒絕連接不可用。
解決的方法:
緩存一個null值。
使用布隆過濾器。
2)緩存雪崩:
緩存中大量key失效后當高并發到來時導致大量請求到數據庫,瞬間耗盡數據庫資源,導致數據庫無法使用。
造成緩存雪崩問題的原因是是大量key擁有了相同的過期時間。
解決辦法:
使用同步鎖控制
對同一類型信息的key設置不同的過期時間,比如:使用固定數+隨機數作為過期時間。
3)緩存擊穿
大量并發訪問同一個熱點數據,當熱點數據失效后同時去請求數據庫,瞬間耗盡數據庫資源,導致數據庫無法使用。
解決辦法:
使用同步鎖控制
設置key永不過期
無中生有是穿透,布隆過濾null隔離。
緩存擊穿key過期, 鎖與非期解難題。
大量過期成雪崩,過期時間要隨機。
面試必考三兄弟,可用限流來保底。
限流技術方案:
alibaba/Sentinel
nginx+Lua
3.5 分布式鎖
3.5.1 本地鎖的問題
上邊的程序使用了同步鎖解決了緩存擊穿、緩存雪崩的問題,保證同一個key過期后只會查詢一次數據庫。
如果將同步鎖的程序分布式部署在多個虛擬機上則無法保證同一個key只會查詢一次數據庫,如下圖:
一個同步鎖程序只能保證同一個虛擬機中多個線程只有一個線程去數據庫,如果高并發通過網關負載均衡轉發給各個虛擬機,此時就會存在多個線程去查詢數據庫情況,因為虛擬機中的鎖只能保證該虛擬機自己的線程去同步執行,無法跨虛擬機保證同步執行。
我們將虛擬機內部的鎖叫本地鎖,本地鎖只能保證所在虛擬機的線程同步執行。
下邊進行測試:
啟動三個內容管理服務:
通過網關訪問課程查詢,網關通過負載均衡將請求轉發給三個服務。
通過測試發現,有兩個服務各有一次數據庫查詢,這說明本地鎖無法跨虛擬機保證同步執行。
3.5.2 什么是分布鎖
本地鎖只能控制所在虛擬機中的線程同步執行,現在要實現分布式環境下所有虛擬機中的線程去同步執行就需要讓多個虛擬機去共用一個鎖,虛擬機可以分布式部署,鎖也可以分布式部署,如下圖:
虛擬機都去搶占同一個鎖,鎖是一個單獨的程序提供加鎖、解鎖服務,誰搶到鎖誰去查詢數據庫。
該鎖已不屬于某個虛擬機,而是分布式部署,由多個虛擬機所共享,這種鎖叫分布式鎖。
3.5.3 分布式鎖的實現方案
實現分布式鎖的方案有很多,常用的如下:
1、基于數據庫實現分布鎖
利用數據庫主鍵唯一性的特點,或利用數據庫唯一索引的特點,多個線程同時去插入相同的記錄,誰插入成功誰就搶到鎖。
2、基于redis實現鎖
redis提供了分布式鎖的實現方案,比如:SETNX、set nx、redisson等。
拿SETNX舉例說明,SETNX命令的工作過程是去set一個不存在的key,多個線程去設置同一個key只會有一個線程設置成功,設置成功的的線程拿到鎖。
3、使用zookeeper實現
zookeeper是一個分布式協調服務,主要解決分布式程序之間的同步的問題。zookeeper的結構類似的文件目錄,多線程向zookeeper創建一個子目錄(節點)只會有一個創建成功,利用此特點可以實現分布式鎖,誰創建該結點成功誰就獲得鎖。
3.5.4 Redis NX實現分布式鎖
redis實現分布式鎖的方案可以在redis.cn網站查閱,地址http://www.redis.cn/commands/set.html
使用命令: SET resource-name anystring NX EX max-lock-time 即可實現。
NX:表示key不存在才設置成功。
EX:設置過期時間
這里啟動三個ssh客戶端,連接redis: docker exec -it redis redis-cli
先認證: auth redis
同時向三個客戶端發送測試命令如下:
表示設置lock001鎖,value為001,過期時間為30秒
SET lock001 001 NX EX 30
命令發送成功,觀察三個ssh客戶端發現只有一個設置成功,其它兩個設置失敗,設置成功的請求表示搶到了lock001鎖。
如何在代碼中使用Set nx去實現分布鎖呢?
使用spring-boot-starter-data-redis 提供的api即可實現set nx。
添加依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.6.2</version>
</dependency>
添加依賴后,在bean中注入restTemplate。
我們先分析一段偽代碼如下:
if(緩存中有){返回緩存中的數據
}else{獲取分布式鎖if(獲取鎖成功){try{查詢數據庫}finally{釋放鎖}}}
1、獲取分布式鎖
使用redisTemplate.opsForValue().setIfAbsent(key,vaue)獲取鎖
這里考慮一個問題,當set
nx一個key/value成功1后,這個key(就是鎖)需要設置過期時間嗎?
如果不設置過期時間當獲取到了鎖卻沒有執行finally這個鎖將會一直存在,其它線程無法獲取這個鎖。
所以執行set nx時要指定過期時間,即使用如下的命令
SET resource-name anystring NX EX max-lock-time
具體調用的方法是:redisTemplate.opsForValue().setIfAbsent(K var1, V
var2, long var3, TimeUnit var5)
2、如何釋放鎖
釋放鎖分為兩種情況:key到期自動釋放,手動刪除。
1)key到期自動釋放的方法
因為鎖設置了過期時間,key到期會自動釋放,但是會存在一個問題就是
查詢數據庫等操作還沒有執行完時key到期了,此時其它線程就搶到鎖了,最終重復查詢數據庫執行了重復的業務操作。
怎么解決這個問題?
可以將key的到期時間設置的長一些,足以執行完成查詢數據庫并設置緩存等相關操作。
如果這樣效率會低一些,另外這個時間值也不好把控。
2)手動刪除鎖
如果是采用手動刪除鎖可能和key到期自動刪除有所沖突,造成刪除了別人的鎖。
比如:當查詢數據庫等業務還沒有執行完時key過期了,此時其它線程占用了鎖,當上一個線程執行查詢數據庫等業務操作完成后手動刪除鎖就把其它線程的鎖給刪除了。
要解決這個問題可以采用刪除鎖之前判斷是不是自己設置的鎖,偽代碼如下:
if(緩存中有){返回緩存中的數據
}else{獲取分布式鎖: set lock 01 NXif(獲取鎖成功){try{查詢數據庫}finally{if(redis.call("get","lock")=="01"){釋放鎖: redis.call("del","lock")}}}}
以上代碼第11行到13行非原子性,也會導致刪除其它線程的鎖。
查看文檔上的說明:http://www.redis.cn/commands/set.html
在調用setnx命令設置key/value時,每個線程設置不一樣的value值,這樣當線程去刪除鎖時可以先根據key查詢出來判斷是不是自己當時設置的vlaue,如果是則刪除。
這整個操作是原子的,實現方法就是去執行上邊的lua腳本。
Lua
是一個小巧的腳本語言,redis在2.6版本就支持通過執行Lua腳本保證多個命令的原子性。
什么是原子性?
這些指令要么全成功要么全失敗。
以上就是使用Redis
Nx方式實現分布式鎖,為了避免刪除別的線程設置的鎖需要使用redis去執行Lua腳本的方式去實現,這樣就具有原子性,但是過期時間的值設置不存在不精確的問題。
3.5.5 Redisson實現分布式鎖
3.5.5.1 什么是Redisson
再查閱 文檔http://www.redis.cn/commands/set.html
點擊鏈接查看
我們選用Java的實現方案 https://github.com/redisson/redisson
Redisson的文檔地址:https://github.com/redisson/redisson/wiki/Table-of-Content
Redisson底層采用的是Netty
框架。支持Redis
2.8以上版本,支持Java1.6+以上版本。Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory
Data
Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet,
Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque,
BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish /
Subscribe, Bloom filter, Remote service, Spring cache, Executor service,
Live Object service, Scheduler service) 。
使用Redisson可以非常方便將Java本地內存中的常用數據結構的對象搬到分布式緩存redis中。
也可以將常用的并發編程工具如:AtomicLong、CountDownLatch、Semaphore等支持分布式。
使用RScheduledExecutorService 實現分布式調度服務。
支持數據分片,將數據分片存儲到不同的redis實例中。
支持分布式鎖,基于Java的Lock接口實現分布式鎖,方便開發。
下邊使用Redisson將Queue隊列的數據存入Redis,實現一個排隊及出隊的接口。
添加redisson的依賴
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.11.2</version>
</dependency>
從課程資料目錄拷貝singleServerConfig.yaml到config工程下
在redis配置文件中添加:
spring:redis:redisson:#配置文件目錄config: classpath:singleServerConfig.yaml#config: classpath:clusterServersConfig.yaml
redis集群配置clusterServersConfig.yaml.
Redisson相比set nx實現分布式鎖要簡單的多,工作原理如下:
加鎖機制
線程去獲取鎖,獲取成功: 執行lua腳本,保存數據到redis數據庫。
線程去獲取鎖,獲取失敗:
一直通過while循環嘗試獲取鎖,獲取成功后,執行lua腳本,保存數據到redis
WatchDog自動延期看門狗機制
第一種情況:在一個分布式環境下,假如一個線程獲得鎖后,突然服務器宕機了,那么這個時候在一定時間后這個鎖會自動釋放,你也可以設置鎖的有效時間(當不設置默認30秒時),這樣的目的主要是防止死鎖的發生
第二種情況:線程A業務還沒有執行完,時間就過了,線程A
還想持有鎖的話,就會啟動一個watch
dog后臺線程,不斷的延長鎖key的生存時間。
lua腳本-保證原子性操作
主要是如果你的業務邏輯復雜的話,通過封裝在lua腳本中發送給redis,而且redis是單線程的,這樣就保證這段復雜業務邏輯執行的原子性
具體使用RLock操作分布鎖,RLock繼承JDK的Lock接口,所以他有Lock接口的所有特性,比如lock、unlock、trylock等特性,同時它還有很多新特性:強制鎖釋放,帶有效期的鎖,。
public interface RRLock {//----------------------Lock接口方法-----------------------/*** 加鎖 鎖的有效期默認30秒*/void lock();/*** 加鎖 可以手動設置鎖的有效時間** @param leaseTime 鎖有效時間* @param unit 時間單位 小時、分、秒、毫秒等*/void lock(long leaseTime, TimeUnit unit);/*** tryLock()方法是有返回值的,用來嘗試獲取鎖,* 如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false .*/boolean tryLock();/*** tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,* 只不過區別在于這個方法在拿不到鎖時會等待一定的時間,* 在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。** @param time 等待時間* @param unit 時間單位 小時、分、秒、毫秒等*/boolean tryLock(long time, TimeUnit unit) throws InterruptedException;/*** 比上面多一個參數,多添加一個鎖的有效時間** @param waitTime 等待時間* @param leaseTime 鎖有效時間* @param unit 時間單位 小時、分、秒、毫秒等* waitTime 大于 leaseTime*/boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;/*** 解鎖*/void unlock();
}
lock():
此方法為加鎖,但是鎖的有效期采用默認30秒
如果主線程未釋放,且當前鎖未調用unlock方法,則進入到watchDog機制
如果主線程未釋放,且當前鎖調用unlock方法,則直接釋放鎖
3.5.5.2 分布式鎖避免緩存擊穿
下邊使用分布式鎖修改查詢課程信息的接口。
//Redisson分布式鎖
public CoursePublish getCoursePublishCache(Long courseId){//查詢緩存String jsonString = (String) redisTemplate.opsForValue().get("course:" + courseId);if(StringUtils.isNotEmpty(jsonString)){if(jsonString.equals("null")){return null;}CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);return coursePublish;}else{//每門課程設置一個鎖RLock lock = redissonClient.getLock("coursequerylock:"+courseId);//獲取鎖lock.lock();try {jsonString = (String) redisTemplate.opsForValue().get("course:" + courseId);if(StringUtils.isNotEmpty(jsonString)){CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);return coursePublish;}System.out.println("=========從數據庫查詢==========");//從數據庫查詢CoursePublish coursePublish = getCoursePublish(courseId);redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish),1,TimeUnit.DAYS);return coursePublish;}finally {//釋放鎖lock.unlock();}}}
啟動多個內容管理服務實例,使用JMeter壓力測試,只有一個實例查詢一次數據庫。
測試Redisson自動續期功能。
在查詢數據庫處添加休眠,觀察鎖是否會自動續期。
try {Thread.sleep(60000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}