Skip to content

Commit

Permalink
[UT](pipeline) Add test cases for hash join (#46976)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
Add test cases for hash join
  • Loading branch information
Gabriel39 authored Jan 15, 2025
1 parent c76f309 commit cc183eb
Show file tree
Hide file tree
Showing 41 changed files with 257 additions and 126 deletions.
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,9 +695,10 @@ Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs&
return Status::OK();
}

AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs, bool require_bucket_distribution)
: DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode, const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id, dest_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_needs_finalize(tnode.agg_node.need_finalize),
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {

class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
public:
AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs, bool require_bucket_distribution);
~AggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,10 @@ BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
return cal_end;
}

AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode, const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
: DataSinkOperatorX(operator_id, tnode.node_id, dest_id),
_buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
? tnode.analytic_node.buffered_tuple_id
: 0),
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat

class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
public:
AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/cache_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ Status CacheSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}

CacheSinkOperatorX::CacheSinkOperatorX(int sink_id, int child_id)
: Base(sink_id, child_id, child_id) {
CacheSinkOperatorX::CacheSinkOperatorX(int sink_id, int child_id, int dest_id)
: Base(sink_id, child_id, dest_id) {
_name = "CACHE_SINK_OPERATOR";
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/cache_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class CacheSinkOperatorX final : public DataSinkOperatorX<CacheSinkLocalState> {
using Base = DataSinkOperatorX<CacheSinkLocalState>;

friend class CacheSinkLocalState;
CacheSinkOperatorX(int sink_id, int child_id);
CacheSinkOperatorX(int sink_id, int child_id, int dest_id);
~CacheSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
RuntimeState* state, const RowDescriptor& row_desc, int operator_id,
const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations,
const std::vector<TUniqueId>& fragment_instance_ids)
: DataSinkOperatorX(operator_id, sink.dest_node_id),
: DataSinkOperatorX(operator_id, sink.dest_node_id, 0),
_texprs(sink.output_partition.partition_exprs),
_row_desc(row_desc),
_part_type(sink.output_partition.type),
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/group_commit_block_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class GroupCommitBlockSinkOperatorX final
public:
GroupCommitBlockSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr)
: Base(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr) {}
: Base(operator_id, 0, 0), _row_desc(row_desc), _t_output_expr(t_output_expr) {}

~GroupCommitBlockSinkOperatorX() override = default;

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,9 @@ Status HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
}

HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs)
: JoinBuildSinkOperatorX(pool, operator_id, tnode, descs),
: JoinBuildSinkOperatorX(pool, operator_id, dest_id, tnode, descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ class HashJoinBuildSinkLocalState final
class HashJoinBuildSinkOperatorX final
: public JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState> {
public:
HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode, const DescriptorTbl& descs);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::_name);
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hive_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocal
using Base = DataSinkOperatorX<HiveTableSinkLocalState>;
HiveTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr)
: Base(operator_id, 0),
: Base(operator_id, 0, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_pool(pool) {};
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/iceberg_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class IcebergTableSinkOperatorX final : public DataSinkOperatorX<IcebergTableSin
using Base = DataSinkOperatorX<IcebergTableSinkLocalState>;
IcebergTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr)
: Base(operator_id, 0),
: Base(operator_id, 0, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_pool(pool) {};
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/jdbc_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ namespace doris::pipeline {
#include "common/compile_check_begin.h"
JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc, int operator_id,
const std::vector<TExpr>& t_output_expr)
: DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr) {}
: DataSinkOperatorX(operator_id, 0, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr) {}

