Skip to content

Commit

Permalink
Merge pull request #5 from streamr-dev/do-not-include-unknown-nodes
Browse files Browse the repository at this point in the history
The topology doesn't include unknown neighbors: if we don't have a `NodeInfo` item for a neighbor, we remove it from the neighbor list.

## Refactoring

Changed internal data structure of `Topology#nodes`.
  • Loading branch information
teogeb authored Feb 26, 2024
2 parents fc64a7e + df4d765 commit fa1a2e0
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 12 deletions.
47 changes: 38 additions & 9 deletions src/crawler/Topology.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,52 @@
import { getNodeIdFromPeerDescriptor } from '@streamr/dht'
import { StreamPartIDUtils } from '@streamr/protocol'
import { NodeInfo } from '@streamr/trackerless-network'
import { Multimap } from '@streamr/utils'
import { DhtAddress, StreamPartID } from 'streamr-client'

interface Node {
id: DhtAddress
streamPartNeighbors: Multimap<StreamPartID, DhtAddress>
}

export class Topology {

private nodeInfos: NodeInfo[]
private nodes: Map<DhtAddress, Node> = new Map()

constructor(infos: NodeInfo[]) {
const nodeIds = new Set(...[infos.map((info) => getNodeIdFromPeerDescriptor(info.peerDescriptor))])
for (const info of infos) {
const streamPartNeighbors: Multimap<StreamPartID, DhtAddress> = new Multimap()
for (const streamPartitionInfo of info.streamPartitions) {
const neighbors = streamPartitionInfo.deliveryLayerNeighbors
.map((n) => getNodeIdFromPeerDescriptor(n))
.filter((id) => nodeIds.has(id))
streamPartNeighbors.addAll(StreamPartIDUtils.parse(streamPartitionInfo.id), neighbors)
}
const nodeId = getNodeIdFromPeerDescriptor(info.peerDescriptor)
this.nodes.set(nodeId, {
id: nodeId,
streamPartNeighbors
})
}
}

getNodes(): Node[] {
return [...this.nodes.values()]
}

constructor(nodeInfos: NodeInfo[]) {
this.nodeInfos = nodeInfos
getNeighbors(nodeId: DhtAddress, streamPartId: StreamPartID): DhtAddress[] {
return this.nodes.get(nodeId)?.streamPartNeighbors.get(streamPartId) ?? []
}

getPeers(streamPartId: StreamPartID): Set<DhtAddress> {
const nodeIds: Set<DhtAddress> = new Set()
for (const info of this.nodeInfos) {
const streamPart = info.streamPartitions.find((sp) => sp.id === streamPartId)
if (streamPart !== undefined) {
nodeIds.add(getNodeIdFromPeerDescriptor(info.peerDescriptor))
for (const neighbor of streamPart.deliveryLayerNeighbors) {
nodeIds.add(getNodeIdFromPeerDescriptor(neighbor))
for (const node of this.nodes.values()) {
const neighbors = node.streamPartNeighbors.get(streamPartId)
if (neighbors.length > 0) {
nodeIds.add(node.id)
for (const neighbor of neighbors) {
nodeIds.add(neighbor)
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions test/Crawler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ describe('Crawler', () => {
return {
peerDescriptor,
controlLayer: {
neighbors: neighbors.get(getNodeIdFromPeerDescriptor(peerDescriptor)) ?? [],
neighbors: [],
connections: []
},
streamPartitions: [{
id: STREAM_PART_ID,
controlLayerNeighbors: [],
deliveryLayerNeighbors: []
deliveryLayerNeighbors: neighbors.get(getNodeIdFromPeerDescriptor(peerDescriptor)) ?? []
}],
version: ''
}
Expand Down Expand Up @@ -55,7 +55,12 @@ describe('Crawler', () => {
)
})
}
const topology = await crawlTopology(localNode as any, [nodes[0], nodes[5]], (response: NodeInfo) => response.controlLayer!.neighbors, '')
const topology = await crawlTopology(
localNode as any,
[nodes[0], nodes[5]],
(response: NodeInfo) => response.streamPartitions[0].deliveryLayerNeighbors,
''
)
expect(localNode.fetchNodeInfo).toHaveBeenCalledTimes(nodes.length)
expect([...topology.getPeers(STREAM_PART_ID)!]).toIncludeSameMembers(nodes.map((n) => getNodeIdFromPeerDescriptor(n)))
})
Expand Down
35 changes: 35 additions & 0 deletions test/Topology.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { PeerDescriptor, createRandomDhtAddress, getNodeIdFromPeerDescriptor, getRawFromDhtAddress } from '@streamr/dht'
import { Topology } from '../src/crawler/Topology'
import { StreamPartIDUtils } from '@streamr/protocol'
import { range } from 'lodash'

const STREAM_PART_ID_1 = StreamPartIDUtils.parse('stream#1')
const STREAM_PART_ID_2 = StreamPartIDUtils.parse('stream#2')

describe('Topology', () => {

it('ignore unknown neighbors', () => {
const nodes: PeerDescriptor[] = range(3).map(() => ({
nodeId: getRawFromDhtAddress(createRandomDhtAddress()),
} as any))
const topology = new Topology([{
peerDescriptor: nodes[0],
streamPartitions: [{
id: STREAM_PART_ID_1,
deliveryLayerNeighbors: [nodes[1], nodes[2]]
}]
}, {
peerDescriptor: nodes[2],
streamPartitions: [{
id: STREAM_PART_ID_2,
deliveryLayerNeighbors: [nodes[0], nodes[1], nodes[2]]
}]
}] as any)
expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[0]), STREAM_PART_ID_1)]).toIncludeSameMembers([
getNodeIdFromPeerDescriptor(nodes[2])
])
expect([...topology.getNeighbors(getNodeIdFromPeerDescriptor(nodes[2]), STREAM_PART_ID_2)]).toIncludeSameMembers([
getNodeIdFromPeerDescriptor(nodes[0]), getNodeIdFromPeerDescriptor(nodes[2])
])
})
})

0 comments on commit fa1a2e0

Please sign in to comment.