Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kasyanovse committed Oct 26, 2023
1 parent e18cee5 commit cee5afb
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 174 deletions.
103 changes: 65 additions & 38 deletions golem/core/optimisers/genetic/operators/mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

from golem.core.dag.graph import Graph
from golem.core.optimisers.adaptive.mab_agents.contextual_mab_agent import ContextualMultiArmedBanditAgent
from golem.core.optimisers.adaptive.mab_agents.neural_contextual_mab_agent import NeuralContextualMultiArmedBanditAgent
from golem.core.optimisers.adaptive.mab_agents.mab_agent import MultiArmedBanditAgent
from golem.core.optimisers.adaptive.mab_agents.neural_contextual_mab_agent import NeuralContextualMultiArmedBanditAgent
from golem.core.optimisers.adaptive.operator_agent import \
OperatorAgent, RandomAgent, ExperienceBuffer, MutationAgentTypeEnum
OperatorAgent, RandomAgent, MutationAgentTypeEnum
from golem.core.optimisers.adaptive.experience_buffer import ExperienceBuffer
from golem.core.optimisers.genetic.operators.base_mutations import \
base_mutations_repo, MutationTypesEnum
from golem.core.optimisers.genetic.operators.operator import PopulationT, Operator
from golem.core.optimisers.graph import OptGraph
from golem.core.optimisers.opt_history_objects.individual import Individual
from golem.core.optimisers.opt_history_objects.parent_operator import ParentOperator
from golem.core.optimisers.optimization_parameters import GraphRequirements, OptimizationParameters
Expand All @@ -22,8 +22,8 @@
if TYPE_CHECKING:
from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters

MutationFunc = Callable[[Graph, GraphRequirements, GraphGenerationParams, AlgorithmParameters], Graph]
MutationType = Union[MutationTypesEnum, Callable]
MutationFunc = Callable[[Graph, GraphRequirements, GraphGenerationParams, AlgorithmParameters], Graph]
MutationIdType = Hashable
MutationRepo = Mapping[MutationIdType, MutationFunc]

