Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve downstream nodes performance with local search #19

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/power_grid_model_ds/_core/model/graphs/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,26 @@ def get_connected(
nodes_to_ignore=self._externals_to_internals(nodes_to_ignore),
inclusive=inclusive,
)

return self._internals_to_externals(nodes)

def get_downstream_nodes(self, node_id: int, stop_node_ids: list[int], inclusive: bool = False) -> list[int]:
    """Find all nodes downstream of ``node_id``, stopping at the given stop nodes.

    Args:
        node_id: external id of the node to start the search from
        stop_node_ids: external ids of the nodes at which the search stops
        inclusive: whether to include ``node_id`` itself in the result

    Returns:
        External node ids sorted by distance, downstream of ``node_id``.
    """
    # translate external ids to the graph backend's internal ids
    internal_start = self.external_to_internal(node_id)
    internal_stops = self._externals_to_internals(stop_node_ids)

    downstream_nodes = self._get_downstream_nodes(
        node_id=internal_start,
        stop_node_ids=internal_stops,
        inclusive=inclusive,
    )

    return self._internals_to_externals(downstream_nodes)
Comment on lines +241 to +256
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what way are the stop_node_ids still needed with the tmp_remove_nodes context implemented by @Thijss?
Isn't this equal to:

with graph.tmp_remove_node(stop_node_ids): 
  graph.get_downstream_nodes(node_id = node_id)

Copy link
Contributor

@Thijss Thijss Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functionally it should be equal, but since we aren't modifying the graph here it could give significantly better performance?
Perhaps it's a good idea to base this PR on #17 and add a performance comparison.

If the performance increases significantly we can merge the PR. Otherwise we can drop it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could try that; the main performance improvement should come from not using get_nearest_substation_node to run get_connected on the whole grid. Technically, using tmp_remove_nodes can achieve a similar result

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with doing this is that we do not have the information about which substation is the feeder, so I do not think tmp_remove_nodes will work (without additional complexity)


def find_fundamental_cycles(self) -> list[list[int]]:
"""Find all fundamental cycles in the graph.
Returns:
Expand Down Expand Up @@ -273,6 +291,9 @@ def _branch_is_relevant(self, branch: BranchArray) -> bool:
@abstractmethod
def _get_connected(self, node_id: int, nodes_to_ignore: list[int], inclusive: bool = False) -> list[int]:
    """Backend hook for ``get_connected``; operates on internal node ids.

    Returns the internal ids of nodes reachable from ``node_id``, with the
    search stopped at ``nodes_to_ignore``; ``inclusive`` controls whether
    ``node_id`` itself appears in the result.
    """
    ...

@abstractmethod
def _get_downstream_nodes(self, node_id: int, stop_node_ids: list[int], inclusive: bool = False) -> list[int]:
    """Backend hook for ``get_downstream_nodes``; operates on internal node ids.

    ``stop_node_ids`` mark where the downstream search ends; ``inclusive``
    controls whether ``node_id`` itself appears in the result.
    """
    ...

@abstractmethod
def _has_branch(self, from_node_id, to_node_id) -> bool:
    """Return whether a branch connects the two nodes.

    NOTE(review): the ids are presumably internal, like the other private
    hooks on this class -- confirm against the implementations.
    """
    ...

Expand Down
11 changes: 11 additions & 0 deletions src/power_grid_model_ds/_core/model/graphs/models/rustworkx.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ def _get_connected(self, node_id: int, nodes_to_ignore: list[int], inclusive: bo

return connected_nodes

def _get_downstream_nodes(self, node_id: int, stop_node_ids: list[int], inclusive: bool = False) -> list[int]:
    """Find the nodes downstream of ``node_id`` (internal ids).

    A BFS from ``node_id`` collects all reachable nodes while pruning at the
    stop nodes; the nodes on the shortest path towards the first stop node
    encountered (the upstream direction) are then filtered out.

    Args:
        node_id: internal id of the node to start from
        stop_node_ids: internal ids at which the BFS is pruned. At least one
            must be reachable -- otherwise this raises IndexError.
        inclusive: whether ``node_id`` itself is part of the result

    Returns:
        Internal ids of the downstream nodes, in BFS discovery order.
    """
    visitor = _NodeVisitor(stop_node_ids)
    rx.bfs_search(self._graph, [node_id], visitor)
    connected_nodes = visitor.nodes
    path_to_substation, _ = self._get_shortest_path(node_id, visitor.discovered_nodes_to_ignore[0])
    if inclusive:
        # keep node_id itself by removing it from the exclusion path
        _ = path_to_substation.pop(0)
    # set lookup: O(1) membership test instead of O(len(path)) per node
    upstream_nodes = set(path_to_substation)
    return [node for node in connected_nodes if node not in upstream_nodes]

def _find_fundamental_cycles(self) -> list[list[int]]:
"""Find all fundamental cycles in the graph using Rustworkx.