Status JdbcTableSinkOperatorX::init(const TDataSink& thrift_sink) {
RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::init(thrift_sink));
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/join_build_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ Status JoinBuildSinkLocalState<SharedStateArg, Derived>::init(RuntimeState* stat

template <typename LocalStateType>
JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs)
: DataSinkOperatorX<LocalStateType>(operator_id, tnode.node_id),
: DataSinkOperatorX<LocalStateType>(operator_id, tnode.node_id, dest_id),
_join_op(tnode.__isset.hash_join_node ? tnode.hash_join_node.join_op
: (tnode.__isset.nested_loop_join_node
? tnode.nested_loop_join_node.join_op
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/join_build_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class JoinBuildSinkLocalState : public PipelineXSinkLocalState<SharedStateType>
template <typename LocalStateType>
class JoinBuildSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
public:
JoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
JoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
~JoinBuildSinkOperatorX() override = default;

Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ Status MemoryScratchSinkLocalState::close(RuntimeState* state, Status exec_statu
MemoryScratchSinkOperatorX::MemoryScratchSinkOperatorX(const RowDescriptor& row_desc,
int operator_id,
const std::vector<TExpr>& t_output_expr)
: DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr) {}
: DataSinkOperatorX(operator_id, 0, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr) {}

Status MemoryScratchSinkOperatorX::init(const TDataSink& thrift_sink) {
RETURN_IF_ERROR(DataSinkOperatorX<MemoryScratchSinkLocalState>::init(thrift_sink));
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
}

NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool,
int operator_id,
int operator_id, int dest_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool, operator_id, tnode,
descs),
: JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool, operator_id, dest_id,
tnode, descs),
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only &&
tnode.nested_loop_join_node.is_output_left_side_only),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class NestedLoopJoinBuildSinkLocalState final
class NestedLoopJoinBuildSinkOperatorX final
: public JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState> {
public:
NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode, const DescriptorTbl& descs);
Status init(const TDataSink& tsink) override {
return Status::InternalError(
"{} should not init with TDataSink",
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class OlapTableSinkOperatorX final : public DataSinkOperatorX<OlapTableSinkLocal
using Base = DataSinkOperatorX<OlapTableSinkLocalState>;
OlapTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr)
: Base(operator_id, 0),
: Base(operator_id, 0, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_pool(pool) {};
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_table_sink_v2_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class OlapTableSinkV2OperatorX final : public DataSinkOperatorX<OlapTableSinkV2L
using Base = DataSinkOperatorX<OlapTableSinkV2LocalState>;
OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr)
: Base(operator_id, 0),
: Base(operator_id, 0, 0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_pool(pool) {};
Expand Down
11 changes: 0 additions & 11 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,6 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase {

class DataSinkOperatorXBase : public OperatorBase {
public:
DataSinkOperatorXBase(const int operator_id, const int node_id)
: OperatorBase(),
_operator_id(operator_id),
_node_id(node_id),
_dests_id({operator_id}) {}

DataSinkOperatorXBase(const int operator_id, const int node_id, const int dest_id)
: OperatorBase(), _operator_id(operator_id), _node_id(node_id), _dests_id({dest_id}) {}

Expand Down Expand Up @@ -507,8 +501,6 @@ class DataSinkOperatorXBase : public OperatorBase {

[[nodiscard]] const std::vector<int>& dests_id() const { return _dests_id; }

void set_dests_id(const std::vector<int>& dest_id) { _dests_id = dest_id; }

[[nodiscard]] int nereids_id() const { return _nereids_id; }

[[nodiscard]] int node_id() const { return _node_id; }
Expand Down Expand Up @@ -539,9 +531,6 @@ class DataSinkOperatorXBase : public OperatorBase {
template <typename LocalStateType>
class DataSinkOperatorX : public DataSinkOperatorXBase {
public:
DataSinkOperatorX(int operator_id, const int node_id)
: DataSinkOperatorXBase(operator_id, node_id) {}

DataSinkOperatorX(const int id, const int node_id, const int source_id)
: DataSinkOperatorXBase(id, node_id, source_id) {}

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
}

PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs)
: DataSinkOperatorX(operator_id, tnode.node_id),
: DataSinkOperatorX(operator_id, tnode.node_id, dest_id),
_pool(pool),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_limit(tnode.limit),
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort

class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortSinkLocalState> {
public:
PartitionSortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
PartitionSortSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode, const DescriptorTbl& descs);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<PartitionSortSinkLocalState>::_name);
Expand Down
10 changes: 4 additions & 6 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
}

PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
int dest_id, const TPlanNode& tnode,
const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id) {
_agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs,
require_bucket_distribution);
: DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id, dest_id) {
_agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, dest_id, tnode,
descs, require_bucket_distribution);
}

Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
Expand All @@ -140,8 +140,6 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* s
if (state->query_options().__isset.external_agg_partition_bits) {
_spill_partition_count_bits = state->query_options().external_agg_partition_bits;
}

_agg_sink_operator->set_dests_id(DataSinkOperatorX<PartitionedAggSinkLocalState>::dests_id());
RETURN_IF_ERROR(
_agg_sink_operator->set_child(DataSinkOperatorX<PartitionedAggSinkLocalState>::_child));
return _agg_sink_operator->init(tnode, state);
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,9 @@ class PartitionedAggSinkLocalState

class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkLocalState> {
public:
PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs, bool require_bucket_distribution);
PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode, const DescriptorTbl& descs,
bool require_bucket_distribution);
~PartitionedAggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,12 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
}

PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* pool,
int operator_id,
int operator_id, int dest_id,
const TPlanNode& tnode,
const DescriptorTbl& descs,
uint32_t partition_count)
: JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>(pool, operator_id, tnode,
descs),
: JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>(pool, operator_id, dest_id,
tnode, descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type
: TJoinDistributionType::NONE),
_distribution_partition_exprs(tnode.__isset.distribute_expr_lists
Expand Down
Loading

0 comments on commit cc183eb

Please sign in to comment.