Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp-core): Add the write path for single-file archives. #646

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a5ef64f
first draft
davemarco Dec 28, 2024
615fafd
small changes
davemarco Dec 28, 2024
b84c2da
small changes
davemarco Dec 28, 2024
d1ec9fe
refactor
davemarco Dec 28, 2024
869f1b3
refactor
davemarco Dec 28, 2024
d84c002
refactor
davemarco Dec 28, 2024
f4136f8
refactor
davemarco Dec 28, 2024
6bbf12e
refactor
davemarco Dec 28, 2024
5c75147
remove extra line
davemarco Dec 28, 2024
c1f12df
fix lint
davemarco Dec 28, 2024
0ab6e0e
fix lint
davemarco Dec 28, 2024
d4ed4f6
fix lint
davemarco Dec 28, 2024
82b9802
fix lint?
davemarco Dec 29, 2024
393049b
fix lint?
davemarco Dec 29, 2024
5428403
fix lint
davemarco Dec 29, 2024
7e261f7
fix lint
davemarco Dec 29, 2024
0bd9b27
fix lint
davemarco Dec 29, 2024
d207630
haiqi review
davemarco Jan 13, 2025
c2fd37d
haiqi review
davemarco Jan 13, 2025
cdafb8d
haiqi review
davemarco Jan 16, 2025
dcadf04
haiqi review
davemarco Jan 16, 2025
265b4e4
add initializer for variable
davemarco Jan 16, 2025
c7361c2
attempt to fix lint
davemarco Jan 16, 2025
910e558
saving previous work
davemarco Jan 27, 2025
3ab46fd
merge
davemarco Feb 9, 2025
a12ac42
moving to new metadata, sfav2
davemarco Feb 9, 2025
4c26177
small changes
davemarco Feb 9, 2025
7916bb9
small changes
davemarco Feb 9, 2025
7fc5cb9
latest changes
davemarco Feb 9, 2025
0ae4822
small
davemarco Feb 9, 2025
1b7d73e
small changes
davemarco Feb 9, 2025
7b68544
small changes
davemarco Feb 9, 2025
05377b8
haiqi review
davemarco Feb 10, 2025
e0fcefe
small fix
davemarco Feb 10, 2025
d8a3c70
haiqi review remove offset function
davemarco Feb 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/core/src/clp/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ set(
../streaming_archive/reader/Segment.hpp
../streaming_archive/reader/SegmentManager.cpp
../streaming_archive/reader/SegmentManager.hpp
../streaming_archive/single_file_archive/Defs.hpp
../streaming_archive/single_file_archive/writer.cpp
../streaming_archive/single_file_archive/writer.hpp
../streaming_archive/writer/Archive.cpp
../streaming_archive/writer/Archive.hpp
../streaming_archive/writer/File.cpp
Expand Down
4 changes: 4 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
->default_value(m_schema_file_path),
"Path to a schema file. If not specified, heuristics are used to determine "
"dictionary variables. See README-Schema.md for details."
)(
"single-file-archive",
po::bool_switch(&m_single_file_archive),
"Output archive as a single-file"
davemarco marked this conversation as resolved.
Show resolved Hide resolved
);

