Skip to content

Commit

Permalink
Automatically convert non-flows into flows (#17024)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakekaplan authored Feb 10, 2025
1 parent 332413a commit 6d04927
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 4 deletions.
13 changes: 11 additions & 2 deletions src/prefect/flow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@
)
from prefect.exceptions import (
Abort,
MissingFlowError,
Pause,
PrefectException,
TerminationSignal,
UpstreamTaskError,
)
from prefect.flows import Flow, load_flow_from_entrypoint, load_flow_from_flow_run
from prefect.flows import (
Flow,
load_flow_from_entrypoint,
load_flow_from_flow_run,
load_function_and_convert_to_flow,
)
from prefect.futures import PrefectFuture, resolve_futures_to_states
from prefect.logging.loggers import (
flow_run_logger,
Expand Down Expand Up @@ -125,7 +131,10 @@ def load_flow(flow_run: FlowRun) -> Flow[..., Any]:

if entrypoint:
# we should not accept a placeholder flow at runtime
flow = load_flow_from_entrypoint(entrypoint, use_placeholder_flow=False)
try:
flow = load_flow_from_entrypoint(entrypoint, use_placeholder_flow=False)
except MissingFlowError:
flow = load_function_and_convert_to_flow(entrypoint)
else:
flow = run_coro_as_sync(
load_flow_from_flow_run(flow_run, use_placeholder_flow=False)
Expand Down
23 changes: 23 additions & 0 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2080,6 +2080,29 @@ def load_flow_from_entrypoint(
return flow


def load_function_and_convert_to_flow(entrypoint: str) -> Flow[P, Any]:
"""
Loads a function from an entrypoint and converts it to a flow if it is not already a flow.
"""

if ":" in entrypoint:
# split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
path, func_name = entrypoint.rsplit(":", maxsplit=1)
else:
path, func_name = entrypoint.rsplit(".", maxsplit=1)
try:
func = import_object(entrypoint) # pyright: ignore[reportRedeclaration]
except AttributeError as exc:
raise RuntimeError(
f"Function with name {func_name!r} not found in {path!r}."
) from exc

if isinstance(func, Flow):
return func
else:
return Flow(func, log_prints=True)


def serve(
*args: "RunnerDeployment",
pause_on_shutdown: bool = True,
Expand Down
4 changes: 2 additions & 2 deletions tests/runner/test_webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ async def test_flow_router_runs_managed_flow(self, runner: Runner):
assert isinstance(FlowRun.model_validate(response.json()), FlowRun)
mock_run.assert_called()

@pytest.mark.parametrize("flow_name", ["a_missing_flow", "a_non_flow_function"])
@pytest.mark.parametrize("flow_name", ["a_missing_flow"])
@pytest.mark.parametrize(
"flow_file", [__file__, "/not/a/path.py", "not/a/python/file.txt"]
)
async def test_non_flow_raises_a_404(
async def test_missing_flow_raises_a_404(
self,
runner: Runner,
flow_file: str,
Expand Down
43 changes: 43 additions & 0 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
load_flow_arguments_from_entrypoint,
load_flow_from_entrypoint,
load_flow_from_flow_run,
load_function_and_convert_to_flow,
safe_load_flow_from_entrypoint,
)
from prefect.logging import get_run_logger
Expand Down Expand Up @@ -2695,6 +2696,48 @@ def dog():
load_flow_from_entrypoint(f"{fpath}:dog", use_placeholder_flow=False)


class TestLoadFunctionAndConvertToFlow:
def test_func_is_a_flow(self, tmp_path):
flow_code = """
from prefect import flow
@flow
def dog():
return "woof!"
"""
fpath = tmp_path / "f.py"
fpath.write_text(dedent(flow_code))

flow = load_function_and_convert_to_flow(f"{fpath}:dog")
assert flow.fn() == "woof!"
assert isinstance(flow, Flow)
assert flow.name == "dog"

def test_func_is_not_a_flow(self, tmp_path):
flow_code = """
def dog():
return "woof!"
"""
fpath = tmp_path / "f.py"
fpath.write_text(dedent(flow_code))

flow = load_function_and_convert_to_flow(f"{fpath}:dog")
assert isinstance(flow, Flow)
assert flow.name == "dog"
assert flow.log_prints is True
assert flow.fn() == "woof!"

def test_func_not_found(self, tmp_path):
flow_code = ""
fpath = tmp_path / "f.py"
fpath.write_text(dedent(flow_code))

with pytest.raises(
RuntimeError, match=f"Function with name 'dog' not found in '{fpath}'."
):
load_function_and_convert_to_flow(f"{fpath}:dog")


class TestFlowRunName:
async def test_invalid_runtime_run_name(self):
class InvalidFlowRunNameArg:
Expand Down

0 comments on commit 6d04927

Please sign in to comment.