Skip to content

Commit

Permalink
Merge pull request #101 from bruin-data/bugfix-csv-dest/BRU-1532
Browse files Browse the repository at this point in the history
Bugfix: CSV Destination
  • Loading branch information
turtleDev authored Jan 31, 2025
2 parents 052873b + de44b6c commit aee17a7
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 14 deletions.
35 changes: 35 additions & 0 deletions ingestr/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,41 @@ def test_date_coercion_issue():
dest_instance.stop()


def test_csv_dest():
    """
    Smoke test to ensure that CSV destination works.

    Seeds a temporary DuckDB database with four rows, runs the ingest
    command against a csv:// destination URI, and checks that every row
    shows up in the produced CSV file.
    """
    with tempfile.NamedTemporaryFile("w") as duck_src:
        with tempfile.NamedTemporaryFile("w") as csv_dest:
            # Close both handles immediately: only the reserved file paths
            # are needed — duckdb and the CSV destination recreate the files.
            duck_src.close()
            csv_dest.close()
            try:
                db = duckdb.connect(duck_src.name)
                db.sql("""
                    CREATE SCHEMA public;
                    CREATE TABLE public.testdata(name varchar, age integer);
                    INSERT INTO public.testdata(name, age)
                    VALUES ('Jhon', 42), ('Lisa', 21), ('Mike', 24), ('Mary', 27);
                """)
                db.close()
                outcome = invoke_ingest_command(
                    f"duckdb:///{duck_src.name}",
                    "public.testdata",
                    f"csv://{csv_dest.name}",
                    "dataset.table",  # unused by csv dest
                )
                assert outcome.exit_code == 0
                with open(csv_dest.name, "r") as produced:
                    # All four seeded rows must round-trip into the CSV.
                    assert len(list(csv.DictReader(produced))) == 4
            finally:
                # The early close() already unlinked the tempfiles, so the
                # files written afterwards must be removed by hand.
                os.remove(duck_src.name)
                os.remove(csv_dest.name)


@dataclass
class DynamoDBTestConfig:
db_name: str
Expand Down
26 changes: 12 additions & 14 deletions ingestr/src/destinations.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import base64
import csv
import gzip
import json
import os
import shutil
import tempfile
from urllib.parse import parse_qs, quote, urlparse

import dlt
import pyarrow.parquet # type: ignore
from dlt.common.configuration.specs import AwsCredentials
from dlt.destinations.impl.clickhouse.configuration import (
ClickHouseCredentials,
Expand Down Expand Up @@ -184,19 +184,17 @@ def filter_keys(dictionary):
if output_path.count("/") > 1:
os.makedirs(os.path.dirname(output_path), exist_ok=True)

with gzip.open(first_file_path, "rt", encoding="utf-8") as jsonl_file: # type: ignore
with open(output_path, "w", newline="") as csv_file:
csv_writer = None
for line in jsonl_file:
json_obj = filter_keys(json.loads(line))
if csv_writer is None:
csv_writer = csv.DictWriter(
csv_file, fieldnames=json_obj.keys()
)
csv_writer.writeheader()

csv_writer.writerow(json_obj)

table = pyarrow.parquet.read_table(first_file_path)
rows = table.to_pylist()
with open(output_path, "w", newline="") as csv_file:
csv_writer = None
for row in rows:
row = filter_keys(row)
if csv_writer is None:
csv_writer = csv.DictWriter(csv_file, fieldnames=row.keys())
csv_writer.writeheader()

csv_writer.writerow(row)
shutil.rmtree(self.temp_path)


Expand Down

0 comments on commit aee17a7

Please sign in to comment.