po::options_description all_compression_options;
Expand Down
4 changes: 4 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
explicit CommandLineArguments(std::string const& program_name)
: CommandLineArgumentsBase(program_name),
m_show_progress(false),
m_single_file_archive(false),
m_sort_input_files(true),
m_print_archive_stats_progress(false),
m_target_segment_uncompressed_size(1L * 1024 * 1024 * 1024),
Expand All @@ -45,6 +46,8 @@ class CommandLineArguments : public CommandLineArgumentsBase {

bool show_progress() const { return m_show_progress; }

bool get_use_single_file_archive() const { return m_single_file_archive; }
davemarco marked this conversation as resolved.
Show resolved Hide resolved

bool sort_input_files() const { return m_sort_input_files; }

bool print_archive_stats_progress() const { return m_print_archive_stats_progress; }
Expand Down Expand Up @@ -92,6 +95,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
std::string m_output_dir;
std::string m_schema_file_path;
bool m_show_progress;
bool m_single_file_archive;
bool m_print_archive_stats_progress;
size_t m_target_encoded_file_size;
size_t m_target_segment_uncompressed_size;
Expand Down
12 changes: 9 additions & 3 deletions components/core/src/clp/clp/FileCompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ void FileCompressor::parse_and_encode_with_heuristic(

// Parse content from file
while (m_message_parser.parse_next_message(true, reader, m_parsed_message)) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts
davemarco marked this conversation as resolved.
Show resolved Hide resolved
&& false == archive_writer.get_use_single_file_archive())
{
split_file_and_archive(
archive_user_config,
path_for_compression,
Expand Down Expand Up @@ -337,7 +339,9 @@ bool FileCompressor::try_compressing_as_archive(
parent_directories.emplace(file_parent_path);
}

if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts
&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}

Expand Down Expand Up @@ -537,7 +541,9 @@ std::error_code FileCompressor::compress_ir_stream_by_encoding(
}

// Split archive/encoded file if necessary before writing the new event
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts
&& false == archive.get_use_single_file_archive())
{
split_file_and_archive(
archive_user_config,
path,
Expand Down
9 changes: 7 additions & 2 deletions components/core/src/clp/clp/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ bool compress(
archive_user_config.global_metadata_db = global_metadata_db.get();
archive_user_config.print_archive_stats_progress
= command_line_args.print_archive_stats_progress();
archive_user_config.use_single_file_archive = command_line_args.get_use_single_file_archive();

// Open Archive
streaming_archive::writer::Archive archive_writer;
Expand Down Expand Up @@ -135,7 +136,9 @@ bool compress(
);
}
for (auto it = files_to_compress.cbegin(); it != files_to_compress.cend(); ++it) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBD

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for this

&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}
if (false
Expand Down Expand Up @@ -163,7 +166,9 @@ bool compress(
file_group_id_comparator);
// Compress grouped files
for (auto const& file_to_compress : grouped_files_to_compress) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries
&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}
if (false
Expand Down
22 changes: 22 additions & 0 deletions components/core/src/clp/streaming_archive/ArchiveMetadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
#include <cstdint>

#include "../Defs.h"
#include "../ffi/encoding_methods.hpp"
#include "../FileReader.hpp"
#include "../FileWriter.hpp"
#include "Constants.hpp"

namespace clp::streaming_archive {

static constexpr std::string_view cCompressionTypeZstd = "ZSTD";

/**
* A class to encapsulate metadata directly relating to an archive.
*/
Expand Down Expand Up @@ -79,6 +83,18 @@ class ArchiveMetadata {

[[nodiscard]] auto get_end_timestamp() const { return m_end_timestamp; }

[[nodiscard]] auto get_variable_encoding_methods_version() const -> std::string const& {
return m_variable_encoding_methods_version;
}

[[nodiscard]] auto get_variables_schema_version() const -> std::string const& {
return m_variables_schema_version;
}

[[nodiscard]] auto get_compression_type() const -> std::string const& {
return m_compression_type;
}

/**
* Expands the archive's time range based to encompass the given time range
* @param begin_timestamp
Expand All @@ -102,6 +118,12 @@ class ArchiveMetadata {
// The size of the archive
uint64_t m_compressed_size{0};
uint64_t m_dynamic_compressed_size{0};
// TODO: The following fields are used in single-file archive; however, they are not
// currently part of multi-file archive metadata. Modifying multi-file archive metadata
// disk format is potentially a breaking change and not currently required.
std::string m_variable_encoding_methods_version{ffi::cVariableEncodingMethodsVersion};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess a high level question is. how are those constants related to multi-file-archive?

Technically, since they are not used by anything else, they can also be directly defined under struct "single_archive_metadata"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally these fields are written to multi-file archive metadata. However, since this is actually changing multi-file archives, I didn't want to make the change until we here from @kirkrodrigues. If there is no plan to add these to the multi-file metadata, perhaps we can move them into single file archive files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I didn't fully answer your question, but I asked kirk about these fields earlier and supposedly they are also relevant to multi file archive

std::string m_variables_schema_version{ffi::cVariablesSchemaVersion};
std::string m_compression_type{cCompressionTypeZstd};
};
} // namespace clp::streaming_archive

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#ifndef CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP
#define CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP

#include <cstdint>
#include <string>

#include "../clp/Defs.h"
#include "../Constants.hpp"
#include "msgpack.hpp"

namespace clp::streaming_archive::single_file_archive {

using single_file_archive_format_version_t = uint32_t;

// Single file archive version.
constexpr uint8_t cArchiveMajorVersion{0};
constexpr uint8_t cArchiveMinorVersion{1};
constexpr uint16_t cArchivePatchVersion{1};
constexpr single_file_archive_format_version_t cArchiveVersion{
cArchiveMajorVersion << 24 | cArchiveMinorVersion << 16 | cArchivePatchVersion
};

static constexpr size_t cNumMagicNumberChars{4};
static constexpr std::array<uint8_t, cNumMagicNumberChars>
cUnstructuredSfaMagicNumber{'Y', 'C', 'L', 'P'};
static constexpr std::string_view cUnstructuredSfaExtension{".clp"};
static constexpr size_t cFileSizeWarningThreshold{100L * 1024 * 1024};

static constexpr size_t cNumStaticFiles{5};
constexpr std::array<char const*, cNumStaticFiles> cStaticArchiveFileNames{
cMetadataDBFileName,
cLogTypeDictFilename,
cLogTypeSegmentIndexFilename,
cVarDictFilename,
cVarSegmentIndexFilename
};

static constexpr size_t cNumUnused{6};

struct __attribute__((packed)) SingleFileArchiveHeader {
std::array<uint8_t, cNumMagicNumberChars> magic;
single_file_archive_format_version_t version;
uint64_t metadata_size;
std::array<uint64_t, cNumUnused> unused;
};

struct FileInfo {
davemarco marked this conversation as resolved.
Show resolved Hide resolved
std::string n;
uint64_t o;
MSGPACK_DEFINE_MAP(n, o);
};

struct MultiFileArchiveMetadata {
davemarco marked this conversation as resolved.
Show resolved Hide resolved
archive_format_version_t archive_format_version;
std::string variable_encoding_methods_version;
std::string variables_schema_version;
std::string compression_type;
std::string creator_id;
epochtime_t begin_timestamp;
epochtime_t end_timestamp;
uint64_t uncompressed_size;
uint64_t compressed_size;
MSGPACK_DEFINE_MAP(
archive_format_version,
variable_encoding_methods_version,
variables_schema_version,
compression_type,
creator_id,
begin_timestamp,
end_timestamp,
uncompressed_size,
compressed_size
);
};

struct SingleFileArchiveMetadata {
std::vector<FileInfo> archive_files;
MultiFileArchiveMetadata archive_metadata;
uint64_t num_segments;
MSGPACK_DEFINE_MAP(archive_files, archive_metadata, num_segments);
};
} // namespace clp::streaming_archive::single_file_archive

#endif // CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP
Loading
Loading