From 85e6e2fb34e53841e3f538551d84735f852848d4 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 26 Feb 2024 09:14:43 +0200 Subject: [PATCH 1/3] rm neighbors without infos --- src/crawler/Topology.ts | 46 +++++++++++++++++++++++++++++++++-------- test/Crawler.test.ts | 11 +++++++--- test/Topology.test.ts | 31 +++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 12 deletions(-) create mode 100644 test/Topology.test.ts diff --git a/src/crawler/Topology.ts b/src/crawler/Topology.ts index 8d45b6d..c07b7b7 100644 --- a/src/crawler/Topology.ts +++ b/src/crawler/Topology.ts @@ -1,23 +1,51 @@ import { getNodeIdFromPeerDescriptor } from '@streamr/dht' import { NodeInfo } from '@streamr/trackerless-network' +import { Multimap } from '@streamr/utils' import { DhtAddress, StreamPartID } from 'streamr-client' +interface Node { + id: DhtAddress + streamPartNeighbors: Multimap +} + export class Topology { - private nodeInfos: NodeInfo[] + private nodes: Map = new Map() + + constructor(infos: NodeInfo[]) { + const nodeIds = new Set(...[infos.map((info) => getNodeIdFromPeerDescriptor(info.peerDescriptor))]) + for (const info of infos) { + const streamPartNeighbors: Multimap = new Multimap() + for (const streamPartitionInfo of info.streamPartitions) { + const neighbors = streamPartitionInfo.deliveryLayerNeighbors + .map((n) => getNodeIdFromPeerDescriptor(n)) + .filter((id) => nodeIds.has(id)) + streamPartNeighbors.addAll(streamPartitionInfo.id as StreamPartID, neighbors) + } + const nodeId = getNodeIdFromPeerDescriptor(info.peerDescriptor) + this.nodes.set(nodeId, { + id: nodeId, + streamPartNeighbors + }) + } + } + + getNodes(): Node[] { + return [...this.nodes.values()] + } - constructor(nodeInfos: NodeInfo[]) { - this.nodeInfos = nodeInfos + getNeighbors(nodeId: DhtAddress, streamPartId: StreamPartID): DhtAddress[] { + return this.nodes.get(nodeId)?.streamPartNeighbors.get(streamPartId) ?? [] } getPeers(streamPartId: StreamPartID): Set { const nodeIds: Set = new Set() - for (const info of this.nodeInfos) { - const streamPart = info.streamPartitions.find((sp) => sp.id === streamPartId) - if (streamPart !== undefined) { - nodeIds.add(getNodeIdFromPeerDescriptor(info.peerDescriptor)) - for (const neighbor of streamPart.deliveryLayerNeighbors) { - nodeIds.add(getNodeIdFromPeerDescriptor(neighbor)) + for (const node of this.nodes.values()) { + const neighbors = node.streamPartNeighbors.get(streamPartId) + if (neighbors.length > 0) { + nodeIds.add(node.id) + for (const neighbor of neighbors) { + nodeIds.add(neighbor) } } } diff --git a/test/Crawler.test.ts b/test/Crawler.test.ts index 402dd80..d78c14f 100644 --- a/test/Crawler.test.ts +++ b/test/Crawler.test.ts @@ -15,13 +15,13 @@ describe('Crawler', () => { return { peerDescriptor, controlLayer: { - neighbors: neighbors.get(getNodeIdFromPeerDescriptor(peerDescriptor)) ?? [], + neighbors: [], connections: [] }, streamPartitions: [{ id: STREAM_PART_ID, controlLayerNeighbors: [], - deliveryLayerNeighbors: [] + deliveryLayerNeighbors: neighbors.get(getNodeIdFromPeerDescriptor(peerDescriptor)) ?? [] }], version: '' } @@ -55,7 +55,12 @@ describe('Crawler', () => { ) }) } - const topology = await crawlTopology(localNode as any, [nodes[0], nodes[5]], (response: NodeInfo) => response.controlLayer!.neighbors, '') + const topology = await crawlTopology( + localNode as any, + [nodes[0], nodes[5]], + (response: NodeInfo) => response.streamPartitions[0].deliveryLayerNeighbors, + '' + ) expect(localNode.fetchNodeInfo).toHaveBeenCalledTimes(nodes.length) expect([...topology.getPeers(STREAM_PART_ID)!]).toIncludeSameMembers(nodes.map((n) => getNodeIdFromPeerDescriptor(n))) }) diff --git a/test/Topology.test.ts b/test/Topology.test.ts new file mode 100644 index 0000000..ff11199 --- /dev/null +++ b/test/Topology.test.ts @@ -0,0 +1,31 @@ +import { PeerDescriptor, createRandomDhtAddress, getNodeIdFromPeerDescriptor, getRawFromDhtAddress } from '@streamr/dht' +import { Topology } from '../src/crawler/Topology' +import { StreamPartIDUtils } from '@streamr/protocol' +import { range } from 'lodash' + +const STREAM_PART_ID_1 = StreamPartIDUtils.parse('stream#1') +const STREAM_PART_ID_2 = StreamPartIDUtils.parse('stream#2') + +describe('Topology', () => { + + it('ignore unknown neighbors', () => { + const nodes: PeerDescriptor[] = range(3).map(() => ({ + nodeId: getRawFromDhtAddress(createRandomDhtAddress()), + } as any)) + const topology = new Topology([{ + peerDescriptor: nodes[0], + streamPartitions: [{ + id: STREAM_PART_ID_1, + deliveryLayerNeighbors: [nodes[1], nodes[2]] + }] + }, { + peerDescriptor: nodes[2], + streamPartitions: [{ + id: STREAM_PART_ID_2, + deliveryLayerNeighbors: [nodes[0], nodes[1], nodes[2]] + }] + }] as any) + expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[0]), STREAM_PART_ID_1)]).toHaveLength(1) + expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[2]), STREAM_PART_ID_2)]).toHaveLength(2) + }) +}) From fa7e8e0419e1203015f1253201044a4e9b86963e Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 26 Feb 2024 11:58:12 +0200 Subject: [PATCH 2/3] use StreamPartIDUtils.parse --- src/crawler/Topology.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/crawler/Topology.ts b/src/crawler/Topology.ts index c07b7b7..46dc956 100644 --- a/src/crawler/Topology.ts +++ b/src/crawler/Topology.ts @@ -1,4 +1,5 @@ import { getNodeIdFromPeerDescriptor } from '@streamr/dht' +import { StreamPartIDUtils } from '@streamr/protocol' import { NodeInfo } from '@streamr/trackerless-network' import { Multimap } from '@streamr/utils' import { DhtAddress, StreamPartID } from 'streamr-client' @@ -20,7 +21,7 @@ export class Topology { const neighbors = streamPartitionInfo.deliveryLayerNeighbors .map((n) => getNodeIdFromPeerDescriptor(n)) .filter((id) => nodeIds.has(id)) - streamPartNeighbors.addAll(streamPartitionInfo.id as StreamPartID, neighbors) + streamPartNeighbors.addAll(StreamPartIDUtils.parse(streamPartitionInfo.id), neighbors) } const nodeId = getNodeIdFromPeerDescriptor(info.peerDescriptor) this.nodes.set(nodeId, { From df4d76580a95fd80214a2eb6dc0bc0b75a67e292 Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Mon, 26 Feb 2024 12:03:07 +0200 Subject: [PATCH 3/3] stricter assertions --- test/Topology.test.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/Topology.test.ts b/test/Topology.test.ts index ff11199..1f0b64b 100644 --- a/test/Topology.test.ts +++ b/test/Topology.test.ts @@ -25,7 +25,11 @@ describe('Topology', () => { deliveryLayerNeighbors: [nodes[0], nodes[1], nodes[2]] }] }] as any) - expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[0]), STREAM_PART_ID_1)]).toHaveLength(1) - expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[2]), STREAM_PART_ID_2)]).toHaveLength(2) + expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[0]), STREAM_PART_ID_1)]).toIncludeSameMembers([ + getNodeIdFromPeerDescriptor(nodes[2]) + ]) + expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[2]), STREAM_PART_ID_2)]).toIncludeSameMembers([ + getNodeIdFromPeerDescriptor(nodes[0]), getNodeIdFromPeerDescriptor(nodes[2]) + ]) }) })