Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

typeorm: typeorm-store rework #293

Open
wants to merge 11 commits into
base: beta
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 69 additions & 42 deletions common/config/rush/pnpm-lock.yaml

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion typeorm/typeorm-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
},
"dependencies": {
"@subsquid/typeorm-config": "^4.1.1",
"@subsquid/util-internal": "^3.2.0"
"@subsquid/util-internal": "^3.2.0",
"@subsquid/logger": "^1.3.3",
"fast-copy": "^3.0.2"
},
"peerDependencies": {
"typeorm": "^0.3.17",
"@subsquid/big-decimal": "^1.0.0"
},
"devDependencies": {
"@types/clone": "^2.1.4",
"@types/mocha": "^10.0.6",
"@types/node": "^18.18.14",
"@types/pg": "^8.10.9",
Expand Down
214 changes: 143 additions & 71 deletions typeorm/typeorm-store/src/database.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,96 @@
import {createOrmConfig} from '@subsquid/typeorm-config'
import {assertNotNull, last, maybeLast} from '@subsquid/util-internal'
import {assertNotNull, def, last, maybeLast} from '@subsquid/util-internal'
import assert from 'assert'
import {DataSource, EntityManager} from 'typeorm'
import {ChangeTracker, rollbackBlock} from './hot'
import {ChangeWriter, rollbackBlock} from './utils/changeWriter'
import {DatabaseState, FinalTxInfo, HashAndHeight, HotTxInfo} from './interfaces'
import {Store} from './store'
import {createLogger} from '@subsquid/logger'
import {StateManager} from './utils/stateManager'
import {sortMetadatasInCommitOrder} from './utils/commitOrder'
import {IsolationLevel} from './utils/tx'


export type IsolationLevel = 'SERIALIZABLE' | 'READ COMMITTED' | 'REPEATABLE READ'

export {IsolationLevel}

export interface TypeormDatabaseOptions {
/**
* Support for storing the data on unfinalized
* blocks and the related rollbacks.
* See {@link https://docs.subsquid.io/sdk/resources/basics/unfinalized-blocks/}
*
* @defaultValue true
*/
supportHotBlocks?: boolean

/**
* PostgreSQL transaction isolation level
* See {@link https://www.postgresql.org/docs/current/transaction-iso.html}
*
* @defaultValue 'SERIALIZABLE'
*/
isolationLevel?: IsolationLevel

/**
* @defaultValue true
*/
batchWriteOperations?: boolean

/**
* @defaultValue true
*/
cacheEntitiesByDefault?: boolean

// FIXME: needs better name, means if we check db if entity is not found in the state
/**
* @defaultValue true
*/
syncOnGet?: boolean

/**
* @defaultValue true
*/
resetOnCommit?: boolean

/**
* Name of the database schema that the processor
* will use to track its state (height and hash of
* the highest indexed block). Set this if you run
* more than one processor against the same DB.
*
* @defaultValue 'squid_processor'
*/
stateSchema?: string

/**
* Directory with model definitions (at lib/model)
* and migrations (at db/migrations).
*
* @defaultValue process.cwd()
*/
projectDir?: string
}

const STATE_MANAGERS: WeakMap<DataSource, StateManager> = new WeakMap()

export class TypeormDatabase {
private statusSchema: string
private isolationLevel: IsolationLevel
private con?: DataSource
private projectDir: string
protected statusSchema: string
protected isolationLevel: IsolationLevel
protected batchWriteOperations: boolean
protected cacheEntitiesByDefault: boolean
protected syncOnGet: boolean
protected resetOnCommit: boolean
protected con?: DataSource
protected projectDir: string

public readonly supportsHotBlocks: boolean

constructor(options?: TypeormDatabaseOptions) {
this.statusSchema = options?.stateSchema || 'squid_processor'
this.isolationLevel = options?.isolationLevel || 'SERIALIZABLE'
this.batchWriteOperations = options?.batchWriteOperations ?? true
this.cacheEntitiesByDefault = options?.cacheEntitiesByDefault ?? true
this.syncOnGet = options?.syncOnGet ?? true
this.resetOnCommit = options?.resetOnCommit ?? true
this.supportsHotBlocks = options?.supportHotBlocks !== false
this.projectDir = options?.projectDir || process.cwd()
}
Expand All @@ -42,48 +104,46 @@ export class TypeormDatabase {
await this.con.initialize()

try {
return await this.con.transaction('SERIALIZABLE', em => this.initTransaction(em))
} catch(e: any) {
return await this.con.transaction('SERIALIZABLE', (em) => this.initTransaction(em))
} catch (e: any) {
await this.con.destroy().catch(() => {}) // ignore error
this.con = undefined
throw e
}
}

async disconnect(): Promise<void> {
await this.con?.destroy().finally(() => this.con = undefined)
await this.con?.destroy().finally(() => (this.con = undefined))
}

private async initTransaction(em: EntityManager): Promise<DatabaseState> {
let schema = this.escapedSchema()

await em.query(
`CREATE SCHEMA IF NOT EXISTS ${schema}`
)
await em.query(`CREATE SCHEMA IF NOT EXISTS ${schema}`)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.status (` +
`id int4 primary key, ` +
`height int4 not null, ` +
`hash text DEFAULT '0x', ` +
`nonce int4 DEFAULT 0`+
`)`
`id int4 primary key, ` +
`height int4 not null, ` +
`hash text DEFAULT '0x', ` +
`nonce int4 DEFAULT 0` +
`)`
)
await em.query( // for databases created by prev version of typeorm store
await em.query(
// for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS hash text DEFAULT '0x'`
)
await em.query( // for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0`
)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)`
// for databases created by prev version of typeorm store
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0`
)
await em.query(`CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)`)
await em.query(
`CREATE TABLE IF NOT EXISTS ${schema}.hot_change_log (` +
`block_height int4 not null references ${schema}.hot_block on delete cascade, ` +
`index int4 not null, ` +
`change jsonb not null, ` +
`PRIMARY KEY (block_height, index)` +
`)`
`block_height int4 not null references ${schema}.hot_block on delete cascade, ` +
`index int4 not null, ` +
`change jsonb not null, ` +
`PRIMARY KEY (block_height, index)` +
`)`
)

