diff --git a/examples/molecule_search/experiment.py b/examples/molecule_search/experiment.py index d8e407d14..79cea2bbe 100644 --- a/examples/molecule_search/experiment.py +++ b/examples/molecule_search/experiment.py @@ -16,6 +16,8 @@ normalized_logp, CLScorer from golem.core.dag.verification_rules import has_no_self_cycled_nodes, has_no_isolated_components, \ has_no_isolated_nodes +from golem.core.optimisers.adaptive.agent_trainer import AgentTrainer +from golem.core.optimisers.adaptive.history_collector import HistoryReader from golem.core.optimisers.adaptive.operator_agent import MutationAgentTypeEnum from golem.core.optimisers.genetic.gp_optimizer import EvoGraphOptimizer from golem.core.optimisers.genetic.gp_params import GPAlgorithmParameters @@ -25,6 +27,7 @@ from golem.core.optimisers.objective import Objective from golem.core.optimisers.opt_history_objects.opt_history import OptHistory from golem.core.optimisers.optimizer import GraphGenerationParams, GraphOptimizer +from golem.core.paths import project_root from golem.visualisation.opt_history.multiple_fitness_line import MultipleFitnessLines from golem.visualisation.opt_viz_extra import visualise_pareto @@ -129,6 +132,16 @@ def visualize_results(molecules: Iterable[MolGraph], image.show() +def pretrain_agent(optimizer: EvoGraphOptimizer, objective: Objective, results_dir: str) -> AgentTrainer: + agent = optimizer.mutation.agent + trainer = AgentTrainer(objective, optimizer.mutation, agent) + # load histories + history_reader = HistoryReader(Path(results_dir)) + # train agent + trainer.fit(histories=history_reader.load_histories(), validate_each=1) + return trainer + + def run_experiment(optimizer_setup: Callable, optimizer_cls: Type[GraphOptimizer] = EvoGraphOptimizer, adaptive_kind: MutationAgentTypeEnum = MutationAgentTypeEnum.random, @@ -143,13 +156,14 @@ def run_experiment(optimizer_setup: Callable, trial_iterations: Optional[int] = None, visualize: bool = False, save_history: bool = True, + pretrain_dir: Optional[str] = None, ): + metrics = metrics or ['qed_score'] optimizer_id = optimizer_cls.__name__.lower()[:3] experiment_id = f'Experiment [optimizer={optimizer_id} metrics={", ".join(metrics)} pop_size={pop_size}]' exp_name = f'{optimizer_id}_{adaptive_kind.value}_popsize{pop_size}_min{trial_timeout}_{"_".join(metrics)}' atom_types = atom_types or ['C', 'N', 'O', 'F', 'P', 'S', 'Cl', 'Br'] - metrics = metrics or ['qed_score'] trial_results = [] trial_histories = [] trial_timedelta = timedelta(minutes=trial_timeout) if trial_timeout else None @@ -165,6 +179,9 @@ def run_experiment(optimizer_setup: Callable, pop_size, metrics, initial_molecules) + if pretrain_dir: + pretrain_agent(optimizer, objective, pretrain_dir) + found_graphs = optimizer.optimise(objective) history = optimizer.history @@ -208,10 +225,11 @@ def plot_experiment_comparison(experiment_ids: Sequence[str], metric_id: int = 0 if __name__ == '__main__': run_experiment(molecule_search_setup, - adaptive_kind=MutationAgentTypeEnum.random, + adaptive_kind=MutationAgentTypeEnum.bandit, max_heavy_atoms=38, - trial_timeout=15, + trial_timeout=6, pop_size=50, - metrics=['qed_score', 'cl_score'], visualize=True, - num_trials=5) + num_trials=5, + pretrain_dir=os.path.join(project_root(), 'examples', 'molecule_search', 'histories') + ) diff --git a/examples/molecule_search/mol_adapter.py b/examples/molecule_search/mol_adapter.py index dfd0a2367..226d6ca31 100644 --- a/examples/molecule_search/mol_adapter.py +++ b/examples/molecule_search/mol_adapter.py @@ -17,8 +17,10 
@@ def __init__(self): def _restore(self, opt_graph: OptGraph, metadata: Optional[Dict[str, Any]] = None) -> MolGraph: digraph = self.nx_adapter.restore(opt_graph) - # return to previous node indexing - digraph = nx.relabel_nodes(digraph, dict(digraph.nodes(data='nxid'))) + # to ensure backward compatibility with old individuals without 'nxid' field in nodes + if not any(x is None for x in list(dict(digraph.nodes(data='nxid')).values())): + # return to previous node indexing + digraph = nx.relabel_nodes(digraph, dict(digraph.nodes(data='nxid'))) digraph = restore_edges_params_from_nodes(digraph) nx_graph = digraph.to_undirected() mol_graph = MolGraph.from_nx_graph(nx_graph) @@ -50,7 +52,11 @@ def restore_edges_params_from_nodes(graph: nx.DiGraph) -> nx.DiGraph: all_edges_params = {} for node in graph.nodes(): for predecessor in graph.predecessors(node): - edge_params = edge_params_by_node[node][predecessor] - all_edges_params.update({(predecessor, node): edge_params}) + node_params = edge_params_by_node[node] + # for some unknown reason, some nodes are encoded as int and some as str; + # this is possibly a deserialization issue. + edge_params = node_params.get(predecessor) or node_params.get(str(predecessor)) + if edge_params: + all_edges_params[(predecessor, node)] = edge_params nx.set_edge_attributes(graph, all_edges_params) return graph diff --git a/examples/molecule_search/mol_graph.py b/examples/molecule_search/mol_graph.py index 7db32b687..799c6c8f2 100644 --- a/examples/molecule_search/mol_graph.py +++ b/examples/molecule_search/mol_graph.py @@ -5,7 +5,7 @@ from rdkit import Chem from rdkit.Chem import MolFromSmiles, MolToSmiles, SanitizeMol, Kekulize, MolToInchi from rdkit.Chem.Draw import rdMolDraw2D -from rdkit.Chem.rdchem import Atom, BondType, RWMol, GetPeriodicTable +from rdkit.Chem.rdchem import Atom, BondType, RWMol, GetPeriodicTable, ChiralType, HybridizationType class MolGraph: @@ -32,10 +32,10 @@ def from_nx_graph(graph: nx.Graph): node_to_idx = {} for node in graph.nodes(): a = Chem.Atom(atomic_nums[node]) - a.SetChiralTag(chiral_tags[node]) + a.SetChiralTag(ChiralType(chiral_tags[node])) a.SetFormalCharge(formal_charges[node]) a.SetIsAromatic(node_is_aromatics[node]) - a.SetHybridization(node_hybridizations[node]) + a.SetHybridization(HybridizationType(node_hybridizations[node])) a.SetNumExplicitHs(num_explicit_hss[node]) idx = mol.AddAtom(a) node_to_idx[node] = idx @@ -45,7 +45,7 @@ def from_nx_graph(graph: nx.Graph): first, second = edge ifirst = node_to_idx[first] isecond = node_to_idx[second] - bond_type = bond_types[first, second] + bond_type = BondType(bond_types[first, second]) mol.AddBond(ifirst, isecond, bond_type) SanitizeMol(mol) diff --git a/golem/core/adapter/nx_adapter.py b/golem/core/adapter/nx_adapter.py index 91645b71d..183e4456d 100644 --- a/golem/core/adapter/nx_adapter.py +++ b/golem/core/adapter/nx_adapter.py @@ -22,13 +22,14 @@ def __init__(self): def _node_restore(self, node: GraphNode) -> Dict: """Transforms GraphNode to dict of NetworkX node attributes. Override for custom behavior.""" + parameters = {} if hasattr(node, 'parameters'): - parameters = node.parameters - if node.name: - parameters['name'] = node.name - return deepcopy(parameters) - else: - return {} + parameters = deepcopy(node.parameters) + + if node.name: + parameters['name'] = node.name + + return parameters def _node_adapt(self, data: Dict) -> OptNode: """Transforms a dict of NetworkX node attributes to GraphNode.
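Note on the mol_graph.py changes above: the ChiralType, HybridizationType and BondType casts are needed because enum-valued attributes of old serialized individuals come back as plain integers. A minimal illustrative sketch of the reconstruction, assuming rdkit is installed (the stored integer values below are hypothetical examples, not taken from the PR):

    from rdkit.Chem.rdchem import BondType, ChiralType, HybridizationType

    # After a JSON round-trip, enum-valued node/edge attributes survive only as ints.
    serialized_bond = 12           # hypothetical stored value of a bond type
    serialized_chirality = 1       # hypothetical stored value of a chiral tag
    serialized_hybridization = 4   # hypothetical stored value of a hybridization

    # Reconstruct the rdkit enums before passing them to the Atom/Bond setters,
    # which is what from_nx_graph now does:
    bond_type = BondType(serialized_bond)
    chiral_tag = ChiralType(serialized_chirality)
    hybridization = HybridizationType(serialized_hybridization)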
diff --git a/golem/core/optimisers/adaptive/agent_trainer.py b/golem/core/optimisers/adaptive/agent_trainer.py new file mode 100644 index 000000000..d01f5a4f6 --- /dev/null +++ b/golem/core/optimisers/adaptive/agent_trainer.py @@ -0,0 +1,198 @@ +import operator +from copy import deepcopy +from functools import reduce +from typing import Sequence, Optional, Any, Tuple, List, Iterable + +import numpy as np + +from golem.core.dag.graph import Graph +from golem.core.log import default_log +from golem.core.optimisers.adaptive.common_types import TrajectoryStep, GraphTrajectory +from golem.core.optimisers.adaptive.experience_buffer import ExperienceBuffer +from golem.core.optimisers.adaptive.operator_agent import OperatorAgent +from golem.core.optimisers.fitness import Fitness +from golem.core.optimisers.genetic.operators.mutation import Mutation +from golem.core.optimisers.objective import Objective +from golem.core.optimisers.opt_history_objects.individual import Individual +from golem.core.optimisers.opt_history_objects.opt_history import OptHistory +from golem.core.optimisers.opt_history_objects.parent_operator import ParentOperator +from golem.core.utilities.data_structures import unzip + + +class AgentTrainer: + """Utility class providing fit/validate logic for adaptive Mutation agents. + Works in tandem with `HistoryReader`. + + How to use offline training: + + 1. Collect histories to some directory using `ExperimentLauncher` + 2. Create an optimizer and pretrain the mutation agent on these histories using `HistoryReader` and `AgentTrainer` + 3. Optionally, validate the agent on a validation set of histories + 4. Run the optimization with the pretrained agent + """ + + def __init__(self, + objective: Objective, + mutation_operator: Mutation, + agent: Optional[OperatorAgent] = None, + ): + self._log = default_log(self) + self.agent = agent if agent is not None else mutation_operator.agent + self.mutation = mutation_operator + self.objective = objective + self._adapter = self.mutation.graph_generation_params.adapter + + def fit(self, histories: Iterable[OptHistory], validate_each: int = -1) -> OperatorAgent: + """ + Fits the trainer on the collected histories. + :param histories: histories to use for training. + :param validate_each: validate the agent once every `validate_each` histories. + """ + # Set mutation probabilities to 1.0 + initial_req = deepcopy(self.mutation.requirements) + self.mutation.requirements.mutation_prob = 1.0 + + for i, history in enumerate(histories): + # Preliminary validity check + # This automatically filters out histories with a different objective + if history.objective.metric_names != self.objective.metric_names: + self._log.warning(f'History #{i+1} has different objective! 
' + f'Expected {self.objective}, got {history.objective}.') + continue + + # Build datasets + experience = ExperienceBuffer.from_history(history) + val_experience = None + if validate_each > 0 and i % validate_each == 0: + experience, val_experience = experience.split(ratio=0.8, shuffle=True) + + # Train + self._log.info(f'Training on history #{i+1} with {len(history.generations)} generations') + self.agent.partial_fit(experience) + + # Validate + if val_experience: + reward_loss, reward_target = self.validate_agent(experience=val_experience) + self._log.info(f'Agent validation for history #{i+1} & {experience}: ' + f'Reward target={reward_target:.3f}, loss={reward_loss:.3f}') + + # Reset mutation probabilities to default + self.mutation.update_requirements(requirements=initial_req) + return self.agent + + def validate_on_rollouts(self, histories: Sequence[OptHistory]) -> float: + """Validates agent rollouts against historic trajectories, comparing + their mean total rewards (i.e. the total fitness gain over a trajectory).""" + + # Collect all trajectories from all histories, and their rewards + trajectories = concat_lists(map(ExperienceBuffer.unroll_trajectories, histories)) + + mean_traj_len = int(np.mean([len(tr) for tr in trajectories])) + traj_rewards = [sum(reward for _, _, reward in traj) for traj in trajectories] + mean_baseline_reward = np.mean(traj_rewards) + + # Collect the same number of trajectories of the same length, and their rewards + agent_trajectories = [self._sample_trajectory(initial=tr[0][0], length=mean_traj_len) + for tr in trajectories] + agent_traj_rewards = [sum(reward for _, _, reward in traj) for traj in agent_trajectories] + mean_agent_reward = np.mean(agent_traj_rewards) + + # Compute improvement score of agent over baseline histories + improvement = mean_agent_reward - mean_baseline_reward + return improvement + + def validate_history(self, history: OptHistory) -> Tuple[float, float]: + """Validates a history of mutated individuals against the optimal policy.""" + history_trajectories = ExperienceBuffer.unroll_trajectories(history) + return self._validate_against_optimal(history_trajectories) + + def validate_agent(self, + graphs: Optional[Sequence[Graph]] = None, + experience: Optional[ExperienceBuffer] = None) -> Tuple[float, float]: + """Validates the agent policy against the optimal policy on the given graphs.""" + if experience: + agent_steps = experience.retrieve_trajectories() + elif graphs: + agent_steps = [self._make_action_step(Individual(g)) for g in graphs] + else: + self._log.warning('Either graphs or experience must not be None for validation!') + return 0., 0. + return self._validate_against_optimal(trajectories=[agent_steps]) + + def _validate_against_optimal(self, trajectories: Sequence[GraphTrajectory]) -> Tuple[float, float]: + """Validates policy trajectories against the optimal policy + that at each step always chooses the best action with the maximal reward.""" + reward_losses = [] + reward_targets = [] + for trajectory in trajectories: + inds, actions, rewards = unzip(trajectory) + _, best_actions, best_rewards = self._apply_best_action(inds) + reward_loss = self._compute_reward_loss(rewards, best_rewards) + reward_losses.append(reward_loss) + reward_targets.append(np.mean(best_rewards)) + reward_loss = float(np.mean(reward_losses)) + reward_target = float(np.mean(reward_targets)) + return reward_loss, reward_target + + @staticmethod + def _compute_reward_loss(rewards, optimal_rewards, normalized=False) -> float: + """Returns the difference (or deviation) from the optimal reward. 
+ When normalized, 0. means the actual rewards match the optimal rewards completely, + 0.5 means they deviate from the optimal rewards by 50% on average, + and 2.2 means they deviate from the optimal reward by more than a factor of 2 on average.""" + reward_losses = np.subtract(optimal_rewards, rewards) # always positive + if normalized: + reward_losses = reward_losses / np.abs(optimal_rewards) \ + if np.count_nonzero(optimal_rewards) == optimal_rewards.size else reward_losses + means = np.mean(reward_losses) + return float(means) + + def _apply_best_action(self, inds: Sequence[Individual]) -> TrajectoryStep: + """Returns the greedily optimal mutation for the given individuals and the associated reward.""" + candidates = [] + for ind in inds: + for mutation_id in self.agent.available_actions: + try: + values = self._apply_action(mutation_id, ind) + candidates.append(values) + except Exception as e: + self._log.warning(f'Eval error for mutation <{mutation_id}> ' + f'on graph: {ind.graph.descriptive_id}:\n{e}') + continue + best_step = max(candidates, key=lambda step: step[-1]) + return best_step + + def _apply_action(self, action: Any, ind: Individual) -> TrajectoryStep: + new_graph, applied = self.mutation._adapt_and_apply_mutation(ind.graph, action) + fitness = self._eval_objective(new_graph) if applied else None + parent_op = ParentOperator(type_='mutation', operators=applied, parent_individuals=ind) + new_ind = Individual(new_graph, fitness=fitness, parent_operator=parent_op) + + prev_fitness = ind.fitness or self._eval_objective(ind.graph) + if prev_fitness and fitness: + reward = prev_fitness.value - fitness.value + elif prev_fitness and not fitness: + reward = -1. + else: + reward = 0. + return new_ind, action, reward + + def _eval_objective(self, graph: Graph) -> Fitness: + return self._adapter.adapt_func(self.objective)(graph) + + def _make_action_step(self, ind: Individual) -> TrajectoryStep: + action = self.agent.choose_action(ind.graph) + return self._apply_action(action, ind) + + def _sample_trajectory(self, initial: Individual, length: int) -> GraphTrajectory: + trajectory = [] + past_ind = initial + for i in range(length): + next_ind, action, reward = self._make_action_step(past_ind) + trajectory.append((next_ind, action, reward)) + past_ind = next_ind + return trajectory + + +def concat_lists(lists: Iterable[List]) -> List: + return reduce(operator.add, lists, []) diff --git a/golem/core/optimisers/adaptive/common_types.py b/golem/core/optimisers/adaptive/common_types.py new file mode 100644 index 000000000..528ebcc84 --- /dev/null +++ b/golem/core/optimisers/adaptive/common_types.py @@ -0,0 +1,11 @@ +from typing import Union, Hashable, Tuple, Sequence + +from golem.core.dag.graph import Graph +from golem.core.optimisers.opt_history_objects.individual import Individual + +ObsType = Union[Individual, Graph] +ActType = Hashable +# Trajectory step includes (past observation, action, reward) +TrajectoryStep = Tuple[Individual, ActType, float] +# Trajectory is a sequence of applied mutations and received rewards +GraphTrajectory = Sequence[TrajectoryStep] diff --git a/golem/core/optimisers/adaptive/experience_buffer.py b/golem/core/optimisers/adaptive/experience_buffer.py new file mode 100644 index 000000000..6b4852dd1 --- /dev/null +++ b/golem/core/optimisers/adaptive/experience_buffer.py @@ -0,0 +1,142 @@ +from collections import deque +from typing import List, Iterable, Tuple, Optional + +import numpy as np + +from golem.core.optimisers.adaptive.common_types import ObsType, ActType, TrajectoryStep, GraphTrajectory 
+from golem.core.optimisers.opt_history_objects.individual import Individual +from golem.core.optimisers.opt_history_objects.opt_history import OptHistory + + +class ExperienceBuffer: + """Buffer for learning experience of ``OperatorAgent``. + Keeps (State, Action, Reward) lists until retrieval.""" + + def __init__(self, window_size: Optional[int] = None, inds=None, actions=None, rewards=None): + self.window_size = window_size + self._prev_pop = set() + self._next_pop = set() + + if inds and not (len(inds) == len(actions) == len(rewards)): + raise ValueError('lengths of buffers do not match') + self._individuals = deque(inds) if inds else deque(maxlen=self.window_size) + self._actions = deque(actions) if actions else deque(maxlen=self.window_size) + self._rewards = deque(rewards) if rewards else deque(maxlen=self.window_size) + + @staticmethod + def from_history(history: OptHistory) -> 'ExperienceBuffer': + exp = ExperienceBuffer() + exp.collect_history(history) + return exp + + def _reset(self): + self._prev_pop = set() + self._next_pop = set() + + # if the window size was not specified, then there is no need to store these values for reuse. + # Otherwise, if the window_size is specified, the storages are updated automatically as bounded queues + if self.window_size is None: + self._individuals = deque(maxlen=self.window_size) + self._actions = deque(maxlen=self.window_size) + self._rewards = deque(maxlen=self.window_size) + + @staticmethod + def unroll_action_step(result: Individual) -> TrajectoryStep: + """Unrolls the individual's history to get its source individual, the applied action and the resulting reward.""" + if not result.parent_operator or result.parent_operator.type_ != 'mutation': + return None, None, np.nan + source_ind = result.parent_operator.parent_individuals[0] + action = result.parent_operator.operators[0] + # we're minimising the fitness, so lower is better + reward = (source_ind.fitness.value - result.fitness.value) / abs(source_ind.fitness.value)\ + if source_ind.fitness and source_ind.fitness.value != 0. else 0. 
+ return source_ind, action, reward + + @staticmethod + def unroll_trajectories(history: OptHistory) -> List[GraphTrajectory]: + """Iterates through the history and finds continuous sequences of applied operator actions.""" + trajectories = [] + seen_uids = set() + for terminal_individual in history.final_choices: + trajectory = [] + next_ind = terminal_individual + while True: + seen_uids.add(next_ind.uid) + source_ind, action, reward = ExperienceBuffer.unroll_action_step(next_ind) + if source_ind is None or source_ind.uid in seen_uids: + break + # prepend step to keep historical direction + trajectory.insert(0, (source_ind, action, reward)) + next_ind = source_ind + trajectories.append(trajectory) + return trajectories + + def collect_history(self, history: OptHistory): + seen = set() + # We don't need the initial assumptions, as they have no parent operators, hence [1:] + for generation in history.generations[1:]: + for ind in generation: + if ind.uid not in seen: + seen.add(ind.uid) + self.collect_result(ind) + + def collect_results(self, results: Iterable[Individual]): + for ind in results: + self.collect_result(ind) + + def collect_result(self, result: Individual): + if result.uid in self._prev_pop: + # avoid collecting results from individuals that didn't change + return + self._next_pop.add(result.uid) + + source_ind, action, reward = self.unroll_action_step(result) + if action is None: + return + self.collect_experience(source_ind, action, reward) + + def collect_experience(self, obs: Individual, action: ActType, reward: float): + self._individuals.append(obs) + self._actions.append(action) + self._rewards.append(reward) + + def retrieve_experience(self, as_graphs: bool = True) -> Tuple[List[ObsType], List[ActType], List[float]]: + """Get all collected experience and clear the experience buffer. + Args: + as_graphs: if True (by default) returns observations as graphs, otherwise as individuals. + Returns: + Unzipped trajectories (tuple of lists of observations, actions, rewards). 
+ """ + individuals, actions, rewards = self._individuals, self._actions, self._rewards + observations = [ind.graph for ind in individuals] if as_graphs else individuals + next_pop = self._next_pop + self._reset() + self._prev_pop = next_pop + return list(observations), list(actions), list(rewards) + + def retrieve_trajectories(self) -> GraphTrajectory: + """Same as `retrieve_experience` but in the form of zipped trajectories that consist from steps.""" + trajectories = list(zip(*self.retrieve_experience(as_graphs=False))) + return trajectories + + def split(self, ratio: float = 0.8, shuffle: bool = False + ) -> Tuple['ExperienceBuffer', 'ExperienceBuffer']: + """Splits buffer in 2 parts, useful for train/validation split.""" + mask_train = np.full_like(self._individuals, False, dtype=bool) + num_train = int(len(self._individuals) * ratio) + mask_train[-num_train:] = True + if shuffle: + np.random.default_rng().shuffle(mask_train) + buffer_train = ExperienceBuffer(inds=np.array(self._individuals)[mask_train].tolist(), + actions=np.array(self._actions)[mask_train].tolist(), + rewards=np.array(self._rewards)[mask_train].tolist()) + buffer_val = ExperienceBuffer(inds=np.array(self._individuals)[~mask_train].tolist(), + actions=np.array(self._actions)[~mask_train].tolist(), + rewards=np.array(self._rewards)[~mask_train].tolist()) + return buffer_train, buffer_val + + def __len__(self): + return len(self._individuals) + + def __str__(self): + return f'{self.__class__.__name__}({len(self)})' diff --git a/golem/core/optimisers/adaptive/history_collector.py b/golem/core/optimisers/adaptive/history_collector.py new file mode 100644 index 000000000..b33c2197c --- /dev/null +++ b/golem/core/optimisers/adaptive/history_collector.py @@ -0,0 +1,42 @@ +import os +from pathlib import Path +from typing import Optional, Iterable + +from golem.core.log import default_log +from golem.core.optimisers.opt_history_objects.opt_history import OptHistory + + +class HistoryReader: + """Simplifies reading a bunch of histories from single directory.""" + + def __init__(self, save_path: Optional[Path] = None): + self.log = default_log(self) + self.save_path = save_path or Path("results") + self.save_path.mkdir(parents=True, exist_ok=True) + + def load_histories(self) -> Iterable[OptHistory]: + """Iteratively loads saved histories one-by-ony.""" + num_histories = 0 + total_individuals = 0 + for history_path in HistoryReader.traverse_histories(self.save_path): + history = OptHistory.load(history_path) + num_histories += 1 + total_individuals += sum(map(len, history.generations)) + yield history + + if num_histories == 0 or total_individuals == 0: + raise ValueError(f'Could not load any individuals.' 
+ f'Possibly, path {self.save_path} does not exist or is empty.') + else: + self.log.info(f'Loaded {num_histories} histories ' + f'with {total_individuals} individuals in total.') + + @staticmethod + def traverse_histories(path) -> Iterable[Path]: + if path.exists(): + # recursive traversal of the save directory + for root, dirs, files in os.walk(path): + for history_filename in files: + if history_filename.startswith('history'): + full_path = Path(root) / history_filename + yield full_path diff --git a/golem/core/optimisers/adaptive/mab_agents/contextual_mab_agent.py b/golem/core/optimisers/adaptive/mab_agents/contextual_mab_agent.py index 01abda14c..ccd66480d 100644 --- a/golem/core/optimisers/adaptive/mab_agents/contextual_mab_agent.py +++ b/golem/core/optimisers/adaptive/mab_agents/contextual_mab_agent.py @@ -82,6 +82,14 @@ def partial_fit(self, experience: ExperienceBuffer): contexts = self.get_context(obs=obs) self._agent.partial_fit(decisions=arms, rewards=processed_rewards, contexts=contexts) + def _get_experience(self, experience: ExperienceBuffer): + """ Get experience from the ExperienceBuffer, map actions to arms, and log. """ + obs, actions, rewards = experience.retrieve_experience() + arms = [self._arm_by_action[action] for action in actions] + # unlike in the plain MAB, rewards are not processed here, since that processing would unify rewards across all contexts + self._dbg_log(obs, actions, rewards) + return obs, arms, rewards + def get_context(self, obs: Union[List[ObsType], ObsType]) -> np.array: """ Returns contexts based on specified context agent. """ if not isinstance(obs, list): diff --git a/golem/core/optimisers/adaptive/mab_agents/mab_agent.py b/golem/core/optimisers/adaptive/mab_agents/mab_agent.py index a07c75e05..538c2ed2f 100644 --- a/golem/core/optimisers/adaptive/mab_agents/mab_agent.py +++ b/golem/core/optimisers/adaptive/mab_agents/mab_agent.py @@ -22,12 +22,12 @@ def __init__(self, decaying_factor: float = 1.0, path_to_save: Optional[str] = None, is_initial_fit: bool = True): - super().__init__(enable_logging) + super().__init__(actions=actions, enable_logging=enable_logging) self.actions = list(actions) self._indices = list(range(len(actions))) self._arm_by_action = dict(zip(actions, self._indices)) self._agent = MAB(arms=self._indices, - learning_policy=LearningPolicy.UCB1(alpha=1.25), + learning_policy=LearningPolicy.EpsilonGreedy(epsilon=0.4), n_jobs=n_jobs) self._reward_agent = FitnessRateRankRewardTransformer(decaying_factor=decaying_factor) if is_initial_fit: diff --git a/golem/core/optimisers/adaptive/mab_agents/neural_contextual_mab_agent.py b/golem/core/optimisers/adaptive/mab_agents/neural_contextual_mab_agent.py index ad7510319..1265ca6ac 100644 --- a/golem/core/optimisers/adaptive/mab_agents/neural_contextual_mab_agent.py +++ b/golem/core/optimisers/adaptive/mab_agents/neural_contextual_mab_agent.py @@ -3,7 +3,7 @@ from golem.core.optimisers.adaptive.mab_agents.contextual_mab_agent import ContextualMultiArmedBanditAgent from golem.core.optimisers.adaptive.neural_mab import NeuralMAB from golem.core.optimisers.adaptive.context_agents import ContextAgentTypeEnum -from golem.core.optimisers.adaptive.operator_agent import ActType +from golem.core.optimisers.adaptive.common_types import ActType class NeuralContextualMultiArmedBanditAgent(ContextualMultiArmedBanditAgent): diff --git a/golem/core/optimisers/adaptive/operator_agent.py b/golem/core/optimisers/adaptive/operator_agent.py index ac3c906e5..7b1034504 100644 --- a/golem/core/optimisers/adaptive/operator_agent.py +++ 
b/golem/core/optimisers/adaptive/operator_agent.py @@ -1,19 +1,15 @@ import random from abc import ABC, abstractmethod -from collections import deque from enum import Enum -from typing import Union, Sequence, Hashable, Tuple, Optional, List +from typing import Union, Sequence, Optional import numpy as np from golem.core.dag.graph import Graph from golem.core.dag.graph_node import GraphNode from golem.core.log import default_log -from golem.core.optimisers.genetic.operators.base_mutations import MutationTypesEnum -from golem.core.optimisers.opt_history_objects.individual import Individual - -ObsType = Graph -ActType = Hashable +from golem.core.optimisers.adaptive.common_types import ObsType, ActType +from golem.core.optimisers.adaptive.experience_buffer import ExperienceBuffer class MutationAgentTypeEnum(Enum): @@ -24,77 +20,16 @@ class MutationAgentTypeEnum(Enum): neural_bandit = 'neural_bandit' -class ExperienceBuffer: - """ - Buffer for learning experience of ``OperatorAgent``. - Keeps (State, Action, Reward) lists until retrieval. - Can be used with window_size for actualizing experience. - """ - - def __init__(self, window_size: Optional[int] = None): - self.window_size = window_size - self._reset_main_storages() - self.reset() - - def reset(self): - self._current_observations = [] - self._current_actions = [] - self._current_rewards = [] - self._prev_pop = set() - self._next_pop = set() - - # if window size was not specified than there is no need to store these values for reuse - if self.window_size is None: - self._reset_main_storages() - - def _reset_main_storages(self): - self._observations = deque(maxlen=self.window_size) - self._actions = deque(maxlen=self.window_size) - self._rewards = deque(maxlen=self.window_size) - - def collect_results(self, results: Sequence[Individual]): - for ind in results: - self.collect_result(ind) - self._observations += self._current_observations - self._actions += self._current_actions - self._rewards += self._current_rewards - - def collect_result(self, result: Individual): - if result.uid in self._prev_pop: - return - if not result.parent_operator or result.parent_operator.type_ != 'mutation': - return - self._next_pop.add(result.uid) - obs = result.graph - action = result.parent_operator.operators[0] - prev_fitness = result.parent_operator.parent_individuals[0].fitness.value - # we're minimising the fitness, that's why less is better - # reward is defined as fitness improvement rate (FIR) to stabilize the algorithm - reward = (prev_fitness - result.fitness.value) / abs(prev_fitness) \ - if prev_fitness is not None and prev_fitness != 0 else 0. 
- self.collect_experience(obs, action, reward) - - def collect_experience(self, obs: ObsType, action: ActType, reward: float): - self._current_observations.append(obs) - self._current_actions.append(action) - self._current_rewards.append(reward) - - def retrieve_experience(self) -> Tuple[List[ObsType], List[ActType], List[float]]: - """Get all collected experience and clear the experience buffer.""" - observations, actions, rewards = self._observations, self._actions, self._rewards - next_pop = self._next_pop - self.reset() - self._prev_pop = next_pop - return list(observations), \ - list(actions), \ - list(rewards) - - class OperatorAgent(ABC): - def __init__(self, enable_logging: bool = True): + def __init__(self, actions: Sequence[ActType], enable_logging: bool = True): + self.actions = list(actions) self._enable_logging = enable_logging self._log = default_log(self) + @property + def available_actions(self) -> Sequence[ActType]: + return self.actions + @abstractmethod def partial_fit(self, experience: ExperienceBuffer): raise NotImplementedError() @@ -104,7 +39,7 @@ def choose_action(self, obs: Optional[ObsType]) -> ActType: raise NotImplementedError() @abstractmethod - def choose_nodes(self, graph: Graph, num_nodes: int = 1) -> Union[GraphNode, Sequence[GraphNode]]: + def choose_nodes(self, graph: ObsType, num_nodes: int = 1) -> Union[GraphNode, Sequence[GraphNode]]: raise NotImplementedError() @abstractmethod @@ -122,14 +57,11 @@ def _dbg_log(self, obs, actions, rewards): nonzero = rr[rr.nonzero()] msg = f'len={len(rr)} nonzero={len(nonzero)} ' if len(nonzero) > 0: - msg += (f'avg={nonzero.mean()} std={nonzero.std()} ' - f'min={nonzero.min()} max={nonzero.max()} ') + msg += (f'avg={nonzero.mean():.3f} std={nonzero.std():.3f} ' + f'min={nonzero.min():.3f} max={nonzero.max():.3f} ') self._log.info(msg) - - actions_names = [action.name if isinstance(action, MutationTypesEnum) else action.__name__ - for action in actions] - self._log.info(f'actions/rewards: {list(zip(actions_names, rr))}') + self._log.info(f'actions/rewards: {list(zip(actions, rr))}') action_values = list(map(self.get_action_values, obs)) action_probs = list(map(self.get_action_probs, obs)) @@ -145,11 +77,10 @@ def __init__(self, actions: Sequence[ActType], probs: Optional[Sequence[float]] = None, enable_logging: bool = True): - self.actions = list(actions) + super().__init__(actions, enable_logging) self._probs = probs or [1. 
/ len(actions)] * len(actions) - super().__init__(enable_logging) - def choose_action(self, obs: ObsType) -> ActType: + def choose_action(self, obs: Graph) -> ActType: action = np.random.choice(self.actions, p=self.get_action_probs(obs)) return action @@ -161,8 +92,8 @@ def partial_fit(self, experience: ExperienceBuffer): obs, actions, rewards = experience.retrieve_experience() self._dbg_log(obs, actions, rewards) - def get_action_probs(self, obs: Optional[ObsType] = None) -> Sequence[float]: + def get_action_probs(self, obs: Optional[Graph] = None) -> Sequence[float]: return self._probs - def get_action_values(self, obs: Optional[ObsType] = None) -> Sequence[float]: + def get_action_values(self, obs: Optional[Graph] = None) -> Sequence[float]: return self._probs diff --git a/golem/core/optimisers/adaptive/reward_agent.py b/golem/core/optimisers/adaptive/reward_agent.py index f5677c4b3..08f17dce7 100644 --- a/golem/core/optimisers/adaptive/reward_agent.py +++ b/golem/core/optimisers/adaptive/reward_agent.py @@ -1,9 +1,5 @@ from typing import List, Tuple -from numpy import sign - -from golem.core.optimisers.adaptive.operator_agent import ObsType - class FitnessRateRankRewardTransformer: """ @@ -32,4 +28,4 @@ def get_decay_values_for_arms(self, rewards: List[float], arms: List[int]) -> Tu def get_fitness_rank_rate(decay_values: List[float]) -> List[float]: # abs() is used to save the initial sign of each decay value total_decay_sum = abs(sum(decay_values)) - return [decay / total_decay_sum for decay in decay_values] if total_decay_sum != 0 else [0.] + return [decay / total_decay_sum for decay in decay_values] if total_decay_sum != 0 else [0.]*len(decay_values) diff --git a/golem/core/optimisers/fitness/fitness.py b/golem/core/optimisers/fitness/fitness.py index 7b2310004..33cdbd446 100644 --- a/golem/core/optimisers/fitness/fitness.py +++ b/golem/core/optimisers/fitness/fitness.py @@ -102,6 +102,9 @@ def __eq__(self, other: 'Fitness') -> bool: self.valid and other.valid and self.allclose(self.values, other.values)) + def __bool__(self) -> bool: + return self.valid + @staticmethod def allclose(values1, values2) -> bool: return np.allclose(values1, values2, rtol=1e-8, atol=1e-10) diff --git a/golem/core/optimisers/genetic/evaluation.py b/golem/core/optimisers/genetic/evaluation.py index 70a516d9b..8c59fb479 100644 --- a/golem/core/optimisers/genetic/evaluation.py +++ b/golem/core/optimisers/genetic/evaluation.py @@ -255,7 +255,7 @@ def evaluate_population(self, individuals: PopulationT) -> PopulationT: if not successful_evals: for single_ind in individuals: evaluation_result = eval_func(single_ind.graph, single_ind.uid, with_time_limit=False) - successful_evals = self.apply_evaluation_results([single_ind], [evaluation_result]) or [] + successful_evals = self.apply_evaluation_results([single_ind], [evaluation_result]) if successful_evals: break MemoryAnalytics.log(self.logger, diff --git a/golem/core/optimisers/genetic/operators/mutation.py b/golem/core/optimisers/genetic/operators/mutation.py index 2b1127299..2005377e9 100644 --- a/golem/core/optimisers/genetic/operators/mutation.py +++ b/golem/core/optimisers/genetic/operators/mutation.py @@ -6,14 +6,14 @@ from golem.core.dag.graph import Graph from golem.core.optimisers.adaptive.mab_agents.contextual_mab_agent import ContextualMultiArmedBanditAgent -from golem.core.optimisers.adaptive.mab_agents.neural_contextual_mab_agent import NeuralContextualMultiArmedBanditAgent from golem.core.optimisers.adaptive.mab_agents.mab_agent import 
MultiArmedBanditAgent +from golem.core.optimisers.adaptive.mab_agents.neural_contextual_mab_agent import NeuralContextualMultiArmedBanditAgent from golem.core.optimisers.adaptive.operator_agent import \ - OperatorAgent, RandomAgent, ExperienceBuffer, MutationAgentTypeEnum + OperatorAgent, RandomAgent, MutationAgentTypeEnum +from golem.core.optimisers.adaptive.experience_buffer import ExperienceBuffer from golem.core.optimisers.genetic.operators.base_mutations import \ base_mutations_repo, MutationTypesEnum from golem.core.optimisers.genetic.operators.operator import PopulationT, Operator -from golem.core.optimisers.graph import OptGraph from golem.core.optimisers.opt_history_objects.individual import Individual from golem.core.optimisers.opt_history_objects.parent_operator import ParentOperator from golem.core.optimisers.optimization_parameters import GraphRequirements, OptimizationParameters @@ -113,7 +113,7 @@ def _mutation(self, individual: Individual) -> Tuple[Individual, Optional[Mutati break else: # Collect invalid actions - self.agent_experience.collect_experience(individual.graph, mutation_applied, reward=-1.0) + self.agent_experience.collect_experience(individual, mutation_applied, reward=-1.0) else: self.log.debug('Number of mutation attempts exceeded. ' 'Please check optimization parameters for correctness.') @@ -127,7 +127,7 @@ def _sample_num_of_mutations(self) -> int: num_mut = 1 return num_mut - def _apply_mutations(self, new_graph: OptGraph) -> Tuple[OptGraph, Optional[MutationIdType]]: + def _apply_mutations(self, new_graph: Graph) -> Tuple[Graph, Optional[MutationIdType]]: """Apply mutation 1 or few times iteratively""" mutation_type = self._operator_agent.choose_action(new_graph) mutation_applied = None @@ -140,7 +140,7 @@ def _apply_mutations(self, new_graph: OptGraph) -> Tuple[OptGraph, Optional[Muta break return new_graph, mutation_applied - def _adapt_and_apply_mutation(self, new_graph: OptGraph, mutation_type) -> Tuple[OptGraph, bool]: + def _adapt_and_apply_mutation(self, new_graph: Graph, mutation_type) -> Tuple[Graph, bool]: applied = self._will_mutation_be_applied(mutation_type) if applied: # get the mutation function and adapt it diff --git a/golem/core/optimisers/objective/objective.py b/golem/core/optimisers/objective/objective.py index db10b5ed1..e9649b3e3 100644 --- a/golem/core/optimisers/objective/objective.py +++ b/golem/core/optimisers/objective/objective.py @@ -19,6 +19,9 @@ class ObjectiveInfo: is_multi_objective: bool = False metric_names: Sequence[str] = () + def __str__(self): + return f'{self.__class__.__name__}(multi={self.is_multi_objective}, metrics={self.metric_names})' + def format_fitness(self, fitness: Union[Fitness, Sequence[float]]) -> str: """Returns formatted fitness string. 
Example for 3 metrics: ``""" diff --git a/golem/core/optimisers/opt_history_objects/opt_history.py b/golem/core/optimisers/opt_history_objects/opt_history.py index c8b65fe3b..b14cd551e 100644 --- a/golem/core/optimisers/opt_history_objects/opt_history.py +++ b/golem/core/optimisers/opt_history_objects/opt_history.py @@ -99,9 +99,8 @@ def save_current_results(self, save_dir: Optional[os.PathLike] = None): last_gen = self.generations[last_gen_id] for individual in last_gen: ind_path = Path(save_dir, str(last_gen_id), str(individual.uid)) - if not os.path.isdir(ind_path): - os.makedirs(ind_path) - individual.save(json_file_path=Path(ind_path, f'{str(individual.uid)}.json')) + ind_path.mkdir(parents=True, exist_ok=True) + individual.save(json_file_path=ind_path / f'{str(individual.uid)}.json') except Exception as ex: self._log.exception(ex) diff --git a/golem/core/utilities/data_structures.py b/golem/core/utilities/data_structures.py index 2e36e3361..c2f1c1610 100644 --- a/golem/core/utilities/data_structures.py +++ b/golem/core/utilities/data_structures.py @@ -1,10 +1,11 @@ import collections.abc +import dataclasses from abc import ABC, abstractmethod from copy import deepcopy from enum import Enum from typing import Callable, Container, Generic, Iterable, Iterator, List, Optional, Sequence, Sized, TypeVar, Union, \ - Tuple + Tuple, Any, Dict T = TypeVar('T') @@ -306,5 +307,11 @@ def __iter__(self): return self -def unzip(tuples: Iterable[Tuple]) -> Tuple[Sequence, Sequence]: +def unzip(tuples: Iterable[Tuple]) -> Tuple[Sequence, ...]: return tuple(zip(*tuples)) + + +def update_dataclass(base_dc: Any, update_dc: Union[Any, Dict]) -> Any: + update_dict = dataclasses.asdict(update_dc) if dataclasses.is_dataclass(update_dc) else update_dc + new_base = dataclasses.replace(base_dc, **update_dict) + return new_base
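For reference, a minimal sketch of the offline pretraining workflow these changes enable, mirroring pretrain_agent() from examples/molecule_search/experiment.py; the optimizer and objective are assumed to come from the user's own setup (e.g. molecule_search_setup), and results_dir is assumed to contain histories saved by previous runs:

    from pathlib import Path

    from golem.core.optimisers.adaptive.agent_trainer import AgentTrainer
    from golem.core.optimisers.adaptive.history_collector import HistoryReader

    def pretrain_from_saved_histories(optimizer, objective, results_dir: str) -> AgentTrainer:
        # the trainer reuses the optimizer's mutation operator and its adaptive agent
        trainer = AgentTrainer(objective, optimizer.mutation, optimizer.mutation.agent)
        # load previously saved histories one by one and fit the agent on them,
        # validating after every history (validate_each=1)
        reader = HistoryReader(Path(results_dir))
        trainer.fit(histories=reader.load_histories(), validate_each=1)
        return trainer

    # afterwards, optimization runs as usual with the pretrained agent:
    # found_graphs = optimizer.optimise(objective)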