Skip to content

Commit

Permalink
Implement hash join v2(inner join) (#9191)
Browse files Browse the repository at this point in the history
ref #9060

Implement hash join v2(inner join)

Signed-off-by: gengliqi <[email protected]>
  • Loading branch information
gengliqi authored Jan 17, 2025
1 parent 999018c commit cb68ed2
Show file tree
Hide file tree
Showing 64 changed files with 4,904 additions and 179 deletions.
2 changes: 1 addition & 1 deletion contrib/client-c
4 changes: 4 additions & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ add_headers_and_sources(dbms src/IO/Compression)
add_headers_and_sources(dbms src/IO/FileProvider)
add_headers_and_sources(dbms src/Interpreters)
add_headers_and_sources(dbms src/Interpreters/SharedContexts)
add_headers_and_sources(dbms src/Interpreters/JoinV2)
add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/S3)
Expand Down Expand Up @@ -111,6 +112,9 @@ check_then_add_sources_compile_flag (
src/Columns/ColumnUtil.cpp
src/DataTypes/DataTypeString.cpp
src/Interpreters/Join.cpp
src/Interpreters/JoinV2/HashJoin.cpp
src/Interpreters/JoinV2/HashJoinBuild.cpp
src/Interpreters/JoinV2/HashJoinProbe.cpp
src/IO/Compression/EncodingUtil.cpp
src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Columns/ColumnAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
insertFrom(src_, n);
}

void insertDisjunctFrom(const IColumn & src_, const std::vector<size_t> & position_vec) override
void insertSelectiveFrom(const IColumn & src_, const Offsets & selective_offsets) override
{
for (auto position : position_vec)
for (auto position : selective_offsets)
insertFrom(src_, position);
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
insertFrom(src_, n);
}