Expand All @@ -112,8 +121,10 @@ class _NodeVisitor(BFSVisitor):
def __init__(self, nodes_to_ignore: list[int]):
    # nodes at which the BFS search is pruned
    self.nodes_to_ignore = nodes_to_ignore
    # all nodes discovered so far, in visit order
    self.nodes: list[int] = []
    # subset of nodes_to_ignore that the search actually reached
    self.discovered_nodes_to_ignore: list[int] = []

def discover_vertex(self, v):
    """BFS callback: record ``v``, or prune the search at an ignored node."""
    if v not in self.nodes_to_ignore:
        self.nodes.append(v)
        return
    # remember which stop node was actually reached, then cut off this branch
    self.discovered_nodes_to_ignore.append(v)
    raise PruneSearch
12 changes: 6 additions & 6 deletions src/power_grid_model_ds/_core/model/grids/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ def get_nearest_substation_node(self, node_id: int):

def get_downstream_nodes(self, node_id: int, inclusive: bool = False):
"""Get the downstream nodes from a node.
Assuming each node has a single feeding substation and the grid is radial

Example:
given this graph: [1] - [2] - [3] - [4], with 1 being a substation node
Expand All @@ -349,15 +350,14 @@ def get_downstream_nodes(self, node_id: int, inclusive: bool = False):
Returns:
list[int]: The downstream nodes.
"""
substation_node_id = self.get_nearest_substation_node(node_id).id.item()
substation_nodes = self.node.filter(node_type=NodeType.SUBSTATION_NODE.value)

if node_id == substation_node_id:
if node_id in substation_nodes.id:
raise NotImplementedError("get_downstream_nodes is not implemented for substation nodes!")

path_to_substation, _ = self.graphs.active_graph.get_shortest_path(node_id, substation_node_id)
upstream_node = path_to_substation[1]

return self.graphs.active_graph.get_connected(node_id, nodes_to_ignore=[upstream_node], inclusive=inclusive)
return self.graphs.active_graph.get_downstream_nodes(
node_id=node_id, stop_node_ids=list(substation_nodes.id), inclusive=inclusive
)

