Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregator support batch serialize #9777

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions dbms/src/Columns/ColumnAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
size_t /* length */,
bool /* has_null */,
const NullMap * /* nullmap */,
const TiDB::TiDBCollatorPtr & /* collator */,
String * /* sort_key_container */) const override
{
Expand All @@ -217,7 +217,7 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
size_t /* length */,
bool /* has_null */,
const NullMap * /* nullmap */,
const IColumn::Offsets & /* offsets */,
const TiDB::TiDBCollatorPtr & /* collator */,
String * /* sort_key_container */) const override
Expand Down
62 changes: 48 additions & 14 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ void ColumnArray::countSerializeByteSize(PaddedPODArray<size_t> & byte_size) con
countSerializeByteSizeImpl<false>(byte_size, nullptr);
}

template <bool for_compare>
template <bool compare_semantics>
void ColumnArray::countSerializeByteSizeImpl(PaddedPODArray<size_t> & byte_size, const TiDB::TiDBCollatorPtr & collator)
const
{
Expand All @@ -251,7 +251,7 @@ void ColumnArray::countSerializeByteSizeImpl(PaddedPODArray<size_t> & byte_size,
for (size_t i = 0; i < size; ++i)
byte_size[i] += sizeof(UInt32);

if constexpr (for_compare)
if constexpr (compare_semantics)
getData().countSerializeByteSizeForCmpColumnArray(byte_size, getOffsets(), collator);
else
getData().countSerializeByteSizeForColumnArray(byte_size, getOffsets());
Expand All @@ -261,35 +261,63 @@ void ColumnArray::serializeToPosForCmp(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
bool has_null,
const NullMap * nullmap,
const TiDB::TiDBCollatorPtr & collator,
String * sort_key_container) const
{
if (has_null)
serializeToPosImpl<true, true>(pos, start, length, collator, sort_key_container);
if (nullmap != nullptr)
serializeToPosImpl</*has_null=*/false, /*compare_semantics=*/true, /*has_nullmap=*/true>(
pos,
start,
length,
collator,
sort_key_container,
nullmap);
else
serializeToPosImpl<false, true>(pos, start, length, collator, sort_key_container);
serializeToPosImpl</*has_null=*/false, /*compare_semantics=*/true, /*has_nullmap=*/false>(
pos,
start,
length,
collator,
sort_key_container,
nullptr);
}

void ColumnArray::serializeToPos(PaddedPODArray<char *> & pos, size_t start, size_t length, bool has_null) const
{
if (has_null)
serializeToPosImpl<true, false>(pos, start, length, nullptr, nullptr);
serializeToPosImpl</*has_null=*/true, /*compare_semantics=*/false, /*has_nullmap=*/false>(
pos,
start,
length,
nullptr,
nullptr,
nullptr);
else
serializeToPosImpl<false, false>(pos, start, length, nullptr, nullptr);
serializeToPosImpl</*has_null=*/false, /*compare_semantics=*/false, /*has_nullmap=*/false>(
pos,
start,
length,
nullptr,
nullptr,
nullptr);
}

template <bool has_null, bool for_compare>
template <bool has_null, bool compare_semantics, bool has_nullmap>
void ColumnArray::serializeToPosImpl(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
const TiDB::TiDBCollatorPtr & collator,
String * sort_key_container) const
String * sort_key_container,
const NullMap * nullmap) const
{
RUNTIME_CHECK_MSG(length <= pos.size(), "length({}) > size of pos({})", length, pos.size());
RUNTIME_CHECK_MSG(start + length <= size(), "start({}) + length({}) > size of column({})", start, length, size());

static_assert(!(has_null && has_nullmap));
RUNTIME_CHECK(!has_nullmap || (nullmap && nullmap->size() == size()));

/// countSerializeByteSize has already checked that the size of one element is not greater than UINT32_MAX
for (size_t i = 0; i < length; ++i)
{
Expand All @@ -298,14 +326,20 @@ void ColumnArray::serializeToPosImpl(
if (pos[i] == nullptr)
continue;
}

UInt32 len = sizeAt(start + i);
if constexpr (has_nullmap)
{
if (DB::isNullAt(*nullmap, start + i))
len = 0;
}
tiflash_compiler_builtin_memcpy(pos[i], &len, sizeof(UInt32));
pos[i] += sizeof(UInt32);
}

if constexpr (for_compare)
if constexpr (compare_semantics)
getData()
.serializeToPosForCmpColumnArray(pos, start, length, has_null, getOffsets(), collator, sort_key_container);
.serializeToPosForCmpColumnArray(pos, start, length, nullmap, getOffsets(), collator, sort_key_container);
else
getData().serializeToPosForColumnArray(pos, start, length, has_null, getOffsets());
}
Expand All @@ -320,7 +354,7 @@ void ColumnArray::deserializeAndInsertFromPos(PaddedPODArray<char *> & pos, bool
deserializeAndInsertFromPosImpl<false>(pos, use_nt_align_buffer);
}

template <bool for_compare>
template <bool compare_semantics>
void ColumnArray::deserializeAndInsertFromPosImpl(PaddedPODArray<char *> & pos, bool use_nt_align_buffer)
{
auto & offsets = getOffsets();
Expand All @@ -336,7 +370,7 @@ void ColumnArray::deserializeAndInsertFromPosImpl(PaddedPODArray<char *> & pos,
pos[i] += sizeof(UInt32);
}

if constexpr (for_compare)
if constexpr (compare_semantics)
getData().deserializeForCmpAndInsertFromPosColumnArray(pos, offsets, use_nt_align_buffer);
else
getData().deserializeAndInsertFromPosForColumnArray(pos, offsets, use_nt_align_buffer);
Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,19 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>

ColumnArray(const ColumnArray &) = default;

template <bool for_compare>
template <bool compare_semantics>
void countSerializeByteSizeImpl(PaddedPODArray<size_t> & byte_size, const TiDB::TiDBCollatorPtr & collator) const;

template <bool has_null, bool for_compare>
template <bool has_null, bool compare_semantics, bool has_nullmap>
void serializeToPosImpl(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
const TiDB::TiDBCollatorPtr & collator,
String * sort_key_container) const;
String * sort_key_container,
const NullMap * nullmap) const;

template <bool for_compare>
template <bool compare_semantics>
void deserializeAndInsertFromPosImpl(PaddedPODArray<char *> & pos, bool use_nt_align_buffer);

public:
Expand Down Expand Up @@ -122,7 +123,7 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
bool has_null,
const NullMap * nullmap,
const TiDB::TiDBCollatorPtr & collator,
String * sort_key_container) const override;
void serializeToPos(PaddedPODArray<char *> & pos, size_t start, size_t length, bool has_null) const override;
Expand All @@ -131,7 +132,7 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
size_t /* length */,
bool /* has_null */,
const NullMap * /* nullmap */,
const IColumn::Offsets & /* array_offsets */,
const TiDB::TiDBCollatorPtr & /* collator */,
String * /* sort_key_container */) const override
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Columns/ColumnConst.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class ColumnConst final : public COWPtrHelper<IColumn, ColumnConst>
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
size_t /* length */,
bool /* has_null */,
const NullMap * /* nullmap */,
const TiDB::TiDBCollatorPtr & /* collator */,
String * /* sort_key_container */) const override
{
Expand All @@ -166,7 +166,7 @@ class ColumnConst final : public COWPtrHelper<IColumn, ColumnConst>
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
size_t /* length */,
bool /* has_null */,
const NullMap * /* nullmap */,
const IColumn::Offsets & /* array_offsets */,
const TiDB::TiDBCollatorPtr & /* collator */,
String * /* sort_key_container */) const override
Expand Down
54 changes: 38 additions & 16 deletions dbms/src/Columns/ColumnDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ const char * ColumnDecimal<T>::deserializeAndInsertFromArena(const char * pos, c
}

