Skip to content

Commit

Permalink
csv dest: add improved error handling in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
turtleDev committed Feb 5, 2025
1 parent 726d25d commit 28845af
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
7 changes: 4 additions & 3 deletions ingestr/src/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
from contextlib import contextmanager
from typing import Generator

from pyarrow.parquet import ParquetFile

from pyarrow.parquet import ParquetFile # type: ignore

PARQUET_BATCH_SIZE = 64


class UnsupportedLoaderFileFormat(Exception):
    """Error type for loader input files whose format is not supported.

    Adds no behaviour beyond ``Exception``; it exists so callers can catch
    this condition specifically.

    NOTE(review): the raise site is outside this excerpt -- confirm which
    file formats trigger it before relying on this description.
    """

Expand Down Expand Up @@ -48,6 +48,7 @@ def jsonlfile(filepath: str):
def reader(fd):
for line in fd:
yield json.loads(line.decode().strip())

with gzip.open(filepath) as fd:
yield reader(fd)

Expand All @@ -63,6 +64,6 @@ def parquetfile(filepath: str):
def reader(pf: ParquetFile):
for batch in pf.iter_batches(PARQUET_BATCH_SIZE):
yield from batch.to_pylist()

with open(filepath, "rb") as fd:
yield reader(ParquetFile(fd))
13 changes: 10 additions & 3 deletions ingestr/src/loader_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import csv
import gzip
import json
import logging
import os
import tempfile
from typing import List

import pyarrow.parquet
import pyarrow.parquet # type: ignore
import pytest
from loader import load_dlt_file

from ingestr.src.loader import load_dlt_file

logger = logging.getLogger(__name__)

TESTDATA = [
{"name": "Jhon", "email": "[email protected]"},
Expand Down Expand Up @@ -57,7 +61,10 @@ def testfiles():
yield files

for file in files:
os.remove(file)
try:
os.remove(file)
except Exception as e:
logger.error(f"error removing temporary file {file}", exc_info=e)


def test_loader(testfiles):
Expand Down

0 comments on commit 28845af

Please sign in to comment.