diff --git a/processor/batch-processor/src/errors.ts b/processor/batch-processor/src/errors.ts index f71b7696d..4fd0c527a 100644 --- a/processor/batch-processor/src/errors.ts +++ b/processor/batch-processor/src/errors.ts @@ -1,12 +1,6 @@ import {HashAndHeight} from './database' import {formatHead} from './util' -export class DatabaseNotSupportHotBlocksError extends Error { - constructor() { - super('database does not support hot blocks') - } -} - export class AlreadyIndexedBlockNotFoundError extends Error { constructor(block: HashAndHeight) { super(`already indexed block ${formatHead(block)} was not found on chain`) diff --git a/processor/batch-processor/src/run.test.ts b/processor/batch-processor/src/run.test.ts index ecee532e6..aa8705e52 100644 --- a/processor/batch-processor/src/run.test.ts +++ b/processor/batch-processor/src/run.test.ts @@ -1,11 +1,11 @@ import {AsyncQueue} from '@subsquid/util-internal' import {FiniteRange, getSize} from '@subsquid/util-internal-range' -import assert from 'assert' +import assert, {AssertionError} from 'assert' import expect from 'expect' import {MockedObject, MockInstance, ModuleMocker} from 'jest-mock' import {FinalTxInfo, HashAndHeight, HotDatabaseState, HotTxInfo} from './database' import {DataSource} from './datasource' -import {DatabaseNotSupportHotBlocksError, FinalizedHeadBelowStateError} from './errors' +import {FinalizedHeadBelowStateError} from './errors' import {BlockBase, Processor} from './run' const mock = new ModuleMocker(global) @@ -124,20 +124,13 @@ describe('processor', () => { p = mock.mocked(new Processor(ds, db, batchHandler)) mock.spyOn(ds, 'getBlockStream') - mock.spyOn(db, 'transact') mock.spyOn(p, 'run') - p.run().catch(console.error) + p.run().catch(() => {}) await waitForCallExact(ds.getBlockStream, 1) - }) - it('throw on integrity error', async () => { - await ds.put(header('0x1', 2), [block('0x0', 0), block('0x1', 1), block('0x2', 2)]) - - await waitForResultExact(p.run, 1) - - expect(p.run.mock.results[0].value).rejects.toThrow(new Error()) + mock.spyOn(db, 'transact') }) it('throw on consistency error', async () => { @@ -226,12 +219,13 @@ describe('processor', () => { p = mock.mocked(new Processor(ds, db, batchHandler)) mock.spyOn(ds, 'getBlockStream') - mock.spyOn(db, 'transact') mock.spyOn(p, 'run') - p.run().catch(console.error) + p.run().catch(() => {}) await waitForCallExact(ds.getBlockStream, 1) + + mock.spyOn(db, 'transact') }) it('configure stream', async () => { @@ -319,7 +313,7 @@ describe('processor', () => { await waitForResultExact(p.run, 1) - expect(p.run.mock.results[0].value).rejects.toThrow(new DatabaseNotSupportHotBlocksError()) + expect(p.run.mock.results[0].value).rejects.toThrow(AssertionError) }) }) @@ -330,13 +324,14 @@ describe('processor', () => { p = mock.mocked(new Processor(ds, db, batchHandler)) mock.spyOn(ds, 'getBlockStream') - mock.spyOn(db, 'transact') - mock.spyOn(db, 'transactHot2') mock.spyOn(p, 'run') - p.run().catch(console.error) + p.run().catch(() => {}) await waitForCallExact(ds.getBlockStream, 1) + + mock.spyOn(db, 'transact') + mock.spyOn(db, 'transactHot2') }) it('configure stream', async () => { @@ -484,10 +479,7 @@ describe('processor', () => { { baseHead: expect.objectContaining(header('0x1', 1)), finalizedHead: expect.objectContaining(header('0x1', 1)), - newBlocks: [ - expect.objectContaining(header('0x2a', 2)), - expect.objectContaining(header('0x3a', 3)), - ], + newBlocks: [expect.objectContaining(header('0x2a', 2)), expect.objectContaining(header('0x3a', 3))], }, expect.any(Function) ) diff --git a/processor/batch-processor/src/run.ts b/processor/batch-processor/src/run.ts index edc599253..12a968aa1 100644 --- a/processor/batch-processor/src/run.ts +++ b/processor/batch-processor/src/run.ts @@ -7,7 +7,7 @@ import {DataSource} from './datasource' import {Metrics} from './metrics' import {formatHead, getItemsCount} from './util' import assert from 'assert' -import {AlreadyIndexedBlockNotFoundError, FinalizedHeadBelowStateError, DatabaseNotSupportHotBlocksError} from './errors' +import {AlreadyIndexedBlockNotFoundError, FinalizedHeadBelowStateError} from './errors' const log = createLogger('sqd:batch-processor') @@ -90,6 +90,20 @@ export class Processor { async run(): Promise { let state = await this.getDatabaseState() + + // remove all hot block to start from the finalized head + state = {...state, top: []} + if (this.db.supportsHotBlocks) { + await this.db.transactHot( + { + finalizedHead: state, + baseHead: state, + newBlocks: [], + }, + async () => {} + ) + } + if (state.height >= 0) { log.info(`last processed final block was ${state.height}`) } @@ -173,52 +187,35 @@ export class Processor { if (prevState.height === blocks[0].header.height && prevState.hash !== blocks[0].header.hash) { throw new Error() } - - let prevHead: HashAndHeight = maybeLast(prevState.top) || prevState - let top: HashAndHeight[] = [] - let baseHead: HashAndHeight = prevState + let lastBlock = last(blocks).header + let nextState: HotDatabaseState = + lastBlock.height < finalizedHead.height + ? toDatabaseState(lastBlock) + : toDatabaseState(finalizedHead) + + let baseHead = prevState as HashAndHeight for (let block of prevState.top) { - if (block.height >= blocks[0].header.height) { - break - } + if (block.height > blocks[0].header.height) break + if (block.height === blocks[0].header.height && block.hash !== blocks[0].header.hash) break baseHead = block + if (block.height <= nextState.height) continue + nextState.top.push(block) + } - if (block.height < finalizedHead.height) continue - if (block.height === finalizedHead.height) { - if (block.hash !== finalizedHead.hash) { - throw new Error() - } - continue - } - - top.push(block) + for (let {header: block} of blocks) { + if (block.height <= finalizedHead.height) continue + nextState.top.push(block) } - let nextHead: HashAndHeight - if (last(blocks).header.height >= finalizedHead.height) { - for (let {header: block} of blocks) { - if (block.height < finalizedHead.height) continue - if (block.height === finalizedHead.height) { - if (block.hash !== finalizedHead.hash) { - throw new Error() - } - continue - } - - top.push(block) - } + let prevHead = maybeLast(prevState.top) || prevState + let nextHead = maybeLast(nextState.top) || nextState - nextHead = maybeLast(top) || finalizedHead - } else { - nextHead = last(blocks).header - } if (baseHead.hash !== prevHead.hash) { log.info(`navigating a fork between ${formatHead(prevHead)} to ${formatHead(nextHead)} with a common base ${formatHead(baseHead)}`) } - let nextState: HotDatabaseState - if (top.length === 0 && baseHead.height === prevState.height && nextHead.height <= finalizedHead.height) { + if (nextHead.height === nextState.height && prevHead.height === prevState.height) { await this.db.transact( { prevHead, @@ -232,16 +229,8 @@ export class Processor { }) } ) - - nextState = { - height: nextHead.height, - hash: nextHead.hash, - top: [], - } } else { - if (!this.db.supportsHotBlocks) { - throw new DatabaseNotSupportHotBlocksError() - } + assert(this.db.supportsHotBlocks, 'database does not support hot blocks') let info: HotTxInfo = { finalizedHead, @@ -270,16 +259,10 @@ export class Processor { return this.handler({ store, blocks: [block], - isHead: nextState.height === ref.height, + isHead: nextHead.height === ref.height, }) }) } - - nextState = { - height: finalizedHead.height, - hash: finalizedHead.hash, - top, - } } return nextState @@ -320,3 +303,11 @@ export class Processor { } } } + +function toDatabaseState(block: HashAndHeight): HotDatabaseState { + return { + height: block.height, + hash: block.hash, + top: [], + } +} \ No newline at end of file