黑馬點評Reids重點詳解(Reids使用重點)

目錄

一、短信登錄(redis+seesion)

基于Session實現登錄流程

🔄 圖中關鍵模塊解釋:

利用seesion登錄的問題

設計key的具體細節

整體訪問流程

二、商戶查詢緩存

reids與數據庫主動更新的三種方案

緩存穿透

緩存雪崩問題及解決思路

緩存擊穿問題及解決思路

三、優惠券秒殺

全局唯一ID

初步實現秒殺下單秒殺下單應該思考的內容:

樂觀鎖解決超賣問題

有關超賣問題分析:在我們原有代碼中是這么寫的

悲觀鎖解決一人一單

分布式鎖解決集群模式下的秒殺問題

分布式鎖

解決Redis分布式鎖誤刪問題

四、Reidssion

1. 分布式鎖-redission功能介紹

2. 分布式鎖-redission可重入鎖原理

3. 分布式鎖-redission鎖重試和WatchDog機制

4. 分布式鎖-redission鎖的MutiLock原理

五、異步秒殺優化

Redis消息隊列-基于Stream的消息隊列-消費者組


項目分為? ?

重點:短信登錄、商戶查詢緩存、優惠券秒殺、用戶簽到

這部分涉及到reids+seesion、緩存擊穿、緩存穿透、緩存雪崩、樂觀鎖,悲觀鎖、分布式鎖、與HyperLogLog的使用,對我們來說非常的重要

一、短信登錄(redis+seesion)

基于Session實現登錄流程

發送驗證碼:

用戶在提交手機號后,會校驗手機號是否合法,如果不合法,則要求用戶重新輸入手機號

如果手機號合法,后臺此時生成對應的驗證碼,同時將驗證碼進行保存,然后再通過短信的方式將驗證碼發送給用戶

短信驗證碼登錄、注冊:

用戶將驗證碼和手機號進行輸入,后臺從session中拿到當前驗證碼,然后和用戶輸入的驗證碼進行校驗,如果不一致,則無法通過校驗,如果一致,則后臺根據手機號查詢用戶,如果用戶不存在,則為用戶創建賬號信息,保存到數據庫,無論是否存在,都會將用戶信息保存到session中,方便后續獲得當前登錄信息

校驗登錄狀態:

用戶在請求時候,會從cookie中攜帶者JsessionId到后臺,后臺通過JsessionId從session中拿到用戶信息,如果沒有session信息,則進行攔截,如果有session信息,則將用戶信息保存到threadLocal中,并且放行

Tomcat運行原理

response:響應,反應

用戶通過 socket 發起 HTTP 請求,Tomcat 接收并分發請求到工作線程,由 Web 應用(如 Spring Boot)中的 Controller → Service → DAO 完成處理后,再將結果通過 response 寫回用戶。

🔄 圖中關鍵模塊解釋:

模塊

說明

用戶 socket

發起 TCP 連接,發送 HTTP 請求。

Tomcat

Java Web 容器,監聽端口、管理線程池、轉發請求到 Web 應用。

監聽線程

監聽 TCP 請求,交由線程池中的工作線程處理。

工作線程池

處理具體的請求(取 socket 數據、封裝 request/response)。

WebApp

實際部署在 Tomcat 里的 Web 應用,如使用 Spring MVC 開發的項目。

Controller

接收請求,調度服務層。

Service

處理業務邏輯。

DAO

與數據庫進行交互。

DB

后端數據庫。

利用seesion登錄的問題

每個tomcat中都有一份屬于自己的session,假設用戶第一次訪問第一臺tomcat,并且把自己的信息存放到第一臺服務器的session中,但是第二次這個用戶訪問到了第二臺tomcat,那么在第二臺服務器上,肯定沒有第一臺服務器存放的session,所以此時 整個登錄攔截功能就會出現問題,我們能如何解決這個問題呢?早期的方案是session拷貝,就是說雖然每個tomcat上都有不同的session,但是每當任意一臺服務器的session修改時,都會同步給其他的Tomcat服務器的session,這樣的話,就可以實現session的共享了

但是這種方案具有兩個大問題

1、每臺服務器中都有完整的一份session數據,服務器壓力過大。

2、session拷貝數據時,可能會出現延遲

所以咱們后來采用的方案都是基于redis來完成,我們把session換成redis,redis數據本身就是共享的,就可以避免session共享的問題了

首先我們要思考一下利用redis來存儲數據,那么到底使用哪種結構呢?由于存入的數據比較簡單,我們可以考慮使用String,或者是使用哈希,如下圖,如果使用String,同學們注意他的value,用多占用一點空間,如果使用哈希,則他的value中只會存儲他數據本身,如果不是特別在意內存,其實使用String就可以啦。

設計key的具體細節

所以我們可以使用String結構,就是一個簡單的key,value鍵值對的方式,但是關于key的處理,session他是每個用戶都有自己的session,但是redis的key是共享的,咱們就不能使用code了

在設計這個key的時候,我們之前講過需要滿足兩點

1、key要具有唯一性

2、key要方便攜帶

如果我們采用phone:手機號這個的數據來存儲當然是可以的,但是如果把這樣的敏感數據存儲到redis中并且從頁面中帶過來畢竟不太合適,所以我們在后臺生成一個隨機串token,然后讓前端帶來這個token就能完成我們的整體邏輯了

整體訪問流程

當注冊完成后,用戶去登錄會去校驗用戶提交的手機號和驗證碼,是否一致,如果一致,則根據手機號查詢用戶信息,不存在則新建,最后將用戶數據保存到redis,并且生成token作為redis的key,當我們校驗用戶是否登錄時,會去攜帶著token進行訪問,從redis中取出token對應的value,判斷是否存在這個數據,如果沒有則攔截,如果存在則將其保存到threadLocal中,并且放行。

public class RefreshTokenInterceptor implements HandlerInterceptor {private StringRedisTemplate stringRedisTemplate;public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1.獲取請求頭中的tokenString token = request.getHeader("authorization");if (StrUtil.isBlank(token)) {return true;}// 2.基于TOKEN獲取redis中的用戶String key  = LOGIN_USER_KEY + token;Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);// 3.判斷用戶是否存在if (userMap.isEmpty()) {return true;}// 5.將查詢到的hash數據轉為UserDTOUserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);// 6.存在,保存用戶信息到 ThreadLocalUserHolder.saveUser(userDTO);// 7.刷新token有效期stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);// 8.放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {// 移除用戶UserHolder.removeUser();}
}//設置攔截器校驗
public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 1.判斷是否需要攔截(ThreadLocal中是否有用戶)if (UserHolder.getUser() == null) {// 沒有,需要攔截,設置狀態碼response.setStatus(401);// 攔截,會返回等錄界面return false;}// 有用戶,則放行return true;}
}

