diff --git a/docs/.buildinfo b/docs/.buildinfo
index c05a61b00..bbb1f0912 100644
--- a/docs/.buildinfo
+++ b/docs/.buildinfo
@@ -1,4 +1,4 @@
# Sphinx build info version 1
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
-config: 80ad8422a14367dc91c123d767b2edc6
+config: 76e6161839572ede51e212e970f19cd7
tags: 645f666f9bcd5a90fca523b33c5a78b7
diff --git a/docs/_modules/api/dtype.html b/docs/_modules/api/dtype.html
index 0e2f8f029..bf1dc52ce 100644
--- a/docs/_modules/api/dtype.html
+++ b/docs/_modules/api/dtype.html
@@ -7,7 +7,7 @@
-
# TODO: type hint the returns
-# TODO: df_std_dev is not clear in behavior; this would imply all std. of each column but that is not true, it should be renamed df_std_target_dev  # noqa
 from typing import Dict, List, Optional, Union
 import sys
@@ -323,7 +328,7 @@
Source code for api.types
 in the information.
 :param nr_rows: Number of rows (samples) in the dataset
- :param df_std_dev: The standard deviation of the target of the dataset
+ :param df_target_stddev: The standard deviation of the target of the dataset
 :param train_observed_classes:
 :param target_class_distribution:
 :param histograms:
@@ -336,7 +341,7 @@
[docs]def cleaner(
+    data: pd.DataFrame,
+    dtype_dict: Dict[str, str],
+    pct_invalid: float,
+    identifiers: Dict[str, str],
+    target: str,
+    mode: str,
+    timeseries_settings: TimeseriesSettings,
+    anomaly_detection: bool,
+    custom_cleaning_functions: Dict[str, str] = {}
+) -> pd.DataFrame:
+    """
+    The cleaner is a function which takes in the raw data, plus additional information about its types and about the problem. Based on this it generates a "clean" representation of the data, where each column has an ideal standardized type and all malformed or otherwise missing or invalid elements are turned into ``None``
+
+    :param data: The raw data
+    :param dtype_dict: Type information for each column
+    :param pct_invalid: How much of each column can be invalid
+    :param identifiers: A dict containing all identifier typed columns
+    :param target: The target column
+    :param mode: Can be "predict" or "train"
+    :param timeseries_settings: Timeseries related settings, only relevant for timeseries predictors, otherwise can be the default object
+    :param anomaly_detection: Are we detecting anomalies with this predictor?
+
+    :returns: The cleaned data
+    """  # noqa
+
+    data = _remove_columns(data, identifiers, target, mode, timeseries_settings,
+                           anomaly_detection, dtype_dict)
+
+    for col in _get_columns_to_clean(data, dtype_dict, mode, target):
+
+        # Get and apply a cleaning function for each data type
+        # If you want to customize the cleaner, it's likely you only need to modify ``get_cleaning_func``
+        data[col] = data[col].apply(get_cleaning_func(dtype_dict[col], custom_cleaning_functions))
+
+    return data
+
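As a quick orientation for readers of this page, here is a hedged usage sketch of ``cleaner`` (the import paths, the default-constructed ``TimeseriesSettings`` and the toy dataframe are assumptions, not part of the diff above; adjust to your installed version):

import pandas as pd
from lightwood.api.dtype import dtype
from lightwood.api.types import TimeseriesSettings
from lightwood.data.cleaner import cleaner

raw = pd.DataFrame({'age': ['21', 'not a number', '35'], 'rich': ['yes', 'no', 'yes']})
clean = cleaner(
    data=raw,
    dtype_dict={'age': dtype.integer, 'rich': dtype.binary},
    pct_invalid=100,                 # tolerate any share of invalid cells in this toy example
    identifiers={},                  # no identifier-typed columns here
    target='rich',
    mode='train',
    timeseries_settings=TimeseriesSettings(is_timeseries=False),
    anomaly_detection=False,
)
# 'not a number' in the integer column becomes None; the binary column is coerced to str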
+
+def _check_if_invalid(new_data: pd.Series, pct_invalid: float, col_name: str):
+    """
+    Checks how many invalid data points there are. Invalid data points are flagged as "Nones" from the cleaning process (see data/cleaner.py for default).
+    If there are too many invalid data points (specified by `pct_invalid`), then an error message will pop up. This is used as a safeguard for very messy data.
+
+    :param new_data: data to check for invalid values.
+    :param pct_invalid: maximum percentage of invalid values. If this threshold is surpassed, an exception is raised.
+    :param col_name: name of the column to analyze.
+
+    """  # noqa
+
+    chk_invalid = (
+        100
+        * (len(new_data) - len([x for x in new_data if x is not None]))
+        / len(new_data)
+    )
+
+    if chk_invalid > pct_invalid:
+        err = f'Too many ({chk_invalid}%) invalid values in column {col_name}'
+        log.error(err)
+        raise Exception(err)
+
+
+
[docs]def get_cleaning_func(data_dtype: dtype, custom_cleaning_functions: Dict[str, str]) -> Callable:
+    """
+    For the provided data type, provide the appropriate cleaning function. Below are the defaults, users can either override this function OR impose a custom block.
+
+    :param data_dtype: The data-type (inferred from a column) as prescribed from ``api.dtype``
+
+    :returns: The appropriate function that will pre-process (clean) data of specified dtype.
+    """  # noqa
+    if data_dtype in custom_cleaning_functions:
+        clean_func = eval(custom_cleaning_functions[data_dtype])
+
+    elif data_dtype in (dtype.date, dtype.datetime):
+        clean_func = _standardize_datetime
+
+    elif data_dtype in (dtype.float, dtype.tsarray):
+        clean_func = _clean_float
+
+    elif data_dtype in (dtype.integer):
+        clean_func = _clean_int
+
+    elif data_dtype in (dtype.array):
+        clean_func = _standardize_array
+
+    elif data_dtype in (dtype.tags):
+        clean_func = _tags_to_tuples
+
+    elif data_dtype in (dtype.quantity):
+        clean_func = _clean_quantity
+
+    elif data_dtype in (
+        dtype.short_text,
+        dtype.rich_text,
+        dtype.categorical,
+        dtype.binary,
+        dtype.audio,
+        dtype.image,
+        dtype.video
+    ):
+        clean_func = _clean_text
+
+    else:
+        raise ValueError(f"{data_dtype} is not supported. Check lightwood.api.dtype")
+
+    return clean_func
+
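To make the dispatch above concrete, a small hedged sketch of the default path (assuming the module is importable as ``lightwood.data.cleaner``; the values are illustrative only):

from lightwood.api.dtype import dtype
from lightwood.data.cleaner import get_cleaning_func

clean_int = get_cleaning_func(dtype.integer, custom_cleaning_functions={})
clean_int('3.0')           # -> 3
clean_int('not a number')  # -> None (flagged as invalid downstream)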
+
+# ------------------------- #
+# Temporal Cleaning
+# ------------------------- #
+
+
+def _standardize_datetime(element: object) -> Optional[float]:
+    """
+    Parses an expected date-time element. Intakes an element that can in theory be anything.
+    """
+    try:
+        date = parse_dt(str(element))
+    except Exception:
+        try:
+            date = datetime.datetime.utcfromtimestamp(element)
+        except Exception:
+            return None
+
+    return date.timestamp()
+
+
+# ------------------------- #
+# Tags/Sequences
+# ------------------------- #
+
+# TODO Make it split on something other than commas
+def _tags_to_tuples(tags_str: str) -> Tuple[str]:
+    """
+    Converts comma-separated values into a tuple to preserve a sequence/array.
+
+    Ex:
+    >> x = 'apples, oranges, bananas'
+    >> _tags_to_tuples(x)
+    >> ('apples', 'oranges', 'bananas')
+    """
+    try:
+        return tuple([x.strip() for x in tags_str.split(",")])
+    except Exception:
+        return tuple()
+
+
+def _standardize_array(element: object) -> Optional[Union[List[float], float]]:
+    """
+    Given an array of numbers in the form ``[1, 2, 3, 4]``, converts into a numerical sequence.
+
+    :param element: An array-like element in a sequence
+    :returns: standardized array OR scalar number IF edge case
+
+    Ex of edge case:
+    >> element = [1]
+    >> _standardize_array(element)
+    >> 1
+    """
+    try:
+        element = str(element)
+        element = element.rstrip("]").lstrip("[")
+        element = element.rstrip(" ").lstrip(" ")
+        element = element.replace(", ", " ").replace(",", " ")
+        # Handles cases where arrays are numbers
+        if " " not in element:
+            element = _clean_float(element)
+        else:
+            element = [float(x) for x in element.split(" ")]
+    except Exception:
+        pass
+
+    return element
+
+
+# ------------------------- #
+# Integers/Floats/Quantities
+# ------------------------- #
+
+def _clean_float(element: object) -> Optional[float]:
+    """
+    Given an element, converts it into float numeric format. If element is NaN, or inf, then returns None.
+    """
+    try:
+        cleaned_float = text.clean_float(element)
+        if is_nan_numeric(cleaned_float):
+            return None
+        return cleaned_float
+    except Exception:
+        return None
+
+
+def _clean_int(element: object) -> Optional[int]:
+    """
+    Given an element, converts it into integer numeric format. If element is NaN, or inf, then returns None.
+    """
+    element = _clean_float(element)
+    if element is not None:
+        element = int(element)
+    return element
+
+
+def _clean_quantity(element: object) -> Optional[float]:
+    """
+    Given a quantity, clean and convert it into float numeric format. If element is NaN, or inf, then returns None.
+    """
+    element = float(re.sub("[^0-9.,]", "", str(element)).replace(",", "."))
+    return _clean_float(element)
+
+
+# ------------------------- #
+# Text
+# ------------------------- #
+def _clean_text(element: object) -> str:
+    return str(element)
+
+
+# ------------------------- #
+# Other helpers
+# ------------------------- #
+def _rm_rows_w_empty_targets(df: pd.DataFrame, target: str) -> pd.DataFrame:
+    """
+    Drop any rows that have targets as unknown. Targets are necessary to train.
+
+    :param df: The input dataframe including the target value
+    :param target: the column name that is the output target variable
+
+    :returns: Data with any rows that are missing a target removed
+    """
+    # Compare length before/after
+    len_before = len(df)
+
+    # Use Pandas ``dropna`` to omit any rows with missing values for targets; these cannot be trained
+    df = df.dropna(subset=[target])
+
+    # Compare length with after
+    len_after = len(df)
+    nr_removed = len_before - len_after
+
+    if nr_removed != 0:
+        log.warning(
+            f"Removed {nr_removed} rows because target was missing. Training on these rows is not possible."
+        )  # noqa
+
+    return df
+
+
+def _remove_columns(data: pd.DataFrame, identifiers: Dict[str, object], target: str,
+                    mode: str, timeseries_settings: TimeseriesSettings, anomaly_detection: bool,
+                    dtype_dict: Dict[str, dtype]) -> pd.DataFrame:
+    """
+    Drop columns we don't want to use in order to train or predict
+
+    :param data: The raw data
+    :param dtype_dict: Type information for each column
+    :param identifiers: A dict containing all identifier typed columns
+    :param target: The target column
+    :param mode: Can be "predict" or "train"
+    :param timeseries_settings: Timeseries related settings, only relevant for timeseries predictors, otherwise can be the default object
+    :param anomaly_detection: Are we detecting anomalies with this predictor?
+
+    :returns: A (new) dataframe without the dropped columns
+    """  # noqa
+    data = deepcopy(data)
+    to_drop = [*[x for x in identifiers.keys() if x != target],
+               *[x for x in data.columns if x in dtype_dict and dtype_dict[x] == dtype.invalid]]
+    exceptions = ["__mdb_make_predictions"]
+    to_drop = [x for x in to_drop if x in data.columns]
+    data = data.drop(columns=to_drop)
+
+    if mode == "train":
+        data = _rm_rows_w_empty_targets(data, target)
+    if mode == "predict":
+        if (
+            target in data.columns
+            and not timeseries_settings.use_previous_target
+            and not anomaly_detection
+        ):
+            data = data.drop(columns=[target])
+
+    # Drop extra columns
+    for name in list(data.columns):
+        if name not in dtype_dict and name not in exceptions:
+            data = data.drop(columns=[name])
+
+    return data
+
+
+def _get_columns_to_clean(data: pd.DataFrame, dtype_dict: Dict[str, dtype], mode: str, target: str) -> List[str]:
+    """
+    :param data: The raw data
+    :param dtype_dict: Type information for each column
+    :param target: The target column
+    :param mode: Can be "predict" or "train"
+
+    :returns: A list of columns that we want to clean
+    """  # noqa
+
+    cleanable_columns = []
+    for name, _ in dtype_dict.items():
+        if mode == "predict":
+            if name == target:
+                continue
+        if name in data.columns:
+            cleanable_columns.append(name)
+    return cleanable_columns
+
[docs]class BaseAnalysisBlock:
+    """Class to be inherited by any analysis/explainer block."""
+    def __init__(self,
+                 deps: Optional[Tuple] = ()
+                 ):
+
+        self.dependencies = deps  # can be parallelized when there are no dependencies @TODO enforce
+
+
[docs]    def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]:
+        """
+        This method should be called once during the analysis phase, or not called at all.
+        It computes any information that the block may either output to the model analysis object,
+        or use at inference time when `.explain()` is called (in this case, make sure all needed
+        objects are added to the runtime analyzer so that `.explain()` can access them).
+
+        :param info: Dictionary where any new information or objects are added. The next analysis block will use
+        the output of the previous block as a starting point.
+        :param kwargs: Dictionary with named variables from either the core analysis or the rest of the prediction
+        pipeline.
+        """
+        log.info(f"{self.__class__.__name__}.analyze() has not been implemented, no modifications will be done to the model analysis.")  # noqa
+        return info
+
+
[docs]    def explain(self,
+                row_insights: pd.DataFrame,
+                global_insights: Dict[str, object], **kwargs) -> Tuple[pd.DataFrame, Dict[str, object]]:
+        """
+        This method should be called once during the explaining phase at inference time, or not called at all.
+        Additional explanations can be at an instance level (row-wise) or global.
+        For the former, return a data frame with any new insights. For the latter, a dictionary is required.
+
+        :param row_insights: dataframe with previously computed row-level explanations.
+        :param global_insights: dict() with any explanations that concern all predicted instances or the model itself.
+
+        :returns:
+        - row_insights: modified input dataframe with any new row insights added here.
+        - global_insights: dict() with any explanations that concern all predicted instances or the model itself.
+        """
+        log.info(f"{self.__class__.__name__}.explain() has not been implemented, no modifications will be done to the data insights.")  # noqa
+        return row_insights, global_insights
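For context, a hedged sketch of how a custom block would plug into this interface (the class name and the 'my_metric' key are hypothetical, not part of Lightwood):

class ConstantBlock(BaseAnalysisBlock):
    def __init__(self):
        super().__init__(deps=())   # no dependencies, so it could run in parallel with other blocks

    def analyze(self, info, **kwargs):
        info['my_metric'] = 0.0     # stash anything that explain() will need at inference time
        return info

    def explain(self, row_insights, global_insights, **kwargs):
        global_insights['my_metric'] = 0.0
        return row_insights, global_insights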
[docs]class AccStats(BaseAnalysisBlock):
+    """ Computes accuracy stats and a confusion matrix for the validation dataset """
+
+    def __init__(self, deps=('ICP',)):
+        super().__init__(deps=deps)  # @TODO: enforce that this actually prevents early execution somehow
+
+
[docs]    def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]:
+        ns = SimpleNamespace(**kwargs)
+
+        # @TODO: maybe pass ts_analysis to trigger group-wise MASE instead of R2 mean, though it wouldn't be 0-1 bounded
+        info['score_dict'] = evaluate_accuracy(ns.data, ns.normal_predictions['prediction'],
+                                               ns.target, ns.accuracy_functions)
+        info['normal_accuracy'] = np.mean(list(info['score_dict'].values()))
+
+        self.fit(ns, info['result_df'])
+        info['val_overall_acc'], info['acc_histogram'], info['cm'], info['acc_samples'] = self.get_accuracy_stats()
+        return info
+
+    def fit(self, ns: SimpleNamespace, conf=Optional[np.ndarray]):
+        self.col_stats = ns.dtype_dict
+        self.target = ns.target
+        self.input_cols = list(ns.dtype_dict.keys())
+        self.buckets = ns.stats_info.buckets if ns.stats_info.buckets else {}
+
+        self.normal_predictions_bucketized = []
+        self.real_values_bucketized = []
+        self.numerical_samples_arr = []
+
+        column_indexes = {}
+        for i, col in enumerate(self.input_cols):
+            column_indexes[col] = i
+
+        real_present_inputs_arr = []
+        for _, row in ns.data.iterrows():
+            present_inputs = [1] * len(self.input_cols)
+            for i, col in enumerate(self.input_cols):
+                if str(row[col]) in ('None', 'nan', '', 'Nan', 'NAN', 'NaN'):
+                    present_inputs[i] = 0
+            real_present_inputs_arr.append(present_inputs)
+
+        for n in range(len(ns.normal_predictions)):
+            row = ns.data.iloc[n]
+            real_value = row[self.target]
+            predicted_value = ns.normal_predictions.iloc[n]['prediction']
+
+            if isinstance(predicted_value, list):
+                # T+N time series, for now we compare the T+1 prediction only @TODO: generalize
+                predicted_value = predicted_value[0]
+
+            predicted_value = predicted_value \
+                if self.col_stats[self.target] not in [dtype.integer, dtype.float, dtype.quantity] \
+                else float(predicted_value)
+
+            real_value = real_value \
+                if self.col_stats[self.target] not in [dtype.integer, dtype.float, dtype.quantity] \
+                else float(real_value)
+
+            if self.buckets:
+                bucket = self.buckets[self.target]
+                predicted_value_b = get_value_bucket(predicted_value, bucket, self.col_stats[self.target])
+                real_value_b = get_value_bucket(real_value, bucket, self.col_stats[self.target])
+            else:
+                predicted_value_b = predicted_value
+                real_value_b = real_value
+
+            if conf is not None and self.col_stats[self.target] in [dtype.integer, dtype.float, dtype.quantity]:
+                predicted_range = conf.iloc[n][['lower', 'upper']].tolist()
+            else:
+                predicted_range = (predicted_value_b, predicted_value_b)
+
+            self.real_values_bucketized.append(real_value_b)
+            self.normal_predictions_bucketized.append(predicted_value_b)
+            if conf is not None and self.col_stats[self.target] in [dtype.integer, dtype.float, dtype.quantity]:
+                self.numerical_samples_arr.append((real_value, predicted_range))
+
+    def get_accuracy_stats(self, is_classification=None, is_numerical=None):
+        bucket_accuracy = {}
+        bucket_acc_counts = {}
+        for i, bucket in enumerate(self.normal_predictions_bucketized):
+            if bucket not in bucket_acc_counts:
+                bucket_acc_counts[bucket] = []
+
+            if len(self.numerical_samples_arr) != 0:
+                bucket_acc_counts[bucket].append(self.numerical_samples_arr[i][1][0] <
+                                                 self.numerical_samples_arr[i][0] < self.numerical_samples_arr[i][1][1])  # noqa
+            else:
+                bucket_acc_counts[bucket].append(1 if bucket == self.real_values_bucketized[i] else 0)
+
+        for bucket in bucket_acc_counts:
+            bucket_accuracy[bucket] = sum(bucket_acc_counts[bucket]) / len(bucket_acc_counts[bucket])
+
+        accuracy_count = []
+        for counts in list(bucket_acc_counts.values()):
+            accuracy_count += counts
+
+        overall_accuracy = sum(accuracy_count) / len(accuracy_count)
+
+        for bucket in range(len(self.buckets)):
+            if bucket not in bucket_accuracy:
+                if bucket in self.real_values_bucketized:
+                    # If it was never predicted, but it did exist as a real value, then assume 0% confidence when it does get predicted  # noqa
+                    bucket_accuracy[bucket] = 0
+
+        for bucket in range(len(self.buckets)):
+            if bucket not in bucket_accuracy:
+                # If it wasn't seen either in the real values or in the predicted values, assume average confidence (maybe should be 0 instead ?)  # noqa
+                bucket_accuracy[bucket] = overall_accuracy
+
+        accuracy_histogram = {
+            'buckets': list(bucket_accuracy.keys()),
+            'accuracies': list(bucket_accuracy.values()),
+            'is_classification': is_classification,
+            'is_numerical': is_numerical
+        }
+
+        labels = list(set([*self.real_values_bucketized, *self.normal_predictions_bucketized]))
+        matrix = confusion_matrix(self.real_values_bucketized, self.normal_predictions_bucketized, labels=labels)
+        matrix = [[int(y) if str(y) != 'nan' else 0 for y in x] for x in matrix]
+
+        target_bucket = self.buckets[self.target]
+        bucket_values = [target_bucket[i] if i < len(target_bucket) else None for i in labels]
+
+        cm = {
+            'matrix': matrix,
+            'predicted': bucket_values,
+            'real': bucket_values
+        }
+
+        accuracy_samples = None
+        if len(self.numerical_samples_arr) > 0:
+            nr_samples = min(400, len(self.numerical_samples_arr))
+            sampled_numerical_samples_arr = random.sample(self.numerical_samples_arr, nr_samples)
+            accuracy_samples = {
+                'y': [x[0] for x in sampled_numerical_samples_arr],
+                'x': [x[1] for x in sampled_numerical_samples_arr]
+            }
+
+        return overall_accuracy, accuracy_histogram, cm, accuracy_samples
+
+
+def get_value_bucket(value, buckets, target_dtype):
+    """
+    :return: The bucket in the `histogram` in which our `value` falls
+    """
+    if buckets is None:
+        return None
+
+    if target_dtype in (dtype.binary, dtype.categorical):
+        if value in buckets:
+            bucket = buckets.index(value)
+        else:
+            bucket = len(buckets)  # for null values
+
+    elif target_dtype in (dtype.integer, dtype.float, dtype.quantity):
+        bucket = closest(buckets, value)
+    else:
+        bucket = len(buckets)  # for null values
+
+    return bucket
+
+
+def closest(arr, value):
+    """
+    :return: The index of the member of `arr` which is closest to `value`
+    """
+    if value is None:
+        return -1
+
+    for i, ele in enumerate(arr):
+        value = float(str(value).replace(',', '.'))
+        if ele > value:
+            return i - 1
+
+    return len(arr) - 1
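A couple of illustrative calls (values made up) to clarify that ``closest`` returns the index of the last bucket edge that does not exceed the value, rather than the literally nearest edge:

closest([0, 10, 20], 12)    # -> 1, since 10 <= 12 < 20
closest([0, 10, 20], 25)    # -> 2, the last bucket
closest([0, 10, 20], None)  # -> -1, sentinel for missing values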
+
[docs]class GlobalFeatureImportance(BaseAnalysisBlock):
+    """
+    Analysis block that estimates column importance with a variant of the LOCO (leave-one-covariate-out) algorithm.
+
+    Roughly speaking, the procedure:
+    - iterates over all input columns
+    - if the input column is optional, then runs a prediction with its values set to None
+    - compares this accuracy with the accuracy obtained using all data
+    - all accuracy differences are passed through a softmax and reported as estimated column importance scores
+
+    Note that, crucially, this method does not refit the predictor at any point.
+
+    Reference:
+    https://compstat-lmu.github.io/iml_methods_limitations/pfi.html
+    """
+    def __init__(self, disable_column_importance):
+        super().__init__()
+        self.disable_column_importance = disable_column_importance
+
+
[docs]class EncodedDs(Dataset):
+    def __init__(self, encoders: List[BaseEncoder], data_frame: pd.DataFrame, target: str) -> None:
+        """
+        Create a Lightwood datasource from a data frame and some encoders. This class inherits from `torch.utils.data.Dataset`.
+
+        Note: normal behavior is to cache encoded representations to avoid duplicated computations. If you want an option to disable this, please open an issue.
+
+        :param encoders: list of Lightwood encoders used to encode the data per each column.
+        :param data_frame: original dataframe.
+        :param target: name of the target column to predict.
+        """  # noqa
+        self.data_frame = data_frame
+        self.encoders = encoders
+        self.target = target
+        self.cache_encoded = True
+        self.cache = [None] * len(self.data_frame)
+        self.encoder_spans = {}
+        self.input_length = 0
+
+        # save encoder span, has to use the same iterator as in __getitem__ for correct indices
+        for col in self.data_frame:
+            if col != self.target and self.encoders.get(col, False):
+                self.encoder_spans[col] = (self.input_length,
+                                           self.input_length + self.encoders[col].output_size)
+                self.input_length += self.encoders[col].output_size
+
+    def __len__(self):
+        """
+        The length of an `EncodedDs` datasource equals the amount of rows of the original dataframe.
+
+        :return: length of the `EncodedDs`
+        """
+        return int(self.data_frame.shape[0])
+
+    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
+        """
+        The getter yields a tuple (X, y), where:
+        - `X` is a concatenation of all encoded representations of the row
+        - `y` is the encoded target
+
+        :param idx: index of the row to access.
+
+        :return: tuple (X, y) with encoded data.
+
+        """  # noqa
+        if self.cache_encoded:
+            if self.cache[idx] is not None:
+                return self.cache[idx]
+
+        X = torch.FloatTensor()
+        Y = torch.FloatTensor()
+        for col in self.data_frame:
+            if self.encoders.get(col, None):
+                kwargs = {}
+                if 'dependency_data' in inspect.signature(self.encoders[col].encode).parameters:
+                    kwargs['dependency_data'] = {dep: [self.data_frame.iloc[idx][dep]]
+                                                 for dep in self.encoders[col].dependencies}
+                if hasattr(self.encoders[col], 'data_window'):
+                    cols = [self.target] + [f'{self.target}_timestep_{i}'
+                                            for i in range(1, self.encoders[col].data_window)]
+                else:
+                    cols = [col]
+
+                data = self.data_frame[cols].iloc[idx].tolist()
+                encoded_tensor = self.encoders[col].encode(data, **kwargs)[0]
+                if col != self.target:
+                    X = torch.cat([X, encoded_tensor])
+                else:
+                    Y = encoded_tensor
+
+        if self.cache_encoded:
+            self.cache[idx] = (X, Y)
+
+        return X, Y
+
+
[docs]    def get_column_original_data(self, column_name: str) -> pd.Series:
+        """
+        Gets the original data for any given column of the `EncodedDs`.
+
+        :param column_name: name of the column.
+        :return: A `pd.Series` with the original data stored in the `column_name` column.
+        """
+        return self.data_frame[column_name]
+
+
[docs]    def get_encoded_column_data(self, column_name: str) -> torch.Tensor:
+        """
+        Gets the encoded data for any given column of the `EncodedDs`.
+
+        :param column_name: name of the column.
+        :return: A `torch.Tensor` with the encoded data of the `column_name` column.
+        """
+        kwargs = {}
+        if 'dependency_data' in inspect.signature(self.encoders[column_name].encode).parameters:
+            deps = [dep for dep in self.encoders[column_name].dependencies if dep in self.data_frame.columns]
+            kwargs['dependency_data'] = {dep: self.data_frame[dep].tolist() for dep in deps}
+        encoded_data = self.encoders[column_name].encode(self.data_frame[column_name], **kwargs)
+
+        if not isinstance(encoded_data, torch.Tensor):
+            raise Exception(
+                f'The encoder: {self.encoders[column_name]} for column: {column_name} does not return a Tensor !')
+        return encoded_data
+
+
[docs]    def get_encoded_data(self, include_target=True) -> torch.Tensor:
+        """
+        Gets all encoded data.
+
+        :param include_target: whether to include the target column in the output or not.
+        :return: A `torch.Tensor` with the encoded dataframe.
+        """
+        encoded_dfs = []
+        for col in self.data_frame.columns:
+            if (include_target or col != self.target) and self.encoders.get(col, False):
+                encoded_dfs.append(self.get_encoded_column_data(col))
+
+        return torch.cat(encoded_dfs, 1)
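Although the annotation above says ``List[BaseEncoder]``, the implementation indexes ``self.encoders`` by column name, so in practice a dict keyed by column is what gets passed in. A hedged consumption sketch (assumes ``encoders`` is such a dict of prepared Lightwood encoders and ``df`` a cleaned dataframe with a 'price' column):

from torch.utils.data import DataLoader

ds = EncodedDs(encoders=encoders, data_frame=df, target='price')
loader = DataLoader(ds, batch_size=32, shuffle=True)
for X, y in loader:
    ...  # X: concatenated encoded features per row, y: encoded target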
[docs]class ConcatedEncodedDs(EncodedDs):
+    """
+    `ConcatedEncodedDs` abstracts over multiple encoded datasources (`EncodedDs`) as if they were a single entity.
+    """  # noqa
+    def __init__(self, encoded_ds_arr: List[EncodedDs]) -> None:
+        # @TODO: missing super() call here?
+        self.encoded_ds_arr = encoded_ds_arr
+        self.encoded_ds_lenghts = [len(x) for x in self.encoded_ds_arr]
+        self.encoders = self.encoded_ds_arr[0].encoders
+        self.encoder_spans = self.encoded_ds_arr[0].encoder_spans
+        self.target = self.encoded_ds_arr[0].target
+
+    def __len__(self):
+        """
+        See `lightwood.data.encoded_ds.EncodedDs.__len__()`.
+        """
+        # @TODO: behavior here is not intuitive
+        return max(0, np.sum(self.encoded_ds_lenghts) - 2)
+
+    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
+        """
+        See `lightwood.data.encoded_ds.EncodedDs.__getitem__()`.
+        """
+        for ds_idx, length in enumerate(self.encoded_ds_lenghts):
+            if idx - length < 0:
+                return self.encoded_ds_arr[ds_idx][idx]
+            else:
+                idx -= length
+        raise StopIteration()
+
+    @property
+    def data_frame(self) -> pd.DataFrame:
+        """
+        Property that concatenates all underlying `EncodedDs`'s dataframes and returns them.
+
+        Note: be careful not to modify a `ConcatedEncodedDs`; as you can see in the source, it will not have an effect.
+
+        :return: Dataframe with all original data.
+        """  # noqa
+        return pd.concat([x.data_frame for x in self.encoded_ds_arr])
+
+
[docs]def splitter(
+    data: pd.DataFrame,
+    tss: TimeseriesSettings,
+    dtype_dict: Dict[str, str],
+    seed: int,
+    pct_train: float,
+    pct_dev: float,
+    pct_test: float,
+    target: str
+) -> Dict[str, pd.DataFrame]:
+    """
+    Splits data into training, dev and testing datasets.
+
+    The proportion of data for each split must be specified (JSON-AI sets defaults to 80/10/10). First, rows in the dataset are shuffled randomly. Then a simple split is done. If a target value is provided and is of data type categorical/binary, then the splits will be stratified to maintain the representative populations of each class.
+
+    :param data: Input dataset to be split
+    :param tss: time-series specific details for splitting
+    :param dtype_dict: Dictionary with the data type of all columns
+    :param seed: Random state for pandas data-frame shuffling
+    :param pct_train: training fraction of data; must be less than 1
+    :param pct_dev: dev fraction of data; must be less than 1
+    :param pct_test: testing fraction of data; must be less than 1
+    :param target: Name of the target column; if specified, data will be stratified on this column
+
+    :returns: A dictionary containing the keys train, test and dev with their respective data frames, as well as the "stratified_on" key indicating which columns the data was stratified on (None if it wasn't stratified on anything)
+    """  # noqa
+    pct_sum = pct_train + pct_dev + pct_test
+    if not (np.isclose(pct_sum, 1, atol=0.001) and np.less(pct_sum, 1 + 1e-5)):
+        raise Exception(f'The train, dev and test percentage of the data needs to sum up to 1 (got {pct_sum})')
+
+    # Shuffle the data
+    np.random.seed(seed)
+    if not tss.is_timeseries:
+        data = data.sample(frac=1, random_state=seed).reset_index(drop=True)
+
+    # Check if stratification should be done
+    stratify_on = []
+    if target is not None:
+        if dtype_dict[target] in (dtype.categorical, dtype.binary) and not tss.is_timeseries:
+            stratify_on = [target]
+        if tss.is_timeseries and isinstance(tss.group_by, list):
+            stratify_on = tss.group_by
+
+    # Split the data
+    if stratify_on:
+        reshuffle = not tss.is_timeseries
+        train, dev, test = stratify(data, pct_train, pct_dev, pct_test, stratify_on, seed, reshuffle)
+    else:
+        train, dev, test = simple_split(data, pct_train, pct_dev, pct_test)
+
+    return {"train": train, "test": test, "dev": dev, "stratified_on": stratify_on}
+
+
+def simple_split(data: pd.DataFrame,
+                 pct_train: float,
+                 pct_dev: float,
+                 pct_test: float) -> List[pd.DataFrame]:
+    """
+    Simple split method to separate data into training, dev and testing datasets.
+
+    :param data: Input dataset to be split
+    :param pct_train: training fraction of data; must be less than 1
+    :param pct_dev: dev fraction of data; must be less than 1
+    :param pct_test: testing fraction of data; must be less than 1
+
+    :returns: Train, dev, and test dataframes
+    """
+    train_cutoff = round(data.shape[0] * pct_train)
+    dev_cutoff = round(data.shape[0] * pct_dev) + train_cutoff
+    test_cutoff = round(data.shape[0] * pct_test) + dev_cutoff
+
+    train = data[:train_cutoff]
+    dev = data[train_cutoff:dev_cutoff]
+    test = data[dev_cutoff:test_cutoff]
+
+    return [train, dev, test]
+
+
+def stratify(data: pd.DataFrame,
+             pct_train: float,
+             pct_dev: float,
+             pct_test: float,
+             stratify_on: List[str],
+             seed: int,
+             reshuffle: bool) -> List[pd.DataFrame]:
+    """
+    Stratified data splitter.
+
+    The `stratify_on` columns yield a cartesian product by which every different subset will be stratified
+    independently from the others, and recombined at the end in fractions specified by `pcts`.
+
+    For grouped time series tasks, stratification is done based on the group-by columns.
+
+    :param data: dataframe with data to be split
+    :param pct_train: fraction of data to use for training split
+    :param pct_dev: fraction of data to use for dev split (used internally by mixers)
+    :param pct_test: fraction of data to use for test split (used post-training for analysis)
+    :param stratify_on: Columns to consider when stratifying
+    :param seed: Random state for pandas data-frame shuffling
+    :param reshuffle: specify if reshuffling should be done post-split
+
+    :returns: Stratified train, dev, test dataframes
+    """  # noqa
+
+    train_st = pd.DataFrame(columns=data.columns)
+    dev_st = pd.DataFrame(columns=data.columns)
+    test_st = pd.DataFrame(columns=data.columns)
+
+    all_group_combinations = list(product(*[data[col].unique() for col in stratify_on]))
+    for group in all_group_combinations:
+        df = data
+        for idx, col in enumerate(stratify_on):
+            df = df[df[col] == group[idx]]
+
+        train_cutoff = round(df.shape[0] * pct_train)
+        dev_cutoff = round(df.shape[0] * pct_dev) + train_cutoff
+        test_cutoff = round(df.shape[0] * pct_test) + dev_cutoff
+
+        train_st = train_st.append(df[:train_cutoff])
+        dev_st = dev_st.append(df[train_cutoff:dev_cutoff])
+        test_st = test_st.append(df[dev_cutoff:test_cutoff])
+
+    if reshuffle:
+        train_st, dev_st, test_st = [df.sample(frac=1, random_state=seed).reset_index(drop=True)
+                                     for df in [train_st, dev_st, test_st]]
+
+    # check that stratified lengths conform to expected percentages
+    if not np.isclose(len(train_st) / len(data), pct_train, atol=0.01) or \
+       not np.isclose(len(dev_st) / len(data), pct_dev, atol=0.01) or \
+       not np.isclose(len(test_st) / len(data), pct_test, atol=0.01):
+        log.info("Could not stratify; reverting to simple split")
+        train_st, dev_st, test_st = simple_split(data, pct_train, pct_dev, pct_test)
+
+    return [train_st, dev_st, test_st]
+
[docs]def timeseries_analyzer(data: pd.DataFrame, dtype_dict: Dict[str, str],
+                        timeseries_settings: TimeseriesSettings, target: str) -> Dict:
+    """
+    This module analyzes (pre-processed) time series data and stores a few useful insights used in the rest of Lightwood's pipeline.
+
+    :param data: dataframe with time series dataset.
+    :param dtype_dict: dictionary with inferred types for every column.
+    :param timeseries_settings: A `TimeseriesSettings` object. For more details, check `lightwood.types.TimeseriesSettings`.
+    :param target: name of the target column.
+
+    The following things are extracted from each time series inside the dataset:
+    - group_combinations: all observed combinations of values for the set of `group_by` columns. The length of this list determines how many time series are in the data.
+    - deltas: inferred sampling interval
+    - ts_naive_residuals: Residuals obtained from the data by a naive forecaster that repeats the last-seen value.
+    - ts_naive_mae: Mean residual value obtained from the data by a naive forecaster that repeats the last-seen value.
+    - target_normalizers: objects that may normalize the data within any given time series for effective learning. See `lightwood.encoder.time_series.helpers.common` for available choices.
+
+    :return: Dictionary with the aforementioned insights and the `TimeseriesSettings` object for future references.
+    """  # noqa
+    info = {
+        'original_type': dtype_dict[target],
+        'data': data[target].values
+    }
+    if timeseries_settings.group_by is not None:
+        info['group_info'] = {gcol: data[gcol].tolist() for gcol in timeseries_settings.group_by}  # group col values
+    else:
+        info['group_info'] = {}
+
+    # @TODO: maybe normalizers should fit using only the training subsets??
+    new_data = generate_target_group_normalizers(info)
+
+    if dtype_dict[target] in (dtype.integer, dtype.float, dtype.tsarray):
+        naive_forecast_residuals, scale_factor = get_grouped_naive_residuals(info, new_data['group_combinations'])
+    else:
+        naive_forecast_residuals, scale_factor = {}, {}
+
+    deltas = get_delta(data[timeseries_settings.order_by],
+                       info,
+                       new_data['group_combinations'],
+                       timeseries_settings.order_by)
+
+    return {'target_normalizers': new_data['target_normalizers'],
+            'deltas': deltas,
+            'tss': timeseries_settings,
+            'group_combinations': new_data['group_combinations'],
+            'ts_naive_residuals': naive_forecast_residuals,
+            'ts_naive_mae': scale_factor
+            }
+
+
+def get_delta(df: pd.DataFrame, ts_info: dict, group_combinations: list, order_cols: list) -> Dict[str, Dict]:
+    """
+    Infer the sampling interval of each time series, by picking the most popular time interval observed in the training data.
+
+    :param df: Dataframe with time series data.
+    :param ts_info: Dictionary used internally by `timeseries_analyzer`. Contains group-wise series information, among other things.
+    :param group_combinations: all tuples with distinct values for `TimeseriesSettings.group_by` columns, defining all available time series.
+    :param order_cols: all columns specified in `TimeseriesSettings.order_by`.
+
+    :return:
+    Dictionary with group combination tuples as keys. Values are dictionaries with the inferred delta for each series, for each `order_col`.
+    """  # noqa
+    deltas = {"__default": {}}
+
+    # get default delta for all data
+    for col in order_cols:
+        series = pd.Series([x[-1] for x in df[col]])
+        rolling_diff = series.rolling(window=2).apply(lambda x: x.iloc[1] - x.iloc[0])
+        delta = rolling_diff.value_counts(ascending=False).keys()[0]  # pick most popular
+        deltas["__default"][col] = delta
+
+    # get group-wise deltas (if applicable)
+    if ts_info.get('group_info', False):
+        original_data = ts_info['data']
+        for group in group_combinations:
+            if group != "__default":
+                deltas[group] = {}
+                for col in order_cols:
+                    ts_info['data'] = pd.Series([x[-1] for x in df[col]])
+                    _, subset = get_group_matches(ts_info, group)
+                    if subset.size > 1:
+                        rolling_diff = pd.Series(
+                            subset.squeeze()).rolling(
+                            window=2).apply(
+                            lambda x: x.iloc[1] - x.iloc[0])
+                        delta = rolling_diff.value_counts(ascending=False).keys()[0]
+                        deltas[group][col] = delta
+        ts_info['data'] = original_data
+
+    return deltas
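As a small worked example of the "most popular interval" rule above: for an hourly series whose order_by cells end in the timestamps [0, 3600, 7200, 10800, 14500], the rolling differences are [3600, 3600, 3600, 3700], so the inferred delta for that column is 3600 seconds.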
+
+
+def get_naive_residuals(target_data: pd.DataFrame, m: int = 1) -> Tuple[List, float]:
+    """
+    Computes forecasting residuals for the naive method (the forecast for time `t` is the value observed at `t-1`).
+    Useful for computing MASE forecasting error.
+
+    Note: the method assumes predictions are all for the same group combination. For a dataframe that contains multiple
+    series, use `get_grouped_naive_residuals`.
+
+    :param target_data: observed time series targets
+    :param m: season length. The naive forecasts will be the m-th previously seen value for each series
+
+    :return: (list of naive residuals, average residual value)
+    """  # noqa
+    residuals = target_data.rolling(window=m + 1).apply(lambda x: abs(x.iloc[m] - x.iloc[0]))[m:].values.flatten()
+    scale_factor = np.average(residuals)
+    return residuals.tolist(), scale_factor
+
+
+def get_grouped_naive_residuals(info: Dict, group_combinations: List) -> Tuple[Dict, Dict]:
+    """
+    Wraps `get_naive_residuals` for a dataframe with multiple co-existing time series.
+    """  # noqa
+    group_residuals = {}
+    group_scale_factors = {}
+    for group in group_combinations:
+        idxs, subset = get_group_matches(info, group)
+        residuals, scale_factor = get_naive_residuals(pd.DataFrame(subset))  # @TODO: pass m once we handle seasonality
+        group_residuals[group] = residuals
+        group_scale_factors[group] = scale_factor
+    return group_residuals, group_scale_factors
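A small worked example of the naive residuals (with the default season length m=1): for the single series [3, 5, 4, 8], the residuals are |5-3|, |4-5|, |8-4| = [2.0, 1.0, 4.0], so the returned scale factor, i.e. the mean absolute naive error later used for MASE, is (2 + 1 + 4) / 3 ≈ 2.33:

residuals, scale = get_naive_residuals(pd.DataFrame([3, 5, 4, 8]))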
+
[docs]def transform_timeseries(
+        data: pd.DataFrame, dtype_dict: Dict[str, str],
+        timeseries_settings: TimeseriesSettings, target: str, mode: str) -> pd.DataFrame:
+    """
+    Block that transforms the dataframe of a time series task to a convenient format for use in posterior phases like model training.
+
+    The main transformations performed by this block are:
+    - Type casting (e.g. to numerical for `order_by` columns).
+    - Windowing functions for historical context based on `TimeseriesSettings.window` parameter.
+    - Explicitly add target columns according to the `TimeseriesSettings.nr_predictions` parameter.
+    - Flag all rows that are "predictable" based on all `TimeseriesSettings`.
+    - Plus, handle all logic for the streaming use case (where forecasts are only emitted for the last observed data point).
+
+    :param data: Dataframe with data to transform.
+    :param dtype_dict: Dictionary with the types of each column.
+    :param timeseries_settings: A `TimeseriesSettings` object.
+    :param target: The name of the target column to forecast.
+    :param mode: Either "train" or "predict", depending on what phase is calling this procedure.
+
+    :return: A dataframe with all the transformations applied.
+    """  # noqa
+
+    tss = timeseries_settings
+    original_df = copy.deepcopy(data)
+    gb_arr = tss.group_by if tss.group_by is not None else []
+    ob_arr = tss.order_by
+    window = tss.window
+
+    if '__mdb_make_predictions' in original_df.columns:
+        index = original_df[original_df['__mdb_make_predictions'].map(
+            {'True': True, 'False': False, True: True, False: False}).isin([True])]
+        infer_mode = index.shape[0] == 0  # condition to trigger: __mdb_make_predictions is set to False everywhere
+        # @TODO: dont drop and use instead of original_index?
+        original_df = original_df.reset_index(drop=True) if infer_mode else original_df
+    else:
+        infer_mode = False
+
+    original_index_list = []
+    idx = 0
+    for row in original_df.itertuples():
+        if _make_pred(row) or infer_mode:
+            original_index_list.append(idx)
+            idx += 1
+        else:
+            original_index_list.append(None)
+
+    original_df['original_index'] = original_index_list
+
+    secondary_type_dict = {}
+    for col in ob_arr:
+        if dtype_dict[col] in (dtype.date, dtype.integer, dtype.float):
+            secondary_type_dict[col] = dtype_dict[col]
+
+    # Convert order_by columns to numbers (note, rows are references to mutable rows in `original_df`)
+    for _, row in original_df.iterrows():
+        for col in ob_arr:
+            # @TODO: Remove if the TS encoder can handle `None`
+            if row[col] is None or pd.isna(row[col]):
+                row[col] = 0.0
+            else:
+                if dtype_dict[col] == dtype.date:
+                    try:
+                        row[col] = dateutil.parser.parse(
+                            row[col],
+                            **{}
+                        )
+                    except (TypeError, ValueError):
+                        pass
+
+                if isinstance(row[col], datetime.datetime):
+                    row[col] = row[col].timestamp()
+
+                try:
+                    row[col] = float(row[col])
+                except ValueError:
+                    raise ValueError(f'Failed to order based on column: "{col}" due to faulty value: {row[col]}')
+
+    for oby in tss.order_by:
+        original_df[f'__mdb_original_{oby}'] = original_df[oby]
+
+    group_lengths = []
+    if len(gb_arr) > 0:
+        df_arr = []
+        for _, df in original_df.groupby(gb_arr):
+            df_arr.append(df.sort_values(by=ob_arr))
+            group_lengths.append(len(df))
+    else:
+        df_arr = [original_df]
+        group_lengths.append(len(original_df))
+
+    n_groups = len(df_arr)
+    last_index = original_df['original_index'].max()
+    for i, subdf in enumerate(df_arr):
+        if '__mdb_make_predictions' in subdf.columns and mode == 'predict':
+            if infer_mode:
+                df_arr[i] = _ts_infer_next_row(subdf, ob_arr, last_index)
+                last_index += 1
+
+    if len(original_df) > 500:
+        # @TODO: restore possibility to override this with args
+        nr_procs = get_nr_procs(original_df)
+        log.info(f'Using {nr_procs} processes to reshape.')
+        pool = mp.Pool(processes=nr_procs)
+        # Make type `object` so that dataframe cells can be python lists
+        df_arr = pool.map(partial(_ts_to_obj, historical_columns=ob_arr + tss.historical_columns), df_arr)
+        df_arr = pool.map(partial(_ts_order_col_to_cell_lists,
+                                  order_cols=ob_arr + tss.historical_columns), df_arr)
+        df_arr = pool.map(
+            partial(
+                _ts_add_previous_rows, order_cols=ob_arr + tss.historical_columns, window=window),
+            df_arr)
+
+        df_arr = pool.map(partial(_ts_add_future_target, target=target, nr_predictions=tss.nr_predictions,
+                                  data_dtype=tss.target_type, mode=mode),
+                          df_arr)
+
+        if tss.use_previous_target:
+            df_arr = pool.map(
+                partial(_ts_add_previous_target, target=target, window=tss.window),
+                df_arr)
+        pool.close()
+        pool.join()
+    else:
+        for i in range(n_groups):
+            df_arr[i] = _ts_to_obj(df_arr[i], historical_columns=ob_arr + tss.historical_columns)
+            df_arr[i] = _ts_order_col_to_cell_lists(df_arr[i], order_cols=ob_arr + tss.historical_columns)
+            df_arr[i] = _ts_add_previous_rows(df_arr[i],
+                                              order_cols=ob_arr + tss.historical_columns, window=window)
+            df_arr[i] = _ts_add_future_target(df_arr[i], target=target, nr_predictions=tss.nr_predictions,
+                                              data_dtype=tss.target_type, mode=mode)
+            if tss.use_previous_target:
+                df_arr[i] = _ts_add_previous_target(df_arr[i], target=target, window=tss.window)
+
+    combined_df = pd.concat(df_arr)
+
+    if '__mdb_make_predictions' in combined_df.columns:
+        combined_df = pd.DataFrame(combined_df[combined_df['__mdb_make_predictions'].astype(bool).isin([True])])
+        del combined_df['__mdb_make_predictions']
+
+    if not infer_mode and any([i < tss.window for i in group_lengths]):
+        if tss.allow_incomplete_history:
+            log.warning("Forecasting with incomplete historical context, predictions might be subpar")
+        else:
+            raise Exception(f'Not enough historical context to make a timeseries prediction. Please provide a number of rows greater or equal to the window size. If you can\'t get enough rows, consider lowering your window size. If you want to force timeseries predictions lacking historical context please set the `allow_incomplete_history` timeseries setting to `True`, but this might lead to subpar predictions.')  # noqa
+
+    df_gb_map = None
+    if n_groups > 1:
+        df_gb_list = list(combined_df.groupby(tss.group_by))
+        df_gb_map = {}
+        for gb, df in df_gb_list:
+            df_gb_map['_' + '_'.join(gb)] = df
+
+    timeseries_row_mapping = {}
+    idx = 0
+
+    if df_gb_map is None:
+        for _, row in combined_df.iterrows():
+            if not infer_mode:
+                timeseries_row_mapping[idx] = int(
+                    row['original_index']) if row['original_index'] is not None and not np.isnan(
+                    row['original_index']) else None
+            else:
+                timeseries_row_mapping[idx] = idx
+            idx += 1
+    else:
+        for gb in df_gb_map:
+            for _, row in df_gb_map[gb].iterrows():
+                if not infer_mode:
+                    timeseries_row_mapping[idx] = int(
+                        row['original_index']) if row['original_index'] is not None and not np.isnan(
+                        row['original_index']) else None
+                else:
+                    timeseries_row_mapping[idx] = idx
+
+                idx += 1
+
+    del combined_df['original_index']
+
+    # return combined_df, secondary_type_dict, timeseries_row_mapping, df_gb_map
+    return combined_df
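A hedged call sketch for the transformation above (the field names follow the ``tss.*`` attributes referenced in this function; whether ``TimeseriesSettings`` accepts them all as constructor keywords in your version is an assumption, and ``df``/``dtype_dict`` are placeholders):

ts_df = transform_timeseries(
    data=df,
    dtype_dict=dtype_dict,
    timeseries_settings=TimeseriesSettings(
        is_timeseries=True,
        order_by=['saledate'],
        window=5,
        group_by=['bedrooms'],
        nr_predictions=1,
    ),
    target='price',
    mode='train',
)
# each order_by cell now holds a window-sized list of historical values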
+
+
+def _ts_infer_next_row(df: pd.DataFrame, ob: str, last_index: int) -> pd.DataFrame:
+    """
+    Adds an inferred next row for streaming mode purposes.
+
+    :param df: dataframe from which the next row is inferred.
+    :param ob: `order_by` column.
+    :param last_index: index number of the latest row in `df`.
+
+    :return: Modified `df` with the inferred row appended to it.
+    """
+    last_row = df.iloc[[-1]].copy()
+    if df.shape[0] > 1:
+        butlast_row = df.iloc[[-2]]
+        delta = (last_row[ob].values - butlast_row[ob].values).flatten()[0]
+    else:
+        delta = 1
+    last_row.original_index = None
+    last_row.index = [last_index + 1]
+    last_row['__mdb_make_predictions'] = True
+    last_row['__mdb_ts_inferred'] = True
+    last_row[ob] += delta
+    return df.append(last_row)
+
+
+def _make_pred(row) -> bool:
+    """
+    Indicates whether a prediction should be made for `row` or not.
+    """
+    return not hasattr(row, '__mdb_make_predictions') or row.make_predictions
+
+
+def _ts_to_obj(df: pd.DataFrame, historical_columns: list) -> pd.DataFrame:
+    """
+    Casts all historical columns in a dataframe to `object` type.
+
+    :param df: Input dataframe
+    :param historical_columns: Historical columns to type cast
+
+    :return: Dataframe with `object`-typed historical columns
+    """
+    for hist_col in historical_columns:
+        df.loc[:, hist_col] = df[hist_col].astype(object)
+    return df
+
+
+def _ts_order_col_to_cell_lists(df: pd.DataFrame, order_cols: list) -> pd.DataFrame:
+    """
+    Casts all data in `order_by` columns into cells.
+
+    :param df: Input dataframe
+    :param order_cols: `order_by` columns
+
+    :return: Dataframe with all `order_cols` modified so that their values are cells, e.g. `1` -> `[1]`
+    """
+    for order_col in order_cols:
+        for ii in range(len(df)):
+            label = df.index.values[ii]
+            df.at[label, order_col] = [df.at[label, order_col]]
+    return df
+
+
+def _ts_add_previous_rows(df: pd.DataFrame, order_cols: list, window: int) -> pd.DataFrame:
+    """
+    Adds previous rows (as determined by `TimeseriesSettings.window`) into the cells of all `order_by` columns.
+
+    :param df: Input dataframe.
+    :param order_cols: `order_by` columns.
+    :param window: value of `TimeseriesSettings.window` parameter.
+
+    :return: Dataframe with all `order_cols` modified so that their values are now arrays of historical context.
+    """  # noqa
+    for order_col in order_cols:
+        for i in range(len(df)):
+            previous_indexes = [*range(max(0, i - window), i)]
+
+            for prev_i in reversed(previous_indexes):
+                df.iloc[i][order_col].append(
+                    df.iloc[prev_i][order_col][-1]
+                )
+
+            # Zero pad
+            # @TODO: Remove since RNN encoder can do without (???)
+            df.iloc[i][order_col].extend(
+                [0] * (1 + window - len(df.iloc[i][order_col]))
+            )
+            df.iloc[i][order_col].reverse()
+    return df
+
+
+def _ts_add_previous_target(df: pd.DataFrame, target: str, window: int) -> pd.DataFrame:
+    """
+    Adds previous rows (as determined by `TimeseriesSettings.window`) into the cells of the target column.
+
+    :param df: Input dataframe.
+    :param target: target column name.
+    :param window: value of `TimeseriesSettings.window` parameter.
+
+    :return: Dataframe with new `__mdb_ts_previous_{target}` column that contains historical target context.
+    """  # noqa
+    if target not in df:
+        return df
+    previous_target_values = list(df[target])
+    del previous_target_values[-1]
+    previous_target_values = [None] + previous_target_values
+
+    previous_target_values_arr = []
+    for i in range(len(previous_target_values)):
+        prev_vals = previous_target_values[max(i - window, 0):i + 1]
+        arr = [None] * (window - len(prev_vals) + 1)
+        arr.extend(prev_vals)
+        previous_target_values_arr.append(arr)
+
+    df[f'__mdb_ts_previous_{target}'] = previous_target_values_arr
+    return df
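An illustrative trace of the windowing above (window=2): for target values [10, 20, 30], the generated __mdb_ts_previous_target column is, row by row, [None, None, None], [None, None, 10] and [None, 10, 20], i.e. each row sees only strictly earlier targets, left-padded with None.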
+
+
+def _ts_add_future_target(df, target, nr_predictions, data_dtype, mode):
+    """
+    Adds as many columns to the input dataframe as the forecasting horizon asks for (as determined by `TimeseriesSettings.nr_predictions`).
+
+    :param df: Input dataframe.
+    :param target: target column name.
+    :param nr_predictions: value of `TimeseriesSettings.nr_predictions` parameter.
+    :param data_dtype: dictionary with types of all input columns
+    :param mode: either "train" or "predict". `Train` will drop rows with incomplete target info. `Predict` has no effect, for now.
+
+    :return: Dataframe with new `{target}_timestep_{i}` columns that contain target labels at timestep `i` of a total `TimeseriesSettings.nr_predictions`.
+    """  # noqa
+    if target not in df:
+        return df
+    if data_dtype in (dtype.integer, dtype.float, dtype.array, dtype.tsarray):
+        df[target] = df[target].astype(float)
+
+    for timestep_index in range(1, nr_predictions):
+        next_target_value_arr = list(df[target])
+        for del_index in range(0, min(timestep_index, len(next_target_value_arr))):
+            del next_target_value_arr[0]
+            next_target_value_arr.append(None)
+        col_name = f'{target}_timestep_{timestep_index}'
+        df[col_name] = next_target_value_arr
+        df[col_name] = df[col_name].fillna(value=np.nan)
+
+    # drop rows with incomplete target info.
+    if mode == 'train':
+        for col in [f'{target}_timestep_{i}' for i in range(1, nr_predictions)]:
+            if '__mdb_make_predictions' not in df.columns:
+                df['__mdb_make_predictions'] = True
+            df.loc[df[col].isna(), ['__mdb_make_predictions']] = False
+
+    return df
+
[docs]classBaseEncoder:
+ """
+ Base class for all encoders.
+
+ An encoder should return encoded representations of any columnar data.
+ The procedure for this is defined inside the `encode()` method.
+
+ If this encoder is expected to handle an output column, then it also needs to implement the respective `decode()` method that handles the inverse transformation from encoded representations to the final prediction in the original column space.
+
+ For encoders that learn representations (as opposed to rule-based), the `prepare()` method will handle all learning logic.
+
+ The `to()` method is used to move PyTorch-based encoders to and from a GPU.
+
+ :param is_target: Whether the data to encode is the target, as per the problem definition.
+ :param is_timeseries_encoder: Whether the encoder handles sequential/time-series data. Lightwood provides specific treatment for this kind of encoder.
+ :param is_trainable_encoder: Whether the encoder must learn its representations. Lightwood checks this flag to decide whether to pass training data to the encoder via its ``prepare`` method.
+
+ Class Attributes:
+ - is_prepared: Internal flag to signal that the `prepare()` method has been successfully executed.
+ - is_nn_encoder: Whether the encoder is neural network-based.
+ - dependencies: list of additional columns that the encoder might need to encode.
+ - output_size: length of each encoding tensor for a single data point.
+
+ """# noqa
+ is_target:bool
+ is_prepared:bool
+
+ is_timeseries_encoder:bool=False
+ is_trainable_encoder:bool=False
+
+ def__init__(self,is_target=False)->None:
+ self.is_target=is_target
+ self.is_prepared=False
+ self.dependencies=[]
+ self.output_size=None
+
+ # Not all encoders need to be prepared
+ defprepare(self,priming_data)->None:
+ self.is_prepared=True
+
+ defencode(self,column_data)->torch.Tensor:
+ raiseNotImplementedError
+
+ defdecode(self,encoded_data)->List[object]:
+ raiseNotImplementedError
+
+ # Should work for all torch-based encoders, but custom behavior may have to be implemented for weird models
+ defto(self,device,available_devices):
+ # Find all nn.Module type objects and convert them
+ # @TODO: Make this work recursively
+ forvinvars(self):
+ attr=getattr(self,v)
+ ifisinstance(attr,torch.nn.Module):
+ attr.to(device)
+ returnself
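+
+As a hedged illustration of the contract above, here is a minimal rule-based encoder sketch (the column semantics and scaling constant are invented for the example, and it assumes ``torch`` and ``List`` are imported as in this module):
+
+    class CenturyEncoder(BaseEncoder):
+        """Toy encoder: maps a numeric 'year'-like column to a single scaled value."""
+        def __init__(self, is_target: bool = False):
+            super().__init__(is_target)
+            self.output_size = 1  # one value per data point
+
+        def prepare(self, priming_data) -> None:
+            self.is_prepared = True  # nothing to learn for a rule-based encoder
+
+        def encode(self, column_data) -> torch.Tensor:
+            return torch.Tensor([[x / 100.0] for x in column_data])
+
+        def decode(self, encoded_data) -> List[object]:
+            return [round(float(row[0]) * 100.0) for row in encoded_data]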
Source code for lightwood.encoder.categorical.binary
+importtorch
+importnumpyasnp
+fromscipy.specialimportsoftmax
+fromlightwood.encoder.baseimportBaseEncoder
+
+
+# Exists mainly for datasets with loads of binary flags where OHE can be too slow to fit
+
[docs]classDatetimeEncoder(BaseEncoder):
+ """
+ This encoder produces an encoded representation for timestamps.
+
+ The approach consists of decomposing each timestamp into its constituent units (e.g. day-of-week, month, year, etc.), and describing each of those with a single value that represents its magnitude within a sensible cycle length.
+ """# noqa
+ def__init__(self,is_target:bool=False):
+ super().__init__(is_target)
+ self.fields=['year','month','day','weekday','hour','minute','second']
+ self.constants={'year':3000.0,'month':12.0,'weekday':7.0,
+ 'hour':24.0,'minute':60.0,'second':60.0}
+ self.output_size=7
+
+ defprepare(self,priming_data):
+ ifself.is_prepared:
+ raiseException('You can only call "prepare" once for a given encoder.')
+
+ self.is_prepared=True
+
+
[docs]defencode(self,data):
+ """
+ :param data: # @TODO: receive a consistent data type here; currently either list of lists or pd.Series w/lists
+ :return: encoded data
+ """
+ ifnotself.is_prepared:
+ raiseException('You need to call "prepare" before calling "encode" or "decode".')
+
+ ret=[self.encode_one(unix_timestamp)forunix_timestampindata]
+
+ returntorch.Tensor(ret)
+
+
[docs]defencode_one(self,unix_timestamp:Optional[float]):
+ """
+ Encodes a single unix timestamp into a vector of normalized date/time components
+ :param unix_timestamp: unix timestamp (resolution is seconds), or a missing value (None/NaN)
+ :return: a list with one normalized value per datetime field
+ """
+ ifis_none(unix_timestamp):
+ vector=[0]*len(self.fields)
+ else:
+ c=self.constants
+ date=datetime.datetime.fromtimestamp(unix_timestamp)
+ day_constant=calendar.monthrange(date.year,date.month)[1]
+ vector=[date.year/c['year'],date.month/c['month'],date.day/day_constant,
+ date.weekday()/c['weekday'],date.hour/c['hour'],
+ date.minute/c['minute'],date.second/c['second']]
+ returnvector
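+
+A quick usage sketch of the cyclic decomposition; the chosen date is arbitrary, and note that `fromtimestamp` interprets the value in local time:
+
+    enc = DatetimeEncoder()
+    enc.prepare([])
+    ts = datetime.datetime(2021, 6, 15, 12, 30).timestamp()
+    enc.encode_one(ts)
+    # -> [year/3000, month/12, day/days_in_month, weekday/7, hour/24, minute/60, second/60]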
[docs]classDatetimeNormalizerEncoder(BaseEncoder):
+ def__init__(self,is_target:bool=False,sinusoidal:bool=False):
+ super().__init__(is_target)
+ self.sinusoidal=sinusoidal
+ self.fields=['year','month','day','weekday','hour','minute','second']
+ self.constants={'year':3000.0,'month':12.0,'weekday':7.0,
+ 'hour':24.0,'minute':60.0,'second':60.0}
+ ifself.sinusoidal:
+ self.output_size=2
+ else:
+ self.output_size=7
+
+ defprepare(self,priming_data):
+ ifself.is_prepared:
+ raiseException('You can only call "prepare" once for a given encoder.')
+
+ self.is_prepared=True
+
+
[docs]defencode(self,data):
+ """
+ :param data: # @TODO: receive a consistent data type here; currently either list of lists or pd.Series w/lists
+ :return: encoded data
+ """
+ ifnotself.is_prepared:
+ raiseException('You need to call "prepare" before calling "encode" or "decode".')
+
+ ifisinstance(data,pd.Series):
+ data=data.values
+ ifnotisinstance(data[0],Iterable):
+ data=[data]
+
+ ret=[self.encode_one(row)forrowindata]
+
+ returntorch.Tensor(ret)
+
+
[docs]defencode_one(self,data):
+ """
+ Encodes a list of unix timestamps, given either as floats or as single-element tensors
+ :param data: list of unix timestamps (resolution is seconds)
+ :return: a list of vectors
+ """
+ ret=[]
+ forunix_timestampindata:
+ ifis_none(unix_timestamp):
+ ifself.sinusoidal:
+ vector=[0,1]*len(self.fields)
+ else:
+ vector=[0]*len(self.fields)
+ else:
+ c=self.constants
+ ifisinstance(unix_timestamp,torch.Tensor):
+ unix_timestamp=unix_timestamp.item()
+ date=datetime.datetime.fromtimestamp(unix_timestamp)
+ day_constant=calendar.monthrange(date.year,date.month)[1]
+ vector=[date.year/c['year'],date.month/c['month'],date.day/day_constant,
+ date.weekday()/c['weekday'],date.hour/c['hour'],
+ date.minute/c['minute'],date.second/c['second']]
+ ifself.sinusoidal:
+ vector=np.array([(np.sin(n),np.cos(n))forninvector]).flatten()
+
+ ret.append(vector)
+
+ returnret
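+
+A brief sketch of the sinusoidal variant (the timestamp value is arbitrary): each of the seven normalized components n is replaced by the pair (sin(n), cos(n)), so each encoded row produced by `encode_one` has 14 values.
+
+    enc = DatetimeNormalizerEncoder(sinusoidal=True)
+    enc.prepare([])
+    vectors = enc.encode_one([1623760200.0])  # one unix timestamp, in seconds
+    len(vectors[0])  # 14: (sin, cos) for each of the 7 date/time fields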
[docs]classImg2VecEncoder(BaseEncoder):
+ is_trainable_encoder:bool=True
+
+ def__init__(self,stop_after:int=3600,is_target:bool=False):
+ super().__init__(is_target)
+ # # I think we should make this an enum, something like: speed, balance, accuracy
+ # self.aim = aim
+ self.is_prepared=False
+
+ self._scaler=transforms.Resize((224,224))
+ self._normalize=transforms.Normalize(mean=[0.485,0.456,0.406],std=[0.229,0.224,0.225])
+ self._to_tensor=transforms.ToTensor()
+ self._img_to_tensor=transforms.Compose([
+ self._scaler,
+ self._to_tensor,
+ self._normalize
+ ])
+ self.stop_after=stop_after
+
+ pil_logger=logging.getLogger('PIL')
+ pil_logger.setLevel(logging.ERROR)
+
+ defprepare(self,train_priming_data:pd.Series,dev_priming_data:pd.Series):
+ # @TODO: Add a bit of training here (maybe? depending on time aim)
+ ifself.is_prepared:
+ raiseException('You can only call "prepare" once for a given encoder.')
+
+ self.model=Img2Vec()
+ self.output_size=self.model.output_size
+ self.is_prepared=True
+
+ defto(self,device,available_devices):
+ self.model.to(device,available_devices)
+ returnself
+
+
[docs]defencode(self,images:List[str])->torch.Tensor:
+ """
+ Encode list of images
+
+ :param images: list of images, each image is a path to a file or a url
+ :return: a torch.floatTensor
+ """
+ ifnotself.is_prepared:
+ raiseException('You need to call "prepare" before calling "encode" or "decode".')
+
+ img_tensors=[self._img_to_tensor(
+ Image.open(img_path)
+ )forimg_pathinimages]
+ vec_arr=[]
+
+ self.model.eval()
+ withtorch.no_grad():
+ forimg_tensorinimg_tensors:
+ vec=self.model(img_tensor.unsqueeze(0),batch=False)
+ vec_arr.append(vec)
+ returntorch.stack(vec_arr).to('cpu')
+
+ defdecode(self,encoded_values_tensor):
+ raiseException('This encoder is not bi-directional')
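+
+A hedged usage sketch for the image encoder; the file paths below are made up, `prepare()` currently ignores its priming data, and pandas is assumed to be imported as `pd`:
+
+    encoder = Img2VecEncoder(stop_after=3600)
+    encoder.prepare(train_priming_data=pd.Series([]), dev_priming_data=pd.Series([]))
+    embeddings = encoder.encode(['images/cat.png', 'images/dog.png'])
+    embeddings.shape  # (2, encoder.output_size)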
+"""
+2021.07.16
+Adding flag "embedmode".
+
+Embed-mode is made for when text is one of many columns in the model.
+IF the model is direct (text) -> output, then it's worth just using
+the fine-tuned encoder as the "mixer" per se; thus, turn embed-mode OFF.
+
+This means there are 3 possible modes:
+
+(1) Classification
+ -> Fine tuned, output of encoder is [CLS] embedding
+ -> Fine tuned, output of encoder is the class value
+(2) Regression
+ -> Untrained; output of encoder is [CLS] embedding
+
+Training with regression is WIP; seems like quantile-binning is the best approach
+but using MSE loss while fine-tuning did not demonstrate decent results. Particularly
+because the mixer seems to address this.
+
+2021.03.18
+
+## Padding changes the answer slightly in the model.
+
+The following text encoder uses huggingface's
+Distilbert. Internal benchmarks suggest
+1 epoch of fine tuning is ideal [classification].
+Training ONLY occurs for classification. Regression problems
+are not trained, embeddings are directly generated.
+
+See: https://huggingface.co/transformers/training.html
+for further details.
+
+Currently the model supports only distilbert.
+
+When instantiating the DistilBertForSeq.Class object,
+num_labels indicates whether you use classification or regression.
+
+See: https://huggingface.co/transformers/model_doc/distilbert.html#distilbertforsequenceclassification
+under the 'labels' command
+
+For classification - we use num_labels = 1 + num_classes ***
+
+If you do num_classes + 1, we reserve the LAST label
+as the "unknown" label; this is different from the original
+distilbert model. (prior to 2021.03)
+
+TODOs:
++ Regression
++ Batch encodes() tokenization step
++ Look into auto-encoding lower dimensional representations
+of the output embedding
++ Look into regression tuning (will require grad. clipping)
++ Look into tuning to the encoded space of output.
+"""
+importtime
+importtorch
+fromtorch.utils.dataimportDataLoader
+importos
+importpandasaspd
+fromlightwood.encoder.text.helpers.pretrained_helpersimportTextEmbed
+fromlightwood.helpers.deviceimportget_devices
+fromlightwood.encoder.baseimportBaseEncoder
+fromlightwood.helpers.logimportlog
+fromlightwood.helpers.torchimportLightwoodAutocast
+fromlightwood.apiimportdtype
+fromtransformersimport(
+ DistilBertModel,
+ DistilBertForSequenceClassification,
+ DistilBertTokenizerFast,
+ AdamW,
+ get_linear_schedule_with_warmup,
+)
+fromlightwood.helpers.generalimportis_none
+
+
+
[docs]classPretrainedLangEncoder(BaseEncoder):
+ is_trainable_encoder:bool=True
+
+ """
+ Pretrained language models.
+ Option to train on a target encoding of choice.
+
+ Args:
+ is_target ::Bool; data column is the target of ML.
+ model_name ::str; name of pre-trained model
+ custom_tokenizer ::function; custom tokenizing function
+ batch_size ::int; size of batch
+ max_position_embeddings ::int; max sequence length of input text
+ custom_train ::Bool; If true, trains model on the target provided
+ frozen ::Bool; If true, freezes transformer layers during training.
+ epochs ::int; number of epochs to train model with
+ embed_mode ::Bool; If true, assumes the output of the encode() step is the CLS embedding.
+ """
+
+ def__init__(
+ self,
+ stop_after:int,
+ is_target=False,
+ model_name="distilbert",
+ custom_tokenizer=None,
+ batch_size=10,
+ max_position_embeddings=None,
+ frozen=False,
+ epochs=1,
+ output_type=None,
+ embed_mode=True,
+ ):
+ super().__init__(is_target)
+
+ self.output_type=output_type
+ self.name=model_name+" text encoder"
+ log.info(self.name)
+
+ self._max_len=max_position_embeddings
+ self._frozen=frozen
+ self._batch_size=batch_size
+ self._epochs=epochs
+
+ # Model setup
+ self._tokenizer=custom_tokenizer
+ self._model=None
+ self.model_type=None
+
+ # TODO: Other LMs; Distilbert is a good balance of speed/performance
+ self._classifier_model_class=DistilBertForSequenceClassification
+ self._embeddings_model_class=DistilBertModel
+ self._tokenizer_class=DistilBertTokenizerFast
+ self._pretrained_model_name="distilbert-base-uncased"
+
+ self.device,_=get_devices()
+ self.stop_after=stop_after
+
+ self.embed_mode=embed_mode
+ self.uses_target=True
+ self.output_size=None
+
+ # DEBUGGING!!!
+ ifself.embed_mode:
+ log.info("Embedding mode on. [CLS] embedding dim output of encode()")
+ else:
+ log.info("Embedding mode off. Logits are output of encode()")
+
+
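+
+A minimal construction sketch; the time budget and output type below are assumed values chosen for illustration:
+
+    text_encoder = PretrainedLangEncoder(
+        stop_after=600,                  # seconds allowed for fine-tuning
+        output_type=dtype.categorical,   # categorical/binary targets enable fine-tuning in prepare()
+        embed_mode=True,                 # encode() returns [CLS]-style embeddings rather than logits
+    )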
[docs]defprepare(self,train_priming_data:pd.Series,dev_priming_data:pd.Series,encoded_target_values:torch.Tensor):
+ """
+ Prepare the encoder by training on the target.
+
+ Training requires the encoded target values to be available;
+ this is assumed automatically.
+ """
+ os.environ['TOKENIZERS_PARALLELISM']='true'
+ priming_data=pd.concat([train_priming_data,dev_priming_data])
+ priming_data=priming_data.values
+ ifself.is_prepared:
+ raiseException("Encoder is already prepared.")
+
+ # TODO: Make tokenizer custom with partial function; feed custom->model
+ ifself._tokenizerisNone:
+ self._tokenizer=self._tokenizer_class.from_pretrained(self._pretrained_model_name)
+
+ # Replace None values with empty strings ''
+ priming_data=[xifxisnotNoneelse""forxinpriming_data]
+
+ # Checks training data details
+ # TODO: Regression flag; currently training supported for categorical only
+
+ if(self.output_typein(dtype.categorical,dtype.binary)):
+ log.info("Training model.")
+
+ # Prepare priming data into tokenized form + attention masks
+ text=self._tokenizer(priming_data,truncation=True,padding=True)
+
+ log.info("\tOutput trained is categorical")
+
+ labels=encoded_target_values.argmax(dim=1)
+
+ # Construct the model
+ self._model=self._classifier_model_class.from_pretrained(
+ self._pretrained_model_name,
+ num_labels=len(encoded_target_values[0]),
+ ).to(self.device)
+
+ # Construct the dataset for training
+ xinp=TextEmbed(text,labels)
+ dataset=DataLoader(xinp,batch_size=self._batch_size,shuffle=True)
+
+ # If max length not set, adjust
+ ifself._max_lenisNone:
+ self._max_len=self._model.config.max_position_embeddings
+
+ ifself._frozen:
+ log.info("\tFrozen Model + Training Classifier Layers")
+ """
+ Freeze the base transformer model and train
+ a linear layer on top
+ """
+ # Freeze all the transformer parameters
+ forparaminself._model.base_model.parameters():
+ param.requires_grad=False
+
+ optimizer_grouped_parameters=self._model.parameters()
+
+ else:
+ log.info("\tFine-tuning model")
+ """
+ Fine-tuning parameters with weight decay
+ """
+ no_decay=[
+ "bias",
+ "LayerNorm.weight",
+ ]# decay on all terms EXCLUDING bias/layernorms
+ optimizer_grouped_parameters=[
+ {
+ "params":[
+ p
+ forn,pinself._model.named_parameters()
+ ifnotany(ndinnforndinno_decay)
+ ],
+ "weight_decay":0.01,
+ },
+ {
+ "params":[
+ p
+ forn,pinself._model.named_parameters()
+ ifany(ndinnforndinno_decay)
+ ],
+ "weight_decay":0.0,
+ },
+ ]
+
+ optimizer=AdamW(optimizer_grouped_parameters,lr=1e-5)
+ scheduler=get_linear_schedule_with_warmup(
+ optimizer,
+ num_warmup_steps=0,# default value for GLUE
+ num_training_steps=len(dataset)*self._epochs,
+ )
+
+ # Train model; declare optimizer earlier if desired.
+ self._tune_model(
+ dataset,optim=optimizer,scheduler=scheduler,n_epochs=self._epochs
+ )
+
+ else:
+ log.info("Target is not classification; Embeddings Generator only")
+
+ self.model_type="embeddings_generator"
+ self._model=self._embeddings_model_class.from_pretrained(
+ self._pretrained_model_name
+ ).to(self.device)
+
+ # TODO: Not a great flag
+ # Currently, if the task is not classification, you must have
+ # an embedding generator only.
+ ifself.embed_modeisFalse:
+ log.info("Embedding mode must be ON for non-classification targets.")
+ self.embed_mode=True
+
+ self.is_prepared=True
+ encoded=self.encode(priming_data[0:1])
+ self.output_size=len(encoded[0])
+
+ def_tune_model(self,dataset,optim,scheduler,n_epochs=1):
+ """
+ Given a model, train for n_epochs.
+ Specifically intended for tuning; it does NOT use a validation loss or
+ early-stopping criterion.
+
+ model - torch.nn model;
+ dataset - torch.DataLoader; dataset to train
+ device - torch.device; cuda/cpu
+ log - lightwood.logger.log; log.info output
+ optim - transformers.optimization.AdamW; optimizer
+ scheduler - scheduling params
+ n_epochs - number of epochs to train
+
+ """
+ self._model.train()
+
+ ifoptimisNone:
+ log.info("No opt. provided, setting all params with AdamW.")
+ optim=AdamW(self._model.parameters(),lr=5e-5)
+ else:
+ log.info("Optimizer provided")
+
+ ifschedulerisNone:
+ log.info("No scheduler provided.")
+ else:
+ log.info("Scheduler provided.")
+
+ started=time.time()
+ forepochinrange(n_epochs):
+ total_loss=0
+ iftime.time()-started>self.stop_after:
+ break
+
+ forbatchindataset:
+ optim.zero_grad()
+
+ withLightwoodAutocast():
+ inpids=batch["input_ids"].to(self.device)
+ attn=batch["attention_mask"].to(self.device)
+ labels=batch["labels"].to(self.device)
+ outputs=self._model(inpids,attention_mask=attn,labels=labels)
+ loss=outputs[0]
+
+ total_loss+=loss.item()
+
+ loss.backward()
+ optim.step()
+ ifschedulerisnotNone:
+ scheduler.step()
+
+ self._train_callback(epoch,total_loss/len(dataset))
+
+ def_train_callback(self,epoch,loss):
+ log.info(f"{self.name} at epoch {epoch+1} and loss {loss}!")
+
+
[docs]defencode(self,column_data):
+ """
+ TODO: Maybe batch the text up; may take too long
+ Given column data, encode the dataset.
+
+ Currently, returns the embedding of the pre-classifier layer.
+
+ Args:
+ column_data:: [list[str]] list of text data in str form
+
+ Returns:
+ encoded_representation:: [torch.Tensor] N_sentences x Nembed_dim
+ """
+ ifself.is_preparedisFalse:
+ raiseException("You need to first prepare the encoder.")
+
+ # Set model to testing/eval mode.
+ self._model.eval()
+
+ encoded_representation=[]
+
+ withtorch.no_grad():
+ # Encode each text entry with the (DistilBERT) model
+ fortextincolumn_data:
+
+ # Omit NaNs
+ ifis_none(text):
+ text=""
+
+ # Tokenize the text with the built-in tokenizer.
+ inp=self._tokenizer.encode(
+ text,truncation=True,return_tensors="pt"
+ ).to(self.device)
+
+ ifself.embed_mode:# Embedding mode ON; return [CLS]
+ output=self._model.base_model(inp).last_hidden_state[:,0]
+
+ # If the model has a pre-classifier layer, use this embedding.
+ ifhasattr(self._model,"pre_classifier"):
+ output=self._model.pre_classifier(output)
+
+ else:# Embedding mode off; return classes
+ output=self._model(inp).logits
+
+ encoded_representation.append(output.detach())
+
+ returntorch.stack(encoded_representation).squeeze(1).to('cpu')
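+
+Continuing the hypothetical `text_encoder` instance from above (and assuming `prepare()` has already been called with training data), encoding a couple of strings returns one row per input:
+
+    vectors = text_encoder.encode(['great product, would buy again', 'arrived broken'])
+    vectors.shape[0]  # 2: one embedding (or logit vector, if embed_mode is off) per text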
[docs]classTimeSeriesEncoder(BaseEncoder):
+ """
+ Time series encoder. This module can learn features for any `order_by` temporal column, both with and without accompanying target data.
+
+ The backbone of this encoder is either a recurrent neural network or a transformer; both structured in an encoder-decoder fashion.
+ """# noqa
+ is_timeseries_encoder:bool=True
+ is_trainable_encoder:bool=True
+
+ def__init__(self,stop_after:int,is_target=False,original_type:str=None,target:str=None,
+ grouped_by:List[str]=[],encoder_type='rnn'):
+ super().__init__(is_target)
+ self.device,_=get_devices()
+ self.target=target
+ self.grouped_by=grouped_by
+ self._learning_rate=0.01
+ self.output_size=128
+ self._transformer_hidden_size=None
+ self._epochs=int(1e5)# default training epochs
+ self._stop_on_n_bad_epochs=5# stop training after N epochs where loss is worse than running avg
+ self._epochs_running_avg=5# amount of epochs for running average
+ self._pytorch_wrapper=torch.FloatTensor
+ self.is_prepared=False
+ self._is_setup=False
+ self._max_ts_length=0
+ self._sos=0.0# start of sequence for decoding
+ self._eos=0.0# end of input sequence -- padding value for batches
+ self._n_dims=1
+ self._normalizer=None
+ self.dep_norms={}# dict of dict of normalizers for each dependency (can be grouped-by some column)
+ self._target_type=None
+ self._group_combinations=None
+ self.original_type=original_type
+ self.stop_after=stop_after
+ ifencoder_type.lower()=='rnn':
+ self.encoder_class=EncoderRNNNumerical
+ elifencoder_type.lower()=='transformer':
+ self.encoder_class=TransformerEncoder
+
+
[docs]defsetup_nn(self,ts_analysis,dependencies=None):
+ """This method must be executed after initializing, else types are unassigned"""
+ ifself.original_typein(dtype.datetime,dtype.date):
+ self._normalizer=DatetimeNormalizerEncoder(sinusoidal=True)
+ self._n_dims*=len(self._normalizer.fields)*2# sinusoidal datetime components
+ elifself.original_typein(dtype.float,dtype.integer):
+ self._normalizer=MinMaxNormalizer()
+
+ total_dims=self._n_dims
+ dec_hsize=self.output_size
+
+ ifdependencies:
+ fordep_name,depindependencies.items():
+ self.dependencies.append(dep_name)
+
+ ifdep_nameinself.grouped_by:
+ continue# we only use group column for indexing and selecting rows
+
+ assertdep['original_type']in(dtype.categorical,dtype.binary,
+ dtype.integer,dtype.float,dtype.tsarray)
+
+ iff'__mdb_ts_previous_{self.target}'==dep_name:
+ self.dep_norms[dep_name]=ts_analysis['target_normalizers']
+ self._group_combinations=ts_analysis['group_combinations']
+ self._target_type=dep['original_type']
+
+ # if TS analysis yields no normalizers for this dependency, we create a generic one based on its dtype
+ else:
+ ifdep['original_type']in(dtype.categorical,dtype.binary):
+ self.dep_norms[dep_name]['__default']=CatNormalizer()
+ else:
+ self.dep_norms[dep_name]['__default']=MinMaxNormalizer()
+
+ self.dep_norms[dep_name]['__default'].prepare(dep['data'])
+ self._group_combinations={'__default':None}
+
+ # add descriptor size to the total encoder output dimensionality
+ ifdep['original_type']in(dtype.categorical,dtype.binary):
+ total_dims+=len(self.dep_norms[dep_name]['__default'].scaler.categories_[0])
+ elifdep['original_type']in(dtype.integer,dtype.float,dtype.tsarray):
+ total_dims+=1
+
+ ifself.encoder_class==EncoderRNNNumerical:
+ self._enc_criterion=nn.MSELoss()
+ self._dec_criterion=self._enc_criterion
+ self._encoder=self.encoder_class(input_size=total_dims,
+ hidden_size=self.output_size).to(self.device)
+ elifself.encoder_class==TransformerEncoder:
+ self._enc_criterion=self._masked_criterion
+ self._dec_criterion=nn.MSELoss()
+ self._base_criterion=nn.MSELoss(reduction="none")
+ ifself._transformer_hidden_sizeisNone:
+ self._transformer_hidden_size=total_dims*2# arbitrary
+
+ self._encoder=self.encoder_class(ninp=total_dims,
+ nhead=gcd(dec_hsize,total_dims),
+ nhid=self._transformer_hidden_size,
+ nlayers=1).to(self.device)
+ else:
+ raiseException(f"Time series encoder class not supported: {self.encoder_class}")
+
+ self._decoder=DecoderRNNNumerical(output_size=total_dims,hidden_size=dec_hsize).to(self.device)
+ self._parameters=list(self._encoder.parameters())+list(self._decoder.parameters())
+ self._optimizer=optim.AdamW(self._parameters,lr=self._learning_rate,weight_decay=1e-4)
+ self._n_dims=total_dims
+ self._is_setup=True
[docs]defprepare(self,train_priming_data:pd.Series,dev_priming_data:pd.Series,dependency_data={},ts_analysis=None,
+ feedback_hoop_function=log.info,batch_size=256):
+ """
+ :param train_priming_data: training split, a list of (self._n_dims)-dimensional time series [[dim1_data], ...]
+ :param dev_priming_data: dev split in the same format; concatenated with the training split to train the encoder
+ :param dependency_data: raw data from other columns
+ :param ts_analysis: dictionary with time series analysis info (e.g. normalizers for each target group)
+ :param feedback_hoop_function: method to use if you want to get feedback on the training process
+ :param batch_size: training batch size
+ """
+ priming_data=pd.concat([train_priming_data,dev_priming_data])
+ priming_data=list(priming_data.values)
+
+ ifself.is_prepared:
+ raiseException('You can only call "prepare" once for a given encoder.')
+ else:
+ self.setup_nn(ts_analysis,dependency_data)
+
+ started=time.time()
+
+ # Convert to array and determine max length
+ priming_data,lengths_data=self._prepare_raw_data(priming_data)
+ self._max_ts_length=int(lengths_data.max())
+
+ ifself._normalizer:
+ self._normalizer.prepare(priming_data)
+ priming_data=self._normalizer.encode(priming_data).to(self.device)
+ iflen(priming_data.shape)<3:
+ priming_data=priming_data.unsqueeze(-1)
+ else:
+ priming_data=torch.stack([dfordinpriming_data]).unsqueeze(-1).to(self.device)
+
+ # merge all normalized data into a training batch
+ normalized_tensors=[]
+ fordep_name,dep_dataindependency_data.items():
+ ifdep_nameinself.grouped_by:
+ continue
+ ifdep_data['original_type']in(dtype.integer,dtype.float):
+ dep_data['group_info']={group:dependency_data[group]['data']forgroupinself.grouped_by}
+ data=torch.zeros((len(priming_data),lengths_data.max().int().item(),1))
+ all_idxs=set(range(len(data)))
+ forgroup_name,normalizerinself.dep_norms[dep_name].items():
+ ifgroup_name!='__default':
+ idxs,subset=get_group_matches(dep_data,normalizer.combination)
+ normalized=normalizer.encode(subset).unsqueeze(-1)
+ data[idxs,:,:]=normalized
+ all_idxs-=set(idxs)
+ iflen(all_idxs)>0and'__default'inself.dep_norms[dep_name].keys():
+ default_norm=self.dep_norms[dep_name]['__default']
+ subset=[dep_data['data'][idx]foridxinlist(all_idxs)]
+ data[list(all_idxs),:,:]=torch.Tensor(default_norm.encode(subset)).unsqueeze(-1)
+
+ else:
+ # categorical has only one normalizer at all times
+ normalizer=self.dep_norms[dep_name]['__default']
+ data=normalizer.encode(dep_data['data'].values)
+ iflen(data.shape)<3:
+ data=data.unsqueeze(-1)# add feature dimension
+ data[torch.isnan(data)]=0.0
+ normalized_tensors.append(data)
+
+ ifnormalized_tensors:
+ normalized_data=torch.cat(normalized_tensors,dim=-1).to(self.device)
+ priming_data=torch.cat([priming_data,normalized_data],dim=-1)
+
+ self._encoder.train()
+ running_losses=np.full(self._epochs_running_avg,np.nan)
+ bad_epochs=0
+
+ forepochinrange(self._epochs):
+ average_loss=0
+
+ forbatch_idxinrange(0,len(priming_data),batch_size):
+ # setup loss and optimizer
+ self._optimizer.zero_grad()
+ loss=0
+
+ # shape: (batch_size, timesteps, n_dims)
+ batch=self._get_batch(priming_data,batch_idx,min(batch_idx+batch_size,len(priming_data)))
+
+ # encode and decode through time
+ withLightwoodAutocast():
+ ifself.encoder_class==TransformerEncoder:
+ # pack batch length info tensor
+ len_batch=self._get_batch(lengths_data,batch_idx,min(
+ batch_idx+batch_size,len(priming_data)))
+ batch=batch,len_batch
+
+ next_tensor,hidden_state,dec_loss=self._encoder.bptt(
+ batch,self._enc_criterion,self.device)
+ loss+=dec_loss
+
+ else:
+ next_tensor,hidden_state,enc_loss=self._encoder.bptt(
+ batch,self._enc_criterion,self.device)
+ loss+=enc_loss
+
+ next_tensor,hidden_state,dec_loss=self._decoder.decode(
+ batch,next_tensor,self._dec_criterion,self.device,hidden_state=hidden_state)
+ loss+=dec_loss
+
+ loss.backward()
+
+ self._optimizer.step()
+ average_loss+=loss.item()
+
+ average_loss=average_loss/len(priming_data)
+ batch_idx+=batch_size
+
+ ifepoch>self._epochs_running_avgandaverage_loss>np.average(running_losses):
+ bad_epochs+=1
+
+ # update running loss
+ running_losses[:-1]=running_losses[1:]
+ running_losses[-1]=average_loss
+
+ iffeedback_hoop_functionisnotNone:
+ feedback_hoop_function(
+ "time series encoder epoch [{epoch_n}/{total}] average_loss = {average_loss}".format(
+ epoch_n=epoch+1,total=self._epochs,average_loss=average_loss))
+
+ ifbad_epochs>self._stop_on_n_bad_epochs:
+ break
+ elif(time.time()-started)>self.stop_after:
+ break
+
+ self.is_prepared=True
+
+ def_encode_one(self,data,previous=None,initial_hidden=None,return_next_value=False):
+ """
+ This method encodes one single row of serial data
+ :param data: multidimensional time series as list of lists [[dim1_data], [dim2_data], ...]
+ (dim_data: string with format "x11, x12, ... x1n")
+ :param initial_hidden: if you want to encode from an initial hidden state other than 0s
+ :param return_next_value: if you want to return the next value in the time series too
+
+ :return: either encoded_value or (encoded_value, next_value)
+ """
+ self._encoder.eval()
+ withtorch.no_grad():
+ # Convert to array and determine max length
+ data,lengths_data=self._prepare_raw_data(data)
+ self._max_ts_length=int(lengths_data.max())
+
+ ifself._normalizer:
+ data=self._normalizer.encode(data).to(self.device)
+ iflen(data.shape)<3:
+ data=data.unsqueeze(-1)
+ else:
+ data=torch.stack([dfordindata]).unsqueeze(-1).to(self.device)
+
+ ifpreviousisnotNone:
+ target_tensor=torch.stack(previous).to(self.device)
+ target_tensor[torch.isnan(target_tensor)]=0.0
+ iflen(target_tensor.shape)<3:
+ target_tensor=target_tensor.transpose(0,1).unsqueeze(0)
+ data_tensor=torch.cat((data,target_tensor),dim=-1)
+ else:
+ data_tensor=data
+
+ steps=data_tensor.shape[1]
+
+ ifself.encoder_class==EncoderRNNNumerical:
+ encoder_hidden=self._encoder.init_hidden(self.device)
+ encoder_hidden=encoder_hiddenifinitial_hiddenisNoneelseinitial_hidden
+
+ next_tensor=None
+ fortensor_iinrange(steps):
+ next_tensor,encoder_hidden=self._encoder.forward(data_tensor[:,tensor_i,:].unsqueeze(dim=0),
+ encoder_hidden)
+
+ else:
+ next_tensor=None
+ len_batch=self._get_batch(lengths_data,0,len(data))
+ batch_size,timesteps,_=data_tensor.shape
+
+ forstart_chunkinrange(0,timesteps,timesteps):
+ data,targets,lengths_chunk=get_chunk(data_tensor,len_batch,start_chunk,timesteps)
+ data=data.transpose(0,1)
+ next_tensor,encoder_hidden=self._encoder.forward(data,lengths_chunk,self.device)
+
+ ifreturn_next_value:
+ returnencoder_hidden,next_tensor
+ else:
+ returnencoder_hidden
+
+
[docs]defencode(self,column_data,dependency_data=None,get_next_count=None):
+ """
+ Encode a list of time series data
+ :param column_data: a list of (self._n_dims)-dimensional time series [[dim1_data], ...] to encode
+ :param get_next_count: default None, but you can pass a number X and it will return the X following predictions
+ on the series for each ts_data_point in column_data
+ :return: a list of encoded time series or if get_next_count !=0 two lists (encoded_values, projected_numbers)
+ """
+
+ ifnotself.is_prepared:
+ raiseException('You need to call "prepare" before calling "encode" or "decode".')
+
+ ifisinstance(column_data,pd.Series):
+ data=deepcopy(column_data.values)# get a copy to avoid modifying the actual data frame
+ else:
+ data=column_data
+
+ foriinrange(len(data)):
+ ifnotisinstance(data[i][0],list):
+ data[i]=[data[i]]# add dimension for 1D timeseries
+
+ # include autoregressive target data
+ ptd=[]
+ ifdependency_dataisnotNone:
+ fordep,dep_dataindependency_data.items():
+ ifdepinself.grouped_by:
+ continue
+ # normalize numerical target per group-by
+ ifself._target_typein(dtype.integer,dtype.float,dtype.tsarray):
+ dep_info={
+ 'group_info':{group:dependency_data[group]forgroupinself.grouped_by},
+ 'data':dep_data
+ }
+ tensor=torch.zeros((len(dep_data),len(dep_data[0]),1)).to(self.device)
+ all_idxs=set(range(len(dep_data)))
+
+ forcombinationin[cforcinself._group_combinationsifc!='__default']:
+ normalizer=self.dep_norms[dep].get(frozenset(combination),None)
+ ifnormalizerisNone:
+ normalizer=self.dep_norms[dep]['__default']
+ idxs,subset=get_group_matches(dep_info,normalizer.combination)
+ ifidxs:
+ tensor[idxs,:,:]=torch.Tensor(normalizer.encode(subset)).unsqueeze(-1).to(self.device)
+ all_idxs-=set(idxs)
+
+ # encode all remaining rows (not belonging to any grouped combination) with default normalizer
+ ifall_idxs:
+ default_norm=self.dep_norms[dep]['__default']
+ subset=[dep_data[idx]foridxinall_idxs]
+ tensor[list(all_idxs),:,:]=torch.Tensor(
+ default_norm.encode(subset)).unsqueeze(-1).to(self.device)
+ tensor[torch.isnan(tensor)]=0.0
+
+ # normalize categorical target
+ else:
+ normalizer=self.dep_norms[dep]['__default']
+ tensor=normalizer.encode(dep_data)
+ tensor[torch.isnan(tensor)]=0.0
+
+ ptd.append(tensor)
+
+ ret=[]
+ next=[]
+
+ fori,valinenumerate(data):
+ ifget_next_countisNone:
+ ifdependency_dataisnotNoneandlen(dependency_data)>0andlen(ptd)>0:
+ encoded=self._encode_one(val,previous=[values[i]forvaluesinptd])
+ else:
+ encoded=self._encode_one(val)
+
+ else:
+ ifget_next_count<=0:
+ raiseException('get_next_count must be greater than 0')
+
+ hidden=None
+ vector=val
+ next_i=[]
+
+ forjinrange(get_next_count):
+ hidden,next_reading=self._encode_one(vector,initial_hidden=hidden,return_next_value=True)
+ vector=[next_reading]
+ ifj==0:
+ encoded=hidden
+ next_i.append(next_reading)
+
+ next_value=next_i[0][0].cpu()
+
+ ifself._normalizer:
+ next_value=torch.Tensor(self._normalizer.decode(next_value))
+
+ next.append(next_value)
+
+ ret.append(encoded[0][0].cpu())
+
+ ifget_next_countisNone:
+ returntorch.stack(ret)
+ else:
+ returntorch.stack(ret),torch.stack(next)
+
+ def_decode_one(self,hidden,steps):
+ """
+ Decodes a single time series from its encoded representation.
+ :param hidden: time series embedded representation tensor, with size self.output_size
+ :param steps: as in decode(), defines how many values to output when reconstructing
+ :return: decoded time series list
+ """
+ self._decoder.eval()
+ withtorch.no_grad():
+ ret=[]
+ next_tensor=torch.full((1,1,self._n_dims),self._sos,dtype=torch.float32).to(self.device)
+ timesteps=stepsifstepselseself._max_ts_length
+ for_inrange(timesteps):
+ next_tensor,hidden=self._decoder.forward(next_tensor,hidden)
+ ret.append(next_tensor)
+ returntorch.stack(ret)
+
+
[docs]defdecode(self,encoded_data,steps=None):
+ """
+ Decode a list of embedded multidimensional time series
+ :param encoded_data: a list of embeddings [ e1, e2, ...] to be decoded into time series
+ :param steps: fixed number of timesteps to reconstruct from each embedding.
+ If None, encoder will output the largest length encountered during training.
+ :return: a list of reconstructed time series
+ """
+ ifnotself.is_prepared:
+ raiseException('You need to call "prepare" before calling "encode" or "decode".')
+
+ ret=[]
+ for_,valinenumerate(encoded_data):
+ hidden=torch.unsqueeze(torch.unsqueeze(val,dim=0),dim=0).to(self.device)
+ reconstruction=self._decode_one(hidden,steps).cpu().squeeze().T.numpy()
+
+ ifself._n_dims==1:
+ reconstruction=reconstruction.reshape(1,-1)
+
+ ifself._normalizer:
+ reconstruction=self._normalizer.decode(reconstruction)
+
+ ret.append(reconstruction)
+
+ returntorch.Tensor(ret)
+
+ def_masked_criterion(self,output,targets,lengths):
+ """ Computes the loss of the first `lengths` items in the chunk """
+ # Put in (B, T) format and zero-out the unnecessary values
+ mask=len_to_mask(lengths,zeros=False).t()
+
+ # Inflate to feature dimension
+ mask=mask.unsqueeze(-1).repeat(1,1,output.shape[-1])
+ output=output*mask
+ targets=targets*mask
+
+ # compute the loss with respect to the appropriate lengths and average across the batch-size
+ # We compute for every output (x_i)_i=1^L and target (y_i)_i=1^L, loss = 1/L \sum (x_i - y_i)^2
+ # And average across the mini-batch
+ losses=self._base_criterion(output,targets).sum(dim=2).sum(dim=0)
+
+ # The TBPTT will compute a slightly different loss, but it is not problematic
+ loss=torch.dot((1.0/lengths.float()),losses)/len(losses)
+
+ returnloss
[docs]classBaseEnsemble:
+ """
+ Base class for all ensembles.
+
+ Ensembles wrap sets of Lightwood mixers, with the objective of generating better predictions based on the output of each mixer.
+
+ There are two important methods for any ensemble to work:
+ 1. `__init__()` should prepare all mixers and internal ensemble logic.
+ 2. `__call__()` applies any aggregation rules to generate final predictions based on the output of each mixer.
+
+ Class Attributes:
+ - mixers: List of mixers the ensemble will use.
+ - supports_proba: For classification tasks, whether the ensemble supports yielding per-class scores rather than only returning the predicted label.
+
+ """# noqa
+ data:EncodedDs
+ mixers:List[BaseMixer]
+ best_index:int# @TODO: maybe only applicable to BestOf
+ supports_proba:bool
+
+ def__init__(self,target,mixers:List[BaseMixer],data:EncodedDs)->None:
+ self.data=data
+ self.mixers=mixers
+ self.best_index=0
+ self.supports_proba=False
+
+ def__call__(self,ds:EncodedDs,args:PredictionArguments)->pd.DataFrame:
+ raiseNotImplementedError()
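+
+A minimal sketch of a custom ensemble following this contract; it simply averages mixer outputs, so it is only meaningful for numerical targets (an assumption of this example):
+
+    class MeanEnsemble(BaseEnsemble):
+        def __init__(self, target, mixers: List[BaseMixer], data: EncodedDs) -> None:
+            super().__init__(target, mixers, data)
+
+        def __call__(self, ds: EncodedDs, args: PredictionArguments) -> pd.DataFrame:
+            outputs = pd.DataFrame()
+            for i, mixer in enumerate(self.mixers):
+                outputs[f'mixer_{i}'] = mixer(ds, args=args)['prediction']
+            return pd.DataFrame({'prediction': outputs.mean(axis='columns')})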
[docs]classModeEnsemble(BaseEnsemble):
+ mixer_scores:Dict[str,float]
+
+ def__init__(self,target,mixers:List[BaseMixer],data:EncodedDs,dtype_dict:dict,
+ accuracy_functions,args:PredictionArguments,ts_analysis:Optional[dict]=None)->None:
+ super().__init__(target,mixers,data)
+ self.mixer_scores={}
+
+ ifdtype_dict[target]notin(dtype.binary,dtype.categorical,dtype.tags):
+ raiseException(
+ 'This ensemble can only be used in classification problems! '+
+ f'Got target dtype {dtype_dict[target]} instead!')
+
+ for_,mixerinenumerate(mixers):
+ score_dict=evaluate_accuracy(
+ data.data_frame,
+ mixer(data,args)['prediction'],
+ target,
+ accuracy_functions,
+ ts_analysis=ts_analysis
+ )
+ avg_score=np.mean(list(score_dict.values()))
+ log.info(f'Mixer: {type(mixer).__name__} got accuracy: {avg_score}')
+
+ ifis_nan_numeric(avg_score):
+ avg_score=-pow(2,63)
+ log.warning(f'Change the accuracy of mixer {type(mixer).__name__} to valid value: {avg_score}')
+
+ self.mixer_scores[f'__mdb_mixer_{type(mixer).__name__}']=avg_score
+
+ def_pick_mode_highest_score(self,prediction:pd.Series):
+ """If the predictions are unimodal, return the mode. If there are multiple modes, return the mode whose voting
+ mixers have the highest score."""
+ prediction_counts=prediction.value_counts()
+
+ # If there is a clear winner, i.e. only one prediction
+ iflen(prediction_counts)==1:
+ returnprediction_counts.index[0]
+
+ counts=prediction_counts.values# how many times all predictions have appeared
+ max_count=np.max(counts)# how many times the most frequent prediction has appeared
+
+ # most frequent predictions and how many times they appeared
+ modes=prediction_counts[prediction_counts==max_count]
+
+ modes_predictions=modes.index# most frequent predictions
+
+ # For each mode, get the sum of the scores of the predictors who voted for it
+ modes_predictions_scores={}
+ formode_predictioninmodes_predictions:
+ voting_mixers_name=prediction[prediction==mode_prediction].index.tolist()
+ modes_predictions_scores[mode_prediction]=np.sum(
+ [self.mixer_scores[mixer_name]formixer_nameinvoting_mixers_name])
+
+ # Return the mode with the maximum sum of accuracies
+ returnmax(modes_predictions_scores,key=modes_predictions_scores.get)
+
+ def__call__(self,ds:EncodedDs,args:PredictionArguments)->pd.DataFrame:
+ predictions_df=pd.DataFrame()
+ formixerinself.mixers:
+ predictions_df[f'__mdb_mixer_{type(mixer).__name__}']=mixer(ds,args=args)['prediction']
+
+ mode_df=predictions_df.apply(func=self._pick_mode_highest_score,axis='columns')
+
+ returnpd.DataFrame(mode_df,columns=['prediction'])
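+
+A worked example of the tie-breaking rule in `_pick_mode_highest_score` (all values invented): with votes 'cat', 'cat', 'dog', 'dog', both classes are modes, and 'dog' wins because its voters' scores sum to 1.8 versus 1.2 for 'cat':
+
+    votes = pd.Series(['cat', 'cat', 'dog', 'dog'],
+                      index=['__mdb_mixer_A', '__mdb_mixer_B', '__mdb_mixer_C', '__mdb_mixer_D'])
+    # given ensemble.mixer_scores == {'__mdb_mixer_A': 0.6, '__mdb_mixer_B': 0.6,
+    #                                 '__mdb_mixer_C': 0.9, '__mdb_mixer_D': 0.9}
+    # ensemble._pick_mode_highest_score(votes) -> 'dog'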
[docs]classBaseMixer:
+ """
+ Base class for all mixers.
+
+ Mixers are the backbone of all Lightwood machine learning models. They intake encoded feature representations for every column, and are tasked with learning to fulfill the predictive requirements stated in a problem definition.
+
+ There are two important methods for any mixer to work:
+ 1. `fit()` contains all logic to train the mixer with the training data that has been encoded by all the (already trained) Lightwood encoders for any given task.
+ 2. `__call__()` is executed to generate predictions once the mixer has been trained using `fit()`.
+
+ An additional `partial_fit()` method is used to update any mixer that has already been trained.
+
+ Class Attributes:
+ - stable: If set to `True`, this mixer should always work. Any mixer with `stable=False` can be expected to fail under some circumstances.
+ - fit_data_len: Length of the training data.
+ - supports_proba: For classification tasks, whether the mixer supports yielding per-class scores rather than only returning the predicted label.
+
+ """# noqa
+ stable:bool
+ fit_data_len:int# @TODO (Patricio): should this really be in `BaseMixer`?
+ supports_proba:bool
+
+ def__init__(self,stop_after:int):
+ """
+ Initialize a mixer.
+
+ :param stop_after: Time budget to train this mixer.
+ """
+ self.stop_after=stop_after
+ self.supports_proba=False
+
+
[docs]deffit(self,train_data:EncodedDs,dev_data:EncodedDs)->None:
+ """
+ Fits/trains a mixer with training data.
+
+ :param train_data: encoded representations of the training data subset.
+ :param dev_data: encoded representations of the "dev" data subset. This can be used as an internal validation subset (e.g. it is used for early stopping in the default `Neural` mixer).
+
+ """# noqa
+ raiseNotImplementedError()
+
+ def__call__(self,ds:EncodedDs,
+ args:PredictionArguments=PredictionArguments())->pd.DataFrame:
+ """
+ Calls a trained mixer to predict the target column given some input data.
+
+ :param ds: encoded representations of input data.
+ :param args: a `lightwood.api.types.PredictionArguments` object, including all relevant inference-time arguments to customize the behavior.
+ :return:
+ """# noqa
+ raiseNotImplementedError()
+
+
[docs]defpartial_fit(self,train_data:EncodedDs,dev_data:EncodedDs)->None:
+ """
+ Partially fits/trains a mixer with new training data. This is a somewhat experimental method, and it aims at updating pre-existing Lightwood predictors.
+
+ :param train_data: encoded representations of the new training data subset.
+ :param dev_data: encoded representations of new the "dev" data subset. As in `fit()`, this can be used as an internal validation subset.
+
+ """# noqa
+ pass
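+
+A minimal sketch of a custom mixer honoring this interface; it predicts the training-set mean of the target, so it only makes sense for numerical targets (an assumption of this example):
+
+    class MeanBaselineMixer(BaseMixer):
+        stable = True
+
+        def __init__(self, stop_after: int, target: str):
+            super().__init__(stop_after)
+            self.target = target
+            self.mean_value = None
+
+        def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
+            self.mean_value = float(train_data.data_frame[self.target].mean())
+
+        def __call__(self, ds: EncodedDs,
+                     args: PredictionArguments = PredictionArguments()) -> pd.DataFrame:
+            return pd.DataFrame({'prediction': [self.mean_value] * len(ds)})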
[docs]classNeural(BaseMixer):
+ model:nn.Module
+ dtype_dict:dict
+ target:str
+ epochs_to_best:int
+ fit_on_dev:bool
+ supports_proba:bool
+
+ def__init__(
+ self,stop_after:int,target:str,dtype_dict:Dict[str,str],
+ timeseries_settings:TimeseriesSettings,target_encoder:BaseEncoder,net:str,fit_on_dev:bool,
+ search_hyperparameters:bool):
+ """
+ The Neural mixer trains a fully connected dense network that maps the concatenated encoded outputs of each of the features in the dataset to a prediction of the encoded target.
+
+ :param stop_after: How long the total fitting process should take
+ :param target: Name of the target column
+ :param dtype_dict: Data type dictionary
+ :param timeseries_settings: TimeseriesSettings object for time-series tasks, refer to its documentation for available settings.
+ :param target_encoder: Reference to the encoder used for the target
+ :param net: The network type to use (`DefaultNet` or `ArNet`)
+ :param fit_on_dev: If we should fit on the dev dataset
+ :param search_hyperparameters: If the network should run a more thorough hyperparameter search (currently disabled)
+ """# noqa
+ super().__init__(stop_after)
+ self.dtype_dict=dtype_dict
+ self.target=target
+ self.timeseries_settings=timeseries_settings
+ self.target_encoder=target_encoder
+ self.epochs_to_best=0
+ self.fit_on_dev=fit_on_dev
+ self.net_class=DefaultNetifnet=='DefaultNet'elseArNet
+ self.supports_proba=dtype_dict[target]in[dtype.binary,dtype.categorical]
+ self.search_hyperparameters=search_hyperparameters
+ self.stable=True
+
+ def_final_tuning(self,data):
+ ifself.dtype_dict[self.target]in(dtype.integer,dtype.float,dtype.quantity):
+ self.model=self.model.eval()
+ withtorch.no_grad():
+ acc_dict={}
+ fordecode_login[True,False]:
+ self.target_encoder.decode_log=decode_log
+ decoded_predictions=[]
+ decoded_real_values=[]
+ forX,Yindata:
+ X=X.to(self.model.device)
+ Y=Y.to(self.model.device)
+ Yh=self.model(X)
+
+ Yh=torch.unsqueeze(Yh,0)iflen(Yh.shape)<2elseYh
+ Y=torch.unsqueeze(Y,0)iflen(Y.shape)<2elseY
+
+ decoded_predictions.extend(self.target_encoder.decode(Yh))
+ decoded_real_values.extend(self.target_encoder.decode(Y))
+
+ acc_dict[decode_log]=r2_score(decoded_real_values,decoded_predictions)
+
+ self.target_encoder.decode_log=acc_dict[True]>acc_dict[False]
+
+ def_select_criterion(self)->torch.nn.Module:
+ ifself.dtype_dict[self.target]in(dtype.categorical,dtype.binary):
+ criterion=TransformCrossEntropyLoss(weight=self.target_encoder.index_weights.to(self.model.device))
+ elifself.dtype_dict[self.target]in(dtype.tags):
+ criterion=nn.BCEWithLogitsLoss()
+ elif(self.dtype_dict[self.target]in(dtype.integer,dtype.float,dtype.tsarray,dtype.quantity)
+ andself.timeseries_settings.is_timeseries):
+ criterion=nn.L1Loss()
+ elifself.dtype_dict[self.target]in(dtype.integer,dtype.float,dtype.quantity):
+ criterion=MSELoss()
+ else:
+ criterion=MSELoss()
+
+ returncriterion
+
+ def_select_optimizer(self)->Optimizer:
+ # ad_optim.Ranger
+ # torch.optim.AdamW
+ ifself.timeseries_settings.is_timeseries:
+ optimizer=ad_optim.Ranger(self.model.parameters(),lr=self.lr)
+ else:
+ optimizer=ad_optim.Ranger(self.model.parameters(),lr=self.lr,weight_decay=2e-2)
+
+ returnoptimizer
+
+ def_find_lr(self,dl):
+ optimizer=self._select_optimizer()
+ criterion=self._select_criterion()
+ scaler=GradScaler()
+
+ running_losses:List[float]=[]
+ cum_loss=0
+ lr_log=[]
+ best_model=self.model
+ stop=False
+ batches=0
+ forepochinrange(1,101):
+ ifstop:
+ break
+
+ fori,(X,Y)inenumerate(dl):
+ ifstop:
+ break
+
+ batches+=len(X)
+ X=X.to(self.model.device)
+ Y=Y.to(self.model.device)
+ withLightwoodAutocast():
+ optimizer.zero_grad()
+ Yh=self.model(X)
+ loss=criterion(Yh,Y)
+ ifLightwoodAutocast.active:
+ scaler.scale(loss).backward()
+ scaler.step(optimizer)
+ scaler.update()
+ else:
+ loss.backward()
+ optimizer.step()
+ cum_loss+=loss.item()
+
+ # Account for ranger lookahead update
+ if(i+1)*epoch%6:
+ batches=0
+ lr=optimizer.param_groups[0]['lr']
+ log.info(f'Loss of {cum_loss} with learning rate {lr}')
+ running_losses.append(cum_loss)
+ lr_log.append(lr)
+ cum_loss=0
+ iflen(running_losses)<2ornp.mean(running_losses[:-1])>np.mean(running_losses):
+ optimizer.param_groups[0]['lr']=lr*1.4
+ # Time saving since we don't have to start training fresh
+ best_model=deepcopy(self.model)
+ else:
+ stop=True
+
+ best_loss_lr=lr_log[np.argmin(running_losses)]
+ lr=best_loss_lr
+ log.info(f'Found learning rate of: {lr}')
+ returnlr,best_model
+
+ def_max_fit(self,train_dl,dev_dl,criterion,optimizer,scaler,stop_after,return_model_after):
+ started=time.time()
+ epochs_to_best=0
+ best_dev_error=pow(2,32)
+ running_errors=[]
+ best_model=self.model
+
+ forepochinrange(1,return_model_after+1):
+ self.model=self.model.train()
+ running_losses:List[float]=[]
+ fori,(X,Y)inenumerate(train_dl):
+ X=X.to(self.model.device)
+ Y=Y.to(self.model.device)
+ withLightwoodAutocast():
+ optimizer.zero_grad()
+ Yh=self.model(X)
+ loss=criterion(Yh,Y)
+ ifLightwoodAutocast.active:
+ scaler.scale(loss).backward()
+ scaler.step(optimizer)
+ scaler.update()
+ else:
+ loss.backward()
+ optimizer.step()
+
+ running_losses.append(loss.item())
+
+ train_error=np.mean(running_losses)
+ epoch_error=self._error(dev_dl,criterion)
+ running_errors.append(epoch_error)
+ log.info(f'Loss @ epoch {epoch}: {epoch_error}')
+
+ ifnp.isnan(train_error)ornp.isnan(
+ running_errors[-1])ornp.isinf(train_error)ornp.isinf(
+ running_errors[-1]):
+ break
+
+ ifbest_dev_error>running_errors[-1]:
+ best_dev_error=running_errors[-1]
+ best_model=deepcopy(self.model)
+ epochs_to_best=epoch
+
+ iflen(running_errors)>=5:
+ delta_mean=np.average([running_errors[-i-1]-running_errors[-i]foriinrange(1,5)],
+ weights=[(1/2)**iforiinrange(1,5)])
+ ifdelta_mean<=0:
+ break
+ elif(time.time()-started)>stop_after:
+ break
+ elifrunning_errors[-1]<0.0001ortrain_error<0.0001:
+ break
+
+ ifnp.isnan(best_dev_error):
+ best_dev_error=pow(2,32)
+ returnbest_model,epochs_to_best,best_dev_error
+
+ def_error(self,dev_dl,criterion)->float:
+ self.model=self.model.eval()
+ running_losses:List[float]=[]
+ withtorch.no_grad():
+ forX,Yindev_dl:
+ X=X.to(self.model.device)
+ Y=Y.to(self.model.device)
+ Yh=self.model(X)
+ running_losses.append(criterion(Yh,Y).item())
+ returnnp.mean(running_losses)
+
+ def_init_net(self,ds:EncodedDs):
+ net_kwargs={'input_size':len(ds[0][0]),
+ 'output_size':len(ds[0][1]),
+ 'num_hidden':self.num_hidden,
+ 'dropout':0}
+
+ ifself.net_class==ArNet:
+ net_kwargs['encoder_span']=ds.encoder_spans
+ net_kwargs['target_name']=self.target
+
+ self.model=self.net_class(**net_kwargs)
+
+ # @TODO: Compare partial fitting fully on and fully off on the benchmarks!
+ # @TODO: Writeup on the methodology for partial fitting
+
[docs]deffit(self,train_data:EncodedDs,dev_data:EncodedDs)->None:
+ """
+ Fits the Neural mixer on some data, making it ready to predict
+
+ :param train_data: The EncodedDs on which to train the network
+ :param dev_data: Data used for early stopping and hyperparameter determination
+ """
+ # ConcatedEncodedDs
+ self.batch_size=min(200,int(len(train_data)/10))
+ self.batch_size=max(40,self.batch_size)
+
+ dev_dl=DataLoader(dev_data,batch_size=self.batch_size,shuffle=False)
+ train_dl=DataLoader(train_data,batch_size=self.batch_size,shuffle=False)
+
+ self.lr=1e-4
+ self.num_hidden=1
+
+ # Find learning rate
+ # keep the weights
+ self._init_net(train_data)
+ self.lr,self.model=self._find_lr(train_dl)
+
+ # Keep on training
+ optimizer=self._select_optimizer()
+ criterion=self._select_criterion()
+ scaler=GradScaler()
+
+ self.model,epoch_to_best_model,err=self._max_fit(
+ train_dl,dev_dl,criterion,optimizer,scaler,self.stop_after,return_model_after=20000)
+
+ self.epochs_to_best+=epoch_to_best_model
+
+ ifself.fit_on_dev:
+ self.partial_fit(dev_data,train_data)
+ self._final_tuning(dev_data)
+
+
[docs]defpartial_fit(self,train_data:EncodedDs,dev_data:EncodedDs)->None:
+ """
+ Augments the mixer's fit with new data; the number of epochs is based on how many epochs the original fitting took
+
+ :param train_data: The EncodedDs on which to train the network
+ :param dev_data: Data used for early stopping and hyperparameter determination
+ """
+
+ # Base this on how long the initial training loop took; use a low learning rate so as not to mess anything up too badly # noqa
+ train_dl=DataLoader(train_data,batch_size=self.batch_size,shuffle=True)
+ dev_dl=DataLoader(dev_data,batch_size=self.batch_size,shuffle=True)
+ optimizer=self._select_optimizer()
+ criterion=self._select_criterion()
+ scaler=GradScaler()
+
+ self.model,_,_=self._max_fit(train_dl,dev_dl,criterion,optimizer,scaler,
+ self.stop_after,max(1,int(self.epochs_to_best/3)))
+
+ def__call__(self,ds:EncodedDs,
+ args:PredictionArguments=PredictionArguments())->pd.DataFrame:
+ """
+ Make predictions based on a datasource similar to the one used to fit (sans the target column)
+
+ :param ds: The EncodedDs for which to generate the predictions
+ :param args: A `PredictionArguments` object that customizes inference behavior
+
+ :returns: A dataframe containing the decoded predictions and (depending on the args) additional information such as the probabilities for each target class
+ """# noqa
+ self.model=self.model.eval()
+ decoded_predictions:List[object]=[]
+ all_probs:List[List[float]]=[]
+ rev_map={}
+
+ withtorch.no_grad():
+ foridx,(X,Y)inenumerate(ds):
+ X=X.to(self.model.device)
+ Yh=self.model(X)
+ Yh=torch.unsqueeze(Yh,0)iflen(Yh.shape)<2elseYh
+
+ kwargs={}
+ fordepinself.target_encoder.dependencies:
+ kwargs['dependency_data']={dep:ds.data_frame.iloc[idx][[dep]].values}
+
+ ifargs.predict_probaandself.supports_proba:
+ kwargs['return_raw']=True
+ decoded_prediction,probs,rev_map=self.target_encoder.decode(Yh,**kwargs)
+ all_probs.append(probs)
+ else:
+ decoded_prediction=self.target_encoder.decode(Yh,**kwargs)
+
+ ifnotself.timeseries_settings.is_timeseriesorself.timeseries_settings.nr_predictions==1:
+ decoded_predictions.extend(decoded_prediction)
+ else:
+ decoded_predictions.append(decoded_prediction)
+
+ ydf=pd.DataFrame({'prediction':decoded_predictions})
+
+ ifargs.predict_probaandself.supports_proba:
+ raw_predictions=np.array(all_probs).squeeze()
+ foridx,labelinenumerate(rev_map.values()):
+ ydf[f'__mdb_proba_{label}']=raw_predictions[:,idx]
+
+ returnydf
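+
+A hedged inference sketch; `neural_mixer` and `encoded_test_ds` are hypothetical names for an already-fitted mixer and an EncodedDs, and it assumes `PredictionArguments` accepts `predict_proba` at construction (its attribute is read in `__call__` above):
+
+    preds = neural_mixer(encoded_test_ds, args=PredictionArguments(predict_proba=True))
+    # preds columns: 'prediction', plus one '__mdb_proba_<label>' column per target class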
[docs]defpartial_fit(self,train_data:EncodedDs,dev_data:EncodedDs)->None:
+ """
+ Note: sktime asks for "specification of the time points for which forecasts are requested",
+ and this mixer complies by assuming forecasts will start immediately after the last observed
+ value.
+
+ Because of this, `partial_fit` ensures that both `dev` and `test` splits are used to fit the AutoARIMA model.
+
+ Due to how lightwood implements the `update` procedure, expected inputs are (for a train-dev-test split):
+
+ :param dev_data: original `test` split (used to validate and select model if ensemble is `BestOf`)
+ :param train_data: includes original `train` and `dev` split
+ """# noqa
+ self.fit(dev_data,train_data)
+ self.prepared=True
This method should be called once during the analysis phase, or not called at all.
+It computes any information that the block may either output to the model analysis object,
+or use at inference time when .explain() is called (in this case, make sure all needed
+objects are added to the runtime analyzer so that .explain() can access them).
+
Parameters
+
+info (Dict[str, object]) – Dictionary where any new information or objects are added. The next analysis block
+will use the output of the previous block as a starting point.
+
+kwargs – Dictionary with named variables from either the core analysis or the rest of the prediction pipeline.
This method should be called once during the explaining phase at inference time, or not called at all.
+Additional explanations can be at an instance level (row-wise) or global.
+For the former, return a data frame with any new insights. For the latter, a dictionary is required.
+
Parameters
+
+row_insights (DataFrame) – dataframe with previously computed row-level explanations.
+
+global_insights (Dict[str, object]) – dict() with any explanations that concern all predicted instances or the model itself.
+
Return type
+
+Tuple[DataFrame, Dict[str, object]]
+
Returns
+
+row_insights: modified input dataframe with any new row insights added here.
+
+global_insights: dict() with any explanations that concern all predicted instances or the model itself.
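+
+For illustration, a minimal custom analysis block sketch that follows this analyze/explain contract; the import path and the 'labeled_by' key are assumptions made for the example:
+
+    from lightwood.analysis import BaseAnalysisBlock
+
+    class LabelerBlock(BaseAnalysisBlock):
+        def analyze(self, info, **kwargs):
+            info['labeled_by'] = 'LabelerBlock'  # stored in the runtime analyzer for .explain()
+            return info
+
+        def explain(self, row_insights, global_insights, **kwargs):
+            row_insights['labeled_by'] = 'LabelerBlock'      # row-wise insight
+            global_insights['labeled_by'] = 'LabelerBlock'   # global insight
+            return row_insights, global_insights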
This procedure runs at the end of every normal .predict() call. Its goal is to generate prediction insights,
+potentially using information generated at the model analysis stage (e.g. confidence estimation).
+
+As in analysis(), any user-specified analysis blocks (see class BaseAnalysisBlock) are also called here.
+
Returns
+
+row_insights: a DataFrame containing predictions and all generated insights at a row level.
Analyses the model on a validation subset to evaluate accuracy, estimate feature importance and generate a
+calibration model for estimating confidence in future predictions.
+
+Additionally, any user-specified analysis blocks (see class BaseAnalysisBlock) are also called here.
+
Return type
+
+Tuple[ModelAnalysis, Dict[str, object]]
+
Returns
+
+runtime_analyzer: This dictionary object gets populated in a sequential fashion with data generated from
+any .analyze() block call. It is stored in the predictor itself, and used when
+calling the .explain() method of all analysis blocks when generating predictions.
+
+model_analysis: ModelAnalysis object that contains core analysis metrics, not necessarily needed when predicting.
+
\ No newline at end of file
diff --git a/docs/api.html b/docs/api.html
index 3519e4817..911fa0ad0 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -7,7 +7,7 @@
- API Module — lightwood 1.6.0 documentation
+ API — lightwood 1.6.1 documentation
@@ -41,7 +41,7 @@
-
+
@@ -69,7 +69,7 @@