Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB: Add pagination support for full-load table loader #252

Merged
merged 3 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dynamodb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
# TODO: yarl, dependency of influxio, is currently not available on Python 3.12.
# https://github.com/aio-libs/yarl/pull/942
python-version: ["3.8", "3.11"]
localstack-version: ["3.6"]
localstack-version: ["3.7"]

env:
OS: ${{ matrix.os }}
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@


## Unreleased
- DynamoDB: Add special decoding for varied lists.
Store them into a separate `OBJECT(IGNORED)` column in CrateDB.
- DynamoDB: Add pagination support for `full-load` table loader

## 2024/08/27 v0.0.20
- DMS/DynamoDB: Fix table name quoting within CDC processor handler
Expand Down
39 changes: 35 additions & 4 deletions cratedb_toolkit/io/dynamodb/adapter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging
import typing as t

import boto3
from yarl import URL

logger = logging.getLogger(__name__)


class DynamoDBAdapter:
def __init__(self, dynamodb_url: URL, echo: bool = False):
def __init__(self, dynamodb_url: URL):
self.session = boto3.Session(
aws_access_key_id=dynamodb_url.user,
aws_secret_access_key=dynamodb_url.password,
Expand All @@ -15,11 +20,37 @@
self.dynamodb_resource = self.session.resource("dynamodb", endpoint_url=endpoint_url)
self.dynamodb_client = self.session.client("dynamodb", endpoint_url=endpoint_url)

def scan(self, table_name: str):
def scan(
self,
table_name: str,
page_size: int = 1000,
consistent_read: bool = False,
on_error: t.Literal["log", "raise"] = "log",
) -> t.Generator[t.Dict, None, None]:
"""
Return all items from DynamoDB table.
Fetch and generate all items from a DynamoDB table, with pagination.

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
"""
return self.dynamodb_client.scan(TableName=table_name)
key = None
while True:
try:
scan_kwargs = {"TableName": table_name, "ConsistentRead": consistent_read, "Limit": page_size}
if key is not None:
scan_kwargs.update({"ExclusiveStartKey": key})

Check warning on line 40 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L40

Added line #L40 was not covered by tests
Comment on lines +39 to +40
Copy link
Member Author

@amotl amotl Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This spot is not covered yet and certainly needs a software test. I didn't pay enough attention, and Codecov didn't have access to the repository beforehand, so it did not raise a corresponding admonition.

response = self.dynamodb_client.scan(**scan_kwargs)
yield response
key = response.get("LastEvaluatedKey", None)
if key is None:
break
except Exception as ex:
if on_error == "log":
logger.exception("Error reading DynamoDB table")

Check warning on line 48 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L48

Added line #L48 was not covered by tests
elif on_error == "raise":
raise
else:
raise ValueError(f"Unknown 'on_error' value: {on_error}") from ex
break

Check warning on line 53 in cratedb_toolkit/io/dynamodb/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/adapter.py#L52-L53

Added lines #L52 - L53 were not covered by tests

def count_records(self, table_name: str):
table = self.dynamodb_resource.Table(table_name)
Expand Down
39 changes: 25 additions & 14 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.data import asbool

logger = logging.getLogger(__name__)

Expand All @@ -23,6 +24,7 @@
dynamodb_url: str,
cratedb_url: str,
progress: bool = False,
debug: bool = True,
):
cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
Expand All @@ -36,38 +38,47 @@
self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)

self.progress = progress
self.debug = debug

self.page_size: int = int(self.dynamodb_url.query.get("page-size", 1000))
self.consistent_read: bool = asbool(self.dynamodb_url.query.get("consistent-read", False))

def start(self):
"""
Read items from DynamoDB table, convert to SQL INSERT statements, and submit to CrateDB.
"""
records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception
with self.cratedb_adapter.engine.connect() as connection:
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table)
records_out = 0
for operation in self.items_to_operations(result["Items"]):
for result in self.dynamodb_adapter.scan(
table_name=self.dynamodb_table,
consistent_read=self.consistent_read,
page_size=self.page_size,
):
result_size = len(result["Items"])
try:
operation = self.translator.to_sql(result["Items"])
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue

