feat: ensure building order (#740)
* feat: decouple image builder from worker

Signed-off-by: SdgJlbl <[email protected]>

* fix: update skaffold config

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: add `ServiceAccount` and modify role

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: build image in new pod

Signed-off-by: Guilhem Barthes <[email protected]>

* chore: rename `deployment-builder.yaml` to `stateful-builder.yaml`

Signed-off-by: Guilhem Barthes <[email protected]>

* chore: rename `stateful-builder.yaml` to `statefulset-builder.yaml`

Signed-off-by: Guilhem Barthes <[email protected]>

* chore: centralize params

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: create `BuildTask`

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: move more code to `builder`

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: remove TaskProfiling as Celery task + save Entrypoint in DB

Signed-off-by: SdgJlbl <[email protected]>

* feat: build function at registration (#707)

<!-- Please reference issue if any. -->

<!-- Please include a summary of your changes. -->

<!-- Please describe the tests that you ran to verify your changes.  -->

- [ ] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: SdgJlbl <[email protected]>
Signed-off-by: Guilhem Barthes <[email protected]>
Co-authored-by: SdgJlbl <[email protected]>

* feat: share images between backends (#708)



Signed-off-by: SdgJlbl <[email protected]>

* chore: update helm workflow

Signed-off-by: ThibaultFy <[email protected]>

* [sub]fix: add missing migration poc (#728)

## Description

Add a migration missing in the poc. 
This migration alters two things:

-  modify `ComputeTaskFailureReport.logs` 
-  modify `FunctionImage.file`

This migration has been generated automatically with `make migrations`

## How has this been tested?

<!-- Please describe the tests that you ran to verify your changes.  -->

## Checklist

- [ ] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

Signed-off-by: Guilhem Barthes <[email protected]>

* [sub]feat: add function events (#714)

- Substra/orchestrator#263

Add function events, needed now that building the function is decoupled
from executing the compute task. To support this, it adds a `status` field
on the Function. It also includes another PR (merged here) that gets
function build logs working again.

In a future PR, we will change the compute task execution to avoid
having to call `wait_for_function_built` in `compute_task()`.
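
As an illustration of the waiting step mentioned above, here is a minimal sketch of polling a function's status until its build finishes. It is not the backend's implementation: `wait_for_status`, the `get_status` callable, the timeout values, and every status name other than `FUNCTION_STATUS_WAITING` are assumptions made for the example.

```python
import enum
import time
from typing import Callable


class FunctionStatus(enum.Enum):
    # Only FUNCTION_STATUS_WAITING is named in this commit;
    # the other members are assumed for illustration.
    WAITING = "FUNCTION_STATUS_WAITING"
    BUILDING = "FUNCTION_STATUS_BUILDING"
    READY = "FUNCTION_STATUS_READY"
    FAILED = "FUNCTION_STATUS_FAILED"


def wait_for_status(
    get_status: Callable[[], FunctionStatus],
    timeout: float = 5.0,
    poll_interval: float = 0.01,
) -> None:
    """Block until the function is READY; raise on failure or timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status is FunctionStatus.READY:
            return
        if status is FunctionStatus.FAILED:
            raise RuntimeError("function build failed")
        # Still WAITING or BUILDING: sleep, then poll again.
        time.sleep(poll_interval)
    raise TimeoutError("function was not built before the timeout")
```

Reacting to function events instead of polling is what would let the compute task drop this kind of blocking wait.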

Fixes FL-1160

As this is going to be merged on a branch that is going to be merged to
a POC branch, we use MNIST as a baseline of a working model. We will
deal with failing tests on the POC before merging on main.

- [x] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: SdgJlbl <[email protected]>
Signed-off-by: Guilhem Barthes <[email protected]>
Signed-off-by: Guilhem Barthés <[email protected]>
Co-authored-by: SdgJlbl <[email protected]>

* [sub]fix(app/orchestrator/resources): FunctionStatus.FUNCTION_STATUS_CREATED -> FunctionStatus.FUNCTION_STATUS_WAITING (#742)

# Issue

Backend `FunctionStatus` values are not aligned with the [orchestrator
definitions](https://github.com/Substra/orchestrator/blob/poc-decoupled-builder/lib/asset/function.proto#L29-L36).
In particular, the backend defines `FunctionStatus.FUNCTION_STATUS_CREATED`
instead of `FunctionStatus.FUNCTION_STATUS_WAITING`, leading to the
following error:

```txt
ValueError: 'FUNCTION_STATUS_WAITING' is not a valid FunctionStatus
```
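
This kind of error can be reproduced with a standard `enum.Enum`: looking up a member by a value the enum does not define raises `ValueError`. The sketch below is only illustrative; `StaleFunctionStatus`, `AlignedFunctionStatus`, and `parse_status` are hypothetical names, not the backend's classes.

```python
import enum


class StaleFunctionStatus(enum.Enum):
    # Hypothetical enum that predates the rename and lacks WAITING.
    FUNCTION_STATUS_CREATED = "FUNCTION_STATUS_CREATED"


class AlignedFunctionStatus(enum.Enum):
    # Hypothetical enum aligned with the value sent by the orchestrator.
    FUNCTION_STATUS_WAITING = "FUNCTION_STATUS_WAITING"


def parse_status(enum_cls, raw: str):
    """Look up an enum member by value; raises ValueError if unknown."""
    return enum_cls(raw)


# The aligned enum accepts the orchestrator's value.
status = parse_status(AlignedFunctionStatus, "FUNCTION_STATUS_WAITING")

# The stale enum rejects it with a ValueError.
try:
    parse_status(StaleFunctionStatus, "FUNCTION_STATUS_WAITING")
except ValueError as exc:
    print(f"ValueError: {exc}")
```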

## Description

FunctionStatus.FUNCTION_STATUS_CREATED ->
FunctionStatus.FUNCTION_STATUS_WAITING

## How has this been tested?

Running Camelyon benchmark on
[poc-builder-flpc](https://substra.org-1.poc-builder-flpc.cg.owkin.tech/compute_plans/a420306f-5719-412b-ab9c-688b7bed9c70/tasks?page=1&ordering=-rank)
environment.

## Checklist

- [ ] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: Thibault Camalon <[email protected]>

* fix: rebase changelog

Signed-off-by: Guilhem Barthés <[email protected]>

* feat: decouple image builder from worker

Signed-off-by: SdgJlbl <[email protected]>

* feat: add `ServiceAccount` and modify role

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: build function at registration (#707)

<!-- Please reference issue if any. -->

<!-- Please include a summary of your changes. -->

<!-- Please describe the tests that you ran to verify your changes.  -->

- [ ] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: SdgJlbl <[email protected]>
Signed-off-by: Guilhem Barthes <[email protected]>
Co-authored-by: SdgJlbl <[email protected]>

* feat: save status update in orc

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: use status for build waiting

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: re-add `container_image_exists`

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: rebase errors

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: format

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: tests

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: add `si` to building invocations

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: tests

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: apply feedback

Signed-off-by: Guilhem Barthes <[email protected]>

* fix: only import during typing

Signed-off-by: Guilhem Barthes <[email protected]>

* [sub]feat: modify computetask failure report (#727)

## Companion PR

- Substra/orchestrator#277
- Substra/substra-frontend#240

## Description

The aim is to allow registering failure reports not only for compute
tasks but also for other kinds of assets (for now, functions, which are no
longer built as part of the execution of a compute task).

- Modifies `ComputeTaskFailureReport`:
    - renamed the model to `AssetFailureReport`
    - renamed field `compute_task_key` to `asset_key` (as we can now have a
      function key)
    - added field `asset_type` to provide 
- Updates protobuf reflecting the previous changes
- refactor `download_file` in `PermissionMixin` to provide more
  flexibility (and decouple from DRF)
- create new `FailableTask` (Celery task):
    - centralize the logic to submit asset failure
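
To make the renamed fields concrete, here is a rough sketch of a failure report keyed by a generic asset rather than by a compute task. It uses a plain dataclass instead of the Django model, and the `FailedAssetKind` member names and values and the `make_failure_report` helper are assumptions for illustration only.

```python
import enum
import uuid
from dataclasses import dataclass


class FailedAssetKind(enum.Enum):
    # Member names and values are assumed, not taken from the backend.
    COMPUTE_TASK = "compute_task"
    FUNCTION = "function"


@dataclass(frozen=True)
class AssetFailureReport:
    asset_key: uuid.UUID         # formerly `compute_task_key`
    asset_type: FailedAssetKind  # which kind of asset `asset_key` refers to
    logs: bytes


def make_failure_report(asset_key: str, asset_type: FailedAssetKind, message: str) -> AssetFailureReport:
    """Hypothetical helper: build a report from a key string and an error message."""
    return AssetFailureReport(
        asset_key=uuid.UUID(asset_key),
        asset_type=asset_type,
        logs=message.encode(),
    )


# A function build failure can now be recorded without any compute task.
report = make_failure_report(str(uuid.uuid4()), FailedAssetKind.FUNCTION, "build failed")
```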

## How has this been tested?

As this is going to be merged on a branch that is going to be merged to
a POC branch, we use MNIST as a baseline of a working model. We will
deal with failing tests on the POC before merging on main.

## Checklist

- [x] [changelog](../CHANGELOG.md) was updated with notable changes
- [ ] documentation was updated

---------

Signed-off-by: Guilhem Barthes <[email protected]>

* feat: add config to run celery in tests

Signed-off-by: Guilhem Barthés <[email protected]>

* feat: add tests

Signed-off-by: Guilhem Barthés <[email protected]>

* fix: remove rebase duplicate

Signed-off-by: Guilhem Barthés <[email protected]>

* docs: changelog

Signed-off-by: Guilhem Barthés <[email protected]>

* fix: adapt to pydantic 2.x.x

Signed-off-by: Guilhem Barthés <[email protected]>

* fix: remove rebase artifacts

Signed-off-by: Guilhem Barthés <[email protected]>

* fix: update to pydantic 2.x.x

Signed-off-by: Guilhem Barthés <[email protected]>

---------

Signed-off-by: SdgJlbl <[email protected]>
Signed-off-by: Guilhem Barthes <[email protected]>
Signed-off-by: ThibaultFy <[email protected]>
Signed-off-by: Guilhem Barthés <[email protected]>
Signed-off-by: Thibault Camalon <[email protected]>
Co-authored-by: SdgJlbl <[email protected]>
Co-authored-by: ThibaultFy <[email protected]>
Co-authored-by: Thibault Camalon <[email protected]>
4 people authored Feb 12, 2024
1 parent a85d5b3 commit e7aa3e7
Showing 8 changed files with 77 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Celery task `FailableTask` that contains the logic to store the failure report, that can be re-used in different assets. ([#727](https://github.com/Substra/substra-backend/pull/727))
 - Add `FunctionStatus` enum ([#714](https://github.com/Substra/orchestrator/pull/714))
 - BREAKING: Add `status` on `api.Function` (type `FunctionStatus`) ([#714](https://github.com/Substra/substra-backend/pull/714))
+- Tests to ensure build order is made in order of submission (including retries) ([#740](https://github.com/Substra/substra-backend/pull/740))

 ### Changed

4 changes: 4 additions & 0 deletions backend/backend/settings/test.py
@@ -31,3 +31,7 @@

 MSP_ID = "testOrgMSP"
 CHANNELS = {"mychannel": {"model_export_enabled": False}}
+
+CELERY_BROKER_URL = "memory://"
+CELERY_RESULT_BACKEND = "cache+memory://"
+CELERY_TASK_ALWAYS_EAGER = False
2 changes: 1 addition & 1 deletion backend/builder/tasks/task.py
@@ -34,6 +34,6 @@ def before_start(self, task_id: str, args: tuple, kwargs: dict) -> None:
         )

     def get_task_info(self, args: tuple, kwargs: dict) -> tuple[str, str]:
-        function = orchestrator.Function.parse_raw(kwargs["function_serialized"])
+        function = orchestrator.Function.model_validate_json(kwargs["function_serialized"])
         channel_name = kwargs["channel_name"]
         return function.key, channel_name
2 changes: 1 addition & 1 deletion backend/builder/tasks/tasks_build_image.py
@@ -20,7 +20,7 @@
 # see http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-reject-on-worker-lost
 # and https://github.com/celery/celery/issues/5106
 def build_image(task: BuildTask, function_serialized: str, channel_name: str) -> None:
-    function = orchestrator.Function.parse_raw(function_serialized)
+    function = orchestrator.Function.model_validate_json(function_serialized)

     attempt = 0
     while attempt <= task.max_retries:
65 changes: 65 additions & 0 deletions backend/builder/tests/test_task_build_image.py
@@ -1,9 +1,17 @@
+import time
+
+import celery
 import pytest

+import orchestrator.mock as orc_mock
 from builder.exceptions import BuildError
+from builder.exceptions import BuildRetryError
+from builder.tasks.tasks_build_image import build_image
 from substrapp.models import FailedAssetKind
 from substrapp.utils.errors import store_failure

+CHANNEL = "mychannel"
+

 @pytest.mark.django_db
 def test_store_failure_build_error():
@@ -18,3 +26,60 @@ def test_store_failure_build_error():

     assert str(failure_report.asset_key) == compute_task_key
     assert failure_report.logs.read() == str.encode(msg)
+
+
+@pytest.mark.parametrize("execution_number", range(10))
+def test_order_building_success(celery_app, celery_worker, mocker, execution_number):
+    function_1 = orc_mock.FunctionFactory()
+    function_2 = orc_mock.FunctionFactory()
+
+    # BuildTask `before_start` uses this client to change the status, which would lead to `OrcError`
+    mocker.patch("builder.tasks.task.get_orchestrator_client")
+    mocker.patch("builder.tasks.tasks_build_image.build_image_if_missing", side_effect=lambda x, y: time.sleep(0.5))
+
+    result_1 = build_image.apply_async(
+        kwargs={"function_serialized": function_1.model_dump_json(), "channel_name": CHANNEL}
+    )
+    result_2 = build_image.apply_async(
+        kwargs={"function_serialized": function_2.model_dump_json(), "channel_name": CHANNEL}
+    )
+    # get waits for the completion
+    result_1.get()
+
+    assert result_1.state == celery.states.SUCCESS
+    assert result_2.state == "WAITING"
+
+
+@pytest.mark.parametrize("execution_number", range(10))
+def test_order_building_retry(celery_app, celery_worker, mocker, execution_number):
+    function_retry = orc_mock.FunctionFactory()
+    function_other = orc_mock.FunctionFactory()
+
+    # Only retry once for function_retry
+    def side_effect_creator():
+        already_raised = False
+
+        def side_effect(*args, **kwargs):
+            nonlocal already_raised
+            time.sleep(0.5)
+            key = args[1].key
+            if not already_raised and function_retry.key == key:
+                already_raised = True
+                raise BuildRetryError("random retriable error")
+
+        return side_effect
+
+    # BuildTask `before_start` uses this client to change the status, which would lead to `OrcError`
+    mocker.patch("builder.tasks.task.get_orchestrator_client")
+    mocker.patch("builder.tasks.tasks_build_image.build_image_if_missing", side_effect=side_effect_creator())
+
+    result_retry = build_image.apply_async(
+        kwargs={"function_serialized": function_retry.model_dump_json(), "channel_name": CHANNEL}
+    )
+    result_other = build_image.apply_async(
+        kwargs={"function_serialized": function_other.model_dump_json(), "channel_name": CHANNEL}
+    )
+
+    result_retry.get()
+    assert result_retry.state == celery.states.SUCCESS
+    assert result_other.state == "WAITING"
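
The ordering property these tests target can be sketched without Celery: a single worker drains a FIFO queue and retries inside the job (in the spirit of the `while attempt <= task.max_retries` loop in `build_image`), so a retried build still finishes before the next one starts. This is a simplified stand-in under those assumptions, not the builder's code; `run_builds` and `RetryableError` are hypothetical names.

```python
import queue
import threading


class RetryableError(Exception):
    """Hypothetical stand-in for a retriable build error."""


def run_builds(jobs, max_retries=1):
    """Run (name, build) jobs on one worker thread, in submission order.

    Returns the list of (name, outcome) events in the order they happened.
    """
    events = []
    todo = queue.Queue()
    for job in jobs:
        todo.put(job)

    def worker():
        while True:
            try:
                name, build = todo.get_nowait()
            except queue.Empty:
                return
            attempt = 0
            # Retry inside the job so the next job cannot overtake it.
            while attempt <= max_retries:
                try:
                    build()
                    events.append((name, "success"))
                    break
                except RetryableError:
                    events.append((name, "error"))
                    attempt += 1
            else:
                events.append((name, "failed"))

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return events
```

With one job that fails once and a second job that always succeeds, the event log shows the first job's error and success before the second job runs at all.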
3 changes: 2 additions & 1 deletion backend/dev-requirements.txt
@@ -21,4 +21,5 @@ mypy==1.4.1
 djangorestframework-stubs==1.8.0
 django-stubs==1.14.0
 celery-types==0.14.0
-docker==6.1.3
+docker==6.1.3
+celery[pytest]
2 changes: 1 addition & 1 deletion backend/substrapp/tasks/task.py
@@ -91,7 +91,7 @@ def on_retry(self, exc: Exception, task_id: str, args: tuple, kwargs: dict[str,

     def split_args(self, celery_args: tuple) -> tuple[str, orchestrator.ComputeTask]:
         channel_name = celery_args[0]
-        task = orchestrator.ComputeTask.parse_raw(celery_args[1])
+        task = orchestrator.ComputeTask.model_validate_json(celery_args[1])
         return channel_name, task

     def get_task_info(self, args: tuple, kwargs: dict) -> tuple[str, str]:
4 changes: 2 additions & 2 deletions backend/substrapp/tasks/tasks_save_image.py
@@ -46,7 +46,7 @@ def attempt(self) -> int:

     # Returns (function key, channel)
     def get_task_info(self, args: tuple, kwargs: dict) -> tuple[str, str]:
-        function = orchestrator.Function.parse_raw(kwargs["function_serialized"])
+        function = orchestrator.Function.model_validate_json(kwargs["function_serialized"])
         channel_name = kwargs["channel_name"]
         return function.key, channel_name

@@ -75,7 +75,7 @@ def save_image_task(task: SaveImageTask, function_serialized: str, channel_name:
     logger.info("Starting save_image_task")
     logger.info(f"Parameters: function_serialized {function_serialized}, " f"channel_name {channel_name}")
     # create serialized image
-    function = orchestrator.Function.parse_raw(function_serialized)
+    function = orchestrator.Function.model_validate_json(function_serialized)
     container_image_tag = utils.container_image_tag_from_function(function)

     os.makedirs(SUBTUPLE_TMP_DIR, exist_ok=True)
