Merge pull request #29 from CrimeIsDown/migrate-to-typesense
Add support for Typesense search server
EricTendian authored Dec 30, 2024
2 parents 0c2447f + 392c871 commit 0d51e21
Showing 30 changed files with 2,222 additions and 1,287 deletions.
12 changes: 10 additions & 2 deletions .env.example
@@ -90,13 +90,21 @@ API_KEY=testing
# Version of Meilisearch to use (e.g. v1.4.0)
# MEILI_VERSION=

MEILI_MASTER_KEY=testing
MEILI_URL=http://meilisearch:7700
# MEILI_MASTER_KEY=testing
# MEILI_URL=http://meilisearch:7700
# To use a different index name other than calls:
# MEILI_INDEX=calls
# For large systems, to make a new index for each month (e.g. calls_2024_01); MEILI_INDEX becomes the prefix
# MEILI_INDEX_SPLIT_BY_MONTH=true

# Version of Typesense to use
TYPESENSE_VERSION=27.1

TYPESENSE_API_KEY=testing
TYPESENSE_URL=http://typesense:8108

SEARCH_UI_URL=http://localhost:3000

#
# Storage settings
#
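The new TYPESENSE_URL and TYPESENSE_API_KEY settings mirror the Meilisearch pair they replace. As a rough sketch of how they might be consumed with the typesense Python client (an illustration only; the URL parsing and timeout shown here are assumptions, not necessarily what app/search/adapters does):

import os
from urllib.parse import urlparse

import typesense

# Assumed: derive host/port/protocol from TYPESENSE_URL as set above
url = urlparse(os.getenv("TYPESENSE_URL", "http://typesense:8108"))
client = typesense.Client(
    {
        "nodes": [
            {
                "host": url.hostname,
                "port": url.port or (443 if url.scheme == "https" else 80),
                "protocol": url.scheme,
            }
        ],
        "api_key": os.getenv("TYPESENSE_API_KEY", ""),
        "connection_timeout_seconds": 10,  # arbitrary illustrative value
    }
)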
8 changes: 8 additions & 0 deletions .env.testing
@@ -95,6 +95,14 @@ MEILI_URL=http://meilisearch:7700
# For large systems, to make a new index for each month (e.g. calls_2024_01); MEILI_INDEX becomes the prefix
MEILI_INDEX_SPLIT_BY_MONTH=false

# Version of Typesense to use
TYPESENSE_VERSION=27.1

TYPESENSE_API_KEY=testing
TYPESENSE_URL=http://typesense:8108

SEARCH_UI_URL=http://localhost:3000

#
# Storage settings
#
1 change: 1 addition & 0 deletions .env.testing.local
@@ -1,3 +1,4 @@
API_BASE_URL=http://127.0.0.1:8000
MEILI_URL=http://127.0.0.1:7700
TYPESENSE_URL=http://127.0.0.1:8108
S3_PUBLIC_URL=http://127.0.0.1:9000/trunk-transcribe
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
@@ -76,4 +76,4 @@ jobs:
run: |
docker compose --ansi never logs -f &
sleep 10
./make.sh test
./make.sh test -s
18 changes: 7 additions & 11 deletions app/api.py
@@ -7,7 +7,6 @@
import subprocess
import sys
import tempfile
import threading

from celery.result import AsyncResult
from dotenv import load_dotenv
@@ -20,7 +19,7 @@

load_dotenv()

from app.search import search
from app.search.helpers import get_default_index_name
from app.utils.exceptions import before_send
from app.models.database import SessionLocal, engine
from app.models.metadata import Metadata
@@ -60,15 +59,6 @@
logger.addHandler(stream_handler)


def create_search_index():
search_client = search.get_client()
search.create_or_update_index(search_client, search.get_default_index_name())


thread = threading.Thread(target=create_search_index)
thread.start()


# Dependency
def get_db(): # type: ignore
db = SessionLocal()
@@ -369,6 +359,12 @@ def update_call(
return call_model.update_call(db=db, call=call, db_call=db_call)


@app.get("/talkgroups")
def talkgroups(db: Session = Depends(get_db)) -> JSONResponse:
tgs = call_model.get_talkgroups(db, get_default_index_name())
return JSONResponse({"talkgroups": tgs})


@app.get("/config/{filename}")
def get_config(filename: str) -> JSONResponse:
if filename not in os.listdir("config"):
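Alongside removing the startup thread that created the search index, the api.py diff adds a GET /talkgroups endpoint returning the talkgroups known to the default index. A minimal usage sketch, assuming the API is reachable at the API_BASE_URL from .env.testing.local and that the response field is a list:

import requests

# Assumes the API is running locally on port 8000
resp = requests.get("http://127.0.0.1:8000/talkgroups")
resp.raise_for_status()
talkgroups = resp.json()["talkgroups"]
print(f"Fetched {len(talkgroups)} talkgroups")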
4 changes: 2 additions & 2 deletions app/bin/autoscale-vast.py
@@ -166,7 +166,7 @@ def _update_pending_instances(self, instances: list[dict]):

def get_worker_status(self) -> list[dict]:
workers = []
result = worker.celery.control.inspect(timeout=10).stats()
result = worker.celery.control.inspect(timeout=10).stats() # type: ignore
if result:
for name, stats in result.items():
workers.append({"name": name, "stats": stats})
@@ -245,7 +245,7 @@ def get_current_instances(self) -> list[dict]:
def create_instances(self, count: int) -> int:
logging.info(f"Scaling up by {count} instances")

mem_util_factor = 1
mem_util_factor = 1.0
# Decrease the memory needed for certain forks
if self.implementation in ["faster-whisper", "whisper.cpp"]:
mem_util_factor = 0.4
13 changes: 7 additions & 6 deletions app/bin/import-to-db.py
@@ -14,7 +14,8 @@
# Load the .env file of our choice if specified before the regular .env can load
load_dotenv(os.getenv("ENV"))

from app.search import search
from app.search import helpers
from app.search.adapters import MeilisearchAdapter
from app.models import database


@@ -35,8 +36,8 @@ def get_documents(
MeiliDocument(hit) for hit in results["hits"]
]
else:
results = index.get_documents(pagination)
return results.total, results.results
results = index.get_documents(pagination) # type: ignore
return results.total, results.results # type: ignore


if __name__ == "__main__":
@@ -47,7 +48,7 @@ def get_documents(
parser.add_argument(
"--index",
type=str,
default=search.get_default_index_name(),
default=helpers.get_default_index_name(),
help="Meilisearch index to use",
)
parser.add_argument(
@@ -77,9 +78,9 @@ def get_documents(
level=logging.DEBUG if args.verbose else logging.INFO,
)

client = search.get_client()
adapter = MeilisearchAdapter()

index = client.index(args.index)
index = adapter.client.index(args.index)

total, _ = get_documents(index, {"limit": 1}, args.search)
logging.info(f"Found {total} total documents")
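The import script now goes through a MeilisearchAdapter instead of calling search.get_client() directly, which is the refactor that lets the rest of the codebase swap in Typesense. The real class lives in app/search/adapters; a hypothetical minimal shape, assuming it wraps the official meilisearch client and honors the MEILI_* settings from .env.example:

import os

import meilisearch

class MeilisearchAdapter:
    # Hypothetical sketch; the actual adapter in app/search/adapters
    # presumably has more methods (index management, document upserts, etc.)
    def __init__(self, timeout: int | None = None):
        self.client = meilisearch.Client(
            os.getenv("MEILI_URL", "http://meilisearch:7700"),
            os.getenv("MEILI_MASTER_KEY"),
            timeout=timeout,
        )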
175 changes: 175 additions & 0 deletions app/bin/migrate-from-meilisearch-to-typesense.py
@@ -0,0 +1,175 @@
#!/usr/bin/env python3

import argparse
import json
import logging
import os
from typing import Tuple

from dotenv import load_dotenv
from meilisearch.index import Index
from meilisearch.models.document import Document as MeiliDocument
from typesense.documents import Documents

# Load the .env file of our choice if specified before the regular .env can load
load_dotenv(os.getenv("ENV"))

from app.geocoding.geocoding import GeoResponse
from app.models.metadata import Metadata
from app.models.transcript import Transcript
from app.search import helpers, adapters


def convert_document(document: MeiliDocument) -> helpers.Document:
metadata: Metadata = json.loads(document.raw_metadata)
transcript = Transcript(json.loads(document.raw_transcript))

if hasattr(document, "_geo") and hasattr(document, "geo_formatted_address"):
geo = GeoResponse(
geo=document._geo, # type: ignore
geo_formatted_address=document.geo_formatted_address,
)
else:
geo = None

return typesense_adapter.build_document(
document.id, metadata, document.raw_audio_url, transcript, geo
)


def _import(collection: Documents, documents: list[helpers.Document]):
return collection.import_(
documents, {"action": "upsert", "batch_size": len(documents)}
)


def get_documents(index: Index, pagination: dict) -> Tuple[int, list[MeiliDocument]]:
opts = pagination

results = index.get_documents(opts)
return results.total, results.results


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Import calls from Meilisearch into Typesense.",
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--batch-size",
type=int,
default=20000,
help="How many documents to fetch and update in a single request, defaults to 20000",
)
parser.add_argument(
"--index",
type=str,
default=helpers.get_default_index_name(),
help=f"The index to reindex, defaults to '{helpers.get_default_index_name()}'",
)
parser.add_argument(
"--all-indices",
action="store_true",
help="Reindex all indices",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable verbose mode (debug logging)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show the matching documents and associated transformations, without actually doing the reindex",
)
args = parser.parse_args()

logging.basicConfig(
format="[%(asctime)s] %(message)s",
level=logging.DEBUG if args.verbose else logging.INFO,
)

meili_adapter = adapters.MeilisearchAdapter(timeout=60 * 60)
typesense_adapter = adapters.TypesenseAdapter(timeout=60 * 60)

indices = [args.index]

if args.all_indices:
indices = [
index.uid for index in meili_adapter.client.get_indexes()["results"]
]

# Sort indices to ensure we always process them in the same order
indices.sort()

for index in indices:
logging.info(f"Reindexing index: {index}")

meili_index = meili_adapter.client.index(index)

# Create collection in typesense
typesense_adapter.upsert_index(index)
collection_docs = typesense_adapter.client.collections[index].documents # type: ignore

total, _ = get_documents(meili_index, {"limit": 1})
logging.info(f"Found {total} total documents")
limit = args.batch_size
offset = 0
total_processed = 0
updated_documents = []

while offset < total:
total, documents = get_documents(
meili_index, {"offset": offset, "limit": limit}
)
offset += limit

completion = min((offset / total) * 100, 100)

if len(documents):
logging.log(
logging.INFO if args.dry_run else logging.DEBUG,
"First 5 documents that were matched:\n"
+ json.dumps(
[dict(doc)["_Document__doc"] for doc in documents[:5]],
sort_keys=True,
indent=4,
),
)
docs_to_add = list(
filter(
lambda doc: doc is not None,
map(lambda d: convert_document(d), documents),
)
)
updated_documents += docs_to_add
logging.info(f"Added {len(docs_to_add)} documents to be indexed")
total_processed += len(updated_documents)
logging.log(
logging.INFO if args.dry_run else logging.DEBUG,
"The documents to be imported:\n"
+ json.dumps(updated_documents[:5], sort_keys=True, indent=4),
)

if args.dry_run:
logging.warning(
f"Dry run enabled, exiting. We would have imported at least {len(documents)} documents"
)
break

if len(updated_documents):
# Only send the updated docs to be reindexed when we have a big enough batch
if len(updated_documents) >= limit or offset >= total:
logging.info(
f"Waiting for {len(updated_documents)} documents to be imported"
)
_import(collection_docs, updated_documents)
# Reset the list of updated documents
updated_documents = []

logging.info(f"{completion:.2f}% complete ({min(offset, total)}/{total})")

if not args.dry_run:
logging.info(
f"Successfully imported {total_processed} total matching documents"
)
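Given the argparse flags defined above, a full migration would be run with `python app/bin/migrate-from-meilisearch-to-typesense.py --all-indices` (adding `--dry-run` previews the converted documents without importing). A quick count check one might run afterwards, a sketch rather than part of this PR, reusing the adapters the script already imports:

from app.search import adapters, helpers

index = helpers.get_default_index_name()
meili_client = adapters.MeilisearchAdapter().client
ts_client = adapters.TypesenseAdapter().client

# Compare per-index document counts between the two engines
meili_count = meili_client.index(index).get_stats().number_of_documents
ts_count = ts_client.collections[index].retrieve()["num_documents"]
print(f"{index}: meilisearch={meili_count} typesense={ts_count}")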
