diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 688f24fe1..450903ecd 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -7,12 +7,13 @@
 from _pytest.fixtures import SubRequest
 from aiohttp.pytest_plugin import AiohttpClient
 from aiohttp.test_utils import TestClient
-from contextlib import closing, ExitStack
+from contextlib import asynccontextmanager, closing, ExitStack
 from dataclasses import asdict
 from filelock import FileLock
 from kafka import KafkaProducer
 from karapace.client import Client
-from karapace.config import Config, set_config_defaults, write_config
+from karapace.config import Config, ConfigDefaults, set_config_defaults, write_config
+from karapace.dataclasses import default_dataclass
 from karapace.kafka_admin import KafkaAdminClient, NewTopic
 from karapace.kafka_rest_apis import KafkaRest
 from pathlib import Path
@@ -30,7 +31,7 @@
 from tests.integration.utils.synchronization import lock_path_for
 from tests.integration.utils.zookeeper import configure_and_start_zk
 from tests.utils import repeat_until_successful_request
-from typing import AsyncIterator, Iterator, List, Optional
+from typing import AsyncContextManager, AsyncIterator, Callable, Iterator, List, Optional
 from urllib.parse import urlparse
 
 import asyncio
@@ -215,13 +216,14 @@ def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaAdminClient]:
     yield KafkaAdminClient(bootstrap_servers=kafka_servers.bootstrap_servers)
 
 