def cache(self, cache_dir: Path, cache_name: str, compress: bool = True):
"""Cache Grid to a folder
Expand Down
18 changes: 9 additions & 9 deletions tests/performance/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
"dtype = [('id', '<i8'), ('test_int', '<i8'), ('test_float', '<f8'), ('test_str', '<U50'), ('test_bool', '?')]; "
)

# Setup snippets (one per array flavour) executed by timeit before each test;
# the "{size}" placeholder is filled in by the performance-test runner.
ARRAY_SETUP_CODES = {
    "structured": "import numpy as np;" + NUMPY_DTYPE + "input_array = np.zeros({size}, dtype=dtype)",
    "rec": "import numpy as np;" + NUMPY_DTYPE + "input_array = np.recarray(({size},),dtype=dtype)",
    "fancy": "from tests.conftest import FancyTestArray; input_array=FancyTestArray.zeros({size});"
    + "import numpy as np;input_array.id = np.arange({size})",
}

# Setup snippet building a radial grid of "{size}" nodes for graph benchmarks;
# the placeholder is filled in by the performance-test runner.
GRAPH_SETUP_CODES = {
    "rustworkx": "from power_grid_model_ds import Grid;"
    + "from power_grid_model_ds.generators import RadialGridGenerator;"
    + "from power_grid_model_ds.graph_models import RustworkxGraphModel;"
    + "grid=RadialGridGenerator(nr_nodes={size}, grid_class=Grid, graph_model=RustworkxGraphModel).run()",
}

SINGLE_REPEATS = 1000
Expand Down
101 changes: 29 additions & 72 deletions tests/performance/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,100 +4,57 @@

import inspect
import timeit
from typing import Generator
from itertools import product
from typing import Generator, Union

from tests.performance._constants import GRAPH_SETUP_CODES, SETUP_CODES


def do_performance_test(
    code_to_test: Union[str, dict[str, str], list[str]],
    size_list: list[int],
    repeats: int,
    setup_codes: dict[str, str],
):
    """Generalized performance test runner.

    Args:
        code_to_test: snippet(s) to time. A dict maps each setup-code variant
            to its own snippet (paired one-to-one); a list is timed against
            every setup variant; a single string is timed against every
            setup variant as well.
        size_list: problem sizes substituted into the "{size}" placeholders
        repeats: number of executions per timeit measurement
        setup_codes: mapping of variant name -> setup snippet

    Prints the caller's name and, per size, one timing line per test case.
    """
    print(f"{'-' * 20} {inspect.stack()[1][3]} {'-' * 20}")

    for size in size_list:
        formatted_setup_codes = {key: code.format(size=size) for key, code in setup_codes.items()}
        if isinstance(code_to_test, dict):
            # one snippet per setup variant, paired one-to-one
            code_to_test_list = [code_to_test[variant].format(size=size) for variant in setup_codes]
            test_generator = zip(formatted_setup_codes.items(), code_to_test_list)
        elif isinstance(code_to_test, list):
            # every snippet is timed against every setup variant
            code_to_test_list = [code.format(size=size) for code in code_to_test]
            test_generator = product(formatted_setup_codes.items(), code_to_test_list)
        else:
            test_generator = product(formatted_setup_codes.items(), [code_to_test.format(size=size)])

        print(f"\n\tsize: {size}\n")

        timings = _get_timings(test_generator, repeats=repeats)
        _print_timings(timings)


def _print_test_code(code: str | dict[str, str], repeats: int):
print(f"{'-' * 40}")
if isinstance(code, dict):
for variant, code_variant in code.items():
print(f"{variant}")
print(f"\t{code_variant} (x {repeats})")
return
print(f"{code} (x {repeats})")


def _print_graph_timings(timings: Generator, graph_types: list[str], code_list: list[str]):
for graph_type, timing, code in zip(graph_types, timings, code_list):
if ";" in code:
code = code.split(";")[-1]

code = code.replace("\n", " ").replace("\t", " ")
code = f"{graph_type}: " + code

if isinstance(timing, Exception):
print(f"\t\t{code.ljust(100)} | Not supported")
continue
print(f"\t\t{code.ljust(100)} | {sum(timing):.2f}s")


def _print_timings(timings: Generator):
    """Pretty-print one line per timing result yielded by ``_get_timings``.

    Each item is a ``(variant_key, code, timing)`` triple, where ``timing``
    is either ``timeit.repeat``'s list of seconds or the exception the
    snippet raised (printed as "Not supported").
    """
    for key, code, timing in timings:
        # show only the final statement of a multi-statement snippet
        code = code.split(";")[-1].replace("\n", " ").replace("\t", " ")
        code = f"{key}: {code}"

        if isinstance(timing, Exception):
            print(f"\t\t{code.ljust(100)} | Not supported")
            continue
        print(f"\t\t{code.ljust(100)} | {sum(timing):.2f}s")


def _get_timings(test_generator, repeats: int):
    """Yield a ``(variant_key, code, timing)`` triple for each test case.

    Args:
        test_generator: iterable of ``((variant_key, setup_code), test_code)``
        repeats: number of executions per timeit measurement

    ``timing`` is ``timeit.repeat``'s list of seconds. A test code of "pass"
    times only the setup itself (labelled "initialise"); a snippet that
    raises yields the exception instead of timings.
    """
    for (key, setup_code), test_code in test_generator:
        if test_code == "pass":
            # fixed typo: was "intialise"
            yield key, "initialise", timeit.repeat(setup_code, number=repeats)
        else:
            try:
                yield key, test_code, timeit.repeat(test_code, setup_code, number=repeats)
            # pylint: disable=broad-exception-caught
            except Exception as error:  # noqa
                yield key, test_code, error
54 changes: 30 additions & 24 deletions tests/performance/array_performance_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,24 @@

import logging

from tests.performance._constants import ARRAY_SIZES_LARGE, ARRAY_SIZES_SMALL, LOOP_REPEATS, SINGLE_REPEATS
from tests.performance._constants import (
ARRAY_SETUP_CODES,
ARRAY_SIZES_LARGE,
ARRAY_SIZES_SMALL,
LOOP_REPEATS,
SINGLE_REPEATS,
)
from tests.performance._helpers import do_performance_test

logging.basicConfig(level=logging.INFO)


def perftest_initialize():
    """Time bare array construction for every array flavour."""
    do_performance_test("pass", ARRAY_SIZES_LARGE, SINGLE_REPEATS, ARRAY_SETUP_CODES)


def perftest_slice():
    """Time taking a small slice of the array."""
    do_performance_test("input_array[0:10]", ARRAY_SIZES_LARGE, SINGLE_REPEATS, ARRAY_SETUP_CODES)


def perftest_set_attr():
Expand All @@ -31,77 +37,77 @@ def perftest_set_attr():
"rec": "input_array.id = 1",
"fancy": "input_array.id = 1",
}
do_performance_test(code_to_test, ARRAY_SIZES_LARGE, SINGLE_REPEATS)
do_performance_test(code_to_test, ARRAY_SIZES_LARGE, SINGLE_REPEATS, ARRAY_SETUP_CODES)


def perftest_set_field():
    """Time assigning a whole column via field indexing."""
    do_performance_test("input_array['id'] = 1", ARRAY_SIZES_LARGE, SINGLE_REPEATS, ARRAY_SETUP_CODES)


def perftest_loop_slice_1():
    """Time indexing every element one by one."""
    code_to_test = "for i in range({size}): input_array[i]"
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, LOOP_REPEATS, ARRAY_SETUP_CODES)


def perftest_loop_data_slice_1():
    """Time per-element indexing, going through ``.data`` for the fancy array."""
    code_to_test = {
        "structured": "for i in range({size}): input_array[i]",
        "rec": "for i in range({size}): input_array[i]",
        "fancy": "for i in range({size}): input_array.data[i]",
    }
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, LOOP_REPEATS, ARRAY_SETUP_CODES)


def perftest_loop_slice():
    """Time taking a length-1 slice per element."""
    code_to_test = "for i in range({size}): input_array[i:i+1]"
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, LOOP_REPEATS, ARRAY_SETUP_CODES)


def perftest_loop_set_field():
    """Time setting the 'id' field one element at a time."""
    code_to_test = "for i in range({size}): input_array['id'][i] = 1"
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, LOOP_REPEATS, ARRAY_SETUP_CODES)


def perftest_loop_get_field():
    """Time reading the 'id' field per row."""
    code_to_test = "for row in input_array: row['id']"
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, LOOP_REPEATS, ARRAY_SETUP_CODES)


def perftest_loop_data_get_field():
    """Time reading the 'id' field per row, going through ``.data``."""
    code_to_test = "for row in input_array.data: row['id']"
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, LOOP_REPEATS, ARRAY_SETUP_CODES)


def perftest_loop_get_attr():
    """Time attribute-style access of 'id' per row (fewer repeats: it is slow)."""
    code_to_test = "for row in input_array: row.id"
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, 100, ARRAY_SETUP_CODES)


def perftest_fancypy_concat():
    """Compare numpy concatenate with fancypy concatenate."""
    code_to_test = {
        "structured": "import numpy as np;np.concatenate([input_array, input_array])",
        "rec": "import numpy as np;np.concatenate([input_array, input_array])",
        "fancy": "import power_grid_model_ds.fancypy as fp;fp.concatenate(input_array, input_array)",
    }
    do_performance_test(code_to_test, ARRAY_SIZES_LARGE, 100, ARRAY_SETUP_CODES)


def perftest_fancypy_unique():
    """Compare numpy unique with fancypy unique."""
    code_to_test = {
        "structured": "import numpy as np;np.unique(input_array)",
        "rec": "import numpy as np;np.unique(input_array)",
        "fancy": "import power_grid_model_ds.fancypy as fp;fp.unique(input_array)",
    }
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, 100, ARRAY_SETUP_CODES)


def perftest_fancypy_sort():
    """Compare numpy sort with fancypy sort."""
    code_to_test = {
        "structured": "import numpy as np;np.sort(input_array)",
        "rec": "import numpy as np;np.sort(input_array)",
        "fancy": "import power_grid_model_ds.fancypy as fp;fp.sort(input_array)",
    }
    do_performance_test(code_to_test, ARRAY_SIZES_SMALL, 100, ARRAY_SETUP_CODES)


if __name__ == "__main__":
Expand Down
Loading