diff --git a/rialto/runner/runner.py b/rialto/runner/runner.py index ade89ff..343d2fe 100644 --- a/rialto/runner/runner.py +++ b/rialto/runner/runner.py @@ -217,6 +217,9 @@ def check_dependencies(self, pipeline: PipelineConfig, run_date: date) -> bool: :return: bool """ logger.info(f"{pipeline.name} checking dependencies for {run_date}") + + error = "" + for dependency in pipeline.dependencies: dep_from = DateManager.date_subtract(run_date, dependency.interval.units, dependency.interval.value) logger.info(f"Looking for {dependency.table} from {dep_from} until {run_date}") @@ -237,12 +240,15 @@ def check_dependencies(self, pipeline: PipelineConfig, run_date: date) -> bool: source = Table(table_path=dependency.table, partition=date_col) if True in self.check_dates_have_partition(source, possible_dep_dates): logger.info(f"Dependency for {dependency.table} from {dep_from} until {run_date} is fulfilled") - continue else: msg = f"Missing dependency for {dependency.table} from {dep_from} until {run_date}" logger.info(msg) - self.tracker.last_error = msg - return False + error = error + msg + "\n" + + if error != "": + self.tracker.last_error = error + return False + return True def get_possible_run_dates(self, schedule: ScheduleConfig) -> List[date]: diff --git a/rialto/runner/transformation.py b/rialto/runner/transformation.py index 210cb0b..4399ce0 100644 --- a/rialto/runner/transformation.py +++ b/rialto/runner/transformation.py @@ -43,6 +43,7 @@ def run( :param run_date: date :param spark: spark session :param metadata_manager: metadata api object + :param dependencies: dictionary of dependencies :return: dataframe """ raise NotImplementedError