Skip to content

Commit

Permalink
renamed config, flatten structure
Browse files Browse the repository at this point in the history
  • Loading branch information
MDobransky committed Sep 5, 2024
1 parent 93815c2 commit f22301a
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 58 deletions.
38 changes: 26 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ pipelines: # a list of pipelines to run
frequency: weekly # daily/weekly/monthly
day: 7 # day of the week or month
info_date_shift: #Optional shift in the written information date from the scheduled day
units: "days" # days/weeks/months/years
value: 5 # subtracted from scheduled day
- units: "days" # days/weeks/months/years
value: 5 # subtracted from scheduled day
dependencies: # list of dependent tables
- table: catalog.schema.table1
name: "table1" # Optional table name, used to recall dependency details in transformation
Expand Down Expand Up @@ -372,7 +372,7 @@ With that sorted out, we can now provide a quick example of the *rialto.jobs* mo
```python
from pyspark.sql import DataFrame
from rialto.common import TableReader
from rialto.jobs.decorators import config, job, datasource
from rialto.jobs.decorators import config_parser, job, datasource
from rialto.runner.config_loader import PipelineConfig
from pydantic import BaseModel

Expand All @@ -381,10 +381,12 @@ class ConfigModel(BaseModel):
some_value: int
some_other_value: str

@config

@config_parser
def my_config(config: PipelineConfig):
return ConfigModel(**config.extras)


@datasource
def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFrame:
return table_reader.get_latest("my_catalog.my_schema.my_table", date_until=run_date)
Expand Down Expand Up @@ -449,37 +451,49 @@ def my_job(datasource_a, ...)
... code ...
```
The *disable_job_decorators* context manager, as the name suggests, disables all decorator functionality and lets you access your functions as raw functions - making it super simple to unit-test:

```python3
from rialto.jobs.decorators.test_utils import disable_job_decorators
from rialto.jobs.test_utils import disable_job_decorators
import my_package.test_job_module as tjm


# Datasource Testing
def test_datasource_a():
... mocks here ...
...
mocks
here...

with disable_job_decorators(tjm):
datasource_a_output = tjm.datasource_a(... mocks ...)
datasource_a_output = tjm.datasource_a(...
mocks...)

...
asserts...

... asserts ...

# Job Testing
def test_my_job():
datasource_a_mock = ...
... other mocks...
...
other
mocks...

with disable_job_decorators(tjm):
job_output = tjm.my_job(datasource_a_mock, ... mocks ...)
job_output = tjm.my_job(datasource_a_mock, ...
mocks...)

... asserts ...
...
asserts...
```

#### 2. Testing the @job Dependency Tree
In complex use cases, it may happen that the dependencies of a job become quite complex. Or you simply want to be sure that you didn't accidentally misspelled your dependency name:

```python3
from rialto.jobs.decorators.test_utils import resolver_resolves
from rialto.jobs.test_utils import resolver_resolves
import my_job.test_job_module as tjm


def test_my_job_resolves(spark):
assert resolver_resolves(spark, tjm.my_job)
```
Expand Down
2 changes: 1 addition & 1 deletion rialto/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rialto.jobs.decorators import config, datasource, job
from rialto.jobs.decorators import config_parser, datasource, job
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ["datasource", "job", "config"]
__all__ = ["datasource", "job", "config_parser"]

import inspect
import typing

import importlib_metadata
from loguru import logger

from rialto.jobs.decorators.job_base import JobBase
from rialto.jobs.decorators.resolver import Resolver
from rialto.jobs.job_base import JobBase
from rialto.jobs.resolver import Resolver


def config(ds_getter: typing.Callable) -> typing.Callable:
def config_parser(cf_getter: typing.Callable) -> typing.Callable:
"""
Config parser functions decorator.
Registers a config parsing function into a rialto job prerequisite.
You can then request the job via job function arguments.
:param ds_getter: dataset reader function
:return: raw reader function, unchanged
:param cf_getter: dataset reader function
:return: raw function, unchanged
"""
Resolver.register_callable(ds_getter)
return ds_getter
Resolver.register_callable(cf_getter)
return cf_getter


def datasource(ds_getter: typing.Callable) -> typing.Callable:
Expand Down
15 changes: 0 additions & 15 deletions rialto/jobs/decorators/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pyspark.sql import DataFrame, SparkSession

from rialto.common import TableReader
from rialto.jobs.decorators.resolver import Resolver
from rialto.jobs.resolver import Resolver
from rialto.loader import PysparkFeatureLoader
from rialto.metadata import MetadataManager
from rialto.runner import Transformation
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from contextlib import contextmanager
from unittest.mock import MagicMock, create_autospec, patch

