Skip to content

Commit

Permalink
feat(clp-s): Add support for reading and searching single file archiv…
Browse files Browse the repository at this point in the history
…es. (y-scope#656)

Co-authored-by: wraymo <[email protected]>
  • Loading branch information
gibber9809 and wraymo authored Jan 15, 2025
1 parent 1ecd9c7 commit 8f00463
Show file tree
Hide file tree
Showing 23 changed files with 698 additions and 269 deletions.
2 changes: 2 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ add_subdirectory(src/reducer)
set(SOURCE_FILES_clp_s_unitTest
src/clp_s/ArchiveReader.cpp
src/clp_s/ArchiveReader.hpp
src/clp_s/ArchiveReaderAdaptor.cpp
src/clp_s/ArchiveReaderAdaptor.hpp
src/clp_s/ArchiveWriter.cpp
src/clp_s/ArchiveWriter.hpp
src/clp_s/ColumnReader.cpp
Expand Down
51 changes: 25 additions & 26 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string_view>

#include "archive_constants.hpp"
#include "ArchiveReaderAdaptor.hpp"
#include "InputConfig.hpp"
#include "ReaderUtils.hpp"

Expand All @@ -20,35 +21,28 @@ void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& netw
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}

if (InputSource::Filesystem != archive_path.source) {
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
}
m_archive_reader_adaptor = std::make_shared<ArchiveReaderAdaptor>(archive_path, network_auth);

if (false == std::filesystem::is_directory(archive_path.path)) {
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
if (auto const rc = m_archive_reader_adaptor->load_archive_metadata(); ErrorCodeSuccess != rc) {
throw OperationFailed(rc, __FILENAME__, __LINE__);
}
auto const archive_path_str = archive_path.path;

m_var_dict = ReaderUtils::get_variable_dictionary_reader(archive_path_str);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(archive_path_str);
m_array_dict = ReaderUtils::get_array_dictionary_reader(archive_path_str);
m_timestamp_dict = ReaderUtils::get_timestamp_dictionary_reader(archive_path_str);

m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);
m_schema_tree = ReaderUtils::read_schema_tree(*m_archive_reader_adaptor);
m_schema_map = ReaderUtils::read_schemas(*m_archive_reader_adaptor);

m_log_event_idx_column_id = m_schema_tree->get_metadata_field_id(constants::cLogEventIdxName);

m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile);
m_var_dict = ReaderUtils::get_variable_dictionary_reader(*m_archive_reader_adaptor);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(*m_archive_reader_adaptor);
m_array_dict = ReaderUtils::get_array_dictionary_reader(*m_archive_reader_adaptor);
}