二、商戶查詢緩存

實際開發中,會構筑多級緩存來使系統運行速度進一步提升,例如:本地緩存與redis中的緩存并發使用

瀏覽器緩存:主要是存在于瀏覽器端的緩存

應用層緩存:可以分為tomcat本地緩存,比如之前提到的map,或者是使用redis作為緩存

數據庫緩存:在數據庫中有一片空間是 buffer pool,增改查數據都會先加載到mysql的緩存中

CPU緩存:當代計算機最大的問題是 cpu性能提升了,但內存讀寫速度沒有跟上,所以為了適應當下的情況,增加了cpu的L1,L2,L3級的緩存

由于數據庫的查詢速度比較慢,而且在大量查詢條件下,數據庫的查詢有可能失敗和造成數據庫崩潰,所以我們可以添加一層緩存中間件,叫做redis

代碼思路:如果緩存有,則直接返回,如果緩存不存在,則查詢數據庫,然后存入redis。

reids與數據庫主動更新的三種方案

由于我們的緩存的數據源來自于數據庫,而數據庫的數據是會發生變化的,因此,如果當數據庫中數據發生變化,而緩存卻沒有同步,此時就會有一致性問題存在,其后果是:

用戶使用緩存中的過時數據,就會產生類似多線程數據安全問題,從而影響業務,產品口碑等;怎么解決呢?有如下幾種方案

  • Cache Aside Pattern 人工編碼方式:緩存調用者在更新完數據庫后再去更新緩存,也稱之為雙寫方案
  • Read/Write Through Pattern : 由系統本身完成,數據庫與緩存的問題交由系統本身去處理
  • Write Behind Caching Pattern :調用者只操作緩存,其他線程去異步處理數據庫,實現最終一致(用一個獨立的線程, 異步的去看一看緩存中的數據和數據庫中的數據的一致性,進行異步的更新,效率比較高),如果緩存出現了宕機,那么就會產生數據丟失的問題

綜合考慮使用方案一,但是方案一調用者如何處理呢?這里有幾個問題

操作緩存和數據庫時有三個問題需要考慮:

如果采用第一個方案,那么假設我們每次操作數據庫后,都操作緩存,但是中間如果沒有人查詢,那么這個更新動作實際上只有最后一次生效,中間的更新動作意義并不大,我們可以把緩存刪除,等待再次查詢時,將緩存中的數據加載出來

  • 刪除緩存還是更新緩存?
    • 更新緩存:每次更新數據庫都更新緩存,無效寫操作較多
    • 刪除緩存:更新數據庫時讓緩存失效,查詢時再更新緩存
  • 如何保證緩存與數據庫的操作的同時成功或失敗?(添加事務)
    • 單體系統,將緩存與數據庫操作放在一個事務
    • 分布式系統,利用TCC等分布式事務方案(確保都能成功或者失敗

應該具體操作緩存還是操作數據庫,我們應當是先操作數據庫,再刪除緩存,原因在于,如果你選擇第一種方案,在兩個線程并發來訪問時,假設線程1先來,他先把緩存刪了,此時線程2過來,他查詢緩存數據并不存在,此時他寫入緩存,當他寫入緩存后,線程1再執行更新動作時,實際上寫入的就是舊的數據,新的數據被舊數據覆蓋了。

  • 先操作緩存還是先操作數據庫?
    • 先刪除緩存,再操作數據庫
    • 先操作數據庫,再刪除緩存

綜上所述,先操作數據庫,再刪除緩存更優

緩存穿透

緩存穿透 :緩存穿透是指客戶端請求的數據在緩存中和數據庫中都不存在,這樣緩存永遠不會生效,這些請求都會打到數據庫。用戶請求的數據在緩存中和數據庫中都不存在,不斷發起這樣的請求,給數據庫帶來巨大壓力

常見的解決方案有兩種:

  • 緩存空對象
    • 優點:實現簡單,維護方便
    • 缺點:
      • 額外的內存消耗
      • 可能造成短期的不一致
  • 布隆過濾
    • 優點:內存占用較少,沒有多余key
    • 缺點:
      • 實現復雜
      • 存在誤判可能

緩存空對象思路分析:當我們客戶端訪問不存在的數據時,先請求redis,但是此時redis中沒有數據,此時會訪問到數據庫,但是數據庫中也沒有數據,這個數據穿透了緩存,直擊數據庫,我們都知道數據庫能夠承載的并發不如redis這么高,如果大量的請求同時過來訪問這種不存在的數據,這些請求就都會訪問到數據庫,簡單的解決方案就是哪怕這個數據在數據庫中也不存在,我們也把這個數據存入到redis中去(設置比較短的TTL),這樣,下次用戶過來訪問這個不存在的數據,那么在redis中也能找到這個數據就不會進入到緩存了

布隆過濾:布隆過濾器其實采用的是哈希思想來解決這個問題,通過一個龐大的二進制數組,走哈希思想去判斷當前這個要查詢的這個數據是否存在,如果布隆過濾器判斷存在,則放行,這個請求會去訪問redis,哪怕此時redis中的數據過期了,但是數據庫中一定存在這個數據,在數據庫中查詢出來這個數據后,再將其放入到redis中,

假設布隆過濾器判斷這個數據不存在,則直接返回

這種方式優點在于節約內存空間,存在誤判,誤判原因在于:布隆過濾器走的是哈希思想,只要哈希思想,就可能存在哈希沖突

緩存穿透的解決方案有哪些?

  • 緩存null值(可能會存在短期不一致性)
  • 布隆過濾(算法)
  • 增強id的復雜度,避免被猜測id規律
  • 做好數據的基礎格式校驗
  • 加強用戶權限校驗
  • 做好熱點參數的限流

緩存雪崩問題及解決思路

緩存雪崩是指在同一時段大量的緩存key同時失效或者Redis服務宕機,導致大量請求到達數據庫,帶來巨大壓力。

解決方案:

  • 給不同的Key的TTL添加隨機值
  • 利用Redis集群提高服務的可用性(哨兵機制)
  • 給緩存業務添加降級限流策略
  • 給業務添加多級緩存(微服務),應對億級以上的并發

緩存擊穿問題及解決思路

緩存擊穿問題也叫熱點Key問題,就是一個被高并發訪問并且緩存重建業務較復雜的key突然失效了,無數的請求訪問會在瞬間給數據庫帶來巨大的沖擊。

常見的解決方案有兩種:

  • 互斥鎖
  • 邏輯過期

邏輯分析:假設線程1在查詢緩存之后,本來應該去查詢數據庫,然后把這個數據重新加載到緩存的,此時只要線程1走完這個邏輯,其他線程就都能從緩存中加載這些數據了,但是假設在線程1沒有走完的時候,后續的線程2,線程3,線程4同時過來訪問當前這個方法, 那么這些線程都不能從緩存中查詢到數據,那么他們就會同一時刻來訪問查詢緩存,都沒查到,接著同一時間去訪問數據庫,同時的去執行數據庫代碼,對數據庫訪問壓力過大

解決方案一、使用鎖來解決:

因為鎖能實現互斥性。假設線程過來,只能一個人一個人的來訪問數據庫,從而避免對于數據庫訪問壓力過大,但這也會影響查詢的性能,因為此時會讓查詢的性能從并行變成了串行,我們可以采用tryLock方法 + double check來解決這樣的問題。

假設現在線程1過來訪問,他查詢緩存沒有命中,但是此時他獲得到了鎖的資源,那么線程1就會一個人去執行邏輯,假設現在線程2過來,線程2在執行過程中,并沒有獲得到鎖,那么線程2就可以進行到休眠,直到線程1把鎖釋放后,線程2獲得到鎖,然后再來執行邏輯,此時就能夠從緩存中拿到數據了。

解決方案二、邏輯過期方案

方案分析:我們之所以會出現這個緩存擊穿問題,主要原因是在于我們對key設置了過期時間,假設我們不設置過期時間,其實就不會有緩存擊穿的問題,但是不設置過期時間,這樣數據不就一直占用我們內存了嗎,我們可以采用邏輯過期方案。

我們把過期時間設置在 redis的value中,注意:這個過期時間并不會直接作用于redis,而是我們后續通過邏輯去處理。假設線程1去查詢緩存,然后從value中判斷出來當前的數據已經過期了,此時線程1去獲得互斥鎖,那么其他線程會進行阻塞,獲得了鎖的線程他會開啟一個 線程去進行 以前的重構數據的邏輯,直到新開的線程完成這個邏輯后,才釋放鎖, 而線程1直接進行返回,假設現在線程3過來訪問,由于線程線程2持有著鎖,所以線程3無法獲得鎖,線程3也直接返回數據,只有等到新開的線程2把重建數據構建完后,其他線程才能走返回正確的數據。

這種方案巧妙在于,異步的構建緩存,缺點在于在構建完緩存之前,返回的都是臟數據。

進行對比

**互斥鎖方案:**由于保證了互斥性,所以數據一致,且實現簡單,因為僅僅只需要加一把鎖而已,也沒其他的事情需要操心,所以沒有額外的內存消耗,缺點在于有鎖就有死鎖問題的發生,且只能串行執行性能肯定受到影響

邏輯過期方案: 線程讀取過程中不需要等待,性能好,有一個額外的線程持有鎖去進行重構數據,但是在重構數據完成前,其他的線程只能返回之前的數據,且實現起來麻煩

因為現在redis中存儲的數據的value需要帶上過期時間,此時要么去修改原來的實體類,要么新建一個實體類,我們采用第二個方案,這個方案,對原來代碼沒有侵入性。

@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}

