From 2f80404d0ae2d48f549f199ad95d0a5f7781bb0b Mon Sep 17 00:00:00 2001 From: "sunny.xl" Date: Fri, 10 Jan 2025 11:50:42 +0800 Subject: [PATCH] refactor cancel checker for lake compactions in be Signed-off-by: drake_wang --- be/src/common/config.h | 2 +- be/src/storage/lake/compaction_scheduler.cpp | 72 ++++++++++--------- be/src/storage/lake/compaction_scheduler.h | 3 + be/src/storage/lake/compaction_task_context.h | 5 +- .../lake/compaction_scheduler_test.cpp | 16 ++--- .../lake/compaction_task_context_test.cpp | 3 +- be/test/storage/lake/compaction_task_test.cpp | 12 ++-- .../lake_primary_key_consistency_test.cpp | 4 +- .../lake/primary_key_compaction_task_test.cpp | 28 ++++---- .../storage/lake/primary_key_publish_test.cpp | 10 +-- 10 files changed, 78 insertions(+), 77 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index c7e08ca45d0bbd..f773779395a275 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1039,7 +1039,7 @@ CONF_mInt64(lake_metadata_cache_limit, /*2GB=*/"2147483648"); CONF_mBool(lake_print_delete_log, "false"); CONF_mInt64(lake_compaction_stream_buffer_size_bytes, "1048576"); // 1MB // The interval to check whether lake compaction is valid. Set to <= 0 to disable the check. -CONF_mInt32(lake_compaction_check_valid_interval_minutes, "30"); // 30 minutes +CONF_mInt32(lake_compaction_check_valid_interval_minutes, "5"); // 5 minutes // Used to ensure service availability in extreme situations by sacrificing a certain degree of correctness CONF_mBool(experimental_lake_ignore_lost_segment, "false"); CONF_mInt64(experimental_lake_wait_per_put_ms, "0"); diff --git a/be/src/storage/lake/compaction_scheduler.cpp b/be/src/storage/lake/compaction_scheduler.cpp index 64f44dcd794efb..fadf09fecd8ef1 100644 --- a/be/src/storage/lake/compaction_scheduler.cpp +++ b/be/src/storage/lake/compaction_scheduler.cpp @@ -181,18 +181,16 @@ void CompactionScheduler::compact(::google::protobuf::RpcController* controller, // thread to avoid blocking other transactions, but if there are idle threads, they will steal // tasks from busy threads to execute. auto cb = std::make_shared(this, request, response, done); - bool is_checker = true; // make the first tablet as checker std::vector> contexts_vec; for (auto tablet_id : request->tablet_ids()) { auto context = std::make_unique(request->txn_id(), tablet_id, request->version(), - request->force_base_compaction(), is_checker, cb); + request->force_base_compaction(), cb); { std::lock_guard l(_contexts_lock); _contexts.Append(context.get()); } contexts_vec.push_back(std::move(context)); // DO NOT touch `context` from here! - is_checker = false; } // initialize last check time, compact request is received right after FE sends it, so consider it valid now cb->set_last_check_time(time(nullptr)); @@ -304,50 +302,58 @@ Status compaction_should_cancel(CompactionTaskContext* context) { RETURN_IF_ERROR(context->callback->has_error()); int64_t check_interval_seconds = 60LL * config::lake_compaction_check_valid_interval_minutes; - if (!context->is_checker || check_interval_seconds <= 0) { + if (check_interval_seconds <= 0) { return Status::OK(); } int64_t now = time(nullptr); int64_t last_check_time = context->callback->last_check_time(); if (now > last_check_time && (now - last_check_time) >= check_interval_seconds) { - // ask FE whether this compaction transaction is still valid + std::unique_lock lock(context->callback->_txn_valid_check_mutex, std::defer_lock); + if (lock.try_lock()) { + // Check again inside the locked section to avoid race conditions + last_check_time = context->callback->last_check_time(); + if (now <= last_check_time || (now - last_check_time) < check_interval_seconds) { + return Status::OK(); + } + + // ask FE whether this compaction transaction is still valid #ifndef BE_TEST - TNetworkAddress master_addr = get_master_address(); - if (master_addr.hostname.size() > 0 && master_addr.port > 0) { - TReportLakeCompactionRequest request; - request.__set_txn_id(context->txn_id); - TReportLakeCompactionResponse result; - auto status = ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &result](FrontendServiceConnection& client) { - client->reportLakeCompaction(result, request); - }, - 3000 /* timeout 3 seconds */); - if (status.ok()) { - if (!result.valid) { - // notify all tablets in this compaction request - LOG(WARNING) << "validate compaction transaction " << context->txn_id << " for tablet " - << context->tablet_id << ", abort invalid compaction"; - Status rs = Status::Aborted("compaction validation failed"); - context->callback->update_status(rs); - return rs; // should cancel compaction + TNetworkAddress master_addr = get_master_address(); + if (master_addr.hostname.size() > 0 && master_addr.port > 0) { + TReportLakeCompactionRequest request; + request.__set_txn_id(context->txn_id); + TReportLakeCompactionResponse result; + auto status = ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->reportLakeCompaction(result, request); + }, + 3000 /* timeout 3 seconds */); + if (status.ok()) { + if (!result.valid) { + // notify all tablets in this compaction request + LOG(WARNING) << "validate compaction transaction " << context->txn_id << " for tablet " + << context->tablet_id << ", abort invalid compaction"; + Status rs = Status::Aborted("compaction validation failed"); + context->callback->update_status(rs); + return rs; // should cancel compaction + } else { + // everything is fine + } } else { - // everything is fine + LOG(WARNING) << "fail to validate compaction transaction " << context->txn_id << " for tablet " + << context->tablet_id << ", error: " << status; } } else { LOG(WARNING) << "fail to validate compaction transaction " << context->txn_id << " for tablet " - << context->tablet_id << ", error: " << status; + << context->tablet_id << ", error: leader FE address not found"; } - } else { - LOG(WARNING) << "fail to validate compaction transaction " << context->txn_id << " for tablet " - << context->tablet_id << ", error: leader FE address not found"; - } #endif - // update check time, if check rpc failed, wait next round - context->callback->set_last_check_time(now); + // update check time, if check rpc failed, wait next round + context->callback->set_last_check_time(now); + } } - return Status::OK(); } Status CompactionScheduler::do_compaction(std::unique_ptr context) { diff --git a/be/src/storage/lake/compaction_scheduler.h b/be/src/storage/lake/compaction_scheduler.h index 8c974ded046bff..195d6c00aa7157 100644 --- a/be/src/storage/lake/compaction_scheduler.h +++ b/be/src/storage/lake/compaction_scheduler.h @@ -80,6 +80,9 @@ class CompactionTaskCallback { return _last_check_time; } +public: + std::mutex _txn_valid_check_mutex; + private: const static int64_t kDefaultTimeoutMs = 24L * 60 * 60 * 1000; // 1 day diff --git a/be/src/storage/lake/compaction_task_context.h b/be/src/storage/lake/compaction_task_context.h index 0c6977038a89f3..d73084b6cf14ff 100644 --- a/be/src/storage/lake/compaction_task_context.h +++ b/be/src/storage/lake/compaction_task_context.h @@ -60,12 +60,11 @@ struct CompactionTaskStats { // Context of a single tablet compaction task. struct CompactionTaskContext : public butil::LinkNode { explicit CompactionTaskContext(int64_t txn_id_, int64_t tablet_id_, int64_t version_, bool force_base_compaction_, - bool is_checker_, std::shared_ptr cb_) + std::shared_ptr cb_) : txn_id(txn_id_), tablet_id(tablet_id_), version(version_), force_base_compaction(force_base_compaction_), - is_checker(is_checker_), callback(std::move(cb_)) {} #ifndef NDEBUG @@ -82,8 +81,6 @@ struct CompactionTaskContext : public butil::LinkNode { std::atomic finish_time{0}; std::atomic skipped{false}; std::atomic runs{0}; - // the first tablet of a compaction request, will ask FE periodically to see if compaction is valid - bool is_checker; Status status; Progress progress; int64_t enqueue_time_sec; // time point when put into queue diff --git a/be/test/storage/lake/compaction_scheduler_test.cpp b/be/test/storage/lake/compaction_scheduler_test.cpp index b6380e51142e52..b548e3b578a103 100644 --- a/be/test/storage/lake/compaction_scheduler_test.cpp +++ b/be/test/storage/lake/compaction_scheduler_test.cpp @@ -49,17 +49,15 @@ class LakeCompactionSchedulerTest : public TestBase { TEST_F(LakeCompactionSchedulerTest, test_task_queue) { CompactionScheduler::WrapTaskQueues queue(10); - auto ctx = - std::make_unique(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */, - false /* force_base_compaction */, false /* is_checker */, nullptr); + auto ctx = std::make_unique(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */, + false /* force_base_compaction */, nullptr); queue.set_target_size(5); ASSERT_EQ(5, queue.target_size()); queue.put_by_txn_id(ctx->txn_id, ctx); std::vector> v; - auto ctx2 = - std::make_unique(101 /* txn_id */, 102 /* tablet_id */, 1 /* version */, - false /* force_base_compaction */, false /* is_checker */, nullptr); + auto ctx2 = std::make_unique(101 /* txn_id */, 102 /* tablet_id */, 1 /* version */, + false /* force_base_compaction */, nullptr); v.push_back(std::move(ctx2)); queue.put_by_txn_id(101 /* txn_id */, v); } @@ -113,7 +111,7 @@ TEST_F(LakeCompactionSchedulerTest, test_compaction_cancel) { { auto cb = std::make_shared(nullptr, &request, &response, nullptr); CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */, - false /* force_base_compaction */, false /* is_checker */, cb); + false /* force_base_compaction */, cb); cb->update_status(Status::Aborted("aborted for test")); EXPECT_FALSE(compaction_should_cancel(&ctx).ok()); } @@ -122,7 +120,7 @@ TEST_F(LakeCompactionSchedulerTest, test_compaction_cancel) { { auto cb = std::make_shared(nullptr, &request, &response, nullptr); CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */, - false /* force_base_compaction */, false /* is_checker */, cb); + false /* force_base_compaction */, cb); EXPECT_TRUE(compaction_should_cancel(&ctx).ok()); } @@ -130,7 +128,7 @@ TEST_F(LakeCompactionSchedulerTest, test_compaction_cancel) { { auto cb = std::make_shared(nullptr, &request, &response, nullptr); CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */, - false /* force_base_compaction */, true /* is_checker */, cb); + false /* force_base_compaction */, cb); cb->set_last_check_time(0); EXPECT_TRUE(compaction_should_cancel(&ctx).ok()); } diff --git a/be/test/storage/lake/compaction_task_context_test.cpp b/be/test/storage/lake/compaction_task_context_test.cpp index 8d9fa3304bd0a0..465e45816cd357 100644 --- a/be/test/storage/lake/compaction_task_context_test.cpp +++ b/be/test/storage/lake/compaction_task_context_test.cpp @@ -43,7 +43,7 @@ class CompactionTaskContextTest : public testing::Test { protected: // Implement a mock version of CompactionTaskCallback if needed std::shared_ptr callback; - CompactionTaskContext context{123, 456, 789, false, false, callback}; + CompactionTaskContext context{123, 456, 789, false, callback}; void SetUp() override { // Initialize your context or mock callback here if necessary @@ -54,7 +54,6 @@ TEST_F(CompactionTaskContextTest, test_constructor) { EXPECT_EQ(123, context.txn_id); EXPECT_EQ(456, context.tablet_id); EXPECT_EQ(789, context.version); - EXPECT_EQ(false, context.is_checker); } TEST_F(CompactionTaskContextTest, test_calculation) { diff --git a/be/test/storage/lake/compaction_task_test.cpp b/be/test/storage/lake/compaction_task_test.cpp index 9af09cfbd56bcb..60f6ae1a9c84af 100644 --- a/be/test/storage/lake/compaction_task_test.cpp +++ b/be/test/storage/lake/compaction_task_test.cpp @@ -166,7 +166,7 @@ TEST_P(LakeDuplicateKeyCompactionTest, test1) { ASSERT_EQ(kChunkSize * 3, read(version)); auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -203,7 +203,7 @@ TEST_P(LakeDuplicateKeyCompactionTest, test_empty_tablet) { auto txn_id = next_id(); auto tablet_id = _tablet_metadata->id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); EXPECT_EQ(100, task_context->progress.value()); @@ -309,7 +309,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) { // Cancelled compaction task { auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); auto st = task->execute(CompactionTask::kCancelledFn); @@ -319,7 +319,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) { // Completed compaction task without error { auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -463,7 +463,7 @@ TEST_P(LakeUniqueKeyCompactionTest, test1) { ASSERT_EQ(kChunkSize, read(version)); auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -610,7 +610,7 @@ TEST_P(LakeUniqueKeyCompactionWithDeleteTest, test_base_compaction_with_delete) } auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); diff --git a/be/test/storage/lake/lake_primary_key_consistency_test.cpp b/be/test/storage/lake/lake_primary_key_consistency_test.cpp index 556a439744e1bb..c05c821d336e11 100644 --- a/be/test/storage/lake/lake_primary_key_consistency_test.cpp +++ b/be/test/storage/lake/lake_primary_key_consistency_test.cpp @@ -544,8 +544,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa Status compact_op() { auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, _tablet_metadata->id(), _version, false, - false, nullptr); + auto task_context = + std::make_unique(txn_id, _tablet_metadata->id(), _version, false, nullptr); ASSIGN_OR_RETURN(auto task, _tablet_mgr->compact(task_context.get())); RETURN_IF_ERROR(task->execute(CompactionTask::kNoCancelFn)); RETURN_IF_ERROR(publish_single_version(_tablet_metadata->id(), _version + 1, txn_id)); diff --git a/be/test/storage/lake/primary_key_compaction_task_test.cpp b/be/test/storage/lake/primary_key_compaction_task_test.cpp index 0b73914597046a..4ba9f291724413 100644 --- a/be/test/storage/lake/primary_key_compaction_task_test.cpp +++ b/be/test/storage/lake/primary_key_compaction_task_test.cpp @@ -218,7 +218,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test1) { } auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -276,7 +276,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test2) { ASSERT_EQ(kChunkSize * 3, read(version)); auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -343,7 +343,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test3) { ASSERT_EQ(kChunkSize * 2, read(version)); auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -618,7 +618,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy_min_input) { { // do compaction without input rowsets auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); EXPECT_EQ(100, task_context->progress.value()); @@ -725,7 +725,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_sorted) { ASSERT_EQ(kChunkSize * 3, read(version)); auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -813,7 +813,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_remove_compaction_state) { } auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -896,7 +896,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_abort_txn) { auto txn_id = next_id(); std::thread t1([&]() { - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -957,7 +957,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_multi_output_seg) { // make sure compact can generate more than one segment in output rowset config::max_segment_file_size = 50; config::vector_chunk_size = 10; - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -1024,7 +1024,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_pk_recover_rowset_order_after_compact) config::lake_pk_compaction_max_input_rowsets = 2; auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -1216,7 +1216,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_rows_mapper) { ExecEnv::GetInstance()->delete_file_thread_pool()->wait(); auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -1308,8 +1308,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_data_load_conc) { // compact rowset 1 & 2 without publish auto compact_txn_id = next_id(); { - auto task_context = - std::make_unique(compact_txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(compact_txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -1378,7 +1377,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_major_compaction) { EXPECT_EQ(new_tablet_metadata->rowsets_size(), N); auto txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); @@ -1441,8 +1440,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_major_compaction_thread_safe) { workers.emplace_back([&]() { for (int i = 0; i < N; i++) { auto txn_id = next_id() + N * 10; - auto task_context = - std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); check_task(task); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); diff --git a/be/test/storage/lake/primary_key_publish_test.cpp b/be/test/storage/lake/primary_key_publish_test.cpp index 5221be144669a6..9777ee90f56b1c 100644 --- a/be/test/storage/lake/primary_key_publish_test.cpp +++ b/be/test/storage/lake/primary_key_publish_test.cpp @@ -1394,7 +1394,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_index_rebuild_with_dels) { auto old_val = config::lake_pk_compaction_min_input_segments; config::lake_pk_compaction_min_input_segments = 1; int64_t txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); EXPECT_EQ(100, task_context->progress.value()); @@ -1560,7 +1560,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_index_rebuild_with_dels3) { auto old_val = config::lake_pk_compaction_min_input_segments; config::lake_pk_compaction_min_input_segments = 1; int64_t txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); EXPECT_EQ(100, task_context->progress.value()); @@ -1622,7 +1622,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_index_rebuild_with_dels4) { auto old_val = config::lake_pk_compaction_min_input_segments; config::lake_pk_compaction_min_input_segments = 1; int64_t txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); EXPECT_EQ(100, task_context->progress.value()); @@ -1723,7 +1723,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_individual_index_compaction) { config::lake_pk_compaction_min_input_segments = 1; config::lake_pk_index_sst_max_compaction_versions = 1; int64_t txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); EXPECT_EQ(100, task_context->progress.value()); @@ -1741,7 +1741,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_individual_index_compaction) { // 4. compaction with sst { int64_t txn_id = next_id(); - auto task_context = std::make_unique(txn_id, tablet_id, version, false, false, nullptr); + auto task_context = std::make_unique(txn_id, tablet_id, version, false, nullptr); ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get())); ASSERT_OK(task->execute(CompactionTask::kNoCancelFn)); EXPECT_EQ(100, task_context->progress.value());