Skip to content

Commit

Permalink
[refactor](metrics) Complete metrics for some operators (apache#42992)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Nov 1, 2024
1 parent 9959932 commit 37547aa
Show file tree
Hide file tree
Showing 26 changed files with 228 additions and 806 deletions.
52 changes: 36 additions & 16 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
}

Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
SCOPED_TIMER(_init_load_queue_timer);
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
if (_state->exec_env()->wal_mgr()->is_running()) {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
Expand Down Expand Up @@ -240,6 +241,17 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
return Status::OK();
}

Status GroupCommitBlockSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
_init_load_queue_timer = ADD_TIMER(_profile, "InitLoadQueueTime");
_valid_and_convert_block_timer = ADD_TIMER(_profile, "ValidAndConvertBlockTime");
_find_partition_timer = ADD_TIMER(_profile, "FindPartitionTime");
_append_blocks_timer = ADD_TIMER(_profile, "AppendBlocksTime");
return Status::OK();
}

Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) {
RETURN_IF_ERROR(Base::init(t_sink));
DCHECK(t_sink.__isset.olap_table_sink);
Expand Down Expand Up @@ -321,10 +333,15 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc

std::shared_ptr<vectorized::Block> block;
bool has_filtered_rows = false;
RETURN_IF_ERROR(local_state._block_convertor->validate_and_convert_block(
state, input_block, block, local_state._output_vexpr_ctxs, rows, has_filtered_rows));
{
SCOPED_TIMER(local_state._valid_and_convert_block_timer);
RETURN_IF_ERROR(local_state._block_convertor->validate_and_convert_block(
state, input_block, block, local_state._output_vexpr_ctxs, rows,
has_filtered_rows));
}
local_state._has_filtered_rows = false;
if (!local_state._vpartition->is_auto_partition()) {
SCOPED_TIMER(local_state._find_partition_timer);
//reuse vars for find_partition
local_state._partitions.assign(rows, nullptr);
local_state._filter_bitmap.Reset(rows);
Expand Down Expand Up @@ -354,23 +371,26 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
}
}
}