ShopServiceImpl

private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire( Long id ) {String key = CACHE_SHOP_KEY + id;// 1.從redis查詢商鋪緩存String json = stringRedisTemplate.opsForValue().get(key);// 2.判斷是否存在if (StrUtil.isBlank(json)) {// 3.存在,直接返回return null;}// 4.命中,需要先把json反序列化為對象RedisData redisData = JSONUtil.toBean(json, RedisData.class);Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);LocalDateTime expireTime = redisData.getExpireTime();// 5.判斷是否過期if(expireTime.isAfter(LocalDateTime.now())) {// 5.1.未過期,直接返回店鋪信息return shop;}// 5.2.已過期,需要緩存重建// 6.緩存重建// 6.1.獲取互斥鎖String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);// 6.2.判斷是否獲取鎖成功if (isLock){CACHE_REBUILD_EXECUTOR.submit( ()->{try{//重建緩存this.saveShop2Redis(id,20L);}catch (Exception e){throw new RuntimeException(e);}finally {unlock(lockKey);}});}// 6.4.返回過期的商鋪信息return shop;
}

三、優惠券秒殺

全局唯一ID

當用戶搶購時,就會生成訂單并保存到tb_voucher_order這張表中,而訂單表如果使用數據庫自增ID就存在一些問題:

  • id的規律性太明顯
  • 受單表數據量的限制

場景分析:如果我們的id具有太明顯的規則,用戶或者說商業對手很容易猜測出來我們的一些敏感信息,比如商城在一天時間內,賣出了多少單,這明顯不合適。

場景分析二:隨著我們商城規模越來越大,mysql的單表的容量不宜超過500W,數據量過大之后,我們要進行拆庫拆表,但拆分表了之后,他們從邏輯上講他們是同一張表,所以他們的id是不能一樣的, 于是乎我們需要保證id的唯一性。

全局ID生成器,是一種在分布式系統下用來生成全局唯一ID的工具,一般要滿足下列特性:

所以我們就可以以第一位為符號位,后31位為毫秒級別自增的時間戳,后32為在reids中自增

/*** 生成策略是基于redis的自增長,需要有key對應的值不斷自增* 不同的業務對應不同的key* @param keyPrefix* @return*/
public Long nextId(String keyPrefix) {
//1.生成時間戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMSTAMP;//2.生成序列號,注入reids
//2.獲取當前的日期,精確到天
String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + data);
//3. 拼接并返回//ps:這里左移32位為時間戳,|運算符表示,相同為0,相異為1
return timestamp << COUNT_BTTS | count;
}

初步實現秒殺下單秒殺下單應該思考的內容:

下單時需要判斷兩點:

  • 秒殺是否開始或結束,如果尚未開始或已經結束則無法下單
  • 庫存是否充足,不足則無法下單

下單核心邏輯分析:

當用戶開始進行下單,我們應當去查詢優惠卷信息,查詢到優惠卷信息,判斷是否滿足秒殺條件

比如時間是否充足,如果時間充足,則進一步判斷庫存是否足夠,如果兩者都滿足,則扣減庫存,創建訂單,然后返回訂單id,如果有一個條件不滿足則直接結束。

