Skip to content

Commit

Permalink
Add snapshot, part4 (#2425)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Jan 4, 2025
1 parent 2b26868 commit c89db0b
Show file tree
Hide file tree
Showing 12 changed files with 382 additions and 51 deletions.
4 changes: 4 additions & 0 deletions src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,10 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
}
case SnapshotOp::kDrop: {
LOG_INFO(fmt::format("Execute snapshot drop"));
Status snapshot_status = Snapshot::DropSnapshot(query_context, snapshot_name);
if (!snapshot_status.ok()) {
RecoverableError(snapshot_status);
}
break;
}
case SnapshotOp::kRestore: {
Expand Down
240 changes: 205 additions & 35 deletions src/executor/operator/physical_show.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ import peer_task;
import node_info;
import txn_context;
import txn_state;
import snapshot_brief;
import command_statement;

namespace infinity {

Expand Down Expand Up @@ -583,7 +585,7 @@ void PhysicalShow::Init() {
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(bigint_type);
output_types_->emplace_back(bigint_type);
output_types_->emplace_back(varchar_type);
break;
}
case ShowStmtType::kShowSnapshot: {
Expand Down Expand Up @@ -5867,8 +5869,8 @@ void PhysicalShow::ExecuteShowTransaction(QueryContext *query_context, ShowOpera
}

void PhysicalShow::ExecuteShowTransactionHistory(QueryContext *query_context, ShowOperatorState *operator_state) {
Txn* txn = query_context->GetTxn();
// txn->AddOperation(MakeShared<String>("ShowTransactionHistory"));
Txn *txn = query_context->GetTxn();
// txn->AddOperation(MakeShared<String>("ShowTransactionHistory"));
TransactionID this_txn_id = txn->TxnID();
TxnManager *txn_manager = query_context->storage()->txn_manager();
Vector<SharedPtr<TxnContext>> txn_context_histories = txn_manager->GetTxnContextHistories();
Expand Down Expand Up @@ -6581,9 +6583,89 @@ void PhysicalShow::ExecuteListSnapshots(QueryContext *query_context, ShowOperato
auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);
auto bigint_type = MakeShared<DataType>(LogicalType::kBigInt);
UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
Vector<SharedPtr<DataType>> column_types{varchar_type, varchar_type, varchar_type, bigint_type, bigint_type};

Vector<SharedPtr<DataType>> column_types{varchar_type, varchar_type, varchar_type, bigint_type, varchar_type};

output_block_ptr->Init(column_types);

String snapshot_dir = query_context->global_config()->SnapshotDir();
Vector<SnapshotBrief> snapshot_list = SnapshotBrief::GetSnapshots(snapshot_dir);

SizeT row_count = 0;
for (auto &snapshot_brief : snapshot_list) {
if (output_block_ptr.get() == nullptr) {
output_block_ptr = DataBlock::MakeUniquePtr();
output_block_ptr->Init(column_types);
}

{
// snapshot name
Value value = Value::MakeVarchar(snapshot_brief.snapshot_name_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
}

{
// scope
String scope_str;
switch (snapshot_brief.scope_) {
case SnapshotScope::kTable: {
scope_str = "Table";
break;
}
case SnapshotScope::kDatabase: {
scope_str = "Database";
break;
}
case SnapshotScope::kSystem: {
scope_str = "System";
break;
}
case SnapshotScope::kIgnore: {
scope_str = "Ignore";
break;
}
default: {
Status status = Status::Unknown("Invalid scope type");
RecoverableError(status);
}
}
Value value = Value::MakeVarchar(scope_str);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[1]);
}

{
// snapshot create time
Value value = Value::MakeVarchar(snapshot_brief.create_time_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
}

{
// snapshot commit ts
Value value = Value::MakeBigInt(snapshot_brief.commit_ts_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[3]);
}

{
// snapshot size
String snapshot_size_str = Utility::FormatByteSize(snapshot_brief.size_);
Value value = Value::MakeVarchar(snapshot_size_str);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[4]);
}

++row_count;
if (row_count == output_block_ptr->capacity()) {
output_block_ptr->Finalize();
operator_state->output_.emplace_back(std::move(output_block_ptr));
output_block_ptr = nullptr;
row_count = 0;
}
}

output_block_ptr->Finalize();
operator_state->output_.emplace_back(std::move(output_block_ptr));
return;
Expand All @@ -6599,37 +6681,125 @@ void PhysicalShow::ExecuteShowSnapshot(QueryContext *query_context, ShowOperator

output_block_ptr->Init(column_types);

// {
// SizeT column_id = 0;
// {
// Value value = Value::MakeVarchar("memory_objects");
// ValueExpression value_expr(value);
// value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
// }
//
// ++column_id;
// {
// Value value = Value::MakeVarchar(GlobalResourceUsage::GetObjectCountInfo());
// ValueExpression value_expr(value);
// value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
// }
// }
//
// {
// SizeT column_id = 0;
// {
// Value value = Value::MakeVarchar("memory_allocation");
// ValueExpression value_expr(value);
// value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
// }
//
// ++column_id;
// {
// Value value = Value::MakeVarchar(GlobalResourceUsage::GetRawMemoryInfo());
// ValueExpression value_expr(value);
// value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
// }
// }
String snapshot_dir = query_context->global_config()->SnapshotDir();
Vector<SnapshotBrief> snapshot_list = SnapshotBrief::GetSnapshots(snapshot_dir);

SnapshotBrief snapshot_brief;
for (const auto &ss_brief : snapshot_list) {
if (ss_brief.snapshot_name_ == object_name_.value()) {
snapshot_brief = ss_brief;
}
}

if (snapshot_brief.scope_ == SnapshotScope::kInvalid) {
Status status = Status::Unknown(fmt::format("can't find snapshot: {}", object_name_.value()));
RecoverableError(status);
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("snapshot_name");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(snapshot_brief.snapshot_name_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("snapshot_scope");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
String scope_str;
switch (snapshot_brief.scope_) {
case SnapshotScope::kTable: {
scope_str = "Table";
break;
}
case SnapshotScope::kDatabase: {
scope_str = "Database";
break;
}
case SnapshotScope::kSystem: {
scope_str = "System";
break;
}
case SnapshotScope::kIgnore: {
scope_str = "Ignore";
break;
}
default: {
Status status = Status::Unknown("Invalid scope type");
RecoverableError(status);
}
}

Value value = Value::MakeVarchar(scope_str);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("create_time");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(snapshot_brief.create_time_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("commit_timestamp");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(std::to_string(snapshot_brief.commit_ts_));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("snapshot_size");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
String snapshot_size_str = Utility::FormatByteSize(snapshot_brief.size_);
Value value = Value::MakeVarchar(snapshot_size_str);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

output_block_ptr->Finalize();
operator_state->output_.emplace_back(std::move(output_block_ptr));
Expand Down
62 changes: 62 additions & 0 deletions src/executor/operator/snapshot/snapshot.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

#include <filesystem>

module snapshot;

import stl;
import txn;
import query_context;
import table_entry;
import status;
import third_party;
import config;
import virtual_store;
import logger;

namespace infinity {

Status Snapshot::DropSnapshot(QueryContext *query_context, const String &snapshot_name) {

String snapshot_dir = query_context->global_config()->SnapshotDir();
bool found = false;
for (const auto &entry : std::filesystem::directory_iterator(snapshot_dir)) {
if (entry.is_directory()) {
// Don't search the directory recursively
} else {
// Just the file base name
if (entry.path().stem() == snapshot_name) {
String extension = entry.path().extension();
if (extension == ".json" or extension == ".lz4") {
VirtualStore::DeleteFile(entry.path().string());
found = true;
}
} else {
String filename = entry.path().filename();
LOG_WARN(fmt::format("Invalid snapshot file name: {}", filename));
}
}
}

if (!found) {
return Status::NotFound(fmt::format("Snapshot: {} not found", snapshot_name));
}

return Status::OK();
}

} // namespace infinity
Loading

0 comments on commit c89db0b

Please sign in to comment.