Skip to content

Commit

Permalink
Reset internal skeleton front queues when stuck (#14042)
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 authored Jan 31, 2025
1 parent d0ca0e1 commit 71a3ed4
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 0 deletions.
61 changes: 61 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/vdisk_malfunction.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>

#include <util/stream/null.h>

#include "ut_helpers.h"

#define Ctest Cnull

Y_UNIT_TEST_SUITE(VDiskMalfunction) {
Y_UNIT_TEST(StuckInternalQueues) {
TEnvironmentSetup env({
.NodeCount = 8,
.Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
});
env.CreateBoxAndPool(1, 1);
env.Sim(TDuration::Minutes(1));

auto groups = env.GetGroups();
UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);

ui32 groupId = env.GetGroupInfo(groups.front())->GroupID.GetRawId();
TActorId edge = env.Runtime->AllocateEdgeActor(1);

ui32 initialPuts = 100;

env.Runtime->FilterFunction = [&](ui32 /*nodeId*/, std::unique_ptr<IEventHandle>& ev) {
if (ev->Type == TEvVDiskRequestCompleted::EventType) {
return false;
}
return true;
};

auto sendPut = [&](ui64 cookie) {
env.Runtime->WrapInActorContext(edge, [&] {
TString data = MakeData(100000);
TLogoBlobID blobId(1, 1, 1, 1, data.size(), cookie);
TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max());
SendToBSProxy(edge, groupId, ev);
});
};

for (ui32 i = 0; i < initialPuts; ++i) {
sendPut(i);
}

for (ui32 i = 0; i < initialPuts; ++i) {
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(
edge, false, TInstant::Max());
}

env.Sim(TDuration::Minutes(10));
env.Runtime->FilterFunction = {};

sendPut(initialPuts);
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(
edge, false, TAppData::TimeProvider->Now() + TDuration::Seconds(10));
UNIT_ASSERT(res);
}
}
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_blobstorage/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ SRCS(
sync.cpp
ut_helpers.cpp
validation.cpp
vdisk_malfunction.cpp
)

PEERDIR(
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
Original file line number Diff line number Diff line change
Expand Up @@ -729,5 +729,15 @@ public:
COUNTER_DEF(BlobsFixed);
};

