Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rocksdb: use atomic batches for state updates #202

Merged
merged 14 commits into from
Jan 6, 2025
Merged
165 changes: 123 additions & 42 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,22 @@ module.exports = class Autobase extends ReadyResource {
key: this._primaryBootstrap ? await this._primaryBootstrap.getUserData('autobase/local') : null
}

this.local = Autobase.getLocalCore(this.store, opts, this.encryptionKey)
if (this._primaryBootstrap) {
await this._primaryBootstrap.ready()
if (this._primaryBootstrap.writable) {
this.local = this._primaryBootstrap.session({
compat: false,
active: false,
exclusive: true,
valueEncoding: messages.OplogMessage,
encryptionKey: this.encryptionKey
})
}
}

if (!this.local) {
this.local = Autobase.getLocalCore(this.store, opts, this.encryptionKey)
}

await this.local.ready()

Expand Down Expand Up @@ -1182,16 +1197,49 @@ module.exports = class Autobase extends ReadyResource {
}

async _reindex () {
let store = null
let system = this.system

if (this._updates.length) {
await this._undoAll()
await this._refreshSystemState(this.system)
this._updates = []

const checkout = await this._viewInfo(this._indexedLength)

store = this._viewStore.memorySession(checkout)
const truncated = await this.openMemoryView(store)

system = truncated.system

await store.opened()
await system.ready()

await this._refreshSystemState(system)
}

const sameIndexers = this.system.sameIndexers(this.linearizer.indexers)
const sameIndexers = system.sameIndexers(this.linearizer.indexers)

await this._makeLinearizer(this.system)
if (!sameIndexers) await this._viewStore.migrate(this._indexedLength)
await this._makeLinearizer(system)

if (!sameIndexers) {
const name = this._viewStore.getSystemCore().name
const prologue = { hash: await system.core.treeHash(), length: system.core.length }
const key = this.deriveKey(name, this.linearizer.indexers, prologue)

const atom = this.store.storage.atom()

const actions = [
this._advanceBootRecord(key, atom),
this._viewStore.migrate(system, atom)
]

await Promise.all(actions)
} else {
await this._advanceBootRecord(this.system.core.key, null)
}

if (store) await store.close()

await this._refreshSystemState(this.system)
this.version = this.system.version

this.queueFastForward()
Expand Down Expand Up @@ -1298,21 +1346,23 @@ module.exports = class Autobase extends ReadyResource {
return added
}

async _advanceBootRecord () {
async _advanceBootRecord (key, atom) {
const info = await this.getIndexedInfo()
const views = this._viewStore.indexedViewsByName(info)
await this._setBootRecord(this.system.core.key, views)
await this._setBootRecord(key, views, atom)
}

async _setBootRecord (key, views) {
async _setBootRecord (key, views, atom) {
const pointer = c.encode(messages.BootRecord, { key, views })
await this.local.setUserData('autobase/boot', pointer)
await this.local.setUserData('autobase/boot', pointer, { atom })
}

async _persistUpdates () {
async _persistUpdates (length, atom) {
const updates = []

for (const u of this._pendingFlush.concat(this._updates)) {
if (length !== -1 && u.systemLength < length) continue

const views = []
for (const view of u.views) {
views.push({ core: view.core.systemIndex, appending: view.appending })
Expand All @@ -1321,11 +1371,9 @@ module.exports = class Autobase extends ReadyResource {
updates.push({ ...u, views })
}

return this.local.setUserData('autobase/updates', c.encode(messages.UpdateArray, updates))
}
await this.local.setUserData('autobase/updates', c.encode(messages.UpdateArray, updates), { atom })

async _flushPendingUpdates (length) {
this._systemPointer = length
if (length === -1) return

while (this._pendingFlush.length) {
if (this._pendingFlush[0].systemLength > length) break
Expand Down Expand Up @@ -1376,18 +1424,13 @@ module.exports = class Autobase extends ReadyResource {

if (this._interrupting) return

await this._persistUpdates()

if (this._interrupting) return

if (this.localWriter !== null && localNodes !== null) {
await this._flushLocal(localNodes)
}

if (this._interrupting) return

const flushed = (await this._flushIndexes()) ? this.system.core.signedLength : this._systemPointer
if (this.updating || flushed > this._systemPointer) await this._flushPendingUpdates(flushed)
await this._flushIndexes()

if (indexed) await this.onindex(this)

Expand All @@ -1413,7 +1456,6 @@ module.exports = class Autobase extends ReadyResource {

await this._gcWriters()
await this._reindex(changed)
await this._advanceBootRecord()
}

// emit state changes post drain
Expand Down Expand Up @@ -1966,12 +2008,19 @@ module.exports = class Autobase extends ReadyResource {

await this._undoAll()

const atom = this.store.storage.atom()
const actions = []

if (migrated) actions.push(this._advanceBootRecord(key, atom))

for (const view of this._viewStore.opened.values()) {
const info = views.get(view)
if (info) await view.catchup(info)
else if (migrated) await view.migrateTo(indexers, 0)
if (info) actions.push(view.catchup(info, atom))
else if (migrated) actions.push(view.migrateTo(indexers, 0, atom))
}

await Promise.all(actions)

await this._refreshSystemState(this.system)

if (this.localWriter) {
Expand All @@ -1982,7 +2031,8 @@ module.exports = class Autobase extends ReadyResource {
this._indexedLength = length

await this._makeLinearizer(this.system)
await this._advanceBootRecord()

this.version = this.system.version

// manually set the digest
if (migrated) {
Expand Down Expand Up @@ -2012,25 +2062,43 @@ module.exports = class Autobase extends ReadyResource {
}

async _flushIndexes () {
if (this._indexedLength === this.system.core.signedLength) return true
if (this._indexedLength === this.system.core.signedLength) {
return this._persistUpdates(this.system.core.signedLength)
}

let complete = true
this._updatingCores = false

const { views } = await this.system.getIndexedInfo(this._indexedLength)

const atom = this.store.storage.atom()

atom.enter()

let systemLength = -1
const flushing = []

for (const core of this._viewStore.opened.values()) {
const index = core.systemIndex
if (!core._isSystem() && (index === -1 || index >= views.length)) continue
if (core._isSystem()) {
systemLength = await core.signer.getSignableLength(this.linearizer.indexers, this._indexedLength)

const length = core._isSystem() ? this._indexedLength : views[index].length
flushing.push(core.flush(this._indexedLength, atom))
core._onindex(this._indexedLength)
continue
}

if (!(await core.flush(length))) complete = false
if (core.systemIndex === -1 || core.systemIndex >= views.length) continue

await core._onindex(length)
const v = views[core.systemIndex]

flushing.push(core.flush(v.length, atom))
core._onindex(v.length)
}

return complete
flushing.push(this._persistUpdates(systemLength, atom).catch(safetyCatch)) // throws when fails

atom.exit()

return Promise.all(flushing)
}

// triggered from apply
Expand Down Expand Up @@ -2085,13 +2153,10 @@ module.exports = class Autobase extends ReadyResource {
}

async _undoAll () {
let count = 0
for (const u of this._updates) {
count += u.batch
}
this._updates = []

const p = []
for (const [ac, length] of await this._undo(count)) {
for (const [ac, length] of await this._viewInfo(this._indexedLength)) {
p.push(ac.truncate(length))
}

Expand All @@ -2113,10 +2178,16 @@ module.exports = class Autobase extends ReadyResource {
const u = updates[updates.length - 1]
const systemLength = u ? u.systemLength : this._indexedLength

return this._viewInfo(systemLength)
}

async _viewInfo (systemLength) {
const checkout = new Map()

const { views } = await this.system.getIndexedInfo(systemLength)

for (const [core, length] of checkout) {
if (length === 0) continue
for (const core of this._viewStore.opened.values()) {
if (core.length === 0) continue

if (core._isSystem()) {
checkout.set(core, systemLength)
Expand Down Expand Up @@ -2212,6 +2283,8 @@ module.exports = class Autobase extends ReadyResource {

const checkout = await this._undo(u.undo)

const atom = this.store.storage.atom()

const store = this._viewStore.memorySession(checkout)
const { view, system } = await this.openMemoryView(store)

Expand Down Expand Up @@ -2302,7 +2375,10 @@ module.exports = class Autobase extends ReadyResource {

if (!update.indexers && !upgraded) continue

await store.flush()
const flush = store.flush(atom)
const updates = this._persistUpdates(-1, atom)

await Promise.all([flush, updates])

await this.system.update()

Expand All @@ -2313,7 +2389,10 @@ module.exports = class Autobase extends ReadyResource {
return update.systemLength
}

await store.flush()
const flush = store.flush(atom)
const updates = this._persistUpdates(-1, atom)

await Promise.all([flush, updates])

await this.system.update()

Expand All @@ -2324,6 +2403,8 @@ module.exports = class Autobase extends ReadyResource {

return -1
} finally {
// incase we bailed midway through, teardown the batch
if (atom.batch) atom.batch.destroy()
this._applySystem = null
await store.close()
}
Expand Down
Loading