Skip to content

Commit

Permalink
Fulltext realtime index (#2450)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix docid decode from buffer
Add realtime index for fulltext index.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
  • Loading branch information
small-turtle-1 authored Jan 13, 2025
1 parent 8b30e7d commit 6166f1e
Show file tree
Hide file tree
Showing 15 changed files with 226 additions and 13 deletions.
125 changes: 123 additions & 2 deletions python/restart_test/test_fulltext.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from infinity.errors import ErrorCode
from restart_util import *
from util import RtnThread
import pickle


@pytest.mark.slow
class TestFullText:
@pytest.mark.slow
@pytest.mark.parametrize(
"config",
[
Expand All @@ -37,7 +38,9 @@ def test_fulltext(self, infinity_runner: InfinityRunner, config: str):
infinity_runner.clear()

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner)
decorator2 = infinity_runner_decorator_factory(config, uri, infinity_runner, shutdown_out=True)
decorator2 = infinity_runner_decorator_factory(
config, uri, infinity_runner, shutdown_out=True
)

@decorator
def part1(infinity_obj):
Expand Down Expand Up @@ -221,6 +224,124 @@ def part3(infinity_obj):

part3()

def test_fulltext_realtime(self, infinity_runner: InfinityRunner):
    """Check that a realtime fulltext index returns up-to-date results.

    Ground truth is produced once with an ordinary fulltext index: the whole
    corpus is inserted, then the top-3 matches for ``matching_text`` are
    queried ``test_num`` times with an ``id<bound`` filter, one query per
    checkpoint. The result list is cached as a pickle under ``tmp/``.

    The actual test creates the index with ``{"realtime": "true"}``, inserts
    the same rows one by one and, at every checkpoint, asserts that the
    unfiltered top-3 result equals the corresponding ground-truth entry.
    """
    enwiki_path = "test/data/csv/enwiki_9999.csv"
    enwiki_size = 10000
    config = "test/data/config/restart_test/test_fulltext/1.toml"
    uri = common_values.TEST_LOCAL_HOST
    infinity_runner.clear()

    decorator = infinity_runner_decorator_factory(config, uri, infinity_runner)

    matching_text = "American"
    test_num = 100
    # Number of rows inserted between two consecutive checkpoints.
    step = enwiki_size // test_num

    @decorator
    def get_gt_list(infinity_obj):
        """Build one ground-truth result per checkpoint using a regular index."""
        gt_res_list = []
        gt_table_name = "test_fulltext_gt"
        db_obj = infinity_obj.get_database("default_db")
        db_obj.drop_table(gt_table_name, ConflictType.Ignore)

        gt_table_obj = db_obj.create_table(
            gt_table_name,
            {
                "id": {"type": "int"},
                "title": {"type": "varchar"},
                "date": {"type": "varchar"},
                "body": {"type": "varchar"},
            },
            ConflictType.Error,
        )
        enwiki_gen1 = EnwikiGenerator.gen_factory(enwiki_path)(enwiki_size)
        for row_id, (title, date, body) in enumerate(enwiki_gen1):
            gt_table_obj.insert(
                [{"id": row_id, "title": title, "date": date, "body": body}]
            )
        gt_table_obj.create_index(
            "ft_index",
            index.IndexInfo("body", index.IndexType.FullText),
            ConflictType.Error,
        )

        for i in range(test_num):
            # Restricting to id<bound emulates the table content at checkpoint i.
            bound = step * (i + 1)
            gt_res = (
                gt_table_obj.output(["id"])
                .match_text(
                    fields="body",
                    matching_text=matching_text,
                    topn=3,
                    extra_options=None,
                )
                .filter("id<{}".format(bound))
                .to_result()
            )
            gt_data_dict, _, _ = gt_res
            gt_res_list.append(gt_data_dict)

        db_obj.drop_table(gt_table_name, ConflictType.Error)
        return gt_res_list

    pickle_dir = "tmp"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(pickle_dir, exist_ok=True)
    gt_filename = f"{pickle_dir}/gt_res_list_{enwiki_size}.pkl"

    gt_res_list = None
    if os.path.exists(gt_filename):
        # NOTE: pickle.load executes arbitrary code from the file; this cache
        # must only ever be a file written by this test itself.
        with open(gt_filename, "rb") as f:
            gt_res_list = pickle.load(f)
        # The file name only encodes the corpus size, so discard a cache that
        # was produced with a different number of checkpoints.
        if not isinstance(gt_res_list, list) or len(gt_res_list) != test_num:
            gt_res_list = None
    if gt_res_list is None:
        gt_res_list = get_gt_list()
        with open(gt_filename, "wb") as f:
            pickle.dump(gt_res_list, f)

    @decorator
    def test(infinity_obj):
        """Insert row by row into a realtime-indexed table and compare at checkpoints."""
        table_name = "test_fulltext"
        db_obj = infinity_obj.get_database("default_db")
        db_obj.drop_table(table_name, ConflictType.Ignore)

        table_obj = db_obj.create_table(
            table_name,
            {
                "id": {"type": "int"},
                "title": {"type": "varchar"},
                "date": {"type": "varchar"},
                "body": {"type": "varchar"},
            },
            ConflictType.Error,
        )
        table_obj.create_index(
            "ft_index",
            index.IndexInfo("body", index.IndexType.FullText, {"realtime": "true"}),
            ConflictType.Error,
        )

        enwiki_gen2 = EnwikiGenerator.gen_factory(enwiki_path)(enwiki_size)
        # Debug switch: also run the query after every single insert.
        query_everytime = False
        for row_id, (title, date, body) in enumerate(enwiki_gen2):
            table_obj.insert(
                [{"id": row_id, "title": title, "date": date, "body": body}]
            )
            at_checkpoint = (row_id + 1) % step == 0
            if not (at_checkpoint or query_everytime):
                continue
            res = (
                table_obj.output(["id"])
                .match_text(
                    fields="body",
                    matching_text=matching_text,
                    topn=3,
                    extra_options=None,
                )
                .to_result()
            )
            data_dict, _, _ = res
            # Ground truth exists only for checkpoints; between checkpoints the
            # query is executed purely to exercise the realtime path.
            if at_checkpoint:
                gt_i = (row_id + 1) // step - 1
                assert data_dict == gt_res_list[gt_i]

        db_obj.drop_table(table_name, ConflictType.Error)

    test()


if __name__ == "__main__":
pass
12 changes: 11 additions & 1 deletion src/storage/definition/index_full_text.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ SharedPtr<IndexBase> IndexFullText::Make(SharedPtr<String> index_name,
Vector<String> column_names,
const Vector<InitParameter *> &index_param_list) {
String analyzer_name{};
u64 flag = OPTION_FLAG_ALL;
optionflag_t flag = OPTION_FLAG_ALL;
SizeT param_count = index_param_list.size();
for (SizeT param_idx = 0; param_idx < param_count; ++param_idx) {
InitParameter *parameter = index_param_list[param_idx];
Expand All @@ -54,6 +54,16 @@ SharedPtr<IndexBase> IndexFullText::Make(SharedPtr<String> index_name,
analyzer_name = parameter->param_value_;
} else if (para_name == "flag") {
flag = std::strtoul(parameter->param_value_.c_str(), nullptr, 10);
} else if (para_name == "realtime") {
String realtime_str = parameter->param_value_;
ToLowerString(realtime_str);
if (realtime_str == "true") {
FlagAddRealtime(flag);
} else if (realtime_str != "false") {
LOG_WARN(fmt::format("Unknown parameter value: {}, {}", para_name, parameter->param_value_));
}
} else {
LOG_WARN(fmt::format("Unknown parameter: {}", para_name));
}
}
if (analyzer_name.empty()) {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/definition/index_full_text.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public:

static SharedPtr<IndexFullText> Deserialize(const nlohmann::json &index_def_json);

// Reports whether the realtime bit is set in this index's option flags (flag_).
bool IsRealtime() const {
    return FlagIsRealtime(flag_);
}

public:
static void ValidateColumnDataType(const SharedPtr<BaseTableRef> &base_table_ref, const String &column_name);

Expand Down
8 changes: 8 additions & 0 deletions src/storage/invertedindex/column_inverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ void ColumnInverter::Merge(ColumnInverter &rhs) {
}
doc_count_ += rhs.doc_count_;
merged_++;
if (semas_.empty()) {
semas_ = std::move(rhs.semas_);
} else {
semas_.reserve(semas_.size() + rhs.semas_.size());
std::move(rhs.semas_.begin(), rhs.semas_.end(), std::back_inserter(semas_));
rhs.semas_.clear();
}

rhs.terms_per_doc_.clear();
rhs.doc_count_ = 0;
rhs.merged_ = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/storage/invertedindex/column_inverter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public:

void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter> &buf_writer);

// Attaches a semaphore to this inverter. Only the raw pointer is stored; ownership stays with the caller.
void AddSema(std::binary_semaphore *sema) {
    semas_.push_back(sema);
}

// Read-only access to the semaphores attached to this inverter.
const Vector<std::binary_semaphore *> &semas() const {
    return semas_;
}

private:
using TermBuffer = Vector<char>;
using PosInfoVec = Vector<PosInfo>;
Expand Down Expand Up @@ -124,5 +128,6 @@ private:
Vector<Pair<u32, UniquePtr<TermList>>> terms_per_doc_;
PostingWriterProvider posting_writer_provider_{};
VectorWithLock<u32> &column_lengths_;
Vector<std::binary_semaphore *> semas_{};
};
} // namespace infinity
4 changes: 1 addition & 3 deletions src/storage/invertedindex/format/inmem_doc_list_decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ bool InMemDocListDecoder::DecodeSkipListWithoutSkipList(docid_t last_doc_id_in_p
doc_buffer_to_copy_ = new docid_t[MAX_DOC_PER_RECORD];

finish_decoded_ = false;
if (!doc_list_reader_.Seek(offset)) {
return false;
}
doc_list_reader_.Seek(offset);
if (!doc_list_reader_.Decode(doc_buffer_to_copy_, MAX_DOC_PER_RECORD, decode_count_)) {
return false;
}
Expand Down
5 changes: 5 additions & 0 deletions src/storage/invertedindex/index_defines.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export {
of_position_list = 4, // 1 << 2
of_term_frequency = 8, // 1 << 3
of_block_max = 16, // 1 << 4
of_realtime = 32, // 1 << 5
};

typedef u16 docpayload_t;
Expand All @@ -35,6 +36,10 @@ export {
constexpr optionflag_t NO_BLOCK_MAX = of_term_payload | of_doc_payload | of_position_list | of_term_frequency;
constexpr optionflag_t NO_TERM_FREQUENCY = of_term_payload | of_doc_payload;
constexpr optionflag_t OPTION_FLAG_NONE = of_none;

// Sets the of_realtime bit on `flag` in place; all other bits are left untouched.
void FlagAddRealtime(optionflag_t &flag) {
    flag |= of_realtime;
}
// Returns true when the of_realtime bit is set in `flag`.
bool FlagIsRealtime(const optionflag_t &flag) {
    return (flag & of_realtime) != 0;
}

constexpr docid_t INVALID_DOCID = u32(-1);
constexpr RowID INVALID_ROWID = u64(-1);
constexpr pos_t INVALID_POSITION = u32(-1);
Expand Down
44 changes: 44 additions & 0 deletions src/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,44 @@ void MemoryIndexer::Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset
}
}

// Schedules inversion of rows [row_offset, row_offset + row_count) of column_vector on the
// inverting thread pool and returns a semaphore tied to that batch.
//
// The semaphore is created with count 0 and its raw pointer is registered with the batch's
// ColumnInverter; CommitSync() releases the semaphores attached to an inverter after generating
// its postings. The inverter keeps only the raw pointer, so the returned UniquePtr must stay
// alive until that release has happened.
UniquePtr<std::binary_semaphore> MemoryIndexer::AsyncInsert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count) {
    if (is_spilled_) {
        Load();
    }

    // Reserve this batch's sequence number and doc-id range atomically.
    u64 batch_seq = 0;
    u32 first_doc_id = 0;
    {
        std::unique_lock<std::mutex> guard(mutex_);
        batch_seq = seq_inserted_++;
        first_doc_id = doc_count_;
        doc_count_ += row_count;
    }

    auto invert_task = MakeShared<BatchInvertTask>(batch_seq, column_vector, row_offset, row_count, first_doc_id);
    auto done_sema = MakeUnique<std::binary_semaphore>(0);

    IncreaseMemoryUsage(sizeof(u32) * row_count);
    PostingWriterProvider posting_provider = [this](const String &term) -> SharedPtr<PostingWriter> { return GetOrAddPosting(term); };
    auto column_inverter = MakeShared<ColumnInverter>(posting_provider, column_lengths_);
    column_inverter->InitAnalyzer(this->analyzer_);
    column_inverter->AddSema(done_sema.get());

    // Worker job: invert the batch, publish the inverter in sequence order, then commit right
    // away instead of waiting for a later commit call.
    auto invert_job = [this, invert_task, column_inverter](int /*worker_id*/) {
        const SizeT batch_length_sum =
            column_inverter->InvertColumn(invert_task->column_vector_, invert_task->row_offset_, invert_task->row_count_, invert_task->start_doc_id_);
        column_length_sum_ += batch_length_sum;
        this->ring_inverted_.Put(invert_task->task_seq_, column_inverter);
        CommitSync(100);
    };

    // Count the task as in flight before it is handed to the pool.
    {
        std::unique_lock<std::mutex> guard(mutex_);
        inflight_tasks_++;
    }
    inverting_thread_pool_.push(std::move(invert_job));
    return done_sema;
}

