Skip to content

Commit

Permalink
parallelized better the production path in DefaultPartitionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Dec 31, 2024
1 parent 2f41bca commit 7f94952
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 58 deletions.
23 changes: 14 additions & 9 deletions src/DefaultPartitionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,25 @@ Result<EventID> DefaultPartitionManager::receiveBatch(
(void)sender;
Result<EventID> first_id;

// --------- transfer the data to the DataStore
auto descriptors = m_data_store->store(num_events, data_bulk);
if(!descriptors.success()) {
first_id.success() = false;
first_id.error() = descriptors.error();
return first_id;
}
// --------- asynchronously transfer the data to the DataStore
auto future_descriptors = m_data_store->store(num_events, data_bulk);

// --------- transfer the metadata to the EventStore
// --------- meanwhile transfer the metadata to the EventStore
first_id = m_event_store->appendMetadata(num_events, metadata_bulk);
if(!first_id.success()) return first_id;

// --------- wait for the data transfers
std::vector<DataDescriptor> descriptors;
try {
descriptors = future_descriptors.wait();
} catch(const std::exception& ex) {
first_id.success() = false;
first_id.error() = ex.what();
return first_id;
}

// --------- transfer the descriptors
auto ok = m_event_store->storeDataDescriptors(first_id.value(), descriptors.value());
auto ok = m_event_store->storeDataDescriptors(first_id.value(), descriptors);
if(!ok.success()) {
first_id.success() = false;
first_id.error() = ok.error();
Expand Down
2 changes: 1 addition & 1 deletion src/Promise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct Promise {
auto v = state->wait();
if(std::holds_alternative<Exception>(v))
throw std::get<Exception>(v);
return std::get<Type>(v);
return std::get<Type>(std::move(v));
};
auto complete_fn = [state, on_test=std::move(on_test)]() mutable -> bool {
auto is_ready = state->test();
Expand Down
108 changes: 60 additions & 48 deletions src/WarabiDataStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <mofka/Metadata.hpp>
#include <mofka/DataDescriptor.hpp>
#include <mofka/BulkRef.hpp>
#include "Promise.hpp"
#include <spdlog/spdlog.h>
#include <cstddef>
#include <string_view>
Expand All @@ -33,61 +34,72 @@ class WarabiDataStore {

public:

Result<std::vector<DataDescriptor>> store(
Future<std::vector<DataDescriptor>> store(
size_t count,
const BulkRef& remoteBulk) {

/* prepare the result array and its content by resizing the location
* field to be able to hold a WarabiDataDescriptor. */
Result<std::vector<DataDescriptor>> result;
result.value().resize(count);
for(auto& descriptor : result.value()) {
auto& location = descriptor.location();
location.resize(sizeof(WarabiDataDescriptor));
}

/* lookup the sender. */
const auto source = m_engine.lookup(remoteBulk.address);

/* compute the offset at which the data start in the bulk handle
* (the first count*sizeof(size_t) bytes hold the data sizes). */
const auto dataOffset = count*sizeof(size_t);

/* create a local buffer to receive the sizes (these sizes are
* needed later to make the DataDescriptors). */
std::vector<size_t> sizes(count);
auto sizesBulk = m_engine.expose(
{{sizes.data(), dataOffset}},
thallium::bulk_mode::write_only);

// FIXME: the two following steps could be done in parallel.
Future<std::vector<DataDescriptor>> future;
Promise<std::vector<DataDescriptor>> promise;
std::tie(future, promise) = Promise<std::vector<DataDescriptor>>::CreateFutureAndPromise();

auto ult = [promise=std::move(promise), count, remoteBulk, this]() mutable {
/* prepare the result array and its content by resizing the location
* field to be able to hold a WarabiDataDescriptor. */
std::vector<DataDescriptor> result;
result.resize(count);
for(auto& descriptor : result) {
auto& location = descriptor.location();
location.resize(sizeof(WarabiDataDescriptor));
}

/* transfer size of each region */
sizesBulk << remoteBulk.handle.on(source)(remoteBulk.offset, dataOffset);
/* lookup the sender. */
const auto source = m_engine.lookup(remoteBulk.address);

/* compute the offset at which the data start in the bulk handle
* (the first count*sizeof(size_t) bytes hold the data sizes). */
const auto dataOffset = count*sizeof(size_t);

/* create a local buffer to receive the sizes (these sizes are
* needed later to make the DataDescriptors). */
std::vector<size_t> sizes(count);
auto sizesBulk = m_engine.expose(
{{sizes.data(), dataOffset}},
thallium::bulk_mode::write_only);

/* forward data as region into Warabi, if size > 0 */
warabi::RegionID region_id;
warabi::AsyncRequest req;
if(remoteBulk.size - dataOffset > 0) {
m_target.createAndWrite(
&region_id, remoteBulk.handle, remoteBulk.address,
remoteBulk.offset + dataOffset, remoteBulk.size - dataOffset, true,
&req);
} else {
memset(region_id.data(), 0, region_id.size());
}

/* forward data as region into Warabi, if size > 0 */
warabi::RegionID region_id;
if(remoteBulk.size - dataOffset > 0) {
m_target.createAndWrite(
&region_id, remoteBulk.handle, remoteBulk.address,
remoteBulk.offset + dataOffset, remoteBulk.size - dataOffset, true);
} else {
memset(region_id.data(), 0, region_id.size());
}
/* transfer size of each region */
sizesBulk << remoteBulk.handle.on(source)(remoteBulk.offset, dataOffset);

if(req) req.wait();

/* update the result vector */
WarabiDataDescriptor wdescriptor{0, region_id};
for(size_t j = 0; j < count; ++j) {
result[j] = DataDescriptor::From(
std::string_view{
reinterpret_cast<char*>(&wdescriptor),
sizeof(wdescriptor)
},
sizes[j]);
wdescriptor.offset += sizes[j];
}

/* update the result vector */
WarabiDataDescriptor wdescriptor{0, region_id};
for(size_t j = 0; j < count; ++j) {
result.value()[j] = DataDescriptor::From(
std::string_view{
reinterpret_cast<char*>(&wdescriptor),
sizeof(wdescriptor)
},
sizes[j]);
wdescriptor.offset += sizes[j];
}
promise.setValue(std::move(result));
};

return result;
ult();
return future;
}

std::vector<Result<void>> load(
Expand Down

0 comments on commit 7f94952

Please sign in to comment.