Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dictionary v2 #2257

Merged
merged 55 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
cdf9be4
node-core and types
jiqiang90 Feb 15, 2024
34ac73c
remove deprecated dictionary
jiqiang90 Feb 16, 2024
9c1ab64
need to fix runtime
jiqiang90 Feb 19, 2024
d5ff192
fix getDsProcessor
jiqiang90 Feb 20, 2024
2099ae3
Tidy up
stwiname Feb 20, 2024
c701c0d
Remove dictionary version enum, tidy up generics and imports
stwiname Feb 20, 2024
424c0a1
runtime test and fix
jiqiang90 Feb 23, 2024
3bb5beb
move tests, and change getDictionary behaviour
jiqiang90 Feb 23, 2024
093bee9
Merge branch 'main' into dictionary-v2
jiqiang90 Feb 25, 2024
3f3ddca
Add changelogs
jiqiang90 Feb 25, 2024
df8107d
Tidy up unused code
stwiname Feb 26, 2024
312c160
Update queryEndBlock and add retry logic
jiqiang90 Feb 27, 2024
6f0504b
return undefined for invalid dictionary index
jiqiang90 Feb 27, 2024
aee65ea
add test for v2
jiqiang90 Feb 27, 2024
a883e20
Merge branch 'main' into dictionary-v2
jiqiang90 Feb 27, 2024
f2ad093
Simplify generics
stwiname Feb 28, 2024
a15f82a
Update packages/node-core/src/indexer/dictionary/v2/dictionaryV2.ts
jiqiang90 Feb 29, 2024
3bf4a0a
Update packages/node-core/src/indexer/dictionary/v2/dictionaryV2.ts
jiqiang90 Feb 29, 2024
850f9b4
generalised getData and other fixes
jiqiang90 Feb 29, 2024
3135927
Unfinalized service use IBlock
jiqiang90 Mar 1, 2024
48a5459
Merge branch 'main' into dictionary-v2
jiqiang90 Mar 1, 2024
bf45f44
remove TODO
jiqiang90 Mar 1, 2024
cd8bd30
Merge remote-tracking branch 'origin/dictionary-v2' into dictionary-v2
jiqiang90 Mar 1, 2024
c769870
Update packages/node-core/src/indexer/dictionary/v2/dictionaryV2.ts
jiqiang90 Mar 3, 2024
0ea7865
Update packages/node-core/src/indexer/dictionary/v2/dictionaryV2.ts
jiqiang90 Mar 3, 2024
6a4d55f
Update packages/node/src/subcommands/testing.service.ts
jiqiang90 Mar 3, 2024
cf016d4
multiple fixes
jiqiang90 Mar 4, 2024
b8e5a24
async create dictionary
jiqiang90 Mar 5, 2024
b8c328a
Merge branch 'main' into dictionary-v2
jiqiang90 Mar 5, 2024
edc38a0
bring resolver back
jiqiang90 Mar 6, 2024
0707e6f
update for validateChainMeta
jiqiang90 Mar 6, 2024
a8e9a1e
Update packages/node-core/src/indexer/dictionary/dictionary.service.ts
jiqiang90 Mar 7, 2024
2200f16
update for validateChainMeta
jiqiang90 Mar 8, 2024
592698a
Merge remote-tracking branch 'origin/main' into dictionary-v2
jiqiang90 Mar 8, 2024
541719f
remove entryscript from test
jiqiang90 Mar 8, 2024
811f45c
fix self import
jiqiang90 Mar 8, 2024
b70acaf
remove
jiqiang90 Mar 8, 2024
06b43a9
rollback
jiqiang90 Mar 8, 2024
6c032bc
space
jiqiang90 Mar 8, 2024
f7f16d2
Update packages/node-core/src/indexer/dictionary/v1/dictionaryV1.ts
jiqiang90 Mar 8, 2024
42d48b5
header
jiqiang90 Mar 8, 2024
b3f5fe3
header
jiqiang90 Mar 8, 2024
b252154
remove SubstrateDsInterface
jiqiang90 Mar 8, 2024
93064d1
update dictionary type in common
jiqiang90 Mar 8, 2024
4af1570
use SubstrateDataSource type
jiqiang90 Mar 8, 2024
488c881
fix dictionary test
jiqiang90 Mar 8, 2024
ec77209
add test for substrate dictionary v1
jiqiang90 Mar 8, 2024
5fe37aa
rename test file
jiqiang90 Mar 8, 2024
2b678f8
add test and queryMapValidByHeight
jiqiang90 Mar 10, 2024
cad42de
dictionary getData test
jiqiang90 Mar 10, 2024
65381fc
fix tests
jiqiang90 Mar 11, 2024
f37a9b3
fix tests
jiqiang90 Mar 11, 2024
52eaa6c
fix metadata log error msg
jiqiang90 Mar 11, 2024
891cff8
tidy up test
jiqiang90 Mar 11, 2024
971562f
Fix unfinalizedBlocks and StoreOp tests
jiqiang90 Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Removed
- `scale-batch-size` flag as it had no use (#2275)

### Added
- Add service to support for dictionary v2 (#2257)

## [7.3.0] - 2024-02-23
### Added
- Schema Migration support for Enums, Relations, Subscription (#2251)
Expand Down
10 changes: 7 additions & 3 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export interface IConfig {
readonly preferRange: boolean;
readonly networkEndpoint?: string[];
readonly primaryNetworkEndpoint?: string;
readonly networkDictionary?: string;
readonly networkDictionary?: string[];
readonly dictionaryResolver?: string | false;
readonly dictionaryRegistry: string;
readonly outputFmt?: 'json';
Expand Down Expand Up @@ -143,8 +143,12 @@ export class NodeConfig<C extends IConfig = IConfig> implements IConfig {
return this._config.primaryNetworkEndpoint;
}

get networkDictionary(): string | undefined {
return this._config.networkDictionary;
get networkDictionaries(): string[] | undefined | false {
jiqiang90 marked this conversation as resolved.
Show resolved Hide resolved
return typeof this._config.networkDictionary === 'string'
? this._config.networkDictionary === 'false'
? false
: [this._config.networkDictionary]
: this._config.networkDictionary;
}

get allowSchemaMigration(): boolean {
Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/configure/configure.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export async function registerApp<P extends ISubqueryProject>(
omitBy(
{
endpoint: config.networkEndpoints,
dictionary: config.networkDictionary,
dictionary: config.networkDictionaries,
},
isNil
)
Expand All @@ -160,7 +160,7 @@ export async function registerApp<P extends ISubqueryProject>(
// Apply the network endpoint and dictionary from the source project to the parent projects if they are not defined in the config
{
endpoint: config.networkEndpoints ?? project.network.endpoint,
dictionary: config.networkDictionary ?? project.network.dictionary,
dictionary: config.networkDictionaries ?? project.network.dictionary,
},
isNil
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import assert from 'assert';

import {EventEmitter2} from '@nestjs/event-emitter';
import {IBlock} from '@subql/types-core';
import {hexToU8a, u8aEq} from '@subql/utils';
import {NodeConfig, IProjectUpgradeService} from '../../configure';
import {IndexerEvent, PoiEvent} from '../../events';
Expand All @@ -25,9 +26,9 @@ export type ProcessBlockResponse = {
reindexBlockHeight: number | null;
};

export interface IBlockDispatcher {
enqueueBlocks(heights: number[], latestBufferHeight?: number): void | Promise<void>;

export interface IBlockDispatcher<B> {
// now within enqueueBlock should handle getLatestBufferHeight
enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight: number): void | Promise<void>;
queueSize: number;
freeSize: number;
latestBufferedHeight: number;
Expand All @@ -44,7 +45,7 @@ function isNullMerkelRoot(operationHash: Uint8Array): boolean {
return u8aEq(operationHash, NULL_MERKEL_ROOT);
}

export abstract class BaseBlockDispatcher<Q extends IQueue, DS> implements IBlockDispatcher {
export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IBlockDispatcher<B> {
protected _latestBufferedHeight = 0;
protected _processedBlockCount = 0;
protected _latestProcessedHeight = 0;
Expand All @@ -65,7 +66,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS> implements IBloc
protected dynamicDsService: DynamicDsService<any>
) {}

abstract enqueueBlocks(heights: number[], latestBufferHeight?: number): void | Promise<void>;
abstract enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight?: number): void | Promise<void>;

async init(onDynamicDsCreated: (height: number) => Promise<void>): Promise<void> {
this._onDynamicDsCreated = onDynamicDsCreated;
Expand Down
49 changes: 22 additions & 27 deletions packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ import {getHeapStatistics} from 'v8';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {last} from 'lodash';
import {IBlock} from '@subql/types-core';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
import {PoiSyncService} from '../../indexer';
import {getBlockHeight, PoiSyncService} from '../../indexer';
import {getLogger} from '../../logger';
import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize, isTaskFlushedError} from '../../utils';
import {DynamicDsService} from '../dynamic-ds.service';
import {PoiService} from '../poi/poi.service';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
Expand All @@ -23,25 +22,24 @@ import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher

const logger = getLogger('BlockDispatcherService');

type BatchBlockFetcher<B> = (heights: number[]) => Promise<B[]>;
type BatchBlockFetcher<B> = (heights: number[]) => Promise<IBlock<B>[]>;

/**
* @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing
*/
export abstract class BlockDispatcher<B, DS>
extends BaseBlockDispatcher<Queue<number>, DS>
extends BaseBlockDispatcher<Queue<IBlock<B> | number>, DS, B>
implements OnApplicationShutdown
{
private fetchQueue: AutoQueue<B>;
private fetchQueue: AutoQueue<IBlock<B>>;
private processQueue: AutoQueue<void>;

private fetchBlocksBatches: BatchBlockFetcher<B>;

private fetching = false;
private isShutdown = false;

protected abstract indexBlock(block: B): Promise<ProcessBlockResponse>;
protected abstract getBlockHeight(block: B): number;
protected abstract indexBlock(block: IBlock<B>): Promise<ProcessBlockResponse>;

constructor(
nodeConfig: NodeConfig,
Expand Down Expand Up @@ -71,7 +69,6 @@ export abstract class BlockDispatcher<B, DS>
);
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process');
this.fetchQueue = new AutoQueue(nodeConfig.batchSize * 3, nodeConfig.batchSize, nodeConfig.timeout, 'Fetch');

if (this.nodeConfig.profiler) {
this.fetchBlocksBatches = profilerWrap(fetchBlocksBatches, 'BlockDispatcher', 'fetchBlocksBatches');
} else {
Expand All @@ -84,19 +81,18 @@ export abstract class BlockDispatcher<B, DS>
this.processQueue.abort();
}

enqueueBlocks(heights: number[], latestBufferHeight?: number): void {
enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight: number): void {
// In the case where factors of batchSize is equal to bypassBlock or when heights is []
// to ensure block is bypassed, we set the latestBufferHeight to the heights
// make sure lastProcessedHeight in metadata is updated
if (!!latestBufferHeight && !heights.length) {
if (!heights.length) {
heights = [latestBufferHeight];
}
logger.info(`Enqueueing blocks ${heights[0]}...${last(heights)}, total ${heights.length} blocks`);

// Those blocks will still be filtered in the handler
const startBlockHeight = getBlockHeight(heights[0]);
const endBlockHeight = getBlockHeight(heights[heights.length - 1]);
logger.info(`Enqueueing blocks ${startBlockHeight}...${endBlockHeight}, total ${heights.length} blocks`);
this.queue.putMany(heights);

this.latestBufferedHeight = latestBufferHeight ?? last(heights) ?? this.latestBufferedHeight;
this.latestBufferedHeight = latestBufferHeight;
void this.fetchBlocksFromQueue();
}

Expand All @@ -117,7 +113,6 @@ export abstract class BlockDispatcher<B, DS>
`QUEUE INFO ${stat}: Block numbers: ${this.queue[stat]}, fetch: ${this.fetchQueue[stat]}, process: ${this.processQueue[stat]}`
);
}

private async fetchBlocksFromQueue(): Promise<void> {
if (this.fetching || this.isShutdown) return;

Expand All @@ -131,11 +126,9 @@ export abstract class BlockDispatcher<B, DS>
await delay(1);
continue;
}

const blockNum = this.queue.take();

const blockOrNum = this.queue.take();
// This shouldn't happen but if it does it whould get caught above
if (!blockNum) {
if (!blockOrNum) {
continue;
}

Expand All @@ -152,20 +145,23 @@ export abstract class BlockDispatcher<B, DS>
if (memoryLock.isLocked()) {
await memoryLock.waitForUnlock();
}
const [block] = await this.fetchBlocksBatches([blockNum]);

if (typeof blockOrNum !== 'number') {
// Type is of block
return blockOrNum;
}
const [block] = await this.fetchBlocksBatches([blockOrNum]);
this.smartBatchService.addToSizeBuffer([block]);
return block;
})
.then(
(block) => {
const height = this.getBlockHeight(block);
const {height} = block.getHeader();

return this.processQueue.put(async () => {
// Check if the queues have been flushed between queue.takeMany and fetchBlocksBatches resolving
// Peeking the queue is because the latestBufferedHeight could have regrown since fetching block
const peeked = this.queue.peek();
if (bufferedHeight > this._latestBufferedHeight || (peeked && peeked < blockNum)) {
if (bufferedHeight > this._latestBufferedHeight || (peeked && peeked < height)) {
logger.info(`Queue was reset for new DS, discarding fetched blocks`);
return;
}
Expand All @@ -174,7 +170,6 @@ export abstract class BlockDispatcher<B, DS>
await this.preProcessBlock(height);
// Inject runtimeVersion here to enhance api.at preparation
const processBlockResponse = await this.indexBlock(block);

await this.postProcessBlock(height, processBlockResponse);

//set block to null for garbage collection
Expand All @@ -197,7 +192,7 @@ export abstract class BlockDispatcher<B, DS>
// Do nothing, fetching the block was flushed, this could be caused by forked blocks or dynamic datasources
return;
}
logger.error(e, `Failed to fetch block ${blockNum}.`);
logger.error(e, `Failed to fetch block ${getBlockHeight(blockOrNum)}.`);
throw e;
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import {StoreCacheService} from '../storeCache';
import {IProjectService, ISubqueryProject} from '../types';
import {WorkerBlockDispatcher} from './worker-block-dispatcher';

class TestWorkerBlockDispatcher extends WorkerBlockDispatcher<any, any> {
class TestWorkerBlockDispatcher extends WorkerBlockDispatcher<any, any, any> {
async fetchBlock(worker: any, height: number): Promise<void> {
return Promise.resolve();
}
}
describe('WorkerBlockDispatcher', () => {
let dispatcher: WorkerBlockDispatcher<any, any>;
let dispatcher: WorkerBlockDispatcher<any, any, any>;

// Mock workers
const mockWorkers = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import assert from 'assert';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {IBlock} from '@subql/types-core';
import {last} from 'lodash';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
Expand All @@ -13,7 +14,6 @@ import {PoiSyncService} from '../../indexer';
import {getLogger} from '../../logger';
import {AutoQueue, isTaskFlushedError} from '../../utils';
import {DynamicDsService} from '../dynamic-ds.service';
import {PoiService} from '../poi/poi.service';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
Expand Down Expand Up @@ -42,8 +42,8 @@ function initAutoQueue<T>(
return new AutoQueue(workers * batchSize * 2, 1, timeout, name);
}

export abstract class WorkerBlockDispatcher<DS, W extends Worker>
extends BaseBlockDispatcher<AutoQueue<void>, DS>
export abstract class WorkerBlockDispatcher<DS, W extends Worker, B>
extends BaseBlockDispatcher<AutoQueue<void>, DS, B>
implements OnApplicationShutdown
{
protected workers: W[] = [];
Expand Down Expand Up @@ -100,8 +100,15 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker>
await Promise.all(this.workers.map((w) => w.terminate()));
}
}
async enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight?: number): Promise<void> {
assert(
heights.every((h) => typeof h === 'number'),
'Worker block dispatcher only supports enqueuing numbers, not blocks.'
);
await this._enqueueBlocks(heights as number[], latestBufferHeight);
}

async enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise<void> {
private async _enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise<void> {
jiqiang90 marked this conversation as resolved.
Show resolved Hide resolved
// In the case where factors of batchSize is equal to bypassBlock or when heights is []
// to ensure block is bypassed, we set the latestBufferHeight to the heights
// make sure lastProcessedHeight in metadata is updated
Expand Down
Loading
Loading