Skip to content

Commit

Permalink
[improvement](profile) add profile counter 'BytesSent' for VDataBuffe…
Browse files Browse the repository at this point in the history
…rSender (apache#19826)
  • Loading branch information
luozenglin authored May 19, 2023
1 parent b3ce459 commit 3e010bb
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
2 changes: 1 addition & 1 deletion be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,

// set up profile counters
profile()->add_child(_plan->runtime_profile(), true, nullptr);
profile()->add_info_string("DoriBeVersion", version::doris_build_short_hash());
profile()->add_info_string("DorisBeVersion", version::doris_build_short_hash());
_rows_produced_counter = ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
_blocks_produced_counter = ADD_COUNTER(profile(), "BlocksProduced", TUnit::UNIT);
_fragment_cpu_timer = ADD_TIMER(profile(), "FragmentCpuTime");
Expand Down
7 changes: 7 additions & 0 deletions be/src/vec/sink/vmysql_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void VMysqlResultWriter<is_binary_format>::_init_profile() {
_convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
_result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
_sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
_bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
}

template <bool is_binary_format>
Expand Down Expand Up @@ -894,10 +895,12 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
}
}

uint64_t bytes_sent = 0;
// copy MysqlRowBuffer to Thrift
result->result_batch.rows.resize(num_rows);
for (int i = 0; i < num_rows; ++i) {
result->result_batch.rows[i].append(rows_buffer[i].buf(), rows_buffer[i].length());
bytes_sent += rows_buffer[i].length();
}

if (status) {
Expand All @@ -912,6 +915,9 @@ Status VMysqlResultWriter<is_binary_format>::append_block(Block& input_block) {
}
if (status.ok()) {
_written_rows += num_rows;
if (!_is_dry_run) {
_bytes_sent += bytes_sent;
}
} else {
LOG(WARNING) << "append result batch to sink failed.";
}
Expand All @@ -928,6 +934,7 @@ bool VMysqlResultWriter<is_binary_format>::can_sink() {
template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::close() {
COUNTER_SET(_sent_rows_counter, _written_rows);
COUNTER_UPDATE(_bytes_sent_counter, _bytes_sent);
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/sink/vmysql_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ class VMysqlResultWriter final : public VResultWriter {
RuntimeProfile::Counter* _result_send_timer = nullptr;
// number of sent rows
RuntimeProfile::Counter* _sent_rows_counter = nullptr;
// size of sent data
RuntimeProfile::Counter* _bytes_sent_counter = nullptr;
// for synchronized results
ResultList _results;
// If true, no block will be sent
bool _is_dry_run = false;

uint64_t _bytes_sent = 0;
};
} // namespace vectorized
} // namespace doris

0 comments on commit 3e010bb

Please sign in to comment.