forked from lbryio/lbry-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection_manager.py
105 lines (89 loc) · 3.8 KB
/
connection_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import time
import asyncio
import typing
import collections
import logging
log = logging.getLogger(__name__)
CONNECTED_EVENT = "connected"
DISCONNECTED_EVENT = "disconnected"
TRANSFERRED_EVENT = "transferred"
class ConnectionManager:
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop = loop
self.incoming_connected: typing.Set[str] = set()
self.incoming: typing.DefaultDict[str, int] = collections.defaultdict(int)
self.outgoing_connected: typing.Set[str] = set()
self.outgoing: typing.DefaultDict[str, int] = collections.defaultdict(int)
self._max_incoming_mbs = 0.0
self._max_outgoing_mbs = 0.0
self._status = {}
self._running = False
self._task: typing.Optional[asyncio.Task] = None
@property
def status(self):
return self._status
def sent_data(self, host_and_port: str, size: int):
if self._running:
self.outgoing[host_and_port] += size
def received_data(self, host_and_port: str, size: int):
if self._running:
self.incoming[host_and_port] += size
def connection_made(self, host_and_port: str):
if self._running:
self.outgoing_connected.add(host_and_port)
def connection_received(self, host_and_port: str):
# self.incoming_connected.add(host_and_port)
pass
def outgoing_connection_lost(self, host_and_port: str):
if self._running and host_and_port in self.outgoing_connected:
self.outgoing_connected.remove(host_and_port)
def incoming_connection_lost(self, host_and_port: str):
if self._running and host_and_port in self.incoming_connected:
self.incoming_connected.remove(host_and_port)
async def _update(self):
self._status = {
'incoming_bps': {},
'outgoing_bps': {},
'total_incoming_mbs': 0.0,
'total_outgoing_mbs': 0.0,
'total_sent': 0,
'total_received': 0,
'max_incoming_mbs': 0.0,
'max_outgoing_mbs': 0.0
}
while True:
last = time.perf_counter()
await asyncio.sleep(0.1, loop=self.loop)
self._status['incoming_bps'].clear()
self._status['outgoing_bps'].clear()
now = time.perf_counter()
while self.outgoing:
k, sent = self.outgoing.popitem()
self._status['total_sent'] += sent
self._status['outgoing_bps'][k] = sent / (now - last)
while self.incoming:
k, received = self.incoming.popitem()
self._status['total_received'] += received
self._status['incoming_bps'][k] = received / (now - last)
self._status['total_outgoing_mbs'] = int(sum(list(self._status['outgoing_bps'].values())
)) / 1000000.0
self._status['total_incoming_mbs'] = int(sum(list(self._status['incoming_bps'].values())
)) / 1000000.0
self._max_incoming_mbs = max(self._max_incoming_mbs, self._status['total_incoming_mbs'])
self._max_outgoing_mbs = max(self._max_outgoing_mbs, self._status['total_outgoing_mbs'])
self._status['max_incoming_mbs'] = self._max_incoming_mbs
self._status['max_outgoing_mbs'] = self._max_outgoing_mbs
def stop(self):
if self._task:
self._task.cancel()
self._task = None
self.outgoing.clear()
self.outgoing_connected.clear()
self.incoming.clear()
self.incoming_connected.clear()
self._status.clear()
self._running = False
def start(self):
self.stop()
self._running = True
self._task = self.loop.create_task(self._update())