From 2ff33b2bf04b7c1f505ef420478d31a1a7c7d257 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 17 Jan 2025 15:08:04 +0000 Subject: [PATCH 01/10] Add deployment related attributes Signed-off-by: Ankita Katiyar --- kedro/pipeline/pipeline.py | 40 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 826acd1b13..8f14bd0c14 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -369,6 +369,46 @@ def grouped_nodes(self) -> list[list[Node]]: return [list(group) for group in self._toposorted_groups] + @property + def grouped_by_namespace(self): + """Return a dictionary of the pipeline nodes grouped by namespace. + + Returns: + The pipeline nodes grouped by namespace. + """ + nodes_by_namespace = defaultdict(list) + for node in self.nodes: + if node.namespace: + nodes_by_namespace[node.namespace].append(node) + else: + nodes_by_namespace[node.name].append(node) + return nodes_by_namespace + + @property + def node_dependencies_by_namespace(self): + """Return a dictionary of the pipeline nodes dependencies grouped by namespace. + + Returns: + The pipeline nodes dependencies grouped by namespace. + """ + node_dependencies_by_namespace = defaultdict(dict) + for node in self.nodes: + key = node.namespace if node.namespace else node.name + for parent in self.node_dependencies[node]: + if key not in node_dependencies_by_namespace: + node_dependencies_by_namespace[key] = [] + if parent.namespace and parent.namespace != key: + node_dependencies_by_namespace[key].append(parent.namespace) + elif parent.namespace and parent.namespace == key: + continue + else: + node_dependencies_by_namespace[key].append(parent.name) + + node_dependencies_by_namespace = { + key: set(value) for key, value in node_dependencies_by_namespace.items() + } + return node_dependencies_by_namespace + def only_nodes(self, *node_names: str) -> Pipeline: """Create a new ``Pipeline`` which will contain only the specified nodes by name. From ef4d664e4c1946363a3883bb528acc68e0a4dd8c Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Wed, 29 Jan 2025 13:49:06 +0000 Subject: [PATCH 02/10] Update with feedback Signed-off-by: Ankita Katiyar --- kedro/pipeline/pipeline.py | 46 +++++++++++++------------------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 8f14bd0c14..42ce40b63e 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -370,44 +370,28 @@ def grouped_nodes(self) -> list[list[Node]]: return [list(group) for group in self._toposorted_groups] @property - def grouped_by_namespace(self): - """Return a dictionary of the pipeline nodes grouped by namespace. - - Returns: - The pipeline nodes grouped by namespace. - """ - nodes_by_namespace = defaultdict(list) - for node in self.nodes: - if node.namespace: - nodes_by_namespace[node.namespace].append(node) - else: - nodes_by_namespace[node.name].append(node) - return nodes_by_namespace - - @property - def node_dependencies_by_namespace(self): - """Return a dictionary of the pipeline nodes dependencies grouped by namespace. - - Returns: - The pipeline nodes dependencies grouped by namespace. - """ - node_dependencies_by_namespace = defaultdict(dict) + def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: + grouped_nodes: dict[str, dict[str, Any]] = defaultdict(dict) for node in self.nodes: - key = node.namespace if node.namespace else node.name + key = node.namespace or node.name + if key not in grouped_nodes: + grouped_nodes[key] = {} + grouped_nodes[key]["name"] = key + grouped_nodes[key]["type"] = "namespace" if node.namespace else "node" + grouped_nodes[key]["nodes"] = [*grouped_nodes[key].get("nodes", []), node] + deps = set() for parent in self.node_dependencies[node]: - if key not in node_dependencies_by_namespace: - node_dependencies_by_namespace[key] = [] if parent.namespace and parent.namespace != key: - node_dependencies_by_namespace[key].append(parent.namespace) + deps.add(parent.namespace) elif parent.namespace and parent.namespace == key: continue else: - node_dependencies_by_namespace[key].append(parent.name) + deps.add(parent.name) + grouped_nodes[key]["dependencies"] = ( + grouped_nodes[key].get("dependencies", set()) | deps + ) - node_dependencies_by_namespace = { - key: set(value) for key, value in node_dependencies_by_namespace.items() - } - return node_dependencies_by_namespace + return grouped_nodes def only_nodes(self, *node_names: str) -> Pipeline: """Create a new ``Pipeline`` which will contain only the specified From a299ef36304a69bf753e81329332bbff6658ac85 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 31 Jan 2025 16:48:28 +0000 Subject: [PATCH 03/10] Minor formatting Signed-off-by: Ankita Katiyar --- kedro/pipeline/pipeline.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 42ce40b63e..ff032fbf69 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -371,6 +371,8 @@ def grouped_nodes(self) -> list[list[Node]]: @property def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: + """Return a dictionary of the pipeline nodes grouped by namespace with + information about the nodes, their type, and dependencies.""" grouped_nodes: dict[str, dict[str, Any]] = defaultdict(dict) for node in self.nodes: key = node.namespace or node.name @@ -379,18 +381,17 @@ def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: grouped_nodes[key]["name"] = key grouped_nodes[key]["type"] = "namespace" if node.namespace else "node" grouped_nodes[key]["nodes"] = [*grouped_nodes[key].get("nodes", []), node] - deps = set() + dependencies = set() for parent in self.node_dependencies[node]: if parent.namespace and parent.namespace != key: - deps.add(parent.namespace) + dependencies.add(parent.namespace) elif parent.namespace and parent.namespace == key: continue else: - deps.add(parent.name) + dependencies.add(parent.name) grouped_nodes[key]["dependencies"] = ( - grouped_nodes[key].get("dependencies", set()) | deps + grouped_nodes[key].get("dependencies", set()) | dependencies ) - return grouped_nodes def only_nodes(self, *node_names: str) -> Pipeline: From b79f1f55ef6e6b9c3b467d9ed100054a7d260bc9 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 3 Feb 2025 15:45:48 +0000 Subject: [PATCH 04/10] Add test Signed-off-by: Ankita Katiyar --- tests/pipeline/test_pipeline.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 746ea4794a..13e079269e 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -376,6 +376,34 @@ def test_node_dependencies(self, complex_pipeline): } assert actual == expected + def test_node_grouping_by_namespace(self): + pipeline = modular_pipeline( + [ + node(identity, "A", "B", name="node1", namespace="name_1"), + node(identity, "B", "C", name="node2", namespace="name_1"), + node(identity, "C", "D", name="node3", namespace="name_2"), + node(identity, "D", "E", name="node4", namespace="name_2"), + node(identity, "E", "G", name="node5"), + ] + ) + grouped = pipeline.grouped_nodes_by_namespace + # Validate keys for namespace groups + for key in ["name_1", "name_2"]: + assert key in grouped + assert grouped[key]["name"] == key + assert grouped[key]["type"] == "namespace" + assert len(grouped[key]["nodes"]) == 2 + + # Validate dependencies for namespace groups + assert grouped["name_1"]["dependencies"] == set() + assert grouped["name_2"]["dependencies"] == {"name_1"} + + # Validate nodes for namespace groups + assert grouped["node5"]["type"] == "node" + assert grouped["node5"]["name"] == "node5" + assert len(grouped["node5"]["nodes"]) == 1 + assert grouped["node5"]["dependencies"] == {"name_2"} + @pytest.fixture def pipeline_with_circle(): From a8b1f45c200a74c52e9cb455b86e203bb38973cd Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 3 Feb 2025 15:51:51 +0000 Subject: [PATCH 05/10] Update test Signed-off-by: Ankita Katiyar --- tests/pipeline/test_pipeline.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 13e079269e..985874cc3b 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -384,6 +384,7 @@ def test_node_grouping_by_namespace(self): node(identity, "C", "D", name="node3", namespace="name_2"), node(identity, "D", "E", name="node4", namespace="name_2"), node(identity, "E", "G", name="node5"), + node(identity, "G", "H", name="node6"), ] ) grouped = pipeline.grouped_nodes_by_namespace @@ -403,6 +404,8 @@ def test_node_grouping_by_namespace(self): assert grouped["node5"]["name"] == "node5" assert len(grouped["node5"]["nodes"]) == 1 assert grouped["node5"]["dependencies"] == {"name_2"} + # Validate when node depends on node + assert grouped["node6"]["dependencies"] == {"node5"} @pytest.fixture From ada46e663040072f0fbf9284b4d7badc39c0d795 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 10 Feb 2025 10:58:52 +0000 Subject: [PATCH 06/10] Update with feedback Signed-off-by: Ankita Katiyar --- kedro/pipeline/pipeline.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index ff032fbf69..b3f9f1d347 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -380,7 +380,9 @@ def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: grouped_nodes[key] = {} grouped_nodes[key]["name"] = key grouped_nodes[key]["type"] = "namespace" if node.namespace else "node" - grouped_nodes[key]["nodes"] = [*grouped_nodes[key].get("nodes", []), node] + grouped_nodes[key]["nodes"] = [] + grouped_nodes[key]["dependencies"] = set() + grouped_nodes[key]["nodes"].append(node) dependencies = set() for parent in self.node_dependencies[node]: if parent.namespace and parent.namespace != key: @@ -389,9 +391,7 @@ def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: continue else: dependencies.add(parent.name) - grouped_nodes[key]["dependencies"] = ( - grouped_nodes[key].get("dependencies", set()) | dependencies - ) + grouped_nodes[key]["dependencies"].update(dependencies) return grouped_nodes def only_nodes(self, *node_names: str) -> Pipeline: From cee4a2ca427d693dfa46e22e83f525e2c0db8d2f Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 10 Feb 2025 16:14:13 +0000 Subject: [PATCH 07/10] break up test Signed-off-by: Ankita Katiyar --- tests/pipeline/test_pipeline.py | 133 +++++++++++++++++++++++++------- 1 file changed, 106 insertions(+), 27 deletions(-) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 985874cc3b..793ad5b065 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -376,36 +376,87 @@ def test_node_dependencies(self, complex_pipeline): } assert actual == expected - def test_node_grouping_by_namespace(self): - pipeline = modular_pipeline( - [ - node(identity, "A", "B", name="node1", namespace="name_1"), - node(identity, "B", "C", name="node2", namespace="name_1"), - node(identity, "C", "D", name="node3", namespace="name_2"), - node(identity, "D", "E", name="node4", namespace="name_2"), - node(identity, "E", "G", name="node5"), - node(identity, "G", "H", name="node6"), - ] - ) - grouped = pipeline.grouped_nodes_by_namespace - # Validate keys for namespace groups - for key in ["name_1", "name_2"]: - assert key in grouped + @pytest.mark.parametrize( + "pipeline_name, expected", + [ + ("pipeline_with_namespace_simple", ["namespace_1", "namespace_2"]), + ( + "pipeline_with_namespace_partial", + ["namespace_1", "node_3", "namespace_2", "node_6"], + ), + ], + ) + def test_node_grouping_by_namespace_name_type( + self, request, pipeline_name, expected + ): + p = request.getfixturevalue(pipeline_name) + grouped = p.grouped_nodes_by_namespace + assert set(grouped.keys()) == set(expected) + for key in expected: assert grouped[key]["name"] == key - assert grouped[key]["type"] == "namespace" - assert len(grouped[key]["nodes"]) == 2 + assert key.startswith(grouped[key]["type"]) - # Validate dependencies for namespace groups - assert grouped["name_1"]["dependencies"] == set() - assert grouped["name_2"]["dependencies"] == {"name_1"} + @pytest.mark.parametrize( + "pipeline_name, expected", + [ + ( + "pipeline_with_namespace_simple", + { + "namespace_1": [ + "namespace_1.node_1", + "namespace_1.node_2", + "namespace_1.node_3", + ], + "namespace_2": [ + "namespace_2.node_4", + "namespace_2.node_5", + "namespace_2.node_6", + ], + }, + ), + ( + "pipeline_with_namespace_partial", + { + "namespace_1": ["namespace_1.node_1", "namespace_1.node_2"], + "node_3": ["node_3"], + "namespace_2": ["namespace_2.node_4", "namespace_2.node_5"], + "node_6": ["node_6"], + }, + ), + ], + ) + def test_node_grouping_by_namespace_nodes(self, request, pipeline_name, expected): + p = request.getfixturevalue(pipeline_name) + grouped = p.grouped_nodes_by_namespace + for key, value in grouped.items(): + names = [node.name for node in value["nodes"]] + assert set(names) == set(expected[key]) - # Validate nodes for namespace groups - assert grouped["node5"]["type"] == "node" - assert grouped["node5"]["name"] == "node5" - assert len(grouped["node5"]["nodes"]) == 1 - assert grouped["node5"]["dependencies"] == {"name_2"} - # Validate when node depends on node - assert grouped["node6"]["dependencies"] == {"node5"} + @pytest.mark.parametrize( + "pipeline_name, expected", + [ + ( + "pipeline_with_namespace_simple", + {"namespace_1": set(), "namespace_2": {"namespace_1"}}, + ), + ( + "pipeline_with_namespace_partial", + { + "namespace_1": set(), + "node_3": {"namespace_1"}, + "namespace_2": {"node_3"}, + "node_6": {"namespace_2"}, + }, + ), + ], + ) + def test_node_grouping_by_namespace_dependencies( + self, request, pipeline_name, expected + ): + p = request.getfixturevalue(pipeline_name) + grouped = p.grouped_nodes_by_namespace + for key, value in grouped.items(): + assert set(value["dependencies"]) == set(expected[key]) @pytest.fixture @@ -789,6 +840,34 @@ def pipeline_with_namespaces(): ) +@pytest.fixture +def pipeline_with_namespace_simple(): + return modular_pipeline( + [ + node(identity, "A", "B", name="node_1", namespace="namespace_1"), + node(identity, "B", "C", name="node_2", namespace="namespace_1"), + node(identity, "C", "D", name="node_3", namespace="namespace_1"), + node(identity, "D", "E", name="node_4", namespace="namespace_2"), + node(identity, "E", "F", name="node_5", namespace="namespace_2"), + node(identity, "F", "G", name="node_6", namespace="namespace_2"), + ] + ) + + +@pytest.fixture +def pipeline_with_namespace_partial(): + return modular_pipeline( + [ + node(identity, "A", "B", name="node_1", namespace="namespace_1"), + node(identity, "B", "C", name="node_2", namespace="namespace_1"), + node(identity, "C", "D", name="node_3"), + node(identity, "D", "E", name="node_4", namespace="namespace_2"), + node(identity, "E", "F", name="node_5", namespace="namespace_2"), + node(identity, "F", "G", name="node_6"), + ] + ) + + class TestPipelineFilter: def test_no_filters(self, complex_pipeline): filtered_pipeline = complex_pipeline.filter() From 78e54f3f0bb9d0712c036f823f08d357155d0416 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Tue, 11 Feb 2025 11:46:47 +0000 Subject: [PATCH 08/10] Update docstrings + release notes Signed-off-by: Ankita Katiyar --- RELEASE.md | 1 + kedro/pipeline/pipeline.py | 11 ++++++++++- tests/pipeline/test_pipeline.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/RELEASE.md b/RELEASE.md index bd95ac471f..e974dd5bcc 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -2,6 +2,7 @@ ## Major features and improvements * Added `KedroDataCatalog.filter()` to filter datasets by name and type. +* Added `Pipeline.grouped_nodes_by_namespace` property which returns a dictionary of nodes grouped by namespace, intended to be used by plugins to facilitate deployment of namespaced nodes together. ## Bug fixes and other changes * Updated `_LazyDataset` representation when printing `KedroDataCatalog`. diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index b3f9f1d347..3a6b544c2e 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -372,7 +372,16 @@ def grouped_nodes(self) -> list[list[Node]]: @property def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: """Return a dictionary of the pipeline nodes grouped by namespace with - information about the nodes, their type, and dependencies.""" + information about the nodes, their type, and dependencies. The structure of the dictionary is: + { + 'node_name/namespace_name' : { + 'name': 'node_name/namespace_name', + 'type': 'namespace' or 'node', + 'nodes': [list of nodes], + 'dependencies': [list of dependencies]} + } + This property is intended to be used by deployment plugins to group nodes by namespace. + """ grouped_nodes: dict[str, dict[str, Any]] = defaultdict(dict) for node in self.nodes: key = node.namespace or node.name diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 793ad5b065..1315e75938 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -389,6 +389,16 @@ def test_node_dependencies(self, complex_pipeline): def test_node_grouping_by_namespace_name_type( self, request, pipeline_name, expected ): + """Test for pipeline.grouped_nodes_by_namespace which returns a dictionary with the following structure: + { + 'node_name/namespace_name' : { + 'name': 'node_name/namespace_name', + 'type': 'namespace' or 'node', + 'nodes': [list of nodes], + 'dependencies': [list of dependencies]} + } + This test checks for the 'name' and 'type' keys in the dictionary. + """ p = request.getfixturevalue(pipeline_name) grouped = p.grouped_nodes_by_namespace assert set(grouped.keys()) == set(expected) @@ -426,6 +436,16 @@ def test_node_grouping_by_namespace_name_type( ], ) def test_node_grouping_by_namespace_nodes(self, request, pipeline_name, expected): + """Test for pipeline.grouped_nodes_by_namespace which returns a dictionary with the following structure: + { + 'node_name/namespace_name' : { + 'name': 'node_name/namespace_name', + 'type': 'namespace' or 'node', + 'nodes': [list of nodes], + 'dependencies': [list of dependencies]} + } + This test checks for the 'nodes' key in the dictionary which should be a list of nodes. + """ p = request.getfixturevalue(pipeline_name) grouped = p.grouped_nodes_by_namespace for key, value in grouped.items(): @@ -453,6 +473,16 @@ def test_node_grouping_by_namespace_nodes(self, request, pipeline_name, expected def test_node_grouping_by_namespace_dependencies( self, request, pipeline_name, expected ): + """Test for pipeline.grouped_nodes_by_namespace which returns a dictionary with the following structure: + { + 'node_name/namespace_name' : { + 'name': 'node_name/namespace_name', + 'type': 'namespace' or 'node', + 'nodes': [list of nodes], + 'dependencies': [list of dependencies]} + } + This test checks for the 'dependencies' in the dictionary which is a list of nodes/namespaces the group depends on. + """ p = request.getfixturevalue(pipeline_name) grouped = p.grouped_nodes_by_namespace for key, value in grouped.items(): From ae435ca6501652fe2f0518520ebbbba34eee769a Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Tue, 11 Feb 2025 11:58:35 +0000 Subject: [PATCH 09/10] please sphinx Signed-off-by: Ankita Katiyar --- kedro/pipeline/pipeline.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 3a6b544c2e..b434bae49e 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -373,14 +373,9 @@ def grouped_nodes(self) -> list[list[Node]]: def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: """Return a dictionary of the pipeline nodes grouped by namespace with information about the nodes, their type, and dependencies. The structure of the dictionary is: - { - 'node_name/namespace_name' : { - 'name': 'node_name/namespace_name', - 'type': 'namespace' or 'node', - 'nodes': [list of nodes], - 'dependencies': [list of dependencies]} - } + {'node_name/namespace_name' : {'name': 'node_name/namespace_name','type': 'namespace' or 'node','nodes': [list of nodes],'dependencies': [list of dependencies]}} This property is intended to be used by deployment plugins to group nodes by namespace. + """ grouped_nodes: dict[str, dict[str, Any]] = defaultdict(dict) for node in self.nodes: From c7278ffe3782406a04b8a66cb3dafb144d782020 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Tue, 11 Feb 2025 12:39:39 +0000 Subject: [PATCH 10/10] linkcheck Signed-off-by: Ankita Katiyar --- .../notebook-example/add_kedro_to_a_notebook.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/notebooks_and_ipython/notebook-example/add_kedro_to_a_notebook.md b/docs/source/notebooks_and_ipython/notebook-example/add_kedro_to_a_notebook.md index 31b5ba92fe..cf32ffd647 100644 --- a/docs/source/notebooks_and_ipython/notebook-example/add_kedro_to_a_notebook.md +++ b/docs/source/notebooks_and_ipython/notebook-example/add_kedro_to_a_notebook.md @@ -152,7 +152,7 @@ When writing exploratory code, it’s tempting to hard code values to save time, X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=3) ``` -[Good software engineering practice](https://towardsdatascience.com/five-software-engineering-principles-for-collaborative-data-science-ab26667a311) suggests that we extract *‘magic numbers’* into named constants. These could be defined at the top of a file or in a utility file, in a format such as yaml. +[Good software engineering practice](https://medium.com/towards-data-science/five-software-engineering-principles-for-collaborative-data-science-ab26667a311) suggests that we extract *‘magic numbers’* into named constants. These could be defined at the top of a file or in a utility file, in a format such as yaml. ```yaml