From 2a280e9597960c5688f40b6ce2ff8f5c6587ce7a Mon Sep 17 00:00:00 2001 From: Dang Ly Date: Mon, 11 Nov 2024 02:18:26 +0700 Subject: [PATCH 1/9] Migrate CE --- .coveragerc | 13 + .github/workflows/main.yml | 2 +- .gitignore | 5 +- datamimic_ce/cli.py | 2 +- datamimic_ce/clients/rdbms_client.py | 12 +- datamimic_ce/config.py | 6 +- datamimic_ce/contexts/setup_context.py | 4 + datamimic_ce/exporters/exporter_util.py | 110 ++- datamimic_ce/exporters/mongodb_exporter.py | 26 +- datamimic_ce/logger/__init__.py | 4 +- datamimic_ce/parsers/parser_util.py | 83 +- datamimic_ce/statements/statement_util.py | 16 +- datamimic_ce/tasks/generate_task.py | 878 +++++++++--------- datamimic_ce/tasks/task_util.py | 219 ++--- tests_ce/conftest.py | 4 +- .../consumer_csv/test_csv_consumer.xml | 2 +- .../datamimic_demo/j-json/datamimic.xml | 2 +- .../datamimic_demo/p-xml/datamimic.xml | 2 +- .../test_exporters/data/people.json | 20 + .../test_exporters/data/products.ent.csv | 4 + .../test_exporters/multi_json.xml | 56 ++ .../test_exporters/multi_opensearch_bulk.xml | 46 + .../test_exporters/script/template_xyz.json | 32 + .../test_exporters/single_cascaded_cases.xml | 30 + .../test_exporters/single_combine_all.xml | 63 ++ .../test_exporters/single_csv.xml | 77 ++ .../test_exporters/single_json.xml | 63 ++ .../single_json_single_cascaded_cases.xml | 56 ++ .../test_exporters/single_opensearch_bulk.xml | 60 ++ .../test_exporters/single_txt.xml | 63 ++ .../test_exporters/single_xml.xml | 63 ++ .../test_exporters/test_exporters.py | 56 ++ .../test_mongodb/test_mongodb_intergration.py | 6 +- .../test_rdbms/test_rdbms.py | 4 +- tests_ce/test_exporter_util.py | 117 +++ tests_ce/unit_tests/exporter/__init__.py | 7 + .../unit_tests/exporter/test_csv_exporter.py | 456 +++++++++ .../unit_tests/exporter/test_json_exporter.py | 428 +++++++++ .../exporter/test_opeansearch_bulk.py | 371 ++++++++ .../unit_tests/exporter/test_txt_exporter.py | 452 +++++++++ .../unit_tests/exporter/test_xml_exporter.py | 286 ++++++ update_copyright.py | 141 +++ 42 files changed, 3684 insertions(+), 663 deletions(-) create mode 100644 .coveragerc create mode 100644 tests_ce/integration_tests/test_exporters/data/people.json create mode 100644 tests_ce/integration_tests/test_exporters/data/products.ent.csv create mode 100644 tests_ce/integration_tests/test_exporters/multi_json.xml create mode 100644 tests_ce/integration_tests/test_exporters/multi_opensearch_bulk.xml create mode 100644 tests_ce/integration_tests/test_exporters/script/template_xyz.json create mode 100644 tests_ce/integration_tests/test_exporters/single_cascaded_cases.xml create mode 100644 tests_ce/integration_tests/test_exporters/single_combine_all.xml create mode 100644 tests_ce/integration_tests/test_exporters/single_csv.xml create mode 100644 tests_ce/integration_tests/test_exporters/single_json.xml create mode 100644 tests_ce/integration_tests/test_exporters/single_json_single_cascaded_cases.xml create mode 100644 tests_ce/integration_tests/test_exporters/single_opensearch_bulk.xml create mode 100644 tests_ce/integration_tests/test_exporters/single_txt.xml create mode 100644 tests_ce/integration_tests/test_exporters/single_xml.xml create mode 100644 tests_ce/integration_tests/test_exporters/test_exporters.py create mode 100644 tests_ce/test_exporter_util.py create mode 100644 tests_ce/unit_tests/exporter/__init__.py create mode 100644 tests_ce/unit_tests/exporter/test_csv_exporter.py create mode 100644 tests_ce/unit_tests/exporter/test_json_exporter.py create mode 100644 
tests_ce/unit_tests/exporter/test_opeansearch_bulk.py create mode 100644 tests_ce/unit_tests/exporter/test_txt_exporter.py create mode 100644 tests_ce/unit_tests/exporter/test_xml_exporter.py create mode 100644 update_copyright.py diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..328d727 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,13 @@ +[run] +source = + datamimic +parallel = True +[report] +exclude_lines = + pragma: no cover +omit = + docker/* + script/* + target/* + venv/* + tests/* diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 07e1b25..7c9ae25 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,7 +11,7 @@ on: - development env: - DATAMIMIC_LIB_ENVIRONMENT: lib_staging + RUNTIME_ENVIRONMENT: production jobs: setup: diff --git a/.gitignore b/.gitignore index 3af70db..dbbdeac 100644 --- a/.gitignore +++ b/.gitignore @@ -59,4 +59,7 @@ uv.lock #datamimic datamimic.log datamimic.log.* -exported_data/ \ No newline at end of file +exported_data/ + +# Temporary result files +**/**/temp_result* \ No newline at end of file diff --git a/datamimic_ce/cli.py b/datamimic_ce/cli.py index d60db19..2f2b6ca 100644 --- a/datamimic_ce/cli.py +++ b/datamimic_ce/cli.py @@ -3,7 +3,7 @@ # Licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License (CC BY-NC-SA 4.0). # For commercial use, please contact Rapiddweller at info@rapiddweller.com to obtain a commercial license. # Full license text available at: http://creativecommons.org/licenses/by-nc-sa/4.0/ - +import argparse import json import os from importlib.resources import files diff --git a/datamimic_ce/clients/rdbms_client.py b/datamimic_ce/clients/rdbms_client.py index 6654223..e689e98 100644 --- a/datamimic_ce/clients/rdbms_client.py +++ b/datamimic_ce/clients/rdbms_client.py @@ -17,6 +17,7 @@ from sqlalchemy.orm import sessionmaker from datamimic_ce.clients.database_client import DatabaseClient +from datamimic_ce.config import settings from datamimic_ce.credentials.rdbms_credential import RdbmsCredential from datamimic_ce.data_sources.data_source_pagination import \ DataSourcePagination @@ -86,8 +87,8 @@ def create_sqlalchemy_engine(driver, user, password, host, port, db): # Match the DBMS type and create the appropriate SQLAlchemy engine match dbms: case "sqlite": - environment = os.getenv("DATAMIMIC_LIB_ENVIRONMENT") - if environment in {"local", "staging", "production"}: + environment = settings.RUNTIME_ENVIRONMENT + if environment in {"development", "production"}: if not self._task_id: raise ValueError( "Task ID is required to create SQLite db in task folder" @@ -95,9 +96,10 @@ def create_sqlalchemy_engine(driver, user, password, host, port, db): # Construct the database path within the task folder db_path = Path("data") / "task" / self._task_id / f"{db}.sqlite" if not db_path.exists(): - logger.info( - f"Creating SQLite db file in task folder: {db_path}" - ) + # Ensure the parent directory exists + logger.info(f"Creating SQLite db file in task folder: {db_path}") + db_path.parent.mkdir(parents=True, exist_ok=True) + else: # Use a simple file-based SQLite database db_path = Path(f"{db}.sqlite") diff --git a/datamimic_ce/config.py b/datamimic_ce/config.py index 50ac073..04104fd 100644 --- a/datamimic_ce/config.py +++ b/datamimic_ce/config.py @@ -12,12 +12,12 @@ # ENV Vars are automatically retrieved from .env or # ENVIRONMENT VARS by using extending Class BaseSettings class Settings(BaseSettings): - # Should be lib_local (for lib 
testing on local) - DATAMIMIC_LIB_ENVIRONMENT: Literal["lib_local", "lib_staging"] = "lib_local" + # Should be development or production + RUNTIME_ENVIRONMENT: Literal["development", "production"] = "production" SC_PAGE_SIZE: int = 1000 - DEFEAULT_LOGGER: str = "DATAMIMIC" + DEFAULT_LOGGER: str = "DATAMIMIC" LIB_EDITION: str = "CE" model_config = SettingsConfigDict( diff --git a/datamimic_ce/contexts/setup_context.py b/datamimic_ce/contexts/setup_context.py index e380665..6be3331 100644 --- a/datamimic_ce/contexts/setup_context.py +++ b/datamimic_ce/contexts/setup_context.py @@ -393,6 +393,10 @@ def report_logging(self) -> bool: def report_logging(self, value) -> None: self._report_logging = value + @property + def default_encoding(self): + return self._default_encoding + def add_client(self, client_id: str, client: Client): """ Add client info to context diff --git a/datamimic_ce/exporters/exporter_util.py b/datamimic_ce/exporters/exporter_util.py index fcf0935..5327134 100644 --- a/datamimic_ce/exporters/exporter_util.py +++ b/datamimic_ce/exporters/exporter_util.py @@ -76,11 +76,21 @@ def create_exporter_list( exporter_str_list = list(stmt.targets) - for exporter_str in exporter_str_list: - # Handle consumers with operation - if "." in exporter_str: - # consumer, operation = ConsumerUtil.get_consumer_with_operation() - consumer_name, operation = exporter_str.split(".") + # Join the list back into a string + target_str = ",".join(exporter_str_list) + + # Parse the target string using the parse_function_string function + try: + parsed_targets = ExporterUtil.parse_function_string(target_str) + except ValueError as e: + raise ValueError(f"Error parsing target string: {e}") + + # Now loop over the parsed functions and create exporters + for target in parsed_targets: + exporter_name = target["function_name"] + params = target["params"] if target["params"] else {} # Handle consumers with operation + if "." 
in exporter_name: + consumer_name, operation = exporter_name.split(".", 1) client = setup_context.get_client_by_id(consumer_name) consumer = ExporterUtil._create_exporter_from_client( client, consumer_name @@ -90,13 +100,97 @@ def create_exporter_list( else: consumer = ExporterUtil.get_exporter_by_name( setup_context=setup_context, - name=exporter_str, + name=exporter_name, + params=params, ) if consumer is not None: consumers_without_operation.append(consumer) return consumers_with_operation, consumers_without_operation + def parse_function_string(function_string): + parsed_functions = [] + # Remove spaces and check if only commas or blank string are provided + if function_string.strip() == "" or all(char in ", " for char in function_string): + return parsed_functions + + # Wrap the function string in a list to make it valid Python code + code_to_parse = f"[{function_string}]" + + try: + # Parse the code into an AST node + module = ast.parse(code_to_parse, mode="eval") + except SyntaxError as e: + raise ValueError(f"Error parsing function string: {e}") + + # Ensure the parsed node is a list + if not isinstance(module.body, ast.List): + raise ValueError("Function string is not a valid list of function calls.") + + # Iterate over each element in the list + for element in module.body.elts: + # Handle function calls with parameters + if isinstance(element, ast.Call): + # Extract function name, including dot notation (e.g., mongodb.upsert) + if isinstance(element.func, ast.Name): + function_name = element.func.id + elif isinstance(element.func, ast.Attribute): + # Capture the full dotted name + parts = [] + current = element.func + while isinstance(current, ast.Attribute): + parts.append(current.attr) + current = current.value + if isinstance(current, ast.Name): + parts.append(current.id) + function_name = ".".join(reversed(parts)) + else: + raise ValueError("Unsupported function type in function call.") + + params = {} + # Extract keyword arguments + for keyword in element.keywords: + key = keyword.arg + try: + # Safely evaluate the value using ast.literal_eval + value = ast.literal_eval(keyword.value) + except (ValueError, SyntaxError): + # If evaluation fails, raise error for non-literal parameters + raise ValueError(f"Non-literal parameter found: {keyword.value}") + params[key] = value + + parsed_functions.append({"function_name": function_name, "params": params}) + # Handle function names without parameters, including dotted names like mongodb.delete + elif isinstance(element, ast.Attribute): + # For dotted names like mongodb.delete + parts = [] + current = element + while isinstance(current, ast.Attribute): + parts.append(current.attr) + current = current.value + if isinstance(current, ast.Name): + parts.append(current.id) + function_name = ".".join(reversed(parts)) + parsed_functions.append({"function_name": function_name, "params": None}) + elif isinstance(element, ast.Name): + # For single names like CSV + function_name = element.id + parsed_functions.append({"function_name": function_name, "params": None}) + elif isinstance(element, ast.Constant): # For Python 3.8+, for older versions use ast.Str or ast.Num + # This handles cases like 'CSV' and 'JSON' if they are given as strings + function_name = element.value + parsed_functions.append({"function_name": function_name, "params": None}) + else: + # Attempt to evaluate other expressions (e.g., strings, numbers) + try: + value = ast.literal_eval(element) + function_name = str(value) + parsed_functions.append({"function_name": 
function_name, "params": None}) + except Exception: + raise ValueError("Unsupported expression in function string.") + + return parsed_functions + @staticmethod def get_exporter_by_name(setup_context: SetupContext, name: str) -> Exporter: """ @@ -146,11 +240,11 @@ def _create_exporter_from_client(client: Client, client_name: str): raise ValueError(f"Cannot create target for client {client_name}") @staticmethod - def json_dumps(data: object) -> str: + def json_dumps(data: object, indent=4) -> str: """ JSON dump with default custom serializer """ - return json.dumps(data, default=custom_serializer, ensure_ascii=False, indent=4) + return json.dumps(data, default=custom_serializer, ensure_ascii=False, indent=indent) @staticmethod def check_path_format(path) -> str: diff --git a/datamimic_ce/exporters/mongodb_exporter.py b/datamimic_ce/exporters/mongodb_exporter.py index 3aad04b..53db3d0 100644 --- a/datamimic_ce/exporters/mongodb_exporter.py +++ b/datamimic_ce/exporters/mongodb_exporter.py @@ -3,7 +3,7 @@ # Licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License (CC BY-NC-SA 4.0). # For commercial use, please contact Rapiddweller at info@rapiddweller.com to obtain a commercial license. # Full license text available at: http://creativecommons.org/licenses/by-nc-sa/4.0/ - +import copy from typing import Tuple from datamimic_ce.clients.mongodb_client import MongoDBClient @@ -16,8 +16,9 @@ def __init__(self, client: MongoDBClient): def consume(self, product) -> None: """Write data into MongoDB database""" - name = product[0] - data = product[1] + temp_product = copy.deepcopy(product) + name = temp_product[0] + data = temp_product[1] self._client.insert(name, data, False) def update(self, product: Tuple) -> int: @@ -25,8 +26,9 @@ def update(self, product: Tuple) -> int: Update data into MongoDB database :return: The number of documents matched for an update. 
""" - if len(product) > 2: - data = product[1] + temp_product = copy.deepcopy(product) + if len(temp_product) > 2: + data = temp_product[1] # product[2] contain "type" or "selector" attribute info target_query = product[2] return self._client.update(target_query, data) @@ -37,17 +39,17 @@ def upsert(self, product: Tuple) -> Tuple: """ Update MongoDB data with upsert {true} """ - name, product_list, selector_dict = product - return name, self._client.upsert( - selector_dict=selector_dict, updated_data=product_list - ) + temp_product = copy.deepcopy(product) + name, product_list, selector_dict = temp_product + return name, self._client.upsert(selector_dict=selector_dict, updated_data=product_list) def delete(self, product: Tuple): """ Delete data from MongoDB database """ - if len(product) > 2: - data = product[1] + temp_product = copy.deepcopy(product) + if len(temp_product) > 2: + data = temp_product[1] # product[2] contain "type" or "selector" attribute info - target_query = product[2] + target_query = temp_product[2] self._client.delete(target_query, data) diff --git a/datamimic_ce/logger/__init__.py b/datamimic_ce/logger/__init__.py index ec93271..eaeff46 100644 --- a/datamimic_ce/logger/__init__.py +++ b/datamimic_ce/logger/__init__.py @@ -43,7 +43,7 @@ def setup_logger(logger_name, log_file, task_id, level=logging.DEBUG): if not l.handlers: # Avoid adding duplicate handlers l.setLevel(level) - if settings.DATAMIMIC_LIB_ENVIRONMENT in ["lib_local", "lib_staging"]: + if settings.RUNTIME_ENVIRONMENT in ["development", "production"]: l.addHandler(stream_handler) l.addHandler(file_handler) else: @@ -52,4 +52,4 @@ def setup_logger(logger_name, log_file, task_id, level=logging.DEBUG): l.propagate = False # Avoid propagation to the parent logger -logger = logging.getLogger(settings.DEFEAULT_LOGGER) +logger = logging.getLogger(settings.DEFAULT_LOGGER) diff --git a/datamimic_ce/parsers/parser_util.py b/datamimic_ce/parsers/parser_util.py index bcbdb74..b6e89dd 100644 --- a/datamimic_ce/parsers/parser_util.py +++ b/datamimic_ce/parsers/parser_util.py @@ -11,20 +11,31 @@ from typing import Dict, List, Set from xml.etree.ElementTree import Element -from datamimic_ce.constants.attribute_constants import (ATTR_ENVIRONMENT, - ATTR_ID, ATTR_SYSTEM) -from datamimic_ce.constants.element_constants import (EL_ARRAY, EL_CONDITION, - EL_DATABASE, EL_ECHO, - EL_ELEMENT, EL_ELSE, - EL_ELSE_IF, EL_EXECUTE, - EL_GENERATE, - EL_GENERATOR, EL_IF, - EL_INCLUDE, EL_ITEM, - EL_KEY, EL_LIST, - EL_MEMSTORE, EL_MONGODB, - EL_NESTED_KEY, - EL_REFERENCE, EL_SETUP, - EL_VARIABLE) +from datamimic_ce.config import settings +from datamimic_ce.constants.attribute_constants import ATTR_ENVIRONMENT, ATTR_ID, ATTR_SYSTEM +from datamimic_ce.constants.element_constants import ( + EL_ARRAY, + EL_CONDITION, + EL_DATABASE, + EL_ECHO, + EL_ELEMENT, + EL_ELSE, + EL_ELSE_IF, + EL_EXECUTE, + EL_GENERATE, + EL_GENERATOR, + EL_IF, + EL_INCLUDE, + EL_ITEM, + EL_KEY, + EL_LIST, + EL_MEMSTORE, + EL_MONGODB, + EL_NESTED_KEY, + EL_REFERENCE, + EL_SETUP, + EL_VARIABLE, +) from datamimic_ce.logger import logger from datamimic_ce.parsers.array_parser import ArrayParser from datamimic_ce.parsers.condition_parser import ConditionParser @@ -220,19 +231,13 @@ def parse_sub_elements( copied_props = copy.deepcopy(properties) or {} for child_ele in element: - parser = self.get_parser_by_element( - class_factory_util, child_ele, copied_props - ) + parser = self.get_parser_by_element(class_factory_util, child_ele, copied_props) # TODO: add more 
child-element-able parsers such as attribute, reference, part,... (i.e. elements which have attribute 'name') - if isinstance( - parser, (VariableParser, GenerateParser, NestedKeyParser, ElementParser) - ): + if isinstance(parser, (VariableParser, GenerateParser, NestedKeyParser, ElementParser)): if isinstance(parser, VariableParser) and element.tag == "setup": stmt = parser.parse(parent_stmt=parent_stmt, has_parent_setup=True) elif isinstance(parser, (GenerateParser, NestedKeyParser)): - stmt = parser.parse( - descriptor_dir=descriptor_dir, parent_stmt=parent_stmt - ) + stmt = parser.parse(descriptor_dir=descriptor_dir, parent_stmt=parent_stmt) else: stmt = parser.parse(parent_stmt=parent_stmt) else: @@ -249,10 +254,7 @@ def parse_sub_elements( ), ): stmt = parser.parse() - elif isinstance( - parser, - (KeyParser, ConditionParser, IfParser, ElseIfParser, ElseParser), - ): + elif isinstance(parser, (KeyParser, ConditionParser, IfParser, ElseIfParser, ElseParser)): stmt = parser.parse(descriptor_dir, parent_stmt) else: stmt = parser.parse(descriptor_dir) @@ -270,9 +272,7 @@ def parse_sub_elements( return result @staticmethod - def retrieve_element_attributes( - attributes: Dict[str, str], properties: Dict[str, str] - ) -> Dict[str, str]: + def retrieve_element_attributes(attributes: Dict[str, str], properties: Dict[str, str]) -> Dict[str, str]: """ Retrieve element's attributes using environment properties :param attributes: @@ -284,11 +284,7 @@ def retrieve_element_attributes( # Look up element's attributes defined as variable then evaluate them for key, value in attributes.items(): - if ( - type(value) is str - and re.match(r"^\{[a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z0-9_]+)*\}$", value) - is not None - ): + if type(value) is str and re.match(r"^\{[a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z0-9_]+)*\}$", value) is not None: prop_key = value[1:-1] if "." not in prop_key: @@ -329,11 +325,7 @@ def fulfill_credentials_v2( environment = ( descriptor_attr.get(ATTR_ENVIRONMENT) - or ( - "local" - if os.environ.get("DATAMIMIC_LIB_ENVIRONMENT") == "lib_local" - else None - ) + or ("local" if os.environ.get("DATAMIMIC_LIB_ENVIRONMENT") == "lib_local" else None) or "environment" ) system = descriptor_attr.get(ATTR_SYSTEM) @@ -359,18 +351,13 @@ def fulfill_credentials_v2( ) # Update env props from env file conf_props.update(env_props_from_env_file) - except FileNotFoundError: - logger.info( - f"Environment file not found {str(descriptor_dir / f'conf/{environment}.env.properties')}" - ) + except FileNotFoundError as e: + logger.info(f"Environment file not found {str(descriptor_dir / f'conf/{environment}.env.properties')}") credentials = copy.deepcopy(descriptor_attr) for attr_key, attr_value in conf_props.items(): - if ( - attr_key.startswith(f"{system}.{system_type}.") - and attr_value is not None - ): + if attr_key.startswith(f"{system}.{system_type}.") and attr_value is not None: attr_name = "".join(attr_key.split(".")[2:]) credentials[attr_name] = attr_value diff --git a/datamimic_ce/statements/statement_util.py b/datamimic_ce/statements/statement_util.py index af336f0..ff28bd9 100644 --- a/datamimic_ce/statements/statement_util.py +++ b/datamimic_ce/statements/statement_util.py @@ -13,16 +13,24 @@ class StatementUtil: @staticmethod def parse_consumer(consumer_string: str) -> Set[str]: """ - Parse attribute "consumer" to list of consumer - :param consumer_string: - :return: + Parse the 'consumer' attribute into a set of consumers. + Splits on commas not enclosed within parentheses. 
""" consumer_list = ( [] if consumer_string is None else list(map(lambda ele: ele.strip(), consumer_string.split(","))) ) - # Avoid duplicated consumer + + # Pattern to split on commas not inside parentheses + pattern = r",\s*(?![^(]*\))" + consumer_list = re.split(pattern, consumer_string) + + # Strip whitespace from each consumer + consumer_list = [consumer.strip() for consumer in consumer_list if consumer.strip()] + + # Avoid duplicated consumers +>>>>>>> e96c3c0 (Migrate CE) consumer_set = set(consumer_list) return consumer_set diff --git a/datamimic_ce/tasks/generate_task.py b/datamimic_ce/tasks/generate_task.py index 4729775..e73ed3b 100644 --- a/datamimic_ce/tasks/generate_task.py +++ b/datamimic_ce/tasks/generate_task.py @@ -25,19 +25,28 @@ from datamimic_ce.clients.database_client import DatabaseClient from datamimic_ce.clients.rdbms_client import RdbmsClient from datamimic_ce.constants.exporter_constants import ( - EXPORTER_CSV, EXPORTER_JSON, EXPORTER_TEST_RESULT_EXPORTER, EXPORTER_TXT, - EXPORTER_XML) + EXPORTER_CSV, + EXPORTER_JSON, + EXPORTER_TEST_RESULT_EXPORTER, + EXPORTER_TXT, + EXPORTER_XML, +) from datamimic_ce.contexts.context import Context from datamimic_ce.contexts.geniter_context import GenIterContext from datamimic_ce.contexts.setup_context import SetupContext -from datamimic_ce.data_sources.data_source_pagination import \ - DataSourcePagination +from datamimic_ce.data_sources.data_source_pagination import DataSourcePagination from datamimic_ce.data_sources.data_source_util import DataSourceUtil from datamimic_ce.exporters.mongodb_exporter import MongoDBExporter from datamimic_ce.logger import logger +from datamimic_ce.statements.array_statement import ArrayStatement from datamimic_ce.statements.composite_statement import CompositeStatement +from datamimic_ce.statements.element_statement import ElementStatement from datamimic_ce.statements.generate_statement import GenerateStatement +from datamimic_ce.statements.if_statement import IfStatement +from datamimic_ce.statements.item_statement import ItemStatement from datamimic_ce.statements.key_statement import KeyStatement +from datamimic_ce.statements.list_statement import ListStatement +from datamimic_ce.statements.nested_key_statement import NestedKeyStatement from datamimic_ce.statements.setup_statement import SetupStatement from datamimic_ce.statements.statement import Statement from datamimic_ce.tasks.task import Task @@ -46,10 +55,10 @@ def _wrapper(args): """ - Wrapper mp function, deserialize args functions + Wrapper multiprocessing function to deserialize args and execute the generate function. - :param args: - :return: + :param args: Tuple containing necessary arguments. + :return: Result from single_process_execute_function. """ ( local_ctx, @@ -62,25 +71,23 @@ def _wrapper(args): mp_chunk_size, ) = args - # Deserialized util functions + # Deserialize utility functions namespace_functions = dill.loads(namespace_functions) local_ctx.namespace.update(namespace_functions) local_ctx.generators = dill.loads(local_ctx.generators) - return single_process_execute_function( - (local_ctx, statement, chunk_data, mp_idx, page_size, mp_chunk_size) - ) + return single_process_execute_function((local_ctx, statement, chunk_data, mp_idx, page_size, mp_chunk_size)) def _geniter_single_process_generate(args: Tuple) -> Dict[str, List]: """ - (IMPORTANT: Only be used as mp function) Generate product in each single process + (IMPORTANT: Only to be used as multiprocessing function) Generate product in each single process. 
- :param args: - :return: + :param args: Tuple containing context, statement, and index range. + :return: Dictionary with generated products. """ - # Parse args list + # Parse args context: Context = args[0] root_context: SetupContext = context.root stmt: GenerateStatement = args[1] @@ -101,16 +108,13 @@ def _geniter_single_process_generate(args: Tuple) -> Dict[str, List]: # 1: Build sub-tasks in GenIterStatement tasks = [ - task_util_cls.get_task_by_statement(root_context, child_stmt, pagination) - for child_stmt in stmt.sub_statements + task_util_cls.get_task_by_statement(root_context, child_stmt, pagination) for child_stmt in stmt.sub_statements ] # 2: Load data source source_str = stmt.source source_scripted = ( - stmt.source_script - if stmt.source_script is not None - else bool(root_context.default_source_scripted) + stmt.source_script if stmt.source_script is not None else bool(root_context.default_source_scripted) ) separator = stmt.separator or root_context.default_separator is_random_distribution = stmt.distribution in ("random", None) @@ -120,31 +124,25 @@ def _geniter_single_process_generate(args: Tuple) -> Dict[str, List]: load_start_idx = None load_end_idx = None load_pagination = None - source_data, build_from_source = ( - context.root.class_factory_util.get_task_util_cls().gen_task_load_data_from_source( - context, - stmt, - source_str, - separator, - source_scripted, - processed_data_count, - load_start_idx, - load_end_idx, - load_pagination, - ) + source_data, build_from_source = context.root.class_factory_util.get_task_util_cls().gen_task_load_data_from_source( + context, + stmt, + source_str, + separator, + source_scripted, + processed_data_count, + load_start_idx, + load_end_idx, + load_pagination, ) if is_random_distribution: seed = root_context.get_distribution_seed() # Use original pagination for shuffling - source_data = DataSourceUtil.get_shuffled_data_with_cyclic( - source_data, pagination, stmt.cyclic, seed - ) - - # TODO: split loading data and generating/modifying data + source_data = DataSourceUtil.get_shuffled_data_with_cyclic(source_data, pagination, stmt.cyclic, seed) # Keep current product and sub product in product_holder on non low memory mode - product_holder: Dict[str, List] = dict() + product_holder: Dict[str, List] = {} # Store temp result result = [] @@ -203,11 +201,11 @@ def _geniter_single_process_generate(args: Tuple) -> Dict[str, List]: def _geniter_single_process_generate_and_consume_by_page(args: Tuple) -> Dict: """ - IMPORTANT: Used as mp page process function only - Generate then consume product in each single process by page + IMPORTANT: Used as multiprocessing page process function only. + Generate then consume product in each single process by page. - :param args: - :return: + :param args: Tuple containing necessary arguments. + :return: Dictionary with generated products if needed. 
""" context: SetupContext = args[0] stmt: GenerateStatement = args[1] @@ -217,37 +215,39 @@ def _geniter_single_process_generate_and_consume_by_page(args: Tuple) -> Dict: mp_chunk_size = args[5] # Calculate page chunk - index_chunk = [ - (i, min(i + page_size, end_idx)) for i in range(start_idx, end_idx, page_size) - ] + index_chunk = [(i, min(i + page_size, end_idx)) for i in range(start_idx, end_idx, page_size)] - # Check if product result should be returned on mp process + # Check if product result should be returned on multiprocessing process return_product_result = context.test_mode or any( - [ - context.memstore_manager.contain(consumer_str) - for consumer_str in stmt.targets - ] + [context.memstore_manager.contain(consumer_str) for consumer_str in stmt.targets] ) result = {} # Generate and consume product by page args_list = list(args) for page_idx, index_tuple in enumerate(index_chunk): + # Index tuple for each page args_list[2] = index_tuple updated_args = tuple(args_list) result_dict = _geniter_single_process_generate(updated_args) _consume_by_page( - stmt, context, result_dict, page_idx, page_size, mp_idx, mp_chunk_size + stmt, + context, + result_dict, + page_idx, + page_size, + mp_idx, + mp_chunk_size, + page_idx == len(index_chunk) - 1, ) - if return_product_result: for key, value in result_dict.items(): result[key] = result.get(key, []) + value # Manual garbage collection del result_dict - gc.collect() + # gc.collect() return result @@ -260,56 +260,31 @@ def _consume_by_page( page_size: int, mp_idx: int, mp_chunk_size: int, + is_last_page: bool, ) -> None: """ - Consume product by page. Only write temp data for MinioConsumer for later consume - :param stmt: - :param context: - :param xml_result: - :param page_idx: - :param mp_idx: - :return: + Consume product by page. + + :param stmt: GenerateStatement instance. + :param context: Context instance. + :param xml_result: Generated product data. + :param page_idx: Current page index. + :param page_size: Page size for processing. + :param mp_idx: Multiprocessing index. + :param mp_chunk_size: Chunk size for multiprocessing. + :param preview_record_length: Length for preview records. 
+ :return: None """ root_ctx = context.root - # Consume MinioConsumer by writing temp file + # Consume specific exporters by writing temp files if necessary for stmt_name, xml_value in xml_result.items(): # Load current gen_stmt with corresponding targets current_stmt = stmt.retrieve_sub_statement_by_fullname(stmt_name) if current_stmt is None: raise ValueError(f"Cannot find element '{stmt_name}'") - # Write temp file for later MinioConsumer - if ( - len( - {EXPORTER_CSV, EXPORTER_JSON, EXPORTER_TXT, EXPORTER_XML} - & current_stmt.targets - ) - > 0 - ): - mp_idx_path = f"mp_{mp_idx}_" if mp_idx is not None else "" - page_idx_path = f"page_{page_idx}_" - - # Init temp file path - result_temp_file = ( - root_ctx.descriptor_dir - / f"temp_result_{root_ctx.task_id}" - / f"{mp_idx_path + page_idx_path + current_stmt.full_name}.json" - ) - - # Prepare temp directory - result_temp_file.parent.mkdir(parents=True, exist_ok=True) - - # Write to temp file - with result_temp_file.open("a") as file: - file.write( - json.dumps( - xml_value, indent=4, default=minio_serialization_function - )[1:-1] - + "," - ) - - # Consume non MinioConsumer + # Consume non-specific exporters _consume_outermost_gen_stmt_by_page( stmt, context, @@ -320,57 +295,40 @@ def _consume_by_page( mp_chunk_size, page_size, ), + is_last_page ) -def minio_serialization_function(obj): - """ - Serialization function for MinioConsumer - :param obj: - :return: +def _pre_consume_product(stmt: GenerateStatement, dict_result: List[Dict]) -> Tuple: """ - if isinstance(obj, datetime): - return obj.isoformat() - raise TypeError( - f"Object of type '{type(obj).__name__}' is not JSON serializable on MinioConsumer" - ) - + Preprocess consumer data to adapt some special consumer (e.g., MongoDB upsert). -def _load_temp_result_file(result_temp_dir: Path) -> Dict: - """ - Load temp result file - :return: + :param stmt: GenerateStatement instance. + :param dict_result: Generated data. + :return: Preprocessed product tuple. """ - result_dict = {} - - # Gather file path by product name - file_dict = defaultdict(list) - for file_path in result_temp_dir.glob("*.json"): - product_name = re.sub(r"^page_\d+_|^mp_\d+_page_\d+_", "", file_path.stem) - file_dict[product_name].append(file_path) - - # Load data from each file - for product_name, file_paths in file_dict.items(): - sorted_file_paths = sorted(file_paths) - file_data = "" - for file_path in sorted_file_paths: - with file_path.open("r") as file: - file_data += file.read()[:-1] + "," - result_dict[product_name] = json.loads("[" + file_data[:-1] + "]") - - return result_dict + if getattr(stmt, "selector", False): + packed_result = (stmt.name, dict_result, {"selector": stmt.selector}) + elif getattr(stmt, "type", False): + packed_result = (stmt.name, dict_result, {"type": stmt.type}) + else: + packed_result = (stmt.name, dict_result) + return packed_result def _consume_outermost_gen_stmt_by_page( - stmt: GenerateStatement, context: Context, result_dict: Dict, page_info: Tuple + stmt: GenerateStatement, context: Context, result_dict: Dict, page_info: Tuple, is_last_page: bool, + ) -> None: """ - Consume result_dict returned by outermost gen_stmt - - :param stmt: - :param context: - :param result_dict: - :return: + Consume result_dict returned by outermost gen_stmt. + + :param stmt: GenerateStatement instance. + :param context: Context instance. + :param result_dict: Generated product data. + :param page_info: Tuple containing page information. + :param preview_record_length: Length for preview records. 
+ :return: None """ report_logging = isinstance(context, SetupContext) and context.report_logging @@ -390,23 +348,45 @@ def _consume_outermost_gen_stmt_by_page( xml_result=result, page_info=page_info, ) + if is_last_page: + _finalize_and_export_consumers(context, sub_stmt) -def _pre_consume_product(stmt, dict_result): +def _finalize_and_export_consumers(context: Context, stmt: GenerateStatement): """ - Preprocess consumer data to adapt some special consumer (e.g. MongoDB upsert) - :param stmt: - :param dict_result: - :return: + Finalize chunks and export data for all consumers that require it. + + :param context: Context instance. + :param stmt: GenerateStatement instance. + :return: None """ - if getattr(stmt, "selector", False): - packed_result = (stmt.name, dict_result, {"selector": stmt.selector}) - elif getattr(stmt, "type", False): - packed_result = (stmt.name, dict_result, {"type": stmt.type}) - else: - packed_result = (stmt.name, dict_result) - return packed_result + # Create list of consumers that need to finalize and export + consumers_with_operation, consumers_without_operation = ExporterUtil.create_exporter_list( + setup_context=context.root, + product_name=stmt.name, + consumer_str_list=list(stmt.targets), + storage_type=stmt.storage_id, + target_uri=stmt.export_uri, + ) + # Combine all consumers + all_consumers = [consumer for consumer, _ in consumers_with_operation] + consumers_without_operation + + for consumer in all_consumers: + # Only finalize and export if the exporter has these methods + try: + # Construct the export name to include task_id + export_name = f"{context.root.task_id}/{stmt.name}" + + if hasattr(consumer, "finalize_chunks"): + consumer.finalize_chunks() + if hasattr(consumer, "upload_to_storage"): + consumer.upload_to_storage(bucket=stmt.bucket or stmt.container, name=export_name) + # Only clean up on outermost generate task + if isinstance(context, SetupContext) and hasattr(consumer, "cleanup"): + consumer.cleanup() + except Exception as e: + logger.error(f"Error finalizing and exporting data for {consumer}: {e}") def _load_csv_file( ctx: SetupContext, @@ -420,77 +400,65 @@ def _load_csv_file( suffix: str, ) -> List[Dict]: """ - Load csv content from file with skip and limit - :param separator: - :param file_path: - :param start_idx: - :param end_idx: - :param prefix: - :param suffix: - :return: + Load CSV content from file with skip and limit. + + :param file_path: Path to the CSV file. + :param separator: CSV delimiter. + :param cyclic: Whether to cycle through data. + :param start_idx: Starting index. + :param end_idx: Ending index. + :return: List of dictionaries representing CSV rows. 
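For illustration only: a simplified, standalone sketch of the skip/limit and cyclic behaviour described for _load_csv_file; the real loader delegates to DataSourceUtil.get_cyclic_data_list and may additionally evaluate script templates, and the file path and delimiter below are assumptions.

import csv
from itertools import cycle, islice
from pathlib import Path
from typing import Dict, List

def load_csv_window(path: Path, separator: str, start_idx: int, end_idx: int, cyclic: bool = False) -> List[Dict]:
    # Read every row, then apply the skip/limit window; with cyclic=True the data
    # wraps around instead of being truncated at the end of the file.
    with path.open(newline="") as csvfile:
        rows = list(csv.DictReader(csvfile, delimiter=separator))
    if cyclic:
        return list(islice(cycle(rows), start_idx, end_idx))
    return rows[start_idx:end_idx]

# e.g. load_csv_window(Path("tests_ce/integration_tests/test_exporters/data/products.ent.csv"), ",", 0, 100)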
""" from datamimic_ce.tasks.task_util import TaskUtil - with file_path.open( - newline="", - ) as csvfile: + with file_path.open(newline="") as csvfile: reader = csv.DictReader(csvfile, delimiter=separator) pagination = ( DataSourcePagination(start_idx, end_idx - start_idx) if (start_idx is not None and end_idx is not None) else None ) - result = DataSourceUtil.get_cyclic_data_list( - data=list(reader), cyclic=cyclic, pagination=pagination - ) + result = DataSourceUtil.get_cyclic_data_list(data=list(reader), cyclic=cyclic, pagination=pagination) # if sourceScripted then evaluate python expression in csv if source_scripted: - return TaskUtil.evaluate_file_script_template( - ctx=ctx, datas=result, prefix=prefix, suffix=suffix - ) + return TaskUtil.evaluate_file_script_template(ctx=ctx, datas=result, prefix=prefix, suffix=suffix) return result -def _load_json_file( - file_path: Path, cyclic: bool, start_idx: int, end_idx: int -) -> List[Dict]: +def _load_json_file(file_path: Path, cyclic: bool, start_idx: int, end_idx: int) -> List[Dict]: """ - Load json content from file using skip and limit - :param cyclic: - :param file_path: - :param start_idx: - :param end_idx: - :return: + Load JSON content from file using skip and limit. + + :param file_path: Path to the JSON file. + :param cyclic: Whether to cycle through data. + :param start_idx: Starting index. + :param end_idx: Ending index. + :return: List of dictionaries representing JSON objects. """ # Read the JSON data from a file with file_path.open("r") as file: data = json.load(file) if not isinstance(data, list): - raise ValueError( - f"JSON file '{file_path.name}' must contain a list of objects" - ) + raise ValueError(f"JSON file '{file_path.name}' must contain a list of objects") pagination = ( DataSourcePagination(start_idx, end_idx - start_idx) if (start_idx is not None and end_idx is not None) else None ) - return DataSourceUtil.get_cyclic_data_list( - data=data, cyclic=cyclic, pagination=pagination - ) + return DataSourceUtil.get_cyclic_data_list(data=data, cyclic=cyclic, pagination=pagination) -def _load_xml_file( - file_path: Path, cyclic: bool, start_idx: int, end_idx: int -) -> List[Dict]: +def _load_xml_file(file_path: Path, cyclic: bool, start_idx: int, end_idx: int) -> List[Dict]: """ - Load xml content from file using skip and limit - :param cyclic: - :param file_path: - :param start_idx: - :param end_idx: - :return: + Load XML content from file using skip and limit. + + :param file_path: Path to the XML file. + :param cyclic: Whether to cycle through data. + :param start_idx: Starting index. + :param end_idx: Ending index. + :return: List of dictionaries representing XML items. """ # Read the XML data from a file with file_path.open("r") as file: @@ -502,39 +470,36 @@ def _load_xml_file( if (start_idx is not None and end_idx is not None) else None ) - return DataSourceUtil.get_cyclic_data_list( - data=data, cyclic=cyclic, pagination=pagination - ) + return DataSourceUtil.get_cyclic_data_list(data=data, cyclic=cyclic, pagination=pagination) def _evaluate_selector_script(context: Context, stmt: GenerateStatement): """ - Evaluate script selector + Evaluate script selector. + + :param context: Context instance. + :param stmt: GenerateStatement instance. + :return: Evaluated selector. 
""" from datamimic_ce.tasks.task_util import TaskUtil selector = stmt.selector prefix = stmt.variable_prefix or context.root.default_variable_prefix suffix = stmt.variable_suffix or context.root.default_variable_suffix - return TaskUtil.evaluate_variable_concat_prefix_suffix( - context, selector, prefix=prefix, suffix=suffix - ) + return TaskUtil.evaluate_variable_concat_prefix_suffix(context, selector, prefix=prefix, suffix=suffix) @contextmanager -def gen_timer( - process: Literal["generate", "consume", "process"], - report_logging: bool, - product_name: str, -): +def gen_timer(process: Literal["generate", "consume", "process"], report_logging: bool, product_name: str): """ - Timer for generate and consume process - :param process: - :param report_logging: - :param product_name: - :return: + Timer for generate and consume process. + + :param process: Type of process ('generate', 'consume', 'process'). + :param report_logging: Whether to log the timing information. + :param product_name: Name of the product being processed. + :return: Context manager. """ - timer_result: Dict = dict() + timer_result: Dict = {} # Ignore timer if report_logging is False if not report_logging: @@ -548,17 +513,17 @@ def gen_timer( finally: end_time = time.perf_counter() elapsed_time = end_time - start_time - records_count = timer_result.get("records_count", 0) + records_count = timer_result.get('records_count', 0) process_name = { - "generate": "Generating", - "consume": "Consuming", - "generate_consume": "Generating and consuming", - }.get(process, "Processing") + 'generate': 'Generating', + 'consume': 'Consuming', + 'generate_consume': 'Generating and consuming' + }.get(process, 'Processing') if elapsed_time > 0: records_per_second = int(records_count / elapsed_time) else: - records_per_second = "N/A" + records_per_second = 'N/A' logger.info( f"{process_name} {records_count} records '{product_name}' takes {round(elapsed_time, 5)} seconds " @@ -567,9 +532,11 @@ def gen_timer( class GenerateTask(Task): - def __init__( - self, statement: GenerateStatement, class_factory_util: BaseClassFactoryUtil - ): + """ + Task class for generating data based on the GenerateStatement. + + """ + def __init__(self, statement: GenerateStatement, class_factory_util: BaseClassFactoryUtil): self._statement = statement self._class_factory_util = class_factory_util @@ -579,8 +546,10 @@ def statement(self) -> GenerateStatement: def _determine_count(self, context: Context) -> int: """ - Determine count of generate task - :return: + Determine the count of records to generate. + + :param context: Context instance. + :return: Number of records to generate. 
""" root_context: SetupContext = context.root @@ -594,20 +563,18 @@ def _determine_count(self, context: Context) -> int: # Check if "selector" is defined with "source" if self.statement.selector: # Evaluate script selector - selector = _evaluate_selector_script( - context=context, stmt=self._statement - ) + selector = _evaluate_selector_script(context=context, stmt=self._statement) client = root_context.get_client_by_id(self.statement.source) if isinstance(client, DatabaseClient): count = client.count_query_length(selector) else: raise ValueError( - "using selector without count only support for DatabaseClient (MongoDB, Relational Database)" + "Using selector without count only supports DatabaseClient (MongoDB, Relational Database)" ) else: count = root_context.data_source_len[self.statement.full_name] - # Check if there is (special) consumer mongodb_upsert + # Check if there is a special consumer (e.g., mongodb_upsert) if count == 0 and self.statement.contain_mongodb_upsert(root_context): # Upsert one collection when no record found by query count = 1 @@ -623,27 +590,29 @@ def _prepare_mp_generate_args( page_size: int, ) -> List[Tuple]: """ - Prepare args of multiprocess function - - :param setup_ctx: - :param count: - :return: + Prepare arguments for multiprocessing function. + + :param setup_ctx: SetupContext instance. + :param single_process_execute_function: Function to execute in single process. + :param count: Total number of records. + :param num_processes: Number of processes. + :param page_size: Page size for processing. + :return: List of argument tuples. """ # Determine chunk size mp_chunk_size = math.ceil(count / num_processes) # Log processing info logger.info( - f"Run {type(self.statement).__name__} task for entity {self.statement.name} with {num_processes} processes in parallel and chunk size: {mp_chunk_size}" + f"Run {type(self.statement).__name__} task for entity {self.statement.name} with " + f"{num_processes} processes in parallel and chunk size: {mp_chunk_size}" ) # Determine chunk indices chunk_data_list = self._get_chunk_indices(mp_chunk_size, count) # Split namespace functions from current namespace - namespace_functions = { - k: v for k, v in setup_ctx.namespace.items() if callable(v) - } + namespace_functions = {k: v for k, v in setup_ctx.namespace.items() if callable(v)} for func in namespace_functions: setup_ctx.namespace.pop(func) @@ -676,8 +645,12 @@ def _prepare_mp_generate_args( def _sp_generate(self, context: Context, start: int, end: int) -> Dict[str, List]: """ - Single-process generate product - :return: + Single-process generate product. + + :param context: Context instance. + :param start: Start index. + :param end: End index. + :return: Generated product data. 
""" if end - start == 0: return {} @@ -685,241 +658,228 @@ def _sp_generate(self, context: Context, start: int, end: int) -> Dict[str, List report_logging = isinstance(context, SetupContext) and context.report_logging if report_logging: logger.info(f"Process product '{self.statement.name}' with single process") - with gen_timer( - "generate", report_logging, self.statement.full_name - ) as timer_result: + with gen_timer("generate", report_logging, self.statement.full_name) as timer_result: # Generate product - result = _geniter_single_process_generate( - (context, self._statement, (start, end)) - ) - timer_result["records_count"] = ( - len(result[self._statement.full_name]) if len(result) > 0 else 0 - ) + result = _geniter_single_process_generate((context, self._statement, (start, end))) + timer_result["records_count"] = len(result.get(self._statement.full_name, [])) return result def _mp_page_process( - self, - setup_ctx: SetupContext, - page_size: int, - single_process_execute_function: Callable[[Tuple], None], + self, setup_ctx: SetupContext, page_size: int, single_process_execute_function: Callable[[Tuple], None] ): """ - Multi-process generate and consume product by page - :return: + Multi-process page generation and consumption of products. + + This method divides the work across multiple processes, each of which generates and consumes + products in chunks. After multiprocessing, a post-processing step applies any necessary + consumer/exporter operations on the merged results from all processes. + + :param setup_ctx: The setup context instance containing configurations and resources. + :param page_size: The page size for each process to handle per batch. + :param single_process_execute_function: The function each process will execute. """ exporter_util = setup_ctx.root.class_factory_util.get_exporter_util() - with gen_timer( - "process", setup_ctx.report_logging, self.statement.full_name - ) as timer_result: - # Determine count of generate process + + # Start timer to measure entire process duration + with gen_timer("process", setup_ctx.report_logging, self.statement.full_name) as timer_result: + # 1. Determine the total record count and number of processes count = self._determine_count(setup_ctx) + num_processes = setup_ctx.num_process or multiprocessing.cpu_count() timer_result["records_count"] = count - # Determine number of processes - num_processes = setup_ctx.num_process or multiprocessing.cpu_count() - - # Prepare args list of mp function + # 2. Prepare arguments for each process based on count, process count, and page size arg_list = self._prepare_mp_generate_args( - setup_ctx, - single_process_execute_function, - count, - num_processes, - page_size, + setup_ctx, single_process_execute_function, count, num_processes, page_size ) - logger.info( - f"Start processing {count} products with {num_processes} processes, chunks: {[args[2] for args in arg_list]}" - ) - - # Create list of post-consumer after mp process, like MemstoreConsumer, TestResultExporterConsumer - post_exporter_list_str = [] + # Debug log the chunks each process will handle + chunk_info = [args[2] for args in arg_list] + logger.debug(f"Prepared argument list for multiprocessing with chunks: {chunk_info}") + # 3. 
Initialize any required post-process consumers, e.g., for testing or memory storage + post_consumer_list = [] if setup_ctx.test_mode: - post_exporter_list_str.append(EXPORTER_TEST_RESULT_EXPORTER) - post_exporter_list_str.extend( - list( - filter( - lambda consumer_str: setup_ctx.memstore_manager.contain( - consumer_str - ), - self.statement.targets, - ) + post_consumer_list.append(EXPORTER_TEST_RESULT_EXPORTER) + + post_consumer_list.extend( + filter(lambda consumer_str: setup_ctx.memstore_manager.contain(consumer_str), + self.statement.targets) ) - ) - post_exporter_list = [] - for exporter_str in post_exporter_list_str: - exporter = exporter_util.get_exporter_by_name(setup_ctx, exporter_str) - if exporter is not None: - post_exporter_list.append(exporter) - # Apply the wrapper function to each item in arg_list using the Pool + # Initialize exporters for each post-process consumer + _, post_consumer_list_instances = exporter_util.create_exporter_list( + setup_ctx, post_consumer_list, self.statement.storage_id, self.statement.export_uri) + logger.debug( + f"Post-consumer exporters initialized: {[consumer.__class__.__name__ for consumer in post_consumer_list_instances]}" + ) + + # 4. Run multiprocessing Pool to handle the generation/consumption function for each chunk with multiprocessing.Pool(processes=num_processes) as pool: # Collect then merge result mp_result_list = pool.map(_wrapper, arg_list) - # Consume post consumer - if len(post_exporter_list) > 0: + # 5. Post-processing with consumer consumption for merged results across processes + if post_consumer_list_instances: + logger.debug("Processing merged results with post-consumers.") + for mp_result in mp_result_list: - for key, value in mp_result.items(): - for consumer in post_exporter_list: - consumer.consume((key, value)) + for key, value in mp_result.items(): + for consumer in post_consumer_list_instances: + logger.debug(f"Consuming result for {key} with {consumer.__class__.__name__}") + consumer.consume((key, value)) - del mp_result_list - gc.collect() + # 6. Clean up and finalize + del mp_result_list # Free up memory from the merged results + # gc.collect() def _calculate_default_page_size(self, entity_count: int) -> int: """ - Calculate default page size for processing by page - :param entity_count: - :return: + Calculate default page size for processing by page. + + :param entity_count: Total number of entities. + :return: Page size. 
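For illustration only: the default page-size rule implemented below, condensed into a small function; the sample counts are arbitrary.

def default_page_size(entity_count: int, column_count: int) -> int:
    # Cap at 10,000, fall back to the entity count for small runs, and shrink
    # proportionally once a statement has more than 25 generated fields.
    size = 10_000 if entity_count > 10_000 else entity_count
    if column_count > 25:
        size = int(size / (column_count / 25))
    return max(1, size)

print(default_page_size(1_000_000, 10))  # 10000
print(default_page_size(1_000_000, 50))  # 5000
print(default_page_size(300, 10))        # 300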
""" stmt_page_size = self.statement.page_size # Return page size if defined in statement explicitly if stmt_page_size: - logger.info(f"Using page size {stmt_page_size} defined in statement") + logger.warning(f"Using low page size {stmt_page_size} (< 100) may cause performance issues") if stmt_page_size < 100: - logger.warn( - f"Using low page size {stmt_page_size} (< 100) may cause performance issue" - ) + logger.warn(f"Using low page size {stmt_page_size} (< 100) may cause performance issue") return stmt_page_size if entity_count > 10000: - # Set default page size as 10.000 if entity count > 10.000 + # Set default page size as 10,000 if entity count > 10,000 default_page_size = 10000 else: - # Set default page size as entity count if entity count <= 10.000 + # Set default page size as entity count if entity count <= 10,000 default_page_size = entity_count # Reduce default page size if column count > 25 - col_count = len(self.statement.sub_statements) + col_count = len( + [ + stmt + for stmt in self.statement.sub_statements + if isinstance( + stmt, + ( + ArrayStatement, + ElementStatement, + ItemStatement, + IfStatement, + KeyStatement, + ListStatement, + NestedKeyStatement, + ), + ) + ] + ) if col_count > 25: reduction_factor = col_count / 25 default_page_size = int(default_page_size / reduction_factor) # Log calculated default page size default_page_size = max(1, default_page_size) - logger.info( - f"Using default page size {default_page_size} for processing by page" - ) + logger.info(f"Using default page size {default_page_size} for processing by page") return default_page_size - def execute(self, context: Context) -> Dict[str, List] | None: + @staticmethod + def _scan_data_source(ctx: SetupContext, statement: Statement) -> None: """ - Execute generate task. If gen_stmt is inner, return generated product, otherwise consume them. - :param context: - :return: + Scan data source and set data source length. + + :param ctx: SetupContext instance. + :param statement: Statement instance. + :return: None """ - try: - self.pre_execute(context) + # 1. Scan statement + DataSourceUtil.set_data_source_length(ctx, statement) + # 2. Scan sub-statement + if isinstance(statement, CompositeStatement): + for child_stmt in statement.sub_statements: + GenerateTask._scan_data_source(ctx, child_stmt) - # Determine count of generate process - count = self._determine_count(context) + def execute(self, context: SetupContext) -> Dict[str, List] | None: + """ + Execute generate task. If gen_stmt is inner, return generated product; otherwise, consume them. - if count == 0: - return {self.statement.full_name: []} + :param context: Context instance. + :return: Generated product data or None. 
+ """ + self.pre_execute(context) - page_size = self._calculate_default_page_size(count) + # Determine count of generate process + count = self._determine_count(context) - task_util_cls = context.root.class_factory_util.get_task_util_cls() + if count == 0: + return {self.statement.full_name: []} - # Generate and consume if gen_stmt is outermost (which has context as SetupContext) - if isinstance(context, SetupContext): - exporter_util = context.root.class_factory_util.get_exporter_util() - consumer_with_operations, _ = exporter_util.create_exporter_list( - context, self.statement - ) + page_size = self._calculate_default_page_size(count) - # Switch to non mp if there is MongoDB delete operation - has_mongodb_delete = any( - [ - operation == "delete" and isinstance(consumer, MongoDBExporter) - for consumer, operation in consumer_with_operations - ] - ) - match self.statement.multiprocessing: - case None: - use_mp = (not has_mongodb_delete) and context.use_mp - case _: - use_mp = bool(self.statement.multiprocessing) - - # Generate in mp - if use_mp: - # IMPORTANT: always use deep copied setup_ctx for mp to avoid modify original setup_ctx accidentally - copied_ctx = copy.deepcopy(context) - # Process data by page - logger.info( - f"Processing by page with size {page_size} for '{self.statement.name}'" - ) - self._mp_page_process( - copied_ctx, - page_size, - _geniter_single_process_generate_and_consume_by_page, - ) - task_util_cls.consume_minio_after_page_processing( - self.statement, context - ) - # Generate and consume in sp - else: - # Process data by page in single process - index_chunk = self._get_chunk_indices(page_size, count) + task_util_cls = context.root.class_factory_util.get_task_util_cls() + + # Generate and consume if gen_stmt is outermost (which has context as SetupContext) + if isinstance(context, SetupContext): + exporter_util = context.root.class_factory_util.get_exporter_util() + consumer_with_operations, consumer_without_operations = exporter_util.create_exporter_list( + setup_context=context, + consumer_str_list=list(self.statement.targets), + storage_type=self.statement.storage_id, + target_uri=self.statement.export_uri, + ) + # Check for conditions to use multiprocessing + has_mongodb_delete = any( + [ + operation == "delete" and isinstance(consumer, MongoDBExporter) + for consumer, operation in consumer_with_operations + ] + ) + match self.statement.multiprocessing: + case None: + use_mp = (not has_mongodb_delete) and context.use_mp + case _: + use_mp = bool(self.statement.multiprocessing) + + # Generate in multiprocessing + if use_mp: + # IMPORTANT: always use deep copied setup_ctx for mp to avoid modify original setup_ctx accidentally + copied_ctx = copy.deepcopy(context) + # Process data by page + logger.info(f"Processing by page with size {page_size} for '{self.statement.name}'") + self._mp_page_process(copied_ctx, page_size, _geniter_single_process_generate_and_consume_by_page) + task_util_cls.consume_minio_after_page_processing(self.statement, context) + # Generate and consume in single process + else: + # Process data by page in single process + index_chunk = self._get_chunk_indices(page_size, count) + logger.info(f"Processing {len(index_chunk)} pages for {count} products of '{self.statement.name}'") + for page_index, page_tuple in enumerate(index_chunk): + start, end = page_tuple logger.info( - f"Processing {len(index_chunk)} pages for {count} products of '{self.statement.name}'" - ) - for page_index, page_tuple in enumerate(index_chunk): - start, end = 
page_tuple - logger.info( - f"Processing {end - start} product '{self.statement.name}' on page {page_index + 1}/{len(index_chunk)} in a single process" - ) - # Generate product - result = self._sp_generate(context, start, end) - # Consume by page - _consume_by_page( - self.statement, - context, - result, - page_index, - page_size, - None, - None, - ) - - # Manual garbage collection to free memory - del result - gc.collect() - # Consume MinioConsumer - task_util_cls.consume_minio_after_page_processing( - self.statement, context + f"Processing {end - start} product '{self.statement.name}' on page {page_index + 1}/{len(index_chunk)} in a single process" ) + # Generate product + result = self._sp_generate(context, start, end) + # Consume by page + _consume_by_page(self.statement, context, result, page_index, page_size, None, None) - # Just return product generated in sp if gen_stmt is inner one - else: - # Do not apply process by page for inner gen_stmt - return self._sp_generate(context, 0, count) - finally: - # Attempt to delete the temporary result directory - try: - result_temp_dir = ( - context.root.descriptor_dir / f"temp_result_{context.root.task_id}" - ) - shutil.rmtree(result_temp_dir) - logger.debug( - f"Processing temp result directory '{result_temp_dir}' has been removed successfully." - ) - except FileNotFoundError: - logger.debug( - f"Processing temp result directory has been removed successfully." - ) - except Exception as e: - logger.error(f"Failed to remove Processing temp result directory : {e}") + # Manual garbage collection to free memory + del result + # gc.collect() + + # Just return product generated in single process if gen_stmt is inner one + else: + # Do not apply process by page for inner gen_stmt + return self._sp_generate(context, 0, count) @staticmethod def convert_xml_dict_to_json_dict(xml_dict: Dict): """ - Convert XML dict with #text and @attribute to pure JSON dict - :param xml_dict: - :return: + Convert XML dict with #text and @attribute to pure JSON dict. + + :param xml_dict: XML dictionary. + :return: JSON dictionary. """ if "#text" in xml_dict: return xml_dict["#text"] @@ -930,10 +890,7 @@ def convert_xml_dict_to_json_dict(xml_dict: Dict): res[key] = GenerateTask.convert_xml_dict_to_json_dict(value) elif isinstance(value, list): res[key] = [ - GenerateTask.convert_xml_dict_to_json_dict(v) - if isinstance(v, dict) - else v - for v in value + GenerateTask.convert_xml_dict_to_json_dict(v) if isinstance(v, dict) else v for v in value ] else: res[key] = value @@ -942,48 +899,71 @@ def convert_xml_dict_to_json_dict(xml_dict: Dict): @staticmethod def _get_chunk_indices(chunk_size: int, data_count: int) -> List: """ - Create list of chunk indices based on chunk size and required data count - :param chunk_size: - :param data_count: - :return: - """ - return [ - (i, min(i + chunk_size, data_count)) - for i in range(0, data_count, chunk_size) - ] + Create list of chunk indices based on chunk size and required data count. - def _scan_data_source(self, ctx: SetupContext, statement: Statement) -> None: - # 1. Scan statement - self._class_factory_util.get_datasource_util_cls().set_data_source_length( - ctx, statement - ) - # 2. Scan sub-statement - if isinstance(statement, CompositeStatement): - for child_stmt in statement.sub_statements: - self._scan_data_source(ctx, child_stmt) + :param chunk_size: Size of each chunk. + :param data_count: Total data count. + :return: List of tuples representing chunk indices. 
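On the `execute()` hunk above: whether a generate statement runs with multiprocessing is decided from three inputs — an explicit `multiprocessing` attribute on the statement, the setup-level `use_mp` flag, and whether any MongoDB exporter is configured with a `delete` operation (which forces single-process execution). A hedged, plain-bool sketch of that decision (stand-in names, not the real objects):

```python
def should_use_multiprocessing(
    stmt_multiprocessing: bool | None,
    setup_use_mp: bool,
    has_mongodb_delete: bool,
) -> bool:
    """Mirror of the match/case in execute() above, with plain booleans."""
    if stmt_multiprocessing is None:
        # No explicit setting: follow the setup, but never parallelize MongoDB deletes.
        return setup_use_mp and not has_mongodb_delete
    # An explicit multiprocessing setting on the statement always wins.
    return bool(stmt_multiprocessing)

assert should_use_multiprocessing(None, True, True) is False
assert should_use_multiprocessing(True, False, True) is True
```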
+ """ + return [(i, min(i + chunk_size, data_count)) for i in range(0, data_count, chunk_size)] def pre_execute(self, context: Context): """ - Pre-execute task in sp before mp execution - :param context: - :return: + Pre-execute task in single process before multiprocessing execution. + + :param context: Context instance. + :return: None """ root_context = context.root + from datamimic.tasks.task_util import TaskUtil - task_util_cls = root_context.class_factory_util.get_task_util_cls() - # Execute pre-tasks pre_tasks = [ - task_util_cls.get_task_by_statement(root_context, child_stmt, None) + TaskUtil.get_task_by_statement(root_context, child_stmt, None) for child_stmt in self.statement.sub_statements if isinstance(child_stmt, KeyStatement) ] for task in pre_tasks: task.pre_execute(context) + # def _determine_preview_record_len(self) -> int: + # """ + # Determine preview record length. + # + # :return: Preview record length. + # """ + # num_of_field = len( + # [ + # stmt + # for stmt in self.statement.sub_statements + # if isinstance( + # stmt, + # ( + # ArrayStatement, + # ElementStatement, + # ItemStatement, + # IfStatement, + # KeyStatement, + # ListStatement, + # NestedKeyStatement, + # ), + # ) + # ] + # ) + # + # # Determine max_data_len based on max_preview_data and num_of_fields + # default_max_preview_data = settings.LOGGER_REDIS_MAX_DATA + # max_data_len = default_max_preview_data + # if default_max_preview_data >= 500 and num_of_field >= 20: + # max_data_len = 400 + # if default_max_preview_data >= 100 and num_of_field >= 40: + # max_data_len = 100 + # if default_max_preview_data >= 10 and num_of_field >= 100: + # max_data_len = 10 + # + # return max_data_len + @staticmethod - def execute_include( - setup_stmt: SetupStatement, parent_context: GenIterContext - ) -> None: + def execute_include(setup_stmt: SetupStatement, parent_context: GenIterContext) -> None: """ Execute include XML model inside :param setup_stmt: @@ -1003,3 +983,39 @@ def execute_include( for stmt in setup_stmt.sub_statements: task = task_util_cls.get_task_by_statement(root_context, stmt) task.execute(root_context) + + @staticmethod + def _finalize_and_export_consumers(context: Context, stmt: GenerateStatement): + """ + Finalize chunks and export data for all consumers that require it. + + :param context: Context instance. + :param stmt: GenerateStatement instance. 
+ :return: None + """ + # Create list of consumers that need to finalize and export + consumers_with_operation, consumers_without_operation = ExporterUtil.create_exporter_list( + setup_context=context.root, + consumer_str_list=list(stmt.targets), + storage_type=stmt.storage_id, + target_uri=stmt.export_uri, + ) + + # Combine all consumers + all_consumers = [consumer for consumer, _ in consumers_with_operation] + consumers_without_operation + + for consumer in all_consumers: + # Only finalize and export if the exporter has these methods + try: + # Construct the export name to include task_id + export_name = f"{context.root.task_id}/{stmt.name}" + + consumer.finalize_chunks() if hasattr(consumer, "finalize_chunks") else None + ( + consumer.upload_to_storage(bucket=stmt.bucket or stmt.container, name=export_name) + if hasattr(consumer, "upload_to_storage") + else None + ) + except Exception as e: + logger.error(f"Error finalizing and exporting data for {consumer}: {e}") + diff --git a/datamimic_ce/tasks/task_util.py b/datamimic_ce/tasks/task_util.py index 1160cf4..e1c2262 100644 --- a/datamimic_ce/tasks/task_util.py +++ b/datamimic_ce/tasks/task_util.py @@ -9,28 +9,23 @@ from datamimic_ce.clients.mongodb_client import MongoDBClient from datamimic_ce.clients.rdbms_client import RdbmsClient -from datamimic_ce.constants.exporter_constants import \ - EXPORTER_TEST_RESULT_EXPORTER +from datamimic_ce.constants.exporter_constants import EXPORTER_TEST_RESULT_EXPORTER from datamimic_ce.contexts.context import Context from datamimic_ce.contexts.setup_context import SetupContext from datamimic_ce.converter.append_converter import AppendConverter from datamimic_ce.converter.converter import Converter from datamimic_ce.converter.cut_length_converter import CutLengthConverter -from datamimic_ce.converter.date2timestamp_converter import \ - Date2TimestampConverter +from datamimic_ce.converter.date2timestamp_converter import Date2TimestampConverter from datamimic_ce.converter.date_format_converter import DateFormatConverter from datamimic_ce.converter.hash_converter import HashConverter from datamimic_ce.converter.java_hash_converter import JavaHashConverter from datamimic_ce.converter.lower_case_converter import LowerCaseConverter from datamimic_ce.converter.mask_converter import MaskConverter from datamimic_ce.converter.middle_mask_converter import MiddleMaskConverter -from datamimic_ce.converter.remove_none_or_empty_element_converter import \ - RemoveNoneOrEmptyElementConverter -from datamimic_ce.converter.timestamp2date_converter import \ - Timestamp2DateConverter +from datamimic_ce.converter.remove_none_or_empty_element_converter import RemoveNoneOrEmptyElementConverter +from datamimic_ce.converter.timestamp2date_converter import Timestamp2DateConverter from datamimic_ce.converter.upper_case_converter import UpperCaseConverter -from datamimic_ce.data_sources.data_source_pagination import \ - DataSourcePagination +from datamimic_ce.data_sources.data_source_pagination import DataSourcePagination from datamimic_ce.enums.converter_enums import ConverterEnum from datamimic_ce.exporters.csv_exporter import CSVExporter from datamimic_ce.exporters.exporter_util import ExporterUtil @@ -69,12 +64,15 @@ from datamimic_ce.tasks.else_if_task import ElseIfTask from datamimic_ce.tasks.else_task import ElseTask from datamimic_ce.tasks.execute_task import ExecuteTask -from datamimic_ce.tasks.generate_task import (GenerateTask, - _evaluate_selector_script, - _load_csv_file, _load_json_file, - _load_temp_result_file, - 
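On `_finalize_and_export_consumers` above: the added method probes each exporter with `hasattr` and only calls `finalize_chunks()` / `upload_to_storage()` when they exist, logging rather than raising per-exporter failures. The conditional-expression-as-statement style is hard to read; an equivalent, more explicit sketch (hypothetical helper with stand-in arguments, not the patched method itself):

```python
import logging

logger = logging.getLogger(__name__)

def finalize_and_upload(exporters, bucket: str, export_name: str) -> None:
    """Duck-typed finalize/upload pass, equivalent to the hunk above but written as plain ifs."""
    for exporter in exporters:
        try:
            if hasattr(exporter, "finalize_chunks"):
                exporter.finalize_chunks()
            if hasattr(exporter, "upload_to_storage"):
                exporter.upload_to_storage(bucket=bucket, name=export_name)
        except Exception as e:
            # One failing exporter should not abort the others.
            logger.error(f"Error finalizing and exporting data for {exporter}: {e}")
```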
_load_xml_file, - _pre_consume_product) +from datamimic_ce.tasks.generate_task import ( + GenerateTask, + _evaluate_selector_script, + _load_csv_file, + _load_json_file, + _load_temp_result_file, + _load_xml_file, + _pre_consume_product, +) from datamimic_ce.tasks.generator_task import GeneratorTask from datamimic_ce.tasks.if_task import IfTask from datamimic_ce.tasks.include_task import IncludeTask @@ -122,9 +120,7 @@ def get_task_by_statement( elif isinstance(stmt, ReferenceStatement): return ReferenceTask(stmt, pagination) elif isinstance(stmt, ListStatement): - return ListTask( - ctx=ctx, statement=stmt, class_factory_util=class_factory_util - ) + return ListTask(ctx=ctx, statement=stmt, class_factory_util=class_factory_util) elif isinstance(stmt, ItemStatement): return ItemTask(ctx, stmt, class_factory_util) elif isinstance(stmt, IfStatement): @@ -142,14 +138,10 @@ def get_task_by_statement( elif isinstance(stmt, GeneratorStatement): return GeneratorTask(stmt) else: - raise ValueError( - f"Cannot created task for statement {stmt.__class__.__name__}" - ) + raise ValueError(f"Cannot created task for statement {stmt.__class__.__name__}") @staticmethod - def evaluate_file_script_template( - ctx: Context, datas: Union[Dict, List], prefix: str, suffix: str - ): + def evaluate_file_script_template(ctx: Context, datas: Union[Dict, List], prefix: str, suffix: str): """ Check value in csv or json file that contain python expression then evaluate variables and functions @@ -159,13 +151,9 @@ def evaluate_file_script_template( result = {} for key, json_value in datas.items(): if isinstance(json_value, Union[Dict, List]): - value = TaskUtil.evaluate_file_script_template( - ctx, json_value, prefix, suffix - ) + value = TaskUtil.evaluate_file_script_template(ctx, json_value, prefix, suffix) elif isinstance(json_value, str): - value = TaskUtil._evaluate_script_value( - ctx, json_value, prefix, suffix - ) + value = TaskUtil._evaluate_script_value(ctx, json_value, prefix, suffix) else: value = json_value result.update({key: value}) @@ -174,21 +162,11 @@ def evaluate_file_script_template( result = [] for value in datas: if isinstance(value, List): - result.extend( - TaskUtil.evaluate_file_script_template( - ctx, value, prefix, suffix - ) - ) + result.extend(TaskUtil.evaluate_file_script_template(ctx, value, prefix, suffix)) elif isinstance(value, Dict): - result.append( - TaskUtil.evaluate_file_script_template( - ctx, value, prefix, suffix - ) - ) + result.append(TaskUtil.evaluate_file_script_template(ctx, value, prefix, suffix)) elif isinstance(value, str): - result.append( - TaskUtil._evaluate_script_value(ctx, value, prefix, suffix) - ) + result.append(TaskUtil._evaluate_script_value(ctx, value, prefix, suffix)) else: result.append(value) return result @@ -214,9 +192,7 @@ def _evaluate_script_value(ctx: Context, data: str, prefix: str, suffix: str): match = re.search(r"^{(.*)}$", data) return ctx.evaluate_python_expression(match.group(1)) - return TaskUtil.evaluate_variable_concat_prefix_suffix( - ctx, data, prefix, suffix - ) + return TaskUtil.evaluate_variable_concat_prefix_suffix(ctx, data, prefix, suffix) except Exception as e: logger.error(f"Error evaluating script '{data}': {e}") @@ -279,9 +255,7 @@ def create_converter_list(context: Context, converter_str: str) -> List[Converte ) @staticmethod - def evaluate_variable_concat_prefix_suffix( - context: Context, expr: str, prefix: str, suffix: str - ) -> str: + def evaluate_variable_concat_prefix_suffix(context: Context, expr: str, prefix: str, 
suffix: str) -> str: """ Evaluate expression data, replace dynamic variables have prefix and suffix with value :param context: @@ -300,13 +274,7 @@ def evaluate_variable_concat_prefix_suffix( return expr # Evaluate all dynamic variables (this return only string value), e.g. '{my_name} is {my_age} years old' - return re.sub( - pattern, - lambda matched_var: str( - context.evaluate_python_expression(matched_var.group(1)) - ), - expr, - ) + return re.sub(pattern, lambda matched_var: str(context.evaluate_python_expression(matched_var.group(1))), expr) @staticmethod def gen_task_load_data_from_source( @@ -350,10 +318,7 @@ def gen_task_load_data_from_source( # Load data from JSON elif source_str.endswith(".json"): source_data = _load_json_file( - root_context.descriptor_dir / source_str, - stmt.cyclic, - load_start_idx, - load_end_idx, + root_context.descriptor_dir / source_str, stmt.cyclic, load_start_idx, load_end_idx ) # if sourceScripted then evaluate python expression in json if source_scripted: @@ -362,16 +327,11 @@ def gen_task_load_data_from_source( ctx=context, datas=source_data, prefix=prefix, suffix=suffix ) except Exception as e: - logger.debug( - f"Failed to pre-evaluate source script for {stmt.full_name}: {e}" - ) + logger.debug(f"Failed to pre-evaluate source script for {stmt.full_name}: {e}") # Load data from XML elif source_str.endswith(".template.xml"): source_data = _load_xml_file( - root_context.descriptor_dir / source_str, - stmt.cyclic, - load_start_idx, - load_end_idx, + root_context.descriptor_dir / source_str, stmt.cyclic, load_start_idx, load_end_idx ) # if sourceScripted then evaluate python expression in json if source_scripted: @@ -380,9 +340,9 @@ def gen_task_load_data_from_source( ) # Load data from in-memory memstore elif root_context.memstore_manager.contain(source_str): - source_data = root_context.memstore_manager.get_memstore( - source_str - ).get_data_by_type(stmt.type or stmt.name, load_pagination, stmt.cyclic) + source_data = root_context.memstore_manager.get_memstore(source_str).get_data_by_type( + stmt.type or stmt.name, load_pagination, stmt.cyclic + ) # Load data from client (MongoDB, RDBMS,...) 
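On the `evaluate_variable_concat_prefix_suffix` hunk above: the method substitutes prefix/suffix-delimited placeholders in a string by evaluating each enclosed expression against the context. A minimal sketch with a plain dict standing in for the DATAMIMIC context (hypothetical helper; the real code calls `context.evaluate_python_expression`):

```python
import re

def substitute_placeholders(expr: str, variables: dict, prefix: str = "{", suffix: str = "}") -> str:
    """Replace prefix...suffix placeholders with evaluated values, as in the hunk above."""
    pattern = re.escape(prefix) + r"(.*?)" + re.escape(suffix)
    return re.sub(pattern, lambda m: str(eval(m.group(1), {}, dict(variables))), expr)

# "{my_name} is {my_age} years old" -> "Ada is 36 years old"
print(substitute_placeholders("{my_name} is {my_age} years old", {"my_name": "Ada", "my_age": 36}))
```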
elif root_context.clients.get(source_str) is not None: client = root_context.clients.get(source_str) @@ -390,13 +350,9 @@ def gen_task_load_data_from_source( if isinstance(client, MongoDBClient): if stmt.selector: selector = _evaluate_selector_script(context, stmt) - source_data = client.get_by_page_with_query( - query=selector, pagination=load_pagination - ) + source_data = client.get_by_page_with_query(query=selector, pagination=load_pagination) elif stmt.type: - source_data = client.get_by_page_with_type( - collection_name=stmt.type, pagination=load_pagination - ) + source_data = client.get_by_page_with_type(collection_name=stmt.type, pagination=load_pagination) else: raise ValueError( "MongoDB source requires at least attribute 'type', 'selector' or 'iterationSelector'" @@ -412,52 +368,42 @@ def gen_task_load_data_from_source( elif isinstance(client, RdbmsClient): if stmt.selector: selector = _evaluate_selector_script(context, stmt) - source_data = client.get_by_page_with_query( - original_query=selector, pagination=load_pagination - ) + source_data = client.get_by_page_with_query(original_query=selector, pagination=load_pagination) else: - source_data = client.get_by_page_with_type( - type=stmt.type or stmt.name, pagination=load_pagination - ) + source_data = client.get_by_page_with_type(type=stmt.type or stmt.name, pagination=load_pagination) else: - raise ValueError( - f"Cannot load data from client: {type(client).__name__}" - ) + raise ValueError(f"Cannot load data from client: {type(client).__name__}") else: raise ValueError(f"cannot find data source {source_str} for iterate task") return source_data, build_from_source - @staticmethod - def consume_minio_after_page_processing(stmt, context: Context) -> None: - """ - Load all temp files and consume MinioConsumer - :param stmt: - :param context: - :return: - """ - # Load temp result file - result_temp_dir = ( - context.root.descriptor_dir / f"temp_result_{context.root.task_id}" - ) - consumed_result = _load_temp_result_file(result_temp_dir) - - for stmt_name, product_result in consumed_result.items(): - # Load current gen_stmt with corresponding targets - current_stmt = stmt.retrieve_sub_statement_by_fullname(stmt_name) - - # Get list of MinioConsumer - _, consumers_without_operation = ExporterUtil.create_exporter_list( - setup_context=context.root, stmt=current_stmt - ) - - # Preprocess and consume data - consumed_result = _pre_consume_product(current_stmt, product_result) - for consumer in consumers_without_operation: - if isinstance( - consumer, (XMLExporter, JsonExporter, TXTExporter, CSVExporter) - ): - consumer.consume(consumed_result) + # @staticmethod + # def consume_minio_after_page_processing(stmt, context: Context) -> None: + # """ + # Load all temp files and consume MinioConsumer + # :param stmt: + # :param context: + # :return: + # """ + # # Load temp result file + # result_temp_dir = context.root.descriptor_dir / f"temp_result_{context.root.task_id}" + # consumed_result = _load_temp_result_file(result_temp_dir) + # + # for stmt_name, product_result in consumed_result.items(): + # # Load current gen_stmt with corresponding targets + # current_stmt = stmt.retrieve_sub_statement_by_fullname(stmt_name) + # + # # Get list of MinioConsumer + # _, consumers_without_operation = ExporterUtil.create_exporter_list( + # setup_context=context.root, stmt=current_stmt + # ) + # + # # Preprocess and consume data + # consumed_result = _pre_consume_product(current_stmt, product_result) + # for consumer in consumers_without_operation: + 
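Summarizing the `gen_task_load_data_from_source` hunks above: the `source` string is dispatched by suffix or by name — flat files (`.json`, `.template.xml`, with CSV handled analogously), an in-memory memstore, or a registered client such as MongoDB or an RDBMS, with a `ValueError` for anything unknown. A condensed sketch of that dispatch (placeholder names, not the real signature):

```python
def resolve_source_kind(source_str: str, memstores: set[str], clients: set[str]) -> str:
    """Classify a source string the way the loader above does."""
    if source_str.endswith(".json"):
        return "json file"
    if source_str.endswith(".template.xml"):
        return "xml template"
    if source_str.endswith(".csv"):  # assumed: the CSV branch sits just above the JSON one
        return "csv file"
    if source_str in memstores:
        return "memstore"
    if source_str in clients:
        return "client (MongoDB, RDBMS, ...)"
    raise ValueError(f"cannot find data source {source_str} for iterate task")

assert resolve_source_kind("people.json", set(), set()) == "json file"
```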
# if isinstance(consumer, (XMLExporter, JsonExporter, TXTExporter, CSVExporter)): + # consumer.consume(consumed_result) @staticmethod def consume_product_by_page( @@ -467,17 +413,16 @@ def consume_product_by_page( page_info: Tuple, ) -> None: """ - Consume single list of product in generate statement + Consume single list of product in generate statement. - :param root_context: - :param stmt: - :param xml_result: - :return: + :param root_context: SetupContext instance. + :param stmt: GenerateStatement instance. + :param xml_result: List of generated product data. + :param page_info: Tuple containing page information. + :return: None """ # Convert XML result into JSON result - json_result = [ - GenerateTask.convert_xml_dict_to_json_dict(res) for res in xml_result - ] + json_result = [GenerateTask.convert_xml_dict_to_json_dict(res) for res in xml_result] # Wrap product key and value into a tuple # for iterate database may have key, value, and other statement attribute info @@ -488,33 +433,31 @@ def consume_product_by_page( consumer_set: Set[str] = stmt.targets # Add TestResultConsumer if process is in testing mode if root_context.test_mode: - # Only add TestResultExporterConsumer if not using multiprocess, otherwise collect all result in the end + # Only add TestResultExporterConsumer if not using multiprocessing, otherwise collect all result in the end if not root_context.use_mp: consumer_set.add(EXPORTER_TEST_RESULT_EXPORTER) # 3.2: Consume data # dbms consumer can have operation (e.g. mongodb.update), if so consumer is tuple[Consumer, operation] exporter_util = root_context.class_factory_util.get_exporter_util() - consumers_with_operation, consumers_without_operation = ( - exporter_util.create_exporter_list(setup_context=root_context, stmt=stmt) + consumers_with_operation, consumers_without_operation = exporter_util.create_exporter_list( + setup_context=root_context, stmt=stmt ) # run consumer with operation first, because some operation may change the product result for consumer in consumers_with_operation: - temp_consumer, operation = consumer - if isinstance(temp_consumer, MongoDBExporter) and operation == "upsert": - json_product = temp_consumer.upsert(product=json_product) - elif hasattr(temp_consumer, operation): - getattr(temp_consumer, operation)(json_product) + consumer_obj, operation = consumer + if isinstance(consumer_obj, MongoDBExporter) and operation == "upsert": + json_product = consumer_obj.upsert(product=json_product) + elif hasattr(consumer_obj, operation): + getattr(consumer_obj, operation)(json_product) else: - raise ValueError(f"Consumer not found: {consumer}.{operation}") + raise ValueError(f"Consumer does not support operation: {consumer_obj}.{operation}") for consumer in consumers_without_operation: - if isinstance( - consumer, (XMLExporter, JsonExporter, TXTExporter, CSVExporter) - ): - continue - # TODO: re-check XMLExporter with page processing + if isinstance(consumer, (XMLExporter, JsonExporter, TXTExporter, CSVExporter)): + # Specific exporters handle their own consumption + consumer.consume(json_product) # TODO: re-check XMLExporter with page processing if isinstance(consumer, XMLExporter): xml_product = _pre_consume_product(stmt, xml_result) consumer.consume(xml_product) diff --git a/tests_ce/conftest.py b/tests_ce/conftest.py index 2f99418..06ac491 100644 --- a/tests_ce/conftest.py +++ b/tests_ce/conftest.py @@ -18,8 +18,8 @@ @pytest.fixture def mysql_services(): try: - if settings.DATAMIMIC_LIB_ENVIRONMENT == "lib_staging": - logger.info("Staging 
Environment detected, no need to manually activate my sql services")
+        if settings.RUNTIME_ENVIRONMENT == "production":
+            logger.info("Production environment detected, no need to manually activate MySQL services")
             yield None
             return
         else:
diff --git a/tests_ce/integration_tests/consumer_csv/test_csv_consumer.xml b/tests_ce/integration_tests/consumer_csv/test_csv_consumer.xml
index f040b59..4e55161 100644
--- a/tests_ce/integration_tests/consumer_csv/test_csv_consumer.xml
+++ b/tests_ce/integration_tests/consumer_csv/test_csv_consumer.xml
@@ -1,5 +1,5 @@
-
+
diff --git a/tests_ce/integration_tests/datamimic_demo/j-json/datamimic.xml b/tests_ce/integration_tests/datamimic_demo/j-json/datamimic.xml
index 5fae004..4232aba 100644
--- a/tests_ce/integration_tests/datamimic_demo/j-json/datamimic.xml
+++ b/tests_ce/integration_tests/datamimic_demo/j-json/datamimic.xml
@@ -1,4 +1,4 @@
-
+
diff --git a/tests_ce/integration_tests/datamimic_demo/p-xml/datamimic.xml b/tests_ce/integration_tests/datamimic_demo/p-xml/datamimic.xml
index d3b9b72..162c541 100644
--- a/tests_ce/integration_tests/datamimic_demo/p-xml/datamimic.xml
+++ b/tests_ce/integration_tests/datamimic_demo/p-xml/datamimic.xml
@@ -1,4 +1,4 @@
-
+
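Closing note on the `tests_ce/conftest.py` hunk: the `mysql_services` fixture now keys off `RUNTIME_ENVIRONMENT` and skips manual service management in production, where MySQL is assumed to be provided externally. A generic sketch of that gating pattern (module-level constant standing in for `datamimic_ce.config.settings`):

```python
import logging
import os

import pytest

logger = logging.getLogger(__name__)

# Stand-in for settings.RUNTIME_ENVIRONMENT from datamimic_ce.config
RUNTIME_ENVIRONMENT = os.getenv("RUNTIME_ENVIRONMENT", "development")

@pytest.fixture
def mysql_services():
    if RUNTIME_ENVIRONMENT == "production":
        logger.info("Production environment detected, no need to manually activate MySQL services")
        yield None
        return
    # Development: start/stop a local MySQL service here (omitted in this sketch).
    yield None
```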