Skip to content

Commit

Permalink
YQ-3893 Add timeout in read_actor after TEvUndelivered (#13692)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Jan 22, 2025
1 parent b6b0e97 commit 84ac963
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ struct TEvPrivate {
class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase {

const ui64 PrintStatePeriodSec = 300;
const ui64 SleepPeriodSec = 2;
const ui64 ProcessStatePeriodSec = 1;

struct TReadyBatch {
public:
Expand Down Expand Up @@ -305,7 +305,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
void NotifyCA();
void SendStartSession(TSession& sessionInfo);
void Init();
void Sleep();
void ScheduleProcessState();
void ProcessGlobalState();
void ProcessSessionsState();
void UpdateSessions();
Expand Down Expand Up @@ -629,7 +629,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
}

void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
SRC_LOG_T("TEvRetry, EventQueueId " << ev->Get()->EventQueueId);
SRC_LOG_T("Received TEvRetry, EventQueueId " << ev->Get()->EventQueueId);
Counters.Retry++;

auto readActorIt = ReadActorByEventQueueId.find(ev->Get()->EventQueueId);
Expand All @@ -651,13 +651,13 @@ void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartb
Counters.PrivateHeartbeat++;
auto readActorIt = ReadActorByEventQueueId.find(ev->Get()->EventQueueId);
if (readActorIt == ReadActorByEventQueueId.end()) {
SRC_LOG_D("Ignore TEvRetry, wrong EventQueueId " << ev->Get()->EventQueueId);
SRC_LOG_D("Ignore TEvEvHeartbeat, wrong EventQueueId " << ev->Get()->EventQueueId);
return;
}

auto sessionIt = Sessions.find(readActorIt->second);
if (sessionIt == Sessions.end()) {
SRC_LOG_D("Ignore TEvRetry, wrong read actor id " << readActorIt->second);
SRC_LOG_D("Ignore TEvEvHeartbeat, wrong read actor id " << readActorIt->second);
return;
}
auto& sessionInfo = sessionIt->second;
Expand Down Expand Up @@ -700,15 +700,15 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr

CoordinatorActorId = ev->Get()->CoordinatorActorId;
ReInit("Coordinator is changed");
Sleep();
ScheduleProcessState();
}

void TDqPqRdReadActor::Sleep() {
void TDqPqRdReadActor::ScheduleProcessState() {
if (ProcessStateScheduled) {
return;
}
ProcessStateScheduled = true;
Schedule(TDuration::Seconds(SleepPeriodSec), new TEvPrivate::TEvProcessState());
Schedule(TDuration::Seconds(ProcessStatePeriodSec), new TEvPrivate::TEvProcessState());
}

void TDqPqRdReadActor::ReInit(const TString& reason) {
Expand Down Expand Up @@ -776,16 +776,16 @@ void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
ReadActorByEventQueueId.erase(sessionInfo.EventQueueId);
Sessions.erase(sessionIt);
ReInit("Reset session state (by TEvUndelivered)");
ScheduleProcessState();
}
}
}

if (CoordinatorActorId && *CoordinatorActorId == ev->Sender) {
ReInit("TEvUndelivered to coordinator");
Sleep();
ScheduleProcessState();
return;
}
ProcessState();
}

void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
Expand Down

0 comments on commit 84ac963

Please sign in to comment.