diff --git a/autotests/helm/values.yaml b/autotests/helm/values.yaml
index cda6a5e..0b053f6 100644
--- a/autotests/helm/values.yaml
+++ b/autotests/helm/values.yaml
@@ -25,8 +25,8 @@ global:
   activeDeadlineSeconds: 3600 # 1h
   env:
-    PARTICIPANT_NAME:
-    api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/
+    PARTICIPANT_NAME: electriclizard
+    api_host: http://inca-smc-mlops-challenge-solution.default.svc.cluster.local/process
 
     # K6, do not edit!
     K6_PROMETHEUS_RW_SERVER_URL: http://kube-prometheus-stack-prometheus.monitoring.svc.cluster.local:9090/api/v1/write
diff --git a/solution/Dockerfile b/solution/Dockerfile
new file mode 100644
index 0000000..de256d6
--- /dev/null
+++ b/solution/Dockerfile
@@ -0,0 +1,16 @@
+FROM huggingface/transformers-pytorch-gpu
+ARG DEBIAN_FRONTEND=noninteractive
+
+WORKDIR /src
+ENV PYTHONPATH="${PYTHONPATH}:${WORKDIR}"
+
+COPY requirements.txt $WORKDIR
+
+RUN apt-get update && apt upgrade -y && \
+    apt-get install -y libsm6 libxrender1 libfontconfig1 libxext6 libgl1-mesa-glx ffmpeg && \
+    pip install -U pip setuptools && \
+    pip install -U --no-cache-dir -r requirements.txt
+
+COPY . $WORKDIR
+
+ENTRYPOINT [ "uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "1" ]
diff --git a/solution/app.py b/solution/app.py
new file mode 100644
index 0000000..a0f78de
--- /dev/null
+++ b/solution/app.py
@@ -0,0 +1,132 @@
+from typing import List
+
+import asyncio
+
+import uvicorn
+from fastapi import FastAPI, APIRouter
+from fastapi.openapi.docs import get_swagger_ui_html
+from fastapi.openapi.utils import get_openapi
+from fastapi.responses import HTMLResponse
+from starlette.requests import Request
+
+from configs.config import AppConfig, ModelConfig
+from infrastructure.models import TransformerTextClassificationModel
+from service.recognition import TextClassificationService
+from handlers.recognition import PredictionHandler
+from handlers.data_models import ResponseSchema
+
+
+def build_models(model_configs: List[ModelConfig], tokenizer: str) -> List[TransformerTextClassificationModel]:
+    models = [
+        TransformerTextClassificationModel(conf.model, conf.model_path, tokenizer)
+        for conf in model_configs
+    ]
+    return models
+
+
+config = AppConfig.parse_file("./configs/app_config.yaml")
+models = build_models(config.models, config.tokenizer)
+
+recognition_service = TextClassificationService(models)
+recognition_handler = PredictionHandler(recognition_service, config.timeout)
+
+app = FastAPI()
+router = APIRouter()
+
+
+@app.on_event("startup")
+def count_max_batch_size():
+    print("Calculating Max batch size")
+    batch_size = 100
+
+    try:
+        while True:
+            text = ["this is simple text"]*batch_size
+            inputs = [model.tokenize_texts(text) for model in models]
+            outputs = [model(m_inputs) for model, m_inputs in zip(models, inputs)]
+            batch_size += 100
+
+    except RuntimeError as err:
+        if "CUDA out of memory" in str(err):
+            batch_size -= 100
+        app.max_batch_size = batch_size
+        print(f"Max batch size calculated = {app.max_batch_size}")
+
+
+@app.on_event("startup")
+def create_queues():
+    app.models_queues = {}
+    for md in models:
+        task_queue = asyncio.Queue()
+        app.models_queues[md.name] = task_queue
+        asyncio.create_task(recognition_handler.handle(md.name, task_queue))
+
+    app.tokenizer_queue = asyncio.Queue()
+    asyncio.create_task(
+        recognition_handler.tokenize_texts_batch(
+            app.tokenizer_queue,
+            list(app.models_queues.values()),
+            app.max_batch_size
+        )
+    )
+
+
+@router.post("/process", response_model=ResponseSchema)
+async def process(request: Request):
+    text = (await request.body()).decode()
+
+    results = []
+    response_q = asyncio.Queue()  # init a response queue for every request, one for all models
+
+    await app.tokenizer_queue.put((text, response_q))
+
+    for model_name, model_queue in app.models_queues.items():
+        model_res = await response_q.get()
+        results.append(model_res)
+    return recognition_handler.serialize_answer(results)
+
+
+app.include_router(router)
+
+
+@app.get("/healthcheck")
+async def main():
+    return {"message": "I am alive"}
+
+
+def custom_openapi():
+    if app.openapi_schema:
+        return app.openapi_schema
+    openapi_schema = get_openapi(
+        title="NLP Model Service",
+        version="0.1.0",
+        description="Inca test task",
+        routes=app.routes,
+    )
+    app.openapi_schema = openapi_schema
+    return app.openapi_schema
+
+
+@app.get(
+    "/documentation/swagger-ui/",
+    response_class=HTMLResponse,
+)
+async def swagger_ui_html():
+    return get_swagger_ui_html(
+        openapi_url="/documentation/openapi.json",
+        title="API documentation"
+    )
+
+
+@app.get(
+    "/documentation/openapi.json",
+    response_model_exclude_unset=True,
+    response_model_exclude_none=True,
+)
+async def openapi_endpoint():
+    return custom_openapi()
+
+
+if __name__ == "__main__":
+    uvicorn.run("app:app", host="0.0.0.0", port=config.port, workers=config.workers)
+
diff --git a/solution/configs/app_config.yaml b/solution/configs/app_config.yaml
new file mode 100644
index 0000000..de01e2f
--- /dev/null
+++ b/solution/configs/app_config.yaml
@@ -0,0 +1,18 @@
+tokenizer: "roberta-base"
+models:
+  - model: "cardiffnlp"
+    model_path: "cardiffnlp/twitter-xlm-roberta-base-sentiment"
+  - model: "ivanlau"
+    model_path: "ivanlau/language-detection-fine-tuned-on-xlm-roberta-base"
+  - model: "svalabs"
+    model_path: "svalabs/twitter-xlm-roberta-crypto-spam"
+  - model: "EIStakovskii"
+    model_path: "EIStakovskii/xlm_roberta_base_multilingual_toxicity_classifier_plus"
+  - model: "jy46604790"
+    model_path: "jy46604790/Fake-News-Bert-Detect"
+
+port: 8080
+workers: 1
+
+timeout: 0.01
+
diff --git a/solution/configs/config.py b/solution/configs/config.py
new file mode 100644
index 0000000..c5c1cf8
--- /dev/null
+++ b/solution/configs/config.py
@@ -0,0 +1,20 @@
+from typing import List
+
+from pydantic_yaml import YamlModel
+
+
+class ModelConfig(YamlModel):
+    model: str
+    model_path: str
+
+
+class AppConfig(YamlModel):
+    # model parameters
+    tokenizer: str
+    models: List[ModelConfig]
+    # app parameters
+    port: int
+    workers: int
+    # async queues parameters
+    timeout: float
+
diff --git a/solution/handlers/data_models.py b/solution/handlers/data_models.py
new file mode 100644
index 0000000..a132a27
--- /dev/null
+++ b/solution/handlers/data_models.py
@@ -0,0 +1,17 @@
+from typing import List
+
+from pydantic import BaseModel, validator
+
+
+class RecognitionSchema(BaseModel):
+    score: float
+    label: str
+
+
+class ResponseSchema(BaseModel):
+    cardiffnlp: RecognitionSchema
+    ivanlau: RecognitionSchema
+    svalabs: RecognitionSchema
+    EIStakovskii: RecognitionSchema
+    jy46604790: RecognitionSchema
+
diff --git a/solution/handlers/recognition.py b/solution/handlers/recognition.py
new file mode 100644
index 0000000..35b09cb
--- /dev/null
+++ b/solution/handlers/recognition.py
@@ -0,0 +1,68 @@
+from typing import List
+import asyncio
+
+from pydantic import ValidationError
+
+from infrastructure.models import TextClassificationModelData
+from service.recognition import TextClassificationService
+from handlers.data_models import ResponseSchema, RecognitionSchema
+
+
+class PredictionHandler:
+
+    def __init__(self, recognition_service: TextClassificationService, timeout: float):
+        self.recognition_service = recognition_service
+        self.timeout = timeout
+
+    async def tokenize_texts_batch(self, consumer_queue, producer_queues, max_batch_size: int):
+        while True:
+            texts = []
+            queues = []
+
+            try:
+                while True:
+                    text, response_q = await asyncio.wait_for(consumer_queue.get(), timeout=self.timeout)
+                    texts.append(text)
+                    queues.append(response_q)
+
+            except asyncio.exceptions.TimeoutError:
+                pass
+
+            if texts:
+                for text_batch in self._perform_batches(texts, max_batch_size):
+                    inputs = self.recognition_service.service_models[0].tokenize_texts(text_batch)
+
+                    for output_queue in producer_queues:
+                        await output_queue.put((inputs, queues))
+
+    async def handle(self, model_name, model_queue):
+        while True:
+            inputs = None
+            queues = []
+
+            while True:
+                inputs, queues = await model_queue.get()
+
+                if inputs:
+                    model = next(
+                        (model for model in self.recognition_service.service_models if model.name == model_name),
+                        None
+                    )
+                    if model:
+                        outs = model(inputs)
+                        for rq, out in zip(queues, outs):
+                            await rq.put(out)
+
+    def serialize_answer(self, results: List[TextClassificationModelData]) -> ResponseSchema:
+        res_model = {rec.model_name: self._recognitions_to_schema(rec) for rec in results}
+        return ResponseSchema(**res_model)
+
+    def _recognitions_to_schema(self, recognition: TextClassificationModelData) -> RecognitionSchema:
+        if recognition.model_name != "ivanlau":
+            recognition.label = recognition.label.upper()
+        return RecognitionSchema(score=recognition.score, label=recognition.label)
+
+    def _perform_batches(self, texts: List[str], max_batch_size):
+        for i in range(0, len(texts), max_batch_size):
+            yield texts[i:i + max_batch_size]
+
diff --git a/solution/helm/envs/electriclizard.yaml b/solution/helm/envs/electriclizard.yaml
new file mode 100644
index 0000000..6d3c75c
--- /dev/null
+++ b/solution/helm/envs/electriclizard.yaml
@@ -0,0 +1,13 @@
+global:
+  # add any variables you need in format `key: value`
+  # variables will be available in the container as environment variables
+
+  # change 8000 to your application target port
+  pod:
+    ports:
+      - name: http
+        containerPort: 8080
+        protocol: TCP
+  service:
+    targetPort: 8080
+
diff --git a/solution/infrastructure/models.py b/solution/infrastructure/models.py
new file mode 100644
index 0000000..3786cfa
--- /dev/null
+++ b/solution/infrastructure/models.py
@@ -0,0 +1,74 @@
+from abc import ABC, abstractmethod
+from collections.abc import Callable
+from dataclasses import dataclass
+from typing import List
+
+import torch
+from transformers import AutoTokenizer, AutoModelForSequenceClassification
+
+
+
+@dataclass
+class TextClassificationModelData:
+    model_name: str
+    label: str
+    score: float
+
+
+class BaseTextClassificationModel(ABC):
+
+    def __init__(self, name: str, model_path: str, tokenizer: str):
+        self.name = name
+        self.model_path = model_path
+        self.tokenizer = tokenizer
+        self.device = 0 if torch.cuda.is_available() else -1
+        self._load_model()
+
+    @abstractmethod
+    def _load_model(self):
+        ...
+
+    @abstractmethod
+    def __call__(self, inputs) -> List[TextClassificationModelData]:
+        ...
+
+
+class TransformerTextClassificationModel(BaseTextClassificationModel):
+
+    def _load_model(self):
+        self.tokenizer = AutoTokenizer.from_pretrained(self.tokenizer)
+        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_path)
+        self.model = self.model.to(self.device)
+
+    def tokenize_texts(self, texts: List[str]):
+        inputs = self.tokenizer.batch_encode_plus(
+            texts,
+            add_special_tokens=True,
+            padding='longest',
+            truncation=True,
+            return_token_type_ids=True,
+            return_tensors='pt'
+        )
+        inputs = {k: v.to(self.device) for k, v in inputs.items()}  # Move inputs to GPU
+        return inputs
+
+    def _results_from_logits(self, logits: torch.Tensor):
+        id2label = self.model.config.id2label
+
+        label_ids = logits.argmax(dim=1)
+        scores = logits.softmax(dim=-1)
+        results = [
+            {
+                "label": id2label[label_id.item()],
+                "score": score[label_id.item()].item()
+            }
+            for label_id, score in zip(label_ids, scores)
+        ]
+        return results
+
+    def __call__(self, inputs) -> List[TextClassificationModelData]:
+        logits = self.model(**inputs).logits
+        predictions = self._results_from_logits(logits)
+        predictions = [TextClassificationModelData(self.name, **prediction) for prediction in predictions]
+        return predictions
+
diff --git a/solution/requirements.txt b/solution/requirements.txt
new file mode 100644
index 0000000..7c28cc1
--- /dev/null
+++ b/solution/requirements.txt
@@ -0,0 +1,6 @@
+fastapi[all]==0.95.1
+uvicorn==0.22.0
+numpy==1.23.5
+pydantic==1.10.7
+pydantic-yaml==0.11.2
+
diff --git a/solution/service/recognition.py b/solution/service/recognition.py
new file mode 100644
index 0000000..d72cbb1
--- /dev/null
+++ b/solution/service/recognition.py
@@ -0,0 +1,16 @@
+from abc import ABC, abstractmethod
+from typing import List
+from dataclasses import dataclass
+
+from infrastructure.models import BaseTextClassificationModel, TextClassificationModelData
+
+
+class TextClassificationService:
+
+    def __init__(self, models: List[BaseTextClassificationModel]):
+        self.service_models = models
+
+    def get_results(self, input_texts: List[str]) -> List[List[TextClassificationModelData]]:
+        results = [model(input_texts) for model in self.service_models]
+        return results
+
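The sketch below shows how the /process endpoint added in this diff can be exercised by hand. It is a minimal example, not part of the solution itself: it assumes the container built from solution/Dockerfile is reachable on localhost:8080 (inside the cluster the autotests hit the api_host URL set in autotests/helm/values.yaml), and it uses the client-side requests package, which is deliberately not in solution/requirements.txt. The route reads the raw request body as plain text and returns one {"label", "score"} object per model key, matching ResponseSchema in solution/handlers/data_models.py.

    # Client-side sketch; assumes the service runs at localhost:8080 and
    # `requests` is installed on the caller's machine (neither is in this diff).
    import requests

    # /process expects a plain-text body, not JSON.
    resp = requests.post(
        "http://localhost:8080/process",
        data="this is simple text".encode("utf-8"),
        timeout=30,
    )
    resp.raise_for_status()

    # The JSON response follows ResponseSchema: one {"score", "label"} per model.
    result = resp.json()
    for name in ("cardiffnlp", "ivanlau", "svalabs", "EIStakovskii", "jy46604790"):
        print(name, result[name]["label"], result[name]["score"])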