Skip to content

Commit

Permalink
refactor cancel checker for lake compactions in be
Browse files Browse the repository at this point in the history
Signed-off-by: drake_wang <[email protected]>
  • Loading branch information
wxl24life committed Jan 10, 2025
1 parent e88d410 commit 2f80404
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 77 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ CONF_mInt64(lake_metadata_cache_limit, /*2GB=*/"2147483648");
CONF_mBool(lake_print_delete_log, "false");
CONF_mInt64(lake_compaction_stream_buffer_size_bytes, "1048576"); // 1MB
// The interval to check whether lake compaction is valid. Set to <= 0 to disable the check.
CONF_mInt32(lake_compaction_check_valid_interval_minutes, "30"); // 30 minutes
CONF_mInt32(lake_compaction_check_valid_interval_minutes, "5"); // 5 minutes
// Used to ensure service availability in extreme situations by sacrificing a certain degree of correctness
CONF_mBool(experimental_lake_ignore_lost_segment, "false");
CONF_mInt64(experimental_lake_wait_per_put_ms, "0");
Expand Down
72 changes: 39 additions & 33 deletions be/src/storage/lake/compaction_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,18 +181,16 @@ void CompactionScheduler::compact(::google::protobuf::RpcController* controller,
// thread to avoid blocking other transactions, but if there are idle threads, they will steal
// tasks from busy threads to execute.
auto cb = std::make_shared<CompactionTaskCallback>(this, request, response, done);
bool is_checker = true; // make the first tablet as checker
std::vector<std::unique_ptr<CompactionTaskContext>> contexts_vec;
for (auto tablet_id : request->tablet_ids()) {
auto context = std::make_unique<CompactionTaskContext>(request->txn_id(), tablet_id, request->version(),
request->force_base_compaction(), is_checker, cb);
request->force_base_compaction(), cb);
{
std::lock_guard l(_contexts_lock);
_contexts.Append(context.get());
}
contexts_vec.push_back(std::move(context));
// DO NOT touch `context` from here!
is_checker = false;
}
// initialize last check time, compact request is received right after FE sends it, so consider it valid now
cb->set_last_check_time(time(nullptr));
Expand Down Expand Up @@ -304,50 +302,58 @@ Status compaction_should_cancel(CompactionTaskContext* context) {
RETURN_IF_ERROR(context->callback->has_error());

int64_t check_interval_seconds = 60LL * config::lake_compaction_check_valid_interval_minutes;
if (!context->is_checker || check_interval_seconds <= 0) {
if (check_interval_seconds <= 0) {
return Status::OK();
}

int64_t now = time(nullptr);
int64_t last_check_time = context->callback->last_check_time();
if (now > last_check_time && (now - last_check_time) >= check_interval_seconds) {
// ask FE whether this compaction transaction is still valid
std::unique_lock<std::mutex> lock(context->callback->_txn_valid_check_mutex, std::defer_lock);
if (lock.try_lock()) {
// Check again inside the locked section to avoid race conditions
last_check_time = context->callback->last_check_time();
if (now <= last_check_time || (now - last_check_time) < check_interval_seconds) {
return Status::OK();
}

// ask FE whether this compaction transaction is still valid
#ifndef BE_TEST
TNetworkAddress master_addr = get_master_address();
if (master_addr.hostname.size() > 0 && master_addr.port > 0) {
TReportLakeCompactionRequest request;
request.__set_txn_id(context->txn_id);
TReportLakeCompactionResponse result;
auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->reportLakeCompaction(result, request);
},
3000 /* timeout 3 seconds */);
if (status.ok()) {
if (!result.valid) {
// notify all tablets in this compaction request
LOG(WARNING) << "validate compaction transaction " << context->txn_id << " for tablet "
<< context->tablet_id << ", abort invalid compaction";
Status rs = Status::Aborted("compaction validation failed");
context->callback->update_status(rs);
return rs; // should cancel compaction
TNetworkAddress master_addr = get_master_address();
if (master_addr.hostname.size() > 0 && master_addr.port > 0) {
TReportLakeCompactionRequest request;
request.__set_txn_id(context->txn_id);
TReportLakeCompactionResponse result;
auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->reportLakeCompaction(result, request);
},
3000 /* timeout 3 seconds */);
if (status.ok()) {
if (!result.valid) {
// notify all tablets in this compaction request
LOG(WARNING) << "validate compaction transaction " << context->txn_id << " for tablet "
<< context->tablet_id << ", abort invalid compaction";
Status rs = Status::Aborted("compaction validation failed");
context->callback->update_status(rs);
return rs; // should cancel compaction
} else {
// everything is fine
}
} else {
// everything is fine
LOG(WARNING) << "fail to validate compaction transaction " << context->txn_id << " for tablet "
<< context->tablet_id << ", error: " << status;
}
} else {
LOG(WARNING) << "fail to validate compaction transaction " << context->txn_id << " for tablet "
<< context->tablet_id << ", error: " << status;
<< context->tablet_id << ", error: leader FE address not found";
}
} else {
LOG(WARNING) << "fail to validate compaction transaction " << context->txn_id << " for tablet "
<< context->tablet_id << ", error: leader FE address not found";
}
#endif
// update check time, if check rpc failed, wait next round
context->callback->set_last_check_time(now);
// update check time, if check rpc failed, wait next round
context->callback->set_last_check_time(now);
}
}
return Status::OK();
}

