Skip to content

Commit

Permalink
don't try to parallelise indexers
Browse files (browse the repository at this point in the history)
  • Loading branch information
pudo committed Jan 9, 2024
1 parent fceb858 commit 7bb9b7f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 10 deletions.
3 changes: 2 additions & 1 deletion yente/search/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import asyncio
import warnings
from threading import Lock
from typing import cast, Any, Dict
from structlog.contextvars import get_contextvars
from elasticsearch import AsyncElasticsearch
Expand All @@ -15,7 +16,7 @@
log = get_logger(__name__)
POOL: Dict[int, AsyncElasticsearch] = {}
query_semaphore = asyncio.Semaphore(settings.QUERY_CONCURRENCY)
index_semaphore = asyncio.Semaphore(settings.INDEX_CONCURRENCY)
index_lock = Lock()


def get_opaque_id() -> str:
Expand Down
13 changes: 7 additions & 6 deletions yente/search/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from yente.data.dataset import Dataset
from yente.data import get_catalog
from yente.data.loader import load_json_lines
from yente.search.base import get_es, close_es, index_semaphore
from yente.search.base import get_es, close_es, index_lock
from yente.search.mapping import make_entity_mapping
from yente.search.mapping import INDEX_SETTINGS
from yente.search.mapping import NAMES_FIELD, NAME_PHONETIC_FIELD
Expand Down Expand Up @@ -66,7 +66,10 @@ async def iter_entity_docs(
async def index_entities_rate_limit(
es: AsyncElasticsearch, dataset: Dataset, force: bool
) -> bool:
async with index_semaphore:
if index_lock.locked():
log.info("Index is already being updated", dataset=dataset.name, force=force)
return False
with index_lock:
return await index_entities(es, dataset, force=force)


Expand Down Expand Up @@ -174,11 +177,9 @@ async def update_index(force: bool = False) -> bool:
try:
catalog = await get_catalog()
log.info("Index update check")
indexers = []
changed = False
for dataset in catalog.datasets:
indexers.append(index_entities_rate_limit(es, dataset, force))
results = await asyncio.gather(*indexers)
changed = True in results
changed = changed or await index_entities_rate_limit(es, dataset, force)
log.info("Index update complete.", changed=changed)
return changed
finally:
Expand Down
3 changes: 0 additions & 3 deletions yente/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ def env_str(name: str, default: str) -> str:
# How many match and search queries to run against ES in parallel:
QUERY_CONCURRENCY = int(env_str("YENTE_QUERY_CONCURRENCY", "10"))

# How many index operations to run against ES in parallel:
INDEX_CONCURRENCY = int(env_str("YENTE_INDEX_CONCURRENCY", "5"))

# Default scoring threshold for /match results:
SCORE_THRESHOLD = 0.70

Expand Down

0 comments on commit 7bb9b7f

Please sign in to comment.