Skip to content

Commit

Permalink
new approach with shared memory objects
Browse files Browse the repository at this point in the history
  • Loading branch information
kasyanovse committed Oct 27, 2023
1 parent c49a9ff commit 1d13dbb
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 31 deletions.
11 changes: 5 additions & 6 deletions golem/core/optimisers/genetic/operators/mutation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from copy import deepcopy
from multiprocessing.managers import ValueProxy
from random import random
from typing import Callable, Union, Tuple, TYPE_CHECKING, Mapping, Hashable, Optional

Expand Down Expand Up @@ -169,8 +170,8 @@ def __init__(self,
requirements: GraphRequirements,
graph_gen_params: GraphGenerationParams,
mutations_repo: MutationRepo,
operator_agent: OperatorAgent,
agent_experience: ExperienceBuffer,
operator_agent: ValueProxy,
agent_experience: ValueProxy,
):
super().__init__(parameters=parameters,
requirements=requirements,
Expand All @@ -182,16 +183,14 @@ def __init__(self,
def __call__(self, individual: Individual) -> Individual:
new_graph = deepcopy(individual.graph)

mutation_type = self._operator_agent.choose_action(new_graph)
mutation_type = self._operator_agent.value.choose_action(new_graph)
mutation_func = self._get_mutation_func(mutation_type)

new_graph = mutation_func(new_graph, requirements=self.requirements,
graph_gen_params=self.graph_generation_params,
parameters=self.parameters)

parent_operator = ParentOperator(type_='mutation',
operators=mutation_type,
parent_individuals=individual)
parent_operator = ParentOperator(type_='mutation', operators=mutation_type, parent_individuals=individual)
individual = Individual(new_graph, parent_operator,
metadata=self.requirements.static_individual_metadata)
return individual
57 changes: 32 additions & 25 deletions golem/core/optimisers/genetic/operators/reproduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from functools import partial
from itertools import cycle, chain
from math import ceil
from multiprocessing.managers import ValueProxy, DictProxy
from typing import Callable, Dict, Union, List, Optional
from multiprocessing import Queue, Manager
import queue
Expand Down Expand Up @@ -82,37 +83,38 @@ def reproduce(self, population: PopulationT, evaluator: EvaluationOperator) -> P
return new_population

def _mutate_over_population(self, population: PopulationT, evaluator: EvaluationOperator) -> PopulationT:
# create common objects for parallel use
with Manager() as manager:
# create new mutation that suitable for parallel evaluation
with (Manager() as manager,
Parallel(n_jobs=self.mutation.requirements.n_jobs, return_as='generator') as parallel):

initial_parameters = deepcopy(self.parameters)
initial_parameters.mutation_prob = 1.0

operator_agent = manager.Value('operator_agent', self.mutation._operator_agent)
agent_experience = manager.Value('agent_experience', self.mutation.agent_experience)

mutation = SpecialSingleMutation(parameters=initial_parameters,
requirements=self.mutation.requirements,
graph_gen_params=self.mutation.graph_generation_params,
mutations_repo=self.mutation._mutations_repo,
operator_agent=operator_agent,
agent_experience=agent_experience)
mutation_fun = partial(self._mutation_n_evaluation, mutation=mutation, evaluator=evaluator)
requirements=self.mutation.requirements,
graph_gen_params=self.mutation.graph_generation_params,
mutations_repo=self.mutation._mutations_repo,
operator_agent=operator_agent,
agent_experience=agent_experience)
pop_graph_descriptive_ids = manager.dict(zip(self._pop_graph_descriptive_ids,
range(len(self._pop_graph_descriptive_ids))))
mutation_fun = partial(self._mutation_n_evaluation,
pop_graph_descriptive_ids=pop_graph_descriptive_ids,
mutation=mutation,
evaluator=evaluator)

max_tries = self.parameters.pop_size * MAX_GRAPH_GEN_ATTEMPTS_AS_POP_SIZE_MULTIPLIER
new_population = []

with Parallel(n_jobs=self.mutation.requirements.n_jobs, return_as='generator') as parallel:
new_ind_generator = parallel(delayed(mutation_fun)(ind)
for ind, _ in zip(cycle(population), range(max_tries)))
for new_ind in new_ind_generator:
if new_ind:
descriptive_id = new_ind.graph.descriptive_id
if descriptive_id not in self._pop_graph_descriptive_ids:
new_population.append(new_ind)
self._pop_graph_descriptive_ids.add(descriptive_id)
if len(new_population) >= self.parameters.pop_size:
break
new_ind_generator = parallel(delayed(mutation_fun)(ind)
for ind, _ in zip(cycle(population), range(max_tries)))
for new_ind in new_ind_generator:
if new_ind:
new_population.append(new_ind)
if len(new_population) >= self.parameters.pop_size:
break
self._pop_graph_descriptive_ids |= set(pop_graph_descriptive_ids)
return new_population

def _check_final_population(self, population: PopulationT) -> None:
Expand All @@ -129,9 +131,14 @@ def _check_final_population(self, population: PopulationT) -> None:
f'have {len(population)},'
f' required {target_pop_size}!\n' + helpful_msg)

def _mutation_n_evaluation(self, individual: Individual, mutation: Mutation, evaluator: EvaluationOperator):
def _mutation_n_evaluation(self, individual: Individual, pop_graph_descriptive_ids: DictProxy,
mutation: Mutation, evaluator: EvaluationOperator):
individual = mutation(individual)
if individual and self.verifier(individual.graph):
individuals = evaluator([individual])
if individuals:
return individuals[0]
descriptive_id = individual.graph.descriptive_id
if descriptive_id not in pop_graph_descriptive_ids:
individuals = evaluator([individual])
if individuals:
pop_graph_descriptive_ids[descriptive_id] = True
return individuals[0]
pop_graph_descriptive_ids[descriptive_id] = False

0 comments on commit 1d13dbb

Please sign in to comment.