diff --git a/catalog/book/models.py b/catalog/book/models.py index 52e19f893..aaff46cff 100644 --- a/catalog/book/models.py +++ b/catalog/book/models.py @@ -231,6 +231,29 @@ def to_indexable_titles(self) -> list[str]: titles += [t for t in self.other_title if t] # type: ignore return list(set(titles)) + def to_indexable_people(self) -> list[str]: + return self.author + self.translator + + def to_indexable_company(self) -> list[str]: + return ([self.pub_house] if self.pub_house else []) + ( + [self.imprint] if self.imprint else [] + ) + + def to_indexable_doc(self) -> dict[str, str | int | list[str]]: + d = super().to_indexable_doc() + ids = [str(self.isbn)] if self.isbn else [] + if self.asin: + ids.append(str(self.asin)) + if ids: + d["lookup_id"] = ids + if self.pub_year: + d["year"] = self.pub_year + if self.series: + d["extra_title"] = [self.series] + if self.format: + d["subtype"] = [self.format] + return d + @property def isbn10(self): return isbn_13_to_10(self.isbn) diff --git a/catalog/common/models.py b/catalog/common/models.py index e29e1df4c..7b3efeffe 100644 --- a/catalog/common/models.py +++ b/catalog/common/models.py @@ -18,6 +18,7 @@ from polymorphic.models import PolymorphicModel from catalog.common import jsondata +from catalog.index import CatalogIndex from common.models import LANGUAGE_CHOICES, LOCALE_CHOICES, get_current_locales, uniq from .utils import item_cover_path, resource_cover_path @@ -659,6 +660,33 @@ def to_indexable_titles(self) -> list[str]: titles += self.parent_item.to_indexable_titles() return list(set(titles)) + def to_indexable_people(self) -> list[str]: + return [] + + def to_indexable_company(self) -> list[str]: + return [] + + def to_indexable_doc(self) -> dict[str, str | int | list[str]]: + return { + "id": str(self.pk), + "item_id": self.pk, + "item_class": self.__class__.__name__, + "title": self.to_indexable_titles(), + "people": self.to_indexable_people(), + "company": self.to_indexable_company(), + "tag": self.tags, + "mark_count": self.mark_count, + } + + def update_index(self): + index = CatalogIndex.instance() + index.replace_item(self) + # CatalogIndex.enqueue_replace_items([self.pk]) + + def save(self, *args, **kwargs): + super().save(*args, **kwargs) + self.update_index() + @classmethod def get_by_url(cls, url_or_b62: str, resolve_merge=False) -> "Self | None": b62 = url_or_b62.strip().split("/")[-1] @@ -761,9 +789,6 @@ def update_linked_items_from_external_resource(self, resource: "ExternalResource """Subclass should override this""" pass - def skip_index(self): - return False - @property def editable(self): return not self.is_deleted and self.merged_to_item is None @@ -780,6 +805,12 @@ def rating_count(self): return Rating.get_rating_count_for_item(self) + @property + def mark_count(self): + from journal.models import Mark + + return Mark.get_mark_count_for_item(self) + @property def rating_dist(self): from journal.models import Rating diff --git a/catalog/index.py b/catalog/index.py new file mode 100644 index 000000000..18e725a42 --- /dev/null +++ b/catalog/index.py @@ -0,0 +1,272 @@ +from datetime import timedelta +from functools import cached_property, reduce +from typing import TYPE_CHECKING, Iterable + +import django_rq +from django_redis import get_redis_connection +from loguru import logger +from rq.job import Job + +from common.models import Index, QueryParser, SearchResult +from common.models.misc import int_ + +if TYPE_CHECKING: + from catalog.models import Item + +_PENDING_INDEX_KEY = "pending_catalog_index_ids" +_PENDING_INDEX_QUEUE = "import" +_PENDING_INDEX_JOB_ID = "pending_catalog_index_flush" + + +def _update_catalog_index_task(): + item_ids = get_redis_connection("default").spop(_PENDING_INDEX_KEY, 1000) + updated = 0 + index = CatalogIndex.instance() + while item_ids: + index.replace_items(item_ids) + updated += len(item_ids) + item_ids = get_redis_connection("default").spop(_PENDING_INDEX_KEY, 1000) + logger.info(f"Catalog index updated for {updated} items") + + +def _cat_to_class(cat: str) -> list[str]: + from catalog.common.models import ItemCategory, item_categories + + return [c.__name__ for c in item_categories().get(ItemCategory(cat), [])] + + +class CatalogQueryParser(QueryParser): + fields = ["tag", "category", "type", "year", "language"] + default_search_params = { + "query_by": "title, people, company, lookup_id", + # "sort_by": "", + "per_page": 20, + "include_fields": "id, item_id", + "highlight_fields": "", + "facet_by": "item_class", + } + + def __init__( + self, + query: str, + page: int = 1, + page_size: int = 0, + filter_categories=[], + exclude_categories=[], + ): + from catalog.common.models import item_categories + + super().__init__(query, page, page_size) + + v = [i for i in set(self.parsed_fields.get("tag", "").split(",")) if i] + if v: + self.filter_by["tag"] = v + + v = [ + i for i in set(self.parsed_fields.get("category", "").split(",")) if i + ] or filter_categories + if v: + cats = { + c.value: [ic.__name__ for ic in cl] + for c, cl in item_categories().items() + } + v = list(set(v) & cats.keys()) + v = reduce(lambda a, b: a + b, [cats[i] for i in v], []) + if v: + self.filter_by["item_class"] = v + elif exclude_categories: + # apply exclude categories if no categories are specified + cs = reduce( + lambda a, b: a + b, [_cat_to_class(c) for c in exclude_categories], [] + ) + self.exclude("item_class", cs) + + v = self.parsed_fields.get("year", "").split("..") + if len(v) == 2: + start = int_(v[0]) + end = int_(v[1]) + if start and end: + self.filter_by["year"] = [f"{start}..{end}"] + elif len(v) == 1: + year = int_(v[0]) + if year: + self.filter_by["year"] = [f"{year}"] + + +class CatalogSearchResult(SearchResult): + @property + def facet_by_item_class(self): + return self.get_facet("item_class") + + @cached_property + def items(self): + from catalog.models import Item + + if not self: + return [] + ids = [hit["document"]["item_id"] for hit in self.response["hits"]] + items = Item.objects.filter(pk__in=ids, is_deleted=False) + items = [j for j in [i.final_item for i in items] if not j.is_deleted] + return items + + def __iter__(self): + return iter(self.items) + + def __getitem__(self, key): + return self.items[key] + + def __contains__(self, item): + return item in self.items + + +class CatalogIndex(Index): + name = "catalog" + schema = { + "fields": [ + { + "name": "item_id", + "type": "int64", + "sort": False, + }, + { + "name": "item_class", + "type": "string", + "facet": True, + }, + { + "name": "year", + "type": "int32", + "facet": True, + "optional": True, + }, + { + "name": "lookup_id", + "type": "string[]", + "optional": True, + }, + { + "name": "language", + "type": "string[]", + "facet": True, + "optional": True, + }, + { + "name": "title", + "locale": "zh", + "type": "string[]", + }, + { + "name": "extra_title", + "locale": "zh", + "type": "string[]", + }, + { + "name": "people", + "locale": "zh", + "type": "string[]", + "optional": True, + }, + { + "name": "company", + "locale": "zh", + "type": "string[]", + "optional": True, + }, + { + "name": "genre", + "type": "string[]", + "facet": True, + "optional": True, + }, + { + "name": "subtype", + "type": "string[]", + "facet": True, + "optional": True, + }, + { + "name": "mark_count", + "type": "int64", + "optional": True, + }, + { + "name": "tag", + "locale": "zh", + "type": "string[]", + "optional": True, + }, + {"name": ".*", "optional": True, "locale": "zh", "type": "auto"}, + ] + } + search_result_class = CatalogSearchResult + + @classmethod + def items_to_docs(cls, items: "Iterable[Item]") -> list[dict]: + docs = [i.to_indexable_doc() for i in items] + return [d for d in docs if d] + + def delete_all(self): + return self.delete_docs("item_id", ">0") + + def delete(self, item_ids): + return self.delete_docs("item_id", item_ids) + + def replace_items(self, item_ids): + from catalog.models import Item + + items = Item.objects.filter(pk__in=item_ids) + docs = [ + i.to_indexable_doc() + for i in items + if not i.is_deleted and not i.merged_to_item_id + ] + if docs: + self.replace_docs(docs) + if len(docs) < len(item_ids): + deletes = item_ids - [i.pk for i in items] + self.delete_docs("item_id", deletes) + + def replace_item(self, item: "Item"): + if not item.pk: + logger.error(f"Indexing {item} but no pk") + return + try: + if item.is_deleted or item.merged_to_item_id: + self.delete_docs("item_id", item.pk) + else: + doc = item.to_indexable_doc() + self.replace_docs([doc]) + except Exception as e: + logger.error(f"Indexing {item} error {e}") + + @classmethod + def enqueue_replace_items(cls, item_ids): + if not item_ids: + return + get_redis_connection("default").sadd(_PENDING_INDEX_KEY, *item_ids) + try: + job = Job.fetch( + id=_PENDING_INDEX_JOB_ID, + connection=django_rq.get_connection(_PENDING_INDEX_QUEUE), + ) + if job.get_status() in ["queued", "scheduled"]: + job.cancel() + except Exception: + pass + # using rq's built-in scheduler here, it can be switched to other similar implementations + django_rq.get_queue(_PENDING_INDEX_QUEUE).enqueue_in( + timedelta(seconds=2), + _update_catalog_index_task, + job_id=_PENDING_INDEX_JOB_ID, + ) + + def delete_item(self, item: "Item"): + if item.pk: + self.delete_docs("item_id", item.pk) + + def search( + self, + query, + ) -> CatalogSearchResult: # type:ignore + r = super().search(query) + return r # type:ignore diff --git a/catalog/management/commands/catalog.py b/catalog/management/commands/catalog.py index 0314c686d..bdc9cb3fa 100644 --- a/catalog/management/commands/catalog.py +++ b/catalog/management/commands/catalog.py @@ -1,26 +1,64 @@ +import json import time from django.contrib.contenttypes.models import ContentType from django.core.management.base import BaseCommand +from django.core.paginator import Paginator from django.db.models import Count, F from tqdm import tqdm from catalog.common.sites import SiteManager -from catalog.models import Edition, Item, Podcast, TVSeason, TVShow +from catalog.index import CatalogIndex, CatalogQueryParser +from catalog.models import ( + Edition, + Item, + Podcast, + TVSeason, + TVShow, +) from catalog.search.external import ExternalSources from catalog.sites.fedi import FediverseInstance from common.models import detect_language, uniq +_CONFIRM = "confirm deleting collection? [Y/N] " +_HELP_TEXT = """ +intergrity: check and fix integrity for merged and deleted items +purge: purge deleted items +search: search docs in index +extsearch: search external sites +idx-info: show index information +idx-init: check and create index if not exists +idx-destroy: delete index +idx-alt: update index schema +idx-delete: delete docs in index +idx-reindex: reindex docs +idx-get: dump one doc with --url +""" + class Command(BaseCommand): help = "catalog app utilities" def add_arguments(self, parser): parser.add_argument( - "--extsearch", + "action", + choices=[ + "integrity", + "purge", + "search", + "extsearch", + "idx-info", + "idx-init", + "idx-alt", + "idx-destroy", + "idx-reindex", + "idx-delete", + "idx-get", + ], + help=_HELP_TEXT, ) parser.add_argument( - "--category", + "--getegory", default="all", ) parser.add_argument( @@ -32,33 +70,19 @@ def add_arguments(self, parser): action="store_true", ) parser.add_argument( - "--purge", + "--yes", action="store_true", - help="purge deleted items", ) parser.add_argument( - "--localize", - action="store_true", - help="migrate localized title/description", + "--query", ) parser.add_argument( - "--integrity", - action="store_true", - help="check and fix integrity for merged and deleted items", + "--url", + ) + parser.add_argument( + "--batch-size", + default=1000, ) - - def handle(self, *args, **options): - self.verbose = options["verbose"] - self.fix = options["fix"] - if options["purge"]: - self.purge() - if options["integrity"]: - self.integrity() - if options["localize"]: - self.localize() - if options["extsearch"]: - self.external_search(options["extsearch"], options["category"]) - self.stdout.write(self.style.SUCCESS("Done.")) def external_search(self, q, cat): sites = SiteManager.get_sites_for_search() @@ -214,3 +238,76 @@ def integrity(self): if self.fix: i.show = i.show.merged_to_item i.save() + + def handle(self, action, query, yes, category, batch_size, url, *args, **options): + self.verbose = options["verbose"] + self.fix = options["fix"] + index = CatalogIndex.instance() + match action: + case "integrity": + self.integrity() + + case "purge": + self.purge() + + case "extsearch": + self.external_search(query, category) + + case "idx-destroy": + if yes or input(_CONFIRM).upper().startswith("Y"): + index.delete_collection() + self.stdout.write(self.style.SUCCESS("deleted.")) + + case "idx-alt": + # index.update_schema() + self.stdout.write(self.style.SUCCESS("not implemented.")) + + case "idx-init": + index.initialize_collection() + self.stdout.write(self.style.SUCCESS("initialized.")) + + case "idx-info": + try: + r = index.check() + self.stdout.write(str(r)) + except Exception as e: + self.stdout.write(self.style.ERROR(str(e))) + + case "idx-delete": + c = index.delete_all() + self.stdout.write(self.style.SUCCESS(f"deleted {c} documents.")) + + case "idx-reindex": + items = Item.objects.filter( + is_deleted=False, merged_to_item_id__isnull=True + ).order_by("id") + c = 0 + pg = Paginator(items, batch_size) + for p in tqdm(pg.page_range): + docs = index.items_to_docs(pg.get_page(p).object_list) + c += len(docs) + index.replace_docs(docs) + self.stdout.write(self.style.SUCCESS(f"indexed {c} docs.")) + + case "idx-get": + item = Item.get_by_url(url) + if not item: + self.stderr.write(self.style.ERROR("item not found.")) + else: + d = index.get_doc(item.pk) + self.stdout.write(json.dumps(d, indent=2)) + + case "search": + q = CatalogQueryParser("" if query == "-" else query, page_size=100) + # if category: + # q.filter("category", category) + r = index.search(q) + self.stdout.write(self.style.SUCCESS(str(r))) + self.stdout.write(f"{r.facet_by_item_class}") + for i in r: + self.stdout.write(str(i)) + + case _: + self.stdout.write(self.style.ERROR("action not found.")) + + self.stdout.write(self.style.SUCCESS("Done.")) diff --git a/catalog/models.py b/catalog/models.py index 3a0b943da..435f53828 100644 --- a/catalog/models.py +++ b/catalog/models.py @@ -44,7 +44,7 @@ ) from .search.models import Indexer, ExternalSearchResultItem # isort:skip - +from .index import CatalogIndex, CatalogQueryParser, CatalogSearchResult # class Exhibition(Item): @@ -151,4 +151,7 @@ def init_catalog_audit_log(): "Indexer", "init_catalog_search_models", "init_catalog_audit_log", + "CatalogIndex", + "CatalogQueryParser", + "CatalogSearchResult", ] diff --git a/common/models/index.py b/common/models/index.py index 994eac7f2..efff93edd 100644 --- a/common/models/index.py +++ b/common/models/index.py @@ -275,6 +275,9 @@ def delete_docs(self, field: str, values: list[int] | str) -> int: def patch_docs(self, partial_doc: dict, doc_filter: str): self.write_collection.documents.update(partial_doc, {"filter_by": doc_filter}) + def get_doc(self, doc_id): + return self.read_collection.documents[doc_id].retrieve() + def search( self, query: QueryParser, diff --git a/compose.yml b/compose.yml index 80096559c..5e6e1ba35 100644 --- a/compose.yml +++ b/compose.yml @@ -84,8 +84,8 @@ x-shared: IGDB_API_CLIENT_ID: IGDB_API_CLIENT_SECRET: DISCORD_WEBHOOKS: - SLACK_API_TOKEN: SSL_ONLY: + INDEX_ALIASES: restart: "unless-stopped" volumes: - ${NEODB_DATA:-../data}/neodb-media:/www/m diff --git a/journal/management/commands/journal.py b/journal/management/commands/journal.py index 69f3ec155..088355a9b 100644 --- a/journal/management/commands/journal.py +++ b/journal/management/commands/journal.py @@ -29,13 +29,13 @@ intergrity: check and fix remaining journal for merged and deleted items purge: delete invalid data (visibility=99) export: run export task +search: search docs in index idx-info: show index information idx-init: check and create index if not exists idx-destroy: delete index idx-alt: update index schema idx-delete: delete docs in index idx-reindex: reindex docs -idx-search: search docs in index """ @@ -60,7 +60,7 @@ def add_arguments(self, parser): "idx-destroy", "idx-reindex", "idx-delete", - "idx-search", + "search", ], help=_HELP_TEXT, ) @@ -255,7 +255,7 @@ def handle( # index.insert_docs(docs) # self.stdout.write(self.style.SUCCESS(f"indexed {c} posts.")) - case "idx-search": + case "search": q = JournalQueryParser("" if query == "-" else query, page_size=100) if owners: q.filter("owner_id", owners) diff --git a/journal/models/mark.py b/journal/models/mark.py index d91a1c678..070c0891b 100644 --- a/journal/models/mark.py +++ b/journal/models/mark.py @@ -301,3 +301,15 @@ def delete_log(self, log_id: int): def delete_all_logs(self): self.logs.delete() + + @staticmethod + def get_mark_count_for_item(item: Item) -> int: + if item.get_type() in ["Podcast", "TVSeason"]: + return ( + ShelfMember.objects.filter(item_id__in=item.child_item_ids + [item.pk]) + .values("owner_id") + .distinct() + .count() + ) + else: + return ShelfMember.objects.filter(item=item).count()