if (local_state._block_convertor->num_filtered_rows() > 0 || local_state._has_filtered_rows) {
auto cloneBlock = block->clone_without_columns();
auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
for (int i = 0; i < rows; ++i) {
if (local_state._block_convertor->filter_map()[i]) {
continue;
}
if (local_state._filter_bitmap.Get(i)) {
continue;
{
SCOPED_TIMER(local_state._append_blocks_timer);
if (local_state._block_convertor->num_filtered_rows() > 0 ||
local_state._has_filtered_rows) {
auto cloneBlock = block->clone_without_columns();
auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
for (int i = 0; i < rows; ++i) {
if (local_state._block_convertor->filter_map()[i]) {
continue;
}
if (local_state._filter_bitmap.Get(i)) {
continue;
}
res_block.add_row(block.get(), i);
}
res_block.add_row(block.get(), i);
block->swap(res_block.to_block());
}
block->swap(res_block.to_block());
// add block into block queue
RETURN_IF_ERROR(local_state._add_block(state, block));
}
// add block into block queue
RETURN_IF_ERROR(local_state._add_block(state, block));

return wind_up();
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/group_commit_block_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi

~GroupCommitBlockSinkLocalState() override;

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;

Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get(); }
std::vector<Dependency*> dependencies() const override {
Expand Down Expand Up @@ -79,6 +79,11 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<Dependency> _create_plan_dependency = nullptr;
std::shared_ptr<Dependency> _put_block_dependency = nullptr;

RuntimeProfile::Counter* _init_load_queue_timer = nullptr;
RuntimeProfile::Counter* _valid_and_convert_block_timer = nullptr;
RuntimeProfile::Counter* _find_partition_timer = nullptr;
RuntimeProfile::Counter* _append_blocks_timer = nullptr;
};

class GroupCommitBlockSinkOperatorX final
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/group_commit_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ GroupCommitOperatorX::GroupCommitOperatorX(ObjectPool* pool, const TPlanNode& tn

Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
bool find_node = false;
while (!find_node && !*eos) {
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos,
Expand Down
17 changes: 7 additions & 10 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_shared_state->build_exprs_size = _build_expr_ctxs.size();

_should_build_hash_table = true;
profile()->add_info_string("BroadcastJoin", std::to_string(p._is_broadcast_join));
if (p._is_broadcast_join) {
profile()->add_info_string("BroadcastJoin", "true");
if (state->enable_share_hash_table_for_broadcast_join()) {
_should_build_hash_table = info.task_idx == 0;
if (_should_build_hash_table) {
profile()->add_info_string("ShareHashTableEnabled", "true");
p._shared_hashtable_controller->set_builder_and_consumers(
state->fragment_instance_id(), p.node_id());
}
} else {
profile()->add_info_string("ShareHashTableEnabled", "false");
}
}
profile()->add_info_string("BuildShareHashTable", std::to_string(_should_build_hash_table));
profile()->add_info_string("ShareHashTableEnabled",
std::to_string(state->enable_share_hash_table_for_broadcast_join()));
if (!_should_build_hash_table) {
_dependency->block();
_finish_dependency->block();
Expand All @@ -72,6 +72,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_finish_dependency->shared_from_this());
}

_runtime_filter_init_timer = ADD_TIMER(profile(), "RuntimeFilterInitTime");
_build_blocks_memory_usage =
ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks", TUnit::BYTES, "MemoryUsage", 1);
_hash_table_memory_usage =
Expand All @@ -81,13 +82,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo

// Build phase
auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile();
_build_table_timer = ADD_TIMER(profile(), "BuildTableTime");
_build_side_merge_block_timer = ADD_TIMER(profile(), "BuildSideMergeBlockTime");
_build_table_timer = ADD_TIMER(profile(), "BuildHashTableTime");
_build_side_merge_block_timer = ADD_TIMER(profile(), "MergeBuildBlockTime");
_build_table_insert_timer = ADD_TIMER(record_profile, "BuildTableInsertTime");
_build_expr_call_timer = ADD_TIMER(record_profile, "BuildExprCallTime");
_build_side_compute_hash_timer = ADD_TIMER(record_profile, "BuildSideHashComputingTime");

_allocate_resource_timer = ADD_TIMER(profile(), "AllocateResourceTime");

// Hash Table Init
_hash_table_init(state);
Expand Down Expand Up @@ -253,7 +251,6 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
if (UNLIKELY(rows == 0)) {
return Status::OK();
}
COUNTER_UPDATE(_build_rows_counter, rows);
block.replace_if_overflow();

vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,12 @@ class HashJoinBuildSinkLocalState final
RuntimeProfile::Counter* _build_table_timer = nullptr;
RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
RuntimeProfile::Counter* _build_table_insert_timer = nullptr;
RuntimeProfile::Counter* _build_side_compute_hash_timer = nullptr;
RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr;

RuntimeProfile::Counter* _allocate_resource_timer = nullptr;

RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage = nullptr;
RuntimeProfile::Counter* _build_arena_memory_usage = nullptr;
RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr;
};

class HashJoinBuildSinkOperatorX final
Expand Down
7 changes: 2 additions & 5 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
_probe_arena_memory_usage =
profile()->AddHighWaterMarkCounter("ProbeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
// Probe phase
_probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime");
_probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime");
_search_hashtable_timer = ADD_TIMER(profile(), "ProbeWhenSearchHashTableTime");
_build_side_output_timer = ADD_TIMER(profile(), "ProbeWhenBuildSideOutputTime");
_probe_side_output_timer = ADD_TIMER(profile(), "ProbeWhenProbeSideOutputTime");
_probe_process_hashtable_timer = ADD_TIMER(profile(), "ProbeWhenProcessHashTableTime");
_process_other_join_conjunct_timer = ADD_TIMER(profile(), "OtherJoinConjunctTime");
_non_equal_join_conjuncts_timer = ADD_TIMER(profile(), "NonEqualJoinConjunctEvaluationTime");
_init_probe_side_timer = ADD_TIMER(profile(), "InitProbeSideTime");
return Status::OK();
}
Expand Down Expand Up @@ -230,7 +228,6 @@ HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode
Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block,
bool* eos) const {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state._probe_timer);
if (local_state._shared_state->short_circuit_for_probe) {
// If we use a short-circuit strategy, should return empty block directly.
*eos = true;
Expand Down Expand Up @@ -325,7 +322,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
st = process_hashtable_ctx.process_data_in_hashtable(
st = process_hashtable_ctx.finish_probing(
arg, mutable_join_block, &temp_block, eos, _is_mark_join);
} else {
st = Status::InternalError("uninited hash table");
Expand Down
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,12 @@ class HashJoinProbeLocalState final
std::make_unique<HashTableCtxVariants>();

RuntimeProfile::Counter* _probe_expr_call_timer = nullptr;
RuntimeProfile::Counter* _probe_next_timer = nullptr;
RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr;
RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage = nullptr;
RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
RuntimeProfile::Counter* _build_side_output_timer = nullptr;
RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr;
RuntimeProfile::Counter* _non_equal_join_conjuncts_timer = nullptr;
};

class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLocalState> {
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/jdbc_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
RETURN_IF_ERROR(local_state.sink(state, block, eos));
return Status::OK();
}
Expand Down
7 changes: 3 additions & 4 deletions be/src/pipeline/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,8 @@ struct ProcessHashTableProbe {
// Process full outer join/ right join / right semi/anti join to output the join result
// in hash table
template <typename HashTableType>
Status process_data_in_hashtable(HashTableType& hash_table_ctx,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block, bool* eos, bool is_mark_join);
Status finish_probing(HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block, bool* eos, bool is_mark_join);

/// For null aware join with other conjuncts, if the probe key of one row on left side is null,
/// we should make this row match with all rows in build side.
Expand Down Expand Up @@ -137,7 +136,7 @@ struct ProcessHashTableProbe {
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
RuntimeProfile::Counter* _build_side_output_timer = nullptr;
RuntimeProfile::Counter* _probe_side_output_timer = nullptr;
RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr;
RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr;

int _right_col_idx;
int _right_col_len;
Expand Down
18 changes: 9 additions & 9 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState
_init_probe_side_timer(parent->_init_probe_side_timer),
_build_side_output_timer(parent->_build_side_output_timer),
_probe_side_output_timer(parent->_probe_side_output_timer),
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer),
_finish_probe_phase_timer(parent->_finish_probe_phase_timer),
_right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct)
? 0
: _parent->left_table_data_types().size()),
Expand Down Expand Up @@ -501,8 +501,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
return Status::OK();
}

