目錄
一、短信登錄(redis+seesion)
基于Session實現登錄流程
🔄 圖中關鍵模塊解釋:
利用seesion登錄的問題
設計key的具體細節
整體訪問流程
二、商戶查詢緩存
reids與數據庫主動更新的三種方案
緩存穿透
緩存雪崩問題及解決思路
緩存擊穿問題及解決思路
三、優惠券秒殺
全局唯一ID
初步實現秒殺下單秒殺下單應該思考的內容:
樂觀鎖解決超賣問題
有關超賣問題分析:在我們原有代碼中是這么寫的
悲觀鎖解決一人一單
分布式鎖解決集群模式下的秒殺問題
分布式鎖
解決Redis分布式鎖誤刪問題
四、Reidssion
1. 分布式鎖-redission功能介紹
2. 分布式鎖-redission可重入鎖原理
3. 分布式鎖-redission鎖重試和WatchDog機制
4. 分布式鎖-redission鎖的MutiLock原理
五、異步秒殺優化
Redis消息隊列-基于Stream的消息隊列-消費者組
項目分為? ?
重點:短信登錄、商戶查詢緩存、優惠券秒殺、用戶簽到
這部分涉及到reids+seesion、緩存擊穿、緩存穿透、緩存雪崩、樂觀鎖,悲觀鎖、分布式鎖、與HyperLogLog的使用,對我們來說非常的重要
一、短信登錄(redis+seesion)
基于Session實現登錄流程
發送驗證碼:
用戶在提交手機號后,會校驗手機號是否合法,如果不合法,則要求用戶重新輸入手機號
如果手機號合法,后臺此時生成對應的驗證碼,同時將驗證碼進行保存,然后再通過短信的方式將驗證碼發送給用戶
短信驗證碼登錄、注冊:
用戶將驗證碼和手機號進行輸入,后臺從session中拿到當前驗證碼,然后和用戶輸入的驗證碼進行校驗,如果不一致,則無法通過校驗,如果一致,則后臺根據手機號查詢用戶,如果用戶不存在,則為用戶創建賬號信息,保存到數據庫,無論是否存在,都會將用戶信息保存到session中,方便后續獲得當前登錄信息
校驗登錄狀態:
用戶在請求時候,會從cookie中攜帶者JsessionId到后臺,后臺通過JsessionId從session中拿到用戶信息,如果沒有session信息,則進行攔截,如果有session信息,則將用戶信息保存到threadLocal中,并且放行
Tomcat運行原理
response:響應,反應
用戶通過 socket 發起 HTTP 請求,Tomcat 接收并分發請求到工作線程,由 Web 應用(如 Spring Boot)中的 Controller → Service → DAO 完成處理后,再將結果通過 response 寫回用戶。
🔄 圖中關鍵模塊解釋:
模塊 | 說明 |
用戶 socket | 發起 TCP 連接,發送 HTTP 請求。 |
Tomcat | Java Web 容器,監聽端口、管理線程池、轉發請求到 Web 應用。 |
監聽線程 | 監聽 TCP 請求,交由線程池中的工作線程處理。 |
工作線程池 | 處理具體的請求(取 socket 數據、封裝 request/response)。 |
WebApp | 實際部署在 Tomcat 里的 Web 應用,如使用 Spring MVC 開發的項目。 |
Controller | 接收請求,調度服務層。 |
Service | 處理業務邏輯。 |
DAO | 與數據庫進行交互。 |
DB | 后端數據庫。 |
利用seesion登錄的問題
每個tomcat中都有一份屬于自己的session,假設用戶第一次訪問第一臺tomcat,并且把自己的信息存放到第一臺服務器的session中,但是第二次這個用戶訪問到了第二臺tomcat,那么在第二臺服務器上,肯定沒有第一臺服務器存放的session,所以此時 整個登錄攔截功能就會出現問題,我們能如何解決這個問題呢?早期的方案是session拷貝,就是說雖然每個tomcat上都有不同的session,但是每當任意一臺服務器的session修改時,都會同步給其他的Tomcat服務器的session,這樣的話,就可以實現session的共享了
但是這種方案具有兩個大問題
1、每臺服務器中都有完整的一份session數據,服務器壓力過大。
2、session拷貝數據時,可能會出現延遲
所以咱們后來采用的方案都是基于redis來完成,我們把session換成redis,redis數據本身就是共享的,就可以避免session共享的問題了
首先我們要思考一下利用redis來存儲數據,那么到底使用哪種結構呢?由于存入的數據比較簡單,我們可以考慮使用String,或者是使用哈希,如下圖,如果使用String,同學們注意他的value,用多占用一點空間,如果使用哈希,則他的value中只會存儲他數據本身,如果不是特別在意內存,其實使用String就可以啦。
設計key的具體細節
所以我們可以使用String結構,就是一個簡單的key,value鍵值對的方式,但是關于key的處理,session他是每個用戶都有自己的session,但是redis的key是共享的,咱們就不能使用code了
在設計這個key的時候,我們之前講過需要滿足兩點
1、key要具有唯一性
2、key要方便攜帶
如果我們采用phone:手機號這個的數據來存儲當然是可以的,但是如果把這樣的敏感數據存儲到redis中并且從頁面中帶過來畢竟不太合適,所以我們在后臺生成一個隨機串token,然后讓前端帶來這個token就能完成我們的整體邏輯了
整體訪問流程
當注冊完成后,用戶去登錄會去校驗用戶提交的手機號和驗證碼,是否一致,如果一致,則根據手機號查詢用戶信息,不存在則新建,最后將用戶數據保存到redis,并且生成token作為redis的key,當我們校驗用戶是否登錄時,會去攜帶著token進行訪問,從redis中取出token對應的value,判斷是否存在這個數據,如果沒有則攔截,如果存在則將其保存到threadLocal中,并且放行。
public class RefreshTokenInterceptor implements HandlerInterceptor {private StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1.獲取請求頭中的tokenString token = request.getHeader("authorization");if (StrUtil.isBlank(token)) {return true;}// 2.基于TOKEN獲取redis中的用戶String key = LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);// 3.判斷用戶是否存在if (userMap.isEmpty()) {return true;}// 5.將查詢到的hash數據轉為UserDTOUserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);// 6.存在,保存用戶信息到 ThreadLocalUserHolder.saveUser(userDTO);// 7.刷新token有效期stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);// 8.放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {// 移除用戶UserHolder.removeUser();}
}//設置攔截器校驗
public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1.判斷是否需要攔截(ThreadLocal中是否有用戶)if (UserHolder.getUser() == null) {// 沒有,需要攔截,設置狀態碼response.setStatus(401);// 攔截,會返回等錄界面return false;}// 有用戶,則放行return true;}
}
二、商戶查詢緩存
實際開發中,會構筑多級緩存來使系統運行速度進一步提升,例如:本地緩存與redis中的緩存并發使用
瀏覽器緩存:主要是存在于瀏覽器端的緩存
應用層緩存:可以分為tomcat本地緩存,比如之前提到的map,或者是使用redis作為緩存
數據庫緩存:在數據庫中有一片空間是 buffer pool,增改查數據都會先加載到mysql的緩存中
CPU緩存:當代計算機最大的問題是 cpu性能提升了,但內存讀寫速度沒有跟上,所以為了適應當下的情況,增加了cpu的L1,L2,L3級的緩存
由于數據庫的查詢速度比較慢,而且在大量查詢條件下,數據庫的查詢有可能失敗和造成數據庫崩潰,所以我們可以添加一層緩存中間件,叫做redis
代碼思路:如果緩存有,則直接返回,如果緩存不存在,則查詢數據庫,然后存入redis。
reids與數據庫主動更新的三種方案
由于我們的緩存的數據源來自于數據庫,而數據庫的數據是會發生變化的,因此,如果當數據庫中數據發生變化,而緩存卻沒有同步,此時就會有一致性問題存在,其后果是:
用戶使用緩存中的過時數據,就會產生類似多線程數據安全問題,從而影響業務,產品口碑等;怎么解決呢?有如下幾種方案
- Cache Aside Pattern 人工編碼方式:緩存調用者在更新完數據庫后再去更新緩存,也稱之為雙寫方案
- Read/Write Through Pattern : 由系統本身完成,數據庫與緩存的問題交由系統本身去處理
- Write Behind Caching Pattern :調用者只操作緩存,其他線程去異步處理數據庫,實現最終一致(用一個獨立的線程, 異步的去看一看緩存中的數據和數據庫中的數據的一致性,進行異步的更新,效率比較高),如果緩存出現了宕機,那么就會產生數據丟失的問題
綜合考慮使用方案一,但是方案一調用者如何處理呢?這里有幾個問題
操作緩存和數據庫時有三個問題需要考慮:
如果采用第一個方案,那么假設我們每次操作數據庫后,都操作緩存,但是中間如果沒有人查詢,那么這個更新動作實際上只有最后一次生效,中間的更新動作意義并不大,我們可以把緩存刪除,等待再次查詢時,將緩存中的數據加載出來
- 刪除緩存還是更新緩存?
-
- 更新緩存:每次更新數據庫都更新緩存,無效寫操作較多
- 刪除緩存:更新數據庫時讓緩存失效,查詢時再更新緩存
- 如何保證緩存與數據庫的操作的同時成功或失敗?(添加事務)
-
- 單體系統,將緩存與數據庫操作放在一個事務
- 分布式系統,利用TCC等分布式事務方案(確保都能成功或者失敗
應該具體操作緩存還是操作數據庫,我們應當是先操作數據庫,再刪除緩存,原因在于,如果你選擇第一種方案,在兩個線程并發來訪問時,假設線程1先來,他先把緩存刪了,此時線程2過來,他查詢緩存數據并不存在,此時他寫入緩存,當他寫入緩存后,線程1再執行更新動作時,實際上寫入的就是舊的數據,新的數據被舊數據覆蓋了。
- 先操作緩存還是先操作數據庫?
-
- 先刪除緩存,再操作數據庫
- 先操作數據庫,再刪除緩存
綜上所述,先操作數據庫,再刪除緩存更優
緩存穿透
緩存穿透 :緩存穿透是指客戶端請求的數據在緩存中和數據庫中都不存在,這樣緩存永遠不會生效,這些請求都會打到數據庫。用戶請求的數據在緩存中和數據庫中都不存在,不斷發起這樣的請求,給數據庫帶來巨大壓力
常見的解決方案有兩種:
- 緩存空對象
-
- 優點:實現簡單,維護方便
- 缺點:
-
-
- 額外的內存消耗
- 可能造成短期的不一致
-
- 布隆過濾
-
- 優點:內存占用較少,沒有多余key
- 缺點:
-
-
- 實現復雜
- 存在誤判可能
-
緩存空對象思路分析:當我們客戶端訪問不存在的數據時,先請求redis,但是此時redis中沒有數據,此時會訪問到數據庫,但是數據庫中也沒有數據,這個數據穿透了緩存,直擊數據庫,我們都知道數據庫能夠承載的并發不如redis這么高,如果大量的請求同時過來訪問這種不存在的數據,這些請求就都會訪問到數據庫,簡單的解決方案就是哪怕這個數據在數據庫中也不存在,我們也把這個數據存入到redis中去(設置比較短的TTL),這樣,下次用戶過來訪問這個不存在的數據,那么在redis中也能找到這個數據就不會進入到緩存了
布隆過濾:布隆過濾器其實采用的是哈希思想來解決這個問題,通過一個龐大的二進制數組,走哈希思想去判斷當前這個要查詢的這個數據是否存在,如果布隆過濾器判斷存在,則放行,這個請求會去訪問redis,哪怕此時redis中的數據過期了,但是數據庫中一定存在這個數據,在數據庫中查詢出來這個數據后,再將其放入到redis中,
假設布隆過濾器判斷這個數據不存在,則直接返回
這種方式優點在于節約內存空間,存在誤判,誤判原因在于:布隆過濾器走的是哈希思想,只要哈希思想,就可能存在哈希沖突
緩存穿透的解決方案有哪些?
- 緩存null值(可能會存在短期不一致性)
- 布隆過濾(算法)
- 增強id的復雜度,避免被猜測id規律
- 做好數據的基礎格式校驗
- 加強用戶權限校驗
- 做好熱點參數的限流
緩存雪崩問題及解決思路
緩存雪崩是指在同一時段大量的緩存key同時失效或者Redis服務宕機,導致大量請求到達數據庫,帶來巨大壓力。
解決方案:
- 給不同的Key的TTL添加隨機值
- 利用Redis集群提高服務的可用性(哨兵機制)
- 給緩存業務添加降級限流策略
- 給業務添加多級緩存(微服務),應對億級以上的并發
緩存擊穿問題及解決思路
緩存擊穿問題也叫熱點Key問題,就是一個被高并發訪問并且緩存重建業務較復雜的key突然失效了,無數的請求訪問會在瞬間給數據庫帶來巨大的沖擊。
常見的解決方案有兩種:
- 互斥鎖
- 邏輯過期
邏輯分析:假設線程1在查詢緩存之后,本來應該去查詢數據庫,然后把這個數據重新加載到緩存的,此時只要線程1走完這個邏輯,其他線程就都能從緩存中加載這些數據了,但是假設在線程1沒有走完的時候,后續的線程2,線程3,線程4同時過來訪問當前這個方法, 那么這些線程都不能從緩存中查詢到數據,那么他們就會同一時刻來訪問查詢緩存,都沒查到,接著同一時間去訪問數據庫,同時的去執行數據庫代碼,對數據庫訪問壓力過大
解決方案一、使用鎖來解決:
因為鎖能實現互斥性。假設線程過來,只能一個人一個人的來訪問數據庫,從而避免對于數據庫訪問壓力過大,但這也會影響查詢的性能,因為此時會讓查詢的性能從并行變成了串行,我們可以采用tryLock方法 + double check來解決這樣的問題。
假設現在線程1過來訪問,他查詢緩存沒有命中,但是此時他獲得到了鎖的資源,那么線程1就會一個人去執行邏輯,假設現在線程2過來,線程2在執行過程中,并沒有獲得到鎖,那么線程2就可以進行到休眠,直到線程1把鎖釋放后,線程2獲得到鎖,然后再來執行邏輯,此時就能夠從緩存中拿到數據了。
解決方案二、邏輯過期方案
方案分析:我們之所以會出現這個緩存擊穿問題,主要原因是在于我們對key設置了過期時間,假設我們不設置過期時間,其實就不會有緩存擊穿的問題,但是不設置過期時間,這樣數據不就一直占用我們內存了嗎,我們可以采用邏輯過期方案。
我們把過期時間設置在 redis的value中,注意:這個過期時間并不會直接作用于redis,而是我們后續通過邏輯去處理。假設線程1去查詢緩存,然后從value中判斷出來當前的數據已經過期了,此時線程1去獲得互斥鎖,那么其他線程會進行阻塞,獲得了鎖的線程他會開啟一個 線程去進行 以前的重構數據的邏輯,直到新開的線程完成這個邏輯后,才釋放鎖, 而線程1直接進行返回,假設現在線程3過來訪問,由于線程線程2持有著鎖,所以線程3無法獲得鎖,線程3也直接返回數據,只有等到新開的線程2把重建數據構建完后,其他線程才能走返回正確的數據。
這種方案巧妙在于,異步的構建緩存,缺點在于在構建完緩存之前,返回的都是臟數據。
進行對比
**互斥鎖方案:**由于保證了互斥性,所以數據一致,且實現簡單,因為僅僅只需要加一把鎖而已,也沒其他的事情需要操心,所以沒有額外的內存消耗,缺點在于有鎖就有死鎖問題的發生,且只能串行執行性能肯定受到影響
邏輯過期方案: 線程讀取過程中不需要等待,性能好,有一個額外的線程持有鎖去進行重構數據,但是在重構數據完成前,其他的線程只能返回之前的數據,且實現起來麻煩
因為現在redis中存儲的數據的value需要帶上過期時間,此時要么去修改原來的實體類,要么新建一個實體類,我們采用第二個方案,這個方案,對原來代碼沒有侵入性。
@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}
ShopServiceImpl
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire( Long id ) {String key = CACHE_SHOP_KEY + id;// 1.從redis查詢商鋪緩存String json = stringRedisTemplate.opsForValue().get(key);// 2.判斷是否存在if (StrUtil.isBlank(json)) {// 3.存在,直接返回return null;}// 4.命中,需要先把json反序列化為對象RedisData redisData = JSONUtil.toBean(json, RedisData.class);Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);LocalDateTime expireTime = redisData.getExpireTime();// 5.判斷是否過期if(expireTime.isAfter(LocalDateTime.now())) {// 5.1.未過期,直接返回店鋪信息return shop;}// 5.2.已過期,需要緩存重建// 6.緩存重建// 6.1.獲取互斥鎖String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);// 6.2.判斷是否獲取鎖成功if (isLock){CACHE_REBUILD_EXECUTOR.submit( ()->{try{//重建緩存this.saveShop2Redis(id,20L);}catch (Exception e){throw new RuntimeException(e);}finally {unlock(lockKey);}});}// 6.4.返回過期的商鋪信息return shop;
}
三、優惠券秒殺
全局唯一ID
當用戶搶購時,就會生成訂單并保存到tb_voucher_order這張表中,而訂單表如果使用數據庫自增ID就存在一些問題:
- id的規律性太明顯
- 受單表數據量的限制
場景分析:如果我們的id具有太明顯的規則,用戶或者說商業對手很容易猜測出來我們的一些敏感信息,比如商城在一天時間內,賣出了多少單,這明顯不合適。
場景分析二:隨著我們商城規模越來越大,mysql的單表的容量不宜超過500W,數據量過大之后,我們要進行拆庫拆表,但拆分表了之后,他們從邏輯上講他們是同一張表,所以他們的id是不能一樣的, 于是乎我們需要保證id的唯一性。
全局ID生成器,是一種在分布式系統下用來生成全局唯一ID的工具,一般要滿足下列特性:
所以我們就可以以第一位為符號位,后31位為毫秒級別自增的時間戳,后32為在reids中自增
/*** 生成策略是基于redis的自增長,需要有key對應的值不斷自增* 不同的業務對應不同的key* @param keyPrefix* @return*/
public Long nextId(String keyPrefix) {
//1.生成時間戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMSTAMP;//2.生成序列號,注入reids
//2.獲取當前的日期,精確到天
String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + data);
//3. 拼接并返回//ps:這里左移32位為時間戳,|運算符表示,相同為0,相異為1
return timestamp << COUNT_BTTS | count;
}
初步實現秒殺下單秒殺下單應該思考的內容:
下單時需要判斷兩點:
- 秒殺是否開始或結束,如果尚未開始或已經結束則無法下單
- 庫存是否充足,不足則無法下單
下單核心邏輯分析:
當用戶開始進行下單,我們應當去查詢優惠卷信息,查詢到優惠卷信息,判斷是否滿足秒殺條件
比如時間是否充足,如果時間充足,則進一步判斷庫存是否足夠,如果兩者都滿足,則扣減庫存,創建訂單,然后返回訂單id,如果有一個條件不滿足則直接結束。
樂觀鎖解決超賣問題
有關超賣問題分析:在我們原有代碼中是這么寫的
if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}//5,扣減庫存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣減庫存return Result.fail("庫存不足!");}
假設線程1過來查詢庫存,判斷出來庫存大于1,正準備去扣減庫存,但是還沒有來得及去扣減,此時線程2過來,線程2也去查詢庫存,發現這個數量一定也大于1,那么這兩個線程都會去扣減庫存,最終多個線程相當于一起去扣減庫存,此時就會出現庫存的超賣問題。
超賣問題是典型的多線程安全問題,針對這一問題的常見解決方案就是加鎖:而對于加鎖,我們通常有兩種解決方案:見下圖:
悲觀鎖:
悲觀鎖可以實現對于數據的串行化執行,比如syn,和lock都是悲觀鎖的代表,同時,悲觀鎖中又可以再細分為公平鎖,非公平鎖,可重入鎖,等等
樂觀鎖:
樂觀鎖:會有一個版本號,每次操作數據會對版本號+1,再提交回數據時,會去校驗是否比之前的版本大1 ,如果大1 ,則進行操作成功,這套機制的核心邏輯在于,如果在操作過程中,版本號只比原來大1 ,那么就意味著操作過程中沒有人對他進行過修改,他的操作就是安全的,如果不大1,則數據被修改過,當然樂觀鎖還有一些變種的處理方式,比如cas:
樂觀鎖的典型代表:就是cas,利用cas進行無鎖化機制加鎖,var5 是操作前讀取的內存值,while中的var1+var2 是預估值,如果預估值 == 內存值,則代表中間沒有被人修改過,此時就將新值去替換 內存值
其中do while 是為了在操作失敗時,再次進行自旋操作,即把之前的邏輯再操作一次。
int var5;
do {var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;
悲觀鎖解決一人一單
需求:修改秒殺業務,要求同一個優惠券,一個用戶只能下一單
現在的問題在于:
優惠卷是為了引流,但是目前的情況是,一個人可以無限制的搶這個優惠卷,所以我們應當增加一層邏輯,讓一個用戶只能下一個單,而不是讓一個用戶下多個單
具體操作邏輯如下:比如時間是否充足,如果時間充足,則進一步判斷庫存是否足夠,然后再根據優惠卷id和用戶id查詢是否已經下過這個訂單,如果下過這個訂單,則不再下單,否則進行下單
VoucherOrderServiceImpl
初步代碼:增加一人一單邏輯
if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}
存在問題:現在的問題還是和之前一樣,并發過來,查詢數據庫,都不存在訂單,所以我們還是需要加鎖,但是樂觀鎖比較適合更新數據,而現在是插入數據,所以我們需要使用悲觀鎖操作
注意:在這里提到了非常多的問題,我們需要慢慢的來思考,首先我們的初始方案是封裝了一個createVoucherOrder方法,同時為了確保他線程安全,在方法上添加了一把synchronized 鎖
只要有一個線程進入該方法,其他任何線程想要進入這個對象實例的這個方法,都得等待,不管這些線程操作的數據是不是相互獨立
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {//代碼return Result.ok(orderId);
}
,但是這樣添加鎖,鎖的粒度太粗了,在使用鎖過程中,控制鎖粒度 是一個非常重要的事情,因為如果鎖的粒度太大,會導致每個線程進來都會鎖住,所以我們需要去控制鎖的粒度,以下這段代碼需要修改為:
intern() 這個方法是從常量池中拿到數據,如果我們直接使用userId.toString() 他拿到的對象實際上是不同的對象,new出來的對象,我們使用鎖必須保證鎖必須是同一把,所以我們需要使用intern()方法
@Transactional
public Result createVoucherOrder(Long voucherId) {synchronized(userId.toString().intern()){// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);}
}
但是以上代碼還是存在問題,問題的原因在于當前方法被spring的事務控制,如果你在方法內部加鎖,可能會導致當前方法事務還沒有提交,但是鎖已經釋放也會導致問題,所以我們選擇將當前方法整體包裹起來,確保事務不會出現問題:如下:
在seckillVoucher 方法中,添加以下邏輯,這樣就能保證事務的特性,同時也控制了鎖的粒度
但是以上做法依然有問題,因為你調用的方法,其實是this.的方式調用的,事務想要生效,還得利用代理來生效,所以這個地方,我們需要獲得原始的事務對象, 來操作事務
synchronized在集群模式下會失效,我們需要用到下面的分布式鎖
分布式鎖解決集群模式下的秒殺問題
通過加鎖可以解決在單機情況下的一人一單安全問題,但是在集群模式下就不行了。
1、我們將服務啟動兩份,端口分別為8081和8082:
2、然后修改nginx的conf目錄下的nginx.conf文件,配置反向代理和負載均衡:
有關鎖失效原因分析
由于現在我們部署了多個tomcat,每個tomcat都有一個屬于自己的jvm,那么假設在服務器A的tomcat內部,有兩個線程,這兩個線程由于使用的是同一份代碼,那么他們的鎖對象是同一個,是可以實現互斥的,但是如果現在是服務器B的tomcat內部,又有兩個線程,但是他們的鎖對象寫的雖然和服務器A一樣,但是鎖對象卻不是同一個,所以線程3和線程4可以實現互斥,但是卻無法和線程1和線程2實現互斥,這就是 集群環境下,syn鎖失效的原因,在這種情況下,我們就需要使用分布式鎖來解決這個問題。
分布式鎖
所以我們需要跨進程的鎖
分布式鎖的核心思想就是讓大家都使用同一把鎖,只要大家使用的是同一把鎖,那么我們就能鎖住線程,不讓線程進行,讓程序串行執行,這就是分布式鎖的核心思路
那么分布式鎖他應該滿足一些什么樣的條件呢?
可見性:多個線程都能看到相同的結果,注意:這個地方說的可見性并不是并發編程中指的內存可見性,只是說多個進程之間都能感知到變化的意思
互斥:互斥是分布式鎖的最基本的條件,使得程序串行執行
高可用:程序不易崩潰,時時刻刻都保證較高的可用性
高性能:由于加鎖本身就讓性能降低,所有對于分布式鎖本身需要他就較高的加鎖性能和釋放鎖性能
安全性:安全也是程序中必不可少的一環
常見的分布式鎖有三種:
Mysql:mysql本身就帶有鎖機制,但是由于mysql性能本身一般,所以采用分布式鎖的情況下,其實使用mysql作為分布式鎖比較少見
Redis:redis作為分布式鎖是非常常見的一種使用方式,現在企業級開發中基本都使用redis或者zookeeper作為分布式鎖,利用setnx這個方法,如果插入key成功,則表示獲得到了鎖,如果有人插入成功,其他人插入失敗則表示無法獲得到鎖,利用這套邏輯來實現分布式鎖
Zookeeper:zookeeper也是企業級開發中較好的一個實現分布式鎖的方案,由于本套視頻并不講解zookeeper的原理和分布式鎖的實現,所以不過多闡述
實現分布式鎖時需要實現的兩個基本方法:
- 獲取鎖:
-
- 互斥:確保只能有一個線程獲取鎖
- 非阻塞:嘗試一次,成功返回true,失敗返回false
- 釋放鎖:
-
- 手動釋放
- 超時釋放:獲取鎖時添加一個超時時間
要具備原子性,獲取鎖要么都成功要么都失敗
SET lock thread1 EX 10 NX,nx代表沒有就創建,有就不創建,Ex表示如果發生宕機,會在10s后自動釋放
核心思路:
我們利用redis 的SETNX 方法,當有多個線程進入時,我們就利用該方法,第一個線程進入時,redis 中就有這個key 了,返回了1,如果結果是1,則表示他搶到了鎖,那么他去執行業務,然后再刪除鎖,退出鎖邏輯,沒有搶到鎖的哥們,等待一定時間后重試即可
實現分布式鎖方案
SimpleRedisLock
利用setnx方法進行加鎖,同時增加過期時間,防止死鎖,此方法可以保證加鎖和增加過期時間具有原子性
private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {// 獲取線程標示String threadId = Thread.currentThread().getId()// 獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);
}
- 釋放鎖邏輯
釋放鎖,防止刪除別人的鎖
public void unlock() {//通過del刪除鎖stringRedisTemplate.delete(KEY_PREFIX + name);
}
解決Redis分布式鎖誤刪問題
需求:修改之前的分布式鎖實現,滿足:在獲取鎖時存入線程標示(可以用UUID表示)
在釋放鎖時先獲取鎖中的線程標示,判斷是否與當前線程標示一致
- 如果一致則釋放鎖
- 如果不一致則不釋放鎖
之前我們用的是線程的id,在jvm內部線程id是一個遞增的數字,當我們在集群的模式下,多個jvm很有可能會出現線程id沖突的問題
所以說我們直接使用線程id區分鎖是不夠的,我們需要用UUID來區分不同的服務(jvm),再用線程id區分不同的線程,兩者結合
核心邏輯:在存入鎖時,放入自己線程的標識,在刪除鎖時我們利用UUID對每個鎖獨特標識,判斷當前這把鎖的標識是不是自己存入的,如果是,則進行刪除,如果不是,則不進行刪除。
同時,在更極端的情況下例如:
當線程1需要釋放鎖的時候,發生了阻塞,導致鎖被超時自動釋放,然后我們的線程2獲取到了鎖,剛好線程1又不阻塞了,這時候線程1要釋放鎖,因為前文已經做過判斷了,所以這里很可能因為判斷鎖和釋放鎖沒有保持原子性導致線程2的鎖被釋放
所以我們可以引入lua腳本來保證鎖的原子性,同時觀察redission的底層源碼,我們也會發現,redission的底層也是采用lua腳本的方案
總結上文:
基于Redis的分布式鎖實現思路:
- 利用set nx ex獲取鎖,并設置過期時間,保存線程標示
- 釋放鎖時先判斷線程標示是否與自己一致,一致則刪除鎖
-
- 特性:
-
-
- 利用set nx滿足互斥性
- 利用set ex,設置超時時間,保證故障時鎖依然能釋放,避免死鎖,提高安全性
- 利用Redis集群保證高可用和高并發特性
-
我們一路走來,利用添加過期時間,防止死鎖問題的發生,但是有了過期時間之后,可能出現誤刪別人鎖的問題,這個問題我們開始是利用刪之前 通過拿鎖,比鎖,刪鎖這個邏輯來解決的,也就是刪之前判斷一下當前這把鎖是否是屬于自己的,但是現在還有原子性問題,也就是我們沒法保證拿鎖比鎖刪鎖是一個原子性的動作,最后通過lua表達式來解決這個問題
四、Reidssion
1. 分布式鎖-redission功能介紹
基于setnx實現的分布式鎖存在下面的問題:
重入問題:重入問題是指 獲得鎖的線程可以再次進入到相同的鎖的代碼塊中,可重入鎖的意義在于防止死鎖,比如HashTable這樣的代碼中,他的方法都是使用synchronized修飾的,假如他在一個方法內,調用另一個方法,那么此時如果是不可重入的,不就死鎖了嗎?所以可重入鎖他的主要意義是防止死鎖,我們的synchronized和Lock鎖都是可重入的。
不可重試:是指目前的分布式只能嘗試一次,我們認為合理的情況是:當線程在獲得鎖失敗后,他應該能再次嘗試獲得鎖。
**超時釋放:**我們在加鎖時增加了過期時間,這樣的我們可以防止死鎖,但是如果卡頓的時間超長,雖然我們采用了lua表達式防止刪鎖的時候,誤刪別人的鎖,但是畢竟沒有鎖住,有安全隱患
主從一致性: 如果Redis提供了主從集群,當我們向集群寫數據時,主機需要異步的將數據同步給從機,而萬一在同步過去之前,主機宕機了,就會出現死鎖問題。
那么什么是Redission呢
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務,其中就包含了各種分布式鎖的實現。
Redission提供了分布式鎖的多種多樣的功能
2. 分布式鎖-redission可重入鎖原理
在Lock鎖中,他是借助于底層的一個voaltile的一個state變量來記錄重入的狀態的,比如當前沒有人持有這把鎖,那么state=0,假如有人持有這把鎖,那么state=1,如果持有這把鎖的人再次持有這把鎖,那么state就會+1 ,如果是對于synchronized而言,他在c語言代碼中會有一個count,原理和state類似,也是重入一次就加一,釋放一次就-1 ,直到減少成0 時,表示當前這把鎖沒有被人持有。
在redission中,我們的也支持支持可重入鎖
在分布式鎖中,他采用hash結構用來存儲鎖,其中大key表示表示這把鎖是否存在,用小key表示當前這把鎖被哪個線程持有,所以接下來我們一起分析一下當前的這個lua表達式
這個地方一共有3個參數
KEYS[1] : 鎖名稱
ARGV[1]: 鎖失效時間
ARGV[2]: id + ":" + threadId; 鎖的小key (hash結構的vaule)
exists
: 判斷數據是否存在 name:是lock是否存在,如果==0,就表示當前這把鎖不存在
redis.call('hset', KEYS[1], ARGV[2], 1);
此時他就開始往redis里邊去寫數據 ,寫成一個hash結構
Lock{
id + ":" + threadId : 1
}
如果當前這把鎖存在,則第一個條件不滿足,再判斷
redis.call('hexists', KEYS[1], ARGV[2]) == 1
此時需要通過大key+小key判斷當前這把鎖是否是屬于自己的,如果是自己的,則進行
redis.call('hincrby', KEYS[1], ARGV[2], 1)
將當前這個鎖的value進行+1 ,redis.call('pexpire', KEYS[1], ARGV[1]);
然后再對其設置過期時間,如果以上兩個條件都不滿足,則表示當前這把鎖搶鎖失敗,最后返回pttl,即為當前這把鎖的失效時間
如果小伙幫們看了前邊的源碼, 你會發現他會去判斷當前這個方法的返回值是否為null,如果是null,則對應則前兩個if對應的條件,退出搶鎖邏輯,如果返回的不是null,即走了第三個分支,在源碼處會進行while(true)的自旋搶鎖。
"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);"
//獲取鎖失敗返回剩余有效期,單位為毫秒
3. 分布式鎖-redission鎖重試和WatchDog機制
說明:由于課程中已經說明了有關tryLock的源碼解析以及其看門狗原理,所以筆者在這里給大家分析lock()方法的源碼解析,希望大家在學習過程中,能夠掌握更多的知識
搶鎖過程中,獲得當前線程,通過tryAcquire進行搶鎖,該搶鎖邏輯和之前邏輯相同
1、先判斷當前這把鎖是否存在,如果不存在,插入一把鎖,返回null
2、判斷當前這把鎖是否是屬于當前線程,如果是,則返回null
所以如果返回是null,則代表著當前這哥們已經搶鎖完畢,或者可重入完畢,但是如果以上兩個條件都不滿足,則進入到第三個條件,返回的是鎖的失效時間,同學們可以自行往下翻一點點,你能發現有個while( true) 再次進行tryAcquire進行搶鎖
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {return;
}
接下來會有一個條件分支,因為lock方法有重載方法,一個是帶參數,一個是不帶參數,如果帶帶參數傳入的值是-1,如果傳入參數,則easeTime
是他本身,所以如果傳入了參數,此時leaseTime != -1
則會進去搶鎖,搶鎖的邏輯就是之前說的那三個邏輯
if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
如果是沒有傳入時間,則此時也會進行搶鎖, 而且搶鎖時間是默認看門狗時間 commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()
ttlRemainingFuture.onComplete((ttlRemaining, e)
這句話相當于對以上搶鎖進行了監聽,也就是說當上邊搶鎖完畢后,此方法會被調用,具體調用的邏輯就是去后臺開啟一個線程,進行續約邏輯,也就是看門狗線程
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}
});
return ttlRemainingFuture;
此邏輯就是續約邏輯,注意看commandExecutor.getConnectionManager().newTimeout() 此方法
Method( new TimerTask() {},參數2 ,參數3 )
指的是:通過參數2,參數3 去描述什么時候去做參數1的事情,現在的情況是:10s之后去做參數一的事情
因為鎖的失效時間是30s,當10s之后,此時這個timeTask 就觸發了,他就去進行續約,把當前這把鎖續約成30s,如果操作成功,那么此時就會遞歸調用自己,再重新設置一個timeTask(),于是再過10s后又再設置一個timerTask,完成不停的續約
那么大家可以想一想,假設我們的線程出現了宕機他還會續約嗎?當然不會,因為沒有人再去調用renewExpiration這個方法,所以等到時間之后自然就釋放了。
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
4. 分布式鎖-redission鎖的MutiLock原理
為了提高redis的可用性,我們會搭建集群或者主從,現在以主從為例
此時我們去寫命令,寫在主機上, 主機會將數據同步給從機,但是假設在主機還沒有來得及把數據寫入到從機去的時候,此時主機宕機,哨兵會發現主機宕機,并且選舉一個slave變成master,而此時新的master中實際上并沒有鎖信息,此時鎖信息就已經丟掉了。
為了解決這個問題,redission提出來了MutiLock鎖,使用這把鎖咱們就不使用主從了,每個節點的地位都是一樣的, 這把鎖加鎖的邏輯需要寫入到每一個主叢節點上,只有所有的服務器都寫入成功,此時才是加鎖成功,假設現在某個節點掛了,那么他去獲得鎖的時候,只要有一個節點拿不到,都不能算是加鎖成功,就保證了加鎖的可靠性。
那么MutiLock 加鎖原理是什么呢?筆者畫了一幅圖來說明
當我們去設置了多個鎖時,redission會將多個鎖添加到一個集合中,然后用while循環去不停去嘗試拿鎖,但是會有一個總共的加鎖時間,這個時間是用需要加鎖的個數 * 1500ms ,假設有3個鎖,那么時間就是4500ms,假設在這4500ms內,所有的鎖都加鎖成功, 那么此時才算是加鎖成功,如果在4500ms有線程加鎖失敗,則會再次去進行重試.
五、異步秒殺優化
當用戶發起請求,此時會請求nginx,nginx會訪問到tomcat,而tomcat中的程序,會進行串行操作,分成如下幾個步驟
1、查詢優惠卷
2、判斷秒殺庫存是否足夠
3、查詢訂單
4、校驗是否是一人一單
5、扣減庫存
6、創建訂單
在這六步操作中,又有很多操作是要去操作數據庫的(數據庫操作比較緩慢),而且還是一個線程串行執行, 這樣就會導致我們的程序執行的很慢,所以我們需要異步程序執行,那么如何加速呢?
在這里筆者想給大家分享一下課程內沒有的思路,看看有沒有小伙伴這么想,比如,我們可以不可以使用異步編排來做,或者說我開啟N多線程,N多個線程,一個線程執行查詢優惠卷,一個執行判斷扣減庫存,一個去創建訂單等等,然后再統一做返回,這種做法和課程中有哪種好呢?答案是課程中的好,因為如果你采用我剛說的方式,如果訪問的人很多,那么線程池中的線程可能一下子就被消耗完了,而且你使用上述方案,最大的特點在于,你覺得時效性會非常重要,但是你想想是嗎?(并不是,比如我只要確定他能做這件事,然后我后邊慢慢做就可以了,我并不需要他一口氣做完這件事),所以我們應當采用的是課程中,類似消息隊列的方式來完成我們的需求,而不是使用線程池或者是異步編排的方式來完成這個需求。
綜上,我們直接來引入redisStream消息隊列
Redis消息隊列-基于Stream的消息隊列-消費者組
消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:
創建消費者組:
key:隊列名稱
groupName:消費者組名稱
ID:起始ID標示,$代表隊列中最后一個消息,0則代表隊列中第一個消息
MKSTREAM:隊列不存在時自動創建隊列
其它常見命令:
刪除指定的消費者組
XGROUP DESTORY key groupNam e
給指定的消費者組添加消費者(
XGROUP CREATECONSUMER key groupname consumername
刪除消費者組中的指定消費者
XGROUP DELCONSUMER key groupname consumername
從消費者組讀取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID
[ID ...]
- group:消費組名稱
- consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
- count:本次查詢的最大數量
- BLOCK milliseconds:當沒有消息時最長等待時間
- NOACK:無需手動ACK,獲取到消息后自動確認(給了NOACK代表不用消費者確認,消息投遞給消費者會自動確認)
- STREAMS key:指定隊列名稱
- ID:獲取消息的起始ID:
">":從下一個未消費的消息開始,確保所有消息都會被消費
其它:根據指定id從pending-list中獲取已消費但未確認的消息,(例如0,是從pending-list中的第一個消息開始)
結尾跟0,代表讀取pending-list 中的第一條消息
XPENDING key group [空閑時間] start(最大-) end(最小+) cout
命令格式:
XPENDING key group [start end count] [consumer]
其中:
key
:流的鍵名(stream 的 key)group
:消費者組名(consumer group)[start end count]
:可選參數,用于限制返回的 pending 條目范圍:
-
start
:最小的消息 ID(可以用-
表示最小)end
:最大的消息 ID(可以用+
表示最大)count
:返回的最大條數
[consumer]
:可選參數,只查詢指定消費者名的 pending 條目
正確使用示例:
XPENDING mystream mygroup - + 10
含義:
- 在流
mystream
中, - 查看消費者組
mygroup
的 pending 消息, - 消息 ID 范圍是從最小 (
-
) 到最大 (+
), - 最多返回 10 條消息的詳細信息。
返回內容類似:
bash復制編輯1) 1) "1686748123145-0" # 消息ID2) "consumer-1" # 消費者名稱3) (integer) 2000 # 空閑時間(毫秒)4) (integer) 3 # 被傳遞的次數
2) ...
STREAM類型消息隊列的XREADGROUP命令特點:
- 消息可回溯
- 可以多消費者爭搶消息,加快消費速度
- 可以阻塞讀取
- 沒有消息漏讀的風險
- 有消息確認機制,保證消息至少被消費一次
需求:
- 創建一個Stream類型的消息隊列,名為stream.orders
- 修改之前的秒殺下單Lua腳本,在認定有搶購資格后,直接向stream.orders中添加消息,內容包含voucherId、userId、orderId
- 項目啟動時,開啟一個線程任務,嘗試獲取stream.orders中的消息,完成下單
修改lua表達式,新增
VoucherOrderServiceImpl
private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.獲取消息隊列中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判斷訂單信息是否為空if (list == null || list.isEmpty()) {// 如果為null,說明沒有消息,繼續下一次循環continue;}// 解析數據MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.創建訂單createVoucherOrder(voucherOrder);// 4.確認消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);//處理異常消息handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.獲取pending-list中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判斷訂單信息是否為空if (list == null || list.isEmpty()) {// 如果為null,說明沒有異常消息,結束循環break;}// 解析數據MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.創建訂單createVoucherOrder(voucherOrder);// 4.確認消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理pendding訂單異常", e);try{Thread.sleep(20);}catch(Exception e){e.printStackTrace();}}}}
}