Skip to content

Commit

Permalink
fixing partition selector, validator, and serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Oct 28, 2024
1 parent 0f97a21 commit e0e06a7
Show file tree
Hide file tree
Showing 15 changed files with 51 additions and 49 deletions.
4 changes: 3 additions & 1 deletion include/mofka/PartitionSelector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ class PartitionSelector {
*
* @return PartitionSelector instance.
*/
static PartitionSelector FromMetadata(const char* type, const Metadata& metadata);
static PartitionSelector FromMetadata(
const char* type,
const Metadata& metadata);

/**
* @brief Version of the above function that does not require a type.
Expand Down
4 changes: 3 additions & 1 deletion include/mofka/Serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ class Serializer {
*
* @return Serializer instance.
*/
static Serializer FromMetadata(const char* type, const Metadata& metadata);
static Serializer FromMetadata(
const char* type,
const Metadata& metadata);

/**
* @brief Same as the above function but the type is expected
Expand Down
4 changes: 3 additions & 1 deletion include/mofka/Validator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ class Validator {
*
* @return Validator instance.
*/
static Validator FromMetadata(const char* type, const Metadata& metadata);
static Validator FromMetadata(
const char* type,
const Metadata& metadata);

/**
* @brief Same as the above function but will look for a "__type__"
Expand Down
5 changes: 5 additions & 0 deletions python/mochi/mofka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ def __init__(self, group_file, arg=None):
else:
super().__init__(group_file, self._mid)

def create_topic(self, *args, schema: dict|None = None, **kwargs):
if schema is not None:
kwargs["validator"] = Validator.from_metadata("schema", {"schema":schema})
super().create_topic(*args, **kwargs)


class ServiceHandle(MofkaDriver):

Expand Down
8 changes: 4 additions & 4 deletions python/py-mofka-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ PYBIND11_MODULE(pymofka_client, m) {
;

py::class_<mofka::PartitionSelector>(m, "PartitionSelector")
.def_static("from_metadata",
[](const nlohmann::json& md){
return mofka::PartitionSelector::FromMetadata(md);
}, "metadata"_a=nlohmann::json::object())
.def_static("from_metadata",
[](const char* type, const nlohmann::json& md){
return mofka::PartitionSelector::FromMetadata(type, md);
}, "type"_a, "metadata"_a=nlohmann::json::object())
.def_static("from_metadata",
[](const nlohmann::json& md){
return mofka::PartitionSelector::FromMetadata(md);
}, "metadata"_a=nlohmann::json::object())
;

py::class_<mofka::MofkaDriver>(m, "MofkaDriver")
Expand Down
4 changes: 1 addition & 3 deletions src/BatchProducer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ class BatchProducer : public ProducerInterface {
, m_topic(std::move(topic))
{}

~BatchProducer() {
flush();
}
virtual ~BatchProducer() = default;

const std::string& name() const override {
return m_name;
Expand Down
2 changes: 1 addition & 1 deletion src/EventbridgeValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ void EventbridgeValidator::validate(const Metadata& metadata, const Data& data)

Metadata EventbridgeValidator::metadata() const {
json config = json::object();
config["__type__"] = "eventbridge";
config["type"] = "eventbridge";
config["schema"] = m_schema;
return Metadata{std::move(config)};
}
Expand Down
4 changes: 4 additions & 0 deletions src/MofkaProducer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class MofkaProducer : public BatchProducer {
std::shared_ptr<MofkaTopicHandle> topic);

std::shared_ptr<ProducerBatchInterface> newBatchForPartition(size_t index) const override;

~MofkaProducer() {
flush();
}
};

}
Expand Down
8 changes: 4 additions & 4 deletions src/PartitionSelector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ PartitionSelector PartitionSelector::FromMetadata(const Metadata& metadata) {
"Cannot create PartitionSelector from Metadata: "
"invalid Metadata (expected JSON object)");
}
if(!json.contains("__type__")) {
if(!json.contains("type")) {
return PartitionSelector{};
}
auto& type = json["__type__"];
auto& type = json["type"];
if(!type.is_string()) {
throw Exception(
"Cannot create PartitionSelector from Metadata: "
"invalid __type__ in Metadata (expected string)");
"invalid \"type\" field in Metadata (expected string)");
}
auto& type_str = type.get_ref<const std::string&>();
std::shared_ptr<PartitionSelectorInterface> ts = PartitionSelectorFactory::create(type_str, metadata);
Expand All @@ -67,7 +67,7 @@ PartitionSelector PartitionSelector::FromMetadata(const char* type, const Metada
"invalid Metadata (expected JSON object)");
}
auto md_copy = metadata;
md_copy.json()["__type__"] = type;
md_copy.json()["type"] = type;
std::shared_ptr<PartitionSelectorInterface> ts = PartitionSelectorFactory::create(type, md_copy);
return ts;
}
Expand Down
8 changes: 4 additions & 4 deletions src/Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ Serializer Serializer::FromMetadata(const Metadata& metadata) {
"Cannot create Serializer from Metadata: "
"invalid Metadata (expected JSON object)");
}
if(!json.contains("__type__")) {
if(!json.contains("type")) {
return Serializer{};
}
auto& type = json["__type__"];
auto& type = json["type"];
if(!type.is_string()) {
throw Exception(
"Cannot create Serializer from Metadata: "
"invalid __type__ in Metadata (expected string)");
"invalid \"type\" field in Metadata (expected string)");
}
const auto& type_str = type.get_ref<const std::string&>();
std::shared_ptr<SerializerInterface> s = SerializerFactory::create(type_str, metadata);
Expand All @@ -66,7 +66,7 @@ Serializer Serializer::FromMetadata(const char* type, const Metadata& metadata)
"invalid Metadata (expected JSON object)");
}
auto md_copy = metadata;
md_copy.json()["__type__"] = type;
md_copy.json()["type"] = type;
std::shared_ptr<SerializerInterface> s = SerializerFactory::create(type, md_copy);
return Serializer(std::move(s));
}
Expand Down
8 changes: 4 additions & 4 deletions src/Validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ Validator Validator::FromMetadata(const Metadata& metadata) {
"Cannot create Validator from Metadata: "
"invalid Metadata (expected JSON object)");
}
if(!json.contains("__type__")) {
if(!json.contains("type")) {
return Validator{};
}
auto& type = json["__type__"];
auto& type = json["type"];
if(!type.is_string()) {
throw Exception(
"Cannot create Validator from Metadata: "
"invalid __type__ in Metadata (expected string)");
"invalid \"type\" field in Metadata (expected string)");
}
auto& type_str = type.get_ref<const std::string&>();
std::shared_ptr<ValidatorInterface> v = ValidatorFactory::create(type_str, metadata);
Expand All @@ -62,7 +62,7 @@ Validator Validator::FromMetadata(const char* type, const Metadata& metadata) {
"invalid Metadata (expected JSON object)");
}
auto md_copy = metadata;
md_copy.json()["__type__"] = type;
md_copy.json()["type"] = type;
std::shared_ptr<ValidatorInterface> v = ValidatorFactory::create(type, md_copy);
return v;
}
Expand Down
5 changes: 3 additions & 2 deletions tests/python/customs/MyPartitionSelector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ class MyPartitionSelector : public mofka::PartitionSelectorInterface {
(void)metadata;
if(m_targets.size() == 0)
throw mofka::Exception("PartitionSelector has no target to select from");
if(m_index >= m_targets.size()) m_index = m_index % m_targets.size();
auto result = m_index;
m_index += 1;
return m_targets.at(m_index-1);
if(m_index == m_targets.size()) m_index = 0;
return result;
}

mofka::Metadata metadata() const override {
Expand Down
18 changes: 6 additions & 12 deletions tests/python/test_mofka_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,18 @@ def test_get_servers(self):
def test_create_open_topic(self):
"""Test create and open a topic"""
name = "my_topic"
validator = mofka.Validator.from_metadata(
{"__type__":"my_validator:libmy_validator.so"})
selector = mofka.PartitionSelector.from_metadata(
{"__type__":"my_partition_selector:libmy_partition_selector.so"})
serializer = mofka.Serializer.from_metadata(
{"__type__":"my_serializer:libmy_serializer.so"})
validator = mofka.Validator.from_metadata("my_validator:libmy_validator.so")
selector = mofka.PartitionSelector.from_metadata("my_partition_selector:libmy_partition_selector.so")
serializer = mofka.Serializer.from_metadata("my_serializer:libmy_serializer.so")
self.service.create_topic(name, validator, selector, serializer)
topic = self.service.open_topic(name)

def test_add_memory_partition(self):
"""Test add partition"""
topic_name = "my_topic"
validator = mofka.Validator.from_metadata(
{"__type__":"my_validator:libmy_validator.so"})
selector = mofka.PartitionSelector.from_metadata(
{"__type__":"my_partition_selector:libmy_partition_selector.so"})
serializer = mofka.Serializer.from_metadata(
{"__type__":"my_serializer:libmy_serializer.so"})
validator = mofka.Validator.from_metadata("my_validator:libmy_validator.so")
selector = mofka.PartitionSelector.from_metadata("my_partition_selector:libmy_partition_selector.so")
serializer = mofka.Serializer.from_metadata("my_serializer:libmy_serializer.so")
server_rank = 0
self.service.create_topic(topic_name, validator, selector, serializer)
self.service.add_memory_partition(topic_name, server_rank)
Expand Down
9 changes: 3 additions & 6 deletions tests/python/test_mofka_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ def setUp(self):

# create a topic
name = "my_topic"
validator = mofka.Validator.from_metadata(
{"__type__":"my_validator:libmy_validator.so"})
selector = mofka.PartitionSelector.from_metadata(
{"__type__":"my_partition_selector:libmy_partition_selector.so"})
serializer = mofka.Serializer.from_metadata(
{"__type__":"my_serializer:libmy_serializer.so"})
validator = mofka.Validator.from_metadata("my_validator:libmy_validator.so")
selector = mofka.PartitionSelector.from_metadata("my_partition_selector:libmy_partition_selector.so")
serializer = mofka.Serializer.from_metadata("my_serializer:libmy_serializer.so")
self.service.create_topic(name, validator, selector, serializer)
self.service.add_memory_partition(name, 0)

Expand Down
9 changes: 3 additions & 6 deletions tests/python/test_mofka_topic_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ def setUp(self):
self.service = mofka.MofkaDriver("mofka.json", self.bedrock_server.margo.mid)

name = "my_topic"
validator = mofka.Validator.from_metadata(
{"__type__":"my_validator:libmy_validator.so"})
selector = mofka.PartitionSelector.from_metadata(
{"__type__":"my_partition_selector:libmy_partition_selector.so"})
serializer = mofka.Serializer.from_metadata(
{"__type__":"my_serializer:libmy_serializer.so"})
validator = mofka.Validator.from_metadata("my_validator:libmy_validator.so")
selector = mofka.PartitionSelector.from_metadata("my_partition_selector:libmy_partition_selector.so")
serializer = mofka.Serializer.from_metadata("my_serializer:libmy_serializer.so")
self.service.create_topic(name, validator, selector, serializer)
self.topic = self.service.open_topic(name)

Expand Down

0 comments on commit e0e06a7

Please sign in to comment.