Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](metrics) Complete metrics for some operators #42992

Merged
merged 4 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
}

Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
SCOPED_TIMER(_init_load_queue_timer);
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
if (_state->exec_env()->wal_mgr()->is_running()) {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
Expand Down Expand Up @@ -238,6 +239,17 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
return Status::OK();
}

Status GroupCommitBlockSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_init_load_queue_timer = ADD_TIMER(_profile, "InitLoadQueueTime");
_valid_and_convert_block_timer = ADD_TIMER(_profile, "ValidAndConvertBlockTime");
_find_partition_timer = ADD_TIMER(_profile, "FindPartitionTime");
_append_blocks_timer = ADD_TIMER(_profile, "AppendBlocksTime");
return Status::OK();
}

Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(Base::init(t_sink));
DCHECK(t_sink.__isset.olap_table_sink);
Expand Down Expand Up @@ -318,10 +330,15 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc

std::shared_ptr<vectorized::Block> block;
bool has_filtered_rows = false;
RETURN_IF_ERROR(local_state._block_convertor->validate_and_convert_block(
state, input_block, block, local_state._output_vexpr_ctxs, rows, has_filtered_rows));
{
SCOPED_TIMER(local_state._valid_and_convert_block_timer);
RETURN_IF_ERROR(local_state._block_convertor->validate_and_convert_block(
state, input_block, block, local_state._output_vexpr_ctxs, rows,
has_filtered_rows));
}
local_state._has_filtered_rows = false;
if (!local_state._vpartition->is_auto_partition()) {
SCOPED_TIMER(local_state._find_partition_timer);
//reuse vars for find_partition
local_state._partitions.assign(rows, nullptr);
local_state._filter_bitmap.Reset(rows);
Expand Down Expand Up @@ -351,23 +368,26 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
}
}
}

