Skip to content

Commit

Permalink
feat: add logs management for functions
Browse files Browse the repository at this point in the history
Signed-off-by: ThibaultFy <[email protected]>
  • Loading branch information
ThibaultFy committed Aug 23, 2023
1 parent ba5c1d3 commit d4fec54
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 91 deletions.
1 change: 1 addition & 0 deletions backend/api/models/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Function(models.Model, AssetPermissionMixin):
permissions_process_authorized_ids = ArrayField(models.CharField(max_length=1024), size=100)
owner = models.CharField(max_length=100)
creation_date = models.DateTimeField()
logs_address = models.URLField(validators=[URLValidatorWithOptionalTLD()], null=True)
metadata = models.JSONField()
channel = models.CharField(max_length=100)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from rest_framework.decorators import action

from api.models import ComputeTask
from api.models import Function
from api.views import utils as view_utils
from substrapp.models import celery_task_failure_report

Expand All @@ -12,7 +13,12 @@ class CeleryTaskLogsViewSet(view_utils.PermissionMixin, viewsets.GenericViewSet)

@action(detail=True, url_path=celery_task_failure_report.LOGS_FILE_PATH)
def file(self, request, pk=None) -> drf_response.Response:
    """Serve the stored failure logs of an asset as a plain-text attachment.

    The failure report returned by ``get_object()`` carries the type of the asset it
    belongs to; that type decides which asset model is handed to ``download_file``.

    Args:
        request: The incoming request.
        pk: Lookup key taken from the URL; also used in the downloaded file name.

    Returns:
        The response produced by ``download_file``, with its content type forced to
        plain text and a content disposition marking it as an attachment.

    Raises:
        ValueError: If the report's ``asset_type`` is neither ``"COMPUTE_TASK"`` nor
            ``"FUNCTION"``.
    """
    asset_models = {"COMPUTE_TASK": ComputeTask, "FUNCTION": Function}

    report = self.get_object()
    try:
        asset_model = asset_models[report.asset_type]
    except KeyError:
        # Fail with an explicit message instead of an UnboundLocalError further down.
        raise ValueError(f"Unsupported failure report asset type: {report.asset_type!r}") from None

    response = self.download_file(request, asset_model, "logs", "logs_address")
    response.headers["Content-Type"] = "text/plain; charset=utf-8"
    response.headers["Content-Disposition"] = f'attachment; filename="tuple_logs_{pk}.txt"'
    return response
18 changes: 0 additions & 18 deletions backend/api/views/computetask_logs.py

This file was deleted.

52 changes: 28 additions & 24 deletions backend/builder/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@
from django.conf import settings

import orchestrator

# from substrapp.compute_tasks import errors as compute_task_errors
# from substrapp.orchestrator import get_orchestrator_client
# from substrapp.utils.errors import store_failure

from substrapp.orchestrator import get_orchestrator_client
from substrapp.utils.errors import store_failure

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -38,25 +35,32 @@ def on_failure(

close_old_connections()

# channel_name, function, compute_task_key = self.split_args(args)

# failure_report = store_failure(exc, compute_task_key)
# error_type = compute_task_errors.get_error_type(exc)

# with get_orchestrator_client(channel_name) as client:
# # On the backend, only execution errors lead to the creation of compute task failure report instances
# # to store the execution logs.
# if failure_report:
# logs_address = {
# "checksum": failure_report.logs_checksum,
# "storage_address": failure_report.logs_address,
# }
# else:
# logs_address = None

# client.register_failure_report(
# {"compute_task_key": compute_task_key, "error_type": error_type, "logs_address": logs_address}
# )
channel_name, function = self.split_args(args)
asset_key = function.key
asset_type = "FUNCTION"

failure_report = store_failure(exc=exc, asset_key=asset_key, asset_type=asset_type)
error_type = "Build Error"

with get_orchestrator_client(channel_name) as client:
# On the backend, only build and execution errors lead to the creation of failure report instances
# to store the corresponding logs.
if failure_report:
logs_address = {
"checksum": failure_report.logs_checksum,
"storage_address": failure_report.logs_address,
}
else:
logs_address = None

client.register_failure_report(
{
"asset_key": asset_key,
"asset_type": asset_type,
"error_type": error_type,
"logs_address": logs_address,
}
)

def split_args(self, celery_args: tuple) -> tuple[str, orchestrator.Function, str]:
channel_name = celery_args[1]
Expand Down
4 changes: 2 additions & 2 deletions backend/substrapp/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .compute_task_failure_report import ComputeTaskFailureReport
from .celery_task_failure_report import CeleryTaskFailureReport
from .computeplan_worker_mapping import ComputePlanWorkerMapping
from .datamanager import DataManager
from .datasample import DataSample
Expand All @@ -16,6 +16,6 @@
"Model",
"ComputePlanWorkerMapping",
"ImageEntrypoint",
"ComputeTaskFailureReport",
"CeleryTaskFailureReport",
"WorkerLastEvent",
]
4 changes: 2 additions & 2 deletions backend/substrapp/models/celery_task_failure_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class CeleryTaskFailureReport(models.Model):
"""Store information relative to a celery task."""