-@pytest.fixture(scope="function", name="rest_async")
-async def fixture_rest_async(
+@asynccontextmanager
+async def _kafka_rest_async(
     request: SubRequest,
     loop: asyncio.AbstractEventLoop,  # pylint: disable=unused-argument
     tmp_path: Path,
     kafka_servers: KafkaServers,
     registry_async_client: Client,
+    custom_values: Optional[ConfigDefaults] = None,
 ) -> AsyncIterator[Optional[KafkaRest]]:
     # Do not start a REST api when the user provided an external service. Doing
     # so would cause this node to join the existing group and participate in
@@ -234,14 +236,17 @@ async def fixture_rest_async(
 
     config_path = tmp_path / "karapace_config.json"
 
-    config = set_config_defaults(
-        {
-            "admin_metadata_max_age": 2,
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            # Use non-default max request size for REST producer.
-            "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES,
-        }
-    )
+    override_values = {
+        "admin_metadata_max_age": 2,
+        "bootstrap_uri": kafka_servers.bootstrap_servers,
+        # Use non-default max request size for REST producer.
+ "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES, + } + + if custom_values is not None: + override_values.update(custom_values) + + config = set_config_defaults(override_values) write_config(config_path, config) rest = KafkaRest(config=config) @@ -253,8 +258,49 @@ async def fixture_rest_async( await rest.close() -@pytest.fixture(scope="function", name="rest_async_client") -async def fixture_rest_async_client( +@pytest.fixture(scope="function", name="rest_async") +async def fixture_rest_async( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + tmp_path: Path, + kafka_servers: KafkaServers, + registry_async_client: Client, +) -> AsyncIterator[Optional[KafkaRest]]: + async with _kafka_rest_async( + request, + loop, + tmp_path, + kafka_servers, + registry_async_client, + ) as kafka_rest_async: + yield kafka_rest_async + + +@pytest.fixture(scope="function", name="rest_async_from_config") +def fixture_rest_async_from_config( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + tmp_path: Path, + kafka_servers: KafkaServers, + registry_async_client_from_config: Callable[[ConfigDefaults], AsyncIterator[RegistryDescription]], +) -> Callable[[ConfigDefaults], AsyncContextManager[Optional[KafkaRest]]]: + @asynccontextmanager + async def async_kafka_from_custom_config(config: ConfigDefaults) -> KafkaRest: + async with registry_async_client_from_config(config) as registry_async_client: + async with _kafka_rest_async( + request, + loop, + tmp_path, + kafka_servers, + registry_async_client, + ) as kafka_rest_async: + yield kafka_rest_async + + return async_kafka_from_custom_config + + +@asynccontextmanager +async def _rest_async_client( request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument rest_async: KafkaRest, @@ -288,6 +334,43 @@ async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument await client.close() +@pytest.fixture(scope="function", name="rest_async_client") +async def fixture_rest_async_client( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + rest_async: KafkaRest, + aiohttp_client: AiohttpClient, +) -> AsyncIterator[Client]: + async with _rest_async_client( + request, + loop, + rest_async, + aiohttp_client, + ) as rest_async_client: + yield rest_async_client + + +@pytest.fixture(scope="function", name="rest_async_client_from_config") +async def fixture_rest_async_client_from_config( + request: SubRequest, + loop: asyncio.AbstractEventLoop, + rest_async_from_config: Callable[[ConfigDefaults], AsyncContextManager[Optional[KafkaRest]]], + aiohttp_client: AiohttpClient, +) -> Callable[[ConfigDefaults], AsyncContextManager[Client]]: + @asynccontextmanager + async def async_client_from_custom_config(config: ConfigDefaults) -> Client: + async with rest_async_from_config(config) as rest_async: + async with _rest_async_client( + request, + loop, + rest_async, + aiohttp_client, + ) as rest_async_client: + yield rest_async_client + + return async_client_from_custom_config + + @pytest.fixture(scope="function", name="rest_async_novalidation") async def fixture_rest_async_novalidation( request: SubRequest, @@ -453,13 +536,14 @@ async def fixture_registry_async_pair( yield [server.endpoint.to_url() for server in endpoints] -@pytest.fixture(scope="function", name="registry_cluster") -async def fixture_registry_cluster( +@asynccontextmanager +async def _registry_cluster( request: SubRequest, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument session_logdir: Path, kafka_servers: KafkaServers, 
     port_range: PortRangeInclusive,
+    custom_values: Optional[ConfigDefaults] = None,
 ) -> AsyncIterator[RegistryDescription]:
     # Do not start a registry when the user provided an external service. Doing
     # so would cause this node to join the existing group and participate in
@@ -477,12 +561,56 @@ async def fixture_registry_cluster(
         config_templates=[config],
         data_dir=session_logdir / _clear_test_name(request.node.name),
         port_range=port_range,
+        custom_values=custom_values,
     ) as servers:
         yield servers[0]
 
 
+@pytest.fixture(scope="function", name="registry_cluster")
+async def fixture_registry_cluster(
+    request: SubRequest,
+    loop: asyncio.AbstractEventLoop,
+    session_logdir: Path,
+    kafka_servers: KafkaServers,
+    port_range: PortRangeInclusive,
+) -> AsyncIterator[RegistryDescription]:
+    async with _registry_cluster(
+        request,
+        loop,
+        session_logdir,
+        kafka_servers,
+        port_range,
+    ) as registry_description:
+        yield registry_description
+
+
+@pytest.fixture(scope="function", name="registry_cluster_from_custom_config")
+def fixture_registry_cluster_with_custom_config(
+    request: SubRequest,
+    loop: asyncio.AbstractEventLoop,
+    session_logdir: Path,
+    kafka_servers: KafkaServers,
+    port_range: PortRangeInclusive,
+) -> Callable[[ConfigDefaults], AsyncContextManager[RegistryDescription]]:
+    @asynccontextmanager
+    async def registry_from_custom_config(config: ConfigDefaults) -> AsyncIterator[RegistryDescription]:
+        async with _registry_cluster(
+            request,
+            loop,
+            session_logdir,
+            kafka_servers,
+            port_range,
+            config,
+        ) as registry_description:
+            yield registry_description
+
+    return registry_from_custom_config
+
+
+@asynccontextmanager
+async def _registry_async_client(
     request: SubRequest,
     registry_cluster: RegistryDescription,
     loop: asyncio.AbstractEventLoop,  # pylint: disable=unused-argument
@@ -508,6 +636,80 @@ async def fixture_registry_async_client(
     await client.close()
 
 
+@pytest.fixture(scope="function", name="registry_async_client")
+async def fixture_registry_async_client(
+    request: SubRequest,
+    registry_cluster: RegistryDescription,
+    loop: asyncio.AbstractEventLoop,
+) -> AsyncIterator[Client]:
+    async with _registry_async_client(
+        request,
+        registry_cluster,
+        loop,
+    ) as client:
+        yield client
+
+
+@pytest.fixture(scope="function", name="registry_async_client_from_config")
+def fixture_registry_async_client_custom_config(
+    request: SubRequest,
+    registry_cluster_from_custom_config: Callable[[ConfigDefaults], AsyncContextManager[RegistryDescription]],
+    loop: asyncio.AbstractEventLoop,
+) -> Callable[[ConfigDefaults], AsyncContextManager[Client]]:
+    @asynccontextmanager
+    async def client_from_custom_config(config: ConfigDefaults) -> AsyncIterator[Client]:
+        async with registry_cluster_from_custom_config(config) as registry_description:
+            async with _registry_async_client(request, registry_description, loop) as client:
+                yield client
+
+    return client_from_custom_config
+
+
+@default_dataclass
+class RestClientAndRegistryClient:
+    registry_client: Client
+    rest_client: Client
+
+
+@pytest.fixture(scope="function", name="rest_async_client_and_rest_async_client_from_config")
+def fixture_rest_async_client_and_rest_async_client_from_config(
+    request: SubRequest,
+    loop: asyncio.AbstractEventLoop,
+    aiohttp_client: AiohttpClient,
+    session_logdir: Path,
+    kafka_servers: KafkaServers,
+    tmp_path: Path,
+    port_range: PortRangeInclusive,
+) -> Callable[[ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient]]:
+    """Return a factory that starts a registry and a REST proxy with the given config and yields clients for both."""
+
+    @asynccontextmanager
+    async def client_from_custom_config(config: ConfigDefaults) -> AsyncIterator[RestClientAndRegistryClient]:
+        # ugly, but without Python 3.9's parenthesized context managers we cannot join these :(
+        async with _registry_cluster(
+            request,
+            loop,
+            session_logdir,
+            kafka_servers,
+            port_range,
+            config,
+        ) as registry_description:
+            async with _registry_async_client(request, registry_description, loop) as registry_async_client:
+                async with _kafka_rest_async(
+                    request, loop, tmp_path, kafka_servers, registry_async_client, config
+                ) as kafka_rest_async:
+                    async with _rest_async_client(
+                        request,
+                        loop,
+                        kafka_rest_async,
+                        aiohttp_client,
+                    ) as rest_async_client:
+                        yield RestClientAndRegistryClient(
+                            registry_client=registry_async_client,
+                            rest_client=rest_async_client,
+                        )
+
+    return client_from_custom_config
+
+
 @pytest.fixture(scope="function", name="credentials_folder")
 def fixture_credentials_folder() -> str:
     integration_test_folder = os.path.dirname(__file__)
diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py
index d19c259ec..26cb3e1c4 100644
--- a/tests/integration/test_rest.py
+++ b/tests/integration/test_rest.py
@@ -6,12 +6,16 @@
 from kafka import KafkaProducer
 from karapace.client import Client
+from karapace.config import ConfigDefaults
 from karapace.kafka_admin import KafkaAdminClient
 from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX
 from karapace.schema_models import ValidatedTypedSchema
 from karapace.schema_type import SchemaType
+from karapace.serialization import get_subject_name
+from karapace.typing import NameStrategy, SubjectType
 from karapace.version import __version__
-from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
+from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES, RestClientAndRegistryClient
+from tests.unit.test_serialization import TYPED_AVRO_SCHEMA, TYPED_PROTOBUF_SCHEMA
 from tests.utils import (
     new_random_name,
     new_topic,
@@ -23,10 +27,12 @@
     test_objects_avro_evolution,
     wait_for_topics,
 )
+from typing import AsyncContextManager, Callable
 
 import asyncio
 import base64
 import json
+import pytest
 import time
 
 NEW_TOPIC_TIMEOUT = 10
@@ -278,22 +284,289 @@ async def test_list_topics(rest_async_client, admin_client) -> None:
     assert tn1 in topic_list and tn2 in topic_list, f"Topic list contains all topics tn1={tn1} and tn2={tn2}"
 
 
-async def test_publish(rest_async_client: Client, admin_client: KafkaAdminClient) -> None:
+async def test_publish_partition_json(rest_async_client: Client, admin_client: KafkaAdminClient) -> None:
+    topic = new_topic(admin_client)
+    await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
+    partition_url = f"/topics/{topic}/partitions/0"
+    res = await rest_async_client.post(
+        partition_url,
+        json={"records": [{"value": {"foo": "bar"}}]},
+        headers=REST_HEADERS["json"],
+    )
+    res_json = res.json()
+    assert res.ok
+    assert "offsets" in res_json
+    for o in res_json["offsets"]:
+        assert "partition" in o
+        assert int(o["partition"]) == 0
+
+
+async def test_publish_partition_binary(rest_async_client: Client, admin_client: KafkaAdminClient) -> None:
     topic = new_topic(admin_client)
     await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
-    topic_url = f"/topics/{topic}"
     partition_url = f"/topics/{topic}/partitions/0"
-    # Proper Json / Binary
-    for url in [topic_url, partition_url]:
-        for payload, h in [({"value": {"foo": "bar"}}, "json"), ({"value": "Zm9vCg=="}, "binary")]:
"bar"}}, "json"), ({"value": "Zm9vCg=="}, "binary")]: - res = await rest_async_client.post(url, json={"records": [payload]}, headers=REST_HEADERS[h]) - res_json = res.json() - assert res.ok - assert "offsets" in res_json - if "partition" in url: - for o in res_json["offsets"]: - assert "partition" in o - assert o["partition"] == 0 + res = await rest_async_client.post( + partition_url, json={"records": [{"value": "Zm9vCg=="}]}, headers=REST_HEADERS["binary"] + ) + res_json = res.json() + assert res.ok + assert "offsets" in res_json + for o in res_json["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 + + +async def test_publish_partition_json(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: + topic = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}" + + res = await rest_async_client.post( + topic_url, + json={"records": [{"value": {"foo": "bar"}}]}, + headers=REST_HEADERS["json"], + ) + res_json = res.json() + assert res.ok + assert "offsets" in res_json + + +async def test_publish_partition_binary(rest_async_client: Client, admin_client: KafkaAdminClient) -> None: + topic = new_topic(admin_client) + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}" + res = await rest_async_client.post( + topic_url, + json={"records": [{"value": "Zm9vCg=="}]}, + headers=REST_HEADERS["binary"], + ) + res_json = res.json() + assert res.ok + for o in res_json["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 + + +@pytest.mark.parametrize( + "name_strategy", + ( + (NameStrategy.topic_name), + (NameStrategy.record_name), + (NameStrategy.topic_record_name), + ), +) +async def test_publish_partition_protobuf( + rest_async_client_and_rest_async_client_from_config: Callable[ + [ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient] + ], + admin_client: KafkaAdminClient, + name_strategy: NameStrategy, +) -> None: + topic = new_topic(admin_client) + async with rest_async_client_and_rest_async_client_from_config( + {"name_strategy": name_strategy} + ) as async_rest_and_registry_client: + registry_async_client = async_rest_and_registry_client.registry_client + rest_async_client = async_rest_and_registry_client.rest_client + + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}/partitions/0" + + res = await registry_async_client.post( + f"subjects/{new_random_name('random_subject')}/versions", + json={"schemaType": "PROTOBUF", "schema": TYPED_PROTOBUF_SCHEMA.schema_str}, + ) + assert res.ok + + message = [{"value": {"attr1": "Value for attr1", "attr2": "Value for attr2"}}] + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["protobuf"], + ) + assert res.status_code == 422 + + res = await registry_async_client.post( + f"subjects/{get_subject_name(topic, TYPED_PROTOBUF_SCHEMA, SubjectType.value_, name_strategy)}/versions", + json={"schemaType": "PROTOBUF", "schema": TYPED_PROTOBUF_SCHEMA.schema_str}, + ) + assert res.ok + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["protobuf"], + ) + assert res.ok + for o in res.json()["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 + + 
+@pytest.mark.parametrize(
+    "name_strategy",
+    (
+        NameStrategy.topic_name,
+        NameStrategy.record_name,
+        NameStrategy.topic_record_name,
+    ),
+)
+async def test_publish_topic_protobuf(
+    rest_async_client_and_rest_async_client_from_config: Callable[
+        [ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient]
+    ],
+    admin_client: KafkaAdminClient,
+    name_strategy: NameStrategy,
+) -> None:
+    topic = new_topic(admin_client)
+    async with rest_async_client_and_rest_async_client_from_config(
+        {"name_strategy": name_strategy}
+    ) as async_rest_and_registry_client:
+        registry_async_client = async_rest_and_registry_client.registry_client
+        rest_async_client = async_rest_and_registry_client.rest_client
+
+        await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
+        topic_url = f"/topics/{topic}"
+
+        res = await registry_async_client.post(
+            f"subjects/{new_random_name('random_subject')}/versions",
+            json={"schemaType": "PROTOBUF", "schema": TYPED_PROTOBUF_SCHEMA.schema_str},
+        )
+        assert res.ok
+
+        message = [{"value": {"attr1": "Value for attr1", "attr2": "Value for attr2"}}]
+
+        res = await rest_async_client.post(
+            topic_url,
+            json={"value_schema_id": res.json()["id"], "records": message},
+            headers=REST_HEADERS["protobuf"],
+        )
+        assert res.status_code == 422
+
+        res = await registry_async_client.post(
+            f"subjects/{get_subject_name(topic, TYPED_PROTOBUF_SCHEMA, SubjectType.value_, name_strategy)}/versions",
+            json={"schemaType": "PROTOBUF", "schema": TYPED_PROTOBUF_SCHEMA.schema_str},
+        )
+        assert res.ok
+
+        res = await rest_async_client.post(
+            topic_url,
+            json={"value_schema_id": res.json()["id"], "records": message},
+            headers=REST_HEADERS["protobuf"],
+        )
+        assert res.ok
+
+
+@pytest.mark.parametrize(
+    "name_strategy",
+    (
+        NameStrategy.topic_name,
+        NameStrategy.record_name,
+        NameStrategy.topic_record_name,
+    ),
+)
+async def test_publish_topic_avro(
+    rest_async_client_and_rest_async_client_from_config: Callable[
+        [ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient]
+    ],
+    admin_client: KafkaAdminClient,
+    name_strategy: NameStrategy,
+) -> None:
+    topic = new_topic(admin_client)
+    async with rest_async_client_and_rest_async_client_from_config(
+        {"name_strategy": name_strategy}
+    ) as async_rest_and_registry_client:
+        registry_async_client = async_rest_and_registry_client.registry_client
+        rest_async_client = async_rest_and_registry_client.rest_client
+
+        await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
+        topic_url = f"/topics/{topic}"
+
+        res = await registry_async_client.post(
+            f"subjects/{new_random_name('random_subject')}/versions",
+            json={"schemaType": "AVRO", "schema": TYPED_AVRO_SCHEMA.schema_str},
+        )
+        assert res.ok
+
+        message = [{"value": {"attr1": {"string": "sample data"}, "attr2": None}}]
+
+        res = await rest_async_client.post(
+            topic_url,
+            json={"value_schema_id": res.json()["id"], "records": message},
+            headers=REST_HEADERS["avro"],
+        )
+        assert res.status_code == 422
+
+        res = await registry_async_client.post(
+            f"subjects/{get_subject_name(topic, TYPED_AVRO_SCHEMA, SubjectType.value_, name_strategy)}/versions",
+            json={"schemaType": "AVRO", "schema": TYPED_AVRO_SCHEMA.schema_str},
+        )
+        assert res.ok
+
+        res = await rest_async_client.post(
+            topic_url,
+            json={"value_schema_id": res.json()["id"], "records": message},
+            headers=REST_HEADERS["avro"],
+        )
+        assert res.ok
+
+
+@pytest.mark.parametrize(
"name_strategy", + ( + (NameStrategy.topic_name), + (NameStrategy.record_name), + (NameStrategy.topic_record_name), + ), +) +async def test_publish_partition_avro( + rest_async_client_and_rest_async_client_from_config: Callable[ + [ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient] + ], + admin_client: KafkaAdminClient, + name_strategy: NameStrategy, +) -> None: + topic = new_topic(admin_client) + async with rest_async_client_and_rest_async_client_from_config( + {"name_strategy": name_strategy} + ) as async_rest_and_registry_client: + registry_async_client = async_rest_and_registry_client.registry_client + rest_async_client = async_rest_and_registry_client.rest_client + + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + topic_url = f"/topics/{topic}/partitions/0" + + res = await registry_async_client.post( + f"subjects/{new_random_name('random_subject')}/versions", + json={"schemaType": "AVRO", "schema": TYPED_AVRO_SCHEMA.schema_str}, + ) + assert res.ok + + message = [{"value": {"attr1": {"string": "sample data"}, "attr2": None}}] + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["avro"], + ) + assert res.status_code == 422 + + res = await registry_async_client.post( + f"subjects/{get_subject_name(topic, TYPED_AVRO_SCHEMA, SubjectType.value_, name_strategy)}/versions", + json={"schemaType": "AVRO", "schema": TYPED_AVRO_SCHEMA.schema_str}, + ) + assert res.ok + + res = await rest_async_client.post( + topic_url, + json={"value_schema_id": res.json()["id"], "records": message}, + headers=REST_HEADERS["avro"], + ) + assert res.ok + for o in res.json()["offsets"]: + assert "partition" in o + assert int(o["partition"]) == 0 # Produce messages to a topic without key and without explicit partition to verify that @@ -615,12 +888,12 @@ async def test_publish_with_schema_id_of_another_subject_novalidation( async def test_can_produce_anything_with_no_validation_policy( rest_async_client: Client, registry_async_client: Client, - admin_client: KafkaRestAdminClient, + admin_client: KafkaAdminClient, ) -> None: - first_topic = new_topic(admin_client) - second_topic = new_topic(admin_client) + topic = new_topic(admin_client) + url = f"/topics/{topic}" - await wait_for_topics(rest_async_client, topic_names=[first_topic, second_topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) + await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1) typed_schema = ValidatedTypedSchema.parse( SchemaType.AVRO, @@ -643,9 +916,25 @@ async def test_can_produce_anything_with_no_validation_policy( json={"schema": str(typed_schema)}, ) assert res.status_code == 200 + schema_id = res.json()["id"] + + res = await rest_async_client.post( + url, + json={"value_schema_id": schema_id, "records": [{"value": {"name": "Mr. Mustache"}}]}, + headers=REST_HEADERS["avro"], + ) + assert not res.ok + assert res.status_code == 422 # with the no_validation strategy we can produce even if we use a totally random subject name - res = await registry_async_client.post(f"/topics/{first_topic}/disable_validation", json={}) + res = await registry_async_client.post(f"/topics/{topic}/disable_validation", json={}) + assert res.ok + + await rest_async_client.post( + url, + json={"value_schema_id": schema_id, "records": [{"value": {"name": "Mr. 
Mustache"}}]}, + headers=REST_HEADERS["avro"], + ) assert res.ok diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index ed0186f9f..5e35701dd 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -2,6 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from karapace.client import Client from karapace.kafka_rest_apis.consumer_manager import KNOWN_FORMATS from tests.utils import ( consumer_valid_payload, @@ -22,7 +23,7 @@ @pytest.mark.parametrize("trail", ["", "/"]) -async def test_create_and_delete(rest_async_client, trail): +async def test_create_and_delete(rest_async_client: Client, trail: str): header = REST_HEADERS["json"] group_name = "test_group" resp = await rest_async_client.post(f"/consumers/{group_name}{trail}", json=consumer_valid_payload, headers=header) diff --git a/tests/integration/test_rest_consumer_protobuf.py b/tests/integration/test_rest_consumer_protobuf.py index 52662aeb9..a21798364 100644 --- a/tests/integration/test_rest_consumer_protobuf.py +++ b/tests/integration/test_rest_consumer_protobuf.py @@ -23,7 +23,12 @@ @pytest.mark.parametrize("schema_type", ["protobuf"]) @pytest.mark.parametrize("trail", ["", "/"]) -async def test_publish_consume_protobuf(rest_async_client, admin_client, trail, schema_type): +async def test_publish_consume_protobuf( + rest_async_client: Client, + admin_client: KafkaAdminClient, + trail: str, + schema_type: str, +): header = REST_HEADERS[schema_type] group_name = "e2e_protobuf_group" instance_id = await new_consumer(rest_async_client, group_name, fmt=schema_type, trail=trail) @@ -54,7 +59,12 @@ async def test_publish_consume_protobuf(rest_async_client, admin_client, trail, @pytest.mark.parametrize("schema_type", ["protobuf"]) @pytest.mark.parametrize("trail", ["", "/"]) -async def test_publish_consume_protobuf_second(rest_async_client, admin_client, trail, schema_type): +async def test_publish_consume_protobuf_second( + rest_async_client: Client, + admin_client: KafkaAdminClient, + trail: str, + schema_type: str, +): header = REST_HEADERS[schema_type] group_name = "e2e_proto_second" instance_id = await new_consumer(rest_async_client, group_name, fmt=schema_type, trail=trail) @@ -244,7 +254,7 @@ async def test_publish_and_consume_protobuf_with_recursive_references( res = await rest_async_client.post(subscribe_path, json={"topics": [topic_name]}, headers=REST_HEADERS["binary"]) assert res.ok - resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["avro"]) + resp = await rest_async_client.get(consume_path, headers=REST_HEADERS["protobuf"]) data = resp.json() assert isinstance(data, list) diff --git a/tests/integration/utils/cluster.py b/tests/integration/utils/cluster.py index 31c06e4bd..c8611ca07 100644 --- a/tests/integration/utils/cluster.py +++ b/tests/integration/utils/cluster.py @@ -2,14 +2,16 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from contextlib import asynccontextmanager, ExitStack from dataclasses import dataclass -from karapace.config import Config, set_config_defaults, write_config +from karapace.config import Config, ConfigDefaults, set_config_defaults, write_config from pathlib import Path from tests.integration.utils.network import PortRangeInclusive from tests.integration.utils.process import stop_process, wait_for_port_subprocess from tests.utils import new_random_name, popen_karapace_all -from typing import AsyncIterator, List +from typing import 
+from typing import AsyncIterator
 
 
 @dataclass(frozen=True)
@@ -30,10 +32,11 @@ class RegistryDescription:
 
 @asynccontextmanager
 async def start_schema_registry_cluster(
-    config_templates: List[Config],
+    config_templates: list[Config],
     data_dir: Path,
     port_range: PortRangeInclusive,
-) -> AsyncIterator[List[RegistryDescription]]:
+    custom_values: ConfigDefaults | None = None,
+) -> AsyncIterator[list[RegistryDescription]]:
     """Start a cluster of schema registries, one process per `config_templates`."""
     for template in config_templates:
         assert "bootstrap_uri" in template, "base_config must have the value `bootstrap_uri` set"
@@ -76,6 +79,9 @@ async def start_schema_registry_cluster(
             log_path = group_dir / f"{pos}.log"
             error_path = group_dir / f"{pos}.error"
 
+            if custom_values is not None:
+                config.update(custom_values)
+
             config = set_config_defaults(config)
             write_config(config_path, config)