Skip to content

Commit

Permalink
Merge branch 'main' into doc_build
Browse files Browse the repository at this point in the history
  • Loading branch information
jay-m-dev committed Jan 23, 2025
2 parents 83de78f + 1fb3d12 commit 2afe0c9
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 9 deletions.
10 changes: 9 additions & 1 deletion tpot/evolvers/steady_state_evolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ def optimize(self):
print("Cancelled future (likely memory related)")
scores = [np.nan for _ in range(len(self.objective_names))]
eval_error = "INVALID"
client.run(gc.collect)
else: #if the future is done and did not throw an error, get the scores
try:
scores = completed_future.result()
Expand All @@ -466,13 +467,14 @@ def optimize(self):
print("cancelld ", completed_future.cancelled())
scores = [np.nan for _ in range(len(self.objective_names))]
eval_error = "INVALID"
completed_future.release() #release the future
else: #if future is not done

if self.max_eval_time_mins is not None:
#check if the future has been running for too long, cancel the future
if time.time() - submitted_futures[completed_future]["time"] > self.max_eval_time_mins*1.25*60:
completed_future.cancel()

completed_future.release() #release the future
if self.verbose >= 4:
print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n')

Expand Down Expand Up @@ -506,6 +508,8 @@ def optimize(self):
self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="INVALID")
self.population.remove_invalid_from_population(column_names="Eval Error", invalid_value="TIMEOUT")

#I am not entirely sure if this is necessary. I believe that calling release on the futures should be enough to free up memory. If memory issues persist, this may be a good place to start.
#client.run(gc.collect) #run garbage collection to free up memory

###############################
# Step 2: Early Stopping
Expand Down Expand Up @@ -717,6 +721,10 @@ def optimize(self):
#done, cleanup futures
for future in submitted_futures.keys():
future.cancel()
future.release() #release the future

#I am not entirely sure if this is necessary. I believe that calling release on the futures should be enough to free up memory. If memory issues persist, this may be a good place to start.
#client.run(gc.collect) #run garbage collection to free up memory

#checkpoint
if self.population_file is not None:
Expand Down
2 changes: 1 addition & 1 deletion tpot/tests/test_estimators.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def tpot_estimator_with_pipeline(tpot_estimator,sample_dataset):
tpot_estimator.fit(sample_dataset[0], sample_dataset[1])
return tpot_estimator

@pytest.mark.skip(reason="Errors out, skipping to build docs")
# @pytest.mark.skip(reason="Errors out, skipping to build docs")
def test_tpot_estimator_predict(tpot_estimator_with_pipeline,sample_dataset):
#X_test = [[1, 2, 3], [4, 5, 6]]
X_test = sample_dataset[0]
Expand Down
4 changes: 2 additions & 2 deletions tpot/tpot_estimator/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,15 +581,15 @@ def fit(self, X, y):
if self.categorical_features is not None: #if categorical features are specified, use those
pipeline_steps.append(("impute_categorical", tpot.builtin_modules.ColumnSimpleImputer(self.categorical_features, strategy='most_frequent')))
pipeline_steps.append(("impute_numeric", tpot.builtin_modules.ColumnSimpleImputer("numeric", strategy='mean')))
pipeline_steps.append(("ColumnOneHotEncoder", tpot.builtin_modules.ColumnOneHotEncoder(self.categorical_features, strategy='most_frequent')))
pipeline_steps.append(("ColumnOneHotEncoder", tpot.builtin_modules.ColumnOneHotEncoder(self.categorical_features, min_frequency=0.0001))) # retain wrong param fix

else:
if isinstance(X, pd.DataFrame):
categorical_columns = X.select_dtypes(include=['object']).columns
if len(categorical_columns) > 0:
pipeline_steps.append(("impute_categorical", tpot.builtin_modules.ColumnSimpleImputer("categorical", strategy='most_frequent')))
pipeline_steps.append(("impute_numeric", tpot.builtin_modules.ColumnSimpleImputer("numeric", strategy='mean')))
pipeline_steps.append(("ColumnOneHotEncoder", tpot.builtin_modules.ColumnOneHotEncoder("categorical", strategy='most_frequent')))
pipeline_steps.append(("ColumnOneHotEncoder", tpot.builtin_modules.ColumnOneHotEncoder("categorical", min_frequency=0.0001))) # retain wrong param fix
else:
pipeline_steps.append(("impute_numeric", tpot.builtin_modules.ColumnSimpleImputer("all", strategy='mean')))
else:
Expand Down
4 changes: 2 additions & 2 deletions tpot/tpot_estimator/steady_state_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,15 +624,15 @@ def fit(self, X, y):
if self.categorical_features is not None: #if categorical features are specified, use those
pipeline_steps.append(("impute_categorical", tpot.builtin_modules.ColumnSimpleImputer(self.categorical_features, strategy='most_frequent')))
pipeline_steps.append(("impute_numeric", tpot.builtin_modules.ColumnSimpleImputer("numeric", strategy='mean')))
pipeline_steps.append(("impute_categorical", tpot.builtin_modules.ColumnOneHotEncoder(self.categorical_features, strategy='most_frequent')))
pipeline_steps.append(("ColumnOneHotEncoder", tpot.builtin_modules.ColumnOneHotEncoder(self.categorical_features, strategy='most_frequent')))

