Skip to content

Commit

Permalink
pep8 fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
samgdotson committed Jan 16, 2025
1 parent e35fdfb commit 88c5f80
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 113 deletions.
119 changes: 61 additions & 58 deletions osier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,23 +188,23 @@ def get_objective_names(res_obj):
This function returns a list of named objectives based on the names of the
functions passed to Osier. In the case of partial functions, the first
keyword value is used.
Parameters
----------
res_obj : :class:`pymoo.Result`
The simulation results object containing all data and metadata.
Returns
-------
obj_columns : list of str
A list of function name strings.
"""
obj_columns=[]
obj_columns = []
for ofunc in res_obj.problem.objectives:
if isinstance(ofunc, types.FunctionType):
obj_columns.append(ofunc.__name__)
elif isinstance(ofunc, functools.partial):
obj_columns.append(list(ofunc.keywords.values())[0])
obj_columns.append(list(ofunc.keywords.values())[0])
return obj_columns


Expand Down Expand Up @@ -259,34 +259,37 @@ def apply_slack(pareto_front, slack, sense='minimize'):
near_optimal_front : :class:`numpy.ndarray`
The near-optimal front.
"""

try:
n_objectives = pareto_front.shape[1]
except IndexError as e:
n_objectives = len(pareto_front)

if isinstance(slack, (list, np.ndarray)):
try:
assert len(slack) == n_objectives
except AssertionError:
print("Number of slack values must equal number of objectives.")
raise ValueError

near_optimal_front = (np.ones(n_objectives)+np.array(slack))*np.array(pareto_front)
near_optimal_front = (np.ones(n_objectives) +
np.array(slack)) * np.array(pareto_front)
if sense.lower() == 'minimize':
near_optimal_front = np.array(pareto_front)*(np.ones(n_objectives)+np.array(slack))
near_optimal_front = np.array(
pareto_front) * (np.ones(n_objectives) + np.array(slack))
return near_optimal_front
elif sense.lower() == 'maximize':
near_optimal_front = np.array(pareto_front)*(np.ones(n_objectives)-np.array(slack))
near_optimal_front = np.array(
pareto_front) * (np.ones(n_objectives) - np.array(slack))
return near_optimal_front

return near_optimal_front
elif isinstance(slack, float):
if sense.lower() == 'minimize':
near_optimal_front = np.array(pareto_front)*(1+slack)
near_optimal_front = np.array(pareto_front) * (1 + slack)
return near_optimal_front
elif sense.lower() == 'maximize':
near_optimal_front = np.array(pareto_front)*(1-slack)
near_optimal_front = np.array(pareto_front) * (1 - slack)
return near_optimal_front

