From 1d13dbb95e6ddd758f663a7dd08732f001fc6033 Mon Sep 17 00:00:00 2001 From: Sergey Kasyanov Date: Fri, 27 Oct 2023 11:48:06 +0300 Subject: [PATCH] new approach with shared memory objects --- .../optimisers/genetic/operators/mutation.py | 11 ++-- .../genetic/operators/reproduction.py | 57 +++++++++++-------- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/golem/core/optimisers/genetic/operators/mutation.py b/golem/core/optimisers/genetic/operators/mutation.py index b1aaa30c..bc4e1420 100644 --- a/golem/core/optimisers/genetic/operators/mutation.py +++ b/golem/core/optimisers/genetic/operators/mutation.py @@ -1,4 +1,5 @@ from copy import deepcopy +from multiprocessing.managers import ValueProxy from random import random from typing import Callable, Union, Tuple, TYPE_CHECKING, Mapping, Hashable, Optional @@ -169,8 +170,8 @@ def __init__(self, requirements: GraphRequirements, graph_gen_params: GraphGenerationParams, mutations_repo: MutationRepo, - operator_agent: OperatorAgent, - agent_experience: ExperienceBuffer, + operator_agent: ValueProxy, + agent_experience: ValueProxy, ): super().__init__(parameters=parameters, requirements=requirements, @@ -182,16 +183,14 @@ def __init__(self, def __call__(self, individual: Individual) -> Individual: new_graph = deepcopy(individual.graph) - mutation_type = self._operator_agent.choose_action(new_graph) + mutation_type = self._operator_agent.value.choose_action(new_graph) mutation_func = self._get_mutation_func(mutation_type) new_graph = mutation_func(new_graph, requirements=self.requirements, graph_gen_params=self.graph_generation_params, parameters=self.parameters) - parent_operator = ParentOperator(type_='mutation', - operators=mutation_type, - parent_individuals=individual) + parent_operator = ParentOperator(type_='mutation', operators=mutation_type, parent_individuals=individual) individual = Individual(new_graph, parent_operator, metadata=self.requirements.static_individual_metadata) return individual diff --git a/golem/core/optimisers/genetic/operators/reproduction.py b/golem/core/optimisers/genetic/operators/reproduction.py index 869bf055..fa98bed6 100644 --- a/golem/core/optimisers/genetic/operators/reproduction.py +++ b/golem/core/optimisers/genetic/operators/reproduction.py @@ -4,6 +4,7 @@ from functools import partial from itertools import cycle, chain from math import ceil +from multiprocessing.managers import ValueProxy, DictProxy from typing import Callable, Dict, Union, List, Optional from multiprocessing import Queue, Manager import queue @@ -82,37 +83,38 @@ def reproduce(self, population: PopulationT, evaluator: EvaluationOperator) -> P return new_population def _mutate_over_population(self, population: PopulationT, evaluator: EvaluationOperator) -> PopulationT: - # create common objects for parallel use - with Manager() as manager: - # create new mutation that suitable for parallel evaluation + with (Manager() as manager, + Parallel(n_jobs=self.mutation.requirements.n_jobs, return_as='generator') as parallel): + initial_parameters = deepcopy(self.parameters) initial_parameters.mutation_prob = 1.0 operator_agent = manager.Value('operator_agent', self.mutation._operator_agent) agent_experience = manager.Value('agent_experience', self.mutation.agent_experience) - mutation = SpecialSingleMutation(parameters=initial_parameters, - requirements=self.mutation.requirements, - graph_gen_params=self.mutation.graph_generation_params, - mutations_repo=self.mutation._mutations_repo, - operator_agent=operator_agent, - agent_experience=agent_experience) - mutation_fun = partial(self._mutation_n_evaluation, mutation=mutation, evaluator=evaluator) + requirements=self.mutation.requirements, + graph_gen_params=self.mutation.graph_generation_params, + mutations_repo=self.mutation._mutations_repo, + operator_agent=operator_agent, + agent_experience=agent_experience) + pop_graph_descriptive_ids = manager.dict(zip(self._pop_graph_descriptive_ids, + range(len(self._pop_graph_descriptive_ids)))) + mutation_fun = partial(self._mutation_n_evaluation, + pop_graph_descriptive_ids=pop_graph_descriptive_ids, + mutation=mutation, + evaluator=evaluator) max_tries = self.parameters.pop_size * MAX_GRAPH_GEN_ATTEMPTS_AS_POP_SIZE_MULTIPLIER new_population = [] - with Parallel(n_jobs=self.mutation.requirements.n_jobs, return_as='generator') as parallel: - new_ind_generator = parallel(delayed(mutation_fun)(ind) - for ind, _ in zip(cycle(population), range(max_tries))) - for new_ind in new_ind_generator: - if new_ind: - descriptive_id = new_ind.graph.descriptive_id - if descriptive_id not in self._pop_graph_descriptive_ids: - new_population.append(new_ind) - self._pop_graph_descriptive_ids.add(descriptive_id) - if len(new_population) >= self.parameters.pop_size: - break + new_ind_generator = parallel(delayed(mutation_fun)(ind) + for ind, _ in zip(cycle(population), range(max_tries))) + for new_ind in new_ind_generator: + if new_ind: + new_population.append(new_ind) + if len(new_population) >= self.parameters.pop_size: + break + self._pop_graph_descriptive_ids |= set(pop_graph_descriptive_ids) return new_population def _check_final_population(self, population: PopulationT) -> None: @@ -129,9 +131,14 @@ def _check_final_population(self, population: PopulationT) -> None: f'have {len(population)},' f' required {target_pop_size}!\n' + helpful_msg) - def _mutation_n_evaluation(self, individual: Individual, mutation: Mutation, evaluator: EvaluationOperator): + def _mutation_n_evaluation(self, individual: Individual, pop_graph_descriptive_ids: DictProxy, + mutation: Mutation, evaluator: EvaluationOperator): individual = mutation(individual) if individual and self.verifier(individual.graph): - individuals = evaluator([individual]) - if individuals: - return individuals[0] + descriptive_id = individual.graph.descriptive_id + if descriptive_id not in pop_graph_descriptive_ids: + individuals = evaluator([individual]) + if individuals: + pop_graph_descriptive_ids[descriptive_id] = True + return individuals[0] + pop_graph_descriptive_ids[descriptive_id] = False