[DNM] Hashagg prefetch ckbench add batch bench prefetch threshold #9740

Open. Wants to merge 43 commits into base: master.

Commits (43):
b869b2f  new hash (guo-shaoge, Nov 25, 2024)
e8a2df8  prefetch done (guo-shaoge, Nov 26, 2024)
b314166  executeImplBatchStringHashMap done (guo-shaoge, Nov 27, 2024)
ec6e892  handle resize exception done (guo-shaoge, Nov 27, 2024)
9dc702d  tmp save (guo-shaoge, Nov 27, 2024)
ce1f767  revert Serialized Key changes (guo-shaoge, Nov 28, 2024)
8ac8beb  refine (guo-shaoge, Nov 28, 2024)
0053ce8  refine (guo-shaoge, Nov 28, 2024)
fcf8ed2  fix unit test (guo-shaoge, Nov 29, 2024)
ae7b969  refine (guo-shaoge, Dec 2, 2024)
3a86617  unit test (guo-shaoge, Dec 2, 2024)
623fef5  prefetch (guo-shaoge, Dec 2, 2024)
19f320d  fix (guo-shaoge, Dec 2, 2024)
3a226df  refine (guo-shaoge, Dec 3, 2024)
c44ace7  refine (guo-shaoge, Dec 3, 2024)
3e30f95  revert new hasher (guo-shaoge, Dec 3, 2024)
ea85d19  debug low distinct value (guo-shaoge, Dec 3, 2024)
16937ff  Revert "revert new hasher" (guo-shaoge, Dec 3, 2024)
d2fba57  refine original code path (guo-shaoge, Dec 3, 2024)
71b6ecd  Reapply "revert new hasher" (guo-shaoge, Dec 3, 2024)
40ceb08  one level old hash; two level new hash (guo-shaoge, Dec 3, 2024)
4cb24c2  Revert "one level old hash; two level new hash" (guo-shaoge, Dec 4, 2024)
c02cf71  revert new hasher; refine original code path (guo-shaoge, Dec 4, 2024)
352b710  fix case (guo-shaoge, Dec 5, 2024)
e5ab87c  refine (guo-shaoge, Dec 13, 2024)
30d99be  is_serialized_key (guo-shaoge, Dec 15, 2024)
01cae80  fix (guo-shaoge, Dec 15, 2024)
95c597d  fix start_row (guo-shaoge, Dec 15, 2024)
83fb879  Merge branch 'master' into hashagg_prefetch (guo-shaoge, Dec 16, 2024)
e0dea79  revert prefetch for StringHashTable (guo-shaoge, Dec 17, 2024)
143fee3  comment (guo-shaoge, Dec 17, 2024)
6cea464  fmt (guo-shaoge, Dec 18, 2024)
6750737  comment (guo-shaoge, Dec 19, 2024)
404cb10  addBatch (guo-shaoge, Dec 20, 2024)
14c661a  addBatch prefetch first (guo-shaoge, Dec 20, 2024)
4a445ea  prefetch agg data when insert into columns (guo-shaoge, Dec 23, 2024)
b7dd05e  addBatch prefetch step 16->32 (guo-shaoge, Dec 23, 2024)
7721022  fix insertResult bug (guo-shaoge, Dec 23, 2024)
f4d9282  prefetch threshold 8192*16 (guo-shaoge, Dec 23, 2024)
cd43b99  emplaceOrFindKey mini batch 256 (guo-shaoge, Dec 23, 2024)
490a89f  remove addBatch prefetch (guo-shaoge, Dec 23, 2024)
9d68d28  only prefetch agg data when key exists (guo-shaoge, Dec 23, 2024)
286b5b7  getHash inside emplace for loop (guo-shaoge, Dec 23, 2024)
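Several of the later commits (addBatch prefetch step 16->32, prefetch threshold 8192*16, emplaceOrFindKey mini batch 256, getHash inside emplace for loop) describe one pattern: hash a small batch of keys, prefetch the buckets they will hit, then emplace. The sketch below is a minimal, self-contained illustration of that pattern in isolation; the toy open-addressing table and the names Slot, MINI_BATCH, PREFETCH_THRESHOLD and emplaceBatch are hypothetical, not the TiFlash implementation.

// Illustrative only: mini-batch hashing plus software prefetch before emplace.
// A toy open-addressing table stands in for the real aggregation hash table.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

struct Slot
{
    uint64_t key = 0;
    uint64_t count = 0;
    bool used = false;
};

static constexpr size_t MINI_BATCH = 256;                 // cf. "emplaceOrFindKey mini batch 256"
static constexpr size_t PREFETCH_THRESHOLD = 8192 * 16;   // cf. "prefetch threshold 8192*16"

// table.size() must be a power of two for the mask trick below.
void emplaceBatch(std::vector<Slot> & table, const std::vector<uint64_t> & keys)
{
    const size_t mask = table.size() - 1;
    const bool do_prefetch = table.size() >= PREFETCH_THRESHOLD;
    size_t hashes[MINI_BATCH];

    for (size_t begin = 0; begin < keys.size(); begin += MINI_BATCH)
    {
        const size_t end = std::min(begin + MINI_BATCH, keys.size());

        // Pass 1: hash the mini batch and prefetch the buckets it will touch.
        for (size_t i = begin; i < end; ++i)
        {
            hashes[i - begin] = std::hash<uint64_t>{}(keys[i]);
            if (do_prefetch)
                __builtin_prefetch(&table[hashes[i - begin] & mask]);
        }

        // Pass 2: emplace; by now the prefetched cache lines should be resident.
        for (size_t i = begin; i < end; ++i)
        {
            size_t pos = hashes[i - begin] & mask;
            while (table[pos].used && table[pos].key != keys[i])
                pos = (pos + 1) & mask;                    // linear probing
            table[pos].used = true;
            table[pos].key = keys[i];
            ++table[pos].count;                            // e.g. a COUNT aggregate state
        }
    }
}

Running the hash pass and the emplace pass as two short loops over the same mini batch gives each prefetch time to complete before its bucket is touched, which is the point of batching rather than prefetching only one row ahead.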
4 changes: 2 additions & 2 deletions dbms/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h
@@ -188,12 +188,12 @@ class AggregateFunctionGroupUniqArrayGeneric

void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
auto & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
IColumn & data_to = arr_to.getData();

auto & set = this->data(place).value;
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + set.size());
offsets_to.push_back((offsets_to.empty() ? 0 : offsets_to.back()) + set.size());

for (auto & elem : set)
deserializeAndInsert<is_plain_column>(elem.getValue(), data_to);
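For context on the offsets_to.push_back(...) line: ColumnArray stores all array elements back to back and keeps one running end offset per row, so the new row's offset is the previous back() plus the number of elements just appended. A tiny self-contained illustration, with a plain std::vector standing in for ColumnArray::Offsets:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

int main()
{
    std::vector<uint64_t> offsets;   // plays the role of offsets_to in the diff
    auto append_array = [&](size_t element_count) {
        offsets.push_back((offsets.empty() ? 0 : offsets.back()) + element_count);
    };
    append_array(3);   // first row: an array of 3 elements
    append_array(2);   // second row: an array of 2 elements
    assert((offsets == std::vector<uint64_t>{3, 5}));
}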
74 changes: 68 additions & 6 deletions dbms/src/AggregateFunctions/IAggregateFunction.h
@@ -128,6 +128,15 @@ class IAggregateFunction
/** Contains a loop with calls to "add" function. You can collect arguments into array "places"
* and do a single call to "addBatch" for devirtualization and inlining.
*/
virtual void addBatchWithPrefetch(
size_t start_offset,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const { RUNTIME_CHECK_MSG(false, "cannot be here");}

virtual void addBatch(
size_t start_offset,
size_t batch_size,
@@ -235,20 +244,73 @@ class IAggregateFunctionHelper : public IAggregateFunction
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
const auto end = start_offset + batch_size;
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = start_offset; i < start_offset + batch_size; ++i)
for (size_t i = start_offset; i < end; ++i)
{
if (flags[i] && places[i - start_offset])
static_cast<const Derived *>(this)->add(places[i - start_offset] + place_offset, columns, i, arena);
const auto place_idx = i - start_offset;

if (flags[i] && places[place_idx])
static_cast<const Derived *>(this)->add(places[place_idx] + place_offset, columns, i, arena);
}
}
else
{
for (size_t i = start_offset; i < start_offset + batch_size; ++i)
if (places[i - start_offset])
static_cast<const Derived *>(this)->add(places[i - start_offset] + place_offset, columns, i, arena);
for (size_t i = start_offset; i < end; ++i)
{
const auto place_idx = i - start_offset;

if (places[place_idx])
static_cast<const Derived *>(this)->add(places[place_idx] + place_offset, columns, i, arena);
}
}
}
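The loop above is the devirtualization that the class comment describes: IAggregateFunctionHelper is a CRTP base, so static_cast<const Derived *>(this)->add(...) resolves the per-row call at compile time and lets the compiler inline it instead of dispatching through the vtable on every row. A self-contained sketch of that pattern (SumFunction and the int payload are illustrative only, not TiFlash code):

#include <cstddef>
#include <cstdio>

// The batch loop lives in the helper base; the per-row add() is resolved statically.
template <typename Derived>
struct HelperBase
{
    void addBatch(const int * values, size_t n) const
    {
        for (size_t i = 0; i < n; ++i)
            static_cast<const Derived *>(this)->add(values[i]);   // no virtual dispatch, inlinable
    }
};

struct SumFunction : HelperBase<SumFunction>
{
    mutable long long sum = 0;
    void add(int v) const { sum += v; }
};

int main()
{
    const int values[] = {1, 2, 3, 4};
    SumFunction f;
    f.addBatch(values, 4);
    std::printf("sum = %lld\n", f.sum);   // prints sum = 10
}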

void addBatchWithPrefetch(
size_t start_offset,
size_t batch_size,
AggregateDataPtr * places,
size_t place_offset,
const IColumn ** columns,
Arena * arena,
ssize_t if_argument_pos = -1) const override
{
const auto end = start_offset + batch_size;
static constexpr size_t prefetch_step = 16;
if (if_argument_pos >= 0)
{
const auto & flags = assert_cast<const ColumnUInt8 &>(*columns[if_argument_pos]).getData();
for (size_t i = start_offset; i < end; ++i)
{
const auto place_idx = i - start_offset;
const auto prefetch_idx = place_idx + prefetch_step;

if (flags[i] && places[place_idx])
{
if likely (prefetch_idx < end)
__builtin_prefetch(places[prefetch_idx]);

static_cast<const Derived *>(this)->add(places[place_idx] + place_offset, columns, i, arena);
}
}
}
else
{
for (size_t i = start_offset; i < end; ++i)
{
const auto place_idx = i - start_offset;
const auto prefetch_idx = place_idx + prefetch_step;

if (places[place_idx])
{
if likely (prefetch_idx < end)
__builtin_prefetch(places[prefetch_idx]);

static_cast<const Derived *>(this)->add(places[place_idx] + place_offset, columns, i, arena);
}
}
}
}
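Compared with addBatch, the only difference is the __builtin_prefetch of the aggregate state that places[i + prefetch_step] points at, which helps when those states are scattered across a large hash table and would otherwise miss in cache. Below is a hedged sketch of how a caller might pick between the two entry points; the dispatch function, the threshold name, and its value (taken from the commit message "prefetch threshold 8192*16") are assumptions, not the actual TiFlash call site:

// Hypothetical dispatch helper; assumes TiFlash's IAggregateFunction.h is included.
static constexpr size_t agg_prefetch_threshold = 8192 * 16;   // assumed knob, see commit message

void addBatchDispatch(
    const IAggregateFunction & func,
    size_t rows,
    size_t hash_table_size,        // number of aggregate states currently allocated
    AggregateDataPtr * places,
    size_t place_offset,
    const IColumn ** columns,
    Arena * arena)
{
    if (hash_table_size >= agg_prefetch_threshold)
        func.addBatchWithPrefetch(/*start_offset=*/0, rows, places, place_offset, columns, arena);
    else
        func.addBatch(/*start_offset=*/0, rows, places, place_offset, columns, arena);
}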

85 changes: 46 additions & 39 deletions dbms/src/Common/ColumnsHashing.h
@@ -48,15 +48,20 @@ struct HashMethodOneNumber
using Self = HashMethodOneNumber<Value, Mapped, FieldType, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

static constexpr bool is_serialized_key = false;

const FieldType * vec;
const size_t total_rows;

/// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise.
HashMethodOneNumber(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const TiDB::TiDBCollators &)
: total_rows(key_columns[0]->size())
{
vec = &static_cast<const ColumnVector<FieldType> *>(key_columns[0])->getData()[0];
}

explicit HashMethodOneNumber(const IColumn * column)
: total_rows(column->size())
{
vec = &static_cast<const ColumnVector<FieldType> *>(column)->getData()[0];
}
@@ -86,54 +91,46 @@ struct HashMethodOneNumber


/// For the case when there is one string key.
template <typename Value, typename Mapped, bool place_string_to_arena = true, bool use_cache = true>
template <typename Value, typename Mapped, bool use_cache = true>
struct HashMethodString
: public columns_hashing_impl::
HashMethodBase<HashMethodString<Value, Mapped, place_string_to_arena, use_cache>, Value, Mapped, use_cache>
: public columns_hashing_impl::HashMethodBase<HashMethodString<Value, Mapped, use_cache>, Value, Mapped, use_cache>
{
using Self = HashMethodString<Value, Mapped, place_string_to_arena, use_cache>;
using Self = HashMethodString<Value, Mapped, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

static constexpr bool is_serialized_key = false;

const IColumn::Offset * offsets;
const UInt8 * chars;
TiDB::TiDBCollatorPtr collator = nullptr;
const size_t total_rows;

HashMethodString(
const ColumnRawPtrs & key_columns,
const Sizes & /*key_sizes*/,
const TiDB::TiDBCollators & collators)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnString &>(column);
offsets = column_string.getOffsets().data();
chars = column_string.getChars().data();
if (!collators.empty())
{
if constexpr (!place_string_to_arena)
throw Exception("String with collator must be placed on arena.", ErrorCodes::LOGICAL_ERROR);
collator = collators[0];
}
}

ALWAYS_INLINE inline auto getKeyHolder(
ALWAYS_INLINE inline ArenaKeyHolder getKeyHolder(
ssize_t row,
[[maybe_unused]] Arena * pool,
std::vector<String> & sort_key_containers) const
{
auto last_offset = row == 0 ? 0 : offsets[row - 1];
// Remove last zero byte.
StringRef key(chars + last_offset, offsets[row] - last_offset - 1);
if (likely(collator))
key = collator->sortKey(key.data, key.size, sort_key_containers[0]);

if constexpr (place_string_to_arena)
{
if (likely(collator))
key = collator->sortKey(key.data, key.size, sort_key_containers[0]);
return ArenaKeyHolder{key, *pool};
}
else
{
return key;
}
return ArenaKeyHolder{key, *pool};
}
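getKeyHolder relies on the ColumnString layout: the rows are stored contiguously in chars, each followed by a terminating zero byte, and offsets[row] points one past that terminator, hence the key length offsets[row] - last_offset - 1. A self-contained illustration with plain arrays (row_view is just for demonstration):

#include <cassert>
#include <cstddef>
#include <string_view>

int main()
{
    // Two rows, "ab" and "x", stored as ColumnString does: data plus a trailing '\0' per row.
    const char chars[] = {'a', 'b', '\0', 'x', '\0'};
    const size_t offsets[] = {3, 5};   // offsets[row] points one past the row's terminator

    auto row_view = [&](size_t row) {
        const size_t begin = row == 0 ? 0 : offsets[row - 1];
        // "Remove last zero byte", exactly as the comment in getKeyHolder says.
        return std::string_view(chars + begin, offsets[row] - begin - 1);
    };

    assert(row_view(0) == "ab");
    assert(row_view(1) == "x");
}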

protected:
@@ -147,10 +144,14 @@ struct HashMethodStringBin
using Self = HashMethodStringBin<Value, Mapped, padding>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;

static constexpr bool is_serialized_key = false;

const IColumn::Offset * offsets;
const UInt8 * chars;
const size_t total_rows;

HashMethodStringBin(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const TiDB::TiDBCollators &)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnString &>(column);
@@ -344,12 +345,16 @@ struct HashMethodFastPathTwoKeysSerialized
using Self = HashMethodFastPathTwoKeysSerialized<Key1Desc, Key2Desc, Value, Mapped>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;

static constexpr bool is_serialized_key = true;

Key1Desc key_1_desc;
Key2Desc key_2_desc;
const size_t total_rows;

HashMethodFastPathTwoKeysSerialized(const ColumnRawPtrs & key_columns, const Sizes &, const TiDB::TiDBCollators &)
: key_1_desc(key_columns[0])
, key_2_desc(key_columns[1])
, total_rows(key_columns[0]->size())
{}

ALWAYS_INLINE inline auto getKeyHolder(ssize_t row, Arena * pool, std::vector<String> &) const
@@ -370,25 +375,26 @@ struct HashMethodFastPathTwoKeysSerialized


/// For the case when there is one fixed-length string key.
template <typename Value, typename Mapped, bool place_string_to_arena = true, bool use_cache = true>
template <typename Value, typename Mapped, bool use_cache = true>
struct HashMethodFixedString
: public columns_hashing_impl::HashMethodBase<
HashMethodFixedString<Value, Mapped, place_string_to_arena, use_cache>,
Value,
Mapped,
use_cache>
: public columns_hashing_impl::
HashMethodBase<HashMethodFixedString<Value, Mapped, use_cache>, Value, Mapped, use_cache>
{
using Self = HashMethodFixedString<Value, Mapped, place_string_to_arena, use_cache>;
using Self = HashMethodFixedString<Value, Mapped, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

static constexpr bool is_serialized_key = false;

size_t n;
const ColumnFixedString::Chars_t * chars;
TiDB::TiDBCollatorPtr collator = nullptr;
const size_t total_rows;

HashMethodFixedString(
const ColumnRawPtrs & key_columns,
const Sizes & /*key_sizes*/,
const TiDB::TiDBCollators & collators)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnFixedString &>(column);
@@ -398,26 +404,16 @@ struct HashMethodFixedString
collator = collators[0];
}

ALWAYS_INLINE inline auto getKeyHolder(
ALWAYS_INLINE inline ArenaKeyHolder getKeyHolder(
size_t row,
[[maybe_unused]] Arena * pool,
Arena * pool,
std::vector<String> & sort_key_containers) const
{
StringRef key(&(*chars)[row * n], n);

if (collator)
{
key = collator->sortKeyFastPath(key.data, key.size, sort_key_containers[0]);
}

if constexpr (place_string_to_arena)
{
return ArenaKeyHolder{key, *pool};
}
else
{
return key;
}
return ArenaKeyHolder{key, *pool};
}

protected:
@@ -438,10 +434,12 @@ struct HashMethodKeysFixed
using BaseHashed = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
using Base = columns_hashing_impl::BaseStateKeysFixed<Key, has_nullable_keys_>;

static constexpr bool is_serialized_key = false;
static constexpr bool has_nullable_keys = has_nullable_keys_;

Sizes key_sizes;
size_t keys_size;
const size_t total_rows;

/// SSSE3 shuffle method can be used. Shuffle masks will be calculated and stored here.
#if defined(__SSSE3__) && !defined(MEMORY_SANITIZER)
@@ -467,6 +465,7 @@ struct HashMethodKeysFixed
: Base(key_columns)
, key_sizes(std::move(key_sizes_))
, keys_size(key_columns.size())
, total_rows(key_columns[0]->size())
{
if (usePreparedKeys(key_sizes))
{
@@ -593,9 +592,12 @@ struct HashMethodSerialized
using Self = HashMethodSerialized<Value, Mapped>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;

static constexpr bool is_serialized_key = true;

ColumnRawPtrs key_columns;
size_t keys_size;
TiDB::TiDBCollators collators;
const size_t total_rows;

HashMethodSerialized(
const ColumnRawPtrs & key_columns_,
@@ -604,6 +606,7 @@ struct HashMethodSerialized
: key_columns(key_columns_)
, keys_size(key_columns_.size())
, collators(collators_)
, total_rows(key_columns_[0]->size())
{}

ALWAYS_INLINE inline SerializedKeyHolder getKeyHolder(
@@ -629,12 +632,16 @@ struct HashMethodHashed
using Self = HashMethodHashed<Value, Mapped, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

static constexpr bool is_serialized_key = false;

ColumnRawPtrs key_columns;
TiDB::TiDBCollators collators;
const size_t total_rows;

HashMethodHashed(ColumnRawPtrs key_columns_, const Sizes &, const TiDB::TiDBCollators & collators_)
: key_columns(std::move(key_columns_))
, collators(collators_)
, total_rows(key_columns[0]->size())
{}

ALWAYS_INLINE inline Key getKeyHolder(size_t row, Arena *, std::vector<String> & sort_key_containers) const
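Every hash method in this file now exposes a static constexpr bool is_serialized_key plus a total_rows member. Presumably the aggregation code branches on the flag at compile time and uses total_rows to bound its prefetch look-ahead; the sketch below shows that kind of consumer. The Method template parameter stands for any hash method above, while the HashTable interface (hash, prefetch, emplaceKey) is invented for illustration and is not the actual TiFlash hash table API:

// Hedged sketch only: an invented caller that uses the new is_serialized_key and
// total_rows members. HashTable::hash/prefetch/emplaceKey are illustrative names.
#include <cstddef>
#include <string>
#include <vector>

template <typename Method, typename HashTable, typename Arena>
void emplaceAllRows(Method & method, HashTable & table, Arena & arena)
{
    static constexpr size_t prefetch_step = 16;
    std::vector<std::string> sort_key_containers(1);

    for (size_t row = 0; row < method.total_rows; ++row)
    {
        // Example policy: only look ahead for non-serialized keys
        // (compile-time branch on the new trait).
        if constexpr (!Method::is_serialized_key)
        {
            const size_t ahead = row + prefetch_step;
            if (ahead < method.total_rows)
                table.prefetch(table.hash(method.getKeyHolder(ahead, &arena, sort_key_containers)));
        }
        table.emplaceKey(method.getKeyHolder(row, &arena, sort_key_containers));
    }
}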