void ArchiveReader::read_metadata() {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB
m_table_metadata_decompressor.open(
m_table_metadata_file_reader,
cDecompressorFileReadBufferCapacity
auto table_metadata_reader = m_archive_reader_adaptor->checkout_reader_for_section(
constants::cArchiveTableMetadataFile
);
m_table_metadata_decompressor.open(*table_metadata_reader, cDecompressorFileReadBufferCapacity);

m_stream_reader.read_metadata(m_table_metadata_decompressor);

Expand Down Expand Up @@ -131,14 +125,19 @@ void ArchiveReader::read_metadata() {
- prev_metadata.stream_offset;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
m_table_metadata_decompressor.close();

m_archive_reader_adaptor->checkin_reader_for_section(constants::cArchiveTableMetadataFile);
}

void ArchiveReader::read_dictionaries_and_metadata() {
m_var_dict->read_new_entries();
m_log_dict->read_new_entries();
m_array_dict->read_new_entries();
m_timestamp_dict->read_new_entries();
read_metadata();
m_var_dict->read_entries();
m_log_dict->read_entries();
m_array_dict->read_entries();
}

void ArchiveReader::open_packed_streams() {
m_stream_reader.open_packed_streams(m_archive_reader_adaptor);
}

SchemaReader& ArchiveReader::read_schema_table(
Expand Down Expand Up @@ -205,7 +204,7 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_array_dict, true);
break;
case NodeType::DateString:
column_reader = new DateStringColumnReader(column_id, m_timestamp_dict);
column_reader = new DateStringColumnReader(column_id, get_timestamp_dictionary());
break;
// No need to push columns without associated object readers into the SchemaReader.
case NodeType::Metadata:
Expand Down Expand Up @@ -288,7 +287,8 @@ void ArchiveReader::initialize_schema_reader(
m_id_to_schema_metadata[schema_id].num_messages,
should_marshal_records
);
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();
auto timestamp_column_ids
= get_timestamp_dictionary()->get_authoritative_timestamp_column_ids();
for (size_t i = 0; i < schema.size(); ++i) {
int32_t column_id = schema[i];
if (Schema::schema_entry_is_unordered_object(column_id)) {
Expand Down Expand Up @@ -355,10 +355,9 @@ void ArchiveReader::close() {
m_var_dict->close();
m_log_dict->close();
m_array_dict->close();
m_timestamp_dict->close();

m_stream_reader.close();
m_table_metadata_file_reader.close();
m_archive_reader_adaptor.reset();

m_id_to_schema_metadata.clear();
m_schema_ids.clear();
Expand Down
26 changes: 11 additions & 15 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <string_view>
#include <utility>

#include "ArchiveReaderAdaptor.hpp"
#include "DictionaryReader.hpp"
#include "InputConfig.hpp"
#include "PackedStreamReader.hpp"
Expand Down Expand Up @@ -41,13 +42,18 @@ class ArchiveReader {
*/
void read_dictionaries_and_metadata();

/**
* Opens packed streams for reading.
*/
void open_packed_streams();

/**
* Reads the variable dictionary from the archive.
* @param lazy
* @return the variable dictionary reader
*/
std::shared_ptr<VariableDictionaryReader> read_variable_dictionary(bool lazy = false) {
m_var_dict->read_new_entries(lazy);
m_var_dict->read_entries(lazy);
return m_var_dict;
}

Expand All @@ -57,7 +63,7 @@ class ArchiveReader {
* @return the log type dictionary reader
*/
std::shared_ptr<LogTypeDictionaryReader> read_log_type_dictionary(bool lazy = false) {
m_log_dict->read_new_entries(lazy);
m_log_dict->read_entries(lazy);
return m_log_dict;
}

Expand All @@ -67,7 +73,7 @@ class ArchiveReader {
* @return the array dictionary reader
*/
std::shared_ptr<LogTypeDictionaryReader> read_array_dictionary(bool lazy = false) {
m_array_dict->read_new_entries(lazy);
m_array_dict->read_entries(lazy);
return m_array_dict;
}

Expand All @@ -76,15 +82,6 @@ class ArchiveReader {
*/
void read_metadata();

/**
* Reads the local timestamp dictionary from the archive.
* @return the timestamp dictionary reader
*/
std::shared_ptr<TimestampDictionaryReader> read_timestamp_dictionary() {
m_timestamp_dict->read_new_entries();
return m_timestamp_dict;
}

/**
* Reads a table from the archive.
* @param schema_id
Expand Down Expand Up @@ -113,7 +110,7 @@ class ArchiveReader {
std::shared_ptr<LogTypeDictionaryReader> get_array_dictionary() { return m_array_dict; }

std::shared_ptr<TimestampDictionaryReader> get_timestamp_dictionary() {
return m_timestamp_dict;
return m_archive_reader_adaptor->get_timestamp_dictionary();
}

std::shared_ptr<SchemaTree> get_schema_tree() { return m_schema_tree; }
Expand Down Expand Up @@ -201,7 +198,7 @@ class ArchiveReader {
std::shared_ptr<VariableDictionaryReader> m_var_dict;
std::shared_ptr<LogTypeDictionaryReader> m_log_dict;
std::shared_ptr<LogTypeDictionaryReader> m_array_dict;
std::shared_ptr<TimestampDictionaryReader> m_timestamp_dict;
std::shared_ptr<ArchiveReaderAdaptor> m_archive_reader_adaptor;

std::shared_ptr<SchemaTree> m_schema_tree;
std::shared_ptr<ReaderUtils::SchemaMap> m_schema_map;
Expand All @@ -212,7 +209,6 @@ class ArchiveReader {
};

PackedStreamReader m_stream_reader;
FileReader m_table_metadata_file_reader;
ZstdDecompressor m_table_metadata_decompressor;
SchemaReader m_schema_reader;
std::shared_ptr<char[]> m_stream_buffer{};
Expand Down
Loading

0 comments on commit 8f00463

Please sign in to comment.