From e7051aac0156184b055b204d07aa55bd06243930 Mon Sep 17 00:00:00 2001 From: Raine Revere Date: Tue, 4 Jul 2023 17:24:54 +0000 Subject: [PATCH 1/2] per-document transactions --- src/y-leveldb.js | 54 ++++++++++++++++++++++++---------------- tests/y-leveldb.tests.js | 10 ++++---- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/y-leveldb.js b/src/y-leveldb.js index 9b14646..7ffb1dc 100644 --- a/src/y-leveldb.js +++ b/src/y-leveldb.js @@ -354,33 +354,42 @@ export class LeveldbPersistence { */ constructor (location, /* istanbul ignore next */ { level = defaultLevel, levelOptions = {} } = {}) { const db = level(location, { ...levelOptions, valueEncoding, keyEncoding }) - this.tr = promise.resolve() + this.tr = new Map() /** * Execute an transaction on a database. This will ensure that other processes are currently not writing. * * This is a private method and might change in the future. * - * @todo only transact on the same room-name. Allow for concurrency of different rooms. - * * @template T * * @param {function(any):Promise} f A transaction that receives the db object + * @param {string | null} docName Blocks transactions to the same docName. If null, blocks other transactions to null. * @return {Promise} */ - this._transact = f => { - const currTr = this.tr - this.tr = (async () => { + this._transact = (docName, f) => { + const mutexKey = docName || '' + const currTr = this.tr.get(mutexKey) + let res = /** @type {any} */ (null) + let tr = promise.resolve() + tr = (async () => { await currTr - let res = /** @type {any} */ (null) try { res = await f(db) + + // remove the lock after the final transaction completes + const isFinal = this.tr.get(mutexKey) === tr + if (isFinal) { + this.tr.delete(mutexKey) + } } catch (err) { /* istanbul ignore next */ console.warn('Error during y-leveldb transaction', err) } - return res })() - return this.tr + + this.tr.set(mutexKey, tr) + + return tr.then(() => res) } } @@ -388,7 +397,7 @@ export class LeveldbPersistence { * @param {string} docName */ flushDocument (docName) { - return this._transact(async db => { + return this._transact(docName, async db => { const updates = await getLevelUpdates(db, docName) const { update, sv } = mergeUpdates(updates) await flushDocument(db, docName, update, sv) @@ -400,7 +409,7 @@ export class LeveldbPersistence { * @return {Promise} */ getYDoc (docName) { - return this._transact(async db => { + return this._transact(docName, async db => { const updates = await getLevelUpdates(db, docName) const ydoc = new Y.Doc() ydoc.transact(() => { @@ -420,7 +429,7 @@ export class LeveldbPersistence { * @return {Promise} */ getStateVector (docName) { - return this._transact(async db => { + return this._transact(docName, async db => { const { clock, sv } = await readStateVector(db, docName) let curClock = -1 if (sv !== null) { @@ -444,7 +453,7 @@ export class LeveldbPersistence { * @return {Promise} Returns the clock of the stored update */ storeUpdate (docName, update) { - return this._transact(db => storeUpdate(db, docName, update)) + return this._transact(docName, db => storeUpdate(db, docName, update)) } /** @@ -461,9 +470,10 @@ export class LeveldbPersistence { * @return {Promise} */ clearDocument (docName) { - return this._transact(async db => { + return this._transact(docName, async db => { await db.del(createDocumentStateVectorKey(docName)) await clearRange(db, createDocumentFirstKey(docName), createDocumentLastKey(docName)) + this.tr.delete(docName) }) } @@ -474,7 +484,7 @@ export class LeveldbPersistence { * @return {Promise} */ setMeta (docName, metaKey, value) { - return this._transact(db => levelPut(db, createDocumentMetaKey(docName, metaKey), buffer.encodeAny(value))) + return this._transact(docName, db => levelPut(db, createDocumentMetaKey(docName, metaKey), buffer.encodeAny(value))) } /** @@ -483,7 +493,7 @@ export class LeveldbPersistence { * @return {Promise} */ delMeta (docName, metaKey) { - return this._transact(db => db.del(createDocumentMetaKey(docName, metaKey))) + return this._transact(docName, db => db.del(createDocumentMetaKey(docName, metaKey))) } /** @@ -492,7 +502,7 @@ export class LeveldbPersistence { * @return {Promise} */ getMeta (docName, metaKey) { - return this._transact(async db => { + return this._transact(docName, async db => { const res = await levelGet(db, createDocumentMetaKey(docName, metaKey)) if (res === null) { return// return void @@ -505,7 +515,7 @@ export class LeveldbPersistence { * @return {Promise>} */ getAllDocNames () { - return this._transact(async db => { + return this._transact(null, async db => { const docKeys = await getAllDocs(db, false, true) return docKeys.map(key => key[1]) }) @@ -515,7 +525,7 @@ export class LeveldbPersistence { * @return {Promise>} */ getAllDocStateVectors () { - return this._transact(async db => { + return this._transact(null, async db => { const docs = /** @type {any} */ (await getAllDocs(db, true, true)) return docs.map(doc => { const { sv, clock } = decodeLeveldbStateVector(doc.value) @@ -529,7 +539,7 @@ export class LeveldbPersistence { * @return {Promise>} */ getMetas (docName) { - return this._transact(async db => { + return this._transact(docName, async db => { const data = await getLevelBulkData(db, { gte: createDocumentMetaKey(docName, ''), lt: createDocumentMetaEndKey(docName), @@ -548,13 +558,13 @@ export class LeveldbPersistence { * @return {Promise} */ destroy () { - return this._transact(db => db.close()) + return this._transact(null, db => db.close()) } /** * Delete all data in database. */ clearAll () { - return this._transact(async db => db.clear()) + return this._transact(null, async db => db.clear()) } } diff --git a/tests/y-leveldb.tests.js b/tests/y-leveldb.tests.js index c6129ec..6839c9a 100644 --- a/tests/y-leveldb.tests.js +++ b/tests/y-leveldb.tests.js @@ -55,7 +55,7 @@ export const testLeveldbUpdateStorage = async tc => { ydoc1.clientID = 0 // so we can check the state vector const leveldbPersistence = new LeveldbPersistence(storageName) // clear all data, so we can check allData later - await leveldbPersistence._transact(async db => db.clear()) + await leveldbPersistence._transact(null, async db => db.clear()) t.compareArrays([], await leveldbPersistence.getAllDocNames()) const updates = [] @@ -77,13 +77,13 @@ export const testLeveldbUpdateStorage = async tc => { const ydoc2 = await leveldbPersistence.getYDoc(docName) t.compareArrays(ydoc2.getArray('arr').toArray(), [2, 1]) - const allData = await leveldbPersistence._transact(async db => getLevelBulkData(db, { gte: ['v1'], lt: ['v2'] })) + const allData = await leveldbPersistence._transact(docName, async db => getLevelBulkData(db, { gte: ['v1'], lt: ['v2'] })) t.assert(allData.length > 0, 'some data exists') t.compareArrays([docName], await leveldbPersistence.getAllDocNames()) await leveldbPersistence.clearDocument(docName) t.compareArrays([], await leveldbPersistence.getAllDocNames()) - const allData2 = await leveldbPersistence._transact(async db => getLevelBulkData(db, { gte: ['v1'], lt: ['v2'] })) + const allData2 = await leveldbPersistence._transact(docName, async db => getLevelBulkData(db, { gte: ['v1'], lt: ['v2'] })) console.log(allData2) t.assert(allData2.length === 0, 'really deleted all data') @@ -108,7 +108,7 @@ export const testEncodeManyUpdates = async tc => { }) await flushUpdatesHelper(leveldbPersistence, docName, updates) - const keys = await leveldbPersistence._transact(db => getLevelUpdates(db, docName, { keys: true, values: false })) + const keys = await leveldbPersistence._transact(docName, db => getLevelUpdates(db, docName, { keys: true, values: false })) for (let i = 0; i < keys.length; i++) { t.assert(keys[i][3] === i) @@ -124,7 +124,7 @@ export const testEncodeManyUpdates = async tc => { t.assert(ydoc2.getArray('arr').length === N) await leveldbPersistence.flushDocument(docName) - const mergedKeys = await leveldbPersistence._transact(db => getLevelUpdates(db, docName, { keys: true, values: false })) + const mergedKeys = await leveldbPersistence._transact(docName, db => getLevelUpdates(db, docName, { keys: true, values: false })) t.assert(mergedKeys.length === 1) // getYDoc still works after flush/merge From 8f74fd785ddeafa1ab77a96038d89fb196e48c49 Mon Sep 17 00:00:00 2001 From: Raine Revere Date: Tue, 4 Jul 2023 17:26:43 +0000 Subject: [PATCH 2/2] lock all for non-specific transactions --- src/y-leveldb.js | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/y-leveldb.js b/src/y-leveldb.js index 7ffb1dc..2ea9de8 100644 --- a/src/y-leveldb.js +++ b/src/y-leveldb.js @@ -354,7 +354,13 @@ export class LeveldbPersistence { */ constructor (location, /* istanbul ignore next */ { level = defaultLevel, levelOptions = {} } = {}) { const db = level(location, { ...levelOptions, valueEncoding, keyEncoding }) + + // global lock + this.trAll = promise.resolve() + + // per-doc locks this.tr = new Map() + /** * Execute an transaction on a database. This will ensure that other processes are currently not writing. * @@ -363,15 +369,17 @@ export class LeveldbPersistence { * @template T * * @param {function(any):Promise} f A transaction that receives the db object - * @param {string | null} docName Blocks transactions to the same docName. If null, blocks other transactions to null. + * @param {string | null} docName Blocks transactions to the same docName. If null, blocks all transactions. * @return {Promise} */ this._transact = (docName, f) => { const mutexKey = docName || '' + const currTrAll = this.trAll const currTr = this.tr.get(mutexKey) let res = /** @type {any} */ (null) let tr = promise.resolve() tr = (async () => { + await currTrAll await currTr try { res = await f(db) @@ -387,7 +395,11 @@ export class LeveldbPersistence { } })() - this.tr.set(mutexKey, tr) + if (docName === null) { + this.trAll = Promise.all([...this.tr.values()]).then(() => tr).then(() => res) + } else { + this.tr.set(mutexKey, tr) + } return tr.then(() => res) }