Skip to content

Commit

Permalink
feat(tasks): add task to unlock tasks locked for over 3 days and rese…
Browse files Browse the repository at this point in the history
…t entities after an hour (#1984)
  • Loading branch information
Anuj-Gupta4 authored Feb 5, 2025
1 parent f734fb9 commit 4eb34e0
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 3 deletions.
13 changes: 12 additions & 1 deletion src/backend/app/central/central_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import csv
import json
from asyncio import gather
from datetime import datetime
from io import BytesIO, StringIO
from typing import Optional, Union
from uuid import uuid4
Expand Down Expand Up @@ -722,6 +723,7 @@ async def get_entities_data(
odk_id: int,
dataset_name: str = "features",
fields: str = "__system/updatedAt, osm_id, status, task_id, submission_ids",
filter_date: Optional[datetime] = None,
) -> list:
"""Get all the entity mapping statuses.
Expand All @@ -733,17 +735,26 @@ async def get_entities_data(
dataset_name (str): The dataset / Entity list name in ODK Central.
fields (str): Extra fields to include in $select filter.
__id is included by default.
filter_date (datetime): Filter entities last updated after this date.
Returns:
list: JSON list containing Entity info. If updated_at is included,
the format is string 2022-01-31T23:59:59.999Z.
"""
try:
url_params = f"$select=__id{',' if fields else ''} {fields}"

filters = []
if filter_date:
filters.append(f"__system/updatedAt gt {filter_date}")
if filters:
url_params += f"&$filter={' and '.join(filters)}"

async with central_deps.get_odk_dataset(odk_creds) as odk_central:
entities = await odk_central.getEntityData(
odk_id,
dataset_name,
url_params=f"$select=__id{',' if fields else ''} {fields}",
url_params=url_params,
)
except Exception as e:
log.exception(f"Error: {e}", stack_info=True)
Expand Down
1 change: 1 addition & 0 deletions src/backend/app/db/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class TaskEvent(StrEnum, Enum):
MERGE = "MERGE"
ASSIGN = "ASSIGN"
COMMENT = "COMMENT"
RESET = "RESET"


class MappingState(StrEnum, Enum):
Expand Down
117 changes: 116 additions & 1 deletion src/backend/app/tasks/task_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
#
"""Logic for FMTM tasks."""

from datetime import timedelta
from datetime import datetime, timedelta, timezone

from psycopg import Connection
from psycopg.rows import class_row

from app.central.central_crud import get_entities_data, update_entity_mapping_status
from app.db.enums import EntityState
from app.db.models import DbOdkEntities, DbProject
from app.db.postgis_utils import timestamp
from app.tasks import task_schemas

Expand Down Expand Up @@ -63,3 +66,115 @@ async def get_project_task_activity(
async with db.cursor(row_factory=class_row(task_schemas.TaskEventCount)) as cur:
await cur.execute(sql, {"project_id": project_id, "end_date": end_date})
return await cur.fetchall()


async def trigger_unlock_tasks(db: Connection):
"""Function to unlock_old_locked_tasks manually."""
active_projects_query = """
SELECT DISTINCT project_id
FROM task_events
WHERE created_at >= NOW() - INTERVAL '7 days'
"""
async with db.cursor() as cur:
await cur.execute(active_projects_query)
active_projects = await cur.fetchall()

time_now = datetime.now(timezone.utc)
threedaysago = (time_now - timedelta(days=3)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
onehourago = (time_now - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")

for (project_id,) in active_projects:
project = await DbProject.one(db, project_id, True)

recent_project_entities = await get_entities_data(
project.odk_credentials, project.odkid, filter_date=threedaysago
)
# If there are recent entity updates, skip this project
if recent_project_entities:
continue

await reset_entities_status(db, project.id, onehourago)

# Only unlock tasks if there are no recent entity updates
await unlock_old_locked_tasks(db, project.id)


async def reset_entities_status(db, project_id, filter_date):
"""Reset status for entities that have been 'open in ODK' for more than 1hr."""
project = await DbProject.one(db, project_id, True)
recent_opened_entities = await get_entities_data(
project.odk_credentials,
project.odkid,
filter_date=filter_date,
)
for entity in recent_opened_entities:
if entity["status"] != str(EntityState.OPENED_IN_ODK):
continue
await update_entity_mapping_status(
project.odk_credentials,
project.odkid,
entity["id"],
f"Task {entity['task_id']} Feature {entity['osm_id']}",
str(EntityState.READY),
)

# Sync ODK entities in our database
project_entities = await get_entities_data(project.odk_credentials, project.odkid)
await DbOdkEntities.upsert(db, project.id, project_entities)


async def unlock_old_locked_tasks(db, project_id):
"""Unlock tasks locked for more than 3 days."""
unlock_query = """
WITH svc_user AS (
SELECT id AS svc_user_id, username AS svc_username
FROM users
WHERE username = 'svcfmtm'
),
recent_events AS (
SELECT DISTINCT ON (t.id, t.project_id)
t.id AS task_id,
t.project_id,
the.created_at AS last_event_time,
the.event AS last_event
FROM tasks t
JOIN task_events the
ON t.id = the.task_id
AND t.project_id = the.project_id
AND the.comment IS NULL
WHERE t.project_id = %(project_id)s
ORDER BY t.id, t.project_id, the.created_at DESC
),
filtered_events AS (
SELECT *
FROM recent_events
WHERE (
(last_event = 'ASSIGN' AND last_event_time < NOW() - INTERVAL '3 days')
OR
(last_event = 'MAP' AND last_event_time < NOW() - INTERVAL '3 hours')
)
)
INSERT INTO task_events (
event_id,
task_id,
project_id,
event,
user_id,
state,
created_at,
username
)
SELECT
gen_random_uuid(),
fe.task_id,
fe.project_id,
'RESET'::taskevent,
svc.svc_user_id,
'UNLOCKED_TO_MAP'::mappingstate,
NOW(),
svc.svc_username
FROM filtered_events fe
CROSS JOIN svc_user svc;
"""
async with db.cursor() as cur:
await cur.execute(unlock_query, {"project_id": project_id})
9 changes: 9 additions & 0 deletions src/backend/app/tasks/task_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,12 @@ async def get_task_event_history(
):
"""Get the detailed history for a task."""
return await DbTaskEvent.all(db, task_id=task_id, days=days, comments=comments)


@router.post("/unlock-tasks")
async def unlock_tasks(db: Annotated[Connection, Depends(db_conn)]):
"""Endpoint to trigger unlock_old_locked_tasks manually."""
log.info("Start processing inactive tasks")
await task_crud.trigger_unlock_tasks(db)
log.info("Finished processing inactive tasks")
return {"message": "Old locked tasks unlocked successfully."}
61 changes: 61 additions & 0 deletions src/backend/migrations/008-add-taskevent-enumtype.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-- ## Migration to enable SFCGAL for usage of PostGIS StraightSkeleton function.

-- Start a transaction
BEGIN;

DO $$
BEGIN
-- Check if 'RESET' already exists in the taskevent enum
IF NOT EXISTS (
SELECT 1
FROM pg_enum
WHERE enumlabel = 'RESET'
AND enumtypid = 'taskevent'::regtype
) THEN
-- Add 'RESET' to the taskevent enum
ALTER TYPE public.taskevent ADD VALUE 'RESET';
END IF;
END $$;


-- Recreate function to unlock task on reset
CREATE OR REPLACE FUNCTION public.set_task_state()
RETURNS TRIGGER AS $$
BEGIN
CASE NEW.event
WHEN 'MAP' THEN
NEW.state := 'LOCKED_FOR_MAPPING';
WHEN 'FINISH' THEN
NEW.state := 'UNLOCKED_TO_VALIDATE';
WHEN 'VALIDATE' THEN
NEW.state := 'LOCKED_FOR_VALIDATION';
WHEN 'GOOD' THEN
NEW.state := 'UNLOCKED_DONE';
WHEN 'BAD' THEN
NEW.state := 'UNLOCKED_TO_MAP';
WHEN 'SPLIT' THEN
NEW.state := 'UNLOCKED_DONE';
WHEN 'MERGE' THEN
NEW.state := 'UNLOCKED_DONE';
WHEN 'ASSIGN' THEN
NEW.state := 'LOCKED_FOR_MAPPING';
WHEN 'COMMENT' THEN
NEW.state := OLD.state;
WHEN 'RESET' THEN
NEW.state := 'UNLOCKED_TO_MAP';
ELSE
RAISE EXCEPTION 'Unknown task event type: %', NEW.event;
END CASE;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Recreate trigger to unlock task on reset
CREATE OR REPLACE TRIGGER task_event_state_trigger
BEFORE INSERT ON public.task_events
FOR EACH ROW
EXECUTE FUNCTION public.set_task_state();


-- Commit the transaction
COMMIT;
5 changes: 4 additions & 1 deletion src/backend/migrations/init/fmtm_base_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ CREATE TYPE public.taskevent AS ENUM (
'SPLIT',
'MERGE',
'ASSIGN',
'COMMENT'
'COMMENT',
'RESET'
);
ALTER TYPE public.taskevent OWNER TO fmtm;

Expand Down Expand Up @@ -621,6 +622,8 @@ BEGIN
NEW.state := 'LOCKED_FOR_MAPPING';
WHEN 'COMMENT' THEN
NEW.state := OLD.state;
WHEN 'RESET' THEN
NEW.state := 'UNLOCKED_TO_MAP';
ELSE
RAISE EXCEPTION 'Unknown task event type: %', NEW.event;
END CASE;
Expand Down

0 comments on commit 4eb34e0

Please sign in to comment.