Skip to content

Commit

Permalink
perf(node): [NET-1113] Optimize operators query in `checkOperatorValu…
Browse files Browse the repository at this point in the history
…eBreach` (#2721)

The `checkOperatorValueBreach` functionality uses cached list of staked
operators instead of querying random operator address on each
invocation.

## Changes
- `OperatorPlugin` creates a cache of staked operators, gives that as an
to the `checkOperatorValueBreach` call
- added `Cache` utility to `@streamr/utils` package
- renamed `Operator#getOperatorAddresses` -> `getStakedOperators`, it no
longer uses `updateRequiredBlockNumber()` functionality

---------

Co-authored-by: Eric Andrews <[email protected]>
  • Loading branch information
teogeb and harbu authored Aug 21, 2024
1 parent 6b5f755 commit 4cabeeb
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 34 deletions.
13 changes: 9 additions & 4 deletions packages/node/src/plugins/operator/OperatorPlugin.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import {
import {
OperatorDiscoveryRequest,
OperatorDiscoveryResponse,
peerDescriptorTranslator,
ReviewRequestEvent,
SignerWithProvider,
StreamrClient
} from '@streamr/sdk'
import {
import {
addManagedEventListener,
Cache,
EthereumAddress,
Logger,
StreamPartIDUtils,
addManagedEventListener,
scheduleAtInterval,
setAbortableInterval,
StreamPartIDUtils,
toEthereumAddress
} from '@streamr/utils'
import { Schema } from 'ajv'
Expand Down Expand Up @@ -70,6 +71,8 @@ export interface OperatorServiceConfig {
getEthersOverrides: () => Promise<Overrides>
}

const STAKED_OPERATORS_CACHE_MAX_AGE = 2 * 24 * 60 * 60 * 1000

const logger = new Logger(module)

export class OperatorPlugin extends Plugin<OperatorPluginConfig> {
Expand Down Expand Up @@ -139,10 +142,12 @@ export class OperatorPlugin extends Plugin<OperatorPluginConfig> {
)
})()
}, this.pluginConfig.heartbeatUpdateIntervalInMs, this.abortController.signal)
const stakedOperatorsCache = new Cache(() => operator.getStakedOperators(), STAKED_OPERATORS_CACHE_MAX_AGE)
await scheduleAtInterval(
async () => checkOperatorValueBreach(
operator,
streamrClient,
() => stakedOperatorsCache.get(),
this.pluginConfig.maintainOperatorValue.minSponsorshipEarningsInWithdraw,
this.pluginConfig.maintainOperatorValue.maxSponsorshipsInWithdraw
).catch((err) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { StreamrClient, Operator } from '@streamr/sdk'
import { Logger } from '@streamr/utils'
import { EthereumAddress, Logger } from '@streamr/utils'
import { sample, without } from 'lodash'

const logger = new Logger(module)

export const checkOperatorValueBreach = async (
myOperator: Operator,
client: StreamrClient,
getStakedOperators: () => Promise<EthereumAddress[]>,
minSponsorshipEarningsInWithdraw: number,
maxSponsorshipsInWithdraw: number
): Promise<void> => {
const targetOperatorAddress = await myOperator.getRandomOperator()
const targetOperatorAddress = sample(without(await getStakedOperators(), await myOperator.getContractAddress()))
if (targetOperatorAddress === undefined) {
logger.info('No operators found')
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,11 @@ describe('checkOperatorValueBreach', () => {
const client = createClient(watcherWallets[0].privateKey)
const operator = client.getOperator(toEthereumAddress(await watcherOperatorContract.getAddress()))

// overwrite (for this test only) the getRandomOperator method to deterministically return the operator's address
operator.getRandomOperator = async () => {
return toEthereumAddress(await operatorContract.getAddress())
}

logger.debug('Waiting until above', { allowedDifference })
await waitForCondition(async () => await getEarnings(operatorContract) > allowedDifference, 10000, 1000)
await checkOperatorValueBreach(operator, client, 1, 20)
await checkOperatorValueBreach(operator, client, async () => {
return [toEthereumAddress(await operatorContract.getAddress())]
}, 1, 20)

const earnings = await getEarnings(operatorContract)
expect(earnings).toBeLessThan(allowedDifference)
Expand Down
17 changes: 1 addition & 16 deletions packages/sdk/src/contracts/Operator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
collect, ensureValidStreamPartitionIndex, toEthereumAddress, toStreamID
} from '@streamr/utils'
import { Overrides } from 'ethers'
import sample from 'lodash/sample'
import { z } from 'zod'
import { Authentication } from '../Authentication'
import { DestroySignal } from '../DestroySignal'
Expand Down Expand Up @@ -394,17 +393,6 @@ export class Operator {
await (await this.contract!.flag(sponsorship, operator, metadata, await this.getEthersOverrides())).wait()
}

// TODO could move this method as this is functionality is not specific to one Operator contract instance
// (it excludes the current operator from the list, but that exclusion could be done by the caller of this method)
async getRandomOperator(): Promise<EthereumAddress | undefined> {
const latestBlock = await this.getCurrentBlockNumber() // TODO maybe we should remove this "feature"?
const operators = await this.getOperatorAddresses(latestBlock)
const excluded = await this.getContractAddress()
const operatorAddresses = operators.filter((id) => id !== excluded)
logger.debug(`Found ${operatorAddresses.length} operators`)
return sample(operatorAddresses)
}

/**
* Find the sum of earnings in Sponsorships (that the Operator must withdraw before the sum reaches a limit),
* SUBJECT TO the constraints, set in the OperatorServiceConfig:
Expand Down Expand Up @@ -457,8 +445,7 @@ export class Operator {
}

// TODO could move this method as this is functionality is not specific to one Operator contract instance
private async getOperatorAddresses(requiredBlockNumber: number): Promise<EthereumAddress[]> {
// TODO: find a clever more efficient way of selecting a random operator? (NET-1113)
async getStakedOperators(): Promise<EthereumAddress[]> {
const createQuery = (lastId: string, pageSize: number) => {
return {
query: `
Expand All @@ -470,9 +457,7 @@ export class Operator {
`
}
}
this.theGraphClient.updateRequiredBlockNumber(requiredBlockNumber)
const queryResult = this.theGraphClient.queryEntities<{ id: string }>(createQuery)

const operatorAddresses: EthereumAddress[] = []
for await (const operator of queryResult) {
operatorAddresses.push(toEthereumAddress(operator.id))
Expand Down
18 changes: 13 additions & 5 deletions packages/sdk/test/end-to-end/Operator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import OperatorArtifact from '../../src/ethereumArtifacts/OperatorAbi.json'
import type { OperatorFactory as OperatorFactoryContract } from '../../src/ethereumArtifacts/OperatorFactory'
import OperatorFactoryArtifact from '../../src/ethereumArtifacts/OperatorFactoryAbi.json'
import type { Sponsorship as SponsorshipContract } from '../../src/ethereumArtifacts/Sponsorship'
import { sample } from 'lodash'

const createClient = (privateKey?: string): StreamrClient => {
return new StreamrClient({
Expand Down Expand Up @@ -79,11 +80,12 @@ describe('Operator', () => {

}, 90 * 1000)

it('getRandomOperator', async () => {
const operator = await getOperator(deployedOperator.nodeWallets[0], deployedOperator)
const randomOperatorAddress = await operator.getRandomOperator()
it('getStakedOperators', async () => {
await delegate(deployedOperator.operatorWallet, await deployedOperator.operatorContract.getAddress(), 20000)
await stake(deployedOperator.operatorContract, await sponsorship1.getAddress(), 10000)
const dummyOperator = await getOperator(deployedOperator.nodeWallets[0], deployedOperator)
const randomOperatorAddress = sample(await dummyOperator.getStakedOperators())
expect(randomOperatorAddress).toBeDefined()
expect(randomOperatorAddress).not.toEqual(await deployedOperator.operatorContract.getAddress()) // should not be me

// check it's a valid operator, deployed by the OperatorFactory
const operatorFactory = new Contract(
Expand All @@ -93,7 +95,13 @@ describe('Operator', () => {
) as unknown as OperatorFactoryContract
const isDeployedByFactory = (await operatorFactory.deploymentTimestamp(randomOperatorAddress!)) > 0
expect(isDeployedByFactory).toBeTrue()

// check that there is a stake
const operatorContract = new Contract(
randomOperatorAddress!,
OperatorArtifact,
deployedOperator.operatorWallet
) as unknown as OperatorContract
expect(await operatorContract.totalStakedIntoSponsorshipsWei()).toBeGreaterThan(0n)
}, 30 * 1000)

it('getSponsorships, getOperatorsInSponsorship', async () => {
Expand Down
21 changes: 21 additions & 0 deletions packages/utils/src/Cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
export class Cache<V> {

private value?: V
private valueTimestamp?: number
private readonly valueFactory: () => Promise<V>
private readonly maxAgeInMilliseconds: number

constructor(valueFactory: () => Promise<V>, maxAgeInMilliseconds: number) {
this.valueFactory = valueFactory
this.maxAgeInMilliseconds = maxAgeInMilliseconds
}

async get(): Promise<V> {
const now = Date.now()
if ((this.valueTimestamp === undefined) || (now > (this.valueTimestamp + this.maxAgeInMilliseconds))) {
this.value = await this.valueFactory()
this.valueTimestamp = now
}
return this.value!
}
}
4 changes: 3 additions & 1 deletion packages/utils/src/exports.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Cache } from './Cache'
import { pTransaction } from './pTransaction'
import { AbortError, asAbortable } from './asAbortable'
import { setAbortableInterval, setAbortableTimeout } from './abortableTimers'
Expand Down Expand Up @@ -107,7 +108,8 @@ export {
ipv4ToNumber,
numberToIpv4,
hash,
MapWithTtl
MapWithTtl,
Cache
}

export {
Expand Down
24 changes: 24 additions & 0 deletions packages/utils/test/Cache.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Cache } from '../src/Cache'
import { wait } from '../src/wait'

const MAX_AGE = 100
const JITTER_FACTOR = 10

describe('Cache', () => {

it('happy path', async () => {
let plainValue = 'foo'
const valueFactory = jest.fn().mockImplementation(async () => plainValue)
const cache = new Cache(valueFactory, MAX_AGE)
expect(await cache.get()).toEqual('foo')
expect(valueFactory).toHaveBeenCalledTimes(1)
plainValue = 'bar'
// should not change yet
expect(await cache.get()).toEqual('foo')
expect(valueFactory).toHaveBeenCalledTimes(1)
// changes after max age elapsed
await wait(MAX_AGE + JITTER_FACTOR)
expect(await cache.get()).toEqual('bar')
expect(valueFactory).toHaveBeenCalledTimes(2)
})
})

0 comments on commit 4cabeeb

Please sign in to comment.