template <typename T>
template <bool for_compare>
void ColumnDecimal<T>::countSerializeByteSizeImpl(PaddedPODArray<size_t> & byte_size) const
template <bool compare_semantics>
void ColumnDecimal<T>::countSerializeByteSizeImpl(PaddedPODArray<size_t> & byte_size, const NullMap *) const
{
RUNTIME_CHECK_MSG(byte_size.size() == size(), "size of byte_size({}) != column size({})", byte_size.size(), size());

size_t size = byte_size.size();
if constexpr (for_compare && is_Decimal256)
if constexpr (compare_semantics && is_Decimal256)
{
for (size_t i = 0; i < size; ++i)
{
Expand All @@ -160,9 +160,8 @@ void ColumnDecimal<T>::countSerializeByteSizeImpl(PaddedPODArray<size_t> & byte_
}
}

// TODO add unit test
template <typename T>
template <bool for_compare>
template <bool compare_semantics>
void ColumnDecimal<T>::countSerializeByteSizeForColumnArrayImpl(
PaddedPODArray<size_t> & byte_size,
const IColumn::Offsets & array_offsets) const
Expand All @@ -173,7 +172,7 @@ void ColumnDecimal<T>::countSerializeByteSizeForColumnArrayImpl(
byte_size.size(),
array_offsets.size());

if constexpr (for_compare && is_Decimal256)
if constexpr (compare_semantics && is_Decimal256)
{
size_t size = array_offsets.size();
for (size_t i = 0; i < size; ++i)
Expand All @@ -194,21 +193,35 @@ void ColumnDecimal<T>::countSerializeByteSizeForColumnArrayImpl(
}

template <typename T>
template <bool has_null, bool for_compare>
void ColumnDecimal<T>::serializeToPosImpl(PaddedPODArray<char *> & pos, size_t start, size_t length) const
template <bool has_null, bool compare_semantics, bool has_nullmap>
void ColumnDecimal<T>::serializeToPosImpl(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
const NullMap * nullmap) const
{
RUNTIME_CHECK_MSG(length <= pos.size(), "length({}) > size of pos({})", length, pos.size());
RUNTIME_CHECK_MSG(start + length <= size(), "start({}) + length({}) > size of column({})", start, length, size());

static_assert(!(has_null && has_nullmap));
RUNTIME_CHECK(!has_nullmap || (nullmap && nullmap->size() == size()));

static constexpr T def_val{};
for (size_t i = 0; i < length; ++i)
{
if constexpr (has_null)
{
if (pos[i] == nullptr)
continue;
}
if constexpr (has_nullmap)
{
if (DB::isNullAt(*nullmap, start + i))
pos[i] = serializeDecimal256Helper(pos[i], def_val);
continue;
}

if constexpr (for_compare && is_Decimal256)
if constexpr (compare_semantics && is_Decimal256)
{
pos[i] = serializeDecimal256Helper(pos[i], data[start + i]);
}
Expand All @@ -221,12 +234,13 @@ void ColumnDecimal<T>::serializeToPosImpl(PaddedPODArray<char *> & pos, size_t s
}

template <typename T>
template <bool has_null, bool for_compare>
template <bool has_null, bool compare_semantics, bool has_nullmap>
void ColumnDecimal<T>::serializeToPosForColumnArrayImpl(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
const IColumn::Offsets & array_offsets) const
const IColumn::Offsets & array_offsets,
const NullMap * nullmap) const
{
RUNTIME_CHECK_MSG(length <= pos.size(), "length({}) > size of pos({})", length, pos.size());
RUNTIME_CHECK_MSG(
Expand All @@ -241,16 +255,24 @@ void ColumnDecimal<T>::serializeToPosForColumnArrayImpl(
array_offsets.back(),
size());

static_assert(!(has_null && has_nullmap));
RUNTIME_CHECK(!has_nullmap || (nullmap && nullmap->size() == array_offsets.size()));

for (size_t i = 0; i < length; ++i)
{
if constexpr (has_null)
{
if (pos[i] == nullptr)
continue;
}
if constexpr (has_nullmap)
{
if (DB::isNullAt(*nullmap, start + i))
continue;
}

size_t len = array_offsets[start + i] - array_offsets[start + i - 1];
if constexpr (for_compare && is_Decimal256)
if constexpr (compare_semantics && is_Decimal256)
{
for (size_t j = 0; j < len; ++j)
pos[i] = serializeDecimal256Helper(pos[i], data[array_offsets[start + i - 1] + j]);
Expand All @@ -275,7 +297,7 @@ void ColumnDecimal<T>::serializeToPosForColumnArrayImpl(
}

template <typename T>
template <bool for_compare>
template <bool compare_semantics>
void ColumnDecimal<T>::deserializeAndInsertFromPosImpl(
PaddedPODArray<char *> & pos,
bool use_nt_align_buffer [[maybe_unused]])
Expand All @@ -285,7 +307,7 @@ void ColumnDecimal<T>::deserializeAndInsertFromPosImpl(

// When is_complex_decimal256 is true, Decimal256 is serialized as [bool, limb_count, n * limb].
// NT optimization is not implemented for simplicity.
static const bool is_complex_decimal256 = (for_compare && is_Decimal256);
static const bool is_complex_decimal256 = (compare_semantics && is_Decimal256);

#ifdef TIFLASH_ENABLE_AVX_SUPPORT
if (use_nt_align_buffer)
Expand Down Expand Up @@ -383,7 +405,7 @@ void ColumnDecimal<T>::deserializeAndInsertFromPosImpl(
}

template <typename T>
template <bool for_compare>
template <bool compare_semantics>
void ColumnDecimal<T>::deserializeAndInsertFromPosForColumnArrayImpl(
PaddedPODArray<char *> & pos,
const IColumn::Offsets & array_offsets,
Expand All @@ -410,7 +432,7 @@ void ColumnDecimal<T>::deserializeAndInsertFromPosForColumnArrayImpl(
for (size_t i = 0; i < size; ++i)
{
size_t len = array_offsets[start_point + i] - array_offsets[start_point + i - 1];
if constexpr (for_compare && is_Decimal256)
if constexpr (compare_semantics && is_Decimal256)
{
for (size_t j = 0; j < len; ++j)
pos[i] = const_cast<char *>(
Expand Down
Loading