Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add script to post alerts to CoMapeo Cloud API #60

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions f/connectors/comapeo/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# CoMapeo: Fetch Observations
# `comapeo_observations`: Fetch Observations from CoMapeo API

This script fetches data from the REST API of a [CoMapeo archive server](https://github.com/digidem/comapeo-core/tree/server/src/server), which stores data from multiple CoMapeo projects. Each project contains observation data and attachments.

For each project, the observations data is stored in a table prefixed by `table_prefix`. For example, with a `table_prefix` of "comapeo" and a `name` of "My Mapeo Project", this script will create a Postgres table named `comapeo_my_mapeo_project`. Attachment files (e.g. photos and audio) will be stored in the following directory schema: `{attachment_root}/comapeo/my_mapeo_project/attachments/...`

## Endpoints

The request header must include an access token in the format: Authorized: Bearer <token>.
The request header must include an access token in the format:

Authorized: Bearer <token>.

### `GET /projects`

Expand Down Expand Up @@ -41,4 +43,46 @@ The request header must include an access token in the format: Authorized: Beare

### `GET /projects/abc123/attachments/attachment2_hash/photo/blob2_hash`

This endpoint retrieves the binary data of a specific attachment, such as a photo, associated with a project. The response will contain the raw binary content of the file, which can be saved or processed as needed.
This endpoint retrieves the binary data of a specific attachment, such as a photo, associated with a project. The response will contain the raw binary content of the file, which can be saved or processed as needed.

# `comapeo_alerts`: Post Alerts to CoMapeo API

This script fetches alerts data from a database and posts it to a CoMapeo server.

## Endpoints

The request header must include an access token in the format: Authorized: Bearer <token>.
rudokemper marked this conversation as resolved.
Show resolved Hide resolved

### `POST /projects/abc123/remoteDetectionAlerts`

```json
{
"detectionDateStart": "2024-11-03T04:20:69Z",
"detectionDateEnd": "2024-11-04T04:20:69Z",
"sourceId": "abc123",
"metadata": { "foo": "bar" },
"geometry": {
"type": "Point",
"coordinates": [12, 34]
}
}
# => HTTP 201, no response body
```

### `GET /projects/abc123/remoteDetectionAlerts`
rudokemper marked this conversation as resolved.
Show resolved Hide resolved

```json
[
{
"detectionDateStart": "2024-11-03T04:20:69Z",
"detectionDateEnd": "2024-11-04T04:20:69Z",
"sourceId": "abc123",
"metadata": { "foo": "bar" },
"geometry": {
"type": "Point",
"coordinates": [12, 34]
}
},
...
]
```
175 changes: 175 additions & 0 deletions f/connectors/comapeo/comapeo_alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# requirements:
# psycopg2-binary
# requests~=2.32

import logging
from typing import TypedDict

import psycopg2
import requests

# type names that refer to Windmill Resources
postgresql = dict


class comapeo_server(TypedDict):
server_url: str
access_token: str


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def conninfo(db: postgresql):
"""Convert a `postgresql` Windmill Resources to psycopg-style connection string"""
# password is optional
password_part = f" password={db['password']}" if "password" in db else ""
conn = "dbname={dbname} user={user} host={host} port={port}".format(**db)
return conn + password_part


def main(
db: postgresql,
comapeo: comapeo_server,
comapeo_project: str,
db_table_name: str = "alerts",
):
comapeo_server_url = comapeo["server_url"]
comapeo_alerts_endpoint = (
f"{comapeo_server_url}/projects/{comapeo_project}/remoteDetectionAlerts"
)

comapeo_access_token = comapeo["access_token"]
comapeo_headers = {
"Authorization": f"Bearer {comapeo_access_token}",
"Content-Type": "application/json",
}

alerts = get_alerts_from_db(conninfo(db), db_table_name)

unposted_alerts = filter_alerts(comapeo_alerts_endpoint, comapeo_headers, alerts)

post_alerts(comapeo_alerts_endpoint, comapeo_headers, unposted_alerts)


def get_alerts_from_db(db_connection_string, db_table_name: str):
"""
Retrieves alerts from a PostgreSQL database table.

Parameters
----------
db_connection_string : str
The connection string for the PostgreSQL database.
db_table_name : str
The name of the database table containing the alerts.

Returns
-------
list
A list of dictionaries, where each dictionary represents an alert row from the database table with keys for the column names.
"""
logger.info("Fetching alerts from database...")

conn = psycopg2.connect(dsn=db_connection_string)
cur = conn.cursor()
cur.execute(f"SELECT * FROM {db_table_name}")
alerts = [
dict(zip([col.name for col in cur.description], row)) for row in cur.fetchall()
]
cur.close()
conn.close()
return alerts


def _get_alerts_from_comapeo(comapeo_alerts_endpoint: str, comapeo_headers: str):
"""
Fetches alerts from the CoMapeo API.

Parameters
----------
comapeo_alerts_endpoint : str
The URL endpoint for retrieving alerts from the CoMapeo API.
comapeo_headers : str
The headers to be included in the API request, such as authorization tokens.

Returns
-------
set
A set of alert source IDs for alerts that have been posted to the CoMapeo API.
"""
logger.info("Fetching alerts from CoMapeo API...")
response = requests.request(
"GET", url=comapeo_alerts_endpoint, headers=comapeo_headers, data={}
)

response.raise_for_status()
alerts = response.json().get("data", [])

posted_alert_source_ids = {alert["sourceId"] for alert in alerts}

return posted_alert_source_ids


def filter_alerts(
comapeo_alerts_endpoint: str, comapeo_headers: str, alerts: list[dict]
):
"""
Filters a list of alerts to find those that have not been posted to the CoMapeo API.

Parameters
----------
comapeo_alerts_endpoint : str
The URL endpoint for retrieving alerts from the CoMapeo API.
comapeo_headers : str
The headers to be included in the API request, such as authorization tokens.
alerts : list[dict]
A list of dictionaries, where each dictionary represents an alert.

Returns
-------
list[dict]
A list of dictionaries, where each dictionary represents an alert that has not been posted to the CoMapeo API.
"""
logger.info("Filtering alerts...")

alerts_posted_to_comapeo = _get_alerts_from_comapeo(
comapeo_alerts_endpoint, comapeo_headers
)

# alert_id in the database matches sourceId on CoMapeo
unposted_alerts = [
alert
for alert in alerts
if alert.get("alert_id") not in alerts_posted_to_comapeo
]

return unposted_alerts


def post_alerts(
comapeo_alerts_endpoint: str,
comapeo_headers: str,
unposted_alerts,
):
"""
Posts a list of alerts to the CoMapeo API.

Parameters
----------
comapeo_alerts_endpoint : str
The URL endpoint for posting alerts to the CoMapeo API.
comapeo_headers : str
The headers to be included in the API request, such as authorization tokens.
unposted_alerts : list
A list of dictionaries, where each dictionary represents an alert to be posted to the CoMapeo API.
"""
logger.info("Posting alerts to CoMapeo API...")

for alert in unposted_alerts:
response = requests.post(
url=comapeo_alerts_endpoint, headers=comapeo_headers, json=alert
)
response.raise_for_status()

logger.info(f"{len(unposted_alerts)} alerts posted successfully.")
2 changes: 2 additions & 0 deletions f/connectors/comapeo/comapeo_alerts.script.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
psycopg2-binary==2.9.10
requests==2.32.3
40 changes: 40 additions & 0 deletions f/connectors/comapeo/comapeo_alerts.script.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
summary: 'CoMapeo: Post Alerts'
description: This script fetches alerts data from a database and posts it to a CoMapeo server.
lock: '!inline f/connectors/comapeo/comapeo_alerts.script.lock'
concurrency_time_window_s: 0
kind: script
schema:
$schema: 'https://json-schema.org/draft/2020-12/schema'
type: object
order:
- db
- db_table_name
- comapeo
- comapeo_project
properties:
comapeo:
type: object
description: A server URL and access token pair to connect to a CoMapeo archive server.
default: null
format: resource-comapeo_server
comapeo_project:
type: string
description: A project ID on the CoMapeo server where the alerts will be posted.
default: null
originalType: string
db:
type: object
description: A database connection for fetching alerts data.
default: null
format: resource-postgresql
db_table_name:
type: string
description: The name of the database table where alerts data is stored.
default: "alerts"
originalType: string
pattern: '^.{1,54}$'
required:
- db
- db_table_name
- comapeo
- comapeo_project
14 changes: 14 additions & 0 deletions f/connectors/comapeo/tests/assets/server_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,17 @@ def comapeo_project_observations(uri, project_id):
},
]
}


