From e0e06a7b47a1d6b19e53499d4e8b15a8fea57f18 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 28 Oct 2024 13:47:53 +0000 Subject: [PATCH] fixing partition selector, validator, and serializer --- include/mofka/PartitionSelector.hpp | 4 +++- include/mofka/Serializer.hpp | 4 +++- include/mofka/Validator.hpp | 4 +++- python/mochi/mofka/client.py | 5 +++++ python/py-mofka-client.cpp | 8 ++++---- src/BatchProducer.hpp | 4 +--- src/EventbridgeValidator.cpp | 2 +- src/MofkaProducer.hpp | 4 ++++ src/PartitionSelector.cpp | 8 ++++---- src/Serializer.cpp | 8 ++++---- src/Validator.cpp | 8 ++++---- tests/python/customs/MyPartitionSelector.cpp | 5 +++-- tests/python/test_mofka_driver.py | 18 ++++++------------ tests/python/test_mofka_producer.py | 9 +++------ tests/python/test_mofka_topic_handle.py | 9 +++------ 15 files changed, 51 insertions(+), 49 deletions(-) diff --git a/include/mofka/PartitionSelector.hpp b/include/mofka/PartitionSelector.hpp index 48d9117..04a21be 100644 --- a/include/mofka/PartitionSelector.hpp +++ b/include/mofka/PartitionSelector.hpp @@ -146,7 +146,9 @@ class PartitionSelector { * * @return PartitionSelector instance. */ - static PartitionSelector FromMetadata(const char* type, const Metadata& metadata); + static PartitionSelector FromMetadata( + const char* type, + const Metadata& metadata); /** * @brief Version of the above function that does not require a type. diff --git a/include/mofka/Serializer.hpp b/include/mofka/Serializer.hpp index 325df61..9ad7b94 100644 --- a/include/mofka/Serializer.hpp +++ b/include/mofka/Serializer.hpp @@ -147,7 +147,9 @@ class Serializer { * * @return Serializer instance. */ - static Serializer FromMetadata(const char* type, const Metadata& metadata); + static Serializer FromMetadata( + const char* type, + const Metadata& metadata); /** * @brief Same as the above function but the type is expected diff --git a/include/mofka/Validator.hpp b/include/mofka/Validator.hpp index f305ded..b98141b 100644 --- a/include/mofka/Validator.hpp +++ b/include/mofka/Validator.hpp @@ -136,7 +136,9 @@ class Validator { * * @return Validator instance. */ - static Validator FromMetadata(const char* type, const Metadata& metadata); + static Validator FromMetadata( + const char* type, + const Metadata& metadata); /** * @brief Same as the above function but will look for a "__type__" diff --git a/python/mochi/mofka/client.py b/python/mochi/mofka/client.py index 7d616fa..00c45ec 100644 --- a/python/mochi/mofka/client.py +++ b/python/mochi/mofka/client.py @@ -44,6 +44,11 @@ def __init__(self, group_file, arg=None): else: super().__init__(group_file, self._mid) + def create_topic(self, *args, schema: dict|None = None, **kwargs): + if schema is not None: + kwargs["validator"] = Validator.from_metadata("schema", {"schema":schema}) + super().create_topic(*args, **kwargs) + class ServiceHandle(MofkaDriver): diff --git a/python/py-mofka-client.cpp b/python/py-mofka-client.cpp index 78a9388..c421f7f 100644 --- a/python/py-mofka-client.cpp +++ b/python/py-mofka-client.cpp @@ -110,14 +110,14 @@ PYBIND11_MODULE(pymofka_client, m) { ; py::class_(m, "PartitionSelector") - .def_static("from_metadata", - [](const nlohmann::json& md){ - return mofka::PartitionSelector::FromMetadata(md); - }, "metadata"_a=nlohmann::json::object()) .def_static("from_metadata", [](const char* type, const nlohmann::json& md){ return mofka::PartitionSelector::FromMetadata(type, md); }, "type"_a, "metadata"_a=nlohmann::json::object()) + .def_static("from_metadata", + [](const nlohmann::json& md){ + return mofka::PartitionSelector::FromMetadata(md); + }, "metadata"_a=nlohmann::json::object()) ; py::class_(m, "MofkaDriver") diff --git a/src/BatchProducer.hpp b/src/BatchProducer.hpp index 3166b9e..8968246 100644 --- a/src/BatchProducer.hpp +++ b/src/BatchProducer.hpp @@ -54,9 +54,7 @@ class BatchProducer : public ProducerInterface { , m_topic(std::move(topic)) {} - ~BatchProducer() { - flush(); - } + virtual ~BatchProducer() = default; const std::string& name() const override { return m_name; diff --git a/src/EventbridgeValidator.cpp b/src/EventbridgeValidator.cpp index 858e01f..f83e1bc 100644 --- a/src/EventbridgeValidator.cpp +++ b/src/EventbridgeValidator.cpp @@ -534,7 +534,7 @@ void EventbridgeValidator::validate(const Metadata& metadata, const Data& data) Metadata EventbridgeValidator::metadata() const { json config = json::object(); - config["__type__"] = "eventbridge"; + config["type"] = "eventbridge"; config["schema"] = m_schema; return Metadata{std::move(config)}; } diff --git a/src/MofkaProducer.hpp b/src/MofkaProducer.hpp index 7728216..2485289 100644 --- a/src/MofkaProducer.hpp +++ b/src/MofkaProducer.hpp @@ -43,6 +43,10 @@ class MofkaProducer : public BatchProducer { std::shared_ptr topic); std::shared_ptr newBatchForPartition(size_t index) const override; + + ~MofkaProducer() { + flush(); + } }; } diff --git a/src/PartitionSelector.cpp b/src/PartitionSelector.cpp index 1cf0ca2..91beca6 100644 --- a/src/PartitionSelector.cpp +++ b/src/PartitionSelector.cpp @@ -45,14 +45,14 @@ PartitionSelector PartitionSelector::FromMetadata(const Metadata& metadata) { "Cannot create PartitionSelector from Metadata: " "invalid Metadata (expected JSON object)"); } - if(!json.contains("__type__")) { + if(!json.contains("type")) { return PartitionSelector{}; } - auto& type = json["__type__"]; + auto& type = json["type"]; if(!type.is_string()) { throw Exception( "Cannot create PartitionSelector from Metadata: " - "invalid __type__ in Metadata (expected string)"); + "invalid \"type\" field in Metadata (expected string)"); } auto& type_str = type.get_ref(); std::shared_ptr ts = PartitionSelectorFactory::create(type_str, metadata); @@ -67,7 +67,7 @@ PartitionSelector PartitionSelector::FromMetadata(const char* type, const Metada "invalid Metadata (expected JSON object)"); } auto md_copy = metadata; - md_copy.json()["__type__"] = type; + md_copy.json()["type"] = type; std::shared_ptr ts = PartitionSelectorFactory::create(type, md_copy); return ts; } diff --git a/src/Serializer.cpp b/src/Serializer.cpp index 4233dee..6014860 100644 --- a/src/Serializer.cpp +++ b/src/Serializer.cpp @@ -44,14 +44,14 @@ Serializer Serializer::FromMetadata(const Metadata& metadata) { "Cannot create Serializer from Metadata: " "invalid Metadata (expected JSON object)"); } - if(!json.contains("__type__")) { + if(!json.contains("type")) { return Serializer{}; } - auto& type = json["__type__"]; + auto& type = json["type"]; if(!type.is_string()) { throw Exception( "Cannot create Serializer from Metadata: " - "invalid __type__ in Metadata (expected string)"); + "invalid \"type\" field in Metadata (expected string)"); } const auto& type_str = type.get_ref(); std::shared_ptr s = SerializerFactory::create(type_str, metadata); @@ -66,7 +66,7 @@ Serializer Serializer::FromMetadata(const char* type, const Metadata& metadata) "invalid Metadata (expected JSON object)"); } auto md_copy = metadata; - md_copy.json()["__type__"] = type; + md_copy.json()["type"] = type; std::shared_ptr s = SerializerFactory::create(type, md_copy); return Serializer(std::move(s)); } diff --git a/src/Validator.cpp b/src/Validator.cpp index a6bfac3..5e38406 100644 --- a/src/Validator.cpp +++ b/src/Validator.cpp @@ -40,14 +40,14 @@ Validator Validator::FromMetadata(const Metadata& metadata) { "Cannot create Validator from Metadata: " "invalid Metadata (expected JSON object)"); } - if(!json.contains("__type__")) { + if(!json.contains("type")) { return Validator{}; } - auto& type = json["__type__"]; + auto& type = json["type"]; if(!type.is_string()) { throw Exception( "Cannot create Validator from Metadata: " - "invalid __type__ in Metadata (expected string)"); + "invalid \"type\" field in Metadata (expected string)"); } auto& type_str = type.get_ref(); std::shared_ptr v = ValidatorFactory::create(type_str, metadata); @@ -62,7 +62,7 @@ Validator Validator::FromMetadata(const char* type, const Metadata& metadata) { "invalid Metadata (expected JSON object)"); } auto md_copy = metadata; - md_copy.json()["__type__"] = type; + md_copy.json()["type"] = type; std::shared_ptr v = ValidatorFactory::create(type, md_copy); return v; } diff --git a/tests/python/customs/MyPartitionSelector.cpp b/tests/python/customs/MyPartitionSelector.cpp index 905582e..9bbf840 100644 --- a/tests/python/customs/MyPartitionSelector.cpp +++ b/tests/python/customs/MyPartitionSelector.cpp @@ -14,9 +14,10 @@ class MyPartitionSelector : public mofka::PartitionSelectorInterface { (void)metadata; if(m_targets.size() == 0) throw mofka::Exception("PartitionSelector has no target to select from"); - if(m_index >= m_targets.size()) m_index = m_index % m_targets.size(); + auto result = m_index; m_index += 1; - return m_targets.at(m_index-1); + if(m_index == m_targets.size()) m_index = 0; + return result; } mofka::Metadata metadata() const override { diff --git a/tests/python/test_mofka_driver.py b/tests/python/test_mofka_driver.py index 4bf3d7a..445ca04 100644 --- a/tests/python/test_mofka_driver.py +++ b/tests/python/test_mofka_driver.py @@ -31,24 +31,18 @@ def test_get_servers(self): def test_create_open_topic(self): """Test create and open a topic""" name = "my_topic" - validator = mofka.Validator.from_metadata( - {"__type__":"my_validator:libmy_validator.so"}) - selector = mofka.PartitionSelector.from_metadata( - {"__type__":"my_partition_selector:libmy_partition_selector.so"}) - serializer = mofka.Serializer.from_metadata( - {"__type__":"my_serializer:libmy_serializer.so"}) + validator = mofka.Validator.from_metadata("my_validator:libmy_validator.so") + selector = mofka.PartitionSelector.from_metadata("my_partition_selector:libmy_partition_selector.so") + serializer = mofka.Serializer.from_metadata("my_serializer:libmy_serializer.so") self.service.create_topic(name, validator, selector, serializer) topic = self.service.open_topic(name) def test_add_memory_partition(self): """Test add partition""" topic_name = "my_topic" - validator = mofka.Validator.from_metadata( - {"__type__":"my_validator:libmy_validator.so"}) - selector = mofka.PartitionSelector.from_metadata( - {"__type__":"my_partition_selector:libmy_partition_selector.so"}) - serializer = mofka.Serializer.from_metadata( - {"__type__":"my_serializer:libmy_serializer.so"}) + validator = mofka.Validator.from_metadata("my_validator:libmy_validator.so") + selector = mofka.PartitionSelector.from_metadata("my_partition_selector:libmy_partition_selector.so") + serializer = mofka.Serializer.from_metadata("my_serializer:libmy_serializer.so") server_rank = 0 self.service.create_topic(topic_name, validator, selector, serializer) self.service.add_memory_partition(topic_name, server_rank) diff --git a/tests/python/test_mofka_producer.py b/tests/python/test_mofka_producer.py index 3970886..994f48c 100644 --- a/tests/python/test_mofka_producer.py +++ b/tests/python/test_mofka_producer.py @@ -33,12 +33,9 @@ def setUp(self): # create a topic name = "my_topic" - validator = mofka.Validator.from_metadata( - {"__type__":"my_validator:libmy_validator.so"}) - selector = mofka.PartitionSelector.from_metadata( - {"__type__":"my_partition_selector:libmy_partition_selector.so"}) - serializer = mofka.Serializer.from_metadata( - {"__type__":"my_serializer:libmy_serializer.so"}) + validator = mofka.Validator.from_metadata("my_validator:libmy_validator.so") + selector = mofka.PartitionSelector.from_metadata("my_partition_selector:libmy_partition_selector.so") + serializer = mofka.Serializer.from_metadata("my_serializer:libmy_serializer.so") self.service.create_topic(name, validator, selector, serializer) self.service.add_memory_partition(name, 0) diff --git a/tests/python/test_mofka_topic_handle.py b/tests/python/test_mofka_topic_handle.py index 6e3083d..b6baf28 100644 --- a/tests/python/test_mofka_topic_handle.py +++ b/tests/python/test_mofka_topic_handle.py @@ -20,12 +20,9 @@ def setUp(self): self.service = mofka.MofkaDriver("mofka.json", self.bedrock_server.margo.mid) name = "my_topic" - validator = mofka.Validator.from_metadata( - {"__type__":"my_validator:libmy_validator.so"}) - selector = mofka.PartitionSelector.from_metadata( - {"__type__":"my_partition_selector:libmy_partition_selector.so"}) - serializer = mofka.Serializer.from_metadata( - {"__type__":"my_serializer:libmy_serializer.so"}) + validator = mofka.Validator.from_metadata("my_validator:libmy_validator.so") + selector = mofka.PartitionSelector.from_metadata("my_partition_selector:libmy_partition_selector.so") + serializer = mofka.Serializer.from_metadata("my_serializer:libmy_serializer.so") self.service.create_topic(name, validator, selector, serializer) self.topic = self.service.open_topic(name)