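Implementing a Pseudo-Replica: Binlog-Based Change Listening and Cache Updates for the Following Table
Technical Overview
A dedicated consumer service can act as a pseudo-replica of the Following table: it subscribes to the MySQL binlog and updates the cache whenever a row changes. The approach is built from the components below.
Main Components
- MySQL binlog listener: an open-source library that reads the MySQL binlog
- Message queue (optional): change events are published to a queue
- Consumer service: processes change events and updates the cache
- Cache: Redis or another caching solution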
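Implementation Steps
1. Configure the MySQL Binlog
First make sure that binary logging is enabled and that the format is ROW:

-- Check the current binlog configuration
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

If it is not, edit my.cnf (my.ini on Windows) and restart MySQL:

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1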
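2. Listen to the Binlog from Java
The open-source mysql-binlog-connector-java library can be used for this. Note that the 0.25.x releases are published under the com.zendesk groupId (the maintained fork of the original com.github.shyiko artifact); the Java package names are still com.github.shyiko.mysql.binlog.

<!-- pom.xml dependency -->
<dependency>
    <groupId>com.zendesk</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.25.4</version>
</dependency>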
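3. Consumer Service
The listener remembers the table ID announced by each TABLE_MAP event for the Following table, so that row events from other tables are ignored. Column values are read through Number because the Java type depends on the column type (for example, INT arrives as Integer and BIGINT as Long).

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

public class FollowingTableBinlogConsumer {
    private static final String DATABASE = "your_database";
    private static final String TABLE = "Following";

    private final BinaryLogClient client;
    private final CacheService cacheService;
    // Table ID of Following, taken from the most recent matching TABLE_MAP event
    private long followingTableId = -1;

    public FollowingTableBinlogConsumer(String hostname, int port, String username, String password, CacheService cacheService) {
        this.cacheService = cacheService;
        this.client = new BinaryLogClient(hostname, port, username, password);
        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                // Table-map event: in ROW format it precedes the row events of a statement
                TableMapEventData tableMap = (TableMapEventData) data;
                if (DATABASE.equals(tableMap.getDatabase()) && TABLE.equals(tableMap.getTable())) {
                    followingTableId = tableMap.getTableId();
                }
            } else if (data instanceof WriteRowsEventData) {
                // INSERT
                WriteRowsEventData write = (WriteRowsEventData) data;
                if (write.getTableId() == followingTableId) processWriteEvent(write);
            } else if (data instanceof UpdateRowsEventData) {
                // UPDATE
                UpdateRowsEventData update = (UpdateRowsEventData) data;
                if (update.getTableId() == followingTableId) processUpdateEvent(update);
            } else if (data instanceof DeleteRowsEventData) {
                // DELETE
                DeleteRowsEventData delete = (DeleteRowsEventData) data;
                if (delete.getTableId() == followingTableId) processDeleteEvent(delete);
            }
        });
    }

    private void processWriteEvent(WriteRowsEventData data) {
        // A new follow relationship was inserted
        for (Serializable[] row : data.getRows()) {
            long followerId = ((Number) row[0]).longValue(); // assumes column 1 is follower_id
            long followeeId = ((Number) row[1]).longValue(); // assumes column 2 is followee_id
            cacheService.addFollowing(followerId, followeeId);
        }
    }

    private void processUpdateEvent(UpdateRowsEventData data) {
        // Only relevant if rows of the Following table are ever updated in place
        for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
            Serializable[] before = row.getKey();
            Serializable[] after = row.getValue();
            // Handle the update according to the business logic
        }
    }

    private void processDeleteEvent(DeleteRowsEventData data) {
        // A follow relationship was removed (unfollow)
        for (Serializable[] row : data.getRows()) {
            long followerId = ((Number) row[0]).longValue();
            long followeeId = ((Number) row[1]).longValue();
            cacheService.removeFollowing(followerId, followeeId);
        }
    }

    public void start() {
        try {
            // connect() blocks the calling thread until the client is disconnected
            client.connect();
        } catch (IOException e) {
            throw new RuntimeException("Failed to connect to MySQL binlog", e);
        }
    }

    public void stop() {
        try {
            client.disconnect();
        } catch (IOException e) {
            // Log the failure; there is nothing else to clean up here
        }
    }
}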
4. Cache Service

// CacheService.java
import java.util.Set;

public interface CacheService {
    void addFollowing(Long followerId, Long followeeId);
    void removeFollowing(Long followerId, Long followeeId);
    Set<Long> getFollowings(Long followerId);
    Set<Long> getFollowers(Long followeeId);
}

// RedisCacheService.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.util.Set;
import java.util.stream.Collectors;

public class RedisCacheService implements CacheService {
    private final JedisPool jedisPool;

    public RedisCacheService(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    @Override
    public void addFollowing(Long followerId, Long followeeId) {
        try (Jedis jedis = jedisPool.getResource()) {
            // Set of users that followerId follows
            jedis.sadd("user:" + followerId + ":followings", followeeId.toString());
            // Set of users that follow followeeId
            jedis.sadd("user:" + followeeId + ":followers", followerId.toString());
        }
    }

    @Override
    public void removeFollowing(Long followerId, Long followeeId) {
        try (Jedis jedis = jedisPool.getResource()) {
            // Set of users that followerId follows
            jedis.srem("user:" + followerId + ":followings", followeeId.toString());
            // Set of users that follow followeeId
            jedis.srem("user:" + followeeId + ":followers", followerId.toString());
        }
    }

    @Override
    public Set<Long> getFollowings(Long followerId) {
        try (Jedis jedis = jedisPool.getResource()) {
            Set<String> followings = jedis.smembers("user:" + followerId + ":followings");
            return followings.stream().map(Long::valueOf).collect(Collectors.toSet());
        }
    }

    @Override
    public Set<Long> getFollowers(Long followeeId) {
        try (Jedis jedis = jedisPool.getResource()) {
            Set<String> followers = jedis.smembers("user:" + followeeId + ":followers");
            return followers.stream().map(Long::valueOf).collect(Collectors.toSet());
        }
    }
}
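5. Starting the Service
The shutdown hook is registered before start() is called, because start() blocks the main thread for as long as the client is connected.

import redis.clients.jedis.JedisPool;

public class Application {
    public static void main(String[] args) {
        // Configure the Redis connection pool
        JedisPool jedisPool = new JedisPool("localhost", 6379);
        CacheService cacheService = new RedisCacheService(jedisPool);

        FollowingTableBinlogConsumer consumer = new FollowingTableBinlogConsumer(
                "localhost", 3306, "username", "password", cacheService);

        // Register the shutdown hook first
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.stop();
            jedisPool.close();
        }));

        // Start the binlog consumer (blocks)
        consumer.start();
    }
}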
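Advanced Optimizations
1. Introduce a Message Queue (such as Kafka)
Instead of updating the cache directly, the binlog consumer publishes each event to Kafka:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaEventPublisher {
    private final Producer<String, String> producer;

    public KafkaEventPublisher(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    public void publishFollowingEvent(String eventType, Long followerId, Long followeeId) {
        // Keying by the (follower, followee) pair keeps events for one relationship in one partition, in order
        String key = followerId + ":" + followeeId;
        String value = String.format("{\"eventType\":\"%s\",\"followerId\":%d,\"followeeId\":%d}",
                eventType, followerId, followeeId);
        producer.send(new ProducerRecord<>("following-events", key, value));
    }

    public void close() {
        producer.close();
    }
}

Independent consumer services then read from Kafka and update the cache.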
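2. Initial Data Synchronization
When the service starts, load the full Following table into the cache before consuming incremental changes. For a large table, findAll() should be replaced by paged reads, since it loads every row into memory at once.

public void initialSync() {
    // Read all Following relationships from the database
    List<Following> allFollowings = followingRepository.findAll();
    // Write them to the cache through a pipeline
    try (Jedis jedis = jedisPool.getResource()) {
        Pipeline pipeline = jedis.pipelined();
        for (Following following : allFollowings) {
            pipeline.sadd("user:" + following.getFollowerId() + ":followings", following.getFolloweeId().toString());
            pipeline.sadd("user:" + following.getFolloweeId() + ":followers", following.getFollowerId().toString());
        }
        pipeline.sync();
    }
}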
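3. Monitoring and Fault Tolerance
- Record the binlog position so that processing resumes from the right place after a restart
- Retry failed cache updates
- Add metrics that track event-processing latency and error rate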
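The retry item above can be sketched as a small helper with exponential backoff. This is only an illustration under stated assumptions: RetryingExecutor, its parameters, and the choice to retry on any RuntimeException are hypothetical and not part of the original design.

```java
import java.util.function.Supplier;

/** Hypothetical helper: retries a cache update, doubling the wait after each failure. */
final class RetryingExecutor {
    private final int maxAttempts;
    private final long initialDelayMillis;

    RetryingExecutor(int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        this.maxAttempts = maxAttempts;
        this.initialDelayMillis = initialDelayMillis;
    }

    /** Runs the task; on RuntimeException retries up to maxAttempts, then rethrows the last failure. */
    <T> T execute(Supplier<T> task) {
        long delay = initialDelayMillis;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt == maxAttempts) break;
                sleep(delay);
                delay *= 2; // exponential backoff
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

A call such as cacheService.addFollowing(...) could be wrapped in execute(...). If all attempts fail, the binlog position should not be advanced past the failed event, so that the event is processed again later.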
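Summary
This design implements a pseudo-replica of the Following table: it captures data changes in near real time by listening to the MySQL binlog and applies them to the Redis cache. The architecture has the following advantages:
- Low latency: database changes are propagated almost in real time
- Decoupling: the consumer service is independent of the main business service
- Scalability: more consumers can easily be added to handle other business logic
- High performance: Redis stores and queries relationship data efficiently
Depending on the scale of the business, you can choose the simple approach that updates the cache directly or the more elaborate architecture with a message queue.
This is the final design for combining the cache with the database, distilled from a detailed analysis of the database design and the cache design.
A typical use case for the pseudo-replica approach is a user-relationship service with follow and unfollow endpoints:
- The endpoint only updates the Following table in the database and then responds to the user; everything that follows is fully asynchronous from the user's point of view.
- The Follower table, the counting service, and the Redis cache each update their own data from the binlog produced by the Following table.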
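The fan-out described above, where one binlog change drives several downstream updaters, can be sketched as a small dispatcher. All names here (FollowingEvent, FollowingEventDispatcher, FollowCounter) are hypothetical, and the counter is an in-memory stand-in for the counting service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** A change captured from the Following table's binlog (hypothetical shape). */
final class FollowingEvent {
    enum Type { FOLLOW, UNFOLLOW }

    final Type type;
    final long followerId;
    final long followeeId;

    FollowingEvent(Type type, long followerId, long followeeId) {
        this.type = type;
        this.followerId = followerId;
        this.followeeId = followeeId;
    }
}

/** Delivers each event to every registered downstream handler, in registration order. */
final class FollowingEventDispatcher {
    private final List<Consumer<FollowingEvent>> handlers = new ArrayList<>();

    void register(Consumer<FollowingEvent> handler) {
        handlers.add(handler);
    }

    void dispatch(FollowingEvent event) {
        for (Consumer<FollowingEvent> handler : handlers) {
            handler.accept(event);
        }
    }
}

/** In-memory stand-in for the counting service: follower count per user. */
final class FollowCounter implements Consumer<FollowingEvent> {
    private final Map<Long, Long> followerCounts = new HashMap<>();

    @Override
    public void accept(FollowingEvent event) {
        long delta = event.type == FollowingEvent.Type.FOLLOW ? 1L : -1L;
        followerCounts.merge(event.followeeId, delta, Long::sum);
    }

    long followerCount(long userId) {
        return followerCounts.getOrDefault(userId, 0L);
    }
}
```

One design point worth noting: unlike set-based cache updates, counter increments are not idempotent, so a counting handler needs its own protection against replayed events (for example, deduplication or periodic recomputation from the database).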
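Data Loss When the Binlog Listener Restarts or Pauses
Whether a restart or pause of the listener loses data depends on how the listener is implemented and configured. The sections below analyze the problem and its solutions.
Key Factors
1. Binlog position tracking
- Position not recorded: if the service does not record the binlog position it has processed, it starts from the latest binlog position after a restart, and the changes made in between are lost
- Position recorded: recording the position correctly lets the service resume from where it stopped
2. MySQL binlog retention policy
- The expire_logs_days parameter determines how many days binlogs are kept
- If binlogs are purged too early while the service is down for a long time, recovery may become impossible
3. Transaction integrity
- A transaction that was only partially processed before the restart can lead to inconsistency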
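One way to make a partially processed transaction harmless is to keep the cache updates idempotent, so that events replayed after a restart leave the cache in the same state. The sketch below illustrates the idea with an in-memory stand-in for the Redis sets (SADD and SREM behave the same way); InMemoryFollowingCache is a hypothetical name used only for this example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** In-memory stand-in for the Redis "followings" sets (illustration only). */
final class InMemoryFollowingCache {
    private final Map<Long, Set<Long>> followings = new HashMap<>();

    /** Like SADD: adding an existing member changes nothing. */
    void addFollowing(long followerId, long followeeId) {
        followings.computeIfAbsent(followerId, k -> new HashSet<>()).add(followeeId);
    }

    /** Like SREM: removing a missing member changes nothing. */
    void removeFollowing(long followerId, long followeeId) {
        Set<Long> set = followings.get(followerId);
        if (set != null) {
            set.remove(followeeId);
        }
    }

    /** Returns a copy of the set of users that followerId follows. */
    Set<Long> getFollowings(long followerId) {
        Set<Long> set = followings.get(followerId);
        return set == null ? new HashSet<>() : new HashSet<>(set);
    }
}
```

Because every operation only sets the membership of one element, replaying a suffix of the event stream in its original order converges to the same final state. This is what allows the consumer to resume from a slightly older saved position without corrupting the cache.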
Solutions
1. Persist the Binlog Position
Extend the consumer service from earlier with position tracking. The position is saved only on log rotation and at transaction boundaries (XID events), so a restart never resumes between a TABLE_MAP event and the row events that depend on it.

public class FollowingTableBinlogConsumer {
    // ... existing fields ...
    // Store for the last processed binlog position
    private final BinlogPositionStore positionStore;

    public FollowingTableBinlogConsumer(String hostname, int port, String username, String password,
                                        CacheService cacheService, BinlogPositionStore positionStore) {
        // ... initialize client and cacheService as before ...
        this.positionStore = positionStore;
        // Resume from the saved binlog file name and position, if any
        BinlogPosition position = positionStore.getPosition();
        if (position != null) {
            client.setBinlogFilename(position.getFilename());
            client.setBinlogPosition(position.getPosition());
        }
        client.registerEventListener(event -> {
            // ... handle the event as before ...
            // Record the position
            EventType type = event.getHeader().getEventType();
            if (type == EventType.ROTATE) {
                RotateEventData rotateEvent = (RotateEventData) event.getData();
                positionStore.savePosition(new BinlogPosition(
                        rotateEvent.getBinlogFilename(), rotateEvent.getBinlogPosition()));
            } else if (type == EventType.XID) {
                // getNextPosition() is declared on EventHeaderV4, not on EventHeader
                EventHeaderV4 header = event.getHeader();
                positionStore.savePosition(new BinlogPosition(
                        client.getBinlogFilename(), header.getNextPosition()));
            }
        });
    }
}

// Binlog position store
public interface BinlogPositionStore {
    void savePosition(BinlogPosition position);
    BinlogPosition getPosition();
}

// Simple file-based implementation
import java.io.*;

public class FileBinlogPositionStore implements BinlogPositionStore {
    private final File positionFile;

    public FileBinlogPositionStore(String filePath) {
        this.positionFile = new File(filePath);
    }

    @Override
    public void savePosition(BinlogPosition position) {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(positionFile))) {
            out.writeObject(position);
        } catch (IOException e) {
            throw new RuntimeException("Failed to save binlog position", e);
        }
    }

    @Override
    public BinlogPosition getPosition() {
        if (!positionFile.exists()) return null;
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(positionFile))) {
            return (BinlogPosition) in.readObject();
        } catch (Exception e) {
            throw new RuntimeException("Failed to read binlog position", e);
        }
    }
}

// Binlog position value object
import java.io.Serializable;

public class BinlogPosition implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String filename;
    private final long position;

    public BinlogPosition(String filename, long position) {
        this.filename = filename;
        this.position = position;
    }

    public String getFilename() { return filename; }

    public long getPosition() { return position; }
}
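A file that is overwritten in place can be left truncated if the process dies in the middle of a write. A common remedy, sketched here as an assumption rather than as part of the original design, is to write to a temporary file and then rename it over the target. AtomicFilePositionStore and SavedPosition are hypothetical names, and the plain-text "filename:position" format is chosen only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical value type holding a saved binlog position. */
final class SavedPosition {
    final String filename;
    final long position;

    SavedPosition(String filename, long position) {
        this.filename = filename;
        this.position = position;
    }
}

/** Hypothetical store: writes "filename:position" to a temp file, then renames it over the target. */
final class AtomicFilePositionStore {
    private final Path target;

    AtomicFilePositionStore(Path target) {
        this.target = target;
    }

    void save(String binlogFilename, long position) {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try {
            Files.write(tmp, (binlogFilename + ":" + position).getBytes(StandardCharsets.UTF_8));
            // On POSIX file systems the rename replaces an existing target in one step
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to save binlog position", e);
        }
    }

    /** Returns the saved position, or null when nothing has been saved yet. */
    SavedPosition load() {
        if (!Files.exists(target)) return null;
        try {
            String text = new String(Files.readAllBytes(target), StandardCharsets.UTF_8).trim();
            int sep = text.lastIndexOf(':');
            return new SavedPosition(text.substring(0, sep), Long.parseLong(text.substring(sep + 1)));
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read binlog position", e);
        }
    }
}
```

With this pattern a reader sees either the old position or the new one, never a half-written file. The sketch does not force the data to disk (fsync), which a production store would likely want to add.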
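2. MySQL Configuration
Make sure binlogs are retained long enough. Use the variable that matches your MySQL version rather than both; expire_logs_days is deprecated as of MySQL 8.0. SET GLOBAL does not survive a server restart, so the value should also be set in my.cnf.

-- Keep binlogs long enough (adjust to your business needs)
SET GLOBAL expire_logs_days = 7;
-- Or use the newer variable (MySQL 8.0+)
SET GLOBAL binlog_expire_logs_seconds = 604800; -- 7 days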
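3. Validation and Repair at Startup
Add a check when the service starts. The code below assumes a javax.sql.DataSource field named dataSource and a clearPosition() method added to BinlogPositionStore; neither is defined in the earlier listings.

public void start() throws IOException {
    // Check that the saved binlog position still refers to an existing file
    BinlogPosition position = positionStore.getPosition();
    if (position != null && !isBinlogFileExists(position.getFilename())) {
        // The binlog has been purged: run a full sync and discard the stale position
        initialSync();
        positionStore.clearPosition();
        // A null file name lets the client resolve the server's current position on connect
        client.setBinlogFilename(null);
    }
    client.connect();
}

private boolean isBinlogFileExists(String filename) {
    // SHOW BINARY LOGS lists the binlog files that currently exist on the server
    try (Connection conn = dataSource.getConnection();
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SHOW BINARY LOGS")) {
        while (rs.next()) {
            if (filename.equals(rs.getString("Log_name"))) return true;
        }
        return false;
    } catch (SQLException e) {
        throw new RuntimeException("Failed to list binary logs", e);
    }
}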
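4. Graceful Shutdown
Make sure the service shuts down cleanly. flush() is assumed to be an additional method on BinlogPositionStore that forces the last position to durable storage.

public void stop() {
    try {
        // Disconnect from the binlog stream
        client.disconnect();
        // Make sure the last position has been saved
        positionStore.flush();
    } catch (IOException e) {
        // Log the failure; if the saved position lags behind, some events are replayed on restart
    }
}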
Additional Safeguards
1. Track Processing State in a Database Table
Create a table that records the consumer's state:

CREATE TABLE binlog_consumer_state (
    consumer_id VARCHAR(100) PRIMARY KEY,
    binlog_filename VARCHAR(100) NOT NULL,
    binlog_position BIGINT NOT NULL,
    last_heartbeat TIMESTAMP NOT NULL,
    processed_checksum VARCHAR(100)
);

2. Periodic Checkpoints

// Record the full state every N events or every M seconds
private void checkpoint(Event event) {
    // Checksum of the data processed so far
    String checksum = computeChecksum();
    // getNextPosition() is declared on EventHeaderV4
    EventHeaderV4 header = event.getHeader();
    long nextPosition = header.getNextPosition();
    // Upsert the consumer state in the database
    jdbcTemplate.update(
        "INSERT INTO binlog_consumer_state VALUES (?, ?, ?, NOW(), ?) " +
        "ON DUPLICATE KEY UPDATE binlog_filename=?, binlog_position=?, last_heartbeat=NOW(), processed_checksum=?",
        consumerId, client.getBinlogFilename(), nextPosition, checksum,
        client.getBinlogFilename(), nextPosition, checksum);
}
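The "every N events or every M seconds" rule mentioned in the checkpoint comment can be sketched as a small policy object. CheckpointPolicy and its parameters are hypothetical; the clock is injected only so that the behaviour can be checked deterministically.

```java
import java.util.function.LongSupplier;

/** Hypothetical policy: a checkpoint is due after maxEvents events or maxIntervalMillis, whichever comes first. */
final class CheckpointPolicy {
    private final int maxEvents;
    private final long maxIntervalMillis;
    private final LongSupplier clock; // returns the current time in milliseconds
    private int eventsSinceCheckpoint = 0;
    private long lastCheckpointMillis;

    CheckpointPolicy(int maxEvents, long maxIntervalMillis, LongSupplier clock) {
        this.maxEvents = maxEvents;
        this.maxIntervalMillis = maxIntervalMillis;
        this.clock = clock;
        this.lastCheckpointMillis = clock.getAsLong();
    }

    /** Call once per processed event; returns true when the caller should checkpoint now. */
    boolean onEvent() {
        eventsSinceCheckpoint++;
        long now = clock.getAsLong();
        boolean due = eventsSinceCheckpoint >= maxEvents
                || now - lastCheckpointMillis >= maxIntervalMillis;
        if (due) {
            // Reset both triggers so the next checkpoint is measured from this one
            eventsSinceCheckpoint = 0;
            lastCheckpointMillis = now;
        }
        return due;
    }
}
```

In the listener, a call like `if (policy.onEvent()) checkpoint(event);` with a policy built from `System::currentTimeMillis` would bound both the number of events and the amount of time that can be replayed after a crash.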
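3. Repair Mechanism
When an inconsistency is detected:

public void repairIfNeeded() {
    // Last processed state according to the database
    StateRecord state = getLastStateFromDB();
    // Last processed state according to the cache
    StateRecord cacheState = getLastStateFromCache();
    if (!state.equals(cacheState)) {
        // Run the repair logic
        executeRepair(state);
    }
}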
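Summary
A correctly implemented binlog listener does not lose data across restarts or pauses, provided that it:
- Persists the binlog position (file name + offset)
- Keeps binlogs for a sufficient retention period
- Shuts down and recovers gracefully
- Adds validation and repair logic (for critical business data)
Recommended complete setup:
- Hybrid position storage (local file + database)
- Periodic checkpoints
- Data validation at startup
- A sufficient binlog retention period
- Monitoring of binlog consumption lag
With these in place, data is not lost and processing resumes from the correct position after a restart, a pause, or even a long outage.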
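The lag monitoring recommended above can be sketched with a few counters. ConsumerMetrics is a hypothetical class; it assumes the caller passes the event timestamp in milliseconds (for example from the binlog event header) together with the current time.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical metrics holder for consumption lag and error rate. */
final class ConsumerMetrics {
    private final AtomicLong processed = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();
    private final AtomicLong lastLagMillis = new AtomicLong();

    /** Records a successfully processed event and the lag between its creation and now. */
    void recordSuccess(long eventTimestampMillis, long nowMillis) {
        processed.incrementAndGet();
        // Clamp at zero in case of clock skew between MySQL and the consumer host
        lastLagMillis.set(Math.max(0L, nowMillis - eventTimestampMillis));
    }

    void recordFailure() {
        failed.incrementAndGet();
    }

    /** Lag of the most recently processed event, in milliseconds. */
    long lastLagMillis() {
        return lastLagMillis.get();
    }

    /** Fraction of events that failed; 0.0 when nothing has been recorded. */
    double errorRate() {
        long total = processed.get() + failed.get();
        return total == 0 ? 0.0 : (double) failed.get() / total;
    }
}
```

Binlog timestamps have one-second resolution, so lag values below a second are mostly noise; an alert threshold of several seconds or more is a more meaningful signal that the consumer is falling behind.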