Merge pull request #33 from CrimeIsDown/partitioning-setup
Adding new columns and indexes to the database
EricTendian authored Jan 6, 2025
2 parents f9d551b + 9775b6f commit a909aea
Showing 10 changed files with 204 additions and 26 deletions.
1 change: 0 additions & 1 deletion .github/workflows/run-tests.yml
@@ -74,5 +74,4 @@ jobs:
        run: |
          docker compose --ansi never logs -f &
          sleep 10
          uv run alembic upgrade head
          ./make.sh test -s
4 changes: 1 addition & 3 deletions app/alembic/env.py
@@ -37,9 +37,7 @@ def get_url():
        scheme="postgresql+psycopg",
        username=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD"),
        host="127.0.0.1"
        if os.getenv("POSTGRES_HOST") == "postgres"
        else os.getenv("POSTGRES_HOST"),
        host=os.getenv("POSTGRES_HOST"),
        port=int(os.getenv("POSTGRES_PORT", "5432")),
        path=os.getenv("POSTGRES_DB"),
    ).unicode_string()
59 changes: 59 additions & 0 deletions app/alembic/versions/09874b204f9e_add_columns_to_calls_table.py
@@ -0,0 +1,59 @@
"""add columns to calls table

Revision ID: 09874b204f9e
Revises: 71fea3333d68
Create Date: 2025-01-04 19:20:20.623118

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

from app.models.models import CALLS_TABLE_NAME


# revision identifiers, used by Alembic.
revision: str = "09874b204f9e"
down_revision: Union[str, None] = "71fea3333d68"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.alter_column(CALLS_TABLE_NAME, "raw_metadata", nullable=False)
    op.alter_column(CALLS_TABLE_NAME, "raw_audio_url", nullable=False)
    op.add_column(
        CALLS_TABLE_NAME,
        sa.Column("start_time", sa.TIMESTAMP(timezone=True), nullable=True),
    )
    op.add_column(
        CALLS_TABLE_NAME, sa.Column("transcript_plaintext", sa.Text(), nullable=True)
    )
    op.execute(
        f"""
        UPDATE {CALLS_TABLE_NAME}
        SET transcript_plaintext = (
            SELECT string_agg(element->>1, E'\\n')
            FROM json_array_elements(raw_transcript::json) AS element
        )
        WHERE raw_transcript IS NOT NULL
        """
    )
    op.execute(
        f"""
        UPDATE {CALLS_TABLE_NAME}
        SET start_time = to_timestamp((raw_metadata->>'start_time')::bigint)
        """
    )
    op.alter_column(
        CALLS_TABLE_NAME, "start_time", server_default=sa.text("now()"), nullable=False
    )


def downgrade() -> None:
    op.drop_column(CALLS_TABLE_NAME, "transcript_plaintext")
    op.drop_column(CALLS_TABLE_NAME, "start_time")
    op.alter_column(CALLS_TABLE_NAME, "raw_audio_url", nullable=True)
    op.alter_column(CALLS_TABLE_NAME, "raw_metadata", nullable=True)
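
Note: the transcript_plaintext backfill above assumes raw_transcript is a JSON array whose elements are two-item arrays with the spoken text at index 1 (that is what string_agg(element->>1, E'\n') extracts). A minimal Python sketch of the same transformation, using a made-up transcript value:

```python
import json

# Made-up raw_transcript value; the shape (text at element index 1) is an
# assumption inferred from the string_agg(element->>1, E'\n') expression above.
raw_transcript = json.dumps(
    [
        [{"src": 12345, "pos": 0.0}, "Engine 41, respond to a still alarm"],
        [{"src": 67890, "pos": 4.2}, "Engine 41 responding"],
    ]
)

# Python equivalent of the SQL backfill: join the second item of each element
# with newlines to produce transcript_plaintext.
transcript_plaintext = "\n".join(item[1] for item in json.loads(raw_transcript))
print(transcript_plaintext)
```
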
92 changes: 92 additions & 0 deletions app/alembic/versions/17dc425f0a6a_add_indexes_to_calls_table.py
@@ -0,0 +1,92 @@
"""add indexes to calls table

Revision ID: 17dc425f0a6a
Revises: 09874b204f9e
Create Date: 2025-01-04 21:49:40.293750

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

from app.models.models import CALLS_TABLE_NAME


# revision identifiers, used by Alembic.
revision: str = "17dc425f0a6a"
down_revision: Union[str, None] = "09874b204f9e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # Create indexes concurrently to avoid locking the table; this requires us to do this outside a transaction block
    with op.get_context().autocommit_block():
        op.create_index(
            "idx_short_name",
            CALLS_TABLE_NAME,
            [sa.text("(raw_metadata->>'short_name')")],
            postgresql_concurrently=True,
        )
        op.create_index(
            "idx_talkgroup_group",
            CALLS_TABLE_NAME,
            [sa.text("(raw_metadata->>'talkgroup_group')")],
            postgresql_concurrently=True,
        )
        op.create_index(
            "idx_talkgroup_group_tag",
            CALLS_TABLE_NAME,
            [sa.text("(raw_metadata->>'talkgroup_group_tag')")],
            postgresql_concurrently=True,
        )
        op.create_index(
            "idx_talkgroup_id",
            CALLS_TABLE_NAME,
            [sa.text("(raw_metadata->>'talkgroup')")],
            postgresql_concurrently=True,
        )
        op.create_index(
            "idx_talkgroup_tag",
            CALLS_TABLE_NAME,
            [sa.text("(raw_metadata->>'talkgroup_tag')")],
            postgresql_concurrently=True,
        )
        op.create_index(
            "idx_start_time",
            CALLS_TABLE_NAME,
            ["start_time"],
            postgresql_concurrently=True,
        )


def downgrade() -> None:
    with op.get_context().autocommit_block():
        op.drop_index(
            "idx_short_name", table_name=CALLS_TABLE_NAME, postgresql_concurrently=True
        )
        op.drop_index(
            "idx_talkgroup_group",
            table_name=CALLS_TABLE_NAME,
            postgresql_concurrently=True,
        )
        op.drop_index(
            "idx_talkgroup_group_tag",
            table_name=CALLS_TABLE_NAME,
            postgresql_concurrently=True,
        )
        op.drop_index(
            "idx_talkgroup_id",
            table_name=CALLS_TABLE_NAME,
            postgresql_concurrently=True,
        )
        op.drop_index(
            "idx_talkgroup_tag",
            table_name=CALLS_TABLE_NAME,
            postgresql_concurrently=True,
        )
        op.drop_index(
            "idx_start_time", table_name=CALLS_TABLE_NAME, postgresql_concurrently=True
        )
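
Note: these are PostgreSQL expression indexes, so a query only benefits when its filter uses the same expression as the index definition (for example raw_metadata->>'talkgroup' for idx_talkgroup_id). A rough sketch of the kind of lookup they are meant to serve; the connection URL and values are placeholders, not part of this change:

```python
from sqlalchemy import create_engine, text

# Placeholder connection URL; adjust for your environment.
engine = create_engine("postgresql+psycopg://user:password@localhost:5432/scanner")

with engine.connect() as conn:
    rows = conn.execute(
        text(
            """
            SELECT id, start_time, transcript_plaintext
            FROM calls
            WHERE raw_metadata->>'talkgroup' = :talkgroup  -- matches idx_talkgroup_id
              AND start_time >= now() - interval '1 day'   -- matches idx_start_time
            ORDER BY start_time DESC
            """
        ),
        {"talkgroup": "1234"},
    )
    for row in rows:
        print(row.id, row.start_time, row.transcript_plaintext)
```
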
12 changes: 5 additions & 7 deletions app/alembic/versions/71fea3333d68_create_calls_table.py
@@ -12,6 +12,8 @@
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

from app.models.models import CALLS_TABLE_NAME

# revision identifiers, used by Alembic.
revision: str = "71fea3333d68"
down_revision: Union[str, None] = None
@@ -23,10 +25,9 @@ def upgrade() -> None:
    # Check if the table already exists, since we are introducing migrations after the table already exists
    conn = op.get_bind()
    inspector = sa.inspect(conn)
    if not inspector.has_table("calls"):
        # ### commands auto generated by Alembic - please adjust! ###
    if not inspector.has_table(CALLS_TABLE_NAME):
        op.create_table(
            "calls",
            CALLS_TABLE_NAME,
            sa.Column("id", sa.Integer(), primary_key=True),
            sa.Column(
                "raw_metadata", postgresql.JSONB(astext_type=sa.Text()), nullable=True
@@ -37,10 +38,7 @@ def upgrade() -> None:
            ),
            sa.Column("geo", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
        )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("calls")
    # ### end Alembic commands ###
    op.drop_table(CALLS_TABLE_NAME)
15 changes: 12 additions & 3 deletions app/api.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3

from datetime import datetime
from typing import Annotated, Generator
import json
import logging
@@ -19,7 +20,7 @@

load_dotenv()

from app.search.helpers import get_default_index_name
from app.models.transcript import Transcript
from app.utils.exceptions import before_send
from app.models.database import engine
from app.models.metadata import Metadata
@@ -328,7 +329,11 @@ def create_call(
    else:
        raise HTTPException(status_code=400, detail="No audio provided")

    call = models.CallCreate(raw_metadata=metadata, raw_audio_url=audio_url)
    start_time = datetime.fromtimestamp(metadata["start_time"])

    call = models.CallCreate(
        raw_metadata=metadata, raw_audio_url=audio_url, start_time=start_time
    )

    db_call = models.create_call(db=db, call=call)

@@ -354,12 +359,16 @@ def update_call(
    if db_call is None:
        raise HTTPException(status_code=404, detail="Call not found")

    if call.raw_transcript and not call.transcript_plaintext:
        transcript = Transcript(call.raw_transcript)
        call.transcript_plaintext = transcript.txt

    return models.update_call(db=db, call=call, db_call=db_call)


@app.get("/talkgroups")
def talkgroups(db: Session = Depends(get_db)) -> JSONResponse:
    tgs = models.get_talkgroups(db, get_default_index_name())
    tgs = models.get_talkgroups(db)
    return JSONResponse({"talkgroups": tgs})


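Note: the new start_time handling in create_call assumes the incoming call metadata carries start_time as Unix epoch seconds, the same assumption the backfill migration makes with to_timestamp((raw_metadata->>'start_time')::bigint). A small sketch of the conversion with a made-up payload; datetime.fromtimestamp without a tz argument yields a naive local datetime:

```python
from datetime import datetime, timezone

# Made-up metadata payload, for illustration only.
metadata = {"start_time": 1736016020, "short_name": "example_system", "talkgroup": 1234}

# Mirrors create_call(): fromtimestamp() without tzinfo returns a naive datetime
# in the server's local timezone.
start_time = datetime.fromtimestamp(metadata["start_time"])

# Timezone-aware alternative, since the column is TIMESTAMP(timezone=True):
start_time_utc = datetime.fromtimestamp(metadata["start_time"], tz=timezone.utc)
print(start_time, start_time_utc.isoformat())
```
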
20 changes: 16 additions & 4 deletions app/models/models.py
@@ -1,4 +1,5 @@
from sqlalchemy import Column, text
from datetime import datetime
from sqlalchemy import TIMESTAMP, Column, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import SQLModel, Session, Field

@@ -7,6 +8,9 @@
from .transcript import RawTranscript


CALLS_TABLE_NAME = "calls"


class Base(SQLModel):
    pass

@@ -20,10 +24,16 @@ class CallBase(Base):
    geo: GeoResponse | None = Field(
        default=None, sa_column=Column(JSONB, nullable=True)
    )
    start_time: datetime = Field(
        sa_column=Column(
            TIMESTAMP(timezone=True), nullable=False, server_default=text("now()")
        )
    )
    transcript_plaintext: str | None = Field(default=None)


class Call(CallBase, table=True):
    __tablename__ = "calls"
    __tablename__ = CALLS_TABLE_NAME
    id: int | None = Field(default=None, primary_key=True)


@@ -36,6 +46,8 @@ class CallUpdate(Base):
    raw_audio_url: str | None = None
    raw_transcript: RawTranscript | None = None
    geo: GeoResponse | None = None
    start_time: datetime | None = None
    transcript_plaintext: str | None = None


class CallPublic(CallBase):
@@ -64,15 +76,15 @@ def update_call(db: Session, call: CallUpdate, db_call: Call) -> Call:
    return db_call


def get_talkgroups(db: Session, table_name: str) -> list[dict]:
def get_talkgroups(db: Session) -> list[dict]:
    query = f"""
    SELECT
        raw_metadata::jsonb ->> 'short_name' AS short_name,
        raw_metadata::jsonb ->> 'talkgroup_group' AS talkgroup_group,
        raw_metadata::jsonb ->> 'talkgroup_tag' AS talkgroup_tag,
        raw_metadata::jsonb ->> 'talkgroup' AS talkgroup
    FROM
        {table_name}
        {CALLS_TABLE_NAME}
    WHERE
        raw_metadata::jsonb ->> 'talkgroup_tag' != ''
    GROUP BY
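Note: a minimal usage sketch of the simplified get_talkgroups signature. It assumes the returned dicts are keyed by the column aliases in the query above, and it borrows the engine import that app/api.py uses; none of this sketch is part of the diff itself:

```python
from sqlmodel import Session

from app.models.database import engine
from app.models.models import get_talkgroups

# Iterate over the distinct talkgroup metadata pulled from the calls table.
with Session(engine) as session:
    for tg in get_talkgroups(session):
        print(tg["short_name"], tg["talkgroup"], tg["talkgroup_tag"])
```
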
12 changes: 4 additions & 8 deletions docker/Dockerfile
@@ -35,20 +35,16 @@ ENV UV_COMPILE_BYTECODE=1
# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy

COPY pyproject.toml pyproject.toml
COPY uv.lock uv.lock

# Install the project's dependencies using the lockfile and settings
RUN --mount=type=cache,target=/root/.cache/uv \
    --mount=type=bind,source=uv.lock,target=uv.lock \
    --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
    uv sync --frozen --no-install-project --no-dev $WHISPER_IMPLEMENTATION_GROUP

# Then, add the rest of the project source code and install it
# Installing separately from its dependencies allows optimal layer caching
ADD . /src
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev $WHISPER_IMPLEMENTATION_GROUP

# $WHISPER_INSTALL_INSTRUCTIONS

COPY alembic.ini alembic.ini
COPY app app
COPY config config
COPY docker/docker-entrypoint.sh /usr/local/bin/
8 changes: 8 additions & 0 deletions docker/docker-entrypoint.sh
@@ -6,6 +6,14 @@ if [ "$1" = 'api' ]; then
    /bin/sh -c "while true; do find /tmp -type f -mmin +10 -delete; sleep 60; done" &
    disown

    if [ -n "$POSTGRES_HOST" ]; then
        # Check if we have run any DB migrations yet
        CURRENT_DB_VERSION="$(uv run alembic current)"
        if [ -z "$CURRENT_DB_VERSION" ]; then
            uv run alembic upgrade head
        fi
    fi

    exec uv run uvicorn app.api:app --host 0.0.0.0 --log-level ${UVICORN_LOG_LEVEL:-info}
elif [ "$1" = 'worker' ]; then
    # Clean up any old temp files
7 changes: 7 additions & 0 deletions update.sh
@@ -7,6 +7,13 @@ git pull
echo "The .env file settings have recently changed. Please ensure your COMPOSE_FILE variable has all the proper services enabled that you want. Refer to .env.example for more information."

docker compose pull

read -p "Would you like to run database migrations? (y/n) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
    docker compose run --rm api uv run alembic upgrade head
fi

docker compose up -d $(docker compose ps --services)

# Optionally, reindex search calls if there was a change to the search schema
