Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Acquire bulk readings in JSON format #40

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ Changelog
*********


incomplete
==========
- Add possibility to acquire bulk readings in JSON format


in progress
===========

Expand Down
13 changes: 12 additions & 1 deletion kotori/daq/graphing/grafana/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,19 @@ def use_field(field_name: str):
# time is from intercom.mqtt
blacklist = ['_hex_', 'time']

# Compute list of unique attribute names.
if isinstance(data, dict):
keys = data.keys()
elif isinstance(data, list):
keys = set()
for item in data:
for key in item.keys():
keys.add(key)
else:
raise ValueError(f"Type of data {type(data)} not accepted")

fields = []
for field in data.keys():
for field in keys:
if field in blacklist:
continue

Expand Down
9 changes: 9 additions & 0 deletions kotori/daq/storage/influx.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ def is_udp_database(self, name):
return False

def write(self, meta, data):
if isinstance(data, dict):
self.write_single(meta, data)
elif isinstance(data, list):
for item in data:
self.write_single(meta, item)
else:
raise ValueError(f"Type of data {type(data)} not accepted")

def write_single(self, meta, data):

meta_copy = deepcopy(dict(meta))
data_copy = deepcopy(data)
Expand Down
52 changes: 52 additions & 0 deletions test/test_daq_grafana.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,55 @@ def test_mqtt_to_grafana_two_dashboards(machinery, create_influxdb, reset_influx
titles = grafana.get_dashboard_titles()
assert settings.grafana_dashboards[0] in titles
assert settings.grafana_dashboards[1] in titles


@pytest_twisted.inlineCallbacks
@pytest.mark.grafana
def test_mqtt_to_grafana_bulk(machinery, create_influxdb, reset_influxdb, reset_grafana):
"""
Publish multiple readings in JSON format to MQTT broker and proof
that a corresponding datasource and a dashboard was created in Grafana.
"""

# Submit multiple measurements, without timestamp.
data = [
{
'temperature': 21.42,
'humidity': 41.55,
},
{
'temperature': 42.84,
'humidity': 83.1,
'voltage': 4.2,
},
{
'weight': 10.10,
},
]
yield mqtt_json_sensor(settings.mqtt_topic_json, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)
yield sleep(PROCESS_DELAY_MQTT)
yield sleep(PROCESS_DELAY_MQTT)

# Proof that Grafana is well provisioned.
logger.info('Grafana: Checking datasource')
datasource_names = []
for datasource in grafana.client.datasources.get():
datasource_names.append(datasource['name'])
assert settings.influx_database in datasource_names

logger.info('Grafana: Checking dashboard')
dashboard_name = settings.grafana_dashboards[0]
dashboard = grafana.get_dashboard_by_name(dashboard_name)
targets = dashboard['rows'][0]['panels'][0]['targets']

# Validate table name.
assert targets[0]['measurement'] == settings.influx_measurement_sensors

# Validate field names.
fields = set()
for target in targets:
fields.add(target["fields"][0]["name"])
assert fields == set(["temperature", "humidity", "weight", "voltage"])
34 changes: 34 additions & 0 deletions test/test_daq_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,40 @@ def test_mqtt_to_influxdb_json_single(machinery, create_influxdb, reset_influxdb
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb):
"""
Publish multiple readings in JSON format to MQTT broker
and proof it is stored in the InfluxDB database.
"""

# Submit multiple measurements, without timestamp.
data = [
{
'temperature': 21.42,
'humidity': 41.55,
},
{
'temperature': 42.84,
'humidity': 83.1,
},
]
yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)

# Proof that data arrived in InfluxDB.
record = influx_sensors.get_record(index=0)
del record['time']
assert record == {u'temperature': 21.42, u'humidity': 41.55}

record = influx_sensors.get_record(index=1)
del record['time']
assert record == {u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.legacy
Expand Down