return
Expand All @@ -295,7 +298,7 @@ def apply_slack(pareto_front, slack, sense='minimize'):
def distance_matrix(X, metric='euclidean'):
"""
This function calculates the distance matrix for an MxN matrix and returns
the symmetrical square form of the matrix.
the symmetrical square form of the matrix.
Parameters
----------
Expand All @@ -307,7 +310,7 @@ def distance_matrix(X, metric='euclidean'):
See the documentation for
[`scipy.spatial.distance.pdist`](https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.pdist.html)
for a complete list of values. Default is 'euclidean.'
Returns
-------
D : :class:`numpy.ndarray`
Expand Down Expand Up @@ -342,7 +345,7 @@ def farthest_first(X, D, n_points, start_idx=None, seed=1234):
chosen randomly. Default is `None`.
seed : int
Specifies the seed for a random number generator to ensure repeatable
results. Default is 1234.
results. Default is 1234.
Returns
-------
Expand All @@ -364,8 +367,8 @@ def farthest_first(X, D, n_points, start_idx=None, seed=1234):

if not start_idx:
rng = np.random.default_rng(seed)
start_idx = rng.integers(low=0, high=rows-1)
start_idx = rng.integers(low=0, high=rows - 1)

checked_points.append(start_idx)
prev_mean_dist = None
while len(checked_points) < n_points:
Expand All @@ -381,7 +384,7 @@ def farthest_first(X, D, n_points, start_idx=None, seed=1234):
checked_points.append(j)
break
prev_mean_dist = mean_distance

return np.array(checked_points)


Expand Down Expand Up @@ -432,27 +435,27 @@ def check_if_interior(points, par_front, slack_front):

n_objs = points.shape[1]
interior_idxs = []

checked_points = set()
for i, p in enumerate(points):
if tuple(p) in checked_points:
continue
else:
checked_points.add(tuple(p))
cond1 = np.any((p < slack_front).sum(axis=1)==n_objs)
cond2 = np.any((p > par_front).sum(axis=1)==n_objs)
cond1 = np.any((p < slack_front).sum(axis=1) == n_objs)
cond2 = np.any((p > par_front).sum(axis=1) == n_objs)
if cond1 and cond2:
interior_idxs.append(i)

return np.array(interior_idxs)


def n_mga(results_obj,
n_points=10,
slack=0.1,
sense='minimize',
how='farthest',
seed=1234,
def n_mga(results_obj,
n_points=10,
slack=0.1,
sense='minimize',
how='farthest',
seed=1234,
metric='euclidean',
start_idx=None,
wide_form=False):
Expand All @@ -461,20 +464,20 @@ def n_mga(results_obj,
efficiently search decision space by relaxing the objective function(s) by a
specified amount of slack. This implementation will identify all points
inside of an N-polytope (a polygon in N-dimensions). Then a reduced subset
of points will be selected.
of points will be selected.
The algorithm:
1. Generate a near-optimal front based on the given slack values.
2. Loop through each point in the model's history.
3. Add each point to a set of checked points to prevent repeated calculations.
4. Check if a point is inside the N-polytope bounded by the Pareto and
4. Check if a point is inside the N-polytope bounded by the Pareto and
near-optimal fronts.
5. Select a subset of points based on a random selection or with a farthest first traversal.
5. Select a subset of points based on a random selection or with a farthest first traversal.
Parameters
----------
Expand All @@ -494,19 +497,19 @@ def n_mga(results_obj,
sense : str
Indicates whether the optimization was a minimization or maximization.
If min, the sub-optimal front is greater than the Pareto front. If max,
the sub-optimal front is below the Pareto front. Default is "minimize."
the sub-optimal front is below the Pareto front. Default is "minimize."
how : str
Sets the method used to traverse the near-optimal region. Accepts
['all','random','farthest'].
* `'all'` : Returns all near-optimal points.
* `'random'` : Returns a random selection a set of `n_points` from the near-optimal region.
* `'farthest'` : Returns `n_points` from the near-optimal space by doing a farthest-first-traversal in the design space.
seed : int
Specifies the seed for a random number generator to ensure repeatable
results. Default is 1234.
results. Default is 1234.
metric : str
The string describing how the metric should be calculated. See the
documentation for
Expand Down Expand Up @@ -541,9 +544,9 @@ def n_mga(results_obj,
use the `all` or the `random` options to generate alternative points. Or
inspect their results.
"""

