From 4f5b07b007d079325a404dedfcfce5159967d871 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Mon, 23 Sep 2024 16:57:55 -0500 Subject: [PATCH] Adapt to new Bedrock API (#12) * updated bedrock module * added test * fixed configuration space --- include/mofka/PartitionManager.hpp | 3 +- include/mofka/Provider.hpp | 4 +- python/mochi/mofka/spec.py | 35 +++++---- python/mochi/mofka/test_config_space.py | 2 +- spack.yaml | 4 +- src/BedrockModule.cpp | 96 +++++++++---------------- src/DefaultPartitionManager.cpp | 29 +++----- src/Provider.cpp | 16 +++-- src/WarabiDataStore.hpp | 12 +++- src/YokanEventStore.hpp | 12 +++- tests/CMakeLists.txt | 23 ++++++ tests/Configs.hpp | 12 ++-- tests/python/config.json | 12 ++-- tests/spack.yaml | 4 +- 14 files changed, 132 insertions(+), 132 deletions(-) diff --git a/include/mofka/PartitionManager.hpp b/include/mofka/PartitionManager.hpp index bfcb737..e95d71c 100644 --- a/include/mofka/PartitionManager.hpp +++ b/include/mofka/PartitionManager.hpp @@ -15,7 +15,8 @@ #include #include -#include +#include + #include #include #include diff --git a/include/mofka/Provider.hpp b/include/mofka/Provider.hpp index 5aab3fe..5013ebc 100644 --- a/include/mofka/Provider.hpp +++ b/include/mofka/Provider.hpp @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include #include @@ -33,13 +33,11 @@ class Provider { * @param engine Thallium engine to use to receive RPCs. * @param provider_id Provider id. * @param config JSON configuration. - * @param pool Argobots pool to use to handle RPCs. * @param dependencies Dependencies resolved by Bedrock or manually. */ Provider(const thallium::engine& engine, uint16_t provider_id = 0, const Metadata& config = Metadata{"{}"}, - const thallium::pool& pool = thallium::pool{}, const bedrock::ResolvedDependencyMap& dependencies = {}); /** diff --git a/python/mochi/mofka/spec.py b/python/mochi/mofka/spec.py index 7022b40..7130be4 100644 --- a/python/mochi/mofka/spec.py +++ b/python/mochi/mofka/spec.py @@ -14,8 +14,8 @@ from mochi.bedrock.spec import ProviderSpec, ProcSpec, ServiceSpec -from mochi.yokan.spec import YokanProviderSpec -from mochi.warabi.spec import WarabiProviderSpec +from mochi.yokan.config_space import YokanSpaceBuilder +from mochi.warabi.config_space import WarabiSpaceBuilder class MofkaServiceSpec(ServiceSpec): @@ -37,39 +37,36 @@ def space(*, num_servers: int = 1, max_num_pools = num_pools_in_servers if isinstance(num_pools_in_servers, int) \ else num_pools_in_servers[1] # Master database provider - master_db_cs = YokanProviderSpec.space( + master_db_space_builder = YokanSpaceBuilder( paths=[f'{p}/master' for p in master_db_path_prefixes], need_sorted_db=True, need_values=True, need_persistence=master_db_needs_persistence, - tags=['mofka:master'], - max_num_pools=max_num_pools) + tags=['mofka:master']) # Metadata database provider - metadata_db_cs = YokanProviderSpec.space( + metadata_db_space_builder = YokanSpaceBuilder( paths=[f'{p}/metadata' for p in metadata_db_path_prefixes], need_sorted_db=True, need_values=True, need_persistence=metadata_db_needs_persistence, - tags=['mofka:metadata'], - max_num_pools=max_num_pools) + tags=['mofka:metadata']) # Data storage provider - data_storage_cs = WarabiProviderSpec.space( + data_storage_space_builder = WarabiSpaceBuilder( paths=[f'{p}/data' for p in data_storage_path_prefixes], need_persistence=data_storage_needs_persistence, - tags=['mofka:data'], - max_num_pools=max_num_pools) + tags=['mofka:data']) provider_space_factories = [ { 'family': 'master', - 'space' : master_db_cs, + 'builder' : master_db_space_builder, 'count' : 1 }, { 'family': 'metadata', - 'space' : metadata_db_cs, + 'builder' : metadata_db_space_builder, 'count' : num_metadata_db_per_proc }, { 'family': 'data', - 'space' : data_storage_cs, + 'builder' : data_storage_space_builder, 'count' : num_data_storage_per_proc } ] @@ -110,13 +107,13 @@ def from_config(*, config: 'Configuration', **kwargs): # Also because processes may share the same node, we will add # a prefix to all the paths for proc in spec.processes: - proc.libraries['yokan'] = 'libyokan-bedrock-module.so' - proc.libraries['warabi'] = 'libwarabi-bedrock-module.so' - proc.libraries['flock'] = 'libflock-bedrock-module.so' - proc.libraries['mofka'] = 'libmofka-bedrock-module.so' + proc.libraries.extend(['libyokan-bedrock-module.so', + 'libwarabi-bedrock-module.so', + 'libflock-bedrock-module.so', + 'libmofka-bedrock-module.so']) proc.providers.add( name='group', type='flock', - pool=proc.margo.argobots.pools[0], + dependencies={'pool': proc.margo.argobots.pools[0].name}, provider_id=len(proc.providers)+1, config={ 'bootstrap': 'mpi', diff --git a/python/mochi/mofka/test_config_space.py b/python/mochi/mofka/test_config_space.py index 680dacb..0e8c346 100644 --- a/python/mochi/mofka/test_config_space.py +++ b/python/mochi/mofka/test_config_space.py @@ -10,7 +10,7 @@ class TestConfigSpace(unittest.TestCase): def test_mofka_config_space(self): space = MofkaServiceSpec.space( - num_procs=4, num_pools_in_servers=4, num_xstreams=4, + num_procs=4, num_pools_in_servers=4, num_xstreams_in_servers=4, num_metadata_db_per_proc=8, num_data_storage_per_proc=2).freeze() diff --git a/spack.yaml b/spack.yaml index 1c518f4..b563a28 100644 --- a/spack.yaml +++ b/spack.yaml @@ -37,14 +37,14 @@ spack: - "+space" mochi-bedrock: require: - - "@0.14.0:" + - "@0.15.0:" - "+python" - "+mpi" - "+flock" - "+space" mochi-flock: require: - - "@0.4.0:" + - "@0.5.0:" - "+bedrock" - "+python" - "+mpi" diff --git a/src/BedrockModule.cpp b/src/BedrockModule.cpp index 6c13c2f..ef58b8d 100644 --- a/src/BedrockModule.cpp +++ b/src/BedrockModule.cpp @@ -1,84 +1,54 @@ /* - * (C) 2020 The University of Chicago + * (C) 2024 The University of Chicago * * See COPYRIGHT in top-level directory. */ #include "mofka/Client.hpp" #include "mofka/Provider.hpp" #include "mofka/ProviderHandle.hpp" -#include -namespace tl = thallium; - -class MofkaFactory : public bedrock::AbstractServiceFactory { - - public: - - MofkaFactory() {} +#include - void *registerProvider(const bedrock::FactoryArgs &args) override { - mofka::Metadata config; - try { - config = mofka::Metadata{args.config.c_str(), true}; - } catch(const mofka::Exception& ex) { - spdlog::error("Error parsing configuration for Mofka provider: {}", ex.what()); - return nullptr; - } - auto provider = new mofka::Provider( - args.mid, args.provider_id, - config, tl::pool(args.pool), - args.dependencies); - return static_cast(provider); - } +namespace tl = thallium; - void deregisterProvider(void *p) override { - auto provider = static_cast(p); - delete provider; - } +class MofkaComponent : public bedrock::AbstractComponent { - std::string getProviderConfig(void *p) override { - auto provider = static_cast(p); - return provider->getConfig().string(); - } + std::unique_ptr m_provider; - void *initClient(const bedrock::FactoryArgs& args) override { - return static_cast(new mofka::Client(args.mid)); - } - - void finalizeClient(void *client) override { - delete static_cast(client); - } + public: - std::string getClientConfig(void* c) override { - auto client = static_cast(c); - return client->getConfig().string(); - } + MofkaComponent(const tl::engine& engine, + uint16_t provider_id, + const mofka::Metadata& config, + const bedrock::ResolvedDependencyMap& dependencies) + : m_provider{std::make_unique(engine, provider_id, config, dependencies)} + {} - void *createProviderHandle(void *c, hg_addr_t address, - uint16_t provider_id) override { - auto client = static_cast(c); - auto ph = new mofka::ProviderHandle( - client->engine(), - address, - provider_id, - false); - return static_cast(ph); + void* getHandle() override { + return static_cast(m_provider.get()); } - void destroyProviderHandle(void *providerHandle) override { - auto ph = static_cast(providerHandle); - delete ph; + std::string getConfig() override { + return m_provider->getConfig().string(); } - std::vector getProviderDependencies(const char* config) override { - auto config_metadata = mofka::Metadata{config, true}; - return mofka::Provider::getDependencies(config_metadata); - } + static std::shared_ptr + Register(const bedrock::ComponentArgs& args) { + tl::pool pool; + auto it = args.dependencies.find("pool"); + if(it != args.dependencies.end() && !it->second.empty()) { + pool = it->second[0]->getHandle(); + } + auto config = mofka::Metadata{args.config, true}; + return std::make_shared( + args.engine, args.provider_id, config, args.dependencies); + } - const std::vector &getClientDependencies() override { - static const std::vector no_dependency; - return no_dependency; - } + static std::vector + GetDependencies(const bedrock::ComponentArgs& args) { + auto config_metadata = mofka::Metadata{args.config, true}; + return mofka::Provider::getDependencies(config_metadata); + } }; -BEDROCK_REGISTER_MODULE_FACTORY(mofka, MofkaFactory) +BEDROCK_REGISTER_COMPONENT_TYPE(mofka, MofkaComponent) diff --git a/src/DefaultPartitionManager.cpp b/src/DefaultPartitionManager.cpp index 6467bd1..3d38dd0 100644 --- a/src/DefaultPartitionManager.cpp +++ b/src/DefaultPartitionManager.cpp @@ -11,12 +11,14 @@ #include #include +namespace tl = thallium; + namespace mofka { MOFKA_REGISTER_PARTITION_MANAGER_WITH_DEPENDENCIES( default, DefaultPartitionManager, - {"data", "warabi", BEDROCK_REQUIRED}, - {"metadata", "yokan", BEDROCK_REQUIRED}); + {"data", "warabi", true, false, false}, + {"metadata", "yokan", true, false, false}); Result DefaultPartitionManager::receiveBatch( const thallium::endpoint& sender, @@ -126,28 +128,15 @@ std::unique_ptr DefaultPartitionManager::create( throw Exception{"Error(s) while validating JSON config for DefaultPartitionManager"}; } - /* Check that the dependencies are there */ - auto it = dependencies.find("data"); - warabi::TargetHandle* warabi_target = nullptr;; - if(it == dependencies.end()) { - throw Exception{"Warabi TargetHandle not provided as dependency"}; - } else { - warabi_target = it->second.dependencies[0]->getHandle(); - } - - yk_database_handle_t yokan_db = nullptr; - it = dependencies.find("metadata"); - if(it == dependencies.end()) { - throw Exception{"Yokan Database not provided as dependency"}; - } else { - yokan_db = it->second.dependencies[0]->getHandle(); - } + /* the data and metadata dependencies are required so we know they are in the map */ + auto warabi_ph = dependencies.at("data")[0]->getHandle(); + auto yokan_ph = dependencies.at("metadata")[0]->getHandle(); /* create data store */ - auto data_store = WarabiDataStore::create(engine, std::move(*warabi_target)); + auto data_store = WarabiDataStore::create(engine, std::move(warabi_ph)); /* create event store */ - auto event_store = YokanEventStore::create(engine, topic_name, partition_uuid, yokan_db); + auto event_store = YokanEventStore::create(engine, topic_name, partition_uuid, std::move(yokan_ph)); /* create topic manager */ return std::unique_ptr( diff --git a/src/Provider.cpp b/src/Provider.cpp index d793ee2..bc2f4e4 100644 --- a/src/Provider.cpp +++ b/src/Provider.cpp @@ -16,20 +16,26 @@ Provider::Provider( const tl::engine& engine, uint16_t provider_id, const Metadata& config, - const thallium::pool& pool, - const bedrock::ResolvedDependencyMap& dependencies) -: self(std::make_shared(engine, provider_id, config, pool, dependencies)) { + const bedrock::ResolvedDependencyMap& dependencies) { + /* the pool argument is optional */ + auto it = dependencies.find("pool"); + auto pool = it != dependencies.end() ? + it->second[0]->getHandle() + : engine.get_handler_pool(); + self = std::make_shared(engine, provider_id, config, pool, dependencies); self->get_engine().push_finalize_callback(this, [p=this]() { p->self.reset(); }); } std::vector Provider::getDependencies(const Metadata& metadata) { + std::vector dependencies; auto& json = metadata.json(); if(json.is_object() && json.contains("type") && json["type"].is_string()) { - return PartitionManagerDependencyFactory::getDependencies( + dependencies = PartitionManagerDependencyFactory::getDependencies( json["type"].get_ref() ); } - return {}; + dependencies.push_back({"pool", "pool", false, false, false}); + return dependencies; } Provider::Provider(Provider&& other) { diff --git a/src/WarabiDataStore.hpp b/src/WarabiDataStore.hpp index 873529a..3546d20 100644 --- a/src/WarabiDataStore.hpp +++ b/src/WarabiDataStore.hpp @@ -28,6 +28,7 @@ class WarabiDataStore { }; thallium::engine m_engine; + warabi::Client m_warabi_client; warabi::TargetHandle m_target; public: @@ -138,12 +139,19 @@ class WarabiDataStore { WarabiDataStore( thallium::engine engine, + warabi::Client warabi_client, warabi::TargetHandle target) : m_engine(std::move(engine)) + , m_warabi_client(std::move(warabi_client)) , m_target(std::move(target)) {} - static std::unique_ptr create(thallium::engine engine, warabi::TargetHandle target) { - return std::make_unique(std::move(engine), std::move(target)); + static std::unique_ptr create( + thallium::engine engine, + thallium::provider_handle warabi_ph) { + auto warabi_client = warabi::Client{engine}; + auto target = warabi_client.makeTargetHandle(warabi_ph, warabi_ph.provider_id()); + return std::make_unique( + std::move(engine), std::move(warabi_client), std::move(target)); } }; diff --git a/src/YokanEventStore.hpp b/src/YokanEventStore.hpp index 6371f26..00bc9fe 100644 --- a/src/YokanEventStore.hpp +++ b/src/YokanEventStore.hpp @@ -7,6 +7,7 @@ #define MOFKA_YOKAN_EVENT_STORE_HPP #include "JsonUtil.hpp" +#include #include #include #include @@ -29,6 +30,7 @@ class YokanEventStore { thallium::engine m_engine; std::string m_topic_name; + yokan::Client m_yokan_client; yokan::Database m_database; yokan::Collection m_metadata_coll; yokan::Collection m_descriptors_coll; @@ -252,6 +254,7 @@ class YokanEventStore { YokanEventStore( thallium::engine engine, std::string topic_name, + yokan::Client yokan_client, yokan::Database db, yokan::Collection metadata_coll, yokan::Collection descriptors_coll, @@ -259,6 +262,7 @@ class YokanEventStore { bool marked_as_complete) : m_engine(std::move(engine)) , m_topic_name(std::move(topic_name)) + , m_yokan_client(std::move(yokan_client)) , m_database(std::move(db)) , m_metadata_coll(std::move(metadata_coll)) , m_descriptors_coll(std::move(descriptors_coll)) @@ -269,8 +273,11 @@ class YokanEventStore { thallium::engine engine, const std::string& topic_name, const UUID& partition_uuid, - yk_database_handle_t db) { - auto database = yokan::Database{db}; + thallium::provider_handle yokan_ph) { + + auto yokan_client = yokan::Client{engine}; + auto database = yokan_client.makeDatabaseHandle(yokan_ph.get_addr(), yokan_ph.provider_id()); + std::string marked_as_complete_key = "#"; marked_as_complete_key += topic_name + "#completed"; @@ -292,6 +299,7 @@ class YokanEventStore { return std::make_unique( std::move(engine), std::move(topic_name), + std::move(yokan_client), std::move(database), std::move(metadata_coll), std::move(descriptors_coll), diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0791953..caf346e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -12,3 +12,26 @@ endforeach () if (ENABLE_PYTHON) add_subdirectory (python) endif () + +# Set the path to the python directory +set (PYTHON_MODULE_DIR ${CMAKE_SOURCE_DIR}/python) +# Use file(GLOB_RECURSE ...) to find all files matching the test_*.py pattern +file (GLOB_RECURSE PYTHON_TEST_FILES "${PYTHON_MODULE_DIR}/test_*.py") + +foreach (PYTHON_TEST_FILE ${PYTHON_TEST_FILES}) + # Remove the directory part + file (RELATIVE_PATH PYTHON_TEST_FILE_REL ${PYTHON_MODULE_DIR} ${PYTHON_TEST_FILE}) + # Remove the file extension + string (REPLACE ".py" "" PYTHON_TEST_FILE_BASE ${PYTHON_TEST_FILE_REL}) + # Replace slashes with dots + string (REPLACE "/" "." PYTHON_TEST_NAME ${PYTHON_TEST_FILE_BASE}) + # Add the test + if (${ENABLE_COVERAGE}) + message (STATUS "${PYTHON_TEST_NAME} test will run with code coverage") + add_test (NAME ${PYTHON_TEST_NAME} COMMAND coverage run -a -m unittest ${PYTHON_TEST_NAME}) + else () + add_test (NAME ${PYTHON_TEST_NAME} COMMAND python -m unittest ${PYTHON_TEST_NAME}) + endif () + set_property (TEST ${PYTHON_TEST_NAME} PROPERTY ENVIRONMENT + PYTHONPATH=${CMAKE_SOURCE_DIR}/python/:${CMAKE_BINARY_DIR}/python:$ENV{PYTHONPATH}) +endforeach() diff --git a/tests/Configs.hpp b/tests/Configs.hpp index 6bf267e..b8c5eae 100644 --- a/tests/Configs.hpp +++ b/tests/Configs.hpp @@ -9,12 +9,12 @@ static inline const char* config = R"( { - "libraries" : { - "mofka" : "libmofka-bedrock-module.so", - "flock" : "libflock-bedrock-module.so", - "warabi" : "libwarabi-bedrock-module.so", - "yokan" : "libyokan-bedrock-module.so" - }, + "libraries" : [ + "libmofka-bedrock-module.so", + "libflock-bedrock-module.so", + "libwarabi-bedrock-module.so", + "libyokan-bedrock-module.so" + ], "providers" : [ { "name" : "my_flock_provider", diff --git a/tests/python/config.json b/tests/python/config.json index f309000..d97f384 100644 --- a/tests/python/config.json +++ b/tests/python/config.json @@ -1,10 +1,10 @@ { - "libraries" : { - "mofka" : "libmofka-bedrock-module.so", - "flock" : "libflock-bedrock-module.so", - "warabi" : "libwarabi-bedrock-module.so", - "yokan" : "libyokan-bedrock-module.so" - }, + "libraries" : [ + "libmofka-bedrock-module.so", + "libflock-bedrock-module.so", + "libwarabi-bedrock-module.so", + "libyokan-bedrock-module.so" + ], "providers" : [ { "name" : "my_flock_provider", diff --git a/tests/spack.yaml b/tests/spack.yaml index ba43293..3e1a44d 100644 --- a/tests/spack.yaml +++ b/tests/spack.yaml @@ -42,14 +42,14 @@ spack: - "+space" mochi-bedrock: require: - - "@0.14.0:" + - "@0.15.0:" - "+python" - "+mpi" - "+flock" - "+space" mochi-flock: require: - - "@0.4.0:" + - "@0.5.0:" - "+bedrock" - "+python" - "+mpi"