Merge pull request #1150 from mindsdb/staging
Release 23.6.2.0
paxcema authored Jun 13, 2023
2 parents 6aaefff + 7eaf17e commit 51a3f03
Showing 22 changed files with 544 additions and 841 deletions.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lightwood/__about__.py
@@ -1,6 +1,6 @@
__title__ = 'lightwood'
__package_name__ = 'lightwood'
__version__ = '23.5.1.1'
__version__ = '23.6.2.0'
__description__ = "Lightwood is a toolkit for automatic machine learning model building"
__email__ = "[email protected]"
__author__ = 'MindsDB Inc'
3 changes: 0 additions & 3 deletions lightwood/analysis/analyze.py
@@ -3,7 +3,6 @@
from dataprep_ml import StatisticalAnalysis

from lightwood.helpers.log import log
from lightwood.helpers.ts import filter_ds
from type_infer.dtype import dtype
from lightwood.ensemble import BaseEnsemble
from lightwood.analysis.base import BaseAnalysisBlock
@@ -60,8 +59,6 @@ def model_analyzer(
normal_predictions = None

if len(analysis_blocks) > 0:
filtered_df = filter_ds(encoded_val_data, tss)
encoded_val_data = EncodedDs(encoded_val_data.encoders, filtered_df, encoded_val_data.target)
normal_predictions = predictor(encoded_val_data, args=PredictionArguments.from_dict(args))
normal_predictions = normal_predictions.set_index(encoded_val_data.data_frame.index)

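These two removals take the explicit filtering step out of `model_analyzer`: as the `json_ai.py` changes below show, featurization now builds a time-series-filtered variant of each split up front, so the analyzer can assume its input is already filtered. A before/after sketch (hedged; names as in the hunk above):

# Before this PR, model_analyzer filtered the encoded validation data itself:
#     filtered_df = filter_ds(encoded_val_data, tss)
#     encoded_val_data = EncodedDs(encoded_val_data.encoders, filtered_df, encoded_val_data.target)
# After, the caller supplies a split already featurized via filter_ts(data, tss)
# (see json_ai.py below), so prediction runs directly on the pre-filtered EncodedDs:
normal_predictions = predictor(encoded_val_data, args=PredictionArguments.from_dict(args))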
1 change: 1 addition & 0 deletions lightwood/analysis/helpers/feature_importance.py
@@ -81,6 +81,7 @@ def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]:
shuffle_data = deepcopy(ref_data)
shuffle_data.clear_cache()
shuffle_data.data_frame[col] = shuffle(shuffle_data.data_frame[col].values)
shuffle_data.build_cache() # TODO: bottleneck, add a method to build a single column instead!

shuffled_preds = ns.predictor(shuffle_data, args=PredictionArguments.from_dict(args))
shuffled_col_accuracy[col] = np.mean(list(evaluate_accuracies(
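The added `build_cache()` call re-encodes the entire frame after every single-column shuffle, which the inline TODO flags as the bottleneck. A hypothetical single-column refresh (not part of this PR) could overwrite just the shuffled column's slice of the cache via `encoder_spans`:

def rebuild_cache_column(ds, col):
    # Hypothetical sketch, not in this PR: refresh the cache for one shuffled
    # column instead of re-encoding the whole frame with build_cache().
    a, b = ds.encoder_spans[col]  # feature slice owned by this column
    encoded = ds.encoders[col].encode(ds.data_frame[col])  # assumes a 2-D (rows, b - a) output
    ds.X_cache[:, a:b] = encoded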
42 changes: 31 additions & 11 deletions lightwood/api/json_ai.py
@@ -94,7 +94,7 @@ def lookup_encoder(
dtype.binary: "BinaryEncoder",
dtype.categorical: "CategoricalAutoEncoder"
if statistical_analysis is None
or len(statistical_analysis.histograms[col_name]) > 100
or len(statistical_analysis.histograms[col_name]['x']) > 16
else "OneHotEncoder",
dtype.tags: "MultiHotEncoder",
dtype.date: "DatetimeEncoder",
@@ -617,7 +617,6 @@ def _add_implicit_values(json_ai: JsonAI) -> JsonAI:
mixers[i]["args"]["target_encoder"] = mixers[i]["args"].get(
"target_encoder", "$encoders[self.target]"
)
mixers[i]["args"]["use_optuna"] = True

elif mixers[i]["module"] == "LightGBMArray":
mixers[i]["args"]["input_cols"] = mixers[i]["args"].get(
@@ -944,14 +943,17 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
parallel_encoding = parallel_encoding_check(data['train'], self.encoders)
if parallel_encoding:
log.debug('Preparing in parallel...')
for col_name, encoder in self.encoders.items():
if col_name != self.target and not encoder.is_trainable_encoder:
prepped_encoders[col_name] = (encoder, concatenated_train_dev[col_name], 'prepare')
prepped_encoders = mut_method_call(prepped_encoders)
else:
log.debug('Preparing sequentially...')
for col_name, encoder in self.encoders.items():
if col_name != self.target and not encoder.is_trainable_encoder:
log.debug(f'Preparing encoder for {{col_name}}...')
encoder.prepare(concatenated_train_dev[col_name])
prepped_encoders[col_name] = encoder
@@ -997,7 +999,22 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
feature_body = f"""
log.info('Featurizing the data')
feature_data = {{ key: EncodedDs(self.encoders, data, self.target) for key, data in split_data.items() if key != "stratified_on"}}
tss = self.problem_definition.timeseries_settings
feature_data = dict()
for key, data in split_data.items():
if key != 'stratified_on':
# compute and store two splits - full and filtered (useful for time series post-train analysis)
if key not in self.feature_cache:
featurized_split = EncodedDs(self.encoders, data, self.target)
filtered_subset = EncodedDs(self.encoders, filter_ts(data, tss), self.target)
for k, s in zip((key, f'{{key}}_filtered'), (featurized_split, filtered_subset)):
self.feature_cache[k] = s
for k in (key, f'{{key}}_filtered'):
feature_data[k] = self.feature_cache[k]
return feature_data
@@ -1018,9 +1035,7 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
# Extract the featurized data into train/dev/test
encoded_train_data = enc_data['train']
encoded_dev_data = enc_data['dev']
encoded_test_data = enc_data['test']
filtered_df = filter_ds(encoded_test_data, self.problem_definition.timeseries_settings)
encoded_test_data = EncodedDs(encoded_test_data.encoders, filtered_df, encoded_test_data.target)
encoded_test_data = enc_data['test_filtered']
log.info('Training the mixers')
@@ -1174,6 +1189,7 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
enc_train_test["dev"]]).data_frame,
adjust_args={'learn_call': True})
self.feature_cache = dict() # empty feature cache to avoid large predictor objects
"""
learn_body = align(learn_body, 2)
# ----------------- #
@@ -1208,13 +1224,14 @@ def code_from_json_ai(json_ai: JsonAI) -> str:
log.info(f'[Predict phase 3/{{n_phases}}] - Calling ensemble')
df = self.ensemble(encoded_ds, args=self.pred_args)
if self.pred_args.all_mixers:
return df
else:
if not self.pred_args.all_mixers:
log.info(f'[Predict phase 4/{{n_phases}}] - Analyzing output')
insights, global_insights = {call(json_ai.explainer)}
df, global_insights = {call(json_ai.explainer)}
self.global_insights = {{**self.global_insights, **global_insights}}
return insights
self.feature_cache = dict() # empty feature cache to avoid large predictor objects
return df
"""

predict_body = align(predict_body, 2)
@@ -1252,6 +1269,9 @@ def __init__(self):
self.runtime_log = dict()
self.global_insights = dict()
# Feature cache
self.feature_cache = dict()
@timed
def analyze_data(self, data: pd.DataFrame) -> None:
# Perform a statistical analysis on the unprocessed data
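Net effect of the `json_ai.py` changes: featurization now caches two `EncodedDs` views per split, the full one and a time-series-filtered one keyed `f'{key}_filtered'`, and both `learn()` and `predict()` empty `self.feature_cache` afterwards so serialized predictors stay small. Elsewhere in the file, categorical columns now fall back to `CategoricalAutoEncoder` above 16 distinct histogram values (previously 100 histogram entries), and `use_optuna` is no longer forced on for LightGBM mixers. A rough sketch of the generated flow (hedged; `split_data` is assumed to come from the splitter, and the generated code may differ in detail):

# Sketch of the generated featurize/learn flow introduced above.
enc_data = predictor.featurize(split_data)       # fills predictor.feature_cache
encoded_train_data = enc_data['train']           # full split
encoded_test_data = enc_data['test_filtered']    # pre-filtered for time series analysis
# ... train mixers, build ensemble, run analysis ...
predictor.feature_cache = dict()                 # dropped to avoid large predictor objects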
104 changes: 78 additions & 26 deletions lightwood/data/encoded_ds.py
@@ -1,5 +1,5 @@
import inspect
from typing import List, Tuple
from typing import List, Tuple, Dict
import torch
import numpy as np
import pandas as pd
@@ -8,7 +8,7 @@


class EncodedDs(Dataset):
def __init__(self, encoders: List[BaseEncoder], data_frame: pd.DataFrame, target: str) -> None:
def __init__(self, encoders: Dict[str, BaseEncoder], data_frame: pd.DataFrame, target: str) -> None:
"""
Create a Lightwood datasource from a data frame and some encoders. This class inherits from `torch.utils.data.Dataset`.
@@ -21,10 +21,8 @@ def __init__(self, encoders: List[BaseEncoder], data_frame: pd.DataFrame, target
self.data_frame = data_frame
self.encoders = encoders
self.target = target
self.cache_encoded = True
self.cache = [None] * len(self.data_frame)
self.encoder_spans = {}
self.input_length = 0
self.input_length = 0 # feature tensor dim

# save encoder span, has to use same iterator as in __getitem__ for correct indices
for col in self.data_frame:
@@ -33,6 +31,13 @@ def __init__(self, encoders: List[BaseEncoder], data_frame: pd.DataFrame, target
self.input_length + self.encoders[col].output_size)
self.input_length += self.encoders[col].output_size

# if cache enabled, we immediately build it
self.use_cache = True
self.cache_built = False
self.X_cache: torch.Tensor = torch.full((len(self.data_frame),), fill_value=torch.nan)
self.Y_cache: torch.Tensor = torch.full((len(self.data_frame),), fill_value=torch.nan)
self.build_cache()

def __len__(self):
"""
The length of an `EncodedDs` datasource equals the amount of rows of the original dataframe.
@@ -44,45 +49,65 @@ def __len__(self):
def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
"""
The getter yields a tuple (X, y), where:
- `X` is a concatenation of all encoded representations of the row
- `y` is the encoded target
- `X` is a concatenation of all encoded representations of the row. Size: (B, n_features)
- `y` is the encoded target. Size: (B, n_features)
:param idx: index of the row to access.
:return: tuple (X, y) with encoded data.
""" # noqa
if self.cache_encoded:
if self.cache[idx] is not None:
return self.cache[idx]
if self.use_cache and self.X_cache[idx] is not torch.nan:
X = self.X_cache[idx, :]
Y = self.Y_cache[idx]
else:
X, Y = self._encode_idxs([idx, ])
if self.use_cache:
self.X_cache[idx, :] = X
self.Y_cache[idx, :] = Y

return X, Y

def _encode_idxs(self, idxs: list):
if not isinstance(idxs, list):
raise Exception(f"Passed indexes is not an iterable. Check the type! Index: {idxs}")

X = torch.FloatTensor()
Y = torch.FloatTensor()
X = torch.zeros((len(idxs), self.input_length))
Y = torch.zeros((len(idxs),))
for col in self.data_frame:
if self.encoders.get(col, None):
kwargs = {}
if 'dependency_data' in inspect.signature(self.encoders[col].encode).parameters:
kwargs['dependency_data'] = {dep: [self.data_frame.iloc[idx][dep]]
kwargs['dependency_data'] = {dep: [self.data_frame.iloc[idxs][dep]]
for dep in self.encoders[col].dependencies}
if hasattr(self.encoders[col], 'data_window'):
cols = [self.target] + [f'{self.target}_timestep_{i}'
for i in range(1, self.encoders[col].data_window)]
data = [self.data_frame[cols].iloc[idx].tolist()]
data = self.data_frame[cols].iloc[idxs].values
else:
cols = [col]
data = self.data_frame[cols].iloc[idx].tolist()
data = self.data_frame[cols].iloc[idxs].values.flatten()

encoded_tensor = self.encoders[col].encode(data, **kwargs)[0]
encoded_tensor = self.encoders[col].encode(data, **kwargs)
if torch.isnan(encoded_tensor).any() or torch.isinf(encoded_tensor).any():
raise Exception(f'Encoded tensor: {encoded_tensor} contains nan or inf values, this tensor is \
the encoding of column {col} using {self.encoders[col].__class__}')
if col != self.target:
X = torch.cat([X, encoded_tensor])
a, b = self.encoder_spans[col]
X[:, a:b] = torch.squeeze(encoded_tensor, dim=list(range(2, len(encoded_tensor.shape))))

# target post-processing
else:
Y = encoded_tensor

if self.cache_encoded:
self.cache[idx] = (X, Y)
if len(encoded_tensor.shape) > 2:
Y = encoded_tensor.squeeze()

if len(encoded_tensor.shape) < 2:
Y = encoded_tensor.unsqueeze(1)

# else:
# Y = encoded_tensor.ravel()

return X, Y

@@ -102,20 +127,35 @@ def get_encoded_column_data(self, column_name: str) -> torch.Tensor:
:param column_name: name of the column.
:return: A `torch.Tensor` with the encoded data of the `column_name` column.
"""
if self.use_cache and self.cache_built:
if column_name == self.target and self.Y_cache is not None:
return self.Y_cache
elif self.X_cache is not torch.nan:
a, b = self.encoder_spans[column_name]
return self.X_cache[:, a:b]

kwargs = {}
if 'dependency_data' in inspect.signature(self.encoders[column_name].encode).parameters:
deps = [dep for dep in self.encoders[column_name].dependencies if dep in self.data_frame.columns]
kwargs['dependency_data'] = {dep: self.data_frame[dep].tolist() for dep in deps}
kwargs['dependency_data'] = {dep: self.data_frame[dep] for dep in deps}
encoded_data = self.encoders[column_name].encode(self.data_frame[column_name], **kwargs)
if torch.isnan(encoded_data).any() or torch.isinf(encoded_data).any():
raise Exception(f'Encoded tensor: {encoded_data} contains nan or inf values')

if not isinstance(encoded_data, torch.Tensor):
raise Exception(
f'The encoder: {self.encoders[column_name]} for column: {column_name} does not return a Tensor !')
f'The encoder: {self.encoders[column_name]} for column: {column_name} does not return a Tensor!')

if self.use_cache and not self.cache_built:
if column_name == self.target:
self.Y_cache = encoded_data
else:
a, b = self.encoder_spans[column_name]
self.X_cache = self.X_cache[:, a:b]

return encoded_data

def get_encoded_data(self, include_target=True) -> torch.Tensor:
def get_encoded_data(self, include_target: bool = True) -> torch.Tensor:
"""
Gets all encoded data.
@@ -129,17 +169,29 @@ def get_encoded_data(self, include_target=True) -> torch.Tensor:

return torch.cat(encoded_dfs, 1)

def build_cache(self):
""" This method builds a cache for the entire dataframe provided at initialization. """
if not self.use_cache:
raise RuntimeError("Cannot build a cache for EncodedDS with `use_cache` set to False.")

idxs = list(range(len(self.data_frame)))
X, Y = self._encode_idxs(idxs)
self.X_cache = X
self.Y_cache = Y
self.cache_built = True

def clear_cache(self):
"""
Clears the `EncodedDs` cache.
"""
self.cache = [None] * len(self.data_frame)
""" Clears the `EncodedDs` cache. """
self.X_cache = torch.full((len(self.data_frame),), fill_value=torch.nan)
self.Y_cache = torch.full((len(self.data_frame),), fill_value=torch.nan)
self.cache_built = False


class ConcatedEncodedDs(EncodedDs):
"""
`ConcatedEncodedDs` abstracts over multiple encoded datasources (`EncodedDs`) as if they were a single entity.
""" # noqa

# TODO: We should probably delete this abstraction, it's not really useful and it adds complexity/overhead
def __init__(self, encoded_ds_arr: List[EncodedDs]) -> None:
# @TODO: missing super() call here?
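The rewritten `EncodedDs` swaps the per-row list cache for two tensors built eagerly in `__init__`, so `__getitem__` and `get_encoded_column_data` serve cached slices without re-encoding. One apparent slip worth noting: in `get_encoded_column_data`, the cache-population branch assigns `self.X_cache = self.X_cache[:, a:b]`, which truncates the cache to that column's span; storing the fresh encoding instead (`self.X_cache[:, a:b] = encoded_data`) appears to be the intent. A minimal usage sketch, assuming already-prepared encoders and a dataframe `df` with target column 'y':

# Minimal usage sketch (assumes every encoder in `encoders` is already prepared).
ds = EncodedDs(encoders, df, target='y')   # __init__ now calls build_cache() eagerly
X, y = ds[0]                               # served from X_cache / Y_cache
y_enc = ds.get_encoded_column_data('y')    # returns the cached Y tensor
ds.clear_cache()                           # resets both caches to NaN sentinels
ds.build_cache()                           # one vectorized _encode_idxs pass over all rows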
33 changes: 4 additions & 29 deletions lightwood/encoder/array/ts_num_array.py
@@ -1,14 +1,13 @@
from typing import List, Dict, Iterable, Optional

import torch
import torch.nn.functional as F

from lightwood.encoder import BaseEncoder
from lightwood.encoder.numeric import TsNumericEncoder


class TsArrayNumericEncoder(BaseEncoder):
def __init__(self, timesteps: int, is_target: bool = False, positive_domain: bool = False, grouped_by=None):
def __init__(self, timesteps: int, is_target: bool = False, positive_domain: bool = False, grouped_by=None, nan=0):
"""
This encoder handles arrays of numerical time series data by wrapping the numerical encoder with behavior specific to time series tasks.
@@ -23,6 +22,7 @@ def __init__(self, timesteps: int, is_target: bool = False, positive_domain: boo
self.dependencies = grouped_by
self.data_window = timesteps
self.positive_domain = positive_domain
self.nan_value = nan
self.sub_encoder = TsNumericEncoder(is_target=is_target, positive_domain=positive_domain, grouped_by=grouped_by)
self.output_size = self.data_window * self.sub_encoder.output_size

@@ -52,34 +52,9 @@ def encode(self, data: Iterable[Iterable], dependency_data: Optional[Dict[str, s
if not dependency_data:
dependency_data = {'__default': [None] * len(data)}

ret = []
for series in data:
ret.append(self.encode_one(series, dependency_data=dependency_data))

return torch.vstack(ret)

def encode_one(self, data: Iterable, dependency_data: Optional[Dict[str, str]] = {}) -> torch.Tensor:
"""
Encodes a single windowed slice of any given time series.
ret = self.sub_encoder.encode(data, dependency_data=dependency_data)

:param data: windowed slice of a numerical time series.
:param dependency_data: used to determine the correct normalizer for the input.
:return: an encoded time series array, as per the underlying `TsNumericEncoder` object.
The output of this encoder for all time steps is concatenated, so the final shape of the tensor is (1, NxK), where N: self.data_window and K: sub-encoder # of output features.
""" # noqa
ret = []

for data_point in data:
ret.append(self.sub_encoder.encode([data_point], dependency_data=dependency_data))

ret = torch.hstack(ret)
padding_size = self.output_size - ret.shape[-1]

if padding_size > 0:
ret = F.pad(ret, (0, padding_size))

return ret
return torch.Tensor(ret).nan_to_num(self.nan_value)

def decode(self, encoded_values, dependency_data=None) -> List[List]:
"""
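The refactor deletes `encode_one` and its per-timestep loop: the whole windowed batch now goes through the `TsNumericEncoder` sub-encoder in one call, and any NaNs in the result are replaced by the new `nan` constructor argument (default 0) via `nan_to_num`. A brief usage sketch with hypothetical data, assuming the usual `prepare()` step from the `BaseEncoder` interface:

# Usage sketch (hypothetical data; prepare() assumed per the BaseEncoder interface).
enc = TsArrayNumericEncoder(timesteps=3, is_target=True, nan=0)
enc.prepare(train_windows)               # fit the underlying TsNumericEncoder
encoded = enc.encode(windowed_series)    # single vectorized sub-encoder call
# encoded shape: (batch, timesteps * sub_encoder.output_size); NaNs -> 0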