【緩存與數據庫結合最終方案】偽從技術

實現偽從技術:基于Binlog的Following表變更監聽與緩存更新

技術方案概述

要實現一個專門消費者服務作為Following表的偽從,訂閱binlog并在數據變更時更新緩存,可以采用以下技術方案:

主要組件

  1. MySQL Binlog監聽:使用開源工具監聽MySQL的binlog
  2. 消息隊列:將變更事件發布到消息隊列(可選)
  3. 消費者服務:處理變更事件并更新緩存
  4. 緩存系統:Redis或其他緩存解決方案

具體實現步驟

1. 配置MySQL Binlog

首先確保MySQL已開啟binlog并配置為ROW模式:

-- 檢查當前binlog配置
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';-- 修改my.cnf/my.ini文件
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

2. 使用Java實現Binlog監聽

可以使用開源的mysql-binlog-connector-java庫:

<!-- pom.xml 依賴 -->
<dependency><groupId>com.github.shyiko</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.25.4</version>
</dependency>

3. 消費者服務實現

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;public class FollowingTableBinlogConsumer {private final BinaryLogClient client;private final CacheService cacheService;public FollowingTableBinlogConsumer(String hostname, int port, String username, String password, CacheService cacheService) {this.cacheService = cacheService;this.client = new BinaryLogClient(hostname, port, username, password);client.registerEventListener(event -> {EventData data = event.getData();if (data instanceof TableMapEventData) {// 表映射事件TableMapEventData tableMapEvent = (TableMapEventData) data;if ("your_database".equals(tableMapEvent.getDatabase()) && "Following".equals(tableMapEvent.getTable())) {// 處理Following表的事件}} else if (data instanceof WriteRowsEventData) {// 插入操作processWriteEvent((WriteRowsEventData) data);} else if (data instanceof UpdateRowsEventData) {// 更新操作processUpdateEvent((UpdateRowsEventData) data);} else if (data instanceof DeleteRowsEventData) {// 刪除操作processDeleteEvent((DeleteRowsEventData) data);}});}private void processWriteEvent(WriteRowsEventData data) {// 處理新增關注事件for (Serializable[] row : data.getRows()) {Long followerId = (Long) row[0]; // 假設第一列是follower_idLong followeeId = (Long) row[1]; // 假設第二列是followee_idcacheService.addFollowing(followerId, followeeId);}}private void processUpdateEvent(UpdateRowsEventData data) {// 處理更新事件(如果Following表有更新操作)for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {Serializable[] before = row.getKey();Serializable[] after = row.getValue();// 根據業務邏輯處理更新}}private void processDeleteEvent(DeleteRowsEventData data) {// 處理取消關注事件for (Serializable[] row : data.getRows()) {Long followerId = (Long) row[0];Long followeeId = (Long) row[1];cacheService.removeFollowing(followerId, followeeId);}}public void start() {try {client.connect();} catch (IOException e) {throw new RuntimeException("Failed to connect to MySQL binlog", e);}}public void stop() {try {client.disconnect();} catch (IOException e) {// 處理異常}}
}

4. 緩存服務實現

public interface CacheService {void addFollowing(Long followerId, Long followeeId);void removeFollowing(Long followerId, Long followeeId);Set<Long> getFollowings(Long followerId);Set<Long> getFollowers(Long followeeId);
}public class RedisCacheService implements CacheService {private final JedisPool jedisPool;public RedisCacheService(JedisPool jedisPool) {this.jedisPool = jedisPool;}@Overridepublic void addFollowing(Long followerId, Long followeeId) {try (Jedis jedis = jedisPool.getResource()) {// 用戶關注列表jedis.sadd("user:" + followerId + ":followings", followeeId.toString());// 用戶粉絲列表jedis.sadd("user:" + followeeId + ":followers", followerId.toString());}}@Overridepublic void removeFollowing(Long followerId, Long followeeId) {try (Jedis jedis = jedisPool.getResource()) {// 用戶關注列表jedis.srem("user:" + followerId + ":followings", followeeId.toString());// 用戶粉絲列表jedis.srem("user:" + followeeId + ":followers", followerId.toString());}}@Overridepublic Set<Long> getFollowings(Long followerId) {try (Jedis jedis = jedisPool.getResource()) {Set<String> followings = jedis.smembers("user:" + followerId + ":followings");return followings.stream().map(Long::valueOf).collect(Collectors.toSet());}}@Overridepublic Set<Long> getFollowers(Long followeeId) {try (Jedis jedis = jedisPool.getResource()) {Set<String> followers = jedis.smembers("user:" + followeeId + ":followers");return followers.stream().map(Long::valueOf).collect(Collectors.toSet());}}
}

5. 服務啟動

public class Application {public static void main(String[] args) {// 配置Redis連接池JedisPool jedisPool = new JedisPool("localhost", 6379);CacheService cacheService = new RedisCacheService(jedisPool);// 啟動binlog消費者FollowingTableBinlogConsumer consumer = new FollowingTableBinlogConsumer("localhost", 3306, "username", "password", cacheService);consumer.start();// 添加關閉鉤子Runtime.getRuntime().addShutdownHook(new Thread(() -> {consumer.stop();jedisPool.close();}));}
}

高級優化方案

1. 引入消息隊列(如Kafka)

// 在Binlog消費者中,將事件發布到Kafka
public class KafkaEventPublisher {private final Producer<String, String> producer;public KafkaEventPublisher(String bootstrapServers) {Properties props = new Properties();props.put("bootstrap.servers", bootstrapServers);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");this.producer = new KafkaProducer<>(props);}public void publishFollowingEvent(String eventType, Long followerId, Long followeeId) {String key = followerId + ":" + followeeId;String value = String.format("{\"eventType\":\"%s\",\"followerId\":%d,\"followeeId\":%d}", eventType, followerId, followeeId);producer.send(new ProducerRecord<>("following-events", key, value));}public void close() {producer.close();}
}// 然后有獨立的消費者服務從Kafka消費并更新緩存

2. 處理初始數據同步

// 在服務啟動時,先全量同步Following表數據到緩存
public void initialSync() {// 從數據庫讀取所有Following關系List<Following> allFollowings = followingRepository.findAll();// 批量寫入緩存try (Jedis jedis = jedisPool.getResource()) {Pipeline pipeline = jedis.pipelined();for (Following following : allFollowings) {pipeline.sadd("user:" + following.getFollowerId() + ":followings", following.getFolloweeId().toString());pipeline.sadd("user:" + following.getFolloweeId() + ":followers", following.getFollowerId().toString());}pipeline.sync();}
}

3. 監控與容錯

  • 記錄binlog位置,以便重啟后從正確位置繼續
  • 實現重試機制處理緩存更新失敗
  • 添加監控指標跟蹤事件處理延遲和錯誤率

總結

這個方案實現了Following表的偽從技術,通過監聽MySQL binlog實時捕獲數據變更,并更新Redis緩存。這種架構具有以下優點:

  1. 低延遲:幾乎實時同步數據庫變更
  2. 解耦:消費者服務獨立于主業務服務
  3. 可擴展:可以輕松添加更多消費者處理不同業務邏輯
  4. 高性能:Redis提供了高效的關系數據存儲和查詢

根據業務規模,可以選擇簡單的直接更新緩存方案,或者引入消息隊列的更復雜架構。

經過對數據庫設計、緩存設計的詳細論證,總結并提煉出緩存與數據庫結合的最終方案。

偽從方案應用場景如:用戶關系服務,關注與取消關注的接口。

  • 即接口直接更新數據庫Following表即響應用戶,后續流程對用戶來說是完全異步的。
  • Follower表、計數服務、Redis緩存會依賴Following表產生的binlog日志分別更新數據。

關于Binlog監聽在服務重啟/暫停時的數據丟失問題

Binlog監聽在服務重啟或暫停時是否會導致數據丟失,取決于具體的實現方式和配置。下面我將詳細分析這個問題及解決方案。

關鍵影響因素

1. Binlog位置記錄

  • 不記錄位置:如果服務沒有記錄已處理的binlog位置,重啟后會從當前最新的binlog位置開始,導致中間變更丟失
  • 記錄位置:正確記錄binlog位置可以確保重啟后從斷點繼續

2. MySQL binlog保留策略

  • expire_logs_days參數決定binlog保留天數
  • 如果binlog被過早清除,而服務長時間停機,可能導致無法恢復

3. 事務完整性

  • 部分處理的事務在重啟后可能導致不一致

解決方案

1. 持久化binlog位置

修改之前的消費者服務,增加位置記錄功能:

public class FollowingTableBinlogConsumer {// 增加binlog位置存儲接口private final BinlogPositionStore positionStore;public FollowingTableBinlogConsumer(..., BinlogPositionStore positionStore) {this.positionStore = positionStore;// 設置binlog文件名和位置BinlogPosition position = positionStore.getPosition();if (position != null) {client.setBinlogFilename(position.getFilename());client.setBinlogPosition(position.getPosition());}client.registerEventListener(event -> {// 處理事件...// 記錄位置if (event.getHeader().getEventType() == EventType.ROTATE) {RotateEventData rotateEvent = (RotateEventData) event.getData();positionStore.savePosition(new BinlogPosition(rotateEvent.getBinlogFilename(), rotateEvent.getBinlogPosition()));} else if (event.getHeader().getEventType() != EventType.FORMAT_DESCRIPTION) {positionStore.savePosition(new BinlogPosition(client.getBinlogFilename(), event.getHeader().getNextPosition()));}});}
}// Binlog位置存儲接口
public interface BinlogPositionStore {void savePosition(BinlogPosition position);BinlogPosition getPosition();
}// 簡單的文件存儲實現
public class FileBinlogPositionStore implements BinlogPositionStore {private final File positionFile;public FileBinlogPositionStore(String filePath) {this.positionFile = new File(filePath);}@Overridepublic void savePosition(BinlogPosition position) {try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(positionFile))) {out.writeObject(position);} catch (IOException e) {throw new RuntimeException("Failed to save binlog position", e);}}@Overridepublic BinlogPosition getPosition() {if (!positionFile.exists()) return null;try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(positionFile))) {return (BinlogPosition) in.readObject();} catch (Exception e) {throw new RuntimeException("Failed to read binlog position", e);}}
}// Binlog位置對象
public class BinlogPosition implements Serializable {private final String filename;private final long position;// constructor, getters...
}

2. MySQL配置優化

確保MySQL配置合理:

-- 設置足夠的binlog保留時間(根據業務需求調整)
SET GLOBAL expire_logs_days = 7;-- 或使用新的變量(MySQL 8.0+)
SET GLOBAL binlog_expire_logs_seconds = 604800;  -- 7天

3. 啟動時數據校驗和修復

服務啟動時增加校驗邏輯:

public void start() {// 檢查binlog位置是否有效BinlogPosition position = positionStore.getPosition();if (position != null) {if (!isBinlogFileExists(position.getFilename())) {// 執行全量同步initialSync();positionStore.clearPosition();}}client.connect();
}private boolean isBinlogFileExists(String filename) {// 實現檢查binlog文件是否存在的邏輯// 可以通過SHOW BINARY LOGS命令獲取當前存在的binlog文件列表
}

4. 優雅停機處理

確保服務停止時正確處理:

public void stop() {try {// 等待當前事件處理完成client.disconnect();// 確保最后的位置已保存positionStore.flush();} catch (IOException e) {// 處理異常}
}

高級保障方案

1. 引入事務表記錄處理狀態

創建一張事務記錄表:

CREATE TABLE binlog_consumer_state (consumer_id VARCHAR(100) PRIMARY KEY,binlog_filename VARCHAR(100) NOT NULL,binlog_position BIGINT NOT NULL,last_heartbeat TIMESTAMP NOT NULL,processed_checksum VARCHAR(100)
);

2. 定期檢查點(checkpoint)

// 每處理N個事件或每隔M秒記錄一次完整狀態
private void checkpoint(Event event) {// 計算當前已處理數據的校驗和String checksum = computeChecksum();// 更新數據庫狀態jdbcTemplate.update("INSERT INTO binlog_consumer_state VALUES (?, ?, ?, NOW(), ?) " +"ON DUPLICATE KEY UPDATE binlog_filename=?, binlog_position=?, last_heartbeat=NOW(), processed_checksum=?",consumerId, client.getBinlogFilename(), event.getHeader().getNextPosition(), checksum,client.getBinlogFilename(), event.getHeader().getNextPosition(), checksum);
}

3. 數據修復機制

當檢測到不一致時:

public void repairIfNeeded() {// 從數據庫獲取最后處理的狀態StateRecord state = getLastStateFromDB();// 從緩存獲取最后處理的狀態StateRecord cacheState = getLastStateFromCache();if (!state.equals(cacheState)) {// 執行修復邏輯executeRepair(state);}
}

總結

正確實現的Binlog監聽服務在重啟/暫停時不會丟失數據,但需要:

  1. 持久化記錄binlog位置(文件名+偏移量)
  2. 配置足夠的binlog保留時間
  3. 實現優雅的停機和恢復機制
  4. 考慮增加校驗和修復邏輯(針對關鍵業務)

建議的完整方案:

  • 使用混合位置存儲(本地文件+數據庫)
  • 定期檢查點
  • 啟動時數據校驗
  • 足夠的binlog保留期
  • 監控binlog消費延遲

這樣即使在服務重啟、暫停甚至長時間停機后,也能保證數據不會丟失,并能從正確的位置恢復處理。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/77385.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/77385.shtml
英文地址,請注明出處:http://en.pswp.cn/web/77385.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《100天精通Python——基礎篇 2025 第3天:變量與數據類型全面解析,掌握Python核心語法》

目錄 一、Python變量的定義和使用二、Python整數類型&#xff08;int&#xff09;詳解三、Python小數/浮點數&#xff08;float&#xff09;類型詳解四、Python復數類型(complex)詳解---了解五、Python字符串詳解(包含長字符串和原始字符串)5.1 處理字符串中的引號5.2 字符串的…

【前后端分離項目】Vue+Springboot+MySQL

文章目錄 1.安裝 Node.js2.配置 Node.js 環境3.安裝 Node.js 國內鏡像4.創建 Vue 項目5.運行 Vue 項目6.訪問 Vue 項目7.創建 Spring Boot 項目8.運行 Spring Boot 項目9.訪問 Spring Boot 項目10.實現 Vue 與 Spring Boot 聯動11.安裝 axios12.編寫請求13.調用函數請求接口14.…

線性代數(一些別的應該關注的點)

一、矩陣 矩陣運算&#xff1a;線性變換 縮放、平移、旋轉 無所不能的矩陣 - 三維圖形變換_嗶哩嗶哩_bilibili

01Redis快速入門(nosql、安裝redis、客戶端、命令及類型、java客戶端、序列化)

Redis的常見命令和客戶端使用 1.初識Redis Redis是一種鍵值型的NoSql數據庫&#xff0c;這里有兩個關鍵字&#xff1a; 鍵值型 NoSql 其中鍵值型&#xff0c;是指Redis中存儲的數據都是以key、value對的形式存儲&#xff0c;而value的形式多種多樣&#xff0c;可以是字符串…

AI編程:[體驗]從 0 到 1 開發一個項目的初體驗

一、開發信息 開發時間&#xff1a;1.5-2天工具使用&#xff1a; 不熟練&#xff0c;開發本項目前1天&#xff0c;才簡單使用了Cursor的功能 功能復雜度&#xff1a; 開發的功能相對簡單。頁面&#xff1a;2個&#xff0c;登錄頁面&#xff0c;個人中心頁面功能&#xff1a;5個…

LeetCode-392 判斷子序列

給定字符串 s 和 t &#xff0c;判斷 s 是否為 t 的子序列。 字符串的一個子序列是原始字符串刪除一些&#xff08;也可以不刪除&#xff09;字符而不改變剩余字符相對位置形成的新字符串。&#xff08;例如&#xff0c;"ace"是"abcde"的一個子序列&#…

Linux 系統監控大師:Glances 工具詳解助力自動化

看圖猜詩&#xff0c;你有任何想法都可以在評論區留言哦~ 摘要 Glances 是一款基于 Python 開發的跨平臺系統監控工具&#xff0c;集成了 CPU、內存、磁盤、網絡、進程等核心指標的實時監控能力&#xff0c;并支持命令行、Web界面、客戶端-服務器模式等多種使用場景。其輕量級…

Spring Boot 3.4.5 運行環境需求

&#x1f4dd; Spring Boot 3.4.5 運行環境要求 &#x1f33f; 1?? 基本需求 ?? JDK版本&#xff1a;最低 Java 17 &#x1f517; https://www.java.com/ 最高兼容至 Java 24 ?? 依賴框架&#xff1a;需搭配 Spring Framework 6.2.6 &#x1f517; https://docs.sprin…

在KEIL里C51和MDK兼容以及添加ARM compiler5 version編譯器

前言 我們想在一個keil里面可以打開32和51的文件&#xff0c;這樣就不需要兩個keil了 還有就是現在的keil&#xff0c;比如我用的是5.41的&#xff0c;就沒有5版本的處理器&#xff0c;所以要安裝 本篇文章我們來詳細講解如何實現上面說的兩個內容 準備的東西 1.ARM5編譯器 …

Flutter 彈窗隊列管理:支持優先級的線程安全通用彈窗隊列系統

在復雜的 Flutter 應用開發中&#xff0c;彈窗管理是一個常見難題。手動管理彈窗的顯示順序和條件判斷不僅繁瑣&#xff0c;還容易出錯。為此&#xff0c;我們實現了一個支持優先級的線程安全通用彈窗隊列管理系統。它能夠自動管理彈窗的顯示順序&#xff0c;支持條件判斷&…

鴻蒙NEXT開發剪貼板工具類(ArkTs)

import { pasteboard } from kit.BasicServicesKit; import { StrUtil } from ./StrUtil;/*** 剪貼板工具類* 需要權限&#xff1a;* ohos.permission.READ_PASTEBOARD // 允許應用讀取剪貼板。* author CSDN-鴻蒙布道師* since 2025/04/25*/ export class PasteboardUtil {…

FastAPI 零基礎入門指南:10 分鐘搭建高性能 API

一、為什么選擇 FastAPI&#xff1f; 想象一下&#xff0c;用 Python 寫 API 可以像搭積木一樣簡單&#xff0c;同時還能擁有媲美 Go 語言的性能&#xff0c;這個框架憑借三大核心優勢迅速風靡全球&#xff1a; 開發效率提升 3 倍&#xff1a;類型注解 自動文檔&#xff0c;…

【算法】BFS-解決FloodFill問題

目錄 FloodFill問題 圖像渲染 島嶼數量 島嶼的最大面積 被圍繞的區域 FloodFill問題 FloodFill就是洪水灌溉的意思&#xff0c;假設有下面的一塊田地&#xff0c;負數代表是凹地&#xff0c;正數代表是凸地&#xff0c;數字的大小表示凹或者凸的程度。現在下一場大雨&…

代碼隨想錄算法訓練營第三十七天|動態規劃part4

1049. 最后一塊石頭的重量 II 題目鏈接&#xff1a; 1049. 最后一塊石頭的重量 II - 力扣&#xff08;LeetCode&#xff09; 文章講解&#xff1a; 代碼隨想錄 思路&#xff1a; 理解為把石頭分成兩堆 使得兩堆的差值盡可能小 求這個最小值1 理解為往背包里裝物品 每個物品的…

(八)深入了解AVFoundation-采集:拍照功能的實現

引言 在上一篇文章中&#xff0c;我們初步完成了使用 AVFoundation 采集視頻數據的流程&#xff0c;掌握了 AVCaptureSession 的搭建與視頻流的預覽顯示。 本篇將繼續深入 AVFoundation&#xff0c;聚焦于靜態圖片采集的實現。通過 AVCapturePhotoOutput&#xff0c;我們可以…

git tag使用場景和實踐

背景 每次上線一個迭代&#xff0c;為了區分本次代碼的分支是哪個迭代的commit&#xff0c;可以給分支打上tag&#xff0c;這樣利于追蹤分支所屬迭代&#xff0c;如果devops沒有自動給分支打tag&#xff0c;需要自己來打 操作 1.查看當前tag git tag2.給分支打tag git tag…

從零開始掌握Linux數據流:管道與重定向完全指南

全文目錄 1 知識背景與核心概念1.1 操作系統的輸入輸出模型1.2 Shell 的中間人角色 2 重定向技術深度解析2.1 輸出重定向2.1.1 覆蓋寫2.1.2 追加寫2.1.3 錯誤重定向2.1.4 同時重定向 stdout 和 stderr 2.2 輸入重定向2.2.1 文件作為輸入源2.2.2 Here Document&#xff08;多行輸…

aws(學習筆記第三十九課) iot-core

文章目錄 aws(學習筆記第三十九課) iotcore(Internet Of Thing)學習內容:1. 整體架構1.1 代碼鏈接1.2 整體架構(概要)1.3 整體架構(詳細 )2. 代碼解析2.1 創建`IOT thing`2.2 創建`AWS IOT certificate`證書2.2.1 創建`lambda`需要的`role`2.2.2 創建`lambda`2.2.3 `lambd…

國家新政鼓勵游戲出海,全球化安全威脅如何解

本文作者&#xff1a;騰訊宙斯盾DDoS防護團隊 01 政策紅利釋放&#xff1a;游戲出海升級為“國家戰略工程” 01 4月21日&#xff0c;國務院新聞辦公室發布《加快推進服務業擴大開放綜合試點工作方案》&#xff0c;釋放了一個信號&#xff1a;首次將“游戲出海”列為戰略級工程&…

MobX 在 React 中的使用:狀態管理的新選擇

&#x1f90d; 前端開發工程師、技術日更博主、已過CET6 &#x1f368; 阿珊和她的貓_CSDN博客專家、23年度博客之星前端領域TOP1 &#x1f560; 牛客高級專題作者、打造專欄《前端面試必備》 、《2024面試高頻手撕題》、《前端求職突破計劃》 &#x1f35a; 藍橋云課簽約作者、…