diff --git a/README.md b/README.md index 2ac915f..f179dc6 100644 --- a/README.md +++ b/README.md @@ -85,8 +85,8 @@ pipelines: # a list of pipelines to run frequency: weekly # daily/weekly/monthly day: 7 # day of the week or month info_date_shift: #Optional shift in the written information date from the scheduled day - units: "days" # days/weeks/months/years - value: 5 # subtracted from scheduled day + - units: "days" # days/weeks/months/years + value: 5 # subtracted from scheduled day dependencies: # list of dependent tables - table: catalog.schema.table1 name: "table1" # Optional table name, used to recall dependency details in transformation @@ -372,7 +372,7 @@ With that sorted out, we can now provide a quick example of the *rialto.jobs* mo ```python from pyspark.sql import DataFrame from rialto.common import TableReader -from rialto.jobs.decorators import config, job, datasource +from rialto.jobs.decorators import config_parser, job, datasource from rialto.runner.config_loader import PipelineConfig from pydantic import BaseModel @@ -381,10 +381,12 @@ class ConfigModel(BaseModel): some_value: int some_other_value: str -@config + +@config_parser def my_config(config: PipelineConfig): return ConfigModel(**config.extras) + @datasource def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFrame: return table_reader.get_latest("my_catalog.my_schema.my_table", date_until=run_date) @@ -442,44 +444,48 @@ Assuming we have a my_package.test_job_module.py module: ```python3 @datasource def datasource_a(...) - ... code ... + ... code... @job def my_job(datasource_a, ...) - ... code ... + ... code... 
``` The *disable_job_decorators* context manager, as the name suggests, disables all decorator functionality and lets you access your functions as raw functions - making it super simple to unit-test: + ```python3 -from rialto.jobs.decorators.test_utils import disable_job_decorators +from rialto.jobs.test_utils import disable_job_decorators import my_package.test_job_module as tjm + # Datasource Testing def test_datasource_a(): - ... mocks here ... + ... mocks here... with disable_job_decorators(tjm): - datasource_a_output = tjm.datasource_a(... mocks ...) + datasource_a_output = tjm.datasource_a(...mocks...) + + ...asserts... - ... asserts ... # Job Testing def test_my_job(): datasource_a_mock = ... - ... other mocks... + ...other mocks... with disable_job_decorators(tjm): - job_output = tjm.my_job(datasource_a_mock, ... mocks ...) + job_output = tjm.my_job(datasource_a_mock, ...mocks...) - ... asserts ... + ...asserts... ``` #### 2. Testing the @job Dependency Tree In complex use cases, it may happen that the dependencies of a job become quite complex. Or you simply want to be sure that you didn't accidentally misspelled your dependency name: ```python3 -from rialto.jobs.decorators.test_utils import resolver_resolves +from rialto.jobs.test_utils import resolver_resolves import my_job.test_job_module as tjm + def test_my_job_resolves(spark): assert resolver_resolves(spark, tjm.my_job) ``` diff --git a/rialto/jobs/__init__.py b/rialto/jobs/__init__.py index a6ee6cb..0c3e01c 100644 --- a/rialto/jobs/__init__.py +++ b/rialto/jobs/__init__.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from rialto.jobs.decorators import config, datasource, job +from rialto.jobs.decorators import config_parser, datasource, job diff --git a/rialto/jobs/decorators/decorators.py b/rialto/jobs/decorators.py similarity index 92% rename from rialto/jobs/decorators/decorators.py rename to rialto/jobs/decorators.py index d288b7b..dd79bdd 100644 --- a/rialto/jobs/decorators/decorators.py +++ b/rialto/jobs/decorators.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -__all__ = ["datasource", "job", "config"] +__all__ = ["datasource", "job", "config_parser"] import inspect import typing @@ -20,22 +20,22 @@ import importlib_metadata from loguru import logger -from rialto.jobs.decorators.job_base import JobBase -from rialto.jobs.decorators.resolver import Resolver +from rialto.jobs.job_base import JobBase +from rialto.jobs.resolver import Resolver -def config(ds_getter: typing.Callable) -> typing.Callable: +def config_parser(cf_getter: typing.Callable) -> typing.Callable: """ Config parser functions decorator. Registers a config parsing function into a rialto job prerequisite. You can then request the job via job function arguments. - :param ds_getter: dataset reader function - :return: raw reader function, unchanged + :param cf_getter: config parsing function + :return: raw function, unchanged """ - Resolver.register_callable(ds_getter) - return ds_getter + Resolver.register_callable(cf_getter) + return cf_getter def datasource(ds_getter: typing.Callable) -> typing.Callable: diff --git a/rialto/jobs/decorators/__init__.py b/rialto/jobs/decorators/__init__.py deleted file mode 100644 index 6f2713a..0000000 --- a/rialto/jobs/decorators/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2022 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .decorators import config, datasource, job diff --git a/rialto/jobs/decorators/job_base.py b/rialto/jobs/job_base.py similarity index 98% rename from rialto/jobs/decorators/job_base.py rename to rialto/jobs/job_base.py index d91537f..c65341d 100644 --- a/rialto/jobs/decorators/job_base.py +++ b/rialto/jobs/job_base.py @@ -24,7 +24,7 @@ from pyspark.sql import DataFrame, SparkSession from rialto.common import TableReader -from rialto.jobs.decorators.resolver import Resolver +from rialto.jobs.resolver import Resolver from rialto.loader import PysparkFeatureLoader from rialto.metadata import MetadataManager from rialto.runner import Transformation diff --git a/rialto/jobs/decorators/resolver.py b/rialto/jobs/resolver.py similarity index 100% rename from rialto/jobs/decorators/resolver.py rename to rialto/jobs/resolver.py diff --git a/rialto/jobs/decorators/test_utils.py b/rialto/jobs/test_utils.py similarity index 84% rename from rialto/jobs/decorators/test_utils.py rename to rialto/jobs/test_utils.py index 39d76ce..3f6e3e2 100644 --- a/rialto/jobs/decorators/test_utils.py +++ b/rialto/jobs/test_utils.py @@ -19,8 +19,8 @@ from contextlib import contextmanager from unittest.mock import MagicMock, create_autospec, patch -from rialto.jobs.decorators.job_base import JobBase -from rialto.jobs.decorators.resolver import Resolver, ResolverException +from rialto.jobs.job_base import JobBase +from rialto.jobs.resolver import Resolver, ResolverException def _passthrough_decorator(*args, **kwargs) -> typing.Callable: @@ -33,12 +33,12 @@ def 
_passthrough_decorator(*args, **kwargs) -> typing.Callable: @contextmanager def _disable_job_decorators() -> None: patches = [ + patch("rialto.jobs.datasource", _passthrough_decorator), patch("rialto.jobs.decorators.datasource", _passthrough_decorator), - patch("rialto.jobs.decorators.decorators.datasource", _passthrough_decorator), - patch("rialto.jobs.decorators.config", _passthrough_decorator), - patch("rialto.jobs.decorators.decorators.config", _passthrough_decorator), + patch("rialto.jobs.config_parser", _passthrough_decorator), + patch("rialto.jobs.decorators.config_parser", _passthrough_decorator), + patch("rialto.jobs.job", _passthrough_decorator), patch("rialto.jobs.decorators.job", _passthrough_decorator), - patch("rialto.jobs.decorators.decorators.job", _passthrough_decorator), ] for i in patches: @@ -101,7 +101,7 @@ def __getitem__(self, func_name): return fake_method - with patch("rialto.jobs.decorators.resolver.Resolver._storage", SmartStorage()): + with patch("rialto.jobs.resolver.Resolver._storage", SmartStorage()): job().run(reader=MagicMock(), run_date=MagicMock(), spark=spark) return True diff --git a/tests/jobs/resources.py b/tests/jobs/resources.py index 60fda7b..273bf38 100644 --- a/tests/jobs/resources.py +++ b/tests/jobs/resources.py @@ -15,7 +15,7 @@ import pandas as pd -from rialto.jobs.decorators.job_base import JobBase +from rialto.jobs.job_base import JobBase def custom_callable(): diff --git a/tests/jobs/test_decorators.py b/tests/jobs/test_decorators.py index 54cb4a4..a09ee69 100644 --- a/tests/jobs/test_decorators.py +++ b/tests/jobs/test_decorators.py @@ -14,8 +14,8 @@ from importlib import import_module -from rialto.jobs.decorators.job_base import JobBase -from rialto.jobs.decorators.resolver import Resolver +from rialto.jobs.job_base import JobBase +from rialto.jobs.resolver import Resolver def test_dataset_decorator(): diff --git a/tests/jobs/test_job/dependency_tests_job.py b/tests/jobs/test_job/dependency_tests_job.py index 
38e10ba..7452d02 100644 --- a/tests/jobs/test_job/dependency_tests_job.py +++ b/tests/jobs/test_job/dependency_tests_job.py @@ -1,4 +1,4 @@ -from rialto.jobs.decorators import datasource, job +from rialto.jobs import datasource, job @datasource @@ -47,5 +47,5 @@ def missing_dependency_job(a, x): @job -def default_dependency_job(run_date, spark, config, table_reader, feature_loader): +def default_dependency_job(run_date, spark, config): return 1 diff --git a/tests/jobs/test_job/test_job.py b/tests/jobs/test_job/test_job.py index 3d648b5..4e47364 100644 --- a/tests/jobs/test_job/test_job.py +++ b/tests/jobs/test_job/test_job.py @@ -11,10 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from rialto.jobs.decorators import config, datasource, job +from rialto.jobs import config_parser, datasource, job -@config +@config_parser def custom_config(): return "config_return" diff --git a/tests/jobs/test_job_base.py b/tests/jobs/test_job_base.py index 55fced1..2fb01ea 100644 --- a/tests/jobs/test_job_base.py +++ b/tests/jobs/test_job_base.py @@ -19,7 +19,7 @@ import pyspark.sql.types import tests.jobs.resources as resources -from rialto.jobs.decorators.resolver import Resolver +from rialto.jobs.resolver import Resolver from rialto.loader import PysparkFeatureLoader diff --git a/tests/jobs/test_resolver.py b/tests/jobs/test_resolver.py index df56b72..c6ccdb0 100644 --- a/tests/jobs/test_resolver.py +++ b/tests/jobs/test_resolver.py @@ -13,7 +13,7 @@ # limitations under the License. 
import pytest -from rialto.jobs.decorators.resolver import Resolver, ResolverException +from rialto.jobs.resolver import Resolver, ResolverException def test_simple_resolve_custom_name(): @@ -56,8 +56,8 @@ def test_register_resolve(mocker): def f(): return 7 - mocker.patch("rialto.jobs.decorators.resolver.Resolver.register_callable", return_value="f") - mocker.patch("rialto.jobs.decorators.resolver.Resolver.resolve") + mocker.patch("rialto.jobs.resolver.Resolver.register_callable", return_value="f") + mocker.patch("rialto.jobs.resolver.Resolver.resolve") Resolver.register_resolve(f) diff --git a/tests/jobs/test_test_utils.py b/tests/jobs/test_test_utils.py index 63884b4..e6ef9da 100644 --- a/tests/jobs/test_test_utils.py +++ b/tests/jobs/test_test_utils.py @@ -14,10 +14,10 @@ import pytest import rialto.jobs.decorators as decorators -import tests.jobs.test_job.test_job as test_job import tests.jobs.test_job.dependency_tests_job as dependency_tests_job -from rialto.jobs.decorators.resolver import Resolver -from rialto.jobs.decorators.test_utils import disable_job_decorators, resolver_resolves +import tests.jobs.test_job.test_job as test_job +from rialto.jobs.resolver import Resolver +from rialto.jobs.test_utils import disable_job_decorators, resolver_resolves def test_raw_dataset_patch(mocker):