From 9c5103808c313856f7a7e32ea79e3e4f5c4e1e84 Mon Sep 17 00:00:00 2001 From: shenyushi Date: Thu, 4 Jan 2024 17:26:35 +0800 Subject: [PATCH 1/4] Add new create index operator. --- .../operator/physical_create_index.cpp | 2 +- .../operator/physical_create_index_do.cpp | 76 +++++++++++++++ .../operator/physical_create_index_do.cppm | 62 ++++++++++++ .../operator/physical_create_index_finish.cpp | 56 +++++++++++ .../physical_create_index_finish.cppm | 60 ++++++++++++ .../physical_create_index_prepare.cpp | 70 +++++++++++++ .../physical_create_index_prepare.cppm | 62 ++++++++++++ src/executor/operator_state.cppm | 21 ++++ src/executor/physical_operator_type.cppm | 4 + src/function/table/create_index_data.cppm | 36 +++++++ src/planner/bound/base_table_ref.cppm | 4 + src/storage/meta/catalog.cpp | 11 +-- src/storage/meta/catalog.cppm | 13 ++- src/storage/meta/entry/column_index_entry.cpp | 11 +++ .../meta/entry/column_index_entry.cppm | 3 + .../meta/entry/segment_column_index_entry.cpp | 97 +++++++++++++++++++ .../entry/segment_column_index_entry.cppm | 4 + src/storage/meta/entry/segment_entry.cpp | 25 ++--- src/storage/meta/entry/segment_entry.cppm | 3 +- src/storage/meta/entry/table_entry.cpp | 8 +- src/storage/meta/entry/table_entry.cppm | 2 +- src/storage/meta/entry/table_index_entry.cpp | 11 +++ src/storage/meta/entry/table_index_entry.cppm | 3 + src/storage/meta/iter/segment_iter.cppm | 20 +++- src/storage/txn/txn.cpp | 27 +++++- src/storage/txn/txn.cppm | 6 +- src/storage/wal/wal_manager.cpp | 2 +- 27 files changed, 657 insertions(+), 42 deletions(-) create mode 100644 src/executor/operator/physical_create_index_do.cpp create mode 100644 src/executor/operator/physical_create_index_do.cppm create mode 100644 src/executor/operator/physical_create_index_finish.cpp create mode 100644 src/executor/operator/physical_create_index_finish.cppm create mode 100644 src/executor/operator/physical_create_index_prepare.cpp create mode 100644 src/executor/operator/physical_create_index_prepare.cppm create mode 100644 src/function/table/create_index_data.cppm diff --git a/src/executor/operator/physical_create_index.cpp b/src/executor/operator/physical_create_index.cpp index abc788c23f..c272e13a86 100644 --- a/src/executor/operator/physical_create_index.cpp +++ b/src/executor/operator/physical_create_index.cpp @@ -33,7 +33,7 @@ void PhysicalCreateIndex::Init() {} bool PhysicalCreateIndex::Execute(QueryContext *query_context, OperatorState *operator_state) { auto *txn = query_context->GetTxn(); - Status status = txn->CreateIndex(*schema_name_, *table_name_, index_def_ptr_, conflict_type_); + Status status = txn->CreateIndex(*schema_name_, *table_name_, index_def_ptr_, conflict_type_, false); if (!status.ok()) { operator_state->error_message_ = Move(status.msg_); } diff --git a/src/executor/operator/physical_create_index_do.cpp b/src/executor/operator/physical_create_index_do.cpp new file mode 100644 index 0000000000..e028a3fb65 --- /dev/null +++ b/src/executor/operator/physical_create_index_do.cpp @@ -0,0 +1,76 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +import stl; +import parser; +import catalog; +import physical_operator_type; +import physical_operator; +import query_context; +import operator_state; +import load_meta; + +import index_def; +import create_index_data; +import base_table_ref; +import status; +import infinity_exception; +import buffer_handle; +import index_hnsw; +import index_base; +import hnsw_common; +import dist_func_l2; +import dist_func_ip; +import hnsw_alg; +import lvq_store; +import plain_store; +import buffer_manager; +import txn_store; +import third_party; +import logger; + +module physical_create_index_do; + +namespace infinity { +PhysicalCreateIndexDo::PhysicalCreateIndexDo(u64 id, + UniquePtr left, + SharedPtr base_table_ref, + SharedPtr index_name, + SharedPtr> output_names, + SharedPtr>> output_types, + SharedPtr> load_metas) + : PhysicalOperator(PhysicalOperatorType::kCreateIndexDo, Move(left), nullptr, id, load_metas), base_table_ref_(base_table_ref), + index_name_(index_name), output_names_(output_names), output_types_(output_types) {} + +void PhysicalCreateIndexDo::Init() {} + +// FIXME: fetch and add a block one time +bool PhysicalCreateIndexDo::Execute(QueryContext *query_context, OperatorState *operator_state) { + auto *txn = query_context->GetTxn(); + auto *create_index_do_state = static_cast(operator_state); + auto &create_index_idxes = create_index_do_state->create_index_shared_data_->create_index_idxes_; + + auto status = txn->CreateIndexDo(*base_table_ref_->schema_name(), *base_table_ref_->table_name(), *index_name_, create_index_idxes); + if (!status.ok()) { + operator_state->error_message_ = Move(status.msg_); + return false; + } + operator_state->SetComplete(); + + return true; +} + +}; // namespace infinity \ No newline at end of file diff --git a/src/executor/operator/physical_create_index_do.cppm b/src/executor/operator/physical_create_index_do.cppm new file mode 100644 index 0000000000..388324ae4b --- /dev/null +++ b/src/executor/operator/physical_create_index_do.cppm @@ -0,0 +1,62 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +import stl; +import parser; +import physical_operator_type; +import physical_operator; +import query_context; +import operator_state; +import load_meta; + +import index_def; +import base_table_ref; + +export module physical_create_index_do; + +namespace infinity { + +export class PhysicalCreateIndexDo : public PhysicalOperator { +public: + PhysicalCreateIndexDo(u64 id, + UniquePtr left, + SharedPtr base_table_ref, + SharedPtr index_name, + SharedPtr> output_names, + SharedPtr>> output_types, + SharedPtr> load_metas); + +public: + void Init() override; + + bool Execute(QueryContext *query_context, OperatorState *operator_state) override; + + SizeT TaskletCount() override { return 0; } + + SharedPtr> GetOutputNames() const override { return output_names_; } + + SharedPtr>> GetOutputTypes() const override { return output_types_; } + +public: + // for create fragemnt context + const SharedPtr base_table_ref_{}; + const SharedPtr index_name_{}; + + const SharedPtr> output_names_{}; + const SharedPtr>> output_types_{}; +}; + +} // namespace infinity \ No newline at end of file diff --git a/src/executor/operator/physical_create_index_finish.cpp b/src/executor/operator/physical_create_index_finish.cpp new file mode 100644 index 0000000000..c2649ccfac --- /dev/null +++ b/src/executor/operator/physical_create_index_finish.cpp @@ -0,0 +1,56 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +import stl; +import parser; +import physical_operator_type; +import physical_operator; +import query_context; +import operator_state; +import load_meta; +import index_def; +import wal; + +module physical_create_index_finish; + +namespace infinity { +PhysicalCreateIndexFinish::PhysicalCreateIndexFinish(u64 id, + UniquePtr left, + SharedPtr db_name, + SharedPtr table_name, + SharedPtr index_def, + SharedPtr> output_names, + SharedPtr>> output_types, + SharedPtr> load_metas) + : PhysicalOperator(PhysicalOperatorType::kCreateIndexFinish, Move(left), nullptr, id, load_metas), db_name_(db_name), table_name_(table_name), + index_def_(index_def), output_names_(output_names), output_types_(output_types) {} + +void PhysicalCreateIndexFinish::Init() {} + +bool PhysicalCreateIndexFinish::Execute(QueryContext *query_context, OperatorState *operator_state) { + auto *txn = query_context->GetTxn(); + auto *create_index_finish_op_state = static_cast(operator_state); + + if (create_index_finish_op_state->input_complete_) { + txn->AddWalCmd(MakeShared(*db_name_, *table_name_, index_def_)); + + operator_state->SetComplete(); + return true; + } + return false; +} + +} // namespace infinity \ No newline at end of file diff --git a/src/executor/operator/physical_create_index_finish.cppm b/src/executor/operator/physical_create_index_finish.cppm new file mode 100644 index 0000000000..0e5a295955 --- /dev/null +++ b/src/executor/operator/physical_create_index_finish.cppm @@ -0,0 +1,60 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +import stl; +import parser; +import physical_operator_type; +import physical_operator; +import query_context; +import operator_state; +import load_meta; +import index_def; + +export module physical_create_index_finish; + +namespace infinity { +export class PhysicalCreateIndexFinish : public PhysicalOperator { +public: + PhysicalCreateIndexFinish(u64 id, + UniquePtr left, + SharedPtr db_name, + SharedPtr table_name, + SharedPtr index_def, + SharedPtr> output_names, + SharedPtr>> output_types, + SharedPtr> load_metas); + +public: + void Init() override; + + bool Execute(QueryContext *query_context, OperatorState *operator_state) override; + + SizeT TaskletCount() override { return 1; } + + SharedPtr> GetOutputNames() const override { return output_names_; } + + SharedPtr>> GetOutputTypes() const override { return output_types_; } + +public: + const SharedPtr db_name_{}; + const SharedPtr table_name_{}; + const SharedPtr index_def_{}; + + const SharedPtr> output_names_{}; + const SharedPtr>> output_types_{}; +}; + +} // namespace infinity \ No newline at end of file diff --git a/src/executor/operator/physical_create_index_prepare.cpp b/src/executor/operator/physical_create_index_prepare.cpp new file mode 100644 index 0000000000..ad8b60b1e5 --- /dev/null +++ b/src/executor/operator/physical_create_index_prepare.cpp @@ -0,0 +1,70 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +import stl; +import parser; +import physical_operator_type; +import physical_operator; +import query_context; +import operator_state; +import load_meta; +import catalog; +import index_def; +import status; +import infinity_exception; +import index_base; +import index_file_worker; +import segment_iter; +import buffer_manager; +import buffer_handle; +import index_hnsw; +import default_values; +import txn_store; + +import hnsw_common; +import dist_func_l2; +import dist_func_ip; +import hnsw_alg; +import lvq_store; +import plain_store; + +module physical_create_index_prepare; + +namespace infinity { +PhysicalCreateIndexPrepare::PhysicalCreateIndexPrepare(u64 id, + SharedPtr schema_name, + SharedPtr table_name, + SharedPtr index_definition, + ConflictType conflict_type, + SharedPtr> output_names, + SharedPtr>> output_types, + SharedPtr> load_metas) + : PhysicalOperator(PhysicalOperatorType::kCreateIndexPrepare, nullptr, nullptr, id, load_metas), schema_name_(schema_name), + table_name_(table_name), index_def_ptr_(index_definition), conflict_type_(conflict_type), output_names_(output_names), + output_types_(output_types) {} + +void PhysicalCreateIndexPrepare::Init() {} + +bool PhysicalCreateIndexPrepare::Execute(QueryContext *query_context, OperatorState *operator_state) { + auto *txn = query_context->GetTxn(); + Status status = txn->CreateIndex(*schema_name_, *table_name_, index_def_ptr_, conflict_type_, true); + if (!status.ok()) { + operator_state->error_message_ = Move(status.msg_); + } + operator_state->SetComplete(); + return true; +} +} // namespace infinity \ No newline at end of file diff --git a/src/executor/operator/physical_create_index_prepare.cppm b/src/executor/operator/physical_create_index_prepare.cppm new file mode 100644 index 0000000000..b63d9ff43f --- /dev/null +++ b/src/executor/operator/physical_create_index_prepare.cppm @@ -0,0 +1,62 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +import stl; +import parser; +import physical_operator_type; +import physical_operator; +import query_context; +import operator_state; +import load_meta; + +import index_def; + +export module physical_create_index_prepare; + +namespace infinity { +export class PhysicalCreateIndexPrepare : public PhysicalOperator { +public: + PhysicalCreateIndexPrepare(u64 id, + SharedPtr schema_name, + SharedPtr table_name, + SharedPtr index_definition, + ConflictType conflict_type, + SharedPtr> output_names, + SharedPtr>> output_types, + SharedPtr> load_metas); + +public: + void Init() override; + + bool Execute(QueryContext *query_context, OperatorState *operator_state) override; + + SizeT TaskletCount() override { return 1; } + + SharedPtr> GetOutputNames() const override { return output_names_; } + + SharedPtr>> GetOutputTypes() const override { return output_types_; } + +public: + const SharedPtr schema_name_{}; + const SharedPtr table_name_{}; + const SharedPtr index_def_ptr_{}; + const ConflictType conflict_type_{}; + + const SharedPtr> output_names_{}; + const SharedPtr>> output_types_{}; +}; + +} // namespace infinity \ No newline at end of file diff --git a/src/executor/operator_state.cppm b/src/executor/operator_state.cppm index 603721b3cf..2d4e7635ff 100644 --- a/src/executor/operator_state.cppm +++ b/src/executor/operator_state.cppm @@ -24,6 +24,7 @@ import knn_scan_data; import table_def; import parser; import merge_knn_data; +import create_index_data; import blocking_queue; export module operator_state; @@ -240,6 +241,26 @@ export struct CreateIndexOperatorState : public OperatorState { inline explicit CreateIndexOperatorState() : OperatorState(PhysicalOperatorType::kCreateIndex) {} }; +export struct CreateIndexPrepareOperatorState : public OperatorState { + inline explicit CreateIndexPrepareOperatorState() : OperatorState(PhysicalOperatorType::kCreateIndexPrepare) {} + + UniquePtr result_msg_{}; +}; + +export struct CreateIndexDoOperatorState : public OperatorState { + inline explicit CreateIndexDoOperatorState() : OperatorState(PhysicalOperatorType::kCreateIndexDo) {} + + bool input_complete_ = false; + CreateIndexSharedData *create_index_shared_data_; +}; + +export struct CreateIndexFinishOperatorState : public OperatorState { + inline explicit CreateIndexFinishOperatorState() : OperatorState(PhysicalOperatorType::kCreateIndexFinish) {} + + bool input_complete_ = false; + UniquePtr error_message_{}; +}; + // Create Collection export struct CreateCollectionOperatorState : public OperatorState { inline explicit CreateCollectionOperatorState() : OperatorState(PhysicalOperatorType::kCreateCollection) {} diff --git a/src/executor/physical_operator_type.cppm b/src/executor/physical_operator_type.cppm index 5d24b12b76..e9be8c31ed 100644 --- a/src/executor/physical_operator_type.cppm +++ b/src/executor/physical_operator_type.cppm @@ -70,6 +70,7 @@ export enum class PhysicalOperatorType : i8 { kInsert, kImport, kExport, + kCreateIndexDo, // DDL kAlter, @@ -84,6 +85,9 @@ export enum class PhysicalOperatorType : i8 { kDropDatabase, kDropView, + kCreateIndexPrepare, + kCreateIndexFinish, + // misc kExplain, kPreparedPlan, diff --git a/src/function/table/create_index_data.cppm b/src/function/table/create_index_data.cppm new file mode 100644 index 0000000000..ef27f0db9a --- /dev/null +++ b/src/function/table/create_index_data.cppm @@ -0,0 +1,36 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +module; + +#include + +import stl; +import catalog; + +export module create_index_data; + +namespace infinity { + +export struct CreateIndexSharedData { + CreateIndexSharedData(const Map> &segment_map) { + for (const auto &[segment_id, _] : segment_map) { + create_index_idxes_.emplace(segment_id, 0); + } + } + + HashMap create_index_idxes_{}; +}; + +}; // namespace infinity \ No newline at end of file diff --git a/src/planner/bound/base_table_ref.cppm b/src/planner/bound/base_table_ref.cppm index 78789c5f2a..3c2a5d31a1 100644 --- a/src/planner/bound/base_table_ref.cppm +++ b/src/planner/bound/base_table_ref.cppm @@ -47,6 +47,10 @@ public: replace_field>(*column_types_, indices); }; + SharedPtr schema_name() const { return table_entry_ptr_->GetDBName(); } + + SharedPtr table_name() const { return table_entry_ptr_->GetTableName(); } + TableEntry *table_entry_ptr_{}; Vector column_ids_{}; SharedPtr block_index_{}; diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index 87d68ed5aa..b8e81f02cf 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -269,8 +269,9 @@ void NewCatalog::CreateIndexFile(TableEntry *table_entry, void *txn_store, TableIndexEntry *table_index_entry, TxnTimeStamp begin_ts, - BufferManager *buffer_mgr) { - return table_entry->CreateIndexFile(txn_store, table_index_entry, begin_ts, buffer_mgr); + BufferManager *buffer_mgr, + bool prepare) { + return table_entry->CreateIndexFile(txn_store, table_index_entry, begin_ts, buffer_mgr, prepare); } Status NewCatalog::RemoveIndexEntry(const String &index_name, TableIndexEntry *table_index_entry, u64 txn_id, TxnManager *txn_mgr) { @@ -314,15 +315,13 @@ u32 NewCatalog::GetNextSegmentID(TableEntry *table_entry) { return TableEntry::G u32 NewCatalog::GetMaxSegmentID(const TableEntry *table_entry) { return TableEntry::GetMaxSegmentID(table_entry); } -void NewCatalog::ImportSegment(TableEntry* table_entry, u32 segment_id, SharedPtr& segment_entry) { +void NewCatalog::ImportSegment(TableEntry *table_entry, u32 segment_id, SharedPtr &segment_entry) { table_entry->segment_map_.emplace(segment_id, Move(segment_entry)); // ATTENTION: focusing on the segment id table_entry->next_segment_id_++; } -void NewCatalog::IncreaseTableRowCount(TableEntry* table_entry, u64 increased_row_count) { - table_entry->row_count_ += increased_row_count; -} +void NewCatalog::IncreaseTableRowCount(TableEntry *table_entry, u64 increased_row_count) { table_entry->row_count_ += increased_row_count; } SharedPtr NewCatalog::GetFunctionSetByName(NewCatalog *catalog, String function_name) { // Transfer the function to upper case. diff --git a/src/storage/meta/catalog.cppm b/src/storage/meta/catalog.cppm index 2c6226baaa..cb039ab910 100644 --- a/src/storage/meta/catalog.cppm +++ b/src/storage/meta/catalog.cppm @@ -158,8 +158,12 @@ public: TxnTimeStamp begin_ts, TxnManager *txn_mgr); - static void - CreateIndexFile(TableEntry *table_entry, void *txn_store, TableIndexEntry *table_index_entry, TxnTimeStamp begin_ts, BufferManager *buffer_mgr); + static void CreateIndexFile(TableEntry *table_entry, + void *txn_store, + TableIndexEntry *table_index_entry, + TxnTimeStamp begin_ts, + BufferManager *buffer_mgr, + bool prepare); static Status RemoveIndexEntry(const String &index_name, TableIndexEntry *table_index_entry, u64 txn_id, TxnManager *txn_mgr); @@ -184,9 +188,10 @@ public: static u32 GetMaxSegmentID(const TableEntry *table_entry); - static void ImportSegment(TableEntry* table_entry, u32 segment_id, SharedPtr& segment_entry); + static void ImportSegment(TableEntry *table_entry, u32 segment_id, SharedPtr &segment_entry); + + static void IncreaseTableRowCount(TableEntry *table_entry, u64 increased_row_count); - static void IncreaseTableRowCount(TableEntry* table_entry, u64 increased_row_count); public: // Function related methods static SharedPtr GetFunctionSetByName(NewCatalog *catalog, String function_name); diff --git a/src/storage/meta/entry/column_index_entry.cpp b/src/storage/meta/entry/column_index_entry.cpp index ac57cc2bcf..ae79f2bbba 100644 --- a/src/storage/meta/entry/column_index_entry.cpp +++ b/src/storage/meta/entry/column_index_entry.cpp @@ -189,4 +189,15 @@ UniquePtr ColumnIndexEntry::CreateFileWorker(CreateIndexParam * String ColumnIndexEntry::IndexFileName(const String &index_name, u32 segment_id) { return Format("seg{}.idx", segment_id, index_name); } +Status ColumnIndexEntry::CreateIndexDo(const ColumnDef *column_def, HashMap &create_index_idxes) { + for (auto &[segment_id, segment_column_index_entry] : index_by_segment_) { + atomic_u64 &create_index_idx = create_index_idxes.at(segment_id); + auto status = segment_column_index_entry->CreateIndexDo(index_base_.get(), column_def, create_index_idx); + if (!status.ok()) { + return status; + } + } + return Status::OK(); +} + } // namespace infinity diff --git a/src/storage/meta/entry/column_index_entry.cppm b/src/storage/meta/entry/column_index_entry.cppm index 3c226d4489..5e4bf41a83 100644 --- a/src/storage/meta/entry/column_index_entry.cppm +++ b/src/storage/meta/entry/column_index_entry.cppm @@ -25,6 +25,7 @@ import index_base; import third_party; import index_base; import index_file_worker; +import status; namespace infinity { @@ -69,6 +70,8 @@ public: UniquePtr CreateFileWorker(CreateIndexParam *param, u32 segment_id); private: + Status CreateIndexDo(const ColumnDef *column_def, HashMap &create_index_idxes); + static SharedPtr DetermineIndexDir(const String &parent_dir, const String &index_name); void CommitCreatedIndex(u32 segment_id, UniquePtr index_entry); static String IndexFileName(const String &index_name, u32 segment_id); diff --git a/src/storage/meta/entry/segment_column_index_entry.cpp b/src/storage/meta/entry/segment_column_index_entry.cpp index 89fe3efaef..52b56f484f 100644 --- a/src/storage/meta/entry/segment_column_index_entry.cpp +++ b/src/storage/meta/entry/segment_column_index_entry.cpp @@ -26,6 +26,14 @@ import infinity_exception; import index_file_worker; import status; +import index_base; +import index_hnsw; +import hnsw_common; +import dist_func_l2; +import dist_func_ip; +import hnsw_alg; +import lvq_store; +import plain_store; namespace infinity { SegmentColumnIndexEntry::SegmentColumnIndexEntry(ColumnIndexEntry *column_index_entry, u32 segment_id, BufferObj *buffer) @@ -56,6 +64,95 @@ UniquePtr SegmentColumnIndexEntry::LoadIndexEntry(Colum BufferHandle SegmentColumnIndexEntry::GetIndex() { return buffer_->Load(); } +Status SegmentColumnIndexEntry::CreateIndexDo(IndexBase *index_base, const ColumnDef *column_def, atomic_u64 &create_index_idx) { + switch (index_base->index_type_) { + case IndexType::kHnsw: { + auto InsertHnswDo = [&](auto *hnsw_index, atomic_u64 &create_index_idx) { + SizeT vertex_n = hnsw_index->GetVertexNum(); + while (true) { + SizeT idx = create_index_idx.fetch_add(1); + if (idx % 10000 == 0) { + LOG_INFO(Format("Insert index: {}", idx)); + } + if (idx >= vertex_n) { + break; + } + hnsw_index->Build(idx); + } + }; + auto *index_hnsw = static_cast(index_base); + if (column_def->type()->type() != LogicalType::kEmbedding) { + Error("HNSW supports embedding type."); + } + TypeInfo *type_info = column_def->type()->type_info().get(); + auto embedding_info = static_cast(type_info); + + BufferHandle buffer_handle = GetIndex(); + + switch (embedding_info->Type()) { + case kElemFloat: { + switch (index_hnsw->encode_type_) { + case HnswEncodeType::kPlain: { + switch (index_hnsw->metric_type_) { + case MetricType::kMerticInnerProduct: { + auto *hnsw_index = + static_cast, PlainIPDist> *>(buffer_handle.GetDataMut()); + InsertHnswDo(hnsw_index, create_index_idx); + break; + } + case MetricType::kMerticL2: { + auto *hnsw_index = + static_cast, PlainL2Dist> *>(buffer_handle.GetDataMut()); + InsertHnswDo(hnsw_index, create_index_idx); + break; + } + default: { + Error("Not implemented"); + } + } + break; + } + case HnswEncodeType::kLVQ: { + switch (index_hnsw->metric_type_) { + case MetricType::kMerticInnerProduct: { + auto *hnsw_index = + static_cast>, LVQIPDist> *>( + buffer_handle.GetDataMut()); + InsertHnswDo(hnsw_index, create_index_idx); + break; + } + case MetricType::kMerticL2: { + auto *hnsw_index = + static_cast>, LVQL2Dist> *>( + buffer_handle.GetDataMut()); + InsertHnswDo(hnsw_index, create_index_idx); + break; + } + default: { + Error("Not implemented"); + } + } + break; + } + default: { + Error("Not implemented"); + } + } + break; + } + default: { + Error("Not implemented"); + } + } + break; + } + default: { + break; + } + } + return Status::OK(); +} + void SegmentColumnIndexEntry::UpdateIndex(TxnTimeStamp, FaissIndexPtr *, BufferManager *) { Error("Not implemented"); } bool SegmentColumnIndexEntry::Flush(TxnTimeStamp checkpoint_ts) { diff --git a/src/storage/meta/entry/segment_column_index_entry.cppm b/src/storage/meta/entry/segment_column_index_entry.cppm index 7c849a3f1f..044bf7dcc6 100644 --- a/src/storage/meta/entry/segment_column_index_entry.cppm +++ b/src/storage/meta/entry/segment_column_index_entry.cppm @@ -24,6 +24,8 @@ import third_party; import buffer_obj; import parser; import index_file_worker; +import status; +import index_base; namespace infinity { @@ -70,6 +72,8 @@ private: static UniquePtr LoadIndexEntry(ColumnIndexEntry *column_index_entry, u32 segment_id, BufferManager *buffer_manager, CreateIndexParam *create_index_param); + Status CreateIndexDo(IndexBase *index_base, const ColumnDef *column_def, atomic_u64 &create_index_idx); + private: const ColumnIndexEntry *column_index_entry_{}; u32 segment_id_{}; diff --git a/src/storage/meta/entry/segment_entry.cpp b/src/storage/meta/entry/segment_entry.cpp index b08be81326..16329e18ac 100644 --- a/src/storage/meta/entry/segment_entry.cpp +++ b/src/storage/meta/entry/segment_entry.cpp @@ -156,27 +156,12 @@ void SegmentEntry::DeleteData(u64 txn_id, TxnTimeStamp commit_ts, const HashMap< } } -template -class OneColumnIterator { -public: - OneColumnIterator(const SegmentEntry *entry, SizeT column_id) : segment_iter_(entry, MakeShared>(Vector{column_id})) {} - - Optional Next() { - if (auto ret = segment_iter_.Next(); ret) { - return reinterpret_cast((*ret)[0]); - } - return None; - } - -private: - SegmentIter segment_iter_; -}; - SharedPtr SegmentEntry::CreateIndexFile(ColumnIndexEntry *column_index_entry, SharedPtr column_def, TxnTimeStamp create_ts, BufferManager *buffer_mgr, - TxnTableStore *txn_store) { + TxnTableStore *txn_store, + bool prepare) { u64 column_id = column_def->id(); // SharedPtr index_def = index_def_entry->index_def_; const IndexBase *index_base = column_index_entry->index_base_ptr(); @@ -242,7 +227,11 @@ SharedPtr SegmentEntry::CreateIndexFile(ColumnIndexEntr segment_offset += DEFAULT_BLOCK_CAPACITY; } OneColumnIterator one_column_iter(this, column_id); - hnsw_index->InsertVecs(one_column_iter, row_ids.data(), row_ids.size()); + if (!prepare) { + hnsw_index->InsertVecs(one_column_iter, row_ids.data(), row_ids.size()); + } else { + hnsw_index->StoreData(one_column_iter, row_ids.data(), row_ids.size()); + } }; switch (embedding_info->Type()) { case kElemFloat: { diff --git a/src/storage/meta/entry/segment_entry.cppm b/src/storage/meta/entry/segment_entry.cppm index 7210a8dd6b..b73231f900 100644 --- a/src/storage/meta/entry/segment_entry.cppm +++ b/src/storage/meta/entry/segment_entry.cppm @@ -98,7 +98,8 @@ protected: SharedPtr column_def, TxnTimeStamp create_ts, BufferManager *buffer_mgr, - TxnTableStore *txn_store); + TxnTableStore *txn_store, + bool prepare); void CommitAppend(u64 txn_id, TxnTimeStamp commit_ts, u16 block_id, u16 start_pos, u16 row_count); diff --git a/src/storage/meta/entry/table_entry.cpp b/src/storage/meta/entry/table_entry.cpp index 834e472149..aa24f5a3f0 100644 --- a/src/storage/meta/entry/table_entry.cpp +++ b/src/storage/meta/entry/table_entry.cpp @@ -203,7 +203,11 @@ void TableEntry::Append(u64 txn_id, void *txn_store, BufferManager *buffer_mgr) } } -void TableEntry::CreateIndexFile(void *txn_store, TableIndexEntry *table_index_entry, TxnTimeStamp begin_ts, BufferManager *buffer_mgr) { +void TableEntry::CreateIndexFile(void *txn_store, + TableIndexEntry *table_index_entry, + TxnTimeStamp begin_ts, + BufferManager *buffer_mgr, + bool prepare) { IrsIndexEntry *irs_index_entry = table_index_entry->irs_index_entry().get(); if (irs_index_entry != nullptr) { @@ -222,7 +226,7 @@ void TableEntry::CreateIndexFile(void *txn_store, TableIndexEntry *table_index_e SharedPtr column_def = this->columns_[column_id]; for (const auto &[segment_id, segment_entry] : this->segment_map_) { SharedPtr segment_column_index_entry = - segment_entry->CreateIndexFile(column_index_entry, column_def, begin_ts, buffer_mgr, txn_store_ptr); + segment_entry->CreateIndexFile(column_index_entry, column_def, begin_ts, buffer_mgr, txn_store_ptr, prepare); column_index_entry->index_by_segment_.emplace(segment_id, segment_column_index_entry); } } else if (base_entry->entry_type_ == EntryType::kIRSIndex) { diff --git a/src/storage/meta/entry/table_entry.cppm b/src/storage/meta/entry/table_entry.cppm index bc65e0a005..ae4865ad5c 100644 --- a/src/storage/meta/entry/table_entry.cppm +++ b/src/storage/meta/entry/table_entry.cppm @@ -65,7 +65,7 @@ private: void RemoveIndexEntry(const String &index_name, u64 txn_id, TxnManager *txn_mgr); - void CreateIndexFile(void *txn_store, TableIndexEntry *table_index_entry, TxnTimeStamp begin_ts, BufferManager *buffer_mgr); + void CreateIndexFile(void *txn_store, TableIndexEntry *table_index_entry, TxnTimeStamp begin_ts, BufferManager *buffer_mgr, bool prepare); static void CommitCreateIndex(HashMap &txn_indexes_store_); diff --git a/src/storage/meta/entry/table_index_entry.cpp b/src/storage/meta/entry/table_index_entry.cpp index dea9fb0e5c..57811d51f0 100644 --- a/src/storage/meta/entry/table_index_entry.cpp +++ b/src/storage/meta/entry/table_index_entry.cpp @@ -178,4 +178,15 @@ SharedPtr TableIndexEntry::DetermineIndexDir(const String &parent_dir, c return index_dir; } +Status TableIndexEntry::CreateIndexDo(const TableEntry *table_entry, HashMap &create_index_idxes) { + if (column_index_map_.size() != 1) { + // TODO + Error("Not implemented"); + } + const auto &[column_id, column_index_entry] = *column_index_map_.begin(); + + const auto *column_def = table_entry->GetColumnDefByID(column_id); + return column_index_entry->CreateIndexDo(column_def, create_index_idxes); +} + } // namespace infinity diff --git a/src/storage/meta/entry/table_index_entry.cppm b/src/storage/meta/entry/table_index_entry.cppm index 4eff038a61..57532eb29e 100644 --- a/src/storage/meta/entry/table_index_entry.cppm +++ b/src/storage/meta/entry/table_index_entry.cppm @@ -24,6 +24,7 @@ import :base_entry; import stl; import index_def; import third_party; +import status; namespace infinity { @@ -62,6 +63,8 @@ public: const SharedPtr &irs_index_entry() const { return irs_index_entry_; } HashMap> &column_index_map() { return column_index_map_; } + Status CreateIndexDo(const TableEntry *table_entry, HashMap &create_index_idxes); + private: static SharedPtr DetermineIndexDir(const String &parent_dir, const String &index_name); diff --git a/src/storage/meta/iter/segment_iter.cppm b/src/storage/meta/iter/segment_iter.cppm index b2a78c0d5a..ac68cdb40b 100644 --- a/src/storage/meta/iter/segment_iter.cppm +++ b/src/storage/meta/iter/segment_iter.cppm @@ -25,7 +25,7 @@ namespace infinity { export class SegmentIter { public: SegmentIter(const SegmentEntry *entry, SharedPtr> column_ids) : entry_(entry), block_idx_(0), column_ids_(column_ids) { - const auto& block_entries = entry->block_entries(); + const auto &block_entries = entry->block_entries(); if (block_entries.empty()) { block_iter_ = None; } else { @@ -42,7 +42,7 @@ public: } block_idx_++; - const auto& block_entries = entry_->block_entries(); + const auto &block_entries = entry_->block_entries(); if (block_idx_ >= block_entries.size()) { block_iter_ = None; return None; @@ -59,4 +59,20 @@ private: SharedPtr> column_ids_; }; +export template +class OneColumnIterator { +public: + OneColumnIterator(const SegmentEntry *entry, SizeT column_id) : segment_iter_(entry, MakeShared>(Vector{column_id})) {} + + Optional Next() { + if (auto ret = segment_iter_.Next(); ret) { + return reinterpret_cast((*ret)[0]); + } + return None; + } + +private: + SegmentIter segment_iter_; +}; + } // namespace infinity \ No newline at end of file diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index 37f8c2a652..fe65bf7e15 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -48,7 +48,7 @@ namespace infinity { Txn::Txn(TxnManager *txn_mgr, NewCatalog *catalog, u32 txn_id) : txn_mgr_(txn_mgr), catalog_(catalog), txn_id_(txn_id), wal_entry_(MakeShared()) {} -Tuple Txn::GetTableEntry(const String &db_name, const String &table_name) { +Tuple Txn::GetTableEntry(const String &db_name, const String &table_name) { if (db_name_.empty()) { db_name_ = db_name; } else { @@ -267,7 +267,8 @@ Status Txn::DropTableCollectionByName(const String &db_name, const String &table return Status::OK(); } -Status Txn::CreateIndex(const String &db_name, const String &table_name, const SharedPtr &index_def, ConflictType conflict_type) { +Status +Txn::CreateIndex(const String &db_name, const String &table_name, const SharedPtr &index_def, ConflictType conflict_type, bool prepare) { TxnState txn_state = txn_context_.GetTxnState(); if (txn_state != TxnState::kStarted) { @@ -275,7 +276,8 @@ Status Txn::CreateIndex(const String &db_name, const String &table_name, const S } TxnTimeStamp begin_ts = txn_context_.GetBeginTS(); - auto [table_entry, table_index_entry, index_status] = catalog_->CreateIndex(db_name, table_name, index_def, conflict_type, txn_id_, begin_ts, txn_mgr_); + auto [table_entry, table_index_entry, index_status] = + catalog_->CreateIndex(db_name, table_name, index_def, conflict_type, txn_id_, begin_ts, txn_mgr_); if (!index_status.ok()) { return index_status; } @@ -293,12 +295,29 @@ Status Txn::CreateIndex(const String &db_name, const String &table_name, const S table_store = txn_tables_store_[table_name].get(); // Create Index Synchronously - NewCatalog::CreateIndexFile(table_entry, table_store, table_index_entry, begin_ts, GetBufferMgr()); + NewCatalog::CreateIndexFile(table_entry, table_store, table_index_entry, begin_ts, GetBufferMgr(), prepare); wal_entry_->cmds.push_back(MakeShared(db_name, table_name, index_def)); return index_status; } +Status Txn::CreateIndexDo(const String &db_name, const String &table_name, const String &index_name, HashMap &create_index_idxes) { + auto [table_entry, status] = GetTableEntry(db_name, table_name); + if (!status.ok()) { + return status; + } + auto *table_store = GetTxnTableStore(table_entry); + auto iter = table_store->txn_indexes_store_.find(index_name); + if (iter == table_store->txn_indexes_store_.end()) { + // the table is empty + return Status::OK(); + } + TxnIndexStore &txn_index_store = iter->second; + auto *table_index_entry = txn_index_store.table_index_entry_; + + return table_index_entry->CreateIndexDo(table_entry, create_index_idxes); +} + Status Txn::DropIndexByName(const String &db_name, const String &table_name, const String &index_name, ConflictType conflict_type) { TxnState txn_state = txn_context_.GetTxnState(); if (txn_state != TxnState::kStarted) { diff --git a/src/storage/txn/txn.cppm b/src/storage/txn/txn.cppm index b5ec9c2cfc..f73d1ebc80 100644 --- a/src/storage/txn/txn.cppm +++ b/src/storage/txn/txn.cppm @@ -90,10 +90,12 @@ public: Status GetCollectionByName(const String &db_name, const String &table_name, BaseEntry *&collection_entry); - Tuple GetTableEntry(const String &db_name, const String &table_name); + Tuple GetTableEntry(const String &db_name, const String &table_name); // Index OPs - Status CreateIndex(const String &db_name, const String &table_name, const SharedPtr &index_def, ConflictType conflict_type); + Status CreateIndex(const String &db_name, const String &table_name, const SharedPtr &index_def, ConflictType conflict_type, bool prepare); + + Status CreateIndexDo(const String &db_name, const String &table_name, const String &index_name, HashMap &create_index_idxes); Status DropIndexByName(const String &db_name, const String &table_name, const String &index_name, ConflictType conflict_type); diff --git a/src/storage/wal/wal_manager.cpp b/src/storage/wal/wal_manager.cpp index 0063d24907..47a01c7e37 100644 --- a/src/storage/wal/wal_manager.cpp +++ b/src/storage/wal/wal_manager.cpp @@ -652,7 +652,7 @@ void WalManager::WalCmdCreateIndexReplay(const WalCmdCreateIndex &cmd, u64 txn_i auto fake_txn = MakeUnique(storage_->txn_manager(), storage_->catalog(), txn_id); auto table_store = MakeShared(table_entry, fake_txn.get()); - NewCatalog::CreateIndexFile(table_entry, table_store.get(), table_index_entry, commit_ts, storage_->buffer_manager()); + NewCatalog::CreateIndexFile(table_entry, table_store.get(), table_index_entry, commit_ts, storage_->buffer_manager(), false); NewCatalog::CommitCreateIndex(table_store->txn_indexes_store_); table_index_entry->Commit(commit_ts); } From c9cd28f9d2103ac704e43f00a79c877c8d13e68d Mon Sep 17 00:00:00 2001 From: shenyushi Date: Fri, 5 Jan 2024 13:11:57 +0800 Subject: [PATCH 2/4] implement parallel create index. --- src/executor/fragment_builder.cpp | 49 +++- src/executor/operator/physical_knn_scan.cpp | 6 +- src/executor/operator/physical_sink.cpp | 77 +++--- src/executor/operator_state.cpp | 12 +- src/executor/physical_operator_type.cpp | 6 + src/executor/physical_planner.cpp | 55 ++++- src/function/table/knn_scan_data.cpp | 18 +- src/function/table/knn_scan_data.cppm | 2 +- src/main/resource_manager.cppm | 4 +- src/planner/explain_logical_plan.cpp | 4 +- src/planner/logical_planner.cpp | 6 +- src/planner/node/logical_create_index.cpp | 3 +- src/planner/node/logical_create_index.cppm | 21 +- src/planner/query_binder.cpp | 19 +- src/planner/query_binder.cppm | 2 + src/scheduler/fragment_context.cpp | 245 ++++++++++++-------- src/scheduler/fragment_context.cppm | 14 +- src/scheduler/task_scheduler.cpp | 20 +- 18 files changed, 365 insertions(+), 198 deletions(-) diff --git a/src/executor/fragment_builder.cpp b/src/executor/fragment_builder.cpp index 5def402acc..a15644a797 100644 --- a/src/executor/fragment_builder.cpp +++ b/src/executor/fragment_builder.cpp @@ -229,8 +229,8 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu if (phys_op->left() != nullptr or phys_op->right() != nullptr) { Error(Format("{} shouldn't have child.", phys_op->GetName())); } - PhysicalKnnScan* knn_scan = static_cast(phys_op); - if(knn_scan->TaskCount() == 1) { + PhysicalKnnScan *knn_scan = static_cast(phys_op); + if (knn_scan->TaskCount() == 1) { current_fragment_ptr->SetFragmentType(FragmentType::kSerialMaterialize); } else { current_fragment_ptr->SetFragmentType(FragmentType::kParallelMaterialize); @@ -262,6 +262,51 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu } return; } + case PhysicalOperatorType::kCreateIndexPrepare: { + if (phys_op->left() != nullptr || phys_op->right() != nullptr) { + Error(Format("Invalid input node of {}", phys_op->GetName())); + } + current_fragment_ptr->AddOperator(phys_op); + current_fragment_ptr->SetFragmentType(FragmentType::kSerialMaterialize); + current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kEmpty, phys_op->GetOutputNames(), phys_op->GetOutputTypes()); + return; + } + case PhysicalOperatorType::kCreateIndexDo: { + if (phys_op->left() == nullptr || phys_op->right() != nullptr) { + Error(Format("Invalid input node of {}", phys_op->GetName())); + } + current_fragment_ptr->AddOperator(phys_op); + current_fragment_ptr->SetFragmentType(FragmentType::kParallelMaterialize); + current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kLocalQueue, phys_op->GetOutputNames(), phys_op->GetOutputTypes()); + + auto next_plan_fragment = MakeUnique(GetFragmentId()); + next_plan_fragment->SetSinkNode(query_context_ptr_, + SinkType::kLocalQueue, + phys_op->left()->GetOutputNames(), + phys_op->left()->GetOutputTypes()); + BuildFragments(phys_op->left(), next_plan_fragment.get()); + + current_fragment_ptr->AddChild(Move(next_plan_fragment)); + return; + } + case PhysicalOperatorType::kCreateIndexFinish: { + if (phys_op->left() == nullptr || phys_op->right() != nullptr) { + Error(Format("Invalid input node of {}", phys_op->GetName())); + } + current_fragment_ptr->AddOperator(phys_op); + current_fragment_ptr->SetFragmentType(FragmentType::kSerialMaterialize); + current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kLocalQueue, phys_op->GetOutputNames(), phys_op->GetOutputTypes()); + + auto next_plan_fragment = MakeUnique(GetFragmentId()); + next_plan_fragment->SetSinkNode(query_context_ptr_, + SinkType::kLocalQueue, + phys_op->left()->GetOutputNames(), + phys_op->left()->GetOutputTypes()); + BuildFragments(phys_op->left(), next_plan_fragment.get()); + + current_fragment_ptr->AddChild(Move(next_plan_fragment)); + return; + } default: { LOG_ERROR(Format("Invalid operator type: {} in Fragment Builder", phys_op->GetName())); break; diff --git a/src/executor/operator/physical_knn_scan.cpp b/src/executor/operator/physical_knn_scan.cpp index b1bc41e1a7..6e9281b274 100644 --- a/src/executor/operator/physical_knn_scan.cpp +++ b/src/executor/operator/physical_knn_scan.cpp @@ -139,8 +139,8 @@ void PhysicalKnnScan::Init() {} bool PhysicalKnnScan::Execute(QueryContext *query_context, OperatorState *operator_state) { auto *knn_scan_operator_state = static_cast(operator_state); - auto elem_type = knn_scan_operator_state->knn_scan_function_data_->shared_data_->elem_type_; - auto dist_type = knn_scan_operator_state->knn_scan_function_data_->shared_data_->knn_distance_type_; + auto elem_type = knn_scan_operator_state->knn_scan_function_data_->knn_scan_shared_data_->elem_type_; + auto dist_type = knn_scan_operator_state->knn_scan_function_data_->knn_scan_shared_data_->knn_distance_type_; switch (elem_type) { case kElemFloat: { switch (dist_type) { @@ -232,7 +232,7 @@ SizeT PhysicalKnnScan::BlockEntryCount() const { return base_table_ref_->block_i template typename C> void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperatorState *operator_state) { auto knn_scan_function_data = operator_state->knn_scan_function_data_.get(); - auto knn_scan_shared_data = knn_scan_function_data->shared_data_; + auto knn_scan_shared_data = knn_scan_function_data->knn_scan_shared_data_; auto dist_func = static_cast *>(knn_scan_function_data->knn_distance_.get()); auto merge_heap = static_cast *>(knn_scan_function_data->merge_knn_base_.get()); diff --git a/src/executor/operator/physical_sink.cpp b/src/executor/operator/physical_sink.cpp index b7e5b7dd9b..35425b7b70 100644 --- a/src/executor/operator/physical_sink.cpp +++ b/src/executor/operator/physical_sink.cpp @@ -128,19 +128,6 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MaterializeSinkState *mate } break; } - case PhysicalOperatorType::kKnnScan: { - throw ExecutorException("KnnScan shouldn't be here"); - KnnScanOperatorState *knn_output_state = static_cast(task_op_state); - if (knn_output_state->data_block_array_.empty()) { - Error("Empty knn scan output"); - } - - for (auto &data_block : knn_output_state->data_block_array_) { - materialize_sink_state->data_block_array_.emplace_back(Move(data_block)); - } - knn_output_state->data_block_array_.clear(); - break; - } case PhysicalOperatorType::kAggregate: { AggregateOperatorState *agg_output_state = static_cast(task_op_state); if (agg_output_state->data_block_array_.empty()) { @@ -322,6 +309,17 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(ResultSinkState *result_si } break; } + case PhysicalOperatorType::kCreateIndexFinish: { + auto *output_state = static_cast(task_operator_state); + if (output_state->error_message_.get() != nullptr) { + result_sink_state->error_message_ = Move(output_state->error_message_); + break; + } + result_sink_state->result_def_ = { + MakeShared(0, MakeShared(LogicalType::kInteger), "OK", HashSet()), + }; + break; + } default: { Error(Format("{} isn't supported here.", PhysicalOperatorToString(task_operator_state->operator_type_))); } @@ -347,7 +345,9 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MessageSinkState *message_ } } -void PhysicalSink::FillSinkStateFromLastOperatorState(FragmentContext *fragment_context, QueueSinkState *queue_sink_state, OperatorState *task_operator_state) { +void PhysicalSink::FillSinkStateFromLastOperatorState(FragmentContext *fragment_context, + QueueSinkState *queue_sink_state, + OperatorState *task_operator_state) { if (queue_sink_state->error_message_.get() != nullptr) { LOG_TRACE(Format("Error: {} is sent to notify next fragment", *queue_sink_state->error_message_)); auto fragment_error = MakeShared(queue_sink_state->fragment_id_, MakeUnique(*queue_sink_state->error_message_)); @@ -362,33 +362,38 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(FragmentContext *fragment_ return; } SizeT output_data_block_count = task_operator_state->data_block_array_.size(); - if (output_data_block_count == 0) { - for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { - next_fragment_queue->Enqueue(MakeShared(queue_sink_state->fragment_id_)); - } - return; - // Error("No output from last operator."); - } - for (SizeT idx = 0; idx < output_data_block_count; ++idx) { - auto fragment_data = MakeShared(queue_sink_state->fragment_id_, - Move(task_operator_state->data_block_array_[idx]), - queue_sink_state->task_id_, - idx, - output_data_block_count); - if (task_operator_state->Complete() && !fragment_context->IsMaterialize()) { - fragment_data->data_idx_ = None; + switch (task_operator_state->operator_type_) { + case PhysicalOperatorType::kCreateIndexPrepare: + case PhysicalOperatorType::kCreateIndexDo: { + for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { + next_fragment_queue->Enqueue(MakeShared(queue_sink_state->fragment_id_)); + } + break; } + default: { + for (SizeT idx = 0; idx < output_data_block_count; ++idx) { + auto fragment_data = MakeShared(queue_sink_state->fragment_id_, + Move(task_operator_state->data_block_array_[idx]), + queue_sink_state->task_id_, + idx, + output_data_block_count); + if (task_operator_state->Complete() && !fragment_context->IsMaterialize()) { + fragment_data->data_idx_ = None; + } - for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { - // when the Enqueue returns false, - // it means that the downstream has collected enough data, - // preventing the Queue from Enqueue in data again to avoid redundant calculations. - if (!next_fragment_queue->Enqueue(fragment_data)) { - task_operator_state->SetComplete(); + for (const auto &next_fragment_queue : queue_sink_state->fragment_data_queues_) { + // when the Enqueue returns false, + // it means that the downstream has collected enough data, + // preventing the Queue from Enqueue in data again to avoid redundant calculations. + if (!next_fragment_queue->Enqueue(fragment_data)) { + task_operator_state->SetComplete(); + } + } } + task_operator_state->data_block_array_.clear(); + break; } } - task_operator_state->data_block_array_.clear(); } } // namespace infinity diff --git a/src/executor/operator_state.cpp b/src/executor/operator_state.cpp index 9b6cebe69c..5644280fc3 100644 --- a/src/executor/operator_state.cpp +++ b/src/executor/operator_state.cpp @@ -107,11 +107,21 @@ bool QueueSourceState::GetData() { case PhysicalOperatorType::kMergeAggregate: { auto *fragment_data = static_cast(fragment_data_base.get()); MergeAggregateOperatorState *merge_aggregate_op_state = (MergeAggregateOperatorState *)next_op_state; - //merge_aggregate_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_)); + // merge_aggregate_op_state->input_data_blocks_.push_back(Move(fragment_data->data_block_)); merge_aggregate_op_state->input_data_block_ = Move(fragment_data->data_block_); merge_aggregate_op_state->input_complete_ = completed; break; } + case PhysicalOperatorType::kCreateIndexDo: { + auto *create_index_do_op_state = static_cast(next_op_state); + create_index_do_op_state->input_complete_ = completed; + break; + } + case PhysicalOperatorType::kCreateIndexFinish: { + auto *create_index_finish_op_state = static_cast(next_op_state); + create_index_finish_op_state->input_complete_ = completed; + break; + } default: { Error("Not support operator type"); break; diff --git a/src/executor/physical_operator_type.cpp b/src/executor/physical_operator_type.cpp index 3fea83d9b7..c93d36b97a 100644 --- a/src/executor/physical_operator_type.cpp +++ b/src/executor/physical_operator_type.cpp @@ -132,6 +132,12 @@ String PhysicalOperatorToString(PhysicalOperatorType type) { return "Fusion"; case PhysicalOperatorType::kMergeAggregate: return "MergeAggregate"; + case PhysicalOperatorType::kCreateIndexPrepare: + return "CreateIndexPrepare"; + case PhysicalOperatorType::kCreateIndexDo: + return "CreateIndexDo"; + case PhysicalOperatorType::kCreateIndexFinish: + return "CreateIndexFinish"; } Error("Unknown physical operator type"); diff --git a/src/executor/physical_planner.cpp b/src/executor/physical_planner.cpp index cdd2f85e54..573e41e4ea 100644 --- a/src/executor/physical_planner.cpp +++ b/src/executor/physical_planner.cpp @@ -75,6 +75,9 @@ import physical_drop_index; import physical_command; import physical_match; import physical_fusion; +import physical_create_index_prepare; +import physical_create_index_do; +import physical_create_index_finish; import logical_node; import logical_node_type; @@ -309,14 +312,48 @@ UniquePtr PhysicalPlanner::BuildCreateTable(const SharedPtr PhysicalPlanner::BuildCreateIndex(const SharedPtr &logical_operator) const { auto logical_create_index = static_pointer_cast(logical_operator); - return PhysicalCreateIndex::Make(logical_create_index->schema_name(), - logical_create_index->table_name(), - logical_create_index->index_definition(), - logical_create_index->conflict_type(), - logical_create_index->GetOutputNames(), - logical_create_index->GetOutputTypes(), - logical_create_index->node_id(), - logical_operator->load_metas()); + SharedPtr schema_name = logical_create_index->base_table_ref()->schema_name(); + SharedPtr table_name = logical_create_index->base_table_ref()->table_name(); + const auto &index_def_ptr = logical_create_index->index_definition(); + if (false || index_def_ptr->index_array_.size() != 1 || index_def_ptr->index_array_[0]->index_type_ != IndexType::kHnsw) { + // TODO: invalidate multiple index in one statement. + // TODO: support other index types build in parallel. + // use old `PhysicalCreateIndex` + return PhysicalCreateIndex::Make(schema_name, + table_name, + logical_create_index->index_definition(), + logical_create_index->conflict_type(), + logical_create_index->GetOutputNames(), + logical_create_index->GetOutputTypes(), + logical_create_index->node_id(), + logical_operator->load_metas()); + } + + // use new `PhysicalCreateIndexPrepare` `PhysicalCreateIndexDo` `PhysicalCreateIndexFinish` + auto create_index_prepare = MakeUnique(logical_create_index->node_id(), + schema_name, + table_name, + logical_create_index->index_definition(), + logical_create_index->conflict_type(), + logical_create_index->GetOutputNames(), + logical_create_index->GetOutputTypes(), + logical_create_index->load_metas()); + auto create_index_do = MakeUnique(logical_create_index->node_id(), + Move(create_index_prepare), + logical_create_index->base_table_ref(), + logical_create_index->index_definition()->index_name_, + logical_create_index->GetOutputNames(), + logical_create_index->GetOutputTypes(), + logical_create_index->load_metas()); + auto create_index_finish = MakeUnique(logical_create_index->node_id(), + Move(create_index_do), + schema_name, + table_name, + logical_create_index->index_definition(), + logical_create_index->GetOutputNames(), + logical_create_index->GetOutputTypes(), + logical_create_index->load_metas()); + return create_index_finish; } UniquePtr PhysicalPlanner::BuildCreateCollection(const SharedPtr &logical_operator) const { @@ -504,8 +541,6 @@ UniquePtr PhysicalPlanner::BuildAggregate(const SharedPtraggregate_index_, logical_operator->load_metas()); - - if (tasklet_count == 1) { return physical_agg_op; } else { diff --git a/src/function/table/knn_scan_data.cpp b/src/function/table/knn_scan_data.cpp index a947bb5b8d..f76575d8ff 100644 --- a/src/function/table/knn_scan_data.cpp +++ b/src/function/table/knn_scan_data.cpp @@ -63,8 +63,8 @@ KnnDistance1::KnnDistance1(KnnDistanceType dist_type) { // -------------------------------------------- KnnScanFunctionData::KnnScanFunctionData(KnnScanSharedData* shared_data, u32 current_parallel_idx) - : shared_data_(shared_data), task_id_(current_parallel_idx) { - switch (shared_data_->elem_type_) { + : knn_scan_shared_data_(shared_data), task_id_(current_parallel_idx) { + switch (knn_scan_shared_data_->elem_type_) { case EmbeddingDataType::kElemFloat: { Init(); break; @@ -77,32 +77,32 @@ KnnScanFunctionData::KnnScanFunctionData(KnnScanSharedData* shared_data, u32 cur template void KnnScanFunctionData::Init() { - switch (shared_data_->knn_distance_type_) { + switch (knn_scan_shared_data_->knn_distance_type_) { case KnnDistanceType::kInvalid: { throw ExecutorException("Invalid Knn distance type"); } case KnnDistanceType::kL2: case KnnDistanceType::kHamming: { - auto merge_knn_max = MakeUnique>(shared_data_->query_count_, shared_data_->topk_); + auto merge_knn_max = MakeUnique>(knn_scan_shared_data_->query_count_, knn_scan_shared_data_->topk_); merge_knn_max->Begin(); merge_knn_base_ = Move(merge_knn_max); break; } case KnnDistanceType::kCosine: case KnnDistanceType::kInnerProduct: { - auto merge_knn_min = MakeUnique>(shared_data_->query_count_, shared_data_->topk_); + auto merge_knn_min = MakeUnique>(knn_scan_shared_data_->query_count_, knn_scan_shared_data_->topk_); merge_knn_min->Begin(); merge_knn_base_ = Move(merge_knn_min); break; } } - knn_distance_ = MakeUnique>(shared_data_->knn_distance_type_); + knn_distance_ = MakeUnique>(knn_scan_shared_data_->knn_distance_type_); - if (shared_data_->filter_expression_) { - filter_state_ = ExpressionState::CreateState(shared_data_->filter_expression_); + if (knn_scan_shared_data_->filter_expression_) { + filter_state_ = ExpressionState::CreateState(knn_scan_shared_data_->filter_expression_); db_for_filter_ = MakeUnique(); - db_for_filter_->Init(*(shared_data_->table_ref_->column_types_)); // default capacity + db_for_filter_->Init(*(knn_scan_shared_data_->table_ref_->column_types_)); // default capacity bool_column_ = ColumnVector::Make(MakeShared(LogicalType::kBoolean)); // default capacity } } diff --git a/src/function/table/knn_scan_data.cppm b/src/function/table/knn_scan_data.cppm index 3d32cd3e63..bc4a67ddea 100644 --- a/src/function/table/knn_scan_data.cppm +++ b/src/function/table/knn_scan_data.cppm @@ -121,7 +121,7 @@ private: void Init(); public: - KnnScanSharedData* shared_data_; + KnnScanSharedData* knn_scan_shared_data_; const u32 task_id_; UniquePtr merge_knn_base_{}; diff --git a/src/main/resource_manager.cppm b/src/main/resource_manager.cppm index dd7849bcf9..8335c7042c 100644 --- a/src/main/resource_manager.cppm +++ b/src/main/resource_manager.cppm @@ -30,8 +30,8 @@ public: return cpu_count; } - // inline u64 GetCpuResource() { return GetCpuResource(Thread::hardware_concurrency()); } - inline u64 GetCpuResource() { return GetCpuResource(4); } + inline u64 GetCpuResource() { return GetCpuResource(Thread::hardware_concurrency()); } + // inline u64 GetCpuResource() { return GetCpuResource(4); } inline u64 GetMemoryResource(u64 memory_size) { total_memory_ -= memory_size; diff --git a/src/planner/explain_logical_plan.cpp b/src/planner/explain_logical_plan.cpp index 8b1f65e04b..e454026151 100644 --- a/src/planner/explain_logical_plan.cpp +++ b/src/planner/explain_logical_plan.cpp @@ -310,12 +310,12 @@ void ExplainLogicalPlan::Explain(const LogicalCreateIndex *create_node, SharedPt } { - String schema_name_str = String(intent_size, ' ') + " - schema name: " + *create_node->schema_name(); + String schema_name_str = String(intent_size, ' ') + " - schema name: " + *create_node->base_table_ref()->schema_name(); result->emplace_back(MakeShared(schema_name_str)); } { - String table_name_str = String(intent_size, ' ') + " - table name: " + *create_node->table_name(); + String table_name_str = String(intent_size, ' ') + " - table name: " + *create_node->base_table_ref()->table_name(); result->emplace_back(MakeShared(table_name_str)); } diff --git a/src/planner/logical_planner.cpp b/src/planner/logical_planner.cpp index c29d42e4bd..6c9bb7cb59 100644 --- a/src/planner/logical_planner.cpp +++ b/src/planner/logical_planner.cpp @@ -66,6 +66,7 @@ import index_base; import index_ivfflat; import index_hnsw; import index_full_text; +import base_table_ref; module logical_planner; @@ -486,8 +487,11 @@ Status LogicalPlanner::BuildCreateIndex(const CreateStatement *statement, Shared index_def_ptr->index_array_.emplace_back(base_index_ptr); } + UniquePtr query_binder_ptr = MakeUnique(this->query_context_ptr_, bind_context_ptr); + auto base_table_ref = std::static_pointer_cast(query_binder_ptr->GetTableRef(*schema_name, *table_name)); + auto logical_create_index_operator = - LogicalCreateIndex::Make(bind_context_ptr->GetNewLogicalNodeId(), schema_name, table_name, index_def_ptr, create_index_info->conflict_type_); + MakeShared(bind_context_ptr->GetNewLogicalNodeId(), base_table_ref, index_def_ptr, create_index_info->conflict_type_); this->logical_plan_ = logical_create_index_operator; this->names_ptr_->emplace_back("OK"); diff --git a/src/planner/node/logical_create_index.cpp b/src/planner/node/logical_create_index.cpp index b622d99390..4163106191 100644 --- a/src/planner/node/logical_create_index.cpp +++ b/src/planner/node/logical_create_index.cpp @@ -47,7 +47,8 @@ String LogicalCreateIndex::ToString(i64 &space) const { space -= 4; arrow_str = "-> "; } - ss << String(space, ' ') << arrow_str << "Create Table: " << *schema_name_ << "." << index_definition_->ToString(); + ss << String(space, ' ') << arrow_str << "Create Table: " << *base_table_ref_->table_name() << "." + << index_definition_->ToString(); space += arrow_str.size(); return ss.str(); diff --git a/src/planner/node/logical_create_index.cppm b/src/planner/node/logical_create_index.cppm index 25d3f8f86e..e9d2c92578 100644 --- a/src/planner/node/logical_create_index.cppm +++ b/src/planner/node/logical_create_index.cppm @@ -19,6 +19,7 @@ import logical_node_type; import column_binding; import logical_node; import parser; +import base_table_ref; export module logical_create_index; @@ -39,31 +40,19 @@ public: inline String name() override { return "LogicalCreateIndex"; } public: - static inline SharedPtr - Make(u64 node_id, SharedPtr schema_name, SharedPtr table_name, SharedPtr index_def, ConflictType conflict_type) { - return MakeShared(node_id, schema_name, table_name, index_def, conflict_type); - } - - inline LogicalCreateIndex(u64 node_id, - SharedPtr schema_name, - SharedPtr table_name, - SharedPtr index_def, - ConflictType conflict_type) - : LogicalNode(node_id, LogicalNodeType::kCreateIndex), schema_name_(schema_name), table_name_(table_name), index_definition_(index_def), + inline LogicalCreateIndex(u64 node_id, SharedPtr base_table_ref, SharedPtr index_def, ConflictType conflict_type) + : LogicalNode(node_id, LogicalNodeType::kCreateIndex), base_table_ref_(base_table_ref), index_definition_(index_def), conflict_type_(conflict_type) {} public: - [[nodiscard]] inline SharedPtr schema_name() const { return schema_name_; } - - [[nodiscard]] inline SharedPtr table_name() const { return table_name_; } + [[nodiscard]] inline SharedPtr base_table_ref() const { return base_table_ref_; } [[nodiscard]] inline SharedPtr index_definition() const { return index_definition_; } [[nodiscard]] inline ConflictType conflict_type() const { return conflict_type_; } private: - SharedPtr schema_name_{}; - SharedPtr table_name_{}; + SharedPtr base_table_ref_{}; SharedPtr index_definition_{}; ConflictType conflict_type_{ConflictType::kInvalid}; }; diff --git a/src/planner/query_binder.cpp b/src/planner/query_binder.cpp index b79591e19c..551d34bae0 100644 --- a/src/planner/query_binder.cpp +++ b/src/planner/query_binder.cpp @@ -911,13 +911,18 @@ void QueryBinder::CheckKnnAndOrderBy(KnnDistanceType distance_type, OrderType or } } +SharedPtr QueryBinder::GetTableRef(const String &db_name, const String &table_name) { + TableReference from_table; + from_table.db_name_ = db_name; + from_table.table_name_ = table_name; + return BuildBaseTable(this->query_context_ptr_, &from_table); +} + UniquePtr QueryBinder::BindDelete(const DeleteStatement &statement) { // refers to QueryBinder::BindSelect UniquePtr bound_delete_statement = BoundDeleteStatement::Make(bind_context_ptr_); - TableReference from_table; - from_table.db_name_ = statement.schema_name_; - from_table.table_name_ = statement.table_name_; - SharedPtr base_table_ref = QueryBinder::BuildBaseTable(this->query_context_ptr_, &from_table); + SharedPtr base_table_ref = GetTableRef(statement.schema_name_, statement.table_name_); + bound_delete_statement->table_ref_ptr_ = base_table_ref; if (base_table_ref.get() == nullptr) { Error(Format("Cannot bind {}.{} to a table", statement.schema_name_, statement.table_name_)); @@ -935,10 +940,8 @@ UniquePtr QueryBinder::BindDelete(const DeleteStatement &s UniquePtr QueryBinder::BindUpdate(const UpdateStatement &statement) { // refers to QueryBinder::BindSelect UniquePtr bound_update_statement = BoundUpdateStatement::Make(bind_context_ptr_); - TableReference from_table; - from_table.db_name_ = statement.schema_name_; - from_table.table_name_ = statement.table_name_; - SharedPtr base_table_ref = QueryBinder::BuildBaseTable(this->query_context_ptr_, &from_table); + SharedPtr base_table_ref = GetTableRef(statement.schema_name_, statement.table_name_); + bound_update_statement->table_ref_ptr_ = base_table_ref; if (base_table_ref.get() == nullptr) { Error(Format("Cannot bind {}.{} to a table", statement.schema_name_, statement.table_name_)); diff --git a/src/planner/query_binder.cppm b/src/planner/query_binder.cppm index 6d57072199..4707027aa5 100644 --- a/src/planner/query_binder.cppm +++ b/src/planner/query_binder.cppm @@ -42,6 +42,8 @@ public: UniquePtr BindUpdate(const UpdateStatement &statement); + SharedPtr GetTableRef(const String &db_name, const String &table_name); + QueryContext *query_context_ptr_; SharedPtr bind_context_ptr_; diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index 1bff87166d..7ded831341 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ -31,6 +31,7 @@ import physical_table_scan; import physical_knn_scan; import physical_aggregate; import physical_explain; +import physical_create_index_do; import global_block_id; import knn_expression; @@ -44,6 +45,7 @@ import data_table; import data_block; import physical_merge_knn; import merge_knn_data; +import create_index_data; import logger; import plan_fragment; @@ -58,6 +60,13 @@ UniquePtr MakeTaskStateTemplate(PhysicalOperator *physical_op) { return MakeUnique(); } +UniquePtr MakeCreateIndexDoState(PhysicalCreateIndexDo *physical_create_index_do, FragmentTask *task, FragmentContext *fragment_ctx) { + UniquePtr operator_state = MakeUnique(); + auto *parallel_materialize_fragment_ctx = static_cast(fragment_ctx); + operator_state->create_index_shared_data_ = parallel_materialize_fragment_ctx->create_index_shared_data_.get(); + return operator_state; +} + UniquePtr MakeTableScanState(PhysicalTableScan *physical_table_scan, FragmentTask *task) { SourceState *source_state = task->source_state_.get(); @@ -88,13 +97,13 @@ UniquePtr MakeKnnScanState(PhysicalKnnScan *physical_knn_scan, Fr case FragmentType::kSerialMaterialize: { SerialMaterializedFragmentCtx *serial_materialize_fragment_ctx = static_cast(fragment_ctx); knn_scan_op_state_ptr->knn_scan_function_data_ = - MakeUnique(serial_materialize_fragment_ctx->shared_data_.get(), task->TaskID()); + MakeUnique(serial_materialize_fragment_ctx->knn_scan_shared_data_.get(), task->TaskID()); break; } case FragmentType::kParallelMaterialize: { ParallelMaterializedFragmentCtx *parallel_materialize_fragment_ctx = static_cast(fragment_ctx); knn_scan_op_state_ptr->knn_scan_function_data_ = - MakeUnique(parallel_materialize_fragment_ctx->shared_data_.get(), task->TaskID()); + MakeUnique(parallel_materialize_fragment_ctx->knn_scan_shared_data_.get(), task->TaskID()); break; } default: { @@ -218,6 +227,16 @@ MakeTaskState(SizeT operator_id, const Vector &physical_ops, case PhysicalOperatorType::kCreateIndex: { return MakeTaskStateTemplate(physical_ops[operator_id]); } + case PhysicalOperatorType::kCreateIndexPrepare: { + return MakeTaskStateTemplate(physical_ops[operator_id]); + } + case PhysicalOperatorType::kCreateIndexDo: { + auto *physical_create_index_do = static_cast(physical_ops[operator_id]); + return MakeCreateIndexDoState(physical_create_index_do, task, fragment_ctx); + } + case PhysicalOperatorType::kCreateIndexFinish: { + return MakeTaskStateTemplate(physical_ops[operator_id]); + } case PhysicalOperatorType::kCreateCollection: { return MakeTaskStateTemplate(physical_ops[operator_id]); } @@ -454,32 +473,32 @@ SizeT InitKnnScanFragmentContext(PhysicalKnnScan *knn_scan_operator, FragmentCon switch (fragment_context->ContextType()) { case FragmentType::kSerialMaterialize: { SerialMaterializedFragmentCtx *serial_materialize_fragment_ctx = static_cast(fragment_context); - serial_materialize_fragment_ctx->shared_data_ = MakeUnique(knn_scan_operator->base_table_ref_, - knn_scan_operator->filter_expression_, - Move(knn_scan_operator->block_column_entries_), - Move(knn_scan_operator->index_entries_), - Move(knn_expr->opt_params_), - knn_expr->topn_, - knn_expr->dimension_, - 1, - knn_expr->query_embedding_.ptr, - knn_expr->embedding_data_type_, - knn_expr->distance_type_); + serial_materialize_fragment_ctx->knn_scan_shared_data_ = MakeUnique(knn_scan_operator->base_table_ref_, + knn_scan_operator->filter_expression_, + Move(knn_scan_operator->block_column_entries_), + Move(knn_scan_operator->index_entries_), + Move(knn_expr->opt_params_), + knn_expr->topn_, + knn_expr->dimension_, + 1, + knn_expr->query_embedding_.ptr, + knn_expr->embedding_data_type_, + knn_expr->distance_type_); break; } case FragmentType::kParallelMaterialize: { ParallelMaterializedFragmentCtx *parallel_materialize_fragment_ctx = static_cast(fragment_context); - parallel_materialize_fragment_ctx->shared_data_ = MakeUnique(knn_scan_operator->base_table_ref_, - knn_scan_operator->filter_expression_, - Move(knn_scan_operator->block_column_entries_), - Move(knn_scan_operator->index_entries_), - Move(knn_expr->opt_params_), - knn_expr->topn_, - knn_expr->dimension_, - 1, - knn_expr->query_embedding_.ptr, - knn_expr->embedding_data_type_, - knn_expr->distance_type_); + parallel_materialize_fragment_ctx->knn_scan_shared_data_ = MakeUnique(knn_scan_operator->base_table_ref_, + knn_scan_operator->filter_expression_, + Move(knn_scan_operator->block_column_entries_), + Move(knn_scan_operator->index_entries_), + Move(knn_expr->opt_params_), + knn_expr->topn_, + knn_expr->dimension_, + 1, + knn_expr->query_embedding_.ptr, + knn_expr->embedding_data_type_, + knn_expr->distance_type_); break; } default: { @@ -490,65 +509,18 @@ SizeT InitKnnScanFragmentContext(PhysicalKnnScan *knn_scan_operator, FragmentCon return task_n; } -// Allocate tasks for the fragment and determine the sink and source -void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { - i64 parallel_count = cpu_count; - PhysicalOperator *first_operator = this->GetOperators().back(); - switch (first_operator->operator_type()) { - case PhysicalOperatorType::kTableScan: { - auto *table_scan_operator = static_cast(first_operator); - parallel_count = Min(parallel_count, (i64)(table_scan_operator->TaskletCount())); - if (parallel_count == 0) { - parallel_count = 1; - } - break; - } - case PhysicalOperatorType::kKnnScan: { - auto *knn_scan_operator = static_cast(first_operator); - SizeT task_n = InitKnnScanFragmentContext(knn_scan_operator, this, query_context_); - parallel_count = Min(parallel_count, (i64)task_n); - if (parallel_count == 0) { - parallel_count = 1; - } - break; - } - case PhysicalOperatorType::kMatch: - case PhysicalOperatorType::kMergeKnn: - case PhysicalOperatorType::kProjection: { - // Serial Materialize - parallel_count = 1; - break; - } - default: { - break; - } - } +SizeT InitCreateIndexDoFragmentContext(const PhysicalCreateIndexDo *create_index_do_operator, FragmentContext *fragment_ctx) { + auto *table_entry = create_index_do_operator->base_table_ref_->table_entry_ptr_; + // FIXME: to create index on unsealed_segment + SizeT segment_cnt = table_entry->segment_map().size(); - switch (fragment_type_) { - case FragmentType::kInvalid: { - Error("Invalid fragment type"); - } - case FragmentType::kSerialMaterialize: { - UniqueLock locker(locker_); - parallel_count = 1; - tasks_.reserve(parallel_count); - tasks_.emplace_back(MakeUnique(this, 0, operator_count)); - IncreaseTask(); - break; - } - case FragmentType::kParallelMaterialize: - case FragmentType::kParallelStream: { - UniqueLock locker(locker_); - tasks_.reserve(parallel_count); - for (i64 task_id = 0; task_id < parallel_count; ++task_id) { - tasks_.emplace_back(MakeUnique(this, task_id, operator_count)); - IncreaseTask(); - } - break; - } - } + auto *parallel_materialize_fragment_ctx = static_cast(fragment_ctx); + parallel_materialize_fragment_ctx->create_index_shared_data_ = MakeUnique(table_entry->segment_map()); + return segment_cnt; +} - // Determine which type of source state. +void FragmentContext::MakeSourceState(i64 parallel_count) { + PhysicalOperator *first_operator = this->GetOperators().back(); switch (first_operator->operator_type()) { case PhysicalOperatorType::kInvalid: { Error("Unexpected operator type"); @@ -589,7 +561,8 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { case PhysicalOperatorType::kMergeTop: case PhysicalOperatorType::kMergeSort: case PhysicalOperatorType::kMergeKnn: - case PhysicalOperatorType::kFusion: { + case PhysicalOperatorType::kFusion: + case PhysicalOperatorType::kCreateIndexFinish: { if (fragment_type_ != FragmentType::kSerialMaterialize) { Error( Format("{} should be serial materialized fragment", PhysicalOperatorToString(first_operator->operator_type()))); @@ -602,6 +575,16 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { tasks_[0]->source_state_ = MakeUnique(); break; } + case PhysicalOperatorType::kCreateIndexDo: { + if (fragment_type_ != FragmentType::kParallelMaterialize) { + Error( + Format("{} should in parallel materialized fragment", PhysicalOperatorToString(first_operator->operator_type()))); + } + for (auto &task : tasks_) { + task->source_state_ = MakeUnique(); + } + break; + } case PhysicalOperatorType::kUnionAll: case PhysicalOperatorType::kIntersect: case PhysicalOperatorType::kExcept: @@ -650,6 +633,7 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { case PhysicalOperatorType::kAlter: case PhysicalOperatorType::kCreateTable: case PhysicalOperatorType::kCreateIndex: + case PhysicalOperatorType::kCreateIndexPrepare: case PhysicalOperatorType::kCreateCollection: case PhysicalOperatorType::kCreateDatabase: case PhysicalOperatorType::kCreateView: @@ -680,8 +664,10 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { Error(Format("Unexpected operator type: {}", PhysicalOperatorToString(first_operator->operator_type()))); } } +} - // Determine which type of the sink state. +void FragmentContext::MakeSinkState(i64 parallel_count) { + PhysicalOperator *first_operator = this->GetOperators().back(); PhysicalOperator *last_operator = this->GetOperators().front(); switch (last_operator->operator_type()) { @@ -706,6 +692,7 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { } case PhysicalOperatorType::kParallelAggregate: case PhysicalOperatorType::kHash: + case PhysicalOperatorType::kLimit: case PhysicalOperatorType::kTop: { if (fragment_type_ != FragmentType::kParallelStream) { Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type()))); @@ -724,22 +711,6 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { } break; } - case PhysicalOperatorType::kLimit: { - if (fragment_type_ != FragmentType::kParallelStream) { - Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type()))); - } - - if ((i64)tasks_.size() != parallel_count) { - Error(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type()))); - } - - for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { - auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), task_id); - - tasks_[task_id]->sink_state_ = Move(sink_state); - } - break; - } case PhysicalOperatorType::kMergeParallelAggregate: case PhysicalOperatorType::kMergeAggregate: case PhysicalOperatorType::kMergeHash: @@ -784,7 +755,9 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { break; } case PhysicalOperatorType::kSort: - case PhysicalOperatorType::kKnnScan: { + case PhysicalOperatorType::kKnnScan: + case PhysicalOperatorType::kCreateIndexPrepare: + case PhysicalOperatorType::kCreateIndexDo: { if (fragment_type_ != FragmentType::kParallelMaterialize && fragment_type_ != FragmentType::kSerialMaterialize) { Error( Format("{} should in parallel/serial materialized fragment", PhysicalOperatorToString(first_operator->operator_type()))); @@ -881,6 +854,7 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { case PhysicalOperatorType::kCommand: case PhysicalOperatorType::kCreateTable: case PhysicalOperatorType::kCreateIndex: + case PhysicalOperatorType::kCreateIndexFinish: case PhysicalOperatorType::kCreateCollection: case PhysicalOperatorType::kCreateDatabase: case PhysicalOperatorType::kCreateView: @@ -909,6 +883,77 @@ void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { } } +// Allocate tasks for the fragment and determine the sink and source +void FragmentContext::CreateTasks(i64 cpu_count, i64 operator_count) { + i64 parallel_count = cpu_count; + PhysicalOperator *first_operator = this->GetOperators().back(); + switch (first_operator->operator_type()) { + case PhysicalOperatorType::kTableScan: { + auto *table_scan_operator = static_cast(first_operator); + parallel_count = Min(parallel_count, (i64)(table_scan_operator->TaskletCount())); + if (parallel_count == 0) { + parallel_count = 1; + } + break; + } + case PhysicalOperatorType::kKnnScan: { + auto *knn_scan_operator = static_cast(first_operator); + SizeT task_n = InitKnnScanFragmentContext(knn_scan_operator, this, query_context_); + parallel_count = Min(parallel_count, (i64)task_n); + if (parallel_count == 0) { + parallel_count = 1; + } + break; + } + case PhysicalOperatorType::kMatch: + case PhysicalOperatorType::kMergeKnn: + case PhysicalOperatorType::kProjection: { + // Serial Materialize + parallel_count = 1; + break; + } + case PhysicalOperatorType::kCreateIndexDo: { + const auto *create_index_do_operator = static_cast(first_operator); + SizeT segment_n = InitCreateIndexDoFragmentContext(create_index_do_operator, this); + parallel_count = Max(parallel_count, 1l); + break; + } + default: { + break; + } + } + + switch (fragment_type_) { + case FragmentType::kInvalid: { + Error("Invalid fragment type"); + } + case FragmentType::kSerialMaterialize: { + UniqueLock locker(locker_); + parallel_count = 1; + tasks_.reserve(parallel_count); + tasks_.emplace_back(MakeUnique(this, 0, operator_count)); + IncreaseTask(); + break; + } + case FragmentType::kParallelMaterialize: + case FragmentType::kParallelStream: { + UniqueLock locker(locker_); + tasks_.reserve(parallel_count); + for (i64 task_id = 0; task_id < parallel_count; ++task_id) { + tasks_.emplace_back(MakeUnique(this, task_id, operator_count)); + IncreaseTask(); + } + break; + } + } + + // Determine which type of source state. + MakeSourceState(parallel_count); + + // Determine which type of the sink state. + MakeSinkState(parallel_count); +} + SharedPtr SerialMaterializedFragmentCtx::GetResultInternal() { // Only one sink state if (tasks_.size() != 1) { diff --git a/src/scheduler/fragment_context.cppm b/src/scheduler/fragment_context.cppm index a10ed408ee..b87916438b 100644 --- a/src/scheduler/fragment_context.cppm +++ b/src/scheduler/fragment_context.cppm @@ -24,6 +24,7 @@ import physical_sink; import data_table; import data_block; import knn_scan_data; +import create_index_data; export module fragment_context; @@ -46,8 +47,6 @@ export enum class FragmentType { kParallelStream, }; -class PlanFragment; - export class FragmentContext { public: static void @@ -101,6 +100,11 @@ public: [[nodiscard]] inline FragmentType ContextType() const { return fragment_type_; } +private: + void MakeSourceState(i64 parallel_count); + + void MakeSinkState(i64 parallel_count); + protected: virtual SharedPtr GetResultInternal() = 0; @@ -133,7 +137,7 @@ public: SharedPtr GetResultInternal() final; public: - UniquePtr shared_data_{}; + UniquePtr knn_scan_shared_data_{}; }; export class ParallelMaterializedFragmentCtx final : public FragmentContext { @@ -146,7 +150,9 @@ public: SharedPtr GetResultInternal() final; public: - UniquePtr shared_data_{}; + UniquePtr knn_scan_shared_data_{}; + + UniquePtr create_index_shared_data_{}; protected: HashMap>> task_results_{}; diff --git a/src/scheduler/task_scheduler.cpp b/src/scheduler/task_scheduler.cpp index deb58d29cc..47b6edf2b1 100644 --- a/src/scheduler/task_scheduler.cpp +++ b/src/scheduler/task_scheduler.cpp @@ -30,6 +30,7 @@ import query_context; import plan_fragment; import fragment_context; import default_values; +import physical_operator_type; module task_scheduler; @@ -93,7 +94,20 @@ void TaskScheduler::Schedule(QueryContext *query_context, const VectorGetOperators().empty()) { + Error("Empty fragment"); + } + auto *last_operator = plan_fragment->GetOperators()[0]; + switch (last_operator->operator_type()) { + case PhysicalOperatorType::kCreateIndexFinish: { + ScheduleRoundRobin(query_context, tasks, plan_fragment); + break; + } + default: { + ScheduleOneWorkerIfPossible(query_context, tasks, plan_fragment); + break; + } + } } void TaskScheduler::ScheduleOneWorkerPerQuery(QueryContext *query_context, const Vector &tasks, PlanFragment *plan_fragment) { @@ -138,9 +152,11 @@ void TaskScheduler::ScheduleOneWorkerIfPossible(QueryContext *query_context, con void TaskScheduler::ScheduleRoundRobin(QueryContext *query_context, const Vector &tasks, PlanFragment *plan_fragment) { LOG_TRACE(Format("Schedule {} tasks of query id: {} into scheduler with RR policy", tasks.size(), query_context->query_id())); + u64 worker_id = 0; for (const auto &fragment_task : tasks) { - u64 worker_id = ProposedWorkerID(worker_count_); ScheduleTask(fragment_task, worker_id); + worker_id++; + worker_id %= worker_count_; } } From 04086c5237119fc8227c95cd9ef8b15b310bdd53 Mon Sep 17 00:00:00 2001 From: shenyushi Date: Fri, 5 Jan 2024 13:40:02 +0800 Subject: [PATCH 3/4] Fix bug for limit sink. --- src/scheduler/fragment_context.cpp | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/scheduler/fragment_context.cpp b/src/scheduler/fragment_context.cpp index 7ded831341..bc32b77ade 100644 --- a/src/scheduler/fragment_context.cpp +++ b/src/scheduler/fragment_context.cpp @@ -692,7 +692,6 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } case PhysicalOperatorType::kParallelAggregate: case PhysicalOperatorType::kHash: - case PhysicalOperatorType::kLimit: case PhysicalOperatorType::kTop: { if (fragment_type_ != FragmentType::kParallelStream) { Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type()))); @@ -711,6 +710,22 @@ void FragmentContext::MakeSinkState(i64 parallel_count) { } break; } + case PhysicalOperatorType::kLimit: { + if (fragment_type_ != FragmentType::kParallelStream) { + Error(Format("{} should in parallel stream fragment", PhysicalOperatorToString(last_operator->operator_type()))); + } + + if ((i64)tasks_.size() != parallel_count) { + Error(Format("{} task count isn't correct.", PhysicalOperatorToString(last_operator->operator_type()))); + } + + for (u64 task_id = 0; (i64)task_id < parallel_count; ++task_id) { + auto sink_state = MakeUnique(fragment_ptr_->FragmentID(), task_id); + + tasks_[task_id]->sink_state_ = Move(sink_state); + } + break; + } case PhysicalOperatorType::kMergeParallelAggregate: case PhysicalOperatorType::kMergeAggregate: case PhysicalOperatorType::kMergeHash: From 4f143b13971cb630ec52edbf7afdf3dec1521d97 Mon Sep 17 00:00:00 2001 From: shenyushi Date: Fri, 5 Jan 2024 14:45:44 +0800 Subject: [PATCH 4/4] Add annotation. --- src/storage/txn/txn.cppm | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/storage/txn/txn.cppm b/src/storage/txn/txn.cppm index f73d1ebc80..aa1e5b487b 100644 --- a/src/storage/txn/txn.cppm +++ b/src/storage/txn/txn.cppm @@ -93,7 +93,11 @@ public: Tuple GetTableEntry(const String &db_name, const String &table_name); // Index OPs - Status CreateIndex(const String &db_name, const String &table_name, const SharedPtr &index_def, ConflictType conflict_type, bool prepare); + // If `prepare` is false, the index will be created in single thread. (called by `PhysicalCreateIndex`) + // Else, only data is stored in index (Called by `PhysicalCreateIndexPrepare`). And the index will be created by multiple threads in next + // operator. (called by `PhysicalCreateIndexDo`) + Status + CreateIndex(const String &db_name, const String &table_name, const SharedPtr &index_def, ConflictType conflict_type, bool prepare); Status CreateIndexDo(const String &db_name, const String &table_name, const String &index_name, HashMap &create_index_idxes);