diff --git a/src/karapace/__main__.py b/src/karapace/__main__.py index da954488b..52d721703 100644 --- a/src/karapace/__main__.py +++ b/src/karapace/__main__.py @@ -6,6 +6,7 @@ from karapace.api.container import SchemaRegistryContainer from karapace.api.factory import create_karapace_application, karapace_schema_registry_lifespan from karapace.api.telemetry.container import TelemetryContainer +from karapace.core.auth_container import AuthContainer from karapace.core.container import KarapaceContainer import karapace.api.controller @@ -18,48 +19,79 @@ import karapace.api.routers.mode import karapace.api.routers.schemas import karapace.api.routers.subjects -import karapace.api.telemetry.meter +import karapace.core.instrumentation.meter import karapace.api.telemetry.middleware import karapace.api.telemetry.setup import karapace.api.telemetry.tracer import karapace.api.user import uvicorn +from karapace.core.metrics_container import MetricsContainer + if __name__ == "__main__": karapace_container = KarapaceContainer() karapace_container.wire( modules=[ __name__, - karapace.api.controller, karapace.api.telemetry.tracer, - karapace.api.telemetry.meter, + karapace.core.instrumentation.meter, ] ) - telemetry_container = TelemetryContainer(karapace_container=karapace_container) - telemetry_container.wire( + auth_container = AuthContainer( + karapace_container=karapace_container, + ) + auth_container.wire( + modules=[ + karapace.api.controller, + karapace.api.factory, + karapace.api.routers.compatibility, + karapace.api.routers.config, + karapace.api.routers.mode, + karapace.api.routers.schemas, + karapace.api.routers.subjects, + karapace.api.user, + ] + ) + + metrics_container = MetricsContainer( + karapace_container=karapace_container, + ) + metrics_container.wire( modules=[ + karapace.api.factory, karapace.api.telemetry.setup, + ] + ) + + telemetry_container = TelemetryContainer( + karapace_container=karapace_container, + metrics_container=metrics_container, + ) + telemetry_container.wire( + modules=[ karapace.api.telemetry.middleware, + karapace.api.telemetry.setup, ] ) schema_registry_container = SchemaRegistryContainer( - karapace_container=karapace_container, telemetry_container=telemetry_container + karapace_container=karapace_container, + metrics_container=metrics_container, + telemetry_container=telemetry_container, ) schema_registry_container.wire( modules=[ __name__, karapace.api.factory, - karapace.api.user, + karapace.api.routers.compatibility, + karapace.api.routers.config, karapace.api.routers.health, + karapace.api.routers.master_availability, karapace.api.routers.metrics, - karapace.api.routers.subjects, - karapace.api.routers.schemas, - karapace.api.routers.config, - karapace.api.routers.compatibility, karapace.api.routers.mode, - karapace.api.routers.master_availability, + karapace.api.routers.schemas, + karapace.api.routers.subjects, ] ) diff --git a/src/karapace/api/container.py b/src/karapace/api/container.py index e70d4509a..7c10d748a 100644 --- a/src/karapace/api/container.py +++ b/src/karapace/api/container.py @@ -7,18 +7,24 @@ from karapace.api.controller import KarapaceSchemaRegistryController from karapace.api.telemetry.container import TelemetryContainer from karapace.core.container import KarapaceContainer +from karapace.core.metrics_container import MetricsContainer from karapace.core.schema_registry import KarapaceSchemaRegistry class SchemaRegistryContainer(containers.DeclarativeContainer): karapace_container = providers.Container(KarapaceContainer) + metrics_container = providers.Container(MetricsContainer) telemetry_container = providers.Container(TelemetryContainer) - schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=karapace_container.config) + schema_registry = providers.Singleton( + KarapaceSchemaRegistry, + config=karapace_container.config, + stats=metrics_container.stats, + ) schema_registry_controller = providers.Singleton( KarapaceSchemaRegistryController, config=karapace_container.config, schema_registry=schema_registry, - stats=karapace_container.statsd, + stats=metrics_container.stats, ) diff --git a/src/karapace/api/controller.py b/src/karapace/api/controller.py index 7095ba30c..ff303d09e 100644 --- a/src/karapace/api/controller.py +++ b/src/karapace/api/controller.py @@ -25,11 +25,11 @@ SubjectVersion, ) from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.core.auth_container import AuthContainer from karapace.core.compatibility import CompatibilityModes from karapace.core.compatibility.jsonschema.checks import is_incompatible from karapace.core.compatibility.schema_compatibility import SchemaCompatibility from karapace.core.config import Config -from karapace.core.container import KarapaceContainer from karapace.core.errors import ( IncompatibleSchema, InvalidReferences, @@ -57,7 +57,7 @@ ) from karapace.core.schema_references import LatestVersionReference, Reference from karapace.core.schema_registry import KarapaceSchemaRegistry -from karapace.core.statsd import StatsClient +from karapace.core.stats import StatsClient from karapace.core.typing import JsonData, JsonObject, SchemaId, Subject, Version from karapace.core.utils import JSONDecodeError from typing import Any, cast @@ -71,8 +71,6 @@ class KarapaceSchemaRegistryController: def __init__(self, config: Config, schema_registry: KarapaceSchemaRegistry, stats: StatsClient) -> None: - # super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve) - self.config = config self._process_start_time = time.monotonic() self.stats = stats @@ -157,7 +155,7 @@ async def schemas_list( deleted: bool, latest_only: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), ) -> list[SchemaListingItem]: schemas = await self.schema_registry.schemas_list(include_deleted=deleted, latest_only=latest_only) response_schemas: list[SchemaListingItem] = [] @@ -190,7 +188,7 @@ async def schemas_get( include_subjects: bool, format_serialized: str, user: User | None, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), ) -> SchemasResponse: try: parsed_schema_id = SchemaId(int(schema_id)) @@ -267,7 +265,7 @@ async def schemas_get_versions( schema_id: str, deleted: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), ) -> list[SubjectVersion]: try: schema_id_int = SchemaId(int(schema_id)) @@ -387,7 +385,7 @@ async def subjects_list( self, deleted: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), ) -> list[str]: subjects = [str(subject) for subject in self.schema_registry.database.find_subjects(include_deleted=deleted)] if authorizer: diff --git a/src/karapace/api/factory.py b/src/karapace/api/factory.py index cf60a2409..55c27140b 100644 --- a/src/karapace/api/factory.py +++ b/src/karapace/api/factory.py @@ -15,10 +15,12 @@ from karapace.api.routers.setup import setup_routers from karapace.api.telemetry.setup import setup_metering, setup_tracing from karapace.core.auth import AuthenticatorAndAuthorizer +from karapace.core.auth_container import AuthContainer from karapace.core.config import Config from karapace.core.logging_setup import configure_logging, log_config_without_secrets +from karapace.core.metrics_container import MetricsContainer from karapace.core.schema_registry import KarapaceSchemaRegistry -from karapace.core.statsd import StatsClient +from karapace.core.stats import StatsClient from typing import AsyncContextManager import logging @@ -29,20 +31,20 @@ async def karapace_schema_registry_lifespan( _: FastAPI, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - stastd: StatsClient = Depends(Provide[SchemaRegistryContainer.karapace_container.statsd]), + stats: StatsClient = Depends(Provide[MetricsContainer.stats]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), ) -> AsyncGenerator[None, None]: try: await schema_registry.start() - await authorizer.start(stats=stastd) + await authorizer.start(stats=stats) yield finally: await schema_registry.close() await authorizer.close() await forward_client.close() - stastd.close() + stats.close() def create_karapace_application( diff --git a/src/karapace/api/routers/compatibility.py b/src/karapace/api/routers/compatibility.py index c0079159b..d3c631bf9 100644 --- a/src/karapace/api/routers/compatibility.py +++ b/src/karapace/api/routers/compatibility.py @@ -12,6 +12,7 @@ from karapace.api.routers.requests import CompatibilityCheckResponse, SchemaRequest from karapace.api.user import get_current_user from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.core.auth_container import AuthContainer from karapace.core.typing import Subject from typing import Annotated from urllib.parse import unquote_plus @@ -31,7 +32,7 @@ async def compatibility_post( version: str, # TODO support actual Version object schema_request: SchemaRequest, user: Annotated[User, Depends(get_current_user)], - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityCheckResponse: subject = Subject(unquote_plus(subject)) diff --git a/src/karapace/api/routers/config.py b/src/karapace/api/routers/config.py index 5eaa1d365..6f8e5b724 100644 --- a/src/karapace/api/routers/config.py +++ b/src/karapace/api/routers/config.py @@ -13,6 +13,7 @@ from karapace.api.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse from karapace.api.user import get_current_user from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.core.auth_container import AuthContainer from karapace.core.schema_registry import KarapaceSchemaRegistry from karapace.core.typing import Subject from typing import Annotated @@ -30,7 +31,7 @@ @inject async def config_get( user: Annotated[User, Depends(get_current_user)], - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityLevelResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"): @@ -47,7 +48,7 @@ async def config_put( user: Annotated[User, Depends(get_current_user)], schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"): @@ -69,7 +70,7 @@ async def config_get_subject( subject: Subject, user: Annotated[User, Depends(get_current_user)], defaultToGlobal: bool = False, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityLevelResponse: subject = Subject(unquote_plus(subject)) @@ -88,7 +89,7 @@ async def config_set_subject( user: Annotated[User, Depends(get_current_user)], schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: subject = Subject(unquote_plus(subject)) @@ -113,7 +114,7 @@ async def config_delete_subject( user: Annotated[User, Depends(get_current_user)], schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: subject = Subject(unquote_plus(subject)) diff --git a/src/karapace/api/routers/mode.py b/src/karapace/api/routers/mode.py index a2e78a1a6..649f1394f 100644 --- a/src/karapace/api/routers/mode.py +++ b/src/karapace/api/routers/mode.py @@ -12,6 +12,7 @@ from karapace.api.routers.requests import ModeResponse from karapace.api.user import get_current_user from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.core.auth_container import AuthContainer from karapace.core.typing import Subject from typing import Annotated from urllib.parse import unquote_plus @@ -28,7 +29,7 @@ @inject async def mode_get( user: Annotated[User, Depends(get_current_user)], - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> ModeResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"): @@ -42,7 +43,7 @@ async def mode_get( async def mode_get_subject( subject: Subject, user: Annotated[User, Depends(get_current_user)], - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> ModeResponse: subject = Subject(unquote_plus(subject)) diff --git a/src/karapace/api/routers/schemas.py b/src/karapace/api/routers/schemas.py index 7c8c1c944..a2ceb7220 100644 --- a/src/karapace/api/routers/schemas.py +++ b/src/karapace/api/routers/schemas.py @@ -12,6 +12,8 @@ from karapace.core.auth import AuthenticatorAndAuthorizer, User from typing import Annotated +from karapace.core.auth_container import AuthContainer + schemas_router = APIRouter( prefix="/schemas", tags=["schemas"], @@ -26,7 +28,7 @@ async def schemas_get_list( user: Annotated[User, Depends(get_current_user)], deleted: bool = False, latestOnly: bool = False, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[SchemaListingItem]: return await controller.schemas_list( @@ -45,7 +47,7 @@ async def schemas_get( includeSubjects: bool = False, # TODO: include subjects? fetchMaxId: bool = False, # TODO: fetch max id? format_serialized: str = Query("", alias="format"), - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemasResponse: return await controller.schemas_get( @@ -72,7 +74,7 @@ async def schemas_get_versions( user: Annotated[User, Depends(get_current_user)], schema_id: str, deleted: bool = False, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[SubjectVersion]: return await controller.schemas_get_versions( diff --git a/src/karapace/api/routers/subjects.py b/src/karapace/api/routers/subjects.py index 73650d1eb..7773b4286 100644 --- a/src/karapace/api/routers/subjects.py +++ b/src/karapace/api/routers/subjects.py @@ -13,6 +13,7 @@ from karapace.api.routers.requests import SchemaIdResponse, SchemaRequest, SchemaResponse, SubjectSchemaVersionResponse from karapace.api.user import get_current_user from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.core.auth_container import AuthContainer from karapace.core.schema_registry import KarapaceSchemaRegistry from karapace.core.typing import Subject from typing import Annotated @@ -36,7 +37,7 @@ async def subjects_get( user: Annotated[User, Depends(get_current_user)], deleted: bool = False, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[str]: return await controller.subjects_list( @@ -54,7 +55,7 @@ async def subjects_subject_post( schema_request: SchemaRequest, deleted: bool = False, normalize: bool = False, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemaResponse: subject = Subject(unquote_plus(subject)) @@ -77,7 +78,7 @@ async def subjects_subject_delete( user: Annotated[User, Depends(get_current_user)], permanent: bool = False, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: @@ -103,7 +104,7 @@ async def subjects_subject_versions_post( schema_request: SchemaRequest, user: Annotated[User, Depends(get_current_user)], forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), normalize: bool = False, controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemaIdResponse: @@ -128,7 +129,7 @@ async def subjects_subject_versions_list( subject: Subject, user: Annotated[User, Depends(get_current_user)], deleted: bool = False, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: subject = Subject(unquote_plus(subject)) @@ -145,7 +146,7 @@ async def subjects_subject_version_get( version: str, user: Annotated[User, Depends(get_current_user)], deleted: bool = False, - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SubjectSchemaVersionResponse: subject = Subject(unquote_plus(subject)) @@ -164,7 +165,7 @@ async def subjects_subject_version_delete( user: Annotated[User, Depends(get_current_user)], permanent: bool = False, forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> int: @@ -188,7 +189,7 @@ async def subjects_subject_version_schema_get( subject: Subject, version: str, user: Annotated[User, Depends(get_current_user)], - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> dict: subject = Subject(unquote_plus(subject)) @@ -204,7 +205,7 @@ async def subjects_subject_version_referenced_by( subject: Subject, version: str, user: Annotated[User, Depends(get_current_user)], - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: subject = Subject(unquote_plus(subject)) diff --git a/src/karapace/api/telemetry/container.py b/src/karapace/api/telemetry/container.py index 6c7d1e397..15ef4d295 100644 --- a/src/karapace/api/telemetry/container.py +++ b/src/karapace/api/telemetry/container.py @@ -4,11 +4,11 @@ """ from dependency_injector import containers, providers -from karapace.api.telemetry.meter import Meter from karapace.api.telemetry.metrics import HTTPRequestMetrics from karapace.api.telemetry.tracer import Tracer from karapace.core.config import Config from karapace.core.container import KarapaceContainer +from karapace.core.metrics_container import MetricsContainer from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.semconv.attributes import telemetry_attributes as T @@ -28,10 +28,8 @@ def create_telemetry_resource(config: Config) -> Resource: class TelemetryContainer(containers.DeclarativeContainer): karapace_container = providers.Container(KarapaceContainer) - telemetry_resource = providers.Factory(create_telemetry_resource, config=karapace_container.config) - - meter = providers.Singleton(Meter) - http_request_metrics = providers.Singleton(HTTPRequestMetrics, meter=meter) + metrics_container = providers.Container(MetricsContainer) + http_request_metrics = providers.Singleton(HTTPRequestMetrics, meter=metrics_container.meter) tracer = providers.Singleton(Tracer) tracer_provider = providers.Singleton(TracerProvider, resource=telemetry_resource) diff --git a/src/karapace/api/telemetry/metrics.py b/src/karapace/api/telemetry/metrics.py index 3db7232eb..99f87f4b3 100644 --- a/src/karapace/api/telemetry/metrics.py +++ b/src/karapace/api/telemetry/metrics.py @@ -5,7 +5,7 @@ from collections.abc import Mapping from fastapi import HTTPException, Request, Response -from karapace.api.telemetry.meter import Meter +from karapace.core.instrumentation.meter import Meter from opentelemetry.metrics import Counter, Histogram, UpDownCounter from typing import Final diff --git a/src/karapace/api/telemetry/setup.py b/src/karapace/api/telemetry/setup.py index 61aef45f2..a338e3da3 100644 --- a/src/karapace/api/telemetry/setup.py +++ b/src/karapace/api/telemetry/setup.py @@ -5,8 +5,9 @@ from dependency_injector.wiring import inject, Provide from karapace.api.telemetry.container import TelemetryContainer -from karapace.api.telemetry.meter import Meter +from karapace.core.instrumentation.meter import Meter from karapace.api.telemetry.tracer import Tracer +from karapace.core.metrics_container import MetricsContainer from opentelemetry import metrics, trace from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource @@ -29,8 +30,9 @@ def setup_tracing( @inject def setup_metering( - meter: Meter = Provide[TelemetryContainer.meter], + meter: Meter = Provide[MetricsContainer.meter], telemetry_resource: Resource = Provide[TelemetryContainer.telemetry_resource], ) -> None: LOG.info("Setting OTel meter provider") - metrics.set_meter_provider(MeterProvider(resource=telemetry_resource, metric_readers=[meter.get_metric_reader()])) + # metrics.set_meter_provider(MeterProvider(resource=telemetry_resource, metric_readers=[meter.get_metric_reader()])) + metrics.set_meter_provider(MeterProvider(resource=telemetry_resource, metric_readers=[Meter.get_metric_reader()])) diff --git a/src/karapace/api/user.py b/src/karapace/api/user.py index 5d6911517..813e5de8d 100644 --- a/src/karapace/api/user.py +++ b/src/karapace/api/user.py @@ -6,15 +6,16 @@ from dependency_injector.wiring import inject, Provide from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBasic, HTTPBasicCredentials -from karapace.api.container import SchemaRegistryContainer from karapace.core.auth import AuthenticationError, AuthenticatorAndAuthorizer, User from typing import Annotated +from karapace.core.auth_container import AuthContainer + @inject async def get_current_user( credentials: Annotated[HTTPBasicCredentials, Depends(HTTPBasic(auto_error=False))], - authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]), ) -> User | None: if authorizer.MUST_AUTHENTICATE and not credentials: raise HTTPException( diff --git a/src/karapace/core/auth.py b/src/karapace/core/auth.py index af293f9bb..da75210fb 100644 --- a/src/karapace/core/auth.py +++ b/src/karapace/core/auth.py @@ -10,7 +10,7 @@ from enum import Enum, unique from hmac import compare_digest from karapace.core.config import Config, InvalidConfiguration -from karapace.core.statsd import StatsClient +from karapace.core.stats import StatsClient from karapace.core.utils import json_decode, json_encode from typing import Protocol from typing_extensions import override, TypedDict diff --git a/src/karapace/core/auth_container.py b/src/karapace/core/auth_container.py new file mode 100644 index 000000000..d06daeebb --- /dev/null +++ b/src/karapace/core/auth_container.py @@ -0,0 +1,25 @@ +""" +Copyright (c) 2025 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector import containers, providers +from karapace.core.auth import get_authorizer, HTTPAuthorizer, NoAuthAndAuthz +from karapace.core.container import KarapaceContainer + + +# def create_http_authorizer(config: Config) -> HTTPAuthorizer: +# return HTTPAuthorizer(auth_file=config.registry_authfile) + + +class AuthContainer(containers.DeclarativeContainer): + karapace_container = providers.Container(KarapaceContainer) + no_auth_authorizer = providers.Singleton(NoAuthAndAuthz) + http_authorizer = providers.Singleton(HTTPAuthorizer, auth_file=karapace_container.config().registry_authfile) + # http_authorizer = providers.Singleton(HTTPAuthorizer, create_http_authorizer) + authorizer = providers.Factory( + get_authorizer, + config=karapace_container.config, + http_authorizer=http_authorizer, + no_auth_authorizer=no_auth_authorizer, + ) diff --git a/src/karapace/core/container.py b/src/karapace/core/container.py index 4d9085b60..74f0976f5 100644 --- a/src/karapace/core/container.py +++ b/src/karapace/core/container.py @@ -5,28 +5,11 @@ from dependency_injector import containers, providers from karapace.api.forward_client import ForwardClient -from karapace.core.auth import get_authorizer, HTTPAuthorizer, NoAuthAndAuthz from karapace.core.config import Config from karapace.core.instrumentation.prometheus import PrometheusInstrumentation -from karapace.core.statsd import StatsClient class KarapaceContainer(containers.DeclarativeContainer): config = providers.Singleton(Config) - - statsd = providers.Singleton(StatsClient, config=config) - - no_auth_authorizer = providers.Singleton(NoAuthAndAuthz) - - http_authorizer = providers.Singleton(HTTPAuthorizer, auth_file=config().registry_authfile) - forward_client = providers.Singleton(ForwardClient) - - authorizer = providers.Factory( - get_authorizer, - config=config, - http_authorizer=http_authorizer, - no_auth_authorizer=no_auth_authorizer, - ) - prometheus = providers.Singleton(PrometheusInstrumentation) diff --git a/src/karapace/api/telemetry/meter.py b/src/karapace/core/instrumentation/meter.py similarity index 100% rename from src/karapace/api/telemetry/meter.py rename to src/karapace/core/instrumentation/meter.py diff --git a/src/karapace/core/metrics_container.py b/src/karapace/core/metrics_container.py new file mode 100644 index 000000000..8830a78eb --- /dev/null +++ b/src/karapace/core/metrics_container.py @@ -0,0 +1,15 @@ +""" +Copyright (c) 2025 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector import containers, providers +from karapace.core.container import KarapaceContainer +from karapace.core.instrumentation.meter import Meter +from karapace.core.stats import StatsClient + + +class MetricsContainer(containers.DeclarativeContainer): + karapace_container = providers.Container(KarapaceContainer) + meter = providers.Singleton(Meter) + stats = providers.Singleton(StatsClient, config=karapace_container.config, meter=meter) diff --git a/src/karapace/core/schema_reader.py b/src/karapace/core/schema_reader.py index d90a7cd9d..843369792 100644 --- a/src/karapace/core/schema_reader.py +++ b/src/karapace/core/schema_reader.py @@ -5,7 +5,7 @@ See LICENSE for details """ -from __future__ import annotations +from __future__ import annotations, with_statement from aiokafka.errors import ( GroupAuthorizationFailedError, @@ -44,7 +44,8 @@ from karapace.core.protobuf.schema import ProtobufSchema from karapace.core.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.core.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents -from karapace.core.statsd import StatsClient + +from karapace.core.stats import StatsClient from karapace.core.typing import JsonObject, SchemaReaderStoppper, Subject, Version from karapace.core.utils import json_decode, JSONDecodeError, shutdown from threading import Event, Lock, Thread @@ -80,13 +81,6 @@ MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP: Final = 1 MESSAGE_CONSUME_TIMEOUT_SECONDS: Final = 0.2 -# Metric names -METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT: Final = "karapace_schema_reader_records_processed" -METRIC_SCHEMA_TOPIC_RECORDS_PER_KEYMODE_GAUGE: Final = "karapace_schema_reader_records_per_keymode" -METRIC_SCHEMAS_GAUGE: Final = "karapace_schema_reader_schemas" -METRIC_SUBJECTS_GAUGE: Final = "karapace_schema_reader_subjects" -METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions" - class MessageType(Enum): config = "CONFIG" @@ -138,6 +132,7 @@ def __init__( offset_watcher: OffsetWatcher, key_formatter: KeyFormatter, database: KarapaceDatabase, + stats: StatsClient, master_coordinator: MasterCoordinator | None = None, ) -> None: Thread.__init__(self, name="schema-reader") @@ -151,7 +146,7 @@ def __init__( self.topic_replication_factor = self.config.replication_factor self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher - self.stats = StatsClient(config=config) + self.stats = stats self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config) self._tracer = Tracer() @@ -510,15 +505,9 @@ def _report_schema_metrics( schema_records_processed_keymode_deprecated_karapace: int, ) -> None: # Update processing counter always. - self.stats.increase( - metric=METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT, - inc_value=schema_records_processed_keymode_canonical, - tags={"keymode": KeyMode.CANONICAL}, - ) - self.stats.increase( - metric=METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT, - inc_value=schema_records_processed_keymode_deprecated_karapace, - tags={"keymode": KeyMode.DEPRECATED_KARAPACE}, + self.stats.schema_records_processed( + with_canonical_key=schema_records_processed_keymode_canonical, + with_deprecated_key=schema_records_processed_keymode_deprecated_karapace, ) # Update following gauges only if there is a possibility of a change. @@ -526,32 +515,14 @@ def _report_schema_metrics( schema_records_processed_keymode_canonical or schema_records_processed_keymode_deprecated_karapace ) if records_processed: - self.processed_canonical_keys_total += schema_records_processed_keymode_canonical - self.stats.gauge( - metric=METRIC_SCHEMA_TOPIC_RECORDS_PER_KEYMODE_GAUGE, - value=self.processed_canonical_keys_total, - tags={"keymode": KeyMode.CANONICAL}, - ) - self.processed_deprecated_karapace_keys_total += schema_records_processed_keymode_deprecated_karapace - self.stats.gauge( - metric=METRIC_SCHEMA_TOPIC_RECORDS_PER_KEYMODE_GAUGE, - value=self.processed_deprecated_karapace_keys_total, - tags={"keymode": KeyMode.DEPRECATED_KARAPACE}, - ) num_schemas = self.database.num_schemas() num_subjects = self.database.num_subjects() - self.stats.gauge(metric=METRIC_SCHEMAS_GAUGE, value=num_schemas) - self.stats.gauge(metric=METRIC_SUBJECTS_GAUGE, value=num_subjects) + self.stats.set_schemas_num_total(value=num_schemas) + self.stats.set_subjects_num_total(value=num_subjects) + live_versions, soft_deleted_versions = self.database.num_schema_versions() - self.stats.gauge( - metric=METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE, - value=live_versions, - tags={"state": "live"}, - ) - self.stats.gauge( - metric=METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE, - value=soft_deleted_versions, - tags={"state": "soft_deleted"}, + self.stats.set_schema_versions_num_total( + live_versions=live_versions, soft_deleted_versions=soft_deleted_versions ) def _handle_msg_config(self, key: dict, value: dict | None) -> None: diff --git a/src/karapace/core/schema_registry.py b/src/karapace/core/schema_registry.py index a2bbdce74..04f84a187 100644 --- a/src/karapace/core/schema_registry.py +++ b/src/karapace/core/schema_registry.py @@ -40,6 +40,7 @@ ) from karapace.core.schema_reader import KafkaSchemaReader from karapace.core.schema_references import LatestVersionReference, Reference +from karapace.core.stats import StatsClient from karapace.core.typing import JsonObject, Mode, PrimaryInfo, SchemaId, Subject, Version import asyncio @@ -49,7 +50,7 @@ class KarapaceSchemaRegistry: - def __init__(self, config: Config) -> None: + def __init__(self, config: Config, stats: StatsClient) -> None: # TODO: compatibility was previously in mutable dict, fix the runtime config to be distinct from static config. self.config = config self._tracer = Tracer() @@ -68,6 +69,7 @@ def __init__(self, config: Config) -> None: key_formatter=self._key_formatter, master_coordinator=self.mc, database=self.database, + stats=stats, ) self.mc.set_stoppper(self.schema_reader) diff --git a/src/karapace/core/stats.py b/src/karapace/core/stats.py new file mode 100644 index 000000000..092347035 --- /dev/null +++ b/src/karapace/core/stats.py @@ -0,0 +1,91 @@ +""" +karapace - statistics + +Copyright (c) 2025 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from karapace.core.config import Config +from karapace.core.instrumentation.meter import Meter +from karapace.core.key_format import KeyMode +from karapace.core.sentry import get_sentry_client +from opentelemetry.metrics import Counter, _Gauge +from typing import Final, Mapping + +import logging + +LOG = logging.getLogger(__name__) + +# Metric names +METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT: Final = "karapace_schema_reader_records_processed_total" +METRIC_SCHEMAS_GAUGE: Final = "karapace_schema_reader_schemas_total" +METRIC_SUBJECTS_GAUGE: Final = "karapace_schema_reader_subjects_total" +METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE: Final = "karapace_schema_reader_subject_data_schema_versions_total" +METRIC_EXCEPTIONS = "karapace_exceptions_total" + + +class StatsClient: + """Core statistics and exception reporting for Karapace. + + Exception reporting uses Sentry integration if Sentry DSN is set. + """ + + def __init__(self, *, config: Config, meter: Meter) -> None: + self._tags: Mapping[str, str] = config.tags.dict() + self.sentry_client: Final = get_sentry_client(sentry_config=(config.sentry or None)) + self._meter = meter + + # Supports labels for keymode + self._schema_records_processed_counter: Final[Counter] = self._meter.get_meter().create_counter( + name=METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT, + description="Total processed schema records", + ) + self._total_schemas_gauge: Final[_Gauge] = self._meter.get_meter().create_gauge( + name=METRIC_SCHEMAS_GAUGE, + description="Total number of schemas", + ) + self._total_subjects_gauge: Final[_Gauge] = self._meter.get_meter().create_gauge( + name=METRIC_SUBJECTS_GAUGE, + description="Total number of subjects", + ) + self._schema_versions_gauge: Final[_Gauge] = self._meter.get_meter().create_gauge( + name=METRIC_SUBJECT_DATA_SCHEMA_VERSIONS_GAUGE, + description="Schema versions", + ) + self._exceptions_total: Final[Counter] = self._meter.get_meter().create_counter( + name=METRIC_EXCEPTIONS, description="Unexpected exceptions" + ) + + def schema_records_processed(self, *, with_canonical_key: int, with_deprecated_key: int) -> None: + self._schema_records_processed_counter.add( + amount=with_canonical_key, attributes={"keymode": KeyMode.CANONICAL.name, **self._tags} + ) + self._schema_records_processed_counter.add( + amount=with_deprecated_key, attributes={"keymode": KeyMode.DEPRECATED_KARAPACE.name, **self._tags} + ) + + def set_schemas_num_total(self, *, value: int) -> None: + self._total_schemas_gauge.set(amount=value, attributes=self._tags) + + def set_subjects_num_total(self, *, value: int) -> None: + self._total_subjects_gauge.set(amount=value, attributes=self._tags) + + def set_schema_versions_num_total(self, *, live_versions: int, soft_deleted_versions: int) -> None: + self._schema_versions_gauge.set(amount=live_versions, attributes={"state": "live", **self._tags}) + self._schema_versions_gauge.set(amount=soft_deleted_versions, attributes={"state": "soft_deleted", **self._tags}) + + def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: + all_tags = { + "exception": ex.__class__.__name__, + "where": where, + **self._tags, + } + all_tags.update(tags or {}) + self._exceptions_total.add(amount=1, attributes=all_tags) + scope_args = {**(tags or {}), "where": where} + self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args) + + def close(self) -> None: + self.sentry_client.close() diff --git a/src/karapace/rapu.py b/src/karapace/rapu.py index 5fbb47689..8f8c9f83f 100644 --- a/src/karapace/rapu.py +++ b/src/karapace/rapu.py @@ -11,7 +11,7 @@ from collections.abc import Callable from http import HTTPStatus from karapace.core.config import Config, create_server_ssl_context -from karapace.core.statsd import StatsClient +from karapace.statsd import StatsClient from karapace.core.utils import json_decode, json_encode from karapace.version import __version__ from typing import NoReturn, overload diff --git a/src/karapace/core/statsd.py b/src/karapace/statsd.py similarity index 98% rename from src/karapace/core/statsd.py rename to src/karapace/statsd.py index 31107364e..08fd72c83 100644 --- a/src/karapace/core/statsd.py +++ b/src/karapace/statsd.py @@ -26,7 +26,7 @@ class StatsClient: - def __init__(self, config: Config) -> None: + def __init__(self, *, config: Config) -> None: self._dest_addr: Final = (config.statsd_host, config.statsd_port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._tags: Final[KarapaceTags] = config.tags diff --git a/tests/conftest.py b/tests/conftest.py index 3a84aeb2a..359267bbf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,7 +13,7 @@ from avro.compatibility import SchemaCompatibilityResult import karapace.api.controller -import karapace.api.telemetry.meter +import karapace.core.instrumentation.meter import karapace.api.telemetry.middleware import karapace.api.telemetry.setup import karapace.api.telemetry.tracer @@ -196,7 +196,7 @@ def fixture_karapace_container() -> KarapaceContainer: modules=[ karapace.api.controller, karapace.api.telemetry.tracer, - karapace.api.telemetry.meter, + karapace.core.instrumentation.meter, ] ) return karapace_container diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index a50479a29..759d41b4f 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -6,6 +6,8 @@ import asyncio from contextlib import closing from dataclasses import dataclass +from unittest.mock import Mock +from karapace.core.stats import StatsClient import pytest @@ -18,7 +20,7 @@ from karapace.core.key_format import KeyFormatter, KeyMode from karapace.core.offset_watcher import OffsetWatcher from karapace.core.schema_reader import KafkaSchemaReader -from karapace.core.typing import PrimaryInfo +from karapace.core.typing import PrimaryInfo, Subject from karapace.core.utils import json_encode from tests.base_testcase import BaseTestCase from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper @@ -64,6 +66,7 @@ async def test_regression_soft_delete_schemas_should_be_registered( topic_name = new_random_name("topic") subject = create_subject_name_factory(test_name)() group_id = create_group_name_factory(test_name)() + stats_mock = Mock(spec=StatsClient) config = Config() config.bootstrap_uri = kafka_servers.bootstrap_servers[0] @@ -83,6 +86,7 @@ async def test_regression_soft_delete_schemas_should_be_registered( key_formatter=KeyFormatter(), master_coordinator=master_coordinator, database=database, + stats=stats_mock, ) schema_reader.start() @@ -113,7 +117,7 @@ async def test_regression_soft_delete_schemas_should_be_registered( schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) - schemas = database.find_subject_schemas(subject=subject, include_deleted=True) + schemas = database.find_subject_schemas(subject=Subject(subject), include_deleted=True) assert len(schemas) == 1, "Deleted schemas must have been registered" # Produce a soft deleted schema, this is the regression test @@ -143,7 +147,7 @@ async def test_regression_soft_delete_schemas_should_be_registered( assert seen is True assert database.global_schema_id == test_global_schema_id - schemas = database.find_subject_schemas(subject=subject, include_deleted=True) + schemas = database.find_subject_schemas(subject=Subject(subject), include_deleted=True) assert len(schemas) == 2, "Deleted schemas must have been registered" finally: await master_coordinator.close() @@ -156,6 +160,7 @@ async def test_regression_config_for_inexisting_object_should_not_throw( test_name = "test_regression_config_for_inexisting_object_should_not_throw" subject = create_subject_name_factory(test_name)() group_id = create_group_name_factory(test_name)() + stats_mock = Mock(spec=StatsClient) config = Config() config.bootstrap_uri = kafka_servers.bootstrap_servers[0] @@ -174,6 +179,7 @@ async def test_regression_config_for_inexisting_object_should_not_throw( key_formatter=KeyFormatter(), master_coordinator=master_coordinator, database=database, + stats=stats_mock, ) schema_reader.start() @@ -198,7 +204,9 @@ async def test_regression_config_for_inexisting_object_should_not_throw( seen = schema_reader._offset_watcher.wait_for_offset(msg.offset(), timeout=5) assert seen is True - assert database.find_subject(subject=subject) is not None, "The above message should be handled gracefully" + assert ( + database.find_subject(subject=Subject(subject)) is not None + ), "The above message should be handled gracefully" finally: await master_coordinator.close() @@ -248,6 +256,7 @@ async def test_key_format_detection( producer: KafkaProducer, admin_client: KafkaAdminClient, ) -> None: + stats_mock = Mock(spec=StatsClient) group_id = create_group_name_factory(testcase.test_name)() test_topic = new_topic(admin_client) @@ -278,6 +287,7 @@ async def test_key_format_detection( key_formatter=key_formatter, master_coordinator=master_coordinator, database=database, + stats=stats_mock, ) schema_reader.start() diff --git a/tests/unit/api/telemetry/test_metrics.py b/tests/unit/api/telemetry/test_metrics.py index db98e71f4..0aba2ae7c 100644 --- a/tests/unit/api/telemetry/test_metrics.py +++ b/tests/unit/api/telemetry/test_metrics.py @@ -10,7 +10,7 @@ import pytest from fastapi import HTTPException, Request, Response -from karapace.api.telemetry.meter import Meter +from karapace.core.instrumentation.meter import Meter from karapace.api.telemetry.metrics import HTTPRequestMetrics diff --git a/tests/unit/api/telemetry/test_middleware.py b/tests/unit/api/telemetry/test_middleware.py index 687fdcaea..c922c9d6d 100644 --- a/tests/unit/api/telemetry/test_middleware.py +++ b/tests/unit/api/telemetry/test_middleware.py @@ -13,7 +13,7 @@ from fastapi import FastAPI, Request, Response from opentelemetry.trace import SpanKind, Status, StatusCode -from karapace.api.telemetry.meter import Meter +from karapace.core.instrumentation.meter import Meter from karapace.api.telemetry.metrics import HTTPRequestMetrics from karapace.api.telemetry.middleware import setup_telemetry_middleware, telemetry_middleware from karapace.api.telemetry.tracer import Tracer diff --git a/tests/unit/api/telemetry/test_meter.py b/tests/unit/core/instrumentation/test_meter.py similarity index 83% rename from tests/unit/api/telemetry/test_meter.py rename to tests/unit/core/instrumentation/test_meter.py index af7b6f24f..edf051e66 100644 --- a/tests/unit/api/telemetry/test_meter.py +++ b/tests/unit/core/instrumentation/test_meter.py @@ -9,13 +9,13 @@ from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, MetricExporter -from karapace.api.telemetry.meter import Meter, NOOPMetricExporter +from karapace.core.instrumentation.meter import Meter, NOOPMetricExporter from karapace.core.config import KarapaceTelemetry from karapace.core.container import KarapaceContainer def test_meter(karapace_container: KarapaceContainer): - with patch("karapace.api.telemetry.meter.metrics") as mock_metrics: + with patch("karapace.core.instrumentation.meter.metrics") as mock_metrics: Meter.get_meter(config=karapace_container.config()) mock_metrics.get_meter_provider.return_value.get_meter.assert_called_once_with("Karapace.meter") @@ -55,7 +55,7 @@ def test_get_metric_exporter_otlp(karapace_container: KarapaceContainer) -> None ) } ) - with patch("karapace.api.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter: + with patch("karapace.core.instrumentation.meter.OTLPMetricExporter") as mock_otlp_exporter: exporter: MetricExporter = Meter.get_metric_exporter(config=config) mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") assert exporter is mock_otlp_exporter.return_value @@ -66,8 +66,8 @@ def test_get_metric_reader_without_otel_endpoint(karapace_container: KarapaceCon new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)} ) with ( - patch("karapace.api.telemetry.meter.NOOPMetricExporter") as mock_noop_exporter, - patch("karapace.api.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, + patch("karapace.core.instrumentation.meter.NOOPMetricExporter") as mock_noop_exporter, + patch("karapace.core.instrumentation.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, ): reader = Meter.get_metric_reader(config=config) mock_noop_exporter.assert_called_once() @@ -88,8 +88,8 @@ def test_get_metric_reader_with_otel_endpoint(karapace_container: KarapaceContai } ) with ( - patch("karapace.api.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter, - patch("karapace.api.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, + patch("karapace.core.instrumentation.meter.OTLPMetricExporter") as mock_otlp_exporter, + patch("karapace.core.instrumentation.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, ): reader = Meter.get_metric_reader(config=config) mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") diff --git a/tests/unit/test_in_memory_database.py b/tests/unit/test_in_memory_database.py index 18a14f9e0..ca7c7c9dd 100644 --- a/tests/unit/test_in_memory_database.py +++ b/tests/unit/test_in_memory_database.py @@ -9,6 +9,8 @@ from collections.abc import Sequence from pathlib import Path from typing import Final +from unittest.mock import Mock +from karapace.core.stats import StatsClient import pytest from confluent_kafka.cimpl import KafkaError @@ -223,6 +225,7 @@ def test_can_ingest_schemas_from_log(karapace_container: KarapaceContainer) -> N on a node running kafka that hosts the `_schemas` topic. """ + stats_mock = Mock(spec=StatsClient) restore_location = TEST_DATA_FOLDER / "schemas.log" schema_log = restore_location.read_text(encoding="utf-8").strip() @@ -233,6 +236,7 @@ def test_can_ingest_schemas_from_log(karapace_container: KarapaceContainer) -> N key_formatter=KeyFormatter(), master_coordinator=None, database=database, + stats=stats_mock, ) kafka_messages: list[AlwaysFineKafkaMessage] = [] diff --git a/tests/unit/test_rapu.py b/tests/unit/test_rapu.py index 5a57277ae..2b8c0a4c9 100644 --- a/tests/unit/test_rapu.py +++ b/tests/unit/test_rapu.py @@ -13,7 +13,7 @@ from karapace.core.container import KarapaceContainer from karapace.kafka_rest_apis.karapace import KarapaceBase -from karapace.core.statsd import StatsClient +from karapace.statsd import StatsClient from karapace.rapu import REST_ACCEPT_RE, REST_CONTENT_TYPE_RE, HTTPRequest diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index fc737973f..9b7920554 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -15,6 +15,7 @@ from unittest.mock import Mock import confluent_kafka +from karapace.core.stats import StatsClient import pytest from _pytest.logging import LogCaptureFixture from confluent_kafka import Message @@ -154,7 +155,8 @@ class ReadinessTestCase(BaseTestCase): ], ) def test_readiness_check(testcase: ReadinessTestCase, karapace_container: KarapaceContainer) -> None: - key_formatter_mock = Mock() + key_formatter_mock = Mock(spec=KeyFormatter) + stats_mock = Mock(spec=StatsClient) consumer_mock = Mock() consumer_mock.consume.return_value = [] # Return tuple (beginning, end), end offset is the next upcoming record offset @@ -167,6 +169,7 @@ def test_readiness_check(testcase: ReadinessTestCase, karapace_container: Karapa key_formatter=key_formatter_mock, master_coordinator=None, database=InMemoryDatabase(), + stats=stats_mock, ) schema_reader.consumer = consumer_mock schema_reader.offset = testcase.cur_offset @@ -176,7 +179,8 @@ def test_readiness_check(testcase: ReadinessTestCase, karapace_container: Karapa def test_num_max_messages_to_consume_moved_to_one_after_ready(karapace_container: KarapaceContainer) -> None: - key_formatter_mock = Mock() + key_formatter_mock = Mock(spec=KeyFormatter) + stats_mock = Mock(spec=StatsClient) consumer_mock = Mock() consumer_mock.consume.return_value = [] # Return tuple (beginning, end), end offset is the next upcoming record offset @@ -189,6 +193,7 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready(karapace_container key_formatter=key_formatter_mock, master_coordinator=None, database=InMemoryDatabase(), + stats=stats_mock, ) schema_reader.consumer = consumer_mock schema_reader.offset = 0 @@ -203,6 +208,8 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche karapace_container: KarapaceContainer, ) -> None: key_formatter_mock = Mock(spec=KeyFormatter) + stats_mock = Mock(spec=StatsClient) + consumer_mock = Mock(spec=KafkaConsumer) schema_str = json.dumps( @@ -236,6 +243,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche key_formatter=key_formatter_mock, master_coordinator=None, database=InMemoryDatabase(), + stats=stats_mock, ) schema_reader.consumer = consumer_mock schema_reader.offset = 0 @@ -261,6 +269,7 @@ def test_soft_deleted_schema_storing(karapace_container: KarapaceContainer) -> N the soft deleted version of the schema is present. """ key_formatter_mock = Mock(spec=KeyFormatter) + stats_mock = Mock(spec=StatsClient) consumer_mock = Mock(spec=KafkaConsumer) soft_deleted_schema_record = Mock(spec=confluent_kafka.Message) soft_deleted_schema_record.error.return_value = None @@ -293,6 +302,7 @@ def test_soft_deleted_schema_storing(karapace_container: KarapaceContainer) -> N key_formatter=key_formatter_mock, master_coordinator=None, database=InMemoryDatabase(), + stats=stats_mock, ) schema_reader.consumer = consumer_mock schema_reader.offset = 0 @@ -304,6 +314,8 @@ def test_soft_deleted_schema_storing(karapace_container: KarapaceContainer) -> N def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture, karapace_container: KarapaceContainer) -> None: + key_formatter_mock = Mock(spec=KeyFormatter) + stats_mock = Mock(spec=StatsClient) database_mock = Mock(spec=InMemoryDatabase) database_mock.find_subject.return_value = True database_mock.find_subject_schemas.return_value = { @@ -312,9 +324,10 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture, karapace_cont schema_reader = KafkaSchemaReader( config=karapace_container.config(), offset_watcher=OffsetWatcher(), - key_formatter=KeyFormatter(), + key_formatter=key_formatter_mock, master_coordinator=None, database=database_mock, + stats=stats_mock, ) with caplog.at_level(logging.WARNING, logger="karapace.core.schema_reader"): @@ -379,7 +392,8 @@ async def test_schema_reader_health_check( testcase: HealthCheckTestCase, monkeypatch: MonkeyPatch, karapace_container: KarapaceContainer ) -> None: offset_watcher = OffsetWatcher() - key_formatter_mock = Mock() + key_formatter_mock = Mock(spec=KeyFormatter) + stats_mock = Mock(spec=StatsClient) admin_client_mock = Mock() emtpy_future = Future() @@ -395,6 +409,7 @@ async def test_schema_reader_health_check( key_formatter=key_formatter_mock, master_coordinator=None, database=InMemoryDatabase(), + stats=stats_mock, ) monkeypatch.setattr(time, "monotonic", lambda: testcase.current_time) @@ -407,10 +422,10 @@ async def test_schema_reader_health_check( @dataclass class KafkaMessageHandlingErrorTestCase(BaseTestCase): - key: bytes - value: bytes - schema_type: SchemaType - message_type: MessageType + key: bytes | None + value: bytes | None + schema_type: SchemaType | None + message_type: MessageType | None expected_error: ShutdownException expected_log_message: str @@ -421,6 +436,7 @@ def fixture_schema_reader_with_consumer_messages_factory( ) -> Callable[[tuple[list[Message]]], KafkaSchemaReader]: def factory(consumer_messages: tuple[list[Message]]) -> KafkaSchemaReader: key_formatter_mock = Mock(spec=KeyFormatter) + stats_mock = Mock(spec=StatsClient) consumer_mock = Mock(spec=KafkaConsumer) consumer_mock.consume.side_effect = consumer_messages @@ -437,6 +453,7 @@ def factory(consumer_messages: tuple[list[Message]]) -> KafkaSchemaReader: key_formatter=key_formatter_mock, master_coordinator=None, database=InMemoryDatabase(), + stats=stats_mock, ) schema_reader.consumer = consumer_mock schema_reader.offset = 0