From 88c5f80ec90e0e457dcfe3b14d8a213453d6e5c3 Mon Sep 17 00:00:00 2001 From: Sam Dotson Date: Thu, 16 Jan 2025 16:20:26 -0500 Subject: [PATCH] pep8 fixes --- osier/utils.py | 119 +++++++++++++++++++++++--------------------- tests/test_mga.py | 44 +++++++++------- tests/test_utils.py | 78 +++++++++++++++-------------- 3 files changed, 128 insertions(+), 113 deletions(-) diff --git a/osier/utils.py b/osier/utils.py index 3aed8ac..8f6397e 100644 --- a/osier/utils.py +++ b/osier/utils.py @@ -188,23 +188,23 @@ def get_objective_names(res_obj): This function returns a list of named objectives based on the names of the functions passed to Osier. In the case of partial functions, the first keyword value is used. - + Parameters ---------- res_obj : :class:`pymoo.Result` The simulation results object containing all data and metadata. - + Returns ------- obj_columns : list of str A list of function name strings. """ - obj_columns=[] + obj_columns = [] for ofunc in res_obj.problem.objectives: if isinstance(ofunc, types.FunctionType): obj_columns.append(ofunc.__name__) elif isinstance(ofunc, functools.partial): - obj_columns.append(list(ofunc.keywords.values())[0]) + obj_columns.append(list(ofunc.keywords.values())[0]) return obj_columns @@ -259,12 +259,12 @@ def apply_slack(pareto_front, slack, sense='minimize'): near_optimal_front : :class:`numpy.ndarray` The near-optimal front. """ - + try: n_objectives = pareto_front.shape[1] except IndexError as e: n_objectives = len(pareto_front) - + if isinstance(slack, (list, np.ndarray)): try: assert len(slack) == n_objectives @@ -272,21 +272,24 @@ def apply_slack(pareto_front, slack, sense='minimize'): print("Number of slack values must equal number of objectives.") raise ValueError - near_optimal_front = (np.ones(n_objectives)+np.array(slack))*np.array(pareto_front) + near_optimal_front = (np.ones(n_objectives) + + np.array(slack)) * np.array(pareto_front) if sense.lower() == 'minimize': - near_optimal_front = np.array(pareto_front)*(np.ones(n_objectives)+np.array(slack)) + near_optimal_front = np.array( + pareto_front) * (np.ones(n_objectives) + np.array(slack)) return near_optimal_front elif sense.lower() == 'maximize': - near_optimal_front = np.array(pareto_front)*(np.ones(n_objectives)-np.array(slack)) + near_optimal_front = np.array( + pareto_front) * (np.ones(n_objectives) - np.array(slack)) return near_optimal_front - + return near_optimal_front elif isinstance(slack, float): if sense.lower() == 'minimize': - near_optimal_front = np.array(pareto_front)*(1+slack) + near_optimal_front = np.array(pareto_front) * (1 + slack) return near_optimal_front elif sense.lower() == 'maximize': - near_optimal_front = np.array(pareto_front)*(1-slack) + near_optimal_front = np.array(pareto_front) * (1 - slack) return near_optimal_front return @@ -295,7 +298,7 @@ def apply_slack(pareto_front, slack, sense='minimize'): def distance_matrix(X, metric='euclidean'): """ This function calculates the distance matrix for an MxN matrix and returns - the symmetrical square form of the matrix. + the symmetrical square form of the matrix. Parameters ---------- @@ -307,7 +310,7 @@ def distance_matrix(X, metric='euclidean'): See the documentation for [`scipy.spatial.distance.pdist`](https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.pdist.html) for a complete list of values. Default is 'euclidean.' - + Returns ------- D : :class:`numpy.ndarray` @@ -342,7 +345,7 @@ def farthest_first(X, D, n_points, start_idx=None, seed=1234): chosen randomly. Default is `None`. seed : int Specifies the seed for a random number generator to ensure repeatable - results. Default is 1234. + results. Default is 1234. Returns ------- @@ -364,8 +367,8 @@ def farthest_first(X, D, n_points, start_idx=None, seed=1234): if not start_idx: rng = np.random.default_rng(seed) - start_idx = rng.integers(low=0, high=rows-1) - + start_idx = rng.integers(low=0, high=rows - 1) + checked_points.append(start_idx) prev_mean_dist = None while len(checked_points) < n_points: @@ -381,7 +384,7 @@ def farthest_first(X, D, n_points, start_idx=None, seed=1234): checked_points.append(j) break prev_mean_dist = mean_distance - + return np.array(checked_points) @@ -432,27 +435,27 @@ def check_if_interior(points, par_front, slack_front): n_objs = points.shape[1] interior_idxs = [] - + checked_points = set() for i, p in enumerate(points): if tuple(p) in checked_points: continue else: checked_points.add(tuple(p)) - cond1 = np.any((p < slack_front).sum(axis=1)==n_objs) - cond2 = np.any((p > par_front).sum(axis=1)==n_objs) + cond1 = np.any((p < slack_front).sum(axis=1) == n_objs) + cond2 = np.any((p > par_front).sum(axis=1) == n_objs) if cond1 and cond2: interior_idxs.append(i) - + return np.array(interior_idxs) -def n_mga(results_obj, - n_points=10, - slack=0.1, - sense='minimize', - how='farthest', - seed=1234, +def n_mga(results_obj, + n_points=10, + slack=0.1, + sense='minimize', + how='farthest', + seed=1234, metric='euclidean', start_idx=None, wide_form=False): @@ -461,20 +464,20 @@ def n_mga(results_obj, efficiently search decision space by relaxing the objective function(s) by a specified amount of slack. This implementation will identify all points inside of an N-polytope (a polygon in N-dimensions). Then a reduced subset - of points will be selected. + of points will be selected. The algorithm: - + 1. Generate a near-optimal front based on the given slack values. - + 2. Loop through each point in the model's history. 3. Add each point to a set of checked points to prevent repeated calculations. - 4. Check if a point is inside the N-polytope bounded by the Pareto and + 4. Check if a point is inside the N-polytope bounded by the Pareto and near-optimal fronts. - 5. Select a subset of points based on a random selection or with a farthest first traversal. + 5. Select a subset of points based on a random selection or with a farthest first traversal. Parameters ---------- @@ -494,19 +497,19 @@ def n_mga(results_obj, sense : str Indicates whether the optimization was a minimization or maximization. If min, the sub-optimal front is greater than the Pareto front. If max, - the sub-optimal front is below the Pareto front. Default is "minimize." + the sub-optimal front is below the Pareto front. Default is "minimize." how : str Sets the method used to traverse the near-optimal region. Accepts ['all','random','farthest']. - + * `'all'` : Returns all near-optimal points. - + * `'random'` : Returns a random selection a set of `n_points` from the near-optimal region. - + * `'farthest'` : Returns `n_points` from the near-optimal space by doing a farthest-first-traversal in the design space. seed : int Specifies the seed for a random number generator to ensure repeatable - results. Default is 1234. + results. Default is 1234. metric : str The string describing how the metric should be calculated. See the documentation for @@ -541,9 +544,9 @@ def n_mga(results_obj, use the `all` or the `random` options to generate alternative points. Or inspect their results. """ - + pf = results_obj.F - try: + try: n_inds, n_objs = pf.shape except ValueError: n_inds = pf.shape[0] @@ -552,36 +555,36 @@ def n_mga(results_obj, pop_size = results_obj.algorithm.pop_size n_gen = results_obj.algorithm.n_gen - 1 - - pf_slack = apply_slack(pareto_front=pf, + pf_slack = apply_slack(pareto_front=pf, slack=slack, sense=sense) - - X_hist = np.array([hist.pop.get("X") - for hist in results_obj.history]).reshape(n_gen*pop_size,n_objs) - F_hist = np.array([hist.pop.get("F") - for hist in results_obj.history]).reshape(n_gen*pop_size,n_objs) - try: + + X_hist = np.array([hist.pop.get("X") for hist in results_obj.history]).reshape( + n_gen * pop_size, n_objs) + F_hist = np.array([hist.pop.get("F") for hist in results_obj.history]).reshape( + n_gen * pop_size, n_objs) + try: cols = get_objective_names(results_obj) except AttributeError: cols = [f'f{i}' for i in range(n_objs)] - + interior_idxs = check_if_interior(F_hist, pf, pf_slack) X_int = X_hist[interior_idxs] F_int = F_hist[interior_idxs] - + if n_points == 'all': selected_idxs = np.arange(len(interior_idxs)) elif how == 'random': rng = np.random.default_rng(seed) - selected_idxs = rng.integers(low=0, high=len(interior_idxs), size=n_points) + selected_idxs = rng.integers( + low=0, high=len(interior_idxs), size=n_points) elif how == 'farthest': distance = distance_matrix(X_int, metric=metric) - selected_idxs = farthest_first(X_int, - distance, - n_points=n_points, - start_idx=start_idx, - seed=seed) + selected_idxs = farthest_first(X_int, + distance, + n_points=n_points, + start_idx=start_idx, + seed=seed) X_select = X_int[selected_idxs] F_select = F_int[selected_idxs] mga_df = pd.DataFrame(dict(zip(cols, F_select.T))) @@ -599,5 +602,5 @@ def n_mga(results_obj, mga_df = pd.concat([mga_df, x_df], axis=1) else: mga_df['designs'] = [design for design in X_select] - - return mga_df \ No newline at end of file + + return mga_df diff --git a/tests/test_mga.py b/tests/test_mga.py index ad3671c..86c783e 100644 --- a/tests/test_mga.py +++ b/tests/test_mga.py @@ -13,7 +13,7 @@ from osier import distance_matrix, farthest_first, check_if_interior from osier.utils import * -#===============PROBLEM SET UP================== +# ===============PROBLEM SET UP================== problem = get_problem("bnh") seed = 45 @@ -22,21 +22,26 @@ algorithm = NSGA2(pop_size=pop_size) res = minimize(problem, - algorithm, - ('n_gen', n_gen), - seed=seed, - verbose=False, - save_history=True - ) + algorithm, + ('n_gen', n_gen), + seed=seed, + verbose=False, + save_history=True + ) F = res.F -#===============MANUAL MGA FOR COMPARISON================== +# ===============MANUAL MGA FOR COMPARISON================== slack = 0.1 slack_front = apply_slack(F, slack=slack) -X_hist = np.array([history.pop.get("X") for history in res.history]).reshape(n_gen*pop_size,2) -F_hist = np.array([history.pop.get("F") for history in res.history]).reshape(n_gen*pop_size,2) +X_hist = np.array([history.pop.get("X") + for history in res.history]).reshape(n_gen * pop_size, 2) +F_hist = np.array([history.pop.get("F") + for history in res.history]).reshape(n_gen * pop_size, 2) -int_pts = check_if_interior(points=F_hist, par_front=F, slack_front=slack_front) +int_pts = check_if_interior( + points=F_hist, + par_front=F, + slack_front=slack_front) X_int = X_hist[int_pts] F_int = F_hist[int_pts] @@ -48,20 +53,25 @@ F_select = F_int[idxs] X_select = X_int[idxs] -F_df = pd.DataFrame(dict(zip(['f0','f1'], F_select.T))) -X_df = pd.DataFrame(dict(zip(['x0','x1'], X_select.T))) +F_df = pd.DataFrame(dict(zip(['f0', 'f1'], F_select.T))) +X_df = pd.DataFrame(dict(zip(['x0', 'x1'], X_select.T))) mga_df = pd.concat([F_df, X_df], axis=1) + def test_nmga_integration(): mga_results = n_mga(results_obj=res, seed=seed, wide_form=True) - assert(mga_results.equals(mga_df)) + assert (mga_results.equals(mga_df)) def test_nmga_wideform(): mga_results = n_mga(results_obj=res, seed=seed, wide_form=True) - assert(mga_results.shape == (10,4)) + assert (mga_results.shape == (10, 4)) def test_nmga_all(): - mga_results = n_mga(results_obj=res, n_points='all', seed=seed, wide_form=True) - assert(mga_results.shape == (len(F_int),4)) + mga_results = n_mga( + results_obj=res, + n_points='all', + seed=seed, + wide_form=True) + assert (mga_results.shape == (len(F_int), 4)) diff --git a/tests/test_utils.py b/tests/test_utils.py index 924a127..e8bb05a 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -26,13 +26,13 @@ def pymoo_problem(): algorithm = NSGA2(pop_size=pop_size) res = minimize(problem, - algorithm, - ('n_gen', n_gen), - seed=1, - verbose=False, - save_history=True - ) - + algorithm, + ('n_gen', n_gen), + seed=1, + verbose=False, + save_history=True + ) + return problem, res @@ -76,82 +76,84 @@ def test_synchronize_units(technology_set_1): assert [t.unit_power for t in synced] == [u_p, u_p] assert [t.unit_time for t in synced] == [u_t, u_t] assert [t.unit_energy for t in synced] == [u_e, u_e] - - + + def test_apply_slack(): """ Tests that the function to apply slack values works as expected for a single objective, multiple objectives, and raises exceptions.. """ - + slack_values = np.array([0.1, 0.05, 0.2, 0.01]) n_objectives = len(slack_values) - pf1D = np.arange(20,0,-1) + pf1D = np.arange(20, 0, -1) pf4D = np.array([pf1D for i in range(n_objectives)]).T - + for slack in slack_values: - assert np.all(pf1D*(1+slack) == apply_slack(pf1D, slack, sense='minimize')) - - assert np.all(pf4D*(np.ones(n_objectives)+slack_values) == apply_slack(pf4D, slack_values)) - + assert np.all(pf1D * (1 + slack) == + apply_slack(pf1D, slack, sense='minimize')) + + assert np.all(pf4D * (np.ones(n_objectives) + slack_values) + == apply_slack(pf4D, slack_values)) + with pytest.raises(ValueError) as e: apply_slack(pf1D, slack_values) - + def test_distance_matrix_2D(): """ Tests the distance matrix function. """ - + N_techs = 5 population = 10 measure = 'euclidean' - + rng = np.random.default_rng(seed=1234) - + data = rng.multivariate_normal(mean=np.array([1]), cov=np.diag([2]), - size=(population,N_techs)) + size=(population, N_techs)) data = data.reshape((population, N_techs)) - + D = distance_matrix(data, metric=measure) test_matrix = squareform(pdist(data, metric=measure)) - - assert np.allclose(D,test_matrix) - - + + assert np.allclose(D, test_matrix) + + def test_check_if_interior_1(): """ Tests the `check_if_interior` function with a simple grid of points and specific test points. """ x = np.arange(3) - grid = np.array(list(it.product(x,x))) + grid = np.array(list(it.product(x, x))) - pf = np.array([[0,0]]) - sf = np.array([[2,0], [1,1], [0,2]]) - test_points = np.array([[0.5,0.5], [1.5,1.5]]) + pf = np.array([[0, 0]]) + sf = np.array([[2, 0], [1, 1], [0, 2]]) + test_points = np.array([[0.5, 0.5], [1.5, 1.5]]) int_idx = check_if_interior(test_points, pf, sf) - assert np.all(int_idx==[0]) + assert np.all(int_idx == [0]) def test_check_if_interior_2(): """ - Tests the `check_if_interior` function with a + Tests the `check_if_interior` function with a simple grid of points and some random test points. """ x = np.arange(3) - grid = np.array(list(it.product(x,x))) + grid = np.array(list(it.product(x, x))) N = 100 - pf = np.array([[0,0]]) - sf = np.c_[np.linspace(2,0,N), np.linspace(0,2,N)] + pf = np.array([[0, 0]]) + sf = np.c_[np.linspace(2, 0, N), np.linspace(0, 2, N)] rng = np.random.default_rng(seed=1234) - test_points = rng.uniform(low=0, high=2, size=(10,2)) + test_points = rng.uniform(low=0, high=2, size=(10, 2)) int_idx = check_if_interior(test_points, pf, sf) - + assert len(int_idx) == 3 - assert np.all(int_idx==[2,3,8]) + assert np.all(int_idx == [2, 3, 8])