void insertDisjunctFrom(const IColumn & src_, const std::vector<size_t> & position_vec) override
void insertSelectiveFrom(const IColumn & src_, const Offsets & selective_offsets) override
{
for (auto position : position_vec)
for (auto position : selective_offsets)
insertFrom(src_, position);
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Columns/ColumnConst.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ class ColumnConst final : public COWPtrHelper<IColumn, ColumnConst>

void insertManyFrom(const IColumn &, size_t, size_t length) override { s += length; }

void insertDisjunctFrom(const IColumn &, const std::vector<size_t> & position_vec) override
void insertSelectiveFrom(const IColumn &, const Offsets & selective_offsets) override
{
s += position_vec.size();
s += selective_offsets.size();
}

void insertMany(const Field &, size_t length) override { s += length; }
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,14 @@ void ColumnDecimal<T>::insertManyFrom(const IColumn & src, size_t position, size
}

template <typename T>
void ColumnDecimal<T>::insertDisjunctFrom(const IColumn & src, const std::vector<size_t> & position_vec)
void ColumnDecimal<T>::insertSelectiveFrom(const IColumn & src, const IColumn::Offsets & selective_offsets)
{
const auto & src_data = static_cast<const ColumnDecimal &>(src).data;
size_t old_size = data.size();
size_t to_add_size = position_vec.size();
size_t to_add_size = selective_offsets.size();
data.resize(old_size + to_add_size);
for (size_t i = 0; i < to_add_size; ++i)
data[i + old_size] = src_data[position_vec[i]];
data[i + old_size] = src_data[selective_offsets[i]];
}

#pragma GCC diagnostic pop
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Columns/ColumnDecimal.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class ColumnDecimal final : public COWPtrHelper<ColumnVectorHelper, ColumnDecima
friend class COWPtrHelper<ColumnVectorHelper, Self>;

public:
using value_type = T;
using Container = DecimalPaddedPODArray<T>;

private:
Expand Down Expand Up @@ -127,7 +128,7 @@ class ColumnDecimal final : public COWPtrHelper<ColumnVectorHelper, ColumnDecima
void insert(const Field & x) override { data.push_back(DB::get<typename NearestFieldType<T>::Type>(x)); }
void insertRangeFrom(const IColumn & src, size_t start, size_t length) override;
void insertManyFrom(const IColumn & src_, size_t position, size_t length) override;
void insertDisjunctFrom(const IColumn & src_, const std::vector<size_t> & position_vec) override;
void insertSelectiveFrom(const IColumn & src_, const IColumn::Offsets & selective_offsets) override;
void popBack(size_t n) override { data.resize_assume_reserved(data.size() - n); }

StringRef getRawData() const override
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Columns/ColumnFixedString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,29 @@ void ColumnFixedString::insertManyFrom(const IColumn & src_, size_t position, si
memcpySmallAllowReadWriteOverflow15(&chars[i], src_char_ptr, n);
}

void ColumnFixedString::insertDisjunctFrom(const IColumn & src_, const std::vector<size_t> & position_vec)
void ColumnFixedString::insertSelectiveFrom(const IColumn & src_, const Offsets & selective_offsets)
{
const auto & src = static_cast<const ColumnFixedString &>(src_);
if (n != src.getN())
throw Exception("Size of FixedString doesn't match", ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH);
size_t old_size = chars.size();
size_t new_size = old_size + position_vec.size() * n;
size_t new_size = old_size + selective_offsets.size() * n;
chars.resize(new_size);
const auto & src_chars = src.chars;
for (size_t i = old_size, j = 0; i < new_size; i += n, ++j)
memcpySmallAllowReadWriteOverflow15(&chars[i], &src_chars[position_vec[j] * n], n);
memcpySmallAllowReadWriteOverflow15(&chars[i], &src_chars[selective_offsets[j] * n], n);
}

void ColumnFixedString::insertData(const char * pos, size_t length)
{
if (length > n)
if unlikely (length > n)
throw Exception("Too large string for FixedString column", ErrorCodes::TOO_LARGE_STRING_SIZE);

size_t old_size = chars.size();
chars.resize(old_size + n);
inline_memcpy(chars.data() + old_size, pos, length);
memset(chars.data() + old_size + length, 0, n - length);
if unlikely (n > length)
memset(chars.data() + old_size + length, 0, n - length);
}

StringRef ColumnFixedString::serializeValueIntoArena(
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnFixedString.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class ColumnFixedString final : public COWPtrHelper<IColumn, ColumnFixedString>

void insertManyFrom(const IColumn & src_, size_t position, size_t length) override;

void insertDisjunctFrom(const IColumn & src_, const std::vector<size_t> & position_vec) override;
void insertSelectiveFrom(const IColumn & src_, const Offsets & selective_offsets) override;

void insertData(const char * pos, size_t length) override;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ColumnFunction final : public COWPtrHelper<IColumn, ColumnFunction>
throw Exception("Cannot insert into " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

void insertDisjunctFrom(const IColumn &, const std::vector<size_t> &) override
void insertSelectiveFrom(const IColumn &, const Offsets &) override
{
throw Exception("Cannot insert into " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Columns/ColumnNullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,17 +370,17 @@ void ColumnNullable::insertManyFrom(const IColumn & src, size_t n, size_t length
map.resize_fill(map.size() + length, src_concrete.getNullMapData()[n]);
}

void ColumnNullable::insertDisjunctFrom(const IColumn & src, const std::vector<size_t> & position_vec)
void ColumnNullable::insertSelectiveFrom(const IColumn & src, const Offsets & selective_offsets)
{
const auto & src_concrete = static_cast<const ColumnNullable &>(src);
getNestedColumn().insertDisjunctFrom(src_concrete.getNestedColumn(), position_vec);
getNestedColumn().insertSelectiveFrom(src_concrete.getNestedColumn(), selective_offsets);
auto & map = getNullMapData();
const auto & src_map = src_concrete.getNullMapData();
size_t old_size = map.size();
size_t to_add_size = position_vec.size();
size_t to_add_size = selective_offsets.size();
map.resize(old_size + to_add_size);
for (size_t i = 0; i < to_add_size; ++i)
map[i + old_size] = src_map[position_vec[i]];
map[i + old_size] = src_map[selective_offsets[i]];
}

void ColumnNullable::popBack(size_t n)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnNullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
void insert(const Field & x) override;
void insertFrom(const IColumn & src, size_t n) override;
void insertManyFrom(const IColumn & src, size_t n, size_t length) override;
void insertDisjunctFrom(const IColumn & src, const std::vector<size_t> & position_vec) override;
void insertSelectiveFrom(const IColumn & src, const Offsets & selective_offsets) override;

void insertDefault() override
{
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnString.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
insertFromImpl(src, position);
}

void insertDisjunctFrom(const IColumn & src_, const std::vector<size_t> & position_vec) override
void insertSelectiveFrom(const IColumn & src_, const Offsets & selective_offsets) override
{
const auto & src = static_cast<const ColumnString &>(src_);
offsets.reserve(offsets.size() + position_vec.size());
for (auto position : position_vec)
offsets.reserve(offsets.size() + selective_offsets.size());
for (auto position : selective_offsets)
insertFromImpl(src, position);
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Columns/ColumnTuple.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class ColumnTuple final : public COWPtrHelper<IColumn, ColumnTuple>
insertFrom(src_, n);
}

void insertDisjunctFrom(const IColumn & src_, const std::vector<size_t> & position_vec) override
void insertSelectiveFrom(const IColumn & src_, const Offsets & selective_offsets) override
{
for (auto position : position_vec)
for (auto position : selective_offsets)
insertFrom(src_, position);
}

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Columns/ColumnVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ class ColumnVector final : public COWPtrHelper<ColumnVectorHelper, ColumnVector<
data.resize_fill(data.size() + length, value);
}

void insertDisjunctFrom(const IColumn & src, const std::vector<size_t> & position_vec) override
void insertSelectiveFrom(const IColumn & src, const IColumn::Offsets & selective_offsets) override
{
const auto & src_container = static_cast<const Self &>(src).getData();
size_t old_size = data.size();
size_t to_add_size = position_vec.size();
size_t to_add_size = selective_offsets.size();
data.resize(old_size + to_add_size);
for (size_t i = 0; i < to_add_size; ++i)
data[i + old_size] = src_container[position_vec[i]];
data[i + old_size] = src_container[selective_offsets[i]];
}

void insertMany(const Field & field, size_t length) override
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Columns/IColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ class IColumn : public COWPtr<IColumn>
virtual MutablePtr clone() const = 0;

public:
using Offset = UInt64;
using Offsets = PaddedPODArray<Offset>;
/// Name of a Column. It is used in info messages.
virtual std::string getName() const { return getFamilyName(); }

Expand Down Expand Up @@ -146,9 +144,11 @@ class IColumn : public COWPtr<IColumn>
/// Note: the source column and the destination column must be of the same type, can not ColumnXXX->insertManyFrom(ConstColumnXXX, ...)
virtual void insertManyFrom(const IColumn & src, size_t position, size_t length) = 0;

/// Appends disjunctive elements from other column with the same type.
/// Note: the source column and the destination column must be of the same type, can not ColumnXXX->insertDisjunctFrom(ConstColumnXXX, ...)
virtual void insertDisjunctFrom(const IColumn & src, const std::vector<size_t> & position_vec) = 0;
/// Appends selective elements from other column with the same type.
/// Note: the source column and the destination column must be of the same type, can not ColumnXXX->insertSelectiveFrom(ConstColumnXXX, ...)
using Offset = UInt64;
using Offsets = PaddedPODArray<Offset>;
virtual void insertSelectiveFrom(const IColumn & src, const Offsets & selective_offsets) = 0;

/// Appends one field multiple times. Can be optimized in inherited classes.
virtual void insertMany(const Field & field, size_t length)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Columns/IColumnDummy.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ class IColumnDummy : public IColumn

void insertManyFrom(const IColumn &, size_t, size_t length) override { s += length; }

void insertDisjunctFrom(const IColumn &, const std::vector<size_t> & position_vec) override
void insertSelectiveFrom(const IColumn &, const Offsets & selective_offsets) override
{
s += position_vec.size();
s += selective_offsets.size();
}

void insertRangeFrom(const IColumn & /*src*/, size_t /*start*/, size_t length) override { s += length; }
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Columns/filterColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ inline void filterImplAligned(


template <typename T, typename Container>
void filterImpl(const UInt8 *& filt_pos, const UInt8 *& filt_end, const T *& data_pos, Container & res_data)
void filterImpl(const UInt8 * filt_pos, const UInt8 * filt_end, const T * data_pos, Container & res_data)
{
const UInt8 * filt_end_aligned = filt_pos + (filt_end - filt_pos) / FILTER_SIMD_BYTES * FILTER_SIMD_BYTES;
filterImplAligned<T, Container>(filt_pos, filt_end_aligned, data_pos, res_data);
Expand All @@ -301,9 +301,9 @@ void filterImpl(const UInt8 *& filt_pos, const UInt8 *& filt_end, const T *& dat
/// Explicit instantiations - not to place the implementation of the function above in the header file.
#define INSTANTIATE(T, Container) \
template void filterImpl<T, Container>( \
const UInt8 *& filt_pos, \
const UInt8 *& filt_end, \
const T *& data_pos, \
const UInt8 * filt_pos, \
const UInt8 * filt_end, \
const T * data_pos, \
Container & res_data); // NOLINT

INSTANTIATE(UInt8, PaddedPODArray<UInt8>)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Columns/filterColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ void filterArraysImplOnlyData(
ssize_t result_size_hint);

template <typename T, typename Container>
void filterImpl(const UInt8 *& filt_pos, const UInt8 *& filt_end, const T *& data_pos, Container & res_data);
void filterImpl(const UInt8 * filt_pos, const UInt8 * filt_end, const T * data_pos, Container & res_data);

} // namespace DB
20 changes: 10 additions & 10 deletions dbms/src/Columns/tests/gtest_column_insertFrom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,22 @@ class TestColumnInsertFrom : public ::testing::Test
}
}

/// Test insertDisjunctFrom
/// Test insertSelectiveFrom
{
for (size_t i = 0; i < 2; ++i)
{
cols[i] = column_ptr->cloneEmpty();
}
std::vector<size_t> position_vec;
position_vec.push_back(0);
position_vec.push_back(2);
position_vec.push_back(4);
for (size_t position : position_vec)
IColumn::Offsets selective_offsets;
selective_offsets.push_back(0);
selective_offsets.push_back(2);
selective_offsets.push_back(4);
for (size_t position : selective_offsets)
cols[0]->insertFrom(*column_ptr, position);
for (size_t position : position_vec)
for (size_t position : selective_offsets)
cols[0]->insertFrom(*column_ptr, position);
cols[1]->insertDisjunctFrom(*column_ptr, position_vec);
cols[1]->insertDisjunctFrom(*column_ptr, position_vec);
cols[1]->insertSelectiveFrom(*column_ptr, selective_offsets);
cols[1]->insertSelectiveFrom(*column_ptr, selective_offsets);
{
ColumnWithTypeAndName ref(std::move(cols[0]), col_with_type_and_name.type, "");
ColumnWithTypeAndName result(std::move(cols[1]), col_with_type_and_name.type, "");
Expand Down Expand Up @@ -271,4 +271,4 @@ try
CATCH

} // namespace tests
} // namespace DB
} // namespace DB
5 changes: 5 additions & 0 deletions dbms/src/Common/BitHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,8 @@ inline size_t roundUpToPowerOfTwoOrZero(size_t n)

return n;
}

inline bool isPowerOfTwo(size_t val)
{
return (val & (val - 1)) == 0;
}
Loading

0 comments on commit cb68ed2

Please sign in to comment.