From 76800bb1c362b8e0f6ac12aa34970340b059c6b9 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Fri, 22 May 2020 16:13:05 +0800 Subject: [PATCH] reader --- be/src/olap/rowset/alpha_rowset_reader.cpp | 17 +++++----- be/src/olap/rowset/alpha_rowset_reader.h | 7 ++-- be/src/olap/rowset/column_data.cpp | 37 +++++++++------------- be/src/olap/rowset/column_data.h | 23 +++++++------- 4 files changed, 40 insertions(+), 44 deletions(-) diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp b/be/src/olap/rowset/alpha_rowset_reader.cpp index 278576ae0de4c33..9463c87e0c407a8 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.cpp +++ b/be/src/olap/rowset/alpha_rowset_reader.cpp @@ -27,8 +27,7 @@ AlphaRowsetReader::AlphaRowsetReader( : _num_rows_per_row_block(num_rows_per_row_block), _rowset(std::move(rowset)), _alpha_rowset_meta(std::static_pointer_cast(_rowset->rowset_meta()).get()), - _segment_groups(_rowset->_segment_groups), - _key_range_size(0) { + _segment_groups(_rowset->_segment_groups) { _rowset->aquire(); } @@ -48,7 +47,6 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { } _is_segments_overlapping = _alpha_rowset_meta->is_segments_overlapping(); - _ordinal = 0; RETURN_NOT_OK(_init_merge_ctxs(read_context)); @@ -155,7 +153,10 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { VLOG(10) << "get merged row: " << row_cursor->to_string(); + // TODO(yingchun): wrap these code _read_block->get_row(_read_block->pos(), _dst_cursor); + // TODO(yingchun): optimize to avoid copy row, as long as this row is in memory + // (i.e. merge_ctx didn't switch to next block), we can use pointer and not copy. 
copy_row(_dst_cursor, *row_cursor, _read_block->mem_pool()); _read_block->pos_inc(); num_rows_in_block++; @@ -317,6 +318,7 @@ OLAPStatus AlphaRowsetReader::_pull_first_block(AlphaMergeContext* merge_ctx) { } OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context) { + // TODO(yingchun): do these checks in the upper layer if (read_context->reader_type == READER_QUERY) { if (read_context->lower_bound_keys->size() != read_context->is_lower_keys_included->size() || read_context->lower_bound_keys->size() != read_context->upper_bound_keys->size() @@ -358,8 +360,7 @@ OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context if (new_column_data->rowset_pruning_filter()) { _stats->rows_stats_filtered += new_column_data->num_rows(); VLOG(3) << "filter segment group in query in condition. version=" - << new_column_data->version().first - << "-" << new_column_data->version().second; + << new_column_data->version(); continue; } } @@ -368,15 +369,15 @@ OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context if (ret == DEL_SATISFIED) { _stats->rows_del_filtered += new_column_data->num_rows(); VLOG(3) << "filter segment group in delete predicate:" - << new_column_data->version().first << ", " << new_column_data->version().second; + << new_column_data->version(); continue; } else if (ret == DEL_PARTIAL_SATISFIED) { VLOG(3) << "filter segment group partially in delete predicate:" - << new_column_data->version().first << ", " << new_column_data->version().second; + << new_column_data->version(); new_column_data->set_delete_status(DEL_PARTIAL_SATISFIED); } else { VLOG(3) << "not filter segment group in delete predicate:" - << new_column_data->version().first << ", " << new_column_data->version().second; + << new_column_data->version(); new_column_data->set_delete_status(DEL_NOT_SATISFIED); } AlphaMergeContext merge_ctx; diff --git a/be/src/olap/rowset/alpha_rowset_reader.h 
b/be/src/olap/rowset/alpha_rowset_reader.h index 3f3eb584a647c50..ffd4cf2487278ff 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.h +++ b/be/src/olap/rowset/alpha_rowset_reader.h @@ -89,6 +89,7 @@ class AlphaRowsetReader : public RowsetReader { // merge by priority queue(_merge_heap) // this method has same function with _pull_next_row_for_merge_rowset, but using heap merge. // and this should replace the _pull_next_row_for_merge_rowset later. + // TODO(yingchun): remove _pull_next_row_for_merge_rowset now OLAPStatus _pull_next_row_for_merge_rowset_v2(RowCursor** row); // init the merge heap, this should be call before calling _pull_next_row_for_merge_rowset_v2(); OLAPStatus _init_merge_heap(); @@ -109,15 +110,15 @@ class AlphaRowsetReader : public RowsetReader { std::unique_ptr _read_block; OLAPStatus (AlphaRowsetReader::*_next_block)(RowBlock** block) = nullptr; RowCursor* _dst_cursor = nullptr; - int _key_range_size; + int _key_range_size = 0; // In streaming ingestion, row among different segment // groups may overlap, and is necessary to be taken // into consideration deliberately. bool _is_segments_overlapping; - // ordinal of ColumnData upon reading - size_t _ordinal; + // ordinal of ColumnData upon reading, only used for _union_block (non-ordered read?). 
+ size_t _ordinal = 0; RowsetReaderContext* _current_read_context; OlapReaderStatistics _owned_stats; diff --git a/be/src/olap/rowset/column_data.cpp b/be/src/olap/rowset/column_data.cpp index 080564d2d7ad087..61f4b5d8e980ed6 100644 --- a/be/src/olap/rowset/column_data.cpp +++ b/be/src/olap/rowset/column_data.cpp @@ -25,28 +25,19 @@ namespace doris { ColumnData* ColumnData::create(SegmentGroup* segment_group) { - ColumnData* data = new(std::nothrow) ColumnData(segment_group); - return data; + return new(std::nothrow) ColumnData(segment_group); } ColumnData::ColumnData(SegmentGroup* segment_group) : _segment_group(segment_group), - _eof(false), - _conditions(nullptr), - _col_predicates(nullptr), - _delete_status(DEL_NOT_SATISFIED), - _runtime_state(nullptr), _schema(segment_group->get_tablet_schema()), - _is_using_cache(false), - _segment_reader(nullptr), - _lru_cache(nullptr) { + _num_rows_per_block(segment_group->get_num_rows_per_row_block()) { if (StorageEngine::instance() != nullptr) { _lru_cache = StorageEngine::instance()->index_stream_lru_cache(); } else { // for independent usage, eg: unit test/segment tool _lru_cache = FileHandler::get_fd_cache(); } - _num_rows_per_block = _segment_group->get_num_rows_per_row_block(); } ColumnData::~ColumnData() { @@ -92,6 +83,7 @@ OLAPStatus ColumnData::_next_row(const RowCursor** row, bool without_filter) { return OLAP_SUCCESS; } + // TODO(yingchun): what does DEL_NOT_SATISFIED mean? 
// when without_filter is true, _include_blocks is nullptr if (_read_block->block_status() == DEL_NOT_SATISFIED) { *row = &_cursor; @@ -130,8 +122,7 @@ OLAPStatus ColumnData::_seek_to_block(const RowBlockPosition& block_pos, bool wi return OLAP_ERR_DATA_EOF; } SAFE_DELETE(_segment_reader); - std::string file_name; - file_name = segment_group()->construct_data_file_path(block_pos.segment); + std::string file_name = segment_group()->construct_data_file_path(block_pos.segment); _segment_reader = new(std::nothrow) SegmentReader( file_name, segment_group(), block_pos.segment, _seek_columns, _load_bf_columns, _conditions, @@ -173,7 +164,8 @@ OLAPStatus ColumnData::_find_position_by_short_key( } return res; } - res = segment_group()->find_prev_point(tmp_pos, position); + // TODO(yingchun): why get prev? + res = _segment_group->find_prev_point(tmp_pos, position); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("find prev row block failed. [res=%d]", res); return res; @@ -194,7 +186,7 @@ OLAPStatus ColumnData::_find_position_by_full_key( return res; } RowBlockPosition start_position; - res = segment_group()->find_prev_point(tmp_pos, &start_position); + res = _segment_group->find_prev_point(tmp_pos, &start_position); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("find prev row block failed. [res=%d]", res); return res; @@ -220,7 +212,7 @@ OLAPStatus ColumnData::_find_position_by_full_key( OLAPIndexOffset index_offset; index_offset.segment = _end_segment; index_offset.offset = _end_block; - res = segment_group()->get_row_block_position(index_offset, &end_position); + res = _segment_group->get_row_block_position(index_offset, &end_position); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to get row block position. [res=%d]", res); return res; @@ -276,6 +268,7 @@ OLAPStatus ColumnData::_find_position_by_full_key( return OLAP_SUCCESS; } +// TODO(yingchun): find_last_key -> include_last_key? 
OLAPStatus ColumnData::_seek_to_row(const RowCursor& key, bool find_last_key, bool is_end_key) { RowBlockPosition position; OLAPStatus res = OLAP_SUCCESS; @@ -324,7 +317,7 @@ OLAPStatus ColumnData::_seek_to_row(const RowCursor& key, bool find_last_key, bo } else { // 找last key。返回大于这个key的第一个。也就是 // row_cursor > key - while (res == OLAP_SUCCESS && compare_row_key(*row_cursor,key) <= 0) { + while (res == OLAP_SUCCESS && compare_row_key(*row_cursor, key) <= 0) { res = _next_row(&row_cursor, without_filter); } } @@ -353,7 +346,7 @@ OLAPStatus ColumnData::prepare_block_read( const RowCursor* end_key, bool find_end_key, RowBlock** first_block) { SCOPED_RAW_TIMER(&_stats->block_fetch_ns); - set_eof(false); + _eof = false; _end_key_is_set = false; _is_normal_read = false; // set end position @@ -366,13 +359,13 @@ OLAPStatus ColumnData::prepare_block_read( _end_row_index = _read_block->pos(); _end_key_is_set = true; } else if (res != OLAP_ERR_DATA_EOF) { - LOG(WARNING) << "Find end key failed.key=" << end_key->to_string(); + LOG(WARNING) << "Find end key failed. key=" << end_key->to_string(); return res; } // res == OLAP_ERR_DATA_EOF means there is no end key, then we read to // the end of this ColumnData } - set_eof(false); + _eof = false; if (start_key != nullptr) { auto res = _seek_to_row(*start_key, !find_start_key, false); if (res == OLAP_SUCCESS) { @@ -382,7 +375,7 @@ OLAPStatus ColumnData::prepare_block_read( *first_block = nullptr; return res; } else { - LOG(WARNING) << "start_key can't be found.key=" << start_key->to_string(); + LOG(WARNING) << "start_key can't be found. 
key=" << start_key->to_string(); return res; } } else { @@ -663,7 +656,7 @@ OLAPStatus ColumnData::_get_block(bool without_filter, int rows_read) { _stats->rows_vec_cond_filtered += old_size - vec_batch->size(); } // if vector is empty after predicate evaluate, get next block - if (vec_batch->size() == 0) { + if (vec_batch->empty()) { continue; } SCOPED_RAW_TIMER(&_stats->block_convert_ns); diff --git a/be/src/olap/rowset/column_data.h b/be/src/olap/rowset/column_data.h index 06b9a8cefd60a19..49bc8d1929a86fb 100644 --- a/be/src/olap/rowset/column_data.h +++ b/be/src/olap/rowset/column_data.h @@ -37,6 +37,7 @@ namespace doris { class Tablet; class SegmentReader; +// TODO(yingchun): why not rename it to ColumnDataReader? // This class is column data reader. this class will be used in two case. class ColumnData { public: @@ -100,7 +101,6 @@ class ColumnData { // 开放接口查询_eof,让外界知道数据读取是否正常终止 // 因为这个函数被频繁访问, 从性能考虑, 放在基类而不是虚函数 bool eof() { return _eof; } - void set_eof(bool eof) { _eof = eof; } bool* eof_ptr() { return &_eof; } bool empty() const { return _segment_group->empty(); } @@ -110,6 +110,7 @@ class ColumnData { int delete_pruning_filter(); uint64_t get_filted_rows(); + // TODO(yingchun): remove it SegmentGroup* segment_group() const { return _segment_group; } void set_segment_group(SegmentGroup* segment_group) { _segment_group = segment_group; } int64_t num_rows() const { return _segment_group->num_rows(); } @@ -154,21 +155,21 @@ class ColumnData { return &_cursor; } private: - SegmentGroup* _segment_group; + SegmentGroup* _segment_group = nullptr; // 当到达文件末尾或者到达end key时设置此标志 - bool _eof; - const Conditions* _conditions; - const std::vector* _col_predicates; + bool _eof = false; + const Conditions* _conditions = nullptr; + const std::vector* _col_predicates = nullptr; const DeleteHandler*_delete_handler = nullptr; - DelCondSatisfied _delete_status; - RuntimeState* _runtime_state; - OlapReaderStatistics* _stats; + DelCondSatisfied _delete_status = 
DEL_NOT_SATISFIED; + RuntimeState* _runtime_state = nullptr; + OlapReaderStatistics* _stats = nullptr; const TabletSchema& _schema; // whether in normal read, use return columns to load block bool _is_normal_read = false; bool _end_key_is_set = false; - bool _is_using_cache; + bool _is_using_cache = false; bool _segment_eof = false; bool _need_eval_predicates = false; @@ -176,7 +177,7 @@ class ColumnData { std::vector _seek_columns; std::set _load_bf_columns; - SegmentReader* _segment_reader; + SegmentReader* _segment_reader = nullptr; std::unique_ptr _seek_vector_batch; std::unique_ptr _read_vector_batch; @@ -195,7 +196,7 @@ class ColumnData { int64_t _end_row_index = 0; size_t _num_rows_per_block; - Cache* _lru_cache; + Cache* _lru_cache = nullptr; }; class ColumnDataComparator {