Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - Multipart Uploads #324

Merged
merged 13 commits into from
Jan 28, 2025
Merged
66 changes: 66 additions & 0 deletions backend/app/api/endpoints/base/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
from fastapi.responses import FileResponse

from app.domain.schemas.base.model import (
AbortMultipartRequest,
BatchCreateExampleRequest,
BatchURLsResponse,
CompleteBigModelUploadRequest,
ConversationWithBufferMemoryRequest,
DownloadAllExamplesRequest,
ModelInTheLoopRequest,
ModelPredictionPerDatasetRequest,
SingleModelEvaluationRequest,
SingleModelEvaluationResponse,
UpdateModelInfoRequest,
UploadBigModelRequest,
UploadModelToS3AndEvaluateRequest,
)
from app.domain.services.base.model import ModelService
Expand Down Expand Up @@ -121,6 +125,68 @@ def heavy_evaluation(
return "The model will be evaluated in the background"


@router.post("/initiate-mutipart-upload", response_model=BatchURLsResponse)
def initiate_multipart_upload(model: UploadBigModelRequest):
return ModelService().initiate_multipart_upload(
model.model_name,
model.file_name,
model.content_type,
model.user_id,
model.task_code,
model.parts_count,
)


@router.post("/abort-mutipart-upload")
def abort_multipart_upload(model: AbortMultipartRequest):
return ModelService().abort_multipart_upload(
model.upload_id,
model.task_code,
model.model_name,
model.user_id,
model.file_name,
)


@router.post("/complete-multipart-upload")
def complete_multipart_upload(
model: CompleteBigModelUploadRequest,
background_tasks: BackgroundTasks,
):
ModelService().complete_multipart_upload(
model.upload_id,
model.parts,
model.user_id,
model.task_code,
model.model_name,
model.file_name,
)
data = ModelService().create_model(
model.model_name,
model.description,
model.num_paramaters,
model.languages,
model.license,
model.file_name,
model.user_id,
model.task_code,
)
background_tasks.add_task(
ModelService().run_heavy_evaluation,
data["model_path"],
data["model_id"],
data["save_s3_path"],
data["inference_url"],
data["metadata_url"],
)
background_tasks.add_task(
ModelService().send_uploaded_model_email,
data["user_email"],
data["model_name"],
)
return "The model will be evaluated in the background"


@router.get("/initiate_lambda_models")
def initiate_lambda_models() -> None:
return ModelService().initiate_lambda_models()
Expand Down
46 changes: 43 additions & 3 deletions backend/app/domain/schemas/base/model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) MLCommons and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from typing import Optional, Union
from typing import List, Optional, Union

from fastapi import File, UploadFile
from pydantic import BaseModel
Expand Down Expand Up @@ -35,8 +35,7 @@ class ModelInTheLoopRequest(BaseModel):
task_id: int


@form_body
class UploadModelToS3AndEvaluateRequest(BaseModel):
class BaseforModelRequest(BaseModel):
model_name: Optional[str]
description: Optional[str]
num_paramaters: Optional[float]
Expand All @@ -45,6 +44,10 @@ class UploadModelToS3AndEvaluateRequest(BaseModel):
file_name: str
user_id: int
task_code: str


@form_body
class UploadModelToS3AndEvaluateRequest(BaseforModelRequest):
file_to_upload: UploadFile = File(...)


Expand Down Expand Up @@ -86,3 +89,40 @@ class UpdateModelInfoRequest(BaseModel):

class DownloadAllExamplesRequest(BaseModel):
task_id: int


class UploadBigModelRequest(BaseModel):
    """Request body to begin a multipart S3 upload of a large model file."""

    model_name: str
    file_name: str
    content_type: str  # MIME type set on the S3 object at create_multipart_upload
    user_id: int
    task_code: str
    parts_count: int  # number of pre-signed part URLs the caller needs


class PreSignedURL(BaseModel):
    """A single pre-signed upload URL paired with its part number."""

    # NOTE(review): "batch_numer" looks like a typo for "batch_number";
    # renaming would change the serialized field name, so it is kept as-is.
    batch_numer: int
    batch_presigned_url: str


class BatchURLsResponse(BaseModel):
    """Response for an initiated multipart upload."""

    upload_id: str  # S3 multipart UploadId; echoed back on complete/abort
    urls: List  # pre-signed part URLs — presumably List[str]; verify before tightening


class FilePart(BaseModel):
    """One completed upload part, as required by S3 CompleteMultipartUpload."""

    # PascalCase field names deliberately match the keys S3 expects inside
    # MultipartUpload["Parts"].
    ETag: str
    PartNumber: int


class CompleteBigModelUploadRequest(BaseforModelRequest):
    """Request to finish a multipart upload and register the uploaded model."""

    upload_id: str
    parts: List[FilePart]  # every uploaded part with the ETag S3 returned for it


class AbortMultipartRequest(BaseModel):
    """Request to cancel an in-progress multipart upload."""

    upload_id: str
    # The remaining fields rebuild the S3 object key the upload was started with.
    task_code: str
    model_name: str
    user_id: int
    file_name: str
