diff --git a/yente/search/base.py b/yente/search/base.py
index 87344899..ae7fc5d0 100644
--- a/yente/search/base.py
+++ b/yente/search/base.py
@@ -1,6 +1,7 @@
 import time
 import asyncio
 import warnings
+from threading import Lock
 from typing import cast, Any, Dict
 from structlog.contextvars import get_contextvars
 from elasticsearch import AsyncElasticsearch
@@ -15,7 +16,7 @@
 log = get_logger(__name__)
 POOL: Dict[int, AsyncElasticsearch] = {}
 query_semaphore = asyncio.Semaphore(settings.QUERY_CONCURRENCY)
-index_semaphore = asyncio.Semaphore(settings.INDEX_CONCURRENCY)
+index_lock = Lock()
 
 
 def get_opaque_id() -> str:
diff --git a/yente/search/indexer.py b/yente/search/indexer.py
index 6b21025f..cd2b9fa3 100644
--- a/yente/search/indexer.py
+++ b/yente/search/indexer.py
@@ -14,7 +14,7 @@
 from yente.data.dataset import Dataset
 from yente.data import get_catalog
 from yente.data.loader import load_json_lines
-from yente.search.base import get_es, close_es, index_semaphore
+from yente.search.base import get_es, close_es, index_lock
 from yente.search.mapping import make_entity_mapping
 from yente.search.mapping import INDEX_SETTINGS
 from yente.search.mapping import NAMES_FIELD, NAME_PHONETIC_FIELD
@@ -66,7 +66,10 @@ async def iter_entity_docs(
 async def index_entities_rate_limit(
     es: AsyncElasticsearch, dataset: Dataset, force: bool
 ) -> bool:
-    async with index_semaphore:
+    if index_lock.locked():
+        log.info("Index is already being updated", dataset=dataset.name, force=force)
+        return False
+    with index_lock:
         return await index_entities(es, dataset, force=force)
 
 
@@ -174,11 +177,12 @@ async def update_index(force: bool = False) -> bool:
     try:
         catalog = await get_catalog()
         log.info("Index update check")
-        indexers = []
+        changed = False
         for dataset in catalog.datasets:
-            indexers.append(index_entities_rate_limit(es, dataset, force))
-        results = await asyncio.gather(*indexers)
-        changed = True in results
+            # Await unconditionally, then combine: `changed or await ...` would
+            # short-circuit and skip every dataset after the first change.
+            dataset_changed = await index_entities_rate_limit(es, dataset, force)
+            changed = changed or dataset_changed
         log.info("Index update complete.", changed=changed)
         return changed
     finally:
diff --git a/yente/settings.py b/yente/settings.py
index 6c70e1d1..05fa717f 100644
--- a/yente/settings.py
+++ b/yente/settings.py
@@ -136,8 +136,5 @@ def env_str(name: str, default: str) -> str:
 # How many match and search queries to run against ES in parallel:
 QUERY_CONCURRENCY = int(env_str("YENTE_QUERY_CONCURRENCY", "10"))
 
-# How many index operations to run against ES in parallel:
-INDEX_CONCURRENCY = int(env_str("YENTE_INDEX_CONCURRENCY", "5"))
-
 # Default scoring threshold for /match results:
 SCORE_THRESHOLD = 0.70