Skip to content

Commit

Permalink
fixed a test
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolayBlagoev committed Mar 6, 2024
1 parent 631183e commit 812997f
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 46 deletions.
21 changes: 17 additions & 4 deletions deccom/protocols/peerdiscovery/_kademlia_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def remove_peer(self, dist):
if len(self.toadd) > 0:

dist,peer = self.toadd.pop()
# self.success_call(dist, peer)
self.success_call(dist, peer)
self.peers[dist] = peer

def get_peer(self, dist):
Expand Down Expand Up @@ -130,12 +130,17 @@ def update_peer(self, id, node) -> Peer:


def add_peer(self,id,node, lv=0):
if self.get_peer(id) != None:
return None
if isinstance(id, bytearray):
id = bytes(id)
# print(lv)
if isinstance(id, bytes):
id = int.from_bytes(id, byteorder="big")
dist = self.id ^ id
if dist == 0:
print("added self")
return
indx = self._get_index(dist)

ret = self.buckets[indx].add_peer(dist,node)
Expand Down Expand Up @@ -180,15 +185,23 @@ def get_closest(self, id, alpha = None) -> list[Peer]:
diff = 1
idx += diff
stopper = max(idx, len(self.buckets)-idx)
# print("stopper", idx, len(self.buckets), stopper, len(lst), alpha)
# acc = 0
# for b in self.buckets:
# acc += len(b.peers)
# print("we know",acc)

while len(lst) < alpha:
if idx >= 0 and idx < len(self.buckets):
lst += list(self.buckets[idx].peers.values())

if idx + diff >= 0 and idx + diff < len(self.buckets):
lst += list(self.buckets[idx + diff].peers.values())
if diff < 0:
diff *= -1
diff += 1
else:
diff *= -1
if abs(diff) > stopper:

if abs(diff) > stopper + 1:
break
lst = lst[:alpha]
return lst
Expand Down
34 changes: 22 additions & 12 deletions deccom/protocols/peerdiscovery/kademliadiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ async def _refresh_table(self):
self.refresh_loop = loop.call_later(self.interval+2, self.refresh_table)

def remove_peer(self, addr: tuple[str, int], node_id: bytes):
print("removing peer.")
if self.bucket_manager.get_peer(node_id) == None:
return
print(self.peer.pub_key, "removing peer.", self.bucket_manager.get_peer(node_id).pub_key)
self.bucket_manager.remove_peer(node_id)
return super().remove_peer(addr, node_id)

Expand Down Expand Up @@ -115,7 +117,7 @@ def process_datagram(self, addr: tuple[str, int], data: bytes):
elif data[0] == KademliaDiscovery.FIND or data[0] ^ KademliaDiscovery.FIND == 1:
# print("peer looking")
if self.sent_finds.get(data) != None:
# print("duplicate")
print(self.peer.pub_key, "duplicate")
return


Expand All @@ -132,23 +134,25 @@ def process_datagram(self, addr: tuple[str, int], data: bytes):
loop = asyncio.get_running_loop()
loop.create_task(self._lower_sendto(msg, addr))
elif self.get_peer(id) != None and data[0] == KademliaDiscovery.FIND:
print(self.peer.pub_key,"I KNOW THAT GUY!",self.get_peer(id).pub_key)
self.send_find_response(addr,[self.get_peer(id)],unique_id)
else:

closest_peers = self.bucket_manager.get_closest(id,alpha=3)
if len(closest_peers) == 0:
# print("oops dont know anyone :/")
print("oops dont know anyone :/")
return

# print(self.peer.pub_key,": i know someone close", len(closest_peers))
print(self.peer.pub_key,": i know someone close", len(closest_peers))
self.send_find_response(addr,closest_peers,unique_id)

elif data[0] == KademliaDiscovery.RESPOND_FIND:
# print(self.peer.pub_key,"got a response",addr)
print(self.peer.pub_key,"got a response",addr)
i = 1
unique_id = data[i:i+8]
i+=8
if self.searches.get(unique_id) == None and self.warmup >= 7:
print(self.peer.pub_key,"NOT A VALID SEARCH")
return
else:
self.warmup += 1
Expand All @@ -164,7 +168,7 @@ def process_datagram(self, addr: tuple[str, int], data: bytes):
if self.searches.get(unique_id) != None:
for p in peers:
if p.id_node == self.searches.get(unique_id):
# print("oh he in here!")
print("oh he in here!")
loop = asyncio.get_running_loop()
loop.create_task(self.send_find(unique_id,p))
self.connection_approval(p.addr,p,self.add_peer,self.ban_peer)
Expand All @@ -180,7 +184,10 @@ def process_datagram(self, addr: tuple[str, int], data: bytes):

