Skip to content

Commit

Permalink
async Stream methods
Browse files Browse the repository at this point in the history
  • Loading branch information
teogeb committed Nov 15, 2024
1 parent 3633b39 commit 65e60f7
Show file tree
Hide file tree
Showing 19 changed files with 51 additions and 49 deletions.
2 changes: 1 addition & 1 deletion docs/docs/usage/streams/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const stream = await streamr.createStream({
});
console.log(
`Stream created: ${stream.id}. It has ${
stream.getPartitionCount()
await stream.getPartitionCount()
} partitions.`
);
```
Expand Down
6 changes: 3 additions & 3 deletions packages/cli-tools/bin/streamr-storage-node-list-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import { createClientCommand } from '../src/command'
createClientCommand((async (client: StreamrClient, storageNodeAddress: string) => {
const { streams } = await client.getStoredStreams(storageNodeAddress)
if (streams.length > 0) {
console.info(EasyTable.print(streams.map((stream) => {
console.info(EasyTable.print(await Promise.all(streams.map(async (stream) => {
return {
id: stream.id,
partitions: stream.getPartitionCount()
partitions: await stream.getPartitionCount()
}
})))
}))))
}
}))
.arguments('<storageNodeAddress>')
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-tools/test/stream-create.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('create stream', () => {
})
const client = createTestClient()
const stream = await client.getStream(streamId)
expect(stream.getPartitionCount()).toBe(1)
expect(await stream.getPartitionCount()).toBe(1)
await client.destroy()
}, 20 * 1000)

Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/plugins/storage/DeleteExpiredCmd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class DeleteExpiredCmd {
return {
streamId: stream.streamId,
partition: stream.partition,
storageDays: streamFromChain.getStorageDayCount() ?? 365
storageDays: (await streamFromChain.getStorageDayCount()) ?? 365
}
} catch (err) { logger.error('Failed to fetch stream info', { err }) }
})
Expand Down
16 changes: 8 additions & 8 deletions packages/node/src/plugins/storage/StorageConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ export class StorageConfig {
this.clusterSize = clusterSize
this.myIndexInCluster = myIndexInCluster
this.listener = listener
this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, (streams, block) => {
const streamParts = streams.flatMap((stream: Stream) => ([
...this.createMyStreamParts(stream)
]))
this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, async (streams, block) => {
const streamParts = (await Promise.all(streams.map(async (stream: Stream) => {
return [...await this.createMyStreamParts(stream)]
}))).flat()
this.handleDiff(this.synchronizer.ingestSnapshot(new Set<StreamPartID>(streamParts), block))
})
this.storageEventListener = new StorageEventListener(clusterId, streamrClient, (stream, type, block) => {
const streamParts = this.createMyStreamParts(stream)
this.storageEventListener = new StorageEventListener(clusterId, streamrClient, async (stream, type, block) => {
const streamParts = await this.createMyStreamParts(stream)
this.handleDiff(this.synchronizer.ingestPatch(streamParts, type, block))
})
this.abortController = new AbortController()
Expand All @@ -82,8 +82,8 @@ export class StorageConfig {
return this.synchronizer.getState()
}

private createMyStreamParts(stream: Stream): Set<StreamPartID> {
return new Set<StreamPartID>(stream.getStreamParts().filter((streamPart) => {
private async createMyStreamParts(stream: Stream): Promise<Set<StreamPartID>> {
return new Set<StreamPartID>((await stream.getStreamParts()).filter((streamPart) => {
const hashedIndex = keyToArrayIndex(this.clusterSize, streamPart)
return hashedIndex === this.myIndexInCluster
}))
Expand Down
4 changes: 2 additions & 2 deletions packages/node/src/plugins/storage/StorageEventListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ export class StorageEventListener {
private readonly clusterId: EthereumAddress
private readonly streamrClient: StreamrClient
private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void
private readonly onAddToStorageNode: (event: StorageNodeAssignmentEvent) => void
private readonly onRemoveFromStorageNode: (event: StorageNodeAssignmentEvent) => void
private readonly onAddToStorageNode: (event: StorageNodeAssignmentEvent) => unknown
private readonly onRemoveFromStorageNode: (event: StorageNodeAssignmentEvent) => unknown

constructor(
clusterId: EthereumAddress,
Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/plugins/storage/StoragePoller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class StoragePoller {
clusterId: string,
pollInterval: number,
streamrClient: StreamrClient,
onNewSnapshot: (streams: Stream[], block: number) => void
onNewSnapshot: (streams: Stream[], block: number) => unknown
) {
this.clusterId = clusterId
this.pollInterval = pollInterval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,21 @@ describe('MaintainTopologyService', () => {
await maintainTopologyHelper.start()

await until(async () => {
return containsAll(await getSubscribedStreamPartIds(client), stream1.getStreamParts())
return containsAll(await getSubscribedStreamPartIds(client), await stream1.getStreamParts())
}, 10000, 1000)

await stake(operatorContract, await sponsorship2.getAddress(), 10000)
await until(async () => {
return containsAll(await getSubscribedStreamPartIds(client), [
...stream1.getStreamParts(),
...stream2.getStreamParts()
...await stream1.getStreamParts(),
...await stream2.getStreamParts()
])
}, 10000, 1000)

await (await operatorContract.unstake(await sponsorship1.getAddress())).wait()
await until(async () => {
const state = await getSubscribedStreamPartIds(client)
return containsAll(state, stream2.getStreamParts()) && doesNotContainAny(state, stream1.getStreamParts())
return containsAll(state, await stream2.getStreamParts()) && doesNotContainAny(state, await stream1.getStreamParts())
}, 10000, 1000)
}, 120 * 1000)
})
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function makeStubStream(streamId: string): Stream {
const partitions = PARTITION_COUNT_LOOKUP[streamId]
const stub: Partial<Stream> = {
id: toStreamID(streamId),
getStreamParts(): StreamPartID[] { // TODO: duplicated code from client
async getStreamParts(): Promise<StreamPartID[]> { // TODO: duplicated code from client
return range(0, partitions).map((p) => toStreamPartID(toStreamID(streamId), p))
}
}
Expand Down
22 changes: 11 additions & 11 deletions packages/sdk/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,16 @@ export class Stream {
/**
* Returns the partitions of the stream.
*/
getStreamParts(): StreamPartID[] {
return range(0, this.getPartitionCount()).map((p) => toStreamPartID(this.id, p))
async getStreamParts(): Promise<StreamPartID[]> {
return range(0, await this.getPartitionCount()).map((p) => toStreamPartID(this.id, p))
}

getPartitionCount(): number {
return getPartitionCount(this.getMetadata())
async getPartitionCount(): Promise<number> {
return getPartitionCount(await this.getMetadata())
}

getDescription(): string | undefined {
const value = this.getMetadata().description
async getDescription(): Promise<string | undefined> {
const value = (await this.getMetadata()).description
if (isString(value)) {
return value
} else {
Expand All @@ -133,16 +133,16 @@ export class Stream {

async setDescription(description: string): Promise<void> {
await this.setMetadata({
...this.getMetadata(),
...await this.getMetadata(),
description
})
}

/**
* Gets the value of `storageDays` field
*/
getStorageDayCount(): number | undefined {
const value = this.getMetadata().storageDays
async getStorageDayCount(): Promise<number | undefined> {
const value = (await this.getMetadata()).storageDays
if (isNumber(value)) {
return value
} else {
Expand All @@ -155,15 +155,15 @@ export class Stream {
*/
async setStorageDayCount(count: number): Promise<void> {
await this.setMetadata({
...this.getMetadata(),
...await this.getMetadata(),
storageDays: count
})
}

/**
* Returns the metadata of the stream.
*/
getMetadata(): StreamMetadata {
async getMetadata(): Promise<StreamMetadata> {
return this.metadata
}

Expand Down
6 changes: 4 additions & 2 deletions packages/sdk/test/end-to-end/StreamRegistry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,16 +241,18 @@ describe('StreamRegistry', () => {
await createdStream.setMetadata({
description
})
const createdMetadata = await createdStream.getMetadata()
await until(async () => {
try {
return (await client.getStream(createdStream.id)).getMetadata().description === createdStream.getMetadata().description
const queriedMetadata = await (await client.getStream(createdStream.id)).getMetadata()
return queriedMetadata.description === createdMetadata.description
} catch {
return false
}
}, 100000, 1000)
// check that other fields not overwritten
const updatedStream = await client.getStream(createdStream.id)
expect(updatedStream.getMetadata()).toEqual({
expect(await updatedStream.getMetadata()).toEqual({
description
})
}, TIMEOUT)
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/integration/GroupKeyPersistence.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ describe('Group Key Persistence', () => {
})

it('works', async () => {
await startPublisherKeyExchangeSubscription(publisher2, stream.getStreamParts()[0])
await startPublisherKeyExchangeSubscription(publisher2, (await stream.getStreamParts())[0])

const received: Message[] = []
const sub = await subscriber.resend(
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/integration/PublisherKeyExchange.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe('PublisherKeyExchange', () => {
}
})
const stream = await createStream()
streamPartId = stream.getStreamParts()[0]
streamPartId = (await stream.getStreamParts())[0]
await startPublisherKeyExchangeSubscription(publisherClient, streamPartId)
})

Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/integration/StreamrClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe('StreamrClient', () => {
}
})
const stream = await createTestStream(client, module)
streamDefinition = stream.getStreamParts()[0]
streamDefinition = (await stream.getStreamParts())[0]
const publisherWallet = fastWallet()
await stream.grantPermissions({
userId: publisherWallet.address,
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/integration/resend-and-subscribe.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('resend and subscribe', () => {
streamId: stream.id,
distributionMethod: 'rekey'
})
await startPublisherKeyExchangeSubscription(publisher, stream.getStreamParts()[0])
await startPublisherKeyExchangeSubscription(publisher, (await stream.getStreamParts())[0])

const historicalMessage = await createMockMessage({
timestamp: 1000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ describe('resend with existing key', () => {
storageNode.storeMessage(message)
}

const resendRange = (fromTimestamp: number, toTimestamp: number) => {
return subscriber.resend(stream.getStreamParts()[0], {
const resendRange = async (fromTimestamp: number, toTimestamp: number) => {
return subscriber.resend((await stream.getStreamParts())[0], {
from: {
timestamp: fromTimestamp
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe('update encryption key', () => {
userId: await subscriber.getUserId(),
permissions: [StreamPermission.SUBSCRIBE]
})
streamPartId = stream.getStreamParts()[0]
streamPartId = (await stream.getStreamParts())[0]
const sub = await subscriber.subscribe(streamPartId)
messageIterator = sub[Symbol.asyncIterator]()
onError = jest.fn()
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/test/test-utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export const createMockMessage = async (
opts: CreateMockMessageOptions
): Promise<StreamMessage> => {
const [streamId, partition] = StreamPartIDUtils.getStreamIDAndPartition(
opts.streamPartId ?? opts.stream.getStreamParts()[0]
opts.streamPartId ?? (await opts.stream.getStreamParts())[0]
)
const authentication = createPrivateKeyAuthentication(opts.publisher.privateKey)
const factory = new MessageFactory({
Expand Down
12 changes: 6 additions & 6 deletions packages/sdk/test/unit/Stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import { Stream } from '../../src/Stream'

describe('Stream', () => {

it('initial fields', () => {
it('initial fields', async () => {
const stream = new Stream(toStreamID('mock-id'), {}, undefined as any)
expect(stream.getMetadata()).toEqual({})
expect(await stream.getMetadata()).toEqual({})
})

it('getMetadata', () => {
it('getMetadata', async () => {
const stream = new Stream(toStreamID('mock-id'), {
partitions: 10,
storageDays: 20
}, undefined as any)
expect(stream.getMetadata()).toEqual({
expect(await stream.getMetadata()).toEqual({
partitions: 10,
storageDays: 20
})
Expand All @@ -28,7 +28,7 @@ describe('Stream', () => {
{ partitions: 150 },
undefined as any,
)
expect(() => stream.getPartitionCount()).toThrowStreamrError({
expect(() => stream.getPartitionCount()).rejects.toThrowStreamrError({
message: 'Invalid partition count: 150',
code: 'INVALID_STREAM_METADATA'
})
Expand All @@ -49,7 +49,7 @@ describe('Stream', () => {
description: 'updated-description'
})
}).rejects.toThrow('mock-error')
expect(stream.getMetadata().description).toBe('original-description')
expect((await stream.getMetadata()).description).toBe('original-description')
})
})
})

0 comments on commit 65e60f7

Please sign in to comment.