Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per-document transactions #16

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions src/y-leveldb.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,41 +354,62 @@ export class LeveldbPersistence {
*/
constructor (location, /* istanbul ignore next */ { level = defaultLevel, levelOptions = {} } = {}) {
const db = level(location, { ...levelOptions, valueEncoding, keyEncoding })
this.tr = promise.resolve()

// global lock
this.trAll = promise.resolve()

// per-doc locks
this.tr = new Map()

/**
* Execute an transaction on a database. This will ensure that other processes are currently not writing.
*
* This is a private method and might change in the future.
*
* @todo only transact on the same room-name. Allow for concurrency of different rooms.
*
* @template T
*
* @param {function(any):Promise<T>} f A transaction that receives the db object
* @param {string | null} docName Blocks transactions to the same docName. If null, blocks all transactions.
* @return {Promise<T>}
*/
this._transact = f => {
const currTr = this.tr
this.tr = (async () => {
this._transact = (docName, f) => {
const mutexKey = docName || ''
const currTrAll = this.trAll
const currTr = this.tr.get(mutexKey)
let res = /** @type {any} */ (null)
let tr = promise.resolve()
tr = (async () => {
await currTrAll
await currTr
let res = /** @type {any} */ (null)
try {
res = await f(db)

// remove the lock after the final transaction completes
const isFinal = this.tr.get(mutexKey) === tr
if (isFinal) {
this.tr.delete(mutexKey)
}
} catch (err) {
/* istanbul ignore next */
console.warn('Error during y-leveldb transaction', err)
}
return res
})()
return this.tr

if (docName === null) {
this.trAll = Promise.all([...this.tr.values()]).then(() => tr).then(() => res)
} else {
this.tr.set(mutexKey, tr)
}

return tr.then(() => res)
}
}

/**
* @param {string} docName
*/
flushDocument (docName) {
return this._transact(async db => {
return this._transact(docName, async db => {
const updates = await getLevelUpdates(db, docName)
const { update, sv } = mergeUpdates(updates)
await flushDocument(db, docName, update, sv)
Expand All @@ -400,7 +421,7 @@ export class LeveldbPersistence {
* @return {Promise<Y.Doc>}
*/
getYDoc (docName) {
return this._transact(async db => {
return this._transact(docName, async db => {
const updates = await getLevelUpdates(db, docName)
const ydoc = new Y.Doc()
ydoc.transact(() => {
Expand All @@ -420,7 +441,7 @@ export class LeveldbPersistence {
* @return {Promise<Uint8Array>}
*/
getStateVector (docName) {
return this._transact(async db => {
return this._transact(docName, async db => {
const { clock, sv } = await readStateVector(db, docName)
let curClock = -1
if (sv !== null) {
Expand All @@ -444,7 +465,7 @@ export class LeveldbPersistence {
* @return {Promise<number>} Returns the clock of the stored update
*/
storeUpdate (docName, update) {
return this._transact(db => storeUpdate(db, docName, update))
return this._transact(docName, db => storeUpdate(db, docName, update))
}

/**
Expand All @@ -461,9 +482,10 @@ export class LeveldbPersistence {
* @return {Promise<void>}
*/
clearDocument (docName) {
return this._transact(async db => {
return this._transact(docName, async db => {
await db.del(createDocumentStateVectorKey(docName))
await clearRange(db, createDocumentFirstKey(docName), createDocumentLastKey(docName))
this.tr.delete(docName)
})
}

Expand All @@ -474,7 +496,7 @@ export class LeveldbPersistence {
* @return {Promise<void>}
*/
setMeta (docName, metaKey, value) {
return this._transact(db => levelPut(db, createDocumentMetaKey(docName, metaKey), buffer.encodeAny(value)))
return this._transact(docName, db => levelPut(db, createDocumentMetaKey(docName, metaKey), buffer.encodeAny(value)))
}

/**
Expand All @@ -483,7 +505,7 @@ export class LeveldbPersistence {
* @return {Promise<any>}
*/
delMeta (docName, metaKey) {
return this._transact(db => db.del(createDocumentMetaKey(docName, metaKey)))
return this._transact(docName, db => db.del(createDocumentMetaKey(docName, metaKey)))
}

/**
Expand All @@ -492,7 +514,7 @@ export class LeveldbPersistence {
* @return {Promise<any>}
*/
getMeta (docName, metaKey) {
return this._transact(async db => {
return this._transact(docName, async db => {
const res = await levelGet(db, createDocumentMetaKey(docName, metaKey))
if (res === null) {
return// return void
Expand All @@ -505,7 +527,7 @@ export class LeveldbPersistence {
* @return {Promise<Array<string>>}
*/
getAllDocNames () {
return this._transact(async db => {
return this._transact(null, async db => {
const docKeys = await getAllDocs(db, false, true)
return docKeys.map(key => key[1])
})
Expand All @@ -515,7 +537,7 @@ export class LeveldbPersistence {
* @return {Promise<Array<{ name: string, sv: Uint8Array, clock: number }>>}
*/
getAllDocStateVectors () {
return this._transact(async db => {
return this._transact(null, async db => {
const docs = /** @type {any} */ (await getAllDocs(db, true, true))
return docs.map(doc => {
const { sv, clock } = decodeLeveldbStateVector(doc.value)
Expand All @@ -529,7 +551,7 @@ export class LeveldbPersistence {
* @return {Promise<Map<string, any>>}
*/
getMetas (docName) {
return this._transact(async db => {
return this._transact(docName, async db => {
const data = await getLevelBulkData(db, {
gte: createDocumentMetaKey(docName, ''),
lt: createDocumentMetaEndKey(docName),
Expand All @@ -548,13 +570,13 @@ export class LeveldbPersistence {
* @return {Promise<void>}
*/
destroy () {
return this._transact(db => db.close())
return this._transact(null, db => db.close())
}

/**
* Delete all data in database.
*/
clearAll () {
return this._transact(async db => db.clear())
return this._transact(null, async db => db.clear())
}
}
10 changes: 5 additions & 5 deletions tests/y-leveldb.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export const testLeveldbUpdateStorage = async tc => {
ydoc1.clientID = 0 // so we can check the state vector
const leveldbPersistence = new LeveldbPersistence(storageName)
// clear all data, so we can check allData later
await leveldbPersistence._transact(async db => db.clear())
await leveldbPersistence._transact(null, async db => db.clear())
t.compareArrays([], await leveldbPersistence.getAllDocNames())

const updates = []
Expand All @@ -77,13 +77,13 @@ export const testLeveldbUpdateStorage = async tc => {
const ydoc2 = await leveldbPersistence.getYDoc(docName)
t.compareArrays(ydoc2.getArray('arr').toArray(), [2, 1])

const allData = await leveldbPersistence._transact(async db => getLevelBulkData(db, { gte: ['v1'], lt: ['v2'] }))
const allData = await leveldbPersistence._transact(docName, async db => getLevelBulkData(db, { gte: ['v1'], lt: ['v2'] }))
t.assert(allData.length > 0, 'some data exists')

t.compareArrays([docName], await leveldbPersistence.getAllDocNames())
await leveldbPersistence.clearDocument(docName)
t.compareArrays([], await leveldbPersistence.getAllDocNames())
const allData2 = await leveldbPersistence._transact(async db => getLevelBulkData(db, { gte: ['v1'], lt: ['v2'] }))
const allData2 = await leveldbPersistence._transact(docName, async db => getLevelBulkData(db, { gte: ['v1'], lt: ['v2'] }))
console.log(allData2)
t.assert(allData2.length === 0, 'really deleted all data')

Expand All @@ -108,7 +108,7 @@ export const testEncodeManyUpdates = async tc => {
})
await flushUpdatesHelper(leveldbPersistence, docName, updates)

const keys = await leveldbPersistence._transact(db => getLevelUpdates(db, docName, { keys: true, values: false }))
const keys = await leveldbPersistence._transact(docName, db => getLevelUpdates(db, docName, { keys: true, values: false }))

for (let i = 0; i < keys.length; i++) {
t.assert(keys[i][3] === i)
Expand All @@ -124,7 +124,7 @@ export const testEncodeManyUpdates = async tc => {
t.assert(ydoc2.getArray('arr').length === N)

await leveldbPersistence.flushDocument(docName)
const mergedKeys = await leveldbPersistence._transact(db => getLevelUpdates(db, docName, { keys: true, values: false }))
const mergedKeys = await leveldbPersistence._transact(docName, db => getLevelUpdates(db, docName, { keys: true, values: false }))
t.assert(mergedKeys.length === 1)

// getYDoc still works after flush/merge
Expand Down