if self.warmup < 7:
loop = asyncio.get_running_loop()
loop.create_task(self.send_find(unique_id,self.bucket_manager.get_closest(self.peer.id_node, 1)[0], True))
ret = self.bucket_manager.get_closest(self.peer.id_node, 1)

if len(ret) > 0:
loop.create_task(self.send_find(unique_id,ret[0], True))

elif data[0] == KademliaDiscovery.ASK_FOR_ID:
# print("ASKING FOR ID")
Expand Down Expand Up @@ -215,7 +222,8 @@ async def send_find(self, unique_id, p: Peer, bypass = False):
msg += unique_id
msg += self.peer.id_node
await self._lower_sendto(msg,p.addr)

else:
print(self.peer.pub_key, "not bypassing", p.pub_key)
return
msg = bytearray([KademliaDiscovery.FIND])
msg += unique_id
Expand All @@ -241,13 +249,13 @@ def update_peer(self, p: Peer):

def add_peer(self, addr: tuple[str,int], p: Peer):
# print(p)
# if self.peer.pub_key != "0":
# print(self.peer.pub_key," : adding peer", p.pub_key)
if self.peer.pub_key != "0":
print(self.peer.pub_key," : adding peer", p.pub_key)
ret = self.bucket_manager.add_peer(p.id_node,p)
if ret != None:
print(self.peer.pub_key,"oops, kinda big for", p.pub_key)
loop = asyncio.get_event_loop()
loop.create_task(self._lower_ping(ret[1].addr, lambda addr, peer=ret[1], self=self: self.update_peer(peer), lambda addr, oldp=ret[1], self=self: self.remove_peer(addr, oldp.id_node), 5))
loop.create_task(self._lower_ping(ret[1].addr, lambda addr, peer=ret[1], self=self: self.update_peer(peer), lambda addr, oldp=ret[1], self=self: self.remove_peer(addr, oldp.id_node), 8))
else:
self.successful_add(addr,p)

Expand All @@ -267,8 +275,10 @@ async def _find_peer(self, fut, id):
id = SHA256(id)
msg += id
l = self.bucket_manager.get_closest(id,10)
# print(self.peer.pub_key,"we found a list of", len(l))
print(self.peer.pub_key,"we found a list of", len(l), id)

for p in l:
print("sending to ", p.pub_key)
await self._lower_sendto(msg, p.addr)

