為了支持 SpreadJS 協同編輯場景,協同服務器需要持久化存儲文檔、操作、快照及里程碑數據。本文介紹了 MongoDB 數據庫適配器的實現方法,包括集合初始化、適配器接口實現以及里程碑存儲支持。
一、MongoDB 集合初始化
協同編輯服務需要以下集合(Collections)來存儲數據:
- documents:存儲文檔基本信息(類型、版本號、快照版本號)。
- operations:存儲操作記錄,用于實現基于 OT 的操作重放。
- snapshot_fragments:存儲快照的分片數據,支持大文檔分段管理。
- milestone_snapshot:存儲里程碑快照,用于回溯和恢復。
初始化腳本示例如下:
export async function InitCollections(client) {const collectionsToCreate = [{ name: 'documents', indexes: [{ key: { id: 1 }, unique: true }] },{ name: 'operations', indexes: [{ key: { doc_id: 1, version: 1 }, unique: true }] },{ name: 'snapshot_fragments', indexes: [{ key: { doc_id: 1, fragment_id: 1 }, unique: true }] },{ name: 'milestone_snapshot', indexes: [{ key: { doc_id: 1, version: 1 }, unique: true }] },];await client.connect();const db = client.db(dbName);
const existingCollections = (await db.listCollections().toArray()).map(c => c.name);
for (const col of collectionsToCreate) {if (!existingCollections.includes(col.name)) {await db.createCollection(col.name);console.log(`Collection '${col.name}' created.`);} else {console.log(`Collection '${col.name}' already exists.`);}for (const idx of col.indexes) {await db.collection(col.name).createIndex(idx.key, { unique: idx.unique });}}
await client.close();
}
二、MongoDB 適配器實現
- 適配器說明
適配器 MongoDb
繼承了協同服務的數據庫接口,負責:
- 文檔信息存取
- 操作記錄管理
- 快照存儲與更新
- 分片快照管理
可根據業務需要,增加 事務(session) 或 并發沖突檢測。
- 適配器核心實現
export class MongoDb extends Db {constructor(client) {super();this.client = client;this.client.connect()this.db = client.db(dbName);}async getDocument(docId) {const documents = this.db.collection('documents');let row = await documents.findOne({ id: docId });if (row) {return {id: row.id,type: row.type,version: row.version,snapshotVersion: row.snapshot_version};}}async getSnapshot(docId) {const documents = this.db.collection('documents');let row = await documents.findOne({ id: docId });if (!row) {return null;}const fragments = await this.getFragments(docId);return {id: row.id,v: row.snapshot_version,type: row.type,fragments: fragments};}async getFragments(docId) {const fragments = this.db.collection('snapshot_fragments');const rows = await fragments.find({ doc_id: docId }).toArray();if (rows.length === 0) {return {};}const results = {};for (const row of rows) {results[row.fragment_id] = JSON.parse(row.data);}return results;}async getFragment(docId, fragmentId) {const fragments = this.db.collection('snapshot_fragments');const row = await fragments.findOne({ doc_id: docId, fragment_id: fragmentId });if (row) {return JSON.parse(row.data);}return null;}async getOps(docId, from, to) {const operations = this.db.collection('operations');const query = { doc_id: docId, version: { $gte: from } };if (to !== undefined) {query.version.$lte = to;}const rows = await operations.find(query).toArray();if (rows.length === 0) {return [];}return rows.map(row => JSON.parse(row.operation));}async commitOp(docId, op, document) {try {const documents = this.db.collection('documents');const operations = this.db.collection('operations');const row = await documents.findOne({ id: docId });if (op.create) {if (row) {throw new Error(`Document with id ${docId} already exists.`);}await documents.insertOne({id: docId,type: document.type,version: document.version,snapshot_version: document.snapshotVersion},);await operations.insertOne({doc_id: docId,version: op.v,operation: JSON.stringify(op)},);return true;}else if (op.del) {if (!row) {throw new Error(`Document with id ${docId} does not exist.`);}await documents.deleteOne({ id: docId },);return true;}else {if (!row || row.version !== op.v) {throw new Error(`Document with id ${docId} does not exist or version mismatch.`);}await operations.insertOne({doc_id: docId,version: op.v,operation: JSON.stringify(op)},);await documents.updateOne({ id: docId },{ $set: { version: document.version } },);return true;}}catch (error) {console.error('Error committing operation:', error);return false;}finally {}}async commitSnapshot(docId, snapshot) {try {const documents = this.db.collection('documents');const fragments = this.db.collection('snapshot_fragments');const row = await documents.findOne({ id: docId },);if (!row) {throw new Error(`Document with id ${docId} does not exist.`);}const currentSnapshotVersion = row.snapshot_version;if (snapshot.fromVersion !== currentSnapshotVersion || snapshot.v <= currentSnapshotVersion) {throw new Error(`Snapshot version mismatch: expected ${currentSnapshotVersion}, got ${snapshot.v}`);}await documents.updateOne({ id: docId },{ $set: { snapshot_version: snapshot.v } },);if (snapshot.fragmentsChanges.deleteSnapshot) {fragments.deleteMany({ doc_id: docId },);}else {const { createFragments, updateFragments, deleteFragments } = snapshot.fragmentsChanges;if (createFragments) {const createOps = Object.entries(createFragments).map(([id, data]) => ({doc_id: docId,fragment_id: id,data: JSON.stringify(data)}));if (createOps.length > 0) {await fragments.insertMany(createOps,);}}if (updateFragments) {const updateOps = Object.entries(updateFragments).map(([id, data]) => ({updateOne: {filter: { doc_id: docId, fragment_id: id },update: { $set: { data: JSON.stringify(data) } }}}));if (updateOps.length > 0) {await fragments.bulkWrite(updateOps,// { session });}}if (deleteFragments) {const deleteOps = deleteFragments.map(id => ({deleteOne: {filter: { doc_id: docId, fragment_id: id }}}));if (deleteOps.length > 0) {await fragments.bulkWrite(deleteOps,);}}}return true;}catch (error) {console.error('Error committing snapshot:', error);return false;}finally {}}async close() {await this.client.close();}
}
三、里程碑數據存儲
里程碑快照用于優化快照恢復性能(避免從頭重放所有操作)。 實現類 MongoMilestoneDb
提供 保存 與 讀取 接口:
export class MongoMilestoneDb {constructor(client, interval) {this.client = client;this.interval = interval ? interval : 1000;this.db = client.db(dbName);}
async saveMilestoneSnapshot(snapshot) {const milestones = this.db.collection('milestone_snapshot');await milestones.insertOne({doc_id: snapshot.id,version: snapshot.v,snapshot: JSON.stringify(snapshot)});return true;}
async getMilestoneSnapshot(id, version) {const milestones = this.db.collection('milestone_snapshot');const row = await milestones.findOne({ doc_id: id, version: { $lte: version } },{ sort: { version: -1 } });if (row) {return JSON.parse(row.snapshot);}return null;}
}
四、在 DocumentServices 中配置
完成適配器與里程碑數據庫后,需要在 DocumentServices
中進行配置:
const documentServices = new OT.DocumentServices(
{ db: new MongoDb(mongoClient),milestoneDb: new MongoMilestoneDb(mongoClient, 500)
});
這樣,SpreadJS 協同服務器即可通過 MongoDB 實現文檔存儲、操作日志管理、快照與里程碑維護,保證協同編輯過程的高效與可擴展。
五、總結
本文展示了 SpreadJS 協同服務器對 MongoDB 數據庫的適配實現,主要包括:
- 集合初始化:定義所需集合與索引。
- 數據庫****適配器:支持文檔、操作、快照的存儲與管理。
- 里程碑存儲:提供快照的高效回溯能力。
- 服務集成:在
DocumentServices
中配置 MongoDB 適配器。
借助 MongoDB 的高性能與靈活數據結構,SpreadJS 協同服務可實現穩定、可擴展的文檔協作平臺。
擴展鏈接
使用 MySQL 為 SpreadJS 協同服務器提供存儲支持