樂觀鎖解決超賣問題
有關超賣問題分析:在我們原有代碼中是這么寫的
 if (voucher.getStock() < 1) {// 庫存不足return Result.fail("庫存不足!");}//5,扣減庫存boolean success = seckillVoucherService.update().setSql("stock= stock -1").eq("voucher_id", voucherId).update();if (!success) {//扣減庫存return Result.fail("庫存不足!");}

假設線程1過來查詢庫存,判斷出來庫存大于1,正準備去扣減庫存,但是還沒有來得及去扣減,此時線程2過來,線程2也去查詢庫存,發現這個數量一定也大于1,那么這兩個線程都會去扣減庫存,最終多個線程相當于一起去扣減庫存,此時就會出現庫存的超賣問題。

超賣問題是典型的多線程安全問題,針對這一問題的常見解決方案就是加鎖:而對于加鎖,我們通常有兩種解決方案:見下圖:

悲觀鎖:

悲觀鎖可以實現對于數據的串行化執行,比如syn,和lock都是悲觀鎖的代表,同時,悲觀鎖中又可以再細分為公平鎖,非公平鎖,可重入鎖,等等

樂觀鎖:

樂觀鎖:會有一個版本號,每次操作數據會對版本號+1,再提交回數據時,會去校驗是否比之前的版本大1 ,如果大1 ,則進行操作成功,這套機制的核心邏輯在于,如果在操作過程中,版本號只比原來大1 ,那么就意味著操作過程中沒有人對他進行過修改,他的操作就是安全的,如果不大1,則數據被修改過,當然樂觀鎖還有一些變種的處理方式,比如cas:

樂觀鎖的典型代表:就是cas,利用cas進行無鎖化機制加鎖,var5 是操作前讀取的內存值,while中的var1+var2 是預估值,如果預估值 == 內存值,則代表中間沒有被人修改過,此時就將新值去替換 內存值

其中do while 是為了在操作失敗時,再次進行自旋操作,即把之前的邏輯再操作一次。

int var5;
do {var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;

悲觀鎖解決一人一單

需求:修改秒殺業務,要求同一個優惠券,一個用戶只能下一單

現在的問題在于:

優惠卷是為了引流,但是目前的情況是,一個人可以無限制的搶這個優惠卷,所以我們應當增加一層邏輯,讓一個用戶只能下一個單,而不是讓一個用戶下多個單

具體操作邏輯如下:比如時間是否充足,如果時間充足,則進一步判斷庫存是否足夠,然后再根據優惠卷id和用戶id查詢是否已經下過這個訂單,如果下過這個訂單,則不再下單,否則進行下單

VoucherOrderServiceImpl

初步代碼:增加一人一單邏輯

    if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}

存在問題:現在的問題還是和之前一樣,并發過來,查詢數據庫,都不存在訂單,所以我們還是需要加鎖,但是樂觀鎖比較適合更新數據,而現在是插入數據,所以我們需要使用悲觀鎖操作

注意:在這里提到了非常多的問題,我們需要慢慢的來思考,首先我們的初始方案是封裝了一個createVoucherOrder方法,同時為了確保他線程安全,在方法上添加了一把synchronized 鎖

只要有一個線程進入該方法,其他任何線程想要進入這個對象實例的這個方法,都得等待,不管這些線程操作的數據是不是相互獨立

@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {//代碼return Result.ok(orderId);
}

,但是這樣添加鎖,鎖的粒度太粗了,在使用鎖過程中,控制鎖粒度 是一個非常重要的事情,因為如果鎖的粒度太大,會導致每個線程進來都會鎖住,所以我們需要去控制鎖的粒度,以下這段代碼需要修改為:
intern() 這個方法是從常量池中拿到數據,如果我們直接使用userId.toString() 他拿到的對象實際上是不同的對象,new出來的對象,我們使用鎖必須保證鎖必須是同一把,所以我們需要使用intern()方法

@Transactional
public  Result createVoucherOrder(Long voucherId) {synchronized(userId.toString().intern()){// 5.1.查詢訂單int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();// 5.2.判斷是否存在if (count > 0) {// 用戶已經購買過了return Result.fail("用戶已經購買過一次!");}// 6.扣減庫存boolean success = seckillVoucherService.update().setSql("stock = stock - 1") // set stock = stock - 1.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0.update();if (!success) {// 扣減失敗return Result.fail("庫存不足!");}// 7.創建訂單VoucherOrder voucherOrder = new VoucherOrder();// 7.1.訂單idlong orderId = redisIdWorker.nextId("order");voucherOrder.setId(orderId);// 7.2.用戶idvoucherOrder.setUserId(userId);// 7.3.代金券idvoucherOrder.setVoucherId(voucherId);save(voucherOrder);// 7.返回訂單idreturn Result.ok(orderId);}
}

但是以上代碼還是存在問題,問題的原因在于當前方法被spring的事務控制,如果你在方法內部加鎖,可能會導致當前方法事務還沒有提交,但是鎖已經釋放也會導致問題,所以我們選擇將當前方法整體包裹起來,確保事務不會出現問題:如下:

在seckillVoucher 方法中,添加以下邏輯,這樣就能保證事務的特性,同時也控制了鎖的粒度

但是以上做法依然有問題,因為你調用的方法,其實是this.的方式調用的,事務想要生效,還得利用代理來生效,所以這個地方,我們需要獲得原始的事務對象, 來操作事務

synchronized在集群模式下會失效,我們需要用到下面的分布式鎖

分布式鎖解決集群模式下的秒殺問題

通過加鎖可以解決在單機情況下的一人一單安全問題,但是在集群模式下就不行了。

1、我們將服務啟動兩份,端口分別為8081和8082:

2、然后修改nginx的conf目錄下的nginx.conf文件,配置反向代理和負載均衡:

有關鎖失效原因分析

由于現在我們部署了多個tomcat,每個tomcat都有一個屬于自己的jvm,那么假設在服務器A的tomcat內部,有兩個線程,這兩個線程由于使用的是同一份代碼,那么他們的鎖對象是同一個,是可以實現互斥的,但是如果現在是服務器B的tomcat內部,又有兩個線程,但是他們的鎖對象寫的雖然和服務器A一樣,但是鎖對象卻不是同一個,所以線程3和線程4可以實現互斥,但是卻無法和線程1和線程2實現互斥,這就是 集群環境下,syn鎖失效的原因,在這種情況下,我們就需要使用分布式鎖來解決這個問題。

分布式鎖

所以我們需要跨進程的鎖

分布式鎖的核心思想就是讓大家都使用同一把鎖,只要大家使用的是同一把鎖,那么我們就能鎖住線程,不讓線程進行,讓程序串行執行,這就是分布式鎖的核心思路

那么分布式鎖他應該滿足一些什么樣的條件呢?

可見性:多個線程都能看到相同的結果,注意:這個地方說的可見性并不是并發編程中指的內存可見性,只是說多個進程之間都能感知到變化的意思

互斥:互斥是分布式鎖的最基本的條件,使得程序串行執行

高可用:程序不易崩潰,時時刻刻都保證較高的可用性

高性能:由于加鎖本身就讓性能降低,所有對于分布式鎖本身需要他就較高的加鎖性能和釋放鎖性能

安全性:安全也是程序中必不可少的一環

常見的分布式鎖有三種:

Mysql:mysql本身就帶有鎖機制,但是由于mysql性能本身一般,所以采用分布式鎖的情況下,其實使用mysql作為分布式鎖比較少見

Redis:redis作為分布式鎖是非常常見的一種使用方式,現在企業級開發中基本都使用redis或者zookeeper作為分布式鎖,利用setnx這個方法,如果插入key成功,則表示獲得到了鎖,如果有人插入成功,其他人插入失敗則表示無法獲得到鎖,利用這套邏輯來實現分布式鎖

Zookeeper:zookeeper也是企業級開發中較好的一個實現分布式鎖的方案,由于本套視頻并不講解zookeeper的原理和分布式鎖的實現,所以不過多闡述


實現分布式鎖時需要實現的兩個基本方法:

  • 獲取鎖:
    • 互斥:確保只能有一個線程獲取鎖
    • 非阻塞:嘗試一次,成功返回true,失敗返回false
  • 釋放鎖:
    • 手動釋放
    • 超時釋放:獲取鎖時添加一個超時時間

要具備原子性,獲取鎖要么都成功要么都失敗

SET lock thread1 EX 10 NX,nx代表沒有就創建,有就不創建,Ex表示如果發生宕機,會在10s后自動釋放

核心思路:

我們利用redis 的SETNX 方法,當有多個線程進入時,我們就利用該方法,第一個線程進入時,redis 中就有這個key 了,返回了1,如果結果是1,則表示他搶到了鎖,那么他去執行業務,然后再刪除鎖,退出鎖邏輯,沒有搶到鎖的哥們,等待一定時間后重試即可

實現分布式鎖方案

SimpleRedisLock

利用setnx方法進行加鎖,同時增加過期時間,防止死鎖,此方法可以保證加鎖和增加過期時間具有原子性

private static final String KEY_PREFIX="lock:"
@Override
public boolean tryLock(long timeoutSec) {// 獲取線程標示String threadId = Thread.currentThread().getId()// 獲取鎖Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);return Boolean.TRUE.equals(success);
}
  • 釋放鎖邏輯

釋放鎖,防止刪除別人的鎖

public void unlock() {//通過del刪除鎖stringRedisTemplate.delete(KEY_PREFIX + name);
}
解決Redis分布式鎖誤刪問題

需求:修改之前的分布式鎖實現,滿足:在獲取鎖時存入線程標示(可以用UUID表示)

在釋放鎖時先獲取鎖中的線程標示,判斷是否與當前線程標示一致

  • 如果一致則釋放鎖
  • 如果不一致則不釋放鎖

之前我們用的是線程的id,在jvm內部線程id是一個遞增的數字,當我們在集群的模式下,多個jvm很有可能會出現線程id沖突的問題

所以說我們直接使用線程id區分鎖是不夠的,我們需要用UUID來區分不同的服務(jvm),再用線程id區分不同的線程,兩者結合

核心邏輯:在存入鎖時,放入自己線程的標識,在刪除鎖時我們利用UUID對每個鎖獨特標識,判斷當前這把鎖的標識是不是自己存入的,如果是,則進行刪除,如果不是,則不進行刪除。

同時,在更極端的情況下例如:

當線程1需要釋放鎖的時候,發生了阻塞,導致鎖被超時自動釋放,然后我們的線程2獲取到了鎖,剛好線程1又不阻塞了,這時候線程1要釋放鎖,因為前文已經做過判斷了,所以這里很可能因為判斷鎖和釋放鎖沒有保持原子性導致線程2的鎖被釋放

所以我們可以引入lua腳本來保證鎖的原子性,同時觀察redission的底層源碼,我們也會發現,redission的底層也是采用lua腳本的方案

總結上文:

基于Redis的分布式鎖實現思路:

  • 利用set nx ex獲取鎖,并設置過期時間,保存線程標示
  • 釋放鎖時先判斷線程標示是否與自己一致,一致則刪除鎖
    • 特性:
      • 利用set nx滿足互斥性
      • 利用set ex,設置超時時間,保證故障時鎖依然能釋放,避免死鎖,提高安全性
      • 利用Redis集群保證高可用和高并發特性

我們一路走來,利用添加過期時間,防止死鎖問題的發生,但是有了過期時間之后,可能出現誤刪別人鎖的問題,這個問題我們開始是利用刪之前 通過拿鎖,比鎖,刪鎖這個邏輯來解決的,也就是刪之前判斷一下當前這把鎖是否是屬于自己的,但是現在還有原子性問題,也就是我們沒法保證拿鎖比鎖刪鎖是一個原子性的動作,最后通過lua表達式來解決這個問題

四、Reidssion

1. 分布式鎖-redission功能介紹

基于setnx實現的分布式鎖存在下面的問題:

重入問題:重入問題是指 獲得鎖的線程可以再次進入到相同的鎖的代碼塊中,可重入鎖的意義在于防止死鎖,比如HashTable這樣的代碼中,他的方法都是使用synchronized修飾的,假如他在一個方法內,調用另一個方法,那么此時如果是不可重入的,不就死鎖了嗎?所以可重入鎖他的主要意義是防止死鎖,我們的synchronized和Lock鎖都是可重入的。

不可重試:是指目前的分布式只能嘗試一次,我們認為合理的情況是:當線程在獲得鎖失敗后,他應該能再次嘗試獲得鎖。

**超時釋放:**我們在加鎖時增加了過期時間,這樣的我們可以防止死鎖,但是如果卡頓的時間超長,雖然我們采用了lua表達式防止刪鎖的時候,誤刪別人的鎖,但是畢竟沒有鎖住,有安全隱患

主從一致性: 如果Redis提供了主從集群,當我們向集群寫數據時,主機需要異步的將數據同步給從機,而萬一在同步過去之前,主機宕機了,就會出現死鎖問題。

那么什么是Redission呢

Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務,其中就包含了各種分布式鎖的實現。

Redission提供了分布式鎖的多種多樣的功能

2. 分布式鎖-redission可重入鎖原理

在Lock鎖中,他是借助于底層的一個voaltile的一個state變量來記錄重入的狀態的,比如當前沒有人持有這把鎖,那么state=0,假如有人持有這把鎖,那么state=1,如果持有這把鎖的人再次持有這把鎖,那么state就會+1 ,如果是對于synchronized而言,他在c語言代碼中會有一個count,原理和state類似,也是重入一次就加一,釋放一次就-1 ,直到減少成0 時,表示當前這把鎖沒有被人持有。

在redission中,我們的也支持支持可重入鎖

在分布式鎖中,他采用hash結構用來存儲鎖,其中大key表示表示這把鎖是否存在,用小key表示當前這把鎖被哪個線程持有,所以接下來我們一起分析一下當前的這個lua表達式

這個地方一共有3個參數

KEYS[1] : 鎖名稱

ARGV[1]: 鎖失效時間

ARGV[2]: id + ":" + threadId; 鎖的小key (hash結構的vaule)

exists: 判斷數據是否存在 name:是lock是否存在,如果==0,就表示當前這把鎖不存在

redis.call('hset', KEYS[1], ARGV[2], 1);此時他就開始往redis里邊去寫數據 ,寫成一個hash結構

Lock{

id + ":" + threadId : 1

}

如果當前這把鎖存在,則第一個條件不滿足,再判斷

redis.call('hexists', KEYS[1], ARGV[2]) == 1

此時需要通過大key+小key判斷當前這把鎖是否是屬于自己的,如果是自己的,則進行

redis.call('hincrby', KEYS[1], ARGV[2], 1)

將當前這個鎖的value進行+1 ,redis.call('pexpire', KEYS[1], ARGV[1]); 然后再對其設置過期時間,如果以上兩個條件都不滿足,則表示當前這把鎖搶鎖失敗,最后返回pttl,即為當前這把鎖的失效時間

如果小伙幫們看了前邊的源碼, 你會發現他會去判斷當前這個方法的返回值是否為null,如果是null,則對應則前兩個if對應的條件,退出搶鎖邏輯,如果返回的不是null,即走了第三個分支,在源碼處會進行while(true)的自旋搶鎖。

"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);"
//獲取鎖失敗返回剩余有效期,單位為毫秒

3. 分布式鎖-redission鎖重試和WatchDog機制

說明:由于課程中已經說明了有關tryLock的源碼解析以及其看門狗原理,所以筆者在這里給大家分析lock()方法的源碼解析,希望大家在學習過程中,能夠掌握更多的知識

搶鎖過程中,獲得當前線程,通過tryAcquire進行搶鎖,該搶鎖邏輯和之前邏輯相同

1、先判斷當前這把鎖是否存在,如果不存在,插入一把鎖,返回null

2、判斷當前這把鎖是否是屬于當前線程,如果是,則返回null

所以如果返回是null,則代表著當前這哥們已經搶鎖完畢,或者可重入完畢,但是如果以上兩個條件都不滿足,則進入到第三個條件,返回的是鎖的失效時間,同學們可以自行往下翻一點點,你能發現有個while( true) 再次進行tryAcquire進行搶鎖

long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {return;
}

接下來會有一個條件分支,因為lock方法有重載方法,一個是帶參數,一個是不帶參數,如果帶帶參數傳入的值是-1,如果傳入參數,則easeTime是他本身,所以如果傳入了參數,此時leaseTime != -1 則會進去搶鎖,搶鎖的邏輯就是之前說的那三個邏輯

if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}

如果是沒有傳入時間,則此時也會進行搶鎖, 而且搶鎖時間是默認看門狗時間 commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()

ttlRemainingFuture.onComplete((ttlRemaining, e) 這句話相當于對以上搶鎖進行了監聽,也就是說當上邊搶鎖完畢后,此方法會被調用,具體調用的邏輯就是去后臺開啟一個線程,進行續約邏輯,也就是看門狗線程

RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}
});
return ttlRemainingFuture;

