Skip to content

Commit

Permalink
double-buffering during consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Jan 15, 2025
1 parent 9c970c1 commit 0d9022a
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 76 deletions.
16 changes: 10 additions & 6 deletions include/mofka/ConsumerHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <mofka/ForwardDcl.hpp>
#include <mofka/BulkRef.hpp>
#include <mofka/EventID.hpp>
#include <mofka/Future.hpp>

#include <thallium.hpp>
#include <memory>
Expand Down Expand Up @@ -72,13 +73,16 @@ class ConsumerHandle {
* @param metadata Bulk wrapping the metadata.
* @param data_desc_sizes Bulk wrapping data descriptor sizes (count*size_t).
* @param data_desc Bulk wrapping data descriptors.
*
* @returns a Future representing the operation. The BulkRef objects passed
* to the function need to remain valid until the future has completed.
*/
void feed(size_t count,
EventID firstID,
const BulkRef& metadata_sizes,
const BulkRef& metadata,
const BulkRef& data_desc_sizes,
const BulkRef& data_desc);
Future<void> feed(size_t count,
EventID firstID,
const BulkRef& metadata_sizes,
const BulkRef& metadata,
const BulkRef& data_desc_sizes,
const BulkRef& data_desc);

/**
* @brief Check if we should stop feeding the ConsumerHandle.
Expand Down
7 changes: 7 additions & 0 deletions include/mofka/Future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ class Future {
*/
~Future() = default;

/**
 * @brief Check the validity of the Future.
 *
 * The Future converts to true when it holds either a wait
 * callback (m_wait) or a completion-check callback (m_completed).
 */
operator bool() const {
    if(m_wait) return true;
    return static_cast<bool>(m_completed);
}

/**
* @brief Wait for the request to complete.
*/
Expand Down
19 changes: 16 additions & 3 deletions src/ConsumerHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
#include "ConsumerHandleImpl.hpp"
#include "PimplUtil.hpp"
#include "Promise.hpp"
#include <spdlog/spdlog.h>
#include <limits>

Expand All @@ -23,7 +24,7 @@ bool ConsumerHandle::shouldStop() const {
return self->m_should_stop;
}

void ConsumerHandle::feed(
Future<void> ConsumerHandle::feed(
size_t count,
EventID firstID,
const BulkRef &metadata_sizes,
Expand All @@ -32,7 +33,7 @@ void ConsumerHandle::feed(
const BulkRef &data_desc)
{
try {
self->m_send_batch.on(self->m_consumer_endpoint)(
auto request = self->m_send_batch.on(self->m_consumer_endpoint).async(
self->m_consumer_ptr,
self->m_partition_index,
count,
Expand All @@ -41,8 +42,20 @@ void ConsumerHandle::feed(
metadata,
data_desc_sizes,
data_desc);
auto req_ptr = std::make_shared<thallium::async_response>(std::move(request));
return Future<void>{
[req_ptr]() {
req_ptr->wait();
},
[req_ptr]() {
return req_ptr->received();
}
};
} catch(const std::exception& ex) {
spdlog::warn("Exception thrown while sending batch to consumer: {}", ex.what());
return Future<void>{
[ex](){ throw ex; },
[](){ return true; }
};
}
}

Expand Down
167 changes: 100 additions & 67 deletions src/YokanEventStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,56 +144,80 @@ class YokanEventStore {

auto c = batchSize.value;

// buffers to hold the metadata and descriptors
std::vector<yk_id_t> ids(c);
std::vector<size_t> metadata_sizes(c);
std::vector<char> metadata_buffer(c * 1024 * 8);

// note: because we are using docLoad for descriptors, we need
// the sizes and documents to be contiguous even if the number
// of items requested varies.
std::vector<char> descriptors_sizes_and_data(c*(sizeof(size_t)+1024));

// TODO: change 1024*8 to an actually good estimation of metadata size
// TODO: properly add Adaptive support

// expose these buffers as bulk handles
auto local_metadata_bulk = m_engine.expose(
{{metadata_sizes.data(), c*sizeof(metadata_sizes[0])},
{ids.data(), c*sizeof(ids[0])},
{metadata_buffer.data(), metadata_buffer.size()*sizeof(metadata_buffer[0])}},
thallium::bulk_mode::read_write);
auto local_descriptors_bulk = m_engine.expose(
{{descriptors_sizes_and_data.data(), descriptors_sizes_and_data.size()}},
thallium::bulk_mode::read_write);

// create the BulkRef objects
auto self_addr = static_cast<std::string>(m_engine.self());
auto metadata_sizes_bulk_ref = BulkRef{
local_metadata_bulk,
0,
c*sizeof(size_t),
self_addr
};
auto metadata_bulk_ref = BulkRef{
local_metadata_bulk,
c*(sizeof(size_t) + sizeof(yk_id_t)),
metadata_buffer.size(),
self_addr
};
auto descriptors_sizes_bulk_ref = BulkRef{
local_descriptors_bulk,
0,
c*sizeof(size_t),
self_addr
};
auto descriptors_bulk_ref = BulkRef{
local_descriptors_bulk,
c*sizeof(size_t),
descriptors_sizes_and_data.size() - c*sizeof(size_t),
self_addr
// One set of staging buffers (plus the bulk handles exposing them) used to
// pull a batch of events out of Yokan and feed it to a consumer. Two such
// sets are swapped to double-buffer: one can be refilled while the other is
// still being transferred.
struct BufferSet {

// event IDs of the batch
std::vector<yk_id_t> ids;
// size of each event's metadata document
std::vector<size_t> metadata_sizes;
// raw metadata bytes (count * 8KB — see TODO in caller about sizing)
std::vector<char> metadata_buffer;

// buffers to hold the metadata and descriptors
// note: because we are using docLoad for descriptors, we need
// the sizes and documents to be contiguous even if the number
// of items requested varies.
std::vector<char> descriptors_sizes_and_data;

// bulk handle over {metadata_sizes | ids | metadata_buffer}, in that order
thallium::bulk local_metadata_bulk;
// bulk handle over descriptors_sizes_and_data
thallium::bulk local_descriptors_bulk;

// address of this process, stored in each BulkRef
std::string self_addr;

// views into the two bulk handles above, handed to ConsumerHandle::feed
BulkRef metadata_sizes_bulk_ref;
BulkRef metadata_bulk_ref;
BulkRef descriptors_sizes_bulk_ref;
BulkRef descriptors_bulk_ref;

// @param engine thallium engine used to expose the buffers for RDMA
// @param count maximum number of events this buffer set can hold
BufferSet(thallium::engine engine, size_t count)
: ids(count)
, metadata_sizes(count)
, metadata_buffer(count * 1024 * 8)
, descriptors_sizes_and_data(count * (sizeof(size_t) + 1024))
, self_addr(engine.self())
{

// expose these buffers as bulk handles
local_metadata_bulk = engine.expose(
{{metadata_sizes.data(), count*sizeof(metadata_sizes[0])},
{ids.data(), count*sizeof(ids[0])},
{metadata_buffer.data(), metadata_buffer.size()*sizeof(metadata_buffer[0])}},
thallium::bulk_mode::read_write);
local_descriptors_bulk = engine.expose(
{{descriptors_sizes_and_data.data(),
descriptors_sizes_and_data.size()}},
thallium::bulk_mode::read_write);
// create bulk refs
// sizes segment sits at offset 0 of the metadata bulk
metadata_sizes_bulk_ref = BulkRef{
local_metadata_bulk,
0,
count*sizeof(size_t),
self_addr
};
// metadata bytes follow the sizes and ids segments
metadata_bulk_ref = BulkRef{
local_metadata_bulk,
count*(sizeof(size_t) + sizeof(yk_id_t)),
metadata_buffer.size(),
self_addr
};
// descriptor layout: count sizes first, then the descriptor bytes
descriptors_sizes_bulk_ref = BulkRef{
local_descriptors_bulk,
0,
count*sizeof(size_t),
self_addr
};
descriptors_bulk_ref = BulkRef{
local_descriptors_bulk,
count*sizeof(size_t),
descriptors_sizes_and_data.size() - count*sizeof(size_t),
self_addr
};

}

};

auto b1 = std::make_shared<BufferSet>(m_engine, c);
auto b2 = std::make_shared<BufferSet>(m_engine, c);
Future<void> lastFeed;

while(!consumerHandle.shouldStop()) {

bool should_stop = false;
Expand All @@ -211,52 +235,61 @@ class YokanEventStore {
if(num_available_events == 0) { // m_is_marked_complete must be true
// feed consumer 0 events with first_id = NoMoreEvents to indicate
// that there are no more events to consume from this partition
consumerHandle.feed(
if(lastFeed) lastFeed.wait();
lastFeed = consumerHandle.feed(
0, NoMoreEvents, BulkRef{}, BulkRef{}, BulkRef{}, BulkRef{});
break;
}

// list metadata documents
m_metadata_coll.listBulk(
firstID+1, 0, local_metadata_bulk.get_bulk(),
0, metadata_buffer.size(), true, batchSize.value);
firstID+1, 0, b1->local_metadata_bulk.get_bulk(),
0, b1->metadata_buffer.size(), true, batchSize.value);

// check how many we actually pulled
auto it = std::find_if(metadata_sizes.begin(),
metadata_sizes.end(),
auto it = std::find_if(b1->metadata_sizes.begin(),
b1->metadata_sizes.end(),
[](auto size) {
return size > YOKAN_LAST_VALID_SIZE;
});

size_t num_events = it - metadata_sizes.begin();
metadata_bulk_ref.size = std::accumulate(metadata_sizes.begin(), it, (size_t)0);
metadata_sizes_bulk_ref.size = num_events*sizeof(size_t);
size_t num_events = it - b1->metadata_sizes.begin();
b1->metadata_bulk_ref.size = std::accumulate(b1->metadata_sizes.begin(), it, (size_t)0);
b1->metadata_sizes_bulk_ref.size = num_events*sizeof(size_t);

// load the corresponding descriptors
m_descriptors_coll.loadBulk(
num_events, ids.data(), local_descriptors_bulk.get_bulk(),
0, local_descriptors_bulk.size(), true);
auto descriptors_sizes = reinterpret_cast<size_t*>(descriptors_sizes_and_data.data());
descriptors_sizes_bulk_ref.size = num_events*sizeof(size_t);
num_events, b1->ids.data(), b1->local_descriptors_bulk.get_bulk(),
0, b1->local_descriptors_bulk.size(), true);
auto descriptors_sizes = reinterpret_cast<size_t*>(b1->descriptors_sizes_and_data.data());
b1->descriptors_sizes_bulk_ref.size = num_events*sizeof(size_t);
for(size_t i = 0; i < num_events; ++i) {
if(descriptors_sizes[i] > YOKAN_LAST_VALID_SIZE)
descriptors_sizes[i] = 0;
}
descriptors_bulk_ref.offset = descriptors_sizes_bulk_ref.size;
descriptors_bulk_ref.size = std::accumulate(
b1->descriptors_bulk_ref.offset = b1->descriptors_sizes_bulk_ref.size;
b1->descriptors_bulk_ref.size = std::accumulate(
descriptors_sizes, descriptors_sizes + num_events, (size_t)0);

if(lastFeed)
lastFeed.wait();

// feed the consumer handle
consumerHandle.feed(
lastFeed = consumerHandle.feed(
num_events, firstID,
metadata_sizes_bulk_ref,
metadata_bulk_ref,
descriptors_sizes_bulk_ref,
descriptors_bulk_ref);
b1->metadata_sizes_bulk_ref,
b1->metadata_bulk_ref,
b1->descriptors_sizes_bulk_ref,
b1->descriptors_bulk_ref);

firstID += num_events;

std::swap(b1, b2);
}

if(lastFeed)
lastFeed.wait();

return result;
}

Expand Down

0 comments on commit 0d9022a

Please sign in to comment.