Merge remote-tracking branch 'origin/development' into DAT-328
ake2l committed Nov 15, 2024
2 parents 4592189 + 1f77f28 commit 1178c44
Showing 76 changed files with 98 additions and 162 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/main.yml
@@ -272,7 +272,7 @@ jobs:
- name: Run SonarCloud Scan
uses: SonarSource/sonarcloud-github-action@master
env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
+          # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}

e2e_test:
@@ -479,4 +479,5 @@ jobs:
env:
EE_TOKEN: ${{ secrets.EE_TOKEN }}
shell: bash


File renamed without changes.
24 changes: 1 addition & 23 deletions datamimic_ce/exporters/csv_exporter.py
@@ -36,7 +36,6 @@ def __init__(
self._task_id = setup_context.task_id

# Retrieve encoding and delimiter from setup_context or use defaults
-        kwargs["encoding"] = encoding or setup_context.default_encoding or "utf-8"
self.delimiter = delimiter or setup_context.default_separator or ","
self.quotechar = quotechar or '"'
self.quoting = quoting or csv.QUOTE_MINIMAL
@@ -56,7 +55,7 @@ def __init__(
**kwargs,
)
logger.info(
f"CSVEEExporter initialized with chunk size {chunk_size}, fieldnames '{fieldnames}', "
f"CSVExporter initialized with chunk size {chunk_size}, fieldnames '{fieldnames}', "
f"encoding '{self._encoding}', delimiter '{self.delimiter}'"
)

@@ -102,24 +101,3 @@ def _reset_state(self):
"""Resets the exporter state for reuse."""
super()._reset_state()
logger.debug("CSVEEExporter state has been reset.")

-    # def __init__(self, setup_context: SetupContext, product_name: str):
-    #     super().__init__("csv", setup_context, product_name)
-    #     self._separator = setup_context.default_separator
-    #     self._line_separator = setup_context.default_line_separator or os.linesep
-
-    # def consume(self, product: Tuple[str, List[Dict]]) -> None:
-    #     name = product[0]
-    #     data = product[1]
-    #
-    #     file_name = f"{name}.csv"
-    #     with open(self._exported_data_dir / file_name, "w") as file:
-    #         if data:
-    #             keys = data[0].keys()
-    #             file.write(f"{self._separator}".join(keys))
-    #             file.write(self._line_separator)
-    #             for row in data:
-    #                 file.write(f"{self._separator}".join([str(row[key]) for key in keys]))
-    #                 file.write(self._line_separator)
-    #
-    #     logger.debug(f"Exported data to {self._exported_data_dir / file_name}")
2 changes: 2 additions & 0 deletions datamimic_ce/exporters/exporter_util.py
@@ -71,6 +71,8 @@ def create_exporter_list(
:param setup_context:
:param stmt:
:param targets:
+    :param mp_idx:
+    :return:
"""
consumers_with_operation = []
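The two added lines complete the reST-style docstring stub. Filled in, it might read as below; the parameter and return descriptions are guessed for illustration, not taken from the project:

def create_exporter_list(setup_context, stmt, targets, mp_idx):
    """
    Create exporters (with their operations) for the given targets.

    :param setup_context: shared setup/configuration context
    :param stmt: the statement whose products are being exported
    :param targets: exporter target specifiers
    :param mp_idx: multiprocessing worker index, or None when single-process
    :return: list of (exporter, operation) pairs
    """
    consumers_with_operation = []
    return consumers_with_operation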
1 change: 1 addition & 0 deletions datamimic_ce/exporters/json_exporter.py
@@ -34,6 +34,7 @@ def __init__(
mp_id: int | None,
use_ndjson: bool = False,
chunk_size: int = None,
+        encoding: str = None,
):
self.use_ndjson = use_ndjson
self._task_id = setup_context.task_id
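The new encoding parameter joins use_ndjson in the JSON exporter's constructor. As a rough sketch of what those two options control; write_records below is a hypothetical helper, not the exporter's real API:

import json
from pathlib import Path


def write_records(path: Path, records: list[dict],
                  use_ndjson: bool = False, encoding: str = "utf-8") -> None:
    with path.open("w", encoding=encoding) as f:
        if use_ndjson:
            # One JSON object per line; streams well and appends cheaply.
            for record in records:
                f.write(json.dumps(record) + "\n")
        else:
            # A single JSON array; must be written (or rewritten) whole.
            json.dump(records, f)


write_records(Path("out.ndjson"), [{"id": 1}, {"id": 2}], use_ndjson=True)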
1 change: 0 additions & 1 deletion datamimic_ce/exporters/txt_exporter.py
@@ -59,7 +59,6 @@ def __init__(
)

# Pass encoding via kwargs to the base class
-        kwargs["encoding"] = encoding or setup_context.default_encoding or "utf-8"

super().__init__(
"txt",
88 changes: 19 additions & 69 deletions datamimic_ce/exporters/unified_buffered_exporter.py
@@ -65,11 +65,7 @@ def __init__(
self.chunk_size = chunk_size # Max entities per chunk

# Prepare temporary buffer directory
-        # / f"exporter_result_{self._task_id}_pid_{self._pid}_exporter_{self._exporter_type}_product_{self.product_name}"
-        self._buffer_tmp_dir = (
-            self._descriptor_dir
-            / f"exporter_result_{self._task_id}_exporter_{self._exporter_type}_product_{self.product_name}"
-        )
+        self._buffer_tmp_dir = self._get_buffer_tmp_dir()
self._buffer_tmp_dir.mkdir(parents=True, exist_ok=True)

# Initialize state variables
@@ -79,14 +75,19 @@ def __init__(
# Handle any additional parameters from kwargs
for key, value in kwargs.items():
setattr(self, key, value)
-        # self._buffer_file = self._get_buffer_file()  # Initialize first buffer file

+    @property
+    def encoding(self) -> str:
+        return self._encoding
+
+    def _get_buffer_tmp_dir(self) -> Path:
+        return self._descriptor_dir / f"exporter_result_{self._task_id}_exporter_{self._exporter_type}_product_{self.product_name}"

def _get_buffer_dir_by_pid(self) -> Path:
"""
Returns the buffer directory for the current PID.
:return:
"""
-        # / f"exporter_result_{self._task_id}_pid_{self._pid}_exporter_{self._exporter_type}_product_{self.product_name}"
return (
self._descriptor_dir
/ f"exporter_result_{self._task_id}_exporter_{self._exporter_type}_product_{self.product_name}"
@@ -168,7 +169,6 @@ def _rotate_chunk(self) -> None:
self.chunk_index += 1
self.current_counter = 0 # Reset the current count for new chunk
self._save_state()
-        # self._buffer_file = self._get_buffer_file()  # Create new buffer file for the next chunk
self._is_first_write = True # Reset for the new chunk

def store_data(self, data: list[dict]) -> None:
@@ -282,40 +282,6 @@ def consume(self, product: tuple):
logger.debug(f"Storing data for '{self.product_name}' with {len(data)} records")
self.store_data(data)

-    # def upload_to_storage(self, bucket: str, name: str) -> None:
-    #     """Uploads all buffered chunk files to the object storage in the specified format."""
-    #     buffer_dir = self._get_buffer_dir_by_pid()
-    #
-    #     # logger.debug(f"Found {len(buffer_dirs)} buffer directories for export.")
-    #
-    #     # Define file suffix
-    #     suffix = self._define_suffix()
-    #
-    #     buffer_files = sorted(
-    #         buffer_dir.glob(f"*.{suffix}"),
-    #         key=lambda x: int(re.search(r"_chunk_(\d+)", x.name).group(1)),
-    #     )
-    #     logger.debug(f"Found {len(buffer_files)} buffer files in directory {buffer_dir}.")
-    #
-    #     for buffer_file in buffer_files:
-    #         # Corresponding metadata file
-    #         metadata = self._load_metadata(buffer_file.with_suffix(".meta"))
-    #
-    #         total_count, uri = self._craft_uri(metadata, suffix)
-    #
-    #         with buffer_file.open("r", encoding=self._encoding) as f:
-    #             data_content = f.read()
-    #         data_buffer = BytesIO(data_content.encode(self._encoding))
-    #         content_type = self._get_content_type()
-    #         # Perform the upload
-    #         self._storage.write(bucket or self._default_bucket, uri, data_buffer, content_type)
-    #         logger.debug(f"Uploaded {uri} with {total_count} records")
-    #
-    #     logger.info(f"Exported buffer files successfully.")
-    #
-    #     # Cleanup buffer directories
-    #     self._reset_state()

@abstractmethod
def _define_suffix(self) -> str:
"""Defines the file suffix based on the format."""
@@ -342,44 +308,28 @@ def _craft_uri(self, metadata, suffix):
)

# Determine URI based on chunk size and multiprocessing
-        if self._mp:
-            if chunk_size is None:
-                uri = f"{product_name}_pid_{pid}.{suffix}"
-            elif chunk_size == 1:
-                uri = f"{product_name}_{chunk_start}_pid_{pid}.{suffix}"
-            else:
-                uri = f"{product_name}_{chunk_start}_{chunk_end}_pid_{pid}.{suffix}"
+        if chunk_size is None:
+            uri = f"{product_name}{self._pid_placeholder}.{suffix}"
+        elif chunk_size == 1:
+            uri = f"{product_name}_{chunk_start}{self._pid_placeholder}.{suffix}"
         else:
-            if chunk_size is None:
-                uri = f"{product_name}.{suffix}"
-            elif chunk_size == 1:
-                uri = f"{product_name}_{chunk_start}.{suffix}"
-            else:
-                uri = f"{product_name}_{chunk_start}_{chunk_end}.{suffix}"
+            uri = f"{product_name}_{chunk_start}_{chunk_end}{self._pid_placeholder}.{suffix}"

return total_count, f"{self._task_id}/{uri}"
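The refactor collapses the old if self._mp branching into a single set of filename templates by interpolating a pid placeholder. A standalone sketch, assuming _pid_placeholder is the empty string in single-process runs and something like "_pid_7" under multiprocessing (the exact value is not shown in this diff):

def craft_uri(product_name: str, suffix: str, pid_placeholder: str,
              chunk_size: int | None, chunk_start: int, chunk_end: int) -> str:
    if chunk_size is None:   # everything in one file
        return f"{product_name}{pid_placeholder}.{suffix}"
    if chunk_size == 1:      # one record per file
        return f"{product_name}_{chunk_start}{pid_placeholder}.{suffix}"
    return f"{product_name}_{chunk_start}_{chunk_end}{pid_placeholder}.{suffix}"


print(craft_uri("customer", "csv", "_pid_7", 1000, 0, 1000))
# customer_0_1000_pid_7.csv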

def cleanup(self) -> None:
"""Cleans up the temporary buffer directories."""
# Identify all buffer directories for this task

-        # buffer_dir = self._get_buffer_dir_by_pid()
-        #
-        # if buffer_dir.exists():
-        #     # Delete all files in the buffer directory
-        #     for file in buffer_dir.iterdir():
-        #         if file.is_file() and file.suffix == ".meta":
-        #             file.unlink()

+        buffer_metadata_file = self._get_buffer_file().with_suffix(".meta")
+        buffer_metadata_file.unlink()
+        self._get_state_meta_file().unlink()
-        # pass
-        # # Remove the buffer directory
-        # try:
-        #     buffer_dir.rmdir()
-        # except OSError as e:
-        #     logger.error(f"Failed to remove buffer directory {buffer_dir}: {e}")
logger.debug("Buffer directory cleanup complete.")

def _reset_state(self):
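Taken together, this class buffers records into per-chunk files and rotates to a fresh file once chunk_size records have been written. A condensed sketch of that rotation idea; ChunkedBuffer is a simplified stand-in, not the class's real interface:

from pathlib import Path


class ChunkedBuffer:
    def __init__(self, directory: Path, chunk_size: int):
        self.directory = directory
        self.chunk_size = chunk_size
        self.chunk_index = 0
        self.current_counter = 0
        self.directory.mkdir(parents=True, exist_ok=True)

    def _buffer_file(self) -> Path:
        return self.directory / f"product_chunk_{self.chunk_index}.ndjson"

    def store(self, rows: list[str]) -> None:
        for row in rows:
            if self.current_counter >= self.chunk_size:
                self.chunk_index += 1   # rotate to a fresh chunk file
                self.current_counter = 0
            with self._buffer_file().open("a", encoding="utf-8") as f:
                f.write(row + "\n")
            self.current_counter += 1


buf = ChunkedBuffer(Path("buffer_tmp"), chunk_size=2)
buf.store(['{"id": 1}', '{"id": 2}', '{"id": 3}'])  # third row starts chunk 1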
3 changes: 2 additions & 1 deletion datamimic_ce/exporters/xml_exporter.py
@@ -52,12 +52,13 @@ def __init__(
self.item_element = item_element

# Pass encoding via kwargs to the base class
-        kwargs["encoding"] = encoding or setup_context.default_encoding or "utf-8"
+        # kwargs["encoding"] = encoding or setup_context.default_encoding or "utf-8"

super().__init__(
exporter_type="xml",
setup_context=setup_context,
product_name=product_name,
+            encoding=encoding,
chunk_size=chunk_size,
mp_idx=mp_id,
**kwargs,
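Here the encoding override moves from a kwargs mutation to an explicit keyword argument on the super().__init__ call. A toy example of why the explicit form is easier to follow; Base and Child are stand-ins, not the real exporter classes:

class Base:
    def __init__(self, encoding: str | None = None, **kwargs):
        self._encoding = encoding or "utf-8"


class Child(Base):
    def __init__(self, encoding: str | None = None, **kwargs):
        # Explicit keyword is easier to trace than kwargs["encoding"] = ...
        super().__init__(encoding=encoding, **kwargs)


print(Child(encoding="latin-1")._encoding)  # latin-1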
4 changes: 4 additions & 0 deletions datamimic_ce/tasks/generate_task.py
@@ -959,6 +959,10 @@ def execute(self, context: SetupContext) -> dict[str, list] | None:
del result
# gc.collect()

+            # Clean temp directory on outermost gen_stmt
+            for temp_dir in context.descriptor_dir.glob(f"temp_result_{context.task_id}*"):
+                shutil.rmtree(temp_dir)

# Just return product generated in single process if gen_stmt is inner one
else:
# Do not apply process by page for inner gen_stmt
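The added loop deletes every temp_result_<task_id>* directory under the descriptor directory once the outermost generate statement finishes. A self-contained sketch of the same glob-and-remove pattern, with an invented directory layout for illustration:

import shutil
import tempfile
from pathlib import Path

descriptor_dir = Path(tempfile.mkdtemp())
task_id = "demo"
(descriptor_dir / f"temp_result_{task_id}_0").mkdir()
(descriptor_dir / f"temp_result_{task_id}_1").mkdir()

# Remove every temp_result_<task_id>* directory, contents and all.
for temp_dir in descriptor_dir.glob(f"temp_result_{task_id}*"):
    shutil.rmtree(temp_dir)

print(list(descriptor_dir.iterdir()))  # []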
2 changes: 1 addition & 1 deletion tests_ce/functional_tests/csv_separatorr/test_separator.py
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestDataIteration:
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestDataSourceCyclic:
@@ -9,7 +9,7 @@
from pathlib import Path
from types import NoneType

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestDataType:
2 changes: 1 addition & 1 deletion tests_ce/functional_tests/test_condition/test_conditon.py
@@ -11,7 +11,7 @@

import pytest

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestCondition(TestCase):
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestConditionElement:
2 changes: 1 addition & 1 deletion tests_ce/functional_tests/test_datetime/test_datetime.py
@@ -11,7 +11,7 @@

import pytest

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestDateTime:
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestArrayFunctional:
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestFunctionalPageProcess:
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestVariableFunctional:
@@ -9,7 +9,7 @@
import math
from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestWeightedDataSourceFunctional:
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestGenerateDistribution:
@@ -11,7 +11,7 @@

from datamimic_ce.generators.generator_util import GeneratorUtil
from datamimic_ce.utils.file_util import FileUtil
-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


def count_digits_after_decimal(number):
2 changes: 1 addition & 1 deletion tests_ce/functional_tests/test_include/test_include.py
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestInclude:
2 changes: 1 addition & 1 deletion tests_ce/functional_tests/test_memstore/test_memstore.py
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestMemStore:
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestMongoDbFunction:
@@ -8,7 +8,7 @@

from pathlib import Path

-from tests_ce.data_mimic_test import DataMimicTest
+from datamimic_ce.data_mimic_test import DataMimicTest


class TestNestedGenerate: