Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use otel in core metrics #1035

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions src/karapace/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from karapace.api.container import SchemaRegistryContainer
from karapace.api.factory import create_karapace_application, karapace_schema_registry_lifespan
from karapace.api.telemetry.container import TelemetryContainer
from karapace.core.auth_container import AuthContainer
from karapace.core.container import KarapaceContainer

import karapace.api.controller
Expand All @@ -18,48 +19,79 @@
import karapace.api.routers.mode
import karapace.api.routers.schemas
import karapace.api.routers.subjects
import karapace.api.telemetry.meter
import karapace.core.instrumentation.meter
import karapace.api.telemetry.middleware
import karapace.api.telemetry.setup
import karapace.api.telemetry.tracer
import karapace.api.user
import uvicorn

from karapace.core.metrics_container import MetricsContainer

if __name__ == "__main__":
karapace_container = KarapaceContainer()
karapace_container.wire(
modules=[
__name__,
karapace.api.controller,
karapace.api.telemetry.tracer,
karapace.api.telemetry.meter,
karapace.core.instrumentation.meter,
]
)

telemetry_container = TelemetryContainer(karapace_container=karapace_container)
telemetry_container.wire(
auth_container = AuthContainer(
karapace_container=karapace_container,
)
auth_container.wire(
modules=[
karapace.api.controller,
karapace.api.factory,
karapace.api.routers.compatibility,
karapace.api.routers.config,
karapace.api.routers.mode,
karapace.api.routers.schemas,
karapace.api.routers.subjects,
karapace.api.user,
]
)

metrics_container = MetricsContainer(
karapace_container=karapace_container,
)
metrics_container.wire(
modules=[
karapace.api.factory,
karapace.api.telemetry.setup,
]
)

telemetry_container = TelemetryContainer(
karapace_container=karapace_container,
metrics_container=metrics_container,
)
telemetry_container.wire(
modules=[
karapace.api.telemetry.middleware,
karapace.api.telemetry.setup,
]
)

schema_registry_container = SchemaRegistryContainer(
karapace_container=karapace_container, telemetry_container=telemetry_container
karapace_container=karapace_container,
metrics_container=metrics_container,
telemetry_container=telemetry_container,
)
schema_registry_container.wire(
modules=[
__name__,
karapace.api.factory,
karapace.api.user,
karapace.api.routers.compatibility,
karapace.api.routers.config,
karapace.api.routers.health,
karapace.api.routers.master_availability,
karapace.api.routers.metrics,
karapace.api.routers.subjects,
karapace.api.routers.schemas,
karapace.api.routers.config,
karapace.api.routers.compatibility,
karapace.api.routers.mode,
karapace.api.routers.master_availability,
karapace.api.routers.schemas,
karapace.api.routers.subjects,
]
)

Expand Down
10 changes: 8 additions & 2 deletions src/karapace/api/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@
from karapace.api.controller import KarapaceSchemaRegistryController
from karapace.api.telemetry.container import TelemetryContainer
from karapace.core.container import KarapaceContainer
from karapace.core.metrics_container import MetricsContainer
from karapace.core.schema_registry import KarapaceSchemaRegistry


class SchemaRegistryContainer(containers.DeclarativeContainer):
karapace_container = providers.Container(KarapaceContainer)
metrics_container = providers.Container(MetricsContainer)
telemetry_container = providers.Container(TelemetryContainer)

schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=karapace_container.config)
schema_registry = providers.Singleton(
KarapaceSchemaRegistry,
config=karapace_container.config,
stats=metrics_container.stats,
)

schema_registry_controller = providers.Singleton(
KarapaceSchemaRegistryController,
config=karapace_container.config,
schema_registry=schema_registry,
stats=karapace_container.statsd,
stats=metrics_container.stats,
)
14 changes: 6 additions & 8 deletions src/karapace/api/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
SubjectVersion,
)
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.core.auth_container import AuthContainer
from karapace.core.compatibility import CompatibilityModes
from karapace.core.compatibility.jsonschema.checks import is_incompatible
from karapace.core.compatibility.schema_compatibility import SchemaCompatibility
from karapace.core.config import Config
from karapace.core.container import KarapaceContainer
from karapace.core.errors import (
IncompatibleSchema,
InvalidReferences,
Expand Down Expand Up @@ -57,7 +57,7 @@
)
from karapace.core.schema_references import LatestVersionReference, Reference
from karapace.core.schema_registry import KarapaceSchemaRegistry
from karapace.core.statsd import StatsClient
from karapace.core.stats import StatsClient
from karapace.core.typing import JsonData, JsonObject, SchemaId, Subject, Version
from karapace.core.utils import JSONDecodeError
from typing import Any, cast
Expand All @@ -71,8 +71,6 @@

class KarapaceSchemaRegistryController:
def __init__(self, config: Config, schema_registry: KarapaceSchemaRegistry, stats: StatsClient) -> None:
# super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve)

