Skip to content

Commit

Permalink
Fix comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen committed Jan 17, 2025
1 parent 199a491 commit 19197e3
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 39 deletions.
50 changes: 19 additions & 31 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
auto& p = _parent->cast<FileScanOperatorX>();
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
uint32_t shard_num =
std::min(config::doris_scanner_thread_pool_thread_num /
(_batch_split_mode ? 1 : state()->query_parallel_instance_num()),
std::min(config::doris_scanner_thread_pool_thread_num / p.query_parallel_instance_num(),
_max_scanners);
shard_num = std::max(shard_num, 1U);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
Expand All @@ -55,17 +54,6 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}

Status FileScanLocalState::start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
auto& p = _parent->cast<FileScanOperatorX>();
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.is_serial_operator(), true,
_batch_split_mode ? 1 : state()->query_parallel_instance_num());
return Status::OK();
}

std::string FileScanLocalState::name_suffix() const {
return fmt::format(" (id={}. nereids_id={}. table name = {})",
std::to_string(_parent->node_id()), std::to_string(_parent->nereids_id()),
Expand All @@ -74,35 +62,34 @@ std::string FileScanLocalState::name_suffix() const {

void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
auto& p = _parent->cast<FileScanOperatorX>();

auto calc_max_scanners = [&](int parallel_instance_num) -> int {
int max_scanners = config::doris_scanner_thread_pool_thread_num / parallel_instance_num;
max_scanners =
std::max(std::max(max_scanners, state->parallel_scan_max_scanners_count()), 1);
if (should_run_serial()) {
max_scanners = 1;
}
return max_scanners;
};

if (scan_ranges.size() == 1) {
auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
if (scan_range.__isset.split_source) {
p._batch_split_mode = true;
auto split_source = scan_range.split_source;
RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime");
// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
_max_scanners = config::doris_scanner_thread_pool_thread_num;
_max_scanners =
std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1);
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
_max_scanners = 1;
}

_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
_split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
state, get_split_timer, split_source.split_source_id, split_source.num_splits,
_max_scanners);
_batch_split_mode = true;
}
}

if (!_batch_split_mode) {
_max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
_max_scanners =
std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1);
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
_max_scanners = 1;
}
if (!p._batch_split_mode) {
_max_scanners = calc_max_scanners(p.query_parallel_instance_num());
if (_split_source == nullptr) {
_split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
_max_scanners);
Expand All @@ -111,6 +98,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
// so we don't do it in the batch split mode.
_max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
}

if (scan_ranges.size() > 0 &&
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
// for compatibility.
Expand Down
9 changes: 6 additions & 3 deletions be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {

Status _process_conjuncts(RuntimeState* state) override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
Status start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
int parent_id() { return _parent->node_id(); }
Expand All @@ -67,7 +65,6 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
// KVCache<std::string> _kv_cache;
std::unique_ptr<vectorized::ShardedKVCache> _kv_cache;
TupleId _output_tuple_id = -1;
bool _batch_split_mode = false;
};

class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
Expand All @@ -84,10 +81,16 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {

bool is_file_scan_operator() const override { return true; }

// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
int query_parallel_instance_num() const override {
return _batch_split_mode ? 1 : _query_parallel_instance_num;
}

private:
friend class FileScanLocalState;

const std::string _table_name;
bool _batch_split_mode = false;
};

#include "common/compile_check_end.h"
Expand Down
8 changes: 5 additions & 3 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,19 +985,19 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
scanner->set_query_statistics(_query_statistics.get());
}
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(start_scanners(_scanners));
RETURN_IF_ERROR(_start_scanners(_scanners));
}
return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::start_scanners(
Status ScanLocalState<Derived>::_start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.is_serial_operator(), p.is_file_scan_operator(),
state()->query_parallel_instance_num());
p.query_parallel_instance_num());
return Status::OK();
}

Expand Down Expand Up @@ -1206,6 +1206,8 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
}
}

_query_parallel_instance_num = state->query_parallel_instance_num();

return Status::OK();
}

Expand Down
9 changes: 7 additions & 2 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,7 @@ class ScanLocalState : public ScanLocalStateBase {
Status _prepare_scanners();

// Submit the scanner to the thread pool and start execution
virtual Status start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);
Status _start_scanners(const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);

// For some conjunct there is chance to elimate cast operator
// Eg. Variant's sub column could eliminate cast in storage layer if
Expand Down Expand Up @@ -373,6 +372,10 @@ class ScanOperatorX : public OperatorX<LocalStateType> {

[[nodiscard]] virtual bool is_file_scan_operator() const { return false; }

[[nodiscard]] virtual int query_parallel_instance_num() const {
return _query_parallel_instance_num;
}

const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
return _runtime_filter_descs;
}
Expand Down Expand Up @@ -435,6 +438,8 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
int64_t _push_down_count = -1;
const int _parallel_tasks = 0;

int _query_parallel_instance_num = 0;

std::vector<int> topn_filter_source_node_ids;
};

Expand Down

0 comments on commit 19197e3

Please sign in to comment.