此邏輯就是續約邏輯,注意看commandExecutor.getConnectionManager().newTimeout() 此方法

Method( new TimerTask() {},參數2 ,參數3 )

指的是:通過參數2,參數3 去描述什么時候去做參數1的事情,現在的情況是:10s之后去做參數一的事情

因為鎖的失效時間是30s,當10s之后,此時這個timeTask 就觸發了,他就去進行續約,把當前這把鎖續約成30s,如果操作成功,那么此時就會遞歸調用自己,再重新設置一個timeTask(),于是再過10s后又再設置一個timerTask,完成不停的續約

那么大家可以想一想,假設我們的線程出現了宕機他還會續約嗎?當然不會,因為沒有人再去調用renewExpiration這個方法,所以等到時間之后自然就釋放了。

private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}

4. 分布式鎖-redission鎖的MutiLock原理

為了提高redis的可用性,我們會搭建集群或者主從,現在以主從為例

此時我們去寫命令,寫在主機上, 主機會將數據同步給從機,但是假設在主機還沒有來得及把數據寫入到從機去的時候,此時主機宕機,哨兵會發現主機宕機,并且選舉一個slave變成master,而此時新的master中實際上并沒有鎖信息,此時鎖信息就已經丟掉了。

為了解決這個問題,redission提出來了MutiLock鎖,使用這把鎖咱們就不使用主從了,每個節點的地位都是一樣的, 這把鎖加鎖的邏輯需要寫入到每一個主叢節點上,只有所有的服務器都寫入成功,此時才是加鎖成功,假設現在某個節點掛了,那么他去獲得鎖的時候,只要有一個節點拿不到,都不能算是加鎖成功,就保證了加鎖的可靠性。

