-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathasync-socket-server.py
155 lines (128 loc) · 4.77 KB
/
async-socket-server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3.4
import sys
import config
import asyncio
import logging
from datetime import datetime, timedelta
from database import db_session
from models import RadioData
from sqlalchemy import exc
from tasks import async_to_gateway
# socker server logging
formatter = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logger = logging.getLogger(__name__)
logger.setLevel("DEBUG")
log_handler = logging.FileHandler("asyncio-socket-server.log")
formatter = logging.Formatter(formatter)
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
class SocketServer(asyncio.Protocol):
def connection_made(self, transport):
client_name = transport.get_extra_info("peername")
print("Connection received from {}".format(client_name))
logger.info("Connection received from {}".format(client_name))
self.transport = transport
def data_received(self, data):
"""
Implement a specific decoder for each radio type
data.decode()
"""
try:
# format the radio data for database insert
reading = format_radio_data(data.decode("utf-8"))
today = datetime.now()
if isinstance(reading, dict):
try:
# create a new tx record
new_tx = RadioData(
imei=reading["imei"],
voltage=reading["voltage"],
rssi=reading["rssi"],
sensorval_1=reading["sensor1"],
sensorval_2=reading["sensor2"],
sensorval_3=reading["sensor3"],
sensorval_4=reading["sensor4"],
created_on=today,
modified_on=today,
sync=0
)
# commit to database
db_session.add(new_tx)
db_session.commit()
db_session.flush()
tx_id = new_tx.id
# async to gateway
async_to_gateway.delay(tx_id)
# log and output to console for debugging
print("RadioData: {}".format(str(reading)))
logger.info("RadioData: {}".format(str(reading)))
# catch database error
except exc.SQLAlchemyError as db_err:
logger.critical("{}".format(str(db_err)))
print("{}".format(str(db_err)))
# catch malformed radio data and log
else:
logger.warning("TX Data Malformed: {}".format(str(reading)))
print("TX Data Malformed: {}".format(str(reading)))
# catch value or type error
except (ValueError, TypeError) as err:
logger.warning("{}".format(str(err)))
print("{}".format(str(err)))
def eof_received(self):
print("EOF")
return True
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop this event loop')
def format_radio_data(data):
"""
Format the transmitted radio data into a dict
:params: data <string>
:return reading <dict>
"""
values = data.split(",")
reading = dict()
if len(values) > 5:
reading["imei"] = values[0]
reading["dummy"] = values[1]
reading["voltage"] = values[2]
reading["rssi"] = values[3]
reading["sensor1"] = values[4]
reading["sensor2"] = values[5]
reading["sensor3"] = 0
reading["sensor4"] = 0
if len(values) == 7:
reading["sensor3"] = values[6]
if len(values) == 8:
reading["sensor3"] = values[6]
reading["sensor4"] = values[7]
else:
# the tx data is of insufficient length
reading = list(values)
return reading
def main():
"""
Asyncio server main loop
:params HOST, PORT
:return socket server
"""
try:
today = datetime.now().strftime("%c")
loop = asyncio.get_event_loop()
coro = loop.create_server(SocketServer, config.RECEIVER_HOST, config.RECEIVER_PORT)
server = loop.run_until_complete(coro)
logger.info("Starting Socket Server...")
print("Starting Socket Server...")
print("Socket Server running on {}".format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
print("Socket Server Exited at {}".format(today))
sys.exit(1)
finally:
server.close()
loop.close()
except asyncio.TimeoutError as te:
logger.debug("Asyncio timed out:{}".format(str(te)))
if __name__ == "__main__":
main()