Commit 1699043 (1 parent: 8e18da2) - 1 changed file with 17 additions and 28 deletions.
@@ -53,31 +53,22 @@ runner()

By default, a runner executes all jobs defined in the configuration file, for every viable execution date derived from that configuration for which the job has not yet run successfully (i.e. the date partition does not exist in storage).
This behavior can be modified with the parameters and switches listed below.

* **feature_metadata_schema** - path to the schema where feature metadata are read and stored; needed for [maker](#maker) jobs and for jobs that utilize the feature [loader](#loader)
* **run_date** - date at which the runner is triggered (defaults to the day of running)
* **date_from** - starting date (defaults to run_date minus the configured watch period)
* **date_until** - end date (defaults to run_date)
* **feature_store_schema** - location of feature tables; needed for jobs utilizing the feature [loader](#loader)
* **custom_job_config** - dictionary of key-value pairs that will be accessible under the "config" variable in your rialto jobs
* **rerun** - rerun all jobs even if they already succeeded in past runs
* **op** - run only the selected operation / pipeline
* **skip_dependencies** - ignore dependency checks and run all jobs
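
As a minimal, hedged sketch of how these switches could be supplied (the parameter names come from the list above, but treat the exact `Runner` constructor signature as an assumption to be checked against the rialto documentation):

```python
# Hedged sketch: parameter names are taken from the list above; the exact
# Runner constructor signature is an assumption, not a verified API.
from rialto.runner import Runner

runner = Runner(
    spark,                                # an active SparkSession
    config_path="config.yaml",            # runner configuration (see Configuration below)
    run_date="2024-06-30",                # trigger date (defaults to the day of running)
    rerun=True,                           # rerun jobs even if they already succeeded
    op="PipelineTable",                   # run only the selected pipeline
    skip_dependencies=True,               # ignore dependency checks
    custom_job_config={"some_value": 3},  # exposed under "config" in rialto jobs
)
runner()
```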
Transformations are not included in the runner itself; it imports them dynamically according to the configuration, so they must be installed locally.

Tables created by the runner automatically get the **rialto_date_column** table property, set according to the target partition column defined in the configuration.
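
As an illustration (plain Spark SQL, not a rialto API; the table name below is a hypothetical placeholder), the property can be inspected like this:

```python
# Hedged illustration: inspect the table property the runner sets on tables it
# creates. "catalog.schema.pipeline_table" is a placeholder table name.
spark.sql(
    "SHOW TBLPROPERTIES catalog.schema.pipeline_table ('rialto_date_column')"
).show(truncate=False)
# Expected to report the configured target partition column, e.g. INFORMATION_DATE.
```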

### Configuration

```yaml
general:
  target_schema: catalog.schema # schema where tables will be created, must exist
  target_partition_column: INFORMATION_DATE # date to partition new tables on
  source_date_column_property: rialto_date_column # name of the date property on source tables
runner:
  watched_period_units: "months" # unit of default run period
  watched_period_value: 2 # value of default run period
  job: "run" # "run" for running the pipelines, "check" for only checking dependencies
mail:
  to: # a list of email addresses
    - [email protected]

# @@ -100,7 +91,7 @@ pipelines: # a list of pipelines to run
    dependencies: # list of dependent tables
      - table: catalog.schema.table1
        name: "table1" # Optional table name, used to recall dependency details in the transformation
        date_col: generation_date # Mandatory date column name
        interval: # mandatory availability interval, subtracted from the scheduled day
          units: "days"
          value: 1

# @@ -109,6 +100,18 @@ pipelines: # a list of pipelines to run
        interval:
          units: "months"
          value: 1
    target:
      target_schema: catalog.schema # schema where tables will be created, must exist
      target_partition_column: INFORMATION_DATE # date to partition new tables on
    metadata_manager: # optional
      metadata_schema: catalog.metadata # schema where metadata is stored
    feature_loader: # optional
      config_path: model_features_config.yaml # path to the feature loader configuration file
      feature_schema: catalog.feature_tables # schema where feature tables are stored
      metadata_schema: catalog.metadata # schema where metadata is stored
    extras: # optional arguments processed as a dictionary
      some_value: 3
      some_other_value: giraffe

  - name: PipelineTable1 # will be written as pipeline_table1
    module:
```

@@ -302,6 +305,7 @@ We have a set of pre-defined dependencies:

* **dependencies** returns a dictionary containing the job dependencies config
* **table_reader** returns *TableReader*
* **feature_loader** provides *PysparkFeatureLoader*
* **metadata_manager** provides *MetadataManager*

Apart from that, each **datasource** also becomes a fully usable dependency. Note that this means datasources can also depend on other datasources - just beware of circular dependencies!
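
As a hedged sketch of how these injected dependencies might be consumed (the decorator names and the `rialto.jobs` import path are assumptions based on this README's job/datasource terminology; dependencies are injected by parameter name):

```python
# Hedged sketch: a datasource and a job receiving pre-defined dependencies by
# parameter name. Decorator names and import path are assumptions.
from rialto.jobs import datasource, job


@datasource
def table1(table_reader):
    # build a datasource from the injected TableReader
    return table_reader.get_latest(table="catalog.schema.table1", date_column="generation_date")


@job
def pipeline_table(config, table1):
    # "config" exposes custom_job_config / extras values from the configuration
    # (exact access pattern not shown here); "table1" is the datasource above,
    # injected because the parameter shares its name.
    return table1.select("*")
```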

@@ -575,7 +579,7 @@ from datetime import datetime

```python
start = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
end = datetime.strptime("2024-01-01", "%Y-%m-%d").date()

df = reader.get_table(table="catalog.schema.table", date_from=start, date_to=end, date_column="information_date")
```

usage of _get_latest_:
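
A minimal sketch, assuming the same `reader` and table as in the _get_table_ example above (the call mirrors the explicit _date_column_ usage shown below):

```python
# Hedged sketch: fetch the most recent available partition of the table,
# passing the date column explicitly.
df = reader.get_latest(table="catalog.schema.table", date_column="information_date")
```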

@@ -595,21 +599,6 @@ For full information on parameters and their optionality see technical documentation

_TableReader_ needs an active Spark session and information about which column is the **date column**.
There are three options for passing that information on.

In order of priority, from highest:
* Explicit _date_column_ parameter in _get_table_ and _get_latest_
```python
reader.get_latest(table="catalog.schema.table", date_column="information_date")
```
* Inferred from delta metadata; triggered by an init parameter and only works on delta tables (e.g. doesn't work on views)
```python
reader = TableReader(spark=spark_instance, infer_partition=True)
reader.get_latest(table="catalog.schema.table")
```
* A custom SQL property defined on the table containing the date column name; the property name defaults to _rialto_date_column_
```python
reader = TableReader(spark=spark_instance, date_property="rialto_date_column")
reader.get_latest(table="catalog.schema.table")
```

# <a id="contributing"></a> 3. Contributing

Contributing: