diff --git a/CHANGES.rst b/CHANGES.rst index ca31c89..aaf9590 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -3,6 +3,11 @@ Changelog ********* +incomplete +========== +- Add possibility to acquire bulk readings in JSON format + + in progress =========== diff --git a/kotori/daq/graphing/grafana/dashboard.py b/kotori/daq/graphing/grafana/dashboard.py index 1df9702..875c9b2 100644 --- a/kotori/daq/graphing/grafana/dashboard.py +++ b/kotori/daq/graphing/grafana/dashboard.py @@ -281,8 +281,19 @@ def use_field(field_name: str): # time is from intercom.mqtt blacklist = ['_hex_', 'time'] + # Compute list of unique attribute names. + if isinstance(data, dict): + keys = data.keys() + elif isinstance(data, list): + keys = set() + for item in data: + for key in item.keys(): + keys.add(key) + else: + raise ValueError(f"Type of data {type(data)} not accepted") + fields = [] - for field in data.keys(): + for field in keys: if field in blacklist: continue diff --git a/kotori/daq/storage/influx.py b/kotori/daq/storage/influx.py index 67fa426..13f6293 100644 --- a/kotori/daq/storage/influx.py +++ b/kotori/daq/storage/influx.py @@ -66,6 +66,15 @@ def is_udp_database(self, name): return False def write(self, meta, data): + if isinstance(data, dict): + self.write_single(meta, data) + elif isinstance(data, list): + for item in data: + self.write_single(meta, item) + else: + raise ValueError(f"Type of data {type(data)} not accepted") + + def write_single(self, meta, data): meta_copy = deepcopy(dict(meta)) data_copy = deepcopy(data) diff --git a/test/test_daq_grafana.py b/test/test_daq_grafana.py index 86a919d..28e1c52 100644 --- a/test/test_daq_grafana.py +++ b/test/test_daq_grafana.py @@ -171,3 +171,55 @@ def test_mqtt_to_grafana_two_dashboards(machinery, create_influxdb, reset_influx titles = grafana.get_dashboard_titles() assert settings.grafana_dashboards[0] in titles assert settings.grafana_dashboards[1] in titles + + +@pytest_twisted.inlineCallbacks +@pytest.mark.grafana +def test_mqtt_to_grafana_bulk(machinery, create_influxdb, reset_influxdb, reset_grafana): + """ + Publish multiple readings in JSON format to MQTT broker and proof + that a corresponding datasource and a dashboard was created in Grafana. + """ + + # Submit multiple measurements, without timestamp. + data = [ + { + 'temperature': 21.42, + 'humidity': 41.55, + }, + { + 'temperature': 42.84, + 'humidity': 83.1, + 'voltage': 4.2, + }, + { + 'weight': 10.10, + }, + ] + yield mqtt_json_sensor(settings.mqtt_topic_json, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that Grafana is well provisioned. + logger.info('Grafana: Checking datasource') + datasource_names = [] + for datasource in grafana.client.datasources.get(): + datasource_names.append(datasource['name']) + assert settings.influx_database in datasource_names + + logger.info('Grafana: Checking dashboard') + dashboard_name = settings.grafana_dashboards[0] + dashboard = grafana.get_dashboard_by_name(dashboard_name) + targets = dashboard['rows'][0]['panels'][0]['targets'] + + # Validate table name. + assert targets[0]['measurement'] == settings.influx_measurement_sensors + + # Validate field names. + fields = set() + for target in targets: + fields.add(target["fields"][0]["name"]) + assert fields == set(["temperature", "humidity", "weight", "voltage"]) diff --git a/test/test_daq_mqtt.py b/test/test_daq_mqtt.py index a92b970..3ca2368 100644 --- a/test/test_daq_mqtt.py +++ b/test/test_daq_mqtt.py @@ -37,6 +37,40 @@ def test_mqtt_to_influxdb_json_single(machinery, create_influxdb, reset_influxdb yield record +@pytest_twisted.inlineCallbacks +@pytest.mark.mqtt +def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb): + """ + Publish multiple readings in JSON format to MQTT broker + and proof it is stored in the InfluxDB database. + """ + + # Submit multiple measurements, without timestamp. + data = [ + { + 'temperature': 21.42, + 'humidity': 41.55, + }, + { + 'temperature': 42.84, + 'humidity': 83.1, + }, + ] + yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json, data) + + # Wait for some time to process the message. + yield sleep(PROCESS_DELAY_MQTT) + + # Proof that data arrived in InfluxDB. + record = influx_sensors.get_record(index=0) + del record['time'] + assert record == {u'temperature': 21.42, u'humidity': 41.55} + + record = influx_sensors.get_record(index=1) + del record['time'] + assert record == {u'temperature': 42.84, u'humidity': 83.1} + + @pytest_twisted.inlineCallbacks @pytest.mark.mqtt @pytest.mark.legacy