if (local_state._block_convertor->num_filtered_rows() > 0 || local_state._has_filtered_rows) {
auto cloneBlock = block->clone_without_columns();
auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
for (int i = 0; i < rows; ++i) {
if (local_state._block_convertor->filter_map()[i]) {
continue;
}
if (local_state._filter_bitmap.Get(i)) {
continue;
{
SCOPED_TIMER(local_state._append_blocks_timer);
if (local_state._block_convertor->num_filtered_rows() > 0 ||
local_state._has_filtered_rows) {
auto cloneBlock = block->clone_without_columns();
auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
for (int i = 0; i < rows; ++i) {
if (local_state._block_convertor->filter_map()[i]) {
continue;
}
if (local_state._filter_bitmap.Get(i)) {
continue;
}
res_block.add_row(block.get(), i);
}
res_block.add_row(block.get(), i);
block->swap(res_block.to_block());
}
block->swap(res_block.to_block());
// add block into block queue
RETURN_IF_ERROR(local_state._add_block(state, block));
}
// add block into block queue
RETURN_IF_ERROR(local_state._add_block(state, block));

return wind_up();
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/group_commit_block_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi

~GroupCommitBlockSinkLocalState() override;

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;

Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get(); }
std::vector<Dependency*> dependencies() const override {
Expand Down Expand Up @@ -79,6 +79,11 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<Dependency> _create_plan_dependency = nullptr;
std::shared_ptr<Dependency> _put_block_dependency = nullptr;

RuntimeProfile::Counter* _init_load_queue_timer = nullptr;
RuntimeProfile::Counter* _valid_and_convert_block_timer = nullptr;
RuntimeProfile::Counter* _find_partition_timer = nullptr;
RuntimeProfile::Counter* _append_blocks_timer = nullptr;
};

class GroupCommitBlockSinkOperatorX final
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/group_commit_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ GroupCommitOperatorX::GroupCommitOperatorX(ObjectPool* pool, const TPlanNode& tn

Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
bool find_node = false;
while (!find_node && !*eos) {
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos,
Expand Down
17 changes: 7 additions & 10 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_shared_state->build_exprs_size = _build_expr_ctxs.size();

_should_build_hash_table = true;
profile()->add_info_string("BroadcastJoin", std::to_string(p._is_broadcast_join));
if (p._is_broadcast_join) {
profile()->add_info_string("BroadcastJoin", "true");
if (state->enable_share_hash_table_for_broadcast_join()) {
_should_build_hash_table = info.task_idx == 0;
if (_should_build_hash_table) {
profile()->add_info_string("ShareHashTableEnabled", "true");
p._shared_hashtable_controller->set_builder_and_consumers(
state->fragment_instance_id(), p.node_id());
}
} else {
profile()->add_info_string("ShareHashTableEnabled", "false");
}
}
profile()->add_info_string("BuildShareHashTable", std::to_string(_should_build_hash_table));
profile()->add_info_string("ShareHashTableEnabled",
std::to_string(state->enable_share_hash_table_for_broadcast_join()));
if (!_should_build_hash_table) {
_dependency->block();
_finish_dependency->block();
Expand All @@ -72,6 +72,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_finish_dependency->shared_from_this());
}

_runtime_filter_init_timer = ADD_TIMER(profile(), "RuntimeFilterInitTime");
_build_blocks_memory_usage =
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildBlocks", TUnit::BYTES, 1);
_hash_table_memory_usage =
Expand All @@ -81,13 +82,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo

// Build phase
auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile();
_build_table_timer = ADD_TIMER(profile(), "BuildTableTime");
_build_side_merge_block_timer = ADD_TIMER(profile(), "BuildSideMergeBlockTime");
_build_table_timer = ADD_TIMER(profile(), "BuildHashTableTime");
_build_side_merge_block_timer = ADD_TIMER(profile(), "MergeBuildBlockTime");
_build_table_insert_timer = ADD_TIMER(record_profile, "BuildTableInsertTime");
_build_expr_call_timer = ADD_TIMER(record_profile, "BuildExprCallTime");
_build_side_compute_hash_timer = ADD_TIMER(record_profile, "BuildSideHashComputingTime");

_allocate_resource_timer = ADD_TIMER(profile(), "AllocateResourceTime");

// Hash Table Init
RETURN_IF_ERROR(_hash_table_init(state));
Expand Down Expand Up @@ -256,7 +254,6 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
if (UNLIKELY(rows == 0)) {
return Status::OK();
}
COUNTER_UPDATE(_build_rows_counter, rows);
block.replace_if_overflow();

vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
Expand Down
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ class HashJoinBuildSinkLocalState final
RuntimeProfile::Counter* _build_table_timer = nullptr;
RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
RuntimeProfile::Counter* _build_table_insert_timer = nullptr;
RuntimeProfile::Counter* _build_side_compute_hash_timer = nullptr;
RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr;

RuntimeProfile::Counter* _allocate_resource_timer = nullptr;

RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _build_arena_memory_usage = nullptr;
RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr;
};

class HashJoinBuildSinkOperatorX final
Expand Down
7 changes: 2 additions & 5 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,11 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
_probe_arena_memory_usage =
profile()->AddHighWaterMarkCounter("MemoryUsageProbeKeyArena", TUnit::BYTES, "", 1);
// Probe phase
_probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime");
_probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
_search_hashtable_timer = ADD_TIMER(profile(), "ProbeWhenSearchHashTableTime");
_build_side_output_timer = ADD_TIMER(profile(), "ProbeWhenBuildSideOutputTime");
_probe_side_output_timer = ADD_TIMER(profile(), "ProbeWhenProbeSideOutputTime");
_probe_process_hashtable_timer = ADD_TIMER(profile(), "ProbeWhenProcessHashTableTime");
_process_other_join_conjunct_timer = ADD_TIMER(profile(), "OtherJoinConjunctTime");
_non_equal_join_conjuncts_timer = ADD_TIMER(profile(), "NonEqualJoinConjunctEvaluationTime");
_init_probe_side_timer = ADD_TIMER(profile(), "InitProbeSideTime");
return Status::OK();
}
Expand Down Expand Up @@ -229,7 +227,6 @@ HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode
Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block,
bool* eos) const {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state._probe_timer);
if (local_state._shared_state->short_circuit_for_probe) {
// If we use a short-circuit strategy, should return empty block directly.
*eos = true;
Expand Down Expand Up @@ -320,7 +317,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
st = process_hashtable_ctx.process_data_in_hashtable(
st = process_hashtable_ctx.finish_probing(
arg, mutable_join_block, &temp_block, eos, _is_mark_join);
} else {
st = Status::InternalError("uninited hash table");
Expand Down
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,12 @@ class HashJoinProbeLocalState final
std::make_unique<HashTableCtxVariants>();

RuntimeProfile::Counter* _probe_expr_call_timer = nullptr;
RuntimeProfile::Counter* _probe_next_timer = nullptr;
RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage = nullptr;
RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
RuntimeProfile::Counter* _build_side_output_timer = nullptr;
RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr;
RuntimeProfile::Counter* _non_equal_join_conjuncts_timer = nullptr;
};

class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLocalState> {
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/jdbc_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
RETURN_IF_ERROR(local_state.sink(state, block, eos));
return Status::OK();
}
Expand Down
7 changes: 3 additions & 4 deletions be/src/pipeline/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ struct ProcessHashTableProbe {
// Process full outer join/ right join / right semi/anti join to output the join result
// in hash table
template <typename HashTableType>
Status process_data_in_hashtable(HashTableType& hash_table_ctx,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block, bool* eos, bool is_mark_join);
Status finish_probing(HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block, bool* eos, bool is_mark_join);

/// For null aware join with other conjuncts, if the probe key of one row on left side is null,
/// we should make this row match with all rows in build side.
Expand Down Expand Up @@ -136,7 +135,7 @@ struct ProcessHashTableProbe {
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
RuntimeProfile::Counter* _build_side_output_timer = nullptr;
RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr;
RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr;

size_t _right_col_idx;
size_t _right_col_len;
Expand Down
16 changes: 8 additions & 8 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState
_init_probe_side_timer(parent->_init_probe_side_timer),
_build_side_output_timer(parent->_build_side_output_timer),
_probe_side_output_timer(parent->_probe_side_output_timer),
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer),
_finish_probe_phase_timer(parent->_finish_probe_phase_timer),
_right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct)
? 0
: _parent->left_table_data_types().size()),
Expand Down Expand Up @@ -502,7 +502,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
return Status::OK();
}