def comapeo_alerts():
return {
"data": [
{
"detectionDateStart": "2024-11-03T04:20:69Z",
"detectionDateEnd": "2024-11-04T04:20:69Z",
"sourceId": "abc123",
"metadata": {"foo": "bar"},
"geometry": {"type": "Point", "coordinates": [12, 34]},
}
]
}
53 changes: 53 additions & 0 deletions f/connectors/comapeo/tests/comapeo_alerts_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from typing import NamedTuple

import psycopg2
import pytest

from f.connectors.comapeo.comapeo_alerts import (
main,
)


class Alert(NamedTuple):
alert_id: str
alert_message: str


@pytest.fixture
def fake_alerts_table(pg_database):
alerts = [
Alert("abc123", "gold_mining"),
Alert("def456", "illegal_fishing"),
]

conn = psycopg2.connect(**pg_database)
conn.autocommit = True
cur = conn.cursor()

try:
cur.execute("""
CREATE TABLE fake_alerts (
alert_id TEXT PRIMARY KEY,
alert_type TEXT
)
""") # there are more alert columns than these, but it is not necessary for this test to include them

values = [(a.alert_id, a.alert_message) for a in alerts]
cur.executemany("INSERT INTO fake_alerts VALUES (%s, %s)", values)

yield alerts
finally:
cur.close()
conn.close()


def test_script_e2e(comapeoserver_alerts, pg_database, fake_alerts_table):
main(
pg_database,
comapeoserver_alerts.comapeo_server,
"forest_expedition",
"fake_alerts",
)

expected_alerts = set(a.alert_id for a in fake_alerts_table)
assert expected_alerts == {"def456", "abc123"}
6 changes: 3 additions & 3 deletions f/connectors/comapeo/tests/comapeo_observations_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def test_normalize_and_snakecase_keys():
assert result == expected_output, f"Expected {expected_output}, but got {result}"


def test_script_e2e(comapeoserver, pg_database, tmp_path):
def test_script_e2e(comapeoserver_observations, pg_database, tmp_path):
asset_storage = tmp_path / "datalake"

main(
comapeoserver.comapeo_server,
comapeoserver.comapeo_project_blocklist,
comapeoserver_observations.comapeo_server,
comapeoserver_observations.comapeo_project_blocklist,
pg_database,
"comapeo",
asset_storage,
Expand Down
Loading