From c93e5ba66a05002617951c85d57dd5de5bc3bcdf Mon Sep 17 00:00:00 2001 From: turtleDev Date: Fri, 31 Jan 2025 17:52:15 +0530 Subject: [PATCH 1/3] tests: add test for writing to CSV destination --- ingestr/main_test.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/ingestr/main_test.py b/ingestr/main_test.py index 5ff2c09d..b2e7d0dd 100644 --- a/ingestr/main_test.py +++ b/ingestr/main_test.py @@ -1784,6 +1784,37 @@ def test_date_coercion_issue(): source_instance.stop() dest_instance.stop() +def test_csv_dest(): + """ + Smoke test to ensure that CSV destination works. + """ + with ( + tempfile.NamedTemporaryFile("w") as duck_src, + tempfile.NamedTemporaryFile("w") as csv_dest, + ): + duck_src.close() + csv_dest.close() + try: + conn = duckdb.connect(duck_src.name) + conn.sql(""" + CREATE SCHEMA public; + CREATE TABLE public.testdata(name varchar, age integer); + INSERT INTO public.testdata(name, age) + VALUES ('Jhon', 42), ('Lisa', 21), ('Mike', 24), ('Mary', 27); + """) + conn.close() + result = invoke_ingest_command( + f"duckdb:///{duck_src.name}", + "public.testdata", + f"csv://{csv_dest.name}", + "dataset.table", # unused by csv dest + ) + assert result.exit_code == 0 + finally: + os.remove(duck_src.name) + os.remove(csv_dest.name) + + @dataclass class DynamoDBTestConfig: From bbf40d3038ddcc957da58a991d7c139c272e9de2 Mon Sep 17 00:00:00 2001 From: turtleDev Date: Fri, 31 Jan 2025 18:03:44 +0530 Subject: [PATCH 2/3] dest: csv: use parquet for reading data --- ingestr/main_test.py | 6 +++--- ingestr/src/destinations.py | 26 ++++++++++++-------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/ingestr/main_test.py b/ingestr/main_test.py index b2e7d0dd..ccd5e000 100644 --- a/ingestr/main_test.py +++ b/ingestr/main_test.py @@ -1784,6 +1784,7 @@ def test_date_coercion_issue(): source_instance.stop() dest_instance.stop() + def test_csv_dest(): """ Smoke test to ensure that CSV destination works. @@ -1807,15 +1808,14 @@ def test_csv_dest(): f"duckdb:///{duck_src.name}", "public.testdata", f"csv://{csv_dest.name}", - "dataset.table", # unused by csv dest + "dataset.table", # unused by csv dest ) - assert result.exit_code == 0 + assert result.exit_code == 0 finally: os.remove(duck_src.name) os.remove(csv_dest.name) - @dataclass class DynamoDBTestConfig: db_name: str diff --git a/ingestr/src/destinations.py b/ingestr/src/destinations.py index df05b209..2fa49651 100644 --- a/ingestr/src/destinations.py +++ b/ingestr/src/destinations.py @@ -1,6 +1,5 @@ import base64 import csv -import gzip import json import os import shutil @@ -8,6 +7,7 @@ from urllib.parse import parse_qs, quote, urlparse import dlt +import pyarrow.parquet # type: ignore from dlt.common.configuration.specs import AwsCredentials from dlt.destinations.impl.clickhouse.configuration import ( ClickHouseCredentials, @@ -184,19 +184,17 @@ def filter_keys(dictionary): if output_path.count("/") > 1: os.makedirs(os.path.dirname(output_path), exist_ok=True) - with gzip.open(first_file_path, "rt", encoding="utf-8") as jsonl_file: # type: ignore - with open(output_path, "w", newline="") as csv_file: - csv_writer = None - for line in jsonl_file: - json_obj = filter_keys(json.loads(line)) - if csv_writer is None: - csv_writer = csv.DictWriter( - csv_file, fieldnames=json_obj.keys() - ) - csv_writer.writeheader() - - csv_writer.writerow(json_obj) - + table = pyarrow.parquet.read_table(first_file_path) + rows = table.to_pylist() + with open(output_path, "w", newline="") as csv_file: + csv_writer = None + for row in rows: + row = filter_keys(row) + if csv_writer is None: + csv_writer = csv.DictWriter(csv_file, fieldnames=row.keys()) + csv_writer.writeheader() + + csv_writer.writerow(row) shutil.rmtree(self.temp_path) From de44b6cba189565441c503ff07d87f70a88ba376 Mon Sep 17 00:00:00 2001 From: turtleDev Date: Fri, 31 Jan 2025 18:08:41 +0530 Subject: [PATCH 3/3] csv dest: add more test validations --- ingestr/main_test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ingestr/main_test.py b/ingestr/main_test.py index ccd5e000..9938c91f 100644 --- a/ingestr/main_test.py +++ b/ingestr/main_test.py @@ -1811,6 +1811,10 @@ def test_csv_dest(): "dataset.table", # unused by csv dest ) assert result.exit_code == 0 + with open(csv_dest.name, "r") as output: + reader = csv.DictReader(output) + rows = [row for row in reader] + assert len(rows) == 4 finally: os.remove(duck_src.name) os.remove(csv_dest.name)