Skip to content

Commit

Permalink
add new resources
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjushahgupta committed Sep 6, 2024
1 parent 64c1149 commit 20faf56
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 7 deletions.
13 changes: 13 additions & 0 deletions docs/supported-sources/klaviyo.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,17 @@ Klaviyo source allows ingesting the following sources into separate tables:

[metrics](https://developers.klaviyo.com/en/reference/metrics_api_overview): Retrieves all metrics in an account where each metric represents a category of events or actions a person can take.

[tags](https://developers.klaviyo.com/en/reference/get_tags): Retrieves all tags in an account.

[coupons](https://developers.klaviyo.com/en/reference/get_coupons): Retrieves all coupons in an account.

[catalog-variants](https://developers.klaviyo.com/en/reference/get_catalog_variants): Retrieves all catalog variants in an account.

[catalog-categories](https://developers.klaviyo.com/en/reference/get_catalog_categories): Retrieves all catalog categories in an account.

[catalog-items](https://developers.klaviyo.com/en/reference/get_catalog_items): Retrieves all catalog items in an account.

Use these as the `--source-table` parameter in the `ingestr ingest` command.

> [!WARNING]
> Many Klaviyo API endpoints do not support incremental loading. ingestr loads an endpoint incrementally when the endpoint supports it, and does a full refresh otherwise.
40 changes: 39 additions & 1 deletion ingestr/src/klaviyo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,42 @@ def metrics(
) -> Iterable[TDataItem]:
yield from client.fetch_metrics(create_client(), updated.start_value)

return events, profiles, campaigns, metrics
@dlt.resource(write_disposition="replace", primary_key="id")
def tags() -> Iterable[TDataItem]:
    """Yield every item returned by ``client.fetch_tag``.

    The resource is declared with ``write_disposition="replace"`` and takes
    no incremental cursor, so each run reloads the full result.
    """
    # The return annotation is TDataItem (like the sibling resources): the
    # function yields fetched items, not datetime values.
    yield from client.fetch_tag(create_client())

@dlt.resource(write_disposition="replace", primary_key="id")
def coupons() -> Iterable[TDataItem]:
    """Yield every item returned by ``client.fetch_coupons``.

    The resource is declared with ``write_disposition="replace"`` and takes
    no incremental cursor, so each run reloads the full result.
    """
    # The return annotation is TDataItem (like the sibling resources): the
    # function yields fetched items, not datetime values.
    yield from client.fetch_coupons(create_client())

@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    name="catalog-variants",
)
def catalog_variants(
    updated=dlt.sources.incremental("updated", start_date_obj.isoformat()),
) -> Iterable[TDataItem]:
    """Yield catalog variants from ``client.fetch_catalog_variant``.

    The incremental cursor tracks the ``updated`` field; its start value is
    handed to the client as the lower bound for the fetch.
    """
    session = create_client()
    since = updated.start_value
    yield from client.fetch_catalog_variant(session, since)

@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    name="catalog-categories",
)
def catalog_categories(
    updated=dlt.sources.incremental("updated", start_date_obj.isoformat()),
) -> Iterable[TDataItem]:
    """Yield catalog categories from ``client.fetch_catalog_categories``.

    The incremental cursor tracks the ``updated`` field; its start value is
    handed to the client as the lower bound for the fetch.
    """
    session = create_client()
    since = updated.start_value
    yield from client.fetch_catalog_categories(session, since)

@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    name="catalog-items",
)
def catalog_items(
    updated=dlt.sources.incremental("updated", start_date_obj.isoformat()),
) -> Iterable[TDataItem]:
    """Yield catalog items from ``client.fetch_catalog_item``.

    The incremental cursor tracks the ``updated`` field; its start value is
    handed to the client as the lower bound for the fetch.
    """
    session = create_client()
    since = updated.start_value
    yield from client.fetch_catalog_item(session, since)

return (
events,
profiles,
campaigns,
metrics,
tags,
coupons,
catalog_variants,
catalog_categories,
catalog_items,
)
69 changes: 64 additions & 5 deletions ingestr/src/klaviyo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,26 @@ def _flatten_attributes(self, items: list):
del event["attributes"]
return items

def _fetch_pages(
    self, session: requests.Session, url: str, flat: bool = True
) -> list:
    """Collect the ``data`` items of every page, following ``links.next``.

    Args:
        session: HTTP session used to issue the GET requests.
        url: URL of the first page to request.
        flat: When true (the default), each page's items are passed through
            ``_flatten_attributes`` before being collected.

    Returns:
        A single list holding the items of all pages, in request order.

    Raises:
        requests.HTTPError: If any page responds with an HTTP error status.
    """
    all_items = []

    while url:
        response = session.get(url=url, headers=self.__get_headers())
        # Fail with the real HTTP error instead of a KeyError on a payload
        # that carries no "links" section.
        response.raise_for_status()
        result = response.json()

        items = result.get("data", [])
        if flat:
            items = self._flatten_attributes(items)
        all_items.extend(items)

        # A missing or null "next" link marks the last page.
        url = (result.get("links") or {}).get("next")

    return all_items

def fetch_events(
Expand Down Expand Up @@ -101,3 +107,56 @@ def fetch_campaigns(
page["campaign_type"] = campaign_type

return pages

def fetch_tag(self, session: requests.Session):
    """Return all items of the ``/tags`` endpoint without attribute flattening."""
    return self._fetch_pages(session, f"{BASE_URL}/tags", flat=False)

def fetch_catalog_variant(
    self,
    session: requests.Session,
    last_updated: str,
):
    """Yield catalog variants whose ``updated`` value is after ``last_updated``.

    All pages of ``/catalog-variants`` are fetched first; the filter is then
    applied locally with a strict ``>`` comparison on the parsed timestamps.
    """
    variants = self._fetch_pages(session, f"{BASE_URL}/catalog-variants")
    cutoff = pendulum.parse(last_updated)

    yield from (
        variant
        for variant in variants
        if pendulum.parse(variant["updated"]) > cutoff
    )

def fetch_coupons(self, session: requests.Session):
    """Return all items of the ``/coupons`` endpoint without attribute flattening."""
    url = f"{BASE_URL}/coupons"
    return self._fetch_pages(session, url, False)

def fetch_catalog_categories(
    self,
    session: requests.Session,
    last_updated: str,
):
    """Yield catalog categories whose ``updated`` value is after ``last_updated``.

    All pages of ``/catalog-categories`` are fetched first; the filter is
    then applied locally with a strict ``>`` comparison on the parsed
    timestamps.
    """
    url = f"{BASE_URL}/catalog-categories"
    items = self._fetch_pages(session, url)
    last_updated_obj = pendulum.parse(last_updated)

    for item in items:
        updated_at = pendulum.parse(item["updated"])
        if updated_at > last_updated_obj:
            yield item

def fetch_catalog_item(
    self,
    session: requests.Session,
    last_updated: str,
):
    """Yield catalog items whose ``updated`` value is after ``last_updated``.

    All pages of ``/catalog-items`` are fetched first; the filter is then
    applied locally with a strict ``>`` comparison on the parsed timestamps.
    """
    catalog_entries = self._fetch_pages(session, f"{BASE_URL}/catalog-items")
    cutoff = pendulum.parse(last_updated)

    yield from (
        entry
        for entry in catalog_entries
        if pendulum.parse(entry["updated"]) > cutoff
    )
12 changes: 11 additions & 1 deletion ingestr/src/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,17 @@ def dlt_source(self, uri: str, table: str, **kwargs):
raise ValueError("api_key in the URI is required to connect to klaviyo")

resource = None
if table in ["events", "profiles", "campaigns", "metrics"]:
if table in [
"events",
"profiles",
"campaigns",
"metrics",
"tags",
"coupons",
"catalog-variants",
"catalog-categories",
"catalog-items",
]:
resource = table
else:
raise ValueError(
Expand Down

0 comments on commit 20faf56

Please sign in to comment.