self.config = config
self._process_start_time = time.monotonic()
self.stats = stats
Expand Down Expand Up @@ -157,7 +155,7 @@ async def schemas_list(
deleted: bool,
latest_only: bool,
user: User | None,
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
) -> list[SchemaListingItem]:
schemas = await self.schema_registry.schemas_list(include_deleted=deleted, latest_only=latest_only)
response_schemas: list[SchemaListingItem] = []
Expand Down Expand Up @@ -190,7 +188,7 @@ async def schemas_get(
include_subjects: bool,
format_serialized: str,
user: User | None,
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
) -> SchemasResponse:
try:
parsed_schema_id = SchemaId(int(schema_id))
Expand Down Expand Up @@ -267,7 +265,7 @@ async def schemas_get_versions(
schema_id: str,
deleted: bool,
user: User | None,
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
) -> list[SubjectVersion]:
try:
schema_id_int = SchemaId(int(schema_id))
Expand Down Expand Up @@ -387,7 +385,7 @@ async def subjects_list(
self,
deleted: bool,
user: User | None,
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
) -> list[str]:
subjects = [str(subject) for subject in self.schema_registry.database.find_subjects(include_deleted=deleted)]
if authorizer:
Expand Down
12 changes: 7 additions & 5 deletions src/karapace/api/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
from karapace.api.routers.setup import setup_routers
from karapace.api.telemetry.setup import setup_metering, setup_tracing
from karapace.core.auth import AuthenticatorAndAuthorizer
from karapace.core.auth_container import AuthContainer
from karapace.core.config import Config
from karapace.core.logging_setup import configure_logging, log_config_without_secrets
from karapace.core.metrics_container import MetricsContainer
from karapace.core.schema_registry import KarapaceSchemaRegistry
from karapace.core.statsd import StatsClient
from karapace.core.stats import StatsClient
from typing import AsyncContextManager

import logging
Expand All @@ -29,20 +31,20 @@
async def karapace_schema_registry_lifespan(
_: FastAPI,
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
stastd: StatsClient = Depends(Provide[SchemaRegistryContainer.karapace_container.statsd]),
stats: StatsClient = Depends(Provide[MetricsContainer.stats]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
) -> AsyncGenerator[None, None]:
try:
await schema_registry.start()
await authorizer.start(stats=stastd)
await authorizer.start(stats=stats)

yield
finally:
await schema_registry.close()
await authorizer.close()
await forward_client.close()
stastd.close()
stats.close()


def create_karapace_application(
Expand Down
3 changes: 2 additions & 1 deletion src/karapace/api/routers/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from karapace.api.routers.requests import CompatibilityCheckResponse, SchemaRequest
from karapace.api.user import get_current_user
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.core.auth_container import AuthContainer
from karapace.core.typing import Subject
from typing import Annotated
from urllib.parse import unquote_plus
Expand All @@ -31,7 +32,7 @@ async def compatibility_post(
version: str, # TODO support actual Version object
schema_request: SchemaRequest,
user: Annotated[User, Depends(get_current_user)],
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityCheckResponse:
subject = Subject(unquote_plus(subject))
Expand Down
11 changes: 6 additions & 5 deletions src/karapace/api/routers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from karapace.api.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse
from karapace.api.user import get_current_user
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.core.auth_container import AuthContainer
from karapace.core.schema_registry import KarapaceSchemaRegistry
from karapace.core.typing import Subject
from typing import Annotated
Expand All @@ -30,7 +31,7 @@
@inject
async def config_get(
user: Annotated[User, Depends(get_current_user)],
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityLevelResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"):
Expand All @@ -47,7 +48,7 @@ async def config_put(
user: Annotated[User, Depends(get_current_user)],
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"):
Expand All @@ -69,7 +70,7 @@ async def config_get_subject(
subject: Subject,
user: Annotated[User, Depends(get_current_user)],
defaultToGlobal: bool = False,
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityLevelResponse:
subject = Subject(unquote_plus(subject))
Expand All @@ -88,7 +89,7 @@ async def config_set_subject(
user: Annotated[User, Depends(get_current_user)],
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityResponse:
subject = Subject(unquote_plus(subject))
Expand All @@ -113,7 +114,7 @@ async def config_delete_subject(
user: Annotated[User, Depends(get_current_user)],
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> CompatibilityResponse:
subject = Subject(unquote_plus(subject))
Expand Down
5 changes: 3 additions & 2 deletions src/karapace/api/routers/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from karapace.api.routers.requests import ModeResponse
from karapace.api.user import get_current_user
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.core.auth_container import AuthContainer
from karapace.core.typing import Subject
from typing import Annotated
from urllib.parse import unquote_plus
Expand All @@ -28,7 +29,7 @@
@inject
async def mode_get(
user: Annotated[User, Depends(get_current_user)],
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> ModeResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"):
Expand All @@ -42,7 +43,7 @@ async def mode_get(
async def mode_get_subject(
subject: Subject,
user: Annotated[User, Depends(get_current_user)],
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> ModeResponse:
subject = Subject(unquote_plus(subject))
Expand Down
8 changes: 5 additions & 3 deletions src/karapace/api/routers/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from karapace.core.auth import AuthenticatorAndAuthorizer, User
from typing import Annotated

from karapace.core.auth_container import AuthContainer

schemas_router = APIRouter(
prefix="/schemas",
tags=["schemas"],
Expand All @@ -26,7 +28,7 @@ async def schemas_get_list(
user: Annotated[User, Depends(get_current_user)],
deleted: bool = False,
latestOnly: bool = False,
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> list[SchemaListingItem]:
return await controller.schemas_list(
Expand All @@ -45,7 +47,7 @@ async def schemas_get(
includeSubjects: bool = False, # TODO: include subjects?
fetchMaxId: bool = False, # TODO: fetch max id?
format_serialized: str = Query("", alias="format"),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> SchemasResponse:
return await controller.schemas_get(
Expand All @@ -72,7 +74,7 @@ async def schemas_get_versions(
user: Annotated[User, Depends(get_current_user)],
schema_id: str,
deleted: bool = False,
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
) -> list[SubjectVersion]:
return await controller.schemas_get_versions(
Expand Down
Loading
Loading