From f2be7cbbc73d9df762c7a2807d920bddd1906b62 Mon Sep 17 00:00:00 2001 From: Marek Dobransky Date: Fri, 23 Aug 2024 19:09:14 +0200 Subject: [PATCH] aggregated dependency printout (#11) --- rialto/runner/runner.py | 12 +++++++++--- rialto/runner/transformation.py | 1 + 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/rialto/runner/runner.py b/rialto/runner/runner.py index ade89ff..343d2fe 100644 --- a/rialto/runner/runner.py +++ b/rialto/runner/runner.py @@ -217,6 +217,9 @@ def check_dependencies(self, pipeline: PipelineConfig, run_date: date) -> bool: :return: bool """ logger.info(f"{pipeline.name} checking dependencies for {run_date}") + + error = "" + for dependency in pipeline.dependencies: dep_from = DateManager.date_subtract(run_date, dependency.interval.units, dependency.interval.value) logger.info(f"Looking for {dependency.table} from {dep_from} until {run_date}") @@ -237,12 +240,15 @@ def check_dependencies(self, pipeline: PipelineConfig, run_date: date) -> bool: source = Table(table_path=dependency.table, partition=date_col) if True in self.check_dates_have_partition(source, possible_dep_dates): logger.info(f"Dependency for {dependency.table} from {dep_from} until {run_date} is fulfilled") - continue else: msg = f"Missing dependency for {dependency.table} from {dep_from} until {run_date}" logger.info(msg) - self.tracker.last_error = msg - return False + error = error + msg + "\n" + + if error != "": + self.tracker.last_error = error + return False + return True def get_possible_run_dates(self, schedule: ScheduleConfig) -> List[date]: diff --git a/rialto/runner/transformation.py b/rialto/runner/transformation.py index 210cb0b..4399ce0 100644 --- a/rialto/runner/transformation.py +++ b/rialto/runner/transformation.py @@ -43,6 +43,7 @@ def run( :param run_date: date :param spark: spark session :param metadata_manager: metadata api object + :param dependencies: dictionary of dependencies :return: dataframe """ raise NotImplementedError