Skip to content

Commit

Permalink
feat(sequences): create a table for easy detection aggregation (#405)
Browse files Browse the repository at this point in the history
* feat(models): add a new table called streams

* feat(config): add stream params to dynamic creation

* feat(streams): implement creation and update when detection are created

* fix(detections): update the query to fetch detections

* refactor(sequence): rename stream into sequence

* feat(crud): improve filters

* feat(detections): sequence are now azimuth specific

* fix(api): update crud getter

* ci(labeler): add sequences

* feat(sequences): add fetch of sequences

* test(e2e): ensured sequence was created

* test(sequences): add test suite

* test(e2e): fix typo

* test(confteat): fix mock data

* test(e2e): fix typo

* test(detections): fix unit tests

* feat(sequences): add DELETE route

* test(sequences): update table name
  • Loading branch information
frgfm authored Jan 15, 2025
1 parent ca49671 commit 76306aa
Show file tree
Hide file tree
Showing 16 changed files with 302 additions and 19 deletions.
6 changes: 6 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
- src/app/api/*/endpoints/detections.py
- src/app/crud/detections.py

'endpoint: sequences':
- changed-files:
- any-glob-to-any-file:
- src/app/api/*/endpoints/sequences.py
- src/app/crud/sequences.py

'endpoint: login':
- changed-files:
- any-glob-to-any-file: src/app/api/*/endpoints/login.py
Expand Down
26 changes: 26 additions & 0 deletions scripts/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,34 @@ def main(args):
api_request("get", f"{args.endpoint}/detections", agent_auth)
api_request("get", f"{args.endpoint}/detections/{detection_id}/url", agent_auth)

# Create a sequence by adding two additional detections
det_id_2 = requests.post(
f"{args.endpoint}/detections",
headers=cam_auth,
data={"azimuth": 45.6, "bboxes": "[(0.1,0.1,0.8,0.8,0.5)]"},
files={"file": ("logo.png", file_bytes, "image/png")},
timeout=5,
).json()["id"]
det_id_3 = requests.post(
f"{args.endpoint}/detections",
headers=cam_auth,
data={"azimuth": 45.6, "bboxes": "[(0.1,0.1,0.8,0.8,0.5)]"},
files={"file": ("logo.png", file_bytes, "image/png")},
timeout=5,
).json()["id"]
# Check that a sequence has been created
sequences = api_request("get", f"{args.endpoint}/sequences", superuser_auth)
assert len(sequences) == 1
assert sequences[0]["camera_id"] == cam_id
assert sequences[0]["started_at"] == response.json()["created_at"]
assert sequences[0]["last_seen_at"] > sequences[0]["started_at"]
assert sequences[0]["azimuth"] == response.json()["azimuth"]

# Cleaning (order is important because of foreign key protection in existing tables)
api_request("delete", f"{args.endpoint}/sequences/{sequences[0]['id']}/", superuser_auth)
api_request("delete", f"{args.endpoint}/detections/{detection_id}/", superuser_auth)
api_request("delete", f"{args.endpoint}/detections/{det_id_2}/", superuser_auth)
api_request("delete", f"{args.endpoint}/detections/{det_id_3}/", superuser_auth)
api_request("delete", f"{args.endpoint}/cameras/{cam_id}/", superuser_auth)
api_request("delete", f"{args.endpoint}/users/{user_id}/", superuser_auth)
api_request("delete", f"{args.endpoint}/organizations/{org_id}/", superuser_auth)
Expand Down
51 changes: 47 additions & 4 deletions src/app/api/api_v1/endpoints/detections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.

from datetime import datetime
from datetime import datetime, timedelta
from typing import List, Optional, cast

from fastapi import (
Expand All @@ -28,12 +28,13 @@
get_detection_crud,
get_jwt,
get_organization_crud,
get_sequence_crud,
get_webhook_crud,
)
from app.core.config import settings
from app.crud import CameraCRUD, DetectionCRUD, OrganizationCRUD, WebhookCRUD
from app.crud import CameraCRUD, DetectionCRUD, OrganizationCRUD, SequenceCRUD, WebhookCRUD
from app.db import get_session
from app.models import Camera, Detection, Organization, Role, UserRole
from app.models import Camera, Detection, Organization, Role, Sequence, UserRole
from app.schemas.detections import (
BOXES_PATTERN,
COMPILED_BOXES_PATTERN,
Expand All @@ -43,6 +44,7 @@
DetectionWithUrl,
)
from app.schemas.login import TokenPayload
from app.schemas.sequences import SequenceUpdate
from app.services.storage import s3_service, upload_file
from app.services.telegram import telegram_client
from app.services.telemetry import telemetry_client
Expand All @@ -65,6 +67,7 @@ async def create_detection(
detections: DetectionCRUD = Depends(get_detection_crud),
webhooks: WebhookCRUD = Depends(get_webhook_crud),
organizations: OrganizationCRUD = Depends(get_organization_crud),
sequences: SequenceCRUD = Depends(get_sequence_crud),
token_payload: TokenPayload = Security(get_jwt, scopes=[Role.CAMERA]),
) -> Detection:
telemetry_client.capture(f"camera|{token_payload.sub}", event="detections-create")
Expand All @@ -81,6 +84,46 @@ async def create_detection(
det = await detections.create(
DetectionCreate(camera_id=token_payload.sub, bucket_key=bucket_key, azimuth=azimuth, bboxes=bboxes)
)
# Sequence handling
# Check if there is a sequence that was seen recently
sequence = await sequences.fetch_all(
filters=[("camera_id", token_payload.sub), ("azimuth", det.azimuth)],
inequality_pair=(
"last_seen_at",
">",
datetime.utcnow() - timedelta(seconds=settings.SEQUENCE_RELAXATION_SECONDS),
),
order_by="last_seen_at",
order_desc=True,
limit=1,
)
if len(sequence) == 1:
# Add detection to existing sequence
await sequences.update(sequence[0].id, SequenceUpdate(last_seen_at=det.created_at))
else:
# Check if we've reached the threshold of detections per interval
dets_ = await detections.fetch_all(
filters=[("camera_id", token_payload.sub), ("azimuth", det.azimuth)],
inequality_pair=(
"created_at",
">",
datetime.utcnow() - timedelta(seconds=settings.SEQUENCE_MIN_INTERVAL_SECONDS),
),
order_by="created_at",
order_desc=False,
limit=settings.SEQUENCE_MIN_INTERVAL_DETS,
)
if len(dets_) >= settings.SEQUENCE_MIN_INTERVAL_DETS:
# Create new sequence
await sequences.create(
Sequence(
camera_id=token_payload.sub,
azimuth=det.azimuth,
started_at=dets_[0].created_at,
last_seen_at=det.created_at,
)
)

# Webhooks
whs = await webhooks.fetch_all()
if any(whs):
Expand Down Expand Up @@ -148,7 +191,7 @@ async def fetch_detections(
if UserRole.ADMIN in token_payload.scopes:
return [elt for elt in await detections.fetch_all()]

cameras_list = await cameras.fetch_all(filter_pair=("organization_id", token_payload.organization_id))
cameras_list = await cameras.fetch_all(filters=("organization_id", token_payload.organization_id))
camera_ids = [camera.id for camera in cameras_list]

return await detections.fetch_all(in_pair=("camera_id", camera_ids))
Expand Down
35 changes: 35 additions & 0 deletions src/app/api/api_v1/endpoints/sequences.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (C) 2025, Pyronear.

# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.

from typing import List

from fastapi import APIRouter, Depends, Path, Security, status

from app.api.dependencies import get_jwt, get_sequence_crud
from app.crud import SequenceCRUD
from app.models import Sequence, UserRole
from app.schemas.login import TokenPayload
from app.services.telemetry import telemetry_client

router = APIRouter()


@router.get("/", status_code=status.HTTP_200_OK, summary="Fetch all the sequences")
async def fetch_sequences(
sequences: SequenceCRUD = Depends(get_sequence_crud),
token_payload: TokenPayload = Security(get_jwt, scopes=[UserRole.ADMIN]),
) -> List[Sequence]:
telemetry_client.capture(token_payload.sub, event="sequence-fetch")
return [elt for elt in await sequences.fetch_all()]


@router.delete("/{sequence_id}", status_code=status.HTTP_200_OK, summary="Delete a sequence")
async def delete_sequence(
sequence_id: int = Path(..., gt=0),
sequences: SequenceCRUD = Depends(get_sequence_crud),
token_payload: TokenPayload = Security(get_jwt, scopes=[UserRole.ADMIN]),
) -> None:
telemetry_client.capture(token_payload.sub, event="sequence-deletion", properties={"sequence_id": sequence_id})
await sequences.delete(sequence_id)
3 changes: 2 additions & 1 deletion src/app/api/api_v1/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@

from fastapi import APIRouter

from app.api.api_v1.endpoints import cameras, detections, login, organizations, users, webhooks
from app.api.api_v1.endpoints import cameras, detections, login, organizations, sequences, users, webhooks

api_router = APIRouter(redirect_slashes=True)
api_router.include_router(login.router, prefix="/login", tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(cameras.router, prefix="/cameras", tags=["cameras"])
api_router.include_router(detections.router, prefix="/detections", tags=["detections"])
api_router.include_router(sequences.router, prefix="/sequences", tags=["sequences"])
api_router.include_router(organizations.router, prefix="/organizations", tags=["organizations"])
api_router.include_router(webhooks.router, prefix="/webhooks", tags=["webhooks"])
6 changes: 5 additions & 1 deletion src/app/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from sqlmodel.ext.asyncio.session import AsyncSession

from app.core.config import settings
from app.crud import CameraCRUD, DetectionCRUD, OrganizationCRUD, UserCRUD, WebhookCRUD
from app.crud import CameraCRUD, DetectionCRUD, OrganizationCRUD, SequenceCRUD, UserCRUD, WebhookCRUD
from app.db import get_session
from app.models import User, UserRole
from app.schemas.login import TokenPayload
Expand Down Expand Up @@ -56,6 +56,10 @@ def get_webhook_crud(session: AsyncSession = Depends(get_session)) -> WebhookCRU
return WebhookCRUD(session=session)


def get_sequence_crud(session: AsyncSession = Depends(get_session)) -> SequenceCRUD:
return SequenceCRUD(session=session)


def decode_token(token: str, authenticate_value: Union[str, None] = None) -> Dict[str, str]:
try:
payload = jwt_decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
Expand Down
5 changes: 5 additions & 0 deletions src/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ def sqlachmey_uri(cls, v: str) -> str:
S3_PROXY_URL: str = os.environ.get("S3_PROXY_URL", "")
S3_URL_EXPIRATION: int = int(os.environ.get("S3_URL_EXPIRATION") or 24 * 3600)

# Sequence handling
SEQUENCE_RELAXATION_SECONDS: int = int(os.environ.get("SEQUENCE_RELAXATION_SECONDS") or 30 * 60)
SEQUENCE_MIN_INTERVAL_DETS: int = int(os.environ.get("SEQUENCE_MIN_INTERVAL_DETS") or 3)
SEQUENCE_MIN_INTERVAL_SECONDS: int = int(os.environ.get("SEQUENCE_MIN_INTERVAL_SECONDS") or 5 * 60)

# Notifications
TELEGRAM_TOKEN: Union[str, None] = os.environ.get("TELEGRAM_TOKEN")

Expand Down
1 change: 1 addition & 0 deletions src/app/crud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
from .crud_camera import *
from .crud_detection import *
from .crud_organization import *
from .crud_sequence import *
from .crud_webhook import *
18 changes: 14 additions & 4 deletions src/app/crud/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from fastapi import HTTPException, status
from pydantic import BaseModel
from sqlalchemy import exc
from sqlalchemy import desc, exc
from sqlmodel import SQLModel, delete, select
from sqlmodel.ext.asyncio.session import AsyncSession

Expand Down Expand Up @@ -60,15 +60,20 @@ async def get_by(self, field_name: str, val: Union[str, int], strict: bool = Fal

async def fetch_all(
self,
filter_pair: Union[Tuple[str, Any], None] = None,
filters: Union[Tuple[str, Any], List[Tuple[str, Any]], None] = None,
in_pair: Union[Tuple[str, List], None] = None,
inequality_pair: Optional[Tuple[str, str, Any]] = None,
order_by: Optional[str] = None,
order_desc: bool = False,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> List[ModelType]:
statement = select(self.model) # type: ignore[var-annotated]
if isinstance(filter_pair, tuple):
statement = statement.where(getattr(self.model, filter_pair[0]) == filter_pair[1])
if isinstance(filters, tuple):
statement = statement.where(getattr(self.model, filters[0]) == filters[1])
elif isinstance(filters, list):
for filter_ in filters:
statement = statement.where(getattr(self.model, filter_[0]) == filter_[1])

if isinstance(in_pair, tuple):
statement = statement.where(getattr(self.model, in_pair[0]).in_(in_pair[1]))
Expand All @@ -86,6 +91,11 @@ async def fetch_all(
else:
raise ValueError(f"Unsupported inequality operator: {op}")

if order_by is not None:
statement = statement.order_by(
desc(getattr(self.model, order_by)) if order_desc else getattr(self.model, order_by)
)

if offset is not None:
statement = statement.offset(offset)

Expand Down
17 changes: 17 additions & 0 deletions src/app/crud/crud_sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (C) 2025, Pyronear.

# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://www.apache.org/licenses/LICENSE-2.0> for full license details.

from sqlmodel.ext.asyncio.session import AsyncSession

from app.crud.base import BaseCRUD
from app.models import Sequence
from app.schemas.sequences import SequenceUpdate

__all__ = ["SequenceCRUD"]


class SequenceCRUD(BaseCRUD[Sequence, Sequence, SequenceUpdate]):
def __init__(self, session: AsyncSession) -> None:
super().__init__(session, Sequence)
11 changes: 10 additions & 1 deletion src/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,23 @@ class Detection(SQLModel, table=True):
__tablename__ = "detections"
id: int = Field(None, primary_key=True)
camera_id: int = Field(..., foreign_key="cameras.id", nullable=False)
azimuth: float = Field(..., gt=0, lt=360)
azimuth: float = Field(..., ge=0, lt=360)
bucket_key: str
bboxes: str = Field(..., min_length=2, max_length=settings.MAX_BBOX_STR_LENGTH, nullable=False)
is_wildfire: Union[bool, None] = None
created_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)
updated_at: datetime = Field(default_factory=datetime.utcnow, nullable=False)


class Sequence(SQLModel, table=True):
__tablename__ = "sequences"
id: int = Field(None, primary_key=True)
camera_id: int = Field(..., foreign_key="cameras.id", nullable=False)
azimuth: float = Field(..., ge=0, lt=360)
started_at: datetime = Field(..., nullable=False)
last_seen_at: datetime = Field(..., nullable=False)


class Organization(SQLModel, table=True):
__tablename__ = "organizations"
id: int = Field(None, primary_key=True)
Expand Down
1 change: 1 addition & 0 deletions src/app/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
from .login import *
from .users import *
from .organizations import *
from .sequences import *
from .webhooks import *
15 changes: 15 additions & 0 deletions src/app/schemas/sequences.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (C) 2020-2025, Pyronear.

# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.

from datetime import datetime

from pydantic import BaseModel

__all__ = ["SequenceUpdate"]


# Accesses
class SequenceUpdate(BaseModel):
last_seen_at: datetime
Loading

0 comments on commit 76306aa

Please sign in to comment.