diff --git a/tpot2/builtin_modules/__init__.py b/tpot2/builtin_modules/__init__.py index 4304576b..52310724 100644 --- a/tpot2/builtin_modules/__init__.py +++ b/tpot2/builtin_modules/__init__.py @@ -5,4 +5,5 @@ from .arithmetictransformer import AddTransformer, mul_neg_1_Transformer, MulTransformer, SafeReciprocalTransformer, EQTransformer, NETransformer, GETransformer, GTTransformer, LETransformer, LTTransformer, MinTransformer, MaxTransformer, ZeroTransformer, OneTransformer, NTransformer from .passthrough import Passthrough from .imputer import ColumnSimpleImputer -from .estimatortransformer import EstimatorTransformer \ No newline at end of file +from .estimatortransformer import EstimatorTransformer +from .passkbinsdiscretizer import PassKBinsDiscretizer \ No newline at end of file diff --git a/tpot2/builtin_modules/column_one_hot_encoder.py b/tpot2/builtin_modules/column_one_hot_encoder.py index 34c3320e..d3472b5c 100644 --- a/tpot2/builtin_modules/column_one_hot_encoder.py +++ b/tpot2/builtin_modules/column_one_hot_encoder.py @@ -37,7 +37,7 @@ def _X_selected(X, selected): class ColumnOneHotEncoder(BaseEstimator, TransformerMixin): - def __init__(self, columns='auto', drop=None, handle_unknown='error', sparse_output=False, min_frequency=None,max_categories=None): + def __init__(self, columns='auto', drop=None, handle_unknown='infrequent_if_exist', sparse_output=False, min_frequency=None,max_categories=None): ''' Parameters diff --git a/tpot2/builtin_modules/passkbinsdiscretizer.py b/tpot2/builtin_modules/passkbinsdiscretizer.py new file mode 100644 index 00000000..6ca5a9b5 --- /dev/null +++ b/tpot2/builtin_modules/passkbinsdiscretizer.py @@ -0,0 +1,43 @@ +import pandas as pd +from sklearn.base import BaseEstimator, TransformerMixin +from sklearn.compose import ColumnTransformer +from sklearn.preprocessing import KBinsDiscretizer +import numpy as np + +def select_features(X, min_unique=10,): + + if isinstance(X, pd.DataFrame): + return [col for col in X.columns if len(X[col].unique()) > min_unique] + else: + return [i for i in range(X.shape[1]) if len(np.unique(X[:, i])) > min_unique] + +class PassKBinsDiscretizer(BaseEstimator, TransformerMixin): + """ + Same as sklearn.preprocessing.KBinsDiscretizer, but passes through columns that are not discretized due to having fewer than n_bins unique values instead of ignoring them. + """ + def __init__(self, n_bins=5, encode='onehot-dense', strategy='quantile', subsample='warn', random_state=None): + self.n_bins = n_bins + self.encode = encode + self.strategy = strategy + self.subsample = subsample + self.random_state = random_state + + def fit(self, X, y=None): + # Identify columns with more than n unique values + # Create a ColumnTransformer to select and discretize the chosen columns + self.selected_columns_ = select_features(X, min_unique=10) + if isinstance(X, pd.DataFrame): + self.not_selected_columns_ = [col for col in X.columns if col not in self.selected_columns_] + else: + self.not_selected_columns_ = [i for i in range(X.shape[1]) if i not in self.selected_columns_] + + enc = KBinsDiscretizer(n_bins=self.n_bins, encode=self.encode, strategy=self.strategy, subsample=self.subsample, random_state=self.random_state) + self.transformer = ColumnTransformer([ + ('discretizer', enc, self.selected_columns_), + ('passthrough', 'passthrough', self.not_selected_columns_) + ]) + self.transformer.fit(X) + return self + + def transform(self, X): + return self.transformer.transform(X) \ No newline at end of file diff --git a/tpot2/config/classifiers.py b/tpot2/config/classifiers.py index cb954a42..49b714ac 100644 --- a/tpot2/config/classifiers.py +++ b/tpot2/config/classifiers.py @@ -386,10 +386,18 @@ def GradientBoostingClassifier_hyperparameter_parser(params): final_params['n_iter_no_change'] = None final_params['validation_fraction'] = None elif params['early_stop'] == 'valid': - final_params['n_iter_no_change'] = params['n_iter_no_change'] - final_params['validation_fraction'] = params['validation_fraction'] + #this is required because in crossover, its possible that n_iter_no_change is not in the params + if 'n_iter_no_change' not in params: + final_params['n_iter_no_change'] = 10 + else: + final_params['n_iter_no_change'] = params['n_iter_no_change'] + if 'validation_fraction' not in params: + final_params['validation_fraction'] = 0.1 + else: + final_params['validation_fraction'] = params['validation_fraction'] elif params['early_stop'] == 'train': - final_params['n_iter_no_change'] = params['n_iter_no_change'] + if 'n_iter_no_change' not in params: + final_params['n_iter_no_change'] = 10 final_params['validation_fraction'] = None @@ -445,16 +453,32 @@ def HistGradientBoostingClassifier_hyperparameter_parser(params): final_params['random_state'] = params['random_state'] + if params['early_stop'] == 'off': # final_params['n_iter_no_change'] = 0 final_params['validation_fraction'] = None final_params['early_stopping'] = False elif params['early_stop'] == 'valid': - final_params['n_iter_no_change'] = params['n_iter_no_change'] - final_params['validation_fraction'] = params['validation_fraction'] + + #this is required because in crossover, its possible that n_iter_no_change is not in the params + if 'n_iter_no_change' not in params: + final_params['n_iter_no_change'] = 10 + else: + final_params['n_iter_no_change'] = params['n_iter_no_change'] + if 'validation_fraction' not in params: + final_params['validation_fraction'] = 0.1 + else: + final_params['validation_fraction'] = params['validation_fraction'] + final_params['early_stopping'] = True elif params['early_stop'] == 'train': - final_params['n_iter_no_change'] = params['n_iter_no_change'] + + if 'n_iter_no_change' not in params: + final_params['n_iter_no_change'] = 10 + else: + final_params['n_iter_no_change'] = params['n_iter_no_change'] + + final_params['validation_fraction'] = None final_params['early_stopping'] = True diff --git a/tpot2/config/get_configspace.py b/tpot2/config/get_configspace.py index b984a36e..aa015321 100644 --- a/tpot2/config/get_configspace.py +++ b/tpot2/config/get_configspace.py @@ -26,7 +26,7 @@ from tpot2.builtin_modules import genetic_encoders, feature_encoding_frequency_selector from tpot2.builtin_modules import AddTransformer, mul_neg_1_Transformer, MulTransformer, SafeReciprocalTransformer, EQTransformer, NETransformer, GETransformer, GTTransformer, LETransformer, LTTransformer, MinTransformer, MaxTransformer, ZeroTransformer, OneTransformer, NTransformer from tpot2.builtin_modules.genetic_encoders import DominantEncoder, RecessiveEncoder, HeterosisEncoder, UnderDominanceEncoder, OverDominanceEncoder -from tpot2.builtin_modules import ZeroCount, ColumnOneHotEncoder +from tpot2.builtin_modules import ZeroCount, ColumnOneHotEncoder, PassKBinsDiscretizer from tpot2.builtin_modules import Passthrough from sklearn.linear_model import SGDClassifier, LogisticRegression, SGDRegressor, Ridge, Lasso, ElasticNet, Lars, LassoLars, LassoLarsCV, RidgeCV, ElasticNetCV, PassiveAggressiveClassifier, ARDRegression from sklearn.ensemble import BaggingClassifier, RandomForestClassifier, ExtraTreesClassifier, GradientBoostingClassifier, ExtraTreesRegressor, ExtraTreesClassifier, AdaBoostRegressor, AdaBoostClassifier, GradientBoostingRegressor,RandomForestRegressor, BaggingRegressor, ExtraTreesRegressor, HistGradientBoostingClassifier, HistGradientBoostingRegressor @@ -55,6 +55,7 @@ DominantEncoder, RecessiveEncoder, HeterosisEncoder, UnderDominanceEncoder, OverDominanceEncoder, GaussianProcessClassifier, BaggingClassifier,LGBMRegressor, Passthrough, + PassKBinsDiscretizer, ] @@ -117,7 +118,7 @@ "regressors" : ["LGBMRegressor", 'AdaBoostRegressor', "ARDRegression", 'DecisionTreeRegressor', 'ExtraTreesRegressor', 'HistGradientBoostingRegressor', 'KNeighborsRegressor', 'LinearSVR', "MLPRegressor", 'RandomForestRegressor', 'SGDRegressor', 'SVR', 'XGBRegressor'], - "transformers": ["Binarizer", "PCA", "ZeroCount", "ColumnOneHotEncoder", "FastICA", "FeatureAgglomeration", "Nystroem", "RBFSampler", "QuantileTransformer", "PowerTransformer"], + "transformers": ["PassKBinsDiscretizer", "Binarizer", "PCA", "ZeroCount", "ColumnOneHotEncoder", "FastICA", "FeatureAgglomeration", "Nystroem", "RBFSampler", "QuantileTransformer", "PowerTransformer"], "scalers": ["MinMaxScaler", "RobustScaler", "StandardScaler", "MaxAbsScaler", "Normalizer", ], "all_transformers" : ["transformers", "scalers"], @@ -291,6 +292,8 @@ def get_configspace(name, n_classes=3, n_samples=1000, n_features=100, random_st return transformers.PolynomialFeatures_configspace case "StandardScaler": return {} + case "PassKBinsDiscretizer": + return transformers.get_passkbinsdiscretizer_configspace(random_state=random_state) #selectors.py case "SelectFwe": diff --git a/tpot2/config/regressors.py b/tpot2/config/regressors.py index c90f7595..5186d135 100644 --- a/tpot2/config/regressors.py +++ b/tpot2/config/regressors.py @@ -436,10 +436,20 @@ def GradientBoostingRegressor_hyperparameter_parser(params): final_params['n_iter_no_change'] = None final_params['validation_fraction'] = None elif params['early_stop'] == 'valid': - final_params['n_iter_no_change'] = params['n_iter_no_change'] - final_params['validation_fraction'] = params['validation_fraction'] + #this is required because in crossover, its possible that n_iter_no_change is not in the params + if 'n_iter_no_change' not in params: + final_params['n_iter_no_change'] = 10 + else: + final_params['n_iter_no_change'] = params['n_iter_no_change'] + if 'validation_fraction' not in params: + final_params['validation_fraction'] = 0.1 + else: + final_params['validation_fraction'] = params['validation_fraction'] elif params['early_stop'] == 'train': - final_params['n_iter_no_change'] = params['n_iter_no_change'] + if 'n_iter_no_change' not in params: + final_params['n_iter_no_change'] = 10 + else: + final_params['n_iter_no_change'] = params['n_iter_no_change'] final_params['validation_fraction'] = None @@ -498,11 +508,20 @@ def HistGradientBoostingRegressor_hyperparameter_parser(params): # final_params['validation_fraction'] = None final_params['early_stopping'] = False elif params['early_stop'] == 'valid': - final_params['n_iter_no_change'] = params['n_iter_no_change'] - final_params['validation_fraction'] = params['validation_fraction'] + if 'n_iter_no_change' not in params: + final_params['n_iter_no_change'] = 10 + else: + final_params['n_iter_no_change'] = params['n_iter_no_change'] + if 'validation_fraction' not in params: + final_params['validation_fraction'] = 0.1 + else: + final_params['validation_fraction'] = params['validation_fraction'] final_params['early_stopping'] = True elif params['early_stop'] == 'train': - final_params['n_iter_no_change'] = params['n_iter_no_change'] + if 'n_iter_no_change' not in params: + final_params['n_iter_no_change'] = 10 + else: + final_params['n_iter_no_change'] = params['n_iter_no_change'] final_params['validation_fraction'] = None final_params['early_stopping'] = True diff --git a/tpot2/config/transformers.py b/tpot2/config/transformers.py index c34c03eb..6d393460 100644 --- a/tpot2/config/transformers.py +++ b/tpot2/config/transformers.py @@ -124,6 +124,22 @@ def get_QuantileTransformer_configspace(random_state=None): ) +def get_passkbinsdiscretizer_configspace(random_state=None): + space = { + 'n_bins': Integer('n_bins', bounds=(3, 100)), + 'encode': 'onehot-dense', + 'strategy': Categorical('strategy', ['uniform', 'quantile', 'kmeans']), + # 'subsample': Categorical('subsample', ['auto', 'warn', 'ignore']), + } + + if random_state is not None: #This is required because configspace doesn't allow None as a value + space['random_state'] = random_state + + return ConfigurationSpace( + space = space + + ) + ### ROBUST SCALER @@ -133,4 +149,7 @@ def get_QuantileTransformer_configspace(random_state=None): }) def robust_scaler_hyperparameter_parser(params): - return {"quantile_range": (params["q_min"], params["q_max"])} \ No newline at end of file + return {"quantile_range": (params["q_min"], params["q_max"])} + + + diff --git a/tpot2/evolvers/base_evolver.py b/tpot2/evolvers/base_evolver.py index 0147531a..756a4cde 100644 --- a/tpot2/evolvers/base_evolver.py +++ b/tpot2/evolvers/base_evolver.py @@ -498,7 +498,7 @@ def optimize(self, generations=None): self._client.close() self._cluster.close() - tpot2.utils.get_pareto_frontier(self.population.evaluated_individuals, column_names=self.objective_names, weights=self.objective_function_weights, invalid_values=["TIMEOUT","INVALID"]) + tpot2.utils.get_pareto_frontier(self.population.evaluated_individuals, column_names=self.objective_names, weights=self.objective_function_weights) def step(self,): if self.population_size_list is not None: @@ -624,7 +624,7 @@ def evaluate_population_full(self, budget=None): parallel_timeout = 10 #scores = tpot2.utils.eval_utils.parallel_eval_objective_list(individuals_to_evaluate, self.objective_functions, self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, parallel_timeout=parallel_timeout, **self.objective_kwargs) - scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(individuals_to_evaluate, self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, **self.objective_kwargs) + scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(individuals_to_evaluate, self.objective_functions, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, budget=budget, n_expected_columns=len(self.objective_names), client=self._client, scheduled_timeout_time=self.scheduled_timeout_time, **self.objective_kwargs) self.population.update_column(individuals_to_evaluate, column_names=self.objective_names, data=scores) if budget is not None: @@ -705,6 +705,7 @@ def evaluate_population_selection_early_stop(self,survival_counts, thresholds=No generation = self.generation, n_expected_columns=len(self.objective_names), client=self._client, + scheduled_timeout_time=self.scheduled_timeout_time, **self.objective_kwargs, ) diff --git a/tpot2/evolvers/steady_state_evolver.py b/tpot2/evolvers/steady_state_evolver.py index 1aa457c8..eecc2b29 100644 --- a/tpot2/evolvers/steady_state_evolver.py +++ b/tpot2/evolvers/steady_state_evolver.py @@ -342,7 +342,7 @@ def optimize(self): ############################### if self.verbose >= 3: sign = np.sign(self.objective_function_weights) - valid_df = self.population.evaluated_individuals[~self.population.evaluated_individuals[self.objective_names].isin(["TIMEOUT","INVALID"]).any(axis=1)][self.objective_names]*sign + valid_df = self.population.evaluated_individuals[~self.population.evaluated_individuals[["Eval Error"]].isin(["TIMEOUT","INVALID"]).any(axis=1)][self.objective_names]*sign cur_best_scores = valid_df.max(axis=0)*sign cur_best_scores = cur_best_scores.to_numpy() for i, obj in enumerate(self.objective_names): @@ -353,7 +353,7 @@ def optimize(self): #get sign of objective_function_weights sign = np.sign(self.objective_function_weights) #get best score for each objective - valid_df = self.population.evaluated_individuals[~self.population.evaluated_individuals[self.objective_names].isin(["TIMEOUT","INVALID"]).any(axis=1)][self.objective_names]*sign + valid_df = self.population.evaluated_individuals[~self.population.evaluated_individuals[["Eval Error"]].isin(["TIMEOUT","INVALID"]).any(axis=1)][self.objective_names]*sign cur_best_scores = valid_df.max(axis=0) cur_best_scores = cur_best_scores.to_numpy() #cur_best_scores = self.population.get_column(self.population.population, column_names=self.objective_names).max(axis=0)*sign #TODO this assumes the current population is the best @@ -499,7 +499,7 @@ def optimize(self): elif len(submitted_futures) < self.max_queue_size: initial_population = self.population.evaluated_individuals.iloc[:self.initial_population_size*3] - invalid_initial_population = initial_population[initial_population[self.objective_names].isin(["TIMEOUT","INVALID"]).any(axis=1)] + invalid_initial_population = initial_population[initial_population[["Eval Error"]].isin(["TIMEOUT","INVALID"]).any(axis=1)] if len(invalid_initial_population) >= self.initial_population_size*3: #if all individuals in the 3*initial population are invalid raise Exception("No individuals could be evaluated in the initial population. This may indicate a bug in the configuration, included models, or objective functions. Set verbose>=4 to see the errors that caused individuals to fail.") @@ -540,8 +540,8 @@ def optimize(self): # Step 7: Cleanup ############################### - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="INVALID") - self.population.remove_invalid_from_population(column_names=self.objective_names, invalid_value="TIMEOUT") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="INVALID") + self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT") #done, cleanup futures @@ -556,7 +556,7 @@ def optimize(self): self._client.close() self._cluster.close() - tpot2.utils.get_pareto_frontier(self.population.evaluated_individuals, column_names=self.objective_names, weights=self.objective_function_weights, invalid_values=["TIMEOUT","INVALID"]) + tpot2.utils.get_pareto_frontier(self.population.evaluated_individuals, column_names=self.objective_names, weights=self.objective_function_weights) diff --git a/tpot2/search_spaces/nodes/genetic_feature_selection.py b/tpot2/search_spaces/nodes/genetic_feature_selection.py index 13352d4b..0fe16586 100644 --- a/tpot2/search_spaces/nodes/genetic_feature_selection.py +++ b/tpot2/search_spaces/nodes/genetic_feature_selection.py @@ -50,6 +50,11 @@ def __init__( self, else: self.mask = mask + # check if there are no features selected, if so select one + if sum(self.mask) == 0: + index = rng.choice(len(self.mask)) + self.mask[index] = True + self.mutation_list = [self._mutate_add, self._mutate_remove] self.crossover_list = [self._crossover_swap] diff --git a/tpot2/search_spaces/pipelines/dynamic_linear.py b/tpot2/search_spaces/pipelines/dynamic_linear.py index 0267f595..6da90d81 100644 --- a/tpot2/search_spaces/pipelines/dynamic_linear.py +++ b/tpot2/search_spaces/pipelines/dynamic_linear.py @@ -14,13 +14,13 @@ class DynamicLinearPipelineIndividual(SklearnIndividual): # takes in a single search space. # will produce a pipeline of variable length. Each step in the pipeline will be pulled from the search space provided. - def __init__(self, search_space : SklearnIndividualGenerator, min_length: int, max_length: int ) -> None: + def __init__(self, search_space : SklearnIndividualGenerator, max_length: int , rng=None) -> None: super().__init__() - rng = np.random.default_rng() + rng = np.random.default_rng(rng) self.search_space = search_space - self.min_length = min_length + self.min_length = 1 self.max_length = max_length self.pipeline = self._generate_pipeline(rng) @@ -29,6 +29,8 @@ def _generate_pipeline(self, rng=None): rng = np.random.default_rng() pipeline = [] length = rng.integers(self.min_length, self.max_length) + length = min(length, 3) + for _ in range(length): pipeline.append(self.search_space.generate(rng)) return pipeline @@ -113,16 +115,20 @@ def _crossover_swap_step(self, other, rng): def _crossover_inner_step(self, other, rng): rng = np.random.default_rng() + pipeline1_indexes= list(range(len(self.pipeline))) + pipeline2_indexes= list(range(len(other.pipeline))) + + rng.shuffle(pipeline1_indexes) + rng.shuffle(pipeline2_indexes) + crossover_success = False - for idx in range(len(self.pipeline)): - if rng.random() < 0.5: - if self.pipeline[idx].crossover(other.pipeline[idx], rng): + for idx1, idx2 in zip(pipeline1_indexes, pipeline2_indexes): + if self.pipeline[idx1].crossover(other.pipeline[idx2], rng): crossover_success = True - return crossover_success - def export_pipeline(self, **graph_pipeline_args): - return [step.export_pipeline(**graph_pipeline_args) for step in self.pipeline] + def export_pipeline(self): + return sklearn.pipeline.make_pipeline(*[step.export_pipeline() for step in self.pipeline]) def unique_id(self): l = [step.unique_id() for step in self.pipeline] @@ -131,9 +137,8 @@ def unique_id(self): class DynamicLinearPipeline(SklearnIndividualGenerator): - def __init__(self, search_space : SklearnIndividualGenerator, min_length: int, max_length: int ) -> None: + def __init__(self, search_space : SklearnIndividualGenerator, max_length: int ) -> None: self.search_space = search_space - self.min_length = min_length self.max_length = max_length """ @@ -143,4 +148,4 @@ def __init__(self, search_space : SklearnIndividualGenerator, min_length: int, m """ def generate(self, rng=None): - return DynamicLinearPipelineIndividual(self.search_space, self.min_length, self.max_length, rng=rng) \ No newline at end of file + return DynamicLinearPipelineIndividual(self.search_space, self.max_length, rng=rng) \ No newline at end of file diff --git a/tpot2/search_spaces/pipelines/dynamicunion.py b/tpot2/search_spaces/pipelines/dynamicunion.py index edb51ffc..48fa9669 100644 --- a/tpot2/search_spaces/pipelines/dynamicunion.py +++ b/tpot2/search_spaces/pipelines/dynamicunion.py @@ -153,7 +153,7 @@ def _crossover_inner_step(self, other, rng): self.union_dict = {step.unique_id(): step for step in self_values} other.union_dict = {step.unique_id(): step for step in other_values} - + return changed def export_pipeline(self): values = list(self.union_dict.values()) diff --git a/tpot2/tpot_estimator/estimator.py b/tpot2/tpot_estimator/estimator.py index e673d3a7..1f4ccd5b 100644 --- a/tpot2/tpot_estimator/estimator.py +++ b/tpot2/tpot_estimator/estimator.py @@ -700,7 +700,7 @@ def ind_generator(rng): - tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=self.objective_names, weights=self.objective_function_weights, invalid_values=["TIMEOUT","INVALID"]) + tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=self.objective_names, weights=self.objective_function_weights) if validation_strategy == 'reshuffled': best_pareto_front_idx = list(self.pareto_front.index) @@ -742,15 +742,21 @@ def ind_generator(rng): )] objective_kwargs = {"X": X_future, "y": y_future} - val_scores = tpot2.utils.eval_utils.parallel_eval_objective_list( - best_pareto_front, - val_objective_function_list, n_jobs=self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds,n_expected_columns=len(self.objective_names), client=_client, **objective_kwargs) + # val_scores = tpot2.utils.eval_utils.parallel_eval_objective_list( + # best_pareto_front, + # val_objective_function_list, n_jobs=self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds,n_expected_columns=len(self.objective_names), client=_client, **objective_kwargs) + val_scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(best_pareto_front, val_objective_function_list, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, n_expected_columns=len(self.objective_names), client=_client, **objective_kwargs) + + val_objective_names = ['validation_'+name for name in self.objective_names] self.objective_names_for_selection = val_objective_names self.evaluated_individuals.loc[best_pareto_front_idx,val_objective_names] = val_scores + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_start_times'] = start_times + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_end_times'] = end_times + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_eval_errors'] = eval_errors - self.evaluated_individuals["Validation_Pareto_Front"] = tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=val_objective_names, weights=self.objective_function_weights, invalid_values=["TIMEOUT","INVALID"]) + self.evaluated_individuals["Validation_Pareto_Front"] = tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=val_objective_names, weights=self.objective_function_weights) elif validation_strategy == 'split': @@ -795,14 +801,19 @@ def ind_generator(rng): **kwargs, )] - val_scores = tpot2.utils.eval_utils.parallel_eval_objective_list( - best_pareto_front, - val_objective_function_list, n_jobs=self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds,n_expected_columns=len(self.objective_names),client=_client, **objective_kwargs) + val_scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(best_pareto_front, val_objective_function_list, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, n_expected_columns=len(self.objective_names), client=_client, **objective_kwargs) + + val_objective_names = ['validation_'+name for name in self.objective_names] self.objective_names_for_selection = val_objective_names self.evaluated_individuals.loc[best_pareto_front_idx,val_objective_names] = val_scores - self.evaluated_individuals["Validation_Pareto_Front"] = tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=val_objective_names, weights=self.objective_function_weights, invalid_values=["TIMEOUT","INVALID"]) + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_start_times'] = start_times + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_end_times'] = end_times + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_eval_errors'] = eval_errors + + self.evaluated_individuals["Validation_Pareto_Front"] = tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=val_objective_names, weights=self.objective_function_weights) + else: self.objective_names_for_selection = self.objective_names @@ -835,8 +846,15 @@ def ind_generator(rng): if self.client is None: #no client was passed in #close cluster and client - _client.close() - cluster.close() + # _client.close() + # cluster.close() + try: + _client.shutdown() + cluster.close() + #catch exception + except Exception as e: + print("Error shutting down client and cluster") + Warning(e) return self diff --git a/tpot2/tpot_estimator/steady_state_estimator.py b/tpot2/tpot_estimator/steady_state_estimator.py index c73584b6..4de12c47 100644 --- a/tpot2/tpot_estimator/steady_state_estimator.py +++ b/tpot2/tpot_estimator/steady_state_estimator.py @@ -769,7 +769,7 @@ def ind_generator(rng): else: print("WARNING NO OPTUNA TRIALS COMPLETED") - tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=self.objective_names, weights=self.objective_function_weights, invalid_values=["TIMEOUT","INVALID"]) + tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=self.objective_names, weights=self.objective_function_weights) if validation_strategy == 'reshuffled': best_pareto_front_idx = list(self.pareto_front.index) @@ -811,16 +811,16 @@ def ind_generator(rng): )] objective_kwargs = {"X": X_future, "y": y_future} - val_scores = tpot2.utils.eval_utils.parallel_eval_objective_list( - best_pareto_front, - val_objective_function_list, n_jobs=self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds,n_expected_columns=len(self.objective_names), client=_client, **objective_kwargs) + val_scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(best_pareto_front, val_objective_function_list, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, n_expected_columns=len(self.objective_names), client=_client, **objective_kwargs) val_objective_names = ['validation_'+name for name in self.objective_names] self.objective_names_for_selection = val_objective_names self.evaluated_individuals.loc[best_pareto_front_idx,val_objective_names] = val_scores + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_start_times'] = start_times + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_end_times'] = end_times + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_eval_errors'] = eval_errors - self.evaluated_individuals["Validation_Pareto_Front"] = tpot2.utils.get_pareto_front(self.evaluated_individuals, val_objective_names, self.objective_function_weights, invalid_values=["TIMEOUT","INVALID"]) - + self.evaluated_individuals["Validation_Pareto_Front"] = tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=val_objective_names, weights=self.objective_function_weights) elif validation_strategy == 'split': @@ -863,14 +863,18 @@ def ind_generator(rng): **kwargs, )] - val_scores = tpot2.utils.eval_utils.parallel_eval_objective_list( - best_pareto_front, - val_objective_function_list, n_jobs=self.n_jobs, verbose=self.verbose, timeout=self.max_eval_time_seconds,n_expected_columns=len(self.objective_names),client=_client, **objective_kwargs) + val_scores, start_times, end_times, eval_errors = tpot2.utils.eval_utils.parallel_eval_objective_list2(best_pareto_front, val_objective_function_list, verbose=self.verbose, max_eval_time_seconds=self.max_eval_time_seconds, n_expected_columns=len(self.objective_names), client=_client, **objective_kwargs) + + val_objective_names = ['validation_'+name for name in self.objective_names] self.objective_names_for_selection = val_objective_names self.evaluated_individuals.loc[best_pareto_front_idx,val_objective_names] = val_scores - self.evaluated_individuals["Validation_Pareto_Front"] = tpot2.utils.get_pareto_front(self.evaluated_individuals, val_objective_names, self.objective_function_weights, invalid_values=["TIMEOUT","INVALID"]) + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_start_times'] = start_times + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_end_times'] = end_times + self.evaluated_individuals.loc[best_pareto_front_idx,'validation_eval_errors'] = eval_errors + + self.evaluated_individuals["Validation_Pareto_Front"] = tpot2.utils.get_pareto_frontier(self.evaluated_individuals, column_names=val_objective_names, weights=self.objective_function_weights) else: self.objective_names_for_selection = self.objective_names @@ -901,8 +905,15 @@ def ind_generator(rng): if self.client is None: #no client was passed in #close cluster and client - _client.close() - cluster.close() + # _client.close() + # cluster.close() + try: + _client.shutdown() + cluster.close() + #catch exception + except Exception as e: + print("Error shutting down client and cluster") + Warning(e) return self diff --git a/tpot2/utils/eval_utils.py b/tpot2/utils/eval_utils.py index ccc847f3..f37cb823 100644 --- a/tpot2/utils/eval_utils.py +++ b/tpot2/utils/eval_utils.py @@ -145,6 +145,7 @@ def parallel_eval_objective_list2(individual_list, max_eval_time_seconds=None, n_expected_columns=None, client=None, + scheduled_timeout_time=None, **objective_kwargs): individual_stack = list(individual_list) @@ -152,6 +153,7 @@ def parallel_eval_objective_list2(individual_list, submitted_futures = {} scores_dict = {} submitted_inds = set() + global_timeout_triggered = False while len(submitted_futures) < max_queue_size and len(individual_stack)>0: individual = individual_stack.pop() future = client.submit(eval_objective_list, individual, objective_list, verbose=verbose, timeout=max_eval_time_seconds,**objective_kwargs) @@ -172,6 +174,8 @@ def parallel_eval_objective_list2(individual_list, except dask.distributed.CancelledError: pass + global_timeout_triggered = scheduled_timeout_time is not None and time.time() > scheduled_timeout_time + #Loop through all futures, collect completed and timeout futures. for completed_future in list(submitted_futures.keys()): #get scores and update @@ -211,6 +215,8 @@ def parallel_eval_objective_list2(individual_list, eval_error = "INVALID" else: #if future is not done + + #check if the future has been running for too long, cancel the future if time.time() - submitted_futures[completed_future]["time"] > max_eval_time_seconds*1.25: completed_future.cancel() @@ -220,6 +226,15 @@ def parallel_eval_objective_list2(individual_list, scores = [np.nan for _ in range(n_expected_columns)] eval_error = "TIMEOUT" + elif global_timeout_triggered: + completed_future.cancel() + + if verbose >= 4: + print(f'WARNING AN INDIVIDUAL TIMED OUT (max_time_seconds): \n {submitted_futures[completed_future]} \n') + + scores = [np.nan for _ in range(n_expected_columns)] + eval_error = None + else: continue #otherwise, continue to next future @@ -235,6 +250,17 @@ def parallel_eval_objective_list2(individual_list, #update submitted futures submitted_futures.pop(completed_future) + #break if timeout + if global_timeout_triggered: + while len(individual_stack) > 0: + individual = individual_stack.pop() + scores_dict[individual] = {"scores": [np.nan for _ in range(n_expected_columns)], + "start_time": time.time(), + "end_time": time.time(), + "eval_error": None, + } + break + #submit new futures while len(submitted_futures) < max_queue_size and len(individual_stack)>0: individual = individual_stack.pop() @@ -246,6 +272,9 @@ def parallel_eval_objective_list2(individual_list, submitted_inds.add(individual.unique_id()) + #collect remaining futures + + final_scores = [scores_dict[individual]["scores"] for individual in individual_list] final_start_times = [scores_dict[individual]["start_time"] for individual in individual_list] final_end_times = [scores_dict[individual]["end_time"] for individual in individual_list] diff --git a/tpot2/utils/utils.py b/tpot2/utils/utils.py index fe165993..21ec0662 100644 --- a/tpot2/utils/utils.py +++ b/tpot2/utils/utils.py @@ -85,8 +85,9 @@ def is_pareto_efficient(scores, return_mask = True): else: return is_efficient -def get_pareto_frontier(df, column_names, weights, invalid_values=["TIMEOUT","INVALID"]): - dftmp = df[~df[column_names].isin(invalid_values).any(axis=1)] +def get_pareto_frontier(df, column_names, weights): + # dftmp = df[~df[column_names].isin(invalid_values).any(axis=1)] + dftmp = df[df[column_names].notnull().all(axis=1)] if "Budget" in dftmp.columns: #get rows with the max budget @@ -101,8 +102,8 @@ def get_pareto_frontier(df, column_names, weights, invalid_values=["TIMEOUT","IN -def get_pareto_front(df, column_names, weights, invalid_values=["TIMEOUT","INVALID"]): - dftmp = df[~df[column_names].isin(invalid_values).any(axis=1)] +def get_pareto_front(df, column_names, weights): + dftmp = df[df[column_names].notnull().all(axis=1)] if "Budget" in dftmp.columns: #get rows with the max budget