diff --git a/components/core/src/clp/BufferReader.cpp b/components/core/src/clp/BufferReader.cpp index b116b8080..cc27c0601 100644 --- a/components/core/src/clp/BufferReader.cpp +++ b/components/core/src/clp/BufferReader.cpp @@ -4,18 +4,21 @@ #include namespace clp { -BufferReader::BufferReader(char const* data, size_t data_size, size_t pos) { - if (nullptr == data) { +BufferReader::BufferReader(char const* data, size_t data_size, size_t pos) + : m_internal_buf{data}, + m_internal_buf_size{data_size}, + m_internal_buf_pos{pos} { + if (nullptr == data && (data_size != 0 || pos != 0)) { + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + if (pos > data_size) { throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); } - m_internal_buf = data; - m_internal_buf_size = data_size; - m_internal_buf_pos = pos; } auto BufferReader::peek_buffer(char const*& buf, size_t& peek_size) const -> void { peek_size = get_remaining_data_size(); - buf = m_internal_buf + m_internal_buf_pos; + buf = 0 == peek_size ? nullptr : m_internal_buf + m_internal_buf_pos; } auto BufferReader::try_read_to_delimiter( diff --git a/components/core/src/clp/BufferReader.hpp b/components/core/src/clp/BufferReader.hpp index 349697d34..4a64502ce 100644 --- a/components/core/src/clp/BufferReader.hpp +++ b/components/core/src/clp/BufferReader.hpp @@ -23,6 +23,8 @@ class BufferReader : public ReaderInterface { }; // Constructors + BufferReader() : BufferReader(nullptr, 0, 0) {} + BufferReader(char const* data, size_t data_size) : BufferReader(data, data_size, 0) {} BufferReader(char const* data, size_t data_size, size_t pos); diff --git a/components/core/src/clp/BufferedFileReader.cpp b/components/core/src/clp/BufferedFileReader.cpp index ad6636cef..7f5f8026b 100644 --- a/components/core/src/clp/BufferedFileReader.cpp +++ b/components/core/src/clp/BufferedFileReader.cpp @@ -1,159 +1,67 @@ #include "BufferedFileReader.hpp" -#include - +#include #include - -#include - +#include +#include +#include +#include +#include + +#include "BufferReader.hpp" +#include "ErrorCode.hpp" #include "math_utils.hpp" +#include "ReaderInterface.hpp" +using std::span; using std::string; +using std::unique_ptr; namespace clp { -namespace { -/** - * Reads from the given file descriptor - * @param fd - * @param buf - * @param num_bytes_to_read - * @param num_bytes_read - * @return ErrorCode_errno on error - * @return ErrorCode_EndOfFile on EOF - * @return ErrorCode_Success on success - */ -auto read_into_buffer(int fd, char* buf, size_t num_bytes_to_read, size_t& num_bytes_read) - -> ErrorCode; - -auto read_into_buffer(int fd, char* buf, size_t num_bytes_to_read, size_t& num_bytes_read) - -> ErrorCode { - num_bytes_read = 0; - while (true) { - auto const bytes_read = ::read(fd, buf, num_bytes_to_read); - if (0 == bytes_read) { - break; - } - if (bytes_read < 0) { - return ErrorCode_errno; - } - - buf += bytes_read; - num_bytes_read += bytes_read; - num_bytes_to_read -= bytes_read; - if (num_bytes_read == num_bytes_to_read) { - return ErrorCode_Success; - } - } - if (0 == num_bytes_read) { - return ErrorCode_EndOfFile; +BufferedFileReader::BufferedFileReader( + std::unique_ptr reader_interface, + size_t base_buffer_size +) + : m_reader(std::move(reader_interface)) { + if (nullptr == m_reader) { + throw OperationFailed( + ErrorCode_BadParam, + __FILENAME__, + __LINE__, + "reader_interface cannot be null" + ); } - return ErrorCode_Success; -} -} // namespace - -BufferedFileReader::BufferedFileReader(size_t base_buffer_size) { if (base_buffer_size % cMinBufferSize != 0) { throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); } m_base_buffer_size = base_buffer_size; m_buffer.resize(m_base_buffer_size); -} - -BufferedFileReader::~BufferedFileReader() { - close(); -} -auto BufferedFileReader::try_open(string const& path) -> ErrorCode { - // Cleanup in case caller forgot to call close before calling this function - close(); - - m_fd = ::open(path.c_str(), O_RDONLY); - if (-1 == m_fd) { - if (ENOENT == errno) { - return ErrorCode_FileNotFound; - } - return ErrorCode_errno; - } - m_path = path; - m_file_pos = 0; + m_pos = m_reader->get_pos(); m_buffer_begin_pos = 0; - m_buffer_reader.emplace(m_buffer.data(), 0); - m_highest_read_pos = 0; - return ErrorCode_Success; -} - -void BufferedFileReader::open(string const& path) { - auto const error_code = try_open(path); - if (ErrorCode_Success != error_code) { - if (ErrorCode_FileNotFound == error_code) { - throw OperationFailed( - error_code, - __FILENAME__, - __LINE__, - "File not found: " + boost::filesystem::weakly_canonical(path).string() - ); - } - throw OperationFailed(error_code, __FILENAME__, __LINE__); - } -} - -auto BufferedFileReader::close() -> void { - if (-1 == m_fd) { - return; - } - - if (m_checkpoint_pos.has_value()) { - m_buffer.resize(m_base_buffer_size); - m_checkpoint_pos.reset(); - } - - // NOTE: We don't check errors for close since, in the read case, it seems the only reason it - // could fail is if it was interrupted by a signal - ::close(m_fd); - m_fd = -1; + m_buffer_reader = BufferReader{m_buffer.data(), 0}; + m_highest_read_pos = m_pos; } auto BufferedFileReader::try_refill_buffer_if_empty() -> ErrorCode { - if (-1 == m_fd) { - return ErrorCode_NotInit; - } - if (m_buffer_reader->get_buffer_size() > 0) { + if (m_buffer_reader.get_buffer_size() > 0) { return ErrorCode_Success; } return refill_reader_buffer(m_base_buffer_size); } -void BufferedFileReader::refill_buffer_if_empty() { - auto error_code = try_refill_buffer_if_empty(); - if (ErrorCode_Success != error_code) { - throw OperationFailed(error_code, __FILENAME__, __LINE__); - } -} - -auto BufferedFileReader::try_peek_buffered_data(char const*& buf, size_t& peek_size) const - -> ErrorCode { - if (-1 == m_fd) { - return ErrorCode_NotInit; - } - m_buffer_reader->peek_buffer(buf, peek_size); - return ErrorCode_Success; -} - void BufferedFileReader::peek_buffered_data(char const*& buf, size_t& peek_size) const { - auto error_code = try_peek_buffered_data(buf, peek_size); - if (ErrorCode_Success != error_code) { - throw OperationFailed(error_code, __FILENAME__, __LINE__); - } + m_buffer_reader.peek_buffer(buf, peek_size); } auto BufferedFileReader::set_checkpoint() -> size_t { - if (m_checkpoint_pos.has_value() && m_checkpoint_pos < m_file_pos - && m_buffer_reader->get_buffer_size() != m_base_buffer_size) + if (m_checkpoint_pos.has_value() && m_checkpoint_pos < m_pos + && m_buffer_reader.get_buffer_size() != m_base_buffer_size) { drop_content_before_current_pos(); } - m_checkpoint_pos = m_file_pos; - return m_file_pos; + m_checkpoint_pos = m_pos; + return m_pos; } auto BufferedFileReader::clear_checkpoint() -> void { @@ -171,36 +79,31 @@ auto BufferedFileReader::clear_checkpoint() -> void { } auto BufferedFileReader::try_get_pos(size_t& pos) -> ErrorCode { - if (-1 == m_fd) { - return ErrorCode_NotInit; - } - pos = m_file_pos; + pos = m_pos; return ErrorCode_Success; } auto BufferedFileReader::try_seek_from_begin(size_t pos) -> ErrorCode { - if (-1 == m_fd) { - return ErrorCode_NotInit; - } - if (pos == m_file_pos) { + if (pos == m_pos) { return ErrorCode_Success; } - auto seek_lower_bound = m_checkpoint_pos.has_value() ? m_checkpoint_pos.value() : m_file_pos; + auto seek_lower_bound = m_checkpoint_pos.has_value() ? m_checkpoint_pos.value() : m_pos; if (pos < seek_lower_bound) { return ErrorCode_Unsupported; } - auto error_code = m_buffer_reader->try_seek_from_begin(get_buffer_relative_pos(pos)); + auto error_code = m_buffer_reader.try_seek_from_begin(get_buffer_relative_pos(pos)); if (ErrorCode_Truncated == error_code) { if (false == m_checkpoint_pos.has_value()) { // If checkpoint is not set, simply move the file_pos and invalidate // the buffer reader - auto offset = lseek(m_fd, static_cast(pos), SEEK_SET); - if (-1 == offset) { - return ErrorCode_errno; + if (auto const error_code = m_reader->try_seek_from_begin(pos); + error_code != ErrorCode_Success) + { + return error_code; } - m_buffer_reader.emplace(m_buffer.data(), 0); + m_buffer_reader = BufferReader{m_buffer.data(), 0}; m_buffer_begin_pos = pos; } else { auto const num_bytes_to_refill = pos - get_buffer_end_pos(); @@ -211,7 +114,7 @@ auto BufferedFileReader::try_seek_from_begin(size_t pos) -> ErrorCode { if (ErrorCode_Success != error_code) { return error_code; } - error_code = m_buffer_reader->try_seek_from_begin(get_buffer_relative_pos(pos)); + error_code = m_buffer_reader.try_seek_from_begin(get_buffer_relative_pos(pos)); if (ErrorCode_Success != error_code) { return error_code; } @@ -219,15 +122,12 @@ auto BufferedFileReader::try_seek_from_begin(size_t pos) -> ErrorCode { } else if (ErrorCode_Success != error_code) { return error_code; } - update_file_pos(pos); + update_pos(pos); return ErrorCode_Success; } auto BufferedFileReader::try_read(char* buf, size_t num_bytes_to_read, size_t& num_bytes_read) -> ErrorCode { - if (-1 == m_fd) { - return ErrorCode_NotInit; - } if (nullptr == buf) { return ErrorCode_BadParam; } @@ -235,31 +135,28 @@ auto BufferedFileReader::try_read(char* buf, size_t num_bytes_to_read, size_t& n return ErrorCode_Success; } + span dst_view{buf, num_bytes_to_read}; num_bytes_read = 0; - while (true) { + while (false == dst_view.empty()) { size_t bytes_read{0}; - auto error_code = m_buffer_reader->try_read(buf, num_bytes_to_read, bytes_read); + auto error_code = m_buffer_reader.try_read(dst_view.data(), dst_view.size(), bytes_read); if (ErrorCode_Success == error_code) { - buf += bytes_read; + dst_view = dst_view.subspan(bytes_read); num_bytes_read += bytes_read; - num_bytes_to_read -= bytes_read; - update_file_pos(m_file_pos + bytes_read); - if (0 == num_bytes_to_read) { + update_pos(m_pos + bytes_read); + } else if (ErrorCode_EndOfFile == error_code) { + error_code = refill_reader_buffer(m_base_buffer_size); + if (ErrorCode_EndOfFile == error_code) { break; } - } else if (ErrorCode_EndOfFile != error_code) { - return error_code; - } - - error_code = refill_reader_buffer(m_base_buffer_size); - if (ErrorCode_EndOfFile == error_code) { - break; - } - if (ErrorCode_Success != error_code) { + if (ErrorCode_Success != error_code) { + return error_code; + } + } else { return error_code; } } - if (0 == num_bytes_read) { + if (dst_view.size() == num_bytes_to_read) { return ErrorCode_EndOfFile; } return ErrorCode_Success; @@ -271,9 +168,6 @@ auto BufferedFileReader::try_read_to_delimiter( bool append, string& str ) -> ErrorCode { - if (-1 == m_fd) { - return ErrorCode_NotInit; - } if (false == append) { str.clear(); } @@ -281,7 +175,7 @@ auto BufferedFileReader::try_read_to_delimiter( size_t total_num_bytes_read{0}; while (true) { size_t num_bytes_read{0}; - if (auto ret_code = m_buffer_reader->try_read_to_delimiter( + if (auto ret_code = m_buffer_reader.try_read_to_delimiter( delim, keep_delimiter, str, @@ -292,7 +186,7 @@ auto BufferedFileReader::try_read_to_delimiter( { return ret_code; } - update_file_pos(m_file_pos + num_bytes_read); + update_pos(m_pos + num_bytes_read); total_num_bytes_read += num_bytes_read; if (found_delim) { break; @@ -314,7 +208,7 @@ auto BufferedFileReader::try_read_to_delimiter( auto BufferedFileReader::refill_reader_buffer(size_t num_bytes_to_refill) -> ErrorCode { auto const buffer_end_pos = get_buffer_end_pos(); - auto const data_size = m_buffer_reader->get_buffer_size(); + auto const data_size = m_buffer_reader.get_buffer_size(); auto const available_buffer_space = m_buffer.size() - data_size; size_t num_bytes_to_read{0}; @@ -342,31 +236,32 @@ auto BufferedFileReader::refill_reader_buffer(size_t num_bytes_to_refill) -> Err } size_t num_bytes_read{0}; - auto error_code - = read_into_buffer(m_fd, &m_buffer[next_buffer_pos], num_bytes_to_read, num_bytes_read); + auto const error_code + = m_reader->try_read(&m_buffer[next_buffer_pos], num_bytes_to_read, num_bytes_read); if (error_code != ErrorCode_Success && ErrorCode_EndOfFile != error_code) { return error_code; } // NOTE: We still want to set the buffer reader if no bytes were read on EOF - m_buffer_reader.emplace(m_buffer.data(), next_buffer_pos + num_bytes_read, next_buffer_pos); + m_buffer_reader + = BufferReader{m_buffer.data(), next_buffer_pos + num_bytes_read, next_buffer_pos}; m_buffer_begin_pos = next_buffer_begin_pos; return error_code; } auto BufferedFileReader::drop_content_before_current_pos() -> void { - auto buffer_reader_pos = m_buffer_reader->get_pos(); - auto const new_data_size = m_buffer_reader->get_buffer_size() - buffer_reader_pos; + auto const buffer_reader_pos = m_buffer_reader.get_pos(); + auto const new_data_size = m_buffer_reader.get_buffer_size() - buffer_reader_pos; auto const new_buffer_size = int_round_up_to_multiple(new_data_size, m_base_buffer_size); - m_buffer.erase(m_buffer.begin(), m_buffer.begin() + static_cast(buffer_reader_pos)); + m_buffer.erase(m_buffer.begin(), m_buffer.begin() + buffer_reader_pos); m_buffer.resize(new_buffer_size); m_buffer_begin_pos += buffer_reader_pos; - m_buffer_reader.emplace(m_buffer.data(), new_data_size); + m_buffer_reader = BufferReader{m_buffer.data(), new_data_size}; } -auto BufferedFileReader::update_file_pos(size_t pos) -> void { - m_file_pos = pos; - m_highest_read_pos = std::max(m_file_pos, m_highest_read_pos); +auto BufferedFileReader::update_pos(size_t pos) -> void { + m_pos = pos; + m_highest_read_pos = std::max(m_pos, m_highest_read_pos); } } // namespace clp diff --git a/components/core/src/clp/BufferedFileReader.hpp b/components/core/src/clp/BufferedFileReader.hpp index dd55e658e..f3e50cd1d 100644 --- a/components/core/src/clp/BufferedFileReader.hpp +++ b/components/core/src/clp/BufferedFileReader.hpp @@ -1,23 +1,24 @@ #ifndef CLP_BUFFEREDFILEREADER_HPP #define CLP_BUFFEREDFILEREADER_HPP -#include +#include #include #include #include +#include #include #include "BufferReader.hpp" -#include "Defs.h" #include "ErrorCode.hpp" #include "ReaderInterface.hpp" #include "TraceableException.hpp" namespace clp { /** - * Class for performing buffered (in memory) reads from an on-disk file with control over when and - * how much data is buffered. This allows us to support use cases where we want to perform unordered - * reads from files which only support sequential access (e.g. files from block storage like S3). + * Class for performing buffered (in memory) reads from another ReaderInterface with control over + * when and how much data is buffered. This allows us to support use cases where we want to perform + * unordered reads from input which only support sequential access (e.g. files from block storage + * like S3). * * To control how much data is buffered, we allow callers to set a checkpoint such that all reads * and seeks past the checkpoint will be buffered until the checkpoint is cleared. This allows @@ -25,7 +26,7 @@ namespace clp { * When no checkpoint is set, we maintain a fixed-size buffer. * * NOTE 1: Unless otherwise noted, the "file position" mentioned in docstrings is the position in - * the buffered file, not the position in the on-disk file. + * the buffered file, not the position in the original input file. * * NOTE 2: This class restricts the buffer size to a multiple of the page size and we avoid reading * anything less than a page to avoid multiple page faults. @@ -70,14 +71,20 @@ class BufferedFileReader : public ReaderInterface { // Constructors /** + * @param reader_interface * @param base_buffer_size The size for the fixed-size buffer used when no checkpoint is set. It * must be a multiple of BufferedFileReader::cMinBufferSize. */ - explicit BufferedFileReader(size_t base_buffer_size); + explicit BufferedFileReader( + std::unique_ptr reader_interface, + size_t base_buffer_size + ); - BufferedFileReader() : BufferedFileReader(cDefaultBufferSize) {} + explicit BufferedFileReader(std::unique_ptr reader_interface) + : BufferedFileReader{std::move(reader_interface), cDefaultBufferSize} {} - ~BufferedFileReader(); + // Destructor + ~BufferedFileReader() override = default; // Disable copy/move construction/assignment BufferedFileReader(BufferedFileReader const&) = delete; @@ -86,50 +93,13 @@ class BufferedFileReader : public ReaderInterface { auto operator=(BufferedFileReader&&) -> BufferedFileReader& = delete; // Methods - /** - * Tries to open a file - * @param path - * @return ErrorCode_Success on success - * @return ErrorCode_FileNotFound if the file was not found - * @return ErrorCode_errno otherwise - */ - [[nodiscard]] auto try_open(std::string const& path) -> ErrorCode; - - auto open(std::string const& path) -> void; - - /** - * Closes the file if it's open - */ - auto close() -> void; - - [[nodiscard]] auto get_path() const -> std::string const& { return m_path; } - /** * Tries to fill the internal buffer if it's empty - * @return ErrorCode_NotInit if the file is not opened - * @return ErrorCode_errno on error reading from the underlying file - * @return ErrorCode_EndOfFile on EOF + * @return Same as refill_reader_buffer if it fails * @return ErrorCode_Success on success */ [[nodiscard]] auto try_refill_buffer_if_empty() -> ErrorCode; - /** - * Fills the internal buffer if it's empty - */ - void refill_buffer_if_empty(); - - /** - * Tries to peek the remaining buffered content without advancing the read head. - * - * NOTE: Any subsequent read or seek operations may invalidate the returned buffer. - * @param buf Returns a pointer to the remaining content in the buffer - * @param peek_size Returns the size of the remaining content in the buffer - * @return ErrorCode_NotInit if the file is not opened - * @return ErrorCode_Success on success - */ - [[nodiscard]] auto - try_peek_buffered_data(char const*& buf, size_t& peek_size) const -> ErrorCode; - /** * Peeks the remaining buffered content without advancing the read head. * @@ -158,7 +128,6 @@ class BufferedFileReader : public ReaderInterface { // Methods implementing the ReaderInterface /** * @param pos Returns the position of the read head in the file - * @return ErrorCode_NotInit if the file isn't open * @return ErrorCode_Success on success */ [[nodiscard]] auto try_get_pos(size_t& pos) -> ErrorCode override; @@ -168,14 +137,13 @@ class BufferedFileReader : public ReaderInterface { * is set, callers can only seek forwards in the file; When a checkpoint is set, callers can * seek to any position in the file that's after and including the checkpoint. * @param pos - * @return ErrorCode_NotInit if the file isn't open * @return ErrorCode_Unsupported if a checkpoint is set and the requested position is less than * the checkpoint, or no checkpoint is set and the requested position is less the current read * head's position. * @return ErrorCode_Truncated if we reached the end of the file before we reached the given * position - * @return ErrorCode_errno on error reading from the underlying file * @return Same as BufferReader::try_seek_from_begin if it fails + * @return Same as ReaderInterface::try_seek_from_begin if it fails * @return ErrorCode_Success on success */ [[nodiscard]] auto try_seek_from_begin(size_t pos) -> ErrorCode override; @@ -185,10 +153,9 @@ class BufferedFileReader : public ReaderInterface { * @param buf * @param num_bytes_to_read The number of bytes to try and read * @param num_bytes_read The actual number of bytes read - * @return ErrorCode_NotInit if the file is not open * @return ErrorCode_BadParam if buf is null - * @return ErrorCode_errno on error reading from the underlying file * @return ErrorCode_EndOfFile on EOF + * @return Same as BufferReader::try_read if it fails * @return ErrorCode_Success on success */ [[nodiscard]] auto @@ -200,10 +167,9 @@ class BufferedFileReader : public ReaderInterface { * @param keep_delimiter Whether to include the delimiter in the output string * @param append Whether to append to the given string or replace its contents * @param str Returns the content read - * @return ErrorCode_NotInit if the file is not open * @return ErrorCode_EndOfFile on EOF - * @return ErrorCode_errno on error reading from the underlying file * @return Same as BufferReader::try_read_to_delimiter if it fails + * @return Same as refill_reader_buffer if it fails * @return ErrorCode_Success on success */ [[nodiscard]] auto try_read_to_delimiter( @@ -220,8 +186,8 @@ class BufferedFileReader : public ReaderInterface { * * NOTE: Callers must ensure the current buffer has been exhausted before calling this method * (i.e., the read head is at the end of the buffer). - * @param refill_size - * @return Same as read_into_buffer + * @param num_bytes_to_refill + * @return Same as ReaderInterface::try_read */ [[nodiscard]] auto refill_reader_buffer(size_t num_bytes_to_refill) -> ErrorCode; @@ -232,30 +198,29 @@ class BufferedFileReader : public ReaderInterface { /** * @param file_pos - * @return \p file_pos relative to the beginning of the buffer + * @return file_pos relative to the beginning of the buffer */ [[nodiscard]] auto get_buffer_relative_pos(size_t file_pos) const -> size_t { return file_pos - m_buffer_begin_pos; } [[nodiscard]] auto get_buffer_end_pos() const -> size_t { - return m_buffer_begin_pos + m_buffer_reader->get_buffer_size(); + return m_buffer_begin_pos + m_buffer_reader.get_buffer_size(); } - auto update_file_pos(size_t pos) -> void; + auto update_pos(size_t pos) -> void; // Constants static constexpr size_t cDefaultBufferSize = (16 * cMinBufferSize); // Variables - int m_fd{-1}; - std::string m_path; - size_t m_file_pos{0}; + size_t m_pos{0}; + std::unique_ptr m_reader; // Buffer specific data std::vector m_buffer; size_t m_base_buffer_size; - std::optional m_buffer_reader; + BufferReader m_buffer_reader; size_t m_buffer_begin_pos{0}; // Variables for checkpoint support diff --git a/components/core/src/clp/clp/FileCompressor.cpp b/components/core/src/clp/clp/FileCompressor.cpp index 9898602cc..c0842c13f 100644 --- a/components/core/src/clp/clp/FileCompressor.cpp +++ b/components/core/src/clp/clp/FileCompressor.cpp @@ -2,7 +2,9 @@ #include #include +#include #include +#include #include #include @@ -32,8 +34,11 @@ using log_surgeon::Reader; using log_surgeon::ReaderParser; using std::cout; using std::endl; +using std::make_unique; +using std::move; using std::set; using std::string; +using std::unique_ptr; using std::vector; // Local prototypes @@ -116,15 +121,15 @@ bool FileCompressor::compress_file( streaming_archive::writer::Archive& archive_writer, bool use_heuristic ) { - std::string file_name = std::filesystem::canonical(file_to_compress.get_path()).string(); + string file_name = std::filesystem::canonical(file_to_compress.get_path()).string(); PROFILER_SPDLOG_INFO("Start parsing {}", file_name) Profiler::start_continuous_measurement(); - m_file_reader.open(file_to_compress.get_path()); + BufferedFileReader buffered_file_reader{make_unique(file_to_compress.get_path())}; // Check that file is UTF-8 encoded - if (auto error_code = m_file_reader.try_refill_buffer_if_empty(); + if (auto error_code = buffered_file_reader.try_refill_buffer_if_empty(); ErrorCode_Success != error_code && ErrorCode_EndOfFile != error_code) { if (ErrorCode_errno == error_code) { @@ -144,31 +149,20 @@ bool FileCompressor::compress_file( } char const* utf8_validation_buf{nullptr}; size_t peek_size{0}; - m_file_reader.peek_buffered_data(utf8_validation_buf, peek_size); + buffered_file_reader.peek_buffered_data(utf8_validation_buf, peek_size); bool succeeded = true; auto const utf8_validation_buf_len = std::min(peek_size, cUtfMaxValidationLen); if (is_utf8_encoded({utf8_validation_buf, utf8_validation_buf_len})) { - if (use_heuristic) { - parse_and_encode_with_heuristic( - target_data_size_of_dicts, - archive_user_config, - target_encoded_file_size, - file_to_compress.get_path_for_compression(), - file_to_compress.get_group_id(), - archive_writer, - m_file_reader - ); - } else { - parse_and_encode_with_library( - target_data_size_of_dicts, - archive_user_config, - target_encoded_file_size, - file_to_compress.get_path_for_compression(), - file_to_compress.get_group_id(), - archive_writer, - m_file_reader - ); - } + parse_and_encode( + target_data_size_of_dicts, + archive_user_config, + target_encoded_file_size, + file_to_compress.get_path_for_compression(), + file_to_compress.get_group_id(), + archive_writer, + buffered_file_reader, + use_heuristic + ); } else { if (false == try_compressing_as_archive( @@ -177,6 +171,7 @@ bool FileCompressor::compress_file( target_encoded_file_size, file_to_compress, archive_writer, + buffered_file_reader, use_heuristic )) { @@ -184,8 +179,6 @@ bool FileCompressor::compress_file( } } - m_file_reader.close(); - Profiler::stop_continuous_measurement(); LOG_CONTINUOUS_MEASUREMENT(Profiler::ContinuousMeasurementIndex::ParseLogFile) PROFILER_SPDLOG_INFO("Done parsing {}", file_name) @@ -193,6 +186,39 @@ bool FileCompressor::compress_file( return succeeded; } +auto FileCompressor::parse_and_encode( + size_t target_data_size_of_dicts, + streaming_archive::writer::Archive::UserConfig& archive_user_config, + size_t target_encoded_file_size, + string const& path_for_compression, + group_id_t group_id, + streaming_archive::writer::Archive& archive_writer, + ReaderInterface& reader, + bool use_heuristic +) -> void { + if (use_heuristic) { + parse_and_encode_with_heuristic( + target_data_size_of_dicts, + archive_user_config, + target_encoded_file_size, + path_for_compression, + group_id, + archive_writer, + reader + ); + } else { + parse_and_encode_with_library( + target_data_size_of_dicts, + archive_user_config, + target_encoded_file_size, + path_for_compression, + group_id, + archive_writer, + reader + ); + } +} + void FileCompressor::parse_and_encode_with_library( size_t target_data_size_of_dicts, streaming_archive::writer::Archive::UserConfig& archive_user_config, @@ -274,6 +300,7 @@ bool FileCompressor::try_compressing_as_archive( size_t target_encoded_file_size, FileToCompress const& file_to_compress, streaming_archive::writer::Archive& archive_writer, + ReaderInterface& file_reader, bool use_heuristic ) { auto file_boost_path = boost::filesystem::path(file_to_compress.get_path_for_compression()); @@ -281,7 +308,7 @@ bool FileCompressor::try_compressing_as_archive( // Determine path without extension (used if file is a single compressed file, e.g., syslog.gz // -> syslog) - std::string filename_if_compressed; + string filename_if_compressed; if (file_boost_path.has_stem()) { filename_if_compressed = file_boost_path.stem().string(); } else { @@ -289,7 +316,7 @@ bool FileCompressor::try_compressing_as_archive( } // Check if it's an archive - auto error_code = m_libarchive_reader.try_open(m_file_reader, filename_if_compressed); + auto error_code = m_libarchive_reader.try_open(file_reader, filename_if_compressed); if (ErrorCode_Success != error_code) { SPDLOG_ERROR( "Cannot compress {} - failed to open with libarchive.", @@ -363,27 +390,16 @@ bool FileCompressor::try_compressing_as_archive( auto const utf8_validation_buf_len = std::min(peek_size, cUtfMaxValidationLen); if (is_utf8_encoded({utf8_validation_buf, utf8_validation_buf_len})) { auto boost_path_for_compression = parent_boost_path / file_path; - if (use_heuristic) { - parse_and_encode_with_heuristic( - target_data_size_of_dicts, - archive_user_config, - target_encoded_file_size, - boost_path_for_compression.string(), - file_to_compress.get_group_id(), - archive_writer, - m_libarchive_file_reader - ); - } else { - parse_and_encode_with_library( - target_data_size_of_dicts, - archive_user_config, - target_encoded_file_size, - boost_path_for_compression.string(), - file_to_compress.get_group_id(), - archive_writer, - m_libarchive_file_reader - ); - } + parse_and_encode( + target_data_size_of_dicts, + archive_user_config, + target_encoded_file_size, + boost_path_for_compression.string(), + file_to_compress.get_group_id(), + archive_writer, + m_libarchive_file_reader, + use_heuristic + ); } else if (has_ir_stream_magic_number({utf8_validation_buf, peek_size})) { // Remove .clp suffix if found static constexpr char cIrStreamExtension[] = ".clp"; diff --git a/components/core/src/clp/clp/FileCompressor.hpp b/components/core/src/clp/clp/FileCompressor.hpp index b8b6c55fd..b1b286492 100644 --- a/components/core/src/clp/clp/FileCompressor.hpp +++ b/components/core/src/clp/clp/FileCompressor.hpp @@ -55,7 +55,7 @@ class FileCompressor { // Methods /** - * Parses and encodes content from the given reader into the given archive_writer + * Parses and encodes content from the given reader into the given archive_writer. * @param target_data_size_of_dicts * @param archive_user_config * @param target_encoded_file_size @@ -63,7 +63,19 @@ class FileCompressor { * @param group_id * @param archive_writer * @param reader + * @param use_heuristic */ + auto parse_and_encode( + size_t target_data_size_of_dicts, + streaming_archive::writer::Archive::UserConfig& archive_user_config, + size_t target_encoded_file_size, + std::string const& path_for_compression, + group_id_t group_id, + streaming_archive::writer::Archive& archive_writer, + ReaderInterface& reader, + bool use_heuristic + ) -> void; + void parse_and_encode_with_library( size_t target_data_size_of_dicts, streaming_archive::writer::Archive::UserConfig& archive_user_config, @@ -91,6 +103,7 @@ class FileCompressor { * @param target_encoded_file_size * @param file_to_compress * @param archive_writer + * @param file_reader * @param use_heuristic * @return true if all files were compressed successfully, false otherwise */ @@ -100,6 +113,7 @@ class FileCompressor { size_t target_encoded_file_size, FileToCompress const& file_to_compress, streaming_archive::writer::Archive& archive_writer, + ReaderInterface& file_reader, bool use_heuristic ); @@ -150,7 +164,6 @@ class FileCompressor { // Variables boost::uuids::random_generator& m_uuid_generator; - BufferedFileReader m_file_reader; LibarchiveReader m_libarchive_reader; LibarchiveFileReader m_libarchive_file_reader; MessageParser m_message_parser; diff --git a/components/core/tests/test-BufferedFileReader.cpp b/components/core/tests/test-BufferedFileReader.cpp index f182a777c..2e77790bc 100644 --- a/components/core/tests/test-BufferedFileReader.cpp +++ b/components/core/tests/test-BufferedFileReader.cpp @@ -4,38 +4,43 @@ #include #include "../src/clp/BufferedFileReader.hpp" +#include "../src/clp/FileDescriptorReader.hpp" #include "../src/clp/FileReader.hpp" #include "../src/clp/FileWriter.hpp" using clp::BufferedFileReader; +using clp::ErrorCode; using clp::ErrorCode_EndOfFile; using clp::ErrorCode_Success; using clp::ErrorCode_Unsupported; +using clp::FileDescriptorReader; +using clp::FileReader; using clp::FileWriter; +using std::make_unique; +using std::string; static constexpr size_t cNumAlphabets = 'z' - 'a'; TEST_CASE("Test reading data", "[BufferedFileReader]") { // Initialize data for testing size_t const test_data_size = 4L * 1024 * 1024 + 1; // 4MB + 1 - auto test_data_uniq_ptr = std::make_unique>(); + auto test_data_uniq_ptr = make_unique>(); auto& test_data = *test_data_uniq_ptr; for (size_t i = 0; i < test_data.size(); ++i) { test_data[i] = static_cast('a' + (i % (cNumAlphabets))); } - std::string const test_file_path{"BufferedFileReader.test"}; + string const test_file_path{"BufferedFileReader.test"}; // Write to test file FileWriter file_writer; file_writer.open(test_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); file_writer.write(test_data.cbegin(), test_data_size); file_writer.close(); - auto read_buf_uniq_ptr = std::make_unique>(); + auto read_buf_uniq_ptr = make_unique>(); auto& read_buf = *read_buf_uniq_ptr; size_t const base_buffer_size = BufferedFileReader::cMinBufferSize << 4; - BufferedFileReader reader{base_buffer_size}; - reader.open(test_file_path); + BufferedFileReader reader{make_unique(test_file_path), base_buffer_size}; size_t num_bytes_read{0}; size_t buf_pos{0}; @@ -253,44 +258,58 @@ TEST_CASE("Test reading data", "[BufferedFileReader]") { REQUIRE(reader.get_pos() == buf_pos); } - reader.close(); boost::filesystem::remove(test_file_path); } TEST_CASE("Test delimiter", "[BufferedFileReader]") { + // Initialize random number generator + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> uniformly_distributed_alphabet('a', 'a' + cNumAlphabets - 1); + // Initialize data for testing size_t const test_data_size = 1L * 1024 * 1024 + 1; // 1MB - auto test_data_uniq_ptr = std::make_unique>(); + auto test_data_uniq_ptr = make_unique>(); auto& test_data = *test_data_uniq_ptr; + for (size_t i = 0; i < test_data.size(); ++i) { - test_data[i] = static_cast('a' + (std::rand() % (cNumAlphabets))); + test_data[i] = static_cast(uniformly_distributed_alphabet(gen)); } // Write to test file - std::string const test_file_path{"BufferedFileReader.delimiter.test"}; + string const test_file_path{"BufferedFileReader.delimiter.test"}; FileWriter file_writer; file_writer.open(test_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); file_writer.write(test_data.data(), test_data_size); file_writer.close(); - BufferedFileReader file_reader; - file_reader.open(test_file_path); - std::string test_string; + size_t const reader_begin_offset = GENERATE(0, 127); + + // Instantiate BufferedFileReader and the reference FileReader from a non-zero pos + auto fd_reader = make_unique(test_file_path); + fd_reader->seek_from_begin(reader_begin_offset); + BufferedFileReader buffered_file_reader{std::move(fd_reader)}; + string test_string; + + FileReader ref_file_reader{test_file_path}; + ref_file_reader.seek_from_begin(reader_begin_offset); + string ref_string; - clp::FileReader ref_file_reader{test_file_path}; - std::string ref_string; + // Validate a clearing a checkpoint without any reading wouldn't change the beginning offset + buffered_file_reader.clear_checkpoint(); + REQUIRE(reader_begin_offset == buffered_file_reader.get_pos()); // Validate that a FileReader and a BufferedFileReader return the same strings (split by // delimiters) - clp::ErrorCode error_code{ErrorCode_Success}; - auto delimiter = (char)('a' + (std::rand() % (cNumAlphabets))); + ErrorCode error_code{ErrorCode_Success}; + auto delimiter = static_cast(uniformly_distributed_alphabet(gen)); while (ErrorCode_EndOfFile != error_code) { error_code = ref_file_reader.try_read_to_delimiter(delimiter, true, false, ref_string); - auto error_code2 = file_reader.try_read_to_delimiter(delimiter, true, false, test_string); + auto const error_code2 + = buffered_file_reader.try_read_to_delimiter(delimiter, true, false, test_string); REQUIRE(error_code2 == error_code); REQUIRE(test_string == ref_string); } - file_reader.close(); boost::filesystem::remove(test_file_path); }