Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: log on builder celery tasks #713

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions backend/api/migrations/0053_function_logs_address.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 4.1.7 on 2023-08-23 10:03

from django.db import migrations
from django.db import models

import api.models.utils


class Migration(migrations.Migration):
    """Add the nullable ``logs_address`` URL field to the ``function`` model."""

    dependencies = [("api", "0052_remove_metric_from_performance")]

    operations = [
        migrations.AddField(
            model_name="function",
            name="logs_address",
            field=models.URLField(
                validators=[api.models.utils.URLValidatorWithOptionalTLD()],
                null=True,
            ),
        ),
    ]
1 change: 1 addition & 0 deletions backend/api/models/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Function(models.Model, AssetPermissionMixin):
permissions_process_authorized_ids = ArrayField(models.CharField(max_length=1024), size=100)
owner = models.CharField(max_length=100)
creation_date = models.DateTimeField()
logs_address = models.URLField(validators=[URLValidatorWithOptionalTLD()], null=True)
metadata = models.JSONField()
channel = models.CharField(max_length=100)

Expand Down
26 changes: 22 additions & 4 deletions backend/api/tests/asset_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
from api.models import Model
from api.models import Performance
from api.models import TaskProfiling
from substrapp.models import ComputeTaskFailureReport as ComputeTaskLogs
from substrapp.models import CeleryTaskFailureReport
from substrapp.models import DataManager as DataManagerFiles
from substrapp.models import DataSample as DataSampleFiles
from substrapp.models import Function as FunctionFiles
Expand Down Expand Up @@ -537,19 +537,37 @@ def create_model_files(
def create_computetask_logs(
    compute_task_key: uuid.UUID,
    logs: files.File = None,
) -> CeleryTaskFailureReport:
    """Create a failure report of type ``COMPUTE_TASK`` with an attached logs file.

    Args:
        compute_task_key: key of the compute task, stored as the report's ``asset_key``.
        logs: file to attach; a small dummy content file is used when omitted.

    Returns:
        The created ``CeleryTaskFailureReport`` with its logs saved under the name "logs".
    """
    if logs is None:
        logs = files.base.ContentFile("dummy content")

    # The report model is keyed by a generic (asset_key, asset_type) pair rather
    # than the former compute-task-only `compute_task_key` field.
    compute_task_logs = CeleryTaskFailureReport.objects.create(
        asset_key=compute_task_key,
        asset_type="COMPUTE_TASK",
        logs_checksum=get_hash(logs),
        creation_date=timezone.now(),
    )
    compute_task_logs.logs.save("logs", logs)
    return compute_task_logs


def create_function_build_logs(
    function_key: uuid.UUID,
    logs: files.File = None,
) -> CeleryTaskFailureReport:
    """Create a failure report of type ``FUNCTION`` with an attached logs file.

    Args:
        function_key: key of the function, stored as the report's ``asset_key``.
        logs: file to attach; a small dummy content file is used when omitted.

    Returns:
        The created ``CeleryTaskFailureReport`` with its logs saved under the name "logs".
    """
    content = files.base.ContentFile("dummy content") if logs is None else logs

    report = CeleryTaskFailureReport.objects.create(
        asset_key=function_key,
        asset_type="FUNCTION",
        logs_checksum=get_hash(content),
        creation_date=timezone.now(),
    )
    report.logs.save("logs", content)
    return report


def create_computetask_profiling(compute_task: ComputeTask) -> TaskProfiling:
    """Create and return a ``TaskProfiling`` row linked to ``compute_task``."""
    return TaskProfiling.objects.create(compute_task=compute_task)
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
from api.views import utils as view_utils
from organization import authentication as organization_auth
from organization import models as organization_models
from substrapp.models import ComputeTaskFailureReport
from substrapp.models import CeleryTaskFailureReport


@pytest.fixture
def compute_task_failure_report() -> tuple[ComputeTask, ComputeTaskFailureReport]:
def compute_task_failure_report() -> tuple[ComputeTask, CeleryTaskFailureReport]:
compute_task = factory.create_computetask(
factory.create_computeplan(),
factory.create_function(),
Expand Down
2 changes: 1 addition & 1 deletion backend/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
router.register(r"compute_plan_metadata", views.ComputePlanMetadataViewSet, basename="compute_plan_metadata")
router.register(r"news_feed", views.NewsFeedViewSet, basename="news_feed")
router.register(r"performance", views.PerformanceViewSet, basename="performance")
router.register(r"logs", views.ComputeTaskLogsViewSet, basename="logs")
router.register(r"logs", views.CeleryTaskLogsViewSet, basename="logs")
router.register(r"task_profiling", views.TaskProfilingViewSet, basename="task_profiling")

task_profiling_router = routers.NestedDefaultRouter(router, r"task_profiling", lookup="task_profiling")
Expand Down
4 changes: 2 additions & 2 deletions backend/api/views/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from .celery_task_logs import CeleryTaskLogsViewSet
from .compute_plan_graph import get_cp_graph
from .computeplan import ComputePlanViewSet
from .computetask import ComputeTaskViewSet
from .computetask import CPTaskViewSet
from .computetask_logs import ComputeTaskLogsViewSet
from .datamanager import DataManagerPermissionViewSet
from .datamanager import DataManagerViewSet
from .datasample import DataSampleViewSet
Expand Down Expand Up @@ -31,7 +31,7 @@
"CPTaskViewSet",
"CPFunctionViewSet",
"NewsFeedViewSet",
"ComputeTaskLogsViewSet",
"CeleryTaskLogsViewSet",
"CPPerformanceViewSet",
"ComputePlanMetadataViewSet",
"PerformanceViewSet",
Expand Down
24 changes: 24 additions & 0 deletions backend/api/views/celery_task_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from rest_framework import response as drf_response
from rest_framework import viewsets
from rest_framework.decorators import action

from api.models import ComputeTask
from api.models import Function
from api.views import utils as view_utils
from substrapp.models import celery_task_failure_report


class CeleryTaskLogsViewSet(view_utils.PermissionMixin, viewsets.GenericViewSet):
    """Expose the logs file attached to a Celery task failure report.

    A report refers either to a compute task or to a function; the asset model
    handed to ``download_file`` is selected from the report's ``asset_type``.
    """

    queryset = celery_task_failure_report.CeleryTaskFailureReport.objects.all()

    # Supported ``asset_type`` values and the API model each one maps to.
    _ASSET_MODELS = {
        "COMPUTE_TASK": ComputeTask,
        "FUNCTION": Function,
    }

    @action(detail=True, url_path=celery_task_failure_report.LOGS_FILE_PATH)
    def file(self, request, pk=None) -> drf_response.Response:
        """Return the logs of the failure report identified by ``pk`` as a text attachment.

        Args:
            request: the incoming DRF request.
            pk: key of the failure report, also used in the download file name.

        Returns:
            The response produced by ``download_file`` with plain-text content type
            and an attachment disposition.

        Raises:
            ValueError: if the report's ``asset_type`` has no associated asset model.
        """
        report = self.get_object()
        try:
            asset_model = self._ASSET_MODELS[report.asset_type]
        except KeyError:
            # An unrecognised type used to leave `asset_model` unbound and surface
            # as an opaque UnboundLocalError; fail with an explicit message instead.
            raise ValueError(f"Unsupported failure report asset type: {report.asset_type!r}") from None

        # `download_file` is not defined in this class; presumably provided by
        # view_utils.PermissionMixin — confirm.
        response = self.download_file(request, asset_model, "logs", "logs_address")
        response.headers["Content-Type"] = "text/plain; charset=utf-8"
        response.headers["Content-Disposition"] = f'attachment; filename="tuple_logs_{pk}.txt"'
        return response
18 changes: 0 additions & 18 deletions backend/api/views/computetask_logs.py

This file was deleted.

2 changes: 2 additions & 0 deletions backend/backend/settings/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@

LEDGER_MSP_ID = "testOrgMSP"
LEDGER_CHANNELS = {"mychannel": {"chaincode": {"name": "mycc"}}}

IMAGE_BUILD_TIMEOUT = 3 * 60 * 60 # 3 hours
53 changes: 29 additions & 24 deletions backend/builder/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
from django.conf import settings

import orchestrator

# from substrapp.compute_tasks import errors as compute_task_errors
# from substrapp.orchestrator import get_orchestrator_client
# from substrapp.utils.errors import store_failure

from substrapp.compute_tasks import errors as compute_task_errors
from substrapp.orchestrator import get_orchestrator_client
from substrapp.utils.errors import store_failure

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -38,25 +36,32 @@ def on_failure(

close_old_connections()

# channel_name, function, compute_task_key = self.split_args(args)

# failure_report = store_failure(exc, compute_task_key)
# error_type = compute_task_errors.get_error_type(exc)

# with get_orchestrator_client(channel_name) as client:
# # On the backend, only execution errors lead to the creation of compute task failure report instances
# # to store the execution logs.
# if failure_report:
# logs_address = {
# "checksum": failure_report.logs_checksum,
# "storage_address": failure_report.logs_address,
# }
# else:
# logs_address = None

# client.register_failure_report(
# {"compute_task_key": compute_task_key, "error_type": error_type, "logs_address": logs_address}
# )
channel_name, function = self.split_args(args)
asset_key = function.key
asset_type = "FUNCTION"

failure_report = store_failure(exc=exc, asset_key=asset_key, asset_type=asset_type)
error_type = compute_task_errors.get_error_type(exc)

with get_orchestrator_client(channel_name) as client:
# On the backend, only execution errors lead to the creation of compute task failure report instances
# to store the execution logs.
if failure_report:
logs_address = {
"checksum": failure_report.logs_checksum,
"storage_address": failure_report.logs_address,
}
else:
logs_address = None

client.register_failure_report(
{
"asset_key": asset_key,
"asset_type": asset_type,
"error_type": error_type,
"logs_address": logs_address,
}
)

def split_args(self, celery_args: tuple) -> tuple[str, orchestrator.Function, str]:
channel_name = celery_args[1]
Expand Down
6 changes: 3 additions & 3 deletions backend/builder/tests/test_task_build_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

@pytest.mark.django_db
def test_store_failure_build_error():
    """A build error stored for a function yields a report holding the error message as logs."""
    asset_key = "42ff54eb-f4de-43b2-a1a0-a9f4c5f4737f"
    msg = "Error building image"
    exc = BuildError(msg)

    failure_report = store_failure(exc=exc, asset_key=asset_key, asset_type="FUNCTION")
    failure_report.refresh_from_db()

    # The failure report model is keyed by `asset_key`; the former
    # `compute_task_key` field no longer exists on it.
    assert str(failure_report.asset_key) == asset_key
    assert failure_report.logs.read() == str.encode(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Migration(migrations.Migration):
models.FileField(
max_length=36,
storage=django.core.files.storage.FileSystemStorage(),
upload_to=substrapp.models.compute_task_failure_report._upload_to,
upload_to=substrapp.models.celery_task_failure_report._upload_to,
),
),
("logs_checksum", models.CharField(max_length=64)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.db import migrations
from django.db import models

import substrapp.models.compute_task_failure_report
import substrapp.models.celery_task_failure_report
import substrapp.models.datamanager
import substrapp.models.function
import substrapp.storages.minio
Expand Down Expand Up @@ -39,7 +39,7 @@ class Migration(migrations.Migration):
field=models.FileField(
max_length=36,
storage=substrapp.storages.minio.MinioStorage("substra-compute-task-logs"),
upload_to=substrapp.models.compute_task_failure_report._upload_to,
upload_to=substrapp.models.celery_task_failure_report._upload_to,
),
),
migrations.AlterField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.db import migrations
from django.db import models

import substrapp.models.compute_task_failure_report
import substrapp.models.celery_task_failure_report
import substrapp.models.datamanager
import substrapp.models.function

Expand Down Expand Up @@ -39,7 +39,7 @@ class Migration(migrations.Migration):
field=models.FileField(
max_length=36,
storage=django.core.files.storage.FileSystemStorage(),
upload_to=substrapp.models.compute_task_failure_report._upload_to,
upload_to=substrapp.models.celery_task_failure_report._upload_to,
),
),
migrations.AlterField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.db import migrations
from django.db import models

import substrapp.models.compute_task_failure_report
import substrapp.models.celery_task_failure_report
import substrapp.storages.minio


Expand All @@ -19,7 +19,7 @@ class Migration(migrations.Migration):
field=models.FileField(
max_length=36,
storage=substrapp.storages.minio.MinioStorage("substra-compute-task-logs"),
upload_to=substrapp.models.compute_task_failure_report._upload_to,
upload_to=substrapp.models.celery_task_failure_report._upload_to,
),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Generated by Django 4.1.7 on 2023-08-23 10:03

import django.core.files.storage
from django.db import migrations
from django.db import models

import substrapp.models.celery_task_failure_report
import substrapp.models.function


class Migration(migrations.Migration):
    """Replace ``ComputeTaskFailureReport`` with the asset-generic ``CeleryTaskFailureReport``.

    Also alters the ``file`` field of ``functionimage``.
    """

    dependencies = [("substrapp", "0016_add_functionimage")]

    # NOTE(review): the new model is created and the old one deleted instead of
    # being renamed, so rows of the old table are presumably not carried over —
    # confirm this is intended.
    operations = [
        migrations.CreateModel(
            name="CeleryTaskFailureReport",
            fields=[
                ("asset_key", models.UUIDField(primary_key=True, editable=False, serialize=False)),
                ("asset_type", models.CharField(max_length=100)),
                (
                    "logs",
                    models.FileField(
                        upload_to=substrapp.models.celery_task_failure_report._upload_to,
                        storage=django.core.files.storage.FileSystemStorage(),
                        max_length=36,
                    ),
                ),
                ("logs_checksum", models.CharField(max_length=64)),
                ("creation_date", models.DateTimeField(auto_now_add=True)),
            ],
        ),
        migrations.DeleteModel(name="ComputeTaskFailureReport"),
        migrations.AlterField(
            model_name="functionimage",
            name="file",
            field=models.FileField(
                upload_to=substrapp.models.function.upload_to_function,
                storage=django.core.files.storage.FileSystemStorage(),
                max_length=500,
            ),
        ),
    ]
4 changes: 2 additions & 2 deletions backend/substrapp/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .compute_task_failure_report import ComputeTaskFailureReport
from .celery_task_failure_report import CeleryTaskFailureReport
from .computeplan_worker_mapping import ComputePlanWorkerMapping
from .datamanager import DataManager
from .datasample import DataSample
Expand All @@ -16,6 +16,6 @@
"Model",
"ComputePlanWorkerMapping",
"ImageEntrypoint",
"ComputeTaskFailureReport",
"CeleryTaskFailureReport",
"WorkerLastEvent",
]
Loading