SCOPED_TIMER(_parent->_process_other_join_conjunct_timer);
int orig_columns = output_block->columns();
SCOPED_TIMER(_parent->_non_equal_join_conjuncts_timer);
size_t orig_columns = output_block->columns();
vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
{
bool can_be_filter_all = false;
Expand Down Expand Up @@ -616,10 +616,11 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl

template <int JoinOpType>
template <typename HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block, bool* eos, bool is_mark_join) {
SCOPED_TIMER(_probe_process_hashtable_timer);
Status ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_table_ctx,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block, bool* eos,
bool is_mark_join) {
SCOPED_TIMER(_finish_probe_phase_timer);
auto& mcol = mutable_block.mutable_columns();
if (is_mark_join) {
std::unique_ptr<vectorized::ColumnFilterHelper> mark_column =
Expand Down Expand Up @@ -717,8 +718,7 @@ struct ExtractType<T(U)> {
vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \
size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \
\
template Status \
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>( \
template Status ProcessHashTableProbe<JoinOpType>::finish_probing<ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, vectorized::MutableBlock & mutable_block, \
vectorized::Block * output_block, bool* eos, bool is_mark_join);

Expand Down
8 changes: 2 additions & 6 deletions be/src/pipeline/exec/join_build_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,11 @@ Status JoinBuildSinkLocalState<SharedStateArg, Derived>::init(RuntimeState* stat

PipelineXSinkLocalState<SharedStateArg>::profile()->add_info_string("JoinType",
to_string(p._join_op));
_build_rows_counter = ADD_COUNTER(PipelineXSinkLocalState<SharedStateArg>::profile(),
"BuildRows", TUnit::UNIT);

_publish_runtime_filter_timer = ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(),
"PublishRuntimeFilterTime");
_runtime_filter_compute_timer = ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(),
"RuntimeFilterComputeTime");
_runtime_filter_init_timer =
ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(), "RuntimeFilterInitTime");
_runtime_filter_compute_timer =
ADD_TIMER(PipelineXSinkLocalState<SharedStateArg>::profile(), "BuildRuntimeFilterTime");
return Status::OK();
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/join_build_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ class JoinBuildSinkLocalState : public PipelineXSinkLocalState<SharedStateType>
template <typename LocalStateType>
friend class JoinBuildSinkOperatorX;

RuntimeProfile::Counter* _build_rows_counter = nullptr;
RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr;
std::vector<std::shared_ptr<IRuntimeFilter>> _runtime_filters;
};

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ Status JoinProbeLocalState<SharedStateArg, Derived>::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));

_probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
_join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
_build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
_probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1);

_finish_probe_phase_timer = ADD_TIMER(Base::profile(), "FinishProbePhaseTime");
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ class JoinProbeLocalState : public PipelineXLocalState<SharedStateArg> {

size_t _mark_column_id = -1;

RuntimeProfile::Counter* _probe_timer = nullptr;
RuntimeProfile::Counter* _probe_rows_counter = nullptr;
RuntimeProfile::Counter* _join_filter_timer = nullptr;
RuntimeProfile::Counter* _build_output_block_timer = nullptr;
RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr;

std::unique_ptr<vectorized::Block> _child_block = nullptr;
bool _child_eos = false;
Expand Down
Loading

0 comments on commit 37547aa

Please sign in to comment.