Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Migrate CE #9

Merged
merged 10 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[run]
source =
datamimic
parallel = True
[report]
exclude_lines =
pragma: no cover
omit =
docker/*
script/*
target/*
venv/*
tests/*
13 changes: 13 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Since the ".env" file is gitignored, you can use the ".env.example" file to
# build a new ".env" file when you clone the repo. Keep this file up-to-date
# when you add new variables to `.env`.

# This file will be committed to version control, so make sure not to have any
# secrets in it. If you are cloning this repo, create a copy of this file named
# ".env" and populate it with your secrets.

# Should be development or production
# WARNING: Please make sure this is set correctly for desired deployment behavior
RUNTIME_ENVIRONMENT=development
SQLALCHEMY_WARN_20=1

2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
- development

env:
DATAMIMIC_LIB_ENVIRONMENT: lib_staging
RUNTIME_ENVIRONMENT: production

jobs:
setup:
Expand Down
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@ uv.lock
#datamimic
datamimic.log
datamimic.log.*
exported_data/
exported_data/

# Temporary result files
**/**/temp_result*

# Exporter files result
**/**/exporter_result*
2 changes: 1 addition & 1 deletion datamimic_ce/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License (CC BY-NC-SA 4.0).
# For commercial use, please contact Rapiddweller at [email protected] to obtain a commercial license.
# Full license text available at: http://creativecommons.org/licenses/by-nc-sa/4.0/

import argparse
import json
import os
from importlib.resources import files
Expand Down
12 changes: 7 additions & 5 deletions datamimic_ce/clients/rdbms_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sqlalchemy.orm import sessionmaker

from datamimic_ce.clients.database_client import DatabaseClient
from datamimic_ce.config import settings
from datamimic_ce.credentials.rdbms_credential import RdbmsCredential
from datamimic_ce.data_sources.data_source_pagination import \
DataSourcePagination
Expand Down Expand Up @@ -86,18 +87,19 @@ def create_sqlalchemy_engine(driver, user, password, host, port, db):
# Match the DBMS type and create the appropriate SQLAlchemy engine
match dbms:
case "sqlite":
environment = os.getenv("DATAMIMIC_LIB_ENVIRONMENT")
if environment in {"local", "staging", "production"}:
environment = settings.RUNTIME_ENVIRONMENT
if environment in {"development", "production"}:
if not self._task_id:
raise ValueError(
"Task ID is required to create SQLite db in task folder"
)
# Construct the database path within the task folder
db_path = Path("data") / "task" / self._task_id / f"{db}.sqlite"
if not db_path.exists():
logger.info(
f"Creating SQLite db file in task folder: {db_path}"
)
# Ensure the parent directory exists
logger.info(f"Creating SQLite db file in task folder: {db_path}")
db_path.parent.mkdir(parents=True, exist_ok=True)

else:
# Use a simple file-based SQLite database
db_path = Path(f"{db}.sqlite")
Expand Down
6 changes: 3 additions & 3 deletions datamimic_ce/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
# ENV Vars are automatically retrieved from .env or
# ENVIRONMENT VARS by using extending Class BaseSettings
class Settings(BaseSettings):
# Should be lib_local (for lib testing on local)
DATAMIMIC_LIB_ENVIRONMENT: Literal["lib_local", "lib_staging"] = "lib_local"
# Should be development or production
RUNTIME_ENVIRONMENT: Literal["development", "production"] = "production"

SC_PAGE_SIZE: int = 1000

DEFEAULT_LOGGER: str = "DATAMIMIC"
DEFAULT_LOGGER: str = "DATAMIMIC"

