diff --git a/dbms/src/IO/Compression/ALP/Analyze.h b/dbms/src/IO/Compression/ALP/Analyze.h new file mode 100644 index 00000000000..4775b053b68 --- /dev/null +++ b/dbms/src/IO/Compression/ALP/Analyze.h @@ -0,0 +1,57 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include + +namespace DB::ALP +{ + +template +class Analyze +{ +public: + using State = CompressionState; + + static void run(const std::span & vectors, size_t n_samples, State & state) + { + std::vector> vectors_sampled; + vectors_sampled.reserve(n_samples); + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, vectors.size() - 1); + + // Sample vectors. + // Each vector is a sample of SAMPLES_PER_VECTOR randomly selected values. + for (size_t i = 0; i < n_samples; ++i) + { + std::vector sample; + sample.reserve(Constants::SAMPLES_PER_VECTOR); + for (size_t j = 0; j < Constants::SAMPLES_PER_VECTOR; ++j) + { + auto random_idx = dis(gen); + sample.push_back(vectors[random_idx]); + } + vectors_sampled.emplace_back(std::move(sample)); + } + Compression::findTopKCombinations(vectors_sampled, state); + } +}; + +}; // namespace DB::ALP diff --git a/dbms/src/IO/Compression/ALP/Compression.h b/dbms/src/IO/Compression/ALP/Compression.h new file mode 100644 index 00000000000..fe65eb12284 --- /dev/null +++ b/dbms/src/IO/Compression/ALP/Compression.h @@ -0,0 +1,390 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB::ALP +{ + +template +struct Compression +{ + using State = CompressionState; + static constexpr UInt8 EXACT_TYPE_BITSIZE = sizeof(T) * 8; + + /* + * Check for special values which are impossible for ALP to encode + * because they cannot be cast to int64 without an undefined behaviour + */ + static bool isImpossibleToEncode(T n) + { + return std::isinf(n) || std::isnan(n) || n > Constants::ENCODING_UPPER_LIMIT + || n < Constants::ENCODING_LOWER_LIMIT || (n == 0.0 && std::signbit(n)); //! Verification for -0.0 + } + + /* + * Conversion from a Floating-Point number to Int64 without rounding + */ + static Int64 numberToInt64(T n) + { + if (isImpossibleToEncode(n)) + { + return static_cast(Constants::ENCODING_UPPER_LIMIT); + } + n = n + TypedConstants::MAGIC_NUMBER - TypedConstants::MAGIC_NUMBER; + return static_cast(n); + } + + /* + * Encoding a single value with ALP + */ + static Int64 encodeValue(T value, EncodingIndices encoding_indices) + { + T tmp_encoded_value = value * TypedConstants::EXP_ARR[encoding_indices.exponent] + * TypedConstants::FRAC_ARR[encoding_indices.factor]; + Int64 encoded_value = numberToInt64(tmp_encoded_value); + return encoded_value; + } + + /* + * Decoding a single value with ALP + */ + static T decodeValue(Int64 encoded_value, EncodingIndices encoding_indices) + { + //! The cast to T is needed to prevent a signed integer overflow + T decoded_value = static_cast(encoded_value) * Constants::FACT_ARR[encoding_indices.factor] + * TypedConstants::FRAC_ARR[encoding_indices.exponent]; + return decoded_value; + } + + /* + * Return TRUE if c1 is a better combination than c2 + * First criteria is number of times it appears as best combination + * Second criteria is the estimated compression size + * Third criteria is bigger exponent + * Fourth criteria is bigger factor + */ + static bool compareCombinations(const Combination & c1, const Combination & c2) + { + return (c1.n_appearances > c2.n_appearances) + || (c1.n_appearances == c2.n_appearances && (c1.estimated_compression_size < c2.estimated_compression_size)) + || ((c1.n_appearances == c2.n_appearances && c1.estimated_compression_size == c2.estimated_compression_size) + && (c2.encoding_indices.exponent < c1.encoding_indices.exponent)) + || ((c1.n_appearances == c2.n_appearances && c1.estimated_compression_size == c2.estimated_compression_size + && c2.encoding_indices.exponent == c1.encoding_indices.exponent) + && (c2.encoding_indices.factor < c1.encoding_indices.factor)); + } + + /* + * Dry compress a vector (ideally a sample) to estimate ALP compression size given a exponent and factor + */ + template + static UInt64 dryCompressToEstimateSize(const std::vector & input_vector, EncodingIndices encoding_indices) + { + size_t n_values = input_vector.size(); + size_t exceptions_count = 0; + size_t non_exceptions_count = 0; + UInt32 estimated_bits_per_value = 0; + UInt64 estimated_compression_size = 0; + Int64 max_encoded_value = std::numeric_limits::min(); + Int64 min_encoded_value = std::numeric_limits::max(); + + for (const T & value : input_vector) + { + Int64 encoded_value = encodeValue(value, encoding_indices); + T decoded_value = decodeValue(encoded_value, encoding_indices); + if (decoded_value == value) + { + non_exceptions_count++; + max_encoded_value = std::max(encoded_value, max_encoded_value); + min_encoded_value = std::min(encoded_value, min_encoded_value); + continue; + } + exceptions_count++; + } + + // We penalize combinations which yields to almost all exceptions + if (PENALIZE_EXCEPTIONS && non_exceptions_count < 2) + { + return std::numeric_limits::max(); + } + + // Evaluate factor/exponent compression size (we optimize for FOR) + UInt64 delta = (static_cast(max_encoded_value) - static_cast(min_encoded_value)); + estimated_bits_per_value = static_cast(std::ceil(std::log2(delta + 1))); + estimated_compression_size += n_values * estimated_bits_per_value; + estimated_compression_size + += exceptions_count * (EXACT_TYPE_BITSIZE + (Constants::EXCEPTION_POSITION_SIZE * 8)); + return estimated_compression_size; + } + + /* + * Find the best combinations of factor-exponent from each vector sampled from a rowgroup + * This function is called once per segment + * This operates over ALP first level samples + */ + static void findTopKCombinations(const std::vector> & vectors_sampled, State & state) + { + state.resetCombinations(); + + std::unordered_map + best_k_combinations_hash; + // For each vector sampled + for (auto & sampled_vector : vectors_sampled) + { + size_t n_samples = sampled_vector.size(); + EncodingIndices best_encoding_indices = {TypedConstants::MAX_EXPONENT, TypedConstants::MAX_EXPONENT}; + + //! We start our optimization with the worst possible total bits obtained from compression + size_t best_total_bits = (n_samples * (EXACT_TYPE_BITSIZE + Constants::EXCEPTION_POSITION_SIZE * 8)) + + (n_samples * EXACT_TYPE_BITSIZE); + + // N of appearances is irrelevant at this phase; we search for the best compression for the vector + Combination best_combination = {best_encoding_indices, 0, best_total_bits}; + //! We try all combinations in search for the one which minimize the compression size + for (int8_t exp_idx = TypedConstants::MAX_EXPONENT; exp_idx >= 0; exp_idx--) + { + for (int8_t factor_idx = exp_idx; factor_idx >= 0; factor_idx--) + { + EncodingIndices current_encoding_indices + = {static_cast(exp_idx), static_cast(factor_idx)}; + UInt64 estimated_compression_size + = dryCompressToEstimateSize(sampled_vector, current_encoding_indices); + Combination current_combination = {current_encoding_indices, 0, estimated_compression_size}; + if (compareCombinations(current_combination, best_combination)) + { + best_combination = current_combination; + } + } + } + best_k_combinations_hash[best_combination.encoding_indices]++; + } + + // Convert our hash to a Combination vector to be able to sort + // Note that this vector is always small (< 10 combinations) + std::vector best_k_combinations; + best_k_combinations.reserve(best_k_combinations_hash.size()); + for (auto const & combination : best_k_combinations_hash) + { + best_k_combinations.emplace_back( + combination.first, // Encoding Indices + combination.second, // N of times it appeared (hash value) + 0 // Compression size is irrelevant at this phase since we compare combinations from different vectors + ); + } + std::sort(best_k_combinations.begin(), best_k_combinations.end(), compareCombinations); + + // Save k' best combinations + for (size_t i = 0; i < std::min(Constants::MAX_COMBINATIONS, static_cast(best_k_combinations.size())); + ++i) + { + state.best_k_combinations.push_back(best_k_combinations[i]); + } + } + + /* + * Find the best combination of factor-exponent for a vector from within the best k combinations + * This is ALP second level sampling + */ + static void findBestFactorAndExponent(const T * input_vector, size_t n_values, State & state) + { + //! We sample equidistant values within a vector; to do this we skip a fixed number of values + std::vector vector_sample; + auto idx_increments = std::max( + 1, + static_cast(std::ceil(static_cast(n_values) / Constants::SAMPLES_PER_VECTOR))); + for (size_t i = 0; i < n_values; i += idx_increments) + { + vector_sample.push_back(input_vector[i]); + } + + EncodingIndices best_encoding_indices = {0, 0}; + UInt64 best_total_bits = std::numeric_limits::max(); + size_t worse_total_bits_counter = 0; + + //! We try each K combination in search for the one which minimize the compression size in the vector + for (auto & combination : state.best_k_combinations) + { + UInt64 estimated_compression_size + = dryCompressToEstimateSize(vector_sample, combination.encoding_indices); + + // If current compression size is worse (higher) or equal than the current best combination + if (estimated_compression_size >= best_total_bits) + { + worse_total_bits_counter += 1; + // Early exit strategy + if (worse_total_bits_counter == Constants::SAMPLING_EARLY_EXIT_THRESHOLD) + { + break; + } + continue; + } + // Otherwise we replace the best and continue trying with the next combination + best_total_bits = estimated_compression_size; + best_encoding_indices = combination.encoding_indices; + worse_total_bits_counter = 0; + } + state.vector_encoding_indices = best_encoding_indices; + } + + /* + * ALP Compress + */ + static void compress( + const T * input_vector, + size_t n_values, + const UInt16 * vector_null_positions, + size_t nulls_count, + State & state) + { + if (state.best_k_combinations.size() > 1) + { + findBestFactorAndExponent(input_vector, n_values, state); + } + else + { + state.vector_encoding_indices = state.best_k_combinations[0].encoding_indices; + } + + state.encoded_integers.resize(n_values); + state.exceptions.reserve(n_values); + state.exceptions_positions.reserve(n_values); + + // Encoding Floating-Point to Int64 + //! We encode all the values regardless of their correctness to recover the original floating-point + UInt64 min_encoded_value = std::numeric_limits::max(); + for (size_t i = 0; i < n_values; ++i) + { + T actual_value = input_vector[i]; + Int64 encoded_value = encodeValue(actual_value, state.vector_encoding_indices); + T decoded_value = decodeValue(encoded_value, state.vector_encoding_indices); + state.encoded_integers[i] = encoded_value; + //! We detect exceptions using a predicated comparison + if (decoded_value == actual_value) + { + min_encoded_value = std::min(min_encoded_value, static_cast(encoded_value)); + continue; + } + state.exceptions_positions.push_back(i); + } + + // Replacing exceptions with the minimum encoded value + for (size_t i = 0; i < state.exceptions_positions.size(); ++i) + { + size_t exception_pos = state.exceptions_positions[i]; + T actual_value = input_vector[exception_pos]; + state.encoded_integers[exception_pos] = min_encoded_value; + state.exceptions.push_back(actual_value); + } + + // Replacing nulls with that the non-exception-value + for (size_t i = 0; i < nulls_count; i++) + { + UInt16 null_value_pos = vector_null_positions[i]; + state.encoded_integers[null_value_pos] = min_encoded_value; + } + + // Analyze FFOR + auto minmax = std::minmax_element(state.encoded_integers.begin(), state.encoded_integers.end()); + auto min_value = *minmax.first; + auto max_value = *minmax.second; + UInt64 min_max_diff = (static_cast(max_value) - static_cast(min_value)); + + auto * u_encoded_integers = reinterpret_cast(state.encoded_integers.data()); + + // Subtract FOR + if (!EMPTY) + { //! We only execute the FOR if we are writing the data + for (size_t i = 0; i < n_values; ++i) + { + u_encoded_integers[i] -= static_cast(min_value); + } + } + + auto bit_width = BitpackingPrimitives::minimumBitWidth(min_max_diff); + auto bp_size = BitpackingPrimitives::getRequiredSize(n_values, bit_width); + if (!EMPTY && bit_width > 0) + { //! We only execute the BP if we are writing the data + state.values_encoded.reserve(bp_size); + BitpackingPrimitives::packBuffer( + state.values_encoded.data(), + u_encoded_integers, + n_values, + bit_width); + } + state.bit_width = bit_width; // in bits + state.bp_size = bp_size; // in bytes + state.frame_of_reference = static_cast(min_value); // understood this can be negative + } + + /* + * Overload without specifying nulls + */ + static void compress(const T * input_vector, size_t n_values, State & state) + { + compress(input_vector, n_values, nullptr, 0, state); + } +}; + +template +struct Decompression +{ + static void decompress( + UInt8 * for_encoded, + T * output, + size_t count, + UInt8 vector_factor, + UInt8 vector_exponent, + UInt16 exceptions_count, + T * exceptions, + const UInt16 * exceptions_positions, + UInt64 frame_of_reference, + UInt8 bit_width) + { + EncodingIndices encoding_indices = {vector_exponent, vector_factor}; + + // Bit Unpacking + auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count); + std::vector for_decoded(round_size * sizeof(T), 0); + if (bit_width > 0) + { + BitpackingPrimitives::unPackBuffer(for_decoded.data(), for_encoded, count, bit_width); + } + auto * encoded_integers = reinterpret_cast(for_decoded.data()); + + // unFOR + for (size_t i = 0; i < count; i++) + { + encoded_integers[i] += frame_of_reference; + } + + // Decoding + for (size_t i = 0; i < count; i++) + { + auto encoded_integer = static_cast(encoded_integers[i]); + output[i] = Compression::decodeValue(encoded_integer, encoding_indices); + } + + // Exceptions Patching + for (size_t i = 0; i < exceptions_count; ++i) + { + output[exceptions_positions[i]] = exceptions[i]; + } + } +}; + +}; // namespace DB::ALP diff --git a/dbms/src/IO/Compression/ALP/CompressionState.h b/dbms/src/IO/Compression/ALP/CompressionState.h new file mode 100644 index 00000000000..f991afac7f8 --- /dev/null +++ b/dbms/src/IO/Compression/ALP/CompressionState.h @@ -0,0 +1,109 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB::ALP +{ + +struct EncodingIndices +{ +public: + UInt8 exponent; + UInt8 factor; + + EncodingIndices(UInt8 exponent, UInt8 factor) + : exponent(exponent) + , factor(factor) + {} + + EncodingIndices() + : exponent(0) + , factor(0) + {} +}; + +struct EncodingIndicesEquality +{ + bool operator()(const EncodingIndices & a, const EncodingIndices & b) const + { + return a.exponent == b.exponent && a.factor == b.factor; + } +}; + +struct EncodingIndicesHash +{ + size_t operator()(const EncodingIndices & encoding_indices) const + { + using boost::hash_combine; + using boost::hash_value; + + size_t seed = 0; + hash_combine(seed, hash_value(encoding_indices.exponent)); + hash_combine(seed, hash_value(encoding_indices.factor)); + return seed; + } +}; + +struct Combination +{ + EncodingIndices encoding_indices; + UInt64 n_appearances; + UInt64 estimated_compression_size; + + Combination(EncodingIndices encoding_indices, UInt64 n_appearances, UInt64 estimated_compression_size) + : encoding_indices(encoding_indices) + , n_appearances(n_appearances) + , estimated_compression_size(estimated_compression_size) + {} +}; + +template +class CompressionState +{ +public: + CompressionState() + : vector_encoding_indices(0, 0) + , bit_width(0) + , bp_size(0) + , frame_of_reference(0) + {} + + void reset() + { + vector_encoding_indices = {0, 0}; + bit_width = 0; + } + + void resetCombinations() { best_k_combinations.clear(); } + +public: + // The following variables will be the same for all vectors + std::vector best_k_combinations; + + // The following variables will be different for each vector + EncodingIndices vector_encoding_indices; // Selected factor-exponent for the vector + UInt8 bit_width; // Bit width for the bitpacking + UInt64 bp_size; // Number of bytes after bitpacking + UInt64 frame_of_reference; // Frame of reference for the vector + std::vector encoded_integers; // After ALP encoding + std::vector exceptions; // Exceptions + std::vector exceptions_positions; // Positions of the exceptions + std::vector values_encoded; // After bitpacking +}; + +} // namespace DB::ALP diff --git a/dbms/src/IO/Compression/ALP/Constants.h b/dbms/src/IO/Compression/ALP/Constants.h new file mode 100644 index 00000000000..dbed8691386 --- /dev/null +++ b/dbms/src/IO/Compression/ALP/Constants.h @@ -0,0 +1,194 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB::ALP +{ + +class Constants +{ +public: + static constexpr UInt16 SAMPLES_PER_VECTOR = 32; + + static constexpr UInt8 FRAME_OF_REFERENCE_SIZE = sizeof(UInt64); + static constexpr UInt8 BIT_WIDTH_SIZE = sizeof(UInt8); + static constexpr UInt8 EXPONENT_SIZE = sizeof(UInt8); + static constexpr UInt8 FACTOR_SIZE = sizeof(UInt8); + static constexpr UInt8 FOR_SIZE = sizeof(UInt64); + static constexpr UInt8 EXCEPTION_POSITION_SIZE = sizeof(UInt16); + static constexpr UInt8 HEADER_SIZE = FRAME_OF_REFERENCE_SIZE + BIT_WIDTH_SIZE + EXPONENT_SIZE + FACTOR_SIZE; + + static constexpr UInt8 SAMPLING_EARLY_EXIT_THRESHOLD = 2; + + static void writeHeader( + char *& dest, + UInt64 frame_of_reference, + UInt8 bit_width, + UInt8 vector_exponent, + UInt8 vector_factor) + { + unalignedStore(dest, frame_of_reference); + dest += FRAME_OF_REFERENCE_SIZE; + unalignedStore(dest, bit_width); + dest += BIT_WIDTH_SIZE; + unalignedStore(dest, vector_exponent); + dest += EXPONENT_SIZE; + unalignedStore(dest, vector_factor); + dest += FACTOR_SIZE; + } + + static std::tuple readHeader(const char *& source) + { + auto frame_of_reference = unalignedLoad(source); + source += FRAME_OF_REFERENCE_SIZE; + auto bit_width = unalignedLoad(source); + source += BIT_WIDTH_SIZE; + auto vector_exponent = unalignedLoad(source); + source += EXPONENT_SIZE; + auto vector_factor = unalignedLoad(source); + source += FACTOR_SIZE; + return {frame_of_reference, bit_width, vector_exponent, vector_factor}; + } + + // Largest double which fits into an int64 + static constexpr double ENCODING_UPPER_LIMIT = 9223372036854774784.0; + static constexpr double ENCODING_LOWER_LIMIT = -9223372036854774784.0; + + static constexpr UInt8 MAX_COMBINATIONS = 5; + + static constexpr const Int64 FACT_ARR[] + = {1, + 10, + 100, + 1000, + 10000, + 100000, + 1000000, + 10000000, + 100000000, + 1000000000, + 10000000000, + 100000000000, + 1000000000000, + 10000000000000, + 100000000000000, + 1000000000000000, + 10000000000000000, + 100000000000000000, + 1000000000000000000}; +}; + +template +struct TypedConstants +{ +}; + +template <> +struct TypedConstants +{ + static constexpr float MAGIC_NUMBER = 12582912.0; //! 2^22 + 2^23 + static constexpr UInt8 MAX_EXPONENT = 10; + // exceptions positions + exceptions values + static constexpr UInt8 EXCEPTIONS_PAIR_SIZE = sizeof(UInt16) + sizeof(float); + + static constexpr const float EXP_ARR[] + = {1.0F, + 10.0F, + 100.0F, + 1000.0F, + 10000.0F, + 100000.0F, + 1000000.0F, + 10000000.0F, + 100000000.0F, + 1000000000.0F, + 10000000000.0F}; + + static constexpr float FRAC_ARR[] + = {1.0F, + 0.1F, + 0.01F, + 0.001F, + 0.0001F, + 0.00001F, + 0.000001F, + 0.0000001F, + 0.00000001F, + 0.000000001F, + 0.0000000001F}; +}; + +template <> +struct TypedConstants +{ + static constexpr double MAGIC_NUMBER = 6755399441055744.0; //! 2^51 + 2^52 + static constexpr UInt8 MAX_EXPONENT = 18; //! 10^18 is the maximum int64 + // exceptions positions + exceptions values + static constexpr UInt8 EXCEPTIONS_PAIR_SIZE = sizeof(UInt16) + sizeof(double); + + static constexpr const double EXP_ARR[] + = {1.0, + 10.0, + 100.0, + 1000.0, + 10000.0, + 100000.0, + 1000000.0, + 10000000.0, + 100000000.0, + 1000000000.0, + 10000000000.0, + 100000000000.0, + 1000000000000.0, + 10000000000000.0, + 100000000000000.0, + 1000000000000000.0, + 10000000000000000.0, + 100000000000000000.0, + 1000000000000000000.0, + 10000000000000000000.0, + 100000000000000000000.0, + 1000000000000000000000.0, + 10000000000000000000000.0, + 100000000000000000000000.0}; + + static constexpr double FRAC_ARR[] + = {1.0, + 0.1, + 0.01, + 0.001, + 0.0001, + 0.00001, + 0.000001, + 0.0000001, + 0.00000001, + 0.000000001, + 0.0000000001, + 0.00000000001, + 0.000000000001, + 0.0000000000001, + 0.00000000000001, + 0.000000000000001, + 0.0000000000000001, + 0.00000000000000001, + 0.000000000000000001, + 0.0000000000000000001, + 0.00000000000000000001}; +}; + +} // namespace DB::ALP diff --git a/dbms/src/IO/Compression/CompressionCodecFactory.cpp b/dbms/src/IO/Compression/CompressionCodecFactory.cpp index 85bae075aa6..cd23eff2f8e 100644 --- a/dbms/src/IO/Compression/CompressionCodecFactory.cpp +++ b/dbms/src/IO/Compression/CompressionCodecFactory.cpp @@ -175,23 +175,21 @@ CompressionCodecPtr CompressionCodecFactory::create(const CompressionSetting & s if constexpr (IS_COMPRESS) { - // If method_byte is Lightweight, use LZ4 codec for non-integral types + // If method_byte is Lightweight, use LZ4 codec for non-number types + if (!isInteger(setting.data_type) && !isFloat(setting.data_type) + && setting.method_byte == CompressionMethodByte::Lightweight) + { + auto method = CompressionMethod::LZ4; + CompressionSetting setting(method, CompressionSetting::getDefaultLevel(method)); + return getStaticCodec(setting); + } + // If method_byte is DeltaFOR/RunLength/FOR, since we do not support use these methods independently, // there must be another codec to compress data. Use that compress codec directly. - if (!isInteger(setting.data_type)) + if (!isInteger(setting.data_type) && setting.method_byte != CompressionMethodByte::Lightweight) { - if (setting.method_byte == CompressionMethodByte::Lightweight) - { - // Use LZ4 codec for non-integral types - // TODO: maybe we can use zstd? - auto method = CompressionMethod::LZ4; - CompressionSetting setting(method, CompressionSetting::getDefaultLevel(method)); - return getStaticCodec(setting); - } - else - return nullptr; + return nullptr; } - // else fallthrough } switch (setting.method_byte) diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp index ac297c17cac..7a0900a40e9 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp @@ -32,6 +32,7 @@ extern const int CANNOT_DECOMPRESS; CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_, int level_) : ctx(level_) + , float_ctx(level_) , data_type(data_type_) {} @@ -43,7 +44,7 @@ UInt8 CompressionCodecLightweight::getMethodByte() const UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed_size) const { // 1 byte for bytes_size, 1 byte for mode, and the rest for compressed data - return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size); + return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size) * 4; } UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const @@ -61,10 +62,12 @@ UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 s case CompressionDataType::Int64: return 1 + compressDataForInteger(source, source_size, dest); case CompressionDataType::Float32: + return 1 + compressDataForFloat(source, source_size, dest); case CompressionDataType::Float64: + return 1 + compressDataForFloat(source, source_size, dest); case CompressionDataType::String: case CompressionDataType::Unknown: - return 1 + compressDataForNonInteger(source, source_size, dest); + return 1 + compressDataForString(source, source_size, dest); default: throw Exception( ErrorCodes::CANNOT_COMPRESS, @@ -111,10 +114,14 @@ void CompressionCodecLightweight::doDecompressData( decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); break; case CompressionDataType::Float32: + decompressDataForFloat(&source[1], source_size_no_header, dest, uncompressed_size); + break; case CompressionDataType::Float64: + decompressDataForFloat(&source[1], source_size_no_header, dest, uncompressed_size); + break; case CompressionDataType::String: case CompressionDataType::Unknown: - decompressDataForNonInteger(&source[1], source_size_no_header, dest, uncompressed_size); + decompressDataForString(&source[1], source_size_no_header, dest, uncompressed_size); break; default: throw Exception( diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.h b/dbms/src/IO/Compression/CompressionCodecLightweight.h index 3bc3475c8ff..0e8359393a8 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight.h +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -130,13 +131,59 @@ class CompressionCodecLightweight : public ICompressionCodec template void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const; - /// Non-integer data + /// Float data - static size_t compressDataForNonInteger(const char * source, UInt32 source_size, char * dest); - static void decompressDataForNonInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size); + enum class FloatMode : UInt8 + { + Invalid = 0, + ALP = 1, // Adaptive lossless floating-point compression + ALPRD = 2, // + LZ4 = 3, // the above modes are not suitable, use LZ4 instead + }; + + class FloatCompressContext + { + public: + explicit FloatCompressContext(int n_samples_) + : n_samples(std::min(1, n_samples_)) + {} + + template + void analyze(const std::span & values); + + template + ALP::CompressionState & getState() + { + if constexpr (std::is_same_v) + return float_state; + else if constexpr (std::is_same_v) + return double_state; + } + + private: + bool needAnalyze() const { return !analyzed; } + + private: + const int n_samples; + bool analyzed = false; + ALP::CompressionState float_state; + ALP::CompressionState double_state; + }; + + + template + size_t compressDataForFloat(const char * source, UInt32 source_size, char * dest) const; + template + static void decompressDataForFloat(const char * source, UInt32 source_size, char * dest, UInt32 output_size); + + /// String data + + static size_t compressDataForString(const char * source, UInt32 source_size, char * dest); + static void decompressDataForString(const char * source, UInt32 source_size, char * dest, UInt32 output_size); private: mutable IntegerCompressContext ctx; + mutable FloatCompressContext float_ctx; const CompressionDataType data_type; }; diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight_Float.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_Float.cpp new file mode 100644 index 00000000000..4ed67d66144 --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_Float.cpp @@ -0,0 +1,159 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace ErrorCodes + +template +void CompressionCodecLightweight::FloatCompressContext::analyze(const std::span & values) +{ + if (!needAnalyze()) + return; + + auto & state = getState(); + ALP::Analyze::run(values, n_samples, state); +} + +template void CompressionCodecLightweight::FloatCompressContext::analyze(const std::span & values); +template void CompressionCodecLightweight::FloatCompressContext::analyze( + const std::span & values); + +template +size_t CompressionCodecLightweight::compressDataForFloat(const char * source, UInt32 source_size, char * dest) const +{ + char * dest_start = dest; + + const auto bytes_size = sizeof(T); + if unlikely (source_size % bytes_size != 0) + throw Exception( + ErrorCodes::CANNOT_COMPRESS, + "Cannot compress with lightweight-float codec, data size {} is not aligned to {}", + source_size, + bytes_size); + + // Load values + const size_t count = source_size / bytes_size; + std::span values(reinterpret_cast(source), count); + + // Analyze + float_ctx.analyze(values); + + // Compress + auto & state = float_ctx.getState(); + ALP::Compression::compress(values.data(), count, state); + + // Store + // Write header + ALP::Constants::writeHeader( + dest, + state.frame_of_reference, + state.bit_width, + state.vector_encoding_indices.exponent, + state.vector_encoding_indices.factor); + // Write bitpacked values + memcpy(dest, state.values_encoded.data(), state.bp_size); + dest += state.bp_size; + // Write exceptions + if (!state.exceptions.empty()) + { + memcpy(dest, state.exceptions.data(), state.exceptions.size() * sizeof(T)); + dest += state.exceptions.size() * sizeof(T); + memcpy(dest, state.exceptions_positions.data(), state.exceptions_positions.size() * sizeof(UInt16)); + dest += state.exceptions_positions.size() * sizeof(UInt16); + } + + return dest - dest_start; +} + +template size_t CompressionCodecLightweight::compressDataForFloat( + const char * source, + UInt32 source_size, + char * dest) const; +template size_t CompressionCodecLightweight::compressDataForFloat( + const char * source, + UInt32 source_size, + char * dest) const; + +template +void CompressionCodecLightweight::decompressDataForFloat( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) +{ + if unlikely (output_size % sizeof(T) != 0) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress lightweight-float codec data. Uncompressed size {} is not aligned to {}", + output_size, + sizeof(T)); + + size_t count = output_size / sizeof(T); + // Read header + auto [frame_of_reference, bit_width, exponent, factor] = ALP::Constants::readHeader(source); + // Read bitpacked values + auto bp_size = BitpackingPrimitives::getRequiredSize(count, bit_width); + std::vector for_encoded(bp_size); + memcpy(for_encoded.data(), source, bp_size); + source += bp_size; + // Read exceptions + size_t exceptions_count + = (source_size - (ALP::Constants::HEADER_SIZE + bp_size)) / (ALP::TypedConstants::EXCEPTIONS_PAIR_SIZE); + std::vector exceptions(exceptions_count); + std::vector exceptions_positions(exceptions_count); + if (exceptions_count > 0) + { + memcpy(exceptions.data(), source, exceptions_count * sizeof(T)); + source += exceptions_count * sizeof(T); + memcpy(exceptions_positions.data(), source, exceptions_count * sizeof(UInt16)); + source += exceptions_count * sizeof(UInt16); + } + // Decompress + ALP::Decompression::decompress( + for_encoded.data(), + reinterpret_cast(dest), + count, + factor, + exponent, + exceptions_count, + exceptions.data(), + exceptions_positions.data(), + frame_of_reference, + bit_width); +} + +template void CompressionCodecLightweight::decompressDataForFloat( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size); +template void CompressionCodecLightweight::decompressDataForFloat( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size); + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_String.cpp similarity index 89% rename from dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp rename to dbms/src/IO/Compression/CompressionCodecLightweight_String.cpp index efe669a2825..38e6c8aa64c 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_String.cpp @@ -26,7 +26,7 @@ extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -size_t CompressionCodecLightweight::compressDataForNonInteger(const char * source, UInt32 source_size, char * dest) +size_t CompressionCodecLightweight::compressDataForString(const char * source, UInt32 source_size, char * dest) { auto success = LZ4_compress_fast( source, @@ -40,7 +40,7 @@ size_t CompressionCodecLightweight::compressDataForNonInteger(const char * sourc } -void CompressionCodecLightweight::decompressDataForNonInteger( +void CompressionCodecLightweight::decompressDataForString( const char * source, UInt32 source_size, char * dest, diff --git a/dbms/src/IO/Compression/CompressionInfo.h b/dbms/src/IO/Compression/CompressionInfo.h index 3f13294f402..ea48fb2f4c5 100644 --- a/dbms/src/IO/Compression/CompressionInfo.h +++ b/dbms/src/IO/Compression/CompressionInfo.h @@ -85,4 +85,9 @@ inline bool isInteger(CompressionDataType type) return type >= CompressionDataType::Int8 && type <= CompressionDataType::Int64; } +inline bool isFloat(CompressionDataType type) +{ + return type == CompressionDataType::Float32 || type == CompressionDataType::Float64; +} + } // namespace DB diff --git a/dbms/src/IO/Compression/tests/CodecTestSequence.h b/dbms/src/IO/Compression/tests/CodecTestSequence.h index b7a98c1a0dd..4c9503f3824 100644 --- a/dbms/src/IO/Compression/tests/CodecTestSequence.h +++ b/dbms/src/IO/Compression/tests/CodecTestSequence.h @@ -72,18 +72,25 @@ DataTypePtr makeDataType() return nullptr; } +template +std::string bin(const T & value, size_t bits = sizeof(T) * 8) +{ + static const uint8_t MAX_BITS = sizeof(T) * 8; + assert(bits <= MAX_BITS); + + return std::bitset(static_cast(value)).to_string().substr(MAX_BITS - bits, bits); +} + struct CodecTestSequence { std::string name; std::vector serialized_data; DataTypePtr data_type; - UInt8 type_byte; - CodecTestSequence(std::string name_, std::vector serialized_data_, DataTypePtr data_type_, UInt8 type_byte_) + CodecTestSequence(std::string name_, std::vector serialized_data_, DataTypePtr data_type_) : name(name_) , serialized_data(serialized_data_) , data_type(data_type_) - , type_byte(type_byte_) {} CodecTestSequence & append(const CodecTestSequence & other) @@ -122,7 +129,7 @@ CodecTestSequence operator*(CodecTestSequence && left, T times) left.name + " x " + std::to_string(times), std::move(data), std::move(left.data_type), - sizeof(T)}; + }; } std::ostream & operator<<(std::ostream & ostr, const CompressionMethodByte method_byte) @@ -156,7 +163,7 @@ CodecTestSequence makeSeq(Args &&... args) (fmt::format("{} values of {}", std::size(vals), type_name())), std::move(data), makeDataType(), - sizeof(T)}; + }; } template @@ -178,7 +185,7 @@ CodecTestSequence generateSeq(Generator gen, const char * gen_name, int Begin = (fmt::format("{} values of {} from {}", (End - Begin), type_name(), gen_name)), std::move(data), makeDataType(), - sizeof(T)}; + }; } /////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp index f359c2f9e49..a4170cc54f6 100644 --- a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp +++ b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp @@ -102,6 +102,7 @@ ::testing::AssertionResult EqualByteContainersAs(const ContainerLeft & left, con auto l = AsSequenceOf(left); auto r = AsSequenceOf(right); + size_t i = 0; while (l && r) { @@ -109,12 +110,17 @@ ::testing::AssertionResult EqualByteContainersAs(const ContainerLeft & left, con const auto right_value = *r; ++l; ++r; + ++i; if (left_value != right_value) { if (result) { - result = ::testing::AssertionFailure(); + result = ::testing::AssertionFailure() + << "\nmismatch at position " << i << "\nexpected: " << bin(left_value) << " (0x" << std::hex + << static_cast(left_value) << ")" + << "\ngot : " << bin(right_value) << " (0x" << std::hex << static_cast(right_value) + << ")"; break; } } @@ -148,10 +154,9 @@ ::testing::AssertionResult EqualByteContainers( } } -CompressionCodecPtr makeCodec(const CompressionMethodByte method_byte, UInt8 type_byte) +CompressionCodecPtr makeCodec(const CompressionMethodByte method_byte, const DataTypePtr & data_type) { - CompressionSetting setting(method_byte); - setting.data_type = magic_enum::enum_cast(type_byte).value(); + auto setting = CompressionSetting::create(method_byte, 1, *data_type); return CompressionCodecFactory::create(setting); } @@ -193,11 +198,9 @@ try const auto method_byte = std::get<0>(GetParam()); const auto sequences = std::get<1>(GetParam()); ASSERT_FALSE(sequences.empty()); - auto type_byte = sequences.front().type_byte; - const auto codec = DB::tests::makeCodec(method_byte, type_byte); + const auto codec = DB::tests::makeCodec(method_byte, sequences.front().data_type); for (const auto & sequence : sequences) { - ASSERT_EQ(sequence.type_byte, type_byte); DB::tests::testTranscoding(*codec, sequence); } } @@ -212,7 +215,7 @@ try { const auto method_byte = std::get<0>(GetParam()); const auto sequence = std::get<1>(GetParam()); - const auto codec = DB::tests::makeCodec(method_byte, sequence.type_byte); + const auto codec = DB::tests::makeCodec(method_byte, sequence.data_type); DB::tests::testTranscoding(*codec, sequence); } CATCH @@ -401,25 +404,27 @@ INSTANTIATE_TEST_CASE_P( generateSeq(G(RepeatGenerator(0))), generateSeq(G(RepeatGenerator(0)))))); -// INSTANTIATE_TEST_CASE_P( -// RandomishInt, -// CodecTest, -// ::testing::Combine( -// IntegerCodecsToTest, -// ::testing::Values( -// generateSeq(G(RandomishGenerator)), -// generateSeq(G(RandomishGenerator)), -// generateSeq(G(RandomishGenerator)), -// generateSeq(G(RandomishGenerator)), -// generateSeq(G(RandomishGenerator)), -// generateSeq(G(RandomishGenerator))))); - - -// INSTANTIATE_TEST_CASE_P( -// RandomishFloat, -// CodecTest, -// ::testing::Combine( -// IntegerCodecsToTest, -// ::testing::Values(generateSeq(G(RandomishGenerator)), generateSeq(G(RandomishGenerator))))); +const auto GeneralCodecsToTest = ::testing::Values(CompressionMethodByte::Lightweight); + +auto RandomishGenerator = [](auto i) { + using T = decltype(i); + double sin_value = sin(static_cast(i * i)) * i; + if (sin_value < std::numeric_limits::lowest() || sin_value > static_cast(std::numeric_limits::max())) + return T{}; + return T(sin_value); +}; + +INSTANTIATE_TEST_CASE_P( + RandomishInt, + CodecTest, + ::testing::Combine( + GeneralCodecsToTest, + ::testing::Values( + generateSeq(G(RandomishGenerator)), + generateSeq(G(RandomishGenerator)), + generateSeq(G(RandomishGenerator)), + generateSeq(G(RandomishGenerator)), + generateSeq(G(RandomishGenerator)), + generateSeq(G(RandomishGenerator))))); } // namespace DB::tests