-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrouting_table.py
112 lines (90 loc) · 3.33 KB
/
routing_table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/python3
from routing import Router
from crypto import md5_hash
from utils import Utils
class RoutingTable:
def __init__(self, table={}):
self.table = table
def __bytes__(self):
result = bytes(0)
for key, value in self.table.items():
result = result + bytearray.fromhex(key) + b'\x01' # __
return result
@staticmethod
def parse(data):
if len(data) % 17 != 0:
raise ValueError('Incorrect size of routing table! (' + str(len(data)) + ')')
result = RoutingTable()
i = 0
while i < len(data):
result.table[Utils.hex_decode(data[i : i+16])] = data[i+16]
i += 17
return result
# Updates the table for the router
# @param: Router router
# @param: tuple sender (string (md5hash), string(ipv4-address))
def update_router(self, router, sender):
update_table = []
for i in self.table.keys():
update_table.append((i, None, False, self.table[i]))
router.update(sender, update_table)
# Creates a RoutingTable table from a Router
# @param: Router router
def data_from_router(self, router):
self.table = router.table
def testcase():
rt = RoutingTable()
me = md5_hash("[email protected]")
n1 = md5_hash("node1")
n2 = md5_hash("node2")
n3 = md5_hash("node3")
n4 = md5_hash("node4")
n5 = md5_hash("node5")
r = Router(me, [(n1, "1.1.1.1"), (n2, "2.2.2.2")])
# update from neighbor
r.update((n1, "1.1.1.1"), [(n3, "3.3.3.3", False, 1), (n4, "4.4.4.4", False, 2)])
# test getting data from router object
rt.data_from_router(r)
for key, val in rt.table.items():
print(key, val)
rt_bytes = rt.__bytes__()
# update router object from routingtable
rt_test = RoutingTable()
rt_test.parse(rt_bytes)
r_test = Router(me, [(n1, "1.1.1.1"), (n2, "2.2.2.2")])
rt_test.update_router(r_test, (n5, "5.5.5.5"))
print("\n" + "Update from " + str(n5))
for key, val in r_test.table.items():
print(key, val)
# see if get_next_hop gives the right answer
assert r_test.get_next_hop(n3) == (n5, "5.5.5.5")
assert r_test.get_next_hop(n1) == (n1, "1.1.1.1")
assert r_test.get_next_hop(n5) == (n5, "5.5.5.5")
# full test
r_me = Router(me, [(n1, "1.1.1.1"), (n2, "2.2.2.2")])
r_me.update((n1, "1.1.1.1"), [(n3, "3.3.3.3", False, 1), (n4, "4.4.4.4", False, 1)])
r_test = Router(me, [(n1, "1.1.1.1"), (n2, "2.2.2.2")])
r_test_update = Router(me, [(n1, "1.1.1.1"), (n2, "2.2.2.2")])
r_n1 = Router(n1, [(n3, "3.3.3.3"), (n4, "4.4.4.4")])
rt_me = RoutingTable()
rt_me.data_from_router(r_me)
bytes_me = rt_me.__bytes__()
rt_n1 = RoutingTable()
rt_n1.data_from_router(r_n1)
bytes_n1 = rt_n1.__bytes__()
rt_test = RoutingTable()
rt_test.data_from_router(r_test)
rt_test.parse(bytes_n1)
rt_test_update = RoutingTable()
rt_test_update.parse(bytes_n1)
rt_test_update.update_router(r_test_update, (n1, "1.1.1.1"))
rt_test_update.data_from_router(r_test_update)
bytes_test_update = rt_test_update.__bytes__()
print("\n")
for key, val in rt_me.table.items():
print(key, val)
print("\n")
for key, val in rt_test_update.table.items():
print(key, val)
assert bytes_me == bytes_test_update
#testcase()