diff --git a/packages/node/src/plugins/operator/OperatorPlugin.ts b/packages/node/src/plugins/operator/OperatorPlugin.ts index c40daaddee..7c4ae64f8c 100644 --- a/packages/node/src/plugins/operator/OperatorPlugin.ts +++ b/packages/node/src/plugins/operator/OperatorPlugin.ts @@ -1,4 +1,4 @@ -import { +import { OperatorDiscoveryRequest, OperatorDiscoveryResponse, peerDescriptorTranslator, @@ -6,13 +6,14 @@ import { SignerWithProvider, StreamrClient } from '@streamr/sdk' -import { +import { + addManagedEventListener, + Cache, EthereumAddress, Logger, - StreamPartIDUtils, - addManagedEventListener, scheduleAtInterval, setAbortableInterval, + StreamPartIDUtils, toEthereumAddress } from '@streamr/utils' import { Schema } from 'ajv' @@ -70,6 +71,8 @@ export interface OperatorServiceConfig { getEthersOverrides: () => Promise } +const STAKED_OPERATORS_CACHE_MAX_AGE = 2 * 24 * 60 * 60 * 1000 + const logger = new Logger(module) export class OperatorPlugin extends Plugin { @@ -139,10 +142,12 @@ export class OperatorPlugin extends Plugin { ) })() }, this.pluginConfig.heartbeatUpdateIntervalInMs, this.abortController.signal) + const stakedOperatorsCache = new Cache(() => operator.getStakedOperators(), STAKED_OPERATORS_CACHE_MAX_AGE) await scheduleAtInterval( async () => checkOperatorValueBreach( operator, streamrClient, + () => stakedOperatorsCache.get(), this.pluginConfig.maintainOperatorValue.minSponsorshipEarningsInWithdraw, this.pluginConfig.maintainOperatorValue.maxSponsorshipsInWithdraw ).catch((err) => { diff --git a/packages/node/src/plugins/operator/checkOperatorValueBreach.ts b/packages/node/src/plugins/operator/checkOperatorValueBreach.ts index 0a2f24221d..2e56f54b3c 100644 --- a/packages/node/src/plugins/operator/checkOperatorValueBreach.ts +++ b/packages/node/src/plugins/operator/checkOperatorValueBreach.ts @@ -1,15 +1,17 @@ import { StreamrClient, Operator } from '@streamr/sdk' -import { Logger } from '@streamr/utils' +import { EthereumAddress, Logger } from '@streamr/utils' +import { sample, without } from 'lodash' const logger = new Logger(module) export const checkOperatorValueBreach = async ( myOperator: Operator, client: StreamrClient, + getStakedOperators: () => Promise, minSponsorshipEarningsInWithdraw: number, maxSponsorshipsInWithdraw: number ): Promise => { - const targetOperatorAddress = await myOperator.getRandomOperator() + const targetOperatorAddress = sample(without(await getStakedOperators(), await myOperator.getContractAddress())) if (targetOperatorAddress === undefined) { logger.info('No operators found') return diff --git a/packages/node/test/integration/plugins/operator/checkOperatorValueBreach.test.ts b/packages/node/test/integration/plugins/operator/checkOperatorValueBreach.test.ts index 545c87dbfa..99878cf7a3 100644 --- a/packages/node/test/integration/plugins/operator/checkOperatorValueBreach.test.ts +++ b/packages/node/test/integration/plugins/operator/checkOperatorValueBreach.test.ts @@ -63,14 +63,11 @@ describe('checkOperatorValueBreach', () => { const client = createClient(watcherWallets[0].privateKey) const operator = client.getOperator(toEthereumAddress(await watcherOperatorContract.getAddress())) - // overwrite (for this test only) the getRandomOperator method to deterministically return the operator's address - operator.getRandomOperator = async () => { - return toEthereumAddress(await operatorContract.getAddress()) - } - logger.debug('Waiting until above', { allowedDifference }) await waitForCondition(async () => await getEarnings(operatorContract) > allowedDifference, 10000, 1000) - await checkOperatorValueBreach(operator, client, 1, 20) + await checkOperatorValueBreach(operator, client, async () => { + return [toEthereumAddress(await operatorContract.getAddress())] + }, 1, 20) const earnings = await getEarnings(operatorContract) expect(earnings).toBeLessThan(allowedDifference) diff --git a/packages/sdk/src/contracts/Operator.ts b/packages/sdk/src/contracts/Operator.ts index f264f3ba93..6d7e734da2 100644 --- a/packages/sdk/src/contracts/Operator.ts +++ b/packages/sdk/src/contracts/Operator.ts @@ -5,7 +5,6 @@ import { collect, ensureValidStreamPartitionIndex, toEthereumAddress, toStreamID } from '@streamr/utils' import { Overrides } from 'ethers' -import sample from 'lodash/sample' import { z } from 'zod' import { Authentication } from '../Authentication' import { DestroySignal } from '../DestroySignal' @@ -394,17 +393,6 @@ export class Operator { await (await this.contract!.flag(sponsorship, operator, metadata, await this.getEthersOverrides())).wait() } - // TODO could move this method as this is functionality is not specific to one Operator contract instance - // (it excludes the current operator from the list, but that exclusion could be done by the caller of this method) - async getRandomOperator(): Promise { - const latestBlock = await this.getCurrentBlockNumber() // TODO maybe we should remove this "feature"? - const operators = await this.getOperatorAddresses(latestBlock) - const excluded = await this.getContractAddress() - const operatorAddresses = operators.filter((id) => id !== excluded) - logger.debug(`Found ${operatorAddresses.length} operators`) - return sample(operatorAddresses) - } - /** * Find the sum of earnings in Sponsorships (that the Operator must withdraw before the sum reaches a limit), * SUBJECT TO the constraints, set in the OperatorServiceConfig: @@ -457,8 +445,7 @@ export class Operator { } // TODO could move this method as this is functionality is not specific to one Operator contract instance - private async getOperatorAddresses(requiredBlockNumber: number): Promise { - // TODO: find a clever more efficient way of selecting a random operator? (NET-1113) + async getStakedOperators(): Promise { const createQuery = (lastId: string, pageSize: number) => { return { query: ` @@ -470,9 +457,7 @@ export class Operator { ` } } - this.theGraphClient.updateRequiredBlockNumber(requiredBlockNumber) const queryResult = this.theGraphClient.queryEntities<{ id: string }>(createQuery) - const operatorAddresses: EthereumAddress[] = [] for await (const operator of queryResult) { operatorAddresses.push(toEthereumAddress(operator.id)) diff --git a/packages/sdk/test/end-to-end/Operator.test.ts b/packages/sdk/test/end-to-end/Operator.test.ts index 72f40b4972..c15cdb3c9e 100644 --- a/packages/sdk/test/end-to-end/Operator.test.ts +++ b/packages/sdk/test/end-to-end/Operator.test.ts @@ -20,6 +20,7 @@ import OperatorArtifact from '../../src/ethereumArtifacts/OperatorAbi.json' import type { OperatorFactory as OperatorFactoryContract } from '../../src/ethereumArtifacts/OperatorFactory' import OperatorFactoryArtifact from '../../src/ethereumArtifacts/OperatorFactoryAbi.json' import type { Sponsorship as SponsorshipContract } from '../../src/ethereumArtifacts/Sponsorship' +import { sample } from 'lodash' const createClient = (privateKey?: string): StreamrClient => { return new StreamrClient({ @@ -79,11 +80,12 @@ describe('Operator', () => { }, 90 * 1000) - it('getRandomOperator', async () => { - const operator = await getOperator(deployedOperator.nodeWallets[0], deployedOperator) - const randomOperatorAddress = await operator.getRandomOperator() + it('getStakedOperators', async () => { + await delegate(deployedOperator.operatorWallet, await deployedOperator.operatorContract.getAddress(), 20000) + await stake(deployedOperator.operatorContract, await sponsorship1.getAddress(), 10000) + const dummyOperator = await getOperator(deployedOperator.nodeWallets[0], deployedOperator) + const randomOperatorAddress = sample(await dummyOperator.getStakedOperators()) expect(randomOperatorAddress).toBeDefined() - expect(randomOperatorAddress).not.toEqual(await deployedOperator.operatorContract.getAddress()) // should not be me // check it's a valid operator, deployed by the OperatorFactory const operatorFactory = new Contract( @@ -93,7 +95,13 @@ describe('Operator', () => { ) as unknown as OperatorFactoryContract const isDeployedByFactory = (await operatorFactory.deploymentTimestamp(randomOperatorAddress!)) > 0 expect(isDeployedByFactory).toBeTrue() - + // check that there is a stake + const operatorContract = new Contract( + randomOperatorAddress!, + OperatorArtifact, + deployedOperator.operatorWallet + ) as unknown as OperatorContract + expect(await operatorContract.totalStakedIntoSponsorshipsWei()).toBeGreaterThan(0n) }, 30 * 1000) it('getSponsorships, getOperatorsInSponsorship', async () => { diff --git a/packages/utils/src/Cache.ts b/packages/utils/src/Cache.ts new file mode 100644 index 0000000000..d7008c6230 --- /dev/null +++ b/packages/utils/src/Cache.ts @@ -0,0 +1,21 @@ +export class Cache { + + private value?: V + private valueTimestamp?: number + private readonly valueFactory: () => Promise + private readonly maxAgeInMilliseconds: number + + constructor(valueFactory: () => Promise, maxAgeInMilliseconds: number) { + this.valueFactory = valueFactory + this.maxAgeInMilliseconds = maxAgeInMilliseconds + } + + async get(): Promise { + const now = Date.now() + if ((this.valueTimestamp === undefined) || (now > (this.valueTimestamp + this.maxAgeInMilliseconds))) { + this.value = await this.valueFactory() + this.valueTimestamp = now + } + return this.value! + } +} diff --git a/packages/utils/src/exports.ts b/packages/utils/src/exports.ts index 6d4fb762e8..f48ec9d9f3 100644 --- a/packages/utils/src/exports.ts +++ b/packages/utils/src/exports.ts @@ -1,3 +1,4 @@ +import { Cache } from './Cache' import { pTransaction } from './pTransaction' import { AbortError, asAbortable } from './asAbortable' import { setAbortableInterval, setAbortableTimeout } from './abortableTimers' @@ -107,7 +108,8 @@ export { ipv4ToNumber, numberToIpv4, hash, - MapWithTtl + MapWithTtl, + Cache } export { diff --git a/packages/utils/test/Cache.test.ts b/packages/utils/test/Cache.test.ts new file mode 100644 index 0000000000..4c9d901420 --- /dev/null +++ b/packages/utils/test/Cache.test.ts @@ -0,0 +1,24 @@ +import { Cache } from '../src/Cache' +import { wait } from '../src/wait' + +const MAX_AGE = 100 +const JITTER_FACTOR = 10 + +describe('Cache', () => { + + it('happy path', async () => { + let plainValue = 'foo' + const valueFactory = jest.fn().mockImplementation(async () => plainValue) + const cache = new Cache(valueFactory, MAX_AGE) + expect(await cache.get()).toEqual('foo') + expect(valueFactory).toHaveBeenCalledTimes(1) + plainValue = 'bar' + // should not change yet + expect(await cache.get()).toEqual('foo') + expect(valueFactory).toHaveBeenCalledTimes(1) + // changes after max age elapsed + await wait(MAX_AGE + JITTER_FACTOR) + expect(await cache.get()).toEqual('bar') + expect(valueFactory).toHaveBeenCalledTimes(2) + }) +})