diff --git a/cmake/Modules/TestDatasets.cmake b/cmake/Modules/TestDatasets.cmake index 30f6b6bf98..5c6b636499 100644 --- a/cmake/Modules/TestDatasets.cmake +++ b/cmake/Modules/TestDatasets.cmake @@ -6,7 +6,7 @@ set(MISC_TEST_DATASETS ${KATANA_TEST_DATASETS}/misc_datasets) ## latest supported rdg storage_format_version #TODO(emcginnis) get this envar in RDGPartHeader.h instead of having to hard code it here and there -set(KATANA_RDG_STORAGE_FORMAT_VERSION "5") +set(KATANA_RDG_STORAGE_FORMAT_VERSION "6") ## returns path to the specified rdg dataset at the specified storage_format_version @@ -85,3 +85,4 @@ rdg_dataset(RDG_GNN_K5_SINGLE "gnn_k5_single") # Used the latest storage_format_version RDGs defined above instead rdg_dataset_at_version(RDG_LDBC_003_V1 "ldbc_003" "1") rdg_dataset_at_version(RDG_LDBC_003_V3 "ldbc_003" "3") +rdg_dataset_at_version(RDG_LDBC_003_V5 "ldbc_003" "5") diff --git a/external/test-datasets b/external/test-datasets index 9934dddf3e..219aaa5156 160000 --- a/external/test-datasets +++ b/external/test-datasets @@ -1 +1 @@ -Subproject commit 9934dddf3e37ecc14a21cfc814b117eac0266156 +Subproject commit 219aaa51568c45e4c9599a160ee2b95265b06310 diff --git a/libtsuba/include/katana/RDGManifest.h b/libtsuba/include/katana/RDGManifest.h index d6038d5d7d..5a4bc7f230 100644 --- a/libtsuba/include/katana/RDGManifest.h +++ b/libtsuba/include/katana/RDGManifest.h @@ -147,7 +147,7 @@ class KATANA_EXPORT RDGManifest { std::string ToJsonString() const; /// Return the set of file names that hold this RDG's data by reading partition files - /// Useful to garbage collect unused files + /// Useful to garbage collect unused files, and copy an RDG to a new location katana::Result> FileNames(); // Required by nlohmann diff --git a/libtsuba/include/katana/RDGStorageFormatVersion.h b/libtsuba/include/katana/RDGStorageFormatVersion.h index ba668bfa65..53b9cb1f1a 100644 --- a/libtsuba/include/katana/RDGStorageFormatVersion.h +++ 
b/libtsuba/include/katana/RDGStorageFormatVersion.h @@ -13,11 +13,12 @@ static const uint32_t kPartitionStorageFormatVersion2 = 2; static const uint32_t kPartitionStorageFormatVersion3 = 3; static const uint32_t kPartitionStorageFormatVersion4 = 4; static const uint32_t kPartitionStorageFormatVersion5 = 5; +static const uint32_t kPartitionStorageFormatVersion6 = 6; /// kLatestPartitionStorageFormatVersion to be bumped any time /// the on disk format of RDGPartHeader changes static const uint32_t kLatestPartitionStorageFormatVersion = - kPartitionStorageFormatVersion5; + kPartitionStorageFormatVersion6; }; // namespace katana diff --git a/libtsuba/include/katana/RDKLSHIndexPrimitive.h b/libtsuba/include/katana/RDKLSHIndexPrimitive.h index fde73aa1b4..814d600e5c 100644 --- a/libtsuba/include/katana/RDKLSHIndexPrimitive.h +++ b/libtsuba/include/katana/RDKLSHIndexPrimitive.h @@ -22,6 +22,8 @@ namespace katana { const std::string kOptionalDatastructureRDKLSHIndexPrimitive = "kg.v1.rdk_lsh_index"; +const std::string kOptionalDatastructureRDKLSHIndexPrimitiveFilename = + "rdk_lsh_index_manifest"; class KATANA_EXPORT RDKLSHIndexPrimitive : private katana::RDGOptionalDatastructure { @@ -35,7 +37,8 @@ class KATANA_EXPORT RDKLSHIndexPrimitive katana::Result Write(katana::Uri rdg_dir_path) { // Write out our json manifest - katana::Uri manifest_path = rdg_dir_path.RandFile("rdk_lsh_index_manifest"); + katana::Uri manifest_path = rdg_dir_path.RandFile( + kOptionalDatastructureRDKLSHIndexPrimitiveFilename); KATANA_CHECKED(WriteManifest(manifest_path.string())); return manifest_path.BaseName(); } diff --git a/libtsuba/include/katana/RDKSubstructureIndexPrimitive.h b/libtsuba/include/katana/RDKSubstructureIndexPrimitive.h index 64ca0ce1ab..b161a3d5df 100644 --- a/libtsuba/include/katana/RDKSubstructureIndexPrimitive.h +++ b/libtsuba/include/katana/RDKSubstructureIndexPrimitive.h @@ -22,6 +22,8 @@ namespace katana { const std::string 
kOptionalDatastructureRDKSubstructureIndexPrimitive = "kg.v1.rdk_substructure_index"; +const std::string kOptionalDatastructureRDKSubstructureIndexPrimitiveFilename = + "rdk_substructure_index_manifest"; class KATANA_EXPORT RDKSubstructureIndexPrimitive : private katana::RDGOptionalDatastructure { @@ -35,8 +37,8 @@ class KATANA_EXPORT RDKSubstructureIndexPrimitive katana::Result Write(katana::Uri rdg_dir_path) { // Write out our json manifest - katana::Uri manifest_path = - rdg_dir_path.RandFile("rdk_substructure_index_manifest"); + katana::Uri manifest_path = rdg_dir_path.RandFile( + kOptionalDatastructureRDKSubstructureIndexPrimitiveFilename); KATANA_CHECKED(WriteManifest(manifest_path.string())); return manifest_path.BaseName(); } diff --git a/libtsuba/src/RDG.cpp b/libtsuba/src/RDG.cpp index 3848bbab27..5800f87835 100644 --- a/libtsuba/src/RDG.cpp +++ b/libtsuba/src/RDG.cpp @@ -1079,12 +1079,6 @@ katana::RDG::edge_entity_type_id_array() const { katana::Result> katana::RDG::LoadRDKLSHIndexPrimitive() { - if (!KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) { - return KATANA_ERROR( - ErrorCode::InvalidArgument, - "The UnstableRDGStorageFormat feature flag must be set to use this " - "feature"); - } std::optional res = KATANA_CHECKED(core_->part_header().OptionalDatastructureManifest( kOptionalDatastructureRDKLSHIndexPrimitive)); @@ -1100,12 +1094,6 @@ katana::RDG::LoadRDKLSHIndexPrimitive() { katana::Result katana::RDG::WriteRDKLSHIndexPrimitive(katana::RDKLSHIndexPrimitive& index) { - if (!KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) { - return KATANA_ERROR( - ErrorCode::InvalidArgument, - "The UnstableRDGStorageFormat feature flag must be set to use this " - "feature"); - } std::string path = KATANA_CHECKED(index.Write(rdg_dir())); core_->part_header().AppendOptionalDatastructureManifest( kOptionalDatastructureRDKLSHIndexPrimitive, path); @@ -1115,12 +1103,6 @@ katana::RDG::WriteRDKLSHIndexPrimitive(katana::RDKLSHIndexPrimitive& index) { 
katana::Result> katana::RDG::LoadRDKSubstructureIndexPrimitive() { - if (!KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) { - return KATANA_ERROR( - ErrorCode::InvalidArgument, - "The UnstableRDGStorageFormat feature flag must be set to use this " - "feature"); - } std::optional res = KATANA_CHECKED(core_->part_header().OptionalDatastructureManifest( kOptionalDatastructureRDKSubstructureIndexPrimitive)); @@ -1138,12 +1120,6 @@ katana::RDG::LoadRDKSubstructureIndexPrimitive() { katana::Result katana::RDG::WriteRDKSubstructureIndexPrimitive( katana::RDKSubstructureIndexPrimitive& index) { - if (!KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) { - return KATANA_ERROR( - ErrorCode::InvalidArgument, - "The UnstableRDGStorageFormat feature flag must be set to use this " - "feature"); - } std::string path = KATANA_CHECKED(index.Write(rdg_dir())); core_->part_header().AppendOptionalDatastructureManifest( kOptionalDatastructureRDKSubstructureIndexPrimitive, path); diff --git a/libtsuba/src/RDGManifest.cpp b/libtsuba/src/RDGManifest.cpp index 29492db93e..ef2266b0d1 100644 --- a/libtsuba/src/RDGManifest.cpp +++ b/libtsuba/src/RDGManifest.cpp @@ -10,6 +10,7 @@ #include "katana/FileView.h" #include "katana/JSON.h" #include "katana/ParquetReader.h" +#include "katana/RDGOptionalDatastructure.h" #include "katana/Result.h" #include "katana/tsuba.h" @@ -215,10 +216,28 @@ AddPropertySubFiles(std::set& fnames, std::string full_path) { } return katana::ResultSuccess(); } + +katana::Result +AddOptionalDatastructureSubfiles( + std::set& fnames, std::string full_path) { + katana::FileView fv; + KATANA_CHECKED(fv.Bind(full_path, true)); + katana::RDGOptionalDatastructure data; + KATANA_CHECKED( + katana::JsonParse(fv, &data)); + + // copy over any extra files the optional datastructure relies on + // Assumes that all OptionalDatastructures properly extend the RDGOptionalDatastructure class + for (const auto& file : data.paths()) { + fnames.emplace(file.second); + } + return 
katana::ResultSuccess(); +} + } // namespace // Return the set of file names that hold this RDG's data by reading partition files -// Useful to garbage collect unused files +// Useful to garbage collect unused files, and copy an RDG to a new location katana::Result> katana::RDGManifest::FileNames() { std::set fnames{}; @@ -238,6 +257,7 @@ katana::RDGManifest::FileNames() { version(), view_specifier(), header_res.error()); } else { auto header = std::move(header_res.value()); + for (const auto& node_prop : header.node_prop_info_list()) { fnames.emplace(node_prop.path()); KATANA_CHECKED(AddPropertySubFiles( @@ -264,6 +284,12 @@ katana::RDGManifest::FileNames() { for (size_t i = 0; i < header.topology_metadata()->num_entries(); i++) { fnames.emplace(header.topology_metadata()->Entries().at(i).path_); } + + for (auto it : header.optional_datastructure_manifests()) { + fnames.emplace(it.first); + KATANA_CHECKED(AddOptionalDatastructureSubfiles( + fnames, katana::Uri::JoinPath(dir().string(), it.second))); + } } } return fnames; diff --git a/libtsuba/src/RDGPartHeader.cpp b/libtsuba/src/RDGPartHeader.cpp index c29b19d473..dbafa2bd4b 100644 --- a/libtsuba/src/RDGPartHeader.cpp +++ b/libtsuba/src/RDGPartHeader.cpp @@ -345,42 +345,25 @@ katana::to_json(json& j, const katana::RDGPartHeader& header) { "RDGPartHeader.unstable_storage_format_ is true"); } - if (KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) { - j = json{ - {kNodePropertyKey, header.node_prop_info_list_}, - {kEdgePropertyKey, header.edge_prop_info_list_}, - {kPartPropertyFilesKey, header.part_prop_info_list_}, - {kPartPropertyMetaKey, header.metadata_}, - {kStorageFormatVersionKey, header.storage_format_version_}, - {kUnstableStorageFormatFlagKey, header.unstable_storage_format_}, - {kNodeEntityTypeIDArrayPathKey, header.node_entity_type_id_array_path_}, - {kEdgeEntityTypeIDArrayPathKey, header.edge_entity_type_id_array_path_}, - {kNodeEntityTypeIDDictionaryKey, - header.node_entity_type_id_dictionary_}, 
- {kEdgeEntityTypeIDDictionaryKey, - header.edge_entity_type_id_dictionary_}, - {kNodeEntityTypeIDNameKey, header.node_entity_type_id_name_}, - {kEdgeEntityTypeIDNameKey, header.edge_entity_type_id_name_}, - {kPartitionTopologyMetadataKey, header.topology_metadata_}, - {kOptionalDatastructuresKey, header.optional_datastructure_manifests_}}; - } else { - j = json{ - {kNodePropertyKey, header.node_prop_info_list_}, - {kEdgePropertyKey, header.edge_prop_info_list_}, - {kPartPropertyFilesKey, header.part_prop_info_list_}, - {kPartPropertyMetaKey, header.metadata_}, - {kStorageFormatVersionKey, header.storage_format_version_}, - {kUnstableStorageFormatFlagKey, header.unstable_storage_format_}, - {kNodeEntityTypeIDArrayPathKey, header.node_entity_type_id_array_path_}, - {kEdgeEntityTypeIDArrayPathKey, header.edge_entity_type_id_array_path_}, - {kNodeEntityTypeIDDictionaryKey, - header.node_entity_type_id_dictionary_}, - {kEdgeEntityTypeIDDictionaryKey, - header.edge_entity_type_id_dictionary_}, - {kNodeEntityTypeIDNameKey, header.node_entity_type_id_name_}, - {kEdgeEntityTypeIDNameKey, header.edge_entity_type_id_name_}, - {kPartitionTopologyMetadataKey, header.topology_metadata_}}; - } + KATANA_LOG_DEBUG( + "Storing paths to {} optional datastructure manifests", + header.optional_datastructure_manifests_.size()); + + j = json{ + {kNodePropertyKey, header.node_prop_info_list_}, + {kEdgePropertyKey, header.edge_prop_info_list_}, + {kPartPropertyFilesKey, header.part_prop_info_list_}, + {kPartPropertyMetaKey, header.metadata_}, + {kStorageFormatVersionKey, header.storage_format_version_}, + {kUnstableStorageFormatFlagKey, header.unstable_storage_format_}, + {kNodeEntityTypeIDArrayPathKey, header.node_entity_type_id_array_path_}, + {kEdgeEntityTypeIDArrayPathKey, header.edge_entity_type_id_array_path_}, + {kNodeEntityTypeIDDictionaryKey, header.node_entity_type_id_dictionary_}, + {kEdgeEntityTypeIDDictionaryKey, header.edge_entity_type_id_dictionary_}, + 
+ {kNodeEntityTypeIDNameKey, header.node_entity_type_id_name_}, + {kEdgeEntityTypeIDNameKey, header.edge_entity_type_id_name_}, + {kPartitionTopologyMetadataKey, header.topology_metadata_}, + {kOptionalDatastructuresKey, header.optional_datastructure_manifests_}}; } void @@ -478,11 +461,13 @@ katana::from_json(const json& j, katana::RDGPartHeader& header) { header.topology_metadata_.Append(entry); } - // Version N added optional data structures - // TODO(emcginnis) unstable for now - if (header.unstable_storage_format_) { + // Version 6 added optional data structures + if (header.storage_format_version_ >= kPartitionStorageFormatVersion6) { j.at(kOptionalDatastructuresKey) .get_to(header.optional_datastructure_manifests_); + KATANA_LOG_DEBUG( + "Loaded {} optional datastructure manifests", + header.optional_datastructure_manifests_.size()); } } diff --git a/libtsuba/src/RDGPartHeader.h b/libtsuba/src/RDGPartHeader.h index c3a850bc73..5ef0c1fff3 100644 --- a/libtsuba/src/RDGPartHeader.h +++ b/libtsuba/src/RDGPartHeader.h @@ -478,9 +478,15 @@ class KATANA_EXPORT RDGPartHeader { if (search != optional_datastructure_manifests_.end()) { return search->second; } - KATANA_LOG_DEBUG( + KATANA_LOG_WARN( "No optional datastructure manifest available matching [{}]", optional_datastructure_name); + KATANA_LOG_WARN( + "Available optional datastructures, size = {}:", + optional_datastructure_manifests_.size()); + for (const auto& it : optional_datastructure_manifests_) { /* bind by reference: avoids copying each name/path string pair just to log it */ + KATANA_LOG_WARN(" {} : {}", it.first, it.second); + } return std::nullopt; } @@ -489,8 +495,22 @@ class KATANA_EXPORT RDGPartHeader { const std::string& optional_datastructure_path) { optional_datastructure_manifests_.emplace( optional_datastructure_name, optional_datastructure_path); + KATANA_LOG_DEBUG( + "Appended optional datastructure manifest {}, at path {}, total count " + "= {}", + optional_datastructure_name, optional_datastructure_path, + optional_datastructure_manifests_.size()); } + const std::unordered_map& 
+ optional_datastructure_manifests() const { + return optional_datastructure_manifests_; + } + + /// Return the set of file names that hold this partitions data + /// Useful to garbage collect unused files, and copy an RDG to a new location + katana::Result> FileNames(); + friend void to_json(nlohmann::json& j, const RDGPartHeader& header); friend void from_json(const nlohmann::json& j, RDGPartHeader& header); diff --git a/libtsuba/test/CMakeLists.txt b/libtsuba/test/CMakeLists.txt index fd0735d8c6..2ab0bf14cb 100644 --- a/libtsuba/test/CMakeLists.txt +++ b/libtsuba/test/CMakeLists.txt @@ -97,16 +97,31 @@ add_test(NAME ${name} COMMAND ${test_name} ${RDG_LDBC_003_V3}) -set(name storage-format-version-v4-v5-optional-datastructure-rdk) +# Test upgrading a v5 RDG to v6 and adding rdg optional datastructures +set(name storage-format-version-v5-v6-optional-datastructure-rdk) +set(test_name ${name}-test) +add_test_dataset_fixture(${PROJECT_BINARY_DIR} ${RDG_LDBC_003_V5} -${name} tmp_input_location input-setup-fixture-group) +add_executable(${test_name} storage-format-version/v6-optional-datastructure-rdk.cpp) +target_link_libraries(${test_name} katana_tsuba) +target_link_libraries(${test_name} katana_galois) +target_include_directories(${test_name} PRIVATE ../src) +add_test(NAME ${name} COMMAND ${test_name} ${tmp_input_location}) +set_property(TEST ${name} APPEND PROPERTY LABELS quick) +set_tests_properties(${name} PROPERTIES LABELS quick) +set_property(TEST ${name} + APPEND PROPERTY + FIXTURES_REQUIRED ${input-setup-fixture-group}) + + +# Test adding rdk optional datastructures to an RDG that already supports them +set(name storage-format-version-v6-v6-optional-datastructure-rdk) set(test_name ${name}-test) add_test_dataset_fixture(${PROJECT_BINARY_DIR} ${RDG_LDBC_003} -${name} tmp_input_location input-setup-fixture-group) -add_executable(${test_name} storage-format-version/v5-optional-datastructure-rdk.cpp) +add_executable(${test_name} 
storage-format-version/v6-optional-datastructure-rdk.cpp) target_link_libraries(${test_name} katana_tsuba) target_link_libraries(${test_name} katana_galois) target_include_directories(${test_name} PRIVATE ../src) add_test(NAME ${name} COMMAND ${test_name} ${tmp_input_location}) -set_tests_properties(${name} PROPERTIES - ENVIRONMENT KATANA_ENABLE_EXPERIMENTAL=UnstableRDGStorageFormat) set_property(TEST ${name} APPEND PROPERTY LABELS quick) set_tests_properties(${name} PROPERTIES LABELS quick) set_property(TEST ${name} diff --git a/libtsuba/test/storage-format-version/v5-optional-datastructure-rdk.cpp b/libtsuba/test/storage-format-version/v6-optional-datastructure-rdk.cpp similarity index 65% rename from libtsuba/test/storage-format-version/v5-optional-datastructure-rdk.cpp rename to libtsuba/test/storage-format-version/v6-optional-datastructure-rdk.cpp index 132ea1074c..a59d0c2d03 100644 --- a/libtsuba/test/storage-format-version/v5-optional-datastructure-rdk.cpp +++ b/libtsuba/test/storage-format-version/v6-optional-datastructure-rdk.cpp @@ -1,5 +1,8 @@ +#include "v6-optional-datastructure-rdk.h" + #include #include +#include #include #include @@ -18,107 +21,6 @@ #include "katana/TextTracer.h" #include "katana/URI.h" -std::vector>> -GenerateHashes() { - std::vector>> hashes(128); - for (uint64_t i = 0; i < 128; i++) { - for (uint64_t j = 0; j < 64; j++) { - std::map> tmp; - tmp[j] = {i, j, i + j}; - hashes.emplace_back(tmp); - } - } - return hashes; -} - -std::vector -GenerateFingerprints() { - std::vector fingerprints; - - for (size_t i = 0; i < 4; i++) { - katana::DynamicBitset bset; - for (size_t j = 0; j < i; j++) { - bset.resize(j + 1); - bset.set(j); - } - fingerprints.emplace_back(std::move(bset)); - } - - return fingerprints; -} - -std::vector -GenerateSmiles() { - std::vector smiles = {"smile1", "smile2", "smile3", "smile4"}; - return smiles; -} - -std::vector> -GenerateIndices() { - std::vector> indices( - 128, std::vector(64)); - for (size_t i = 0; i < 
128; i++) { - for (size_t j = 0; j < 64; j++) { - indices[i][j] = i + j; - } - } - return indices; -} - -katana::RDKLSHIndexPrimitive -GenerateLSHIndex() { - katana::RDKLSHIndexPrimitive index; - - std::vector fingerprints = GenerateFingerprints(); - index.set_num_hashes_per_bucket(16); - index.set_num_buckets(96); - index.set_fingerprint_length(42); - index.set_num_fingerprints(fingerprints.size()); - index.set_hash_structure(GenerateHashes()); - index.set_fingerprints(std::move(fingerprints)); - index.set_smiles(GenerateSmiles()); - return index; -} - -void -ValidateLSHIndex(katana::RDKLSHIndexPrimitive& index) { - KATANA_LOG_ASSERT(index.num_hashes_per_bucket() == 16); - KATANA_LOG_ASSERT(index.num_buckets() == 96); - KATANA_LOG_ASSERT(index.fingerprint_length() == 42); - KATANA_LOG_ASSERT(index.num_fingerprints() == 4); - KATANA_LOG_ASSERT(index.hash_structure() == GenerateHashes()); - KATANA_LOG_ASSERT(index.fingerprints() == GenerateFingerprints()); - KATANA_LOG_ASSERT(index.smiles() == GenerateSmiles()); -} - -katana::RDKSubstructureIndexPrimitive -GenerateSubstructIndex() { - katana::RDKSubstructureIndexPrimitive index; - - std::vector fingerprints = GenerateFingerprints(); - auto smiles = GenerateSmiles(); - auto indices = GenerateIndices(); - KATANA_LOG_VASSERT( - smiles.size() == fingerprints.size(), "smiles = {}, finger = {}", - smiles.size(), fingerprints.size()); - - index.set_fp_size(indices.size()); - index.set_num_entries(smiles.size()); - index.set_index(std::move(indices)); - index.set_fingerprints(std::move(fingerprints)); - index.set_smiles(std::move(smiles)); - return index; -} - -void -ValidateSubstructIndex(katana::RDKSubstructureIndexPrimitive& index) { - KATANA_LOG_ASSERT(index.fp_size() == 128); - KATANA_LOG_ASSERT(index.num_entries() == 4); - KATANA_LOG_ASSERT(index.index() == GenerateIndices()); - KATANA_LOG_ASSERT(index.fingerprints() == GenerateFingerprints()); - KATANA_LOG_ASSERT(index.smiles() == GenerateSmiles()); -} - /* * Tests: 
Optional Datastructure, RDKLSHIndexPrimitive, RDKSubstructureIndexPrimitive functionality * @@ -229,6 +131,7 @@ TestLoadFail(const std::string& rdg_dir) { // rdg must have only one of these manifests available for this test to function propertly std::string path = KATANA_CHECKED(find_file(rdg_dir2, "rdk_lsh_index_manifest")); + KATANA_LOG_DEBUG("replacing manifest file at {}", path); std::filesystem::remove(path); ff->Bind(path); KATANA_CHECKED(ff->Persist()); @@ -236,7 +139,8 @@ TestLoadFail(const std::string& rdg_dir) { // expect this to fail auto res = rdg2.LoadRDKLSHIndexPrimitive(); if (res) { - KATANA_LOG_ASSERT("Loading the garbage manifest should fail!"); + KATANA_LOG_FATAL("Loading the bad index should fail"); + return KATANA_ERROR(katana::ErrorCode::InvalidArgument, "should fail"); } return katana::ResultSuccess(); @@ -256,9 +160,6 @@ main(int argc, char* argv[]) { katana::ProgressScope host_scope = katana::GetTracer().StartActiveSpan("rdg-slice test"); - // Ensure the feature flag is actually set - KATANA_LOG_ASSERT(KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)); - const std::string& rdg = argv[1]; auto res = TestRoundTripRDKIndex(rdg); diff --git a/libtsuba/test/storage-format-version/v6-optional-datastructure-rdk.h b/libtsuba/test/storage-format-version/v6-optional-datastructure-rdk.h new file mode 100644 index 0000000000..ea7804c4bc --- /dev/null +++ b/libtsuba/test/storage-format-version/v6-optional-datastructure-rdk.h @@ -0,0 +1,120 @@ +#include +#include +#include + +#include + +#include "../test-rdg.h" +#include "katana/Experimental.h" +#include "katana/Galois.h" +#include "katana/Logging.h" +#include "katana/ProgressTracer.h" +#include "katana/RDG.h" +#include "katana/RDGManifest.h" +#include "katana/RDGStorageFormatVersion.h" +#include "katana/RDKLSHIndexPrimitive.h" +#include "katana/RDKSubstructureIndexPrimitive.h" +#include "katana/Result.h" +#include "katana/TextTracer.h" +#include "katana/URI.h" + +std::vector>> +GenerateHashes() { 
+ std::vector>> hashes(128); + for (uint64_t i = 0; i < 128; i++) { + for (uint64_t j = 0; j < 64; j++) { + std::map> tmp; + tmp[j] = {i, j, i + j}; + hashes.emplace_back(tmp); + } + } + return hashes; +} + +std::vector +GenerateFingerprints() { + std::vector fingerprints; + + for (size_t i = 0; i < 4; i++) { + katana::DynamicBitset bset; + for (size_t j = 0; j < i; j++) { + bset.resize(j + 1); + bset.set(j); + } + fingerprints.emplace_back(std::move(bset)); + } + + return fingerprints; +} + +std::vector +GenerateSmiles() { + std::vector smiles = {"smile1", "smile2", "smile3", "smile4"}; + return smiles; +} + +std::vector> +GenerateIndices() { + std::vector> indices( + 128, std::vector(64)); + for (size_t i = 0; i < 128; i++) { + for (size_t j = 0; j < 64; j++) { + indices[i][j] = i + j; + } + } + return indices; +} + +katana::RDKLSHIndexPrimitive +GenerateLSHIndex() { + katana::RDKLSHIndexPrimitive index; + + std::vector fingerprints = GenerateFingerprints(); + index.set_num_hashes_per_bucket(16); + index.set_num_buckets(96); + index.set_fingerprint_length(42); + index.set_num_fingerprints(fingerprints.size()); + index.set_hash_structure(GenerateHashes()); + index.set_fingerprints(std::move(fingerprints)); + index.set_smiles(GenerateSmiles()); + return index; +} + +katana::RDKSubstructureIndexPrimitive +GenerateSubstructIndex() { + katana::RDKSubstructureIndexPrimitive index; + + std::vector fingerprints = GenerateFingerprints(); + auto smiles = GenerateSmiles(); + auto indices = GenerateIndices(); + KATANA_LOG_VASSERT( + smiles.size() == fingerprints.size(), "smiles = {}, finger = {}", + smiles.size(), fingerprints.size()); + + index.set_fp_size(indices.size()); + index.set_num_entries(smiles.size()); + index.set_index(std::move(indices)); + index.set_fingerprints(std::move(fingerprints)); + index.set_smiles(std::move(smiles)); + return index; +} + +void +ValidateLSHIndex(katana::RDKLSHIndexPrimitive& index) { + KATANA_LOG_ASSERT(index.num_hashes_per_bucket() == 
16); + KATANA_LOG_ASSERT(index.num_buckets() == 96); + KATANA_LOG_ASSERT(index.fingerprint_length() == 42); + KATANA_LOG_ASSERT(index.num_fingerprints() == 4); + KATANA_LOG_ASSERT(index.hash_structure() == GenerateHashes()); + KATANA_LOG_ASSERT(index.fingerprints() == GenerateFingerprints()); + KATANA_LOG_ASSERT(index.smiles() == GenerateSmiles()); +} + +void +ValidateSubstructIndex(katana::RDKSubstructureIndexPrimitive& index) { + KATANA_LOG_ASSERT(index.fp_size() == 128); + KATANA_LOG_ASSERT(index.num_entries() == 4); + KATANA_LOG_ASSERT(index.index() == GenerateIndices()); + KATANA_LOG_ASSERT(index.fingerprints() == GenerateFingerprints()); + KATANA_LOG_ASSERT(index.smiles() == GenerateSmiles()); +} diff --git a/libtsuba/test/test-rdg.h b/libtsuba/test/test-rdg.h index c07cc8b5d2..b95c9f2bf3 100644 --- a/libtsuba/test/test-rdg.h +++ b/libtsuba/test/test-rdg.h @@ -74,6 +74,7 @@ WriteRDG(katana::RDG&& rdg_, std::string out_dir) { katana::Result LoadRDG(const std::string& rdg_name) { + KATANA_LOG_WARN("Loading RDG at location {}", rdg_name); katana::RDGManifest manifest = KATANA_CHECKED(katana::FindManifest(rdg_name)); katana::RDGFile rdg_file{ KATANA_CHECKED(katana::Open(std::move(manifest), katana::kReadWrite))}; @@ -89,13 +90,14 @@ find_file(const std::string& search_path, const std::string& substring) { search_path.find("file://") == std::string::npos, "Function cannot handle paths with the file:// prefix"); + KATANA_LOG_DEBUG("finding file matching {}", substring); const std::filesystem::directory_iterator end; try { for (std::filesystem::directory_iterator iter{search_path}; iter != end; iter++) { - const std::string file_name = iter->path().filename(); + const std::string file_name = iter->path().filename().string(); if (std::filesystem::is_regular_file(*iter)) { - if (file_name.find(substring)) { + if (file_name.find(substring) != std::string::npos) { return (iter->path().string()); } } diff --git a/python/katana/example_data.py 
b/python/katana/example_data.py index 3400d6f354..e5a5d34d84 100644 --- a/python/katana/example_data.py +++ b/python/katana/example_data.py @@ -19,7 +19,7 @@ # git sha of the datasets repo to download/cache if it is not available locally in the source # TODO(emcginnis) it would be really really nice if this got updated automatically # when the submodule ref held by open katana is updated -DATASETS_SHA = "9934dddf3e37ecc14a21cfc814b117eac0266156" +DATASETS_SHA = "219aaa51568c45e4c9599a160ee2b95265b06310" logger = logging.getLogger(__name__) diff --git a/python/test/test_storage_format.py b/python/test/test_storage_format.py index 2c971289dc..0f993784d2 100644 --- a/python/test/test_storage_format.py +++ b/python/test/test_storage_format.py @@ -60,6 +60,7 @@ def test_storage_format_unchanged_local(): # json file filename substrings manifest_filename_substring = "_rdg.manifest" part_header_filename_substring = "part_vers" +optional_manifest_filename_substring = "_manifest" def list_diff(li1, li2): @@ -171,6 +172,8 @@ def validate_rdg_storage_format_subset(subset_path, superset_path, verbose=False """ manifest_files = [] part_header_files = [] + # handle json files with names containing uuids differently + uuid_json_files = [] compare_paths = [] file_paths = [x for x in subset_path.glob("**/*") if x.is_file()] @@ -181,6 +184,8 @@ def validate_rdg_storage_format_subset(subset_path, superset_path, verbose=False manifest_files.append(path) elif part_header_filename_substring in path.name: part_header_files.append(path) + elif optional_manifest_filename_substring in path.name: + uuid_json_files.append(path) else: # all json files that we do not recognize will end up getting compared as normal files compare_paths.append(path) @@ -193,14 +198,29 @@ def validate_rdg_storage_format_subset(subset_path, superset_path, verbose=False for superset_file in superset_path.glob(f"{subset_no_rand}*"): if filecmp.cmp(subset_file, superset_file, shallow=False): if verbose: - 
print(f"{subset_file} in {subset_path} is equal to {superset_file} in {superset_path}") + print(f"[{subset_file}] in [{subset_path}] is equal to [{superset_file}] in [{superset_path}]") found = True if not found: - print(f"Failed to find matching file for {subset_no_rand}-* in {superset_path}") + print(f"Failed to find matching file for {subset_file} in {superset_path}") return False - # compare the json files + # compare the json files that have file names containing random strings + filecmp.clear_cache() + for subset_file in uuid_json_files: + subset_no_rand = remove_rand_string(subset_file.name) + found = False + for superset_file in superset_path.glob(f"{subset_no_rand}*"): + if json_match(subset_file, superset_file): + if verbose: + print(f"[{subset_file}] in [{subset_path}] is equal to [{superset_file}] in [{superset_path}]") + found = True + + if not found: + print(f"Failed to find matching file for {subset_file} in {superset_path}") + return False + + # compare the standard json files for manifest in manifest_files: if verbose: print(f"checking json manifest files {manifest}, {superset_path / manifest.name}") @@ -210,7 +230,7 @@ def validate_rdg_storage_format_subset(subset_path, superset_path, verbose=False for part_header in part_header_files: if verbose: - print(f"checking json part_header files {part_header}, {superset_path / part_header.name}") + print(f"checking json part_header files [{part_header}], [{superset_path / part_header.name}]") if not json_match(part_header, superset_path / part_header.name, verbose): print(f"json part_header file {part_header} does not match {superset_path / part_header.name}") return False diff --git a/tools/generate-maximal-storage-format-rdg/generate-maximal-storage-format-rdg.cpp b/tools/generate-maximal-storage-format-rdg/generate-maximal-storage-format-rdg.cpp index e8e66dc55d..ea88424422 100644 --- a/tools/generate-maximal-storage-format-rdg/generate-maximal-storage-format-rdg.cpp +++ 
b/tools/generate-maximal-storage-format-rdg/generate-maximal-storage-format-rdg.cpp @@ -2,9 +2,13 @@ #include +#include "../../libtsuba/test/storage-format-version/v6-optional-datastructure-rdk.h" #include "katana/Logging.h" #include "katana/PropertyGraph.h" #include "katana/RDG.h" +#include "katana/RDKLSHIndexPrimitive.h" +#include "katana/RDKSubstructureIndexPrimitive.h" +#include "katana/Result.h" #include "katana/SharedMemSys.h" #include "katana/analytics/Utils.h" #include "llvm/Support/CommandLine.h" @@ -53,21 +57,59 @@ StoreGraph(katana::PropertyGraph* g, std::string& output_path) { return output_path; } -void +/// Load/store cycle the provided RDG to cleanly relocate the graph without +/// Carrying along stale files +katana::PropertyGraph +CleanRelocateGraphLoad(const std::string& rdg_file) { + katana::PropertyGraph g_orig = LoadGraph(rdg_file); + auto uri_res = katana::Uri::MakeRand("/tmp/propertyfilegraph"); + KATANA_LOG_ASSERT(uri_res); + std::string tmp_rdg_dir(uri_res.value().path()); // path() because local + std::string tmp_path = StoreGraph(&g_orig, tmp_rdg_dir); + + katana::PropertyGraph g = LoadGraph(tmp_path); + return g; +} + +/// Load/store cycle the provided RDG to cleanly relocate the graph without +/// Carrying along stale files +std::string +CleanRelocateGraphStore(katana::PropertyGraph* g, std::string& output_path) { + auto uri_res = katana::Uri::MakeRand("/tmp/propertyfilegraph"); + KATANA_LOG_ASSERT(uri_res); + std::string tmp_rdg_dir_2(uri_res.value().path()); // path() because local + std::string g_tmp_rdg_file = StoreGraph(g, tmp_rdg_dir_2); + + katana::PropertyGraph g_new = LoadGraph(g_tmp_rdg_file); + std::string g_new_rdg_file = StoreGraph(&g_new, output_path); + + return g_new_rdg_file; +} + +katana::Result MaximizeGraph(std::string& input_rdg, std::string& output_path) { - katana::PropertyGraph g = LoadGraph(input_rdg); + katana::PropertyGraph g_tmp = CleanRelocateGraphLoad(input_rdg); // Add calls which add optional data 
structures to the RDG here - auto generated_sorted_view_sort1 = g.BuildView< + auto generated_sorted_view_sort1 = g_tmp.BuildView< katana::PropertyGraphViews::NodesSortedByDegreeEdgesSortedByDestID>(); auto generated_sorted_view_sort2 = - g.BuildView(); + g_tmp.BuildView(); auto generated_sorted_view_sort3 = - g.BuildView(); + g_tmp.BuildView(); + + katana::RDKLSHIndexPrimitive lsh = GenerateLSHIndex(); + katana::RDKSubstructureIndexPrimitive substruct = GenerateSubstructIndex(); + + KATANA_CHECKED(g_tmp.WriteRDKLSHIndexPrimitive(lsh)); + KATANA_CHECKED(g_tmp.WriteRDKSubstructureIndexPrimitive(substruct)); + + std::string g2_rdg_file = CleanRelocateGraphStore(&g_tmp, output_path); - std::string g2_rdg_file = StoreGraph(&g, output_path); KATANA_LOG_WARN( "maximized version of {} stored at {}", input_rdg, g2_rdg_file); + + return katana::ResultSuccess(); } int @@ -77,7 +119,10 @@ main(int argc, char** argv) { cll::ParseCommandLineOptions(argc, argv); KATANA_LOG_ASSERT(!InputFile.empty()); - MaximizeGraph(InputFile, OutputFile); + auto res = MaximizeGraph(InputFile, OutputFile); + if (!res) { + KATANA_LOG_FATAL("failed to generate maximal graph"); + } return 0; }