Skip to content

Commit

Permalink
fixing training protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolayBlagoev committed Mar 6, 2024
1 parent e5929d2 commit 309fc9e
Show file tree
Hide file tree
Showing 15 changed files with 1,034 additions and 130 deletions.
1 change: 1 addition & 0 deletions deccom/nodes/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def __init__(self, protocol: AbstractProtocol, ip_addr = "0.0.0.0", port = None
self.peers: dict[bytes,tuple[str,int]] = dict()
print(f"Node listening on {ip_addr}:{port}")
self.protocol_type = protocol
protocol.callback = call_back
pass

async def listen(self):
Expand Down
6 changes: 3 additions & 3 deletions deccom/protocols/abstractprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ def __init__(self, submodule = None, callback: Callable[[tuple[str, int], bytes]


@bindfrom("callback")
def process_datagram(self,addr:tuple[str,int],data:bytes):
def process_datagram(self, addr:tuple[str,int],data:bytes):
self.callback(addr,data)
return

@bindto("sendto")
async def _lower_sendto(msg:bytes, addr:tuple[str,int]):
async def _lower_sendto(self, msg:bytes, addr:tuple[str,int]):
return

@bindto("start")
async def _lower_start():
async def _lower_start(self):
return

async def start(self):
Expand Down
16 changes: 10 additions & 6 deletions deccom/protocols/defaultprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@ def set_callback(self, callback):
def datagram_received(self, data, addr):

# print("from:", addr, "data", data)
loop = asyncio.get_event_loop()
if len(data) < 2:
print("invalid msg received")
return
if data[0] == DefaultProtocol.PING:
self.handle_ping(addr, data[1:])
loop.create_task(self.handle_ping(addr, data[1:]))
elif data[0] == DefaultProtocol.PONG:
self.handle_pong(addr,data[1:])
loop.create_task(self.handle_pong(addr,data[1:]))
else:
self.callback(addr,data[1:])
loop.create_task(self.call_callback(addr,data[1:]))
async def call_callback(self, addr,data):
self.callback(addr,data)

async def start(self):
return
def timeout(self, addr, error, msg_id):
Expand All @@ -58,14 +62,14 @@ async def send_ping(self, addr, success, error, dt = 10):

return

def handle_ping(self, addr, data):
async def handle_ping(self, addr, data):
trmp = bytearray([DefaultProtocol.PONG])
trmp = trmp + data
self.transport.sendto(trmp, addr=addr)
# print("sent pong",addr)
return

def handle_pong(self, addr, data):
async def handle_pong(self, addr, data):
msg_id = int.from_bytes(data, "big")
# print("received pong",addr )
if self.pings.get(msg_id) is None:
Expand All @@ -82,7 +86,7 @@ def connection_lost(self, exc: Exception) -> None:
return super().connection_lost(exc)

async def sendto(self,msg,addr):
print("sending to",addr)
# print("sending to",addr)
trmp = bytearray(b'\x01')
trmp = trmp + msg
self.transport.sendto(trmp, addr=addr)
Expand Down
49 changes: 45 additions & 4 deletions deccom/protocols/peerdiscovery/fixedpeers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


class FixedPeers(AbstractPeerDiscovery):
EMPTY = int.from_bytes(b'\x00', byteorder="big")
offers = dict(AbstractPeerDiscovery.offers, **{
"sendto_id": "sendto_id",
"broadcast": "broadcast",
Expand All @@ -18,19 +19,48 @@ def __init__(self, peer_list: list[Peer], bootstrap_peers: list[Peer] = [], inte
super().__init__(bootstrap_peers, interval, submodule, callback, disconnected_callback, connected_callback)
self.p_to_a: dict[bytes,tuple[str,int]] = dict()
self.a_to_p: dict[tuple[str,int],Peer] = dict()
self.peer_crawls = dict()
self.sent_finds = dict()
for p in peer_list:
self.p_to_a[p.id_node] = p.addr
self.a_to_p[p.addr] = p
self.peers[p.id_node] = p

self.introduced = []
async def start(self):
await super().start()
loop = asyncio.get_running_loop()
loop.call_later(2, self.introduce_to_others)
def introduce_to_others(self):
loop = asyncio.get_running_loop()
for a in self.a_to_p:
# self.introduced.append(a)
loop.create_task(self.introduction(a))
async def introduction(self, addr):
msg = bytearray([1])
print("introducing to ",addr)
await self._lower_sendto(msg, addr)
def process_datagram(self, addr: tuple[str, int], data: bytes):
print("ey yo",addr)
if self.a_to_p.get(addr) == None:
return
super().process_datagram(addr, data)
if not addr in self.introduced:
print("new peer MET!",addr)
self.introduced.append(addr)

if self.peer_crawls.get(self.a_to_p.get(addr)) != None:
self.peer_crawls.get(self.a_to_p.get(addr)).set_result(True)
else:
self.connected_callback(self.a_to_p[addr])
if data[0] == FixedPeers.EMPTY:
super().process_datagram(addr, data[1:])
async def sendto(self, msg, addr):
if self.a_to_p.get(addr) == None:
print("dont know this peer?")

return
await super().sendto(msg, addr)
tmp = bytearray([FixedPeers.EMPTY])
tmp += msg
await super().sendto(tmp, addr)

async def sendto_id(self, msg, p: bytes):
if self.p_to_a.get(p) == None:
Expand All @@ -41,4 +71,15 @@ async def broadcast(self, msg):
await self.sendto(msg,addr)
def get_al(self, addr: tuple[str, int]) -> Union[Peer, None]:
return self.a_to_p.get(addr)

async def find_peer(self, id: bytes) -> Peer:
if self.peers.get(id) == None:
if self.peer_crawls.get(id) == None:
loop = asyncio.get_running_loop()
fut = loop.create_future()
self.peer_crawls[id] = fut

await fut
else:
await self.peer_crawls.get(id)
return self.get_peer(id)

9 changes: 8 additions & 1 deletion deccom/protocols/peerdiscovery/kademliadiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def start(self):
msg = bytearray([KademliaDiscovery.ASK_FOR_ID])
await self._lower_sendto(msg,p.addr)
loop = asyncio.get_event_loop()
loop.call_later(2, self.refresh_table)
loop.call_later(self.interval+2, self.refresh_table)

def refresh_table(self):

Expand All @@ -46,6 +46,13 @@ def refresh_table(self):
async def _refresh_table(self):
print("refreshing")
loop = asyncio.get_running_loop()
if len(self.bucket_manager.buckets) == 1 and len(self.bucket_manager.buckets[0].peers) == 0:
for p in self.bootstrap_peers:
await self.introduce_to_peer(p)
msg = bytearray([KademliaDiscovery.ASK_FOR_ID])
await self._lower_sendto(msg,p.addr)
self.refresh_loop = loop.call_later(self.interval+2, self.refresh_table)
return
rand_ids = [Peer.me.id_node]
unique_id = os.urandom(8)
while self.searches.get(unique_id) != None:
Expand Down
2 changes: 1 addition & 1 deletion deccom/protocols/streamprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def handle_connection(self, reader: asyncio.StreamReader,writer: asyncio.S
return

@bindto("get_peer")
def _lower_get_peer(self, id) -> Union[Peer,None]:
def get_peer(self, id) -> Union[Peer,None]:
return None

@bindfrom("connected_callback")
Expand Down
72 changes: 72 additions & 0 deletions flow_run1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import asyncio
from deccom.peers.peer import Peer
from deccom.protocols.peerdiscovery import FixedPeers
from deccom.protocols.defaultprotocol import DefaultProtocol
from flowprotocol import FlowProtocol
from deccom.nodes import Node



def costmap(a1, a2):
matrix = {
"0": {
"1": 1,
"2": 1,
"3": 10,
"4": 10,
"5": 10,
"6": 10
},
"1": {
"0": 1,
"2": 10,
"3": 1,
"4": 3,
"5": 10,
"6": 10
},
"2": {
"1": 10,
"0": 1,
"3": 2,
"4": 1,
"5": 10,
"6": 10
},
"3": {
"1": 1,
"2": 2,
"0": 10,
"4": 10,
"5": 10,
"6": 1
},
"4": {
"1": 3,
"2": 1,
"3": 10,
"0": 10,
"5": 10,
"6": 1
},
"6": {
"1": 10,
"2": 10,
"3": 1,
"4": 1,
"5": 10,
"0": 10
},
}
return matrix[a1][a2]
peers = [Peer(("127.0.0.1", 10020), "0"), Peer(("127.0.0.1", 10023), "3"), Peer(("127.0.0.1", 10024), "4"), Peer(("127.0.0.1", 10022), "2")]
Peer.me = Peer(("127.0.0.1", 10021), "1")
lowest = DefaultProtocol()
prs = FixedPeers(peers)
prs.set_lower(lowest)
fp = FlowProtocol(1, 3, 1, 0, 0, 0, costmap)
fp.set_lower(prs)
me = Node(fp,"127.0.0.1",10021, print)
loop = asyncio.new_event_loop()
loop.run_until_complete(me.listen())
loop.run_forever()
69 changes: 69 additions & 0 deletions flow_run2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio
from deccom.peers.peer import Peer
from deccom.protocols.peerdiscovery import FixedPeers
from deccom.protocols.defaultprotocol import DefaultProtocol
from flowprotocol import FlowProtocol
from deccom.nodes import Node
def costmap(a1, a2):
matrix = {
"0": {
"1": 1,
"2": 1,
"3": 10,
"4": 10,
"5": 10,
"6": 10
},
"1": {
"0": 1,
"2": 10,
"3": 1,
"4": 3,
"5": 10,
"6": 10
},
"2": {
"1": 10,
"0": 1,
"3": 2,
"4": 1,
"5": 10,
"6": 10
},
"3": {
"1": 1,
"2": 2,
"0": 10,
"4": 10,
"5": 10,
"6": 1
},
"4": {
"1": 3,
"2": 1,
"3": 10,
"0": 10,
"5": 10,
"6": 1
},
"6": {
"1": 10,
"2": 10,
"3": 1,
"4": 1,
"5": 10,
"0": 10
},
}
return matrix[a1][a2]
peers = [Peer(("127.0.0.1", 10021), "1"), Peer(("127.0.0.1", 10022), "2")]
Peer.me = Peer(("127.0.0.1", 10020), "0")
lowest = DefaultProtocol()
prs = FixedPeers(peers)
prs.set_lower(lowest)
fp = FlowProtocol(0, 3, 4, 0, 0, 4, costmap)
fp.set_lower(prs)
me = Node(fp,"127.0.0.1",10020, print)
loop = asyncio.new_event_loop()
loop.run_until_complete(me.listen())
loop.run_forever()
Loading

0 comments on commit 309fc9e

Please sign in to comment.