pf = results_obj.F
try:
try:
n_inds, n_objs = pf.shape
except ValueError:
n_inds = pf.shape[0]
Expand All @@ -552,36 +555,36 @@ def n_mga(results_obj,
pop_size = results_obj.algorithm.pop_size
n_gen = results_obj.algorithm.n_gen - 1


pf_slack = apply_slack(pareto_front=pf,
pf_slack = apply_slack(pareto_front=pf,
slack=slack,
sense=sense)
X_hist = np.array([hist.pop.get("X")
for hist in results_obj.history]).reshape(n_gen*pop_size,n_objs)
F_hist = np.array([hist.pop.get("F")
for hist in results_obj.history]).reshape(n_gen*pop_size,n_objs)
try:

X_hist = np.array([hist.pop.get("X") for hist in results_obj.history]).reshape(
n_gen * pop_size, n_objs)
F_hist = np.array([hist.pop.get("F") for hist in results_obj.history]).reshape(
n_gen * pop_size, n_objs)
try:
cols = get_objective_names(results_obj)
except AttributeError:
cols = [f'f{i}' for i in range(n_objs)]

interior_idxs = check_if_interior(F_hist, pf, pf_slack)
X_int = X_hist[interior_idxs]
F_int = F_hist[interior_idxs]

if n_points == 'all':
selected_idxs = np.arange(len(interior_idxs))
elif how == 'random':
rng = np.random.default_rng(seed)
selected_idxs = rng.integers(low=0, high=len(interior_idxs), size=n_points)
selected_idxs = rng.integers(
low=0, high=len(interior_idxs), size=n_points)
elif how == 'farthest':
distance = distance_matrix(X_int, metric=metric)
selected_idxs = farthest_first(X_int,
distance,
n_points=n_points,
start_idx=start_idx,
seed=seed)
selected_idxs = farthest_first(X_int,
distance,
n_points=n_points,
start_idx=start_idx,
seed=seed)
X_select = X_int[selected_idxs]
F_select = F_int[selected_idxs]
mga_df = pd.DataFrame(dict(zip(cols, F_select.T)))
Expand All @@ -599,5 +602,5 @@ def n_mga(results_obj,
mga_df = pd.concat([mga_df, x_df], axis=1)
else:
mga_df['designs'] = [design for design in X_select]
return mga_df

return mga_df
44 changes: 27 additions & 17 deletions tests/test_mga.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from osier import distance_matrix, farthest_first, check_if_interior
from osier.utils import *

#===============PROBLEM SET UP==================
# ===============PROBLEM SET UP==================
problem = get_problem("bnh")

seed = 45
Expand All @@ -22,21 +22,26 @@
algorithm = NSGA2(pop_size=pop_size)

res = minimize(problem,
algorithm,
('n_gen', n_gen),
seed=seed,
verbose=False,
save_history=True
)
algorithm,
('n_gen', n_gen),
seed=seed,
verbose=False,
save_history=True
)
F = res.F
#===============MANUAL MGA FOR COMPARISON==================
# ===============MANUAL MGA FOR COMPARISON==================
slack = 0.1
slack_front = apply_slack(F, slack=slack)

X_hist = np.array([history.pop.get("X") for history in res.history]).reshape(n_gen*pop_size,2)
F_hist = np.array([history.pop.get("F") for history in res.history]).reshape(n_gen*pop_size,2)
X_hist = np.array([history.pop.get("X")
for history in res.history]).reshape(n_gen * pop_size, 2)
F_hist = np.array([history.pop.get("F")
for history in res.history]).reshape(n_gen * pop_size, 2)

int_pts = check_if_interior(points=F_hist, par_front=F, slack_front=slack_front)
int_pts = check_if_interior(
points=F_hist,
par_front=F,
slack_front=slack_front)
X_int = X_hist[int_pts]
F_int = F_hist[int_pts]

Expand All @@ -48,20 +53,25 @@
F_select = F_int[idxs]
X_select = X_int[idxs]

F_df = pd.DataFrame(dict(zip(['f0','f1'], F_select.T)))
X_df = pd.DataFrame(dict(zip(['x0','x1'], X_select.T)))
F_df = pd.DataFrame(dict(zip(['f0', 'f1'], F_select.T)))
X_df = pd.DataFrame(dict(zip(['x0', 'x1'], X_select.T)))
mga_df = pd.concat([F_df, X_df], axis=1)


def test_nmga_integration():
mga_results = n_mga(results_obj=res, seed=seed, wide_form=True)
assert(mga_results.equals(mga_df))
assert (mga_results.equals(mga_df))


def test_nmga_wideform():
mga_results = n_mga(results_obj=res, seed=seed, wide_form=True)
assert(mga_results.shape == (10,4))
assert (mga_results.shape == (10, 4))


def test_nmga_all():
mga_results = n_mga(results_obj=res, n_points='all', seed=seed, wide_form=True)
assert(mga_results.shape == (len(F_int),4))
mga_results = n_mga(
results_obj=res,
n_points='all',
seed=seed,
wide_form=True)
assert (mga_results.shape == (len(F_int), 4))
Loading

0 comments on commit 88c5f80

Please sign in to comment.