Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](exec) refactor analytic operator to improve performance #46181

Merged
merged 25 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 4 additions & 30 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,41 +549,15 @@ struct MultiCastSharedState : public BasicSharedState {
std::unique_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;
};

struct BlockRowPos {
int64_t block_num {}; //the pos at which block
int64_t row_num {}; //the pos at which row
int64_t pos {}; //pos = all blocks size + row_num
std::string debug_string() const {
std::string res = "\t block_num: ";
res += std::to_string(block_num);
res += "\t row_num: ";
res += std::to_string(row_num);
res += "\t pos: ";
res += std::to_string(pos);
return res;
}
};

struct AnalyticSharedState : public BasicSharedState {
ENABLE_FACTORY_CREATOR(AnalyticSharedState)

public:
AnalyticSharedState() = default;

int64_t current_row_position = 0;
BlockRowPos partition_by_end;
int64_t input_total_rows = 0;
BlockRowPos all_block_end;
std::vector<vectorized::Block> input_blocks;
bool input_eos = false;
BlockRowPos found_partition_end;
std::vector<int64_t> origin_cols;
std::vector<int64_t> input_block_first_row_positions;
std::vector<std::vector<vectorized::MutableColumnPtr>> agg_input_columns;

// TODO: maybe global?
std::vector<int64_t> partition_by_column_idxs;
std::vector<int64_t> ordey_by_column_idxs;
std::queue<vectorized::Block> blocks_buffer;
std::mutex buffer_mutex;
bool sink_eos = false;
std::mutex sink_eos_lock;
};

