Refactor #658

Draft · wants to merge 12 commits into base: master
amlb/benchmark.py: 50 changes (27 additions, 23 deletions)
@@ -124,8 +124,10 @@ def __init__(
self.framework_def, self.framework_name = framework, framework.name
log.debug("Using framework definition: %s.", self.framework_def)

self.constraint_def, self.constraint_name = rget().constraint_definition(
constraint_name
task_constraint = rget().constraint_definition(constraint_name)
self.constraint_def, self.constraint_name = (
task_constraint,
task_constraint.name,
)
log.debug("Using constraint definition: %s.", self.constraint_def)

@@ -614,33 +616,32 @@ def handle_unfulfilled(message, on_auto="warn"):
os_recommended_mem = ns.get(
rconfig(), f"{mode}.os_mem_size_mb", rconfig().benchmarks.os_mem_size_mb
)
left_for_app_mem = int(sys_mem.available - os_recommended_mem)
assigned_mem = round(
self.max_mem_size_mb
if self.max_mem_size_mb > 0
else left_for_app_mem
if left_for_app_mem > 0
else sys_mem.available
)

if self.max_mem_size_mb <= 0:
left_for_app_mem = int(sys_mem.available - os_recommended_mem)
self.max_mem_size_mb = (
left_for_app_mem if left_for_app_mem > 0 else sys_mem.available
)
self.max_mem_size_mb = round(self.max_mem_size_mb)

if self.max_mem_size_mb > sys_mem.total:
raise JobError(
f"Total system memory {sys_mem.total} MB does not meet requirements (max_mem_size_mb={self.max_mem_size_mb} MB)!.",
)

log.info(
"Assigning %.f MB (total=%.f MB) for new %s task.",
assigned_mem,
self.max_mem_size_mb,
sys_mem.total,
self.name,
)
self.max_mem_size_mb = assigned_mem
if assigned_mem > sys_mem.total:
if self.max_mem_size_mb > sys_mem.available:
handle_unfulfilled(
f"Total system memory {sys_mem.total} MB does not meet requirements ({assigned_mem} MB)!.",
on_auto="fail",
f"Assigned memory ({self.max_mem_size_mb} MB) exceeds system available memory ({sys_mem.available} MB / total={sys_mem.total} MB)!"
)
elif assigned_mem > sys_mem.available:
elif self.max_mem_size_mb > sys_mem.total - os_recommended_mem:
handle_unfulfilled(
f"Assigned memory ({assigned_mem} MB) exceeds system available memory ({sys_mem.available} MB / total={sys_mem.total} MB)!"
)
elif assigned_mem > sys_mem.total - os_recommended_mem:
handle_unfulfilled(
f"Assigned memory ({assigned_mem} MB) is within {sys_mem.available} MB of system total memory {sys_mem.total} MB): "
f"Assigned memory ({self.max_mem_size_mb} MB) is within {sys_mem.available} MB of system total memory {sys_mem.total} MB): "
f"We recommend a {os_recommended_mem} MB buffer, otherwise OS memory usage might interfere with the benchmark task."
)
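For review context, a self-contained sketch of the assignment and check order this hunk appears to settle on. `resolve_max_mem_mb` and the numbers below are made-up stand-ins for `sys_mem`, `rconfig()`, and `handle_unfulfilled`, not amlb API; the main behavioural change I can see is that exceeding total system memory now raises directly instead of going through `handle_unfulfilled(..., on_auto="fail")`.

```python
# Hypothetical, standalone restatement of the memory assignment in this hunk;
# print() stands in for handle_unfulfilled().
def resolve_max_mem_mb(requested_mb: float, available_mb: float,
                       total_mb: float, os_buffer_mb: float) -> int:
    if requested_mb <= 0:
        # No explicit limit: leave a buffer for the OS, falling back to all
        # available memory if that buffer would leave nothing for the app.
        left_for_app = int(available_mb - os_buffer_mb)
        requested_mb = left_for_app if left_for_app > 0 else available_mb
    requested_mb = round(requested_mb)

    if requested_mb > total_mb:
        raise RuntimeError(
            f"Total system memory {total_mb} MB does not meet requirements "
            f"(max_mem_size_mb={requested_mb} MB)!"
        )
    if requested_mb > available_mb:
        print("warning: assigned memory exceeds currently available memory")
    elif requested_mb > total_mb - os_buffer_mb:
        print("warning: less than the recommended OS buffer is left free")
    return requested_mb


# E.g. 16 GB total, 14 GB available, a 2 GB OS buffer and no explicit limit
# resolves to 12 GB.
assert resolve_max_mem_mb(-1, available_mb=14_000, total_mb=16_000,
                          os_buffer_mb=2_000) == 12_000
```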

@@ -649,12 +650,15 @@ def handle_unfulfilled(message, on_auto="warn"):
os_recommended_vol = rconfig().benchmarks.os_vol_size_mb
if self.min_vol_size_mb > sys_vol.free:
handle_unfulfilled(
f"Available storage ({sys_vol.free} MB / total={sys_vol.total} MB) does not meet requirements ({self.min_vol_size_mb+os_recommended_vol} MB)!"
f"Available storage ({sys_vol.free} MB / total={sys_vol.total} MB) does not meet requirements (min_vol_size_mb={self.min_vol_size_mb} MB)!"
)
elif self.min_vol_size_mb > sys_vol.free + os_recommended_vol:
handle_unfulfilled(
f"Required storage min_vol_size_mb ({self.min_vol_size_mb}MB) together with recommended storage for OS ({os_recommended_vol} MB exceeds available storage ({sys_vol.free} MB)."
)


class BenchmarkTask:

def __init__(self, benchmark: Benchmark, task_def, fold):
"""

amlb/benchmarks/parser.py: 6 changes (3 additions, 3 deletions)
@@ -26,8 +26,8 @@ def benchmark_load(
name, benchmark_definition_dirs
)

hard_defaults = next((task for task in tasks if task.name == "__defaults__"), None)
tasks = [task for task in tasks if task is not hard_defaults]
file_defaults = next((task for task in tasks if task.name == "__defaults__"), None)
tasks = [task for task in tasks if task is not file_defaults]
for t in tasks:
t.name = str_sanitize(t.name)
return hard_defaults, tasks, benchmark_path, str_sanitize(benchmark_name)
return file_defaults, tasks, benchmark_path, str_sanitize(benchmark_name)
amlb/frameworks/definitions.py: 21 changes (21 additions, 0 deletions)
@@ -281,3 +281,24 @@ def load_framework_definition(
framework_name, tag = framework_name.split(":", 1)
definition_ns, name = configuration.framework_definition(framework_name, tag)
return Framework(**Namespace.dict(definition_ns))


@dataclass
class TaskConstraint:
name: str
folds: int
max_runtime_seconds: int
cores: int
min_vol_size_mb: int | None = None
ec2_volume_type: str | None = None


@dataclass
class Task(TaskConstraint):
dataset: Namespace | None = None # TODO: Specify file dataset description
enabled: bool = True
description: str = ""
openml_task_id: int | None = None
metric: str | list[str] | None = None
# Specific to time series
quantile_levels: list[float] | None = None
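If it helps reviewers, a hedged usage sketch of the two new dataclasses. All field values and names below are invented, and the import path simply assumes the classes stay in `amlb/frameworks/definitions.py` as in this diff.

```python
# Illustrative values only; nothing here is read from a real constraints file.
from amlb.frameworks.definitions import Task, TaskConstraint

constraint = TaskConstraint(
    name="example_1h8c",          # hypothetical constraint name
    folds=10,
    max_runtime_seconds=3600,
    cores=8,
)

# A Task reuses the constraint fields and adds dataset/metric details on top.
task = Task(
    name="example_task",
    folds=constraint.folds,
    max_runtime_seconds=constraint.max_runtime_seconds,
    cores=constraint.cores,
    openml_task_id=42,            # hypothetical OpenML task id
    metric=["auc", "acc"],
)
assert isinstance(task, TaskConstraint)
```

The `TaskConstraint(**Namespace.dict(constraint))` call in `amlb/resources.py` below suggests that constraint entries from the YAML file are expected to map 1:1 onto these fields.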
amlb/resources.py: 163 changes (88 additions, 75 deletions)
@@ -6,6 +6,7 @@
from __future__ import annotations

import copy
import dataclasses
import logging
import os
import random
@@ -14,6 +15,7 @@

from amlb.benchmarks.parser import benchmark_load
from amlb.frameworks import default_tag, load_framework_definitions
from .frameworks.definitions import TaskConstraint
from .utils import (
Namespace,
lazy_property,
@@ -172,7 +174,7 @@ def _frameworks(self):
return load_framework_definitions(frameworks_file, self.config)

@memoize
def constraint_definition(self, name):
def constraint_definition(self, name: str) -> TaskConstraint:
"""
:param name: name of the benchmark constraint definition as defined in the constraints file
:return: a TaskConstraint with the constraint config (folds, cores, max_runtime_seconds, ...) for the current benchmark run.
@@ -184,7 +186,7 @@ def constraint_definition(self, name):
name, self.config.benchmarks.constraints_file
)
)
return constraint, constraint.name
return TaskConstraint(**Namespace.dict(constraint))

@lazy_property
def _constraints(self):
@@ -206,42 +208,44 @@ def _constraints(self):
constraints_lookup[name.lower()] = c
return constraints_lookup

# @memoize
def benchmark_definition(self, name, defaults=None):
def benchmark_definition(self, name: str, defaults: TaskConstraint | None = None):
return self._benchmark_definition(name, self.config, defaults)

def _benchmark_definition(
self,
name: str,
config_: Namespace,
defaults_for_task: TaskConstraint | None = None,
):
"""
:param name: name of the benchmark as defined by resources/benchmarks/{name}.yaml, the path to a user-defined benchmark description file or a study id.
:param defaults_for_task: defaults used as a base config for each task in the benchmark definition
:return:
"""
hard_defaults, tasks, benchmark_path, benchmark_name = benchmark_load(
name, self.config.benchmarks.definition_dir
file_defaults, tasks, benchmark_path, benchmark_name = benchmark_load(
name, config_.benchmarks.definition_dir
)

defaults = None
if defaults_for_task is not None:
defaults = Namespace(**dataclasses.asdict(defaults_for_task))
defaults = Namespace.merge(
defaults, hard_defaults, Namespace(name="__defaults__")
defaults, file_defaults, Namespace(name="__defaults__")
)
for task in tasks:
task |= defaults # add missing keys from hard defaults + defaults
self._validate_task(task)
Resources._validate_task(task)
Resources._add_task_defaults(task, config_)

self._validate_task(defaults, lenient=True)
Resources._add_task_defaults(defaults, config_)
defaults.enabled = False
tasks.append(defaults)
log.debug("Available task definitions:\n%s", tasks)
return tasks, benchmark_name, benchmark_path

def _validate_task(self, task, lenient=False):
missing = []
for conf in ["name"]:
if task[conf] is None:
missing.append(conf)
if not lenient and len(missing) > 0:
raise ValueError(
"{missing} mandatory properties as missing in task definition {taskdef}.".format(
missing=missing, taskdef=task
)
)

@staticmethod
def _add_task_defaults(task: Namespace, config_: Namespace):
if task["id"] is None:
task["id"] = Resources.generate_task_identifier(task)
for conf in [
"max_runtime_seconds",
"cores",
@@ -251,75 +255,84 @@ def _validate_task(self, task, lenient=False):
"quantile_levels",
]:
if task[conf] is None:
task[conf] = self.config.benchmarks.defaults[conf]
task[conf] = config_.benchmarks.defaults[conf]
log.debug(
"Config `{config}` not set for task {name}, using default `{value}`.".format(
config=conf, name=task.name, value=task[conf]
config=conf, name=task["name"], value=task[conf]
)
)

conf = "id"
if task[conf] is None:
task[conf] = (
"openml.org/t/{}".format(task.openml_task_id)
if task["openml_task_id"] is not None
else "openml.org/d/{}".format(task.openml_dataset_id)
if task["openml_dataset_id"] is not None
else (
(
task.dataset["id"]
if isinstance(task.dataset, (dict, Namespace))
else task.dataset
if isinstance(task.dataset, str)
else None
)
or task.name
)
if task["dataset"] is not None
else None
)
if not lenient and task[conf] is None:
raise ValueError(
"task definition must contain an ID or one property "
"among ['openml_task_id', 'dataset'] to create an ID, "
"but task definition is {task}".format(task=str(task))
)
if task["metric"] is None:
task["metric"] = None

conf = "metric"
if task[conf] is None:
task[conf] = None

conf = "ec2_instance_type"
if task[conf] is None:
i_series = self.config.aws.ec2.instance_type.series
i_map = self.config.aws.ec2.instance_type.map
if str(task.cores) in i_map:
i_size = i_map[str(task.cores)]
elif task.cores > 0:
supported_cores = list(
map(int, Namespace.dict(i_map).keys() - {"default"})
)
supported_cores.sort()
cores = next((c for c in supported_cores if c >= task.cores), "default")
i_size = i_map[str(cores)]
else:
i_size = i_map.default
task[conf] = ".".join([i_series, i_size])
if task["ec2_instance_type"] is None:
task["ec2_instance_type"] = Resources.lookup_ec2_instance_type(
config_, task.cores
)
log.debug(
"Config `{config}` not set for task {name}, using default selection `{value}`.".format(
config=conf, name=task.name, value=task[conf]
config=conf, name=task["name"], value=task["ec2_instance_type"]
)
)

conf = "ec2_volume_type"
if task[conf] is None:
task[conf] = self.config.aws.ec2.volume_type
if task["ec2_volume_type"] is None:
task["ec2_volume_type"] = config_.aws.ec2.volume_type
log.debug(
"Config `{config}` not set for task {name}, using default `{value}`.".format(
config=conf, name=task.name, value=task[conf]
config=conf, name=task["name"], value=task["ec2_volume_type"]
)
)

@staticmethod
def _validate_task(task: Namespace) -> None:
"""Raises ValueError if task does not have a name and a way to generate an identifier."""
if task["name"] is None:
raise ValueError(
f"`name` is mandatory but missing in task definition {task}."
)
task_id = Namespace.get(task, "id", Resources.generate_task_identifier(task))
if task_id is None:
raise ValueError(
"task definition must contain an ID or one property "
"among ['openml_task_id', 'dataset'] to create an ID, "
"but task definition is {task}".format(task=str(task))
)

@staticmethod
def lookup_ec2_instance_type(config_: Namespace, cores: int) -> str:
i_series = config_.aws.ec2.instance_type.series
i_map = config_.aws.ec2.instance_type.map
i_size = Resources.lookup_suitable_instance_size(i_map, cores)
return f"{i_series}.{i_size}"

@staticmethod
def lookup_suitable_instance_size(cores_to_size: Namespace, cores: int) -> str:
if str(cores) in cores_to_size:
return cores_to_size[str(cores)]

supported_cores = list(map(int, set(dir(cores_to_size)) - {"default"}))
if cores <= 0 or cores > max(supported_cores):
return cores_to_size.default

best_match = next(
(str(c) for c in sorted(supported_cores) if c >= cores), "default"
)
return cores_to_size[best_match]
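A self-contained restatement of the selection rule I read in `lookup_suitable_instance_size`, with a plain dict standing in for the `Namespace`-based `config.aws.ec2.instance_type.map` (the sizes below are made up). As far as I can tell this preserves the old inline behaviour; the out-of-range case is just handled explicitly now.

```python
# Hypothetical helper and map; real values come from config.aws.ec2.instance_type.map.
def pick_instance_size(cores_to_size: dict[str, str], cores: int) -> str:
    # An exact match on the requested core count wins.
    if str(cores) in cores_to_size:
        return cores_to_size[str(cores)]
    supported = sorted(int(c) for c in cores_to_size if c != "default")
    # Non-positive or larger-than-supported requests fall back to the default size.
    if cores <= 0 or cores > max(supported):
        return cores_to_size["default"]
    # Otherwise round up to the smallest entry with at least `cores` cores.
    best = next(str(c) for c in supported if c >= cores)
    return cores_to_size[best]


sizes = {"4": "xlarge", "8": "2xlarge", "default": "24xlarge"}
assert pick_instance_size(sizes, 4) == "xlarge"      # exact match
assert pick_instance_size(sizes, 6) == "2xlarge"     # rounded up to the 8-core entry
assert pick_instance_size(sizes, 0) == "24xlarge"    # non-positive -> default
assert pick_instance_size(sizes, 64) == "24xlarge"   # above the largest mapping -> default
```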

@staticmethod
def generate_task_identifier(task: Namespace) -> str | None:
if task["openml_task_id"] is not None:
return f"openml.org/t/{task.openml_task_id}"
if task["openml_dataset_id"] is not None:
return f"openml.org/d/{task.openml_dataset_id}"
if task["dataset"] is None:
return None
if isinstance(task.dataset, (dict, Namespace)):
return task.dataset["id"]
if isinstance(task.dataset, str):
return task.dataset
return task.name
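And a short, hypothetical illustration of the precedence in `generate_task_identifier` (OpenML task id, then OpenML dataset id, then the dataset entry, then the task name), restated over plain dicts:

```python
# Standalone restatement; task definitions are plain dicts here for brevity.
def task_identifier(task: dict) -> str | None:
    if task.get("openml_task_id") is not None:
        return f"openml.org/t/{task['openml_task_id']}"
    if task.get("openml_dataset_id") is not None:
        return f"openml.org/d/{task['openml_dataset_id']}"
    if task.get("dataset") is None:
        return None
    if isinstance(task["dataset"], dict):
        return task["dataset"]["id"]
    if isinstance(task["dataset"], str):
        return task["dataset"]
    return task["name"]


assert task_identifier({"openml_task_id": 42}) == "openml.org/t/42"
assert task_identifier({"openml_task_id": None, "openml_dataset_id": None,
                        "dataset": "data/example.csv"}) == "data/example.csv"
assert task_identifier({"openml_task_id": None, "openml_dataset_id": None,
                        "dataset": None}) is None
```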


__INSTANCE__: Resources | None = None

frameworks/FEDOT/__init__.py: 8 changes (5 additions, 3 deletions)
@@ -26,8 +26,10 @@ def run_fedot_tabular(dataset: Dataset, config: TaskConfig):
__file__, "exec.py", input_data=data, dataset=dataset, config=config
)


def run_fedot_timeseries(dataset: Dataset, config: TaskConfig):
from frameworks.shared.caller import run_in_venv

dataset = deepcopy(dataset)

data = dict(
Expand All @@ -43,6 +45,6 @@ def run_fedot_timeseries(dataset: Dataset, config: TaskConfig):
repeated_item_id=dataset.repeated_item_id,
)

return run_in_venv(__file__, "exec_ts.py",
input_data=data, dataset=dataset, config=config)

return run_in_venv(
__file__, "exec_ts.py", input_data=data, dataset=dataset, config=config
)