Commit

wip

kasyanovse committed Dec 7, 2023
1 parent 39b07fe commit 2e2a318
Showing 6 changed files with 215 additions and 94 deletions.
79 changes: 18 additions & 61 deletions golem/core/optimisers/common_optimizer/common_optimizer.py
@@ -1,11 +1,12 @@
from abc import abstractmethod
from collections import deque
from dataclasses import dataclass
from typing import Optional, Sequence, Union, Any, Dict, List
from typing import Optional, Sequence, Union, Any, Dict, List, Callable

from golem.core.dag.graph import Graph
from golem.core.optimisers.common_optimizer.node import Node
from golem.core.optimisers.common_optimizer.scheme import Scheme
from golem.core.optimisers.common_optimizer.stage import Stage
from golem.core.optimisers.common_optimizer.task import Task, TaskStatusEnum
from golem.core.optimisers.genetic.operators.operator import PopulationT
from golem.core.optimisers.graph import OptGraph
@@ -18,54 +19,32 @@

@dataclass
class CommonOptimizerParameters:
_run: bool
generations: List[PopulationT]

objective: Objective
initial_graphs: Sequence[Union[Graph, Any]]
requirements: OptimizationParameters
graph_generation_params: GraphGenerationParams
graph_optimizer_params: AlgorithmParameters
stages: List[Stage]
history: OptHistory


class Runner:
def __init__(self):
self.history = list()

@abstractmethod
def run(self, scheme: Scheme, task: Task):
raise NotImplementedError('It is abstract method')


class ParallelRunner(Runner):
def __init__(self):
super().__init__()


class OneThreadRunner(Runner):
def __init__(self):
super().__init__()

def run(self, scheme: Scheme, task: Task, nodes: Dict[Node]):
self.history.append(task)
tasks = deque()
while task.status is not TaskStatusEnum.FINISH_RUNNER:
task = scheme.next(task)
processed_tasks = nodes[task.get_next_node()](task)
tasks.extend(processed_tasks)
task = tasks.popleft()
self.history.append(task)


class CommonOptimizer(GraphOptimizer):
__parameters_attrs = ('objective', 'initial_graphs', 'requirements', 'graph_generation_params',
'graph_optimizer_params', 'history')
__parameters_allowed_to_change = ('requirements', 'graph_generation_params', 'graph_optimizer_params')
'graph_optimizer_params', 'history', 'stages', '_run', 'generations')
__parameters_allowed_to_change = ('requirements', 'graph_generation_params',
'graph_optimizer_params', 'stages', '_run', 'generations')

def __init__(self,
objective: Objective,
initial_graphs: Optional[Sequence[Union[Graph, Any]]] = None,
requirements: Optional[OptimizationParameters] = None,
graph_generation_params: Optional[GraphGenerationParams] = None,
graph_optimizer_params: Optional[AlgorithmParameters] = None):
graph_optimizer_params: Optional[AlgorithmParameters] = None,
stages: Optional[List[Stage]] = None):

super().__init__(objective=objective,
initial_graphs=initial_graphs,
requirements=requirements,
@@ -74,6 +53,8 @@ def __init__(self,

self.timer = OptimisationTimer(timeout=self.requirements.timeout)
self.generations = list()
self.stages = stages
self._run = True

@property
def parameters(self):
@@ -86,31 +67,7 @@ def parameters(self, parameters: CommonOptimizerParameters):
for attr in self.__parameters_allowed_to_change:
setattr(self, attr, getattr(parameters, attr))

def optimise(self, objective: ObjectiveFunction) -> Sequence[OptGraph]:
parameters = self.parameters
with self.timer, self._progressbar as pbar:
while True:
parameters = self.parameters
for pool in self.pools:
parameters = pool(parameters)
self._update_population(parameters.population)
pbar.close()

self.parameters = parameters

self._update_population(self.best_individuals, 'final_choices')
return [ind.graph for ind in self.best_individuals]

def _update_population(self, next_population: PopulationT, label: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None):
self.generations.append(next_population)
if self.requirements.keep_history:
self._log_to_history(next_population, label, metadata)
self._iteration_callback(next_population, self)

def _log_to_history(self, population: PopulationT, label: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None):
self.history.add_to_history(population, label, metadata)
self.history.add_to_archive_history(self.generations.best_individuals)
if self.requirements.history_dir:
self.history.save_current_results(self.requirements.history_dir)
def optimise(self, objective: ObjectiveFunction):
while self._run:
for i_stage in range(len(self.stages)):
self.parameters = self.stages[i_stage].run(self.parameters)
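
The new optimise loop delegates everything to the configured stages: each stage maps the optimiser's parameters to an updated copy, and the loop keeps cycling through the stage list until some stage clears the `_run` flag. A minimal, self-contained sketch of that control flow, with plain callables and a dict standing in for the real Stage and parameter objects (not the GOLEM API):

from typing import Callable, Dict, List

StageFn = Callable[[Dict], Dict]  # stand-in for Stage.run: parameters -> updated parameters

def staged_optimise(parameters: Dict, stages: List[StageFn]) -> Dict:
    # Cycle through all stages until one of them clears the run flag.
    while parameters['_run']:
        for stage in stages:
            parameters = stage(parameters)
    return parameters

def stopper(parameters: Dict) -> Dict:
    # Hypothetical stage: stop after three full passes over the stage list.
    parameters['passes'] = parameters.get('passes', 0) + 1
    if parameters['passes'] >= 3:
        parameters['_run'] = False
    return parameters

staged_optimise({'_run': True}, [stopper])  # -> {'_run': False, 'passes': 3}

Note that the timer, progress bar and history logging handled by the previous optimise are not invoked in this loop, so each of those concerns would presumably need to live in a stage of its own.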
109 changes: 95 additions & 14 deletions golem/core/optimisers/common_optimizer/config.py
@@ -1,14 +1,17 @@
from collections import defaultdict
from copy import deepcopy
from enum import Enum, auto
from functools import partial
from random import random
from random import random, sample

import numpy as np
from typing import Optional, Tuple, List

from golem.core.optimisers.common_optimizer.common_optimizer import CommonOptimizer, CommonOptimizerParameters
from golem.core.optimisers.common_optimizer.node import Node
from golem.core.optimisers.common_optimizer.scheme import Scheme
from golem.core.optimisers.common_optimizer.runner import OneThreadRunner
from golem.core.optimisers.common_optimizer.scheme import Scheme, SequentialScheme
from golem.core.optimisers.common_optimizer.stage import Stage
from golem.core.optimisers.common_optimizer.task import Task, TaskStatusEnum
from golem.core.optimisers.initial_graphs_generator import InitialPopulationGenerator
from golem.core.optimisers.objective import Objective
@@ -26,20 +29,13 @@

class AdaptiveParametersTask(Task):
def __init__(self, parameters: CommonOptimizerParameters):
self.pop_size = parameters.graph_generation_params.pop_size
super().__init__()
self.pop_size = parameters.graph_optimizer_params.pop_size

def update_parameters(self, parameters: CommonOptimizerParameters):
parameters.graph_generation_params.pop_size = self.pop_size
parameters.graph_optimizer_params.pop_size = self.pop_size
return parameters


class Scheme_1(Scheme):
def __init__(self):
super().__init__()
self.__map = {None: {TaskStatusEnum.NEXT: 'adaptive_pop_size'},
'adaptive_pop_size': {TaskStatusEnum.SUCCESS: None,
TaskStatusEnum.FAIL: 'adaptive_pop_size'}}

def adaptive_size(task):
if random() > 0.5:
task.pop_size += 1
@@ -49,16 +45,101 @@ def adaptive_size(task):
task.status = TaskStatusEnum.FAIL
return task

pop_size_node = Node('pop_size', adaptive_size)
def adaptive_parameter_updater(results, parameters):
return results[0].update_parameters(parameters)

pop_size_node_1 = Node('pop_size1', adaptive_size)
pop_size_node_2 = Node('pop_size2', adaptive_size)
nodes = [pop_size_node_1, pop_size_node_2]

stage1 = Stage(runner=OneThreadRunner(), nodes=nodes, scheme=SequentialScheme(nodes=[x.name for x in nodes]),
task_builder=AdaptiveParametersTask, stop_fun=lambda f, a: bool(f),
parameter_updater=adaptive_parameter_updater)

optimizer = CommonOptimizer(objective=objective,
graph_generation_params=graph_generation_params,
requirements=requirements,
initial_graphs=initial_graphs,
graph_optimizer_params=graph_optimizer_params)

pool1 = Pool()
optimizer.generations = [[f"g{i}" for i in range(10)]]
# res = stage1.run(optimizer.parameters)


class PopulationReproducerTask(Task):
def __init__(self, parameters: CommonOptimizerParameters):
super().__init__()
self.generation = parameters.generations[-1]

def mock_crossover(task):
inds = sample(task.generation, max(1, round(random() * 10)))
task.status = TaskStatusEnum.SUCCESS

tasks = list()
for ind in inds:
new_task = task.copy()
new_task.generation = [ind + 'c']
tasks.append(new_task)
return tasks

def mock_mutation(task):
if random() > 0.5:
if task.generation[0][-1] == 'c':
task.generation[0] = task.generation[0] + 'm0'
parts = task.generation[0].split('m')
task.generation[0] = 'm'.join(parts[0:-1]) + 'm' + str(int(parts[-1]) + 1)
task.status = TaskStatusEnum.SUCCESS
else:
task.status = TaskStatusEnum.FAIL
return task

nodes = [Node('crossover', mock_crossover)]
nodes += [Node(f"mutation_{i}", mock_mutation) for i in range(5)]
# TODO some nodes for each variant
scheme_map = {None: defaultdict(lambda: 'crossover'),
'crossover': {TaskStatusEnum.SUCCESS: 'mutation_0',
TaskStatusEnum.FAIL: 'crossover'},
'mutation_0': {TaskStatusEnum.SUCCESS: 'mutation_1',
TaskStatusEnum.FAIL: 'mutation_0'},
'mutation_1': {TaskStatusEnum.SUCCESS: 'mutation_2',
TaskStatusEnum.FAIL: 'mutation_1'},
'mutation_2': {TaskStatusEnum.SUCCESS: 'mutation_3',
TaskStatusEnum.FAIL: 'mutation_2'},
'mutation_3': {TaskStatusEnum.SUCCESS: 'mutation_4',
TaskStatusEnum.FAIL: 'mutation_3'},
'mutation_4': {TaskStatusEnum.SUCCESS: None,
TaskStatusEnum.FAIL: 'mutation_4'}}
def reproducer_parameter_updater(results, parameters):
parameters.generations.append([res.generation[0] for res in results])
return parameters

stage2 = Stage(runner=OneThreadRunner(), nodes=nodes, scheme=Scheme(scheme_map),
task_builder=PopulationReproducerTask, stop_fun=lambda f, a: len(f) > 10,
parameter_updater=reproducer_parameter_updater)
# res = stage2.run(optimizer.parameters)


nodes = [Node('stopper', lambda x: x)]
def stopper_parameter_updater(results, parameters):
if len(parameters.generations) > 3:
parameters._run = False
return parameters

stage3 = Stage(runner=OneThreadRunner(), nodes=nodes, scheme=SequentialScheme(nodes=[x.name for x in nodes]),
task_builder=Task, stop_fun=lambda f, a: bool(f),
parameter_updater=stopper_parameter_updater)
# res = stage3.run(optimizer.parameters)



optimizer = CommonOptimizer(objective=objective,
graph_generation_params=graph_generation_params,
requirements=requirements,
initial_graphs=initial_graphs,
graph_optimizer_params=graph_optimizer_params,
stages=[stage1, stage2, stage3])
optimizer.generations = [[f"g{i}" for i in range(10)]]
optimizer.optimise(1)
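# Rough expectation for this mock pipeline: stage1 nudges pop_size up or down,
# stage2 appends a new generation derived from the previous one via the
# crossover/mutation nodes, and stage3 clears `_run` once more than three
# generations have accumulated, so the print below lists every generation
# collected before the loop stopped.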

print(*optimizer.generations, sep='\n')

41 changes: 41 additions & 0 deletions golem/core/optimisers/common_optimizer/runner.py
@@ -0,0 +1,41 @@
from abc import abstractmethod
from collections import deque

from typing import List, Callable

from golem.core.optimisers.common_optimizer.node import Node
from golem.core.optimisers.common_optimizer.scheme import Scheme
from golem.core.optimisers.common_optimizer.task import Task, TaskStatusEnum


class Runner:
def __init__(self):
pass

@abstractmethod
def run(self, scheme: Scheme, task: Task, nodes: List[Node], stop_fun: Callable):
raise NotImplementedError('This is an abstract method')


class ParallelRunner(Runner):
def __init__(self):
super().__init__()


class OneThreadRunner(Runner):
def __init__(self):
super().__init__()

def run(self, scheme: Scheme, task: Task, nodes: List[Node], stop_fun: Callable):
origin_task = task
node_map = {node.name: node for node in nodes}
queued_tasks, finished_tasks, all_tasks = [list() for i in range(3)]
while not stop_fun(finished_tasks, all_tasks):
task = origin_task.copy() if len(queued_tasks) == 0 else queued_tasks.pop()
all_tasks.append(task)
task = scheme.next(task)
if task.status is TaskStatusEnum.FINISH:
finished_tasks.append(task)
continue
queued_tasks.extend(node_map[task.node](task))
return finished_tasks
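
The run loop above assumes only a small contract from its collaborators: a node callable consumes a task and returns a list of follow-up tasks, the scheme routes a task to its next node based on where it currently is and its status, and stop_fun decides when enough finished tasks have been collected (the real stop_fun also sees every task created so far; the sketch below only passes the finished ones). A small self-contained sketch of that contract, using simplified stand-ins rather than the real Task/Node/Scheme classes:

import copy
from typing import Callable, Dict, List, Optional

class MiniTask:
    # Simplified stand-in for Task: a value, the name of its next node,
    # and a 'finished' flag instead of a full status enum.
    def __init__(self, value: int = 0):
        self.value = value
        self.node: Optional[str] = None
        self.finished = False

    def copy(self) -> 'MiniTask':
        return copy.deepcopy(self)

def route(task: MiniTask, chain: Dict[Optional[str], Optional[str]]) -> MiniTask:
    # Stand-in for Scheme.next: look up the next node from the current one;
    # falling off the end of the chain marks the task as finished.
    task.node = chain[task.node]
    task.finished = task.node is None
    return task

def mini_run(task: MiniTask,
             nodes: Dict[str, Callable[[MiniTask], List[MiniTask]]],
             chain: Dict[Optional[str], Optional[str]],
             stop_fun: Callable[[List[MiniTask]], bool]) -> List[MiniTask]:
    queued, finished = [], []
    while not stop_fun(finished):
        current = task.copy() if not queued else queued.pop()
        current = route(current, chain)
        if current.finished:
            finished.append(current)
            continue
        queued.extend(nodes[current.node](current))
    return finished

def increment(t: MiniTask) -> List[MiniTask]:
    # Hypothetical node: bump the value and hand the task back for further routing.
    t.value += 1
    return [t]

done = mini_run(MiniTask(), {'inc': increment}, {None: 'inc', 'inc': None},
                stop_fun=lambda finished: len(finished) >= 3)
# done holds three finished copies of the seed task, each with value == 1

ParallelRunner is still a stub here; the same contract would presumably let it farm the queued tasks out to worker processes.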
17 changes: 8 additions & 9 deletions golem/core/optimisers/common_optimizer/scheme.py
@@ -1,19 +1,18 @@
from collections import defaultdict
from typing import List, Union, Optional
from typing import List, Union, Optional, Dict

from golem.core.optimisers.common_optimizer.node import Node
from golem.core.optimisers.common_optimizer.task import Task
from golem.core.optimisers.common_optimizer.task import Task, TaskStatusEnum
from golem.utilities.utilities import determine_n_jobs


class Scheme:
def __init__(self, n_jobs: int = -1):
self.__map = dict()
self.n_jobs = determine_n_jobs(n_jobs)
def __init__(self, scheme_map: Optional[Dict[str, Dict[TaskStatusEnum, str]]] = None):
self._map = scheme_map or dict()
self.nodes = None

def next(self, task: Task):
task = task.copy()
task.set_next_node(self.__map[task.status[0]][task.status[1]])
task.node = self._map[task.node][task.status]
return task


@@ -23,7 +22,7 @@ def __init__(self, *args, nodes: Optional[List[Union[str, Node]]] = None, **kwar
raise ValueError('nodes should be list with nodes')
super().__init__(*args, **kwargs)

self.__map = dict()
self._map = dict()
nodes = [node.name if isinstance(node, Node) else node for node in nodes]
for prev_node, next_node in zip([None] + nodes, nodes + [None]):
self.__map[prev_node] = defaultdict(next_node)
self._map[prev_node] = defaultdict(lambda *args, next_node=next_node: next_node)
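
For a linear chain of nodes, SequentialScheme just builds a lookup from each node name to the one that follows it, with None marking entry and exit; the defaultdict makes the transition independent of the task status. A small sketch of the table built for two node names (plain strings instead of the real Node/TaskStatusEnum types):

from collections import defaultdict

nodes = ['crossover', 'mutation']
chain = {}
for prev_node, next_node in zip([None] + nodes, nodes + [None]):
    # The default factory ignores the task status and always yields the next node.
    chain[prev_node] = defaultdict(lambda next_node=next_node: next_node)

chain[None]['ANY_STATUS']         # -> 'crossover'
chain['crossover']['ANY_STATUS']  # -> 'mutation'
chain['mutation']['ANY_STATUS']   # -> None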
29 changes: 29 additions & 0 deletions golem/core/optimisers/common_optimizer/stage.py
@@ -0,0 +1,29 @@
from dataclasses import dataclass

from typing import List, Callable

from golem.core.optimisers.common_optimizer.node import Node
from golem.core.optimisers.common_optimizer.runner import Runner
from golem.core.optimisers.common_optimizer.scheme import Scheme
from golem.core.optimisers.common_optimizer.task import Task


@dataclass
class Stage:
runner: Runner
nodes: List[Node]
task_builder: Callable[['CommonOptimizerParameters'], Task]
scheme: Scheme
stop_fun: Callable[[List[Task], List[Task]], bool]
parameter_updater: Callable[[List[Task], 'CommonOptimizerParameters'], 'CommonOptimizerParameters']

def __post_init__(self):
# TODO add checks
# for node names
# for types
pass

def run(self, parameters: 'CommonOptimizerParameters'):
task = self.task_builder(parameters)
results = self.runner.run(nodes=self.nodes, task=task, scheme=self.scheme, stop_fun=self.stop_fun)
return self.parameter_updater(results, parameters)
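
A Stage is essentially a named bundle of the pieces the runner needs plus the two hooks that connect it back to the optimiser: task_builder turns the current parameters into a seed task, and parameter_updater folds the runner's finished tasks back into the parameters. A compact sketch of that data flow with hypothetical stand-ins (dicts instead of the real Task and CommonOptimizerParameters types, and a single run_tasks callable in place of runner, scheme and nodes):

from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class MiniStage:
    task_builder: Callable[[Dict], Dict]
    run_tasks: Callable[[Dict], List[Dict]]
    parameter_updater: Callable[[List[Dict], Dict], Dict]

    def run(self, parameters: Dict) -> Dict:
        task = self.task_builder(parameters)                 # parameters -> seed task
        results = self.run_tasks(task)                        # seed task -> finished tasks
        return self.parameter_updater(results, parameters)    # finished tasks -> new parameters

stage = MiniStage(
    task_builder=lambda params: {'pop_size': params['pop_size']},
    run_tasks=lambda task: [{'pop_size': task['pop_size'] + 1}],
    parameter_updater=lambda results, params: {**params, 'pop_size': results[0]['pop_size']},
)
stage.run({'pop_size': 20, '_run': True})  # -> {'pop_size': 21, '_run': True}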