From 906338acde0411731dfa7e23287479629bafb3fc Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 31 Oct 2024 17:00:13 +0800 Subject: [PATCH 1/4] update --- .../exec/group_commit_block_sink_operator.cpp | 52 +++++++++---- .../exec/group_commit_block_sink_operator.h | 7 +- .../exec/group_commit_scan_operator.cpp | 1 + be/src/pipeline/exec/hashjoin_build_sink.cpp | 17 ++-- be/src/pipeline/exec/hashjoin_build_sink.h | 4 +- .../pipeline/exec/hashjoin_probe_operator.cpp | 7 +- .../pipeline/exec/hashjoin_probe_operator.h | 4 +- .../exec/jdbc_table_sink_operator.cpp | 1 + .../exec/join/process_hash_table_probe.h | 7 +- .../exec/join/process_hash_table_probe_impl.h | 16 ++-- .../exec/join_build_sink_operator.cpp | 8 +- .../pipeline/exec/join_build_sink_operator.h | 2 - be/src/pipeline/exec/join_probe_operator.cpp | 3 +- be/src/pipeline/exec/join_probe_operator.h | 2 +- .../exec/memory_scratch_sink_operator.cpp | 24 ++++-- .../exec/memory_scratch_sink_operator.h | 3 + .../exec/multi_cast_data_stream_source.cpp | 13 +++- .../exec/multi_cast_data_stream_source.h | 3 + .../exec/nested_loop_join_build_operator.cpp | 1 - .../exec/nested_loop_join_probe_operator.cpp | 41 ++++++---- .../exec/nested_loop_join_probe_operator.h | 77 +++++++++++-------- be/src/pipeline/exec/repeat_operator.cpp | 75 +++++++++++------- be/src/pipeline/exec/repeat_operator.h | 5 ++ be/src/util/arrow/row_batch.cpp | 28 +------ be/src/util/arrow/row_batch.h | 8 +- .../serde/data_type_serde_arrow_test.cpp | 4 +- 26 files changed, 231 insertions(+), 182 deletions(-) diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index e0171b41ab1ee8..9f99d55d3ea989 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -64,6 +64,7 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) { } Status GroupCommitBlockSinkLocalState::_initialize_load_queue() { + SCOPED_TIMER(_init_load_queue_timer); auto& p = _parent->cast(); if (_state->exec_env()->wal_mgr()->is_running()) { RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( @@ -238,6 +239,17 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state, return Status::OK(); } +Status GroupCommitBlockSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + _init_load_queue_timer = ADD_TIMER(_profile, "InitLoadQueueTime"); + _valid_and_convert_block_timer = ADD_TIMER(_profile, "ValidAndConvertBlockTime"); + _find_partition_timer = ADD_TIMER(_profile, "FindPartitionTime"); + _append_blocks_timer = ADD_TIMER(_profile, "AppendBlocksTime"); + return Status::OK(); +} + Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) { RETURN_IF_ERROR(Base::init(t_sink)); DCHECK(t_sink.__isset.olap_table_sink); @@ -318,10 +330,15 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc std::shared_ptr block; bool has_filtered_rows = false; - RETURN_IF_ERROR(local_state._block_convertor->validate_and_convert_block( - state, input_block, block, local_state._output_vexpr_ctxs, rows, has_filtered_rows)); + { + SCOPED_TIMER(local_state._valid_and_convert_block_timer); + RETURN_IF_ERROR(local_state._block_convertor->validate_and_convert_block( + state, input_block, block, local_state._output_vexpr_ctxs, rows, + has_filtered_rows)); + } local_state._has_filtered_rows = false; if (!local_state._vpartition->is_auto_partition()) { + SCOPED_TIMER(local_state._find_partition_timer); //reuse vars for find_partition local_state._partitions.assign(rows, nullptr); local_state._filter_bitmap.Reset(rows); @@ -351,23 +368,26 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc } } } - - if (local_state._block_convertor->num_filtered_rows() > 0 || local_state._has_filtered_rows) { - auto cloneBlock = block->clone_without_columns(); - auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); - for (int i = 0; i < rows; ++i) { - if (local_state._block_convertor->filter_map()[i]) { - continue; - } - if (local_state._filter_bitmap.Get(i)) { - continue; + { + SCOPED_TIMER(local_state._append_blocks_timer); + if (local_state._block_convertor->num_filtered_rows() > 0 || + local_state._has_filtered_rows) { + auto cloneBlock = block->clone_without_columns(); + auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + for (int i = 0; i < rows; ++i) { + if (local_state._block_convertor->filter_map()[i]) { + continue; + } + if (local_state._filter_bitmap.Get(i)) { + continue; + } + res_block.add_row(block.get(), i); } - res_block.add_row(block.get(), i); + block->swap(res_block.to_block()); } - block->swap(res_block.to_block()); + // add block into block queue + RETURN_IF_ERROR(local_state._add_block(state, block)); } - // add block into block queue - RETURN_IF_ERROR(local_state._add_block(state, block)); return wind_up(); } diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h index 32ca0613652ae4..e469aee8df595c 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.h +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h @@ -42,8 +42,8 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState dependencies() const override { @@ -79,6 +79,11 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState _finish_dependency; std::shared_ptr _create_plan_dependency = nullptr; std::shared_ptr _put_block_dependency = nullptr; + + RuntimeProfile::Counter* _init_load_queue_timer = nullptr; + RuntimeProfile::Counter* _valid_and_convert_block_timer = nullptr; + RuntimeProfile::Counter* _find_partition_timer = nullptr; + RuntimeProfile::Counter* _append_blocks_timer = nullptr; }; class GroupCommitBlockSinkOperatorX final diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp b/be/src/pipeline/exec/group_commit_scan_operator.cpp index 9577639813a760..141a5e7bf770c5 100644 --- a/be/src/pipeline/exec/group_commit_scan_operator.cpp +++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp @@ -31,6 +31,7 @@ GroupCommitOperatorX::GroupCommitOperatorX(ObjectPool* pool, const TPlanNode& tn Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.exec_time_counter()); bool find_node = false; while (!find_node && !*eos) { RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos, diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index cbe9209eeb424d..37de9ac93d839f 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -51,19 +51,19 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _shared_state->build_exprs_size = _build_expr_ctxs.size(); _should_build_hash_table = true; + profile()->add_info_string("BroadcastJoin", std::to_string(p._is_broadcast_join)); if (p._is_broadcast_join) { - profile()->add_info_string("BroadcastJoin", "true"); if (state->enable_share_hash_table_for_broadcast_join()) { _should_build_hash_table = info.task_idx == 0; if (_should_build_hash_table) { - profile()->add_info_string("ShareHashTableEnabled", "true"); p._shared_hashtable_controller->set_builder_and_consumers( state->fragment_instance_id(), p.node_id()); } - } else { - profile()->add_info_string("ShareHashTableEnabled", "false"); } } + profile()->add_info_string("BuildShareHashTable", std::to_string(_should_build_hash_table)); + profile()->add_info_string("ShareHashTableEnabled", + std::to_string(state->enable_share_hash_table_for_broadcast_join())); if (!_should_build_hash_table) { _dependency->block(); _finish_dependency->block(); @@ -72,6 +72,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _finish_dependency->shared_from_this()); } + _runtime_filter_init_timer = ADD_TIMER(profile(), "RuntimeFilterInitTime"); _build_blocks_memory_usage = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageBuildBlocks", TUnit::BYTES, 1); _hash_table_memory_usage = @@ -81,13 +82,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo // Build phase auto* record_profile = _should_build_hash_table ? profile() : faker_runtime_profile(); - _build_table_timer = ADD_TIMER(profile(), "BuildTableTime"); - _build_side_merge_block_timer = ADD_TIMER(profile(), "BuildSideMergeBlockTime"); + _build_table_timer = ADD_TIMER(profile(), "BuildHashTableTime"); + _build_side_merge_block_timer = ADD_TIMER(profile(), "MergeBuildBlockTime"); _build_table_insert_timer = ADD_TIMER(record_profile, "BuildTableInsertTime"); _build_expr_call_timer = ADD_TIMER(record_profile, "BuildExprCallTime"); - _build_side_compute_hash_timer = ADD_TIMER(record_profile, "BuildSideHashComputingTime"); - - _allocate_resource_timer = ADD_TIMER(profile(), "AllocateResourceTime"); // Hash Table Init RETURN_IF_ERROR(_hash_table_init(state)); @@ -256,7 +254,6 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, if (UNLIKELY(rows == 0)) { return Status::OK(); } - COUNTER_UPDATE(_build_rows_counter, rows); block.replace_if_overflow(); vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size()); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 9f1cf486fe5027..45aa1e8c8a262d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -94,14 +94,12 @@ class HashJoinBuildSinkLocalState final RuntimeProfile::Counter* _build_table_timer = nullptr; RuntimeProfile::Counter* _build_expr_call_timer = nullptr; RuntimeProfile::Counter* _build_table_insert_timer = nullptr; - RuntimeProfile::Counter* _build_side_compute_hash_timer = nullptr; RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr; - RuntimeProfile::Counter* _allocate_resource_timer = nullptr; - RuntimeProfile::Counter* _build_blocks_memory_usage = nullptr; RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; RuntimeProfile::Counter* _build_arena_memory_usage = nullptr; + RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr; }; class HashJoinBuildSinkOperatorX final diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index e7b784d4d77ab3..426bfcb219dc04 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -57,13 +57,11 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) _probe_arena_memory_usage = profile()->AddHighWaterMarkCounter("MemoryUsageProbeKeyArena", TUnit::BYTES, "", 1); // Probe phase - _probe_next_timer = ADD_TIMER(profile(), "ProbeFindNextTime"); _probe_expr_call_timer = ADD_TIMER(profile(), "ProbeExprCallTime"); _search_hashtable_timer = ADD_TIMER(profile(), "ProbeWhenSearchHashTableTime"); _build_side_output_timer = ADD_TIMER(profile(), "ProbeWhenBuildSideOutputTime"); _probe_side_output_timer = ADD_TIMER(profile(), "ProbeWhenProbeSideOutputTime"); - _probe_process_hashtable_timer = ADD_TIMER(profile(), "ProbeWhenProcessHashTableTime"); - _process_other_join_conjunct_timer = ADD_TIMER(profile(), "OtherJoinConjunctTime"); + _non_equal_join_conjuncts_timer = ADD_TIMER(profile(), "NonEqualJoinConjunctEvaluationTime"); _init_probe_side_timer = ADD_TIMER(profile(), "InitProbeSideTime"); return Status::OK(); } @@ -229,7 +227,6 @@ HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) const { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state._probe_timer); if (local_state._shared_state->short_circuit_for_probe) { // If we use a short-circuit strategy, should return empty block directly. *eos = true; @@ -320,7 +317,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc if constexpr (!std::is_same_v) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - st = process_hashtable_ctx.process_data_in_hashtable( + st = process_hashtable_ctx.finish_probing( arg, mutable_join_block, &temp_block, eos, _is_mark_join); } else { st = Status::InternalError("uninited hash table"); diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 63673edc410fc1..1bdb9d13347d09 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -117,14 +117,12 @@ class HashJoinProbeLocalState final std::make_unique(); RuntimeProfile::Counter* _probe_expr_call_timer = nullptr; - RuntimeProfile::Counter* _probe_next_timer = nullptr; RuntimeProfile::Counter* _probe_side_output_timer = nullptr; - RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr; RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage = nullptr; RuntimeProfile::Counter* _search_hashtable_timer = nullptr; RuntimeProfile::Counter* _init_probe_side_timer = nullptr; RuntimeProfile::Counter* _build_side_output_timer = nullptr; - RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr; + RuntimeProfile::Counter* _non_equal_join_conjuncts_timer = nullptr; }; class HashJoinProbeOperatorX final : public JoinProbeOperatorX { diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp index 10fd0d8e40bf25..29c881d1c28100 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp @@ -47,6 +47,7 @@ Status JdbcTableSinkOperatorX::open(RuntimeState* state) { Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); RETURN_IF_ERROR(local_state.sink(state, block, eos)); return Status::OK(); } diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h index 739783eb1fd1ec..14e0edd977f57b 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe.h @@ -87,9 +87,8 @@ struct ProcessHashTableProbe { // Process full outer join/ right join / right semi/anti join to output the join result // in hash table template - Status process_data_in_hashtable(HashTableType& hash_table_ctx, - vectorized::MutableBlock& mutable_block, - vectorized::Block* output_block, bool* eos, bool is_mark_join); + Status finish_probing(HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block, + vectorized::Block* output_block, bool* eos, bool is_mark_join); /// For null aware join with other conjuncts, if the probe key of one row on left side is null, /// we should make this row match with all rows in build side. @@ -136,7 +135,7 @@ struct ProcessHashTableProbe { RuntimeProfile::Counter* _init_probe_side_timer = nullptr; RuntimeProfile::Counter* _build_side_output_timer = nullptr; RuntimeProfile::Counter* _probe_side_output_timer = nullptr; - RuntimeProfile::Counter* _probe_process_hashtable_timer = nullptr; + RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr; size_t _right_col_idx; size_t _right_col_len; diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h index bf4325ccece042..231c231c81326e 100644 --- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -56,7 +56,7 @@ ProcessHashTableProbe::ProcessHashTableProbe(HashJoinProbeLocalState _init_probe_side_timer(parent->_init_probe_side_timer), _build_side_output_timer(parent->_build_side_output_timer), _probe_side_output_timer(parent->_probe_side_output_timer), - _probe_process_hashtable_timer(parent->_probe_process_hashtable_timer), + _finish_probe_phase_timer(parent->_finish_probe_phase_timer), _right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct) ? 0 : _parent->left_table_data_types().size()), @@ -502,7 +502,7 @@ Status ProcessHashTableProbe::do_other_join_conjuncts(vectorized::Bl return Status::OK(); } - SCOPED_TIMER(_parent->_process_other_join_conjunct_timer); + SCOPED_TIMER(_parent->_non_equal_join_conjuncts_timer); size_t orig_columns = output_block->columns(); vectorized::IColumn::Filter other_conjunct_filter(row_count, 1); { @@ -617,10 +617,11 @@ Status ProcessHashTableProbe::do_other_join_conjuncts(vectorized::Bl template template -Status ProcessHashTableProbe::process_data_in_hashtable( - HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block, - vectorized::Block* output_block, bool* eos, bool is_mark_join) { - SCOPED_TIMER(_probe_process_hashtable_timer); +Status ProcessHashTableProbe::finish_probing(HashTableType& hash_table_ctx, + vectorized::MutableBlock& mutable_block, + vectorized::Block* output_block, bool* eos, + bool is_mark_join) { + SCOPED_TIMER(_finish_probe_phase_timer); auto& mcol = mutable_block.mutable_columns(); if (is_mark_join) { std::unique_ptr mark_column = @@ -709,8 +710,7 @@ struct ExtractType { ExtractType::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ - template Status \ - ProcessHashTableProbe::process_data_in_hashtable::Type>( \ + template Status ProcessHashTableProbe::finish_probing::Type>( \ ExtractType::Type & hash_table_ctx, vectorized::MutableBlock & mutable_block, \ vectorized::Block * output_block, bool* eos, bool is_mark_join); diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp index fc0d3b8746077b..8b3f5cd98ff7c0 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.cpp +++ b/be/src/pipeline/exec/join_build_sink_operator.cpp @@ -33,15 +33,11 @@ Status JoinBuildSinkLocalState::init(RuntimeState* stat PipelineXSinkLocalState::profile()->add_info_string("JoinType", to_string(p._join_op)); - _build_rows_counter = ADD_COUNTER(PipelineXSinkLocalState::profile(), - "BuildRows", TUnit::UNIT); _publish_runtime_filter_timer = ADD_TIMER(PipelineXSinkLocalState::profile(), "PublishRuntimeFilterTime"); - _runtime_filter_compute_timer = ADD_TIMER(PipelineXSinkLocalState::profile(), - "RuntimeFilterComputeTime"); - _runtime_filter_init_timer = - ADD_TIMER(PipelineXSinkLocalState::profile(), "RuntimeFilterInitTime"); + _runtime_filter_compute_timer = + ADD_TIMER(PipelineXSinkLocalState::profile(), "BuildRuntimeFilterTime"); return Status::OK(); } diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index 714e0c34190678..9d79a97397ff77 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -39,10 +39,8 @@ class JoinBuildSinkLocalState : public PipelineXSinkLocalState template friend class JoinBuildSinkOperatorX; - RuntimeProfile::Counter* _build_rows_counter = nullptr; RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr; - RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr; std::vector> _runtime_filters; }; diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 76dc75a90d8f3c..11b5b29c8b556b 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -29,11 +29,10 @@ Status JoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime"); _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer"); _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock"); _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1); - + _finish_probe_phase_timer = ADD_TIMER(Base::profile(), "FinishProbePhaseTime"); return Status::OK(); } diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 3f68c73d04b161..078806cea4fc5a 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -49,10 +49,10 @@ class JoinProbeLocalState : public PipelineXLocalState { size_t _mark_column_id = -1; - RuntimeProfile::Counter* _probe_timer = nullptr; RuntimeProfile::Counter* _probe_rows_counter = nullptr; RuntimeProfile::Counter* _join_filter_timer = nullptr; RuntimeProfile::Counter* _build_output_block_timer = nullptr; + RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr; std::unique_ptr _child_block = nullptr; bool _child_eos = false; diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index 1d022f9304fd0d..2c69c0e2b2ba9f 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -33,6 +33,9 @@ Status MemoryScratchSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); + _get_arrow_schema_timer = ADD_TIMER(_profile, "GetArrowSchemaTime"); + _convert_block_to_arrow_batch_timer = ADD_TIMER(_profile, "ConvertBlockToArrowBatchTime"); + _evaluation_timer = ADD_TIMER(_profile, "EvaluationTime"); // create queue state->exec_env()->result_queue_mgr()->create_queue(state->fragment_instance_id(), &_queue); @@ -92,13 +95,22 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec // failed, just return the error status vectorized::Block block; - RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( - local_state._output_vexpr_ctxs, *input_block, &block)); + { + SCOPED_TIMER(local_state._evaluation_timer); + RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( + local_state._output_vexpr_ctxs, *input_block, &block)); + } std::shared_ptr block_arrow_schema; - // After expr executed, use recaculated schema as final schema - RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema, state->timezone())); - RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(), - &result, _timezone_obj)); + { + SCOPED_TIMER(local_state._get_arrow_schema_timer); + // After expr executed, use recaculated schema as final schema + RETURN_IF_ERROR(get_arrow_schema(block, &block_arrow_schema, state->timezone())); + } + { + SCOPED_TIMER(local_state._convert_block_to_arrow_batch_timer); + RETURN_IF_ERROR(convert_to_arrow_batch( + block, block_arrow_schema, arrow::default_memory_pool(), &result, _timezone_obj)); + } local_state._queue->blocking_put(result); if (local_state._queue->size() > config::max_memory_sink_batch_count) { local_state._queue_dependency->block(); diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.h b/be/src/pipeline/exec/memory_scratch_sink_operator.h index 69c0fa14042ef2..c74659d15b96f2 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.h +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.h @@ -45,6 +45,9 @@ class MemoryScratchSinkLocalState final : public PipelineXSinkLocalState _queue_dependency = nullptr; + RuntimeProfile::Counter* _get_arrow_schema_timer = nullptr; + RuntimeProfile::Counter* _convert_block_to_arrow_batch_timer = nullptr; + RuntimeProfile::Counter* _evaluation_timer = nullptr; }; class MemoryScratchSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 71204f1285ce7b..e45e59d17e27b3 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -40,6 +40,9 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState auto& p = _parent->cast(); _shared_state->multi_cast_data_streamer->set_dep_by_sender_idx(p._consumer_id, _dependency); _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter"); + _filter_timer = ADD_TIMER(_runtime_profile, "FilterTime"); + _get_data_timer = ADD_TIMER(_runtime_profile, "GetDataTime"); + _materialize_data_timer = ADD_TIMER(_runtime_profile, "MaterializeDataTime"); // init profile for runtime filter RuntimeFilterConsumer::_init_profile(profile()); init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), p.node_id(), @@ -86,15 +89,19 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, if (!local_state._output_expr_contexts.empty()) { output_block = &tmp_block; } - RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id, - output_block, eos)); - + { + SCOPED_TIMER(local_state._get_data_timer); + RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer->pull( + _consumer_id, output_block, eos)); + } if (!local_state._conjuncts.empty()) { + SCOPED_TIMER(local_state._filter_timer); RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, output_block->columns())); } if (!local_state._output_expr_contexts.empty() && output_block->rows() > 0) { + SCOPED_TIMER(local_state._materialize_data_timer); RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( local_state._output_expr_contexts, *output_block, block, true)); vectorized::materialize_block_inplace(*block); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 2059f706cad3f5..57410bf8d9568a 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -68,6 +68,9 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState> _filter_dependencies; RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; + RuntimeProfile::Counter* _filter_timer = nullptr; + RuntimeProfile::Counter* _get_data_timer = nullptr; + RuntimeProfile::Counter* _materialize_data_timer = nullptr; }; class MultiCastDataStreamerSourceOperatorX final diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 59020a5df437bd..83b378e792c3fa 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -139,7 +139,6 @@ Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector } if (eos) { - COUNTER_UPDATE(local_state._build_rows_counter, local_state._build_rows); RuntimeFilterBuild rf_ctx(&local_state); RETURN_IF_ERROR(rf_ctx(state)); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index d0fb4ee19a5824..afa1a2e59b798c 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -43,6 +43,10 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin"); + _output_temp_blocks_timer = ADD_TIMER(profile(), "OutputTempBlocksTime"); + _update_visited_flags_timer = ADD_TIMER(profile(), "UpdateVisitedFlagsTime"); + _join_conjuncts_evaluation_timer = ADD_TIMER(profile(), "JoinConjunctsEvaluationTime"); + _filtered_by_join_conjuncts_timer = ADD_TIMER(profile(), "FilteredByJoinConjunctsTime"); return Status::OK(); } @@ -168,23 +172,26 @@ Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta _process_left_child_block(_join_block, now_process_build_block); } - if constexpr (set_probe_side_flag) { - RETURN_IF_ERROR( - (_do_filtering_and_update_visited_flags( - &_join_block, !p._is_left_semi_anti))); - _update_additional_flags(&_join_block); - // If this join operation is left outer join or full outer join, when - // `_left_side_process_count`, means all rows from build - // side have been joined with _left_side_process_count, we should output current - // probe row with null from build side. - if (_left_side_process_count) { - _finalize_current_phase( - _join_block, state->batch_size()); + { + SCOPED_TIMER(_finish_probe_phase_timer); + if constexpr (set_probe_side_flag) { + RETURN_IF_ERROR( + (_do_filtering_and_update_visited_flags( + &_join_block, !p._is_left_semi_anti))); + _update_additional_flags(&_join_block); + // If this join operation is left outer join or full outer join, when + // `_left_side_process_count`, means all rows from build + // side have been joined with _left_side_process_count, we should output current + // probe row with null from build side. + if (_left_side_process_count) { + _finalize_current_phase( + _join_block, state->batch_size()); + } + } else if (_left_side_process_count && p._is_mark_join && + _shared_state->build_blocks.empty()) { + _append_left_data_with_null(_join_block); } - } else if (_left_side_process_count && p._is_mark_join && - _shared_state->build_blocks.empty()) { - _append_left_data_with_null(_join_block); } } @@ -377,6 +384,7 @@ void NestedLoopJoinProbeLocalState::_append_left_data_with_null(vectorized::Bloc void NestedLoopJoinProbeLocalState::_process_left_child_block( vectorized::Block& block, const vectorized::Block& now_process_build_block) const { + SCOPED_TIMER(_output_temp_blocks_timer); auto& p = _parent->cast(); auto dst_columns = block.mutate_columns(); const size_t max_added_rows = now_process_build_block.rows(); @@ -485,6 +493,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized set_build_side_flag, set_probe_side_flag>( state, join_op_variants); }; + SCOPED_TIMER(local_state._loop_join_timer); RETURN_IF_ERROR( std::visit(func, local_state._shared_state->join_op_variants, vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti), diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 5b0fec159e28bf..c744e6acdc507e 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -68,42 +68,48 @@ class NestedLoopJoinProbeLocalState final size_t build_block_idx, size_t processed_blocks_num, bool materialize, Filter& filter) { - if constexpr (SetBuildSideFlag) { - for (size_t i = 0; i < processed_blocks_num; i++) { - auto& build_side_flag = - assert_cast( - _shared_state->build_side_visited_flags[build_block_idx].get()) - ->get_data(); - auto* __restrict build_side_flag_data = build_side_flag.data(); - auto cur_sz = build_side_flag.size(); - const size_t offset = _build_offset_stack.top(); - _build_offset_stack.pop(); - for (size_t j = 0; j < cur_sz; j++) { - build_side_flag_data[j] |= filter[offset + j]; + { + SCOPED_TIMER(_update_visited_flags_timer); + if constexpr (SetBuildSideFlag) { + for (size_t i = 0; i < processed_blocks_num; i++) { + auto& build_side_flag = + assert_cast( + _shared_state->build_side_visited_flags[build_block_idx].get()) + ->get_data(); + auto* __restrict build_side_flag_data = build_side_flag.data(); + auto cur_sz = build_side_flag.size(); + const size_t offset = _build_offset_stack.top(); + _build_offset_stack.pop(); + for (size_t j = 0; j < cur_sz; j++) { + build_side_flag_data[j] |= filter[offset + j]; + } + build_block_idx = build_block_idx == 0 ? _shared_state->build_blocks.size() - 1 + : build_block_idx - 1; } - build_block_idx = build_block_idx == 0 ? _shared_state->build_blocks.size() - 1 - : build_block_idx - 1; } - } - if constexpr (SetProbeSideFlag) { - int64_t end = filter.size(); - for (int i = _left_block_pos == _child_block->rows() ? _left_block_pos - 1 - : _left_block_pos; - i >= _left_block_start_pos; i--) { - int64_t offset = 0; - if (!_probe_offset_stack.empty()) { - offset = _probe_offset_stack.top(); - _probe_offset_stack.pop(); - } - if (!_cur_probe_row_visited_flags[i]) { - _cur_probe_row_visited_flags[i] = - simd::contain_byte(filter.data() + offset, end - offset, 1) ? 1 - : 0; + if constexpr (SetProbeSideFlag) { + int64_t end = filter.size(); + for (int i = _left_block_pos == _child_block->rows() ? _left_block_pos - 1 + : _left_block_pos; + i >= _left_block_start_pos; i--) { + int64_t offset = 0; + if (!_probe_offset_stack.empty()) { + offset = _probe_offset_stack.top(); + _probe_offset_stack.pop(); + } + if (!_cur_probe_row_visited_flags[i]) { + _cur_probe_row_visited_flags[i] = + simd::contain_byte(filter.data() + offset, end - offset, 1) + ? 1 + : 0; + } + end = offset; } - end = offset; } } + if (materialize) { + SCOPED_TIMER(_filtered_by_join_conjuncts_timer); vectorized::Block::filter_block_internal(block, filter, column_to_keep); } else { CLEAR_BLOCK @@ -125,8 +131,11 @@ class NestedLoopJoinProbeLocalState final if (LIKELY(!_join_conjuncts.empty() && block->rows() > 0)) { vectorized::IColumn::Filter filter(block->rows(), 1); bool can_filter_all = false; - RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts( - _join_conjuncts, nullptr, IgnoreNull, block, &filter, &can_filter_all)); + { + SCOPED_TIMER(_join_conjuncts_evaluation_timer); + RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts( + _join_conjuncts, nullptr, IgnoreNull, block, &filter, &can_filter_all)); + } if (can_filter_all) { CLEAR_BLOCK @@ -185,6 +194,10 @@ class NestedLoopJoinProbeLocalState final vectorized::VExprContextSPtrs _join_conjuncts; RuntimeProfile::Counter* _loop_join_timer = nullptr; + RuntimeProfile::Counter* _output_temp_blocks_timer = nullptr; + RuntimeProfile::Counter* _update_visited_flags_timer = nullptr; + RuntimeProfile::Counter* _join_conjuncts_evaluation_timer = nullptr; + RuntimeProfile::Counter* _filtered_by_join_conjuncts_timer = nullptr; }; class NestedLoopJoinProbeOperatorX final diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index dba4f27af7c385..5c94d43f0d1e05 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -46,6 +46,16 @@ Status RepeatLocalState::open(RuntimeState* state) { return Status::OK(); } +Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + _evaluate_input_timer = ADD_TIMER(profile(), "EvaluateInputDataTime"); + _get_repeat_data_timer = ADD_TIMER(profile(), "GetRepeatDataTime"); + _filter_timer = ADD_TIMER(profile(), "FilterTime"); + return Status::OK(); +} + Status RepeatOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(OperatorXBase::init(tnode, state)); RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.repeat_node.exprs, _expr_ctxs)); @@ -166,23 +176,24 @@ Status RepeatLocalState::add_grouping_id_column(std::size_t rows, std::size_t& c Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block, bool eos) const { auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state._evaluate_input_timer); local_state._child_eos = eos; - auto& _intermediate_block = local_state._intermediate_block; - auto& _expr_ctxs = local_state._expr_ctxs; - DCHECK(!_intermediate_block || _intermediate_block->rows() == 0); + auto& intermediate_block = local_state._intermediate_block; + auto& expr_ctxs = local_state._expr_ctxs; + DCHECK(!intermediate_block || intermediate_block->rows() == 0); if (input_block->rows() > 0) { - _intermediate_block = vectorized::Block::create_unique(); + intermediate_block = vectorized::Block::create_unique(); - for (auto& expr : _expr_ctxs) { + for (auto& expr : expr_ctxs) { int result_column_id = -1; RETURN_IF_ERROR(expr->execute(input_block, &result_column_id)); DCHECK(result_column_id != -1); input_block->get_by_position(result_column_id).column = input_block->get_by_position(result_column_id) .column->convert_to_full_column_if_const(); - _intermediate_block->insert(input_block->get_by_position(result_column_id)); + intermediate_block->insert(input_block->get_by_position(result_column_id)); } - DCHECK_EQ(_expr_ctxs.size(), _intermediate_block->columns()); + DCHECK_EQ(expr_ctxs.size(), intermediate_block->columns()); } return Status::OK(); @@ -202,33 +213,39 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp } DCHECK(output_block->rows() == 0); - if (_intermediate_block && _intermediate_block->rows() > 0) { - RETURN_IF_ERROR(local_state.get_repeated_block(_intermediate_block.get(), _repeat_id_idx, - output_block)); + { + SCOPED_TIMER(local_state._get_repeat_data_timer); + if (_intermediate_block && _intermediate_block->rows() > 0) { + RETURN_IF_ERROR(local_state.get_repeated_block(_intermediate_block.get(), + _repeat_id_idx, output_block)); - _repeat_id_idx++; + _repeat_id_idx++; - int size = _repeat_id_list.size(); - if (_repeat_id_idx >= size) { - _intermediate_block->clear(); + int size = _repeat_id_list.size(); + if (_repeat_id_idx >= size) { + _intermediate_block->clear(); + _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); + _repeat_id_idx = 0; + } + } else if (local_state._expr_ctxs.empty()) { + auto m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( + output_block, _output_slots); + auto rows = _child_block.rows(); + auto& columns = m_block.mutable_columns(); + + for (int repeat_id_idx = 0; repeat_id_idx < _repeat_id_list.size(); repeat_id_idx++) { + std::size_t cur_col = 0; + RETURN_IF_ERROR( + local_state.add_grouping_id_column(rows, cur_col, columns, repeat_id_idx)); + } _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); - _repeat_id_idx = 0; } - } else if (local_state._expr_ctxs.empty()) { - auto m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, - _output_slots); - auto rows = _child_block.rows(); - auto& columns = m_block.mutable_columns(); - - for (int repeat_id_idx = 0; repeat_id_idx < _repeat_id_list.size(); repeat_id_idx++) { - std::size_t cur_col = 0; - RETURN_IF_ERROR( - local_state.add_grouping_id_column(rows, cur_col, columns, repeat_id_idx)); - } - _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); } - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, - output_block->columns())); + { + SCOPED_TIMER(local_state._filter_timer); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, + output_block->columns())); + } *eos = _child_eos && _child_block.rows() == 0; local_state.reached_limit(output_block, eos); return Status::OK(); diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index 22398df372ae65..31f88f37231aaa 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -36,6 +36,7 @@ class RepeatLocalState final : public PipelineXLocalState { using Base = PipelineXLocalState; RepeatLocalState(RuntimeState* state, OperatorXBase* parent); + Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; Status get_repeated_block(vectorized::Block* child_block, int repeat_id_idx, @@ -53,6 +54,10 @@ class RepeatLocalState final : public PipelineXLocalState { int _repeat_id_idx; std::unique_ptr _intermediate_block; vectorized::VExprContextSPtrs _expr_ctxs; + + RuntimeProfile::Counter* _evaluate_input_timer = nullptr; + RuntimeProfile::Counter* _get_repeat_data_timer = nullptr; + RuntimeProfile::Counter* _filter_timer = nullptr; }; class RepeatOperatorX final : public StatefulOperatorX { diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index 2c6ed52ddde65f..c68c47a9144a62 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -157,17 +157,8 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* field, - const std::string& timezone) { - std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type, timezone)); - *field = arrow::field(desc->col_name(), type, desc->is_nullable()); - return Status::OK(); -} - -Status convert_block_arrow_schema(const vectorized::Block& block, - std::shared_ptr* result, - const std::string& timezone) { +Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result, + const std::string& timezone) const { std::vector> fields; for (const auto& type_and_name : block) { std::shared_ptr arrow_type; @@ -180,21 +171,6 @@ Status convert_block_arrow_schema(const vectorized::Block& block, return Status::OK(); } -Status convert_to_arrow_schema(const RowDescriptor& row_desc, - std::shared_ptr* result, - const std::string& timezone) { - std::vector> fields; - for (auto tuple_desc : row_desc.tuple_descriptors()) { - for (auto desc : tuple_desc->slots()) { - std::shared_ptr field; - RETURN_IF_ERROR(convert_to_arrow_field(desc, &field, timezone)); - fields.push_back(field); - } - } - *result = arrow::schema(std::move(fields)); - return Status::OK(); -} - Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, std::shared_ptr* result, const std::string& timezone) { diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index 9a33719a1cfbcc..9e4847cdf96e90 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -45,12 +45,8 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, const std::string& timezone); - -Status convert_block_arrow_schema(const vectorized::Block& block, - std::shared_ptr* result, - const std::string& timezone); +Status get_arrow_schema(const RowDescriptor& row_desc, std::shared_ptr* result, + const std::string& timezone); Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, std::shared_ptr* result, diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp index fc692b8f67569e..4105e86f5590ee 100644 --- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp @@ -489,7 +489,7 @@ void serialize_and_deserialize_arrow_test() { RowDescriptor row_desc(&tuple_desc, true); // arrow schema std::shared_ptr _arrow_schema; - EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); + EXPECT_EQ(get_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); // serialize std::shared_ptr result; @@ -623,7 +623,7 @@ TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) { RowDescriptor row_desc(&tuple_desc, true); // arrow schema std::shared_ptr _arrow_schema; - EXPECT_EQ(convert_to_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); + EXPECT_EQ(get_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); // serialize std::shared_ptr result; From 0058d598cbe8f1b383c2e11c65fe8594e78a9bdf Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 31 Oct 2024 17:20:06 +0800 Subject: [PATCH 2/4] update --- be/src/util/arrow/row_batch.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index c68c47a9144a62..0cbb6bcd0c8916 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -158,7 +158,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, - const std::string& timezone) const { + const std::string& timezone) { std::vector> fields; for (const auto& type_and_name : block) { std::shared_ptr arrow_type; From 2bdd9583172f09af87804d10ac39c7b1485020ee Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 31 Oct 2024 17:39:50 +0800 Subject: [PATCH 3/4] update --- be/src/util/arrow/row_batch.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index 9e4847cdf96e90..3993003baf6e95 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -45,7 +45,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, +Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result, const std::string& timezone); Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, From e9becd6762895caaede5050dcb264133434b5e39 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 31 Oct 2024 18:46:42 +0800 Subject: [PATCH 4/4] update --- .../serde/data_type_serde_arrow_test.cpp | 654 ------------------ 1 file changed, 654 deletions(-) delete mode 100644 be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp diff --git a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp b/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp deleted file mode 100644 index 4105e86f5590ee..00000000000000 --- a/be/test/vec/data_types/serde/data_type_serde_arrow_test.cpp +++ /dev/null @@ -1,654 +0,0 @@ - -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "gtest/gtest_pred_impl.h" -#include "olap/hll.h" -#include "runtime/descriptors.cpp" -#include "runtime/descriptors.h" -#include "util/arrow/block_convertor.h" -#include "util/arrow/row_batch.h" -#include "util/bitmap_value.h" -#include "util/quantile_state.h" -#include "util/string_parser.hpp" -#include "vec/columns/column.h" -#include "vec/columns/column_array.h" -#include "vec/columns/column_complex.h" -#include "vec/columns/column_decimal.h" -#include "vec/columns/column_map.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_string.h" -#include "vec/columns/column_vector.h" -#include "vec/core/block.h" -#include "vec/core/field.h" -#include "vec/core/types.h" -#include "vec/data_types/data_type.h" -#include "vec/data_types/data_type_array.h" -#include "vec/data_types/data_type_bitmap.h" -#include "vec/data_types/data_type_date.h" -#include "vec/data_types/data_type_date_time.h" -#include "vec/data_types/data_type_decimal.h" -#include "vec/data_types/data_type_hll.h" -#include "vec/data_types/data_type_ipv4.h" -#include "vec/data_types/data_type_ipv6.h" -#include "vec/data_types/data_type_map.h" -#include "vec/data_types/data_type_nullable.h" -#include "vec/data_types/data_type_number.h" -#include "vec/data_types/data_type_quantilestate.h" -#include "vec/data_types/data_type_string.h" -#include "vec/data_types/data_type_struct.h" -#include "vec/data_types/data_type_time_v2.h" -#include "vec/io/io_helper.h" -#include "vec/runtime/vdatetime_value.h" -#include "vec/utils/arrow_column_to_doris_column.h" - -namespace doris::vectorized { - -template -void serialize_and_deserialize_arrow_test() { - vectorized::Block block; - std::vector> cols; - if constexpr (is_scalar) { - cols = { - {"k1", FieldType::OLAP_FIELD_TYPE_INT, 1, TYPE_INT, false}, - {"k7", FieldType::OLAP_FIELD_TYPE_INT, 7, TYPE_INT, true}, - {"k2", FieldType::OLAP_FIELD_TYPE_STRING, 2, TYPE_STRING, false}, - {"k3", FieldType::OLAP_FIELD_TYPE_DECIMAL128I, 3, TYPE_DECIMAL128I, false}, - {"k11", FieldType::OLAP_FIELD_TYPE_DATETIME, 11, TYPE_DATETIME, false}, - {"k4", FieldType::OLAP_FIELD_TYPE_BOOL, 4, TYPE_BOOLEAN, false}, - {"k5", FieldType::OLAP_FIELD_TYPE_DECIMAL32, 5, TYPE_DECIMAL32, false}, - {"k6", FieldType::OLAP_FIELD_TYPE_DECIMAL64, 6, TYPE_DECIMAL64, false}, - {"k12", FieldType::OLAP_FIELD_TYPE_DATETIMEV2, 12, TYPE_DATETIMEV2, false}, - {"k8", FieldType::OLAP_FIELD_TYPE_IPV4, 8, TYPE_IPV4, false}, - {"k9", FieldType::OLAP_FIELD_TYPE_IPV6, 9, TYPE_IPV6, false}, - }; - } else { - cols = {{"a", FieldType::OLAP_FIELD_TYPE_ARRAY, 6, TYPE_ARRAY, true}, - {"m", FieldType::OLAP_FIELD_TYPE_MAP, 8, TYPE_MAP, true}, - {"s", FieldType::OLAP_FIELD_TYPE_STRUCT, 5, TYPE_STRUCT, true}}; - } - - int row_num = 7; - // make desc and generate block - TupleDescriptor tuple_desc(PTupleDescriptor(), true); - for (auto t : cols) { - TSlotDescriptor tslot; - std::string col_name = std::get<0>(t); - tslot.__set_colName(col_name); - TypeDescriptor type_desc(std::get<3>(t)); - bool is_nullable(std::get<4>(t)); - switch (std::get<3>(t)) { - case TYPE_BOOLEAN: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto vec = vectorized::ColumnVector::create(); - auto& data = vec->get_data(); - for (int i = 0; i < row_num; ++i) { - data.push_back(i % 2); - } - vectorized::DataTypePtr data_type(std::make_shared()); - vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, - col_name); - block.insert(std::move(type_and_name)); - } - break; - case TYPE_INT: - tslot.__set_slotType(type_desc.to_thrift()); - if (is_nullable) { - { - auto column_vector_int32 = vectorized::ColumnVector::create(); - auto column_nullable_vector = - vectorized::make_nullable(std::move(column_vector_int32)); - auto mutable_nullable_vector = std::move(*column_nullable_vector).mutate(); - for (int i = 0; i < row_num; i++) { - if (i % 2 == 0) { - mutable_nullable_vector->insert_default(); - } else { - mutable_nullable_vector->insert(int32(i)); - } - } - auto data_type = vectorized::make_nullable( - std::make_shared()); - vectorized::ColumnWithTypeAndName type_and_name( - mutable_nullable_vector->get_ptr(), data_type, col_name); - block.insert(type_and_name); - } - } else { - auto vec = vectorized::ColumnVector::create(); - auto& data = vec->get_data(); - for (int i = 0; i < row_num; ++i) { - data.push_back(i); - } - vectorized::DataTypePtr data_type(std::make_shared()); - vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, - col_name); - block.insert(std::move(type_and_name)); - } - break; - case TYPE_DECIMAL32: - type_desc.precision = 9; - type_desc.scale = 2; - tslot.__set_slotType(type_desc.to_thrift()); - { - vectorized::DataTypePtr decimal_data_type = - std::make_shared>(type_desc.precision, - type_desc.scale); - auto decimal_column = decimal_data_type->create_column(); - auto& data = ((vectorized::ColumnDecimal>*) - decimal_column.get()) - ->get_data(); - for (int i = 0; i < row_num; ++i) { - if (i == 0) { - data.push_back(Int32(0)); - continue; - } - Int32 val; - StringParser::ParseResult result = StringParser::PARSE_SUCCESS; - i % 2 == 0 ? val = StringParser::string_to_decimal( - "1234567.56", 11, type_desc.precision, type_desc.scale, - &result) - : val = StringParser::string_to_decimal( - "-1234567.56", 12, type_desc.precision, type_desc.scale, - &result); - EXPECT_TRUE(result == StringParser::PARSE_SUCCESS); - data.push_back(val); - } - - vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(), - decimal_data_type, col_name); - block.insert(type_and_name); - } - break; - case TYPE_DECIMAL64: - type_desc.precision = 18; - type_desc.scale = 6; - tslot.__set_slotType(type_desc.to_thrift()); - { - vectorized::DataTypePtr decimal_data_type = - std::make_shared>(type_desc.precision, - type_desc.scale); - auto decimal_column = decimal_data_type->create_column(); - auto& data = ((vectorized::ColumnDecimal>*) - decimal_column.get()) - ->get_data(); - for (int i = 0; i < row_num; ++i) { - if (i == 0) { - data.push_back(Int64(0)); - continue; - } - Int64 val; - StringParser::ParseResult result = StringParser::PARSE_SUCCESS; - std::string decimal_string = - i % 2 == 0 ? "-123456789012.123456" : "123456789012.123456"; - val = StringParser::string_to_decimal( - decimal_string.c_str(), decimal_string.size(), type_desc.precision, - type_desc.scale, &result); - EXPECT_TRUE(result == StringParser::PARSE_SUCCESS); - data.push_back(val); - } - vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(), - decimal_data_type, col_name); - block.insert(type_and_name); - } - break; - case TYPE_DECIMAL128I: - type_desc.precision = 27; - type_desc.scale = 9; - tslot.__set_slotType(type_desc.to_thrift()); - { - vectorized::DataTypePtr decimal_data_type( - doris::vectorized::create_decimal(27, 9, true)); - auto decimal_column = decimal_data_type->create_column(); - auto& data = ((vectorized::ColumnDecimal>*) - decimal_column.get()) - ->get_data(); - for (int i = 0; i < row_num; ++i) { - __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 8)); - data.push_back(value); - } - vectorized::ColumnWithTypeAndName type_and_name(decimal_column->get_ptr(), - decimal_data_type, col_name); - block.insert(type_and_name); - } - break; - case TYPE_STRING: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto strcol = vectorized::ColumnString::create(); - for (int i = 0; i < row_num; ++i) { - std::string is = std::to_string(i); - strcol->insert_data(is.c_str(), is.size()); - } - vectorized::DataTypePtr data_type(std::make_shared()); - vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, - col_name); - block.insert(type_and_name); - } - break; - case TYPE_HLL: - tslot.__set_slotType(type_desc.to_thrift()); - { - vectorized::DataTypePtr hll_data_type(std::make_shared()); - auto hll_column = hll_data_type->create_column(); - std::vector& container = - ((vectorized::ColumnHLL*)hll_column.get())->get_data(); - for (int i = 0; i < row_num; ++i) { - HyperLogLog hll; - hll.update(i); - container.push_back(hll); - } - vectorized::ColumnWithTypeAndName type_and_name(hll_column->get_ptr(), - hll_data_type, col_name); - - block.insert(type_and_name); - } - break; - case TYPE_DATEV2: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto column_vector_date_v2 = vectorized::ColumnVector::create(); - auto& date_v2_data = column_vector_date_v2->get_data(); - for (int i = 0; i < row_num; ++i) { - DateV2Value value; - value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6)); - date_v2_data.push_back(*reinterpret_cast(&value)); - } - vectorized::DataTypePtr date_v2_type( - std::make_shared()); - vectorized::ColumnWithTypeAndName test_date_v2(column_vector_date_v2->get_ptr(), - date_v2_type, col_name); - block.insert(test_date_v2); - } - break; - case TYPE_DATE: // int64 - tslot.__set_slotType(type_desc.to_thrift()); - { - auto column_vector_date = vectorized::ColumnVector::create(); - auto& date_data = column_vector_date->get_data(); - for (int i = 0; i < row_num; ++i) { - VecDateTimeValue value; - value.from_date_int64(20210501); - date_data.push_back(*reinterpret_cast(&value)); - } - vectorized::DataTypePtr date_type(std::make_shared()); - vectorized::ColumnWithTypeAndName test_date(column_vector_date->get_ptr(), - date_type, col_name); - block.insert(test_date); - } - break; - case TYPE_DATETIME: // int64 - tslot.__set_slotType(type_desc.to_thrift()); - { - auto column_vector_datetime = vectorized::ColumnVector::create(); - auto& datetime_data = column_vector_datetime->get_data(); - for (int i = 0; i < row_num; ++i) { - VecDateTimeValue value; - value.from_date_int64(20210501080910); - datetime_data.push_back(*reinterpret_cast(&value)); - } - vectorized::DataTypePtr datetime_type( - std::make_shared()); - vectorized::ColumnWithTypeAndName test_datetime(column_vector_datetime->get_ptr(), - datetime_type, col_name); - block.insert(test_datetime); - } - break; - case TYPE_DATETIMEV2: // uint64 - tslot.__set_slotType(type_desc.to_thrift()); - { - // 2022-01-01 11:11:11.111 - auto column_vector_datetimev2 = - vectorized::ColumnVector::create(); - // auto& datetimev2_data = column_vector_datetimev2->get_data(); - DateV2Value value; - string date_literal = "2022-01-01 11:11:11.111"; - value.from_date_str(date_literal.c_str(), date_literal.size()); - char to[64] = {}; - std::cout << "value: " << value.to_string(to) << std::endl; - for (int i = 0; i < row_num; ++i) { - column_vector_datetimev2->insert(value.to_date_int_val()); - } - vectorized::DataTypePtr datetimev2_type( - std::make_shared()); - vectorized::ColumnWithTypeAndName test_datetimev2( - column_vector_datetimev2->get_ptr(), datetimev2_type, col_name); - block.insert(test_datetimev2); - } - break; - case TYPE_ARRAY: // array - type_desc.add_sub_type(TYPE_STRING, true); - tslot.__set_slotType(type_desc.to_thrift()); - { - DataTypePtr s = - std::make_shared(std::make_shared()); - DataTypePtr au = std::make_shared(s); - Array a1, a2; - a1.push_back(String("sss")); - a1.push_back(Null()); - a1.push_back(String("clever amory")); - a2.push_back(String("hello amory")); - a2.push_back(Null()); - a2.push_back(String("cute amory")); - a2.push_back(String("sf")); - MutableColumnPtr array_column = au->create_column(); - array_column->reserve(2); - array_column->insert(a1); - array_column->insert(a2); - vectorized::ColumnWithTypeAndName type_and_name(array_column->get_ptr(), au, - col_name); - block.insert(type_and_name); - } - break; - case TYPE_MAP: - type_desc.add_sub_type(TYPE_STRING, true); - type_desc.add_sub_type(TYPE_STRING, true); - tslot.__set_slotType(type_desc.to_thrift()); - { - DataTypePtr s = - std::make_shared(std::make_shared()); - ; - DataTypePtr d = - std::make_shared(std::make_shared()); - DataTypePtr m = std::make_shared(s, d); - Array k1, k2, v1, v2; - k1.push_back("null"); - k1.push_back("doris"); - k1.push_back("clever amory"); - v1.push_back("ss"); - v1.push_back(Null()); - v1.push_back("NULL"); - k2.push_back("hello amory"); - k2.push_back("NULL"); - k2.push_back("cute amory"); - k2.push_back("doris"); - v2.push_back("s"); - v2.push_back("0"); - v2.push_back("sf"); - v2.push_back(Null()); - Map m1, m2; - m1.push_back(k1); - m1.push_back(v1); - m2.push_back(k2); - m2.push_back(v2); - MutableColumnPtr map_column = m->create_column(); - map_column->reserve(2); - map_column->insert(m1); - map_column->insert(m2); - vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), m, col_name); - block.insert(type_and_name); - } - break; - case TYPE_STRUCT: - type_desc.add_sub_type(TYPE_STRING, "name", true); - type_desc.add_sub_type(TYPE_LARGEINT, "age", true); - type_desc.add_sub_type(TYPE_BOOLEAN, "is", true); - tslot.__set_slotType(type_desc.to_thrift()); - { - DataTypePtr s = - std::make_shared(std::make_shared()); - DataTypePtr d = - std::make_shared(std::make_shared()); - DataTypePtr m = - std::make_shared(std::make_shared()); - DataTypePtr st = - std::make_shared(std::vector {s, d, m}); - Tuple t1, t2; - t1.push_back(String("amory cute")); - t1.push_back(__int128_t(37)); - t1.push_back(true); - t2.push_back("null"); - t2.push_back(__int128_t(26)); - t2.push_back(false); - MutableColumnPtr struct_column = st->create_column(); - struct_column->reserve(2); - struct_column->insert(t1); - struct_column->insert(t2); - vectorized::ColumnWithTypeAndName type_and_name(struct_column->get_ptr(), st, - col_name); - block.insert(type_and_name); - } - break; - case TYPE_IPV4: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto vec = vectorized::ColumnIPv4::create(); - auto& data = vec->get_data(); - for (int i = 0; i < row_num; ++i) { - data.push_back(i); - } - vectorized::DataTypePtr data_type(std::make_shared()); - vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, - col_name); - block.insert(std::move(type_and_name)); - } - break; - case TYPE_IPV6: - tslot.__set_slotType(type_desc.to_thrift()); - { - auto vec = vectorized::ColumnIPv6::create(); - auto& data = vec->get_data(); - for (int i = 0; i < row_num; ++i) { - data.push_back(i); - } - vectorized::DataTypePtr data_type(std::make_shared()); - vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(), data_type, - col_name); - block.insert(std::move(type_and_name)); - } - break; - default: - break; - } - - tslot.__set_col_unique_id(std::get<2>(t)); - SlotDescriptor* slot = new SlotDescriptor(tslot); - tuple_desc.add_slot(slot); - } - - RowDescriptor row_desc(&tuple_desc, true); - // arrow schema - std::shared_ptr _arrow_schema; - EXPECT_EQ(get_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); - - // serialize - std::shared_ptr result; - std::cout << "block data: " << block.dump_data(0, row_num) << std::endl; - std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << std::endl; - - cctz::time_zone timezone_obj; - TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, timezone_obj); - static_cast(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, timezone_obj)); - Block new_block = block.clone_empty(); - EXPECT_TRUE(result != nullptr); - std::cout << "result: " << result->ToString() << std::endl; - // deserialize - for (auto t : cols) { - std::string real_column_name = std::get<0>(t); - auto* array = result->GetColumnByName(real_column_name).get(); - auto& column_with_type_and_name = new_block.get_by_name(real_column_name); - if (std::get<3>(t) == PrimitiveType::TYPE_DATE || - std::get<3>(t) == PrimitiveType::TYPE_DATETIME) { - { - auto strcol = vectorized::ColumnString::create(); - vectorized::DataTypePtr data_type(std::make_shared()); - vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, - real_column_name); - static_cast(arrow_column_to_doris_column( - array, 0, type_and_name.column, type_and_name.type, block.rows(), "UTC")); - { - auto& col = column_with_type_and_name.column.get()->assume_mutable_ref(); - auto& date_data = static_cast&>(col).get_data(); - for (int i = 0; i < strcol->size(); ++i) { - StringRef str = strcol->get_data_at(i); - VecDateTimeValue value; - value.from_date_str(str.data, str.size); - date_data.push_back(*reinterpret_cast(&value)); - } - } - } - continue; - } else if (std::get<3>(t) == PrimitiveType::TYPE_DATEV2) { - auto strcol = vectorized::ColumnString::create(); - vectorized::DataTypePtr data_type(std::make_shared()); - vectorized::ColumnWithTypeAndName type_and_name(strcol->get_ptr(), data_type, - real_column_name); - static_cast(arrow_column_to_doris_column( - array, 0, type_and_name.column, type_and_name.type, block.rows(), "UTC")); - { - auto& col = column_with_type_and_name.column.get()->assume_mutable_ref(); - auto& date_data = static_cast&>(col).get_data(); - for (int i = 0; i < strcol->size(); ++i) { - StringRef str = strcol->get_data_at(i); - DateV2Value value; - value.from_date_str(str.data, str.size); - date_data.push_back(*reinterpret_cast(&value)); - } - } - continue; - } else if (std::get<3>(t) == PrimitiveType::TYPE_DATETIMEV2) { - // now we only support read doris datetimev2 to arrow - block.erase(real_column_name); - new_block.erase(real_column_name); - continue; - } - static_cast(arrow_column_to_doris_column(array, 0, column_with_type_and_name.column, - column_with_type_and_name.type, block.rows(), - "UTC")); - } - - std::cout << block.dump_data() << std::endl; - std::cout << new_block.dump_data() << std::endl; - EXPECT_EQ(block.dump_data(), new_block.dump_data()); -} - -TEST(DataTypeSerDeArrowTest, DataTypeScalaSerDeTest) { - serialize_and_deserialize_arrow_test(); -} - -TEST(DataTypeSerDeArrowTest, DataTypeCollectionSerDeTest) { - serialize_and_deserialize_arrow_test(); -} - -TEST(DataTypeSerDeArrowTest, DataTypeMapNullKeySerDeTest) { - TupleDescriptor tuple_desc(PTupleDescriptor(), true); - TSlotDescriptor tslot; - std::string col_name = "map_null_key"; - tslot.__set_colName(col_name); - TypeDescriptor type_desc(TYPE_MAP); - type_desc.add_sub_type(TYPE_STRING, true); - type_desc.add_sub_type(TYPE_INT, true); - tslot.__set_slotType(type_desc.to_thrift()); - vectorized::Block block; - { - DataTypePtr s = std::make_shared(std::make_shared()); - ; - DataTypePtr d = std::make_shared(std::make_shared()); - DataTypePtr m = std::make_shared(s, d); - Array k1, k2, v1, v2, k3, v3; - k1.push_back("doris"); - k1.push_back("clever amory"); - v1.push_back(Null()); - v1.push_back(30); - k2.push_back("hello amory"); - k2.push_back("NULL"); - k2.push_back("cute amory"); - k2.push_back("doris"); - v2.push_back(26); - v2.push_back(Null()); - v2.push_back(6); - v2.push_back(7); - k3.push_back("test"); - v3.push_back(11); - Map m1, m2, m3; - m1.push_back(k1); - m1.push_back(v1); - m2.push_back(k2); - m2.push_back(v2); - m3.push_back(k3); - m3.push_back(v3); - MutableColumnPtr map_column = m->create_column(); - map_column->reserve(3); - map_column->insert(m1); - map_column->insert(m2); - map_column->insert(m3); - vectorized::ColumnWithTypeAndName type_and_name(map_column->get_ptr(), m, col_name); - block.insert(type_and_name); - } - - tslot.__set_col_unique_id(1); - SlotDescriptor* slot = new SlotDescriptor(tslot); - tuple_desc.add_slot(slot); - RowDescriptor row_desc(&tuple_desc, true); - // arrow schema - std::shared_ptr _arrow_schema; - EXPECT_EQ(get_arrow_schema(row_desc, &_arrow_schema, "UTC"), Status::OK()); - - // serialize - std::shared_ptr result; - std::cout << "block structure: " << block.dump_structure() << std::endl; - std::cout << "_arrow_schema: " << _arrow_schema->ToString(true) << std::endl; - - cctz::time_zone timezone_obj; - TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, timezone_obj); - static_cast(convert_to_arrow_batch(block, _arrow_schema, arrow::default_memory_pool(), - &result, timezone_obj)); - Block new_block = block.clone_empty(); - EXPECT_TRUE(result != nullptr); - std::cout << "result: " << result->ToString() << std::endl; - // deserialize - auto* array = result->GetColumnByName(col_name).get(); - auto& column_with_type_and_name = new_block.get_by_name(col_name); - static_cast(arrow_column_to_doris_column(array, 0, column_with_type_and_name.column, - column_with_type_and_name.type, block.rows(), - "UTC")); - std::cout << block.dump_data() << std::endl; - std::cout << new_block.dump_data() << std::endl; - // new block row_index 0, 2 which row has key null will be filter - EXPECT_EQ(new_block.dump_one_line(0, 1), "{\"doris\":null, \"clever amory\":30}"); - EXPECT_EQ(new_block.dump_one_line(2, 1), "{\"test\":11}"); - EXPECT_EQ(block.dump_data(1, 1), new_block.dump_data(1, 1)); -} - -} // namespace doris::vectorized