Skip to content

Commit

Permalink
Implement PDisk shred reaction to Harakiri and YardInit, #12483 (#13685)
Browse files Browse the repository at this point in the history
  • Loading branch information
the-ancient-1 authored Jan 22, 2025
1 parent 0dc9b3c commit 9527980
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 119 deletions.
7 changes: 7 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
Original file line number Diff line number Diff line change
Expand Up @@ -1727,5 +1727,12 @@ struct TPDiskCtx {
} \
} while (false)

#define S_LOG(LEVEL, MARKER, ...) \
do { \
if (PCtx && PCtx->ActorSystem) { \
STLOGX(*PCtx->ActorSystem, LEVEL, BS_PDISK_SHRED, MARKER, __VA_ARGS__, (PDiskId, PCtx->PDiskId)); \
} \
} while (false)

} // NPDisk
} // NKikimr
237 changes: 136 additions & 101 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Large diffs are not rendered by default.

15 changes: 3 additions & 12 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,14 @@ class TPDisk : public IPDisk {
enum EShredState {
EShredStateDefault = 0,
EShredStateSendPreShredCompactVDisk = 1,
EShredStateGatherPreShredCompactVDisksResponse = 2,
EShredStateSendShredVDisk = 3,
EShredStateGatherShredVDisksResponse = 4,
EShredStateFinished = 5,
EShredStateFailed = 6,
EShredStateSendShredVDisk = 2,
EShredStateFinished = 3,
EShredStateFailed = 4,
};
EShredState ShredState = EShredStateDefault;
ui64 ShredGeneration = 0;
std::deque<TActorId> ShredRequesters;

i64 PreShredCompactVDiskRequestsSent = 0;
i64 PreShredCompactVDiskResponsesReceived = 0;
i64 ShredRequestsSent = 0;
i64 ShredResponsesReceived = 0;

// Chunks that are owned by killed owner, but have operations InFlight
TVector<TChunkIdx> QuarantineChunks;
TVector<TOwner> QuarantineOwners;
Expand Down Expand Up @@ -406,8 +399,6 @@ class TPDisk : public IPDisk {
void HandleNextWriteMetadata();
void ProcessWriteMetadataResult(TWriteMetadataResult& request);

void SendPreShredCompactVDiskRequests();
void SendShredVDiskRequests();
void ProgressShredState();
void ProcessShredPDisk(TShredPDisk& request);
void ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& request);
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,26 @@ struct TOwnerData {
return RenderStatus(Status);
}

const char* GetStringShredState() const {
return RenderShredState(ShredState);
}

static const char* RenderShredState(const EVDiskShredState shredState) {
switch(shredState) {
case EVDiskShredState::VDISK_SHRED_STATE_NOT_REQUESTED:
return "Not requested";
case EVDiskShredState::VDISK_SHRED_STATE_COMPACT_REQUESTED:
return "Compact requested";
case EVDiskShredState::VDISK_SHRED_STATE_COMPACT_FINISHED:
return "Compact finished";
case EVDiskShredState::VDISK_SHRED_STATE_SHRED_REQUESTED:
return "Shred requested";
case EVDiskShredState::VDISK_SHRED_STATE_SHRED_FINISHED:
return "Shred finished";
}
return "Unexpected enum value";
}

static const char* RenderStatus(const EVDiskStatus status) {
switch(status) {
case VDISK_STATUS_DEFAULT:
Expand Down Expand Up @@ -230,6 +250,8 @@ struct TOwnerData {
InFlight.Reset(TIntrusivePtr<TOwnerInflight>(new TOwnerInflight));
}
OnQuarantine = quarantine;
LastShredGeneration = 0;
ShredState = VDISK_SHRED_STATE_NOT_REQUESTED;
}
};

Expand Down
94 changes: 88 additions & 6 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1434,7 +1434,6 @@ Y_UNIT_TEST_SUITE(ReadOnlyPDisk) {
Y_UNIT_TEST_SUITE(ShredPDisk) {
Y_UNIT_TEST(EmptyShred) {
ui64 shredGeneration = 1;

TActorTestContext testCtx{{}};
TVDiskMock vdisk(&testCtx);
THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(new NPDisk::TEvShredPDisk(shredGeneration), NKikimrProto::OK);
Expand All @@ -1443,19 +1442,102 @@ Y_UNIT_TEST_SUITE(ShredPDisk) {
}
Y_UNIT_TEST(SimpleShred) {
ui64 shredGeneration = 1;

TActorTestContext testCtx{{}};
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();

testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));

vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
vdisk.RespondToShred(shredGeneration, NKikimrProto::OK, "");

THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(nullptr, NKikimrProto::OK);

UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, "");
UNIT_ASSERT_VALUES_EQUAL(res->ShredGeneration, shredGeneration);
}
Y_UNIT_TEST(KillVDiskWhilePreShredding) {
ui64 shredGeneration = 1;
TActorTestContext testCtx{{}};
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
THolder<NPDisk::TEvPreShredCompactVDisk> evReq = testCtx.Recv<NPDisk::TEvPreShredCompactVDisk>();
UNIT_ASSERT_VALUES_UNEQUAL(evReq.Get(), nullptr);
vdisk.PerformHarakiri();
THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(
nullptr, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, "");
UNIT_ASSERT_VALUES_EQUAL(res->ShredGeneration, shredGeneration);
}
Y_UNIT_TEST(KillVDiskWhileShredding) {
ui64 shredGeneration = 1;
TActorTestContext testCtx{{}};
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
THolder<NPDisk::TEvShredVDisk> evReq = testCtx.Recv<NPDisk::TEvShredVDisk>();
UNIT_ASSERT_VALUES_UNEQUAL(evReq.Get(), nullptr);
vdisk.PerformHarakiri();
THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(
nullptr, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, "");
UNIT_ASSERT_VALUES_EQUAL(res->ShredGeneration, shredGeneration);
}
Y_UNIT_TEST(InitVDiskAfterShredding) {
ui64 shredGeneration = 1;
TActorTestContext testCtx{{}};
TVDiskMock vdisk(&testCtx);
TVDiskMock vdisk2(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
vdisk2.InitFull();
vdisk2.SendEvLogSync();
testCtx.RestartPDiskSync();
vdisk.InitFull();
vdisk.SendEvLogSync();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
vdisk2.InitFull();
vdisk2.SendEvLogSync();
vdisk2.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
vdisk.RespondToShred(shredGeneration, NKikimrProto::OK, "");
vdisk2.RespondToShred(shredGeneration, NKikimrProto::OK, "");
THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(nullptr, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, "");
UNIT_ASSERT_VALUES_EQUAL(res->ShredGeneration, shredGeneration);
}
Y_UNIT_TEST(ReinitVDiskWhilePreShredding) {
ui64 shredGeneration = 1;
TActorTestContext testCtx{{}};
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
THolder<NPDisk::TEvPreShredCompactVDisk> evReq = testCtx.Recv<NPDisk::TEvPreShredCompactVDisk>();
UNIT_ASSERT_VALUES_UNEQUAL(evReq.Get(), nullptr);
vdisk.InitFull();
vdisk.SendEvLogSync();
vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
vdisk.RespondToShred(shredGeneration, NKikimrProto::OK, "");
THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(nullptr, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, "");
UNIT_ASSERT_VALUES_EQUAL(res->ShredGeneration, shredGeneration);
}
Y_UNIT_TEST(ReinitVDiskWhileShredding) {
ui64 shredGeneration = 1;
TActorTestContext testCtx{{}};
TVDiskMock vdisk(&testCtx);
vdisk.InitFull();
vdisk.SendEvLogSync();
testCtx.Send(new NPDisk::TEvShredPDisk(shredGeneration));
vdisk.RespondToPreShredCompact(shredGeneration, NKikimrProto::OK, "");
THolder<NPDisk::TEvShredVDisk> evReq = testCtx.Recv<NPDisk::TEvShredVDisk>();
UNIT_ASSERT_VALUES_UNEQUAL(evReq.Get(), nullptr);
vdisk.InitFull();
vdisk.SendEvLogSync();
vdisk.RespondToShred(shredGeneration, NKikimrProto::OK, "");
THolder<NPDisk::TEvShredPDiskResult> res = testCtx.TestResponse<NPDisk::TEvShredPDiskResult>(nullptr, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->ErrorReason, "");
UNIT_ASSERT_VALUES_EQUAL(res->ShredGeneration, shredGeneration);
}
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct TActorTestContext {
Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SHRED, NLog::PRI_DEBUG);
Sender = Runtime->AllocateEdgeActor();

auto cfg = DefaultPDiskConfig(Settings.IsBad);
Expand Down Expand Up @@ -348,6 +349,12 @@ struct TVDiskMock {
return LastUsedLsn + 1 - FirstLsnToKeep;
}

void PerformHarakiri() {
TestCtx->TestResponse<NPDisk::TEvHarakiriResult>(
new NPDisk::TEvHarakiri(PDiskParams->Owner, PDiskParams->OwnerRound),
NKikimrProto::OK);
}

void RespondToPreShredCompact(ui64 shredGeneration, NKikimrProto::EReplyStatus status, const TString& errorReason) {
THolder<NPDisk::TEvPreShredCompactVDisk> evReq = TestCtx->Recv<NPDisk::TEvPreShredCompactVDisk>();
if (evReq) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/services/services.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ enum EServiceKikimr {
BS_REPL = 273;
BS_PDISK = 274;
BS_PDISK_TEST = 1102;
BS_PDISK_SHRED = 1103;
BS_YARD = 275;
BS_PROXY = 276;
BS_NODE = 277;
Expand Down

0 comments on commit 9527980

Please sign in to comment.