Check warning on line 73 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L71-L73

Added lines #L71 - L73 were not covered by tests
try:
connection.execute(sa.text(operation.statement), operation.parameters)
records_out += 1
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
progress_bar.update()
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")

Check warning on line 79 in cratedb_toolkit/io/dynamodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/dynamodb/copy.py#L78-L79

Added lines #L78 - L79 were not covered by tests
progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out < records_in:
if records_out == 0:
logger.warning("No data has been copied")

def items_to_operations(self, items):
"""
Convert data for record items to INSERT statements.
"""
for item in items:
yield self.translator.to_sql(item)
6 changes: 4 additions & 2 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
MESSAGE_FORMAT: str = os.environ.get("MESSAGE_FORMAT", "unknown")
COLUMN_TYPES: str = os.environ.get("COLUMN_TYPES", "")
CRATEDB_SQLALCHEMY_URL: str = os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://")
CRATEDB_TABLE: str = os.environ.get("CRATEDB_TABLE", "default")
CRATEDB_TABLE: t.Optional[str] = os.environ.get("CRATEDB_TABLE")

logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)
Expand Down Expand Up @@ -127,7 +127,9 @@ def handler(event, context):
connection.execute(sa.text(operation.statement), parameters=operation.parameters)

# Processing alternating CDC events requires write synchronization.
connection.execute(sa.text(f"REFRESH TABLE {cdc.quote_table_name(CRATEDB_TABLE)}"))
# TODO: Improve interface.
if hasattr(cdc, "table_name"):
connection.execute(sa.text(f"REFRESH TABLE {cdc.table_name}"))

