add client side filtering recipe #6

Merged
merged 2 commits into from
Nov 13, 2024
182 changes: 182 additions & 0 deletions cookbook/sections/data-publishers.adoc
Expand Up @@ -118,5 +118,187 @@ image::images/data-publishers-wcmp2-validate-response.png[WIS2 GDC online valida

A response will be provided with validation results.

=== Advertise client side filters for data subscriptions in WCMP2 and WNM

A key concept of a WCMP2 record is "actionable links"; this means being able to access a dataset or data granule
without any further interactions. For real-time data, a WCMP2 record provides linkages to the WIS2 Global Broker
via the MQTT protocol. At its core, MQTT has two key components:

- topic: the topic to subscribe to
- message payload: the message provided as part of a notification to a given topic

WIS2 defines the WIS2 Topic Hierarchy (WTH) and WIS2 Notification Message (WNM) standards, which together provide a standardized
topic structure and a standards-based GeoJSON payload/message.

A typical MQTT link in a WCMP2 document is defined as follows:

.Typical WCMP2 MQTT link
[source,json]
----
{
    "rel": "items",
    "type": "application/geo+json",
    "title": "WIS2 notification service",
    "href": "mqtts://example.org",
    "channel": "cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop"
}
----

Given WCMP2, WTH and WNM, a user can subscribe to topics related to data of interest for download and access.
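
As an illustration of how such a link is "actionable", the following minimal sketch derives MQTT connection parameters from a WCMP2 link object. The function name `mqtt_params_from_link` and the default port table are assumptions made for this example (1883 and 8883 are the conventional MQTT and MQTT over TLS ports); they are not defined by WCMP2.

```python
from urllib.parse import urlsplit

# Assumption: fall back to the conventional MQTT ports when the href has none
DEFAULT_PORTS = {'mqtt': 1883, 'mqtts': 8883}


def mqtt_params_from_link(link: dict) -> dict:
    """Derive MQTT connection parameters from a WCMP2 link object."""
    url = urlsplit(link['href'])
    return {
        'host': url.hostname,
        'port': url.port or DEFAULT_PORTS.get(url.scheme),
        'tls': url.scheme == 'mqtts',
        'topic': link['channel'],
    }


link = {
    'rel': 'items',
    'type': 'application/geo+json',
    'title': 'WIS2 notification service',
    'href': 'mqtts://example.org',
    'channel': 'cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop'
}

params = mqtt_params_from_link(link)
print(params['host'], params['port'], params['topic'])
```

The resulting host, port and topic are the values an MQTT client would need in order to connect and subscribe.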

In some cases, a dataset may be organized in a manner which requires further "filtering", because a
data consumer is only interested in a subset of the data granules being advertised through WNMs. Examples include (but are not limited to) a data consumer who is only interested in:

- surface weather observations from a certain station, or
- numerical weather prediction forecast data for a certain timestep or weather parameter

To implement this behaviour, add additional properties to both WCMP2 and WNM as follows:

==== Example: Surface weather observations

.Surface weather observations: WCMP2 MQTT link with additional properties
[source,json]
----
{
    "rel": "items",
    "type": "application/geo+json",
    "title": "Real-time notifications",
    "href": "mqtts://globalbroker.meteo.fr:8883",
    "channel": "cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop",
    "properties": {
        "wigos_station_identifier": {
            "type": "string",
            "title": "WIGOS station identifier"
        }
    }
}
----

.Surface weather observations: WNM additional properties
[source,json]
----
{
    "properties": {
        "wigos_station_identifier": "0-20000-0-71628"
    }
}
----

When implemented by a data producer, a data consumer can:

- subscribe to real-time notifications to the given topic
- perform client side filtering against all incoming WNMs with `properties.wigos_station_identifier = "0-20000-0-71628"`

==== Example: Numerical weather prediction based forecast

.Numerical weather prediction: WCMP2 MQTT link with additional properties
[source,json]
----
{
    "rel": "items",
    "type": "application/geo+json",
    "title": "Real-time notifications",
    "href": "mqtts://globalbroker.meteo.fr:8883",
    "channel": "origin/a/wis2/ca-eccc-msc/data/core/weather/prediction/forecast/medium-range/deterministic/global",
    "properties": {
        "model_run": {
            "type": "string",
            "title": "Model run",
            "enum": [
                "00",
                "12"
            ],
            "example": "00"
        },
        "forecast_hour": {
            "type": "string",
            "title": "Forecast hour",
            "example": "004"
        }
    }
}
----

.Numerical weather prediction: WNM additional properties
[source,json]
----
{
    "properties": {
        "model_run": "00",
        "forecast_hour": "004"
    }
}
----

A data producer would extend WCMP2 and WNM as follows:

- WCMP2: add a link `properties` object for MQTT links, where each key of the link `properties` object is a https://json-schema.org/understanding-json-schema/reference/object#properties[JSON Schema property definition].
- WNM: add additional properties (key: value pairs) in the `properties` object as desired
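
Because each key of the link `properties` object is a JSON Schema property definition, a data consumer could check a candidate filter value against the advertised definition before subscribing. The sketch below is hand-rolled and only looks at `type: string` and `enum`; it is not a full JSON Schema validator, and the function name `is_valid_filter_value` is hypothetical.

```python
def is_valid_filter_value(link: dict, name: str, value) -> bool:
    """Check a candidate filter value against a WCMP2 link property definition.

    Only `type: string` and `enum` are evaluated in this sketch.
    """
    definition = link.get('properties', {}).get(name)
    if definition is None:
        # The link does not advertise this property as a filter
        return False
    if definition.get('type') == 'string' and not isinstance(value, str):
        return False
    if 'enum' in definition and value not in definition['enum']:
        return False
    return True


# Link properties taken from the numerical weather prediction example
nwp_link = {
    'properties': {
        'model_run': {
            'type': 'string',
            'title': 'Model run',
            'enum': ['00', '12'],
            'example': '00'
        },
        'forecast_hour': {
            'type': 'string',
            'title': 'Forecast hour',
            'example': '004'
        }
    }
}

print(is_valid_filter_value(nwp_link, 'model_run', '00'))      # True
print(is_valid_filter_value(nwp_link, 'model_run', '06'))      # False: not in enum
print(is_valid_filter_value(nwp_link, 'forecast_hour', 4))     # False: not a string
print(is_valid_filter_value(nwp_link, 'ensemble_member', '1')) # False: not advertised
```

For complete validation, a dedicated JSON Schema library would be a better fit than this simplified check.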

When implemented by a data producer, a data consumer can:

- subscribe to real-time notifications to the given topic
- perform client side filtering against all incoming WNMs with `properties.model_run = "00" and properties.forecast_hour = "004"`
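
The filter condition above can be sketched as a small predicate that compares several WNM properties at once. The helper name `matches_filters` and the sample WNM fragments are assumptions for illustration; real WNMs carry additional members that are omitted here.

```python
def matches_filters(wnm: dict, filters: dict) -> bool:
    """Return True if every filter key/value pair is present in the WNM properties."""
    wnm_properties = wnm.get('properties', {})
    return all(wnm_properties.get(key) == value
               for key, value in filters.items())


# Filters a data consumer is interested in
filters = {'model_run': '00', 'forecast_hour': '004'}

# Hypothetical, abbreviated WNMs (only the additional properties are shown)
wnm_match = {'properties': {'model_run': '00', 'forecast_hour': '004'}}
wnm_other_hour = {'properties': {'model_run': '00', 'forecast_hour': '008'}}
wnm_no_extras = {'properties': {}}

print(matches_filters(wnm_match, filters))       # True
print(matches_filters(wnm_other_hour, filters))  # False
print(matches_filters(wnm_no_extras, filters))   # False
```

A WNM that lacks one of the filtered properties is treated as a non-match, so notifications from producers that have not added the properties are ignored rather than raising an error.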

A sample Python script can be found below. The script connects to the Météo-France Global Broker and subscribes to weather notifications
from Environment and Climate Change Canada, Meteorological Service of Canada. The script then performs client side filtering by
evaluating (for each WNM) the `properties.wigos_station_identifier` value, printing only those notifications that match a particular station (`0-20000-0-71628`).

.Sample Python script to perform client side filtering
[source,python]
----
import json
import ssl

from paho.mqtt import client as mqtt_client

broker = 'globalbroker.meteo.fr'
port = 8883
username = 'everyone'
password = 'everyone'
topic = 'cache/a/wis2/ca-eccc-msc/data/core/weather/surface-based-observations/synop'

# WIGOS station identifier of the station of interest
wsi_to_filter = '0-20000-0-71628'


def connect_mqtt() -> mqtt_client.Client:
    def on_connect(client, userdata, flags, reason_code, properties):
        if reason_code == 0:
            print(f'Connected to {broker}')
        else:
            print(f'Failed to connect: {reason_code}')

    def on_log(client, userdata, level, message):
        print('LOG:', message)

    client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2,
                                client_id='s123')
    client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.on_log = on_log
    # Negotiate TLS with certificate verification (replaces the magic number 2)
    client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
    client.connect(broker, port)

    return client


def subscribe(client: mqtt_client.Client):
    def on_message(client, userdata, message):
        message_dict = json.loads(message.payload.decode())

        print('Performing client side filtering')
        # Tolerate WNMs without the additional property
        wsi = message_dict.get('properties', {}).get('wigos_station_identifier')

        # Only report notifications for the station of interest
        if wsi == wsi_to_filter:
            print(f'Topic: {message.topic}')
            print(f'Payload: {message.payload.decode()}')

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()
----