文章目錄
- 1. 經典面試題
- 2. 雙寫一致性
- 3. 更新策略
- 4. canal簡介
- 5. Redis與Mysql數據雙寫一致性工程落地案例
1. 經典面試題
- 上面的業務邏輯你用java代碼如何實現?
- 你只要用緩存,就可能會涉及到redis緩存與數據庫雙存儲雙寫,你只要是雙寫,就一定會有數據一致性問題,那么你如何解決數據一致性問題?
- 雙寫一致性,你先動redis緩存還是數據庫mysql中的哪一個?為什么?
- 延時雙刪你做過嗎?會有哪些問題?
- 有這個一種情況,微服務查詢redis無mysql有,為保證數據雙寫一致性回寫redis你需要注意什么?雙檢加鎖策略你了解過嗎?如何盡量避免緩存擊穿問題?
- redis和mysql雙寫100%會出紕漏,做不到強一致性,你如何保證最終一致性?
2. 雙寫一致性
首先,如果redis中有數據,就需要和數據庫中的值相同,如果redis中沒有數據,數據庫中的值要是最新值,且準備回寫redis。redis按照操作來分,分為兩種,即只讀緩存,實際環境中只能讀(實際業務中也是讀的操作居多),另一種是讀寫緩存,redis此時不僅可以讀數據,還可以寫數據(充當redis的門神),而這種緩存,redis與mysql數據同步有兩種策略:
- 同步直寫策略
寫數據庫后也同步寫redis,緩存和數據庫中的數據一致,對于讀寫緩存來說,要想保證緩存和數據庫中的數據一致,就要采用同步直寫策略。
- 異步緩寫策略
正常業務當中,mysql數據變動了,但是可以在業務上容許出現一定時間后才作用于redis,比如倉庫、物流系統等。如果出現異常情況,不得不將失敗的動作重新修補,有可能需要借助kafka或者RabbitMQ等消息中間件,實現重試重寫
有這個一種情況,微服務查詢redis無mysql有,為保證數據雙寫一致性回寫redis你需要注意什么?雙檢加鎖策略你了解過嗎?如何盡量避免緩存擊穿問題?
考慮一個場景,在QPS很高的一個項目中,有幾百萬個線程要訪問redis,發現redis中沒有這幾個數據,那么這些線程又要去訪問Mysql,Mysql此時在此海量的訪問下可能會奔潰,此時,mysql查詢完數據后要回寫redis,可能會出現redis數據覆蓋的問題(例如一個線程A已經回寫了a這條數據到redis中,在回寫的過程中另一個線程B正在讀mysql,B也是讀的a這條數據,然后回寫又覆蓋了A回寫到redis的數據)。總結下來,上面的場景揭露了兩個核心問題:
- Redis沒有數據,海量訪問可能會訪問mysql,導致mysql奔潰
- 海量線程回寫數據到redis,可能出現同一條數據多次回寫,出現數據覆蓋的問題
出現上面問題的本質原因就是訪問mysql數據庫和回寫數據的redis,這兩個操作并不是原子的。所以這時候就出現了雙檢加鎖策略:
多個線程同時區去查詢數據庫的這條數據,那么我們可以在第一個查詢數據的請求上使用一個互斥鎖鎖住它。其他線程走到這一步拿不到鎖就等著,等第一個線程查詢到了數據,然后做緩存,后面的線程進來發現已經有緩存了,就直接走緩存。
public String getData(String key) {// 先嘗試從 Redis 中獲取數據String data = redisClient.get(key);if (data == null) {// 如果 Redis 中沒有數據,則加鎖并從 MySQL 中獲取數據synchronized (lock) {// 再次檢查 Redis 中是否已經存在數據,因為在獲取鎖之前可能其他線程已經更新了緩存data = redisClient.get(key);if (data == null) {try {// 從 MySQL 中獲取數據PreparedStatement statement = mysqlConn.prepareStatement("SELECT data FROM table WHERE key = ?");statement.setString(1, key);ResultSet resultSet = statement.executeQuery();if (resultSet.next()) {data = resultSet.getString("data");// 將數據存入 Redis,并設置過期時間redisClient.setex(key, 3600, data);}statement.close();} catch (SQLException e) {e.printStackTrace();}}}}return data;}
在加鎖前進行一次檢查的目的是避免不必要的加鎖操作。如果在加鎖之前就發現緩存中已經存在數據,那么就不需要加鎖,直接返回緩存中的數據,從而減少了對鎖資源的競爭,提高了并發性能。加鎖后進行一次檢查是為了處理競態條件。即使在加鎖前檢查時發現緩存中沒有數據,但在獲取鎖之前可能其他線程已經更新了緩存。因此,在獲取鎖之后,需要再次檢查緩存中是否已經存在數據,以避免重復向緩存中寫入數據或者以過期數據為準備數據返回。
3. 更新策略
- 你只要用緩存,就可能會涉及到redis緩存與數據庫雙存儲雙寫,你只要是雙寫,就一定會有數據一致性問題,那么你如何解決數據一致性問題?
- 雙寫一致性,你先動redis緩存還是數據庫mysql中的哪一個?為什么?
上面兩個的問題的最終目的都是達到最終一致性。給緩存設置過期時間,定義清理緩存并回寫,是保證最終一致性的解決方案。我們可以對存入緩存的數據設置過期時間,所有的寫操作都以數據庫為準,對緩存操作只是盡最大努力即可。也就是說,如果數據庫寫成功,緩存更新失敗,那么只要達過期時間,則后面的請求自然會從數據庫中讀取到最新值然后回填到緩存,達到一致性,切記,要以Mysql數據庫寫入庫為準
。
如何解決數據一致性問題,如果機器可以停機,那么就可以直接停機維護了,但一般公司的業務是不能隨便停止的,所以這里我們主要探討4種更新策略:(這一部分十分重要)
- 先更新數據庫,再更新緩存
- 先更新緩存,再更新數據庫
- 先刪除緩存,再刪除數據庫
- 先更新數據庫,再刪除緩存
- 方案一:先更新數據庫后更新緩存
這種方案存在兩個問題:
問題一
例如先更新mysql的某商品的庫存,當前商品的庫存是100,更新為99個。先更新mysql修改為99,然后更新redis。此時假設異常出現(例如redis掛了),更新redis失敗了,這導致mysql里面的庫存是99,而redis里面還是100。上面問題發生,會導致數據庫里面和緩存里面的數據不一致,客戶端會讀到redis的臟數據。
問題二
A、B兩個線程發起調用:
A update mysql 100
A update redis 100
B update mysql 80
B update redis 80
但在多線程環境下,上面代碼的執行順序不是固定的,可能出現很多情況,例如下面情況:
A update redis 100
A update mysql 100
B update redis 80
B update mysql 80
現在就出現了mysql數據為100,而redis的數據為80的情況,出現了數據不一致問題,說到底這個問題就是高并發導致的問題。
- 方案二:先更新緩存再更新數據庫
這種方案是不太推薦的,因為業務上通常把Mysql作為底單數據庫
,保證最后解釋。考慮下面這種情況
A update redis 100
A update mysql 100
B update redis 80
B update mysql 80
同樣是多線程緩存可能出現下面情況:
A update redis 100
B update redis 80
B update mysql 80
A update mysql 100
上面現象同樣導致了緩存不一致的情況
針對上面這種情況,有一些解決方案可以考慮:
使用樂觀鎖或版本控制:在更新數據時,使用樂觀鎖或版本控制來確保更新的原子性。例如,在更新數據庫時,可以使用版本號字段或者時間戳來判斷數據是否被其他線程修改過,從而避免覆蓋其他線程的更新。
引入分布式鎖:在更新緩存和數據庫時,可以引入分布式鎖來保證操作的原子性。這樣可以確保在更新緩存和數據庫時只有一個線程可以執行,從而避免并發更新導致的問題。
使用消息隊列:將更新緩存和更新數據庫的操作放入消息隊列中按順序執行,這樣可以保證更新的順序性。例如,先將更新緩存的消息發送到消息隊列中,等待更新完成后再將更新數據庫的消息發送到消息隊列中,這樣可以保證先更新緩存再更新數據庫。
細化業務邏輯:根據具體業務情況,設計更加細化的業務邏輯來處理緩存和數據庫的更新順序。例如,可以將緩存和數據庫的更新操作放在同一個事務中執行,以保證操作的原子性和一致性。
- 方案三:先刪除緩存再更新數據庫
這種方案同樣有異常問題,例如:A線程先刪除了redis里面的數據,然后去更新mysql,此時Mysql正在更新中,還沒有結束(比如出現了網絡延遲),此時B突然出現要來讀取緩存。
此時redis里面的數據是空的,B線程來讀,先去讀redis里面的數據(但數據被A給刪除了),此時就有兩個問題了:
- B去讀Mysql但是獲取了舊值(A沒更新完)
- B獲取舊值后寫回redis(剛被A刪除的數據又寫回redis了)
此時A更新完Mysql了,發現redis里面的緩存是臟數據,給A線程干的CPU直接燒了。于是緩存中的數據還是老數據,導致緩存中的數據是臟的,而且還會一直臟下去。
延時雙刪你做過嗎?會有哪些問題?
在注釋暫停兩秒的地方,加上Thread.sleep(2)
,就是為了讓b線程先從數據看中讀取數據,再把缺失的數據寫入緩存,然后線程A再進行刪除,所以,線程A sleep的時間,就需要大雨線程B讀取數據的時間,這樣一來,其它線程讀取數據時,會發現緩存缺失,所以會從數據看中讀取最新的值,因為這個方案會在第一次刪除緩存值后,延遲一段時間再次進行刪除,所以我們也把它叫做延遲雙刪。
針對延遲雙刪也會出現一些問題:
這個刪除該休眠多久?
確定這個時間有兩種方法,第一種是在業務程序運行的時候,統計程序讀數據和寫緩存的操作時間,自行評估自己的項目的讀數據業務邏輯的耗時。以此為基礎進行估算,然后寫數據的休眠時間則在讀數據業務邏輯的耗時基礎上加上100ms。這么做的目的就是為了確保請求結束,寫請求可以刪除讀請求造成的緩存臟數據。另一種方案就是新啟動一個后臺監控程序,比如要講解的WatchDog。
這種同步淘汰策略,吞吐量降低怎么辦?
由于A線程會睡眠阻塞后面的業務,這確實會導致業務吞吐量降低。此時可以考慮第二次刪的時候,再開一個線程來異步刪除。
- 方案四:先更新數據庫再刪除緩存
這種方案相比上面三種方案是最好的,但它也存在它的問題。
上面緩存刪除失敗或者來不及刪除,導致請求再次訪問redis緩存命中的時候,讀取到的緩存是舊的值。
redis和mysql雙寫100%會出紕漏,做不到強一致性,你如何保證最終一致性?
- 我們可以把要刪除的緩存值或者是要更新的值暫存在消息隊列中(Kafka/RabbitMQ)
- 當程序沒有能夠成功地刪除緩存值或者是更新數據庫值時,可以從消息隊列中重寫讀取這些值面,然后再次進行刪除或者更新
- 如果能夠成功刪除或更新,我們就要把這些值從消息隊列中去除,以免重復操作,此時,我們也可以保證數據庫和緩存的數據一致了,否則還需要重試
- 如果重試超過一定次數后還沒有成功,我們就需要向業務層發送報錯信息了,通知運維人員
業務指導思想:
微軟云
阿里巴巴的canal
- 總結
4. canal簡介
如何知道Mysql數據發生了變動?
Mysql在更新數據的時候所有的更新操作都會記錄到binlog中,如果我們拿到了這個binlog我們相應就知道了Mysql數據庫的變動情況。所以我們需要一個技術,能監聽到mysql變動,且能通知到redis,這個中間件就是canal。
Canal是什么?
Canal是阿里巴巴的Mysql Binlog增量訂閱和消費組件,主要用途是用于Mysql數據庫增量日志數據的訂閱、消費和解析,是阿里巴巴開發并開源的,采用java語言開發。Canal模擬MySQL的slave角色,向MySQL請求binlog,然后解析binlog成為DBChange對象。
Canal能干什么?
- 數據庫景象
- 數據庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業務cache刷新
- 帶業務邏輯的增量數據處理
Canal工作原理?
首先了解一個傳統的Mysql主從復制的工作原理:
Mysql主從復制將經歷下面這些過程:
- 當master主服務器上的數據發生改變時,則將其改變寫入到二進制日志文件當中
- slave從服務器會在一定時間間隔內對master服務器上的二進制文件進行探測,探測其是否發生變化(offset偏移量),如果探測到master主服務器的二進制事件日志發生了改變,則開始一個I/O Thread請求master二進制事件日志
- 同時master主服務器為每個I/O thread啟動一個dump Thread,用于向其發送二進制事件文件
參考Mysql主從復制的原理,canal就出現了。
canal模擬Mysql slave的交互協議,偽裝自己為mysql slave,向mysql master 發送dump協議,Mysql master受到dump請求,開始推送binary log給slave(即canal),canal解析binary log對象(原始流為byte)。
5. Redis與Mysql數據雙寫一致性工程落地案例
- Mysql配置
查看Mysql版本
SELECT VERSION();
查看當前主機的binlog
SHOW MASTER STATUS;
查看當前binlog是否對外開放
SHOW VARIABLES LIKE 'log_bin'
默認時關閉的,這里我打開了
開啟binlog的寫入功能
打開Mysql的my.ini配置文件,進行以下配置
log-bin=mysql-bin #開啟binlog
binlog-format=ROW #選擇row模式
server_id=1 #配置Mysql replaction需要定義,不要和canal的slaveid相同
Row模式除了記錄sql語句之外,還會記錄每個字段的變化情況,能夠清楚的記錄每行記錄的變化歷史,但會占用更多空間
STATEMENT模式只記錄sql語句,但是沒有記錄上下文信息,在進行數據恢復的時候可能會丟失數據
MIX模式比較靈活的記錄,理論上說當遇到了表結構發生變更的情況,就會時statement模式,當遇到數據更新或者刪除的情況就會變味row模式
授權canal連接mysql
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;
- canal配置
下載地址
解壓
解壓后放在指定的一個目錄就行
配置
修改conf/example
路徑下的instance.properties文件
- 換成自己的mysql主機的master地址
- 換成自己的在mysql新建立的canal賬戶
啟動canal
./startup.sh
查看是否啟動成功
查看example.log和canal.log即可
- java程序開發
引入jar包
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.3</version></dependency>
添加配置
spring:datasource:master:url: jdbc:mysql://localhost:3306/atguigudb?characterEncoding=utf-8&useSSL=falsedriver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: 123456
業務類
public class RedisUtils {public static final String REDIS_IP_ADDR="127.0.0.1";public static JedisPool jedispool;static {JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();jedisPoolConfig.setMaxTotal(20);jedisPoolConfig.setMaxIdle(20);jedispool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000);}public static Jedis getJedis() throws Exception {if(null!=jedispool){return jedispool.getResource();}throw new Exception("Jedis poll is not ok");}
}
package com.jack.mybatis_plus.biz;import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.jack.mybatis_plus.utils.RedisUtils;
import redis.clients.jedis.Jedis;import java.util.List;
import java.util.UUID;public class SimpleCanalClientExample {public static final Integer _60SECONDS=60;public static final String REDIS_IP_ADDR="127.0.0.1";public static void main(String args[]) {// 創建鏈接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();//connector.subscribe(".*\\..*"); 訂閱全部庫全部表connector.subscribe("atguigudb.jobs"); //只訂閱atguigudb數據庫的jobs表connector.rollback();//這幾個就指定了監聽程序·10分鐘結束int totalEmptyCount = 10*_60SECONDS;while (emptyCount < totalEmptyCount) {System.out.println("我是canal,每一秒一次正在監聽:"+ UUID.randomUUID().toString());Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據,一次處理1000條long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//表示當前數據庫沒有變動emptyCount++;try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {//計數器置0emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交確認// connector.rollback(batchId); // 處理失敗, 回滾數據}System.out.println("已經監聽了"+totalEmptyCount+"秒,無任何消息,請重啟!");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {//獲取已經變更的數據rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {redisDelete(rowData.getAfterColumnsList());} else if (eventType == EventType.INSERT) {redisInsert(rowData.getAfterColumnsList());} else {redisUpdate(rowData.getAfterColumnsList());}}}}private static void redisUpdate(List<Column> columns) {JSONObject jsonObject=new JSONObject();for (Column column : columns) {System.out.println(column.getName()+" : "+column.getValue()+" update="+column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size()>0){try(Jedis jedis= RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());System.out.println("--------update after: "+jedis.get(columns.get(0).getValue()));}catch (Exception e){e.printStackTrace();}}}private static void redisDelete(List<Column> columns) {JSONObject jsonObject=new JSONObject();for (Column column : columns) {jsonObject.put(column.getName(),column.getValue());}if(columns.size()>0){try(Jedis jedis= RedisUtils.getJedis()){jedis.del(columns.get(0).getValue());}catch (Exception e){e.printStackTrace();}}}private static void redisInsert(List<Column> columns) {JSONObject jsonObject=new JSONObject();for (Column column : columns) {System.out.println(column.getName()+" : "+column.getValue()+" insert="+column.getUpdated());jsonObject.put(column.getName(),column.getValue());}if(columns.size()>0){try(Jedis jedis= RedisUtils.getJedis()){jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());}catch (Exception e){e.printStackTrace();}}}
}
測試
修改數據
被正確同步到redis