Skip to content

Commit

Permalink
documenting changes
Browse files Browse the repository at this point in the history
  • Loading branch information
MDobransky committed Aug 29, 2024
1 parent 6427626 commit 714ad58
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ All notable changes to this project will be documented in this file.
- dependency date_col is now mandatory
- custom extras config is available in each pipeline and will be passed as dictionary available under pipeline_config.extras
- general section is renamed to runner
- info_date_shift is always a list
- transformation header changed
- added argument to skip dependency checking
- added overrides parameter to allow for dynamic overriding of config values
#### Jobs
- jobs are now the main way to create all pipelines
- config holder removed from jobs
Expand Down
57 changes: 57 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ This behavior can be modified by various parameters and switches available.
* **rerun** - rerun all jobs even if they already succeeded in the past runs
* **op** - run only selected operation / pipeline
* **skip_dependencies** - ignore dependency checks and run all jobs
* **overrides** - dictionary of overrides for the configuration


Transformations are not included in the runner itself, it imports them dynamically according to the configuration, therefore it's necessary to have them locally installed.
Expand Down Expand Up @@ -132,6 +133,62 @@ pipelines: # a list of pipelines to run
value: 6
```
The configuration can be dynamically overridden by providing a dictionary of overrides to the runner. All overrides must adhere to configurations schema, with pipeline.extras section available for custom schema.
Here are few examples of overrides:
#### Simple override of a single value
Specify the path to the value in the configuration file as a dot-separated string
```python
Runner(
spark,
config_path="tests/overrider.yaml",
run_date="2023-03-31",
overrides={"runner.watch_period_value": 4},
)
```

#### Override list element
You can refer to list elements by their index (starting with 0)
```python
overrides={"runner.mail.to[1]": "[email protected]"}
```

#### Append to list
You can append to list by using index -1
```python
overrides={"runner.mail.to[-1]": "[email protected]"}
```

#### Lookup by attribute value in a list
You can use the following syntax to find a specific element in a list by its attribute value
```python
overrides={"pipelines[name=SimpleGroup].target.target_schema": "new_schema"},
```

#### Injecting/Replacing whole sections
You can directly replace a bigger section of the configuration by providing a dictionary
When the whole section doesn't exist, it will be added to the configuration, however it needs to be added as a whole.
i.e. if the yaml file doesn't specify feature_loader, you can't just add a feature_loader.config_path, you need to add the whole section.
```python
overrides={"pipelines[name=SimpleGroup].feature_loader":
{"config_path": "features_cfg.yaml",
"feature_schema": "catalog.features",
"metadata_schema": "catalog.metadata"}}
```

#### Multiple overrides
You can provide multiple overrides at once, the order of execution is not guaranteed
```python
overrides={"runner.watch_period_value": 4,
"runner.watch_period_units": "weeks",
"pipelines[name=SimpleGroup].target.target_schema": "new_schema",
"pipelines[name=SimpleGroup].feature_loader":
{"config_path": "features_cfg.yaml",
"feature_schema": "catalog.features",
"metadata_schema": "catalog.metadata"}
}
```


## <a id="maker"></a> 2.2 - maker
Expand Down
4 changes: 2 additions & 2 deletions rialto/runner/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"get_pipelines_config",
]

from typing import Dict, List, Optional, Union
from typing import Dict, List, Optional

from pydantic import BaseModel

Expand All @@ -32,7 +32,7 @@ class IntervalConfig(BaseModel):
class ScheduleConfig(BaseModel):
frequency: str
day: Optional[int] = 0
info_date_shift: Union[Optional[IntervalConfig], List[IntervalConfig]] = IntervalConfig(units="days", value=0)
info_date_shift: List[IntervalConfig] = IntervalConfig(units="days", value=0)


class DependencyConfig(BaseModel):
Expand Down
4 changes: 2 additions & 2 deletions rialto/runner/config_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ def _override(config, path, value) -> Dict:
else:
raise IndexError(f"Index {index} out of bounds for key {key}")
else:
if key not in config:
raise ValueError(f"Invalid key {key}")
if len(path) == 1:
config[key] = value
else:
if key not in config:
raise ValueError(f"Invalid key {key}")
config[key] = _override(config[key], path[1:], value)
return config

Expand Down
18 changes: 4 additions & 14 deletions tests/runner/overrider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,22 @@ pipelines:
target:
target_schema: catalog.schema
target_partition_column: "INFORMATION_DATE"
loader:
feature_loader:
config_path: path/to/config.yaml
feature_schema: catalog.feature_tables
metadata_schema: catalog.metadata
metadata_manager:
metadata_schema: catalog.metadata
- name: GroupNoDeps
module:
python_module: tests.runner.transformations
python_class: SimpleGroup
schedule:
frequency: weekly
day: 7
info_date_shift:
value: 3
units: days
- name: NamedDeps
- name: OtherGroup
module:
python_module: tests.runner.transformations
python_class: SimpleGroup
schedule:
frequency: weekly
day: 7
info_date_shift:
value: 3
units: days
- value: 3
units: days
dependencies:
- table: source.schema.dep1
name: source1
Expand Down
34 changes: 33 additions & 1 deletion tests/runner/test_overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,38 @@ def test_invalid_key(spark):
spark,
config_path="tests/runner/overrider.yaml",
run_date="2023-03-31",
overrides={"runner.mail.test": "test"},
overrides={"runner.mail.test.param": "test"},
)
assert error.value.args[0] == "Invalid key test"


def test_replace_section(spark):
runner = Runner(
spark,
config_path="tests/runner/overrider.yaml",
run_date="2023-03-31",
overrides={
"pipelines[name=SimpleGroup].feature_loader": {
"config_path": "features_cfg.yaml",
"feature_schema": "catalog.features",
"metadata_schema": "catalog.metadata",
}
},
)
assert runner.config.pipelines[0].feature_loader.feature_schema == "catalog.features"


def test_add_section(spark):
runner = Runner(
spark,
config_path="tests/runner/overrider.yaml",
run_date="2023-03-31",
overrides={
"pipelines[name=OtherGroup].feature_loader": {
"config_path": "features_cfg.yaml",
"feature_schema": "catalog.features",
"metadata_schema": "catalog.metadata",
}
},
)
assert runner.config.pipelines[1].feature_loader.feature_schema == "catalog.features"
8 changes: 4 additions & 4 deletions tests/runner/transformations/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ pipelines:
frequency: weekly
day: 7
info_date_shift:
value: 3
units: days
- value: 3
units: days
dependencies:
- table: source.schema.dep1
interval:
Expand Down Expand Up @@ -65,8 +65,8 @@ pipelines:
frequency: weekly
day: 7
info_date_shift:
value: 3
units: days
- value: 3
units: days
dependencies:
- table: source.schema.dep1
name: source1
Expand Down

0 comments on commit 714ad58

Please sign in to comment.