Skip to content

Commit

Permalink
Adapt to new Bedrock API (#12)
Browse files Browse the repository at this point in the history
* updated bedrock module

* added test

* fixed configuration space
  • Loading branch information
mdorier authored Sep 23, 2024
1 parent 56781f4 commit 4f5b07b
Show file tree
Hide file tree
Showing 14 changed files with 132 additions and 132 deletions.
3 changes: 2 additions & 1 deletion include/mofka/PartitionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
#include <mofka/ConsumerHandle.hpp>
#include <mofka/Factory.hpp>

#include <bedrock/AbstractServiceFactory.hpp>
#include <bedrock/AbstractComponent.hpp>

#include <thallium.hpp>
#include <unordered_map>
#include <string_view>
Expand Down
4 changes: 1 addition & 3 deletions include/mofka/Provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <mofka/Metadata.hpp>
#include <mofka/Json.hpp>

#include <bedrock/AbstractServiceFactory.hpp>
#include <bedrock/AbstractComponent.hpp>
#include <thallium.hpp>
#include <memory>
#include <string_view>
Expand All @@ -33,13 +33,11 @@ class Provider {
* @param engine Thallium engine to use to receive RPCs.
* @param provider_id Provider id.
* @param config JSON configuration.
* @param pool Argobots pool to use to handle RPCs.
* @param dependencies Dependencies resolved by Bedrock or manually.
*/
Provider(const thallium::engine& engine,
uint16_t provider_id = 0,
const Metadata& config = Metadata{"{}"},
const thallium::pool& pool = thallium::pool{},
const bedrock::ResolvedDependencyMap& dependencies = {});

/**
Expand Down
35 changes: 16 additions & 19 deletions python/mochi/mofka/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@


from mochi.bedrock.spec import ProviderSpec, ProcSpec, ServiceSpec
from mochi.yokan.spec import YokanProviderSpec
from mochi.warabi.spec import WarabiProviderSpec
from mochi.yokan.config_space import YokanSpaceBuilder
from mochi.warabi.config_space import WarabiSpaceBuilder


class MofkaServiceSpec(ServiceSpec):
Expand All @@ -37,39 +37,36 @@ def space(*, num_servers: int = 1,
max_num_pools = num_pools_in_servers if isinstance(num_pools_in_servers, int) \
else num_pools_in_servers[1]
# Master database provider
master_db_cs = YokanProviderSpec.space(
master_db_space_builder = YokanSpaceBuilder(
paths=[f'{p}/master' for p in master_db_path_prefixes],
need_sorted_db=True, need_values=True,
need_persistence=master_db_needs_persistence,
tags=['mofka:master'],
max_num_pools=max_num_pools)
tags=['mofka:master'])
# Metadata database provider
metadata_db_cs = YokanProviderSpec.space(
metadata_db_space_builder = YokanSpaceBuilder(
paths=[f'{p}/metadata' for p in metadata_db_path_prefixes],
need_sorted_db=True, need_values=True,
need_persistence=metadata_db_needs_persistence,
tags=['mofka:metadata'],
max_num_pools=max_num_pools)
tags=['mofka:metadata'])
# Data storage provider
data_storage_cs = WarabiProviderSpec.space(
data_storage_space_builder = WarabiSpaceBuilder(
paths=[f'{p}/data' for p in data_storage_path_prefixes],
need_persistence=data_storage_needs_persistence,
tags=['mofka:data'],
max_num_pools=max_num_pools)
tags=['mofka:data'])
provider_space_factories = [
{
'family': 'master',
'space' : master_db_cs,
'builder' : master_db_space_builder,
'count' : 1
},
{
'family': 'metadata',
'space' : metadata_db_cs,
'builder' : metadata_db_space_builder,
'count' : num_metadata_db_per_proc
},
{
'family': 'data',
'space' : data_storage_cs,
'builder' : data_storage_space_builder,
'count' : num_data_storage_per_proc
}
]
Expand Down Expand Up @@ -110,13 +107,13 @@ def from_config(*, config: 'Configuration', **kwargs):
# Also because processes may share the same node, we will add
# a prefix to all the paths
for proc in spec.processes:
proc.libraries['yokan'] = 'libyokan-bedrock-module.so'
proc.libraries['warabi'] = 'libwarabi-bedrock-module.so'
proc.libraries['flock'] = 'libflock-bedrock-module.so'
proc.libraries['mofka'] = 'libmofka-bedrock-module.so'
proc.libraries.extend(['libyokan-bedrock-module.so',
'libwarabi-bedrock-module.so',
'libflock-bedrock-module.so',
'libmofka-bedrock-module.so'])
proc.providers.add(
name='group', type='flock',
pool=proc.margo.argobots.pools[0],
dependencies={'pool': proc.margo.argobots.pools[0].name},
provider_id=len(proc.providers)+1,
config={
'bootstrap': 'mpi',
Expand Down
2 changes: 1 addition & 1 deletion python/mochi/mofka/test_config_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class TestConfigSpace(unittest.TestCase):

def test_mofka_config_space(self):
space = MofkaServiceSpec.space(
num_procs=4, num_pools_in_servers=4, num_xstreams=4,
num_procs=4, num_pools_in_servers=4, num_xstreams_in_servers=4,
num_metadata_db_per_proc=8,
num_data_storage_per_proc=2).freeze()

Expand Down
4 changes: 2 additions & 2 deletions spack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ spack:
- "+space"
mochi-bedrock:
require:
- "@0.14.0:"
- "@0.15.0:"
- "+python"
- "+mpi"
- "+flock"
- "+space"
mochi-flock:
require:
- "@0.4.0:"
- "@0.5.0:"
- "+bedrock"
- "+python"
- "+mpi"
Expand Down
96 changes: 33 additions & 63 deletions src/BedrockModule.cpp
Original file line number Diff line number Diff line change
@@ -1,84 +1,54 @@
/*
* (C) 2020 The University of Chicago
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "mofka/Client.hpp"
#include "mofka/Provider.hpp"
#include "mofka/ProviderHandle.hpp"
#include <bedrock/AbstractServiceFactory.hpp>

namespace tl = thallium;

class MofkaFactory : public bedrock::AbstractServiceFactory {

public:

MofkaFactory() {}
#include <bedrock/AbstractComponent.hpp>

void *registerProvider(const bedrock::FactoryArgs &args) override {
mofka::Metadata config;
try {
config = mofka::Metadata{args.config.c_str(), true};
} catch(const mofka::Exception& ex) {
spdlog::error("Error parsing configuration for Mofka provider: {}", ex.what());
return nullptr;
}
auto provider = new mofka::Provider(
args.mid, args.provider_id,
config, tl::pool(args.pool),
args.dependencies);
return static_cast<void *>(provider);
}
namespace tl = thallium;

void deregisterProvider(void *p) override {
auto provider = static_cast<mofka::Provider *>(p);
delete provider;
}
class MofkaComponent : public bedrock::AbstractComponent {

std::string getProviderConfig(void *p) override {
auto provider = static_cast<mofka::Provider *>(p);
return provider->getConfig().string();
}
std::unique_ptr<mofka::Provider> m_provider;

void *initClient(const bedrock::FactoryArgs& args) override {
return static_cast<void *>(new mofka::Client(args.mid));
}

void finalizeClient(void *client) override {
delete static_cast<mofka::Client *>(client);
}
public:

std::string getClientConfig(void* c) override {
auto client = static_cast<mofka::Client*>(c);
return client->getConfig().string();
}
MofkaComponent(const tl::engine& engine,
uint16_t provider_id,
const mofka::Metadata& config,
const bedrock::ResolvedDependencyMap& dependencies)
: m_provider{std::make_unique<mofka::Provider>(engine, provider_id, config, dependencies)}
{}

void *createProviderHandle(void *c, hg_addr_t address,
uint16_t provider_id) override {
auto client = static_cast<mofka::Client *>(c);
auto ph = new mofka::ProviderHandle(
client->engine(),
address,
provider_id,
false);
return static_cast<void *>(ph);
void* getHandle() override {
return static_cast<void*>(m_provider.get());
}

void destroyProviderHandle(void *providerHandle) override {
auto ph = static_cast<mofka::ProviderHandle *>(providerHandle);
delete ph;
std::string getConfig() override {
return m_provider->getConfig().string();
}

std::vector<bedrock::Dependency> getProviderDependencies(const char* config) override {
auto config_metadata = mofka::Metadata{config, true};
return mofka::Provider::getDependencies(config_metadata);
}
static std::shared_ptr<bedrock::AbstractComponent>
Register(const bedrock::ComponentArgs& args) {
tl::pool pool;
auto it = args.dependencies.find("pool");
if(it != args.dependencies.end() && !it->second.empty()) {
pool = it->second[0]->getHandle<tl::pool>();
}
auto config = mofka::Metadata{args.config, true};
return std::make_shared<MofkaComponent>(
args.engine, args.provider_id, config, args.dependencies);
}

const std::vector<bedrock::Dependency> &getClientDependencies() override {
static const std::vector<bedrock::Dependency> no_dependency;
return no_dependency;
}
static std::vector<bedrock::Dependency>
GetDependencies(const bedrock::ComponentArgs& args) {
auto config_metadata = mofka::Metadata{args.config, true};
return mofka::Provider::getDependencies(config_metadata);
}
};

BEDROCK_REGISTER_MODULE_FACTORY(mofka, MofkaFactory)
BEDROCK_REGISTER_COMPONENT_TYPE(mofka, MofkaComponent)
29 changes: 9 additions & 20 deletions src/DefaultPartitionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
#include <numeric>
#include <iostream>

namespace tl = thallium;

namespace mofka {

MOFKA_REGISTER_PARTITION_MANAGER_WITH_DEPENDENCIES(
default, DefaultPartitionManager,
{"data", "warabi", BEDROCK_REQUIRED},
{"metadata", "yokan", BEDROCK_REQUIRED});
{"data", "warabi", true, false, false},
{"metadata", "yokan", true, false, false});

Result<EventID> DefaultPartitionManager::receiveBatch(
const thallium::endpoint& sender,
Expand Down Expand Up @@ -126,28 +128,15 @@ std::unique_ptr<mofka::PartitionManager> DefaultPartitionManager::create(
throw Exception{"Error(s) while validating JSON config for DefaultPartitionManager"};
}

/* Check that the dependencies are there */
auto it = dependencies.find("data");
warabi::TargetHandle* warabi_target = nullptr;;
if(it == dependencies.end()) {
throw Exception{"Warabi TargetHandle not provided as dependency"};
} else {
warabi_target = it->second.dependencies[0]->getHandle<warabi::TargetHandle*>();
}

yk_database_handle_t yokan_db = nullptr;
it = dependencies.find("metadata");
if(it == dependencies.end()) {
throw Exception{"Yokan Database not provided as dependency"};
} else {
yokan_db = it->second.dependencies[0]->getHandle<yk_database_handle_t>();
}
/* the data and metadata dependencies are required so we know they are in the map */
auto warabi_ph = dependencies.at("data")[0]->getHandle<tl::provider_handle>();
auto yokan_ph = dependencies.at("metadata")[0]->getHandle<tl::provider_handle>();

/* create data store */
auto data_store = WarabiDataStore::create(engine, std::move(*warabi_target));
auto data_store = WarabiDataStore::create(engine, std::move(warabi_ph));

/* create event store */
auto event_store = YokanEventStore::create(engine, topic_name, partition_uuid, yokan_db);
auto event_store = YokanEventStore::create(engine, topic_name, partition_uuid, std::move(yokan_ph));

/* create topic manager */
return std::unique_ptr<mofka::PartitionManager>(
Expand Down
16 changes: 11 additions & 5 deletions src/Provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,26 @@ Provider::Provider(
const tl::engine& engine,
uint16_t provider_id,
const Metadata& config,
const thallium::pool& pool,
const bedrock::ResolvedDependencyMap& dependencies)
: self(std::make_shared<ProviderImpl>(engine, provider_id, config, pool, dependencies)) {
const bedrock::ResolvedDependencyMap& dependencies) {
/* the pool argument is optional */
auto it = dependencies.find("pool");
auto pool = it != dependencies.end() ?
it->second[0]->getHandle<tl::pool>()
: engine.get_handler_pool();
self = std::make_shared<ProviderImpl>(engine, provider_id, config, pool, dependencies);
self->get_engine().push_finalize_callback(this, [p=this]() { p->self.reset(); });
}

std::vector<bedrock::Dependency> Provider::getDependencies(const Metadata& metadata) {
std::vector<bedrock::Dependency> dependencies;
auto& json = metadata.json();
if(json.is_object() && json.contains("type") && json["type"].is_string()) {
return PartitionManagerDependencyFactory::getDependencies(
dependencies = PartitionManagerDependencyFactory::getDependencies(
json["type"].get_ref<const std::string&>()
);
}
return {};
dependencies.push_back({"pool", "pool", false, false, false});
return dependencies;
}

Provider::Provider(Provider&& other) {
Expand Down
12 changes: 10 additions & 2 deletions src/WarabiDataStore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class WarabiDataStore {
};

thallium::engine m_engine;
warabi::Client m_warabi_client;
warabi::TargetHandle m_target;

public:
Expand Down Expand Up @@ -138,12 +139,19 @@ class WarabiDataStore {

WarabiDataStore(
thallium::engine engine,
warabi::Client warabi_client,
warabi::TargetHandle target)
: m_engine(std::move(engine))
, m_warabi_client(std::move(warabi_client))
, m_target(std::move(target)) {}

static std::unique_ptr<WarabiDataStore> create(thallium::engine engine, warabi::TargetHandle target) {
return std::make_unique<WarabiDataStore>(std::move(engine), std::move(target));
static std::unique_ptr<WarabiDataStore> create(
thallium::engine engine,
thallium::provider_handle warabi_ph) {
auto warabi_client = warabi::Client{engine};
auto target = warabi_client.makeTargetHandle(warabi_ph, warabi_ph.provider_id());
return std::make_unique<WarabiDataStore>(
std::move(engine), std::move(warabi_client), std::move(target));
}

};
Expand Down
Loading

0 comments on commit 4f5b07b

Please sign in to comment.