歡迎來到啾啾的博客🐱。
記錄學習點滴。分享工作思考和實用技巧,偶爾也分享一些雜談💬。
歡迎評論交流,感謝您的閱讀😄。
目錄
- 引言
- 設計一個共享數據中心
- 選擇數據模型
- 鍵值對設計
- 數據可靠性設計
- 持久化
- 快照 (Snapshotting/RDB-like)
- 操作日志(Write-Ahead Log -WAL /Append-Only File AOF-like)
- 快照與日志融合使用(Snapshot + WAL)
- 一致性
- 監控數據狀態
- 狀態通知機制
- 集群間數據同步
- 實現一個共享數據中心
引言
分布式微服務架構中,有一些常見的分布式協調問題,如配置管理、命名服務(共享服務實例的地址信息)、分布式鎖、集群管理、Master選舉等。
這些協調問題可以簡單歸納為數據的共享與狀態監控,我們需要解決這些問題來保障架構的可用、性能,同時降低耦合、開放拓展等。
為此,我們的框架需要一個可靠的、一致的、可觀察的共享數據存儲。
ZooKeeper就是這樣的一個存在。
不過在深入了解ZooKeeper之前,我們先就這些特性來簡單實現一下共享數據存儲。
在上篇注冊中心中,我們設計了主從讀寫分離、數據安全結構與讀寫鎖,實現了簡單的注冊中心。其數據共享主要是集群內數據共享。
本篇共享數據存儲是面對整個架構的所有服務的。
設計一個共享數據中心
首先,我們需要考慮數據以什么樣的形式、什么樣的結構存在。
是僅存在于內存還是需要持久化,持久化是否需要進行事務特性實現以確保一致性……等等。
需要考慮的問題很多,首先,讓我們來進行數據結構的選型、決定數據以什么樣的形式存在。
選擇數據模型
數據模型可以簡單分為以下幾種:
數據模型類型 | 說明 |
---|---|
鍵值對(Key-Value) | 這是最簡單和最常見的數據模型。 比如Redis。 key一般為String類型、value的值靈活多變。 這樣的結構易于實現,但查詢能力優先,只能按鍵查找。 |
文檔型(Document-Oriented) | 比如MongoDB。 數據以類似JSON或BSON的文檔形式存儲。 適合存儲半結構化數據(不完全符合嚴格的表結構,但包含一些內部組織標記或元數據的數據),結構靈活、具備層次結構、可解析、字描述。 這樣的結構易于解析和傳輸大量數據,但查詢更難。 |
列式(Columnar) | 比如HBase或Cassandra。 適合大規模數據分析。 |
關系型(Relational) | 比如MySQL。 適合數據間有復雜關系,有事務需求時的選擇。 實現一個完整的關系型數據庫非常困難。 |
圖(Graph) | 比如 Neo4j。 適合數據之前關系非常重要復雜的情況。 顯然實現起來更復雜。 |
對于一個共享數據存儲中心,它需要能存儲各式各樣的值,且用戶服務可以很快速地獲取共享的值。
那么選擇“鍵值對”作為數據存儲的結構是個很合理的選擇。
鍵值對設計
鍵值對應該如何設計呢?
了解基本數據結構的我們很容易想到使用Hash計算的方式映射,用數組進行存儲。
在Java中簡單來說就是使用HashMap。共享數據存儲中心需要考慮多個用戶服務同時調用的情況,因此,我們的結構應當是線程安全的,即使用ConcurrentHashMap
,或者對普通的HashMap使用鎖,如讀寫鎖。
數據可靠性設計
數據是僅內存(In-Memory Only)還是需要持久化(Persistence)呢?
-
僅內存
僅內存性能高,但共享數據存儲中心一旦重啟或崩潰,數據容易丟失。 -
持久化
持久化雖然更可靠,但是復雜度顯著提升。
需要以什么形式持久化?
持久化是否需要事務來確保一致性(內存和持久化存儲的一致性、并發訪問的一致性)?
以什么樣的方式持久化?
等,有很多需要考慮的地方,越可靠越復雜。
因此必須在復雜度和可靠中做取舍,這往往也取決于對數據丟失的容忍度和數據的重要性。
持久化
持久化設計需要考慮持久化方式(Durability)與崩潰恢復(Crash Recovery)。從內存到磁盤的持久化有3種方式。
快照 (Snapshotting/RDB-like)
定期將內存中的整個數據結構完整地序列化到磁盤(分布式的“狀態轉移”)。
快照時,系統會創建一個當前內存數據結構的副本,對副本進行序列化操作。快照文件通常是二進制格式,考慮存儲效率和加載速度。例如Redis的RDB文件。
快照操作后臺線程執行,對正在運行的操作影響小。
但如果兩次快照之間發生故障,快照之間的部分數據會丟失。且快照過程比較耗時且消耗I/O,尤其數據量大時。
快照可以設置的觸發策略如下:
- 基于時間的策略 (Time-based)
- 基于操作數量\數據變化的策略(Change-based)
- 基于日志文件大小(Log-size-based - 通常與WAL/AOF結合
- 手動觸發 (Manual Trigger)
- 系統關閉 (On Shutdown)
操作日志(Write-Ahead Log -WAL /Append-Only File AOF-like)
數據操作日志 是一種按時間順序記錄所有對數據產生修改的“操作”的日志文件。 它記錄的是如何達到當前數據狀態的過程,而不是數據狀態本身(分布式的“操作轉移”)。
數據需要先記錄到日志,然后再更新到內存中。
- 追加寫入(Append-Only)
新的操作日志條目總是被添加到日志文件的末尾。順序寫入的方式通常比隨機寫入磁盤效率高。 - 預寫(Write-Ahead)
在數據真正被修改到內存中的持久化結構(在數據被刷到數據文件)之前,描述該修改的日志條目必須首先被安全地寫入到持久化的操作日志中并刷盤 (fsync)。
日志條目內容一般包含:操作類型、操作參數、事務信息、時間戳或其他序列號(LSN)。
操作日志恢復數據時,會從日志頭開始讀取文件并按順序重新執行,從而在內存中重建數據狀態。
這種方式數據持久性更好,崩潰時只丟失最后未刷盤的少量操作。缺點是恢復時需要重放所有日志,可能較慢,日志文件會不斷增長,需要定期進行壓縮或與快照結合。
快照與日志融合使用(Snapshot + WAL)
快照只能恢復數據的最終狀態,且兩次快照之間數據會丟失,雖然效率高,但是數據量大時I/O消耗大。
而日志模式雖然不會丟失太多數據,但是重放日志效率更低,日志數量多時恢復效率會顯著下降。
因此,生產級方案往往是快照與日志融合。
- 融合方案
定期做快照,同時記錄快照之后的WAL。恢復時先加載最近的快照,再重放后續的WAL。
比如Redis和MySQL(InnoDB)就是用的融合方案。
-
MySQL
MySQL的InnoDB設計有重做日志Redo Log,在數據被刷入磁盤之前,數據修改的記錄(Redo Log)必須先被寫入到Redo Log Buffer,并從Buffer刷到磁盤的Redo Log文件中。
在InnoDB中,Redo Log有一個概念為檢查點(Checkpoint),它記錄一個LSN(Log Sequence Number)。
數據恢復時不需要重放所有的Redo Log,只需要從最近的Checkpoint開始重放。
Checkpoint 確保了其記錄點之前的所有臟頁都已刷盤,因此Checkpoint之前的Redo Log文件都可以被覆蓋。。這個機制解決了日志文件不斷增長的問題。 -
Redis
Redis有兩種持久化方案:RDB與AOF,且可以同時開啟。
RDB對應快照,AOF對應操作日志。
Redis應對操作日志不斷增大的機制是 AOF重寫(AOF Rewrite)。AOF重寫在不中斷服務的情況下,創建一個新的更小的AOF文件,新文件包含達到當前數據集狀態所需的最小命令集。即去掉數據狀態變更過程,只保留最新數據狀態。
Redis從4.0開始RDB-AOF混合持久化。AOF重寫時,新的AOF文件可以配置為以RDB格式開頭,后跟增量的AOF命令(aof-use-rdb-preamble yes)。
新的 AOF 文件首先包含一個 RDB 快照部分(記錄重寫開始時的數據狀態),然后是重寫期間發生的增量寫命令。
這使得恢復時可以先加載 RDB 部分,然后只重放少量的 AOF 命令,大大加快了恢復速度,同時保留了 AOF 的高持久性。
比如我們計劃使用ConcurrentHashMap
,那么快照時就需要將這個ConcurrentHashMap
序列化。
常見的方式是快照方式是非阻塞式的后臺復制——寫時復制(Copy-on-Write COW)。快照策略選擇按數據量進行觸發。
快照與日志融合方案使用MySQL的更為簡單。
一致性
在通過持久化的方式來保證數據的可靠后,我們的共享數據存儲中心有了一定的可用性保障。這時我們需要開始考慮數據的一致性。
但一個完整的ACID事務系統是極其困難的,設計到并發控制、恢復管理、日志管理等多個復雜的子系統。
因此,簡單實現可以暫不追求完整的ACID事務。
僅先考慮基本的一致性,如簡單原子性,批量操作提交視為一個整體。集群數據同步的一致性。
監控數據狀態
狀態通知機制
當數據發生變更時,我們需要一個機制能可靠地通知所有相關節點,并保證它們獲取到的最新的、一致性的數據。
我們很容易想到觀察者模式。當數據被修改時,告訴共享數據的訂閱者數據已更改。
因此通知機制的前提是需要有一張注冊表。
在上篇的注冊中心我們已經知道,注冊表可以通過心跳來維持其有效性。
但還有另一種做法,就是ZooKeeper的的Watcher機制。
每次修改數據通知完訂閱者后,刪除其在注冊表中的信息,每次getData()時再重新注冊。即一次性觸發 (One-time Trigger)。
這樣可以精簡設計,省去心跳機制。
集群間數據同步
簡單追求集群數據同步的強一致性,共享數據做讀寫分離處理提升性能。
實現一個共享數據中心
簡單實現兩個model類,用于存儲在內存的共享數據對象ShareData
import lombok.Data;/*** 內存中的共享數據** @author crayon* @version 1.0* @date 2025/5/14*/
@Data
public class ShareData {private String id;/*** 數據更新時間戳*/private Long lsn;/*** 數據*/private Object data;/*** 數據版本*/private int version;public ShareData(String id1, String initialValueForKey1, int version) {this.id = id1;this.data = initialValueForKey1;this.version = version;this.lsn = System.currentTimeMillis();}public void incrementVersion() {this.version++;}
}
與用于序列化的PersistenceData
package com.crayon.datashare.model;import lombok.Data;import java.io.Serializable;/*** 內存共享數據的序列化對象** @author crayon* @version 1.0* @date 2025/5/15*/
@Data
public class PersistenceData implements Serializable {/*** 數據序列化時間*/private Long serialDateTime;/*** 操作類型*/private String operaType;/*** 數據key*/private String key;/*** 共享數據*/private ShareData shareData;public PersistenceData(Builder builder) {this.key = builder.key;this.shareData = builder.shareData;this.operaType = builder.operaType;this.serialDateTime = System.currentTimeMillis();}@Overridepublic String toString() {return "PersistenceData{" +"serialDateTime=" + serialDateTime +", operaType='" + operaType + '\'' +", key='" + key + '\'' +", shareData=" + shareData +'}';}// 建造者模式public static class Builder {private String key;private ShareData shareData;private String operaType;public Builder key(String key) {this.key = key;return this;}public Builder shareData(ShareData shareData) {this.shareData = shareData;return this;}public Builder operaType(String operaType) {this.operaType = operaType;return this;}public PersistenceData build() {return new PersistenceData(this);}}}
實現最重要的數據共享中心服務端
import com.crayon.datashare.model.ShareData;import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;/*** 簡單的共享數據存儲中心** <p>* 功能如下:* <p>* API-獲取數據* API-存儲數據* API-注冊信息* <p>* 數據量到了應規模時進行序列化快照,存儲數據時日志追加** @author crayon* @version 1.0* @date 2025/5/14*/
public class ShareDataServer {/*** 使用ConcurrentHashMap存儲共享數據* <p>* keyName -> ShareData* </p>* <p>* 集群做讀寫分離設計,Leader-Follower 模型 。* master寫同步到slave,slave節點讀,負載均衡采用隨機策略。* </p>* <p>* 數據容量暫不設置上限與對應清理機制。* </p>* <p>* 沒有選舉機制,也沒有邏輯時鐘* </p>*/private static ConcurrentHashMap<String, ShareData> shareDataMaster = new ConcurrentHashMap<>();private static ConcurrentHashMap<String, ShareData> shareDataSlave1 = new ConcurrentHashMap<>();private static ConcurrentHashMap<String, ShareData> shareDataSlave2 = new ConcurrentHashMap<>();/*** 用于隨機獲取從節點*/private static Random random = new Random();/*** <p>* keyName -> ReentrantReadWriteLock* </p>* 線程安全方案一:* <p>* 使用讀寫鎖控制共享數據安全。* ConcurrentHashMap 操作數據是安全的,但是共享數據內容是可變的(Mutable)。* 當需要組合多個ConcurrentHashMap操作時,其是不安全的。* 其他線程可能在ConcurrentHashMap多個操作之間,對可變對象進行更改。* <p>* 因此需要讀寫鎖來保證寫入時候數據安全。* 在共享數據中,因為原子操作為:寫數據+日志追加,所以更需要使用鎖來控制。* <p>* 在分布式系統中,共享數據中心本身常被作為分布式鎖使用。* <p>* 如果不是需要WAL,其實可以通過不可變對象(Immutable Objects)來消除數據共享來簡化并發問題* </p>*/private static ConcurrentHashMap<String, ReentrantReadWriteLock> readWriteLocks = new ConcurrentHashMap<>();/*** 訂閱者集合* <p>* 采取一次性觸發機制(One-time Trigger),省去心跳檢測的麻煩* 每次通知訂閱者時,會從集合中移除訂閱者,訂閱者每次需要重新注冊* 比如在調用get時重新注冊* <p>* 訂閱者也可以封裝成一個對象,這里簡單一點=ip:port* <p>* keyName -> Set<ip:port>* 使用線程安全的Set,如 ConcurrentHashMap.newKeySet()* </p>*/private static ConcurrentHashMap<String, Set<String>> subscribers = new ConcurrentHashMap<>();/*** Watcher*/private static Notifier notifier = new Notifier();/*** 序列化服務* 日志操作,數據恢復(暫無)等*/private static SerializableService serializableService = new SerializableService();/*** 獲取共享數據* 采取一次性觸發機制(One-time Trigger)由Server完成** @param key* @param ipPort (可選) 客戶端標識,用于重新注冊Watcher* @param watch (可選) 是否要設置Watcher* @return*/public ShareData get(String key, String ipPort, boolean watch) {ReentrantReadWriteLock readWriteLock = readWriteLocks.computeIfAbsent(key, k -> new ReentrantReadWriteLock());readWriteLock.readLock().lock();try {ConcurrentHashMap<String, ShareData> readNode = getReadNode();ShareData shareData = readNode.get(key);if (watch && null != ipPort && !"".equals(ipPort) && null != shareData) {register(key, ipPort);}return shareData;} finally {readWriteLock.readLock().unlock();}}/*** 注冊訂閱者** @param key* @param ipPort*/public void register(String key, String ipPort) {// 使用ConcurrentHashMap.newKeySet() 創建一個線程安全的Setsubscribers.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(ipPort);}/*** 添加共享數據* 組合 日志追加 + 添加 + 集群同步* <p>* 原子操作設計:* 一般這種帶集群同步的標準方案是共識算法(Consensus Algorithm)。太復雜了,搞不來。** </p>** @param key* @param value*/public boolean set(String key, ShareData value) {ReentrantReadWriteLock readWriteLock = readWriteLocks.computeIfAbsent(key, k -> new ReentrantReadWriteLock());readWriteLock.writeLock().lock();try {// 1、寫入日志 WALboolean logSuccess = serializableService.appendLog(OperaTypeEnum.SET.getType(), key, value);if (!logSuccess) {return false;}// 2、寫入內存MastershareDataMaster.put(key, value);/*** 3、集群同步* 簡單模擬,沒有處理網絡失敗、異步、其他復雜ack機制等*/syncToSlave(key, value);// 4、通知訂閱者,從注冊表移除// 獲取并移除,實現一次性觸發Set<String> currentSubscribers = subscribers.remove(key);if (currentSubscribers != null && !currentSubscribers.isEmpty()) {for (String subscriberIpPort : currentSubscribers) {// 實際應用中,這里會通過網絡連接向客戶端發送通知notifier.notify(subscriberIpPort, key, OperaTypeEnum.CHANGE.getType());}}} catch (Exception e) {// 實際生產需要回滾等事務操作、日志記錄等return false;} finally {readWriteLock.writeLock().unlock();}return true;}/*** 集群同步* <p>* 只在set操作中調用** @param key* @param value*/private void syncToSlave(String key, ShareData value) {shareDataSlave1.put(key, value); // 模擬同步到slave1shareDataSlave2.put(key, value); // 模擬同步到slave2}/*** 50%概率隨機取節點** @return*/private ConcurrentHashMap<String, ShareData> getReadNode() {return random.nextBoolean() ? shareDataSlave1 : shareDataSlave2;}}
封裝操作類型枚舉
/*** 操作類型枚舉*/
public enum OperaTypeEnum {GET("GET"),SET("SET"),CHANGE("CHANGE");private String type;OperaTypeEnum(String type) {this.type = type;}public String getType() {return type;}}
實現序列化方法
import com.crayon.datashare.model.PersistenceData;
import com.crayon.datashare.model.ShareData;import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;/*** 序列化服務** <p>* 序列化:采用快照+日志方式* 按數據量策略進行快照* <p>* 文件內容:類Redis融合方案* 快照內容+日志內容。文件前面是RDB格式,后面是AOF格式* <p>* RDB格式:* AOF格式* <p>* 文件大小處理:采用保留數據最終狀態的壓縮方案** <p>* 恢復機制:** @author crayon* @version 1.0* @date 2025/5/15*/
public class SerializableService {/*** 假設的日志文件名*/private static final String MASTER_LOG_FILE = System.getProperty("user.dir") + "/wal/master_wal.log";/*** 日志追加* <p>* 簡化的日志格式,實際應該至少有操作類型、時間戳、序列號、狀態碼,* 數據庫的話會有數據庫的一些信息,如數據庫名字、server id等* </p>* 生產日志會有壓縮、刷盤等操作,這里簡化了*/public boolean appendLog(String operaType, String key, ShareData value) {PersistenceData persistenceData = new PersistenceData.Builder().operaType(operaType).key(key).shareData(value).build();try (PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(MASTER_LOG_FILE, true)))) {// out.flush(); // 可以考慮更頻繁的flush或根據策略fsyncout.println(persistenceData.toString());return true;} catch (IOException e) {System.err.println("Error writing to WAL: " + e.getMessage());return false;}}
}
封裝一個用于通知的監視者Watcher
/*** @author crayon* @version 1.0* @date 2025/5/16*/
public class Notifier {/*** 通知訂閱者** @param subscriberIpPort* @param key* @param operaType*/public void notify(String subscriberIpPort, String key, String operaType) {// 調用訂閱者的接口,讓訂閱者進行相應的處理}
}
封裝一個客戶端
/*** 簡單模擬客戶端* <p>* 訂閱和獲取、更新數據等操作* </p>* * @author crayon* @version 1.0* @date 2025/5/16*/
public class SubscriberClient {private String ipPort;private ShareDataServer shareDataServer = new ShareDataServer();public SubscriberClient(String ipPort) {this.ipPort = ipPort;}public void subscribe(String key) {shareDataServer.register(key, ipPort);}public ShareData get(String key, String ipPort) {return shareDataServer.get(key, ipPort, true);}}
test
import com.crayon.datashare.client.SubscriberClient;
import com.crayon.datashare.model.ShareData;
import com.crayon.datashare.server.ShareDataServer;/*** 簡單數據共享中心演示** @author crayon* @version 1.0* @date 2025/5/14*/
public class Demo {public static void main(String[] args) {ShareDataServer shareDataServer = new ShareDataServer();// 模擬客戶端1注冊對 key1 的訂閱SubscriberClient subscriberClient8080 = new SubscriberClient("127.0.0.1:8080");subscriberClient8080.subscribe("key1");SubscriberClient subscriberClient8081 = new SubscriberClient("127.0.0.1:8081");subscriberClient8081.subscribe("key1");// 模擬客戶端2注冊對 key2 的訂閱subscriberClient8081.subscribe("key2");System.out.println("\n Setting data for key1...");shareDataServer.set("key1", new ShareData("id1", "Initial Value for key1", 1));System.out.println("\n Getting data for key1 by client1 (will re-register watcher)...");ShareData data1 = shareDataServer.get("key1", "client1_ip:port", true);System.out.println("Client1 got: " + data1);System.out.println("\n Setting data for key1 again (client1 should be notified)...");shareDataServer.set("key1", new ShareData("id1", "Updated Value for key1", 2));System.out.println("\n Setting data for key2...");shareDataServer.set("key2", new ShareData("id2", "Value for key2", 1));System.out.println("\n Client2 getting data for key1 (not subscribed initially, but sets a watch now)...");ShareData data1_by_client2 = shareDataServer.get("key1", "client2_ip:port", true);System.out.println("Client2 got (for key1): " + data1_by_client2);System.out.println("\n Simulating a read from a random slave for key1:");ShareData slaveData = shareDataServer.get("key1", null, false); // No re-registerSystem.out.println("Read from slave for key1: " + slaveData);}}
結果展示如下