diff --git a/golem/core/constants.py b/golem/core/constants.py index 4104ead3..4cc1e297 100644 --- a/golem/core/constants.py +++ b/golem/core/constants.py @@ -1,5 +1,6 @@ import numpy as np +MAX_GRAPH_GEN_ATTEMPTS_PER_IND = 20 MAX_GRAPH_GEN_ATTEMPTS = 1000 MAX_TUNING_METRIC_VALUE = np.inf MIN_TIME_FOR_TUNING_IN_SEC = 3 diff --git a/golem/core/optimisers/genetic/evaluation.py b/golem/core/optimisers/genetic/evaluation.py index 14ee73f8..8b23a20d 100644 --- a/golem/core/optimisers/genetic/evaluation.py +++ b/golem/core/optimisers/genetic/evaluation.py @@ -241,15 +241,18 @@ def dispatch(self, objective: ObjectiveFunction, timer: Optional[Timer] = None) def evaluate_population(self, individuals: PopulationT) -> PopulationT: individuals_to_evaluate, individuals_to_skip = self.split_individuals_to_evaluate(individuals) - # Evaluate individuals without valid fitness in parallel. - n_jobs = determine_n_jobs(self._n_jobs, self.logger) - parallel = Parallel(n_jobs=n_jobs, verbose=0, pre_dispatch="2*n_jobs") + # Evaluate individuals without valid fitness eval_func = partial(self.evaluate_single, logs_initializer=Log().get_parameters()) - evaluation_results = parallel(delayed(eval_func)(ind.graph, ind.uid) for ind in individuals_to_evaluate) + + if len(individuals_to_evaluate) == 1 or self._n_jobs == 1: + evaluation_results = [eval_func(ind.graph, ind.uid) for ind in individuals_to_evaluate] + else: + n_jobs = determine_n_jobs(self._n_jobs, self.logger) + parallel = Parallel(n_jobs=n_jobs) + evaluation_results = parallel(delayed(eval_func)(ind.graph, ind.uid) for ind in individuals_to_evaluate) + individuals_evaluated = self.apply_evaluation_results(individuals_to_evaluate, evaluation_results) - # If there were no successful evals then try once again getting at least one, - # even if time limit was reached successful_evals = individuals_evaluated + individuals_to_skip self.population_evaluation_info(evaluated_pop_size=len(successful_evals), pop_size=len(individuals)) diff --git a/golem/core/optimisers/genetic/gp_optimizer.py b/golem/core/optimisers/genetic/gp_optimizer.py index eec6e4ae..ffea0b49 100644 --- a/golem/core/optimisers/genetic/gp_optimizer.py +++ b/golem/core/optimisers/genetic/gp_optimizer.py @@ -1,14 +1,11 @@ -from copy import deepcopy -from random import choice from typing import Sequence, Union, Any -from golem.core.constants import MAX_GRAPH_GEN_ATTEMPTS from golem.core.dag.graph import Graph from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters -from golem.core.optimisers.genetic.operators.crossover import Crossover +from golem.core.optimisers.genetic.operators.crossover import Crossover, SinglePredefinedGraphCrossover from golem.core.optimisers.genetic.operators.elitism import Elitism from golem.core.optimisers.genetic.operators.inheritance import Inheritance -from golem.core.optimisers.genetic.operators.mutation import Mutation +from golem.core.optimisers.genetic.operators.mutation import Mutation, SinglePredefinedGraphMutation from golem.core.optimisers.genetic.operators.operator import PopulationT, EvaluationOperator from golem.core.optimisers.genetic.operators.regularization import Regularization from golem.core.optimisers.genetic.operators.reproduction import ReproductionController @@ -38,13 +35,18 @@ def __init__(self, # Define genetic operators self.regularization = Regularization(graph_optimizer_params, graph_generation_params) self.selection = Selection(graph_optimizer_params) - self.crossover = Crossover(graph_optimizer_params, requirements, graph_generation_params) - self.mutation = Mutation(graph_optimizer_params, requirements, graph_generation_params) + self.crossover = SinglePredefinedGraphCrossover(graph_optimizer_params, requirements, graph_generation_params) + self.mutation = SinglePredefinedGraphMutation(graph_optimizer_params, requirements, graph_generation_params) self.inheritance = Inheritance(graph_optimizer_params, self.selection) self.elitism = Elitism(graph_optimizer_params) self.operators = [self.regularization, self.selection, self.crossover, self.mutation, self.inheritance, self.elitism] - self.reproducer = ReproductionController(graph_optimizer_params, self.selection, self.mutation, self.crossover) + + self.reproducer = ReproductionController(parameters=graph_optimizer_params, + selection=self.selection, + mutation=self.mutation, + crossover=self.crossover, + verifier=self.graph_generation_params.verifier) # Define adaptive parameters self._pop_size: PopulationSize = init_adaptive_pop_size(graph_optimizer_params, self.generations) @@ -65,39 +67,13 @@ def _initial_population(self, evaluator: EvaluationOperator): """ Initializes the initial population """ # Adding of initial assumptions to history as zero generation self._update_population(evaluator(self.initial_individuals), 'initial_assumptions') - pop_size = self.graph_optimizer_params.pop_size - - if len(self.initial_individuals) < pop_size: - self.initial_individuals = self._extend_population(self.initial_individuals, pop_size) - # Adding of extended population to history - self._update_population(evaluator(self.initial_individuals), 'extended_initial_assumptions') - - def _extend_population(self, pop: PopulationT, target_pop_size: int) -> PopulationT: - verifier = self.graph_generation_params.verifier - extended_pop = list(pop) - pop_graphs = [ind.graph for ind in extended_pop] - - # Set mutation probabilities to 1.0 - initial_req = deepcopy(self.requirements) - initial_req.mutation_prob = 1.0 - self.mutation.update_requirements(requirements=initial_req) - - for iter_num in range(MAX_GRAPH_GEN_ATTEMPTS): - if len(extended_pop) == target_pop_size: - break - new_ind = self.mutation(choice(pop)) - if new_ind: - new_graph = new_ind.graph - if new_graph not in pop_graphs and verifier(new_graph): - extended_pop.append(new_ind) - pop_graphs.append(new_graph) - else: - self.log.warning(f'Exceeded max number of attempts for extending initial graphs, stopping.' - f'Current size {len(pop)}, required {target_pop_size} graphs.') - - # Reset mutation probabilities to default - self.mutation.update_requirements(requirements=self.requirements) - return extended_pop + # pop_size = self.graph_optimizer_params.pop_size + # + # if len(self.initial_individuals) < pop_size: + # self.initial_individuals += self.reproducer._reproduce(population=self.initial_individuals, + # evaluator=evaluator) + # # Adding of extended population to history + # self._update_population(self.initial_individuals, 'extended_initial_assumptions') def _evolve_population(self, evaluator: EvaluationOperator) -> PopulationT: """ Method realizing full evolution cycle """ @@ -120,7 +96,6 @@ def _evolve_population(self, evaluator: EvaluationOperator) -> PopulationT: # Use some part of previous pop in the next pop new_population = self.inheritance(self.population, new_population) new_population = self.elitism(self.generations.best_individuals, new_population) - return new_population def _update_requirements(self): diff --git a/golem/core/optimisers/genetic/gp_optimizer_new.py b/golem/core/optimisers/genetic/gp_optimizer_new.py new file mode 100644 index 00000000..ffea0b49 --- /dev/null +++ b/golem/core/optimisers/genetic/gp_optimizer_new.py @@ -0,0 +1,116 @@ +from typing import Sequence, Union, Any + +from golem.core.dag.graph import Graph +from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters +from golem.core.optimisers.genetic.operators.crossover import Crossover, SinglePredefinedGraphCrossover +from golem.core.optimisers.genetic.operators.elitism import Elitism +from golem.core.optimisers.genetic.operators.inheritance import Inheritance +from golem.core.optimisers.genetic.operators.mutation import Mutation, SinglePredefinedGraphMutation +from golem.core.optimisers.genetic.operators.operator import PopulationT, EvaluationOperator +from golem.core.optimisers.genetic.operators.regularization import Regularization +from golem.core.optimisers.genetic.operators.reproduction import ReproductionController +from golem.core.optimisers.genetic.operators.selection import Selection +from golem.core.optimisers.genetic.parameters.graph_depth import AdaptiveGraphDepth +from golem.core.optimisers.genetic.parameters.operators_prob import init_adaptive_operators_prob +from golem.core.optimisers.genetic.parameters.population_size import init_adaptive_pop_size, PopulationSize +from golem.core.optimisers.objective.objective import Objective +from golem.core.optimisers.opt_history_objects.individual import Individual +from golem.core.optimisers.optimization_parameters import GraphRequirements +from golem.core.optimisers.optimizer import GraphGenerationParams +from golem.core.optimisers.populational_optimizer import PopulationalOptimizer + + +class EvoGraphOptimizer(PopulationalOptimizer): + """ + Multi-objective evolutionary graph optimizer named GPComp + """ + + def __init__(self, + objective: Objective, + initial_graphs: Sequence[Union[Graph, Any]], + requirements: GraphRequirements, + graph_generation_params: GraphGenerationParams, + graph_optimizer_params: GPAlgorithmParameters): + super().__init__(objective, initial_graphs, requirements, graph_generation_params, graph_optimizer_params) + # Define genetic operators + self.regularization = Regularization(graph_optimizer_params, graph_generation_params) + self.selection = Selection(graph_optimizer_params) + self.crossover = SinglePredefinedGraphCrossover(graph_optimizer_params, requirements, graph_generation_params) + self.mutation = SinglePredefinedGraphMutation(graph_optimizer_params, requirements, graph_generation_params) + self.inheritance = Inheritance(graph_optimizer_params, self.selection) + self.elitism = Elitism(graph_optimizer_params) + self.operators = [self.regularization, self.selection, self.crossover, + self.mutation, self.inheritance, self.elitism] + + self.reproducer = ReproductionController(parameters=graph_optimizer_params, + selection=self.selection, + mutation=self.mutation, + crossover=self.crossover, + verifier=self.graph_generation_params.verifier) + + # Define adaptive parameters + self._pop_size: PopulationSize = init_adaptive_pop_size(graph_optimizer_params, self.generations) + self._operators_prob = init_adaptive_operators_prob(graph_optimizer_params) + self._graph_depth = AdaptiveGraphDepth(self.generations, + start_depth=requirements.start_depth, + max_depth=requirements.max_depth, + max_stagnation_gens=graph_optimizer_params.adaptive_depth_max_stagnation, + adaptive=graph_optimizer_params.adaptive_depth) + + # Define initial parameters + self.requirements.max_depth = self._graph_depth.initial + self.graph_optimizer_params.pop_size = self._pop_size.initial + self.initial_individuals = [Individual(graph, metadata=requirements.static_individual_metadata) + for graph in self.initial_graphs] + + def _initial_population(self, evaluator: EvaluationOperator): + """ Initializes the initial population """ + # Adding of initial assumptions to history as zero generation + self._update_population(evaluator(self.initial_individuals), 'initial_assumptions') + # pop_size = self.graph_optimizer_params.pop_size + # + # if len(self.initial_individuals) < pop_size: + # self.initial_individuals += self.reproducer._reproduce(population=self.initial_individuals, + # evaluator=evaluator) + # # Adding of extended population to history + # self._update_population(self.initial_individuals, 'extended_initial_assumptions') + + def _evolve_population(self, evaluator: EvaluationOperator) -> PopulationT: + """ Method realizing full evolution cycle """ + + # Defines adaptive changes to algorithm parameters + # like pop_size and operator probabilities + self._update_requirements() + + # Regularize previous population + individuals_to_select = self.regularization(self.population, evaluator) + # Reproduce from previous pop to get next population + new_population = self.reproducer.reproduce(individuals_to_select, evaluator) + + # Adaptive agent experience collection & learning + # Must be called after reproduction (that collects the new experience) + experience = self.mutation.agent_experience + experience.collect_results(new_population) + self.mutation.agent.partial_fit(experience) + + # Use some part of previous pop in the next pop + new_population = self.inheritance(self.population, new_population) + new_population = self.elitism(self.generations.best_individuals, new_population) + return new_population + + def _update_requirements(self): + if not self.generations.is_any_improved: + self.graph_optimizer_params.mutation_prob, self.graph_optimizer_params.crossover_prob = \ + self._operators_prob.next(self.population) + self.log.info( + f'Next mutation proba: {self.graph_optimizer_params.mutation_prob}; ' + f'Next crossover proba: {self.graph_optimizer_params.crossover_prob}') + self.graph_optimizer_params.pop_size = self._pop_size.next(self.population) + self.requirements.max_depth = self._graph_depth.next() + self.log.info( + f'Next population size: {self.graph_optimizer_params.pop_size}; ' + f'max graph depth: {self.requirements.max_depth}') + + # update requirements in operators + for operator in self.operators: + operator.update_requirements(self.graph_optimizer_params, self.requirements) diff --git a/golem/core/optimisers/genetic/gp_params.py b/golem/core/optimisers/genetic/gp_params.py index 7579dc96..8ad3e06d 100644 --- a/golem/core/optimisers/genetic/gp_params.py +++ b/golem/core/optimisers/genetic/gp_params.py @@ -76,6 +76,9 @@ class GPAlgorithmParameters(AlgorithmParameters): mutation_prob: float = 0.8 variable_mutation_num: bool = True max_num_of_operator_attempts: int = 100 + max_num_of_crossover_reproducer_attempts: int = 1 + max_num_of_mutation_reproducer_attempts: int = 2 + mutation_attempts_per_each_crossover_reproducer: int = 8 mutation_strength: MutationStrengthEnum = MutationStrengthEnum.mean min_pop_size_with_elitism: int = 5 required_valid_ratio: float = 0.9 diff --git a/golem/core/optimisers/genetic/operators/crossover.py b/golem/core/optimisers/genetic/operators/crossover.py index 866b0d27..47160482 100644 --- a/golem/core/optimisers/genetic/operators/crossover.py +++ b/golem/core/optimisers/genetic/operators/crossover.py @@ -2,7 +2,9 @@ from itertools import chain from math import ceil from random import choice, random, sample -from typing import Callable, Union, Iterable, Tuple, TYPE_CHECKING +from typing import Callable, Union, Iterable, Tuple, TYPE_CHECKING, Optional, List + +from joblib import Parallel, delayed from golem.core.adapter import register_native from golem.core.dag.graph_utils import nodes_from_layer, node_depth @@ -40,12 +42,13 @@ def __init__(self, self.graph_generation_params = graph_generation_params def __call__(self, population: PopulationT) -> PopulationT: - if len(population) == 1: - new_population = population + if len(population) > 1: + with Parallel(n_jobs=self.requirements.n_jobs) as parallel: + new_population = parallel(delayed(self._crossover)(ind_1, ind_2) + for ind_1, ind_2 in Crossover.crossover_parents_selection(population)) + new_population = list(chain(*new_population)) else: - new_population = [] - for ind_1, ind_2 in Crossover.crossover_parents_selection(population): - new_population += self._crossover(ind_1, ind_2) + new_population = population[:] return new_population @staticmethod @@ -93,12 +96,12 @@ def _crossover_by_type(self, crossover_type: CrossoverTypesEnum) -> CrossoverCal raise ValueError(f'Required crossover type is not found: {crossover_type}') def _get_individuals(self, new_graphs: Tuple[OptGraph, OptGraph], parent_individuals: Tuple[Individual, Individual], - crossover_type: Union[CrossoverTypesEnum, Callable]) -> Tuple[Individual, Individual]: + crossover_type: Union[CrossoverTypesEnum, Callable], **kwargs) -> Tuple[Individual, Individual]: operator = ParentOperator(type_='crossover', operators=str(crossover_type), parent_individuals=parent_individuals) metadata = self.requirements.static_individual_metadata - return tuple(Individual(graph, operator, metadata=metadata) for graph in new_graphs) + return tuple(Individual(graph, operator, metadata=metadata, **kwargs) for graph in new_graphs) def _will_crossover_be_applied(self, graph_first, graph_second, crossover_type) -> bool: return not (graph_first is graph_second or @@ -106,6 +109,31 @@ def _will_crossover_be_applied(self, graph_first, graph_second, crossover_type) crossover_type is CrossoverTypesEnum.none) +class SinglePredefinedGraphCrossover(Crossover): + """ Crossover that tries to create new graph/graphs from only two graphs + in one attempt without any checks + """ + def __call__(self, + individuals: List[Individual], + crossover_type: Optional[CrossoverTypesEnum] = None) -> Tuple[List[Individual], CrossoverTypesEnum]: + if len(individuals) < 2: + raise ValueError(f"Crossover needs 2 individuals, get {len(individuals)}") + elif len(individuals) > 2: + individuals = sample(individuals, 2) + graphs = [deepcopy(ind.graph) for ind in individuals] + + crossover_type = crossover_type or choice(self.parameters.crossover_types) + if crossover_type is CrossoverTypesEnum.none: + return individuals, crossover_type + crossover_func = self._get_crossover_function(crossover_type) + + new_graphs = crossover_func(*graphs, max_depth=self.requirements.max_depth) + new_individuals = self._get_individuals(new_graphs=new_graphs, + parent_individuals=individuals, + crossover_type=crossover_type) + return new_individuals, crossover_type + + @register_native def subtree_crossover(graph_1: OptGraph, graph_2: OptGraph, max_depth: int, inplace: bool = True) -> Tuple[OptGraph, OptGraph]: diff --git a/golem/core/optimisers/genetic/operators/mutation.py b/golem/core/optimisers/genetic/operators/mutation.py index 2005377e..f54e3bcf 100644 --- a/golem/core/optimisers/genetic/operators/mutation.py +++ b/golem/core/optimisers/genetic/operators/mutation.py @@ -1,6 +1,6 @@ from copy import deepcopy from random import random -from typing import Callable, Union, Tuple, TYPE_CHECKING, Mapping, Hashable, Optional +from typing import Callable, Union, Tuple, TYPE_CHECKING, Mapping, Hashable, Optional, List import numpy as np @@ -22,6 +22,7 @@ if TYPE_CHECKING: from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters +MutationType = Union[MutationTypesEnum, Callable] MutationFunc = Callable[[Graph, GraphRequirements, GraphGenerationParams, AlgorithmParameters], Graph] MutationIdType = Hashable MutationRepo = Mapping[MutationIdType, MutationFunc] @@ -81,11 +82,11 @@ def __call__(self, population: Union[Individual, PopulationT]) -> Union[Individu if isinstance(population, Individual): population = [population] - final_population, mutations_applied, application_attempts = tuple(zip(*map(self._mutation, population))) - - # drop individuals to which mutations could not be applied - final_population = [ind for ind, init_ind, attempt in zip(final_population, population, application_attempts) - if not attempt or ind.graph != init_ind.graph] + final_population = [] + for individual in population: + new_ind, _, applied = self._mutation(individual) + if not applied or new_ind.graph != individual.graph: + final_population.append(new_ind) if len(population) == 1: return final_population[0] if final_population else final_population @@ -105,11 +106,9 @@ def _mutation(self, individual: Individual) -> Tuple[Individual, Optional[Mutati application_attempt = True is_correct_graph = self.graph_generation_params.verifier(new_graph) if is_correct_graph: - parent_operator = ParentOperator(type_='mutation', - operators=mutation_applied, - parent_individuals=individual) - individual = Individual(new_graph, parent_operator, - metadata=self.requirements.static_individual_metadata) + individual = self._get_individual(new_graph=new_graph, + mutation_type=mutation_applied, + parent=individual) break else: # Collect invalid actions @@ -160,3 +159,32 @@ def _get_mutation_func(self, mutation_type: Union[MutationTypesEnum, Callable]) mutation_func = self._mutations_repo[mutation_type] adapted_mutation_func = self.graph_generation_params.adapter.adapt_func(mutation_func) return adapted_mutation_func + + def _get_individual(self, new_graph: Graph, mutation_type: MutationType, parent: Individual, **kwargs): + parent_operator = ParentOperator(type_='mutation', + operators=mutation_type, + parent_individuals=parent) + individual = Individual(new_graph, parent_operator, + metadata=self.requirements.static_individual_metadata, **kwargs) + return individual + + +class SinglePredefinedGraphMutation(Mutation): + """ Mutation that tries to create new graph (not individual) from the only graph in one attempt + without any checks + """ + def __call__(self, individuals: List[Individual], mutation_type: Optional[MutationType] = None) -> Tuple[Graph, MutationIdType]: + if len(individuals) != 1: + raise ValueError('individuals should be len 1') + + individual = individuals[0] + new_graph = deepcopy(individual.graph) + mutation_type = mutation_type or self._operator_agent.choose_action(new_graph) + if mutation_type is MutationTypesEnum.none: + return None, None + mutation_func = self._get_mutation_func(mutation_type) + new_graph = mutation_func(new_graph, requirements=self.requirements, + graph_gen_params=self.graph_generation_params, + parameters=self.parameters) + new_individual = self._get_individual(new_graph=new_graph, mutation_type=mutation_type, parent=individual) + return new_individual, mutation_type diff --git a/golem/core/optimisers/genetic/operators/node.py b/golem/core/optimisers/genetic/operators/node.py new file mode 100644 index 00000000..52ebea28 --- /dev/null +++ b/golem/core/optimisers/genetic/operators/node.py @@ -0,0 +1,240 @@ +from dataclasses import dataclass, replace, field +from enum import Enum +from itertools import chain +from math import ceil +from typing import Optional, List, Union, Any, Dict + +from golem.core.optimisers.genetic.operators.operator import Operator +from golem.core.optimisers.graph import OptGraph +from golem.core.optimisers.opt_history_objects.individual import Individual + +GeneticNodeAllowedType = Union['GeneticNode', str, None] + + +class TaskStagesEnum(Enum): + (INIT, SUCCESS, FAIL, FINISH) = range(4) + + +@dataclass +class GeneticOperatorTask: + """ Contain individuals and information what to do with it and what was made """ + individuals: List[Individual] + operator_type: Optional[Any] = None + + stage: TaskStagesEnum = TaskStagesEnum.INIT + next_stage_node: GeneticNodeAllowedType = None + prev_stage_node: GeneticNodeAllowedType = None + + # parent data + parent_task: Optional['GeneticOperatorTask'] = None + + exception: Optional[Exception] = None + left_tries: int = 1 + + def __repr__(self): + s = (f"{self.__class__.__name__}('{self.stage.name}', " + f"next: '{self.next_stage_node}', prev: '{self.prev_stage_node}', " + f"individuals: {len(self.individuals) if isinstance(self.individuals, list) else type(self.individuals)}, " + f"operator_type: '{self.operator_type}', " + f"tries: {self.left_tries}, " + f"parent: {int(self.parent_task is not None)})") + return s + + def __copy__(self): + # TODO test + return self.copy() + + def __deepcopy__(self, memodict: Dict = dict()): + # TODO test + raise NotImplementedError('Deepcopy is not allowed for task') + + def copy(self, **parameters): + # TODO test + new_task = replace(self) + for parameter, value in parameters.items(): + setattr(new_task, parameter, value) + return new_task + + def create_failed_task(self, exception: Exception, **parameters): + parameters = {'stage': TaskStagesEnum.FAIL, 'exception': exception, + 'left_tries': self.left_tries - 1, **parameters} + return self.copy(**parameters) + + def create_successive_task(self, individuals: List[Individual], **parameters): + if not isinstance(individuals, list): + raise ValueError(f"individuals should be list, got {type(individuals)} instead") + parameters = {'stage': TaskStagesEnum.SUCCESS, 'individuals': individuals, + 'parent_task': self, **parameters} + return self.copy(**parameters) + + +@dataclass(frozen=True) +class GeneticNode: + """ Operator wrapper with data/tools for task routing """ + + name: str + operator: Operator + success_outputs: Optional[List[GeneticNodeAllowedType]] = field(default_factory=lambda: [None]) + fail_outputs: Optional[List[GeneticNodeAllowedType]] = field(default_factory=lambda: [None]) + + task_params_if_success: Dict[str, Any] = field(default_factory=dict) + task_params_if_fail: Dict[str, Any] = field(default_factory=dict) + + individuals_input_count: Optional[int] = None + repeat_count: int = 1 + tries_count: int = 1 + + def __post_init__(self): + # some checks + _check_list_with_genetic_nodes(self.success_outputs) + _check_list_with_genetic_nodes(self.fail_outputs) + + # TODO check interface of operator + + def __call__(self, task: GeneticOperatorTask): + final_tasks = list() + + if task.stage is not TaskStagesEnum.FAIL: + # if task from previous node then set max tries + task.left_tries = self.tries_count + + # if there are unappropriated individuals count + # then divide task to subtasks with appropriate individuals count + length, max_length = len(task.individuals), self.individuals_input_count + if max_length is not None and length > max_length: + individuals_groups = [task.individuals[i * max_length:min(length, (i + 1) * max_length)] + for i in range(ceil(length / max_length))] + for individuals_group in reversed(individuals_groups): + final_tasks.append(task.copy(individuals=individuals_group)) + # get task for current run + task = final_tasks.pop() + + # repeat each task if it is allowed + if self.repeat_count > 1: + final_tasks.append(task) + final_tasks = [task.copy() for task in final_tasks * self.repeat_count] + # get task for current run + task = final_tasks.pop() + + # run operator + if task.stage is not TaskStagesEnum.FAIL or task.left_tries > 0: + try: + # TODO all operator should return list of lists of graph + individuals, operator_type = self.operator(task.individuals, task.operator_type) + tasks = [task.create_successive_task(individuals, prev_stage_node=self.name, + operator_type=None, **self.task_params_if_success)] + next_nodes = self.success_outputs + except Exception as exception: + # TODO save where it fails + tasks = [task.create_failed_task(exception, **self.task_params_if_fail)] + next_nodes = self.fail_outputs + + for _task in tasks: + for _node in next_nodes: + new_task = _task.copy() + if _node is None: + if new_task.stage is TaskStagesEnum.SUCCESS: + new_task.stage = TaskStagesEnum.FINISH + elif new_task.stage is TaskStagesEnum.FAIL: + # if there is no next node, then no tries + new_task.left_tries = -1 + new_task.next_stage_node = _node + final_tasks.append(new_task) + return final_tasks + + def __hash__(self): + # TODO add test for hash + return self.name.__hash__() + + def __copy__(self): + """ because hash is the name """ + raise NotImplementedError('Use ``copy`` function instead') + + def __deepcopy__(self, memodict: Dict = dict()): + """ because hash is the name """ + raise NotImplementedError('Use ``copy`` function instead') + + def copy(self, name: str): + """ Create new node with same data but new name """ + # TODO add tests that all fields are copied + # new_node = replace(self) + return GeneticNode(name=name, operator=self.operator, + success_outputs=self.success_outputs, + fail_outputs=self.fail_outputs) + + # def call_operation(self, task: GeneticOperatorTask): + # graphs_grouped, operator_type = self.operator(task.graphs, task.operator_type) + # graphs_grouped = [([graph] if not isinstance(graph, list) else graph) for graph in graphs_grouped] + # + # new_graphs_grouped = list() + # for graphs in graphs_grouped: + # if len(graphs) > self.max_graphs_output: + # raise NotImplementedError() + # else: + # new_graphs_grouped.append(graphs) + # return graphs, operator_type + + +@dataclass +class GeneticPipeline: + """ Pool of connected nodes with useful checks + Call only a one node in time + """ + + name: str + nodes: List[GeneticNode] + __nodes_map: Optional[Dict[str, GeneticNode]] = None + + def __post_init__(self): + # some checks + _check_list_with_genetic_nodes(self.nodes, force_genetic_node_type_check=True) + + # check that all connection between nodes connect existing nodes + # TODO fix + # connection_goals = set(chain(*[node.success_outputs + node.fail_outputs for node in self.nodes])) + # connection_goals -= {None} + # if not (set(self.nodes) > connection_goals): + # raise ValueError('Some nodes have connection with nonexisting nodes') + + self.__nodes_map = {node.name: node for node in self.nodes} + + def __call__(self, task: GeneticOperatorTask): + """ Call one node and return result """ + if not isinstance(task, GeneticOperatorTask): + raise ValueError(f"``task`` should be ``GeneticOperatorTask``, get {type(task)} instead") + + if task.stage is TaskStagesEnum.FINISH: + raise ValueError('Task is finished') + + if task.next_stage_node not in self.__nodes_map: + raise ValueError(f"Unknown stage node {task.stage}") + + return self.__nodes_map[task.next_stage_node](task) + + def __getitem__(self, node_name: str): + if node_name not in self.__nodes_map: + raise KeyError(f"Unknown node {node_name}") + return self.__nodes_map[node_name] + + def __contains__(self, node_name: str): + # TODO test that contains also return true when getitem works + return node_name in self.__nodes_map + +def _check_list_with_genetic_nodes(list_with_nodes, force_genetic_node_type_check=False): + # check that nodes is list with nodes + list_with_nodes_is_appropriate = True + list_with_nodes_is_appropriate &= isinstance(list_with_nodes, list) + list_with_nodes_is_appropriate &= len(list_with_nodes) > 0 + checked_type = GeneticNode if force_genetic_node_type_check else GeneticNodeAllowedType + # TODO fix it + # list_with_nodes_is_appropriate &= all(isinstance(node, checked_type) for node in list_with_nodes) + + if not list_with_nodes_is_appropriate: + raise ValueError('``nodes`` parameter should be list with ``GeneticNodes``') + + # check that all nodes have unique name + # hash of node is calculated as hash of it is name, therefore check may be done as: + if len(set(list_with_nodes)) != len(list_with_nodes): + # TODO add test for that line + # TODO add test for that line works as is + raise AttributeError(f"nodes names should be unique") diff --git a/golem/core/optimisers/genetic/operators/regularization.py b/golem/core/optimisers/genetic/operators/regularization.py index 7d8e7dbe..8fe45d82 100644 --- a/golem/core/optimisers/genetic/operators/regularization.py +++ b/golem/core/optimisers/genetic/operators/regularization.py @@ -34,6 +34,7 @@ def __call__(self, population: PopulationT, evaluator: EvaluationOperator) -> Po raise ValueError(f'Required regularization type not found: {regularization_type}') def _decremental_regularization(self, population: PopulationT, evaluator: EvaluationOperator) -> PopulationT: + # TODO: do it in parallel if it can be done size = self.parameters.pop_size additional_inds = [] prev_nodes_ids = set() diff --git a/golem/core/optimisers/genetic/operators/reproduction.py b/golem/core/optimisers/genetic/operators/reproduction.py index bbaacde4..8ebda906 100644 --- a/golem/core/optimisers/genetic/operators/reproduction.py +++ b/golem/core/optimisers/genetic/operators/reproduction.py @@ -1,16 +1,33 @@ -from typing import Optional +import time +from copy import deepcopy, copy +from dataclasses import dataclass +from enum import Enum +from itertools import chain +from multiprocessing.managers import DictProxy +from multiprocessing import Manager +from queue import Empty, Queue +from random import sample, randint +from typing import Optional, Dict, Union, List -import numpy as np +from joblib import Parallel, delayed -from golem.core.constants import MIN_POP_SIZE, EVALUATION_ATTEMPTS_NUMBER +from golem.core.constants import MAX_GRAPH_GEN_ATTEMPTS_PER_IND +from golem.core.dag.graph_verifier import GraphVerifier from golem.core.log import default_log +from golem.core.optimisers.fitness import Fitness from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters -from golem.core.optimisers.genetic.operators.crossover import Crossover -from golem.core.optimisers.genetic.operators.mutation import Mutation +from golem.core.optimisers.genetic.operators.crossover import Crossover, CrossoverTypesEnum +from golem.core.optimisers.genetic.operators.mutation import Mutation, MutationType +from golem.core.optimisers.genetic.operators.node import GeneticPipeline, TaskStagesEnum, GeneticNode, \ + GeneticOperatorTask from golem.core.optimisers.genetic.operators.operator import PopulationT, EvaluationOperator from golem.core.optimisers.genetic.operators.selection import Selection +from golem.core.optimisers.graph import OptGraph +from golem.core.optimisers.opt_history_objects.parent_operator import ParentOperator from golem.core.optimisers.populational_optimizer import EvaluationAttemptsError -from golem.utilities.data_structures import ensure_wrapped_in_sequence +from golem.core.optimisers.opt_history_objects.individual import Individual +from golem.utilities.random import RandomStateHandler + class ReproductionController: @@ -18,26 +35,11 @@ class ReproductionController: Task of the Reproduction Controller is to reproduce population while keeping population size as specified in optimizer settings. - It implements a simple proportional controller that compensates for - invalid results each generation by computing average ratio of valid results. - Invalid results include cases when Operators, Evaluator or GraphVerifier - return output population that's smaller than the input population. - - Example. - Let's say we need a population of size 50. Let's say about 20% of individuals - are *usually* evaluated with an error. If we take select only 50 for the new population, - we will get about 40 valid ones. Not enough. Therefore, we need to take more. - How much more? Approximately by `target_pop_size / mean_success_rate = 50 / 0.8 ~= 62'. - Here `mean_success_rate` estimates number of successfully evaluated individuals. - Then we request 62, then approximately 62*0.8~=50 of them are valid in the end, - and we achieve target size more reliably. This runs in a loop to control stochasticity. - Args: parameters: genetic algorithm parameters. selection: operator used in reproduction. mutation: operator used in reproduction. crossover: operator used in reproduction. - window_size: size in iterations of the moving window to compute reproduction success rate. """ def __init__(self, @@ -45,90 +47,323 @@ def __init__(self, selection: Selection, mutation: Mutation, crossover: Crossover, - window_size: int = 10, - ): + verifier: Optional[GraphVerifier] = None): self.parameters = parameters self.selection = selection self.mutation = mutation self.crossover = crossover + self.verifier = verifier or self.mutation.graph_generation_params.verifier + self._pop_graph_descriptive_ids = set() self._minimum_valid_ratio = parameters.required_valid_ratio * 0.5 - self._window_size = window_size - self._success_rate_window = np.full(self._window_size, 1.0) self._log = default_log(self) - @property - def mean_success_rate(self) -> float: - """Returns mean success rate of reproduction + evaluation, - fraction of how many individuals were reproduced and mutated successfully. - Computed as average fraction for the last N iterations (N = window size param)""" - return float(np.mean(self._success_rate_window)) - - def reproduce_uncontrolled(self, - population: PopulationT, - evaluator: EvaluationOperator, - pop_size: Optional[int] = None, - ) -> PopulationT: + def reproduce(self, population: PopulationT, evaluator: EvaluationOperator) -> PopulationT: """Reproduces and evaluates population (select, crossover, mutate). - Doesn't implement any additional checks on population. """ - # If operators can return unchanged individuals from previous population - # (e.g. both Mutation & Crossover are not applied with some probability) - # then there's a probability that duplicate individuals can appear - - # TODO: it can't choose more than len(population)! - # It can be faster if it could. - selected_individuals = self.selection(population, pop_size) - new_population = self.crossover(selected_individuals) - new_population = ensure_wrapped_in_sequence(self.mutation(new_population)) - new_population = evaluator(new_population) + selected_individuals = self.selection(population, self.parameters.pop_size) + new_population = self._reproduce(selected_individuals, evaluator) + self._check_final_population(new_population) return new_population - def reproduce(self, - population: PopulationT, - evaluator: EvaluationOperator - ) -> PopulationT: - """Reproduces and evaluates population (select, crossover, mutate). - Implements additional checks on population to ensure that population size - follows required population size. + def _reproduce(self, population: PopulationT, evaluator: EvaluationOperator) -> PopulationT: + """Generate new individuals by mutation in parallel. + Implements additional checks on population to ensure that population size is greater or equal to + required population size. Also controls uniqueness of population. """ - total_target_size = self.parameters.pop_size # next population size - collected_next_population = {} - for i in range(EVALUATION_ATTEMPTS_NUMBER): - # Estimate how many individuals we need to complete new population - # based on average success rate of valid results - residual_size = total_target_size - len(collected_next_population) - residual_size = max(MIN_POP_SIZE, - int(residual_size / self.mean_success_rate)) - residual_size = min(len(population), residual_size) - - # Reproduce the required number of individuals that equals residual size - partial_next_population = self.reproduce_uncontrolled(population, evaluator, residual_size) - # Avoid duplicate individuals that can come unchanged from previous population - collected_next_population.update({ind.uid: ind for ind in partial_next_population}) - - # Keep running average of transform success rate (if sample is big enough) - if len(partial_next_population) >= MIN_POP_SIZE: - valid_ratio = len(partial_next_population) / residual_size - self._success_rate_window = np.roll(self._success_rate_window, shift=1) - self._success_rate_window[0] = valid_ratio - - # Successful return: got enough individuals - if len(collected_next_population) >= total_target_size * self.parameters.required_valid_ratio: - self._log.info(f'Reproduction achieved pop size {len(collected_next_population)}' - f' using {i+1} attempt(s) with success rate {self.mean_success_rate:.3f}') - return list(collected_next_population.values())[:total_target_size] + with Manager() as manager: + task_queue, result_queue = [manager.Queue() for _ in range(2)] + + def evaluate(graphs, operator_type, evaluator=evaluator): + individuals = [Individual(deepcopy(graph), metadata=self.mutation.requirements.static_individual_metadata) + for graph in graphs] + evaluated_individuals = self.evaluator(individuals) + if evaluated_individuals: + return evaluated_individuals[0].graph, None + raise ValueError('evaluator error') + + empty_task = GeneticOperatorTask(population, next_stage_node='crossover') + + crossover = GeneticNode(name='crossover', operator=self.crossover, + success_outputs=['mutation_1', 'mutation_2']) + mutation_1 = GeneticNode(name='mutation_1', operator=self.mutation, + success_outputs=['evaluation'], individuals_input_count=1) + mutation_2 = GeneticNode(name='mutation_2', operator=self.mutation, + success_outputs=['mutation_1'], individuals_input_count=1) + evaluation = GeneticNode(name='evaluation', operator=evaluate) + + pipeline = GeneticPipeline('main', [crossover, mutation_1, mutation_2, evaluation]) + + # parameters for worker + worker_parameters = dict(pipeline=pipeline, + empty_task=empty_task, + task_queue=task_queue, + result_queue=result_queue, + log=self._log) + + n_jobs = self.mutation.requirements.n_jobs + with Parallel(n_jobs=n_jobs + 1, prefer='processes', return_as='generator') as parallel: + # prepare (n_jobs + 1) workers + workers = [ReproduceWorker(seed=randint(0, int(2**32 - 1)), **worker_parameters) + for _ in range(n_jobs + 1)] + # run n_jobs workers with run_flag = True + # and one worker with run_flag = False + # It guarantees n_jobs workers parallel execution also if n_jobs == 1 + # because joblib for n_jobs == 1 does not start parallel pool + _ = parallel(delayed(worker)(run_flag) for worker, run_flag in zip(workers, [True] * n_jobs + [False])) + + finished_tasks, failed_tasks = list(), list() + left_tries = self.parameters.pop_size * MAX_GRAPH_GEN_ATTEMPTS_PER_IND * n_jobs + while left_tries > 0 and len(finished_tasks) < self.parameters.pop_size: + # main thread is fast + # frequent queue.qsize() is not good idea + time.sleep(1) + for _ in range(result_queue.qsize()): + left_tries -= 1 + task = result_queue.get() + if task.stage is TaskStagesEnum.FINISH: + finished_tasks.append(task) + else: + failed_tasks.append(task) + + # get all finished works + while result_queue.qsize() > 0: + task = result_queue.get() + if task.stage is TaskStagesEnum.FINISH: + finished_tasks.append(task) + else: + failed_tasks.append(task) + + # rebuild population + new_population = self._process_tasks(population=population, + finished_tasks=finished_tasks, + failed_tasks=failed_tasks) + return new_population + + def _rebuild_individual(self, individual: Individual, + known_uid_to_population_map: Dict[str, Individual]): + # TODO add test + if individual.uid in known_uid_to_population_map: + # if individual is known, then no need to rebuild it + new_individual = known_uid_to_population_map[individual.uid] else: - # If number of evaluation attempts is exceeded return a warning or raise exception - helpful_msg = ('Check objective, constraints and evo operators. ' - 'Possibly they return too few valid individuals.') - - if len(collected_next_population) >= total_target_size * self._minimum_valid_ratio: - self._log.warning(f'Could not achieve required population size: ' - f'have {len(collected_next_population)},' - f' required {total_target_size}!\n' + helpful_msg) - return list(collected_next_population.values()) - else: - raise EvaluationAttemptsError('Could not collect valid individuals' - ' for next population.' + helpful_msg) + parent_operator = None + if individual.parent_operator: + operator = individual.parent_operator + parent_individuals = [self._rebuild_individual(ind) for ind in operator.parent_individuals] + parent_operator = ParentOperator(type_=operator.type_, + operators=operator.operators, + parent_individuals=parent_individuals) + + new_individual = Individual(individual.graph, + parent_operator, + fitness=individual.fitness, + # TODO get requirements from self, not from mutation + metadata=self.mutation.requirements.static_individual_metadata) + # add new individual to known individuals + known_uid_to_population_map[individual.uid] = individual + return new_individual + + def _process_tasks(self, + population: PopulationT, + finished_tasks: List['ReproducerWorkerTask'], + failed_tasks: List['ReproducerWorkerTask']): + population_uid_map = {ind.uid: ind for ind in population} + + new_population = list() + for task in finished_tasks: + new_inds = [self._rebuild_individual(ind, population_uid_map) for ind in task.individuals] + new_population.extend(new_inds) + # experience for mab + # self.mutation.agent_experience.collect_experience(individual, task.mutation_type, reward=-1.0) + return new_population + + def _check_final_population(self, population: PopulationT) -> None: + """ If population do not achieve required length return a warning or raise exception """ + target_pop_size = self.parameters.pop_size + helpful_msg = ('Check objective, constraints and evo operators. ' + 'Possibly they return too few valid individuals.') + + if len(population) < target_pop_size * self._minimum_valid_ratio: + raise EvaluationAttemptsError('Could not collect valid individuals' + ' for population.' + helpful_msg) + elif len(population) < target_pop_size: + self._log.warning(f'Could not achieve required population size: ' + f'have {len(population)},' + f' required {target_pop_size}!\n' + helpful_msg) + + +class ReproduceWorker: + def __init__(self, + pipeline: GeneticPipeline, + empty_task, + task_queue, + result_queue, + seed: int, + log + ): + self.pipeline = pipeline + self._seed = seed + self._log = log + self._task_queue = task_queue + self._result_queue = result_queue + self._empty_task = empty_task + + def __call__(self, run: bool = True): + self._log.warning(f"CALLED") + with RandomStateHandler(self._seed): + tasks = [self._empty_task.copy()] + while run: + # is there is no tasks, try to get 1. task from queue 2. empty task + if not tasks: + try: + tasks.append(self._task_queue.get(timeout=0.02)) + except Empty: + tasks.append(self._empty_task.copy()) + + # send task to pipeline + processed_tasks = self.pipeline(tasks.pop()) + + # process result + for processed_task in processed_tasks: + if processed_task.stage is TaskStagesEnum.FINISH: + self._result_queue.put(processed_task) + continue + if processed_task.stage is TaskStagesEnum.FAIL: + self._log.warning(f"FAIL: {processed_task.failed_stage}") + self._result_queue.put(processed_task) + if processed_task.left_tries > 0: + tasks.append(processed_task) + else: + tasks.append(processed_task) + + # if there are some tasks, add it to parallel queue + for _ in range(len(tasks) - 1): + self._task_queue.put(tasks.pop()) + + # def process_task(self, task: ReproducerWorkerTask) -> List[ReproducerWorkerTask]: + # """ Get task, make 1 stage and return processed task """ + # # self._log.warning(f"START: {task.stage} {task.crossover_tries}:{task.mutation_tries}") + # task = copy(task) # input task + # task.fail = False + # + # # crossover + # if task.stage is ReproducerWorkerStageEnum.CROSSOVER: + # return self.crossover_stage(task) + # + # # crossover result verification + # if task.stage is ReproducerWorkerStageEnum.CROSSOVER_VERIFICATION: + # task.fail = not self.verifier(task.graph_for_mutation) + # task.step_in_stage(-1 if task.fail else 1) + # return [task] + # + # # crossover uniqueness check + # if task.stage is ReproducerWorkerStageEnum.CROSSOVER_UNIQUENESS_CHECK: + # task.step_in_stage(1) + # return [task] + # # processed_task = self.uniqueness_check_stage(task)[0] + # # processed_task.step_in_stage(-2 if processed_task.fail else 1) + # # return [processed_task] + # + # # crossover result evaluation + # if task.stage is ReproducerWorkerStageEnum.CROSSOVER_EVALUATION: + # task.step_in_stage(1) + # return [copy(task) for _ in range(task.mutation_attempts_per_each_crossover)] + # # processed_task = self.evaluation_stage(task)[0] + # # if processed_task.fail: + # # processed_task.step_in_stage(-3) + # # return [processed_task] + # # else: + # # # create some tasks for mutation for crossover result + # # processed_task.step_in_stage(1) + # # return [copy(processed_task) for _ in range(task.mutation_attempts_per_each_crossover)] + # + # # mutation + # if task.stage is ReproducerWorkerStageEnum.MUTATION: + # return self.mutation_stage(task) + # + # # mutation result verification + # if task.stage is ReproducerWorkerStageEnum.MUTATION_VERIFICATION: + # task.fail = not self.verifier(task.final_graph) + # task.step_in_stage(-1 if task.fail else 1) + # return [task] + # + # # mutation uniqueness check + # if task.stage is ReproducerWorkerStageEnum.MUTATION_UNIQUENESS_CHECK: + # processed_task = self.uniqueness_check_stage(task)[0] + # processed_task.step_in_stage(-2 if processed_task.fail else 1) + # return [processed_task] + # + # # mutation result evaluation + # if task.stage is ReproducerWorkerStageEnum.MUTATION_EVALUATION: + # processed_task = self.evaluation_stage(task)[0] + # processed_task.step_in_stage(-3 if processed_task.fail else 1) + # return [processed_task] + # + # def crossover_stage(self, task: ReproducerWorkerTask) -> List[ReproducerWorkerTask]: + # tasks = [] # tasks to return + # + # # if there is no graphs for crossover then get random graphs + # if task.graph_1_for_crossover is None or task.graph_1_for_crossover is None: + # inds_for_crossover = sample(self._population, k=2) + # task.graph_1_uid, task.graph_1_for_crossover = inds_for_crossover[0].uid, inds_for_crossover[0].graph + # task.graph_2_uid, task.graph_2_for_crossover = inds_for_crossover[1].uid, inds_for_crossover[1].graph + # + # # make crossover + # task.crossover_tries -= 1 + # *new_graphs, task.crossover_type = self.crossover(task.graph_1_for_crossover, + # task.graph_2_for_crossover, + # task.crossover_type) + # + # if not new_graphs: + # # if there is no new_graphs then go to new try + # task.fail = True + # tasks.append(task) + # else: + # # create new task for each new graph after crossover for next stage + # task.step_in_stage(1) + # for graph in new_graphs: + # new_task = copy(task) + # new_task.graph_for_mutation = graph + # tasks.append(new_task) + # return tasks + # + # def mutation_stage(self, task: ReproducerWorkerTask) -> List[ReproducerWorkerTask]: + # task.final_graph, task.mutation_type = self.mutation(task.graph_for_mutation, task.mutation_type) + # task.mutation_tries -= 1 + # if task.final_graph is None: + # task.fail = True + # else: + # task.step_in_stage(1) + # return [task] + # + # def uniqueness_check_stage(self, task: ReproducerWorkerTask) -> List[ReproducerWorkerTask]: + # if task.is_crossover: + # graph = task.graph_for_mutation + # else: + # graph = task.final_graph + # descriptive_id = graph.descriptive_id + # if descriptive_id not in self._pop_graph_descriptive_ids: + # self._pop_graph_descriptive_ids[descriptive_id] = True + # task.fail = False + # else: + # task.fail = True + # return [task] + # + # def evaluation_stage(self, task: ReproducerWorkerTask) -> List[ReproducerWorkerTask]: + # if task.is_crossover: + # graph = task.graph_for_mutation + # else: + # graph = task.final_graph + # individual = Individual(deepcopy(graph), metadata=self.mutation.requirements.static_individual_metadata) + # evaluated_individuals = self.evaluator([individual]) + # if evaluated_individuals:# and evaluated_individuals[0].fitness.valid: + # task.fail = False + # if task.is_crossover: + # task.crossover_fitness = evaluated_individuals[0].fitness + # else: + # task.final_fitness = evaluated_individuals[0].fitness + # else: + # task.fail = True + # return [task] diff --git a/golem/core/optimisers/genetic/pool.py b/golem/core/optimisers/genetic/pool.py new file mode 100644 index 00000000..fa43d452 --- /dev/null +++ b/golem/core/optimisers/genetic/pool.py @@ -0,0 +1,137 @@ +from dataclasses import dataclass +from enum import Enum, auto +from typing import Optional, List, Any, Callable + + +class ParametersTypesEnum(Enum): + UNKNOWN = auto() + OPTIMIZER = auto() + POOL = auto() + NODE = auto() + + def __ge__(self, other): + if self.__class__ is other.__class__: + return self.value >= other.value + return NotImplemented + + def __gt__(self, other): + if self.__class__ is other.__class__: + return self.value > other.value + return NotImplemented + + def __next__(self): + return ParametersTypesEnum(self.value + 1) + + +# class Parameters: +# def __init__(self, type_: ParametersTypesEnum, data: Optional[dict] = None): +# data = data or dict() +# +# for k in data: +# if isinstance(data[k], dict): +# data[k] = Parameters(next(type_), data[k]) +# self.type = type_ +# self.__data = data +# +# def __getitem__(self, keys): +# data = self.__data +# for key in keys: +# data = data[key] +# return data +# +# def __setitem__(self, keys, value): +# data = self.__data +# for key in keys[:-1]: +# if key not in data: +# data[key] = Parameters(next(self.type)) +# data = data[key] +# data[keys[-1]] = value +# +# def __repr__(self): +# def pp(parameters, indent=0): +# return '\n' + '\n'.join(f"{' ' * indent}'{key}': {value.type.name + pp(value, indent + 2) if isinstance(value, self.__class__) else value}" +# for key, value in parameters.__data.items()) +# return self.type.name + pp(self) +# +# def __iter__(self): +# return (x for x in self.__data.keys()) +# +# def items(self): +# return (x for x in self.__data.items()) +# +# def filter_by_type(self, type_: ParametersTypesEnum): +# return [pars for name, pars in self.items() +# if isinstance(pars, Parameters) and pars.type is type_] + + +class Parameters: + pass + + +@dataclass +class OptimizerParameters(Parameters): + pool_parameters: List['PoolParameters'] + n_jobs: int = -1 + + +@dataclass +class PoolParameters(Parameters): + name: str + constructor: Callable + n_jobs: int + nodes: List['Node'] + scheme: 'Scheme' + task_constructor: Callable + task_history: List[Any] + + +class Optimizer: + def __init__(self, parameters: OptimizerParameters): + self.parameters = parameters + + def _evolve_population(self): + common_parameters = self.parameters + for pool_params in common_parameters.pool_parameters: + pool = pool_params.constructor(pool_params, common_parameters) + common_parameters.update(pool.run()) + + +class Pool: + """ Pool of nodes """ + + def __init__(self, pool_parameters: PoolParameters, parameters: OptimizerParameters): + self.name = pool_parameters.name + self.nodes_map = {node.name: node for node in pool_parameters.nodes} + self.task = pool_parameters.task + self.scheme = pool_parameters.scheme + + # TODO error if there are some nodes with same name + + def __call__(self, task: Task): + if not task.next in self.nodes_map: + raise ValueError((f"Pool {self.name}. Unknown node {task.next}. " + f"Existing nodes: {', '.join(self.nodes_map)}.")) + processed_task = task.run_on_node(self.nodes_map[task.next]) + return processed_task + + +class Node: + """ Node with operation """ + + def __init__(self, name: str, operation: Callable): + self.name = name + self.operation = operation + + def __call__(self, *args, **kwargs): + return self.operation(*args, **kwargs) + + +class Task: + """ Data with parameters for operation """ + + def __init__(self, data: Any, parameters: Any): + self.data = data + self.parameters = parameters + + def run_on_node(self, node: Node): + result = node(self.data, self.parameters) diff --git a/golem/core/optimisers/optimizer.py b/golem/core/optimisers/optimizer.py index 5a54bb50..d67b2394 100644 --- a/golem/core/optimisers/optimizer.py +++ b/golem/core/optimisers/optimizer.py @@ -20,8 +20,6 @@ from golem.core.optimisers.random_graph_factory import RandomGraphFactory, RandomGrowthGraphFactory from golem.utilities.random import RandomStateHandler -STRUCTURAL_DIVERSITY_FREQUENCY_CHECK = 5 - def do_nothing_callback(*args, **kwargs): pass @@ -46,7 +44,6 @@ class AlgorithmParameters: max_pop_size: Optional[int] = 55 adaptive_depth: bool = False adaptive_depth_max_stagnation: int = 3 - structural_diversity_frequency_check: int = STRUCTURAL_DIVERSITY_FREQUENCY_CHECK @dataclass diff --git a/golem/core/optimisers/populational_optimizer.py b/golem/core/optimisers/populational_optimizer.py index c95be9ce..2144f19d 100644 --- a/golem/core/optimisers/populational_optimizer.py +++ b/golem/core/optimisers/populational_optimizer.py @@ -2,7 +2,6 @@ from random import choice from typing import Any, Optional, Sequence, Dict -from golem.core.constants import MIN_POP_SIZE from golem.core.dag.graph import Graph from golem.core.optimisers.archive import GenerationKeeper from golem.core.optimisers.genetic.evaluation import MultiprocessingDispatcher, SequentialDispatcher @@ -44,8 +43,10 @@ def __init__(self, self.generations = GenerationKeeper(self.objective, keep_n_best=requirements.keep_n_best) self.timer = OptimisationTimer(timeout=self.requirements.timeout) - dispatcher_type = MultiprocessingDispatcher if self.requirements.parallelization_mode == 'populational' else \ - SequentialDispatcher + # dispatcher_type = MultiprocessingDispatcher if self.requirements.parallelization_mode == 'populational' else \ + # SequentialDispatcher + + dispatcher_type = SequentialDispatcher self.eval_dispatcher = dispatcher_type(adapter=graph_generation_params.adapter, n_jobs=requirements.n_jobs, @@ -69,10 +70,7 @@ def __init__(self, 'Optimisation finished: Early stopping iterations criteria was satisfied' ).add_condition( lambda: self.generations.stagnation_time_duration >= max_stagnation_time, - 'Optimisation finished: Early stopping timeout criteria was satisfied' - ) - # in how many generations structural diversity check should be performed - self.gen_structural_diversity_check = self.graph_optimizer_params.structural_diversity_frequency_check + 'Optimisation finished: Early stopping timeout criteria was satisfied') @property def current_generation_num(self) -> int: @@ -94,10 +92,6 @@ def optimise(self, objective: ObjectiveFunction) -> Sequence[Graph]: while not self.stop_optimization(): try: new_population = self._evolve_population(evaluator) - if self.gen_structural_diversity_check != -1 \ - and self.generations.generation_num % self.gen_structural_diversity_check == 0 \ - and self.generations.generation_num != 0: - new_population = self.get_structure_unique_population(new_population, evaluator) pbar.update() except EvaluationAttemptsError as ex: self.log.warning(f'Composition process was stopped due to: {ex}') @@ -150,17 +144,6 @@ def _log_to_history(self, population: PopulationT, label: Optional[str] = None, if self.requirements.history_dir: self.history.save_current_results(self.requirements.history_dir) - def get_structure_unique_population(self, population: PopulationT, evaluator: EvaluationOperator) -> PopulationT: - """ Increases structurally uniqueness of population to prevent stagnation in optimization process. - Returned population may be not entirely unique, if the size of unique population is lower than MIN_POP_SIZE. """ - unique_population_with_ids = {ind.graph.descriptive_id: ind for ind in population} - unique_population = list(unique_population_with_ids.values()) - - # if size of unique population is too small, then extend it to MIN_POP_SIZE by repeating individuals - if len(unique_population) < MIN_POP_SIZE: - unique_population = self._extend_population(pop=unique_population, target_pop_size=MIN_POP_SIZE) - return evaluator(unique_population) - # TODO: remove this hack (e.g. provide smth like FitGraph with fit/unfit interface) def _try_unfit_graph(graph: Any): diff --git a/test/integration/test_structural_diversity.py b/test/integration/test_structural_diversity.py index c5f4fe6a..4c05213a 100644 --- a/test/integration/test_structural_diversity.py +++ b/test/integration/test_structural_diversity.py @@ -8,14 +8,13 @@ from golem.core.optimisers.genetic.operators.base_mutations import MutationTypesEnum from golem.core.optimisers.genetic.operators.crossover import CrossoverTypesEnum from golem.core.optimisers.objective import Objective -from golem.core.optimisers.optimizer import STRUCTURAL_DIVERSITY_FREQUENCY_CHECK from golem.metrics.edit_distance import tree_edit_dist from golem.metrics.graph_metrics import degree_distance DIVERSITY_THRESHOLD = 0.5 -def set_up_params(gen_structural_check: int): +def set_up_params(gen_structural_check: int = -1): """ It is possible to run test with and without structural check. To run test without structural test set `gen_structural_check` to -1, otherwise it has to be set to positive integer value. """ @@ -32,7 +31,6 @@ def set_up_params(gen_structural_check: int): MutationTypesEnum.single_drop, ], crossover_types=[CrossoverTypesEnum.none], - structural_diversity_frequency_check=gen_structural_check ) return gp_params @@ -41,7 +39,7 @@ def test_structural_diversity(): """ Checks population's structural diversity. Diversity should not be lower than DIVERSITY_THRESHOLD. """ target_graph = generate_labeled_graph('tree', 4, node_labels=['x']) node_types = ['x', 'y', 'z', 'w', 'v', 'u'] - gen_structural_check = STRUCTURAL_DIVERSITY_FREQUENCY_CHECK + gen_structural_check = -1 reset_diversity_threshold = 0.90 gp_params = set_up_params(gen_structural_check=gen_structural_check) diff --git a/test/unit/optimizers/gp_operators/test_genetic_pipelines.py b/test/unit/optimizers/gp_operators/test_genetic_pipelines.py new file mode 100644 index 00000000..b3ab483a --- /dev/null +++ b/test/unit/optimizers/gp_operators/test_genetic_pipelines.py @@ -0,0 +1,219 @@ +import random +from collections import Counter +from math import ceil +from itertools import product +from typing import Optional + +import pytest +from examples.synthetic_graph_evolution.generators import generate_labeled_graph + +from golem.core.adapter.nx_adapter import BaseNetworkxAdapter +from golem.core.optimisers.genetic.operators.node import GeneticOperatorTask, TaskStagesEnum, GeneticNode +from golem.core.optimisers.genetic.operators.operator import EvaluationOperator, Operator, PopulationT +from golem.core.optimisers.opt_history_objects.individual import Individual + + +class UncorrectIndividualsCount(Exception): + pass + + +class Mock: + def __init__(self, success_prob: float = 1.0): + self.success_prob = success_prob + + def __call__(self): + if random.random() > self.success_prob: + raise Exception() + + +class MockOperator(Mock, Operator): + def __init__(self, *args, + individuals_input_count: Optional[int] = None, + individuals_output_count: Optional[int] = None, + **kwargs): + super().__init__(*args, **kwargs) + self.individuals_input_count = individuals_input_count + self.individuals_output_count = individuals_output_count + + def __call__(self, individuals, operation_type = None): + if ((self.individuals_input_count is not None and len(individuals) > self.individuals_input_count) or + len(individuals) == 0): + raise UncorrectIndividualsCount() + super().__call__() + if self.individuals_output_count is None: + return individuals, operation_type + else: + return individuals[:1] * self.individuals_output_count, operation_type + + +class MockEvaluator(Mock, EvaluationOperator): + def __call__(self, pop): + super().__call__() + n_valid = int(ceil(self.success_prob * len(pop))) + evaluated = random.sample(pop, n_valid) + return evaluated + +def get_rand_population(pop_size: int = 10) -> PopulationT: + graph_sizes = list(range(5, 15)) + random_pop = [generate_labeled_graph('tree', size=random.choice(graph_sizes), + directed=True) + for _ in range(pop_size)] + graph_pop = BaseNetworkxAdapter().adapt(random_pop) + individuals = [Individual(graph) for graph in graph_pop] + return individuals + + +def get_random_task(pop_size: int = 10, operator_type: str = 'test_operator_type', **params): + return GeneticOperatorTask(individuals=get_rand_population(pop_size), + operator_type=operator_type, + **params) + + +def test_genetic_task_constructor(): + individuals = get_rand_population() + operator_type = 'test_operator_type' + + task = GeneticOperatorTask(individuals=individuals, + operator_type=operator_type) + + # check task constructor + assert task.individuals == individuals + assert task.operator_type == operator_type + assert task.left_tries == 1 + assert task.exception is None + assert task.stage is TaskStagesEnum.INIT + + +def test_genetic_failed_task(): + task = get_random_task() + + left_tries = task.left_tries + stage = task.stage + + exception = Exception('test') + new_task = task.create_failed_task(exception) + + assert id(new_task) != id(task) + assert task.exception is None + assert task.left_tries == left_tries + assert task.stage == stage + + assert new_task.exception == exception + assert (task.left_tries - new_task.left_tries) == 1 + assert new_task.stage is TaskStagesEnum.FAIL + + for attr in ('individuals', 'operator_type', 'next_stage_node', 'prev_stage_node', 'parent_task'): + assert getattr(new_task, attr) == getattr(task, attr) + + +def test_genetic_successive_task(): + task = get_random_task() + + stage = task.stage + individuals = task.individuals + new_individuals = get_rand_population(5) + new_task = task.create_successive_task(new_individuals) + + assert id(new_task) != id(task) + assert task.stage == stage + assert task.individuals == individuals + + assert task.left_tries == new_task.left_tries + assert new_task.stage is TaskStagesEnum.SUCCESS + assert new_task.individuals == new_individuals + assert new_task.parent_task == task + + for attr in ('next_stage_node', 'prev_stage_node'): + assert getattr(new_task, attr) == getattr(task, attr) + + +@pytest.mark.parametrize(['stage', 'success_outputs', 'left_tries', + 'individuals_input_count', 'individuals_output_count', + 'repeat_count', 'tries_count'], + product([TaskStagesEnum.INIT, TaskStagesEnum.SUCCESS], # stage + [[None], ['1', '2', '3']], # success_outputs + [1, 3], # left_tries + [1, 3, None], # individuals_input_count + [1, 3, None], # individuals_output_count + [1, 3], # repeat_count + [1, 3], # tries_count + )) +def test_genetic_node_with_nonfailed_task(stage, success_outputs, left_tries, individuals_input_count, + individuals_output_count, repeat_count, tries_count): + pop_size = 10 + node_name = 'test' + + task = get_random_task(pop_size=pop_size, stage=stage, left_tries=left_tries) + operator = MockOperator(success_prob=1, individuals_input_count=individuals_input_count, + individuals_output_count=individuals_output_count) + node = GeneticNode(name=node_name, operator=operator, success_outputs=success_outputs, + individuals_input_count=individuals_input_count, + repeat_count=repeat_count, tries_count=tries_count) + + final_tasks = node(task) + + # check final_tasks count + # individuals that MockOperator returns + _individuals_output_count = individuals_output_count or pop_size + # individuals that MockOperator can get + _individuals_input_count = individuals_input_count or pop_size + # if there are repeats condition or task is divided due to + # unappropriate individuals count (higher than individuals_input_count) + # then incoming tasks are copied and divided + incoming_tasks_count = ceil(pop_size / _individuals_input_count) * repeat_count + # then only one task may be processed + incoming_tasks_count -= 1 + processed_tasks_count = 1 * len(success_outputs) + assert len(final_tasks) == (incoming_tasks_count + processed_tasks_count) + + + # check tasks stage + processed_task_stage = TaskStagesEnum.FINISH if success_outputs == [None] else TaskStagesEnum.SUCCESS + if stage is TaskStagesEnum.INIT: + assert sum(_task.stage is TaskStagesEnum.INIT for _task in final_tasks) == incoming_tasks_count + assert sum(_task.stage is processed_task_stage for _task in final_tasks) == processed_tasks_count + elif processed_task_stage is TaskStagesEnum.SUCCESS: + assert all(_task.stage is TaskStagesEnum.SUCCESS for _task in final_tasks) + else: + assert sum(_task.stage is TaskStagesEnum.SUCCESS for _task in final_tasks) == incoming_tasks_count + assert sum(_task.stage is processed_task_stage for _task in final_tasks) == processed_tasks_count + + # check left_tries + assert all(_task.left_tries == tries_count for _task in final_tasks) + + # check prev and next nodes + assert sum(_task.prev_stage_node == node_name for _task in final_tasks) == processed_tasks_count + next_nodes = Counter(_task.next_stage_node for _task in final_tasks) + if success_outputs == [None]: + assert next_nodes[None] == len(final_tasks) + else: + assert set(next_nodes[name] for name in success_outputs) == {1} + + # check that processed task has correct individuals count + assert all(len(_task.individuals) == (individuals_output_count or _individuals_input_count) + for _task in final_tasks if _task.prev_stage_node == node.name) + + +@pytest.mark.parametrize(['success_outputs', 'left_tries', + 'individuals_input_count', 'individuals_output_count', + 'repeat_count', 'tries_count'], + product([[None], ['1', '2', '3']], # success_outputs + [1, 3], # left_tries + [1, 3, None], # individuals_input_count + [1, 3, None], # individuals_output_count + [1, 3], # repeat_count + [1, 3], # tries_count + )) +def test_genetic_node_with_nonfailed_task(success_outputs, left_tries, individuals_input_count, + individuals_output_count, repeat_count, tries_count): + pop_size = 10 + node_name = 'test' + + task = get_random_task(pop_size=pop_size, stage=TaskStagesEnum.FAIL, left_tries=left_tries) + operator = MockOperator(success_prob=1, individuals_input_count=individuals_input_count, + individuals_output_count=individuals_output_count) + node = GeneticNode(name=node_name, operator=operator, success_outputs=success_outputs, + individuals_input_count=individuals_input_count, + repeat_count=repeat_count, tries_count=tries_count) + + final_tasks = node(task) diff --git a/test/unit/optimizers/gp_operators/test_gp_operators.py b/test/unit/optimizers/gp_operators/test_gp_operators.py index 77ff0a6b..33b4cb5f 100644 --- a/test/unit/optimizers/gp_operators/test_gp_operators.py +++ b/test/unit/optimizers/gp_operators/test_gp_operators.py @@ -4,13 +4,11 @@ import pytest from golem.core.adapter import DirectAdapter -from golem.core.constants import MIN_POP_SIZE from golem.core.dag.graph_utils import nodes_from_layer from golem.core.dag.linked_graph import LinkedGraph from golem.core.dag.linked_graph_node import LinkedGraphNode from golem.core.optimisers.archive import ParetoFront from golem.core.optimisers.fitness.multi_objective_fitness import MultiObjFitness -from golem.core.optimisers.genetic.evaluation import SequentialDispatcher from golem.core.optimisers.genetic.gp_operators import filter_duplicates, replace_subtrees, equivalent_subtree from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters from golem.core.optimisers.objective import Objective @@ -19,7 +17,7 @@ from golem.core.optimisers.optimizer import GraphGenerationParams from golem.core.optimisers.populational_optimizer import PopulationalOptimizer from test.unit.utils import graph_first, graph_second, graph_third, graph_fourth, graph_with_multi_roots_second, \ - graph_with_multi_roots_first, graphs_same, RandomMetric + graph_with_multi_roots_first, RandomMetric def get_graph_with_operation(operation: str) -> LinkedGraph: @@ -128,34 +126,3 @@ def test_graphs_with_multi_root_equivalent_subtree(): similar_nodes_first_and_second = equivalent_subtree(graph_first=graph_first, graph_second=graph_second, with_primary_nodes=True) assert len(similar_nodes_first_and_second) == 8 - - -def test_structural_diversity(): - """ Checks if `get_structure_unique_population` method returns population without structural duplicates. """ - operations = ['a', 'b', 'c', 'd', 'e'] - population_with_reps = population_with_structural_duplicates(operations=operations) - optimizer, objective = set_up_optimizer(operations=operations) - - adapter = DirectAdapter() - evaluator = SequentialDispatcher(adapter).dispatch(objective) - new_population = optimizer.get_structure_unique_population(population_with_reps, evaluator) - - target_new_population = [] - for op in operations: - target_new_population += [Individual(adapter.adapt(get_graph_with_operation(operation=op)))] - - for i in range(len(target_new_population)): - assert graphs_same(new_population[i].graph, target_new_population[i].graph) - - -def test_recover_pop_size_after_structure_check(): - """ Checks that `get_structure_unique_population` extends population - if after structural check there sre less than MIN_POP_SIZE individuals in population. """ - operations = ['a', 'b', 'c'] - population_with_reps = population_with_structural_duplicates(operations=operations) - optimizer, objective = set_up_optimizer(operations=operations) - adapter = DirectAdapter() - evaluator = SequentialDispatcher(adapter).dispatch(objective) - new_population = optimizer.get_structure_unique_population(population_with_reps, evaluator) - - assert len(new_population) == MIN_POP_SIZE diff --git a/test/unit/optimizers/gp_operators/test_reproduction_controller.py b/test/unit/optimizers/gp_operators/test_reproduction_controller.py index e1034f45..752a404d 100644 --- a/test/unit/optimizers/gp_operators/test_reproduction_controller.py +++ b/test/unit/optimizers/gp_operators/test_reproduction_controller.py @@ -2,15 +2,15 @@ from math import ceil from typing import Optional -import numpy as np import pytest from examples.synthetic_graph_evolution.generators import generate_labeled_graph from golem.core.adapter.nx_adapter import BaseNetworkxAdapter from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters from golem.core.optimisers.genetic.operators.base_mutations import MutationTypesEnum -from golem.core.optimisers.genetic.operators.crossover import Crossover, CrossoverTypesEnum -from golem.core.optimisers.genetic.operators.mutation import Mutation +from golem.core.optimisers.genetic.operators.crossover import Crossover, CrossoverTypesEnum, \ + SinglePredefinedGraphCrossover +from golem.core.optimisers.genetic.operators.mutation import Mutation, SinglePredefinedGraphMutation from golem.core.optimisers.genetic.operators.operator import EvaluationOperator, PopulationT from golem.core.optimisers.genetic.operators.reproduction import ReproductionController from golem.core.optimisers.genetic.operators.selection import Selection @@ -61,29 +61,15 @@ def reproducer() -> ReproductionController: rules_for_constraint=[]) requirements = GraphRequirements() - mutation = Mutation(params, requirements, graph_gen_params) - crossover = Crossover(params, requirements, graph_gen_params) + mutation = SinglePredefinedGraphMutation(params, requirements, graph_gen_params) + crossover = SinglePredefinedGraphCrossover(params, requirements, graph_gen_params) selection = Selection(params, requirements) reproduction = ReproductionController(params, selection, mutation, crossover) return reproduction -@pytest.mark.parametrize('success_rate', [0.4, 0.5, 0.9, 1.0]) -def test_mean_success_rate(reproducer: ReproductionController, success_rate: float): - """Tests that Reproducer correctly estimates average success rate""" - assert np.isclose(reproducer.mean_success_rate, 1.0) - - evaluator = MockEvaluator(success_rate) - pop = get_rand_population(reproducer.parameters.pop_size) - num_iters = 50 - for i in range(num_iters): - pop = reproducer.reproduce(pop, evaluator) - - assert np.isclose(reproducer.mean_success_rate, success_rate, rtol=0.1) - - -@pytest.mark.parametrize('success_rate', [0.0, 0.1]) +@pytest.mark.parametrize('success_rate', [0.0]) def test_too_little_valid_evals(reproducer: ReproductionController, success_rate: float): evaluator = MockEvaluator(success_rate) pop = get_rand_population(reproducer.parameters.pop_size) @@ -101,7 +87,7 @@ def test_minimal_valid_evals(reproducer: ReproductionController, success_rate: f for i in range(num_iters): pop = reproducer.reproduce(pop, evaluator) actual_valid_ratio = len(pop) / parameters.pop_size - assert parameters.required_valid_ratio > actual_valid_ratio >= reproducer._minimum_valid_ratio + assert actual_valid_ratio >= reproducer._minimum_valid_ratio @pytest.mark.parametrize('success_rate', [0.4, 0.9, 1.0]) @@ -125,7 +111,7 @@ def test_pop_size_progression(reproducer: ReproductionController, success_rate: assert (actual_pop_size > len(prev_pop) or actual_pop_size >= parameters.max_pop_size * required_valid) # and that this increase follows the one from parameters - assert 1.0 >= (actual_pop_size / parameters.pop_size) >= required_valid + assert (actual_pop_size / parameters.pop_size) >= required_valid # update pop size parameters.pop_size = pop_size_progress.next(pop)