Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delete references if schema is deleted #1027

Merged
merged 2 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions src/karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,10 @@ def num_subjects(self) -> int:
def num_schema_versions(self) -> tuple[int, int]:
pass

@abstractmethod
def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
pass

@abstractmethod
def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
pass

@abstractmethod
def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
pass


class InMemoryDatabase(KarapaceDatabase):
def __init__(self) -> None:
Expand Down Expand Up @@ -257,6 +249,9 @@ def insert_schema_version(
schema=schema,
schema_id=schema_id,
)
if references:
for ref in references:
self._insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)
else:
self._delete_from_schema_id_on_subject(
subject=subject,
Expand Down Expand Up @@ -352,12 +347,19 @@ def delete_subject(self, *, subject: Subject, version: Version) -> None:

def delete_subject_hard(self, *, subject: Subject) -> None:
    """Permanently remove ``subject`` and the reference bookkeeping of its schemas.

    While holding ``schema_lock_thread``, every stored schema version of the
    subject that carries references is passed to ``_remove_referenced_by``
    before the subject entry itself is deleted and
    ``_delete_subject_from_schema_id_on_subject`` is called for it.

    Raises:
        KeyError: if ``subject`` is not present in ``self.subjects``.
    """
    with self.schema_lock_thread:
        versions_with_references = (
            stored for stored in self.subjects[subject].schemas.values() if stored.references
        )
        for stored in versions_with_references:
            self._remove_referenced_by(stored.schema_id, stored.references)
        del self.subjects[subject]
        self._delete_subject_from_schema_id_on_subject(subject=subject)

def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
    """Remove one schema version from ``subject`` and clean up after it.

    The version is popped exactly once; popping it twice would make the
    second lookup always miss and silently skip the cleanup below.

    When the version existed:
      * if it carries references, they are passed together with its schema id
        to ``_remove_referenced_by``;
      * ``_delete_from_schema_id_on_subject`` is called for the subject and
        the version's schema.

    A missing ``version`` is a no-op.

    Raises:
        KeyError: if ``subject`` is not present in ``self.subjects``.
    """
    with self.schema_lock_thread:
        schema = self.subjects[subject].schemas.pop(version, None)
        if not schema:
            return
        if schema.references:
            self._remove_referenced_by(schema.schema_id, schema.references)
        self._delete_from_schema_id_on_subject(subject=subject, schema=schema.schema)

def num_schemas(self) -> int:
return len(self.schemas)
Expand All @@ -377,19 +379,19 @@ def num_schema_versions(self) -> tuple[int, int]:
soft_deleted_versions += 1
return (live_versions, soft_deleted_versions)

def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
def _insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
with self.schema_lock_thread:
referents = self.referenced_by.get((subject, version), None)
if referents:
referents.append(schema_id)
referents.add(schema_id)
else:
self.referenced_by[(subject, version)] = Referents([schema_id])
self.referenced_by[(subject, version)] = Referents({schema_id})

def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
with self.schema_lock_thread:
return self.referenced_by.get((subject, version), None)

def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
def _remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
with self.schema_lock_thread:
for ref in references:
key = (ref.subject, ref.version)
Expand Down
13 changes: 1 addition & 12 deletions src/karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version
from karapace.typing import JsonObject, SchemaReaderStoppper, Subject, Version
from karapace.utils import json_decode, JSONDecodeError, shutdown
from threading import Event, Lock, Thread
from typing import Final
Expand Down Expand Up @@ -660,10 +660,6 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:
references=resolved_references,
)

if resolved_references:
for ref in resolved_references:
self.database.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)

def handle_msg(self, key: dict, value: dict | None) -> None:
if "keytype" in key:
try:
Expand All @@ -687,13 +683,6 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
)
raise InvalidSchema("Message key doesn't contain the `keytype` attribute")

def remove_referenced_by(
self,
schema_id: SchemaId,
references: Sequence[Reference],
) -> None:
self.database.remove_referenced_by(schema_id, references)

