Synchronous and asynchronous compatibility within Prefect #15008
Labels
enhancement
An improvement of an existing feature
Comments
for anyone willing to contribute to this effort, here's roughly how you can help removing
Describe the current behavior
Currently, Prefect supports many user interfaces that we attempt to keep compatible with both synchronous and asynchronous execution. A popular example of this is `Block.load`. This is achieved through an internal utility that "magically", and more critically quietly, attempts to decide on the user's behalf whether the user will be able to await the coroutine or not.

This works well in certain situations, such as methods / utilities called within Prefect tasks and flows, because we explicitly track whether that task or flow is synchronous or asynchronous.
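To make the failure mode concrete, here is a minimal sketch of an auto-dispatching decorator. It is not Prefect's actual implementation: `naive_sync_compatible`, `load`, and `sync_helper` are hypothetical names, and the heuristic (check for a running event loop) is an assumption chosen only to show how a quiet guess can flip when the runtime environment changes.

```python
import asyncio
import functools
import inspect


def naive_sync_compatible(async_fn):
    """Hypothetical auto-dispatching decorator (not Prefect's code)."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        coro = async_fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running: guess "sync caller" and block.
            return asyncio.run(coro)
        # A loop is running: guess that the caller will await the result.
        return coro

    return wrapper


@naive_sync_compatible
async def load(name):
    await asyncio.sleep(0)
    return f"block:{name}"


def sync_helper():
    # A plain synchronous function that expects a value back.
    return load("db")


async def main():
    # sync_helper is still synchronous, but a loop is now running,
    # so the guess flips and it receives an un-awaited coroutine.
    result = sync_helper()
    is_coro = inspect.iscoroutine(result)
    if is_coro:
        result.close()  # suppress the "never awaited" warning
    return is_coro


outside_loop = sync_helper()        # called with no loop running
inside_loop = asyncio.run(main())   # same helper, called under a loop
```

The same `sync_helper` returns a value in one environment and a coroutine in the other, with no error at the call site.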
However, this can turn out very poorly in situations where something about the runtime environment changes between local development and production. A few examples to make the point:
- `sync_compatible` decorator returning coroutines unexpectedly when running a flow #14625

Prefect 3.0 exposes a new keyword argument on these special methods / functions for explicitly setting the behavior a user wants: to continue with the block loading example, users can now specify `Block.load(**kwargs, _sync=True)` to enforce synchronous execution and `await Block.load(**kwargs, _sync=False)` to enforce asynchronous execution. This is useful as an escape hatch, but under the hood it still engages with complex event loop and threading logic that risks performance degradation and failure modes that are more difficult to inspect.

Clearly there is room for improvement here along a few dimensions:

- `_sync=True/False` behavior in a more first class way for discoverability

Describe the proposed behavior
To achieve the goals outlined above, I propose first expanding the `sync_compatible` interface in two ways:

- `sync_compatible(sync_version=sync_method)`, which explicitly provides an alternative synchronous implementation to dispatch to; note this does mean expanding the codebase into both synchronous and asynchronous implementations. The decorator and our current "magic" will allow us to take a strangler fig approach and incrementally add these duplicate implementations as we develop
- `async_compatible(async_version=async_method)` (the need for this will become clear below when I discuss naming conventions)

For any decorated function / method that has a dual implementation, Prefect can begin issuing a warning whenever the user relies on behavior for which that function / method is dispatching to another implementation. For example:
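A rough sketch of what dispatch-with-a-warning could look like, under stated assumptions: only the `sync_version` keyword comes from the proposal above. The decorator body, the running-loop check, the choice of `DeprecationWarning`, and the names `load_sync` / `load` are hypothetical and are not Prefect's implementation.

```python
import asyncio
import functools
import warnings


def sync_compatible(sync_version):
    """Hypothetical sketch: dispatch an async function to an explicit sync twin."""

    def decorator(async_fn):
        @functools.wraps(async_fn)
        def wrapper(*args, **kwargs):
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # Sync caller: use the dedicated sync implementation and
                # warn that the call relied on implicit dispatch.
                warnings.warn(
                    f"{async_fn.__name__} was called synchronously and "
                    f"dispatched to {sync_version.__name__}; "
                    "call that function directly instead.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                return sync_version(*args, **kwargs)
            # Async caller: return the coroutine to be awaited as usual.
            return async_fn(*args, **kwargs)

        return wrapper

    return decorator


def load_sync(name):
    return f"sync:{name}"


@sync_compatible(sync_version=load_sync)
async def load(name):
    await asyncio.sleep(0)
    return f"async:{name}"


async def main():
    return await load("db")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    sync_result = load("db")      # dispatches to load_sync and warns

async_result = asyncio.run(main())  # runs the async implementation
```

In this sketch the warning fires only on the dispatched path, so callers who already use the matching implementation see nothing.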
To make sure this part is not glossed over: this will ultimately result in Prefect maintaining two implementations for a large class of user interfaces (primarily those that interact with the Prefect client / API).
Naming Convention
To achieve this, we will rely on the following naming conventions:
- Classes: an `Async` prefix; e.g., `PrefectClient` vs. `AsyncPrefectClient`
- Methods: an `a` prefix; e.g., `Block.load` vs. `Block.aload`
- Functions: an `a` prefix; e.g., `run_deployment` vs. `arun_deployment`
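The conventions above can be sketched with toy code. All names here (`Client`, `AsyncClient`, `Document`, `run_job`, `arun_job`) are hypothetical stand-ins rather than Prefect APIs; only the prefix pattern is taken from the list.

```python
import asyncio


class Client:
    """Synchronous class."""

    def read(self, key):
        return f"value:{key}"


class AsyncClient:
    """Asynchronous counterpart, named with an `Async` prefix."""

    async def read(self, key):
        await asyncio.sleep(0)
        return f"value:{key}"


class Document:
    @classmethod
    def load(cls, name):
        # Synchronous method.
        return f"doc:{name}"

    @classmethod
    async def aload(cls, name):
        # Asynchronous twin, named with an `a` prefix.
        await asyncio.sleep(0)
        return f"doc:{name}"


def run_job(name):
    # Synchronous function.
    return f"ran:{name}"


async def arun_job(name):
    # Asynchronous twin, named with an `a` prefix.
    await asyncio.sleep(0)
    return f"ran:{name}"


async def main():
    return (
        await AsyncClient().read("k"),
        await Document.aload("d"),
        await arun_job("j"),
    )


sync_results = (Client().read("k"), Document.load("d"), run_job("j"))
async_results = asyncio.run(main())
```

With explicit names, the caller picks the implementation at the call site and nothing has to be inferred at runtime.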
There is one edge case with this: `.wait` methods on Prefect futures. For these, we will make the corresponding class awaitable for async waits.

Remaining uses of `@sync_compatible`

`prefect`
- `prefect.artifacts`
  - `Artifact.create`
  - `Artifact.get`
  - `Artifact.get_or_create`
  - `create_link_artifact`
  - `create_markdown_artifact`
  - `create_table_artifact`
  - `update_progress_artifact`
  - `create_image_artifact`
- `prefect.filesystems`
  - `LocalFileSystem.get_directory`
  - `LocalFileSystem.put_directory`
  - `LocalFileSystem.read_path`
  - `LocalFileSystem.write_path`
  - `RemoteFileSystem.get_directory`
  - `RemoteFileSystem.put_directory`
  - `RemoteFileSystem.read_path`
  - `RemoteFileSystem.write_path`
  - `SMB.get_directory`
  - `SMB.put_directory`
  - `SMB.read_path`
  - `SMB.write_path`
- `prefect.flow_runs`
  - `pause_flow_run`
  - `suspend_flow_run`
  - `resume_flow_run`
- `prefect.flows`
  - `Flow.to_deployment`
  - `Flow.from_source`
  - `Flow.deploy`
  - `Flow.visualize`
- `prefect.results`
  - `ResultStore.update_for_flow`
  - `ResultStore.update_for_task`
  - `ResultStore._exists`
  - `ResultStore._read`
  - `ResultStore._persist_result_record`
  - `ResultStore.store_parameters`
  - `ResultStore.read_parameters`
- `prefect.states`
  - `_get_state_result`
  - `get_state_exception`
  - `raise_state_exception`
- `prefect.task_worker`
  - `TaskWorker.start`
  - `TaskWorker.stop`
  - `serve`
- `prefect.tasks`
  - `Task.serve`
- `prefect.blocks.core`
  - `Block.load_from_ref`
  - `Block.register_type_and_schema`
  - `Block.save`
  - `Block.delete`
- `prefect.blocks.notifications`
  - `AbstractAppriseNotificationBlock.notify`
  - `AppriseNotificationBlock.notify`
  - `PagerDutyWebHook.notify`
  - `CustomWebhookNotificationBlock.notify`
- `prefect.blocks.redis`
  - `RedisStorageContainer.read_path`
  - `RedisStorageContainer.write_path`
- `prefect.concurrency.v1._asyncio`
  - `acquire_concurrency_slots`
  - `release_concurrency_slots`
- `prefect.deployments.flow_runs`
  - `run_deployment`
- `prefect.deployments.runner`
  - `RunnerDeployment.apply`
  - `RunnerDeployment.from_storage`
  - `deploy`
- `prefect.input.actions`
  - `create_flow_run_input_from_model`
  - `create_flow_run_input`
  - `filter_flow_run_input`
  - `read_flow_run_input`
  - `delete_flow_run_input`
- `prefect.input.run_input`
  - `BaseRunInput.save`
  - `BaseRunInput.load`
  - `BaseRunInput.respond`
  - `BaseRunInput.send_to`
  - `AutomaticRunInput.load`
  - `AutomaticRunInput.next`
  - `GetAutomaticInputHandler.next`
  - `send_input`
- `prefect.runner.runner`
  - `Runner.add_deployment`
  - `Runner.add_flow`
  - `Runner._add_storage`
  - `Runner.stop`
- `prefect.runner.submit`
  - `submit_to_runner`
  - `wait_for_submitted_runs`
- `prefect.testing.fixtures`
  - `process_events` x2

Additional context
Feel free to comment and discuss.