else:
if isinstance(X, pd.DataFrame):
categorical_columns = X.select_dtypes(include=['object']).columns
if len(categorical_columns) > 0:
pipeline_steps.append(("impute_categorical", tpot.builtin_modules.ColumnSimpleImputer("categorical", strategy='most_frequent')))
pipeline_steps.append(("impute_numeric", tpot.builtin_modules.ColumnSimpleImputer("numeric", strategy='mean')))
pipeline_steps.append(("impute_categorical", tpot.builtin_modules.ColumnOneHotEncoder("categorical", strategy='most_frequent')))
pipeline_steps.append(("ColumnOneHotEncoder", tpot.builtin_modules.ColumnOneHotEncoder("categorical", strategy='most_frequent')))
else:
pipeline_steps.append(("impute_numeric", tpot.builtin_modules.ColumnSimpleImputer("all", strategy='mean')))
else:
Expand Down
15 changes: 12 additions & 3 deletions tpot/utils/eval_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from dask.distributed import progress
import distributed
import func_timeout
import gc

def process_scores(scores, n):
'''
Expand Down Expand Up @@ -163,6 +164,7 @@ def parallel_eval_objective_list(individual_list,
print("Cancelled future (likely memory related)")
scores = [np.nan for _ in range(n_expected_columns)]
eval_error = "INVALID"
client.run(gc.collect)
else: #if the future is done and did not throw an error, get the scores
try:
scores = completed_future.result()
Expand All @@ -186,20 +188,23 @@ def parallel_eval_objective_list(individual_list,
print("cancelld ", completed_future.cancelled())
scores = [np.nan for _ in range(n_expected_columns)]
eval_error = "INVALID"

completed_future.release() #release the future
else: #if future is not done

# check if the future has been running for too long, cancel the future
# we multiply max_eval_time_mins by 1.25 since the objective function in the future should be able to cancel itself. This is a backup in case it doesn't.
if max_eval_time_mins is not None and time.time() - submitted_futures[completed_future]["time"] > max_eval_time_mins*1.25*60:
completed_future.cancel()

completed_future.release()
if verbose >= 4:
print(f'WARNING AN INDIVIDUAL TIMED OUT (Fallback): \n {submitted_futures[completed_future]} \n')

scores = [np.nan for _ in range(n_expected_columns)]
eval_error = "TIMEOUT"
elif global_timeout_triggered:
completed_future.cancel()
completed_future.release()

if verbose >= 4:
print(f'WARNING AN INDIVIDUAL TIMED OUT (max_time_mins): \n {submitted_futures[completed_future]} \n')
Expand All @@ -222,6 +227,10 @@ def parallel_eval_objective_list(individual_list,
#update submitted futures
submitted_futures.pop(completed_future)


#I am not entirely sure if this is necessary. I believe that calling release on the futures should be enough to free up memory. If memory issues persist, this may be a good place to start.
#client.run(gc.collect) #run garbage collection to free up memory

#break if timeout
if global_timeout_triggered:
while len(individual_stack) > 0:
Expand All @@ -243,10 +252,10 @@ def parallel_eval_objective_list(individual_list,

submitted_inds.add(individual.unique_id())

#I am not entirely sure if this is necessary. I believe that calling release on the futures should be enough to free up memory. If memory issues persist, this may be a good place to start.
#client.run(gc.collect) #run garbage collection to free up memory

#collect remaining futures


final_scores = [scores_dict[individual]["scores"] for individual in individual_list]
final_start_times = [scores_dict[individual]["start_time"] for individual in individual_list]
final_end_times = [scores_dict[individual]["end_time"] for individual in individual_list]
Expand Down

0 comments on commit 2afe0c9

Please sign in to comment.