From 398ba6b53e4d2e280413b4f5bb67de8abee32cec Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Fri, 23 Aug 2024 09:32:14 +0200 Subject: [PATCH] Refresh writers on time travel (#174) * refresh writers when system updates * add chris test * allow acks when not writable still * getwriterbykey already resets isRemoved --- index.js | 43 ++++++++++++++++++++++++++------- lib/system.js | 3 ++- test/basic.js | 55 +++++++++++++++++++++++++++++++++++++++++++ test/helpers/index.js | 2 +- 4 files changed, 92 insertions(+), 11 deletions(-) diff --git a/index.js b/index.js index fade29ae..9b5b256c 100644 --- a/index.js +++ b/index.js @@ -181,7 +181,11 @@ module.exports = class Autobase extends ReadyResource { } get writable () { - return this.localWriter !== null + return this.localWriter !== null && !this.localWriter.isRemoved + } + + get ackable () { + return this.localWriter !== null // prop should add .isIndexer but keeping it simple for now } get key () { @@ -779,7 +783,7 @@ module.exports = class Autobase extends ReadyResource { const unflushed = this._hasPendingCheckpoint || this.hasUnflushedIndexers() if (!this._interrupting && (isPendingIndexer || this.linearizer.shouldAck(this.localWriter, unflushed))) { try { - await this.append(null) + if (this.localWriter) await this.append(null) } catch (err) { if (!this._interrupting) throw err } @@ -800,7 +804,8 @@ module.exports = class Autobase extends ReadyResource { // if a reset is scheduled await those while (this._queueViewReset && !this._interrupting) await this._bump() - if (this.localWriter === null) { + // we wanna allow acks so interdexers can flush + if (this.localWriter === null || (this.localWriter.isRemoved && value !== null)) { throw new Error('Not writable') } @@ -1024,7 +1029,7 @@ module.exports = class Autobase extends ReadyResource { } async _bootstrapLinearizer () { - const bootstrap = this._makeWriter(this.bootstrap, 0, true) + const bootstrap = this._makeWriter(this.bootstrap, 0, true, false) this.activeWriters.add(bootstrap) this._checkWriters.push(bootstrap) @@ -1079,10 +1084,30 @@ module.exports = class Autobase extends ReadyResource { } } + async _refreshSystemState () { + if (!(await this.system.update())) return + + for (const w of this.activeWriters) { + const data = await this.system.get(w.core.key) + + if (data) { + w.isRemoved = data.isRemoved + w.isIndexer = data.isIndexer + } else { + w.isRemoved = true + w.isIndexer = false + } + + if (!w.isIndexer) { + w.isActiveIndexer = false + } + } + } + async _reindex () { if (this._updates.length) { this._undoAll() - await this.system.update() + await this._refreshSystemState() } const sameIndexers = this.system.sameIndexers(this.linearizer.indexers) @@ -1466,7 +1491,7 @@ module.exports = class Autobase extends ReadyResource { await this._closeAllActiveWriters(false) - await this.system.update() + await this._refreshSystemState() await this._makeLinearizer(this.system) } @@ -1810,7 +1835,7 @@ module.exports = class Autobase extends ReadyResource { else if (migrated) await view.migrateTo(indexers, 0) } - await this.system.update() + await this._refreshSystemState() if (this.localWriter) { if (localLength < 0) this._unsetLocalWriter() @@ -1882,7 +1907,7 @@ module.exports = class Autobase extends ReadyResource { assert(this._applying !== null, 'System changes are only allowed in apply') await this.system.add(key, { isIndexer }) - const writer = (await this._getWriterByKey(key, -1, 0, false, true, null)) || this._makeWriter(key, 0, true) + const writer = (await this._getWriterByKey(key, -1, 0, false, true, null)) || this._makeWriter(key, 0, true, false) await writer.ready() if (!this.activeWriters.has(key)) { @@ -1990,7 +2015,7 @@ module.exports = class Autobase extends ReadyResource { if (u.indexed.length) this._resetAckTick() // make sure the latest changes is reflected on the system... - await this.system.update() + await this._refreshSystemState() // todo: refresh the active writer set in case any were removed diff --git a/lib/system.js b/lib/system.js index 53bca119..0e65ad89 100644 --- a/lib/system.js +++ b/lib/system.js @@ -113,9 +113,10 @@ module.exports = class SystemView extends ReadyResource { async update () { if (this.opened === false) await this.ready() - if (this._fork === this.core.fork && this._length === this.core.length) return + if (this._fork === this.core.fork && this._length === this.core.length) return false await this._reset(await this.db.get('info', { valueEncoding: Info, keyEncoding: DIGEST })) + return true } async _reset (info) { diff --git a/test/basic.js b/test/basic.js index 3a5ec24a..8e23d20d 100644 --- a/test/basic.js +++ b/test/basic.js @@ -1734,6 +1734,61 @@ test.skip('basic - writer adds a writer while being removed', async t => { t.is(await d.view.get(1), 'd1') }) +// todo: this test is hard, probably have to rely on ff to fix +test('basic - removed writer adds a writer while being removed', async t => { + const { bases } = await create(3, t, { apply: applyWithRemove }) + const [a, b, c] = bases + + await addWriterAndSync(a, b, false) + await addWriterAndSync(a, c, false) + + await b.append('b1') + await c.append('c1') + + await confirm([a, b, c]) + + t.is(b.view.indexedLength, 2) + t.is(c.view.indexedLength, 2) + + await a.append({ remove: b4a.toString(c.local.key, 'hex') }) + + await replicateAndSync([a, b, c]) + + t.is(a.view.indexedLength, 2) + t.is(a.view.length, 2) + t.is(a.system.members, 2) + + t.is(b.writable, true) + t.is(c.writable, false) + + await addWriterAndSync(b, c, false) + + t.is(c.writable, true) + + // load c into b.activeWriters + await c.append(null) + + await replicateAndSync([b, c]) + + t.is(b.activeWriters.size, 3) + + for (let i = 0; i < 10; i++) a.append('a' + i) + + await a.append({ remove: b4a.toString(b.local.key, 'hex') }) + + await replicateAndSync([a, b, c]) + + t.is(b.writable, false) + t.is(c.writable, false) + + await t.exception(c.append('not writable')) + + await t.execution(replicateAndSync([a, b, c])) + + t.is(a.view.length, b.view.length) + t.is(a.view.length, c.view.length) +}) + async function applyWithRemove (batch, view, base) { for (const { value } of batch) { if (value.add) { diff --git a/test/helpers/index.js b/test/helpers/index.js index a3993215..9fc5a397 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -195,7 +195,7 @@ async function confirm (bases, options = {}) { const writers = bases.filter(b => !!b.localWriter) const maj = options.majority || (Math.floor(writers.length / 2) + 1) for (let j = 0; j < maj; j++) { - if (!writers[j].writable) continue + if (!writers[j].ackable) continue await writers[j].append(null) await helpers.replicateAndSync(bases)