Skip to content

Commit

Permalink
fix: delete references if schema is deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
keejon committed Jan 21, 2025
1 parent 2da698f commit 73711ca
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 13 deletions.
13 changes: 12 additions & 1 deletion src/karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ def insert_schema_version(
references=references,
)

if references:
for ref in references:
self.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)

if not deleted:
self._set_schema_id_on_subject(
subject=subject,
Expand Down Expand Up @@ -352,12 +356,19 @@ def delete_subject(self, *, subject: Subject, version: Version) -> None:

def delete_subject_hard(self, *, subject: Subject) -> None:
    """Remove ``subject`` entirely, together with its reference bookkeeping.

    While holding ``schema_lock_thread``:

    1. every schema version of the subject that carries references has its
       schema id unregistered through ``remove_referenced_by``;
    2. the subject entry is deleted from ``self.subjects``;
    3. the subject is dropped via
       ``_delete_subject_from_schema_id_on_subject``.

    Raises:
        KeyError: if ``subject`` is not present in ``self.subjects``.
    """
    with self.schema_lock_thread:
        stored_versions = self.subjects[subject].schemas
        for schema_version in stored_versions.values():
            refs = schema_version.references
            if not refs:
                # Nothing was registered for versions without references.
                continue
            self.remove_referenced_by(schema_version.schema_id, refs)
        del self.subjects[subject]
        self._delete_subject_from_schema_id_on_subject(subject=subject)

def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
    """Remove one schema version from ``subject`` and clean up after it.

    While holding ``schema_lock_thread`` the version is popped from the
    subject's schemas. If a version was actually stored:

    * its schema id is unregistered through ``remove_referenced_by`` when
      the version carries references;
    * the schema is dropped via ``_delete_from_schema_id_on_subject``.

    A ``version`` that is not stored is a no-op.

    Raises:
        KeyError: if ``subject`` is not present in ``self.subjects``.
    """
    with self.schema_lock_thread:
        # Pop exactly once: a second pop of the same key always yields None,
        # which would silently skip all of the cleanup below.
        removed = self.subjects[subject].schemas.pop(version, None)
        if removed is None:
            return
        if removed.references:
            self.remove_referenced_by(removed.schema_id, removed.references)
        self._delete_from_schema_id_on_subject(subject=subject, schema=removed.schema)

def num_schemas(self) -> int:
    """Return the number of entries currently held in ``self.schemas``."""
    schema_count = len(self.schemas)
    return schema_count
Expand Down
9 changes: 1 addition & 8 deletions src/karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version
from karapace.typing import JsonObject, SchemaReaderStoppper, Subject, Version
from karapace.utils import json_decode, JSONDecodeError, shutdown
from threading import Event, Lock, Thread
from typing import Final
Expand Down Expand Up @@ -687,13 +687,6 @@ def handle_msg(self, key: dict, value: dict | None) -> None:
)
raise InvalidSchema("Message key doesn't contain the `keytype` attribute")

def remove_referenced_by(
    self,
    schema_id: SchemaId,
    references: Sequence[Reference],
) -> None:
    """Forward the call unchanged to ``self.database.remove_referenced_by``.

    Args:
        schema_id: passed through as the first positional argument.
        references: passed through as the second positional argument.
    """
    database = self.database
    database.remove_referenced_by(schema_id, references)

def get_referenced_by(
self,
subject: Subject,
Expand Down
4 changes: 0 additions & 4 deletions src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ async def subject_delete_local(self, subject: Subject, permanent: bool) -> list[
deleted=True,
references=schema_version.references,
)
if schema_version.references and len(schema_version.references) > 0:
self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references)
else:
try:
schema_versions_live = self.subject_get(subject, include_deleted=False)
Expand Down Expand Up @@ -225,8 +223,6 @@ async def subject_version_delete_local(self, subject: Subject, version: Version,
deleted=True,
references=schema_version.references,
)
if schema_version.references and len(schema_version.references) > 0:
self.schema_reader.remove_referenced_by(schema_version.schema_id, schema_version.references)
return resolved_version

def subject_get(self, subject: Subject, include_deleted: bool = False) -> dict[Version, SchemaVersion]:
Expand Down
79 changes: 79 additions & 0 deletions tests/unit/test_in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
from karapace.kafka.types import Timestamp
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_references import Reference, Referents
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from pathlib import Path
from typing import Final

import pytest

TEST_DATA_FOLDER: Final = Path("tests/unit/test_data/")


