Skip to content

Commit

Permalink
RDG storage_format_version_6 (#904)
Browse files Browse the repository at this point in the history
support for optional datastructures
bump rdgs to storage_format_version_6
better logging in test_storage_format test
  • Loading branch information
e-mcginnis authored Feb 22, 2022
1 parent 42d08d0 commit 77b2655
Show file tree
Hide file tree
Showing 17 changed files with 312 additions and 195 deletions.
3 changes: 2 additions & 1 deletion cmake/Modules/TestDatasets.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set(MISC_TEST_DATASETS ${KATANA_TEST_DATASETS}/misc_datasets)

## latest supported rdg storage_format_version
#TODO(emcginnis) get this envar in RDGPartHeader.h instead of having to hard code it here and there
set(KATANA_RDG_STORAGE_FORMAT_VERSION "5")
set(KATANA_RDG_STORAGE_FORMAT_VERSION "6")


## returns path to the specified rdg dataset at the specified storage_format_version
Expand Down Expand Up @@ -85,3 +85,4 @@ rdg_dataset(RDG_GNN_K5_SINGLE "gnn_k5_single")
# Used the latest storage_format_version RDGs defined above instead
rdg_dataset_at_version(RDG_LDBC_003_V1 "ldbc_003" "1")
rdg_dataset_at_version(RDG_LDBC_003_V3 "ldbc_003" "3")
rdg_dataset_at_version(RDG_LDBC_003_V5 "ldbc_003" "5")
2 changes: 1 addition & 1 deletion external/test-datasets
Submodule test-datasets updated 697 files
2 changes: 1 addition & 1 deletion libtsuba/include/katana/RDGManifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class KATANA_EXPORT RDGManifest {
std::string ToJsonString() const;

/// Return the set of file names that hold this RDG's data by reading partition files
/// Useful to garbage collect unused files
/// Useful to garbage collect unused files, and copy an RDG to a new location
katana::Result<std::set<std::string>> FileNames();

// Required by nlohmann
Expand Down
3 changes: 2 additions & 1 deletion libtsuba/include/katana/RDGStorageFormatVersion.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ static const uint32_t kPartitionStorageFormatVersion2 = 2;
static const uint32_t kPartitionStorageFormatVersion3 = 3;
static const uint32_t kPartitionStorageFormatVersion4 = 4;
static const uint32_t kPartitionStorageFormatVersion5 = 5;
static const uint32_t kPartitionStorageFormatVersion6 = 6;

/// kLatestPartitionStorageFormatVersion to be bumped any time
/// the on disk format of RDGPartHeader changes
static const uint32_t kLatestPartitionStorageFormatVersion =
kPartitionStorageFormatVersion5;
kPartitionStorageFormatVersion6;

}; // namespace katana

Expand Down
5 changes: 4 additions & 1 deletion libtsuba/include/katana/RDKLSHIndexPrimitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace katana {

const std::string kOptionalDatastructureRDKLSHIndexPrimitive =
"kg.v1.rdk_lsh_index";
const std::string kOptionalDatastructureRDKLSHIndexPrimitiveFilename =
"rdk_lsh_index_manifest";

class KATANA_EXPORT RDKLSHIndexPrimitive
: private katana::RDGOptionalDatastructure {
Expand All @@ -35,7 +37,8 @@ class KATANA_EXPORT RDKLSHIndexPrimitive

katana::Result<std::string> Write(katana::Uri rdg_dir_path) {
// Write out our json manifest
katana::Uri manifest_path = rdg_dir_path.RandFile("rdk_lsh_index_manifest");
katana::Uri manifest_path = rdg_dir_path.RandFile(
kOptionalDatastructureRDKLSHIndexPrimitiveFilename);
KATANA_CHECKED(WriteManifest(manifest_path.string()));
return manifest_path.BaseName();
}
Expand Down
6 changes: 4 additions & 2 deletions libtsuba/include/katana/RDKSubstructureIndexPrimitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace katana {

const std::string kOptionalDatastructureRDKSubstructureIndexPrimitive =
"kg.v1.rdk_substructure_index";
const std::string kOptionalDatastructureRDKSubstructureIndexPrimitiveFilename =
"rdk_substructure_index_manifest";

class KATANA_EXPORT RDKSubstructureIndexPrimitive
: private katana::RDGOptionalDatastructure {
Expand All @@ -35,8 +37,8 @@ class KATANA_EXPORT RDKSubstructureIndexPrimitive

katana::Result<std::string> Write(katana::Uri rdg_dir_path) {
// Write out our json manifest
katana::Uri manifest_path =
rdg_dir_path.RandFile("rdk_substructure_index_manifest");
katana::Uri manifest_path = rdg_dir_path.RandFile(
kOptionalDatastructureRDKSubstructureIndexPrimitiveFilename);
KATANA_CHECKED(WriteManifest(manifest_path.string()));
return manifest_path.BaseName();
}
Expand Down
24 changes: 0 additions & 24 deletions libtsuba/src/RDG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1079,12 +1079,6 @@ katana::RDG::edge_entity_type_id_array() const {

katana::Result<std::optional<katana::RDKLSHIndexPrimitive>>
katana::RDG::LoadRDKLSHIndexPrimitive() {
if (!KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) {
return KATANA_ERROR(
ErrorCode::InvalidArgument,
"The UnstableRDGStorageFormat feature flag must be set to use this "
"feature");
}
std::optional<std::string> res =
KATANA_CHECKED(core_->part_header().OptionalDatastructureManifest(
kOptionalDatastructureRDKLSHIndexPrimitive));
Expand All @@ -1100,12 +1094,6 @@ katana::RDG::LoadRDKLSHIndexPrimitive() {

katana::Result<void>
katana::RDG::WriteRDKLSHIndexPrimitive(katana::RDKLSHIndexPrimitive& index) {
if (!KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) {
return KATANA_ERROR(
ErrorCode::InvalidArgument,
"The UnstableRDGStorageFormat feature flag must be set to use this "
"feature");
}
std::string path = KATANA_CHECKED(index.Write(rdg_dir()));
core_->part_header().AppendOptionalDatastructureManifest(
kOptionalDatastructureRDKLSHIndexPrimitive, path);
Expand All @@ -1115,12 +1103,6 @@ katana::RDG::WriteRDKLSHIndexPrimitive(katana::RDKLSHIndexPrimitive& index) {

katana::Result<std::optional<katana::RDKSubstructureIndexPrimitive>>
katana::RDG::LoadRDKSubstructureIndexPrimitive() {
if (!KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) {
return KATANA_ERROR(
ErrorCode::InvalidArgument,
"The UnstableRDGStorageFormat feature flag must be set to use this "
"feature");
}
std::optional<std::string> res =
KATANA_CHECKED(core_->part_header().OptionalDatastructureManifest(
kOptionalDatastructureRDKSubstructureIndexPrimitive));
Expand All @@ -1138,12 +1120,6 @@ katana::RDG::LoadRDKSubstructureIndexPrimitive() {
katana::Result<void>
katana::RDG::WriteRDKSubstructureIndexPrimitive(
katana::RDKSubstructureIndexPrimitive& index) {
if (!KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) {
return KATANA_ERROR(
ErrorCode::InvalidArgument,
"The UnstableRDGStorageFormat feature flag must be set to use this "
"feature");
}
std::string path = KATANA_CHECKED(index.Write(rdg_dir()));
core_->part_header().AppendOptionalDatastructureManifest(
kOptionalDatastructureRDKSubstructureIndexPrimitive, path);
Expand Down
28 changes: 27 additions & 1 deletion libtsuba/src/RDGManifest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "katana/FileView.h"
#include "katana/JSON.h"
#include "katana/ParquetReader.h"
#include "katana/RDGOptionalDatastructure.h"
#include "katana/Result.h"
#include "katana/tsuba.h"

Expand Down Expand Up @@ -215,10 +216,28 @@ AddPropertySubFiles(std::set<std::string>& fnames, std::string full_path) {
}
return katana::ResultSuccess();
}

katana::Result<void>
AddOptionalDatastructureSubfiles(
std::set<std::string>& fnames, std::string full_path) {
katana::FileView fv;
KATANA_CHECKED(fv.Bind(full_path, true));
katana::RDGOptionalDatastructure data;
KATANA_CHECKED(
katana::JsonParse<katana::RDGOptionalDatastructure>(fv, &data));

// copy over any extra files the optional datastructure relies on
// Assumes that all OptionalDatastructures properly extend the RDGOptionalDatastructure class
for (const auto& file : data.paths()) {
fnames.emplace(file.second);
}
return katana::ResultSuccess();
}

} // namespace

// Return the set of file names that hold this RDG's data by reading partition files
// Useful to garbage collect unused files
// Useful to garbage collect unused files, and copy an RDG to a new location
katana::Result<std::set<std::string>>
katana::RDGManifest::FileNames() {
std::set<std::string> fnames{};
Expand All @@ -238,6 +257,7 @@ katana::RDGManifest::FileNames() {
version(), view_specifier(), header_res.error());
} else {
auto header = std::move(header_res.value());

for (const auto& node_prop : header.node_prop_info_list()) {
fnames.emplace(node_prop.path());
KATANA_CHECKED(AddPropertySubFiles(
Expand All @@ -264,6 +284,12 @@ katana::RDGManifest::FileNames() {
for (size_t i = 0; i < header.topology_metadata()->num_entries(); i++) {
fnames.emplace(header.topology_metadata()->Entries().at(i).path_);
}

for (auto it : header.optional_datastructure_manifests()) {
fnames.emplace(it.first);
KATANA_CHECKED(AddOptionalDatastructureSubfiles(
fnames, katana::Uri::JoinPath(dir().string(), it.second)));
}
}
}
return fnames;
Expand Down
63 changes: 24 additions & 39 deletions libtsuba/src/RDGPartHeader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,42 +345,25 @@ katana::to_json(json& j, const katana::RDGPartHeader& header) {
"RDGPartHeader.unstable_storage_format_ is true");
}

if (KATANA_EXPERIMENTAL_ENABLED(UnstableRDGStorageFormat)) {
j = json{
{kNodePropertyKey, header.node_prop_info_list_},
{kEdgePropertyKey, header.edge_prop_info_list_},
{kPartPropertyFilesKey, header.part_prop_info_list_},
{kPartPropertyMetaKey, header.metadata_},
{kStorageFormatVersionKey, header.storage_format_version_},
{kUnstableStorageFormatFlagKey, header.unstable_storage_format_},
{kNodeEntityTypeIDArrayPathKey, header.node_entity_type_id_array_path_},
{kEdgeEntityTypeIDArrayPathKey, header.edge_entity_type_id_array_path_},
{kNodeEntityTypeIDDictionaryKey,
header.node_entity_type_id_dictionary_},
{kEdgeEntityTypeIDDictionaryKey,
header.edge_entity_type_id_dictionary_},
{kNodeEntityTypeIDNameKey, header.node_entity_type_id_name_},
{kEdgeEntityTypeIDNameKey, header.edge_entity_type_id_name_},
{kPartitionTopologyMetadataKey, header.topology_metadata_},
{kOptionalDatastructuresKey, header.optional_datastructure_manifests_}};
} else {
j = json{
{kNodePropertyKey, header.node_prop_info_list_},
{kEdgePropertyKey, header.edge_prop_info_list_},
{kPartPropertyFilesKey, header.part_prop_info_list_},
{kPartPropertyMetaKey, header.metadata_},
{kStorageFormatVersionKey, header.storage_format_version_},
{kUnstableStorageFormatFlagKey, header.unstable_storage_format_},
{kNodeEntityTypeIDArrayPathKey, header.node_entity_type_id_array_path_},
{kEdgeEntityTypeIDArrayPathKey, header.edge_entity_type_id_array_path_},
{kNodeEntityTypeIDDictionaryKey,
header.node_entity_type_id_dictionary_},
{kEdgeEntityTypeIDDictionaryKey,
header.edge_entity_type_id_dictionary_},
{kNodeEntityTypeIDNameKey, header.node_entity_type_id_name_},
{kEdgeEntityTypeIDNameKey, header.edge_entity_type_id_name_},
{kPartitionTopologyMetadataKey, header.topology_metadata_}};
}
KATANA_LOG_DEBUG(
"Storing paths to {} optional datastructure manifests",
header.optional_datastructure_manifests_.size());

j = json{
{kNodePropertyKey, header.node_prop_info_list_},
{kEdgePropertyKey, header.edge_prop_info_list_},
{kPartPropertyFilesKey, header.part_prop_info_list_},
{kPartPropertyMetaKey, header.metadata_},
{kStorageFormatVersionKey, header.storage_format_version_},
{kUnstableStorageFormatFlagKey, header.unstable_storage_format_},
{kNodeEntityTypeIDArrayPathKey, header.node_entity_type_id_array_path_},
{kEdgeEntityTypeIDArrayPathKey, header.edge_entity_type_id_array_path_},
{kNodeEntityTypeIDDictionaryKey, header.node_entity_type_id_dictionary_},
{kEdgeEntityTypeIDDictionaryKey, header.edge_entity_type_id_dictionary_},
{kNodeEntityTypeIDNameKey, header.node_entity_type_id_name_},
{kEdgeEntityTypeIDNameKey, header.edge_entity_type_id_name_},
{kPartitionTopologyMetadataKey, header.topology_metadata_},
{kOptionalDatastructuresKey, header.optional_datastructure_manifests_}};
}

void
Expand Down Expand Up @@ -478,11 +461,13 @@ katana::from_json(const json& j, katana::RDGPartHeader& header) {
header.topology_metadata_.Append(entry);
}

// Version N added optional data structures
// TODO(emcginnis) unstable for now
if (header.unstable_storage_format_) {
// Version 6 added optional data structures
if (header.storage_format_version_ >= kPartitionStorageFormatVersion6) {
j.at(kOptionalDatastructuresKey)
.get_to(header.optional_datastructure_manifests_);
KATANA_LOG_DEBUG(
"Loaded {} optional datastructure manifests",
header.optional_datastructure_manifests_.size());
}
}

Expand Down
22 changes: 21 additions & 1 deletion libtsuba/src/RDGPartHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,15 @@ class KATANA_EXPORT RDGPartHeader {
if (search != optional_datastructure_manifests_.end()) {
return search->second;
}
KATANA_LOG_DEBUG(
KATANA_LOG_WARN(
"No optional datastructure manifest available matching [{}]",
optional_datastructure_name);
KATANA_LOG_WARN(
"Available optional datastructures, size = {}:",
optional_datastructure_manifests_.size());
for (auto it : optional_datastructure_manifests_) {
KATANA_LOG_WARN(" {} : {}", it.first, it.second);
}
return std::nullopt;
}

Expand All @@ -489,8 +495,22 @@ class KATANA_EXPORT RDGPartHeader {
const std::string& optional_datastructure_path) {
optional_datastructure_manifests_.emplace(
optional_datastructure_name, optional_datastructure_path);
KATANA_LOG_DEBUG(
"Appended optional datastructure manifest {}, at path {}, total count "
"= {}",
optional_datastructure_name, optional_datastructure_path,
optional_datastructure_manifests_.size());
}

const std::unordered_map<std::string, std::string>&
optional_datastructure_manifests() const {
return optional_datastructure_manifests_;
}

/// Return the set of file names that hold this partitions data
/// Useful to garbage collect unused files, and copy an RDG to a new location
katana::Result<std::set<std::string>> FileNames();

friend void to_json(nlohmann::json& j, const RDGPartHeader& header);
friend void from_json(const nlohmann::json& j, RDGPartHeader& header);

Expand Down
23 changes: 19 additions & 4 deletions libtsuba/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,31 @@ add_test(NAME ${name} COMMAND ${test_name} ${RDG_LDBC_003_V3})



set(name storage-format-version-v4-v5-optional-datastructure-rdk)
# Test upgrading a v5 RDG to v6 and adding rdg optional datastructures
set(name storage-format-version-v5-v6-optional-datastructure-rdk)
set(test_name ${name}-test)
add_test_dataset_fixture(${PROJECT_BINARY_DIR} ${RDG_LDBC_003_V5} -${name} tmp_input_location input-setup-fixture-group)
add_executable(${test_name} storage-format-version/v6-optional-datastructure-rdk.cpp)
target_link_libraries(${test_name} katana_tsuba)
target_link_libraries(${test_name} katana_galois)
target_include_directories(${test_name} PRIVATE ../src)
add_test(NAME ${name} COMMAND ${test_name} ${tmp_input_location})
set_property(TEST ${name} APPEND PROPERTY LABELS quick)
set_tests_properties(${name} PROPERTIES LABELS quick)
set_property(TEST ${name}
APPEND PROPERTY
FIXTURES_REQUIRED ${input-setup-fixture-group})


# Test adding rdk optional datastructures to an RDG that already supports them
set(name storage-format-version-v6-v6-optional-datastructure-rdk)
set(test_name ${name}-test)
add_test_dataset_fixture(${PROJECT_BINARY_DIR} ${RDG_LDBC_003} -${name} tmp_input_location input-setup-fixture-group)
add_executable(${test_name} storage-format-version/v5-optional-datastructure-rdk.cpp)
add_executable(${test_name} storage-format-version/v6-optional-datastructure-rdk.cpp)
target_link_libraries(${test_name} katana_tsuba)
target_link_libraries(${test_name} katana_galois)
target_include_directories(${test_name} PRIVATE ../src)
add_test(NAME ${name} COMMAND ${test_name} ${tmp_input_location})
set_tests_properties(${name} PROPERTIES
ENVIRONMENT KATANA_ENABLE_EXPERIMENTAL=UnstableRDGStorageFormat)
set_property(TEST ${name} APPEND PROPERTY LABELS quick)
set_tests_properties(${name} PROPERTIES LABELS quick)
set_property(TEST ${name}
Expand Down
Loading

0 comments on commit 77b2655

Please sign in to comment.