Skip to content

Commit

Permalink
feat: Pipeline.connect() will now raise a PipelineConnectError if…
Browse files Browse the repository at this point in the history
… `sender` and `receiver` are the same Component (#8403)

* Added equality check for sender and receiver in connection function of pipeline

* Update base.py

irrelevant changes reverted

* added release note

* altered a walk with cycle test

* added a test to verify that pipeline raises PipelineConnectError when connecting a component to itself

* Update release notes

* Remove self connection feature tests

* Tidy up connect unit test

---------

Co-authored-by: Silvano Cerza <[email protected]>
  • Loading branch information
ajit97singh and silvanocerza authored Sep 30, 2024
1 parent bbfd74f commit 7ba30d5
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 60 deletions.
3 changes: 1 addition & 2 deletions haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,7 @@ def connect(self, sender: str, receiver: str) -> "PipelineBase": # noqa: PLR091
receiver_component_name, receiver_socket_name = parse_connect_string(receiver)

if sender_component_name == receiver_component_name:
msg = "Connecting a Component to itself is deprecated and will raise an error from version '2.7.0' onwards."
warnings.warn(msg, DeprecationWarning)
raise PipelineConnectError("Connecting a Component to itself is not supported.")

# Get the nodes data.
try:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
upgrade:
- |
`Pipeline.connect()` will now raise a `PipelineConnectError` if `sender` and `receiver` are the same Component.
We do not support this use case anymore.
2 changes: 0 additions & 2 deletions test/core/pipeline/features/pipeline_run.feature
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ Feature: Pipeline running
| that has components added in a different order from the order of execution |
| that has a component with only default inputs |
| that has a component with only default inputs as first to run |
| that has only a single component that sends one of its outputs to itself |
| that has a component that sends one of its outputs to itself |
| that has multiple branches that merge into a component with a single variadic input |
| that has multiple branches of different lengths that merge into a component with a single variadic input |
| that is linear and returns intermediate outputs |
Expand Down
52 changes: 0 additions & 52 deletions test/core/pipeline/features/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,58 +911,6 @@ def fake_generator_run(self, generation_kwargs: Optional[Dict[str, Any]] = None,
)


@given(
"a pipeline that has only a single component that sends one of its outputs to itself",
target_fixture="pipeline_data",
)
def pipeline_that_has_a_single_component_that_send_one_of_outputs_to_itself():
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("self_loop", SelfLoop())
pipeline.connect("self_loop.current_value", "self_loop.values")

return (
pipeline,
[
PipelineRunData(
inputs={"self_loop": {"values": 5}},
expected_outputs={"self_loop": {"final_result": 0}},
expected_run_order=["self_loop", "self_loop", "self_loop", "self_loop", "self_loop"],
)
],
)


@given("a pipeline that has a component that sends one of its outputs to itself", target_fixture="pipeline_data")
def pipeline_that_has_a_component_that_sends_one_of_its_outputs_to_itself():
pipeline = Pipeline(max_runs_per_component=10)
pipeline.add_component("add_1", AddFixedValue())
pipeline.add_component("self_loop", SelfLoop())
pipeline.add_component("add_2", AddFixedValue())
pipeline.connect("add_1", "self_loop.values")
pipeline.connect("self_loop.current_value", "self_loop.values")
pipeline.connect("self_loop.final_result", "add_2.value")

return (
pipeline,
[
PipelineRunData(
inputs={"add_1": {"value": 5}},
expected_outputs={"add_2": {"result": 1}},
expected_run_order=[
"add_1",
"self_loop",
"self_loop",
"self_loop",
"self_loop",
"self_loop",
"self_loop",
"add_2",
],
)
],
)


@given(
"a pipeline that has multiple branches that merge into a component with a single variadic input",
target_fixture="pipeline_data",
Expand Down
22 changes: 18 additions & 4 deletions test/core/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,8 @@ def run(self, word1: str, word2: str):

def test_walk_pipeline_with_cycles(self):
"""
This pipeline consists of one component, which would run three times in a loop.
pipeline.walk() should return this component exactly once. The order is not guaranteed.
This pipeline consists of two components, which would run three times in a loop.
pipeline.walk() should return these components exactly once. The order is not guaranteed.
"""

@component
Expand All @@ -907,9 +907,12 @@ def run(self, word: str, intermediate: Optional[str] = None):

pipeline = Pipeline()
hello = Hello()
hello_again = Hello()
pipeline.add_component("hello", hello)
pipeline.connect("hello.intermediate", "hello.intermediate")
assert [("hello", hello)] == list(pipeline.walk())
pipeline.add_component("hello_again", hello_again)
pipeline.connect("hello.intermediate", "hello_again.intermediate")
pipeline.connect("hello_again.intermediate", "hello.intermediate")
assert {("hello", hello), ("hello_again", hello_again)} == set(pipeline.walk())

def test__init_graph(self):
pipe = Pipeline()
Expand Down Expand Up @@ -1185,6 +1188,17 @@ def test_connect_multiple_outputs_to_variadic_input(self):
assert comp3.__haystack_input__.value.senders == ["comp1", "comp2"]
assert list(pipe.graph.edges) == [("comp1", "comp3", "value/value"), ("comp2", "comp3", "value/value")]

def test_connect_same_component_as_sender_and_receiver(self):
"""
This pipeline consists of one component, which would be connected to itself.
Connecting a component to itself raises PipelineConnectError.
"""
pipe = Pipeline()
single_component = FakeComponent()
pipe.add_component("single_component", single_component)
with pytest.raises(PipelineConnectError):
pipe.connect("single_component.out", "single_component.in")

def test__run_component(self, spying_tracer, caplog):
caplog.set_level(logging.INFO)
sentence_builder = component_class(
Expand Down

0 comments on commit 7ba30d5

Please sign in to comment.