diff --git a/golem/core/optimisers/genetic/operators/mutation.py b/golem/core/optimisers/genetic/operators/mutation.py index a0198571..f09cd4de 100644 --- a/golem/core/optimisers/genetic/operators/mutation.py +++ b/golem/core/optimisers/genetic/operators/mutation.py @@ -6,14 +6,14 @@ from golem.core.dag.graph import Graph from golem.core.optimisers.adaptive.mab_agents.contextual_mab_agent import ContextualMultiArmedBanditAgent -from golem.core.optimisers.adaptive.mab_agents.neural_contextual_mab_agent import NeuralContextualMultiArmedBanditAgent from golem.core.optimisers.adaptive.mab_agents.mab_agent import MultiArmedBanditAgent +from golem.core.optimisers.adaptive.mab_agents.neural_contextual_mab_agent import NeuralContextualMultiArmedBanditAgent from golem.core.optimisers.adaptive.operator_agent import \ - OperatorAgent, RandomAgent, ExperienceBuffer, MutationAgentTypeEnum + OperatorAgent, RandomAgent, MutationAgentTypeEnum +from golem.core.optimisers.adaptive.experience_buffer import ExperienceBuffer from golem.core.optimisers.genetic.operators.base_mutations import \ base_mutations_repo, MutationTypesEnum from golem.core.optimisers.genetic.operators.operator import PopulationT, Operator -from golem.core.optimisers.graph import OptGraph from golem.core.optimisers.opt_history_objects.individual import Individual from golem.core.optimisers.opt_history_objects.parent_operator import ParentOperator from golem.core.optimisers.optimization_parameters import GraphRequirements, OptimizationParameters @@ -22,8 +22,8 @@ if TYPE_CHECKING: from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters -MutationFunc = Callable[[Graph, GraphRequirements, GraphGenerationParams, AlgorithmParameters], Graph] MutationType = Union[MutationTypesEnum, Callable] +MutationFunc = Callable[[Graph, GraphRequirements, GraphGenerationParams, AlgorithmParameters], Graph] MutationIdType = Hashable MutationRepo = Mapping[MutationIdType, MutationFunc] @@ -65,7 +65,7 @@ def _init_operator_agent(graph_gen_params: GraphGenerationParams, agent = NeuralContextualMultiArmedBanditAgent( actions=parameters.mutation_types, context_agent_type=parameters.context_agent_type, - available_operations=graph_gen_params.node_factory.available_nodes, + available_operations=graph_gen_params.node_factory.get_all_available_operations(), n_jobs=requirements.n_jobs) # if agent was specified pretrained (with instance) elif isinstance(parameters.adaptive_mutation_type, OperatorAgent): @@ -78,70 +78,78 @@ def _init_operator_agent(graph_gen_params: GraphGenerationParams, def agent(self) -> OperatorAgent: return self._operator_agent - def __call__(self, - population: Union[Individual, PopulationT], - mutation_type: Union[None, MutationTypesEnum, Callable] = None, - ) -> Union[Individual, PopulationT]: + def __call__(self, population: Union[Individual, PopulationT]) -> Union[Individual, PopulationT]: if isinstance(population, Individual): population = [population] - final_population, _, application_attempts = \ - tuple(zip(*map(lambda individual: self._mutation(individual, mutation_type=mutation_type), population))) - - # drop individuals to which mutations could not be applied - final_population = [ind for ind, init_ind, attempt in zip(final_population, population, application_attempts) - if not attempt or ind.graph != init_ind.graph] + final_population = [] + for individual in population: + new_ind, _, applied = self._mutation(individual) + if not applied or new_ind.graph != individual.graph: + final_population.append(new_ind) if len(population) == 1: return final_population[0] if final_population else final_population return final_population - def _mutation(self, - individual: Individual, - mutation_type: Union[None, MutationTypesEnum, Callable] = None, - ) -> Tuple[Individual, Union[MutationTypesEnum, Callable], bool]: + def _mutation(self, individual: Individual) -> Tuple[Individual, Optional[MutationIdType], bool]: """ Function applies mutation operator to graph """ - new_graph = deepcopy(individual.graph) - mutation_type = mutation_type or self._operator_agent.choose_action(new_graph) - applied = self._will_mutation_be_applied(mutation_type) - if applied: - new_graph = self._apply_mutations(new_graph, mutation_type=mutation_type) + application_attempt = False + mutation_applied = None + for _ in range(self.parameters.max_num_of_operator_attempts): + new_graph = deepcopy(individual.graph) + + new_graph, mutation_applied = self._apply_mutations(new_graph) + if mutation_applied is None: + continue + application_attempt = True is_correct_graph = self.graph_generation_params.verifier(new_graph) if is_correct_graph: parent_operator = ParentOperator(type_='mutation', - operators=mutation_type, + operators=mutation_applied, parent_individuals=individual) individual = Individual(new_graph, parent_operator, metadata=self.requirements.static_individual_metadata) + break else: # Collect invalid actions - self.agent_experience.collect_experience(individual.graph, mutation_type, reward=-1.0) - return individual, mutation_type, applied + self.agent_experience.collect_experience(individual, mutation_applied, reward=-1.0) + else: + self.log.debug('Number of mutation attempts exceeded. ' + 'Please check optimization parameters for correctness.') + return individual, mutation_applied, application_attempt - def _sample_num_of_mutations(self, mutation_type: Union[MutationTypesEnum, Callable]) -> int: + def _sample_num_of_mutations(self) -> int: # most of the time returns 1 or rarely several mutations - # if mutation is custom apply it only once - is_custom_mutation = isinstance(mutation_type, Callable) - if not is_custom_mutation and self.parameters.variable_mutation_num: + if self.parameters.variable_mutation_num: num_mut = max(int(round(np.random.lognormal(0, sigma=0.5))), 1) else: num_mut = 1 return num_mut - def _apply_mutations(self, - new_graph: OptGraph, - mutation_type: Union[None, MutationTypesEnum, Callable] = None, - ) -> Tuple[OptGraph]: + def _apply_mutations(self, new_graph: Graph) -> Tuple[Graph, Optional[MutationIdType]]: """Apply mutation 1 or few times iteratively""" - mutation_count = self._sample_num_of_mutations(mutation_type=mutation_type) - for _ in range(mutation_count): + mutation_type = self._operator_agent.choose_action(new_graph) + mutation_applied = None + for _ in range(self._sample_num_of_mutations()): + new_graph, applied = self._adapt_and_apply_mutation(new_graph, mutation_type) + if applied: + mutation_applied = mutation_type + is_custom_mutation = isinstance(mutation_type, Callable) + if is_custom_mutation: # custom mutation occurs once + break + return new_graph, mutation_applied + + def _adapt_and_apply_mutation(self, new_graph: Graph, mutation_type) -> Tuple[Graph, bool]: + applied = self._will_mutation_be_applied(mutation_type) + if applied: # get the mutation function and adapt it mutation_func = self._get_mutation_func(mutation_type) new_graph = mutation_func(new_graph, requirements=self.requirements, graph_gen_params=self.graph_generation_params, parameters=self.parameters) - return new_graph + return new_graph, applied def _will_mutation_be_applied(self, mutation_type: Union[MutationTypesEnum, Callable]) -> bool: return random() <= self.parameters.mutation_prob and mutation_type is not MutationTypesEnum.none @@ -153,3 +161,22 @@ def _get_mutation_func(self, mutation_type: Union[MutationTypesEnum, Callable]) mutation_func = self._mutations_repo[mutation_type] adapted_mutation_func = self.graph_generation_params.adapter.adapt_func(mutation_func) return adapted_mutation_func + + +class FastSingleMutation(Mutation): + def __call__(self, individual: Individual) -> Individual: + new_graph = deepcopy(individual.graph) + + mutation_type = self._operator_agent.choose_action(new_graph) + mutation_func = self._get_mutation_func(mutation_type) + + new_graph = mutation_func(new_graph, requirements=self.requirements, + graph_gen_params=self.graph_generation_params, + parameters=self.parameters) + + parent_operator = ParentOperator(type_='mutation', + operators=mutation_type, + parent_individuals=individual) + individual = Individual(new_graph, parent_operator, + metadata=self.requirements.static_individual_metadata) + return individual diff --git a/golem/core/optimisers/genetic/operators/reproduction.py b/golem/core/optimisers/genetic/operators/reproduction.py index 2aebdf33..7b75b7d4 100644 --- a/golem/core/optimisers/genetic/operators/reproduction.py +++ b/golem/core/optimisers/genetic/operators/reproduction.py @@ -82,136 +82,30 @@ def reproduce(self, population: PopulationT, evaluator: EvaluationOperator) -> P return new_population def _mutate_over_population(self, population: PopulationT, evaluator: EvaluationOperator) -> PopulationT: - # TODO n_jobs may be -1, should be fixed - n_jobs = self.mutation.requirements.n_jobs - n_jobs = 8 - target_pop_size = self.parameters.pop_size - population_descriptive_ids_mapping = {ind.graph.descriptive_id: ind for ind in population} - mutation_types = self.mutation._operator_agent.actions - left_tries = [target_pop_size * MAX_GRAPH_GEN_ATTEMPTS_AS_POP_SIZE_MULTIPLIER] - mutations_per_individual = ceil(target_pop_size / len(population)) - all_mutations_count_for_each_ind = {descriptive_id: 0 - for descriptive_id in population_descriptive_ids_mapping} - mutations_count = {mutation_type: 0 for mutation_type in mutation_types} - mutation_count_for_each_ind = {descriptive_id: {mutation_type: 0 for mutation_type in mutation_types} - for descriptive_id in population_descriptive_ids_mapping} - - # increase probability of mutation + # increase probability of mutation to not spend tries for no mutations initial_parameters = deepcopy(self.parameters) initial_parameters.mutation_prob = 1.0 self.mutation.update_requirements(parameters=initial_parameters) - # additional functions - def try_mutation(descriptive_id: str, mutation_type: Optional[MutationType] = None): - left_tries[0] -= 1 - return executor.submit(self._mutation_n_evaluation, descriptive_id, - population_descriptive_ids_mapping[descriptive_id], - mutation_type, evaluator) - - def check_and_try_mutation(parent_descriptive_id: str, - mutation_type: Optional[MutationType] = None, - count: int = 1): - # probs should be the same order as mutation_types - probs = dict(zip(mutation_types, self.mutation._operator_agent.get_action_probs())) - real_probs = {mutation_type: mutation_count_for_each_ind[parent_descriptive_id][mutation_type] / - max(1, all_mutations_count_for_each_ind[parent_descriptive_id]) - for mutation_type in mutation_types} - # check probability allows to make mutations - if (all_mutations_count_for_each_ind[parent_descriptive_id] == 0 or - probs[mutation_type] > real_probs[mutation_type] or - len(set(real_probs.values())) == 1): - # check that there is not enough mutations - if all_mutations_count_for_each_ind[parent_descriptive_id] < mutations_per_individual: - for _ in range(count): - futures.append(try_mutation(parent_descriptive_id, mutation_type)) - return True - return False - - def add_new_individual_to_new_population(parent_descriptive_id: str, - mutation_type: MutationType, - new_individual: Individual): - if new_individual: - descriptive_id = new_individual.graph.descriptive_id - if descriptive_id not in self._pop_graph_descriptive_ids: - mutation_count_for_each_ind[parent_descriptive_id][mutation_type] += 1 - all_mutations_count_for_each_ind[parent_descriptive_id] += 1 - mutations_count[mutation_type] += 1 - - new_population.append(new_individual) - self._pop_graph_descriptive_ids.add(descriptive_id) - return True - return False - - # start reproducing + max_tries = self.parameters.pop_size * MAX_GRAPH_GEN_ATTEMPTS_AS_POP_SIZE_MULTIPLIER + mutation_fun = partial(self._mutation_n_evaluation, evaluator=evaluator) new_population = [] - executor = get_reusable_executor(max_workers=n_jobs) - - # stage 1 - # set up each type of mutation for each individual - futures = deque(try_mutation(descriptive_id, mutation_type) - for mutation_type in np.random.permutation(mutation_types) - for descriptive_id in np.random.permutation(list(population_descriptive_ids_mapping))) - - # stage 2 - delayed_mutations = deque() - individual_id_with_lowest_mutations, rarest_mutation_type = None, None - times = [] - while futures: - if len(new_population) == target_pop_size or left_tries[0] <= 0: - break - - # get next finished future - while True: - future = futures.popleft() - if future._state == 'FINISHED': break - futures.append(future) - time.sleep(0.01) # to prevent flooding - - # add new individual to new population - parent_descriptive_id, mutation_type, new_ind = future.result() - added = add_new_individual_to_new_population(parent_descriptive_id, mutation_type, new_ind) - - # define rarest ind and mutation - if added: - all_mutations = sum(mutations_count.values()) - probs = dict(zip(mutation_types, self.mutation._operator_agent.get_action_probs())) - real_probs = {mutation_type: mutations_count[mutation_type] / (all_mutations * probs[mutation_type]) - for mutation_type in mutation_types} - - frequent_mutation_type = max(real_probs.items(), key=lambda x: x[1])[0] - rarest_mutation_type = min(real_probs.items(), key=lambda x: x[1])[0] - individual_id_with_lowest_mutations = min(all_mutations_count_for_each_ind.items(), - key=lambda x: x[1])[0] - - # create new future with same mutation and same individual - count = (1 + - (individual_id_with_lowest_mutations == parent_descriptive_id) + - (rarest_mutation_type == mutation_type)) - applied = check_and_try_mutation(parent_descriptive_id, mutation_type, count) - - # if there is no need in parent_descriptive_id & mutation_type mutation - # then try to find new mutation - count = n_jobs + 1 - len(futures) - if not applied: - delayed_mutations.append((parent_descriptive_id, mutation_type)) - for _ in range(len(delayed_mutations) - 1): - parent_descriptive_id, mutation_type = delayed_mutations.popleft() - if ((individual_id_with_lowest_mutations == parent_descriptive_id or - mutation_type == rarest_mutation_type) and - mutation_type != frequent_mutation_type): - futures.append(try_mutation(parent_descriptive_id, mutation_type)) - applied = True - else: - applied = check_and_try_mutation(parent_descriptive_id, mutation_type) - count -= applied - if count <= 0: break - if not applied: delayed_mutations.append((parent_descriptive_id, mutation_type)) - - # if there are any feature then process it and add new_ind to new_population if it is ready - for future in futures: - if future._state == 'FINISHED': - add_new_individual_to_new_population(*future.result()) - executor.shutdown(wait=False) + + new_population = list(map(mutation_fun, cycle(population))) + + with Parallel(n_jobs=self.mutation.requirements.n_jobs, return_as='generator') as parallel: + new_ind_generator = parallel(delayed(mutation_fun)(ind) + for ind, _ in zip(cycle(population), range(max_tries))) + for new_ind, mutation_type, applied in new_ind_generator: + if applied: + descriptive_id = new_ind.graph.descriptive_id + if descriptive_id not in self._pop_graph_descriptive_ids: + new_population.append(new_ind) + self._pop_graph_descriptive_ids.add(descriptive_id) + if len(new_population) >= self.parameters.pop_size: + break + else: + self.mutation.agent_experience.collect_experience(new_ind.graph, mutation_type, reward=-1.0) # Reset mutation probabilities to default self.mutation.update_requirements(requirements=self.parameters) @@ -231,16 +125,10 @@ def _check_final_population(self, population: PopulationT) -> None: f'have {len(population)},' f' required {target_pop_size}!\n' + helpful_msg) - def _mutation_n_evaluation(self, - descriptive_id: str, - individual: Individual, - mutation_type: Optional[MutationType], - evaluator: EvaluationOperator): - individual, mutation_type, applied = self.mutation._mutation(individual, mutation_type=mutation_type) - if applied and individual and self.verifier(individual.graph): + def _mutation_n_evaluation(self, individual: Individual, evaluator: EvaluationOperator): + individual, mutation_type, applied = self.mutation._mutation(individual) + if individual and self.verifier(individual.graph): individuals = evaluator([individual]) if individuals: - # if all is ok return all data - return descriptive_id, mutation_type, individuals[0] - # if something go wrong do not return new individual - return descriptive_id, mutation_type, None + return individuals[0], mutation_type, applied + return individual, mutation_type, False