Skip to content

Commit

Permalink
RLG-3595 rialto v2 (#12)
Browse files Browse the repository at this point in the history
* v2 changes
  • Loading branch information
MDobransky authored Sep 3, 2024
1 parent f2be7cb commit 93815c2
Show file tree
Hide file tree
Showing 39 changed files with 844 additions and 908 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,33 @@
# Change Log
All notable changes to this project will be documented in this file.

## 2.0.0 - 2024-mm-dd
#### Runner
- runner config now accepts environment variables
- restructured runner config
- added metadata and feature loader sections
- target moved to pipeline
- dependency date_col is now mandatory
- custom extras config is available in each pipeline and will be passed as dictionary available under pipeline_config.extras
- general section is renamed to runner
- info_date_shift is always a list
- transformation header changed
- added argument to skip dependency checking
- added overrides parameter to allow for dynamic overriding of config values
- removed date_from and date_to from arguments, use overrides instead
#### Jobs
- jobs are now the main way to create all pipelines
- config holder removed from jobs
- metadata_manager and feature_loader are now available arguments, depending on configuration
- added @config decorator, similar use case to @datasource, for parsing configuration
#### TableReader
- function signatures changed
- until -> date_until
- info_date_from -> date_from, info_date_to -> date_to
- date_column is now mandatory
- removed TableReaders ability to infer schema from partitions or properties
#### Loader
- removed DataLoader class, now only PysparkFeatureLoader is needed with additional parameters

## 1.3.0 - 2024-06-07

Expand Down
176 changes: 110 additions & 66 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

from pydantic import BaseModelfrom rialto.runner.config_loader import PipelineConfigfrom rialto.jobs import config

# Rialto

Expand Down Expand Up @@ -53,31 +53,21 @@ runner()
A runner by default executes all the jobs provided in the configuration file, for all the viable execution dates according to the configuration file for which the job has not yet run successfully (i.e. the date partition doesn't exist on the storage)
This behavior can be modified by various parameters and switches available.

* **feature_metadata_schema** - path to schema where feature metadata are read and stored, needed for [maker](#maker) jobs and jobs that utilized feature [loader](#loader)
* **run_date** - date at which the runner is triggered (defaults to day of running)
* **date_from** - starting date (defaults to rundate - config watch period)
* **date_until** - end date (defaults to rundate)
* **feature_store_schema** - location of features, needed for jobs utilizing feature [loader](#loader)
* **custom_job_config** - dictionary with key-value pairs that will be accessible under the "config" variable in your rialto jobs
* **rerun** - rerun all jobs even if they already succeeded in the past runs
* **op** - run only selected operation / pipeline

* **skip_dependencies** - ignore dependency checks and run all jobs
* **overrides** - dictionary of overrides for the configuration


Transformations are not included in the runner itself, it imports them dynamically according to the configuration, therefore it's necessary to have them locally installed.

A runner created table has will have automatically created **rialto_date_column** table property set according to target partition set in the configuration.

### Configuration

```yaml
general:
target_schema: catalog.schema # schema where tables will be created, must exist
target_partition_column: INFORMATION_DATE # date to partition new tables on
source_date_column_property: rialto_date_column # name of the date property on source tables
runner:
watched_period_units: "months" # unit of default run period
watched_period_value: 2 # value of default run period
job: "run" # run for running the pipelines, check for only checking dependencies
mail:
to: # a list of email addresses
- [email protected]
Expand All @@ -100,7 +90,7 @@ pipelines: # a list of pipelines to run
dependencies: # list of dependent tables
- table: catalog.schema.table1
name: "table1" # Optional table name, used to recall dependency details in transformation
date_col: generation_date # Optional date column name, takes priority
date_col: generation_date # Mandatory date column name
interval: # mandatory availability interval, subtracted from scheduled day
units: "days"
value: 1
Expand All @@ -109,6 +99,18 @@ pipelines: # a list of pipelines to run
interval:
units: "months"
value: 1
target:
target_schema: catalog.schema # schema where tables will be created, must exist
target_partition_column: INFORMATION_DATE # date to partition new tables on
metadata_manager: # optional
metadata_schema: catalog.metadata # schema where metadata is stored
feature_loader: # optional
config_path: model_features_config.yaml # path to the feature loader configuration file
feature_schema: catalog.feature_tables # schema where feature tables are stored
metadata_schema: catalog.metadata # schema where metadata is stored
extras: #optional arguments processed as dictionary
some_value: 3
some_other_value: giraffe

- name: PipelineTable1 # will be written as pipeline_table1
module:
Expand All @@ -127,8 +129,67 @@ pipelines: # a list of pipelines to run
interval:
units: "days"
value: 6
target:
target_schema: catalog.schema # schema where tables will be created, must exist
target_partition_column: INFORMATION_DATE # date to partition new tables on
```
The configuration can be dynamically overridden by providing a dictionary of overrides to the runner. All overrides must adhere to configurations schema, with pipeline.extras section available for custom schema.
Here are few examples of overrides:
#### Simple override of a single value
Specify the path to the value in the configuration file as a dot-separated string
```python
Runner(
spark,
config_path="tests/overrider.yaml",
run_date="2023-03-31",
overrides={"runner.watch_period_value": 4},
)
```

#### Override list element
You can refer to list elements by their index (starting with 0)
```python
overrides={"runner.mail.to[1]": "[email protected]"}
```

#### Append to list
You can append to list by using index -1
```python
overrides={"runner.mail.to[-1]": "[email protected]"}
```

#### Lookup by attribute value in a list
You can use the following syntax to find a specific element in a list by its attribute value
```python
overrides={"pipelines[name=SimpleGroup].target.target_schema": "new_schema"},
```

#### Injecting/Replacing whole sections
You can directly replace a bigger section of the configuration by providing a dictionary
When the whole section doesn't exist, it will be added to the configuration, however it needs to be added as a whole.
i.e. if the yaml file doesn't specify feature_loader, you can't just add a feature_loader.config_path, you need to add the whole section.
```python
overrides={"pipelines[name=SimpleGroup].feature_loader":
{"config_path": "features_cfg.yaml",
"feature_schema": "catalog.features",
"metadata_schema": "catalog.metadata"}}
```

#### Multiple overrides
You can provide multiple overrides at once, the order of execution is not guaranteed
```python
overrides={"runner.watch_period_value": 4,
"runner.watch_period_units": "weeks",
"pipelines[name=SimpleGroup].target.target_schema": "new_schema",
"pipelines[name=SimpleGroup].feature_loader":
{"config_path": "features_cfg.yaml",
"feature_schema": "catalog.features",
"metadata_schema": "catalog.metadata"}
}
```


## <a id="maker"></a> 2.2 - maker
Expand Down Expand Up @@ -302,6 +363,7 @@ We have a set of pre-defined dependencies:
* **dependencies** returns a dictionary containing the job dependencies config
* **table_reader** returns *TableReader*
* **feature_loader** provides *PysparkFeatureLoader*
* **metadata_manager** provides *MetadataManager*

Apart from that, each **datasource** also becomes a fully usable dependency. Note, that this means that datasources can also be dependent on other datasources - just beware of any circular dependencies!

Expand All @@ -310,19 +372,30 @@ With that sorted out, we can now provide a quick example of the *rialto.jobs* mo
```python
from pyspark.sql import DataFrame
from rialto.common import TableReader
from rialto.jobs.decorators import job, datasource
from rialto.jobs.decorators import config, job, datasource
from rialto.runner.config_loader import PipelineConfig
from pydantic import BaseModel


class ConfigModel(BaseModel):
some_value: int
some_other_value: str

@config
def my_config(config: PipelineConfig):
return ConfigModel(**config.extras)

@datasource
def my_datasource(run_date: datetime.date, table_reader: TableReader) -> DataFrame:
return table_reader.get_latest("my_catalog.my_schema.my_table", until=run_date)
return table_reader.get_latest("my_catalog.my_schema.my_table", date_until=run_date)


@job
def my_job(my_datasource: DataFrame) -> DataFrame:
return my_datasource.withColumn("HelloWorld", F.lit(1))
def my_job(my_datasource: DataFrame, my_config: ConfigModel) -> DataFrame:
return my_datasource.withColumn("HelloWorld", F.lit(my_config.some_value))
```
This piece of code
1. creates a rialto transformation called *my_job*, which is then callable by the rialto runner.
This piece of code
1. creates a rialto transformation called *my_job*, which is then callable by the rialto runner.
2. It sources the *my_datasource* and then runs *my_job* on top of that datasource.
3. Rialto adds VERSION (of your package) and INFORMATION_DATE (as per config) columns automatically.
4. The rialto runner stores the final to a catalog, to a table according to the job's name.
Expand Down Expand Up @@ -383,20 +456,20 @@ import my_package.test_job_module as tjm
# Datasource Testing
def test_datasource_a():
... mocks here ...

with disable_job_decorators(tjm):
datasource_a_output = tjm.datasource_a(... mocks ...)

... asserts ...

# Job Testing
def test_my_job():
datasource_a_mock = ...
... other mocks...

with disable_job_decorators(tjm):
job_output = tjm.my_job(datasource_a_mock, ... mocks ...)

... asserts ...
```

Expand All @@ -418,37 +491,23 @@ This module is used to load features from feature store into your models and scr

Two public classes are exposed form this module. **DatabricksLoader**(DataLoader), **PysparkFeatureLoader**(FeatureLoaderInterface).

### DatabricksLoader
This is a support class for feature loader and provides the data reading capability from the feature store.

This class needs to be instantiated with an active spark session and a path to the feature store schema (in the format of "catalog_name.schema_name").
Optionally a date_column information can be passed, otherwise it defaults to use INFORMATION_DATE
```python
from rialto.loader import DatabricksLoader

data_loader = DatabricksLoader(spark= spark_instance, schema= "catalog.schema", date_column= "INFORMATION_DATE")
```

This class provides one method, read_group(...), which returns a whole feature group for selected date. This is mostly used inside feature loader.

### PysparkFeatureLoader

This class needs to be instantiated with an active spark session, data loader and a path to the metadata schema (in the format of "catalog_name.schema_name").

```python
from rialto.loader import PysparkFeatureLoader

feature_loader = PysparkFeatureLoader(spark= spark_instance, data_loader= data_loader_instance, metadata_schema= "catalog.schema")
feature_loader = PysparkFeatureLoader(spark= spark_instance, feature_schema="catalog.schema", metadata_schema= "catalog.schema2", date_column="information_date")
```

#### Single feature

```python
from rialto.loader import DatabricksLoader, PysparkFeatureLoader
from rialto.loader import PysparkFeatureLoader
from datetime import datetime

data_loader = DatabricksLoader(spark, "feature_catalog.feature_schema")
feature_loader = PysparkFeatureLoader(spark, data_loader, "metadata_catalog.metadata_schema")
feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

feature = feature_loader.get_feature(group_name="CustomerFeatures", feature_name="AGE", information_date=my_date)
Expand All @@ -459,11 +518,10 @@ metadata = feature_loader.get_feature_metadata(group_name="CustomerFeatures", fe
This method of data access is only recommended for experimentation, as the group schema can evolve over time.

```python
from rialto.loader import DatabricksLoader, PysparkFeatureLoader
from rialto.loader import PysparkFeatureLoader
from datetime import datetime

data_loader = DatabricksLoader(spark, "feature_catalog.feature_schema")
feature_loader = PysparkFeatureLoader(spark, data_loader, "metadata_catalog.metadata_schema")
feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

features = feature_loader.get_group(group_name="CustomerFeatures", information_date=my_date)
Expand All @@ -473,11 +531,10 @@ metadata = feature_loader.get_group_metadata(group_name="CustomerFeatures")
#### Configuration

```python
from rialto.loader import DatabricksLoader, PysparkFeatureLoader
from rialto.loader import PysparkFeatureLoader
from datetime import datetime

data_loader = DatabricksLoader(spark, "feature_catalog.feature_schema")
feature_loader = PysparkFeatureLoader(spark, data_loader, "metadata_catalog.metadata_schema")
feature_loader = PysparkFeatureLoader(spark, "feature_catalog.feature_schema", "metadata_catalog.metadata_schema")
my_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

features = feature_loader.get_features_from_cfg(path="local/configuration/file.yaml", information_date=my_date)
Expand Down Expand Up @@ -563,6 +620,7 @@ reader = TableReader(spark=spark_instance)
```

usage of _get_table_:

```python
# get whole table
df = reader.get_table(table="catalog.schema.table", date_column="information_date")
Expand All @@ -573,40 +631,26 @@ from datetime import datetime
start = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
end = datetime.strptime("2024-01-01", "%Y-%m-%d").date()

df = reader.get_table(table="catalog.schema.table", info_date_from=start, info_date_to=end)
df = reader.get_table(table="catalog.schema.table", date_from=start, date_to=end, date_column="information_date")
```

usage of _get_latest_:

```python
# most recent partition
df = reader.get_latest(table="catalog.schema.table", date_column="information_date")

# most recent partition until
until = datetime.strptime("2020-01-01", "%Y-%m-%d").date()

df = reader.get_latest(table="catalog.schema.table", until=until, date_column="information_date")
df = reader.get_latest(table="catalog.schema.table", date_until=until, date_column="information_date")

```
For full information on parameters and their optionality see technical documentation.

_TableReader_ needs an active spark session and an information which column is the **date column**.
There are three options how to pass that information on.

In order of priority from highest:
* Explicit _date_column_ parameter in _get_table_ and _get_latest_
```python
reader.get_latest(table="catalog.schema.table", date_column="information_date")
```
* Inferred from delta metadata, triggered by init parameter, only works on delta tables (e.g. doesn't work on views)
```python
reader = TableReader(spark=spark_instance, infer_partition=True)
reader.get_latest(table="catalog.schema.table")
```
* A custom sql property defined on the table containing the date column name, defaults to _rialto_date_column_
```python
reader = TableReader(spark=spark_instance, date_property="rialto_date_column")
reader.get_latest(table="catalog.schema.table")
```

# <a id="contributing"></a> 3. Contributing
Contributing:
Expand Down
Loading

0 comments on commit 93815c2

Please sign in to comment.