SCOPED_TIMER(_parent->_process_other_join_conjunct_timer);
SCOPED_TIMER(_parent->_non_equal_join_conjuncts_timer);
size_t orig_columns = output_block->columns();
vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
{
Expand Down Expand Up @@ -617,10 +617,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl

template <int JoinOpType>
template <typename HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block, bool* eos, bool is_mark_join) {
SCOPED_TIMER(_probe_process_hashtable_timer);
Status ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_table_ctx,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block, bool* eos,
bool is_mark_join) {
SCOPED_TIMER(_finish_probe_phase_timer);
auto& mcol = mutable_block.mutable_columns();
if (is_mark_join) {
std::unique_ptr<vectorized::ColumnFilterHelper> mark_column =
Expand Down Expand Up @@ -709,8 +710,7 @@ struct ExtractType<T(U)> {
ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \
uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \
template Status \
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>( \
template Status ProcessHashTableProbe<JoinOpType>::finish_probing<ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, vectorized::MutableBlock & mutable_block, \
vectorized::Block * output_block, bool* eos, bool is_mark_join);

Expand Down
8 changes: 2 additions & 6 deletions be/src/pipeline/exec/join_build_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,11 @@ Status JoinBuildSinkLocalState<SharedStateArg, Derived>::init(RuntimeState* stat

PipelineXSinkLocalState<SharedStateArg>::profile()->add_info_string("JoinType",
to_string(p._join_op));
_build_rows_counter = ADD_COUNTER(PipelineXSinkLocalState<SharedStateArg>::profile(),
"BuildRows", TUnit::UNIT);

_publish_runtime_filter_timer = ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(),
"PublishRuntimeFilterTime");
_runtime_filter_compute_timer = ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(),
"RuntimeFilterComputeTime");
_runtime_filter_init_timer =
ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(), "RuntimeFilterInitTime");
_runtime_filter_compute_timer =
ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(), "BuildRuntimeFilterTime");
return Status::OK();
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/join_build_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ class JoinBuildSinkLocalState : public PipelineXSinkLocalState<SharedStateType>
template <typename LocalStateType>
friend class JoinBuildSinkOperatorX;

RuntimeProfile::Counter* _build_rows_counter = nullptr;
RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr;
std::vector<std::shared_ptr<IRuntimeFilter>> _runtime_filters;
};

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ Status JoinProbeLocalState<SharedStateArg, Derived>::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));

_probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
_join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
_build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
_probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1);

_finish_probe_phase_timer = ADD_TIMER(Base::profile(), "FinishProbePhaseTime");
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ class JoinProbeLocalState : public PipelineXLocalState<SharedStateArg> {

size_t _mark_column_id = -1;

RuntimeProfile::Counter* _probe_timer = nullptr;
RuntimeProfile::Counter* _probe_rows_counter = nullptr;
RuntimeProfile::Counter* _join_filter_timer = nullptr;
RuntimeProfile::Counter* _build_output_block_timer = nullptr;
RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr;

std::unique_ptr<vectorized::Block> _child_block = nullptr;
bool _child_eos = false;
Expand Down
Loading
Loading