Add possibility to acquire bulk readings in JSON format
amotl committed Jan 21, 2021
1 parent 0247da5 commit 9f1eb9c
Showing 5 changed files with 109 additions and 2 deletions.
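
For context, this commit lets a single MQTT publish carry a JSON list of readings instead of one JSON object. Below is a minimal sketch of such a bulk submission, assuming the paho-mqtt client and a Kotori-style channel topic; the topic path and hostname are illustrative and not part of this commit.

import json
import paho.mqtt.publish as publish

# A bulk submission is a JSON list of readings instead of a single JSON object.
payload = [
    {"temperature": 21.42, "humidity": 41.55},
    {"temperature": 42.84, "humidity": 83.1, "voltage": 4.2},
]

# The topic path below is an assumed example of a Kotori JSON channel.
publish.single(
    topic="mqttkit-1/testdrive/area-42/node-1/data.json",
    payload=json.dumps(payload),
    hostname="localhost",
)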
1 change: 1 addition & 0 deletions CHANGES.rst
@@ -9,6 +9,7 @@ in progress
- Fix logging when running under pytest with "--capture=no"
- Address compatibility with Grafana 7.4.0
- Fix error logging after migration to Python 3
- Add possibility to acquire bulk readings in JSON format


.. _kotori-0.25.0:
13 changes: 12 additions & 1 deletion kotori/daq/graphing/grafana/dashboard.py
@@ -275,8 +275,19 @@ def collect_fields(data, prefixes=None, sorted=True):
# time is from intercom.mqtt
blacklist = ['_hex_', 'time']

# Compute list of unique attribute names.
if isinstance(data, dict):
keys = data.keys()
elif isinstance(data, list):
keys = set()
for item in data:
for key in item.keys():
keys.add(key)
else:
raise ValueError(f"Type of data {type(data)} not accepted")

fields = []
for field in data.keys():
for field in keys:
if field in blacklist:
continue

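A self-contained sketch of the key-collection behaviour introduced above; the helper name unique_keys is illustrative only and does not appear in the commit.

def unique_keys(data):
    # Accept either a single reading (dict) or a list of readings (list of dicts).
    if isinstance(data, dict):
        return set(data.keys())
    elif isinstance(data, list):
        keys = set()
        for item in data:
            keys.update(item.keys())
        return keys
    else:
        raise ValueError(f"Type of data {type(data)} not accepted")

# Bulk readings yield the union of all field names across items.
readings = [{"temperature": 21.42}, {"humidity": 41.55, "voltage": 4.2}]
assert unique_keys(readings) == {"temperature", "humidity", "voltage"}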
9 changes: 9 additions & 0 deletions kotori/daq/storage/influx.py
@@ -64,6 +64,15 @@ def is_udp_database(self, name):
return False

def write(self, meta, data):
if isinstance(data, dict):
self.write_single(meta, data)
elif isinstance(data, list):
for item in data:
self.write_single(meta, item)
else:
raise ValueError(f"Type of data {type(data)} not accepted")

def write_single(self, meta, data):

meta_copy = deepcopy(dict(meta))
data_copy = deepcopy(data)
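A self-contained sketch of the new write() dispatch; the FakeStore class is purely illustrative and merely stands in for the adapter defined in kotori/daq/storage/influx.py.

class FakeStore:
    """Stand-in mimicking the dict/list dispatch added to write()."""

    def __init__(self):
        self.records = []

    def write(self, meta, data):
        # A single reading is written directly; a list is fanned out item by item.
        if isinstance(data, dict):
            self.write_single(meta, data)
        elif isinstance(data, list):
            for item in data:
                self.write_single(meta, item)
        else:
            raise ValueError(f"Type of data {type(data)} not accepted")

    def write_single(self, meta, data):
        self.records.append((dict(meta), dict(data)))

store = FakeStore()
store.write({"measurement": "sensors"}, [{"temperature": 21.42}, {"humidity": 41.55}])
assert len(store.records) == 2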
54 changes: 53 additions & 1 deletion test/test_daq_grafana.py
@@ -13,7 +13,7 @@

@pytest_twisted.inlineCallbacks
@pytest.mark.grafana
def test_mqtt_to_grafana(machinery, create_influxdb, reset_influxdb, reset_grafana):
def test_mqtt_to_grafana_single(machinery, create_influxdb, reset_influxdb, reset_grafana):
"""
Publish a single reading in JSON format to the MQTT broker and prove
that a corresponding datasource and dashboard have been created in Grafana.
@@ -44,3 +44,55 @@ def test_mqtt_to_grafana(machinery, create_influxdb, reset_influxdb, reset_grafana):
target = dashboard['dashboard']['rows'][0]['panels'][0]['targets'][0]
assert target['measurement'] == settings.influx_measurement_sensors
assert 'temperature' in target['query'] or 'humidity' in target['query']


@pytest_twisted.inlineCallbacks
@pytest.mark.grafana
def test_mqtt_to_grafana_bulk(machinery, create_influxdb, reset_influxdb, reset_grafana):
"""
Publish multiple readings in JSON format to the MQTT broker and prove
that a corresponding datasource and dashboard have been created in Grafana.
"""

# Submit multiple measurements, without timestamp.
data = [
{
'temperature': 21.42,
'humidity': 41.55,
},
{
'temperature': 42.84,
'humidity': 83.1,
'voltage': 4.2,
},
{
'weight': 10.10,
},
]
yield mqtt_json_sensor(settings.mqtt_topic_json, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY)
yield sleep(PROCESS_DELAY)
yield sleep(PROCESS_DELAY)

# Prove that Grafana is provisioned correctly.
logger.info('Grafana: Checking datasource')
datasource_names = []
for datasource in grafana.client.datasources.get():
datasource_names.append(datasource['name'])
assert settings.influx_database in datasource_names

logger.info('Grafana: Checking dashboard')
dashboard_name = settings.grafana_dashboards[0]
dashboard = grafana.client.dashboards.db[dashboard_name].get()
targets = dashboard['dashboard']['rows'][0]['panels'][0]['targets']

# Validate table name.
assert targets[0]['measurement'] == settings.influx_measurement_sensors

# Validate field names.
fields = set()
for target in targets:
fields.add(target["fields"][0]["name"])
assert fields == set(["temperature", "humidity", "weight", "voltage"])
34 changes: 34 additions & 0 deletions test/test_daq_mqtt.py
@@ -37,6 +37,40 @@ def test_mqtt_to_influxdb_json_single(machinery, create_influxdb, reset_influxdb):
yield record


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb):
"""
Publish multiple readings in JSON format to the MQTT broker
and prove they are stored in the InfluxDB database.
"""

# Submit multiple measurements, without timestamp.
data = [
{
'temperature': 21.42,
'humidity': 41.55,
},
{
'temperature': 42.84,
'humidity': 83.1,
},
]
yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY)

# Prove that the data arrived in InfluxDB.
record = influx_sensors.get_record(index=0)
del record['time']
assert record == {u'temperature': 21.42, u'humidity': 41.55}

record = influx_sensors.get_record(index=1)
del record['time']
assert record == {u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.legacy
