diff --git a/src/crawler/Topology.ts b/src/crawler/Topology.ts index 8d45b6d..46dc956 100644 --- a/src/crawler/Topology.ts +++ b/src/crawler/Topology.ts @@ -1,23 +1,52 @@ import { getNodeIdFromPeerDescriptor } from '@streamr/dht' +import { StreamPartIDUtils } from '@streamr/protocol' import { NodeInfo } from '@streamr/trackerless-network' +import { Multimap } from '@streamr/utils' import { DhtAddress, StreamPartID } from 'streamr-client' +interface Node { + id: DhtAddress + streamPartNeighbors: Multimap +} + export class Topology { - private nodeInfos: NodeInfo[] + private nodes: Map = new Map() + + constructor(infos: NodeInfo[]) { + const nodeIds = new Set(...[infos.map((info) => getNodeIdFromPeerDescriptor(info.peerDescriptor))]) + for (const info of infos) { + const streamPartNeighbors: Multimap = new Multimap() + for (const streamPartitionInfo of info.streamPartitions) { + const neighbors = streamPartitionInfo.deliveryLayerNeighbors + .map((n) => getNodeIdFromPeerDescriptor(n)) + .filter((id) => nodeIds.has(id)) + streamPartNeighbors.addAll(StreamPartIDUtils.parse(streamPartitionInfo.id), neighbors) + } + const nodeId = getNodeIdFromPeerDescriptor(info.peerDescriptor) + this.nodes.set(nodeId, { + id: nodeId, + streamPartNeighbors + }) + } + } + + getNodes(): Node[] { + return [...this.nodes.values()] + } - constructor(nodeInfos: NodeInfo[]) { - this.nodeInfos = nodeInfos + getNeighbors(nodeId: DhtAddress, streamPartId: StreamPartID): DhtAddress[] { + return this.nodes.get(nodeId)?.streamPartNeighbors.get(streamPartId) ?? [] } getPeers(streamPartId: StreamPartID): Set { const nodeIds: Set = new Set() - for (const info of this.nodeInfos) { - const streamPart = info.streamPartitions.find((sp) => sp.id === streamPartId) - if (streamPart !== undefined) { - nodeIds.add(getNodeIdFromPeerDescriptor(info.peerDescriptor)) - for (const neighbor of streamPart.deliveryLayerNeighbors) { - nodeIds.add(getNodeIdFromPeerDescriptor(neighbor)) + for (const node of this.nodes.values()) { + const neighbors = node.streamPartNeighbors.get(streamPartId) + if (neighbors.length > 0) { + nodeIds.add(node.id) + for (const neighbor of neighbors) { + nodeIds.add(neighbor) } } } diff --git a/test/Crawler.test.ts b/test/Crawler.test.ts index 402dd80..d78c14f 100644 --- a/test/Crawler.test.ts +++ b/test/Crawler.test.ts @@ -15,13 +15,13 @@ describe('Crawler', () => { return { peerDescriptor, controlLayer: { - neighbors: neighbors.get(getNodeIdFromPeerDescriptor(peerDescriptor)) ?? [], + neighbors: [], connections: [] }, streamPartitions: [{ id: STREAM_PART_ID, controlLayerNeighbors: [], - deliveryLayerNeighbors: [] + deliveryLayerNeighbors: neighbors.get(getNodeIdFromPeerDescriptor(peerDescriptor)) ?? [] }], version: '' } @@ -55,7 +55,12 @@ describe('Crawler', () => { ) }) } - const topology = await crawlTopology(localNode as any, [nodes[0], nodes[5]], (response: NodeInfo) => response.controlLayer!.neighbors, '') + const topology = await crawlTopology( + localNode as any, + [nodes[0], nodes[5]], + (response: NodeInfo) => response.streamPartitions[0].deliveryLayerNeighbors, + '' + ) expect(localNode.fetchNodeInfo).toHaveBeenCalledTimes(nodes.length) expect([...topology.getPeers(STREAM_PART_ID)!]).toIncludeSameMembers(nodes.map((n) => getNodeIdFromPeerDescriptor(n))) }) diff --git a/test/Topology.test.ts b/test/Topology.test.ts new file mode 100644 index 0000000..1f0b64b --- /dev/null +++ b/test/Topology.test.ts @@ -0,0 +1,35 @@ +import { PeerDescriptor, createRandomDhtAddress, getNodeIdFromPeerDescriptor, getRawFromDhtAddress } from '@streamr/dht' +import { Topology } from '../src/crawler/Topology' +import { StreamPartIDUtils } from '@streamr/protocol' +import { range } from 'lodash' + +const STREAM_PART_ID_1 = StreamPartIDUtils.parse('stream#1') +const STREAM_PART_ID_2 = StreamPartIDUtils.parse('stream#2') + +describe('Topology', () => { + + it('ignore unknown neighbors', () => { + const nodes: PeerDescriptor[] = range(3).map(() => ({ + nodeId: getRawFromDhtAddress(createRandomDhtAddress()), + } as any)) + const topology = new Topology([{ + peerDescriptor: nodes[0], + streamPartitions: [{ + id: STREAM_PART_ID_1, + deliveryLayerNeighbors: [nodes[1], nodes[2]] + }] + }, { + peerDescriptor: nodes[2], + streamPartitions: [{ + id: STREAM_PART_ID_2, + deliveryLayerNeighbors: [nodes[0], nodes[1], nodes[2]] + }] + }] as any) + expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[0]), STREAM_PART_ID_1)]).toIncludeSameMembers([ + getNodeIdFromPeerDescriptor(nodes[2]) + ]) + expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[2]), STREAM_PART_ID_2)]).toIncludeSameMembers([ + getNodeIdFromPeerDescriptor(nodes[0]), getNodeIdFromPeerDescriptor(nodes[2]) + ]) + }) +})