from rialto.jobs.decorators.job_base import JobBase
from rialto.jobs.decorators.resolver import Resolver, ResolverException
from rialto.jobs.job_base import JobBase
from rialto.jobs.resolver import Resolver, ResolverException


def _passthrough_decorator(*args, **kwargs) -> typing.Callable:
Expand All @@ -33,12 +33,12 @@ def _passthrough_decorator(*args, **kwargs) -> typing.Callable:
@contextmanager
def _disable_job_decorators() -> None:
patches = [
patch("rialto.jobs.datasource", _passthrough_decorator),
patch("rialto.jobs.decorators.datasource", _passthrough_decorator),
patch("rialto.jobs.decorators.decorators.datasource", _passthrough_decorator),
patch("rialto.jobs.decorators.config", _passthrough_decorator),
patch("rialto.jobs.decorators.decorators.config", _passthrough_decorator),
patch("rialto.jobs.config_parser", _passthrough_decorator),
patch("rialto.jobs.decorators.config_parser", _passthrough_decorator),
patch("rialto.jobs.job", _passthrough_decorator),
patch("rialto.jobs.decorators.job", _passthrough_decorator),
patch("rialto.jobs.decorators.decorators.job", _passthrough_decorator),
]

for i in patches:
Expand Down Expand Up @@ -101,7 +101,7 @@ def __getitem__(self, func_name):

return fake_method

with patch("rialto.jobs.decorators.resolver.Resolver._storage", SmartStorage()):
with patch("rialto.jobs.resolver.Resolver._storage", SmartStorage()):
job().run(reader=MagicMock(), run_date=MagicMock(), spark=spark)

return True
2 changes: 1 addition & 1 deletion tests/jobs/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import pandas as pd

from rialto.jobs.decorators.job_base import JobBase
from rialto.jobs.job_base import JobBase


def custom_callable():
Expand Down
4 changes: 2 additions & 2 deletions tests/jobs/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

from importlib import import_module

from rialto.jobs.decorators.job_base import JobBase
from rialto.jobs.decorators.resolver import Resolver
from rialto.jobs.job_base import JobBase
from rialto.jobs.resolver import Resolver


def test_dataset_decorator():
Expand Down
4 changes: 2 additions & 2 deletions tests/jobs/test_job/dependency_tests_job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from rialto.jobs.decorators import datasource, job
from rialto.jobs import datasource, job


@datasource
Expand Down Expand Up @@ -47,5 +47,5 @@ def missing_dependency_job(a, x):


@job
def default_dependency_job(run_date, spark, config, table_reader, feature_loader):
def default_dependency_job(run_date, spark, config):
return 1
4 changes: 2 additions & 2 deletions tests/jobs/test_job/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rialto.jobs.decorators import config, datasource, job
from rialto.jobs import config_parser, datasource, job


@config
@config_parser
def custom_config():
return "config_return"

Expand Down
2 changes: 1 addition & 1 deletion tests/jobs/test_job_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pyspark.sql.types

import tests.jobs.resources as resources
from rialto.jobs.decorators.resolver import Resolver
from rialto.jobs.resolver import Resolver
from rialto.loader import PysparkFeatureLoader


Expand Down
6 changes: 3 additions & 3 deletions tests/jobs/test_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
import pytest

from rialto.jobs.decorators.resolver import Resolver, ResolverException
from rialto.jobs.resolver import Resolver, ResolverException


def test_simple_resolve_custom_name():
Expand Down Expand Up @@ -56,8 +56,8 @@ def test_register_resolve(mocker):
def f():
return 7

mocker.patch("rialto.jobs.decorators.resolver.Resolver.register_callable", return_value="f")
mocker.patch("rialto.jobs.decorators.resolver.Resolver.resolve")
mocker.patch("rialto.jobs.resolver.Resolver.register_callable", return_value="f")
mocker.patch("rialto.jobs.resolver.Resolver.resolve")

Resolver.register_resolve(f)

Expand Down
6 changes: 3 additions & 3 deletions tests/jobs/test_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import pytest

import rialto.jobs.decorators as decorators
import tests.jobs.test_job.test_job as test_job
import tests.jobs.test_job.dependency_tests_job as dependency_tests_job
from rialto.jobs.decorators.resolver import Resolver
from rialto.jobs.decorators.test_utils import disable_job_decorators, resolver_resolves
import tests.jobs.test_job.test_job as test_job
from rialto.jobs.resolver import Resolver
from rialto.jobs.test_utils import disable_job_decorators, resolver_resolves


def test_raw_dataset_patch(mocker):
Expand Down

0 comments on commit f22301a

Please sign in to comment.