Skip to content

Commit

Permalink
reader
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed May 26, 2020
1 parent 37afa4b commit 76800bb
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 44 deletions.
17 changes: 9 additions & 8 deletions be/src/olap/rowset/alpha_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ AlphaRowsetReader::AlphaRowsetReader(
: _num_rows_per_row_block(num_rows_per_row_block),
_rowset(std::move(rowset)),
_alpha_rowset_meta(std::static_pointer_cast<AlphaRowsetMeta>(_rowset->rowset_meta()).get()),
_segment_groups(_rowset->_segment_groups),
_key_range_size(0) {
_segment_groups(_rowset->_segment_groups) {
_rowset->aquire();
}

Expand All @@ -48,7 +47,6 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) {
}

_is_segments_overlapping = _alpha_rowset_meta->is_segments_overlapping();
_ordinal = 0;

RETURN_NOT_OK(_init_merge_ctxs(read_context));

Expand Down Expand Up @@ -155,7 +153,10 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) {

VLOG(10) << "get merged row: " << row_cursor->to_string();

// TODO(yingchun): wrap these code
_read_block->get_row(_read_block->pos(), _dst_cursor);
// TODO(yingchun): optimize to avoid copy row, as long as this row is in memory
// (i.e. merge_ctx didn't switch to next block), we can use pointer and not copy.
copy_row(_dst_cursor, *row_cursor, _read_block->mem_pool());
_read_block->pos_inc();
num_rows_in_block++;
Expand Down Expand Up @@ -317,6 +318,7 @@ OLAPStatus AlphaRowsetReader::_pull_first_block(AlphaMergeContext* merge_ctx) {
}

OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context) {
// TODO(yingchun): do these check in upper layer
if (read_context->reader_type == READER_QUERY) {
if (read_context->lower_bound_keys->size() != read_context->is_lower_keys_included->size()
|| read_context->lower_bound_keys->size() != read_context->upper_bound_keys->size()
Expand Down Expand Up @@ -358,8 +360,7 @@ OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context
if (new_column_data->rowset_pruning_filter()) {
_stats->rows_stats_filtered += new_column_data->num_rows();
VLOG(3) << "filter segment group in query in condition. version="
<< new_column_data->version().first
<< "-" << new_column_data->version().second;
<< new_column_data->version();
continue;
}
}
Expand All @@ -368,15 +369,15 @@ OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context
if (ret == DEL_SATISFIED) {
_stats->rows_del_filtered += new_column_data->num_rows();
VLOG(3) << "filter segment group in delete predicate:"
<< new_column_data->version().first << ", " << new_column_data->version().second;
<< new_column_data->version().first;
continue;
} else if (ret == DEL_PARTIAL_SATISFIED) {
VLOG(3) << "filter segment group partially in delete predicate:"
<< new_column_data->version().first << ", " << new_column_data->version().second;
<< new_column_data->version().first;
new_column_data->set_delete_status(DEL_PARTIAL_SATISFIED);
} else {
VLOG(3) << "not filter segment group in delete predicate:"
<< new_column_data->version().first << ", " << new_column_data->version().second;
<< new_column_data->version().first;
new_column_data->set_delete_status(DEL_NOT_SATISFIED);
}
AlphaMergeContext merge_ctx;
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/rowset/alpha_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class AlphaRowsetReader : public RowsetReader {
// merge by priority queue(_merge_heap)
// this method has same function with _pull_next_row_for_merge_rowset, but using heap merge.
// and this should replace the _pull_next_row_for_merge_rowset later.
// TODO(yingchun): remove _pull_next_row_for_merge_rowset now
OLAPStatus _pull_next_row_for_merge_rowset_v2(RowCursor** row);
// init the merge heap, this should be call before calling _pull_next_row_for_merge_rowset_v2();
OLAPStatus _init_merge_heap();
Expand All @@ -109,15 +110,15 @@ class AlphaRowsetReader : public RowsetReader {
std::unique_ptr<RowBlock> _read_block;
OLAPStatus (AlphaRowsetReader::*_next_block)(RowBlock** block) = nullptr;
RowCursor* _dst_cursor = nullptr;
int _key_range_size;
int _key_range_size = 0;

// In streaming ingestion, row among different segment
// groups may overlap, and is necessary to be taken
// into consideration deliberately.
bool _is_segments_overlapping;

// ordinal of ColumnData upon reading
size_t _ordinal;
// ordinal of ColumnData upon reading, only used for _union_block (non-ordered read?).
size_t _ordinal = 0;

RowsetReaderContext* _current_read_context;
OlapReaderStatistics _owned_stats;
Expand Down
37 changes: 15 additions & 22 deletions be/src/olap/rowset/column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,19 @@
namespace doris {

ColumnData* ColumnData::create(SegmentGroup* segment_group) {
ColumnData* data = new(std::nothrow) ColumnData(segment_group);
return data;
return new(std::nothrow) ColumnData(segment_group);
}

ColumnData::ColumnData(SegmentGroup* segment_group)
: _segment_group(segment_group),
_eof(false),
_conditions(nullptr),
_col_predicates(nullptr),
_delete_status(DEL_NOT_SATISFIED),
_runtime_state(nullptr),
_schema(segment_group->get_tablet_schema()),
_is_using_cache(false),
_segment_reader(nullptr),
_lru_cache(nullptr) {
_num_rows_per_block(segment_group->get_num_rows_per_row_block()) {
if (StorageEngine::instance() != nullptr) {
_lru_cache = StorageEngine::instance()->index_stream_lru_cache();
} else {
// for independent usage, eg: unit test/segment tool
_lru_cache = FileHandler::get_fd_cache();
}
_num_rows_per_block = _segment_group->get_num_rows_per_row_block();
}

ColumnData::~ColumnData() {
Expand Down Expand Up @@ -92,6 +83,7 @@ OLAPStatus ColumnData::_next_row(const RowCursor** row, bool without_filter) {
return OLAP_SUCCESS;
}

// TODO(yingchun): what does DEL_NOT_SATISFIED mean?
// when without_filter is true, _include_blocks is nullptr
if (_read_block->block_status() == DEL_NOT_SATISFIED) {
*row = &_cursor;
Expand Down Expand Up @@ -130,8 +122,7 @@ OLAPStatus ColumnData::_seek_to_block(const RowBlockPosition& block_pos, bool wi
return OLAP_ERR_DATA_EOF;
}
SAFE_DELETE(_segment_reader);
std::string file_name;
file_name = segment_group()->construct_data_file_path(block_pos.segment);
std::string file_name = segment_group()->construct_data_file_path(block_pos.segment);
_segment_reader = new(std::nothrow) SegmentReader(
file_name, segment_group(), block_pos.segment,
_seek_columns, _load_bf_columns, _conditions,
Expand Down Expand Up @@ -173,7 +164,8 @@ OLAPStatus ColumnData::_find_position_by_short_key(
}
return res;
}
res = segment_group()->find_prev_point(tmp_pos, position);
// TODO(yingchun): why get prev?
res = _segment_group->find_prev_point(tmp_pos, position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("find prev row block failed. [res=%d]", res);
return res;
Expand All @@ -194,7 +186,7 @@ OLAPStatus ColumnData::_find_position_by_full_key(
return res;
}
RowBlockPosition start_position;
res = segment_group()->find_prev_point(tmp_pos, &start_position);
res = _segment_group->find_prev_point(tmp_pos, &start_position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("find prev row block failed. [res=%d]", res);
return res;
Expand All @@ -220,7 +212,7 @@ OLAPStatus ColumnData::_find_position_by_full_key(
OLAPIndexOffset index_offset;
index_offset.segment = _end_segment;
index_offset.offset = _end_block;
res = segment_group()->get_row_block_position(index_offset, &end_position);
res = _segment_group->get_row_block_position(index_offset, &end_position);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("fail to get row block position. [res=%d]", res);
return res;
Expand Down Expand Up @@ -276,6 +268,7 @@ OLAPStatus ColumnData::_find_position_by_full_key(
return OLAP_SUCCESS;
}

// TODO(yingchun): find_last_key -> include_last_key?
OLAPStatus ColumnData::_seek_to_row(const RowCursor& key, bool find_last_key, bool is_end_key) {
RowBlockPosition position;
OLAPStatus res = OLAP_SUCCESS;
Expand Down Expand Up @@ -324,7 +317,7 @@ OLAPStatus ColumnData::_seek_to_row(const RowCursor& key, bool find_last_key, bo
} else {
// 找last key。返回大于这个key的第一个。也就是
// row_cursor > key
while (res == OLAP_SUCCESS && compare_row_key(*row_cursor,key) <= 0) {
while (res == OLAP_SUCCESS && compare_row_key(*row_cursor, key) <= 0) {
res = _next_row(&row_cursor, without_filter);
}
}
Expand Down Expand Up @@ -353,7 +346,7 @@ OLAPStatus ColumnData::prepare_block_read(
const RowCursor* end_key, bool find_end_key,
RowBlock** first_block) {
SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
set_eof(false);
_eof = false;
_end_key_is_set = false;
_is_normal_read = false;
// set end position
Expand All @@ -366,13 +359,13 @@ OLAPStatus ColumnData::prepare_block_read(
_end_row_index = _read_block->pos();
_end_key_is_set = true;
} else if (res != OLAP_ERR_DATA_EOF) {
LOG(WARNING) << "Find end key failed.key=" << end_key->to_string();
LOG(WARNING) << "Find end key failed. key=" << end_key->to_string();
return res;
}
// res == OLAP_ERR_DATA_EOF means there is no end key, then we read to
// the end of this ColumnData
}
set_eof(false);
_eof = false;
if (start_key != nullptr) {
auto res = _seek_to_row(*start_key, !find_start_key, false);
if (res == OLAP_SUCCESS) {
Expand All @@ -382,7 +375,7 @@ OLAPStatus ColumnData::prepare_block_read(
*first_block = nullptr;
return res;
} else {
LOG(WARNING) << "start_key can't be found.key=" << start_key->to_string();
LOG(WARNING) << "start_key can't be found. key=" << start_key->to_string();
return res;
}
} else {
Expand Down Expand Up @@ -663,7 +656,7 @@ OLAPStatus ColumnData::_get_block(bool without_filter, int rows_read) {
_stats->rows_vec_cond_filtered += old_size - vec_batch->size();
}
// if vector is empty after predicate evaluate, get next block
if (vec_batch->size() == 0) {
if (vec_batch->empty()) {
continue;
}
SCOPED_RAW_TIMER(&_stats->block_convert_ns);
Expand Down
23 changes: 12 additions & 11 deletions be/src/olap/rowset/column_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace doris {
class Tablet;
class SegmentReader;

// TODO(yingchun): why not rename it to ColumnDataReader?
// This class is column data reader. this class will be used in two case.
class ColumnData {
public:
Expand Down Expand Up @@ -100,7 +101,6 @@ class ColumnData {
// 开放接口查询_eof,让外界知道数据读取是否正常终止
// 因为这个函数被频繁访问, 从性能考虑, 放在基类而不是虚函数
bool eof() { return _eof; }
void set_eof(bool eof) { _eof = eof; }
bool* eof_ptr() { return &_eof; }

bool empty() const { return _segment_group->empty(); }
Expand All @@ -110,6 +110,7 @@ class ColumnData {
int delete_pruning_filter();
uint64_t get_filted_rows();

// TODO(yingchun): remove it
SegmentGroup* segment_group() const { return _segment_group; }
void set_segment_group(SegmentGroup* segment_group) { _segment_group = segment_group; }
int64_t num_rows() const { return _segment_group->num_rows(); }
Expand Down Expand Up @@ -154,29 +155,29 @@ class ColumnData {
return &_cursor;
}
private:
SegmentGroup* _segment_group;
SegmentGroup* _segment_group = nullptr;
// 当到达文件末尾或者到达end key时设置此标志
bool _eof;
const Conditions* _conditions;
const std::vector<ColumnPredicate*>* _col_predicates;
bool _eof = false;
const Conditions* _conditions = nullptr;
const std::vector<ColumnPredicate*>* _col_predicates = nullptr;
const DeleteHandler*_delete_handler = nullptr;
DelCondSatisfied _delete_status;
RuntimeState* _runtime_state;
OlapReaderStatistics* _stats;
DelCondSatisfied _delete_status = DEL_NOT_SATISFIED;
RuntimeState* _runtime_state = nullptr;
OlapReaderStatistics* _stats = nullptr;

const TabletSchema& _schema;
// whether in normal read, use return columns to load block
bool _is_normal_read = false;
bool _end_key_is_set = false;
bool _is_using_cache;
bool _is_using_cache = false;
bool _segment_eof = false;
bool _need_eval_predicates = false;

std::vector<uint32_t> _return_columns;
std::vector<uint32_t> _seek_columns;
std::set<uint32_t> _load_bf_columns;

SegmentReader* _segment_reader;
SegmentReader* _segment_reader = nullptr;

std::unique_ptr<VectorizedRowBatch> _seek_vector_batch;
std::unique_ptr<VectorizedRowBatch> _read_vector_batch;
Expand All @@ -195,7 +196,7 @@ class ColumnData {
int64_t _end_row_index = 0;

size_t _num_rows_per_block;
Cache* _lru_cache;
Cache* _lru_cache = nullptr;
};

class ColumnDataComparator {
Expand Down

0 comments on commit 76800bb

Please sign in to comment.