def get_referenced_by(
self,
subject: Subject,
Expand Down
2 changes: 1 addition & 1 deletion src/karapace/schema_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from karapace.typing import JsonData, JsonObject, SchemaId, Subject, Version
from typing import cast, NewType, TypeVar

Referents = NewType("Referents", list[SchemaId])
Referents = NewType("Referents", set[SchemaId])

T = TypeVar("T")

Expand Down
4 changes: 0 additions & 4 deletions src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[
deleted=True,
references=schema_version.references,
)
if schema_version.references and len(schema_version.references) > 0:
self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references)
else:
try:
schema_versions_live = self.subject_get(subject, include_deleted=False)
Expand Down Expand Up @@ -225,8 +223,6 @@ async def subject_version_delete_local(self, subject: Subject, version: Version,
deleted=True,
references=schema_version.references,
)
if schema_version.references and len(schema_version.references) > 0:
self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references)
return resolved_version

def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[Version, SchemaVersion]:
Expand Down
5 changes: 3 additions & 2 deletions stubs/confluent_kafka/admin/_config.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ._resource import ResourceType
from enum import Enum
from typing import cast

class ConfigResource:
Type = ResourceType
Expand All @@ -12,7 +13,7 @@ class ConfigResource:
) -> None: ...

class ConfigSource(Enum):
UNKNOWN_CONFIG: int
DYNAMIC_TOPIC_CONFIG: int
UNKNOWN_CONFIG = cast(int, ...)
DYNAMIC_TOPIC_CONFIG = cast(int, ...)

class ConfigEntry: ...
3 changes: 2 additions & 1 deletion stubs/confluent_kafka/admin/_resource.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from typing import cast

class ResourceType(Enum):
TOPIC: int
TOPIC = cast(int, ...)
86 changes: 79 additions & 7 deletions tests/unit/test_in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable, Sequence
from collections.abc import Sequence
from confluent_kafka.cimpl import KafkaError
from karapace.config import DEFAULTS
from karapace.constants import DEFAULT_SCHEMA_TOPIC
from karapace.in_memory_database import InMemoryDatabase, KarapaceDatabase, Subject, SubjectData
from karapace.kafka.types import Timestamp
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_references import Reference, Referents
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from pathlib import Path
from typing import Final

import pytest

TEST_DATA_FOLDER: Final = Path("tests/unit/test_data/")


Expand Down Expand Up @@ -176,15 +180,9 @@ def num_subjects(self) -> int:
def num_schema_versions(self) -> tuple[int, int]:
return self.db.num_schema_versions()

def insert_referenced_by(self, *, subject: Subject, version: Version, schema_id: SchemaId) -> None:
return self.db.insert_referenced_by(subject=subject, version=version, schema_id=schema_id)

def get_referenced_by(self, subject: Subject, version: Version) -> Referents | None:
return self.db.get_referenced_by(subject=subject, version=version)

def remove_referenced_by(self, schema_id: SchemaId, references: Iterable[Reference]) -> None:
return self.db.remove_referenced_by(schema_id=schema_id, references=references)

def duplicates(self) -> dict[SchemaId, list[tuple[Subject, TypedSchema]]]:
duplicate_data = defaultdict(list)
for schema_id, schemas in self._duplicates.items():
Expand Down Expand Up @@ -259,3 +257,77 @@ def test_can_ingest_schemas_from_log() -> None:
schema_id_to_duplicated_subjects = compute_schema_id_to_subjects(duplicates, database.subject_to_subject_data())
assert schema_id_to_duplicated_subjects == {}, "there shouldn't be any duplicated schemas"
assert duplicates == {}, "the schema database is broken. The id should be unique"


