-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use flyteremote to access subnodes information of array nodes #3152
Use flyteremote to access subnodes information of array nodes #3152
Conversation
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
…it into use-flyteremote-to-access-subnodes-of-array-node
Signed-off-by: Troy Chiu <[email protected]>
Code Review Agent Run Status
|
Signed-off-by: Troy Chiu <[email protected]>
Signed-off-by: Troy Chiu <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you take a look at the integration test failure? The error:
def test_sync_execution_sync_nodes_get_all_executions(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
flyte_launch_plan = remote.fetch_launch_plan(name="basic.deep_child_workflow.parent_wf", version=VERSION)
execution = remote.execute(
flyte_launch_plan,
inputs={"a": 3},
)
poll_interval = datetime.timedelta(seconds=1)
time_to_give_up = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=600)
execution = remote.sync_execution(execution, sync_nodes=True)
while datetime.datetime.now(datetime.timezone.utc) < time_to_give_up:
if execution.is_done:
break
with pytest.raises(
FlyteAssertion, match="Please wait until the execution has completed before requesting the outputs.",
):
execution.outputs
time.sleep(poll_interval.total_seconds())
> execution = remote.sync_execution(execution, sync_nodes=True)
tests/flytekit/integration/remote/test_remote.py:272:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
flytekit/remote/remote.py:2[47](https://github.com/flyteorg/flytekit/actions/runs/13506680993/job/37737798227?pr=3152#step:11:48)9: in sync_execution
node_execs[n.id.node_id] = self.sync_node_execution(n, node_mapping) # noqa
flytekit/remote/remote.py:26[49](https://github.com/flyteorg/flytekit/actions/runs/13506680993/job/37737798227?pr=3152#step:11:50): in sync_node_execution
self._assign_inputs_and_outputs(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <flytekit.remote.remote.FlyteRemote object at 0x7f56b5b6bd40>
execution = Flyte Serialized object (FlyteNodeExecution):
id:
node_id: n1
execution_id:
project: flytesnacks
... nanos: 117081000
updated_at:
seconds: 1740427608
nanos: 1222[51](https://github.com/flyteorg/flytekit/actions/runs/13506680993/job/37737798227?pr=3152#step:11:52)000
metadata:
spec_node_id: n1
execution_data = Flyte Serialized object (NodeExecutionGetDataResponse):
full_inputs:
literals: {'a': scalar { primitive { integer: 6 } } }
interface = None
def _assign_inputs_and_outputs(
self,
execution: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution],
execution_data,
interface: TypedInterface,
):
"""Helper for assigning synced inputs and outputs to an execution object."""
input_literal_map = self._get_input_literal_map(execution_data)
> execution._inputs = LiteralsResolver(input_literal_map.literals, interface.inputs, self.context)
E AttributeError: 'NoneType' object has no attribute 'inputs'
Code Review Agent Run #8b6e6aActionable Suggestions - 4
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
Why are the changes needed?
Currently there is no way you can access subnodes' information with FlyteRemote
What changes were proposed in this pull request?
Expose
externalResources
intaskExecutionMetadata
How was this patch tested?
Added integration tests
Check all the applicable boxes
Summary by Bito
This PR enhances FlyteRemote by exposing external resources information through task execution metadata, introducing a new TaskExecutionMetadata class in event.py and integrating it with TaskExecutionClosure. The implementation includes integration tests for verifying external resources information in array nodes and subnodes.Unit tests added: True
Estimated effort to review (1-5, lower is better): 2