Java面試題025:一文深入了解數據庫Redis(1)
Java面試題026:一文深入了解數據庫Redis(2)
????????本節我們整理一下Redis高可用和消息隊列使用場景的重點原理,讓大家在面試或者實際工作中遇到這類問題時能夠知道怎么入手,如何選擇合理的方案,至于怎么去搭建和具體的操作步驟不是我們本節的內容。
1、主從結構
????????Redis雖然讀取寫入的速度都特別快,但是也會產生讀壓力特別大的情況。為了分擔讀壓力,Redis支持主從復制,保證主數據庫的數據內容和從數據庫的內容完全一致。
????????Redis 主從架構是一種數據復制機制,用于提高數據庫的可用性和擴展性。在這種架構中,數據可以從一個主節點(master)復制到一個或多個從節點(slave)。主節點負責處理寫操作,而從節點則主要用于處理讀操作,實現讀寫分離。
開啟方式
master:
[root@master src]# vim /etc/redis/6379.conf
#70行 修改監聽地址為20.0.0.10 master地址
bind 20.0.0.10
#137行 開啟守護進程
daemonize yes
#172行 修改日志文件目錄
logfile /var/log/redis_6379.log
#264行 修改工作目錄
dir /var/lib/redis/6379
#700行 開啟AOF持久化功能
appendonly yes
slave1:
[root@slave1 src]# vim /etc/redis/6379.conf
#70行 修改監聽地址為20.0.0.11 slave1地址
bind 20.0.0.11
#137行 開啟守護進程
daemonize yes
#172行 修改日志文件目錄
logfile /var/log/redis_6379.log
#264行 修改工作目錄
dir /var/lib/redis/6379
#700行 開啟AOF持久化功能
appendonly yes
#287 修改IP和端口 指向master
replicaof 20.0.0.10 6379
slave2:
[root@slave1 src]# vim /etc/redis/6379.conf
#70行 修改監聽地址為20.0.0.12 slave2地址
bind 20.0.0.12
#137行 開啟守護進程
daemonize yes
#172行 修改日志文件目錄
logfile /var/log/redis_6379.log
#264行 修改工作目錄
dir /var/lib/redis/6379
#700行 開啟AOF持久化功能
appendonly yes
#287 修改IP和端口 指向master
replicaof 20.0.0.10 6379
復制原理
????????先啟動主節點,再啟動從節點,從節點啟動后,會向主數據庫發送SYNC命令。同時主數據庫收到SYNC命令后會開始在后臺保存快照(即RDB持久化,在主從復制時,會無條件觸發RDB),并將保存快照期間接收到的命令緩存起來,當快照完成后,redis會將快照文件和所有緩存命令發送給數據庫。從數據庫接收到快照文件和緩存命令后,會載入快照文件和執行命令,也就是說redis是通過RDB持久化文件和redis緩存命令來時間主從復制。----初始化復制。
????????后續每當主數據庫接到寫命令時,就會將命令同步給從數據庫,保證主從數據一致性。
? ? ? ? 主從數據庫斷線重連后,主數據庫只需要將斷線期間執行的命令傳送給從數據庫。
???????
復制方式
?????????主節點除了備份RDB文件之外還會維護者一個環形積壓隊列,以及環形隊列的寫索引和從節點同步的全局offset,環形隊列用于存儲最新的操作數據。
????????每個redis實例會擁有一個唯一的運行id,當實例重啟后,就會自動生成一個新的id。?從數據庫會存儲主數據庫的運行id。
????????主節點在復制同步階段,主數據庫每將一個命令傳遞給從數據庫時,都會將命令存放到積壓隊列,并記錄當前積壓隊列中存放命令的偏移量。
????????從數據庫接收到主數據庫傳來的命令時,會記錄下偏移量。
(1)全量復制:一般發生在Slave初始化階段
?在2.8之后,主從復制不再發送SYNC命令,取而代之的是PSYNC,格式為:“PSYNC ID offset”。
當主節點接到請求后,會判斷請求是否滿足以下兩個條件,當滿足時,不進行全量復制:
- 從節點傳遞的run id和master的run id一致。
- 主節點在環形隊列上可以找到對應offset的值。
1. 發送psync命令進行數據同步,由于是第一次進行復制,從節點沒有復制偏移量和主節點的運行ID,所以發送psync-1。
2. 主節點根據psync-1解析出當前為全量復制,回復+FULLRESYNC響應。
3. 從節點接收主節點的響應數據保存運行ID和偏移量offset
4. 主節點執行bgsave保存RDB文件到本地
5. 主節點發送RDB文件給從節點,從節點把接收的RDB文件保存在本地直接作為從節點數據文件
6. 對于從節點開始接收RDB快照到接收完成期間,主節點仍然響應讀寫命令,因此主節點會把這期間寫命令數據保存在復制客戶端緩沖區內,當從節點加載完RDB文件后,主節點再把緩沖區內的數據發送給從節點,保證主從之間數據一致性。
7. 從節點接收完主節點傳送來的全部數據后會清空自身舊數據
8. 從節點清空數據后開始加載RDB文件
9. 從節點成功加載完RDB后,如果當前節點開啟了AOF持久化功能, 它會立刻做bgrewriteaof操作,為了保證全量復制后AOF持久化文件立刻可用。
(2)?增量復制:
? ? ? ? 增量復制主要是Redis針對全量復制的過高開銷做出的一種優化措施, Slave初始化后開始正常工作時主服務器發生的寫操作同步到從服務器的過程。
????????增量復制的過程主要是主服務器每執行一個寫命令就會向從服務器發送相同的寫命令,從服務器接收并執行收到的寫命令。
主從結構存在問題
- 一旦主節點出現故障,需要手動將一個從節點晉升為主節點,同時需要修改應用方的主節點地址,還需要命令其他從節點去復制新的主節點,整個過程都需要人工干預。
- 主節點的寫能力受到單機的限制。
- 主節點的存儲能力受到單機的限制。
2、哨兵結構
????????主從結構的手動重啟和恢復都相對麻煩,這時候就需要哨兵登場了。
????????哨兵的作用就是監控redis節點的運行狀態,監控主數據庫和從數據庫是否能夠正常運行,主數據庫出現故障時自動將從數據庫轉換為主數據庫。
當使用多個哨兵時,哨兵不僅會監控主數據庫和從數據庫,哨兵之間也會相互監控。
開啟方式
[root@master ~]# vi redis-5.0.7/sentinel.conf
17行/protected-mode no #關閉保護模式
26行/daemonize yes #指定sentinel為后臺啟動
36行/logfile "/var/log/sentinel.log" #指定日志存放路徑
65行/dir "/var/lib/redis/6379" #指定數據庫存放路徑
84行/sentinel monitor mymaster 20.0.0.10 6379 2 #至少幾個哨兵檢測到主服務器故障了,才會進行故障遷移,全部指向masterIP
113行/sentinel down-after-milliseconds mymaster 30000 #判定服務器down掉的時間周期,默認30000毫秒(30秒)sentinel auth-pass mymaster 123456
146行/sentinel failover-timeout mymaster 180000 #故障節的的最大超時時間為180000(180秒)
監控原理
Redis Sentinel通過三個定時監控任務完成對各個節點發現和監控:
????????1. 每隔10秒,每個Sentinel節點會向主節點和從節點發送info命令獲取最新的拓撲結構
????????2. 每隔2秒,每個Sentinel節點會向Redis數據節點的sentinel:hello 頻道上發送該Sentinel節點對于主節點的判斷以及當前Sentinel節點的信息
????????3. 每隔1秒,每個Sentinel節點會向主節點、從節點、其余Sentinel節點發送一條ping命令做一次心跳檢測,來確認這些節點當前是否存活。????????
第一條操作的作用是獲取當前數據庫信息,比如發現新增從節點時,會建立連接,并加入到監控列表中,當主從數據庫的角色發生變化進行信息更新。第二條操作的作用是將自己的監控數據和哨兵分享,發送的內容為:
<哨兵地址>,<哨兵端口>,<哨兵運行id>,<哨兵配置版本>,<主數據庫名字>,<主數據庫地址>,<主數據庫端口>,<主數據庫配置版本>,每個哨兵會訂閱數據庫的_sentinel_:hello頻道,當其他哨兵收到消息后,會判斷該哨兵是不是新的哨兵,如果是則將其加入哨兵列表,并建立連接。第三條操作的作用是監控節點是否存活。該時間間隔有down-after-millisecond實現,當該值小于1s時,哨兵會按照設定的值發送ping,當大于1s時,哨兵會間隔1s發送ping命令。
主觀下線
????????主觀下線是當前哨兵節點認為某個節點有問題,客觀下線就是超過一定數量的哨兵節點認為某個主節點有問題。
????????每個Sentinel節點會每隔1秒對主節點、從節點、其他Sentinel節點發送ping命令做心跳檢測,當這些節點超過 down-after-milliseconds沒有進行有效回復,Sentinel節點就會對該節點做失敗判定,這個行為叫做主觀下線。
客觀下線
????????當Sentinel主觀下線的節點是主節點時,該Sentinel節點會通過sentinel is- master-down-by-addr命令向其他Sentinel節點詢問對主節點的判斷,當超過 <quorum>個數,Sentinel節點認為主節點確實有問題,這時該Sentinel節點會做出客觀下線的決定。
3、集群結構
????????當數據量過大到一臺服務器存放不下的情況時,主從模式或sentinel模式就不能滿足需求了,這個時候需要對存儲的數據進行分片,將數據存儲到多個Redis實例中。cluster模式的出現就是為了解決單機Redis容量有限的問題,將Redis的數據根據一定的規則分配到多臺機器。
????????使用集群,只需要將redis配置文件中的cluster-enable
配置打開即可。每個集群中至少需要三個主數據庫才能正常運行。
????????所有的節點都是一主一從(也可以是一主多從),其中從不提供服務,僅作為備用。
開啟方式
參考下面這篇文章,很詳細:
redis集群搭建之官方redis cluster 搭建實踐_redis cluster搭建-CSDN博客文章瀏覽閱讀2.2w次,點贊12次,收藏73次。redis cluster是官方的redis集群實現,本篇文章為搭建集群實踐篇一、手動搭建redis官方已經redis-trib.rb命令來給我們實現redis搭建了。但是為了了解原理,首先我們來手動搭建不使用官方的命令。如果大家想快速搭建,可以直接跳到二。1、準備我們這個例子是在單機上部署集群,實際的工作情況會在不同的機器上搭建,一方面為了保證高可用也是為了擴大數據的容量所以實際中會在不同的機器..._redis cluster搭建https://blog.csdn.net/fst438060684/article/details/80712433
集群創建過程
(1)設置節點
????????Redis集群一般由多個節點組成,節點數量至少為6個才能保證組成完整高可用的集群。每個節點需要開啟配置 cluster-enabled yes,讓Redis運行在集群模式。
(2)節點握手
????????節點握手是指一批運行在集群模式下的節點通過Gossip協議彼此通信, 達到感知對方的過程。節點握手是集群彼此通信的第一步,由客戶端發起命令:cluster meet{ip}{port}。完成節點握手之后,一個的Redis節點就組成了一個多節點的集群。
(3)分配槽(slot)
????????Redis集群把所有的數據映射到16384個槽中。每個節點對應若干個槽,只有當節點分配了槽,才能響應和這些槽關聯的鍵命令。通過 cluster addslots命令為節點分配槽。
4、消息隊列
(1)使用list作為隊列
????????Redis的列表類型可以用來實現隊列,并且支持阻塞式讀取。?在Redis中,List類型是按照插入順序排序的字符串鏈表。
- lpush生產消息,rpop消費消息
redis.properties
redis.url=localhost
redis.port=6379
redis.maxIdle=30
redis.minIdle=10
redis.maxTotal=100
redis.maxWait=10000
工具類:
public class JedisPoolUtils {private static JedisPool pool = null;static {//加載配置文件InputStream in = JedisPoolUtils.class.getClassLoader().getResourceAsStream("redis.properties");Properties pro = new Properties();try {pro.load(in);} catch (IOException e) {e.printStackTrace();}//獲得池子對象JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxIdle(Integer.parseInt(pro.get("redis.maxIdle").toString()));//最大閑置個數poolConfig.setMaxWaitMillis(Integer.parseInt(pro.get("redis.maxWait").toString()));//最大閑置個數poolConfig.setMinIdle(Integer.parseInt(pro.get("redis.minIdle").toString()));//最小閑置個數poolConfig.setMaxTotal(Integer.parseInt(pro.get("redis.maxTotal").toString()));//最大連接數pool = new JedisPool(poolConfig, pro.getProperty("redis.url"), Integer.parseInt(pro.get("redis.port").toString()));}//獲得jedis資源的方法public static Jedis getJedis() {return pool.getResource();}
}
消息生產者:
public class MessageProducer extends Thread {public static final String MESSAGE_KEY = "message:queue";private volatile int count;public void putMessage(String message) {Jedis jedis = JedisPoolUtils.getJedis();Long size = jedis.lpush(MESSAGE_KEY, message);System.out.println(Thread.currentThread().getName() + " put message,size=" + size + ",count=" + count);count++;}@Overridepublic synchronized void run() {for (int i = 0; i < 5; i++) {putMessage("message" + count);}}public static void main(String[] args) {MessageProducer messageProducer = new MessageProducer();Thread t1 = new Thread(messageProducer, "thread1");Thread t2 = new Thread(messageProducer, "thread2");Thread t3 = new Thread(messageProducer, "thread3");Thread t4 = new Thread(messageProducer, "thread4");Thread t5 = new Thread(messageProducer, "thread5");t1.start();t2.start();t3.start();t4.start();t5.start();}
}
redis后臺查看:
127.0.0.1:6379> lrange message:queue 0 -11) "message24"2) "message23"3) "message22"4) "message21"5) "message20"6) "message19"7) "message18"8) "message17"9) "message16"
10) "message15"
11) "message14"
12) "message13"
13) "message12"
14) "message11"
15) "message10"
16) "message9"
17) "message8"
18) "message7"
19) "message6"
20) "message5"
21) "message4"
22) "message3"
23) "message2"
24) "message1"
25) "message0"
消費者:
public class MessageConsumer implements Runnable {public static final String MESSAGE_KEY = "message:queue";private volatile int count;public void consumerMessage() {Jedis jedis = JedisPoolUtils.getJedis();String message = jedis.rpop(MESSAGE_KEY);System.out.println(Thread.currentThread().getName() + " consumer message,message=" + message + ",count=" + count);count++;}@Overridepublic void run() {while (true) {consumerMessage();}}public static void main(String[] args) {MessageConsumer messageConsumer = new MessageConsumer();Thread t1 = new Thread(messageConsumer, "thread6");Thread t2 = new Thread(messageConsumer, "thread7");t1.start();t2.start();}
}
結果:
thread6 consumer message,message=message0,count=0
thread6 consumer message,message=message1,count=1
thread6 consumer message,message=message2,count=2
thread6 consumer message,message=message3,count=3
thread7 consumer message,message=message4,count=4
thread6 consumer message,message=message5,count=5
thread7 consumer message,message=message6,count=6
thread6 consumer message,message=message7,count=7
thread7 consumer message,message=message8,count=8
thread6 consumer message,message=message9,count=9
thread7 consumer message,message=message10,count=10
thread6 consumer message,message=message11,count=11
thread7 consumer message,message=message12,count=12
thread6 consumer message,message=message13,count=13
thread7 consumer message,message=message14,count=14
thread6 consumer message,message=message15,count=15
thread7 consumer message,message=message16,count=16
thread6 consumer message,message=message17,count=16
thread7 consumer message,message=message18,count=18
thread6 consumer message,message=message19,count=19
thread7 consumer message,message=message20,count=20
thread6 consumer message,message=message21,count=20
thread7 consumer message,message=message22,count=22
thread6 consumer message,message=message23,count=22
thread7 consumer message,message=message24,count=24
thread6 consumer message,message=null,count=25
thread7 consumer message,message=null,count=26
thread6 consumer message,message=null,count=27
thread7 consumer message,message=null,count=28
thread6 consumer message,message=null,count=28
thread7 consumer message,message=null,count=30
thread6 consumer message,message=null,count=31...
????????這種方式,消費者死循環rpop從隊列中消費消息。即使隊列里沒有消息,也會進行rpop,會導致Redis CPU的消耗。
- lpush生產消息,brpop消費消息
public class MessageConsumer implements Runnable {public static final String MESSAGE_KEY = "message:queue";private volatile int count;private Jedis jedis = JedisPoolUtils.getJedis();public void consumerMessage() {List<String> brpop = jedis.brpop(0, MESSAGE_KEY);//0是timeout,返回的是一個集合,第一個是消息的key,第二個是消息的內容System.out.println(brpop);}@Overridepublic void run() {while (true) {consumerMessage();}}public static void main(String[] args) {MessageConsumer messageConsumer = new MessageConsumer();Thread t1 = new Thread(messageConsumer, "thread6");Thread t2 = new Thread(messageConsumer, "thread7");t1.start();t2.start();}
}
(2)使用pub/sub來進行消息的發布/訂閱
????????redis還提供了一組命令可以讓開發者實現"發布/訂閱"(publish/subscribe)模式。"發布/訂閱"模式包含兩種角色,分別是發布者和訂閱者。訂閱者可以訂閱一個或者多個頻道(channel),而發布者可以向指定的頻道(channel)發送消息,所有訂閱此頻道的訂閱者都會收到此消息。
????????發布者發布消息的命令是? publish,用法是 publish channel message。
????????訂閱頻道的命令是 subscribe,可以同時訂閱多個頻道,用法是 subscribe channel1 [channel2 ...]。不會收到訂閱之前就發布到該頻道的消息。
????????還可以使用psubscribe命令訂閱指定的規則。規則支持通配符格式。命令格式為? ? ? psubscribe pattern [pattern ...]訂閱多個模式的頻道。
通配符中?表示1個占位符,*表示任意個占位符(包括0),?*表示1個以上占位符。
例如訂閱者訂閱三個通配符頻道: psubscribe c? b* d?*
C:\Users\liqiang>redis-cli
127.0.0.1:6379> publish c m1
(integer) 0
127.0.0.1:6379> publish c1 m1
(integer) 1
127.0.0.1:6379> publish c11 m1
(integer) 0
127.0.0.1:6379> publish b m1
(integer) 1
127.0.0.1:6379> publish b1 m1
(integer) 1
127.0.0.1:6379> publish b11 m1
(integer) 1
127.0.0.1:6379> publish d m1
(integer) 0
127.0.0.1:6379> publish d1 m1
(integer) 1
127.0.0.1:6379> publish d11 m1
(integer) 1
????????上面返回值為1表示被訂閱者所接受,可以匹配上面的通配符。????????
????????使用psubscribe命令可以重復訂閱同一個頻道,如客戶端執行了psubscribe c? c?*。這時向c1發布消息客戶端會接受到兩條消息
生產者:
public class MessageProducer extends Thread {public static final String CHANNEL_KEY = "channel:1";private volatile int count;public void putMessage(String message) {Jedis jedis = JedisPoolUtils.getJedis();Long publish = jedis.publish(CHANNEL_KEY, message);//返回訂閱者數量System.out.println(Thread.currentThread().getName() + " put message,count=" + count+",subscriberNum="+publish);count++;}@Overridepublic synchronized void run() {for (int i = 0; i < 5; i++) {putMessage("message" + count);}}public static void main(String[] args) {MessageProducer messageProducer = new MessageProducer();Thread t1 = new Thread(messageProducer, "thread1");Thread t2 = new Thread(messageProducer, "thread2");Thread t3 = new Thread(messageProducer, "thread3");Thread t4 = new Thread(messageProducer, "thread4");Thread t5 = new Thread(messageProducer, "thread5");t1.start();t2.start();t3.start();t4.start();t5.start();}
}
subscribe消費者:
public class MessageConsumer implements Runnable {public static final String CHANNEL_KEY = "channel:1";//頻道public static final String EXIT_COMMAND = "exit";//結束程序的消息private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//處理接收消息public void consumerMessage() {Jedis jedis = JedisPoolUtils.getJedis();jedis.subscribe(myJedisPubSub, CHANNEL_KEY);//第一個參數是處理接收消息,第二個參數是訂閱的消息頻道}@Overridepublic void run() {while (true) {consumerMessage();}}public static void main(String[] args) {MessageConsumer messageConsumer = new MessageConsumer();Thread t1 = new Thread(messageConsumer, "thread5");Thread t2 = new Thread(messageConsumer, "thread6");t1.start();t2.start();}
}/*** 繼承JedisPubSub,重寫接收消息的方法*/
class MyJedisPubSub extends JedisPubSub {@Override/** JedisPubSub類是一個沒有抽象方法的抽象類,里面方法都是一些空實現* 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage* 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法* 當然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法參數為byte[]**/public void onMessage(String channel, String message) {System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);//接收到exit消息后退出if (MessageConsumer.EXIT_COMMAND.equals(message)) {System.exit(0);}}
}
psubscribe消費者:
public class MessageConsumer implements Runnable {public static final String CHANNEL_KEY = "channel*";//頻道public static final String EXIT_COMMAND = "exit";//結束程序的消息private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//處理接收消息public void consumerMessage() {Jedis jedis = JedisPoolUtils.getJedis();jedis.psubscribe(myJedisPubSub, CHANNEL_KEY);//第一個參數是處理接收消息,第二個參數是訂閱的消息頻道}@Overridepublic void run() {while (true) {consumerMessage();}}public static void main(String[] args) {MessageConsumer messageConsumer = new MessageConsumer();Thread t1 = new Thread(messageConsumer, "thread5");Thread t2 = new Thread(messageConsumer, "thread6");t1.start();t2.start();}
}/*** 繼承JedisPubSub,重寫接收消息的方法*/
class MyJedisPubSub extends JedisPubSub {@Overridepublic void onPMessage(String pattern, String channel, String message) {System.out.println(Thread.currentThread().getName()+"-接收到消息:pattern="+pattern+",channel=" + channel + ",message=" + message);//接收到exit消息后退出if (MessageConsumer.EXIT_COMMAND.equals(message)) {System.exit(0);}}
}
(3)缺點
Redis可以提供基本的發布訂閱功能,但畢竟不像消息隊列那種專業級別,所以會存在以下缺點:
-
redis無法對消息持久化存儲,消息一旦被發送,如果沒有訂閱者接收,數據會丟失
-
消息隊列提供了消息傳輸保障,當客戶端連接超時或事物回滾的等情況發生時,消息會重新發布給訂閱者,redis沒有該保障,導致的結果就是在訂閱者斷線超時或其他異常情況時,將會丟失所有發布者發布的信息
-
若訂閱者訂閱了頻道,但自己讀取消息的速度很慢的話,那么不斷積壓的消息會使redis輸出緩沖區的體積變得越來越大,這可能使得redis本身的速度變慢,甚至直接崩潰