struct JoinSharedState : public BasicSharedState {
Expand Down
866 changes: 699 additions & 167 deletions be/src/pipeline/exec/analytic_sink_operator.cpp

Large diffs are not rendered by default.

161 changes: 139 additions & 22 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,37 @@ namespace doris {
namespace pipeline {
class AnalyticSinkOperatorX;

struct BoundaryPose {
int64_t start = 0;
int64_t end = 0;
bool is_ended = false;
void remove_unused_rows(int64_t cnt) {
start -= cnt;
end -= cnt;
}
};

class PartitionStatistics {
public:
void update(int64_t size) {
_count++;
_cumulative_size += size;
_average_size = _cumulative_size / _count;
}

void reset() {
_count = 0;
_cumulative_size = 0;
_average_size = 0;
}

bool is_high_cardinality() const { return _count > 16 && _average_size < 8; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better avoid use magic number


int64_t _count = 0;
int64_t _cumulative_size = 0;
int64_t _average_size = 0;
};

class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedState> {
ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState);

Expand All @@ -37,34 +68,99 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;

private:
friend class AnalyticSinkOperatorX;
Status _execute_impl();
// over(partition by k1 order by k2 range|rows unbounded preceding and unbounded following)
bool _get_next_for_partition(int64_t batch_rows, int64_t current_block_base_pos);
zhangstar333 marked this conversation as resolved.
Show resolved Hide resolved
// over(partition by k1 order by k2 range between unbounded preceding and current row)
bool _get_next_for_unbounded_range(int64_t batch_rows, int64_t current_block_base_pos);
// over(partition by k1 order by k2 range between M preceding and N following)
bool _get_next_for_range_between(int64_t batch_rows, int64_t current_block_base_pos);
// over(partition by k1 order by k2 rows between unbounded preceding and current row)
bool _get_next_for_unbounded_rows(int64_t batch_rows, int64_t current_block_base_pos);
// over(partition by k1 order by k2 rows between M preceding and N following)
bool _get_next_for_sliding_rows(int64_t batch_rows, int64_t current_block_base_pos);

void _init_result_columns();
void _execute_for_function(int64_t partition_start, int64_t partition_end, int64_t frame_start,
int64_t frame_end);
void _insert_result_info(int64_t real_deal_with_width);
void _output_current_block(vectorized::Block* block);
void _reset_state_for_next_partition();
void _refresh_buffer_and_dependency_state(vectorized::Block* block);

void _create_agg_status();
void _reset_agg_status();
void _destroy_agg_status();
void _remove_unused_rows();

void _get_partition_by_end();
void _find_next_partition_ends();
void _update_order_by_range();
void _find_next_order_by_ends();
int64_t find_first_not_equal(vectorized::IColumn* reference_column,
vectorized::IColumn* compared_column, int64_t target,
int64_t start, int64_t end);

bool _refresh_need_more_input() {
auto need_more_input = _whether_need_next_partition(_shared_state->found_partition_end);
if (need_more_input) {
_dependency->set_block_to_read();
_dependency->set_ready();
} else {
_dependency->block();
_dependency->set_ready_to_read();
}
return need_more_input;
}
BlockRowPos _get_partition_by_end();
BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start, BlockRowPos end,
bool need_check_first = false);
bool _whether_need_next_partition(BlockRowPos& found_partition_end);
std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
vectorized::VExprContextSPtrs _range_between_expr_ctxs;
std::vector<std::vector<vectorized::MutableColumnPtr>> _agg_input_columns;
std::vector<vectorized::MutableColumnPtr> _partition_by_columns;
std::vector<vectorized::MutableColumnPtr> _order_by_columns;
std::vector<vectorized::MutableColumnPtr> _range_result_columns;
size_t _partition_exprs_size = 0;
size_t _order_by_exprs_size = 0;
BoundaryPose _partition_by_pose;
BoundaryPose _order_by_pose;
PartitionStatistics _partition_column_statistics;
PartitionStatistics _order_by_column_statistics;
std::queue<int64_t> _next_partition_ends;
std::queue<int64_t> _next_order_by_ends;

size_t _agg_functions_size = 0;
bool _agg_functions_created = false;
vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
std::vector<vectorized::AggFnEvaluator*> _agg_functions;
std::vector<size_t> _offsets_of_aggregate_states;
std::vector<bool> _result_column_nullable_flags;

using vectorized_get_next = bool (AnalyticSinkLocalState::*)(int64_t, int64_t);
struct executor {
vectorized_get_next get_next_impl;
};
executor _executor;

bool _current_window_empty = false;
int64_t _current_row_position = 0;
int64_t _output_block_index = 0;
std::vector<vectorized::MutableColumnPtr> _result_window_columns;

int64_t _rows_start_offset = 0;
int64_t _rows_end_offset = 0;
int64_t _input_total_rows = 0;
bool _input_eos = false;
std::vector<vectorized::Block> _input_blocks;
std::vector<int64_t> _input_block_first_row_positions;
int64_t _removed_block_index = 0;
int64_t _have_removed_rows = 0;

RuntimeProfile::Counter* _evaluation_timer = nullptr;
RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
RuntimeProfile::Counter* _compute_order_by_timer = nullptr;

std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
RuntimeProfile::Counter* _compute_range_between_function_timer = nullptr;
RuntimeProfile::Counter* _partition_search_timer = nullptr;
RuntimeProfile::Counter* _order_search_timer = nullptr;
RuntimeProfile::Counter* _remove_rows_timer = nullptr;
RuntimeProfile::Counter* _remove_count = nullptr;
RuntimeProfile::Counter* _remove_rows = nullptr;
RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
};

class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
Expand Down Expand Up @@ -94,23 +190,44 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
bool require_data_distribution() const override { return true; }

private:
friend class AnalyticSinkLocalState;
Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t length);
Status _add_input_block(doris::RuntimeState* state, vectorized::Block* input_block);

friend class AnalyticSinkLocalState;

ObjectPool* _pool = nullptr;
std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
vectorized::VExprContextSPtrs _range_between_expr_ctxs;

size_t _agg_functions_size = 0;
std::vector<size_t> _num_agg_input;
std::vector<vectorized::AggFnEvaluator*> _agg_functions;

TupleId _intermediate_tuple_id;
TupleId _output_tuple_id;
TupleDescriptor* _intermediate_tuple_desc = nullptr;
TupleDescriptor* _output_tuple_desc = nullptr;
const TTupleId _buffered_tuple_id;

std::vector<size_t> _num_agg_input;
const bool _is_colocate;
const bool _require_bucket_distribution;
const std::vector<TExpr> _partition_exprs;

TAnalyticWindow _window;
bool _has_window;
bool _has_range_window;
bool _has_window_start;
bool _has_window_end;

/// The offset of the n-th functions.
std::vector<size_t> _offsets_of_aggregate_states;
/// The total size of the row from the functions.
size_t _total_size_of_aggregate_states = 0;
/// The max align size for functions
size_t _align_aggregate_states = 1;
std::vector<bool> _change_to_nullable_flags;
};

} // namespace pipeline
Expand Down
Loading
Loading