那么MutiLock 加鎖原理是什么呢?筆者畫了一幅圖來說明

當我們去設置了多個鎖時,redission會將多個鎖添加到一個集合中,然后用while循環去不停去嘗試拿鎖,但是會有一個總共的加鎖時間,這個時間是用需要加鎖的個數 * 1500ms ,假設有3個鎖,那么時間就是4500ms,假設在這4500ms內,所有的鎖都加鎖成功, 那么此時才算是加鎖成功,如果在4500ms有線程加鎖失敗,則會再次去進行重試.

五、異步秒殺優化

當用戶發起請求,此時會請求nginx,nginx會訪問到tomcat,而tomcat中的程序,會進行串行操作,分成如下幾個步驟

1、查詢優惠卷

2、判斷秒殺庫存是否足夠

3、查詢訂單

4、校驗是否是一人一單

5、扣減庫存

6、創建訂單

在這六步操作中,又有很多操作是要去操作數據庫的(數據庫操作比較緩慢),而且還是一個線程串行執行, 這樣就會導致我們的程序執行的很慢,所以我們需要異步程序執行,那么如何加速呢?

在這里筆者想給大家分享一下課程內沒有的思路,看看有沒有小伙伴這么想,比如,我們可以不可以使用異步編排來做,或者說我開啟N多線程,N多個線程,一個線程執行查詢優惠卷,一個執行判斷扣減庫存,一個去創建訂單等等,然后再統一做返回,這種做法和課程中有哪種好呢?答案是課程中的好,因為如果你采用我剛說的方式,如果訪問的人很多,那么線程池中的線程可能一下子就被消耗完了,而且你使用上述方案,最大的特點在于,你覺得時效性會非常重要,但是你想想是嗎?(并不是,比如我只要確定他能做這件事,然后我后邊慢慢做就可以了,我并不需要他一口氣做完這件事),所以我們應當采用的是課程中,類似消息隊列的方式來完成我們的需求,而不是使用線程池或者是異步編排的方式來完成這個需求。

