什么是死鎖?
場景:圖書館有兩個相鄰的儲物柜(柜子A和柜子B),小明和小紅需要同時使用這兩個柜子才能完成借書流程。
- 互斥資源
每個柜子只有一把鑰匙,且一次只能被一人使用(資源不可共享)。 - 持有并等待
-
- 小明拿到了柜子A的鑰匙,但他說:“我要等小紅用完柜子B的鑰匙,才能繼續操作。”
- 小紅拿到了柜子B的鑰匙,但她說:“我要等小明用完柜子A的鑰匙,才能繼續操作。”
- 僵局形成
兩人都死死攥著已有的鑰匙,同時等待對方手里的另一把鑰匙。結果兩人卡在原地,誰也無法完成借書流程。
死鎖的定義:線程A獲取到資源1,需要再次獲取到資源2被釋放以獲取到該資源,此時線程B獲取到了資源2,等待獲取資源1,兩線程進入了互相等待的狀態形成死鎖。
業務中的死鎖
讓我們以電商系統中的「購物車庫存鎖定」場景為例,具體分析死鎖的觸發機制:當用戶A嘗試同時鎖定商品X和Y的庫存,而用戶B以相反順序(先鎖Y再鎖X)發起操作時,這兩個并發請求可能因資源競爭進入相互等待狀態——此時系統既無法完成庫存扣減,也無法釋放已占用的資源,形成典型的死鎖僵局。
MockData
類初始化了商品的庫存,使用ConcurrentHashp
模擬購物車和商品庫存信息,并提供了添加至購物車、清空購物車、扣減庫存等方法。
public class MockData {// 模擬購物車:用戶ID -> 商品列表 + 總價public static final ConcurrentHashMap<Long, Cart> Carts = new ConcurrentHashMap<>();// 模擬庫存:商品ID -> 庫存數量public static final ConcurrentHashMap<Long, AtomicInteger> Inventory = new ConcurrentHashMap<>();static {// 初始化庫存(商品1和2各有1件)Inventory.put(1L, new AtomicInteger(1));Inventory.put(2L, new AtomicInteger(1));}// 添加商品到購物車public static void addToCart(Long userId, Long productId, int quantity) {Carts.computeIfAbsent(userId, k -> new Cart()).addProduct(productId, quantity);}// 清空購物車public static void clearCart(Long userId) {Carts.remove(userId);}// 獲取庫存數量public static int getStock(Long productId) {return Inventory.getOrDefault(productId, new AtomicInteger(0)).get();}// 扣減庫存(原子操作)public static boolean decreaseStock(Long productId, int quantity) {return Inventory.getOrDefault(productId, new AtomicInteger(0)).compareAndSet(getStock(productId), getStock(productId) - quantity);}// 購物車類static class Cart {private final ConcurrentHashMap<Long, Integer> items = new ConcurrentHashMap<>();@Getterprivate int totalPrice = 0;/*** 購物車添加商品* @param productId 商品id* @param quantity 數量*/public void addProduct(Long productId, int quantity) {items.put(productId, items.getOrDefault(productId, 0) + quantity);totalPrice += quantity;}public void removeProduct(Long productId) {items.remove(productId);totalPrice -= items.getOrDefault(productId, 0);}}
}
以下代碼模擬庫存不足造成死鎖的場景:
@Slf4j
@Service
@EnableAsync
public class OrderService {// 死鎖場景(模擬庫存不足)public void createOrderDeadLock(Long userId, Long productId) {log.info("用戶:{},開始下單商品:{}", userId, productId);// 模擬購物車添加商品cartLock.lock();try {// 步驟2:檢查庫存(此時可能有其他線程扣減)if (MockData.getStock(productId) < 1) {log.error("用戶:{} 庫存不足,放棄訂單", userId);return;}// 模擬長時間業務操作(人為制造時間差)try {Thread.sleep(5000);} catch (Exception e) {log.error("異常", e);}// 步驟3:扣減庫存(實際業務場景需要原子操作)if (!MockData.decreaseStock(productId, 1)) {log.error("用戶:{} 庫存已被搶光,放棄訂單", userId);return;}log.info("用戶 {},下單成功", userId);MockData.clearCart(userId);} catch (Exception e) {log.error("下單失敗", e);} finally {cartLock.unlock();}}
}
在Controller層調用該方法,同時進行場景分析:
死鎖場景分析:
- 核心邏輯:兩個用戶同時搶購同一商品,庫存僅剩1件。
- 死鎖原因:
-
- 線程1持有購物車鎖,等待庫存鎖。
- 線程2持有購物車鎖,等待庫存鎖。
- 雙方互相等待對方釋放鎖,形成循環等待。
@GetMapping("/wrong/cert/lock")public void wrongCertLock() throws InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(2);// 用戶A嘗試購買商品1(庫存1)executor.submit(() -> orderService.createOrderLockBySequence(1L, 1L));// 用戶B嘗試購買商品1(庫存已不足)executor.submit(() -> orderService.createOrderLockBySequence(2L, 1L));// 等待觀察結果(死鎖表現為長時間無輸出)executor.shutdown();executor.awaitTermination(20, TimeUnit.SECONDS);}
}
使用工具調用該接口,并查看接口的輸出結果,接口響應時間5.08秒:
在下單時使用購物車的全局鎖certLock
時,存在兩個問題:
一.單鎖阻塞堆積(隱性"假死鎖")
當使用全局鎖 cartLock
時,所有下單請求必須串行執行。在高并發場景下:
- 第一個線程獲得鎖后執行5秒休眠
- 后續所有線程在
cartLock.lock()
處排隊阻塞 - 線程堆積導致系統吞吐量驟降,最終表現類似"死鎖"
數據示例:
- 假設QPS=100,5秒內會堆積500個等待線程
- 實際業務處理能力被壓縮到0.2 TPS(每秒處理0.2個請求)
使用Arthas
的thread -b
命令分析服務存在線程“死鎖”的情況和線程阻塞情況,同時Arthas
支持查看阻塞位置的源碼。
使用jad --jad --source-only
命令查看源碼,如例子中展示第40行附近存在線程阻塞的問題,我們可以通過反編譯查看源碼:
jad --source-only com.codetree.business_error.chapter.chapter02.shop.OrderService createOrderDeadLock
二、鎖粒度錯位導致的競態條件
隱患根源:
// 非原子操作
if (MockData.getStock(productId) < 1) {return;
}
// 非原子操作
MockData.decreaseStock(productId, 1)
即使有全局鎖保護:
- 庫存檢查與扣減分離:其他系統(如支付系統)可能同時修改庫存
- 超賣風險:檢查時庫存充足,但扣減時已被其他通道(API/后臺)修改
如何避免死鎖?
避免死鎖一般有兩種方案:
方案 | 實現方式 | 優點 | 缺點 |
---|---|---|---|
一次性獲取資源 | 使用全局鎖(globalLock ) | 簡單粗暴,徹底避免死鎖 | 并發性能差,所有請求串行執行 |
按順序獲取資源 | 固定鎖順序(先購物車 → 再庫存) | 兼顧并發性能,適用于復雜業務 | 需全局統一鎖順序策略 |
方案一、一次性獲取所有資源
一次性獲取所有資源可以視為將多個非原子性操作封裝成一個大的原子性操作,強制實現線程“串行化”訪問,該方案能夠徹底消除持有并等待條件,同時保證臨界區操作的原子一致性。
public void createOrderLockAllResource(Long userId, Long productId) {log.info("優化:一次性獲取所有的資源,用戶:{},開始下單商品:{}", userId, productId);// 模擬購物車添加商品globalLock.lock();try {// 步驟2:檢查庫存(此時可能有其他線程扣減)if (MockData.getStock(productId) < 1) {log.error("用戶:{} 庫存不足,放棄訂單", userId);return;}// 模擬長時間業務操作(人為制造時間差)try {Thread.sleep(10000);} catch (Exception e) {log.error("異常", e);}// 步驟3:扣減庫存(實際業務場景需要原子操作)if (!MockData.decreaseStock(productId, 1)) {log.error("用戶:{} 庫存已被搶光,放棄訂單", userId);return;}log.info("用戶 {},下單成功", userId);MockData.clearCart(userId);} catch (Exception e) {log.error("下單失敗", e);} finally {globalLock.unlock();}}
未出現阻塞問題,串行化執行成功,接口響應10.10秒。
方案二、按順序獲取資源
順序化獲取資源可以有效規避死鎖產生的必要條件(之一)——循環等待條件,同時消除進程間非原子操作的競爭沖突,從而避免競態條件的發生。
public void createOrderLockBySequence(Long userId, Long productId) {log.info("優化:按順序獲取鎖,用戶:{},開始下單商品:{}", userId, productId);// 模擬購物車添加商品cartLock.lock();try {// 步驟2:獲取庫存鎖inventoryLock.lock();try {// 快速失敗if (MockData.getStock(productId) < 1) {System.out.println("用戶 " + userId + " 庫存不足,方案二無效");return;}// 執行所有操作MockData.addToCart(userId, productId, 1);MockData.decreaseStock(productId, 1);System.out.println("用戶 " + userId + " 方案二下單成功!");MockData.clearCart(userId);} catch (Exception e) {log.error("異常", e);throw new RuntimeException(e);} finally {inventoryLock.unlock();}} catch (Exception e) {log.error("下單失敗", e);} finally {cartLock.unlock();}}
總結
在并發系統的設計與優化中,死鎖預防始終是確保系統穩定性的核心命題,我介紹的兩種死鎖的處理方式:
- "一刀切"的原子化方案
通過全局鎖強制串行化操作,犧牲了并發性能,以最簡單的方式徹底消除死鎖風險。這種"粗暴但可靠"的設計思路,特別適合對數據一致性要求極高、容錯成本較大的業務場景,保證了基本的安全性。 - 精細化控制的順序化策略
訪問資源順序化,投機取巧的利用了業務場景優勢,方案適合于對接口響應時間敏感的業務場景(下單搶購)。
實踐啟示錄:
- 沒有銀彈的解決方案:兩種方案各有利弊,需根據業務特性進行取舍。高頻小事務場景宜用原子化方案,長流程多步驟業務則更適合順序化控制。
- 死鎖預防≠完全消除:即使采取最優策略,仍需通過監控(如JVM線程Dump分析)、日志埋點(死鎖檢測)、壓力測試等手段持續驗證系統穩定性。
優秀的設計永遠是在理論模型與實際需求之間尋找精妙的平衡點。希望本文的分析能為你提供一些新的思路。