// Patch summary (commentary only; everything from the `diff --git` header onward is unchanged).
//
// Target file: yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
//
// What the hunks below show:
//  * A free function HasMemoryForProcessing() is added right after the `namespace {` opener; it returns
//    !TlsAllocState->IsMemoryYellowZoneEnabled(). The const member function with the same name and body is
//    removed from TSpillingSupportState, whose IsSwitchToSpillingModeCondition() keeps calling the
//    unqualified name.
//  * TState constructor: the default value of `allowOutOfMemory` flips from false to true. The hash and
//    equality functors are now copied into the new const members Hash and Equal, and States is created in
//    the constructor body with std::make_unique instead of being built in the initializer list.
//  * TState::States changes from a TStates value to a std::unique_ptr, so every `States.` access in the
//    touched hunks becomes `States->`.
//  * TasteIt() increments the new Rows counter on every call and latches IsOutOfMemory to true once
//    HasMemoryForProcessing() returns false while Rows > 1000.
//  * The reset path in the `@@ -309,38 +316,44 @@` hunk (its function header lies outside the hunk) runs only
//    past the `if (!States->Empty()) return false;` guard; it now also clears IsOutOfMemory, zeroes Rows and
//    replaces States with a freshly made table built from Hash and Equal.
//  * IsOutOfMemory moves from below the `private:` label to above it (next to StoredDataSize); Rows is added
//    above that label as well. The loop in the last hunk reads ptr->IsOutOfMemory directly.
//  * TSpillingSupportState now passes an explicit `false` for allowOutOfMemory when creating each bucket's
//    InMemoryProcessingState, i.e. the value the old default supplied.
//  * The do/while loop in the last hunk additionally terminates when ptr->IsOutOfMemory is set.
//
// NOTE(review): the 1000-row threshold in TasteIt() is an unnamed literal; its rationale is not visible here.
// NOTE(review): this copy of the patch looks lossy - physical lines are collapsed and text inside angle
// brackets seems to be missing (e.g. `std::make_unique(Hash, Equal, CountRowsOnPage)`,
// `static_cast(States->GetSize())`, `std::optional ExtractIt`, `std::unique_ptr States`). Presumably it cannot
// be applied as-is; TODO confirm against the upstream commit c6f046ab9ad5 before relying on it.
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index b755ab15516d..c6f046ab9ad5 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -31,6 +31,10 @@ extern TStatKey Combine_MaxRowsCount; namespace { +bool HasMemoryForProcessing() { + return !TlsAllocState->IsMemoryYellowZoneEnabled(); +} + struct TMyValueEqual { TMyValueEqual(const TKeyTypes& types) : Types(types) @@ -244,11 +248,12 @@ class TState : public TComputationValue { return KeyWidth + StateWidth; } public: - TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false) - : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) { + TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = true) + : TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), Hash(hash), Equal(equal) { CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod()); CurrentPosition = 0; Tongue = CurrentPage->data(); + States = std::make_unique(Hash, Equal, CountRowsOnPage); } ~TState() { @@ -261,15 +266,16 @@ class TState : public TComputationValue { ExtractIt.reset(); Storage.clear(); - States.Clear(); + States->Clear(); CleanupCurrentContext(); } bool TasteIt() { Y_ABORT_UNLESS(!ExtractIt); + ++Rows; bool isNew = false; - auto itInsert = States.Insert(Tongue, isNew); + auto itInsert = States->Insert(Tongue, isNew); if (isNew) { CurrentPosition += RowSize(); if (CurrentPosition == CurrentPage->size()) { @@ -278,16 +284,17 @@ class TState : public TComputationValue { } Tongue = CurrentPage->data() + CurrentPosition; } - Throat = States.GetKey(itInsert) + KeyWidth; + 
Throat = States->GetKey(itInsert) + KeyWidth; if (isNew) { GrowStates(); } + IsOutOfMemory = IsOutOfMemory || (!HasMemoryForProcessing() && Rows > 1000); return isNew; } void GrowStates() { try { - States.CheckGrow(); + States->CheckGrow(); } catch (TMemoryLimitExceededException) { YQL_LOG(INFO) << "State failed to grow"; if (IsOutOfMemory || !AllowOutOfMemory) { @@ -309,38 +316,44 @@ class TState : public TComputationValue { return true; } - if (!States.Empty()) + if (!States->Empty()) return false; { TStorage localStorage; std::swap(localStorage, Storage); + } + CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod()); CurrentPosition = 0; Tongue = CurrentPage->data(); StoredDataSize = 0; + IsOutOfMemory = false; + Rows = 0; + + States = std::make_unique(Hash, Equal, CountRowsOnPage); CleanupCurrentContext(); return true; } void PushStat(IStatsRegistry* stats) const { - if (!States.Empty()) { - MKQL_SET_MAX_STAT(stats, Combine_MaxRowsCount, static_cast(States.GetSize())); + if (!States->Empty()) { + MKQL_SET_MAX_STAT(stats, Combine_MaxRowsCount, static_cast(States->GetSize())); MKQL_INC_STAT(stats, Combine_FlushesCount); } } NUdf::TUnboxedValuePod* Extract() { if (!ExtractIt) { - ExtractIt.emplace(Storage, RowSize(), States.GetSize()); + ExtractIt.emplace(Storage, RowSize(), States->GetSize()); } else { ExtractIt->Next(); } if (!ExtractIt->IsValid()) { ExtractIt.reset(); - States.Clear(); + States->Clear(); return nullptr; } NUdf::TUnboxedValuePod* result = ExtractIt->GetValuePtr(); @@ -352,17 +365,20 @@ class TState : public TComputationValue { NUdf::TUnboxedValuePod* Tongue = nullptr; NUdf::TUnboxedValuePod* Throat = nullptr; i64 StoredDataSize = 0; + bool IsOutOfMemory = false; NYql::NUdf::TCounter CounterOutputRows_; + ui64 Rows = 0; private: std::optional ExtractIt; const ui32 KeyWidth, StateWidth; const bool AllowOutOfMemory; - bool IsOutOfMemory = false; ui64 CurrentPosition = 0; TRow* CurrentPage = nullptr; TStorage 
Storage; - TStates States; + std::unique_ptr States; + const THashFunc Hash; + const TEqualsFunc Equal; }; class TSpillingSupportState : public TComputationValue { @@ -861,7 +877,7 @@ class TSpillingSupportState : public TComputationValue { for (auto &b: SpilledBuckets) { b.SpilledState = std::make_unique(spiller, KeyAndStateType, 5_MB); b.SpilledData = std::make_unique(spiller, UsedInputItemType, 5_MB); - b.InMemoryProcessingState = std::make_unique(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal); + b.InMemoryProcessingState = std::make_unique(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, false); } break; } @@ -889,10 +905,6 @@ class TSpillingSupportState : public TComputationValue { Mode = mode; } - bool HasMemoryForProcessing() const { - return !TlsAllocState->IsMemoryYellowZoneEnabled(); - } - bool IsSwitchToSpillingModeCondition() const { return !HasMemoryForProcessing() || TlsAllocState->GetMaximumLimitValueReached(); } @@ -1048,7 +1060,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode(ptr->Tongue)); Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast(ptr->Tongue), static_cast(ptr->Throat)); - } while (!ctx.template CheckAdjustedMemLimit(MemLimit, initUsage - ptr->StoredDataSize)); + } while (!ctx.template CheckAdjustedMemLimit(MemLimit, initUsage - ptr->StoredDataSize) && !ptr->IsOutOfMemory); ptr->PushStat(ctx.Stats); }