Skip to content

Commit

Permalink
fix: delete references if schema is deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
keejon committed Jan 21, 2025
1 parent 2da698f commit ba65ad9
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 1 deletion.
13 changes: 12 additions & 1 deletion src/karapace/in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ def insert_schema_version(
references=references,
)

if references:
for ref in references:
self.insert_referenced_by(subject=ref.subject, version=ref.version, schema_id=schema_id)

if not deleted:
self._set_schema_id_on_subject(
subject=subject,
Expand Down Expand Up @@ -352,12 +356,19 @@ def delete_subject(self, *, subject: Subject, version: Version) -> None:

def delete_subject_hard(self, *, subject: Subject) -> None:
with self.schema_lock_thread:
for schema in self.subjects[subject].schemas.values():
if schema.references:
self.remove_referenced_by(schema.schema_id, schema.references)
del self.subjects[subject]
self._delete_subject_from_schema_id_on_subject(subject=subject)

def delete_subject_schema(self, *, subject: Subject, version: Version) -> None:
with self.schema_lock_thread:
self.subjects[subject].schemas.pop(version, None)
schema = self.subjects[subject].schemas.pop(version, None)
if schema:
if schema.references:
self.remove_referenced_by(schema.schema_id, schema.references)
self._delete_from_schema_id_on_subject(subject=subject, schema=schema.schema)

def num_schemas(self) -> int:
return len(self.schemas)
Expand Down
79 changes: 79 additions & 0 deletions tests/unit/test_in_memory_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
from karapace.kafka.types import Timestamp
from karapace.key_format import KeyFormatter
from karapace.offset_watcher import OffsetWatcher
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import SchemaVersion, TypedSchema
from karapace.schema_reader import KafkaSchemaReader
from karapace.schema_references import Reference, Referents
from karapace.schema_type import SchemaType
from karapace.typing import SchemaId, Version
from pathlib import Path
from typing import Final

import pytest

TEST_DATA_FOLDER: Final = Path("tests/unit/test_data/")


Expand Down Expand Up @@ -259,3 +263,78 @@ def test_can_ingest_schemas_from_log() -> None:
schema_id_to_duplicated_subjects = compute_schema_id_to_subjects(duplicates, database.subject_to_subject_data())
assert schema_id_to_duplicated_subjects == {}, "there shouldn't be any duplicated schemas"
assert duplicates == {}, "the schema database is broken. The id should be unique"


@pytest.fixture(name="db_with_schemas")
def fixture_in_memory_database_with_schemas() -> InMemoryDatabase:
db = InMemoryDatabase()
schema_str = "syntax = 'proto3'; message Test { string test = 1; }"

subject_a = Subject("subject_a")
schema_a = TypedSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=schema_str,
schema=ProtobufSchema(schema=schema_str),
)
db.insert_subject(subject=subject_a)
schema_id_a = db.get_schema_id(schema_a)
db.insert_schema_version(
subject=subject_a, schema_id=schema_id_a, version=Version(1), schema=schema_a, deleted=False, references=None
)
db.insert_schema_version(
subject=subject_a, schema_id=schema_id_a, version=Version(2), schema=schema_a, deleted=False, references=None
)

subject_b = Subject("subject_b")
references_b = [Reference(name="test", subject=subject_a, version=Version(1))]
schema_b = TypedSchema(
schema_type=SchemaType.PROTOBUF,
schema_str=schema_str,
schema=ProtobufSchema(schema=schema_str),
references=references_b,
)
db.insert_subject(subject=subject_b)
schema_id_b = db.get_schema_id(schema_b)

db.insert_schema_version(
subject=subject_b,
schema_id=schema_id_b,
version=Version(1),
schema=schema_b,
deleted=False,
references=references_b,
)

return db


def test_delete_schema_references(db_with_schemas: InMemoryDatabase) -> None:
# Check that the schema is referenced by subject_b
referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
assert referents is not None
version = db_with_schemas.find_schema_versions_by_schema_id(schema_id=referents[0], include_deleted=False)[0]
assert version.subject == Subject("subject_b")
assert version.version == Version(1)

# Delete the schema from subject_b
db_with_schemas.delete_subject_schema(subject=Subject("subject_b"), version=Version(1))

# Check that the schema is no longer referenced by subject_b
referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
assert referents == [], "referents should be gone after deleting the schema"


def test_delete_subject(db_with_schemas: InMemoryDatabase) -> None:
# Check that the schema is referenced by subject_b
referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
assert referents is not None
version = db_with_schemas.find_schema_versions_by_schema_id(schema_id=referents[0], include_deleted=False)[0]
assert version.subject == Subject("subject_b")
assert version.version == Version(1)

# Hard delete subject_b
db_with_schemas.delete_subject_hard(subject=Subject("subject_b"))

# Check that the schema is no longer referenced by subject_b
referents = db_with_schemas.get_referenced_by(subject=Subject("subject_a"), version=Version(1))
assert referents == [], "referents should be gone after hard deleting the subject"

0 comments on commit ba65ad9

Please sign in to comment.