diff --git a/packages/cli/CHANGELOG.md b/packages/cli/CHANGELOG.md index 99526188b2..111e97b3be 100644 --- a/packages/cli/CHANGELOG.md +++ b/packages/cli/CHANGELOG.md @@ -5,9 +5,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] - ### Added -- Add some constants for Starknet +- Add some constants for Starknet + +### Changed +- Added internal note relating to some tests ## [5.4.0] - 2024-12-11 ### Changed diff --git a/packages/cli/src/controller/codegen-controller.test.ts b/packages/cli/src/controller/codegen-controller.test.ts index 696c81103a..3626b19928 100644 --- a/packages/cli/src/controller/codegen-controller.test.ts +++ b/packages/cli/src/controller/codegen-controller.test.ts @@ -8,30 +8,28 @@ import {codegen} from './codegen-controller'; jest.setTimeout(30000); +const projectPath = path.join(__dirname, '../../test/schemaTest'); + describe('Codegen can generate schema', () => { afterEach(async () => { await rimraf(path.join(__dirname, '../../test/schemaTest/src')); }); it('codegen with correct schema should pass', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await expect(codegen(projectPath)).resolves.not.toThrow(); }); it('codegen with incorrect schema field should fail', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await expect(codegen(projectPath, ['project-bad-schema.yaml'])).rejects.toThrow(/is not an valid type/); }); it('codegen with entities that uses reserved names should throw', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await expect(codegen(projectPath, ['project-bad-entity.yaml'])).rejects.toThrow( 'EntityName: exampleEntityFilter cannot end with reservedKey: filter' ); }); it('Codegen should be able to generate ABIs from template datasources', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await codegen(projectPath, ['project-templates-abi.yaml']); await expect( fs.promises.readFile(`${projectPath}/src/types/abi-interfaces/Erc721.ts`, 'utf8') @@ -39,12 +37,10 @@ describe('Codegen can generate schema', () => { }); it('Should not fail, if ds does not have any assets', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await expect(codegen(projectPath, ['project-no-assets.yaml'])).resolves.not.toThrow(); }); it('Codegen should be able to generate ABIs from customName datasources', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await codegen(projectPath); await expect( fs.promises.readFile(`${projectPath}/src/types/abi-interfaces/Erc721.ts`, 'utf8') @@ -52,16 +48,14 @@ describe('Codegen can generate schema', () => { }); it('Should clean out existing types directory', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await codegen(projectPath); await codegen(projectPath, ['project-no-abi.yaml']); // should not contain abi directory - await expect(fs.promises.readFile(`${projectPath}/src/types/abi-interfaces/Erc721.ts`, 'utf8')).rejects.toThrow(); + await expect(fs.promises.readFile(`${projectPath}/src/types/abi-interfaces/erc721.ts`, 'utf8')).rejects.toThrow(); }); it('should generate contracts on different glob paths', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await codegen(projectPath, ['typechain-test.yaml']); await expect( @@ -78,19 +72,16 @@ describe('Codegen can generate schema', () => { }); it('Should not generate ABI for non evm ds', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await codegen(projectPath, ['non-evm-project.yaml']); expect(fs.existsSync(`${projectPath}/src/types/abi-interfaces/`)).toBeFalsy(); }); it('Should not generate proto-interfaces if no chaintypes are provided', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await codegen(projectPath, ['project-cosmos.yaml']); expect(fs.existsSync(`${projectPath}/src/types/proto-interfaces/`)).toBeFalsy(); }); it('Should dedupe enums', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await codegen(projectPath, ['project-duplicate-enum.yaml']); const fooFile = await fs.promises.readFile(`${projectPath}/src/types/models/Foo.ts`, 'utf8'); @@ -104,7 +95,6 @@ describe('Codegen can generate schema', () => { // github issue #2211 it('codegen file should import model files with correct case-sensitive names', async () => { - const projectPath = path.join(__dirname, '../../test/schemaTest'); await codegen(projectPath, ['project-case-sensitive-import-entity.yaml']); const codegenFile = await fs.promises.readFile(`${projectPath}/src/types/models/index.ts`, 'utf8'); diff --git a/packages/cli/src/controller/codegen-controller.ts b/packages/cli/src/controller/codegen-controller.ts index 0fe7793bd5..4bd880eb2c 100644 --- a/packages/cli/src/controller/codegen-controller.ts +++ b/packages/cli/src/controller/codegen-controller.ts @@ -246,7 +246,9 @@ export async function codegen(projectPath: string, fileNames: string[] = [DEFAUL datasources as CosmosRuntimeDatasource[] ); } + // TODO what about custom datasource processors, e.g. FrontierEvmProcessor, EthermintProcessor const ethManifests = plainManifests.filter((m) => m.networkFamily === NETWORK_FAMILY.ethereum); + // Todo, starknet codegen not supported yet const starknetManifests = plainManifests.filter((m) => m.networkFamily === NETWORK_FAMILY.starknet); diff --git a/packages/cli/test/schemaTest/project.yaml b/packages/cli/test/schemaTest/project.yaml index 22efa5c2d1..eddc6822a9 100644 --- a/packages/cli/test/schemaTest/project.yaml +++ b/packages/cli/test/schemaTest/project.yaml @@ -5,7 +5,7 @@ schema: file: './schema.graphql' runner: node: - name: '@subql/node' + name: '@subql/node-ethereum' version: '>=3.0.1' query: name: '@subql/query' diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index 7219368704..44ac6e91af 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fixed the inconsistency between the `monitor-file-size` flag and the expected behavior.(#2644) - Improved block range validation in POI endpoint with custom class-validator decorator - When setting a smaller batch size and the current processing height reaches latestFinalizedHeight, it causes the program to exit unexpectedly. +- Store flush interval having a chance of trying to flush with invalid metadata (#2650) ## [16.1.0] - 2024-12-11 ### Changed @@ -153,7 +154,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [10.10.2] - 2024-07-10 ### Fixed -- Fix issue admin api can not get `dbSize` due to it not been set in _metadata table +- Fix issue admin api can not get `dbSize` due to it not been set in \_metadata table ## [10.10.1] - 2024-07-09 ### Added @@ -618,7 +619,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Base58 encoding check for POI (#1788) -- Fix project _startHeight been blocked by poiSync (#1792) +- Fix project \_startHeight been blocked by poiSync (#1792) ## [2.4.4] - 2023-06-07 ### Fixed diff --git a/packages/node-core/src/configure/ProjectUpgrade.service.spec.ts b/packages/node-core/src/configure/ProjectUpgrade.service.spec.ts index 6921b6cf39..92a03b7fd5 100644 --- a/packages/node-core/src/configure/ProjectUpgrade.service.spec.ts +++ b/packages/node-core/src/configure/ProjectUpgrade.service.spec.ts @@ -1,7 +1,6 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import {SchedulerRegistry} from '@nestjs/schedule'; import {Sequelize} from '@subql/x-sequelize'; import {CacheMetadataModel, IStoreModelProvider, ISubqueryProject, StoreCacheService, StoreService} from '../indexer'; import {NodeConfig} from './NodeConfig'; @@ -292,7 +291,7 @@ describe('Project Upgrades', () => { let storeCache: IStoreModelProvider; beforeEach(async () => { - storeCache = new StoreCacheService({} as any, {} as any, {} as any, new SchedulerRegistry()); + storeCache = new StoreCacheService({} as any, {} as any, {} as any); // eslint-disable-next-line @typescript-eslint/dot-notation (storeCache as any).cachedModels['_metadata'] = mockMetadata(); diff --git a/packages/node-core/src/db/migration-service/SchemaMigration.service.test.ts b/packages/node-core/src/db/migration-service/SchemaMigration.service.test.ts index f3952976a4..cef14d1853 100644 --- a/packages/node-core/src/db/migration-service/SchemaMigration.service.test.ts +++ b/packages/node-core/src/db/migration-service/SchemaMigration.service.test.ts @@ -4,7 +4,6 @@ import {readFileSync} from 'fs'; import * as path from 'path'; import {EventEmitter2} from '@nestjs/event-emitter'; -import {SchedulerRegistry} from '@nestjs/schedule'; import {buildSchemaFromString} from '@subql/utils'; import {IndexesOptions, QueryTypes, Sequelize} from '@subql/x-sequelize'; import {GraphQLSchema} from 'graphql'; @@ -37,7 +36,7 @@ async function setup( }, } as unknown as ISubqueryProject; - const storeCache = new StoreCacheService(sequelize, config, new EventEmitter2(), new SchedulerRegistry()); + const storeCache = new StoreCacheService(sequelize, config, new EventEmitter2()); const storeService = new StoreService(sequelize, config, storeCache, project); diff --git a/packages/node-core/src/indexer/core.module.ts b/packages/node-core/src/indexer/core.module.ts index f5a3cbcb29..287a461d6e 100644 --- a/packages/node-core/src/indexer/core.module.ts +++ b/packages/node-core/src/indexer/core.module.ts @@ -3,7 +3,6 @@ import {Module} from '@nestjs/common'; import {EventEmitter2} from '@nestjs/event-emitter'; -import {SchedulerRegistry} from '@nestjs/schedule'; import {Sequelize} from '@subql/x-sequelize'; import {AdminController, AdminListener} from '../admin/admin.controller'; import {NodeConfig} from '../configure'; @@ -32,7 +31,7 @@ import {storeModelFactory} from './storeModelProvider'; { provide: 'IStoreModelProvider', useFactory: storeModelFactory, - inject: [NodeConfig, EventEmitter2, SchedulerRegistry, Sequelize], + inject: [NodeConfig, EventEmitter2, Sequelize], }, AdminListener, ], diff --git a/packages/node-core/src/indexer/poi/poi.service.spec.ts b/packages/node-core/src/indexer/poi/poi.service.spec.ts index da629939fe..da209b2e43 100644 --- a/packages/node-core/src/indexer/poi/poi.service.spec.ts +++ b/packages/node-core/src/indexer/poi/poi.service.spec.ts @@ -2,14 +2,12 @@ // SPDX-License-Identifier: GPL-3.0 import {EventEmitter2} from '@nestjs/event-emitter'; -import {SchedulerRegistry} from '@nestjs/schedule'; import {Test, TestingModule} from '@nestjs/testing'; import {delay} from '@subql/common'; import {Sequelize, Transaction} from '@subql/x-sequelize'; import {NodeConfig} from '../../configure'; import {ProofOfIndex} from '../entities/Poi.entity'; import {StoreCacheService} from '../storeModelProvider'; -import {METADATA_ENTITY_NAME} from '../storeModelProvider/metadata/utils'; import {PoiService} from './poi.service'; jest.mock('@subql/x-sequelize', () => { @@ -62,7 +60,7 @@ describe('PoiService', () => { } as unknown as NodeConfig; const sequelize = new Sequelize(); - storeCache = new StoreCacheService(sequelize, nodeConfig, new EventEmitter2(), new SchedulerRegistry()); + storeCache = new StoreCacheService(sequelize, nodeConfig, new EventEmitter2()); storeCache.init('height', {} as any, {} as any); (storeCache as any).cachedModels._metadata = { diff --git a/packages/node-core/src/indexer/store.service.test.ts b/packages/node-core/src/indexer/store.service.test.ts index 7ca16aeab5..68674d3fb0 100644 --- a/packages/node-core/src/indexer/store.service.test.ts +++ b/packages/node-core/src/indexer/store.service.test.ts @@ -1,13 +1,13 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { buildSchemaFromString } from '@subql/utils'; -import { Sequelize, QueryTypes } from '@subql/x-sequelize'; -import { NodeConfig } from '../configure'; -import { DbOption } from '../db'; -import { StoreService } from './store.service'; -import { CachedModel, PlainStoreModelService, StoreCacheService } from './storeModelProvider'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {buildSchemaFromString} from '@subql/utils'; +import {Sequelize, QueryTypes} from '@subql/x-sequelize'; +import {NodeConfig} from '../configure'; +import {DbOption} from '../db'; +import {StoreService} from './store.service'; +import {CachedModel, PlainStoreModelService, StoreCacheService} from './storeModelProvider'; const option: DbOption = { host: process.env.DB_HOST ?? '127.0.0.1', port: process.env.DB_PORT ? Number(process.env.DB_PORT) : 5432, @@ -38,8 +38,8 @@ describe('Check whether the db store and cache store are consistent.', () => { await sequelize.authenticate(); await sequelize.query(`CREATE SCHEMA ${testSchemaName};`); - const nodeConfig = new NodeConfig({ subquery: 'test', proofOfIndex: true, enableCache: false }); - const project = { network: { chainId: '1' }, schema } as any; + const nodeConfig = new NodeConfig({subquery: 'test', proofOfIndex: true, enableCache: false}); + const project = {network: {chainId: '1'}, schema} as any; const dbModel = new PlainStoreModelService(sequelize, nodeConfig); storeService = new StoreService(sequelize, nodeConfig, dbModel, project); await storeService.initCoreTables(testSchemaName); @@ -57,9 +57,9 @@ describe('Check whether the db store and cache store are consistent.', () => { }); it('Same block, Execute the set method multiple times.', async () => { - await storeService.setBlockHeader({ blockHeight: 1, blockHash: '0x01', parentHash: '0x00' }); + await storeService.setBlockHeader({blockHeight: 1, blockHash: '0x01', parentHash: '0x00'}); - const accountEntity = { id: 'block-001', balance: 100 }; + const accountEntity = {id: 'block-001', balance: 100}; // account not exist. let account = await storeService.getStore().get('Account', accountEntity.id); @@ -72,21 +72,21 @@ describe('Check whether the db store and cache store are consistent.', () => { // block range check. const [dbData] = await sequelize.query(`SELECT * FROM "${testSchemaName}"."accounts" WHERE id = :id`, { - replacements: { id: accountEntity.id }, + replacements: {id: accountEntity.id}, type: QueryTypes.SELECT, transaction: storeService.transaction, }); expect(dbData._block_range).toEqual([ - { value: '1', inclusive: true }, - { value: null, inclusive: false }, + {value: '1', inclusive: true}, + {value: null, inclusive: false}, ]); // update account success. - const account001 = { id: 'block-001', balance: 10000 }; + const account001 = {id: 'block-001', balance: 10000}; await storeService.getStore().set('Account', account001.id, account001 as any); const account001After = await storeService.getStore().get('Account', account001.id); expect(account001After).toEqual(account001); - console.log({ accountAfter: account001After, accountEntityAfter: account001 }); + console.log({accountAfter: account001After, accountEntityAfter: account001}); // only one record in db and block range check. const allDatas = await sequelize.query(`SELECT * FROM "${testSchemaName}"."accounts"`, { @@ -95,11 +95,11 @@ describe('Check whether the db store and cache store are consistent.', () => { }); expect(allDatas).toHaveLength(1); expect(allDatas[0]._block_range).toEqual([ - { value: '1', inclusive: true }, - { value: null, inclusive: false }, + {value: '1', inclusive: true}, + {value: null, inclusive: false}, ]); - const account002 = { id: 'block-002', balance: 100 }; + const account002 = {id: 'block-002', balance: 100}; await storeService.getStore().bulkCreate('Account', [account002, account001]); const account002After = await storeService.getStore().get('Account', account002.id); expect(account002After).toEqual(account002); @@ -111,20 +111,20 @@ describe('Check whether the db store and cache store are consistent.', () => { }); expect(allDatas2).toHaveLength(2); expect(allDatas2[0]._block_range).toEqual([ - { value: '1', inclusive: true }, - { value: null, inclusive: false }, + {value: '1', inclusive: true}, + {value: null, inclusive: false}, ]); expect(allDatas2[1]._block_range).toEqual([ - { value: '1', inclusive: true }, - { value: null, inclusive: false }, + {value: '1', inclusive: true}, + {value: null, inclusive: false}, ]); }, 30000); it('_block_range update check', async () => { - await storeService.setBlockHeader({ blockHeight: 1000, blockHash: '0x1000', parentHash: '0x0999' }); + await storeService.setBlockHeader({blockHeight: 1000, blockHash: '0x1000', parentHash: '0x0999'}); // insert new account. - const account1000Data = { id: 'block-1000', balance: 999 }; + const account1000Data = {id: 'block-1000', balance: 999}; await storeService.getStore().set('Account', account1000Data.id, account1000Data as any); const account1000 = await storeService.getStore().get('Account', account1000Data.id); expect(account1000).toEqual(account1000Data); @@ -136,7 +136,7 @@ describe('Check whether the db store and cache store are consistent.', () => { expect(allDatas).toHaveLength(3); // set old account. - const account002 = { id: 'block-002', balance: 222222 }; + const account002 = {id: 'block-002', balance: 222222}; await storeService.getStore().set('Account', account002.id, account002 as any); const account002After = await storeService.getStore().get('Account', account002.id); expect(account002After).toEqual(account002); @@ -153,12 +153,12 @@ describe('Check whether the db store and cache store are consistent.', () => { expect(account002Datas).toHaveLength(2); expect(account002Datas.map((v) => v._block_range).sort((a, b) => b[0].value - a[0].value)).toEqual([ [ - { value: '1000', inclusive: true }, - { value: null, inclusive: false }, + {value: '1000', inclusive: true}, + {value: null, inclusive: false}, ], [ - { value: '1', inclusive: true }, - { value: '1000', inclusive: false }, + {value: '1', inclusive: true}, + {value: '1000', inclusive: false}, ], ]); }, 100000); @@ -187,8 +187,8 @@ describe('Cache Provider', () => { storeCacheUpperLimit: 1, storeFlushInterval: 0, }); - const project = { network: { chainId: '1' }, schema } as any; - cacheModel = new StoreCacheService(sequelize, nodeConfig, new EventEmitter2(), null as any); + const project = {network: {chainId: '1'}, schema} as any; + cacheModel = new StoreCacheService(sequelize, nodeConfig, new EventEmitter2()); storeService = new StoreService(sequelize, nodeConfig, cacheModel, project); await storeService.initCoreTables(testSchemaName); await storeService.init(testSchemaName); @@ -204,7 +204,7 @@ describe('Cache Provider', () => { tx.afterCommit(() => { Account.clear(blockHeight); }); - await storeService.setBlockHeader({ blockHeight, blockHash: `0x${blockHeight}`, parentHash: `0x${blockHeight - 1}` }); + await storeService.setBlockHeader({blockHeight, blockHash: `0x${blockHeight}`, parentHash: `0x${blockHeight - 1}`}); await handle(blockHeight); await Account.runFlush(tx, blockHeight); await tx.commit(); @@ -216,7 +216,7 @@ describe('Cache Provider', () => { type: QueryTypes.SELECT, }); - const accountEntity1 = { id: 'accountEntity-001', balance: 100 }; + const accountEntity1 = {id: 'accountEntity-001', balance: 100}; await cacheFlush(1, async (blockHeight) => { await Account.set(accountEntity1.id, accountEntity1, blockHeight); }); @@ -226,7 +226,7 @@ describe('Cache Provider', () => { expect(allDatas).toHaveLength(1); // next block 999 - const accountEntity2 = { id: 'accountEntity-002', balance: 9999 }; + const accountEntity2 = {id: 'accountEntity-002', balance: 9999}; await cacheFlush(999, async (blockHeight) => { await Account.remove(accountEntity1.id, blockHeight); const oldAccunt = await Account.get(accountEntity1.id); @@ -251,7 +251,7 @@ describe('Cache Provider', () => { oldAccunt2 = await Account.get(accountEntity2.id); expect(oldAccunt2).toBeUndefined(); - await Account.set(accountEntity2.id, { id: 'accountEntity-002', balance: 999999 } as any, blockHeight); + await Account.set(accountEntity2.id, {id: 'accountEntity-002', balance: 999999} as any, blockHeight); oldAccunt2 = await Account.get(accountEntity2.id); expect(oldAccunt2.balance).toEqual(999999); }); diff --git a/packages/node-core/src/indexer/storeModelProvider/storeCache.service.spec.ts b/packages/node-core/src/indexer/storeModelProvider/storeCache.service.spec.ts index ad6461b353..c83bdb7390 100644 --- a/packages/node-core/src/indexer/storeModelProvider/storeCache.service.spec.ts +++ b/packages/node-core/src/indexer/storeModelProvider/storeCache.service.spec.ts @@ -2,7 +2,6 @@ // SPDX-License-Identifier: GPL-3.0 import {EventEmitter2} from '@nestjs/event-emitter'; -import {SchedulerRegistry} from '@nestjs/schedule'; import {Sequelize} from '@subql/x-sequelize'; import {NodeConfig} from '../../configure'; import {delay} from '../../utils'; @@ -43,11 +42,18 @@ jest.mock('@subql/x-sequelize', () => { destroy: jest.fn(), }), sync: jest.fn(), - transaction: () => ({ - commit: jest.fn(() => delay(1)), // Delay of 1s is used to test whether we wait for cache to flush - rollback: jest.fn(), - afterCommit: jest.fn(), - }), + transaction: () => { + let afterCommits: Array<() => void> = []; + return { + commit: jest.fn(() => { + afterCommits.forEach((fn) => fn()); + afterCommits = []; + return delay(1); + }), // Delay of 1s is used to test whether we wait for cache to flush + rollback: jest.fn(), + afterCommit: (fn: () => void) => afterCommits.push(fn), + }; + }, // createSchema: jest.fn(), }; const actualSequelize = jest.requireActual('@subql/x-sequelize'); @@ -66,7 +72,7 @@ describe('Store Cache Service historical', () => { const nodeConfig: NodeConfig = {} as any; beforeEach(() => { - storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry()); + storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter); }); it('could init store cache service and init cache for models', () => { @@ -152,7 +158,7 @@ describe('Store Cache flush with order', () => { const nodeConfig: NodeConfig = {} as any; beforeEach(() => { - storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry()); + storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter); storeCacheService.init(false, {} as any, undefined); }); @@ -189,7 +195,7 @@ describe('Store Cache flush with non-historical', () => { const nodeConfig: NodeConfig = {disableHistorical: true} as any; beforeEach(() => { - storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry()); + storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter); storeCacheService.init(false, {} as any, undefined); }); @@ -247,7 +253,7 @@ describe('Store cache upper threshold', () => { } as NodeConfig; beforeEach(() => { - storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry()); + storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter); storeCacheService.init(false, {findByPk: () => Promise.resolve({toJSON: () => 1})} as any, undefined); }); @@ -303,7 +309,7 @@ describe('Store cache with exporters', () => { const nodeConfig = {} as NodeConfig; beforeEach(() => { - storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter, new SchedulerRegistry()); + storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter); storeCacheService.init(false, {findByPk: () => Promise.resolve({toJSON: () => 1})} as any, undefined); }); @@ -334,3 +340,40 @@ describe('Store cache with exporters', () => { await expect(() => storeCacheService.flushData(true)).rejects.toThrow('Cant export'); }); }); + +describe('Store cache with flush interval', () => { + let storeCacheService: StoreCacheService; + + const sequelize = new Sequelize(); + const nodeConfig = {storeFlushInterval: 2} as NodeConfig; + + beforeEach(() => { + storeCacheService = new StoreCacheService(sequelize, nodeConfig, eventEmitter); + storeCacheService.init(false, {findByPk: () => Promise.resolve({toJSON: () => 1})} as any, undefined); + }); + + it('force flushes at an interval', async () => { + // Setup initial data that is flushed so we have a last flushed time + const entity1Model = storeCacheService.getModel('entity1'); + await entity1Model.set( + 'entity1_id_0x01', + { + id: 'entity1_id_0x01', + field1: 'set at block 1', + }, + 1 + ); + await storeCacheService.flushData(true); + + const flushSpy = jest.spyOn(storeCacheService, 'flushData'); + + let x = 0; + while (x <= 12) { + await storeCacheService.applyPendingChanges(x, false); + await delay(0.1); + x++; + } + + expect(flushSpy).toHaveBeenCalledWith(true); + }); +}); diff --git a/packages/node-core/src/indexer/storeModelProvider/storeCache.service.ts b/packages/node-core/src/indexer/storeModelProvider/storeCache.service.ts index 46bcd7426b..85a9895dc2 100644 --- a/packages/node-core/src/indexer/storeModelProvider/storeCache.service.ts +++ b/packages/node-core/src/indexer/storeModelProvider/storeCache.service.ts @@ -4,7 +4,6 @@ import assert from 'assert'; import {Injectable} from '@nestjs/common'; import {EventEmitter2} from '@nestjs/event-emitter'; -import {SchedulerRegistry} from '@nestjs/schedule'; import {DatabaseError, Deferrable, Sequelize} from '@subql/x-sequelize'; import {sum} from 'lodash'; import {NodeConfig} from '../../configure'; @@ -12,8 +11,6 @@ import {IndexerEvent} from '../../events'; import {getLogger} from '../../logger'; import {exitWithError} from '../../process'; import {profiler} from '../../profiler'; -import {MetadataRepo, PoiRepo} from '../entities'; -import {HistoricalMode} from '../types'; import {BaseCacheService} from './baseCache.service'; import {CsvExporter, Exporter} from './exporters'; import {CacheMetadataModel} from './metadata'; @@ -30,11 +27,12 @@ export class StoreCacheService extends BaseCacheService implements IStoreModelPr private readonly cacheUpperLimit: number; private _storeOperationIndex = 0; + #lastFlushed: Date | undefined; + constructor( private sequelize: Sequelize, private config: NodeConfig, - protected eventEmitter: EventEmitter2, - private schedulerRegistry: SchedulerRegistry + protected eventEmitter: EventEmitter2 ) { super('StoreCache'); this.storeCacheThreshold = config.storeCacheThreshold; @@ -45,28 +43,6 @@ export class StoreCacheService extends BaseCacheService implements IStoreModelPr } } - init(historical: HistoricalMode, meta: MetadataRepo, poi?: PoiRepo): void { - super.init(historical, meta, poi); - - if (this.config.storeFlushInterval > 0) { - this.schedulerRegistry.addInterval( - 'storeFlushInterval', - setInterval(() => { - this.flushData(true).catch((e) => logger.warn(`storeFlushInterval failed ${e.message}`)); - }, this.config.storeFlushInterval * 1000) - ); - } - } - - async beforeApplicationShutdown(): Promise { - try { - this.schedulerRegistry.deleteInterval('storeFlushInterval'); - } catch (e) { - /* Do nothing, an interval might not have been created */ - } - await super.beforeApplicationShutdown(); - } - getNextStoreOperationIndex(): number { this._storeOperationIndex += 1; return this._storeOperationIndex; @@ -117,6 +93,7 @@ export class StoreCacheService extends BaseCacheService implements IStoreModelPr @profiler() async _flushCache(): Promise { + this.#lastFlushed = new Date(); this.logger.debug('Flushing cache'); // With historical disabled we defer the constraints check so that it doesn't matter what order entities are modified const tx = await this.sequelize.transaction({ @@ -171,19 +148,24 @@ export class StoreCacheService extends BaseCacheService implements IStoreModelPr } async applyPendingChanges(height: number, dataSourcesCompleted: boolean): Promise { + const force = + this.#lastFlushed && + this.config.storeFlushInterval > 0 && + Date.now() >= this.#lastFlushed.getTime() + this.config.storeFlushInterval * 1000; + if (this.config.storeCacheAsync) { // Flush all completed block data and don't wait - await this.flushAndWaitForCapacity(false)?.catch((e) => { + await this.flushAndWaitForCapacity(force)?.catch((e) => { exitWithError(new Error(`Flushing cache failed`, {cause: e}), logger); }); } else { // Flush all data from cache and wait - await this.flushData(false); + await this.flushData(force); } if (dataSourcesCompleted) { const msg = `All data sources have been processed up to block number ${height}. Exiting gracefully...`; - await this.flushData(false); + await this.flushData(force); exitWithError(msg, logger, 0); } } diff --git a/packages/node-core/src/indexer/storeModelProvider/utils.ts b/packages/node-core/src/indexer/storeModelProvider/utils.ts index 9dce080873..7043a219e5 100644 --- a/packages/node-core/src/indexer/storeModelProvider/utils.ts +++ b/packages/node-core/src/indexer/storeModelProvider/utils.ts @@ -2,7 +2,6 @@ // SPDX-License-Identifier: GPL-3.0 import {EventEmitter2} from '@nestjs/event-emitter'; -import {SchedulerRegistry} from '@nestjs/schedule'; import {Sequelize} from '@subql/x-sequelize'; import {NodeConfig} from '../../configure'; import {StoreCacheService} from './storeCache.service'; @@ -23,10 +22,9 @@ export async function cacheProviderResetData(modelProvider: IStoreModelProvider) export function storeModelFactory( nodeConfig: NodeConfig, eventEmitter: EventEmitter2, - schedulerRegistry: SchedulerRegistry, sequelize: Sequelize ): IStoreModelProvider { return nodeConfig.enableCache - ? new StoreCacheService(sequelize, nodeConfig, eventEmitter, schedulerRegistry) + ? new StoreCacheService(sequelize, nodeConfig, eventEmitter) : new PlainStoreModelService(sequelize, nodeConfig); } diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts index 757acdf03d..6d3622e823 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.spec.ts @@ -1,11 +1,9 @@ // Copyright 2020-2024 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -// import { Header } from '@polkadot/types/interfaces'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { SchedulerRegistry } from '@nestjs/schedule'; -import { Header, IBlock } from '../indexer'; -import { StoreCacheService, CacheMetadataModel } from './storeModelProvider'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {Header, IBlock} from '../indexer'; +import {StoreCacheService, CacheMetadataModel} from './storeModelProvider'; import { METADATA_LAST_FINALIZED_PROCESSED_KEY, METADATA_UNFINALIZED_BLOCKS_KEY, @@ -49,8 +47,8 @@ class UnfinalizedBlocksService extends BaseUnfinalizedBlocksService> function getMockMetadata(): any { const data: Record = {}; return { - upsert: ({ key, value }: any) => (data[key] = value), - findOne: ({ where: { key } }: any) => ({ value: data[key] }), + upsert: ({key, value}: any) => (data[key] = value), + findOne: ({where: {key}}: any) => ({value: data[key]}), findByPk: (key: string) => data[key], find: (key: string) => data[key], } as any; @@ -65,7 +63,7 @@ function mockStoreCache(): StoreCacheService { function mockBlock(height: number, hash: string, parentHash?: string): IBlock { return { getHeader: () => { - return { blockHeight: height, parentHash: parentHash ?? '', blockHash: hash, timestamp: new Date() }; + return {blockHeight: height, parentHash: parentHash ?? '', blockHash: hash, timestamp: new Date()}; }, block: { header: { @@ -81,7 +79,7 @@ describe('UnfinalizedBlocksService', () => { let unfinalizedBlocksService: UnfinalizedBlocksService; beforeEach(async () => { - unfinalizedBlocksService = new UnfinalizedBlocksService({ unfinalizedBlocks: true } as any, mockStoreCache()); + unfinalizedBlocksService = new UnfinalizedBlocksService({unfinalizedBlocks: true} as any, mockStoreCache()); await unfinalizedBlocksService.init(() => Promise.resolve()); }); @@ -247,25 +245,20 @@ describe('UnfinalizedBlocksService', () => { }); it('can rewind any unfinalized blocks when restarted and unfinalized blocks is disabled', async () => { - const storeCache = new StoreCacheService( - null as any, - { storeCacheThreshold: 300 } as any, - new EventEmitter2(), - new SchedulerRegistry() - ); + const storeCache = new StoreCacheService(null as any, {storeCacheThreshold: 300} as any, new EventEmitter2()); storeCache.init('height', {} as any, undefined); await storeCache.metadata.set( METADATA_UNFINALIZED_BLOCKS_KEY, JSON.stringify([ - { blockHeight: 90, blockHash: '0xabcd' }, - { blockHeight: 91, blockHash: '0xabc91' }, - { blockHeight: 92, blockHash: '0xabc92' }, + {blockHeight: 90, blockHash: '0xabcd'}, + {blockHeight: 91, blockHash: '0xabc91'}, + {blockHeight: 92, blockHash: '0xabc92'}, ]) ); await storeCache.metadata.set(METADATA_LAST_FINALIZED_PROCESSED_KEY, 90); - const unfinalizedBlocksService2 = new UnfinalizedBlocksService({ unfinalizedBlocks: false } as any, storeCache); + const unfinalizedBlocksService2 = new UnfinalizedBlocksService({unfinalizedBlocks: false} as any, storeCache); const reindex = jest.fn().mockReturnValue(Promise.resolve()); diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md index 0131426c1d..261f3dab6f 100644 --- a/packages/node/CHANGELOG.md +++ b/packages/node/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Changed - Update node core to expose PG Pool options (#2646) +- Remove unused SchedulerRegistry dependency with node-core changes ### Added - Bump `@subql/node-core` dependency (#2644) diff --git a/packages/node/src/indexer/indexer.manager.spec.ts b/packages/node/src/indexer/indexer.manager.spec.ts index 2662b939ab..0d1ec7fb99 100644 --- a/packages/node/src/indexer/indexer.manager.spec.ts +++ b/packages/node/src/indexer/indexer.manager.spec.ts @@ -2,7 +2,6 @@ // SPDX-License-Identifier: GPL-3.0 import { EventEmitter2 } from '@nestjs/event-emitter'; -import { SchedulerRegistry } from '@nestjs/schedule'; import { SubstrateDatasourceKind, SubstrateHandlerKind, @@ -166,12 +165,7 @@ async function createIndexerManager( const dsProcessorService = new DsProcessorService(project, nodeConfig); const dynamicDsService = new DynamicDsService(dsProcessorService, project); - const storeCache = new StoreCacheService( - sequelize, - nodeConfig, - eventEmitter, - new SchedulerRegistry(), - ); + const storeCache = new StoreCacheService(sequelize, nodeConfig, eventEmitter); const storeService = new StoreService( sequelize, nodeConfig, diff --git a/packages/node/src/subcommands/reindex.module.ts b/packages/node/src/subcommands/reindex.module.ts index 6f45c5ad19..8ee8cb238f 100644 --- a/packages/node/src/subcommands/reindex.module.ts +++ b/packages/node/src/subcommands/reindex.module.ts @@ -27,7 +27,7 @@ import { UnfinalizedBlocksService } from '../indexer/unfinalizedBlocks.service'; { provide: 'IStoreModelProvider', useFactory: storeModelFactory, - inject: [NodeConfig, EventEmitter2, SchedulerRegistry, Sequelize], + inject: [NodeConfig, EventEmitter2, Sequelize], }, StoreService, ReindexService, diff --git a/packages/node/src/subcommands/testing.module.ts b/packages/node/src/subcommands/testing.module.ts index bbaa0c079f..c454a440c3 100644 --- a/packages/node/src/subcommands/testing.module.ts +++ b/packages/node/src/subcommands/testing.module.ts @@ -33,7 +33,7 @@ import { UnfinalizedBlocksService } from '../indexer/unfinalizedBlocks.service'; { provide: 'IStoreModelProvider', useFactory: storeModelFactory, - inject: [NodeConfig, EventEmitter2, SchedulerRegistry, Sequelize], + inject: [NodeConfig, EventEmitter2, Sequelize], }, EventEmitter2, PoiService,