Skip to content

Commit

Permalink
added full data selector and allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Oct 28, 2024
1 parent e0e06a7 commit ba14819
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 15 deletions.
3 changes: 3 additions & 0 deletions python/mochi/mofka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
FutureUint = pymofka_client.FutureUint
FutureEvent = pymofka_client.FutureEvent
Ordering = pymofka_client.Ordering
FullDataSelector = pymofka_client.FullDataSelector
ByteArrayAllocator = pymofka_client.ByteArrayAllocator

try:
import pymofka_kafka
KafkaDriver = pymofka_kafka.KafkaDriver
Expand Down
18 changes: 17 additions & 1 deletion python/py-mofka-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ static auto data_helper(const py::buffer& buffer) {
static auto data_helper(const py::list& buffers) {
std::vector<mofka::Data::Segment> segments;
segments.reserve(buffers.size());
for (auto buff : buffers){
for (auto& buff : buffers){
auto buff_info = get_buffer_info(buff.cast<py::buffer>());
check_buffer_is_contiguous(buff_info);
segments.push_back(
Expand Down Expand Up @@ -386,4 +386,20 @@ PYBIND11_MODULE(pymofka_client, m) {
.def("wait", &mofka::Future<mofka::Event>::wait)
.def("completed", &mofka::Future<mofka::Event>::completed)
;

// Selector that always requests the event's full data, ignoring the
// event's metadata: it simply returns the descriptor unchanged.
PythonDataSelector select_full_data =
    [](const nlohmann::json&, const mofka::DataDescriptor& d) -> std::optional<mofka::DataDescriptor> {
        return d;
    };
m.attr("FullDataSelector") = py::cast(select_full_data);

// Broker that allocates a single Python bytearray sized to hold the
// selected data, returned as a one-element list of buffers.
PythonDataBroker bytes_data_broker =
    [](const nlohmann::json&, const mofka::DataDescriptor& d) -> py::list {
        auto buffer = py::bytearray();
        // BUG FIX: PyByteArray_Resize returns -1 (with a Python error set)
        // on failure; the previous code stored the result in an unused
        // variable and ignored it, handing back an empty buffer on OOM.
        if (PyByteArray_Resize(buffer.ptr(), d.size()) != 0)
            throw py::error_already_set();
        py::list result;
        result.append(std::move(buffer));
        return result;
    };
m.attr("ByteArrayAllocator") = py::cast(bytes_data_broker);
}
18 changes: 12 additions & 6 deletions src/MofkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ void MofkaConsumer::forwardBatchToConsumer(
result.error() = "Consumer seems to have be destroyed be client";
result.success() = false;
} else {
// NOTE: we convert the pointer into a shared pointer to prevent
// the consumer from disappearing while the RPC executes.
std::shared_ptr<MofkaConsumer> consumer_ptr;
try {
consumer_ptr = consumer_impl->shared_from_this();
Expand All @@ -284,12 +286,16 @@ void MofkaConsumer::forwardBatchToConsumer(
req.respond(result);
return;
}
// NOTE: we convert the pointer into a shared pointer to prevent
// the consumer from disappearing while the RPC executes.
consumer_impl->recvBatch(
target_info_index, count, firstID,
metadata_sizes, metadata,
data_desc_sizes, data_desc);
auto thread_pool = consumer_ptr->m_thread_pool;
tl::eventual<void> completed;
thread_pool.pushWork([&]() {
consumer_ptr->recvBatch(
target_info_index, count, firstID,
metadata_sizes, metadata,
data_desc_sizes, data_desc);
completed.set_value();
});
completed.wait();
}
req.respond(result);
}
Expand Down
20 changes: 12 additions & 8 deletions src/MofkaConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ class MofkaConsumer : public std::enable_shared_from_this<MofkaConsumer>,
thallium::mutex m_futures_mtx;

MofkaConsumer(thallium::engine engine,
std::string_view name,
BatchSize batch_size,
ThreadPool thread_pool,
DataBroker broker,
DataSelector selector,
SP<MofkaTopicHandle> topic,
std::vector<SP<MofkaPartitionInfo>> partitions)
std::string_view name,
BatchSize batch_size,
ThreadPool thread_pool,
DataBroker broker,
DataSelector selector,
SP<MofkaTopicHandle> topic,
std::vector<SP<MofkaPartitionInfo>> partitions)
: m_engine(std::move(engine))
, m_name(name)
, m_batch_size(batch_size)
Expand All @@ -95,7 +95,11 @@ class MofkaConsumer : public std::enable_shared_from_this<MofkaConsumer>,
, m_consumer_ack_event(m_engine.define("mofka_consumer_ack_event"))
, m_consumer_remove_consumer(m_engine.define("mofka_consumer_remove_consumer"))
, m_consumer_request_data(m_engine.define("mofka_consumer_request_data"))
, m_consumer_recv_batch(m_engine.define("mofka_consumer_recv_batch", forwardBatchToConsumer))
, m_consumer_recv_batch(
m_engine.define("mofka_consumer_recv_batch",
forwardBatchToConsumer,
0,
m_engine.get_progress_pool()))
{}

~MofkaConsumer() {
Expand Down
13 changes: 13 additions & 0 deletions tests/python/test_mofka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ def test_pull(self):
self.assertEqual(len(data), 1)
self.assertEqual(data[0], self.data)

def test_consumer_with_full_selector(self):
    """Pull an event through a consumer configured with FullDataSelector
    and ByteArrayAllocator, and check the full data comes back intact."""
    consumer = self.topic.consumer(
        name="full_consumer",
        batch_size=1,
        data_broker=mofka.ByteArrayAllocator,
        data_selector=mofka.FullDataSelector)
    # BUG FIX: pull from the consumer created above, not self.consumer,
    # otherwise the selector/allocator under test are never exercised.
    f = consumer.pull()
    event = f.wait()
    data = event.data
    self.assertEqual(len(data), 1)
    self.assertEqual(data[0], self.data)


# Allow running this test file directly (python test_mofka_consumer.py)
# in addition to discovery via a test runner.
if __name__ == '__main__':
    unittest.main()
66 changes: 66 additions & 0 deletions tests/python/test_mofka_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import sys
import unittest
import json
import string
import random
import time
import random

from mochi.bedrock.server import Server as BedrockServer
import mochi.mofka.client as mofka


class TestProducer(unittest.TestCase):
    """Tests that producing events validates metadata against the topic schema."""

    def setUp(self):
        # Spin up a Bedrock server from the local config and connect a driver.
        bedrock_config_file = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "config.json")
        with open(bedrock_config_file) as f:
            self.bedrock_server = BedrockServer("na+sm", config=f.read())
        self.service = mofka.MofkaDriver("mofka.json", self.bedrock_server.margo.mid)

        # Create a topic whose metadata must satisfy this JSON schema.
        schema = {
            "type": "object",
            "properties": {
                "x": { "type": "string" },
                "y": { "type": "integer"},
                "z": { "type": "boolean"}
            },
            # BUG FIX: the JSON Schema keyword is "required", not "require";
            # with the typo the required-fields constraint was silently ignored.
            "required": ["x", "y"]
        }

        name = "my_topic"
        self.service.create_topic(name, schema=schema)
        self.service.add_memory_partition(name, 0)

        # Create a producer on the newly created topic.
        self.topic = self.service.open_topic(name)
        batchsize = mofka.AdaptiveBatchSize
        thread_pool = mofka.ThreadPool(0)
        ordering = mofka.Ordering.Strict
        self.producer = self.topic.producer(name, batchsize, thread_pool, ordering)

    def tearDown(self):
        # Drop client-side handles before finalizing the server.
        del self.service
        del self.topic
        del self.producer
        self.bedrock_server.finalize()
        del self.bedrock_server

    def test_produce_valid(self):
        """Metadata that matches the schema is accepted."""
        metadata = {"x": "abc", "y": 123, "z": True}
        future = self.producer.push(metadata)
        future.wait()

    def test_produce_invalid(self):
        """Metadata with wrong types is rejected with a ClientException."""
        metadata = {"x": 123, "y": "abc", "z": "abc"}
        future = self.producer.push(metadata)
        with self.assertRaises(mofka.ClientException):
            future.wait()


# Allow running this test file directly (python test_mofka_schema.py)
# in addition to discovery via a test runner.
if __name__ == '__main__':
    unittest.main()

0 comments on commit ba14819

Please sign in to comment.