asset_key = models.UUIDField(primary_key=True, editable=False)
asset_type = models.CharField(max_length=_UUID_STRING_REPR_LENGTH)
asset_type = models.CharField(max_length=100)
logs = models.FileField(
storage=settings.COMPUTE_TASK_LOGS_STORAGE, max_length=_UUID_STRING_REPR_LENGTH, upload_to=_upload_to
)
Expand All @@ -37,5 +37,5 @@ def type(self) -> str:

@property
def logs_address(self) -> str:
    """Return the URL of this report's logs file, joined onto ``settings.DEFAULT_DOMAIN``."""
    # The path is keyed by the asset key only; the asset type is not part of it.
    logs_path = f"{LOGS_BASE_PATH}/{self.asset_key}/{LOGS_FILE_PATH}/"
    return urllib.parse.urljoin(settings.DEFAULT_DOMAIN, logs_path)
36 changes: 0 additions & 36 deletions backend/substrapp/models/compute_task_failure_report.py

This file was deleted.

12 changes: 9 additions & 3 deletions backend/substrapp/tasks/tasks_compute_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ def on_failure(
close_old_connections()

channel_name, task = self.split_args(args)
compute_task_key = task.key
asset_key = task.key
asset_type = "COMPUTE_TASK"

failure_report = store_failure(exc, compute_task_key)
failure_report = store_failure(exc=exc, asset_key=asset_key, asset_type=asset_type)
error_type = compute_task_errors.get_error_type(exc)

with get_orchestrator_client(channel_name) as client:
Expand All @@ -123,7 +124,12 @@ def on_failure(
logs_address = None

client.register_failure_report(
{"compute_task_key": compute_task_key, "error_type": error_type, "logs_address": logs_address}
{
"asset_key": asset_key,
"asset_type": asset_type,
"error_type": error_type,
"logs_address": logs_address,
}
)

def split_args(self, celery_args: tuple) -> tuple[str, orchestrator.ComputeTask]:
Expand Down
10 changes: 5 additions & 5 deletions backend/substrapp/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@
from substrapp.compute_tasks import errors as compute_task_errors


def store_failure(exc: Exception, asset_key: str, asset_type: str) -> Optional[models.CeleryTaskFailureReport]:
    """If the provided exception is a `BuildError` or an `ExecutionError`, store its logs in the Django storage and
    in the database. Otherwise, do nothing.

    Args:
        exc: The exception raised by the failed celery task.
        asset_key: Key of the asset the failure relates to; also used as the name of the stored logs file.
        asset_type: Type of that asset, recorded on the report (callers pass "COMPUTE_TASK" or "FUNCTION").

    Returns:
        An instance of `models.CeleryTaskFailureReport` storing the error logs or None if the provided exception is
        neither a `BuildError` nor an `ExecutionError`.
    """

    if not isinstance(exc, (compute_task_errors.ExecutionError, builder_errors.BuildError)):
        return None

    file = files.File(exc.logs)
    failure_report = models.CeleryTaskFailureReport(
        asset_key=asset_key, asset_type=asset_type, logs_checksum=utils.get_hash(file)
    )
    # `save=True` also persists the report instance once the logs file has been written to storage.
    failure_report.logs.save(name=asset_key, content=file, save=True)
    return failure_report

0 comments on commit d4fec54

Please sign in to comment.