diff --git a/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py b/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
index 6421576..c649d38 100644
--- a/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
+++ b/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
@@ -2,29 +2,48 @@
 A script that uses CSV data to create an entity list and populate it with entities.
 """
 
-import csv
+from csv import DictReader
 from pathlib import Path
 from uuid import uuid4
 
 from pyodk import Client
 
-if __name__ == "__main__":
-    project_id = 1
-    entity_list_name = f"previous_survey_{uuid4()}"
-    entity_label_field = "first_name"
-    entity_properties = ("age", "location")
-    csv_path = Path("./imported_answers.csv")
+project_id = 1
+entity_label_field = "first_name"
+entity_properties = ("age", "location")
+csv_path = Path("./imported_answers.csv")
+
+
+def create_one_at_a_time():
     with Client(project_id=project_id) as client, open(csv_path) as csv_file:
         # Create the entity list.
-        client.entity_lists.create(entity_list_name=entity_list_name)
+        entity_list = client.entity_lists.create(
+            entity_list_name=f"previous_survey_{uuid4()}"
+        )
         for prop in entity_properties:
-            client.entity_lists.add_property(name=prop, entity_list_name=entity_list_name)
+            client.entity_lists.add_property(name=prop, entity_list_name=entity_list.name)
 
         # Create the entities from the CSV data.
-        for row in csv.DictReader(csv_file):
+        for row in DictReader(csv_file):
             client.entities.create(
                 label=row[entity_label_field],
                 data={k: str(v) for k, v in row.items() if k in entity_properties},
-                entity_list_name=entity_list_name,
+                entity_list_name=entity_list.name,
             )
+
+
+def create_with_merge():
+    with Client(project_id=project_id) as client, open(csv_path) as csv_file:
+        client.entity_lists.default_entity_list_name = f"previous_survey_{uuid4()}"
+        entity_list = client.entity_lists.create()
+        client.entities.merge(
+            data=DictReader(csv_file),
+            entity_list_name=entity_list.name,
+            source_label_key=entity_label_field,
+            source_keys=(entity_label_field, *entity_properties),
+        )
+
+
+if __name__ == "__main__":
+    # create_one_at_a_time()
+    create_with_merge()
diff --git a/pyodk/_endpoints/entities.py b/pyodk/_endpoints/entities.py
index 31074ed..35b7640 100644
--- a/pyodk/_endpoints/entities.py
+++ b/pyodk/_endpoints/entities.py
@@ -1,13 +1,46 @@
 import logging
+from collections.abc import Iterable, Mapping
+from dataclasses import dataclass, field
 from datetime import datetime
+from typing import Any
 from uuid import uuid4
 
+from pyodk.__version__ import __version__
 from pyodk._endpoints import bases
+from pyodk._endpoints.entity_list_properties import EntityListPropertyService
 from pyodk._utils import validators as pv
 from pyodk._utils.session import Session
 from pyodk.errors import PyODKError
 
 log = logging.getLogger(__name__)
+SENTINEL = object()
+
+
+@dataclass
+class MergeActions:
+    """Return type for EntityService._prep_data_for_merge / merge"""
+
+    match_keys: list
+    to_insert: dict = field(default_factory=dict)
+    to_update: dict = field(default_factory=dict)
+    to_delete: dict = field(default_factory=dict)
+    source_keys: set = field(default_factory=set)
+    target_keys: set = field(default_factory=set)
+    reserved_keys: frozenset = frozenset({"__id", "__system", "label"})
+    # Set by the "merge" function according to the "add_new_properties" parameter.
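+    # For example, after a merge with add_new_properties=True this holds keys_union;
+    # with add_new_properties=False it holds keys_intersect.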
+    final_keys: set = field(default_factory=set)
+
+    @property
+    def keys_difference(self) -> set:
+        return (self.source_keys - self.target_keys) - self.reserved_keys
+
+    @property
+    def keys_intersect(self) -> set:
+        return (self.source_keys & self.target_keys) - self.reserved_keys
+
+    @property
+    def keys_union(self) -> set:
+        return (self.source_keys | self.target_keys) - self.reserved_keys
 
 
 class CurrentVersion(bases.Model):
@@ -41,6 +74,7 @@ class Config:
     list: str = _entities
     post: str = _entities
     patch: str = f"{_entities}/{{entity_id}}"
+    delete: str = patch
    get_table: str = f"{_entity_name}.svc/Entities"
@@ -116,7 +150,7 @@ def create(
        :param label: Label of the Entity.
        :param data: Data to store for the Entity.
        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
-        :param project_id: The id of the project this form belongs to.
+        :param project_id: The id of the project this Entity belongs to.
        :param uuid: An optional unique identifier for the Entity. If not provided
            then a uuid will be generated and sent by the client.
        """
@@ -144,6 +178,80 @@
        data = response.json()
        return Entity(**data)
 
+    def create_many(
+        self,
+        data: Iterable[Mapping[str, Any]],
+        entity_list_name: str | None = None,
+        project_id: int | None = None,
+        create_source: str | None = None,
+        source_size: str | None = None,
+    ) -> bool:
+        """
+        Create one or more Entities in a single request.
+
+        Example input for `data` would be a list of dictionaries from a CSV file:
+
+        data = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Melbourne", "state": "VIC", "postcode": "3000"},
+        ]
+
+        Each Entity in `data` must include a "label" key. An Entity List property must be
+        created in advance for each key in `data` that is not "label". The `merge` method
+        can be used to automatically add properties (or a subset) and create Entities.
+
+        :param data: Data to store for the Entities.
+        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
+        :param project_id: The id of the project this Entity belongs to.
+        :param create_source: Used to capture the source of the change in Central, for
+            example a file name. Defaults to the PyODK version.
+        :param source_size: Used to capture the size of the source data in Central, for
+            example a file size or row count. Excluded if None.
+        """
+        if create_source is None:
+            create_source = f"pyodk v{__version__}"
+        if source_size is None:
+            size = {}
+        else:
+            size = {"size": source_size}
+
+        def reshape(d):
+            try:
+                new = [
+                    {
+                        "label": i["label"],
+                        "data": {k: i.get(k) for k in i if k != "label"},
+                    }
+                    for i in d
+                ]
+            except KeyError as kerr:
+                raise PyODKError("All data must include a 'label' key.") from kerr
+            else:
+                return new
+
+        try:
+            pid = pv.validate_project_id(project_id, self.default_project_id)
+            eln = pv.validate_entity_list_name(
+                entity_list_name, self.default_entity_list_name
+            )
+            data = pv.validate_is_instance(data, typ=Iterable, key="data")
+            final_data = {
+                "entities": reshape(data),
+                "source": {"name": create_source, **size},
+            }
+        except PyODKError as err:
+            log.error(err, exc_info=True)
+            raise
+
+        response = self.session.response_or_error(
+            method="POST",
+            url=self.session.urlformat(self.urls.post, project_id=pid, el_name=eln),
+            logger=log,
+            json=final_data,
+        )
+        data = response.json()
+        return data["success"]
+
     def update(
         self,
         uuid: str,
@@ -165,16 +273,15 @@
        :param base_version: The expected current version of the Entity on the server.
            If `force` is not True, then `base_version` must be specified.
        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
-        :param project_id: The id of the project this form belongs to.
+        :param project_id: The id of the project this Entity belongs to.
        """
        try:
            pid = pv.validate_project_id(project_id, self.default_project_id)
            eln = pv.validate_entity_list_name(
                entity_list_name, self.default_entity_list_name
            )
-            params = {
-                "uuid": pv.validate_str(uuid, key="uuid"),
-            }
+            eid = pv.validate_str(uuid, key="uuid")
+            params = {}
            if force is not None:
                params["force"] = pv.validate_bool(force, key="force")
            if base_version is not None:
@@ -193,7 +300,7 @@
        response = self.session.response_or_error(
            method="PATCH",
            url=self.session.urlformat(
-                self.urls.patch, project_id=pid, el_name=eln, entity_id=uuid
+                self.urls.patch, project_id=pid, el_name=eln, entity_id=eid
            ),
            logger=log,
            params=params,
@@ -202,6 +309,39 @@
        data = response.json()
        return Entity(**data)
 
+    def delete(
+        self,
+        uuid: str,
+        entity_list_name: str | None = None,
+        project_id: int | None = None,
+    ) -> bool:
+        """
+        Delete an Entity.
+
+        :param uuid: The unique identifier for the Entity.
+        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
+        :param project_id: The id of the project this Entity belongs to.
+        """
+        try:
+            pid = pv.validate_project_id(project_id, self.default_project_id)
+            eln = pv.validate_entity_list_name(
+                entity_list_name, self.default_entity_list_name
+            )
+            eid = pv.validate_str(uuid, key="uuid")
+        except PyODKError as err:
+            log.error(err, exc_info=True)
+            raise
+
+        response = self.session.response_or_error(
+            method="DELETE",
+            url=self.session.urlformat(
+                self.urls.delete, project_id=pid, el_name=eln, entity_id=eid
+            ),
+            logger=log,
+        )
+        data = response.json()
+        return data["success"]
+
     def get_table(
         self,
         entity_list_name: str | None = None,
@@ -216,7 +356,7 @@
        Read Entity List data.
 
        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
-        :param project_id: The id of the project this form belongs to.
+        :param project_id: The id of the project this Entity belongs to.
        :param skip: The first n rows will be omitted from the results.
        :param top: Only up to n rows will be returned in the results.
        :param count: If True, an @odata.count property will be added to the result to
@@ -258,3 +398,200 @@
            params=params,
        )
        return response.json()
+
+    @staticmethod
+    def _prep_data_for_merge(
+        source_data: Iterable[Mapping[str, Any]],
+        target_data: Iterable[Mapping[str, Any]],
+        match_keys: Iterable[str] | None = None,
+        source_label_key: str = "label",
+        source_keys: Iterable[str] | None = None,
+    ) -> MergeActions:
+        """
+        Compare source and target data to identify rows to insert, update, or delete.
+
+        :param source_data: Incoming data to be sent to the target database.
+        :param target_data: Existing data from the target database.
+        :param match_keys: Dictionary keys common to source and target used to match rows.
+        :param source_label_key: The key in the source data to use as the label.
+        :param source_keys: If provided, process only these keys in the source data.
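+
+        For illustration, a minimal sketch with hypothetical values:
+
+            source = [{"label": "Sydney", "state": "NSW"}]
+            target = [{"__id": "x", "__system": {...}, "label": "Sydney", "state": "VIC"}]
+            actions = EntityService._prep_data_for_merge(source, target)
+            # actions.to_insert and actions.to_delete are empty; actions.to_update is
+            # {("Sydney",): {"__id": "x", "__system": {...}, "label": "Sydney", "state": "NSW"}}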
+ """ + default_key = "label" + if source_label_key is None: + source_label_key = default_key + if match_keys is None: + match_keys = (default_key,) + match_keys_sorted = sorted(match_keys) + + if source_keys is not None and source_label_key not in source_keys: + raise PyODKError( + "Parameter 'source_keys' must include \"label\" or the " + "'source_label_key' parameter value" + ) + + def get_key(entity: Mapping[str, Any], keys: list) -> tuple: + try: + return tuple(entity[i] for i in keys) + except KeyError as e: + raise PyODKError( + f"Found Entity that did not have all expected match_keys: {e}" + ) from e + + result = MergeActions(match_keys=match_keys_sorted) + # Dict conversion uses memory, but original list of dict has worst case O(n*m). + src = {} + source_data_len = 0 # Not using len() since it might not be a collection. + for s in source_data: + row = {default_key: s[source_label_key]} + if source_keys is None: + row.update({k: s[k] for k in s if k != source_label_key}) + else: + row.update( + {k: s[k] for k in s if k != source_label_key and k in source_keys} + ) + src[get_key(row, match_keys_sorted)] = row + result.source_keys.update(row.keys()) + source_data_len += 1 + + if len(src) != source_data_len: + raise PyODKError( + "Parameter 'match_keys' not unique across all 'source_data'." + ) + + for t in target_data: + key = get_key(t, match_keys_sorted) + result.target_keys.update(t.keys()) + match = src.pop(key, None) + if match is None: + result.to_delete[key] = t + else: + for_update = False + new_data = {} + for k, v in t.items(): + # Add all the ID fields from the target data. + if k in result.reserved_keys: + new_data[k] = v + continue + # Ignore values where source has no key (nothing to update). + # Uses sentinel differentiate None as a value, without a keys check. + new_value = match.get(k, SENTINEL) + if new_value is SENTINEL: + continue + # Add the source value if it is different. + # Entity values are stored in Central as strings. + if str(new_value) != v: + new_data[k] = new_value + for_update = True + for k, v in match.items(): + # Add values for any new keys not in the target. + if k not in t: + new_data[k] = v + for_update = True + if for_update: + result.to_update[key] = new_data + + result.to_insert = src + return result + + def merge( + self, + data: Iterable[Mapping[str, Any]], + entity_list_name: str | None = None, + project_id: int | None = None, + match_keys: Iterable[str] | None = None, + add_new_properties: bool = True, + update_matched: bool = True, + delete_not_matched: bool = False, + source_label_key: str = "label", + source_keys: Iterable[str] | None = None, + create_source: str | None = None, + source_size: str | None = None, + ) -> MergeActions: + """ + Update Entities in Central based on the provided data: + + 1. Create Entities from `data` that don't exist in Central. + 2. Update Entities from `data` that exist in Central. + 3. Optionally, delete any Entities in Central that don't exist in `data`. + + Example input for `source_data` would be a list of dictionaries from a CSV file: + + data = [ + {"label": "Sydney", "state": "NSW", "postcode": "2000"}, + {"label": "Melbourne", "state": "VIC", "postcode": "3000"}, + ] + + Entity creation is performed in one request using `create_many`. The merge + operation may be slow if large quantities of updates or deletes are required, + since for these operations each change is a request in a loop. 
+        If this is a concern, set the parameters `update_matched` and
+        `delete_not_matched` to False and use the return value to perform threaded or
+        async requests for these changes.
+
+        :param data: Data to use for updating Entities in Central.
+        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
+        :param project_id: The id of the project this Entity belongs to.
+        :param match_keys: Dictionary keys common to source and target used to match rows.
+            Defaults to ("label",). If a custom `source_label_key` is provided, refer to
+            that key as "label" here, because it is translated to "label" for matching.
+        :param add_new_properties: If True, add any Entity List properties from `data`
+            that aren't in Central.
+        :param update_matched: If True, update any Entities in Central that match `data`
+            but have different properties.
+        :param delete_not_matched: If True, delete any Entities in Central that aren't
+            in `data`.
+        :param source_label_key: The key in `data` to use as the label. The target label
+            key is always "label" because this key is required by Central.
+        :param source_keys: If provided, process only these keys in `data`.
+        :param create_source: If Entities are created, this is used to capture the source
+            of the change in Central, for example a file name. Defaults to the PyODK version.
+        :param source_size: If Entities are created, this is used to capture the size of
+            `data` in Central, for example a file size. Excluded if None.
+        """
+        pid = pv.validate_project_id(project_id, self.default_project_id)
+        eln = pv.validate_entity_list_name(
+            entity_list_name, self.default_entity_list_name
+        )
+        target_data = self.get_table(entity_list_name=eln, project_id=pid)["value"]
+        merge_actions = self._prep_data_for_merge(
+            source_data=data,
+            target_data=target_data,
+            match_keys=match_keys,
+            source_label_key=source_label_key,
+            source_keys=source_keys,
+        )
+        if add_new_properties:
+            elps = EntityListPropertyService(
+                session=self.session,
+                default_project_id=pid,
+                default_entity_list_name=eln,
+            )
+            for k in merge_actions.keys_difference:
+                elps.create(name=k)
+            merge_actions.final_keys = merge_actions.keys_union
+        else:
+            merge_actions.final_keys = merge_actions.keys_intersect
+        if len(merge_actions.to_insert) > 0:
+            relevant_keys = {"label", *merge_actions.final_keys}
+            insert_filter = [
+                {k: i.get(k) for k in i if k in relevant_keys}
+                for i in merge_actions.to_insert.values()
+            ]
+            self.create_many(
+                data=insert_filter,
+                entity_list_name=eln,
+                project_id=pid,
+                create_source=create_source,
+                source_size=source_size,
+            )
+        if update_matched:
+            for u in merge_actions.to_update.values():
+                self.update(
+                    uuid=u["__id"],
+                    entity_list_name=eln,
+                    project_id=pid,
+                    label=u["label"],
+                    data={k: u.get(k) for k in u if k in merge_actions.final_keys},
+                    base_version=u["__system"]["version"],
+                )
+        if delete_not_matched:
+            for d in merge_actions.to_delete.values():
+                self.delete(uuid=d["__id"], entity_list_name=eln, project_id=pid)
+        return merge_actions
diff --git a/pyodk/_endpoints/entity_lists.py b/pyodk/_endpoints/entity_lists.py
index cb94f36..ee478c9 100644
--- a/pyodk/_endpoints/entity_lists.py
+++ b/pyodk/_endpoints/entity_lists.py
@@ -67,7 +67,11 @@ def __init__(
     ):
         self.urls: URLs = urls if urls is not None else URLs()
         self.session: Session = session
-        self._property_service = EntityListPropertyService(session=self.session)
+        self._property_service = EntityListPropertyService(
+            session=self.session,
+            default_project_id=default_project_id,
+            default_entity_list_name=default_entity_list_name,
+        )
        self.add_property = self._property_service.create
 
        self._default_project_id: int | None = None
diff --git a/pyodk/_utils/validators.py b/pyodk/_utils/validators.py
index 77ad52c..666999b 100644
--- a/pyodk/_utils/validators.py
+++ b/pyodk/_utils/validators.py
@@ -104,3 +104,10 @@ def validate_fp(f):
         return v.path_exists_validator(p)
 
     return wrap_error(validator=validate_fp, key=key, value=coalesce(*args))
+
+
+def validate_is_instance(*args: Any, typ: Any, key: str):
+    val = coalesce(*args)
+    if not isinstance(val, typ):
+        raise PyODKError(f"{key}: Unexpected type. Expected '{typ}'.")
+    return val
diff --git a/pyproject.toml b/pyproject.toml
index dc60785..58a88c2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,6 +23,7 @@ dev = [
 docs = [
     "mkdocs==1.5.3",
     "mkdocstrings==0.24.1",
+    "griffe==0.37",
     "mkdocstrings-python==1.9.0",
     "mkdocs-jupyter==0.24.6",
 ]
diff --git a/tests/endpoints/test_entities.py b/tests/endpoints/test_entities.py
index f486ea9..7b0514f 100644
--- a/tests/endpoints/test_entities.py
+++ b/tests/endpoints/test_entities.py
@@ -1,7 +1,10 @@
+from csv import DictReader
+from io import StringIO
 from unittest import TestCase
 from unittest.mock import MagicMock, patch
 
-from pyodk._endpoints.entities import Entity
+from pyodk._endpoints.entities import Entity, MergeActions
+from pyodk._endpoints.entities import EntityService as es
 from pyodk._utils.session import Session
 from pyodk.client import Client
 from pyodk.errors import PyODKError
@@ -116,3 +119,462 @@ def test_update__raise_if_invalid_force_or_base_version(self):
            "Must specify one of 'force' or 'base_version'.",
            err.exception.args[0],
        )
+
+
+class TestPrepDataForMerge(TestCase):
+    def test_noop__source_same_as_target(self):
+        """Should identify no rows for insert/update/delete"""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = source
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_noop__source_has_no_value_for_key(self):
+        """Should identify no rows for insert/update/delete"""
+        source = [
+            {"label": "Sydney", "state": "NSW"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_insert__source_has_new_row__empty(self):
+        """Should identify row to_insert only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = []
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual(set(), observed.target_keys)
+
+    def test_to_insert__source_has_new_row__existing(self):
+        """Should identify row to_insert only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(
+            source[1]["label"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_delete__target_has_extra_row__empty(self):
+        """Should identify row to_delete only."""
+        source = []
+        target = [
+            {"label": "Sydney", "state": "VIC", "postcode": "2000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual(
+            target[0]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual(set(), observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_delete__target_has_extra_row__existing(self):
+        """Should identify row to_delete only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual(
+            target[1]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_update__source_value_changed__from_existing(self):
+        """Should identify row to_update only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "3000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_update__source_value_changed__from_none(self):
+        """Should identify row to_update only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": None},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
self.assertEqual({"label", "state", "postcode"}, observed.target_keys) + + def test_to_update__source_value_changed__to_none(self): + """Should identify row to_update only.""" + source = [ + {"label": "Sydney", "state": "NSW", "postcode": None}, + ] + target = [ + {"label": "Sydney", "state": "NSW", "postcode": "2000"}, + ] + observed = es._prep_data_for_merge(source_data=source, target_data=target) + self.assertEqual({}, observed.to_insert, observed.to_insert) + self.assertEqual( + source[0]["label"], + next(iter(observed.to_update.keys()))[0], + observed.to_update, + ) + self.assertEqual({}, observed.to_delete, observed.to_delete) + self.assertEqual({"label", "state", "postcode"}, observed.source_keys) + self.assertEqual({"label", "state", "postcode"}, observed.target_keys) + + def test_to_update__new_source_field(self): + """Should identify row to_update only.""" + source = [ + {"label": "Sydney", "state": "NSW", "postcode": "2000"}, + ] + target = [ + {"label": "Sydney", "state": "NSW"}, + ] + observed = es._prep_data_for_merge(source_data=source, target_data=target) + self.assertEqual({}, observed.to_insert, observed.to_insert) + self.assertEqual( + source[0]["label"], + next(iter(observed.to_update.keys()))[0], + observed.to_update, + ) + self.assertEqual({}, observed.to_delete, observed.to_delete) + self.assertEqual({"label", "state", "postcode"}, observed.source_keys) + self.assertEqual({"label", "state"}, observed.target_keys) + + def test_to_update__new_source_field__with_other_change(self): + """Should identify row to_update only.""" + source = [ + {"label": "Sydney", "state": "NSW", "postcode": "2000"}, + ] + target = [ + {"label": "Sydney", "state": "QLD"}, + ] + observed = es._prep_data_for_merge(source_data=source, target_data=target) + self.assertEqual({}, observed.to_insert, observed.to_insert) + self.assertEqual( + source[0]["label"], + next(iter(observed.to_update.keys()))[0], + observed.to_update, + ) + self.assertEqual({}, observed.to_delete, observed.to_delete) + self.assertEqual({"label", "state", "postcode"}, observed.source_keys) + self.assertEqual({"label", "state"}, observed.target_keys) + + def test_to_update__new_source_field__with_no_old_data(self): + """Should identify row to_update only.""" + source = [ + {"label": "Sydney", "postcode": "2000"}, + ] + target = [ + {"label": "Sydney", "state": "NSW"}, + ] + observed = es._prep_data_for_merge(source_data=source, target_data=target) + self.assertEqual({}, observed.to_insert, observed.to_insert) + self.assertEqual( + source[0]["label"], + next(iter(observed.to_update.keys()))[0], + observed.to_update, + ) + self.assertEqual({}, observed.to_delete, observed.to_delete) + self.assertEqual({"label", "postcode"}, observed.source_keys) + self.assertEqual({"label", "state"}, observed.target_keys) + + def test_merge__all_ops(self): + """Should identify a row for each op type at the same time.""" + source = [ + {"label": "Sydney", "state": "NSW", "postcode": "2000"}, # update + {"label": "Brisbane", "state": "QLD", "postcode": "4000"}, # insert + {"label": "Melbourne", "state": "VIC"}, # noop + ] + target = [ + {"label": "Sydney", "state": "VIC"}, + {"label": "Darwin", "state": "NT"}, # delete + {"label": "Melbourne", "state": "VIC"}, + ] + observed = es._prep_data_for_merge(source_data=source, target_data=target) + self.assertIsInstance(observed, MergeActions) + self.assertEqual(1, len(observed.to_insert)) + self.assertEqual( + source[1]["label"], + next(iter(observed.to_insert.keys()))[0], + observed.to_insert, + ) + 
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual(1, len(observed.to_delete))
+        self.assertEqual(
+            target[1]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state"}, observed.target_keys)
+
+    def test_merge__all_ops__alternative_source_label_key(self):
+        """Should identify a row for each op type at the same time."""
+        source = [
+            {"city": "Sydney", "state": "NSW", "postcode": "2000"},  # update
+            {"city": "Brisbane", "state": "QLD", "postcode": "4000"},  # insert
+            {"city": "Melbourne", "state": "VIC"},  # noop
+        ]
+        target = [
+            {"label": "Sydney", "state": "VIC"},
+            {"label": "Darwin", "state": "NT"},  # delete
+            {"label": "Melbourne", "state": "VIC"},
+        ]
+        observed = es._prep_data_for_merge(
+            source_data=source, target_data=target, source_label_key="city"
+        )
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            source[1]["city"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            source[0]["city"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual(1, len(observed.to_delete))
+        self.assertEqual(
+            target[1]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state"}, observed.target_keys)
+
+    def test_merge__all_ops__source_data_not_strings(self):
+        """Should identify a row for each op type at the same time."""
+        source = [
+            {"label": "Sydney", "postcode": 2000},  # update
+            {"label": "Brisbane", "postcode": 4000},  # insert
+            {"label": "Melbourne", "postcode": 3000},  # noop
+        ]
+        target = [
+            {"label": "Sydney", "postcode": "3000"},
+            {"label": "Darwin", "postcode": "4000"},  # delete
+            {"label": "Melbourne", "postcode": "3000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            source[1]["label"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual(1, len(observed.to_delete))
+        self.assertEqual(
+            target[1]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"label", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "postcode"}, observed.target_keys)
+
+    def test_merge__all_ops__match_keys_not_including_label(self):
+        """Should identify a row for each op type at the same time."""
+        source = [
+            {"label": "Sydney", "id": "2", "state": "NSW", "postcode": "2000"},  # update
+            {
+                "label": "Brisbane",
+                "id": "4",
+                "state": "QLD",
+                "postcode": "4000",
+            },  # insert
+            {"label": "Melbourne", "id": "3", "state": "VIC"},  # noop
+        ]
+        target = [
+            {"label": "Sydney", "id": "2", "state": "VIC"},
+            {"label": "Darwin", "id": "1", "state": "NT"},  # delete
+            {"label": "Melbourne", "id": "3", "state": "VIC"},
+        ]
+        observed = es._prep_data_for_merge(
+            source_data=source, target_data=target, match_keys=("id",)
+        )
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            source[1]["id"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            source[0]["id"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual(1, len(observed.to_delete))
+        self.assertEqual(
+            target[1]["id"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"id", "label", "postcode", "state"}, observed.source_keys)
+        self.assertEqual({"id", "label", "state"}, observed.target_keys)
+
+    def test_source_has_duplicate_match_key(self):
+        """Should detect duplicate rows in source."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = []
+        with self.assertRaises(PyODKError) as err:
+            es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(
+            "Parameter 'match_keys' not unique across all 'source_data'.",
+            err.exception.args[0],
+        )
+
+    def test_source_has_row_missing_match_key(self):
+        """Should detect rows in source missing a match key."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Brisbane", "postcode": "4000"},
+        ]
+        target = []
+        with self.assertRaises(PyODKError) as err:
+            es._prep_data_for_merge(
+                source_data=source, target_data=target, match_keys={"label", "state"}
+            )
+        self.assertEqual(
+            "Found Entity that did not have all expected match_keys: 'state'",
+            err.exception.args[0],
+        )
+
+    def test_source_keys_limits_columns_of_interest(self):
+        """Should only process source_keys if specified."""
+        source = [
+            {"city": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"city": "Brisbane", "state": "QLD", "postcode": "4000"},
+            {"city": "Hobart"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "VIC", "postcode": "3000"},
+            {"label": "Brisbane", "state": "QLD"},
+            {"label": "Hobart"},
+        ]
+        observed = es._prep_data_for_merge(
+            source_data=source,
+            target_data=target,
+            source_label_key="city",
+            source_keys={"city", "state"},
+        )
+        # "city" is translated to "label", "postcode" is ignored, "state" is updated.
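+        # Brisbane and Hobart match once "postcode" is excluded, so only Sydney updates.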
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            {"label": "Sydney", "state": "NSW"},
+            next(iter(observed.to_update.values())),
+            observed.to_update,
+        )
+        self.assertEqual(0, len(observed.to_insert))
+        self.assertEqual(0, len(observed.to_delete))
+        self.assertEqual(["label"], observed.match_keys)
+
+    def test_source_keys_does_not_include_label_or_source_label_key(self):
+        """Should raise an error if the source column specifications don't make sense."""
+        source = [
+            {"city": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"city": "Brisbane", "postcode": "4000"},
+        ]
+        target = []
+        with self.assertRaises(PyODKError) as err:
+            es._prep_data_for_merge(
+                source_data=source,
+                target_data=target,
+                source_label_key="city",
+                source_keys={"state", "postcode"},
+            )
+        self.assertEqual(
+            "Parameter 'source_keys' must include \"label\" or the "
+            "'source_label_key' parameter value",
+            err.exception.args[0],
+        )
+
+    def test_csv_as_source_data(self):
+        """Should be able to pass in a CSV DictReader as the source_data."""
+        csv = """label,state,postcode\nSydney,NSW,2000\nBrisbane,QLD,4000"""
+        target = [{"label": "Brisbane", "state": "QLD", "postcode": "4000"}]
+        observed = es._prep_data_for_merge(
+            source_data=list(DictReader(StringIO(csv))),
+            target_data=target,
+        )
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            next(iter(observed.to_insert.values())),
+        )
+        self.assertEqual(0, len(observed.to_update))
+        self.assertEqual(0, len(observed.to_delete))
diff --git a/tests/test_client.py b/tests/test_client.py
index a480e3a..ff669b8 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -295,6 +295,106 @@ def test_entities__update(self):
        )
        self.assertEqual("test_value3", forced.currentVersion.data["test_label"])
 
+    def test_entity__merge__new(self):
+        """Should create a new Entity List, and merge in some new data."""
+        self.client.entity_lists.default_entity_list_name = (
+            self.client.session.get_xform_uuid()
+        )
+        entity_list = self.client.entity_lists.create()
+        self.client.entities.merge(
+            data=[
+                {"label": "Sydney", "state": "NSW"},
+                {"label": "Melbourne", "state": "VIC"},
+            ],
+            entity_list_name=entity_list.name,
+        )
+        entity_data = self.client.entities.get_table(entity_list_name=entity_list.name)
+        self.assertEqual(2, len(entity_data["value"]))
+
+    def test_entity__merge__existing__add_props__delete_unmatched(self):
+        """Should merge into a populated Entity List, adding new properties and
+        deleting unmatched Entities."""
+        self.client.entity_lists.default_entity_list_name = (
+            self.client.session.get_xform_uuid()
+        )
+        entity_list = self.client.entity_lists.create()
+        self.client.entity_lists.add_property(
+            name="state", entity_list_name=entity_list.name
+        )
+        self.client.entities.create_many(
+            data=[
+                {"label": "Sydney", "state": "VIC"},
+                {"label": "Darwin", "state": "NT"},
+            ],
+            entity_list_name=entity_list.name,
+        )
+        # Add postcode property, add Brisbane, update Sydney, delete Darwin.
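+        # (Matching uses the default "label" key; Darwin is absent from the new data.)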
+        self.client.entities.merge(
+            data=[
+                {"label": "Sydney", "state": "NSW", "postcode": "2001"},
+                {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
+            ],
+            entity_list_name=entity_list.name,
+            add_new_properties=True,
+            delete_not_matched=True,
+        )
+        entity_data = self.client.entities.get_table(entity_list_name=entity_list.name)
+        expected = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2001"},
+            {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
+        ]
+        observed = [
+            {k: o.get(k) for k in ("state", "label", "postcode")}
+            for o in entity_data["value"]
+        ]
+        self.assertTrue(
+            len(expected) == len(observed)
+            and all(e in observed for e in expected)
+            and expected[0].keys() == observed[0].keys(),
+            observed,
+        )
+
+    def test_entity__merge__existing__ignore_props__keep_unmatched(self):
+        """Should merge into a populated Entity List, ignoring new properties and
+        keeping unmatched Entities."""
+        self.client.entity_lists.default_entity_list_name = (
+            self.client.session.get_xform_uuid()
+        )
+        entity_list = self.client.entity_lists.create()
+        self.client.entity_lists.add_property(
+            name="state", entity_list_name=entity_list.name
+        )
+        self.client.entities.create_many(
+            data=[
+                {"label": "Sydney", "state": "VIC"},
+                {"label": "Darwin", "state": "NT"},
+            ],
+            entity_list_name=entity_list.name,
+        )
+        # Skip postcode property, add Brisbane, update Sydney, keep Darwin.
+        self.client.entities.merge(
+            data=[
+                {"label": "Sydney", "state": "NSW", "postcode": "2000"},  # update
+                {"label": "Brisbane", "state": "QLD", "postcode": "4000"},  # insert
+            ],
+            entity_list_name=entity_list.name,
+            add_new_properties=False,
+            delete_not_matched=False,
+        )
+        entity_data = self.client.entities.get_table(entity_list_name=entity_list.name)
+        expected = [
+            {"label": "Sydney", "state": "NSW"},
+            {"label": "Brisbane", "state": "QLD"},
+            {"label": "Darwin", "state": "NT"},
+        ]
+        observed = [
+            {k: o.get(k) for k in ("state", "label")} for o in entity_data["value"]
+        ]
+        self.assertTrue(
+            len(expected) == len(observed)
+            and all(e in observed for e in expected)
+            and expected[0].keys() == observed[0].keys(),
+            observed,
+        )
+
     def test_entity_lists__list(self):
         """Should return a list of Entity Lists."""
         observed = self.client.entity_lists.list()
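
For illustration, a minimal sketch of the "manual control" path that the merge docstring
describes: prepare the changes with `update_matched` and `delete_not_matched` disabled,
then apply the updates yourself, e.g. with threads (the project id, Entity List name, and
data below are hypothetical).

    from pyodk import Client

    with Client(project_id=1) as client:
        actions = client.entities.merge(
            data=[{"label": "Sydney", "state": "NSW"}],
            entity_list_name="cities",
            update_matched=False,
            delete_not_matched=False,
        )
        # Rows in actions.to_update carry "__id" and "__system" from Central,
        # plus the changed property values.
        for row in actions.to_update.values():
            client.entities.update(
                uuid=row["__id"],
                entity_list_name="cities",
                label=row["label"],
                data={k: row[k] for k in row if k in actions.final_keys},
                base_version=row["__system"]["version"],
            )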