diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 19d23021882a..3e87a8b780bd 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -242,7 +242,7 @@ class TRowDispatcher : public TActorBootstrapped {
 
     struct TAggregatedStats{
         NYql::TCounters::TEntry AllSessionsReadBytes;
-        TMap LastQueryStats;
+        TMap> LastQueryStats;
         TDuration LastUpdateMetricsPeriod;
     };
 
@@ -364,6 +364,7 @@ class TRowDispatcher : public TActorBootstrapped {
     TString GetInternalState();
     template
     bool CheckSession(TAtomicSharedPtr& consumer, const TEventPtr& ev);
+    void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax);
     void PrintStateToLog();
 
     STRICT_STFUNC(
@@ -516,7 +517,9 @@ void TRowDispatcher::UpdateMetrics() {
     }
 
     AggrStats.AllSessionsReadBytes = NYql::TCounters::TEntry();
-    AggrStats.LastQueryStats.clear();
+    for (auto& [queryId, stat] : AggrStats.LastQueryStats) {
+        stat = Nothing();
+    }
 
     for (auto& [key, sessionsInfo] : TopicSessions) {
         const auto& topic = key.TopicPath;
@@ -527,18 +530,34 @@ void TRowDispatcher::UpdateMetrics() {
             sessionInfo.Stat.Clear();
             for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
-                AggrStats.LastQueryStats[TQueryStatKey{consumer->QueryId, topic}].Add(consumer->Stat);
+                auto& stat = AggrStats.LastQueryStats[TQueryStatKey{consumer->QueryId, topic}];
+                if (!stat) {
+                    stat = TAggQueryStat();
+                }
+                stat->Add(consumer->Stat);
                 consumer->Stat.Clear();
             }
         }
     }
 
-    for (const auto& [queryStatKey, stat] : AggrStats.LastQueryStats) {
-        auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryStatKey.first);
-        auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(queryStatKey.second));
-        topicGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
-        topicGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
-        topicGroup->GetCounter("MaxReadLag")->Set(stat.ReadLagMessages.Max);
+    for (auto it = AggrStats.LastQueryStats.begin(); it != AggrStats.LastQueryStats.end();) {
+        const auto& stats = it->second;
+        if (!stats) {
+            SetQueryMetrics(it->first, 0, 0, 0);
+            it = AggrStats.LastQueryStats.erase(it);
+            continue;
+        }
+        SetQueryMetrics(it->first, stats->UnreadBytes.Max, stats->UnreadBytes.Avg, stats->ReadLagMessages.Max);
+        ++it;
     }
+    PrintStateToLog();
+}
+
+void TRowDispatcher::SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax) {
+    auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryKey.first);
+    auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(queryKey.second));
+    topicGroup->GetCounter("MaxUnreadBytes")->Set(unreadBytesMax);
+    topicGroup->GetCounter("AvgUnreadBytes")->Set(unreadBytesAvg);
+    topicGroup->GetCounter("MaxReadLag")->Set(readLagMessagesMax);
 }
 
 TString TRowDispatcher::GetInternalState() {
@@ -598,7 +617,9 @@ TString TRowDispatcher::GetInternalState() {
         auto sessionsBufferSumSize = sessionCountByQuery[queryStatKey] * MaxSessionBufferSizeBytes;
         auto used = sessionsBufferSumSize ? (stat.UnreadBytes.Sum * 100.0 / sessionsBufferSumSize) : 0.0;
         str << " " << queryId << " / " << topic << ": buffer used (all partitions) " << LeftPad(Prec(used, 4), 10) << "% (" << toHuman(stat.UnreadBytes.Sum) << ") unread max (one partition) " << toHuman(stat.UnreadBytes.Max) << " data rate";
-        printDataRate(aggStat.ReadBytes);
+        if (aggStat) {
+            printDataRate(aggStat->ReadBytes);
+        }
         str << " waiting " << stat.IsWaiting << " max read lag " << stat.ReadLagMessages.Max;
         str << "\n";
     }