@pytest.fixture(name="db_with_schemas")
def fixture_in_memory_database_with_schemas() -> InMemoryDatabase:
    """Return an ``InMemoryDatabase`` pre-populated with two subjects.

    ``subject_a`` stores the same protobuf schema as versions 1 and 2 without
    references. ``subject_b`` stores one version whose single reference points
    at ``subject_a`` version 1.
    """
    proto_source = "syntax = 'proto3'; message Test { string test = 1; }"
    database = InMemoryDatabase()

    # Referenced subject: two versions of the same schema, no references.
    referenced_subject = Subject("subject_a")
    referenced_schema = TypedSchema(
        schema_type=SchemaType.PROTOBUF,
        schema_str=proto_source,
        schema=ProtobufSchema(schema=proto_source),
    )
    database.insert_subject(subject=referenced_subject)
    referenced_schema_id = database.get_schema_id(referenced_schema)
    for version_number in (1, 2):
        database.insert_schema_version(
            subject=referenced_subject,
            schema_id=referenced_schema_id,
            version=Version(version_number),
            schema=referenced_schema,
            deleted=False,
            references=None,
        )

    # Referencing subject: one version pointing at subject_a version 1.
    referencing_subject = Subject("subject_b")
    outgoing_references = [Reference(name="test", subject=referenced_subject, version=Version(1))]
    referencing_schema = TypedSchema(
        schema_type=SchemaType.PROTOBUF,
        schema_str=proto_source,
        schema=ProtobufSchema(schema=proto_source),
        references=outgoing_references,
    )
    database.insert_subject(subject=referencing_subject)
    referencing_schema_id = database.get_schema_id(referencing_schema)
    database.insert_schema_version(
        subject=referencing_subject,
        schema_id=referencing_schema_id,
        version=Version(1),
        schema=referencing_schema,
        deleted=False,
        references=outgoing_references,
    )

    return database


def test_delete_schema_references(db_with_schemas: InMemoryDatabase) -> None:
    """Deleting a schema version removes it from the referents of what it referenced."""
    referenced_subject = Subject("subject_a")
    referencing_subject = Subject("subject_b")

    # Check that the schema is referenced by subject_b
    referents = db_with_schemas.get_referenced_by(subject=referenced_subject, version=Version(1))
    assert referents is not None
    # Peek instead of pop(): get_referenced_by returns the database's own set,
    # so popping would empty it here and make the final assertion pass even
    # if the delete below did no cleanup at all.
    referent_id = next(iter(referents))
    version = db_with_schemas.find_schema_versions_by_schema_id(schema_id=referent_id, include_deleted=False)[0]
    assert version.subject == referencing_subject
    assert version.version == Version(1)

    # Delete the schema from subject_b
    db_with_schemas.delete_subject_schema(subject=referencing_subject, version=Version(1))

    # Check that the schema is no longer referenced by subject_b.
    # `not referents` accepts both an emptied set and a missing (None) entry.
    referents = db_with_schemas.get_referenced_by(subject=referenced_subject, version=Version(1))
    assert not referents, "referents should be gone after deleting the schema"


def test_delete_subject(db_with_schemas: InMemoryDatabase) -> None:
    """Hard-deleting a subject removes its schemas from the referents of what they referenced."""
    referenced_subject = Subject("subject_a")
    referencing_subject = Subject("subject_b")

    # Check that the schema is referenced by subject_b
    referents = db_with_schemas.get_referenced_by(subject=referenced_subject, version=Version(1))
    assert referents is not None
    # Peek instead of pop(): get_referenced_by returns the database's own set,
    # so popping would empty it here and make the final assertion pass even
    # if the hard delete below did no cleanup at all.
    referent_id = next(iter(referents))
    version = db_with_schemas.find_schema_versions_by_schema_id(schema_id=referent_id, include_deleted=False)[0]
    assert version.subject == referencing_subject
    assert version.version == Version(1)

    # Hard delete subject_b
    db_with_schemas.delete_subject_hard(subject=referencing_subject)

    # Check that the schema is no longer referenced by subject_b.
    # `not referents` accepts both an emptied set and a missing (None) entry.
    referents = db_with_schemas.get_referenced_by(subject=referenced_subject, version=Version(1))
    assert not referents, "referents should be gone after hard deleting the subject"
Loading