-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsailtrack-status
executable file
·105 lines (77 loc) · 2.8 KB
/
sailtrack-status
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python3
# `sailtrack-status` - Python script that periodically publishes the module's status data (battery voltage,
# capacity and charging state, CPU temperature and load, disk usage) over MQTT. Needed for logging purposes.
import json
import logging
import os
import struct
import sys
from datetime import timedelta
from gpiozero import DigitalInputDevice, CPUTemperature, LoadAverage, DiskUsage
from paho.mqtt.client import Client, CallbackAPIVersion
from smbus2 import SMBus
from timeloop import Timeloop
# -------------------------- Configuration -------------------------- #
# Rate at which status samples are published over MQTT, in hertz.
MQTT_PUBLISH_FREQ_HZ = 0.1
# Rate at which the published-message count is written to the log, in hertz.
LOG_PRINT_FREQ_HZ = 0.1
# Client identifier handed to the MQTT client; also used as this script's logger name.
MQTT_CLIENT_ID = "sailtrack-status"
# Number of the I2C bus opened with SMBus.
I2C_BUS_NUM = 1
# I2C device address read by the battery helpers.
# NOTE(review): presumably a battery fuel gauge - confirm the exact part against the hardware.
I2C_ADDR = 0x36
# Registers read (as 16-bit words) for the battery voltage and the battery capacity.
I2C_BATTERY_VOLTAGE_REG = 0x02
I2C_BATTERY_CAPACITY_REG = 0x04
# Pin passed to DigitalInputDevice for the charging flag.
# NOTE(review): pin numbering scheme is whatever gpiozero uses by default - confirm.
CHARGE_PIN = 6
# Job periods in milliseconds, derived from the frequencies above (1000 / 0.1 Hz = 10000 ms).
MQTT_JOB_INTERVAL_MS = 1000 / MQTT_PUBLISH_FREQ_HZ
LOG_JOB_INTERVAL_MS = 1000 / LOG_PRINT_FREQ_HZ
# ------------------------------------------------------------------- #
# Running count of publish callbacks received: incremented by on_publish_callback, reported by log_job.
published_messages = 0
def on_publish_callback(client, userdata, mid):
    """Count one more published message.

    Assigned to the MQTT client's ``on_publish`` hook. The ``client``,
    ``userdata`` and ``mid`` arguments are accepted to match the callback
    signature but are not used.
    """
    global published_messages
    published_messages = published_messages + 1
def get_battery_voltage():
    """Read the battery voltage register over I2C and return the scaled reading.

    The 16-bit word read from ``I2C_BATTERY_VOLTAGE_REG`` has its two bytes
    swapped and is then scaled by ``1.25 / 1000 / 16``.

    Returns:
        The scaled reading as a float (presumably volts - confirm against the
        gauge's datasheet).
    """
    raw_word = bus.read_word_data(I2C_ADDR, I2C_BATTERY_VOLTAGE_REG)
    # Swap the two bytes of the word: serialise big-endian, re-read little-endian.
    swapped = int.from_bytes(struct.pack(">H", raw_word), "little")
    return swapped * 1.25 / 1000 / 16
def get_battery_capacity():
    """Read the battery capacity register over I2C and return the scaled reading.

    The 16-bit word read from ``I2C_BATTERY_CAPACITY_REG`` has its two bytes
    swapped and is then divided by 256.

    Returns:
        The scaled reading as a float (presumably a percentage - confirm
        against the gauge's datasheet).
    """
    raw_word = bus.read_word_data(I2C_ADDR, I2C_BATTERY_CAPACITY_REG)
    # Swap the two bytes of the word: serialise big-endian, re-read little-endian.
    swapped = int.from_bytes(struct.pack(">H", raw_word), "little")
    return swapped / 256
# --- MQTT client ---
# Create the client, authenticate with the shared password from the environment, register the
# publish counter callback, connect to the broker on localhost and start the client's loop.
mqtt = Client(CallbackAPIVersion.VERSION1, MQTT_CLIENT_ID)
# os.environ[...] raises KeyError at startup if SAILTRACK_GLOBAL_PASSWORD is not set.
mqtt.username_pw_set("mosquitto", os.environ["SAILTRACK_GLOBAL_PASSWORD"])
mqtt.on_publish = on_publish_callback
mqtt.connect("localhost")
mqtt.loop_start()
# --- Scheduler and logging ---
# The Timeloop instance must exist before the next-but-one line, which indexes handlers[0] of the
# "timeloop" logger (presumably installed by the Timeloop constructor - confirm).
tl = Timeloop()
# One shared "[LEVEL] message" format for both the scheduler's logger and this script's logger.
formatter = logging.Formatter("[%(levelname)s] %(message)s")
logging.getLogger("timeloop").handlers[0].setFormatter(formatter)
logger = logging.getLogger(MQTT_CLIENT_ID)
logger.setLevel(logging.INFO)
# StreamHandler() with no argument writes to sys.stderr, so mqtt_job's stdout swap does not affect it.
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# --- Data sources read by mqtt_job and the battery helpers ---
bus = SMBus(I2C_BUS_NUM)
battery_charging = DigitalInputDevice(CHARGE_PIN)
cpu_temperature = CPUTemperature()
cpu_load = LoadAverage()
disk_load = DiskUsage()
@tl.job(interval=timedelta(milliseconds=MQTT_JOB_INTERVAL_MS))
def mqtt_job():
    """Publish one status sample to the ``status/core`` MQTT topic.

    The payload is a JSON object with three sections:

    * ``battery``: ``voltage`` and ``capacity`` from the I2C helpers, and
      ``charging``, the logical negation of the charge pin's value.
    * ``cpu``: ``temperature`` and ``load``.
    * ``disk``: ``load``.

    ``sys.stdout`` is pointed at ``os.devnull`` while the sample is gathered
    and published, then reset to ``sys.__stdout__``.
    NOTE(review): presumably one of the sensor reads prints to stdout -
    confirm whether the silencing is still needed.

    Raises:
        Any exception raised by the sensor reads, ``json.dumps`` or
        ``mqtt.publish`` propagates to the scheduler unchanged.
    """
    # The context manager closes the devnull handle on every run instead of
    # leaving one open file object per invocation for the garbage collector.
    with open(os.devnull, "w") as devnull:
        sys.stdout = devnull
        try:
            mqtt.publish("status/core", json.dumps({
                "battery": {
                    "voltage": get_battery_voltage(),
                    "capacity": get_battery_capacity(),
                    "charging": not battery_charging.value,
                },
                "cpu": {
                    "temperature": cpu_temperature.temperature,
                    "load": cpu_load.load_average,
                },
                "disk": {
                    "load": disk_load.value,
                },
            }))
        finally:
            # Restore stdout even when a read or the publish fails, so a
            # single error cannot leave the process's stdout silenced.
            sys.stdout = sys.__stdout__
@tl.job(interval=timedelta(milliseconds=LOG_JOB_INTERVAL_MS))
def log_job():
    """Log the running count of published MQTT messages at INFO level."""
    summary = f"Published messages: {published_messages}"
    logger.info(summary)
# Script entry point: start the jobs registered with @tl.job above. block=True is passed so this
# call is expected to keep the script running in the foreground (see the Timeloop docs - confirm).
tl.start(block=True)