綜上,我們直接來引入redisStream消息隊列

Redis消息隊列-基于Stream的消息隊列-消費者組

消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:

創建消費者組:

key:隊列名稱
groupName:消費者組名稱
ID:起始ID標示,$代表隊列中最后一個消息,0則代表隊列中第一個消息
MKSTREAM:隊列不存在時自動創建隊列
其它常見命令:

刪除指定的消費者組

XGROUP DESTORY key groupNam e

給指定的消費者組添加消費者

XGROUP CREATECONSUMER key groupname consumername

刪除消費者組中的指定消費者

XGROUP DELCONSUMER key groupname consumername

從消費者組讀取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID 
[ID ...]
  • group:消費組名稱
  • consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
  • count:本次查詢的最大數量
  • BLOCK milliseconds:當沒有消息時最長等待時間
  • NOACK:無需手動ACK,獲取到消息后自動確認(給了NOACK代表不用消費者確認,消息投遞給消費者會自動確認)
  • STREAMS key:指定隊列名稱
  • ID:獲取消息的起始ID:
    ">":從下一個未消費的消息開始,確保所有消息都會被消費
    其它:根據指定id從pending-list中獲取已消費但未確認的消息,(例如0,是從pending-list中的第一個消息開始)
    結尾跟0,代表讀取pending-list 中的第一條消息

XPENDING key group [空閑時間] start(最大-) end(最小+) cout

命令格式:

XPENDING key group [start end count] [consumer]

其中:

  • key:流的鍵名(stream 的 key)
  • group:消費者組名(consumer group)
  • [start end count]:可選參數,用于限制返回的 pending 條目范圍:
    • start:最小的消息 ID(可以用 - 表示最小)
    • end:最大的消息 ID(可以用 + 表示最大)
    • count:返回的最大條數
  • [consumer]:可選參數,只查詢指定消費者名的 pending 條目

正確使用示例:

XPENDING mystream mygroup - + 10

含義:

  • 在流 mystream 中,
  • 查看消費者組 mygroup 的 pending 消息,
  • 消息 ID 范圍是從最小 (-) 到最大 (+),
  • 最多返回 10 條消息的詳細信息。

返回內容類似:

bash復制編輯1) 1) "1686748123145-0"         # 消息ID2) "consumer-1"              # 消費者名稱3) (integer) 2000            # 空閑時間(毫秒)4) (integer) 3               # 被傳遞的次數
2) ...

STREAM類型消息隊列的XREADGROUP命令特點:

  • 消息可回溯
  • 可以多消費者爭搶消息,加快消費速度
  • 可以阻塞讀取
  • 沒有消息漏讀的風險
  • 有消息確認機制,保證消息至少被消費一次

需求:

  • 創建一個Stream類型的消息隊列,名為stream.orders
  • 修改之前的秒殺下單Lua腳本,在認定有搶購資格后,直接向stream.orders中添加消息,內容包含voucherId、userId、orderId
  • 項目啟動時,開啟一個線程任務,嘗試獲取stream.orders中的消息,完成下單

修改lua表達式,新增

VoucherOrderServiceImpl

