Skip to content

Commit

Permalink
Adds support for is_schedule_active to flow.deploy and `flow.serv…
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored Dec 13, 2023
1 parent 108f763 commit 8dcd43a
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/prefect/deployments/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ def from_flow(
cron: Optional[str] = None,
rrule: Optional[str] = None,
schedule: Optional[SCHEDULE_TYPES] = None,
is_schedule_active: Optional[bool] = None,
parameters: Optional[dict] = None,
triggers: Optional[List[DeploymentTrigger]] = None,
description: Optional[str] = None,
Expand All @@ -383,6 +384,9 @@ def from_flow(
rrule: An rrule schedule of when to execute runs of this flow.
schedule: A schedule object of when to execute runs of this flow. Used for
advanced scheduling options like timezone.
is_schedule_active: Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
triggers: A list of triggers that should kick of a run of this flow.
parameters: A dictionary of default parameter values to pass to runs of this flow.
description: A description for the created deployment. Defaults to the flow's
Expand All @@ -409,6 +413,7 @@ def from_flow(
name=Path(name).stem,
flow_name=flow.name,
schedule=schedule,
is_schedule_active=is_schedule_active,
tags=tags or [],
triggers=triggers or [],
parameters=parameters or {},
Expand Down Expand Up @@ -466,6 +471,7 @@ def from_entrypoint(
cron: Optional[str] = None,
rrule: Optional[str] = None,
schedule: Optional[SCHEDULE_TYPES] = None,
is_schedule_active: Optional[bool] = None,
parameters: Optional[dict] = None,
triggers: Optional[List[DeploymentTrigger]] = None,
description: Optional[str] = None,
Expand All @@ -489,6 +495,9 @@ def from_entrypoint(
rrule: An rrule schedule of when to execute runs of this flow.
schedule: A schedule object of when to execute runs of this flow. Used for
advanced scheduling options like timezone.
is_schedule_active: Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
triggers: A list of triggers that should kick of a run of this flow.
parameters: A dictionary of default parameter values to pass to runs of this flow.
description: A description for the created deployment. Defaults to the flow's
Expand Down Expand Up @@ -521,6 +530,7 @@ def from_entrypoint(
name=Path(name).stem,
flow_name=flow.name,
schedule=schedule,
is_schedule_active=is_schedule_active,
tags=tags or [],
triggers=triggers or [],
parameters=parameters or {},
Expand Down Expand Up @@ -549,6 +559,7 @@ async def from_storage(
cron: Optional[str] = None,
rrule: Optional[str] = None,
schedule: Optional[SCHEDULE_TYPES] = None,
is_schedule_active: Optional[bool] = None,
parameters: Optional[dict] = None,
triggers: Optional[List[DeploymentTrigger]] = None,
description: Optional[str] = None,
Expand All @@ -575,6 +586,9 @@ async def from_storage(
rrule: An rrule schedule of when to execute runs of this flow.
schedule: A schedule object of when to execute runs of this flow. Used for
advanced scheduling options like timezone.
is_schedule_active: Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
triggers: A list of triggers that should kick of a run of this flow.
parameters: A dictionary of default parameter values to pass to runs of this flow.
description: A description for the created deployment. Defaults to the flow's
Expand Down Expand Up @@ -611,6 +625,7 @@ async def from_storage(
name=Path(name).stem,
flow_name=flow.name,
schedule=schedule,
is_schedule_active=is_schedule_active,
tags=tags or [],
triggers=triggers or [],
parameters=parameters or {},
Expand Down
16 changes: 16 additions & 0 deletions src/prefect/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ async def to_deployment(
cron: Optional[str] = None,
rrule: Optional[str] = None,
schedule: Optional[SCHEDULE_TYPES] = None,
is_schedule_active: Optional[bool] = None,
parameters: Optional[dict] = None,
triggers: Optional[List[DeploymentTrigger]] = None,
description: Optional[str] = None,
Expand All @@ -578,6 +579,9 @@ async def to_deployment(
timezone: A timezone to use for the schedule. Defaults to UTC.
triggers: A list of triggers that will kick off runs of this deployment.
schedule: A schedule object defining when to execute runs of this deployment.
is_schedule_active: Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
parameters: A dictionary of default parameter values to pass to runs of this deployment.
description: A description for the created deployment. Defaults to the flow's
description if not provided.
Expand Down Expand Up @@ -623,6 +627,7 @@ def my_other_flow(name):
cron=cron,
rrule=rrule,
schedule=schedule,
is_schedule_active=is_schedule_active,
tags=tags,
triggers=triggers,
parameters=parameters or {},
Expand All @@ -641,6 +646,7 @@ def my_other_flow(name):
cron=cron,
rrule=rrule,
schedule=schedule,
is_schedule_active=is_schedule_active,
tags=tags,
triggers=triggers,
parameters=parameters or {},
Expand All @@ -660,6 +666,7 @@ async def serve(
cron: Optional[str] = None,
rrule: Optional[str] = None,
schedule: Optional[SCHEDULE_TYPES] = None,
is_schedule_active: Optional[bool] = None,
triggers: Optional[List[DeploymentTrigger]] = None,
parameters: Optional[dict] = None,
description: Optional[str] = None,
Expand All @@ -682,6 +689,9 @@ async def serve(
triggers: A list of triggers that will kick off runs of this deployment.
schedule: A schedule object defining when to execute runs of this deployment. Used to
define additional scheduling options like `timezone`.
is_schedule_active: Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
parameters: A dictionary of default parameter values to pass to runs of this deployment.
description: A description for the created deployment. Defaults to the flow's
description if not provided.
Expand Down Expand Up @@ -738,6 +748,7 @@ def my_flow(name):
cron=cron,
rrule=rrule,
schedule=schedule,
is_schedule_active=is_schedule_active,
parameters=parameters,
description=description,
tags=tags,
Expand Down Expand Up @@ -852,6 +863,7 @@ async def deploy(
cron: Optional[str] = None,
rrule: Optional[str] = None,
schedule: Optional[SCHEDULE_TYPES] = None,
is_schedule_active: Optional[bool] = None,
triggers: Optional[List[DeploymentTrigger]] = None,
parameters: Optional[dict] = None,
description: Optional[str] = None,
Expand Down Expand Up @@ -891,6 +903,9 @@ async def deploy(
triggers: A list of triggers that will kick off runs of this deployment.
schedule: A schedule object defining when to execute runs of this deployment. Used to
define additional scheduling options like `timezone`.
is_schedule_active: Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
parameters: A dictionary of default parameter values to pass to runs of this deployment.
description: A description for the created deployment. Defaults to the flow's
description if not provided.
Expand Down Expand Up @@ -956,6 +971,7 @@ def my_flow(name):
cron=cron,
rrule=rrule,
schedule=schedule,
is_schedule_active=is_schedule_active,
triggers=triggers,
parameters=parameters,
description=description,
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ async def add_flow(
cron: Optional[str] = None,
rrule: Optional[str] = None,
schedule: Optional[SCHEDULE_TYPES] = None,
is_schedule_active: Optional[bool] = None,
parameters: Optional[dict] = None,
triggers: Optional[List[DeploymentTrigger]] = None,
description: Optional[str] = None,
Expand All @@ -227,6 +228,9 @@ async def add_flow(
rrule: An rrule schedule of when to execute runs of this flow.
schedule: A schedule object of when to execute runs of this flow. Used for
advanced scheduling options like timezone.
is_schedule_active: Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
triggers: A list of triggers that should kick of a run of this flow.
parameters: A dictionary of default parameter values to pass to runs of this flow.
description: A description for the created deployment. Defaults to the flow's
Expand All @@ -249,6 +253,7 @@ async def add_flow(
cron=cron,
rrule=rrule,
schedule=schedule,
is_schedule_active=is_schedule_active,
triggers=triggers,
parameters=parameters,
description=description,
Expand Down
37 changes: 37 additions & 0 deletions tests/runner/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,7 @@ def test_from_flow(self, relative_file_path):
assert deployment.description == "Deployment descriptions"
assert deployment.version == "alpha"
assert deployment.tags == ["test"]
assert deployment.is_schedule_active is None
assert deployment.enforce_parameter_schema

def test_from_flow_accepts_interval(self):
Expand All @@ -769,6 +770,17 @@ def test_from_flow_accepts_rrule(self):

assert deployment.schedule.rrule == "FREQ=MINUTELY"

@pytest.mark.parametrize(
"value,expected",
[(True, True), (False, False), (None, None)],
)
def test_from_flow_accepts_is_schedule_active(self, value, expected):
deployment = RunnerDeployment.from_flow(
dummy_flow_1, __file__, is_schedule_active=value
)

assert deployment.is_schedule_active is expected

@pytest.mark.parametrize(
"kwargs",
[
Expand Down Expand Up @@ -871,6 +883,19 @@ def test_from_entrypoint_accepts_rrule(self, dummy_flow_1_entrypoint):

assert deployment.schedule.rrule == "FREQ=MINUTELY"

@pytest.mark.parametrize(
"value,expected",
[(True, True), (False, False), (None, None)],
)
def test_from_entrypoint_accepts_is_schedule_active(
self, dummy_flow_1_entrypoint, value, expected
):
deployment = RunnerDeployment.from_entrypoint(
dummy_flow_1_entrypoint, __file__, is_schedule_active=value
)

assert deployment.is_schedule_active is expected

@pytest.mark.parametrize(
"kwargs",
[
Expand Down Expand Up @@ -922,6 +947,7 @@ async def test_apply(self, prefect_client: PrefectClient):
assert deployment.path == "."
assert deployment.enforce_parameter_schema is False
assert deployment.infra_overrides == {}
assert deployment.is_schedule_active is True

async def test_apply_with_work_pool(self, prefect_client: PrefectClient, work_pool):
deployment = RunnerDeployment.from_flow(
Expand All @@ -942,6 +968,17 @@ async def test_apply_with_work_pool(self, prefect_client: PrefectClient, work_po
}
assert deployment.work_queue_name == "default"

async def test_apply_inactive_schedule(self, prefect_client: PrefectClient):
deployment = RunnerDeployment.from_flow(
dummy_flow_1, __file__, interval=3600, is_schedule_active=False
)

deployment_id = await deployment.apply()

deployment = await prefect_client.read_deployment(deployment_id)

assert deployment.is_schedule_active is False

@pytest.mark.parametrize(
"from_flow_kwargs, apply_kwargs, expected_message",
[
Expand Down
6 changes: 6 additions & 0 deletions tests/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3304,6 +3304,7 @@ async def test_serve_creates_deployment(self, prefect_client: PrefectClient):
description="This is a test",
version="alpha",
enforce_parameter_schema=True,
is_schedule_active=False,
)

deployment = await prefect_client.read_deployment_by_name(name="test-flow/test")
Expand All @@ -3318,6 +3319,7 @@ async def test_serve_creates_deployment(self, prefect_client: PrefectClient):
assert deployment.description == "This is a test"
assert deployment.version == "alpha"
assert deployment.enforce_parameter_schema
assert not deployment.is_schedule_active

async def test_serve_handles__file__(self, prefect_client: PrefectClient):
await test_flow.serve(__file__)
Expand Down Expand Up @@ -3547,6 +3549,7 @@ async def test_calls_deploy_with_expected_args(
build=False,
push=False,
enforce_parameter_schema=True,
is_schedule_active=False,
)

mock_deploy.assert_called_once_with(
Expand All @@ -3559,6 +3562,7 @@ async def test_calls_deploy_with_expected_args(
work_queue_name="line",
job_variables={"foo": "bar"},
enforce_parameter_schema=True,
is_schedule_active=False,
),
work_pool_name=work_pool.name,
image=image,
Expand Down Expand Up @@ -3592,6 +3596,7 @@ async def test_calls_deploy_with_expected_args_remote_flow(
image=image,
push=False,
enforce_parameter_schema=True,
is_schedule_active=False,
)

mock_deploy.assert_called_once_with(
Expand All @@ -3604,6 +3609,7 @@ async def test_calls_deploy_with_expected_args_remote_flow(
work_queue_name="line",
job_variables={"foo": "bar"},
enforce_parameter_schema=True,
is_schedule_active=False,
),
work_pool_name=work_pool.name,
image=image,
Expand Down

0 comments on commit 8dcd43a

Please sign in to comment.