提醒
要求了解或者熟練掌握以下知識點
- spring 事務
- mysql 臟讀
- 如何保證緩存和數據庫數據一致性
- 延遲雙刪
- 分布式鎖
- 并發編程 原子操作類
前言
在起草這篇博客之前
我做了點功課
這邊我寫的是一個示例代碼
數據層都寫成了 mock 的形式(來源于 JUnit5)
// Dduo
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; // 數據服務類
public class DataService { // 模擬緩存(實際使用Redis等實現) private static final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>(); // 延遲雙刪線程池 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); // 模擬數據庫,使用一個 Map 來存儲數據記錄 private static final ConcurrentHashMap<Integer, DataRecord> mockDatabase = new ConcurrentHashMap<>(); // 數據記錄類,包含數據的基本信息和版本號 private static class DataRecord { private int id; private String content; private int version; public DataRecord(int id, String content, int version) { this.id = id; this.content = content; this.version = version; } public int getId() { return id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public int getVersion() { return version; } public void setVersion(int version) { this.version = version; } } // 模擬從數據庫獲取數據 private static DataRecord mockDatabaseGet(int id) { return mockDatabase.get(id); } // 模擬數據庫更新操作,更新數據并更新版本號 private static boolean mockDatabaseUpdate(int id, String content, int expectedVersion) { DataRecord record = mockDatabase.get(id); if (record == null) { return false; } // 檢查版本號是否匹配 if (record.getVersion() != expectedVersion) { return false; } // 更新數據內容 record.setContent(content); // 更新版本號 record.setVersion(expectedVersion + 1); mockDatabase.put(id, record); return true; } // 初始化數據庫數據 public void initData(int id, String content) { mockDatabase.put(id, new DataRecord(id, content, 1)); } // 獲取數據(帶緩存邏輯) public String getData(int id) { String cacheKey = "data_" + id; // 1. 先查緩存 String cached = cache.get(cacheKey); if (cached != null) { return cached; } // 2. 緩存未命中,查詢數據庫 DataRecord record = mockDatabaseGet(id); if (record == null) { return null; } // 3. 寫入緩存(包含版本號信息) String value = record.getContent() + "|v" + record.getVersion(); cache.put(cacheKey, value); return value; } // 更新數據(帶延遲雙刪和版本控制) public boolean updateData(int id, String newContent) { String cacheKey = "data_" + id; // 獲取當前數據的版本號 DataRecord record = mockDatabaseGet(id); if (record == null) { return false; } int expectedVersion = record.getVersion(); try { // 1. 第一次刪除緩存 cache.remove(cacheKey); // 2. 更新數據庫(帶版本校驗) boolean updateSuccess = mockDatabaseUpdate(id, newContent, expectedVersion); if (!updateSuccess) { return false; } // 3. 提交后安排延遲刪除 scheduler.schedule(() -> { try { // 二次刪除前的二次校驗(可選) DataRecord current = mockDatabaseGet(id); if (current != null && current.getVersion() > expectedVersion) { cache.remove(cacheKey); // 只刪除舊版本緩存 } } catch (Exception e) { // 處理異常,可添加重試邏輯 e.printStackTrace(); } }, 1, TimeUnit.SECONDS); // 延遲時間根據主從同步時間調整 return true; } catch (Exception e) { // 處理異常,可添加補償邏輯 e.printStackTrace(); return false; } } public static void main(String[] args) { DataService service = new DataService(); // 初始化數據 service.initData(1, "Initial Content"); // 獲取數據 System.out.println("Initial Data: " + service.getData(1)); // 更新數據 boolean result = service.updateData(1, "Updated Content"); System.out.println("Update Result: " + result); // 再次獲取數據 System.out.println("Updated Data: " + service.getData(1)); }
}
要點
- mockDatabaseUpdate 方法中,當更新數據時,會先檢查傳入的期望版本號與數據庫中記錄的版本號是否一致。如果一致,會更新數據內容并將版本號加 1。
- getData 方法會先從緩存中查找數據,如果緩存中沒有,則從數據庫中獲取數據,并將數據內容和版本號拼接后存入緩存。
- updateData 方法會先獲取當前數據的版本號,然后執行延遲雙刪操作。在更新數據庫時,會攜帶版本號進行校驗,確保數據的一致性。
運行示例
在 main
方法中,我們演示了如何初始化數據、獲取數據、更新數據和再次獲取數據。運行程序后,你可以看到數據的初始狀態、更新結果和更新后的數據。
通過這種方式,版本號和延遲雙刪機制可以協同工作,保證數據的一致性和緩存的正確性。
- 延遲雙刪處理緩存層面的最終一致性
- 第二次刪除前的版本檢查避免過度刪除
典型時序:
- 請求A刪除緩存
- 請求A更新數據庫(版本2)
- 請求B讀取緩存未命中,查詢數據庫(版本1)并填充緩存
- 延遲任務執行二次刪除,發現數據庫版本已更新,刪除舊版本緩存
- 后續請求獲取最新數據(版本2)并更新緩存
注意實際需要:
- 替換mock數據庫操作為真實DAO操作
- 調整延遲時間(通常500ms-1s)
- 添加緩存空值處理
- 添加重試機制和監控
為什么要進行延遲雙刪
緩存和數據庫數據的一致性一直是我們在后端開發中探討的問題
先刪除緩存再更新數據庫情況
現在有兩個線程
線程 1 是 寫線程
線程 2 是 讀線程
如果線程 1 是先刪除緩存再更新數據庫
在這個時間間隙 就是線程 1 寫線程刪除緩存和更行數據庫的這個間隙
線程 2 讀線程進來了
因為緩存已經被刪除了 讀線程嘗試去數據庫讀取數據
臟數據就這樣被寫入了緩存
下次讀的時候 因為緩存存在 所以一直讀取的是舊數據
發生的幾率比較大的原因往往是因為
更新數據庫的數據是比較慢的
先更新數據庫再刪除緩存的情況
線程 1 是讀線程 線程 1 首先去數據庫讀取到了舊數據
在寫回緩存的這個間隙
線程 2 是寫線程 更新了數據庫為新數據
之后線程 1 才寫入緩存
這樣緩存里依舊是舊數據
但這種情況發生情況很小
應為緩存的寫入很快
所以很難出現 讀線程在寫線程更改了數據庫數據后再把數據寫入緩存
而且另一種情況
線程 1 讀線程 執行完畢后
線程 2 寫線程 也最終會進行一次刪除緩存的操作
思考
● 一種做法是在更新數據時也更新緩存,只是在更新緩存前先加一個分布式鎖。因為這樣在同一時間只允許一個線程更新緩存,就不會產生并發問題了。當然這么做對于寫入的性能會有一些影響;
● 另一種做法同樣也是在更新數據時更新緩存,只是給緩存加一個較短的過期時間。這樣即使出現緩存不一致的情況,緩存的數據也會很快過期,對業務的影響也是可以接受。
延時雙刪實現
偽代碼
# 延遲雙刪代碼的實現# 刪除緩存
redis.delKey(X)# 更新數據庫
db.update(X)# 睡眠
Thread.sleep(N)# 再刪除緩存
redis.delKey(X)
思考
在延遲雙刪策略中
我們需要在更新數據庫之前
就先把緩存刪掉
這樣是為了防止在這個間隙有其他請求讀取到了緩存
拿到的是失效的緩存數據
清除緩存后 在這個期間 其他請求是不會命中緩存的 會直接去數據庫中讀取最新數據
這樣保證了數據的一致性和緩存的即時更新
在我看來延遲雙刪是在對比了先刪除緩存再更新數據庫還是先更新數據庫的基礎上 選擇出了先更新數據庫再刪除緩存的基礎上 的改進
更新數據庫數據是一個很慢的過程
這樣做可以高效的提高數據的一致性
再高并發讀取的情況下 減輕數據庫的讀取壓力 提高讀取性能和響應速度
進一步優化
一、使用讀寫鎖優化數據庫并發控制
原理:通過區分讀鎖(共享鎖)和寫鎖(排他鎖),確保寫操作期間獨占資源,避免臟讀和不可重復讀問題。
示例場景:電商庫存扣減
- 寫鎖應用:當用戶下單扣減庫存時,事務對庫存記錄加寫鎖(
SELECT ... FOR UPDATE
),阻止其他事務同時修改或讀取未提交的庫存數據。 - 讀鎖應用:商品詳情頁展示庫存時,事務加讀鎖(
SELECT ... LOCK IN SHARE MODE
),允許其他讀操作共享數據,但阻塞寫操作。 - 效果:寫鎖獨占期間,其他讀請求需等待寫鎖釋放,確保扣減操作的原子性,避免超賣。
二、高效緩存淘汰算法降低緩存失效影響
原理:通過動態調整緩存過期策略,減少因緩存集中失效導致的數據庫瞬時壓力。
示例場景:新聞熱點數據緩存
- LRU算法優化:傳統LRU可能誤淘汰熱點數據,可升級為 LRU-K(記錄最近K次訪問時間),優先保留高頻訪問數據。
- 時間窗口分散:為緩存鍵的過期時間添加隨機值(如基礎30分鐘 + 隨機0-10分鐘),避免大量緩存同時失效引發雪崩。
- 主動更新機制:結合讀寫鎖,在緩存失效前異步刷新數據(如后臺線程檢測過期前5分鐘的熱點Key,提前加載新數據)。
三、綜合應用案例:社交平臺評論系統
- 寫鎖控制評論發布
-
- 用戶發布評論時,事務對評論區數據加寫鎖,阻塞其他用戶同時修改同一帖子,確保評論順序和完整性。
- 讀鎖允許其他用戶持續加載已有評論,僅寫操作短暫阻塞。
- LFU算法管理緩存
-
- 使用 LFU(Least Frequently Used) 算法緩存熱門帖子,自動淘汰低頻訪問的舊數據。
- 結合 布隆過濾器 攔截無效查詢(如已刪除的帖子ID),減少緩存穿透。
四、注意事項
- 鎖粒度選擇:優先使用行級鎖(如InnoDB的間隙鎖)而非表鎖,減少阻塞范圍。
- 緩存一致性:采用 延遲雙刪策略(更新數據庫后先刪緩存,短暫延遲后再次刪除),避免并發更新導致臟數據。
- 性能監控:通過工具(如Prometheus)監控鎖等待時間和緩存命中率,動態調整鎖策略和淘汰算法參數。
通過上述方法,可在高并發場景下平衡數據一致性與系統性能,減少因鎖競爭或緩存失效導致的業務風險。
具體代碼
我們現在要更新數據庫
具體業務是插入數據
添加
/*** 添加句子** @param addSentenceDTO 注意提交是一個事務 如果失敗則回滾 我們這邊使用的是spring的事務框架*/@Override@Transactional(rollbackFor = Exception.class, timeout = 10) // todo 如果插入標簽過多 可能會導致事務回滾public void addSentenceWithTags(AddSentenceDTO addSentenceDTO) throws Exception {// 主記錄插入AddSentenceReq addSentenceReq = addSentenceDTO.getAddSentenceReq();tSentencesMapper.addSentence(addSentenceReq);Long sentenceId = addSentenceReq.getSentenceId();// 關聯標簽插入List<AddTagsReq> tagsList = addSentenceDTO.getTagsList();AddSentenceTagReq addSentenceTagReq = new AddSentenceTagReq();addSentenceTagReq.setSentenceId(sentenceId);addSentenceTagReq.setTagsList(tagsList);int size = tagsList.size();if (size == 0) return;else {int i = tSentencesMapper.batchInsertTags(addSentenceTagReq); // 數據庫插入標簽并返回改變的標簽數量if (i != size) {throw new Exception("傳入了無效標簽");}}// 此時已經更新了數據庫 并且提交了事務(事務未回滾) 延遲雙刪 更新版本號TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@Overridepublic void afterCommit() {DATA_VERSION.incrementAndGet(); // 版本號自增String cacheKey = "balloonSentences:all" + DATA_VERSION;delayDoubleDelete(cacheKey, 5, TimeUnit.SECONDS); // 執行延時雙刪List<GetAllContentResp> dbData = tSentencesMapper.getAll(); // 更新elasticsearchelasticsearchService.saveProduct(dbData); // 寫到elasticsearch里面去}});}
我們把代碼邏輯進行了事務管理
當完成提交后
我們自增版本號
這邊是使用的一個原子類
// 原子類 版本號 這邊表示的是當前數據版本的版本號private static final AtomicInteger DATA_VERSION = new AtomicInteger(1);
版本號機制重新構造緩存的 key
進行延遲雙刪
這邊為什么又要有版本號機制又要進行雙刪
因為防止多個線程同時更新 所以要以最近的一次更新來刷新緩存
如果加鎖的話 效率就會降低太多了
/*** 更新緩存中全部句子的數據策略:延遲雙刪* 策略 先刪除緩存 然后更新數據庫 然后休眠 再刪除緩存* 要求用分布式鎖方式多線程進入操作數據庫環境** @param cacheKey* @param delay* @param unit*/private void delayDoubleDelete(String cacheKey, int delay, TimeUnit unit) {RLock lock = redissonClient.getLock("lock:" + cacheKey);try {lock.lock();// 第一次刪除(立即執行)redisService.deleteObject(cacheKey);// 延遲隊列二次刪除ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.schedule(() -> {redisService.deleteObject(cacheKey);// 強制刷新緩存refreshCacheWithVersion(DATA_VERSION);}, delay, unit);} finally {lock.unlock();}}
之后再強制刷新緩存一遍
驗證了我們剛才的想法
我們使用的要是最新的數據
緩存里面的也要是最新數據
/*** 強制刷新緩存** @param currentVersion*/
private void refreshCacheWithVersion(AtomicInteger currentVersion) {String cacheKey = "balloonSentences:all" + currentVersion;RLock lock = redissonClient.getLock("refresh:" + cacheKey);try {lock.lock();// 版本校驗(防止舊版本覆蓋)List<GetAllContentResp> newData = tSentencesMapper.getAll();// 刪除緩存redisService.deleteObject(cacheKey);// 隨機化TTL防雪崩 隨機化過期時間redisService.setList(cacheKey, newData, RandomUtil.randomInt(30, 60), TimeUnit.MINUTES);} finally {lock.unlock();}
}
如何確定延時的時間
1.數據庫性能
如果數據庫更新快
可以選擇較短的更新時間
2.緩存過期的時間
如果緩存過期的時間較長
可以選擇縮短更新時間
以免過早的刪除緩存導致數據不一致
思考
假設在延時雙刪策略中,第一次刪除緩存后,會有一段時間的延時,然后再進行第二次刪除緩存。如果此時緩存的過期時間設置得很短,比如只有幾秒鐘,那么在第二次刪除緩存之前,緩存可能已經過期,而應用程序在讀取緩存時會發現緩存已失效,從而不得不去數據庫中查詢最新數據。
為了避免這種情況,延時雙刪的延時時長應該要大于緩存的過期時間,確保在第二次刪除緩存之前,緩存還是有效的,這樣可以保證應用程序讀取到的數據是一致的。
同時還需要考慮數據更新的頻率和緩存的使用情況。如果數據更新較為頻繁,那么延時雙刪的延時時長應該要適當縮短,以便及時更新緩存;如果緩存的使用率很低,可以適當延長延時時長,以減少對緩存服務的壓力。