方案總覽
數據導入過程: ? ?
- 根據控制表判斷當前活躍組(假設當前活躍的是a,那么接下來要導入到b)。 ? ?
- 清空非活躍表(即b表)的數據,然后將新數據導入到b表。 ? ?
- 切換控制表,將活躍組改為b,這樣新的查詢就會使用view_b(指向table_b)。 ? ?
- 延遲1分鐘后清空原來的活躍表(即a表)的數據。
系統設計
1. 數據庫結構
-- 數據表
CREATE TABLE data_a (id SERIAL PRIMARY KEY, ...);
CREATE TABLE data_b (id SERIAL PRIMARY KEY, ...);-- 視圖(始終指向活躍表)
CREATE OR REPLACE VIEW current_data AS SELECT * FROM data_a; -- 初始指向A表-- 控制表(關鍵元數據)
CREATE TABLE ab_control (id SERIAL PRIMARY KEY,active_group CHAR(1) NOT NULL CHECK (active_group IN ('a','b')),next_switch_time TIMESTAMP
);
INSERT INTO ab_control(active_group) VALUES ('a'); -- 初始狀態--Powered by https://zhengkai.blog.csdn.net/
2. 狀態流轉邏輯
當前活躍組 | 操作步驟
-------------------------
A (初始) → 導入數據到B表 → 切換視圖到B → (1分鐘后清空A表)→ 狀態變更為B
B → 反向操作
Express 實現代碼
僅供參考,按照實際使用場景進行改造
const express = require('express');
const { Pool } = require('pg'); // 以PostgreSQL為例
const app = express();
app.use(express.json());// 數據庫配置
const pool = new Pool({...});// 獲取當前活躍組
async function getActiveGroup() {const res = await pool.query('SELECT active_group FROM ab_control LIMIT 1');return res.rows[0].active_group;
}// Powered by https://zhengkai.blog.csdn.net/
// 主切換函數
async function switchDataGroup(newData) {const client = await pool.connect();try {await client.query('BEGIN');// 1. 獲取當前狀態const { active_group } = (await client.query('SELECT active_group FROM ab_control FOR UPDATE')).rows[0];// 2. 確定目標組const targetGroup = active_group === 'a' ? 'b' : 'a';const targetTable = `data_${targetGroup}`;// 3. 清空目標表并導入數據await client.query(`TRUNCATE TABLE ${targetTable}`);await client.query(`INSERT INTO ${targetTable} (col1, col2) VALUES ${newData.map(d => `(${d.val1}, ${d.val2})`).join(',')}`);// 4. 切換視圖await client.query(`CREATE OR REPLACE VIEW current_data AS SELECT * FROM ${targetTable}`);// 5. 更新控制表await client.query(`UPDATE ab_control SET active_group = $1,next_switch_time = NOW() + INTERVAL '1 minute'`, [targetGroup]);await client.query('COMMIT');// 6. 啟動延遲清空任務(非事務內)setTimeout(() => clearOldTable(active_group), 60000); } finally {client.release();}
}// 延遲清空舊表
async function clearOldTable(oldGroup) {const client = await pool.connect();try {await client.query(`TRUNCATE TABLE data_${oldGroup}`);} finally {client.release();}
}// 數據導入路由
app.post('/import', async (req, res) => {try {await switchDataGroup(req.body.data);res.status(200).send('Import and switch successful');} catch (err) {console.error('Switch failed:', err);res.status(500).send('Switch operation failed');}
});// 查詢路由(始終使用統一視圖)
app.get('/data', async (req, res) => {const result = await pool.query('SELECT * FROM current_data');res.json(result.rows);
});app.listen(3000, () => console.log('Server running on port 3000'));
關鍵設計說明
原子性切換
使用事務(BEGIN/COMMIT)確保:三者操作的原子性
目標表清空+導入
控制表更新
視圖解耦
current_data
?視圖始終作為應用層統一查詢入口切換時動態重建視圖指向新物理表
延遲清理
使用?
setTimeout
?實現1分鐘延遲清空避免阻塞主流程
注意:生產環境建議用Redis/Kue等持久化定時任務
并發控制
SELECT ... FOR UPDATE
?鎖控制表,防止并發切換視圖重建瞬間的查詢短暫阻塞可接受
故障恢復
控制表記錄?
next_switch_time
?可用于:重啟后檢查未完成的清理任務
監控切換狀態
生產環境增強建議
切換日志表
CREATE TABLE ab_switch_log (switch_time TIMESTAMPTZ PRIMARY KEY,from_group CHAR(1),to_group CHAR(1),success BOOLEAN );
重試機制
// 在clearOldTable中添加重試邏輯 async function clearOldTable(group) {let attempts = 0;while (attempts < 3) {try {await pool.query(`TRUNCATE TABLE data_${group}`);return;} catch (err) {attempts++;await new Promise(r => setTimeout(r, 5000));}}// 告警通知 }
維護接口
// 手動觸發清理 app.post('/force-clean', async (req, res) => {const { group } = req.body;await clearOldTable(group);res.send(`Cleaned data_${group}`); });
監控指標
每次切換時記錄:切換耗時、數據量大小
視圖查詢性能監控
此設計實現了平滑的AB表切換,確保服務連續性,同時通過延遲清理機制保障數據安全