void MemoryIndexer::InsertGap(u32 row_count) {
if (is_spilled_) {
Load();
Expand Down Expand Up @@ -246,6 +284,12 @@ SizeT MemoryIndexer::CommitSync(SizeT wait_if_empty_ms) {
for (auto &inverter : inverters) {
mem_usage_change.Add(inverter->GeneratePosting());
num_generated += inverter->GetMerged();

if (const auto &semas = inverter->semas(); !semas.empty()) {
for (auto sema : semas) {
sema->release();
}
}
}
}
if (num_generated > 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/invertedindex/memory_indexer.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public:
// Insert is non-blocking. Caller must ensure there's no RowID gap between each call.
void Insert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count, bool offline = false);

UniquePtr<std::binary_semaphore> AsyncInsert(SharedPtr<ColumnVector> column_vector, u32 row_offset, u32 row_count);

// InsertGap insert some empty documents. This is for abnormal case.
void InsertGap(u32 row_count);

Expand Down
10 changes: 8 additions & 2 deletions src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr<BlockEntry> block_entry,
u32 row_offset,
u32 row_count,
TxnTimeStamp commit_ts,
BufferManager *buffer_manager) {
BufferManager *buffer_manager,
TxnStore *txn_store) {
SegmentOffset block_offset = block_entry->segment_offset();
RowID begin_row_id = block_entry->base_row_id() + row_offset;

Expand Down Expand Up @@ -228,7 +229,12 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr<BlockEntry> block_entry,
}
}
SharedPtr<ColumnVector> column_vector = MakeShared<ColumnVector>(block_entry->GetConstColumnVector(buffer_manager, column_idx));
memory_indexer_->Insert(column_vector, row_offset, row_count, false);
if (index_fulltext->IsRealtime() && txn_store != nullptr) {
UniquePtr<std::binary_semaphore> sema = memory_indexer_->AsyncInsert(column_vector, row_offset, row_count);
txn_store->AddSemaphore(std::move(sema));
} else {
memory_indexer_->Insert(column_vector, row_offset, row_count, false);
}
break;
}
case IndexType::kHnsw: {
Expand Down
3 changes: 2 additions & 1 deletion src/storage/meta/entry/segment_index_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class EMVBIndexInMem;
class BMPIndexInMem;
class BaseMemIndex;
class MemoryIndexer;
class TxnStore;

export struct PopulateEntireConfig {
bool prepare_;
Expand Down Expand Up @@ -116,7 +117,7 @@ public:
SharedPtr<String> index_dir() const { return index_dir_; }

// MemIndexInsert is non-blocking. Caller must ensure there's no RowID gap between each call.
void MemIndexInsert(SharedPtr<BlockEntry> block_entry, u32 row_offset, u32 row_count, TxnTimeStamp commit_ts, BufferManager *buffer_manager);
void MemIndexInsert(SharedPtr<BlockEntry> block_entry, u32 row_offset, u32 row_count, TxnTimeStamp commit_ts, BufferManager *buffer_manager, TxnStore *txn_store);

// User shall invoke this regularly to populate recently inserted rows into the fulltext index. No-op for other types of index.
void MemIndexCommit();
Expand Down
5 changes: 3 additions & 2 deletions src/storage/meta/entry/table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ void TableEntry::MemIndexInsertInner(TableIndexEntry *table_index_entry, Txn *tx
for (SizeT i = 0; i < num_ranges; i++) {
AppendRange &range = append_ranges[i];
SharedPtr<BlockEntry> block_entry = block_entries[i];
segment_index_entry->MemIndexInsert(block_entry, range.start_offset_, range.row_count_, txn->CommitTS(), txn->buffer_mgr());
segment_index_entry->MemIndexInsert(block_entry, range.start_offset_, range.row_count_, txn->CommitTS(), txn->buffer_mgr(), txn->txn_store());
if ((i == dump_idx && segment_index_entry->MemIndexRowCount() >= infinity::InfinityContext::instance().config()->MemIndexCapacity()) ||
(i == num_ranges - 1 && segment_entry->Room() <= 0)) {
SharedPtr<ChunkIndexEntry> chunk_index_entry = segment_index_entry->MemIndexDump();
Expand Down Expand Up @@ -893,7 +893,8 @@ void TableEntry::MemIndexRecover(BufferManager *buffer_manager, TxnTimeStamp ts)
range.start_offset_,
range.row_count_,
segment_index_entry->max_ts(),
buffer_manager);
buffer_manager,
nullptr);
}
segment_index_entry->MemIndexWaitInflightTasks();
message = fmt::format("Table {}.{} index {} segment {} MemIndex recovered.", *GetDBName(), *table_name_, index_name, segment_id);
Expand Down
4 changes: 4 additions & 0 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,10 @@ void Txn::CommitBottom() {
void Txn::PostCommit() {
txn_store_.MaintainCompactionAlg();

for (auto &sema : txn_store_.semas()) {
sema->acquire();
}

auto *wal_manager = InfinityContext::instance().storage()->wal_manager();
for (const SharedPtr<WalCmd> &wal_cmd : wal_entry_->cmds_) {
if (wal_cmd->GetType() == WalCommandType::CHECKPOINT) {
Expand Down
Loading

0 comments on commit 6166f1e

Please sign in to comment.