Skip to content

Commit

Permalink
Merge branch 'StarRocks:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
renzhimin7 authored Nov 8, 2024
2 parents 5a74557 + a57fa9d commit 8d2df2e
Show file tree
Hide file tree
Showing 137 changed files with 8,263 additions and 401 deletions.
2 changes: 1 addition & 1 deletion be/src/column/chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void Chunk::append_vector_column(ColumnPtr column, const FieldPtr& field, SlotId
}

void Chunk::append_column(ColumnPtr column, SlotId slot_id) {
DCHECK(!_slot_id_to_index.contains(slot_id));
DCHECK(!_slot_id_to_index.contains(slot_id)) << "slot_id:" + std::to_string(slot_id) << std::endl;
_slot_id_to_index[slot_id] = _columns.size();
_columns.emplace_back(std::move(column));
check_or_die();
Expand Down
8 changes: 4 additions & 4 deletions be/src/connector/lake_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ Status LakeDataSource::init_tablet_reader(RuntimeState* runtime_state) {

_non_pushdown_predicates_counter = ADD_COUNTER_SKIP_MERGE(_runtime_profile, "NonPushdownPredicates",
TUnit::UNIT, TCounterMergeType::SKIP_ALL);
COUNTER_SET(_non_pushdown_predicates_counter,
static_cast<int64_t>(_not_push_down_conjuncts.size() + _non_pushdown_pred_tree.size()));
COUNTER_UPDATE(_non_pushdown_predicates_counter,
static_cast<int64_t>(_not_push_down_conjuncts.size() + _non_pushdown_pred_tree.size()));
if (runtime_state->fragment_ctx()->pred_tree_params().enable_show_in_profile) {
_runtime_profile->add_info_string(
"NonPushdownPredicateTree",
Expand Down Expand Up @@ -405,7 +405,7 @@ Status LakeDataSource::init_column_access_paths(Schema* schema) {
_params.column_access_paths = &_column_access_paths;

// update counter
COUNTER_SET(_pushdown_access_paths_counter, leaf_size);
COUNTER_UPDATE(_pushdown_access_paths_counter, leaf_size);
return Status::OK();
}

Expand Down Expand Up @@ -656,7 +656,7 @@ void LakeDataSource::update_counter() {
COUNTER_UPDATE(_segments_read_count, _reader->stats().segments_read_count);
COUNTER_UPDATE(_total_columns_data_page_count, _reader->stats().total_columns_data_page_count);

COUNTER_SET(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());
COUNTER_UPDATE(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());

if (_runtime_state->fragment_ctx()->pred_tree_params().enable_show_in_profile) {
_runtime_profile->add_info_string(
Expand Down
30 changes: 26 additions & 4 deletions be/src/exec/analytor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ Status Analytor::prepare(RuntimeState* state, ObjectPool* pool, RuntimeProfile*
if (_tnode.analytic_node.__isset.sql_aggregate_functions) {
_runtime_profile->add_info_string("AggregateFunctions", _tnode.analytic_node.sql_aggregate_functions);
}

_is_merge_funcs = _tnode.analytic_node.analytic_functions[0].nodes[0].agg_expr.is_merge_agg;
if (_is_merge_funcs) {
for (size_t i = 1; i < _tnode.analytic_node.analytic_functions.size(); i++) {
DCHECK(_tnode.analytic_node.analytic_functions[i].nodes[0].agg_expr.is_merge_agg);
}
}
if (_is_merge_funcs) {
_runtime_profile->add_info_string("isMerge", "true");
}

_mem_pool = std::make_unique<MemPool>();

const TAnalyticNode& analytic_node = _tnode.analytic_node;
Expand Down Expand Up @@ -180,6 +191,9 @@ Status Analytor::prepare(RuntimeState* state, ObjectPool* pool, RuntimeProfile*
return_type = TYPE_DOUBLE;
}
is_input_nullable = !fn.arg_types.empty() && (desc.nodes[0].has_nullable_child || has_outer_join_child);
if (_is_merge_funcs && fn.name.function_name == "count") {
is_input_nullable = false;
}
auto* func = get_window_function(fn.name.function_name, TYPE_BIGINT, return_type, is_input_nullable,
fn.binary_type, state->func_version());
_agg_functions[i] = func;
Expand Down Expand Up @@ -339,7 +353,8 @@ Status Analytor::open(RuntimeState* state) {
}
AggDataPtr agg_states = _mem_pool->allocate_aligned(_agg_states_total_size, _max_agg_state_align_size);
SCOPED_THREAD_LOCAL_AGG_STATE_ALLOCATOR_SETTER(_allocator.get());
_managed_fn_states.emplace_back(std::make_unique<ManagedFunctionStates>(&_agg_fn_ctxs, agg_states, this));
_managed_fn_states.emplace_back(
std::make_unique<ManagedFunctionStates<Analytor>>(&_agg_fn_ctxs, agg_states, this));
return Status::OK();
};

Expand Down Expand Up @@ -949,9 +964,16 @@ void Analytor::_update_window_batch(int64_t partition_start, int64_t partition_e
// instead of _partition.end to refer to the current right boundary.
frame_end = std::min<int64_t>(frame_end, _partition.end);
}
_agg_functions[i]->update_batch_single_state_with_frame(
_agg_fn_ctxs[i], _managed_fn_states[0]->mutable_data() + _agg_states_offsets[i], data_columns,
partition_start, partition_end, frame_start, frame_end);
if (_is_merge_funcs) {
for (size_t j = frame_start; j < frame_end; j++) {
_agg_functions[i]->merge(_agg_fn_ctxs[i], data_columns[0],
_managed_fn_states[0]->mutable_data() + _agg_states_offsets[i], j);
}
} else {
_agg_functions[i]->update_batch_single_state_with_frame(
_agg_fn_ctxs[i], _managed_fn_states[0]->mutable_data() + _agg_states_offsets[i], data_columns,
partition_start, partition_end, frame_start, frame_end);
}
}
}

Expand Down
30 changes: 18 additions & 12 deletions be/src/exec/analytor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@

namespace starrocks {

class ManagedFunctionStates;
using ManagedFunctionStatesPtr = std::unique_ptr<ManagedFunctionStates>;

struct FunctionTypes {
TypeDescriptor result_type;
bool has_nullable_child;
Expand All @@ -42,10 +39,16 @@ class Analytor;
using AnalytorPtr = std::shared_ptr<Analytor>;
using Analytors = std::vector<AnalytorPtr>;

template <typename T>
class ManagedFunctionStates;

template <typename T>
using ManagedFunctionStatesPtr = std::unique_ptr<ManagedFunctionStates<T>>;

// Component used to do analytic processing.
// It contains the common data structures and algorithms used for the analysis.
class Analytor final : public pipeline::ContextWithDependency {
friend class ManagedFunctionStates;
friend class ManagedFunctionStates<Analytor>;

// [start, end)
struct FrameRange {
Expand Down Expand Up @@ -268,7 +271,7 @@ class Analytor final : public pipeline::ContextWithDependency {
std::vector<bool> _is_lead_lag_functions;
std::vector<FunctionContext*> _agg_fn_ctxs;
std::vector<const AggregateFunction*> _agg_functions;
std::vector<ManagedFunctionStatesPtr> _managed_fn_states;
std::vector<ManagedFunctionStatesPtr<Analytor>> _managed_fn_states;
std::vector<std::vector<ExprContext*>> _agg_expr_ctxs;
std::vector<std::vector<ColumnPtr>> _agg_intput_columns;
std::vector<FunctionTypes> _agg_fn_types;
Expand Down Expand Up @@ -337,21 +340,24 @@ class Analytor final : public pipeline::ContextWithDependency {
SegmentStatistics _peer_group_statistics;
std::queue<int64_t> _candidate_peer_group_ends;
std::unique_ptr<Allocator> _allocator = std::make_unique<MemHookAllocator>();

bool _is_merge_funcs;
};

// Helper class that properly invokes destructor when state goes out of scope.
template <typename T>
class ManagedFunctionStates {
public:
ManagedFunctionStates(std::vector<FunctionContext*>* ctxs, AggDataPtr __restrict agg_states, Analytor* agg_node)
: _ctxs(ctxs), _agg_states(agg_states), _agg_node(agg_node) {
for (int i = 0; i < _agg_node->_agg_functions.size(); i++) {
_agg_node->_agg_functions[i]->create((*_ctxs)[i], _agg_states + _agg_node->_agg_states_offsets[i]);
ManagedFunctionStates(std::vector<FunctionContext*>* ctxs, AggDataPtr __restrict agg_states, T* context)
: _ctxs(ctxs), _agg_states(agg_states), _context(context) {
for (int i = 0; i < _context->_agg_functions.size(); i++) {
_context->_agg_functions[i]->create((*_ctxs)[i], _agg_states + _context->_agg_states_offsets[i]);
}
}

~ManagedFunctionStates() {
for (int i = 0; i < _agg_node->_agg_functions.size(); i++) {
_agg_node->_agg_functions[i]->destroy((*_ctxs)[i], _agg_states + _agg_node->_agg_states_offsets[i]);
for (int i = 0; i < _context->_agg_functions.size(); i++) {
_context->_agg_functions[i]->destroy((*_ctxs)[i], _agg_states + _context->_agg_states_offsets[i]);
}
}

Expand All @@ -361,7 +367,7 @@ class ManagedFunctionStates {
private:
std::vector<FunctionContext*>* _ctxs;
AggDataPtr _agg_states;
Analytor* _agg_node;
T* _context;
};

class AnalytorFactory;
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/chunks_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class ChunksSorter {
// Return accurate output rows of this operator
virtual size_t get_output_rows() const = 0;

size_t get_next_output_row() { return _next_output_row; }

virtual int64_t mem_usage() const = 0;

virtual bool is_full() { return false; }
Expand Down
11 changes: 7 additions & 4 deletions be/src/exec/partition/chunks_partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ namespace starrocks {

ChunksPartitioner::ChunksPartitioner(const bool has_nullable_partition_column,
const std::vector<ExprContext*>& partition_exprs,
std::vector<PartitionColumnType> partition_types)
std::vector<PartitionColumnType> partition_types, MemPool* mem_pool)
: _has_nullable_partition_column(has_nullable_partition_column),
_partition_exprs(partition_exprs),
_partition_types(std::move(partition_types)) {
_partition_types(std::move(partition_types)),
_mem_pool(mem_pool) {
_partition_columns.resize(partition_exprs.size());
}

Status ChunksPartitioner::prepare(RuntimeState* state, RuntimeProfile* runtime_profile) {
Status ChunksPartitioner::prepare(RuntimeState* state, RuntimeProfile* runtime_profile, bool enable_pre_agg) {
_state = state;
_mem_pool = std::make_unique<MemPool>();
_obj_pool = std::make_unique<ObjectPool>();
_init_hash_map_variant();

Expand All @@ -45,6 +45,9 @@ Status ChunksPartitioner::prepare(RuntimeState* state, RuntimeProfile* runtime_p
_limited_buffer = std::make_unique<LimitedPipelineChunkBuffer<ChunksPartitionStatistics>>(
&_statistics, 1, config::local_exchange_buffer_mem_limit_per_driver,
state->chunk_size() * config::streaming_agg_chunk_buffer_size);
if (enable_pre_agg) {
_hash_map_variant.set_enable_pre_agg();
}
return Status::OK();
}

Expand Down
9 changes: 4 additions & 5 deletions be/src/exec/partition/chunks_partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ struct ChunksPartitionStatistics {
class ChunksPartitioner {
public:
ChunksPartitioner(const bool has_nullable_partition_column, const std::vector<ExprContext*>& partition_exprs,
std::vector<PartitionColumnType> partition_types);
std::vector<PartitionColumnType> partition_types, MemPool* mem_pool);

Status prepare(RuntimeState* state, RuntimeProfile* runtime_profile);
Status prepare(RuntimeState* state, RuntimeProfile* runtime_profile, bool enable_pre_agg = false);

// Chunk is divided into multiple parts by partition columns,
// and each partition corresponds to a key-value pair in the hash map.
Expand Down Expand Up @@ -125,7 +125,6 @@ class ChunksPartitioner {

if (is_hash_map_eos()) {
_hash_map_variant.reset();
_mem_pool.reset();
_obj_pool.reset();
}

Expand All @@ -148,7 +147,7 @@ class ChunksPartitioner {
PartitionChunkConsumer&& partition_chunk_consumer) {
if (!_is_passthrough) {
_is_passthrough = hash_map_with_key.template append_chunk<EnablePassthrough>(
chunk, _partition_columns, _mem_pool.get(), _obj_pool.get(),
chunk, _partition_columns, _mem_pool, _obj_pool.get(),
std::forward<NewPartitionCallback>(new_partition_cb),
std::forward<PartitionChunkConsumer>(partition_chunk_consumer));
}
Expand Down Expand Up @@ -250,7 +249,7 @@ class ChunksPartitioner {
const std::vector<PartitionColumnType> _partition_types;

RuntimeState* _state = nullptr;
std::unique_ptr<MemPool> _mem_pool = nullptr;
MemPool* _mem_pool = nullptr;
std::unique_ptr<ObjectPool> _obj_pool = nullptr;

Columns _partition_columns;
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/partition/partition_hash_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ struct PartitionHashMapBase {
bool init_null_key_partition = false;
static constexpr size_t kNullKeyPartitionIdx = 0;

bool enable_pre_agg = false;

PartitionHashMapBase(int32_t chunk_size) : chunk_size(chunk_size) {}

protected:
Expand Down Expand Up @@ -148,7 +150,9 @@ struct PartitionHashMapBase {
return;
}
auto partition_num = hash_map.size();
if (partition_num > 512 && total_num_rows < 10000 * partition_num) {
size_t partition_num_hwm = enable_pre_agg ? 32768 : 512;

if (enable_pre_agg && partition_num > partition_num_hwm && total_num_rows < 10000 * partition_num) {
is_passthrough = true;
}
}
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/partition/partition_hash_variant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,11 @@ void PartitionHashMapVariant::set_passthrough() {
});
}

// Turns on the enable_pre_agg flag of the hash map currently held by this variant.
void PartitionHashMapVariant::set_enable_pre_agg() {
    auto mark_pre_agg = [](auto& typed_map) {
        // The visited alternative is expected to be an initialized (non-null) hash map.
        DCHECK(typed_map != nullptr);
        typed_map->enable_pre_agg = true;
    };
    visit(mark_pre_agg);
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/exec/partition/partition_hash_variant.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,7 @@ struct PartitionHashMapVariant {
bool is_nullable() const;

void set_passthrough();

void set_enable_pre_agg();
};
} // namespace starrocks
5 changes: 4 additions & 1 deletion be/src/exec/pipeline/hash_partition_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ Status HashPartitionContext::prepare(RuntimeState* state, RuntimeProfile* profil
}

_acc.set_max_size(state->chunk_size());
_chunks_partitioner = std::make_unique<ChunksPartitioner>(_has_nullable_key, _partition_exprs, _partition_types);
_mem_pool = std::make_unique<MemPool>();
_chunks_partitioner =
std::make_unique<ChunksPartitioner>(_has_nullable_key, _partition_exprs, _partition_types, _mem_pool.get());
return _chunks_partitioner->prepare(state, profile);
}

Expand All @@ -47,6 +49,7 @@ Status HashPartitionContext::push_one_chunk_to_partitioner(RuntimeState* state,
}

// Marks the sink side of this partition context as finished.
// The MemPool owned by this context is released first, then the completion flag is raised.
// NOTE(review): prepare() hands the raw pointer from _mem_pool.get() to the ChunksPartitioner,
// so that pointer dangles after this reset — confirm nothing allocates from or reads
// pool-backed memory once the sink is complete.
void HashPartitionContext::sink_complete() {
_mem_pool.reset();
_is_sink_complete = true;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/hash_partition_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class HashPartitionContext {
bool _is_sink_complete = false;

ChunksPartitionerPtr _chunks_partitioner;
std::unique_ptr<MemPool> _mem_pool;

ChunkPipelineAccumulator _acc;
};
Expand Down
8 changes: 5 additions & 3 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,11 @@ ChunkSourcePtr ConnectorScanOperator::create_chunk_source(MorselPtr morsel, int3
}
}

return std::make_shared<ConnectorChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(),
std::move(morsel), scan_node, factory->get_chunk_buffer(),
_enable_adaptive_io_tasks);
// Only use one chunk source profile, so we can see metrics at the scan operator level.
// Because of the adaptive io tasks feature, chunk sources are used unevenly,
// which leads to a somewhat "skewed" profile that is harder to analyze.
return std::make_shared<ConnectorChunkSource>(this, _chunk_source_profiles[0].get(), std::move(morsel), scan_node,
factory->get_chunk_buffer(), _enable_adaptive_io_tasks);
}

void ConnectorScanOperator::attach_chunk_source(int32_t source_index) {
Expand Down
9 changes: 5 additions & 4 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ Status OlapChunkSource::_init_column_access_paths(Schema* schema) {
_params.column_access_paths = &_column_access_paths;

// update counter
COUNTER_SET(_pushdown_access_paths_counter, leaf_size);
COUNTER_UPDATE(_pushdown_access_paths_counter, leaf_size);
return Status::OK();
}

Expand Down Expand Up @@ -520,8 +520,9 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {

_non_pushdown_predicates_counter = ADD_COUNTER_SKIP_MERGE(_runtime_profile, "NonPushdownPredicates",
TUnit::UNIT, TCounterMergeType::SKIP_ALL);
COUNTER_SET(_non_pushdown_predicates_counter,
static_cast<int64_t>(_scan_ctx->not_push_down_conjuncts().size() + _non_pushdown_pred_tree.size()));
COUNTER_UPDATE(
_non_pushdown_predicates_counter,
static_cast<int64_t>(_scan_ctx->not_push_down_conjuncts().size() + _non_pushdown_pred_tree.size()));
if (runtime_state->fragment_ctx()->pred_tree_params().enable_show_in_profile) {
_runtime_profile->add_info_string(
"NonPushdownPredicateTree",
Expand Down Expand Up @@ -697,7 +698,7 @@ void OlapChunkSource::_update_counter() {
COUNTER_UPDATE(_segments_read_count, _reader->stats().segments_read_count);
COUNTER_UPDATE(_total_columns_data_page_count, _reader->stats().total_columns_data_page_count);

COUNTER_SET(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());
COUNTER_UPDATE(_pushdown_predicates_counter, (int64_t)_params.pred_tree.size());

if (_runtime_state->fragment_ctx()->pred_tree_params().enable_show_in_profile) {
_runtime_profile->add_info_string(
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/scan/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ void OlapScanOperator::do_close(RuntimeState* state) {}

ChunkSourcePtr OlapScanOperator::create_chunk_source(MorselPtr morsel, int32_t chunk_source_index) {
auto* olap_scan_node = down_cast<OlapScanNode*>(_scan_node);
return std::make_shared<OlapChunkSource>(this, _chunk_source_profiles[chunk_source_index].get(), std::move(morsel),
olap_scan_node, _ctx.get());
return std::make_shared<OlapChunkSource>(this, _chunk_source_profiles[0].get(), std::move(morsel), olap_scan_node,
_ctx.get());
}

int64_t OlapScanOperator::get_scan_table_id() const {
Expand Down
Loading

0 comments on commit 8d2df2e

Please sign in to comment.