Use flyteremote to access subnodes information of array nodes #3152

Merged
merged 22 commits into from
Feb 24, 2025

Conversation

troychiu
Member

@troychiu troychiu commented Feb 24, 2025

Why are the changes needed?

Currently, FlyteRemote provides no way to access information about the subnodes of an array node.

What changes were proposed in this pull request?

Expose externalResources in taskExecutionMetadata

How was this patch tested?

Added integration tests

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Summary by Bito

This PR enhances FlyteRemote by exposing external resources information through task execution metadata, introducing a new TaskExecutionMetadata class in event.py and integrating it with TaskExecutionClosure. The implementation includes integration tests for verifying external resources information in array nodes and subnodes.
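
To make the shape of the exposed data concrete, here is a minimal sketch of reading per-subnode information from a task execution closure. The dataclasses below are hypothetical stand-ins written for illustration, not flytekit's actual classes; only the names TaskExecutionMetadata, TaskExecutionClosure, and external resources come from this PR, and the field names (`external_id`, `index`, `retry_attempt`, `phase`) and the helper `subnode_phases` are assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ExternalResourceInfo:
    """Hypothetical stand-in for one subnode record (not flytekit's class)."""
    external_id: str
    index: int
    retry_attempt: int
    phase: int


@dataclass
class TaskExecutionMetadata:
    """Hypothetical stand-in for the metadata that carries external resources."""
    external_resources: List[ExternalResourceInfo] = field(default_factory=list)


@dataclass
class TaskExecutionClosure:
    """Hypothetical stand-in closure; metadata is assumed to be optional."""
    metadata: Optional[TaskExecutionMetadata] = None


def subnode_phases(closure: TaskExecutionClosure) -> Dict[int, int]:
    """Map each subnode index to its phase; return {} when metadata is absent."""
    if closure.metadata is None:
        return {}
    return {r.index: r.phase for r in closure.metadata.external_resources}


# Example: an array node with two subnodes, both in an assumed phase value of 3.
closure = TaskExecutionClosure(
    metadata=TaskExecutionMetadata(
        external_resources=[
            ExternalResourceInfo("n0-0", index=0, retry_attempt=0, phase=3),
            ExternalResourceInfo("n0-1", index=1, retry_attempt=0, phase=3),
        ]
    )
)
phases = subnode_phases(closure)
```

Treating the metadata as optional keeps the helper usable for task executions that are not array nodes, under the assumption that such executions carry no external resources.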

Unit tests added: True

Estimated effort to review (1-5, lower is better): 2

Signed-off-by: Troy Chiu <[email protected]>
…it into use-flyteremote-to-access-subnodes-of-array-node
@flyte-bot
Contributor

Code Review Agent Run Status

  • Limitations and other issues: ❌ Failure - The AI Code Review Agent skipped reviewing this change because it is configured to exclude certain pull requests based on the source/target branch or the pull request status. You can change the settings here, or contact the agent instance creator at [email protected].

@troychiu troychiu marked this pull request as ready for review February 24, 2025 19:58
@troychiu troychiu changed the title Use flyteremote to access subnodes of array node Use flyteremote to access subnodes information of array nodes Feb 24, 2025
Collaborator

@eapolinario eapolinario left a comment


Can you take a look at the integration test failure? The error:

def test_sync_execution_sync_nodes_get_all_executions(register):
        remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
        flyte_launch_plan = remote.fetch_launch_plan(name="basic.deep_child_workflow.parent_wf", version=VERSION)
        execution = remote.execute(
            flyte_launch_plan,
            inputs={"a": 3},
        )
    
        poll_interval = datetime.timedelta(seconds=1)
        time_to_give_up = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=600)
    
        execution = remote.sync_execution(execution, sync_nodes=True)
        while datetime.datetime.now(datetime.timezone.utc) < time_to_give_up:
            if execution.is_done:
                break
    
            with pytest.raises(
                    FlyteAssertion, match="Please wait until the execution has completed before requesting the outputs.",
            ):
                execution.outputs
    
            time.sleep(poll_interval.total_seconds())
>           execution = remote.sync_execution(execution, sync_nodes=True)

tests/flytekit/integration/remote/test_remote.py:272: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
flytekit/remote/remote.py:2479: in sync_execution
    node_execs[n.id.node_id] = self.sync_node_execution(n, node_mapping)  # noqa
flytekit/remote/remote.py:2649: in sync_node_execution
    self._assign_inputs_and_outputs(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <flytekit.remote.remote.FlyteRemote object at 0x7f56b5b6bd40>
execution = Flyte Serialized object (FlyteNodeExecution):
  id:
    node_id: n1
    execution_id:
      project: flytesnacks
     ...     nanos: 117081000
    updated_at:
      seconds: 1740427608
      nanos: 122251000
  metadata:
    spec_node_id: n1
execution_data = Flyte Serialized object (NodeExecutionGetDataResponse):
  full_inputs:
    literals: {'a': scalar { primitive { integer: 6 } } }
interface = None

    def _assign_inputs_and_outputs(
        self,
        execution: typing.Union[FlyteWorkflowExecution, FlyteNodeExecution, FlyteTaskExecution],
        execution_data,
        interface: TypedInterface,
    ):
        """Helper for assigning synced inputs and outputs to an execution object."""
        input_literal_map = self._get_input_literal_map(execution_data)
>       execution._inputs = LiteralsResolver(input_literal_map.literals, interface.inputs, self.context)
E       AttributeError: 'NoneType' object has no attribute 'inputs'
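
The traceback shows `interface` arriving as `None` and then being dereferenced via `interface.inputs`. A minimal, self-contained sketch of that failure mode and one defensive guard follows. `Interface` and `resolve_inputs` are hypothetical stand-ins, not flytekit code, and this is not necessarily the fix that was merged.

```python
from typing import Any, Dict, Optional


class Interface:
    """Hypothetical stand-in for a typed interface with declared inputs."""

    def __init__(self, inputs: Dict[str, type]):
        self.inputs = inputs


def resolve_inputs(literals: Dict[str, Any], interface: Optional[Interface]) -> Dict[str, Any]:
    """Pair raw literals with declared input types, tolerating a missing interface."""
    # Without this guard, interface.inputs raises AttributeError when interface is None.
    declared = interface.inputs if interface is not None else {}
    return {"literals": literals, "declared": declared}


with_interface = resolve_inputs({"a": 6}, Interface({"a": int}))
without_interface = resolve_inputs({"a": 6}, None)
```

Whether falling back to an empty mapping is appropriate depends on why the interface is missing; raising a descriptive error at the call site is an equally reasonable choice.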

@flyte-bot
Contributor

flyte-bot commented Feb 24, 2025

Code Review Agent Run #8b6e6a

Actionable Suggestions - 4
  • tests/flytekit/integration/remote/test_remote.py - 2
    • Consider using enum for phase check · Line 669-669
    • Consider extracting external resources validation logic · Line 686-688
  • flytekit/models/admin/task_execution.py - 2
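
The two test suggestions above (an enum for the phase check and an extracted validation helper) could look roughly like the sketch below. `TaskPhase`, `Resource`, and `assert_all_succeeded` are hypothetical names, and the numeric phase values are assumptions rather than values taken from this PR.

```python
from enum import IntEnum
from typing import Iterable, NamedTuple


class TaskPhase(IntEnum):
    """Hypothetical phase enum; the numeric values are assumptions."""
    RUNNING = 2
    SUCCEEDED = 3
    FAILED = 5


class Resource(NamedTuple):
    """Hypothetical stand-in for one external resource record."""
    index: int
    phase: int


def assert_all_succeeded(resources: Iterable[Resource], expected_count: int) -> None:
    """Check the subnode count and that every subnode reached SUCCEEDED."""
    resources = list(resources)
    assert len(resources) == expected_count, (
        f"expected {expected_count} subnodes, got {len(resources)}"
    )
    for r in resources:
        # Comparing against a named member avoids a bare magic number in the test.
        assert r.phase == TaskPhase.SUCCEEDED, f"subnode {r.index} is in phase {r.phase}"
```

A test would then call `assert_all_succeeded(resources, expected_count=...)` once per array node instead of repeating the same loop inline.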
Review Details
  • Files reviewed - 3 · Commit Range: 8ba253c..14cb643
    • flytekit/models/admin/task_execution.py
    • flytekit/models/event.py
    • tests/flytekit/integration/remote/test_remote.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


@flyte-bot
Contributor

Changelist by Bito

This pull request implements the following key changes.

Key Change: Feature Improvement - Task Execution Metadata Enhancement

Files Impacted:
  • task_execution.py - Added metadata support to TaskExecutionClosure class
  • event.py - Created new TaskExecutionMetadata class for handling external resources
  • test_remote.py - Added tests to verify external resources in map tasks

@eapolinario eapolinario merged commit 9ddc8d7 into master Feb 24, 2025
113 checks passed