LIB_EDITION: str = "CE"
model_config = SettingsConfigDict(
Expand Down
4 changes: 4 additions & 0 deletions datamimic_ce/contexts/setup_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,10 @@ def report_logging(self) -> bool:
def report_logging(self, value) -> None:
self._report_logging = value

@property
def default_encoding(self):
    """Default character encoding configured for this setup context.

    Returns the value stored upstream in ``self._default_encoding``;
    may be falsy/None when no encoding was configured — callers fall
    back to "utf-8" in that case (see CSVExporter.__init__).
    """
    return self._default_encoding

def add_client(self, client_id: str, client: Client):
"""
Add client info to context
Expand Down
133 changes: 97 additions & 36 deletions datamimic_ce/exporters/csv_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,110 @@
# Licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License (CC BY-NC-SA 4.0).
# For commercial use, please contact Rapiddweller at [email protected] to obtain a commercial license.
# Full license text available at: http://creativecommons.org/licenses/by-nc-sa/4.0/

import csv
import os
from typing import Dict, List, Tuple
from pathlib import Path
from typing import List

from datamimic_ce.contexts.setup_context import SetupContext
from datamimic_ce.exporters.file_exporter import FileExporter
from datamimic_ce.exporters.unified_buffered_exporter import UnifiedBufferedExporter
from datamimic_ce.logger import logger


class CSVExporter(FileExporter):
class CSVExporter(UnifiedBufferedExporter):
"""
Export generated data to CSV saved on Minio server
"""
def __init__(
    self,
    setup_context: SetupContext,
    product_name: str,
    fieldnames: List[str] | None = None,
    chunk_size: int | None = None,
    delimiter: str | None = None,
    quotechar: str | None = None,
    quoting: int | None = None,
    encoding: str | None = None,
    line_terminator: str | None = None,
    **kwargs,
):
    """Initialize the CSV exporter.

    Args:
        setup_context: Shared setup context providing the task id and defaults.
        product_name: Name of the product whose records are exported.
        fieldnames: Explicit CSV column order; inferred from the first
            record's keys on first write when omitted.
        chunk_size: Records per buffer chunk (None = exporter default).
        delimiter: Field separator; falls back to the context default, then ",".
        quotechar: Quote character; defaults to '"'.
        quoting: A ``csv`` quoting constant; defaults to ``csv.QUOTE_MINIMAL``.
        encoding: Output encoding; falls back to the context default, then "utf-8".
        line_terminator: Row terminator; falls back to the context default,
            then ``os.linesep``.
        **kwargs: Forwarded to UnifiedBufferedExporter.
    """
    # Plain instance state — this class is intentionally NOT a singleton
    # (the old __new__-based singleton pattern was removed).
    self.fieldnames = fieldnames
    self._task_id = setup_context.task_id

    # Resolve options from explicit args, then setup_context, then hard defaults.
    kwargs["encoding"] = encoding or setup_context.default_encoding or "utf-8"
    self.delimiter = delimiter or setup_context.default_separator or ","
    self.quotechar = quotechar or '"'
    self.quoting = quoting or csv.QUOTE_MINIMAL
    self.line_terminator = line_terminator or setup_context.default_line_separator or os.linesep or "\n"

    super().__init__("csv", setup_context, product_name, chunk_size=chunk_size, **kwargs)
    # Fixed: log message previously said "CSVEEExporter" (typo).
    logger.info(
        f"CSVExporter initialized with chunk size {chunk_size}, fieldnames '{fieldnames}', "
        f"encoding '{self._encoding}', delimiter '{self.delimiter}'"
    )

def _write_data_to_buffer(self, data: List[dict]) -> None:
    """Append records to the current buffer file in CSV format.

    Writes a header row on the first write to a fresh buffer file and
    infers fieldnames from the first record when none were configured.

    Args:
        data: Records to append; keys not listed in fieldnames are ignored.

    Raises:
        Exception: Any write error is logged and re-raised.
    """
    try:
        # Header is needed only when the buffer file does not exist yet.
        write_header = not self._buffer_file.exists()
        with self._buffer_file.open("a", newline="", encoding=self._encoding) as csvfile:
            if not self.fieldnames and data:
                # Infer column order from the first record.
                self.fieldnames = list(data[0].keys())
            writer = csv.DictWriter(
                csvfile,
                fieldnames=self.fieldnames,
                delimiter=self.delimiter,
                quotechar=self.quotechar,
                quoting=self.quoting,
                # Fixed: honor the configured line terminator; previously the
                # csv module default ("\r\n") was always used regardless of
                # the line_terminator passed to __init__.
                lineterminator=self.line_terminator,
                extrasaction="ignore",
            )
            if write_header and self.fieldnames:
                writer.writeheader()
            for record in data:
                writer.writerow(record)
        logger.debug(f"Wrote {len(data)} records to buffer file: {self._buffer_file}")
        self._is_first_write = False
    except Exception as e:
        logger.error(f"Error writing data to buffer: {e}")
        raise

def _define_suffix(self) -> str:
    """Return the file suffix for exported files: always "csv"."""
    return "csv"

def _get_content_type(self) -> str:
    """Return the MIME type of the exported data: always "text/csv"."""
    return "text/csv"

def _finalize_buffer_file(self, buffer_file: Path) -> None:
    """Post-process a completed buffer file — a no-op for CSV.

    Args:
        buffer_file: Path of the completed buffer file (unused).
    """
    # No finalization needed for CSV files
    pass

def _reset_state(self):
    """Reset the exporter state so the instance can be reused."""
    super()._reset_state()
    # Fixed: log message previously said "CSVEEExporter" (typo).
    logger.debug("CSVExporter state has been reset.")

# def __init__(self, setup_context: SetupContext, product_name: str):
# super().__init__("csv", setup_context, product_name)
# self._separator = setup_context.default_separator
# self._line_separator = setup_context.default_line_separator or os.linesep

_instance = None

def __new__(cls, *_):
    """
    Create the singleton instance of CSVExporter.
    """
    # Lazily create and cache the single shared instance on the class.
    # NOTE(review): not thread-safe — concurrent first calls could race.
    if cls._instance is None:
        cls._instance = super(CSVExporter, cls).__new__(cls)
    return cls._instance

def __init__(self, setup_context: SetupContext):
    """Initialize the exporter with separators from the setup context."""
    super().__init__(setup_context)
    self._separator = setup_context.default_separator
    # Fall back to the platform line separator when none is configured.
    self._line_separator = setup_context.default_line_separator or os.linesep

def consume(self, product: Tuple[str, List[Dict]]) -> None:
    """Write one product's records to "<name>.csv" in the export directory.

    Args:
        product: Tuple of (product name, list of record dicts). Column
            order is taken from the keys of the first record; every row
            is written using that same key order.
    """
    name = product[0]
    data = product[1]

    file_name = f"{name}.csv"
    # NOTE(review): opened without an explicit encoding — the platform
    # default applies; confirm this is acceptable for exported data.
    with open(self._exported_data_dir / file_name, "w") as file:
        if data:
            keys = data[0].keys()
            # Header row built from the first record's keys.
            file.write(f"{self._separator}".join(keys))
            file.write(self._line_separator)
            for row in data:
                file.write(
                    f"{self._separator}".join([str(row[key]) for key in keys])
                )
                file.write(self._line_separator)

    logger.debug(f"Exported data to {self._exported_data_dir / file_name}")
# def consume(self, product: Tuple[str, List[Dict]]) -> None:
# name = product[0]
# data = product[1]
#
# file_name = f"{name}.csv"
# with open(self._exported_data_dir / file_name, "w") as file:
# if data:
# keys = data[0].keys()
# file.write(f"{self._separator}".join(keys))
# file.write(self._line_separator)
# for row in data:
# file.write(f"{self._separator}".join([str(row[key]) for key in keys]))
# file.write(self._line_separator)
#
# logger.debug(f"Exported data to {self._exported_data_dir / file_name}")
Loading