class TMalfunctionGroup : public TBase {
public:
GROUP_CONSTRUCTOR(TMalfunctionGroup)
{
COUNTER_INIT(DroppingStuckInternalQueue, false);
}

COUNTER_DEF(DroppingStuckInternalQueue);
};

} // NMonGroup
} // NKikimr
60 changes: 60 additions & 0 deletions ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ namespace NKikimr {
const ui64 MaxInFlightCount;
const ui64 MaxInFlightCost;
THashMap<ui64, TMsgInfo> Msgs;

TMonotonic LastUpdate;
static constexpr TDuration StuckQueueThreshold = TDuration::Minutes(3);

public:
const NKikimrBlobStorage::EVDiskInternalQueueId IntQueueId;
const TString Name;
Expand Down Expand Up @@ -205,6 +209,7 @@ namespace NKikimr {
, Deadlines(0)
, MaxInFlightCount(maxInFlightCount)
, MaxInFlightCost(maxInFlightCost)
, LastUpdate(TActivationContext::Monotonic())
, IntQueueId(intQueueId)
, Name(name)
, SkeletonFrontInFlightCount(MakeCounter(skeletonFrontGroup, "InFlightCount", false, false))
Expand Down Expand Up @@ -244,6 +249,7 @@ namespace NKikimr {
*SkeletonFrontInFlightBytes += recByteSize;

Msgs.emplace(internalMessageId, TMsgInfo(msgId.MsgId, ctx.Now(), std::move(trace)));
UpdateState();
} else {
// enqueue
++DelayedCount;
Expand Down Expand Up @@ -303,6 +309,7 @@ namespace NKikimr {
*SkeletonFrontInFlightBytes += recByteSize;

Msgs.emplace(rec->InternalMessageId, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace)));
UpdateState();
}
Queue->Pop();
} else {
Expand All @@ -314,6 +321,11 @@ namespace NKikimr {
public:
template <class TFront>
void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, TFront &front) {
if (!Msgs.contains(msgCtx.InternalMessageId)) {
// Completed request after resetting queue
return;
}

Y_ABORT_UNLESS(InFlightCount >= 1 && InFlightBytes >= msgCtx.RecByteSize && InFlightCost >= msgCtx.Cost,
"IntQueueId# %s InFlightCount# %" PRIu64 " InFlightBytes# %" PRIu64
" InFlightCost# %" PRIu64 " msgCtx# %s Deadlines# %" PRIu64,
Expand All @@ -332,6 +344,7 @@ namespace NKikimr {
const size_t numErased = Msgs.erase(msgCtx.InternalMessageId);
Y_ABORT_UNLESS(numErased == 1);

UpdateState();
ProcessNext(ctx, front, false);
}

Expand Down Expand Up @@ -418,6 +431,28 @@ namespace NKikimr {
str << "<br>";
}

void UpdateState() {
LastUpdate = TActivationContext::Monotonic();
}

bool IsStuck() const {
return InFlightCount > 0 && TActivationContext::Monotonic() - LastUpdate > StuckQueueThreshold;
}

void ResetQueue() {
InFlightCount = 0;
InFlightCost = 0;
InFlightBytes = 0;

*SkeletonFrontInFlightCount = 0;
*SkeletonFrontInFlightCost = 0;
*SkeletonFrontInFlightBytes = 0;
*SkeletonFrontCostProcessed = 0;

Msgs.clear();
UpdateState();
}

TString GenerateHtmlState() const {
// NOTE: warning policy:
// 1. For InFlightCount and InFlightCost we output them in yellow, if
Expand Down Expand Up @@ -666,12 +701,15 @@ namespace NKikimr {
NMonGroup::TSyncerGroup SyncerMonGroup;
NMonGroup::TVDiskStateGroup VDiskMonGroup;
NMonGroup::TCostGroup CostGroup;
NMonGroup::TMalfunctionGroup MalfunctionGroup;
TVDiskIncarnationGuid VDiskIncarnationGuid;
bool HasUnreadableBlobs = false;
TInstant LastSanitizeTime = TInstant::Zero();
TInstant LastSanitizeWithErrorTime = TInstant::Zero();
ui64 NextUniqueMessageId = 1;

static constexpr TDuration StuckQueueCheckPeriod = TDuration::Seconds(60);

ui64 AllocateMessageId() {
return NextUniqueMessageId++;
}
Expand Down Expand Up @@ -821,6 +859,7 @@ namespace NKikimr {
}
case TEvFrontRecoveryStatus::SyncGuidRecoveryDone:
Become(&TThis::StateFunc);
HandleWakeup(ctx);
SendNotifications(ctx);
break;
default: Y_ABORT("Unexpected case");
Expand Down Expand Up @@ -1932,6 +1971,7 @@ namespace NKikimr {
IgnoreFunc(TEvVDiskRequestCompleted)
HFunc(NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate, Handle)
fFunc(TEvBlobStorage::EvForwardToSkeleton, HandleForwardToSkeleton)
IgnoreFunc(TEvents::TEvWakeup)
)

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -2036,6 +2076,24 @@ namespace NKikimr {
Send(ev);
}

void HandleWakeup(const TActorContext& ctx) {
for (TIntQueueClass* queue : { IntQueueAsyncGets.get(), IntQueueFastGets.get(),
IntQueueDiscover.get(), IntQueueLowGets.get(), IntQueueLogPuts.get(),
IntQueueHugePutsForeground.get(), IntQueueHugePutsBackground.get() }) {
if (queue->IsStuck()) {
queue->DropWithError(ctx, *this);
queue->ResetQueue();
DisconnectClients(ctx);
LOG_CRIT_S(ctx, NKikimrServices::BS_SKELETON, VCtx->VDiskLogPrefix
<< "Stuck internal queue detected, dropping queues, "
<< " Queue.Name# " << queue->Name
<< " Marker# BSVSF08");
++MalfunctionGroup.DroppingStuckInternalQueue();
}
}
Schedule(StuckQueueCheckPeriod, new TEvents::TEvWakeup);
}

STRICT_STFUNC(StateFunc,
HFunc(TEvBlobStorage::TEvVMovedPatch, Check)
HFunc(TEvBlobStorage::TEvVPatchStart, Check)
Expand Down Expand Up @@ -2081,6 +2139,7 @@ namespace NKikimr {
HFunc(NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate, Handle)
fFunc(TEvBlobStorage::EvForwardToSkeleton, HandleForwardToSkeleton)
hFunc(TEvMinHugeBlobSizeUpdate, Handle)
CFunc(NActors::TEvents::TSystem::Wakeup, HandleWakeup)
)

#define HFuncStatus(TEvType, status, errorReason, now, wstatus) \
Expand Down Expand Up @@ -2207,6 +2266,7 @@ namespace NKikimr {
, SyncerMonGroup(VDiskCounters, "subsystem", "syncer")
, VDiskMonGroup(VDiskCounters, "subsystem", "state")
, CostGroup(VDiskCounters, "subsystem", "cost")
, MalfunctionGroup(VDiskCounters, "subsystem", "malfunction")
{
ReplMonGroup.ReplUnreplicatedVDisks() = 1;
VDiskMonGroup.VDiskState(NKikimrWhiteboard::EVDiskState::Initial);
Expand Down

0 comments on commit 71a3ed4

Please sign in to comment.