Skip to content

Commit

Permalink
Refresh writers on time travel (#174)
Browse files Browse the repository at this point in the history
* refresh writers when system updates

* add chris test

* allow acks when not writable still

* getwriterbykey already resets isRemoved
  • Loading branch information
mafintosh authored Aug 23, 2024
1 parent f12a04a commit 398ba6b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 11 deletions.
43 changes: 34 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ module.exports = class Autobase extends ReadyResource {
}

get writable () {
return this.localWriter !== null
return this.localWriter !== null && !this.localWriter.isRemoved
}

get ackable () {
return this.localWriter !== null // prop should add .isIndexer but keeping it simple for now
}

get key () {
Expand Down Expand Up @@ -779,7 +783,7 @@ module.exports = class Autobase extends ReadyResource {
const unflushed = this._hasPendingCheckpoint || this.hasUnflushedIndexers()
if (!this._interrupting && (isPendingIndexer || this.linearizer.shouldAck(this.localWriter, unflushed))) {
try {
await this.append(null)
if (this.localWriter) await this.append(null)
} catch (err) {
if (!this._interrupting) throw err
}
Expand All @@ -800,7 +804,8 @@ module.exports = class Autobase extends ReadyResource {
// if a reset is scheduled await those
while (this._queueViewReset && !this._interrupting) await this._bump()

if (this.localWriter === null) {
// we wanna allow acks so interdexers can flush
if (this.localWriter === null || (this.localWriter.isRemoved && value !== null)) {
throw new Error('Not writable')
}

Expand Down Expand Up @@ -1024,7 +1029,7 @@ module.exports = class Autobase extends ReadyResource {
}

async _bootstrapLinearizer () {
const bootstrap = this._makeWriter(this.bootstrap, 0, true)
const bootstrap = this._makeWriter(this.bootstrap, 0, true, false)

this.activeWriters.add(bootstrap)
this._checkWriters.push(bootstrap)
Expand Down Expand Up @@ -1079,10 +1084,30 @@ module.exports = class Autobase extends ReadyResource {
}
}

async _refreshSystemState () {
if (!(await this.system.update())) return

for (const w of this.activeWriters) {
const data = await this.system.get(w.core.key)

if (data) {
w.isRemoved = data.isRemoved
w.isIndexer = data.isIndexer
} else {
w.isRemoved = true
w.isIndexer = false
}

if (!w.isIndexer) {
w.isActiveIndexer = false
}
}
}

async _reindex () {
if (this._updates.length) {
this._undoAll()
await this.system.update()
await this._refreshSystemState()
}

const sameIndexers = this.system.sameIndexers(this.linearizer.indexers)
Expand Down Expand Up @@ -1466,7 +1491,7 @@ module.exports = class Autobase extends ReadyResource {

await this._closeAllActiveWriters(false)

await this.system.update()
await this._refreshSystemState()
await this._makeLinearizer(this.system)
}

Expand Down Expand Up @@ -1810,7 +1835,7 @@ module.exports = class Autobase extends ReadyResource {
else if (migrated) await view.migrateTo(indexers, 0)
}

await this.system.update()
await this._refreshSystemState()

if (this.localWriter) {
if (localLength < 0) this._unsetLocalWriter()
Expand Down Expand Up @@ -1882,7 +1907,7 @@ module.exports = class Autobase extends ReadyResource {
assert(this._applying !== null, 'System changes are only allowed in apply')
await this.system.add(key, { isIndexer })

const writer = (await this._getWriterByKey(key, -1, 0, false, true, null)) || this._makeWriter(key, 0, true)
const writer = (await this._getWriterByKey(key, -1, 0, false, true, null)) || this._makeWriter(key, 0, true, false)
await writer.ready()

if (!this.activeWriters.has(key)) {
Expand Down Expand Up @@ -1990,7 +2015,7 @@ module.exports = class Autobase extends ReadyResource {
if (u.indexed.length) this._resetAckTick()

// make sure the latest changes is reflected on the system...
await this.system.update()
await this._refreshSystemState()

// todo: refresh the active writer set in case any were removed

Expand Down
3 changes: 2 additions & 1 deletion lib/system.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ module.exports = class SystemView extends ReadyResource {

async update () {
if (this.opened === false) await this.ready()
if (this._fork === this.core.fork && this._length === this.core.length) return
if (this._fork === this.core.fork && this._length === this.core.length) return false

await this._reset(await this.db.get('info', { valueEncoding: Info, keyEncoding: DIGEST }))
return true
}

async _reset (info) {
Expand Down
55 changes: 55 additions & 0 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,61 @@ test.skip('basic - writer adds a writer while being removed', async t => {
t.is(await d.view.get(1), 'd1')
})

// todo: this test is hard, probably have to rely on ff to fix
test('basic - removed writer adds a writer while being removed', async t => {
const { bases } = await create(3, t, { apply: applyWithRemove })
const [a, b, c] = bases

await addWriterAndSync(a, b, false)
await addWriterAndSync(a, c, false)

await b.append('b1')
await c.append('c1')

await confirm([a, b, c])

t.is(b.view.indexedLength, 2)
t.is(c.view.indexedLength, 2)

await a.append({ remove: b4a.toString(c.local.key, 'hex') })

await replicateAndSync([a, b, c])

t.is(a.view.indexedLength, 2)
t.is(a.view.length, 2)
t.is(a.system.members, 2)

t.is(b.writable, true)
t.is(c.writable, false)

await addWriterAndSync(b, c, false)

t.is(c.writable, true)

// load c into b.activeWriters
await c.append(null)

await replicateAndSync([b, c])

t.is(b.activeWriters.size, 3)

for (let i = 0; i < 10; i++) a.append('a' + i)

await a.append({ remove: b4a.toString(b.local.key, 'hex') })

await replicateAndSync([a, b, c])

t.is(b.writable, false)
t.is(c.writable, false)

await t.exception(c.append('not writable'))

await t.execution(replicateAndSync([a, b, c]))

t.is(a.view.length, b.view.length)
t.is(a.view.length, c.view.length)
})

async function applyWithRemove (batch, view, base) {
for (const { value } of batch) {
if (value.add) {
Expand Down
2 changes: 1 addition & 1 deletion test/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async function confirm (bases, options = {}) {
const writers = bases.filter(b => !!b.localWriter)
const maj = options.majority || (Math.floor(writers.length / 2) + 1)
for (let j = 0; j < maj; j++) {
if (!writers[j].writable) continue
if (!writers[j].ackable) continue

await writers[j].append(null)
await helpers.replicateAndSync(bases)
Expand Down

0 comments on commit 398ba6b

Please sign in to comment.