Status CompactionScheduler::do_compaction(std::unique_ptr<CompactionTaskContext> context) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/lake/compaction_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class CompactionTaskCallback {
return _last_check_time;
}

public:
std::mutex _txn_valid_check_mutex;

private:
const static int64_t kDefaultTimeoutMs = 24L * 60 * 60 * 1000; // 1 day

Expand Down
5 changes: 1 addition & 4 deletions be/src/storage/lake/compaction_task_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ struct CompactionTaskStats {
// Context of a single tablet compaction task.
struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
explicit CompactionTaskContext(int64_t txn_id_, int64_t tablet_id_, int64_t version_, bool force_base_compaction_,
bool is_checker_, std::shared_ptr<CompactionTaskCallback> cb_)
std::shared_ptr<CompactionTaskCallback> cb_)
: txn_id(txn_id_),
tablet_id(tablet_id_),
version(version_),
force_base_compaction(force_base_compaction_),
is_checker(is_checker_),
callback(std::move(cb_)) {}

#ifndef NDEBUG
Expand All @@ -82,8 +81,6 @@ struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
std::atomic<int64_t> finish_time{0};
std::atomic<bool> skipped{false};
std::atomic<int> runs{0};
// the first tablet of a compaction request, will ask FE periodically to see if compaction is valid
bool is_checker;
Status status;
Progress progress;
int64_t enqueue_time_sec; // time point when put into queue
Expand Down
16 changes: 7 additions & 9 deletions be/test/storage/lake/compaction_scheduler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,15 @@ class LakeCompactionSchedulerTest : public TestBase {

TEST_F(LakeCompactionSchedulerTest, test_task_queue) {
CompactionScheduler::WrapTaskQueues queue(10);
auto ctx =
std::make_unique<CompactionTaskContext>(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, false /* is_checker */, nullptr);
auto ctx = std::make_unique<CompactionTaskContext>(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, nullptr);
queue.set_target_size(5);
ASSERT_EQ(5, queue.target_size());
queue.put_by_txn_id(ctx->txn_id, ctx);

std::vector<std::unique_ptr<CompactionTaskContext>> v;
auto ctx2 =
std::make_unique<CompactionTaskContext>(101 /* txn_id */, 102 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, false /* is_checker */, nullptr);
auto ctx2 = std::make_unique<CompactionTaskContext>(101 /* txn_id */, 102 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, nullptr);
v.push_back(std::move(ctx2));
queue.put_by_txn_id(101 /* txn_id */, v);
}
Expand Down Expand Up @@ -113,7 +111,7 @@ TEST_F(LakeCompactionSchedulerTest, test_compaction_cancel) {
{
auto cb = std::make_shared<CompactionTaskCallback>(nullptr, &request, &response, nullptr);
CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, false /* is_checker */, cb);
false /* force_base_compaction */, cb);
cb->update_status(Status::Aborted("aborted for test"));
EXPECT_FALSE(compaction_should_cancel(&ctx).ok());
}
Expand All @@ -122,15 +120,15 @@ TEST_F(LakeCompactionSchedulerTest, test_compaction_cancel) {
{
auto cb = std::make_shared<CompactionTaskCallback>(nullptr, &request, &response, nullptr);
CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, false /* is_checker */, cb);
false /* force_base_compaction */, cb);
EXPECT_TRUE(compaction_should_cancel(&ctx).ok());
}

