Tools for tasks embedded in an AWS Step Functions state machine. This is a helper library for py2sfn.
Features:
- Offload state data to DynamoDB/S3 instead of storing data in the very constrained state machine input data object
- Cancel the currently executing workflow
py2sfn-task-tools requires Python 3.6 or above. It should be installed in a py2sfn task entry point.
pip install py2sfn-task-tools
Once the py2sfn-task-tools library is installed, a Context should be created and passed to the tasks. Each py2sfn task will then have a context object to work with.
If you need to stop/cancel/abort the execution from within a task, you can use the context.stop_execution method within your task's run method. A common use case is if you need to check the value of a feature flag at the beginning of the execution and abort if it's false. For example:
if not some_condition:
    return await context.stop_execution()
You can provide extra detail by passing error and cause keyword arguments to the stop_execution method. The error is a short string like a code or enum value whereas cause is a longer description.
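A minimal sketch of passing both arguments, assuming a stand-in context object. FakeContext, the feature_enabled flag, and the "FeatureDisabled" code are hypothetical and exist only so the snippet runs outside a state machine; the real context is supplied by py2sfn, and this stub does not model what the real stop_execution returns.

```python
import asyncio


class FakeContext:
    """Hypothetical stand-in for the py2sfn task context (illustration only)."""

    def __init__(self):
        self.stopped_with = None

    async def stop_execution(self, error=None, cause=None):
        # The real method stops the running execution; this stub only records
        # the arguments so they can be inspected afterwards.
        self.stopped_with = {"error": error, "cause": cause}


async def run(event, context):
    # Abort early when a (hypothetical) feature flag is off.
    if not event.get("feature_enabled", False):
        return await context.stop_execution(
            error="FeatureDisabled",  # short code or enum-like value
            cause="The feature flag was off when the execution started.",
        )
    return {"status": "ok"}


context = FakeContext()
result = asyncio.run(run({"feature_enabled": False}, context))
```

Keeping error short and machine-friendly and cause human-readable makes stopped executions easier to filter and to diagnose.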
One of the stated Step Functions best practices is to avoid passing large payloads between states; the input data limit is only 32K characters. To get around this, you can choose to store data from your task code in a DynamoDB table. With DynamoDB, we have an item limit of 400KB to work with. When you put items into the table, you receive a pointer to the DynamoDB item, which you can return from your task so it gets included in the input data object. From there, since the pointer is in the data dict, you can reload the stored data in a downstream task. This library's StateDataClient class provides methods for putting and getting items from this DynamoDB table. It's available in your task's run method as context.state_data_client.
The client methods are split between "local" and "global" variants. Local methods operate on items stored within the project, whereas global methods can operate on items that were stored from any project. Global methods require a fully specified partition key (the primary key, which contains the execution ID) and table name to locate the item, whereas local methods only need a simple key because the partition key and table name can be inferred from the project automatically. The put_* methods return a dict with metadata about the location of the item, including the key, partition_key, and table_name. If you return this metadata object from a task, it will get put on the data object and you can call a get_* method later in the state machine.
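The local/global distinction can be modelled with a small in-memory sketch. This is not the real StateDataClient: the class, its constructor arguments, the global_lookup helper, and the table and execution names are all hypothetical. The partition key format mirrors the "<execution ID>:characters" value shown in the map iteration input later in this section.

```python
class InMemoryStateStore:
    """Hypothetical in-memory model of the state store (illustration only)."""

    def __init__(self, table_name, execution_id):
        self.table_name = table_name
        self.execution_id = execution_id
        self.tables = {}  # table name -> {partition key -> data}

    def put_item(self, key, data):
        # Local put: the table name and partition key are derived automatically.
        partition_key = f"{self.execution_id}:{key}"
        self.tables.setdefault(self.table_name, {})[partition_key] = data
        # Return a pointer that a task can pass downstream in place of the data.
        return {
            "key": key,
            "partition_key": partition_key,
            "table_name": self.table_name,
        }

    def get_item(self, key):
        # Local get: the simple key is enough within the same project.
        return self.tables[self.table_name][f"{self.execution_id}:{key}"]

    def global_lookup(self, partition_key, table_name):
        # Global-style get: the caller must fully specify where the item lives.
        return self.tables[table_name][partition_key]


store = InMemoryStateStore("project-a-state", "exec-123")
pointer = store.put_item("characters", {"name": "jerry"})

# A downstream task receives only the pointer and reloads the data from it.
local_result = store.get_item(pointer["key"])
global_result = store.global_lookup(pointer["partition_key"], pointer["table_name"])
```

Only the small pointer dict travels through the state machine input, which is what keeps the payload under the Step Functions limit.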
Many methods also accept an optional index argument. This argument needs to be provided when getting/putting an item that was originally stored as part of a put_items or put_global_items call. Providing the index is usually only done within a map iteration task.
Below are a few of the more common methods:
The put_item method puts an item in the state store. It takes key, data, and index arguments. For example:
context.state_data_client.put_item("characters", {"name": "jerry"})
context.state_data_client.put_item("characters", {"name": "elaine"}, index=24)
Note that the item at the given array index doesn't actually have to exist in the table before you call put_item. However, if it doesn't exist then you may have a fan-out logic bug upstream in your state machine.
The put_items method puts an entire list of items into the state store. Each item will be stored separately under its corresponding array index. For example:
context.state_data_client.put_items("characters", [{"name": "jerry"}, {"name": "elaine"}])
The get_item method gets the data attribute from an item in the state store. It takes key and index arguments. For example:
context.state_data_client.get_item("characters") # -> {"name": "jerry"}
context.state_data_client.get_item("characters", index=24) # -> {"name": "elaine"}
The get_item_for_map_iteration method gets the data attribute from an item in the state store using the event object. This method only works when called within a map iterator task. For example, if the put_items example above was called in a task, and its value was given to a map state to fan out, we can use the get_item_for_map_iteration method within our iterator task to fetch each item:
# Iteration 0:
context.state_data_client.get_item_for_map_iteration(event) # -> {"name": "jerry"}
# Iteration 1:
context.state_data_client.get_item_for_map_iteration(event) # -> {"name": "elaine"}
This works because the map iterator state machine receives an input data object with the schema:
{
  "items_result_table_name": "<DynamoDB table for the project>",
  "items_result_partition_key": "<execution ID>:characters",
  "items_result_key": "characters",
  "context_index": "<array index>",
  "context_value.$": "1"
}
The get_item_for_map_iteration method is a helper that uses that input to locate the right item. The get_global_item_for_map_iteration method has the same signature. It should be called when you know that the array used to fan out could have come from another project (e.g. the map state is the first state in a state machine triggered by a subscription).
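The lookup described above can be sketched as follows, assuming each item is addressable by its partition key plus array index. The locate_item function and the in-memory tables dict are hypothetical; the real client reads from DynamoDB, and how it keys items internally is not documented here.

```python
def locate_item(event, tables):
    """Hypothetical lookup: resolve the stored item for one map iteration."""
    table = tables[event["items_result_table_name"]]
    # The index may arrive as a string, so normalise it before the lookup.
    index = int(event["context_index"])
    return table[(event["items_result_partition_key"], index)]


# In-memory stand-in for the table after a put_items("characters", [...]) call.
tables = {
    "project-a-state": {
        ("exec-123:characters", 0): {"name": "jerry"},
        ("exec-123:characters", 1): {"name": "elaine"},
    }
}

# One input object per map iteration, following the schema shown above.
events = [
    {
        "items_result_table_name": "project-a-state",
        "items_result_partition_key": "exec-123:characters",
        "items_result_key": "characters",
        "context_index": str(i),
    }
    for i in range(2)
]

results = [locate_item(event, tables) for event in events]
```

Because the table name and partition key travel in the event itself, the same lookup works even when the items were stored by another project, which is the case the global variant is meant for.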
To run functional tests, you need an AWS IAM account with permissions to:
- Create/update/delete a DynamoDB table
- Create/update/delete an S3 bucket
Set the following environment variables:
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
AWS_DEFAULT_REGION
To run tests:
tox