Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kasyanovse committed Nov 16, 2023
1 parent 1172caa commit e62fa65
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 74 deletions.
21 changes: 12 additions & 9 deletions golem/core/optimisers/genetic/operators/crossover.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,24 @@ class SinglePredefinedGraphCrossover(Crossover):
in one attempt without any checks
"""
def __call__(self,
graphs: List[OptGraph],
crossover_type: Optional[CrossoverTypesEnum] = None) -> Tuple[OptGraph, CrossoverTypesEnum]:
if len(graphs) < 2:
raise ValueError(f"Crossover needs 2 graphs, get {len(graphs)}")
elif len(graphs) > 2:
graphs = sample(graphs, 2)
graphs = list(map(deepcopy, graphs))
individuals: List[Individual],
crossover_type: Optional[CrossoverTypesEnum] = None) -> Tuple[List[Individual], CrossoverTypesEnum]:
if len(individuals) < 2:
raise ValueError(f"Crossover needs 2 individuals, get {len(individuals)}")
elif len(individuals) > 2:
individuals = sample(individuals, 2)
graphs = [deepcopy(ind.graph) for ind in individuals]

crossover_type = crossover_type or choice(self.parameters.crossover_types)
if crossover_type is CrossoverTypesEnum.none:
return graphs, crossover_type
return individuals, crossover_type
crossover_func = self._get_crossover_function(crossover_type)

new_graphs = crossover_func(*graphs, max_depth=self.requirements.max_depth)
return new_graphs, crossover_type
new_individuals = self._get_individuals(new_graphs=new_graphs,
parent_individuals=individuals,
crossover_type=crossover_type)
return new_individuals, crossover_type


@register_native
Expand Down
13 changes: 9 additions & 4 deletions golem/core/optimisers/genetic/operators/mutation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from copy import deepcopy
from random import random
from typing import Callable, Union, Tuple, TYPE_CHECKING, Mapping, Hashable, Optional
from typing import Callable, Union, Tuple, TYPE_CHECKING, Mapping, Hashable, Optional, List

import numpy as np

Expand Down Expand Up @@ -173,13 +173,18 @@ class SinglePredefinedGraphMutation(Mutation):
""" Mutation that tries to create new graph (not individual) from the only graph in one attempt
without any checks
"""
def __call__(self, graph: Graph, mutation_type: Optional[MutationType] = None) -> Tuple[Graph, MutationIdType]:
new_graph = deepcopy(graph)
def __call__(self, individuals: List[Individual], mutation_type: Optional[MutationType] = None) -> Tuple[Graph, MutationIdType]:
if len(individuals) != 1:
raise ValueError('individuals should be len 1')

individual = individuals[0]
new_graph = deepcopy(individual.graph)
mutation_type = mutation_type or self._operator_agent.choose_action(new_graph)
if mutation_type is MutationTypesEnum.none:
return None, None
mutation_func = self._get_mutation_func(mutation_type)
new_graph = mutation_func(new_graph, requirements=self.requirements,
graph_gen_params=self.graph_generation_params,
parameters=self.parameters)
return new_graph, mutation_type
new_individual = self._get_individual(new_graph=new_graph, mutation_type=mutation_type, parent=individual)
return new_individual, mutation_type
92 changes: 63 additions & 29 deletions golem/core/optimisers/genetic/operators/node.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from dataclasses import dataclass, replace, field
from enum import Enum
from itertools import chain
from math import ceil
from typing import Optional, List, Union, Any, Dict

from golem.core.optimisers.genetic.operators.operator import Operator
from golem.core.optimisers.graph import OptGraph

from golem.core.optimisers.opt_history_objects.individual import Individual

GeneticNodeAllowedType = Union['GeneticNode', str, None]

Expand All @@ -16,49 +17,54 @@ class TaskStagesEnum(Enum):

@dataclass
class GeneticOperatorTask:
""" Contain graphs and information what to do with it and what was made """
graphs: List[OptGraph]
""" Contain individuals and information what to do with it and what was made """
individuals: List[Individual]
operator_type: Optional[Any] = None

stage: TaskStagesEnum = TaskStagesEnum.INIT
stage_node: GeneticNodeAllowedType = None
next_stage_node: GeneticNodeAllowedType = None
prev_stage_node: GeneticNodeAllowedType = None

# parent data
parent_task: Optional['GeneticOperatorTask'] = None

fail_message: str = ''
exception: Optional[Exception] = None
left_tries: int = 1

def __repr__(self):
s = (f"{self.__class__.__name__}('{self.stage.name}', "
f"next: '{self.stage_node}', "
f"graphs: {len(self.graphs) if isinstance(self.graphs, list) else type(self.graphs)}, "
f"operator_type: '{None if not self.operator_type else 'Operator'}', "
f"tries: {self.left_tries})")
f"next: '{self.next_stage_node}', prev: '{self.prev_stage_node}', "
f"individuals: {len(self.individuals) if isinstance(self.individuals, list) else type(self.individuals)}, "
f"operator_type: '{self.operator_type}', "
f"tries: {self.left_tries}, "
f"parent: {int(self.parent_task is not None)})")
return s

def __copy__(self):
# TODO test
return self.copy()

def __deepcopy__(self, memodict: Dict = dict()):
# TODO test
raise NotImplementedError('Deepcopy is not allowed for task')

def copy(self, **parameters):
# TODO test
new_task = replace(self)
for parameter, value in parameters.items():
setattr(new_task, parameter, value)
return new_task

def create_failed_task(self, exception: Exception, **parameters):
parameters = {**parameters, 'stage': TaskStagesEnum.FAIL,
'fail_message': exception.__str__(), 'left_tries': self.left_tries - 1}
parameters = {'stage': TaskStagesEnum.FAIL, 'exception': exception,
'left_tries': self.left_tries - 1, **parameters}
return self.copy(**parameters)

def create_successive_task(self, graphs: List[OptGraph], operator_type: Any, **parameters):
if not isinstance(graphs, list):
raise ValueError(f"graphs should be list, got {type(graphs)} instead")
parameters = {**parameters, 'stage': TaskStagesEnum.SUCCESS, 'graphs': graphs,
'operator_type': operator_type, 'parent_task': self}
def create_successive_task(self, individuals: List[Individual], **parameters):
if not isinstance(individuals, list):
raise ValueError(f"individuals should be list, got {type(individuals)} instead")
parameters = {'stage': TaskStagesEnum.SUCCESS, 'individuals': individuals,
'parent_task': self, **parameters}
return self.copy(**parameters)


Expand All @@ -73,8 +79,10 @@ class GeneticNode:

task_params_if_success: Dict[str, Any] = field(default_factory=dict)
task_params_if_fail: Dict[str, Any] = field(default_factory=dict)
max_graphs_input = False # TODO add support for task splitting
max_graphs_output = True # TODO add support for task splitting

individuals_input_count: Optional[int] = None
repeat_count: int = 1
tries_count: int = 1

def __post_init__(self):
# some checks
Expand All @@ -84,30 +92,56 @@ def __post_init__(self):
# TODO check interface of operator

def __call__(self, task: GeneticOperatorTask):
if task.left_tries > 0:
final_tasks = list()

if task.stage is not TaskStagesEnum.FAIL:
# if task from previous node then set max tries
task.left_tries = self.tries_count

# if there are unappropriated individuals count
# then divide task to subtasks with appropriate individuals count
length, max_length = len(task.individuals), self.individuals_input_count
if max_length is not None and length > max_length:
individuals_groups = [task.individuals[i * max_length:min(length, (i + 1) * max_length)]
for i in range(ceil(length / max_length))]
for individuals_group in individuals_groups:
final_tasks.append(task.copy(individuals=individuals_group))
# get task for current run
task = final_tasks.pop()

# repeat each task if it is allowed
if self.repeat_count > 1:
final_tasks.append(task)
for _ in range(self.repeat_count - 1):
final_tasks.extend([task.copy() for task in final_tasks])
# get task for current run
task = final_tasks.pop()

# run operator
if task.stage is not TaskStagesEnum.FAIL or task.left_tries > 0:
try:
# TODO all operator should return list of lists of graph
graphs, operator_type = self.operator(task.graphs, task.operator_type)
# tasks = [task.create_successive_task(graphs, operator_type) for graphs in grouped_graphs]
tasks = [task.create_successive_task(graphs, operator_type, **self.task_params_if_success)]
individuals, operator_type = self.operator(task.individuals, task.operator_type)
tasks = [task.create_successive_task(individuals, prev_stage_node=self.name,
operator_type=None, **self.task_params_if_success)]
next_nodes = self.success_outputs
except Exception as exception:
# TODO save where it fails
tasks = [task.create_failed_task(exception, **self.task_params_if_fail)]
next_nodes = self.fail_outputs

final_tasks = list()
for _task in tasks:
for _node in next_nodes:
new_task = _task.copy()
if _node is None:
if task.stage is TaskStagesEnum.SUCCESS:
if new_task.stage is TaskStagesEnum.SUCCESS:
new_task.stage = TaskStagesEnum.FINISH
elif task.stage is TaskStagesEnum.FAIL:
elif new_task.stage is TaskStagesEnum.FAIL:
# if there is no next node, then no tries
new_task.left_tries = -1
new_task.stage_node = _node
new_task.next_stage_node = _node
final_tasks.append(new_task)
return final_tasks
return final_tasks

def __hash__(self):
# TODO add test for hash
Expand Down Expand Up @@ -173,10 +207,10 @@ def __call__(self, task: GeneticOperatorTask):
if task.stage is TaskStagesEnum.FINISH:
raise ValueError('Task is finished')

if task.stage_node not in self.__nodes_map:
if task.next_stage_node not in self.__nodes_map:
raise ValueError(f"Unknown stage node {task.stage}")

return self.__nodes_map[task.stage_node](task)
return self.__nodes_map[task.next_stage_node](task)

def __getitem__(self, node_name: str):
if node_name not in self.__nodes_map:
Expand Down
69 changes: 37 additions & 32 deletions golem/core/optimisers/genetic/operators/reproduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from copy import deepcopy, copy
from dataclasses import dataclass
from enum import Enum
from itertools import chain
from multiprocessing.managers import DictProxy
from multiprocessing import Manager
from queue import Empty, Queue
Expand All @@ -22,6 +23,7 @@
from golem.core.optimisers.genetic.operators.operator import PopulationT, EvaluationOperator
from golem.core.optimisers.genetic.operators.selection import Selection
from golem.core.optimisers.graph import OptGraph
from golem.core.optimisers.opt_history_objects.parent_operator import ParentOperator
from golem.core.optimisers.populational_optimizer import EvaluationAttemptsError
from golem.core.optimisers.opt_history_objects.individual import Individual
from golem.utilities.random import RandomStateHandler
Expand Down Expand Up @@ -81,16 +83,14 @@ def evaluate(graphs, operator_type, evaluator=evaluator):
return evaluated_individuals[0].graph, None
raise ValueError('evaluator error')

empty_task = GeneticOperatorTask([x.graph for x in population],
stage_node='crossover')
empty_task = GeneticOperatorTask(population, next_stage_node='crossover')

crossover = GeneticNode(name='crossover', operator=self.crossover,
success_outputs=['mutation_1', 'mutation_2'],
task_params_if_success={'operation_type': None})
success_outputs=['mutation_1', 'mutation_2'])
mutation_1 = GeneticNode(name='mutation_1', operator=self.mutation,
success_outputs=['evaluation'])
success_outputs=['evaluation'], individuals_input_count=1)
mutation_2 = GeneticNode(name='mutation_2', operator=self.mutation,
success_outputs=['mutation_1'])
success_outputs=['mutation_1'], individuals_input_count=1)
evaluation = GeneticNode(name='evaluation', operator=evaluate)

pipeline = GeneticPipeline('main', [crossover, mutation_1, mutation_2, evaluation])
Expand All @@ -117,9 +117,9 @@ def evaluate(graphs, operator_type, evaluator=evaluator):
left_tries = self.parameters.pop_size * MAX_GRAPH_GEN_ATTEMPTS_PER_IND * n_jobs
while left_tries > 0 and len(finished_tasks) < self.parameters.pop_size:
# main thread is fast
# frequent queues blocking with qsize is not good idea
# frequent queue.qsize() is not good idea
time.sleep(1)
while result_queue.qsize() > 0:
for _ in range(result_queue.qsize()):
left_tries -= 1
task = result_queue.get()
if task.stage is TaskStagesEnum.FINISH:
Expand All @@ -141,37 +141,42 @@ def evaluate(graphs, operator_type, evaluator=evaluator):
failed_tasks=failed_tasks)
return new_population

def _rebuild_individual(self, individual: Individual,
                        known_uid_to_population_map: Dict[str, Individual]):
    """Return the known counterpart of ``individual`` or build a new one from it.

    If ``individual.uid`` is already a key of ``known_uid_to_population_map``,
    the stored individual is returned unchanged. Otherwise a new ``Individual``
    is created from the graph and fitness of ``individual``; when it has a parent
    operator, the operator's parent individuals are rebuilt recursively first and
    wrapped into a new ``ParentOperator``.

    :param individual: individual to look up or rebuild
    :param known_uid_to_population_map: mapping from uid to an already known
        individual; it is updated in place with every individual built here
    :return: the known individual for this uid, or the newly built one
    """
    # TODO add test
    if individual.uid in known_uid_to_population_map:
        # if individual is known, then no need to rebuild it
        return known_uid_to_population_map[individual.uid]

    parent_operator = None
    if individual.parent_operator:
        operator = individual.parent_operator
        # The map has to be passed down the recursion: it is a required argument,
        # and sharing it lets ancestors reachable by several paths be built once.
        parent_individuals = [self._rebuild_individual(ind, known_uid_to_population_map)
                              for ind in operator.parent_individuals]
        parent_operator = ParentOperator(type_=operator.type_,
                                         operators=operator.operators,
                                         parent_individuals=parent_individuals)

    new_individual = Individual(individual.graph,
                                parent_operator,
                                fitness=individual.fitness,
                                # TODO get requirements from self, not from mutation
                                metadata=self.mutation.requirements.static_individual_metadata)
    # Register the rebuilt individual (not the source one) under the source uid,
    # so that later lookups for the same uid return the rebuilt object.
    known_uid_to_population_map[individual.uid] = new_individual
    return new_individual

def _process_tasks(self,
population: PopulationT,
finished_tasks: List['ReproducerWorkerTask'],
failed_tasks: List['ReproducerWorkerTask']):
population_uid_map = {ind.uid: ind for ind in population}

individuals = list()
new_population = list()
for task in finished_tasks + failed_tasks:
if task.stage > ReproducerWorkerStageEnum.MUTATION:
uids = (task.graph_1_uid, task.graph_2_uid)
# create individuals, generated by crossover
if uids not in crossover_individuals:
individuals = self.crossover._get_individuals(new_graphs=[task.graph_for_mutation],
parent_individuals=[population_uid_map[uid]
for uid in uids],
crossover_type=task.crossover_type,
fitness=task.crossover_fitness)
crossover_individuals[uids] = individuals[0]

# create individuals, generated by mutation
if uids in crossover_individuals:
individual = self.mutation._get_individual(new_graph=task.final_graph,
mutation_type=task.mutation_type,
parent=crossover_individuals[uids],
fitness=task.final_fitness)
if task.stage is ReproducerWorkerStageEnum.FINISH:
new_population.append(individual)
elif task.failed_stage is ReproducerWorkerStageEnum.MUTATION_VERIFICATION:
# experience for mab
self.mutation.agent_experience.collect_experience(individual, task.mutation_type, reward=-1.0)
for task in finished_tasks:
new_inds = [self._rebuild_individual(ind, population_uid_map) for ind in task.individuals]
new_population.extend(new_inds)
# experience for mab
# self.mutation.agent_experience.collect_experience(individual, task.mutation_type, reward=-1.0)
return new_population

def _check_final_population(self, population: PopulationT) -> None:
Expand Down
Loading

0 comments on commit e62fa65

Please sign in to comment.