// is checker
{
auto cb = std::make_shared<CompactionTaskCallback>(nullptr, &request, &response, nullptr);
CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, true /* is_checker */, cb);
false /* force_base_compaction */, cb);
cb->set_last_check_time(0);
EXPECT_TRUE(compaction_should_cancel(&ctx).ok());
}
Expand Down
3 changes: 1 addition & 2 deletions be/test/storage/lake/compaction_task_context_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CompactionTaskContextTest : public testing::Test {
protected:
// Implement a mock version of CompactionTaskCallback if needed
std::shared_ptr<CompactionTaskCallback> callback;
CompactionTaskContext context{123, 456, 789, false, false, callback};
CompactionTaskContext context{123, 456, 789, false, callback};

void SetUp() override {
// Initialize your context or mock callback here if necessary
Expand All @@ -54,7 +54,6 @@ TEST_F(CompactionTaskContextTest, test_constructor) {
EXPECT_EQ(123, context.txn_id);
EXPECT_EQ(456, context.tablet_id);
EXPECT_EQ(789, context.version);
EXPECT_EQ(false, context.is_checker);
}

TEST_F(CompactionTaskContextTest, test_calculation) {
Expand Down
12 changes: 6 additions & 6 deletions be/test/storage/lake/compaction_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ TEST_P(LakeDuplicateKeyCompactionTest, test1) {
ASSERT_EQ(kChunkSize * 3, read(version));

auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
Expand Down Expand Up @@ -203,7 +203,7 @@ TEST_P(LakeDuplicateKeyCompactionTest, test_empty_tablet) {

auto txn_id = next_id();
auto tablet_id = _tablet_metadata->id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
EXPECT_EQ(100, task_context->progress.value());
Expand Down Expand Up @@ -309,7 +309,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {
// Cancelled compaction task
{
auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
auto st = task->execute(CompactionTask::kCancelledFn);
Expand All @@ -319,7 +319,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {
// Completed compaction task without error
{
auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
Expand Down Expand Up @@ -463,7 +463,7 @@ TEST_P(LakeUniqueKeyCompactionTest, test1) {
ASSERT_EQ(kChunkSize, read(version));

auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
Expand Down Expand Up @@ -610,7 +610,7 @@ TEST_P(LakeUniqueKeyCompactionWithDeleteTest, test_base_compaction_with_delete)
}

auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
Expand Down
4 changes: 2 additions & 2 deletions be/test/storage/lake/lake_primary_key_consistency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa

Status compact_op() {
auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, _tablet_metadata->id(), _version, false,
false, nullptr);
auto task_context =
std::make_unique<CompactionTaskContext>(txn_id, _tablet_metadata->id(), _version, false, nullptr);
ASSIGN_OR_RETURN(auto task, _tablet_mgr->compact(task_context.get()));
RETURN_IF_ERROR(task->execute(CompactionTask::kNoCancelFn));
RETURN_IF_ERROR(publish_single_version(_tablet_metadata->id(), _version + 1, txn_id));
Expand Down
Loading

0 comments on commit 2f80404

Please sign in to comment.