Expand Down Expand Up @@ -65,7 +65,7 @@ def _init_operator_agent(graph_gen_params: GraphGenerationParams,
agent = NeuralContextualMultiArmedBanditAgent(
actions=parameters.mutation_types,
context_agent_type=parameters.context_agent_type,
available_operations=graph_gen_params.node_factory.available_nodes,
available_operations=graph_gen_params.node_factory.get_all_available_operations(),
n_jobs=requirements.n_jobs)
# if agent was specified pretrained (with instance)
elif isinstance(parameters.adaptive_mutation_type, OperatorAgent):
Expand All @@ -78,70 +78,78 @@ def _init_operator_agent(graph_gen_params: GraphGenerationParams,
def agent(self) -> OperatorAgent:
return self._operator_agent

def __call__(self,
population: Union[Individual, PopulationT],
mutation_type: Union[None, MutationTypesEnum, Callable] = None,
) -> Union[Individual, PopulationT]:
def __call__(self, population: Union[Individual, PopulationT]) -> Union[Individual, PopulationT]:
if isinstance(population, Individual):
population = [population]

final_population, _, application_attempts = \
tuple(zip(*map(lambda individual: self._mutation(individual, mutation_type=mutation_type), population)))

# drop individuals to which mutations could not be applied
final_population = [ind for ind, init_ind, attempt in zip(final_population, population, application_attempts)
if not attempt or ind.graph != init_ind.graph]
final_population = []
for individual in population:
new_ind, _, applied = self._mutation(individual)
if not applied or new_ind.graph != individual.graph:
final_population.append(new_ind)

if len(population) == 1:
return final_population[0] if final_population else final_population

return final_population

def _mutation(self,
individual: Individual,
mutation_type: Union[None, MutationTypesEnum, Callable] = None,
) -> Tuple[Individual, Union[MutationTypesEnum, Callable], bool]:
def _mutation(self, individual: Individual) -> Tuple[Individual, Optional[MutationIdType], bool]:
""" Function applies mutation operator to graph """
new_graph = deepcopy(individual.graph)
mutation_type = mutation_type or self._operator_agent.choose_action(new_graph)
applied = self._will_mutation_be_applied(mutation_type)
if applied:
new_graph = self._apply_mutations(new_graph, mutation_type=mutation_type)
application_attempt = False
mutation_applied = None
for _ in range(self.parameters.max_num_of_operator_attempts):
new_graph = deepcopy(individual.graph)

new_graph, mutation_applied = self._apply_mutations(new_graph)
if mutation_applied is None:
continue
application_attempt = True
is_correct_graph = self.graph_generation_params.verifier(new_graph)
if is_correct_graph:
parent_operator = ParentOperator(type_='mutation',
operators=mutation_type,
operators=mutation_applied,
parent_individuals=individual)
individual = Individual(new_graph, parent_operator,
metadata=self.requirements.static_individual_metadata)
break
else:
# Collect invalid actions
self.agent_experience.collect_experience(individual.graph, mutation_type, reward=-1.0)
return individual, mutation_type, applied
self.agent_experience.collect_experience(individual, mutation_applied, reward=-1.0)
else:
self.log.debug('Number of mutation attempts exceeded. '
'Please check optimization parameters for correctness.')
return individual, mutation_applied, application_attempt

def _sample_num_of_mutations(self, mutation_type: Union[MutationTypesEnum, Callable]) -> int:
def _sample_num_of_mutations(self) -> int:
# most of the time returns 1 or rarely several mutations
# if mutation is custom apply it only once
is_custom_mutation = isinstance(mutation_type, Callable)
if not is_custom_mutation and self.parameters.variable_mutation_num:
if self.parameters.variable_mutation_num:
num_mut = max(int(round(np.random.lognormal(0, sigma=0.5))), 1)
else:
num_mut = 1
return num_mut

def _apply_mutations(self,
new_graph: OptGraph,
mutation_type: Union[None, MutationTypesEnum, Callable] = None,
) -> Tuple[OptGraph]:
def _apply_mutations(self, new_graph: Graph) -> Tuple[Graph, Optional[MutationIdType]]:
"""Apply mutation 1 or few times iteratively"""
mutation_count = self._sample_num_of_mutations(mutation_type=mutation_type)
for _ in range(mutation_count):
mutation_type = self._operator_agent.choose_action(new_graph)
mutation_applied = None
for _ in range(self._sample_num_of_mutations()):
new_graph, applied = self._adapt_and_apply_mutation(new_graph, mutation_type)
if applied:
mutation_applied = mutation_type
is_custom_mutation = isinstance(mutation_type, Callable)
if is_custom_mutation: # custom mutation occurs once
break
return new_graph, mutation_applied

def _adapt_and_apply_mutation(self, new_graph: Graph, mutation_type) -> Tuple[Graph, bool]:
applied = self._will_mutation_be_applied(mutation_type)
if applied:
# get the mutation function and adapt it
mutation_func = self._get_mutation_func(mutation_type)
new_graph = mutation_func(new_graph, requirements=self.requirements,
graph_gen_params=self.graph_generation_params,
parameters=self.parameters)
return new_graph
return new_graph, applied

def _will_mutation_be_applied(self, mutation_type: Union[MutationTypesEnum, Callable]) -> bool:
return random() <= self.parameters.mutation_prob and mutation_type is not MutationTypesEnum.none
Expand All @@ -153,3 +161,22 @@ def _get_mutation_func(self, mutation_type: Union[MutationTypesEnum, Callable])
mutation_func = self._mutations_repo[mutation_type]
adapted_mutation_func = self.graph_generation_params.adapter.adapt_func(mutation_func)
return adapted_mutation_func


class FastSingleMutation(Mutation):
def __call__(self, individual: Individual) -> Individual:
new_graph = deepcopy(individual.graph)

mutation_type = self._operator_agent.choose_action(new_graph)
mutation_func = self._get_mutation_func(mutation_type)

new_graph = mutation_func(new_graph, requirements=self.requirements,
graph_gen_params=self.graph_generation_params,
parameters=self.parameters)

parent_operator = ParentOperator(type_='mutation',
operators=mutation_type,
parent_individuals=individual)
individual = Individual(new_graph, parent_operator,
metadata=self.requirements.static_individual_metadata)
return individual
160 changes: 24 additions & 136 deletions golem/core/optimisers/genetic/operators/reproduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,136 +82,30 @@ def reproduce(self, population: PopulationT, evaluator: EvaluationOperator) -> P
return new_population

def _mutate_over_population(self, population: PopulationT, evaluator: EvaluationOperator) -> PopulationT:
# TODO n_jobs may be -1, should be fixed
n_jobs = self.mutation.requirements.n_jobs
n_jobs = 8
target_pop_size = self.parameters.pop_size
population_descriptive_ids_mapping = {ind.graph.descriptive_id: ind for ind in population}
mutation_types = self.mutation._operator_agent.actions
left_tries = [target_pop_size * MAX_GRAPH_GEN_ATTEMPTS_AS_POP_SIZE_MULTIPLIER]
mutations_per_individual = ceil(target_pop_size / len(population))
all_mutations_count_for_each_ind = {descriptive_id: 0
for descriptive_id in population_descriptive_ids_mapping}
mutations_count = {mutation_type: 0 for mutation_type in mutation_types}
mutation_count_for_each_ind = {descriptive_id: {mutation_type: 0 for mutation_type in mutation_types}
for descriptive_id in population_descriptive_ids_mapping}

# increase probability of mutation
# increase probability of mutation to not spend tries for no mutations
initial_parameters = deepcopy(self.parameters)
initial_parameters.mutation_prob = 1.0
self.mutation.update_requirements(parameters=initial_parameters)

# additional functions
def try_mutation(descriptive_id: str, mutation_type: Optional[MutationType] = None):
left_tries[0] -= 1
return executor.submit(self._mutation_n_evaluation, descriptive_id,
population_descriptive_ids_mapping[descriptive_id],
mutation_type, evaluator)

def check_and_try_mutation(parent_descriptive_id: str,
mutation_type: Optional[MutationType] = None,
count: int = 1):
# probs should be the same order as mutation_types
probs = dict(zip(mutation_types, self.mutation._operator_agent.get_action_probs()))
real_probs = {mutation_type: mutation_count_for_each_ind[parent_descriptive_id][mutation_type] /
max(1, all_mutations_count_for_each_ind[parent_descriptive_id])
for mutation_type in mutation_types}
# check probability allows to make mutations
if (all_mutations_count_for_each_ind[parent_descriptive_id] == 0 or
probs[mutation_type] > real_probs[mutation_type] or
len(set(real_probs.values())) == 1):
# check that there is not enough mutations
if all_mutations_count_for_each_ind[parent_descriptive_id] < mutations_per_individual:
for _ in range(count):
futures.append(try_mutation(parent_descriptive_id, mutation_type))
return True
return False

def add_new_individual_to_new_population(parent_descriptive_id: str,
mutation_type: MutationType,
new_individual: Individual):
if new_individual:
descriptive_id = new_individual.graph.descriptive_id
if descriptive_id not in self._pop_graph_descriptive_ids:
mutation_count_for_each_ind[parent_descriptive_id][mutation_type] += 1
all_mutations_count_for_each_ind[parent_descriptive_id] += 1
mutations_count[mutation_type] += 1

new_population.append(new_individual)
self._pop_graph_descriptive_ids.add(descriptive_id)
return True
return False

# start reproducing
max_tries = self.parameters.pop_size * MAX_GRAPH_GEN_ATTEMPTS_AS_POP_SIZE_MULTIPLIER
mutation_fun = partial(self._mutation_n_evaluation, evaluator=evaluator)
new_population = []
executor = get_reusable_executor(max_workers=n_jobs)

# stage 1
# set up each type of mutation for each individual
futures = deque(try_mutation(descriptive_id, mutation_type)
for mutation_type in np.random.permutation(mutation_types)
for descriptive_id in np.random.permutation(list(population_descriptive_ids_mapping)))

# stage 2
delayed_mutations = deque()
individual_id_with_lowest_mutations, rarest_mutation_type = None, None
times = []
while futures:
if len(new_population) == target_pop_size or left_tries[0] <= 0:
break

# get next finished future
while True:
future = futures.popleft()
if future._state == 'FINISHED': break
futures.append(future)
time.sleep(0.01) # to prevent flooding

# add new individual to new population
parent_descriptive_id, mutation_type, new_ind = future.result()
added = add_new_individual_to_new_population(parent_descriptive_id, mutation_type, new_ind)

# define rarest ind and mutation
if added:
all_mutations = sum(mutations_count.values())
probs = dict(zip(mutation_types, self.mutation._operator_agent.get_action_probs()))
real_probs = {mutation_type: mutations_count[mutation_type] / (all_mutations * probs[mutation_type])
for mutation_type in mutation_types}

frequent_mutation_type = max(real_probs.items(), key=lambda x: x[1])[0]
rarest_mutation_type = min(real_probs.items(), key=lambda x: x[1])[0]
individual_id_with_lowest_mutations = min(all_mutations_count_for_each_ind.items(),
key=lambda x: x[1])[0]

# create new future with same mutation and same individual
count = (1 +
(individual_id_with_lowest_mutations == parent_descriptive_id) +
(rarest_mutation_type == mutation_type))
applied = check_and_try_mutation(parent_descriptive_id, mutation_type, count)

# if there is no need in parent_descriptive_id & mutation_type mutation
# then try to find new mutation
count = n_jobs + 1 - len(futures)
if not applied:
delayed_mutations.append((parent_descriptive_id, mutation_type))
for _ in range(len(delayed_mutations) - 1):
parent_descriptive_id, mutation_type = delayed_mutations.popleft()
if ((individual_id_with_lowest_mutations == parent_descriptive_id or
mutation_type == rarest_mutation_type) and
mutation_type != frequent_mutation_type):
futures.append(try_mutation(parent_descriptive_id, mutation_type))
applied = True
else:
applied = check_and_try_mutation(parent_descriptive_id, mutation_type)
count -= applied
if count <= 0: break
if not applied: delayed_mutations.append((parent_descriptive_id, mutation_type))

# if there are any feature then process it and add new_ind to new_population if it is ready
for future in futures:
if future._state == 'FINISHED':
add_new_individual_to_new_population(*future.result())
executor.shutdown(wait=False)

new_population = list(map(mutation_fun, cycle(population)))

with Parallel(n_jobs=self.mutation.requirements.n_jobs, return_as='generator') as parallel:
new_ind_generator = parallel(delayed(mutation_fun)(ind)
for ind, _ in zip(cycle(population), range(max_tries)))
for new_ind, mutation_type, applied in new_ind_generator:
if applied:
descriptive_id = new_ind.graph.descriptive_id
if descriptive_id not in self._pop_graph_descriptive_ids:
new_population.append(new_ind)
self._pop_graph_descriptive_ids.add(descriptive_id)
if len(new_population) >= self.parameters.pop_size:
break
else:
self.mutation.agent_experience.collect_experience(new_ind.graph, mutation_type, reward=-1.0)

# Reset mutation probabilities to default
self.mutation.update_requirements(requirements=self.parameters)
Expand All @@ -231,16 +125,10 @@ def _check_final_population(self, population: PopulationT) -> None:
f'have {len(population)},'
f' required {target_pop_size}!\n' + helpful_msg)

def _mutation_n_evaluation(self,
descriptive_id: str,
individual: Individual,
mutation_type: Optional[MutationType],
evaluator: EvaluationOperator):
individual, mutation_type, applied = self.mutation._mutation(individual, mutation_type=mutation_type)
if applied and individual and self.verifier(individual.graph):
def _mutation_n_evaluation(self, individual: Individual, evaluator: EvaluationOperator):
individual, mutation_type, applied = self.mutation._mutation(individual)
if individual and self.verifier(individual.graph):
individuals = evaluator([individual])
if individuals:
# if all is ok return all data
return descriptive_id, mutation_type, individuals[0]
# if something go wrong do not return new individual
return descriptive_id, mutation_type, None
return individuals[0], mutation_type, applied
return individual, mutation_type, False

0 comments on commit cee5afb

Please sign in to comment.