diff --git a/cookbook/sections/data-publishers.adoc b/cookbook/sections/data-publishers.adoc index 818e0a6..13683db 100644 --- a/cookbook/sections/data-publishers.adoc +++ b/cookbook/sections/data-publishers.adoc @@ -118,5 +118,187 @@ image::images/data-publishers-wcmp2-validate-response.png[WIS2 GDC online valida A response will be provided with validation results. +=== Advertise client side filters for data subscriptions in WCMP2 and WNM +A key concept of a WCMP2 record is "actionable links"; this means being able to access a dataset or data granule +without any further interactions. For real-time data, a WCMP2 record provides linkages to the WIS2 Global Broker +via the MQTT protocol. At its core, MQTT has two key components: +- topic: the topic to subscribe to +- message payload: the message provided as part of a notification to a given topic + +WIS2 defines the WIS2 Topic Hierarchy (WTH) and WIS2 Notification Message (WNM) standards which provide a standards-based +GeoJSON payload/message. + +A typical MQTT link in a WCMP2 document is defined as follows: + +.Typical WCMP2 MQTT link +[source,json] +---- +{ + "rel" : "items", + "type" : "application/geo+json", + "title": "WIS2 notification service", + "href" : "mqtts://example.org", + "channel": "cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop" +} +---- + +Given WCMP2, WTH and WNM, a user can subscribe to topics related to data of interest for download and access. + +In some cases, a dataset may be organized in a manner which requires additional further "filtering" such that a +data consumer is only interested in a certain subset of the data granules being advertised by a given WNM. Some examples include (but are not limited to), where a data consumer may be only be interested in: + +- surface weather observations from a certain station, or +- numerical weather prediction forecast data for a certain timestep or weather parameter + +To implement this behaviour, add additional properties to both WCMP2 and WNM as follows: + +==== Example: Surface weather observations + +.Surface weather observations: WCMP2 MQTT link with additional properties +[source,json] +---- +{ + "rel" : "items", + "type" : "application/geo+json", + "title": "Real-time notifications", + "href" : "mqtts://globalbroker.meteo.fr:8883", + "channel": "cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop", + "properties": { + "wigos_station_identifier": { + "type": "string", + "title": "WIGOS station identifier" + } + } +} +---- + +.Surface weather observations: WNM additional properties +[source,json] +---- +{ + "properties": { + "wigos_station_identifier": "0-20000-0-71628" +} +---- + +When implemented by a data producer, a data consumer can: + +- subscribe to real-time notifications to the given topic +- perform client side filtering by against all incoming WNMs with `properties.wigos_station_identifier = "0-20000-0-71628"` + +==== Example: Numerical weather prediction based forecast + +.Numerical weather prediction: WCMP2 MQTT link with additional properties +[source,json] +---- +{ + "rel" : "items", + "type" : "application/geo+json", + "title": "Real-time notifications", + "href" : "mqtts://globalbroker.meteo.fr:8883", + "channel": "origin/a/wis2/ca-eccc-msc/data/core/weather/prediction/forecast/medium-range/deterministic/global", + "properties": { + "model_run": { + "type": "string", + "title": "Model run", + "enum": [ + "00", + "12" + ], + "example": "00" + }, + "forecast_hour": { + "type": "string", + "title": "Forecast hour", + "example": "004" + } + } +} +---- + +.Numerical weather prediction: WNM additional properties +[source,json] +---- +{ + "properties": { + "model_run": "00", + "forecast_hour": "004" +} +---- + +A data producer would extend WCMP2 and WNM as follows: + +- WCMP2: add a link `properties` object for MQTT links, where each key of the link `properties` object is a https://json-schema.org/understanding-json-schema/reference/object#properties[JSON Schema property definition]. +- WNM: add additional properties (key: value pairs) in the `properties` object as desired + +When implemented by a data producer, a data consumer can: + +- subscribe to real-time notifications to the given topic +- perform client side filtering against all incoming WNMs with `properties.model_run = "00" and properties.forecast_hour = "004"` + +A sample Python script can be found below. The script connects to the Météo-France Global Broker, subscribed to weather notifications +from Environment and Climate Change Canada, Meteorological Service of Canada. The script then performs client side filtering by +evaluating (for each WNM) the `properties.wigos_station_identifier` value to match a particular station (`0-20000-0-71628`). + +.Sample Python script to perform client side filtering +[source,python] +---- +import json +from paho.mqtt import client as mqtt_client + +broker = 'globalbroker.meteo.fr' +port = 8883 +username = 'everyone' +password = 'everyone' +topic = 'cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop' + +wsi_to_filter = '0-20000-0-71628' + + +def connect_mqtt() -> mqtt_client: + def on_connect(client, userdata, flags, reason_code, properties): + if reason_code == 0: + print(f'Connected to {broker}') + else: + print(f'Failed to connect: {reason_code}') + + def on_log(client, userdata, level, message): + print("LOG:", message) + + client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2, + client_id='s123') + client.username_pw_set(username, password) + client.on_connect = on_connect + client.on_log = on_log + client.tls_set(tls_version=2) + client.connect(broker, port) + + return client + + +def subscribe(client: mqtt_client): + def on_message(client, userdata, message): + message_dict = json.loads(message.payload.decode()) + + print('Performing client side filtering') + wsi = message_dict['properties'].get('wigos_station_identifier') + + if wsi != wsi_to_filter: + print(f'Topic: {message.topic}') + print(f'Payload: {message.payload.decode()}') + + client.subscribe(topic) + client.on_message = on_message + + +def run(): + client = connect_mqtt() + subscribe(client) + client.loop_forever() + + +if __name__ == '__main__': + run() +----