-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtub_api.py
198 lines (160 loc) · 6.02 KB
/
tub_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import logging
import asyncio
import sys
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from pydantic import BaseModel
from tub_control import cs
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Adjust this to specify allowed origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class TemperatureSetting(BaseModel):
temperature: float
class DeviceState(BaseModel):
state: str
class ModeSetting(BaseModel):
mode: str
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
self.broadcast_task = None
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
if not self.broadcast_task:
self.broadcast_task = asyncio.create_task(self.broadcast_loop())
async def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
if not self.active_connections and self.broadcast_task:
self.broadcast_task.cancel()
self.broadcast_task = None
async def broadcast_loop(self):
while True:
await asyncio.sleep(1)
state = cs.get_state()
await self.broadcast(state)
async def send_personal_message(self, message: dict, websocket: WebSocket):
try:
await websocket.send_json(message)
except RuntimeError as e:
logger.error(f"Failed to send message: {e}")
await self.disconnect(websocket)
async def broadcast(self, message: dict):
for connection in self.active_connections:
try:
await connection.send_json(message)
except RuntimeError as e:
logger.error(f"Failed to broadcast message: {e}")
await self.disconnect(connection)
def close_all(self):
for connection in self.active_connections:
connection.close()
self.disconnect(connection)
self.active_connections = []
manager = ConnectionManager()
@app.get("/")
async def read_root():
return FileResponse("index.html")
@app.get("/admin")
async def read_admin():
return FileResponse("admin.html")
@app.websocket("/state")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
await websocket.receive() # Keep the connection open
except WebSocketDisconnect:
await manager.disconnect(websocket)
except Exception as e:
logger.error(f"Unexpected error: {e}")
await manager.disconnect(websocket)
@app.get("/quick_state")
async def get_quick_state():
# Prepare the optimized quick state data
quick_state = {
'set_temperature': cs.set_temperature,
'water_temperature': cs.temp_water.f(),
'main_pumps': {
'pump1': cs.pump1.get_state(),
'pump2': cs.pump2.get_state(),
},
'light_state': cs.light.get_state()
}
return quick_state
@app.post("/set_temperature")
async def set_temperature(setting: TemperatureSetting):
cs.set_temperature = setting.temperature
return {"status": "success", "message": f"Set temperature updated to {setting.temperature}°F."}
@app.post("/toggle/{device}")
async def toggle_device(device: str):
if device == 'heater':
cs.heater.toggle_state()
elif device == 'circpump':
cs.circpump.toggle_state()
elif device == 'pump1':
cs.pump1.advance_state()
elif device == 'pump2':
cs.pump2.advance_state()
elif device == 'blower':
cs.blower.toggle_state()
elif device == 'light':
cs.light.toggle_state()
elif device == 'ozone':
cs.ozone.toggle_state()
elif device == 'fans':
cs.fans.toggle_state()
else:
return {"status": "error", "message": "Unknown device."}
return {"status": "success", "message": f"{device} toggled."}
@app.post("/set/{device}")
async def set_device_state(device: str, state: DeviceState):
new_state = state.state.lower()
if device == 'heater':
cs.heater.set_state(new_state == "on")
elif device == 'circpump':
cs.circpump.set_state(new_state == "on")
elif device == 'pump1':
if new_state in ["off", "low", "high"]:
cs.pump1.set_state(new_state != "off", new_state if new_state != "off" else 'low')
else:
return {"status": "error", "message": "Invalid state for pump1."}
elif device == 'pump2':
if new_state in ["off", "low", "high"]:
cs.pump2.set_state(new_state != "off", new_state if new_state != "off" else 'low')
else:
return {"status": "error", "message": "Invalid state for pump2."}
elif device == 'blower':
cs.blower.set_state(new_state == "on")
elif device == 'light':
cs.light.set_state(new_state == "on")
elif device == 'ozone': # Add ozone to the set state API
cs.ozone.set_state(new_state == "on")
else:
return {"status": "error", "message": "Unknown device."}
return {"status": "success", "message": f"{device} state set to {new_state}."}
@app.post("/set_mode")
async def set_mode(setting: ModeSetting):
mode = setting.mode.lower()
if mode not in ["automatic", "manual"]:
return {"status": "error", "message": "Invalid mode. Choose 'automatic' or 'manual'."}
cs.mode = mode
return {"status": "success", "message": f"Mode set to {cs.mode}."}
def start_api_server():
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
def handle_exit_api(*args):
logger.info("Received shutdown signal, closing WebSocket connections.")
manager.close_all()
asyncio.get_event_loop().stop()
logger.info("Cleanup complete. Exiting application.")