connection.commit()

Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/testing/testcontainers/localstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class LocalStackContainerWithKeepalive(KeepaliveContainer, LocalStackContainer):
useful when used within a test matrix. Its default value is `latest`.
"""

LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "latest")
LOCALSTACK_VERSION = os.environ.get("LOCALSTACK_VERSION", "3.7")

def __init__(
self,
Expand Down
19 changes: 19 additions & 0 deletions doc/io/dynamodb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ ctk shell --command "SELECT * FROM testdrive.demo;"
ctk show table "testdrive.demo"
```

## Options

### `page-size`
The source URL accepts the `page-size` option to configure DynamoDB
[pagination]. The default value is `1000`.
```shell
ctk load table ".../ProductCatalog?region=us-east-1&page-size=5000"
```

### `consistent-read`
The source URL accepts the `consistent-read` option to configure DynamoDB
[read consistency]. The default value is `false`.
```shell
ctk load table ".../ProductCatalog?region=us-east-1&consistent-read=true"
```


## Variants

### CrateDB Cloud
Expand Down Expand Up @@ -66,3 +83,5 @@ docker run \

[Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/
[Get started with DynamoDB on LocalStack]: https://docs.localstack.cloud/user-guide/aws/dynamodb/
[pagination]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
[read consistency]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ReadConsistency
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ docs = [
]
dynamodb = [
"boto3",
"commons-codec>=0.0.12",
"commons-codec>=0.0.14",
]
full = [
"cratedb-toolkit[cfr,cloud,datasets,io,service]",
Expand All @@ -155,11 +155,11 @@ io = [
"sqlalchemy>=2",
]
kinesis = [
"commons-codec>=0.0.12",
"lorrystream[carabas]",
"commons-codec>=0.0.14",
"lorrystream[carabas]>=0.0.6",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.12",
"commons-codec[mongodb,zyp]>=0.0.14",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
42 changes: 42 additions & 0 deletions tests/io/dynamodb/test_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest
from botocore.exceptions import ParamValidationError
from yarl import URL

from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter

pytestmark = pytest.mark.dynamodb


# Sample item with a single `Id` attribute.
# NOTE(review): not referenced by any test in this module.
RECORD = {
"Id": {"N": "101"},
}


def test_adapter_scan_success(dynamodb):
    """
    Verify that `DynamoDBAdapter.scan` can be iterated without raising.

    `scan` is a generator function, so merely calling it never issues a
    request. The generator must be consumed to exercise the scan path.
    """
    dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
    adapter = DynamoDBAdapter(URL(dynamodb_url))

    # With the default `on_error="log"`, a failing request is logged and ends
    # the iteration, so this loop body may legitimately run zero times.
    for page in adapter.scan("foo"):
        assert "Items" in page


def test_adapter_scan_failure_consistent_read(dynamodb):
    """
    Verify that `scan` raises `ParamValidationError` for `consistent_read=-42`
    when invoked with `on_error="raise"`.
    """
    expected = "Parameter validation failed:\nInvalid type for parameter ConsistentRead, value: -42.*"
    endpoint = URL(f"{dynamodb.get_connection_url()}/demo?region=us-east-1")
    pages = DynamoDBAdapter(endpoint).scan("demo", consistent_read=-42, on_error="raise")

    # The generator body only runs on the first `next()`, so the error surfaces here.
    with pytest.raises(ParamValidationError, match=expected):
        next(pages)


def test_adapter_scan_failure_page_size(dynamodb):
    """
    Verify that `scan` raises `ParamValidationError` for `page_size=-1`
    when invoked with `on_error="raise"`.
    """
    expected = "Parameter validation failed:\nInvalid value for parameter Limit, value: -1, valid min value: 1"
    endpoint = URL(f"{dynamodb.get_connection_url()}/demo?region=us-east-1")
    pages = DynamoDBAdapter(endpoint).scan("demo", page_size=-1, on_error="raise")

    # The generator body only runs on the first `next()`, so the error surfaces here.
    with pytest.raises(ParamValidationError, match=expected):
        next(pages)
2 changes: 1 addition & 1 deletion tests/io/dynamodb/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ def test_dynamodb_load_table(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
CLI test: Invoke `ctk load table` for DynamoDB.
"""
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
dynamodb_url = f"{dynamodb.get_connection_url()}/ProductCatalog?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with sample dataset.
dynamodb_test_manager.load_product_catalog()
Expand Down
48 changes: 6 additions & 42 deletions tests/io/dynamodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,22 @@
pytestmark = pytest.mark.dynamodb


RECORD_UTM = {
RECORD = {
"Id": {"N": "101"},
"utmTags": {
"L": [
{
"M": {
"date": {"S": "2024-08-28T20:05:42.603Z"},
"utm_adgroup": {"L": [{"S": ""}, {"S": ""}]},
"utm_campaign": {"S": "34374686341"},
"utm_medium": {"S": "foobar"},
"utm_source": {"S": "google"},
}
}
]
},
"location": {
"M": {
"coordinates": {"L": [{"S": ""}]},
"meetingPoint": {"S": "At the end of the tunnel"},
"address": {"S": "Salzbergwerk Berchtesgaden"},
},
},
}


def test_dynamodb_copy(caplog, cratedb, dynamodb, dynamodb_test_manager):
def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
CLI test: Invoke `ctk load table` for DynamoDB.
Verify `DynamoDBFullLoad` works as expected.
"""

# Define source and target URLs.
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD_UTM])
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD])

# Run transfer command.
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
Expand All @@ -52,20 +32,4 @@ def test_dynamodb_copy(caplog, cratedb, dynamodb, dynamodb_test_manager):
assert cratedb.database.count_records("testdrive.demo") == 1

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608
assert results[0]["data"] == {
"Id": 101.0,
"utmTags": [
{
"date": "2024-08-28T20:05:42.603Z",
"utm_adgroup": ["", ""],
"utm_campaign": "34374686341",
"utm_medium": "foobar",
"utm_source": "google",
}
],
"location": {
"coordinates": [""],
"meetingPoint": "At the end of the tunnel",
"address": "Salzbergwerk Berchtesgaden",
},
}
assert results[0]["data"] == {"Id": 101.0}