return
Expand Down
5 changes: 3 additions & 2 deletions stubs/network_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ async def call_callback(self, addr,data):
async def start(self):
return
def timeout(self, addr, error, msg_id):
print("timed out")
if self.pings.get(msg_id) is None:
return
del self.pings[msg_id]
Expand All @@ -71,8 +72,8 @@ async def send_ping(self, addr, success, error, dt = 10):
while self.pings.get(msg_id) != None:
bts = os.urandom(4)
msg_id = int.from_bytes(bts, "big")
# print("sending ping",addr)
timeout = loop.call_later(dt+2,
print("sending ping",addr,dt)
timeout = loop.call_later(dt,
self.timeout, addr,error,msg_id)
self.pings[msg_id] = (success, timeout)
trmp = bytearray([NetworkStub.PING])
Expand Down
2 changes: 1 addition & 1 deletion stubs/node_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, peer: Peer, protocol: AbstractProtocol, ip_addr = "0.0.0.0",
self.ip_addr = ip_addr
self.call_back = call_back
self.peers: dict[bytes,tuple[str,int]] = dict()
print(f"Node listening on {ip_addr}:{port}")
print(f"{peer.pub_key}, Node listening on {ip_addr}:{port}")
self.protocol_type = protocol
protocol.callback = call_back
self.peer = peer
Expand Down
72 changes: 45 additions & 27 deletions test_protocol_kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
from stubs.network_stub import NetworkStub
from stubs.node_stub import NodeStub

class test_protocol_kademlia(unittest.TestCase):
class test_protocol_kademlia(unittest.IsolatedAsyncioTestCase):
def setUp(self):
NetworkStub.connections = {}
self.p1 = Peer(None, pub_key=str(0))
self.loop = asyncio.new_event_loop()
pl = NetworkStub()
Expand Down Expand Up @@ -52,8 +53,10 @@ def test_kademlia_should_find(self):
for p in prlist:
self.loop.run_until_complete(k.find_peer(bytes(p.id_node)))
self.assertEqual(p.id_node, k.get_peer(bytes(p.id_node)).id_node)

def test_ensure_not_central(self):
def doCleanups(self) -> None:
self.n1.set_listen(False)
return super().doCleanups()
async def test_ensure_not_central(self):

prlist = []
kls = []
Expand All @@ -67,70 +70,85 @@ def test_ensure_not_central(self):
k2.set_lower(pl)
kls.append(k2)
n2 = NodeStub(p2, k2)
self.loop.run_until_complete(n2.listen())
self.loop.run_until_complete(asyncio.sleep(3))
await n2.listen()
await asyncio.sleep(3)
self.n1.set_listen(False)
for k in kls:
for p in prlist:
# print("do we know?")
self.loop.run_until_complete(k.find_peer(bytes(p.id_node)))
print("do we know?")
await k.find_peer(bytes(p.id_node))
self.assertEqual(p.id_node, k.get_peer(bytes(p.id_node)).id_node)
self.n1.set_listen(True)

def test_small_bucket(self):
class test_protocol_kademlia_2(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.p1 = Peer(None, pub_key=str(0))
self.loop = asyncio.new_event_loop()
NetworkStub.connections = {}
pl = NetworkStub()
kl = KademliaDiscovery()
kl.set_lower(pl)
self.n1 = NodeStub(self.p1, kl)
self.loop.run_until_complete(self.n1.listen())
async def test_small_bucket(self):

prlist = []
kls = []



p3= Peer(None, id_node= bytes(bytearray([int.from_bytes(b'\xff', byteorder="big") for _ in range(31)] + [0])), pub_key="10")
prlist.append(p3)
p1= Peer(None, id_node= bytes(bytearray([int.from_bytes(b'\xff', byteorder="big") for _ in range(31)] + [0])), pub_key="10")
prlist.append(p1)
pl = NetworkStub()
k1 = KademliaDiscovery([self.p1], interval=2, k = 1)
k1 = KademliaDiscovery([self.p1], interval=3, k = 1)
k1.set_lower(pl)
kls.append(k1)
n3 = NodeStub(p3, k1)
self.loop.run_until_complete(n3.listen())
n3 = NodeStub(p1, k1)
await n3.listen()

p2 = Peer(None, id_node= bytes(bytearray([int.from_bytes(b'\x00', byteorder="big") for _ in range(32)])), pub_key="00")
prlist.append(p2)
pl = NetworkStub()
k2 = KademliaDiscovery([self.p1], interval=2)
k2 = KademliaDiscovery([self.p1], interval=3)
k2.set_lower(pl)
kls.append(k2)
n2 = NodeStub(p2, k2)
self.loop.run_until_complete(n2.listen())
self.loop.run_until_complete(asyncio.sleep(5))
await n2.listen()
await asyncio.sleep(5)

self.loop.run_until_complete(k2.find_peer(bytes(p3.id_node)))
self.assertEqual(p3.id_node, k2.get_peer(bytes(p3.id_node)).id_node)
await k2.find_peer(bytes(p1.id_node))
self.assertEqual(p1.id_node, k2.get_peer(bytes(p1.id_node)).id_node)


p3 = Peer(None, id_node= bytes(bytearray([int.from_bytes(b'\xff', byteorder="big") for _ in range(32)])), pub_key="1")
prlist.append(p3)
pl = NetworkStub()
k3 = KademliaDiscovery([self.p1], interval=2)
k3 = KademliaDiscovery([self.p1], interval=3)
k3.set_lower(pl)
kls.append(k3)
n2 = NodeStub(p3, k3)
self.loop.run_until_complete(n2.listen())
self.loop.run_until_complete(asyncio.sleep(5))
await n2.listen()
await asyncio.sleep(5)

self.loop.run_until_complete(k1.find_peer(bytes(p3.id_node)))
await k1.find_peer(bytes(p3.id_node))

self.assertEqual(p3.id_node, k1.get_peer(bytes(p3.id_node)).id_node)

self.loop.run_until_complete(k3.find_peer(bytes(p2.id_node)))
await k3.find_peer(bytes(p2.id_node))

self.assertEqual(p2.id_node, k3.get_peer(bytes(p2.id_node)).id_node)
n3.set_listen(False)




self.n1.set_listen(False)
print("lookin...")
self.loop.run_until_complete(asyncio.sleep(10))
self.loop.run_until_complete(k1.find_peer(bytes(p2.id_node)))
await asyncio.sleep(5)
print("digging in...", p2.id_node)
await k1.find_peer(bytes(p2.id_node))

self.assertEqual(p2.id_node, k1.get_peer(bytes(p2.id_node)).id_node)
n3.set_listen(True)
self.n1.set_listen(True)
# self.loop.run_until_complete(asyncio.sleep(3))
# n3.set_listen(False)
# for k in kls:
Expand Down

0 comments on commit 812997f

Please sign in to comment.