Release/2.0.0 #16

Merged: 10 commits, Sep 21, 2024
1 change: 1 addition & 0 deletions .flake8
@@ -14,3 +14,4 @@ extend-ignore =
D100,
D104,
D107,
E203,
31 changes: 31 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,37 @@
# Change Log
All notable changes to this project will be documented in this file.

## 2.0.0 - 2024-mm-dd
#### Runner
- runner config now accepts environment variables
- restructured runner config
- added metadata and feature loader sections
- target moved to pipeline
- dependency date_col is now mandatory
- custom extras config is available in each pipeline and is passed as a dictionary under pipeline_config.extras
- the general section is renamed to runner
- info_date_shift is always a list
- transformation header changed
- added argument to skip dependency checking
- added overrides parameter to allow for dynamic overriding of config values
- removed date_from and date_to from arguments, use overrides instead
#### Jobs
- jobs are now the main way to create all pipelines
- config holder removed from jobs
- metadata_manager and feature_loader are now available as arguments, depending on configuration
- added a @config decorator, with a similar use case to @datasource, for parsing configuration
- reworked Resolver and added ModuleRegister
- datasources are no longer registered just by importing them, and thus are no longer available to all jobs
- register_dependency_callable and register_dependency_module added to register datasources
- together, it's now possible to have two datasources with the same name but different implementations for two different jobs
#### TableReader
- function signatures changed (see the migration sketch after this changelog entry)
- until -> date_until
- info_date_from -> date_from, info_date_to -> date_to
- date_column is now mandatory
- removed TableReader's ability to infer the date column from table partitions or properties
#### Loader
- removed the DataLoader class; only PysparkFeatureLoader is needed now, with additional parameters
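
To make the TableReader changes concrete, here is a minimal migration sketch. It assumes an existing SparkSession named `spark`; the table path and column name are placeholders, not part of this PR.

```python
from datetime import date

from rialto.common import TableReader

reader = TableReader(spark)  # the date_property / infer_partition arguments are gone

# Before 2.0.0 (date_column could be omitted and inferred from table metadata):
#   reader.get_table("catalog.schema.sales", info_date_from=..., info_date_to=...)
#   reader.get_latest("catalog.schema.sales", until=...)

# From 2.0.0, date_column is mandatory and the range arguments are renamed:
df = reader.get_table(
    "catalog.schema.sales",          # placeholder table path
    date_column="information_date",  # placeholder date column
    date_from=date(2024, 1, 1),
    date_to=date(2024, 1, 31),
)

latest = reader.get_latest(
    "catalog.schema.sales",
    date_column="information_date",
    date_until=date(2024, 1, 31),
)
```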

## 1.3.0 - 2024-06-07

279 changes: 209 additions & 70 deletions README.md

Large diffs are not rendered by default.

707 changes: 378 additions & 329 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
@@ -1,7 +1,7 @@
[tool.poetry]
name = "rialto"

version = "1.3.1"
version = "2.0.0"

packages = [
{ include = "rialto" },
@@ -30,6 +30,8 @@ pytest-mock = "^3.11.1"
pandas = "^2.1.0"
flake8-broken-line = "^1.0.0"
loguru = "^0.7.2"
importlib-metadata = "^7.2.1"
numpy = "<2.0.0"

[tool.poetry.dev-dependencies]
pyspark = "^3.4.1"
2 changes: 1 addition & 1 deletion rialto/common/__init__.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from rialto.common.table_reader import TableReader
from rialto.common.table_reader import DataReader, TableReader
35 changes: 35 additions & 0 deletions rialto/common/env_yaml.py
@@ -0,0 +1,35 @@
import os
import re

import yaml
from loguru import logger

__all__ = ["EnvLoader"]

# Regex pattern to capture variable and the rest of the string
_path_matcher = re.compile(r"(?P<before>.*)\$\{(?P<env_name>[^}^{:]+)(?::(?P<default_value>[^}^{]*))?\}(?P<after>.*)")


def _path_constructor(loader, node):
value = node.value
match = _path_matcher.search(value)
if match:
before = match.group("before")
after = match.group("after")
sub = os.getenv(match.group("env_name"), match.group("default_value"))
if sub is None:
raise ValueError(f"Environment variable {match.group('env_name')} has no assigned value")
new_value = before + sub + after
logger.info(f"Config: Replacing {value}, with {new_value}")
return new_value
return value


class EnvLoader(yaml.SafeLoader):
"""Custom loader that replaces values with environment variables"""

pass


EnvLoader.add_implicit_resolver("!env_substitute", _path_matcher, None)
EnvLoader.add_constructor("!env_substitute", _path_constructor)
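
A small usage sketch for the new loader, matching the "runner config now accepts environment variables" changelog item. The config keys and variable names below are invented for illustration; `load_yaml` in `rialto/common/utils.py` (changed later in this diff) uses `EnvLoader` the same way.

```python
import os

import yaml

from rialto.common.env_yaml import EnvLoader

os.environ["RUNNER_MAIL"] = "alerts@example.com"  # illustrative variable

raw = """
runner:                                     # hypothetical config section
  mail: ${RUNNER_MAIL:noreply@example.com}  # replaced from the environment
  env: ${RUNNER_ENV:dev}                    # RUNNER_ENV unset -> falls back to "dev"
"""

config = yaml.load(raw, EnvLoader)
# -> {'runner': {'mail': 'alerts@example.com', 'env': 'dev'}} (assuming RUNNER_ENV is unset)
```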
70 changes: 23 additions & 47 deletions rialto/common/table_reader.py
@@ -21,8 +21,6 @@
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

from rialto.common.utils import get_date_col_property, get_delta_partition


class DataReader(metaclass=abc.ABCMeta):
"""
@@ -36,16 +34,15 @@ class DataReader(metaclass=abc.ABCMeta):
def get_latest(
self,
table: str,
until: Optional[datetime.date] = None,
date_column: str = None,
date_column: str,
date_until: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
"""
Get latest available date partition of the table until specified date

:param table: input table path
:param until: Optional until date (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param date_until: Optional until date (inclusive)
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
@@ -55,18 +52,17 @@ def get_latest(
def get_table(
self,
table: str,
info_date_from: Optional[datetime.date] = None,
info_date_to: Optional[datetime.date] = None,
date_column: str = None,
date_column: str,
date_from: Optional[datetime.date] = None,
date_to: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
"""
Get a whole table or a slice by selected dates

:param table: input table path
:param info_date_from: Optional date from (inclusive)
:param info_date_to: Optional date to (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param date_from: Optional date from (inclusive)
:param date_to: Optional date to (inclusive)
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
@@ -76,17 +72,13 @@ def get_table(
class TableReader(DataReader):
"""An implementation of data reader for databricks tables"""

def __init__(self, spark: SparkSession, date_property: str = "rialto_date_column", infer_partition: bool = False):
def __init__(self, spark: SparkSession):
"""
Init

:param spark:
:param date_property: Databricks table property specifying date column, take priority over inference
:param infer_partition: infer date column as tables partition from delta metadata
"""
self.spark = spark
self.date_property = date_property
self.infer_partition = infer_partition
super().__init__()

def _uppercase_column_names(self, df: DataFrame) -> DataFrame:
@@ -106,41 +98,26 @@ def _get_latest_available_date(self, df: DataFrame, date_col: str, until: Option
df = df.select(F.max(date_col)).alias("latest")
return df.head()[0]

def _get_date_col(self, table: str, date_column: str):
"""
Get tables date column

column specified at get_table/get_latest takes priority, if inference is enabled it
takes 2nd place, last resort is table property
"""
if date_column:
return date_column
elif self.infer_partition:
return get_delta_partition(self.spark, table)
else:
return get_date_col_property(self.spark, table, self.date_property)

def get_latest(
self,
table: str,
until: Optional[datetime.date] = None,
date_column: str = None,
date_column: str,
date_until: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
"""
Get latest available date partition of the table until specified date

:param table: input table path
:param until: Optional until date (inclusive)
:param date_until: Optional until date (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
date_col = self._get_date_col(table, date_column)
df = self.spark.read.table(table)

selected_date = self._get_latest_available_date(df, date_col, until)
df = df.filter(F.col(date_col) == selected_date)
selected_date = self._get_latest_available_date(df, date_column, date_until)
df = df.filter(F.col(date_column) == selected_date)

if uppercase_columns:
df = self._uppercase_column_names(df)
@@ -149,28 +126,27 @@ def get_latest(
def get_table(
self,
table: str,
info_date_from: Optional[datetime.date] = None,
info_date_to: Optional[datetime.date] = None,
date_column: str = None,
date_column: str,
date_from: Optional[datetime.date] = None,
date_to: Optional[datetime.date] = None,
uppercase_columns: bool = False,
) -> DataFrame:
"""
Get a whole table or a slice by selected dates

:param table: input table path
:param info_date_from: Optional date from (inclusive)
:param info_date_to: Optional date to (inclusive)
:param date_from: Optional date from (inclusive)
:param date_to: Optional date to (inclusive)
:param date_column: column to filter dates on, takes highest priority
:param uppercase_columns: Option to refactor all column names to uppercase
:return: Dataframe
"""
date_col = self._get_date_col(table, date_column)
df = self.spark.read.table(table)

if info_date_from:
df = df.filter(F.col(date_col) >= info_date_from)
if info_date_to:
df = df.filter(F.col(date_col) <= info_date_to)
if date_from:
df = df.filter(F.col(date_column) >= date_from)
if date_to:
df = df.filter(F.col(date_column) <= date_to)
if uppercase_columns:
df = self._uppercase_column_names(df)
return df
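
A quick behavioural sketch of the new `get_latest`: with `date_column` now passed explicitly, it returns only the rows of the most recent date that falls on or before `date_until`. The table and column names are throwaway examples, and an existing SparkSession `spark` is assumed.

```python
from datetime import date

from rialto.common import TableReader

# Throwaway table with three date partitions.
spark.createDataFrame(
    [(1, date(2024, 9, 1)), (2, date(2024, 9, 8)), (3, date(2024, 9, 15))],
    "id INT, information_date DATE",
).write.saveAsTable("tmp_get_latest_demo")

reader = TableReader(spark)

# Returns only the 2024-09-08 row: the latest available date <= date_until.
reader.get_latest(
    "tmp_get_latest_demo",
    date_column="information_date",
    date_until=date(2024, 9, 10),
).show()
```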
60 changes: 25 additions & 35 deletions rialto/common/utils.py
@@ -12,16 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__all__ = ["load_yaml", "get_date_col_property", "get_delta_partition"]
__all__ = ["load_yaml", "cast_decimals_to_floats", "get_caller_module"]

import inspect
import os
from typing import Any
from typing import Any, List

import pyspark.sql.functions as F
import yaml
from pyspark.sql import DataFrame
from pyspark.sql.types import FloatType

from rialto.common.env_yaml import EnvLoader


def load_yaml(path: str) -> Any:
"""
@@ -34,50 +37,37 @@ def load_yaml(path: str) -> Any:
raise FileNotFoundError(f"Can't find {path}.")

with open(path, "r") as stream:
return yaml.safe_load(stream)
return yaml.load(stream, EnvLoader)


def get_date_col_property(spark, table: str, property: str) -> str:
def cast_decimals_to_floats(df: DataFrame) -> DataFrame:
"""
Retrieve a data column name from a given table property
Find all decimal types in the table and cast them to floats. Fixes errors in .toPandas() conversions.

:param spark: spark session
:param table: path to table
:param property: name of the property
:return: data column name
:param df: input df
:return: pyspark DataFrame with fixed types
"""
props = spark.sql(f"show tblproperties {table}")
date_col = props.filter(F.col("key") == property).select("value").collect()
if len(date_col):
return date_col[0].value
else:
raise RuntimeError(f"Table {table} has no property {property}.")
decimal_cols = [col_name for col_name, data_type in df.dtypes if "decimal" in data_type]
for c in decimal_cols:
df = df.withColumn(c, F.col(c).cast(FloatType()))

return df

def get_delta_partition(spark, table: str) -> str:
"""
Select first partition column of the delta table

:param table: full table name
:return: partition column name
def get_caller_module() -> Any:
"""
columns = spark.catalog.listColumns(table)
partition_columns = list(filter(lambda c: c.isPartition, columns))
if len(partition_columns):
return partition_columns[0].name
else:
raise RuntimeError(f"Delta table has no partitions: {table}.")
Get the module containing the function which is calling your function.

Inspects the call stack, where:
0th entry is this function
1st entry is the function which needs to know who called it
2nd entry is the calling function

def cast_decimals_to_floats(df: DataFrame) -> DataFrame:
"""
Find all decimal types in the table and cast them to floats. Fixes errors in .toPandas() conversions.
Therefore, we'll return a module which contains the function at the 2nd place on the stack.

:param df: pyspark DataFrame
:return: pyspark DataFrame with fixed types
:return: Python Module containing the calling function.
"""
decimal_cols = [col_name for col_name, data_type in df.dtypes if "decimal" in data_type]
for c in decimal_cols:
df = df.withColumn(c, F.col(c).cast(FloatType()))

return df
stack = inspect.stack()
last_stack = stack[2]
return inspect.getmodule(last_stack[0])
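
Two short illustrations of the helpers above, assuming an existing SparkSession `spark`; the column and function names are placeholders.

```python
from decimal import Decimal

from rialto.common.utils import cast_decimals_to_floats, get_caller_module

# cast_decimals_to_floats: DECIMAL columns are cast to FloatType, which keeps
# .toPandas() from producing object-typed Decimal columns.
df = spark.createDataFrame([(Decimal("12.50"),)], "amount DECIMAL(10,2)")
pdf = cast_decimals_to_floats(df).toPandas()  # "amount" is now a plain float column


# get_caller_module: a helper can discover the module of whoever called it,
# e.g. to register that module's datasources automatically.
def who_called_me():
    return get_caller_module()


def some_function_in_this_module():
    return who_called_me()  # -> the module this function is defined in


print(some_function_in_this_module().__name__)  # "__main__" when run as a script
```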
6 changes: 6 additions & 0 deletions rialto/jobs/__init__.py
@@ -11,3 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from rialto.jobs.decorators import config_parser, datasource, job
from rialto.jobs.module_register import (
register_dependency_callable,
register_dependency_module,
)
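
The changelog's new registration flow, sketched very loosely. This is speculative: the decorator semantics and the exact signature of `register_dependency_module` are assumptions based on the names exported here and the changelog wording, and `my_datasources` is a hypothetical module.

```python
# Speculative sketch -- decorator semantics and register_dependency_module's
# signature are assumptions based on the changelog, not on code in this diff.
from rialto.jobs import job, register_dependency_module

import my_datasources  # hypothetical module that defines @datasource functions

# Assumption: registering the module makes its datasources resolvable for jobs in
# this module; since 2.0.0, merely importing the module is no longer enough.
register_dependency_module(my_datasources)


@job
def customer_count(customers):  # assumption: "customers" resolved by name from registered datasources
    return customers.count()
```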