From cee080b0ae1e87247650a009962b7829b25460f6 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Thu, 14 Mar 2024 19:53:47 -0700 Subject: [PATCH 1/8] fix: this refactors execution to properly use the proces pool Prior to this change the process pool was created and destroyed for every configuration, this would cause the cpu/memory to thrash and improperly allocate task to avaialble cpu, resulting in sometimes 25% utilization of available cpu resources The change corrects this, as well as tackles memory problems, by writing temporary results to disk and then reading them back at the end of the simulation. This is non configurable in this commit, and can also result in loading too much memory, as it does not include the ability to progressively or lazy load data into the final dataframe to complete the simulation. This commit is a WIP fixes #351 --- cadCAD/engine/execution.py | 107 ++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 48 deletions(-) diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index 97a5fa87..0dbac770 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -1,8 +1,11 @@ +import os from typing import Callable, Dict, List, Any, Tuple, Sequence -from pathos.multiprocessing import ProcessPool # type: ignore +from pathos.multiprocessing import ProcessPool # type: ignore from collections import Counter from cadCAD.types import * from cadCAD.utils import flatten +import tempfile +import pickle VarDictType = Dict[str, List[object]] StatesListsType = List[dict[str, object]] @@ -25,15 +28,14 @@ def single_proc_exec( configured_n: Sequence[N_Runs], additional_objs=None ) -> List: - - + if not isinstance(var_dict_list, Sequence): var_dict_list = list([var_dict_list]) raw_params = ( simulation_execs, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows, var_dict_list) - + results: List = [] print(f'Execution Mode: single_threaded') for raw_param in zip(*raw_params): @@ -44,6 +46,46 @@ def single_proc_exec( results.append(flatten(result)) return flatten(results) + +def process_executor(params): + if len_configs_structs > 1: + with ProcessPool(processes=len_configs_structs) as pp: + results = pp.map( + lambda t: t[0](t[1], t[2], t[3], t[4], t[5], + t[6], t[7], t[8], t[9], configured_n), params + ) + else: + t = params[0] + results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], + t[7], t[8], t[9], configured_n) + return results + + +def process_executor(params): + simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params + + result = [simulation_exec( + var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n + )] + temp_file = tempfile.NamedTemporaryFile(delete=False) + with open(temp_file.name, 'wb') as f: # Note 'wb' for binary writing mode + pickle.dump(result, f) + return temp_file.name + + +def file_handler(filenames: List[str]) -> List: + combined_results = [] + for file_name in filenames: + with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode + result = pickle.load(f) + combined_results.append(result) + result = None + f.close() + os.remove(file_name) # Clean up temporary file + del result # Delete the result from memory after processing + return combined_results + + def parallelize_simulations( simulation_execs: List[ExecutorFunction], var_dict_list: List[Parameters], @@ -61,50 +103,19 @@ def parallelize_simulations( ): print(f'Execution Mode: parallelized') - params = list( - zip( - simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, - Ts, SimIDs, Ns, SubsetIDs, SubsetWindows - ) - ) - len_configs_structs = len(configs_structs) + params = [ + (sim_exec, var_dict, states_list, config, env_processes, + T, sim_id, N, subset_id, subset_window, configured_n) + for sim_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window in + zip(simulation_execs, var_dict_list, states_lists, configs_structs, + env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows) + ] - unique_runs = Counter(SimIDs) - sim_count = max(unique_runs.values()) - highest_divisor = int(len_configs_structs / sim_count) + with ProcessPool(maxtasksperchild=1) as pool: + temp_files = pool.map(process_executor, params) - new_configs_structs, new_params = [], [] - for count in range(len(params)): - if count == 0: - new_params.append( - params[count: highest_divisor] - ) - new_configs_structs.append( - configs_structs[count: highest_divisor] - ) - elif count > 0: - new_params.append( - params[count * highest_divisor: (count + 1) * highest_divisor] - ) - new_configs_structs.append( - configs_structs[count * highest_divisor: (count + 1) * highest_divisor] - ) - - def process_executor(params): - if len_configs_structs > 1: - with ProcessPool(processes=len_configs_structs) as pp: - results = pp.map( - lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params - ) - else: - t = params[0] - results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n) - return results - - results = flatten(list(map(lambda params: process_executor(params), new_params))) - - return results + return flatten(file_handler(temp_files)) def local_simulations( @@ -121,15 +132,15 @@ def local_simulations( SubsetWindows: List[SubsetWindow], configured_n: List[N_Runs], additional_objs=None - ): +): config_amt = len(configs_structs) - if config_amt == 1: # and configured_n != 1 + if config_amt == 1: # and configured_n != 1 return single_proc_exec( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs ) - elif config_amt > 1: # and configured_n != 1 + elif config_amt > 1: # and configured_n != 1 return parallelize_simulations( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs From 4f2a5393a14c34eb8051fe9a02ffca5e5b8d0417 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Thu, 14 Mar 2024 20:09:20 -0700 Subject: [PATCH 2/8] fix: use dill instead of pickle to support lambda --- cadCAD/engine/execution.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index 0dbac770..2d8993ef 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -6,6 +6,7 @@ from cadCAD.utils import flatten import tempfile import pickle +import dill VarDictType = Dict[str, List[object]] StatesListsType = List[dict[str, object]] @@ -69,7 +70,7 @@ def process_executor(params): )] temp_file = tempfile.NamedTemporaryFile(delete=False) with open(temp_file.name, 'wb') as f: # Note 'wb' for binary writing mode - pickle.dump(result, f) + dill.dump(result, f) return temp_file.name @@ -77,7 +78,7 @@ def file_handler(filenames: List[str]) -> List: combined_results = [] for file_name in filenames: with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode - result = pickle.load(f) + result = dill.load(f) combined_results.append(result) result = None f.close() From 30b07290a9a500c77b461993aad19b9be7805efd Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Mon, 18 Mar 2024 10:01:15 -0700 Subject: [PATCH 3/8] fix: rm dead code --- cadCAD/engine/execution.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index 2d8993ef..a88ed53c 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -48,20 +48,6 @@ def single_proc_exec( return flatten(results) -def process_executor(params): - if len_configs_structs > 1: - with ProcessPool(processes=len_configs_structs) as pp: - results = pp.map( - lambda t: t[0](t[1], t[2], t[3], t[4], t[5], - t[6], t[7], t[8], t[9], configured_n), params - ) - else: - t = params[0] - results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], - t[7], t[8], t[9], configured_n) - return results - - def process_executor(params): simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params From 0efa2e2d8849bd459a6b27d034c50fc60a9aa53f Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Wed, 20 Mar 2024 19:52:05 -0700 Subject: [PATCH 4/8] feat: add temporary usage sketch --- documentation/examples/headless_tools.py | 72 ++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 documentation/examples/headless_tools.py diff --git a/documentation/examples/headless_tools.py b/documentation/examples/headless_tools.py new file mode 100644 index 00000000..9464dae6 --- /dev/null +++ b/documentation/examples/headless_tools.py @@ -0,0 +1,72 @@ +# isort:skip_file +# fmt: off +import sys +sys.path.append(".") +from cadCAD.tools import easy_run +import plotly.express as px +import numpy as np +import sys +import seaborn as sns +from tqdm.auto import tqdm +# fmt: on + + +TIMESTEPS = 3650 +SAMPLES = 2 +GARBAGE_SIZE = 100000 +# STATE_SIZE = 1 +STATE_SIZE = 100000 + +initial_conditions = { + 'big_state': "a" * STATE_SIZE +} + +params = { + 'big_param': ["p" * GARBAGE_SIZE] +} + + +def p_big_policy(params, step, sL, s): + y = 'big_state' + x = s['big_state'] + "a" + return {'big_state': x} + + +def s_big_state(params, step, sL, s, _input): + y = 'big_state' + x = s['big_state'] + "b" + return (y, x) + + +partial_state_update_blocks = [ + { + 'label': 'Memory Consumer', + 'policies': { + 'big_policy': p_big_policy, + }, + 'variables': { + 'big_state': s_big_state + } + }, + { + 'label': 'Do Nothing', + 'policies': { + + }, + 'variables': { + + } + } +] + + +df = easy_run(initial_conditions, + params, + partial_state_update_blocks, + TIMESTEPS, + SAMPLES, + deepcopy_off=True, + lazy_eval=True, + assign_params=True, + drop_substeps=False) +print(df) From 996dd5c8dcd825eeb815efc66ae8bfb6b23033e3 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Wed, 20 Mar 2024 20:00:10 -0700 Subject: [PATCH 5/8] feat: add support for lazy flattening of data structures --- cadCAD/utils/__init__.py | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/cadCAD/utils/__init__.py b/cadCAD/utils/__init__.py index 7b944a35..87e42aab 100644 --- a/cadCAD/utils/__init__.py +++ b/cadCAD/utils/__init__.py @@ -2,14 +2,14 @@ from collections import defaultdict from itertools import product import warnings -from typing import Union +from typing import Any, Generator, Union from cadCAD.types import * from typing import List, Dict, Union import functools import operator -from pandas import DataFrame # type: ignore +from pandas import DataFrame # type: ignore class SilentDF(DataFrame): @@ -33,7 +33,8 @@ def arrange_cols(df: DataFrame, reverse=False) -> DataFrame: """ session_metrics = ['session_id', 'user_id', 'simulation_id', 'run_id'] sys_metrics = ['run', 'timestep', 'substep'] - result_cols = list(set(df.columns) - set(session_metrics) - set(sys_metrics)) + result_cols = list(set(df.columns) - + set(session_metrics) - set(sys_metrics)) result_cols.sort(reverse=reverse) return df[session_metrics + sys_metrics + result_cols] @@ -75,6 +76,7 @@ def tupalize(k: object, vs: Union[list, dict]): l.append((k, vs)) return l + def flattenDict(l: dict) -> list: """ >>> flattenDict({1: [1, 2, 3], 4: 5}) @@ -92,6 +94,32 @@ def flatten(l: Union[list, dict]): return flattenDict(l) +# Incremental version of flatten with type hints +def lazy_tupalize(k: Any, vs: Union[Iterable[Any], Any]) -> Generator[tuple, None, None]: + if isinstance(vs, Iterable) and not isinstance(vs, str): + for v in vs: + yield (k, v) + else: + yield (k, vs) + + +def lazy_flattenDict(d: Dict[Any, Any]) -> Generator[Dict[Any, Any], None, None]: + flat_list = (lazy_tupalize(k, vs) for k, vs in d.items()) + for items in product(*flat_list): + yield dict(items) + + +def lazy_flatten(l: Union[Iterable[Any], Dict[Any, Any]]) -> Generator[Any, None, None]: + if isinstance(l, Iterable) and not isinstance(l, (str, dict)): + for item in l: + if isinstance(item, Iterable) and not isinstance(item, (str, dict)): + yield from lazy_flatten(item) + else: + yield item + elif isinstance(l, dict): + yield from lazy_flattenDict(l) + + def flatMap(f, collection): return flatten(list(map(f, collection))) From 024e91ccdf727f4f193e70f6292507b651c9b189 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Wed, 20 Mar 2024 20:02:17 -0700 Subject: [PATCH 6/8] feat: add support for lazy evaluation during simulation processing This feature refactors the parallel processing to use lazy evaluation When simulation data grows, keeping the data in memory causes memory bloat, which causes too Ram to be consumed. The refactor uses lazy_flattens to keep the memory Python consumes low, the change was able to reduce consumption by a factor of 10 in many cases. The change also writes temporary files to disk before lazily rearranging the simulation results to fit the cadCAD expected format for a regular pandas dataframe. --- cadCAD/engine/__init__.py | 54 +++++++++++++++++++++++------- cadCAD/engine/execution.py | 44 ++++++++++++++++++++---- cadCAD/tools/execution/easy_run.py | 6 +++- 3 files changed, 85 insertions(+), 19 deletions(-) diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 4dc33409..e18a0b9f 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -1,8 +1,10 @@ +import itertools +from memory_profiler import profile from time import time -from typing import Callable, Dict, List, Any, Tuple, Union, Sequence, Mapping +from typing import Callable, Dict, Generator, List, Any, Tuple, Union, Sequence, Mapping from tqdm.auto import tqdm -from cadCAD.utils import flatten +from cadCAD.utils import flatten, lazy_flatten from cadCAD.utils.execution import print_exec_info from cadCAD.configuration import Configuration, Processor from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts @@ -80,6 +82,7 @@ def __init__(self, self.configs = configs self.empty_return = empty_return + @profile def execute(self) -> Tuple[object, object, Dict[str, object]]: if self.empty_return is True: return [], [], [] @@ -142,21 +145,44 @@ def get_final_dist_results(simulations: List[StateHistory], psu, ep) for psu, ep in list(zip(psus, eps))] return simulations, tensor_fields, sessions + def get_final_results_lazy(simulations: Generator, + psus: List[StateUpdateBlocks], + eps, + sessions: List[SessionDict], + remote_threshold: int): + is_generator: bool = isinstance(simulations, Generator) + if is_generator == False: + raise ValueError( + 'Invalid simulation results (Executor output is not a Generator required for lazy execution)') + + tensor_fields = [] + # NOTE here we change the result type to iterable + tensor_fields = itertools.chain.from_iterable( + map(create_tensor_field, zip(psus, eps))) + + flat_simulations = map( + lazy_flatten, map(lazy_flatten, simulations)) + + # NOTE here we change the result type, which is now an iterable + iterable_flat_simulations = itertools.chain.from_iterable( + flat_simulations) + + return iterable_flat_simulations, tensor_fields, sessions + def get_final_results(simulations: List[StateHistory], psus: List[StateUpdateBlocks], eps, sessions: List[SessionDict], remote_threshold: int): - + # if list of lists of lists of dicts: do flatten # if list of dicts: do not flatetn # else raise error - init: bool = isinstance(simulations, Sequence) failed_1 = False failed_2 = False - + try: init: bool = isinstance(simulations, Sequence) dont_flatten = init & isinstance(simulations[0], Mapping) @@ -174,8 +200,8 @@ def get_final_results(simulations: List[StateHistory], do_flatten = False if failed_1 and failed_2: - raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])') - + raise ValueError( + 'Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])') flat_timesteps, tensor_fields = [], [] for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)), @@ -184,7 +210,7 @@ def get_final_results(simulations: List[StateHistory], if do_flatten: flat_timesteps.append(flatten(sim_result)) tensor_fields.append(create_tensor_field(psu, ep)) - + if do_flatten: flat_simulations = flatten(flat_timesteps) else: @@ -209,15 +235,19 @@ def get_final_results(simulations: List[StateHistory], else: raise ValueError("Invalid execution mode specified") - print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs ) - final_result = get_final_results( - simulations_results, partial_state_updates, eps, sessions, remote_threshold) + if (self.additional_objs is not None and self.additional_objs['lazy_eval']): + final_result = get_final_results_lazy( + simulations_results, partial_state_updates, eps, sessions, remote_threshold) + else: + final_result = get_final_results( + simulations_results, partial_state_updates, eps, sessions, remote_threshold) + elif self.exec_context == ExecutionMode.distributed: print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( @@ -228,6 +258,6 @@ def get_final_results(simulations: List[StateHistory], simulations_results, partial_state_updates, eps, sessions) t2 = time() - print(f"Total execution time: {t2 - t1 :.2f}s") + print(f"Total execution time: {t2 - t1:.2f}s") return final_result diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index a88ed53c..29073f4b 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -1,11 +1,14 @@ import os -from typing import Callable, Dict, List, Any, Tuple, Sequence +from typing import Callable, Dict, Generator, List, Any, Tuple, Sequence from pathos.multiprocessing import ProcessPool # type: ignore from collections import Counter from cadCAD.types import * -from cadCAD.utils import flatten +from cadCAD.utils import flatten, lazy_flatten import tempfile import pickle +import sys +from pympler import asizeof +from memory_profiler import profile import dill VarDictType = Dict[str, List[object]] @@ -51,6 +54,15 @@ def single_proc_exec( def process_executor(params): simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params + result = [simulation_exec( + var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n + )] + return result + + +def process_executor_disk(params): + simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params + result = [simulation_exec( var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n )] @@ -60,7 +72,20 @@ def process_executor(params): return temp_file.name -def file_handler(filenames: List[str]) -> List: +@profile +def file_handler_inc(filenames: List[str]) -> Generator[List, None, None]: + # combined_results = [] + for file_name in filenames: + with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode + result = dill.load(f) + yield result # Yield the loaded result for immediate processing + + f.close() + os.remove(file_name) # Clean up temporary file + + +@profile +def file_handler(filenames: List[str]) -> Generator[List, None, None]: combined_results = [] for file_name in filenames: with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode @@ -69,10 +94,10 @@ def file_handler(filenames: List[str]) -> List: result = None f.close() os.remove(file_name) # Clean up temporary file - del result # Delete the result from memory after processing return combined_results +@profile def parallelize_simulations( simulation_execs: List[ExecutorFunction], var_dict_list: List[Parameters], @@ -90,6 +115,7 @@ def parallelize_simulations( ): print(f'Execution Mode: parallelized') + lazy_eval = additional_objs['lazy_eval'] params = [ (sim_exec, var_dict, states_list, config, env_processes, @@ -99,10 +125,16 @@ def parallelize_simulations( env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows) ] + if (lazy_eval): + with ProcessPool(maxtasksperchild=1) as pool: + temp_files = pool.map(process_executor_disk, params) + generator = file_handler_inc(temp_files) + return lazy_flatten(generator) + with ProcessPool(maxtasksperchild=1) as pool: - temp_files = pool.map(process_executor, params) + results = pool.map(process_executor, params) - return flatten(file_handler(temp_files)) + return flatten(results) def local_simulations( diff --git a/cadCAD/tools/execution/easy_run.py b/cadCAD/tools/execution/easy_run.py index 5ea737e1..c1fda147 100644 --- a/cadCAD/tools/execution/easy_run.py +++ b/cadCAD/tools/execution/easy_run.py @@ -42,6 +42,7 @@ def easy_run( drop_substeps=True, exec_mode='local', deepcopy_off=False, + lazy_eval=False ) -> pd.DataFrame: """ Run cadCAD simulations without headaches. @@ -66,7 +67,10 @@ def easy_run( _exec_mode = ExecutionMode().local_mode elif exec_mode == 'single': _exec_mode = ExecutionMode().single_mode - exec_context = ExecutionContext(_exec_mode, additional_objs={'deepcopy_off': deepcopy_off}) + exec_context = ExecutionContext(_exec_mode, additional_objs={ + 'deepcopy_off': deepcopy_off, + 'lazy_eval': lazy_eval + }) executor = Executor(exec_context=exec_context, configs=configs) # Execute the cadCAD experiment From d9dce51fa9f7b01384dad31afb14f49a06dc1d8f Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Wed, 20 Mar 2024 20:12:45 -0700 Subject: [PATCH 7/8] chore: add temporary memory_profiler --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 763c639c..a215fc19 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,10 +7,11 @@ wheel>=0.38.1 pandas>=1.1.5 funcy>=1.16 dill>=0.3.4 +memory-profiler==0.61.0 pathos>=0.2.8 numpy>=1.22.0 pytz>=2021.1 setuptools>=69.0.2 graphviz>=0.20.1 tqdm>=4.65.0 -pytest>=7.4.3 \ No newline at end of file +pytest>=7.4.3 From 9f42e1f3bb1bfa2a2370df73f72d62462887a592 Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Wed, 20 Mar 2024 20:33:12 -0700 Subject: [PATCH 8/8] fix: check for additional obj existence --- cadCAD/engine/__init__.py | 2 +- cadCAD/engine/execution.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index e18a0b9f..19fa573f 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -241,7 +241,7 @@ def get_final_results(simulations: List[StateHistory], ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs ) - if (self.additional_objs is not None and self.additional_objs['lazy_eval']): + if (self.additional_objs is not None and self.additional_objs.get('lazy_eval', False)): final_result = get_final_results_lazy( simulations_results, partial_state_updates, eps, sessions, remote_threshold) else: diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index 29073f4b..abd396a4 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -7,7 +7,6 @@ import tempfile import pickle import sys -from pympler import asizeof from memory_profiler import profile import dill @@ -115,7 +114,9 @@ def parallelize_simulations( ): print(f'Execution Mode: parallelized') - lazy_eval = additional_objs['lazy_eval'] + lazy_eval = False + if (additional_objs): + lazy_eval = additional_objs.get('lazy_eval', False) params = [ (sim_exec, var_dict, states_list, config, env_processes,