Fix/1092 task cancellation issue (#1093)
* Fix analysis cancellation if analysis is queued

* Allow execution cancel to be re-issued if sub-tasks are marked as running or queued

* Revert "Allow exection cancel to be re-issued if sub-tasks are marked as running or queued"

This reverts commit bbb2f32.

* WIP

* Revert "Fix analysis cancellation if analyses is queued"

This reverts commit 8ca9da0.

* Send updated sub-task ID when creating chain + kill any task marked as REVOKED

* pep

* replace prints with logger

* Set celery workers to debug log level

* Fix task-id extraction

* Handle exceptions - don't fail the job if task-id extraction fails
sambles committed Aug 5, 2024
1 parent 14a1271 commit 0eff028
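Per the commit messages, the core of the fix is recording sub-task IDs when the Celery chain is built and killing any sub-task whose state is REVOKED. A minimal sketch of that idea, assuming Celery's standard signature/chain API; build_and_track_chain, record_subtask_id, and cancel_subtasks are hypothetical names, not the repository's own:

    # Hypothetical sketch (not the repository's exact implementation): freeze each
    # signature so its task ID exists before dispatch, persist the ID so a later
    # cancel can find it, then revoke every recorded sub-task on cancellation.
    from celery import chain, current_app as celery_app

    def build_and_track_chain(signatures, record_subtask_id):
        # record_subtask_id is a hypothetical callback that stores the ID,
        # e.g. against the analysis's sub-task status records
        for sig in signatures:
            frozen = sig.freeze()            # assigns the task ID up front
            record_subtask_id(frozen.id)
        return chain(*signatures).apply_async()

    def cancel_subtasks(task_ids):
        # kill queued or running sub-tasks, matching "kill any task marked as REVOKED"
        for task_id in task_ids:
            celery_app.control.revoke(task_id, terminate=True)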
Showing 5 changed files with 256 additions and 94 deletions.
232 changes: 159 additions & 73 deletions compose/debug.docker-compose.yml
@@ -1,30 +1,51 @@
version: '3.4'
volumes:
server-db-OasisData:
celery-db-OasisData:
filestore-OasisData:
x-shared-env: &shared-env
OASIS_DEBUG: 1
#OASIS_CELERY_BROKER_URL: "amqp://rabbit:rabbit@broker:5672"
OASIS_CELERY_BROKER_URL: "redis://broker:6379"
OASIS_KEEP_RUN_DIR: 1
OASIS_KEEP_LOCAL_DATA: 1
OASIS_KEEP_REMOTE_DATA: 1
OASIS_URL_SUB_PATH: 0
OASIS_CELERY_BROKER_URL: "amqp://rabbit:rabbit@broker:5672"
#OASIS_CELERY_BROKER_URL: "redis://broker:6379"
OASIS_SERVER_DB_HOST: server-db
OASIS_SERVER_DB_PASS: oasis
OASIS_SERVER_DB_USER: oasis
OASIS_SERVER_DB_NAME: oasis
OASIS_SERVER_DB_PORT: 3306
OASIS_SERVER_DB_PORT: 5432
OASIS_SERVER_CHANNEL_LAYER_HOST: channel-layer
OASIS_CELERY_DB_ENGINE: db+mysql+pymysql
OASIS_SERVER_DB_ENGINE: django.db.backends.postgresql_psycopg2
OASIS_CELERY_DB_ENGINE: db+postgresql+psycopg2
OASIS_CELERY_DB_HOST: celery-db
OASIS_CELERY_DB_PASS: password
OASIS_CELERY_DB_USER: celery
OASIS_CELERY_DB_NAME: celery
OASIS_CELERY_DB_PORT: 3306
OASIS_CELERY_DB_PORT: 5432
OASIS_TASK_CONTROLLER_QUEUE: task-controller

x-oasis-env-v1: &oasis-env-v1
OASIS_DEBUG: ${DEBUG:-0}
OASIS_RABBIT_HOST: broker
OASIS_RABBIT_PORT: 5672
OASIS_RABBIT_USER: rabbit
OASIS_RABBIT_PASS: rabbit
OASIS_CELERY_DB_ENGINE: db+postgresql+psycopg2
OASIS_CELERY_DB_HOST: celery-db
OASIS_CELERY_DB_PASS: password
OASIS_CELERY_DB_USER: celery
OASIS_CELERY_DB_NAME: celery
OASIS_CELERY_DB_PORT: 5432

x-volumes: &shared-volumes
- ${OASIS_MEDIA_ROOT:-./docker-shared-fs}:/shared-fs:rw
# DEBUG Mounts
- ./src/startup_server.sh:/usr/local/bin/startup
- ./src/server/oasisapi/analyses:/var/www/oasis/src/server/oasisapi/analyses
- filestore-OasisData:/shared-fs:rw
services:
server:
restart: always
image: coreoasis/api_server:latest
image: coreoasis/api_server:dev
command: ["./wsgi/run-wsgi.sh"]
build:
context: .
dockerfile: Dockerfile.api_server
@@ -40,43 +61,92 @@ services:
STARTUP_RUN_MIGRATIONS: "true"
OASIS_ADMIN_USER: admin
OASIS_ADMIN_PASS: password
volumes: *shared-volumes
worker-monitor:
volumes:
- filestore-OasisData:/shared-fs:rw
- ../src/server/oasisapi:/var/www/oasis/src/server/oasisapi
server_websocket:
restart: always
image: coreoasis/api_server:latest
command: [celery, -A, src.server.oasisapi, worker, --loglevel=INFO]
image: coreoasis/api_server:dev
command: ["./asgi/run-asgi.sh"]
links:
- server-db
- celery-db
- broker
ports:
- 8001:8001
environment:
<<: *shared-env
volumes: *shared-volumes
task-controller:
volumes:
- filestore-OasisData:/shared-fs:rw
- ../src/server/oasisapi:/var/www/oasis/src/server/oasisapi
v1-worker-monitor:
restart: always
image: coreoasis/api_server:latest
command: [celery, -A, src.server.oasisapi, worker, --loglevel=INFO, -Q, task-controller]
image: coreoasis/api_server:dev
command: [celery, -A, 'src.server.oasisapi.celery_app_v1', worker, --loglevel=DEBUG,]
links:
- server-db
- celery-db
- broker
environment:
<<: *shared-env
volumes: *shared-volumes
celery-beat:
volumes:
- filestore-OasisData:/shared-fs:rw
- ../src/server/oasisapi:/var/www/oasis/src/server/oasisapi
v2-worker-monitor:
restart: always
image: coreoasis/api_server:dev
command: [celery, -A, 'src.server.oasisapi.celery_app_v2', worker, --loglevel=DEBUG, -Q, celery-v2]
links:
- server-db
- celery-db
- broker
environment:
<<: *shared-env
volumes:
- filestore-OasisData:/shared-fs:rw
- ../src/server/oasisapi:/var/www/oasis/src/server/oasisapi
v2-task-controller:
restart: always
image: coreoasis/api_server:latest
command: [celery, -A, src.server.oasisapi, beat, --loglevel=INFO]
image: coreoasis/api_server:dev
command: [celery, -A, 'src.server.oasisapi.celery_app_v2', worker, --loglevel=DEBUG, -Q, task-controller]
links:
- server-db
- celery-db
- broker
environment:
<<: *shared-env
volumes:
- filestore-OasisData:/shared-fs:rw
- ../src/server/oasisapi/analyses:/var/www/oasis/src/server/oasisapi/analyses
celery-beat_v2:
restart: always
image: coreoasis/api_server:dev
command: [celery, -A, src.server.oasisapi.celery_app_v2, beat, --loglevel=DEBUG]
links:
- server-db
- celery-db
- broker
environment:
<<: *shared-env
volumes: *shared-volumes
worker:
stable-worker:
restart: always
image: coreoasis/model_worker:1.28.4
links:
- celery-db
- broker:mybroker
environment:
<<: *oasis-env-v1
OASIS_MODEL_DATA_DIRECTORY: /home/worker/model
OASIS_MODEL_SUPPLIER_ID: OasisLMF
OASIS_MODEL_ID: PiWind
OASIS_MODEL_VERSION_ID: '1.28.4'
volumes:
- ${OASIS_MODEL_DATA_DIR:-./data/static}:/home/worker/model:rw
- filestore-OasisData:/shared-fs:rw
v1-worker:
restart: always
image: coreoasis/model_worker:latest
image: coreoasis/model_worker:dev
build:
context: .
dockerfile: Dockerfile.model_worker
@@ -87,73 +157,89 @@ services:
<<: *shared-env
OASIS_MODEL_SUPPLIER_ID: OasisLMF
OASIS_MODEL_ID: PiWind
OASIS_MODEL_VERSION_ID: 1
OASIS_MODEL_NUM_ANALYSIS_CHUNKS: 8
OASIS_MODEL_VERSION_ID: 'v1'
OASIS_RUN_MODE: v1
volumes:
- /home/sam/repos/models/piwind:/home/worker/model:rw
- ${OASIS_MEDIA_ROOT:-./docker-shared-fs}:/shared-fs:rw
# DEBUG Mounts
- ./src/model_execution_worker:/home/worker/src/model_execution_worker
- ./src/startup_worker.sh:/home/worker/startup.sh

- ${OASIS_MODEL_DATA_DIR:-./data/static}:/home/worker/model:rw
- ../src/model_execution_worker:/home/worker/src/model_execution_worker
- filestore-OasisData:/shared-fs:rw
v2-worker:
restart: always
image: coreoasis/model_worker:dev
build:
context: .
dockerfile: Dockerfile.model_worker
links:
- celery-db
- broker:mybroker
environment:
<<: *shared-env
OASIS_MODEL_SUPPLIER_ID: OasisLMF
OASIS_MODEL_ID: PiWind
OASIS_MODEL_VERSION_ID: 'v2'
OASIS_RUN_MODE: v2
volumes:
- ${OASIS_MODEL_DATA_DIR:-./data/static}:/home/worker/model:rw
- ../src/model_execution_worker:/home/worker/src/model_execution_worker
- filestore-OasisData:/shared-fs:rw
server-db:
restart: always
image: mysql:8.0
command:
- --default-authentication-plugin=mysql_native_password
image: postgres
environment:
- MYSQL_ROOT_PASSWORD=password
- MYSQL_USER=oasis
- MYSQL_PASSWORD=oasis
- MYSQL_DATABASE=oasis
- POSTGRES_DB=oasis
- POSTGRES_USER=oasis
- POSTGRES_PASSWORD=oasis
volumes:
- ${OASIS_DOCKER_DB_DATA_DIR:-./db-data}/server:/var/lib/mysql/:rw
- server-db-OasisData:/var/lib/postgresql/data:rw
ports:
- 33307:3306
- 33307:5432
celery-db:
restart: always
image: mysql
command:
- --default-authentication-plugin=mysql_native_password
image: postgres
environment:
- MYSQL_ROOT_PASSWORD=password
- MYSQL_USER=celery
- MYSQL_PASSWORD=password
- MYSQL_DATABASE=celery
ports:
- 33306:3306
- POSTGRES_DB=celery
- POSTGRES_USER=celery
- POSTGRES_PASSWORD=password
volumes:
- ${OASIS_DOCKER_DB_DATA_DIR:-./db-data}/celery:/var/lib/mysql/:rw
broker:
restart: always
image: redis:5.0.7
- celery-db-OasisData:/var/lib/postgresql/data:rw
ports:
- 6379
- 33306:5432
# broker:
# restart: always
# image: rabbitmq:3.8.14-management
# environment:
# - RABBITMQ_DEFAULT_USER=rabbit
# - RABBITMQ_DEFAULT_PASS=rabbit
# image: redis:5.0.7
# ports:
# - 5672:5672
# - 15672:15672
flower:
# - 5672:6379
broker:
restart: always
image: iserko/docker-celery-flower
ports:
- 5555:5555
image: rabbitmq:3.8.14-management
environment:
- CELERY_BROKER_URL=amqp://rabbit:rabbit@broker:5672
entrypoint:
- flower
- --port=5555
- --broker_api=http://rabbit:rabbit@broker:15672/api/
links:
- celery-db
- broker
- RABBITMQ_DEFAULT_USER=rabbit
- RABBITMQ_DEFAULT_PASS=rabbit
ports:
- 5672:5672
- 15672:15672
channel-layer:
restart: always
image: redis:5.0.7
ports:
- 6379:6379
user-interface:
restart: always
image: coreoasis/oasisui_app:${VERS_UI:-latest}
container_name: oasisui_app
environment:
- API_IP=server:8000/api/
- API_VERSION=v2
- API_SHARE_FILEPATH=./downloads
- OASIS_ENVIRONMENT=oasis_localhost
ports:
- 8080:3838
portainer:
restart: always
image: portainer/portainer:latest
ports:
- 8002:8002
- 9000:9000
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./portainer_data:/data portainer/portainer
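The compose changes split the stack into v1 and v2 workers (note the OASIS_RUN_MODE values and the celery_app_v1/celery_app_v2 modules in the worker commands above). A rough sketch of how a worker entrypoint could select the Celery app module from that variable, reusing the module paths from the compose file; select_celery_app is illustrative, not the repository's actual API:

    # Illustrative only: pick the v1 or v2 Celery app module from OASIS_RUN_MODE,
    # mirroring the v1-worker / v2-worker services defined above.
    import os
    import importlib

    def select_celery_app():
        run_mode = os.environ.get("OASIS_RUN_MODE", "v1").lower()
        module_name = (
            "src.server.oasisapi.celery_app_v2"
            if run_mode == "v2"
            else "src.server.oasisapi.celery_app_v1"
        )
        return importlib.import_module(module_name)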
6 changes: 3 additions & 3 deletions src/model_execution_worker/distributed_tasks.py
@@ -137,14 +137,14 @@ def check_task_redelivered(task, analysis_id, initiator_id, task_slug, error_sta
logger.debug(f"redelivered: {redelivered}")
logger.debug(f"state: {state}")

if redelivered and state == 'REVOKED':
logger.error('ERROR: task requeued three times - aborting task')
if state == 'REVOKED':
logger.error('ERROR: task requeued three times or cancelled - aborting task')
notify_subtask_status(
analysis_id=analysis_id,
initiator_id=initiator_id,
task_slug=task_slug,
subtask_status='ERROR',
error_msg='Task failed on third redelivery, possible out of memory error'
error_msg='Task revoked, possible out of memory error or cancellation'
)
notify_api_status(analysis_id, error_state)
task.app.control.revoke(task.request.id, terminate=True)
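The change above broadens the abort condition: a sub-task whose stored state is REVOKED is now killed whether or not it was redelivered, which covers explicit cancellation of work that is still queued. A minimal sketch of the pattern, assuming a Celery result backend is configured; example_subtask is a hypothetical task, and the project's notify helpers are only referenced in a comment:

    # Minimal sketch, not the project's code: abort early if this task's stored
    # state is REVOKED (set by repeated redelivery or an explicit cancel).
    from celery import shared_task

    @shared_task(bind=True)
    def example_subtask(self, analysis_id):
        state = self.AsyncResult(self.request.id).state
        if state == 'REVOKED':
            # the real code also calls notify_subtask_status / notify_api_status
            # before revoking, so the API reflects the error state
            self.app.control.revoke(self.request.id, terminate=True)
            return
        # ... normal sub-task work would continue here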
3 changes: 2 additions & 1 deletion src/server/oasisapi/analyses/models.py
@@ -477,6 +477,8 @@ def run(self, initiator, run_mode_override=None):
if errors:
raise ValidationError(detail=errors)

self.status = self.status_choices.RUN_QUEUED
self.save()
# Start V1 run
if run_mode == self.run_mode_choices.V1:
task = self.v1_run_analysis_signature
@@ -500,7 +502,6 @@ def run(self, initiator, run_mode_override=None):
task_id = task.apply_async(args=[self.pk, initiator.pk, events_total], priority=self.priority).id

self.run_task_id = task_id
self.status = self.status_choices.RUN_QUEUED
self.task_started = timezone.now()
self.task_finished = None
self.save()
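The models.py change moves the RUN_QUEUED status update ahead of the task dispatch, so anything reading the analysis (including a cancel request) already sees it as queued by the time the run task exists. A condensed sketch of that ordering, using only the fields visible in the diff; queue_analysis_run is a hypothetical free function, not a model method:

    # Illustrative ordering only: persist the queued status before dispatching,
    # so a cancel issued immediately after run() never races a stale status.
    from django.utils import timezone

    def queue_analysis_run(analysis, signature, initiator, events_total):
        analysis.status = analysis.status_choices.RUN_QUEUED
        analysis.save()                          # status visible before dispatch

        result = signature.apply_async(
            args=[analysis.pk, initiator.pk, events_total],
            priority=analysis.priority,
        )

        analysis.run_task_id = result.id
        analysis.task_started = timezone.now()
        analysis.task_finished = None
        analysis.save()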