202 changes: 201 additions & 1 deletion backend/app/domain/services/base/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import re
import secrets
import time
from typing import List

import boto3
import boto3.session
import requests
import yaml
from fastapi import HTTPException, UploadFile
Expand All @@ -23,6 +25,7 @@
load_json_lines,
transform_list_to_csv,
)
from app.domain.schemas.base.model import BatchURLsResponse
from app.domain.services.base.example import ExampleService
from app.domain.services.base.rounduserexampleinfo import RoundUserExampleInfoService
from app.domain.services.base.score import ScoreService
Expand Down Expand Up @@ -60,7 +63,9 @@ def __init__(self):
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
region_name=os.getenv("AWS_REGION"),
)
self.s3 = self.session.client("s3")
self.s3 = self.session.client(
"s3", config=boto3.session.Config(signature_version="s3v4")
)
self.s3_bucket = os.getenv("AWS_S3_BUCKET")
self.email_helper = EmailHelper()
self.providers = {
Expand Down Expand Up @@ -627,3 +632,198 @@ def get_dynalab_model(self, task_code: str):
bucket = "https://models-dynalab.s3.eu-west-3.amazonaws.com"
dynalab_link = f"{bucket}/{task_code}/dynalab-base-{task_code}.zip"
return dynalab_link

def initiate_multipart_upload(
    self,
    model_name: str,
    file_name: str,
    content_type: str,
    user_id: int,
    task_code: str,
    parts_count: int,
) -> BatchURLsResponse:
    """Create an S3 multipart upload and pre-sign one URL per part.

    Returns a dict with the S3 ``UploadId`` and ``parts_count`` pre-signed
    ``upload_part`` URLs (1-hour expiry), serialized by FastAPI as a
    ``BatchURLsResponse``.

    Raises:
        HTTPException: 500 when S3 rejects the create or pre-sign calls.
    """
    task_id = self.task_repository.get_task_id_by_task_code(task_code)[0]
    task_s3_bucket = self.task_repository.get_s3_bucket_by_task_id(task_id)[0]

    # Normalize the file name: lowercase, no '/', collapsed whitespace and
    # underscores. complete/abort apply the same steps so the key matches.
    file_name = file_name.lower().replace("/", ":")
    file_name = re.sub(r"\s+", "_", file_name)
    clean_file_name = re.sub(r"_+", "_", file_name)

    # NOTE(review): the raw model_name (not a cleaned variant) is embedded in
    # the key; the sibling methods do the same, so the key stays consistent.
    # (The original also computed an unused model_name_clean and fetched the
    # task YAML config without using it — both removed as dead work.)
    model_path = f"{task_code}/submited_models/{task_id}-{user_id}-{model_name}-{clean_file_name}"
    try:
        response = self.s3.create_multipart_upload(
            Bucket=task_s3_bucket, Key=model_path, ContentType=content_type
        )
        upload_id = response["UploadId"]
        urls = [
            self.s3.generate_presigned_url(
                "upload_part",
                Params={
                    "Bucket": task_s3_bucket,
                    "Key": model_path,
                    "UploadId": upload_id,
                    "PartNumber": part_number,
                },
                ExpiresIn=3600,
                HttpMethod="put",
            )
            for part_number in range(1, parts_count + 1)
        ]
    except Exception as e:
        # BUG FIX: the original only printed here, then fell through to the
        # return and crashed with NameError on the undefined upload_id/urls.
        raise HTTPException(
            status_code=500,
            detail=f"There was an error while generating pre signed urls: {e}",
        ) from e

    return {"upload_id": upload_id, "urls": urls}

def complete_multipart_upload(
    self,
    upload_id: str,
    parts: List,
    user_id: int,
    task_code: str,
    model_name: str,
    file_name: str,
):
    """Finalize a multipart upload and verify the assembled object exists.

    Rebuilds the same S3 key used by initiate_multipart_upload, sorts the
    parts by part number, calls CompleteMultipartUpload, then head_object as
    a sanity check.

    Raises:
        HTTPException: 500 when S3 fails to complete the upload or the final
            object cannot be found.
    """
    # NOTE(review): annotations fixed — upload_id is the S3 UploadId string
    # and user_id is an int, consistent with the sibling methods. The unused
    # YAML-config fetch and model_name_clean computation were removed.
    task_id = self.task_repository.get_task_id_by_task_code(task_code)[0]
    task_s3_bucket = self.task_repository.get_s3_bucket_by_task_id(task_id)[0]

    # Same normalization as initiate_multipart_upload so the completed key
    # matches the one the parts were uploaded under.
    file_name = file_name.lower().replace("/", ":")
    file_name = re.sub(r"\s+", "_", file_name)
    clean_file_name = re.sub(r"_+", "_", file_name)

    model_path = f"{task_code}/submited_models/{task_id}-{user_id}-{model_name}-{clean_file_name}"

    # S3 requires ascending PartNumber order; `parts` are FilePart pydantic
    # models, so convert them to plain dicts for boto3.
    ordered_parts = [p.dict() for p in sorted(parts, key=lambda p: p.PartNumber)]

    try:
        self.s3.complete_multipart_upload(
            Bucket=task_s3_bucket,
            Key=model_path,
            UploadId=upload_id,
            MultipartUpload={"Parts": ordered_parts},
        )
        # Confirm the assembled object actually landed in the bucket.
        self.s3.head_object(Bucket=task_s3_bucket, Key=model_path)
    except Exception as e:
        print("Failed to complete upload:", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to complete upload: {str(e)}"
        ) from e

def create_model(
    self,
    model_name: str,
    description: str,
    num_paramaters: float,
    languages: str,
    license: str,
    file_name: str,
    user_id: int,
    task_code: str,
):
    """Register an uploaded model in the DB and return evaluation metadata.

    Returns a dict with the model's S3 URI, logging path, DB model id,
    inference/metadata URLs, bucket, and the owner's email — everything the
    background evaluation task and the notification email need.

    Raises:
        HTTPException: 500 when the DB registration fails.
    """
    # NOTE(review): annotations fixed — user_id is an int and num_paramaters
    # a float, matching the request schemas. The unused model_name_clean
    # computation was removed (uri_model embeds the raw model_name, the same
    # way the multipart-upload methods build their keys).
    task_id = self.task_repository.get_task_id_by_task_code(task_code)[0]
    yaml_file = yaml.safe_load(
        self.task_repository.get_config_file_by_task_id(task_id)[0]
    )
    task_s3_bucket = self.task_repository.get_s3_bucket_by_task_id(task_id)[0]
    user_email = self.user_repository.get_user_email(user_id)[0]

    # Same file-name normalization as the multipart-upload methods, so the
    # stored URI matches the uploaded object's key.
    file_name = file_name.lower().replace("/", ":")
    file_name = re.sub(r"\s+", "_", file_name)
    clean_file_name = re.sub(r"_+", "_", file_name)

    uri_logging = f"s3://{task_s3_bucket}/{task_code}/inference_logs/"
    uri_model = f"s3://{task_s3_bucket}/{task_code}/submited_models/{task_id}-{user_id}-{model_name}-{clean_file_name}"
    inference_url = yaml_file["evaluation"]["inference_url"]
    metadata_url = f"s3://{task_s3_bucket}/{task_code}/metadata/"

    try:
        # NOTE(review): the submitted-model counter is bumped before the row
        # is inserted, so a failed insert still increments it — confirm intended.
        self.user_repository.increment_model_submitted_count(user_id)
        model = self.model_repository.create_new_model(
            task_id=task_id,
            user_id=user_id,
            model_name=model_name,
            shortname=model_name,
            longdesc=description,
            desc=description,
            languages=languages,
            license=license,
            params=num_paramaters,
            deployment_status="uploaded",
            secret=secrets.token_hex(),
        )
        print("The model has been uploaded and created in the DB")
        return {
            "model_path": uri_model,
            "save_s3_path": uri_logging,
            "model_id": model["id"],
            "model_name": model_name,
            "user_email": user_email,
            "inference_url": inference_url,
            "metadata_url": metadata_url,
            "s3_bucket": task_s3_bucket,
        }
    except Exception as e:
        # BUG FIX: the original returned the string "Model upload failed";
        # the caller then subscripted it (data["model_path"]) and crashed
        # with a TypeError. Raise a proper 500 instead.
        print(f"An unexpected error occurred: {e}")
        raise HTTPException(
            status_code=500, detail=f"Model upload failed: {str(e)}"
        ) from e

def abort_multipart_upload(
    self,
    upload_id: str,
    task_code: str,
    model_name: str,
    user_id: int,
    file_name: str,
):
    """Abort an in-progress multipart upload so S3 discards stored parts.

    Rebuilds the same S3 key used when the upload was initiated and calls
    AbortMultipartUpload with the given upload id.

    Raises:
        HTTPException: 500 when S3 rejects the abort request.
    """
    # NOTE(review): the unused YAML-config fetch and model_name_clean
    # computation were removed as dead work.
    task_id = self.task_repository.get_task_id_by_task_code(task_code)[0]
    task_s3_bucket = self.task_repository.get_s3_bucket_by_task_id(task_id)[0]

    # Same normalization as initiate_multipart_upload so the key matches.
    file_name = file_name.lower().replace("/", ":")
    file_name = re.sub(r"\s+", "_", file_name)
    clean_file_name = re.sub(r"_+", "_", file_name)

    model_path = f"{task_code}/submited_models/{task_id}-{user_id}-{model_name}-{clean_file_name}"

    try:
        self.s3.abort_multipart_upload(
            Bucket=task_s3_bucket,
            Key=model_path,
            UploadId=upload_id,
        )
        return {"message": "Multipart upload aborted successfully."}
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Failed to abort upload: {str(e)}"
        ) from e
Loading