Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NOT FOR MERGE] Flush preaggregation based on rows count #13681

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ extern TStatKey Combine_MaxRowsCount;

namespace {

// Reports whether processing may continue in memory.
// Returns true as long as TlsAllocState does not report the memory
// "yellow zone" as enabled, i.e. the negation of IsMemoryYellowZoneEnabled().
bool HasMemoryForProcessing() {
return !TlsAllocState->IsMemoryYellowZoneEnabled();
}

struct TMyValueEqual {
TMyValueEqual(const TKeyTypes& types)
: Types(types)
Expand Down Expand Up @@ -244,11 +248,12 @@ class TState : public TComputationValue<TState> {
return KeyWidth + StateWidth;
}
public:
TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = false)
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), States(hash, equal, CountRowsOnPage) {
TState(TMemoryUsageInfo* memInfo, ui32 keyWidth, ui32 stateWidth, const THashFunc& hash, const TEqualsFunc& equal, bool allowOutOfMemory = true)
: TBase(memInfo), KeyWidth(keyWidth), StateWidth(stateWidth), AllowOutOfMemory(allowOutOfMemory), Hash(hash), Equal(equal) {
CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
Tongue = CurrentPage->data();
States = std::make_unique<TStates>(Hash, Equal, CountRowsOnPage);
}

~TState() {
Expand All @@ -261,15 +266,16 @@ class TState : public TComputationValue<TState> {

ExtractIt.reset();
Storage.clear();
States.Clear();
States->Clear();

CleanupCurrentContext();
}

bool TasteIt() {
Y_ABORT_UNLESS(!ExtractIt);
++Rows;
bool isNew = false;
auto itInsert = States.Insert(Tongue, isNew);
auto itInsert = States->Insert(Tongue, isNew);
if (isNew) {
CurrentPosition += RowSize();
if (CurrentPosition == CurrentPage->size()) {
Expand All @@ -278,16 +284,17 @@ class TState : public TComputationValue<TState> {
}
Tongue = CurrentPage->data() + CurrentPosition;
}
Throat = States.GetKey(itInsert) + KeyWidth;
Throat = States->GetKey(itInsert) + KeyWidth;
if (isNew) {
GrowStates();
}
IsOutOfMemory = IsOutOfMemory || (!HasMemoryForProcessing() && Rows > 1000);
return isNew;
}

void GrowStates() {
try {
States.CheckGrow();
States->CheckGrow();
} catch (TMemoryLimitExceededException) {
YQL_LOG(INFO) << "State failed to grow";
if (IsOutOfMemory || !AllowOutOfMemory) {
Expand All @@ -309,38 +316,44 @@ class TState : public TComputationValue<TState> {
return true;
}

if (!States.Empty())
if (!States->Empty())
return false;

{
TStorage localStorage;
std::swap(localStorage, Storage);

}

CurrentPage = &Storage.emplace_back(RowSize() * CountRowsOnPage, NUdf::TUnboxedValuePod());
CurrentPosition = 0;
Tongue = CurrentPage->data();
StoredDataSize = 0;
IsOutOfMemory = false;
Rows = 0;


States = std::make_unique<TStates>(Hash, Equal, CountRowsOnPage);
CleanupCurrentContext();
return true;
}

void PushStat(IStatsRegistry* stats) const {
if (!States.Empty()) {
MKQL_SET_MAX_STAT(stats, Combine_MaxRowsCount, static_cast<i64>(States.GetSize()));
if (!States->Empty()) {
MKQL_SET_MAX_STAT(stats, Combine_MaxRowsCount, static_cast<i64>(States->GetSize()));
MKQL_INC_STAT(stats, Combine_FlushesCount);
}
}

NUdf::TUnboxedValuePod* Extract() {
if (!ExtractIt) {
ExtractIt.emplace(Storage, RowSize(), States.GetSize());
ExtractIt.emplace(Storage, RowSize(), States->GetSize());
} else {
ExtractIt->Next();
}
if (!ExtractIt->IsValid()) {
ExtractIt.reset();
States.Clear();
States->Clear();
return nullptr;
}
NUdf::TUnboxedValuePod* result = ExtractIt->GetValuePtr();
Expand All @@ -352,17 +365,20 @@ class TState : public TComputationValue<TState> {
NUdf::TUnboxedValuePod* Tongue = nullptr;
NUdf::TUnboxedValuePod* Throat = nullptr;
i64 StoredDataSize = 0;
bool IsOutOfMemory = false;
NYql::NUdf::TCounter CounterOutputRows_;
ui64 Rows = 0;

private:
std::optional<TStorageIterator> ExtractIt;
const ui32 KeyWidth, StateWidth;
const bool AllowOutOfMemory;
bool IsOutOfMemory = false;
ui64 CurrentPosition = 0;
TRow* CurrentPage = nullptr;
TStorage Storage;
TStates States;
std::unique_ptr<TStates> States;
const THashFunc Hash;
const TEqualsFunc Equal;
};

class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
Expand Down Expand Up @@ -861,7 +877,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
for (auto &b: SpilledBuckets) {
b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, KeyAndStateType, 5_MB);
b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB);
b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal);
b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth, KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, false);
}
break;
}
Expand Down Expand Up @@ -889,10 +905,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
Mode = mode;
}

// Const member that reports whether processing may continue in memory.
// Returns the negation of TlsAllocState->IsMemoryYellowZoneEnabled(); it reads
// no members of the enclosing object.
bool HasMemoryForProcessing() const {
return !TlsAllocState->IsMemoryYellowZoneEnabled();
}

// Returns true when either HasMemoryForProcessing() reports false or
// TlsAllocState->GetMaximumLimitValueReached() reports true.
// The second check is skipped when the first already holds (short-circuit ||).
// NOTE(review): judging by the name, callers use this to decide when to switch
// to spilling mode — confirm at the call sites.
bool IsSwitchToSpillingModeCondition() const {
return !HasMemoryForProcessing() || TlsAllocState->GetMaximumLimitValueReached();
}
Expand Down Expand Up @@ -1048,7 +1060,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideCombinerWrapper

Nodes.ExtractKey(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue));
Nodes.ProcessItem(ctx, ptr->TasteIt() ? nullptr : static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
} while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize));
} while (!ctx.template CheckAdjustedMemLimit<TrackRss>(MemLimit, initUsage - ptr->StoredDataSize) && !ptr->IsOutOfMemory);

ptr->PushStat(ctx.Stats);
}
Expand Down
Loading