為什么要保證數據一致性
只要使用redis做緩存,就必然存在緩存和DB數據一致性問題。若數據不一致,則業務應用從緩存讀取的數據就不是最新數據,可能導致嚴重錯誤。比如將商品的庫存緩存在Redis,若庫存數量不對,則下單時就可能出錯,這是不能接受的。
旁路緩存
在使用 Redis 作為緩存時,最常見的緩存模式是 Cache-Aside(旁路緩存)模式因其靈活性、容錯性以及與 Redis 的高效配合,適用于大多數業務場景。
Cache-Aside(旁路緩存) 模式,又叫 Lazy-Loading(懶加載)模式,在這種模式下,緩存的讀取和寫入由應用程序直接管理。
- 讀策略:先查詢緩存,若緩存未命中,則從數據庫中加載數據并將其存儲在緩存中;
- 寫策略:應用程序直接更新數據庫,并更新或刪除緩存中的相關數據。
旁路緩存因其按需觸發、避免冗余操作特點所以被稱為懶加載。
旁路緩存模式,存在緩存一致性問題,需要小心處理緩存的清除和更新。
更新數據庫并更新緩存
那么如何保證redis緩存和數據庫的數據一致性呢?
直觀的思維想到的是 更新數據庫并更新緩存。
這種直觀的做法會有連續兩次更新,數據不一致問題。并且這種情況經常出現,所以一般不會采用。
對于緩存命中率要求很高的場景,我們可以使用更新數據庫+更新緩存的方案,因為只要緩存不過期,都是可以命中的。
解決方案:
- 加分布式鎖,丟失一部分性能。
- 加一個比較短的過期時間,哪怕緩存數據不一致也很快過期。
先刪除緩存再更新數據庫
請求A想要將數據庫的數據修改為21,先去redis中刪除了緩存,這個時候請求B來訪問查詢該數據,發現redis中沒有該數據,就去數據庫中查詢了舊數據并寫回到了緩存。這個時候請求A才更新數據庫。
此時緩存中的數據為20,而數據庫中存儲的為21產生了數據不一致。
可以看到先刪除緩存,再更新數據庫會在【讀+寫】并發的時候,會出現數據不一致的問題。
延時雙刪
那么針對于先刪除緩存再更新數據庫的方案我們有沒有解決方案呢?
就是先刪除緩存再更新數據庫,然后睡一段時間再刪除一次緩存。
延遲一段時間刪除的目的是什么?
是讓線程B有足夠的時間去將(舊值)寫回到緩存。確保延遲一段時間后能夠將線程B的臟數據刪除。也是保證了最終一致性。
但是具體延遲多久,很難評估出來,所以這種方案也只能盡可能保證數據一致。但是在極端情況下,還是會有可能出現數據不一致。
先更新數據庫再刪除緩存
還是在【讀+寫】并發的場景下分析,請求A查詢數據,發現緩存沒有命中,查詢數據庫為20。此時請求B更新數據庫并刪除了緩存。最后請求A寫回了緩存。請求A寫回的是舊數據也就是20。
此時數據庫中為21(新值),緩存中是20(舊值)。出現了數據不一致。
可以看到先更新數據庫再刪除緩存是有可能發生數據庫的,但是這個在實際當中發生的概率很小。
因為緩存的寫入通常要遠遠快于數據庫的寫入,所以實際當中,很少會出現請求B將數據庫更新并且刪除緩存后,請求A才寫回緩存的情況。
所以先更新數據庫再刪除緩存的方式是可以實現數據一致性的。但是為了保險起見,給緩存數據加上過期時間。也就是即使出現了以上這種小概率時間,還是可以通過過期時間,來達到一個最終一致性的目的。
刪除操作失敗
但是在實際開發當中還是會出現問題,如果刪除緩存操作失敗,那么緩存中就會一直存放臟數據,直到key過期。
也就是我們要保證更新數據庫之后緩存能被成功刪除?
對應的就有兩種解決方案:
- 消息隊列重試機制
- 訂閱 MySQL binlog日志,更新緩存。
消息隊列重試機制
引入消息隊列,將第二步操作(刪除緩存) 要操作的數據放到消息隊列中,由消費者來刪除緩存。
- 如果應用刪除緩存失敗,可以通過消費者重試機制。設置一個最大的重試次數。如果超過最大重試次數還是失敗,那就要向服務層發送報錯信息。
- 如果刪除緩存成功后返回ack。避免重復消費。
這種方案的缺點就是對代碼侵入性比較強,需要修改原本的業務代碼。
消費者:交換機和隊列都持久化防止消息丟失。
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstant.REDIS_MQ_QUEUE, durable = "true", arguments = @Argument(name = "x-queue-mode", value = "lazy")),exchange = @Exchange(name = MqConstant.REDIS_DELETE_EXCHANGE_NAME, type = ExchangeTypes.DIRECT),key = MqConstant.REDIS_MQ_QUEUE_ROUTING_KEY
))
public void listenRedisDeleteMqMessage( String msg) {log.info("RedisDeleteMqListener:redis刪除數據:" + msg);Set keys = redisTemplate.keys(msg);redisTemplate.delete(keys);
}
配置好相關的生產者消費者確認機制,以及消費者失敗重試機制,確保消息正確路由和重試。
spring: rabbitmq:host: 192.168.215.140 # 你的虛擬機IPport: 5672 # 端口virtual-host: /bigevents # 虛擬主機username: big-events # 用戶名password: 123321 # 密碼publisher-returns: true # 開啟publisher return機制listener:simple:retry:enabled: true # 開啟消費者失敗重試initial-interval: 1000ms # 初始的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為falseacknowledge-mode: auto #消費者自動ack 異常nack
Canal+MQ
先更新數據庫,對數據庫的寫操作都會被記錄在binlog日志中。所以我們更新數據庫,也就會產生一條對應的變更日志在binlog中。
于是我們可以通過訂閱binlog日志,拿到具體要操作的數據,然后再執行刪除緩存,阿里巴巴開源的Canal中間件就是基于這個實現的。
Canal模擬MySQL主從復制的交互協議,把自己偽裝成一個MySQL的從節點,向MySQL的主節點發送dump命令請求,MySQL收到請求后,就會推送Binlog給Canal,Canal解析binlog字節流后,轉換為便于讀取的結構化數據。供下游服務訂閱使用。
前面我們講了MQ的解決方案對原有的業務代碼有很強的侵入性。但是這種方案就不會,直接監聽binlog,因此我們可以用binlog+MQ的解決方案。既保證了數據的一致性又不會對原有業務代碼有侵入。
具體實現
mysql添加用戶和權限
CREATE USER canal IDENTIFIED BY 'canal'; # 添加從節點權限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;
修改canal相關配置:
vi canal-server/conf/example/instance.properties
通過spring定時任務監聽canal消息,解析canal的消息,并投遞給mq
/*** 每2秒執行一次** @throws Exception*/
@Scheduled(fixedRate = 2 * 1000)
public void run() throws Exception {//進行連接canalConnector.connect();//進行訂閱canalConnector.subscribe();int batchSize = 5 * 1024;//獲取Message對象Message message = canalConnector.getWithoutAck(batchSize);long id = message.getId();int size = message.getEntries().size();System.out.println("當前監控到的binLog消息數量是:" + size);//判斷是否有數據//如果有數據,進行數據解析List<CanalEntry.Entry> entries = message.getEntries();//遍歷獲取到的Entry集合for (CanalEntry.Entry entry : entries) {System.out.println("----------------------------------------");System.out.println("當前的二進制日志的條目(entry)類型是:" + entry.getEntryType());//如果屬于原始數據ROWDATA,進行打印內容if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {try {//獲取存儲的內容CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());//打印事件的類型,增刪改查哪種 eventTypeSystem.out.println("事件類型是:" + rowChange.getEventType());ArrayList<Long> idList = new ArrayList<>();//打印改變的內容(增量數據)for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {System.out.println("改變后的數據:" + rowData.getAfterColumnsList());List<Column> after;if (rowChange.getEventType().equals(CanalEntry.EventType.INSERT)) {after = rowData.getAfterColumnsList();} else if (rowChange.getEventType().equals(CanalEntry.EventType.DELETE)) {after = rowData.getBeforeColumnsList();} else {//updateafter = rowData.getAfterColumnsList();}for (Column column : after) {if (column.getName().equals("id")) {sendMessageToMq(column.getValue());}}}} catch (Exception e) {e.printStackTrace();}}}//消息確認已經處理了canalConnector.ack(id);
}
消費者監聽mq,最后刪除緩存。
@Slf4j
@Component
@AllArgsConstructor
public class CanalMqListener {private final RedisTemplate redisTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstant.CANAL_MQ_QUEUE, durable = "true", arguments = @Argument(name = "x-queue-mode", value = "lazy")),exchange = @Exchange(name = MqConstant.CANAL_MQ_EXCHANGE_NAME, type = ExchangeTypes.DIRECT),key = MqConstant.CANAL_MQ_EXCHANGE_NAME))public void listenCanalMqMessage( String msg) {log.info("listenCanalMqMessage:redis變更數據:" + msg);try {Set keys = redisTemplate.keys(msg);redisTemplate.delete(keys);} catch (Exception e) {throw new RuntimeException(e);}}
}
總結
解決方案
延時雙刪
優點:
- 無需引入中間件
- 實現簡單,代碼改動小
缺點:
- 延遲時間不好控制
- 極端情況會出現數據不一致
消息隊列
優點:
- 確保緩存被刪除
缺點:
- 需要引入消息隊列,增加了系統復雜度。
- 延遲稍高(對于能接受一定延遲的場景使用)
- 對原有業務代碼有侵入
Canal+MQ
優點:
- 確保緩存被刪除
- 直接監聽binlog,對原有代碼無侵入
缺點:
- 需要引入多個中間件對團隊運維能力有較高要求
- 延遲稍高