let status: (HashAndHeight & {nonce: number})[] = await em.query(
Expand All @@ -94,9 +154,7 @@ export class TypeormDatabase {
status.push({height: -1, hash: '0x', nonce: 0})
}

let top: HashAndHeight[] = await em.query(
`SELECT height, hash FROM ${schema}.hot_block ORDER BY height`
)
let top: HashAndHeight[] = await em.query(`SELECT height, hash FROM ${schema}.hot_block ORDER BY height`)

return assertStateInvariants({...status[0], top})
}
Expand All @@ -110,15 +168,13 @@ export class TypeormDatabase {

assert(status.length == 1)

let top: HashAndHeight[] = await em.query(
`SELECT hash, height FROM ${schema}.hot_block ORDER BY height`
)
let top: HashAndHeight[] = await em.query(`SELECT hash, height FROM ${schema}.hot_block ORDER BY height`)

return assertStateInvariants({...status[0], top})
}

transact(info: FinalTxInfo, cb: (store: Store) => Promise<void>): Promise<void> {
return this.submit(async em => {
return this.submit(async (em) => {
belopash marked this conversation as resolved.
Show resolved Hide resolved
let state = await this.getState(em)
let {prevHead: prev, nextHead: next} = info

Expand Down Expand Up @@ -146,15 +202,21 @@ export class TypeormDatabase {
})
}

transactHot2(info: HotTxInfo, cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise<void>): Promise<void> {
return this.submit(async em => {
transactHot2(
info: HotTxInfo,
cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise<void>
): Promise<void> {
return this.submit(async (em) => {
let state = await this.getState(em)
let chain = [state, ...state.top]

assertChainContinuity(info.baseHead, info.newBlocks)
assert(info.finalizedHead.height <= (maybeLast(info.newBlocks) ?? info.baseHead).height)

assert(chain.find(b => b.hash === info.baseHead.hash), RACE_MSG)
assert(
chain.find((b) => b.hash === info.baseHead.hash),
RACE_MSG
)
if (info.newBlocks.length == 0) {
assert(last(chain).hash === info.baseHead.hash, RACE_MSG)
}
Expand All @@ -169,17 +231,17 @@ export class TypeormDatabase {
if (info.newBlocks.length) {
let finalizedEnd = info.finalizedHead.height - info.newBlocks[0].height + 1
if (finalizedEnd > 0) {
await this.performUpdates(store => cb(store, 0, finalizedEnd), em)
await this.performUpdates((store) => cb(store, 0, finalizedEnd), em)
} else {
finalizedEnd = 0
}
for (let i = finalizedEnd; i < info.newBlocks.length; i++) {
let b = info.newBlocks[i]
await this.insertHotBlock(em, b)
await this.performUpdates(
store => cb(store, i, i + 1),
(store) => cb(store, i, i + 1),
em,
new ChangeTracker(em, this.statusSchema, b.height)
new ChangeWriter(em, this.statusSchema, b.height)
)
}
}
Expand All @@ -195,17 +257,14 @@ export class TypeormDatabase {
}

private deleteHotBlocks(em: EntityManager, finalizedHeight: number): Promise<void> {
return em.query(
`DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`,
[finalizedHeight]
)
return em.query(`DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`, [finalizedHeight])
}

private insertHotBlock(em: EntityManager, block: HashAndHeight): Promise<void> {
return em.query(
`INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`,
[block.height, block.hash]
)
return em.query(`INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`, [
block.height,
block.hash,
])
}

private async updateStatus(em: EntityManager, nonce: number, next: HashAndHeight): Promise<void> {
Expand All @@ -220,32 +279,29 @@ export class TypeormDatabase {

// Will never happen if isolation level is SERIALIZABLE or REPEATABLE_READ,
// but occasionally people use multiprocessor setups and READ_COMMITTED.
assert.strictEqual(
rowsChanged,
1,
RACE_MSG
)
assert.strictEqual(rowsChanged, 1, RACE_MSG)
}

private async performUpdates(
cb: (store: Store) => Promise<void>,
em: EntityManager,
changeTracker?: ChangeTracker
changeWriter?: ChangeWriter
): Promise<void> {
let running = true

let store = new Store(
() => {
assert(running, `too late to perform db updates, make sure you haven't forgot to await on db query`)
return em
},
changeTracker
)
let store = new Store({
em,
state: this.getStateManager(),
logger: this.getLogger(),
changes: changeWriter,
batchWriteOperations: this.batchWriteOperations,
syncOnGet: this.syncOnGet,
cacheEntitiesByDefault: this.cacheEntitiesByDefault,
})

try {
await cb(store)
await store.sync(this.resetOnCommit)
} finally {
running = false
store['isClosed'] = true
}
}

Expand All @@ -256,7 +312,7 @@ export class TypeormDatabase {
let con = this.con
assert(con != null, 'not connected')
return await con.transaction(this.isolationLevel, tx)
} catch(e: any) {
} catch (e: any) {
if (e.code == '40001' && retries) {
retries -= 1
} else {
Expand All @@ -270,11 +326,28 @@ export class TypeormDatabase {
let con = assertNotNull(this.con)
return con.driver.escape(this.statusSchema)
}
}

@def
private getLogger() {
return createLogger('sqd:typeorm-db')
}

const RACE_MSG = 'status table was updated by foreign process, make sure no other processor is running'
private getStateManager() {
let con = assertNotNull(this.con)
let stateManager = STATE_MANAGERS.get(con)
if (stateManager != null) return stateManager

stateManager = new StateManager({
commitOrder: sortMetadatasInCommitOrder(con),
logger: this.getLogger(),
})
STATE_MANAGERS.set(con, stateManager)

return stateManager
}
}

const RACE_MSG = 'status table was updated by foreign process, make sure no other processor is running'

function assertStateInvariants(state: DatabaseState): DatabaseState {
let height = state.height
Expand All @@ -287,7 +360,6 @@ function assertStateInvariants(state: DatabaseState): DatabaseState {
return state
}


function assertChainContinuity(base: HashAndHeight, chain: HashAndHeight[]) {
let prev = base
for (let b of chain) {
Expand Down
4 changes: 2 additions & 2 deletions typeorm/typeorm-store/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from './database'
export {EntityClass, FindManyOptions, FindOneOptions, Store} from './store'
export * from './store'
export * from './decorators'
export * from './transformers'
export * from './transformers'
Loading