private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.獲取消息隊列中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判斷訂單信息是否為空if (list == null || list.isEmpty()) {// 如果為null,說明沒有消息,繼續下一次循環continue;}// 解析數據MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.創建訂單createVoucherOrder(voucherOrder);// 4.確認消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);//處理異常消息handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.獲取pending-list中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判斷訂單信息是否為空if (list == null || list.isEmpty()) {// 如果為null,說明沒有異常消息,結束循環break;}// 解析數據MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.創建訂單createVoucherOrder(voucherOrder);// 4.確認消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理pendding訂單異常", e);try{Thread.sleep(20);}catch(Exception e){e.printStackTrace();}}}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/81353.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/81353.shtml
英文地址,請注明出處:http://en.pswp.cn/web/81353.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Pandas】pandas DataFrame add_suffix

Pandas2.2 DataFrame Reindexing selection label manipulation 方法描述DataFrame.add_prefix(prefix[, axis])用于在 DataFrame 的行標簽或列標簽前添加指定前綴的方法DataFrame.add_suffix(suffix[, axis])用于在 DataFrame 的行標簽或列標簽后添加指定后綴的方法 pandas…

解鎖MCP:AI大模型的萬能工具箱

摘要&#xff1a;MCP&#xff08;Model Context Protocol&#xff0c;模型上下文協議&#xff09;是由Anthropic開源發布的一項技術&#xff0c;旨在作為AI大模型與外部數據和工具之間溝通的“通用語言”。它通過標準化協議&#xff0c;讓大模型能夠自動調用外部工具完成任務&a…

nginx性能調優與深度監控

目錄 nginx性能調優 更改進程數與連接數 進程數 連接數 靜態緩存功能設置 日志切割 配置網頁壓縮 nginx 的深度監控 GoAccess 簡介 GoAccess安裝 ?編輯 配置中文環境 GOAccess生成中文報告 測試訪問 nginx vts 簡介 nginx vts 安裝 nginx配置開啟vts 測試訪問…

【時時三省】Python 語言----牛客網刷題筆記

目錄 1,常用函數 1,input() 2,map() 3,split() 4,range() 5, 切片 6,列表推導式 山不在高,有仙則名。水不在深,有龍則靈。 ----CSDN 時時三省 1,常用函數 1,input() 該函數遇到 換行停止接收,返回類型為字符串 2,map() 該函數出鏡率較高,目的是將一個可迭…

docker compose yml 啟動的容器中,如何使用linux環境變量賦值

在 Docker Compose 中&#xff0c;可以通過環境變量&#xff08;${VAR} 或 $VAR&#xff09;來動態配置容器。以下是幾種常見的使用方式 - 使用 env_file 加載變量文件 可以單獨定義一個環境變量文件&#xff08;如 app.env&#xff09;&#xff0c;然后在 docker-compose.y…

深入解析Kafka JVM堆內存:優化策略與監控實踐

&#x1f49d;&#x1f49d;&#x1f49d;歡迎蒞臨我的博客&#xff0c;很高興能夠在這里和您見面&#xff01;希望您在這里可以感受到一份輕松愉快的氛圍&#xff0c;不僅可以獲得有趣的內容和知識&#xff0c;也可以暢所欲言、分享您的想法和見解。 推薦&#xff1a;「storms…

git常用操作命令

本文介紹git常用的操作命令&#xff0c;供大家參考。 1、開始 # 初始化本地git git init# 在初始化的目錄中&#xff0c;創建readme.txt&#xff0c;添加到git庫中 git add readme.txt git commit -m "寫了一個readme.txt文件"2、版本回退 2.1、git reset git lo…

解鎖 MCP 中的 JSON-RPC:跨平臺通信的奧秘

你好,我是 shengjk1,多年大廠經驗,努力構建 通俗易懂的、好玩的編程語言教程。 歡迎關注!你會有如下收益: 了解大廠經驗擁有和大廠相匹配的技術等希望看什么,評論或者私信告訴我! 文章目錄 零、 背景一、RPC vs HTTP1.1 什么是RPC1.2 為什么需要 RPC?1.3 RPC 解決了什么…

【Redis】第1節|Redis服務搭建

一、Redis 基礎概念 核心功能 內存數據庫&#xff0c;支持持久化&#xff08;RDB/AOF&#xff09;、主從復制、哨兵高可用、集群分片。常用場景&#xff1a;緩存、分布式鎖、消息隊列、計數器、排行榜等。 安裝環境 依賴 GCC 環境&#xff08;C語言編譯&#xff09;&#xff0…

GitLab-CI簡介

概述 持續集成&#xff08;CI&#xff09;和 持續交付(CD) 是一種流行的軟件開發實踐&#xff0c;每次提交都通過自動化的構建&#xff08;測試、編譯、發布&#xff09;來驗證&#xff0c;從而盡早的發現錯誤。 持續集成實現了DevOps, 使開發人員和運維人員從繁瑣的工作中解…

FFmpeg解碼器配置指南:為什么--enable-decoders不能單獨使用?

FFmpeg解碼器配置指南 在FFmpeg的編譯配置過程中&#xff0c;許多開發者會遇到關于解碼器配置的困惑。特別是--enable-decoders這個選項&#xff0c;很多人誤以為啟用它就能自動包含所有解碼器。本文將深入解析FFmpeg解碼器配置的機制&#xff0c;并通過實際測試展示正確的配置…

C++多態與虛函數

C++多態與虛函數詳解 多態(Polymorphism)是 C++ 面向對象編程的重要特性,通過統一的接口實現不同的行為。虛函數(Virtual Function)是實現運行時多態的核心機制。以下從多態的構成條件、意義、析構函數的虛函數化、純虛函數和抽象類,以及虛函數表的底層實現依次介紹。 1.…

游戲引擎學習第313天:回到 Z 層級的工作

回顧并為今天的內容定下基調 昨天我們新增了每個元素級別的排序功能&#xff0c;并且采用了一種我們認為挺有意思的方法。原本計劃采用一個更復雜的實現方式&#xff0c;但在中途實現的過程中&#xff0c;突然意識到其實有個更簡單的做法&#xff0c;于是我們就改用了這個簡單…

ODBC簡介

ODBC&#xff08;Open Database Connectivity&#xff09;是一個由 Microsoft 制定的標準接口&#xff0c;允許不同的應用程序通過統一的方式訪問各種數據庫系統。 &#x1f9e0; 簡單理解&#xff1a; ODBC 就像是 “翻譯官”&#xff0c;在應用程序&#xff08;如 Excel、Py…

RK3588 buildroot QT 懸浮顯示(OSD)

概述 主要介紹在rockchip rk3588 buildroot中 運行QT程序。需要結合之前的文檔:認識DRM顯示系統、buildroot中QT開發指導、以及如何集成QT庫到3588板子上。 場景:在linux開發中,需要使用QT開發程序,做OSD顯示。(如下圖顯示,顯示器播放視頻,QT頁面懸浮于視頻上方,顯示…

sockaddr_in

在網絡編程中&#xff0c;sockaddr_in 結構體是用于表示 IPv4 地址的套接字地址結構。它定義在 <netinet/in.h> 頭文件中&#xff0c;是 sockaddr 結構體的一個特化版本&#xff0c;專門用于處理 IPv4 地址。 下面是 sockaddr_in 結構體的典型定義&#xff1a; struct …

有銅半孔工藝的制造難點與工藝優化

技術難點剖析 有銅半孔工藝在制造過程中面臨多重挑戰&#xff0c;主要集中在材料加工精度、孔壁完整性及良率控制三個方面&#xff1a; 銅層翹起與毛刺殘留 半孔成型時&#xff0c;銑刀高速切割可能導致孔壁銅層被拉扯&#xff0c;產生翹起或殘留銅屑&#xff0c;影響導電性能…

云原生安全:網絡協議TCP詳解

&#x1f525;「炎碼工坊」技術彈藥已裝填&#xff01; 點擊關注 → 解鎖工業級干貨【工具實測|項目避坑|源碼燃燒指南】 &#xff08;注&#xff1a;文末附可視化流程圖與專有名詞說明表&#xff09; 1. 基礎概念 TCP&#xff08;Transmission Control Protocol&#xff09;是…

Dify中的Extension插件開發例子:以neko為例

本文使用Dify v1.0.0-beta.1版本。以neko為例&#xff0c;介紹Dify中的Extension插件開發例子。需要說明的是Dify官方要求Python≥3.12&#xff0c;但發現本地PyCharm調試Python≥3.12有問題&#xff0c;就采用的Python 3.11版本。 一.Extension插件項目創建 1.填寫插件信息 …

Linux中logger命令的使用方法詳解

文章目錄 一、基礎語法二、核心功能選項三、?設施與優先級對照?1. 常用設施&#xff08;Facility&#xff09;2. 優先級&#xff08;Priority&#xff09;從低到高&#xff1a;3. 組合示例? 四、典型使用場景1. 記錄簡單消息2. 帶標簽和優先級3. 記錄命令輸出4. 發送到遠程服…