Expand Down Expand Up @@ -259,3 +263,78 @@ def test_can_ingest_schemas_from_log() -> None:
schema_id_to_duplicated_subjects = compute_schema_id_to_subjects(duplicates, database.subject_to_subject_data())
assert schema_id_to_duplicated_subjects == {}, "there shouldn't be any duplicated schemas"
assert duplicates == {}, "the schema database is broken. The id should be unique"


@pytest.fixture(name="db_with_schemas")
def fixture_in_memory_database_with_schemas() -> InMemoryDatabase:
    """Build an in-memory database holding two subjects linked by a reference.

    * ``subject_a`` stores the same protobuf schema as versions 1 and 2,
      inserted without references.
    * ``subject_b`` stores version 1 of a schema whose single reference
      points at ``subject_a`` version 1.
    """
    database = InMemoryDatabase()
    proto_source = "syntax = 'proto3'; message Test { string test = 1; }"

    # Referenced subject: two live versions, no references of their own.
    referenced_subject = Subject("subject_a")
    plain_schema = TypedSchema(
        schema_type=SchemaType.PROTOBUF,
        schema_str=proto_source,
        schema=ProtobufSchema(schema=proto_source),
    )
    database.insert_subject(subject=referenced_subject)
    plain_schema_id = database.get_schema_id(plain_schema)
    for version_number in (1, 2):
        database.insert_schema_version(
            subject=referenced_subject,
            schema_id=plain_schema_id,
            version=Version(version_number),
            schema=plain_schema,
            deleted=False,
            references=None,
        )

    # Referencing subject: one version pointing at subject_a version 1.
    referencing_subject = Subject("subject_b")
    outgoing_references = [Reference(name="test", subject=referenced_subject, version=Version(1))]
    referencing_schema = TypedSchema(
        schema_type=SchemaType.PROTOBUF,
        schema_str=proto_source,
        schema=ProtobufSchema(schema=proto_source),
        references=outgoing_references,
    )
    database.insert_subject(subject=referencing_subject)
    referencing_schema_id = database.get_schema_id(referencing_schema)
    database.insert_schema_version(
        subject=referencing_subject,
        schema_id=referencing_schema_id,
        version=Version(1),
        schema=referencing_schema,
        deleted=False,
        references=outgoing_references,
    )

    return database


def test_delete_schema_references(db_with_schemas: InMemoryDatabase) -> None:
    """Deleting the referencing schema version must clear its referent entry."""
    referenced_subject = Subject("subject_a")
    referencing_subject = Subject("subject_b")
    first_version = Version(1)

    # Precondition: subject_a v1 is referenced by subject_b v1.
    referents = db_with_schemas.get_referenced_by(subject=referenced_subject, version=first_version)
    assert referents is not None
    referencing_versions = db_with_schemas.find_schema_versions_by_schema_id(
        schema_id=referents[0],
        include_deleted=False,
    )
    referencing = referencing_versions[0]
    assert referencing.subject == referencing_subject
    assert referencing.version == first_version

    # Act: drop the only version stored under subject_b.
    db_with_schemas.delete_subject_schema(subject=referencing_subject, version=first_version)

    # The referent list of subject_a v1 must now be empty.
    remaining = db_with_schemas.get_referenced_by(subject=referenced_subject, version=first_version)
    assert remaining == [], "referents should be gone after deleting the schema"


def test_delete_subject(db_with_schemas: InMemoryDatabase) -> None:
    """Hard-deleting the referencing subject must clear its referent entry."""
    referenced_subject = Subject("subject_a")
    referencing_subject = Subject("subject_b")
    first_version = Version(1)

    # Precondition: subject_a v1 is referenced by subject_b v1.
    referents = db_with_schemas.get_referenced_by(subject=referenced_subject, version=first_version)
    assert referents is not None
    referencing_versions = db_with_schemas.find_schema_versions_by_schema_id(
        schema_id=referents[0],
        include_deleted=False,
    )
    referencing = referencing_versions[0]
    assert referencing.subject == referencing_subject
    assert referencing.version == first_version

    # Act: remove subject_b with all of its versions.
    db_with_schemas.delete_subject_hard(subject=referencing_subject)

    # The referent list of subject_a v1 must now be empty.
    remaining = db_with_schemas.get_referenced_by(subject=referenced_subject, version=first_version)
    assert remaining == [], "referents should be gone after hard deleting the subject"

0 comments on commit 73711ca

Please sign in to comment.