Skip to content

Commit

Permalink
Compaction performed sequentially in case of huge memory usage (#13668)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 22, 2025
1 parent 3c535d3 commit 362d8a2
Show file tree
Hide file tree
Showing 15 changed files with 365 additions and 118 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,7 @@ message TColumnShardConfig {
optional uint64 WritingInFlightRequestBytesLimit = 35 [default = 128000000];
optional NKikimrSchemeOp.EColumnCodec DefaultCompression = 36;
optional int32 DefaultCompressionLevel = 37;
optional uint64 MemoryLimitMergeOnCompactionRawData = 38 [default = 512000000];
}

message TSchemeShardConfig {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/common/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,10 @@ bool TBlobRangeLink16::CheckBlob(const TUnifiedBlobId& blobId) const {
return Offset + Size <= blobId.BlobSize();
}

// Extracts the [Offset, Offset + Size) byte range described by this link
// from the materialized blob body. Both bounds are hard-verified against
// the blob size, so an out-of-range link aborts instead of returning
// truncated or garbage data.
TString TBlobRangeLink16::GetBlobData(const TString& blob) const {
    const auto blobSize = blob.size();
    AFL_VERIFY(Offset < blobSize);
    AFL_VERIFY(Offset + Size <= blobSize);
    return blob.substr(Offset, Size);
}

} // namespace NKikimr::NOlap
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/common/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class TBlobRangeLink16 {

TBlobRange RestoreRange(const TUnifiedBlobId& blobId) const;
bool CheckBlob(const TUnifiedBlobId& blobId) const;
TString GetBlobData(const TString& blob) const;
};

struct TBlobRange {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/counters/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ TIndexationCounters::TIndexationCounters(const TString& module)

CompactionDuration = TBase::GetHistogram("CompactionDuration", NMonitoring::ExponentialHistogram(18, 2, 20));
HistogramCompactionInputBytes = TBase::GetHistogram("CompactionInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024));
HistogramCompactionCorrectRawBytes = TBase::GetHistogram("CompactionCorrectInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024));
HistogramCompactionHugeRawBytes = TBase::GetHistogram("CompactionHugeInput/Bytes", NMonitoring::ExponentialHistogram(18, 2, 1024));
CompactionHugePartsCount = TBase::GetDeriviative("CompactionHugeInput/Parts");

CompactionInputBytes = TBase::GetDeriviative("CompactionInput/Bytes");
CompactionExceptions = TBase::GetDeriviative("Exceptions/Count");
CompactionFails = TBase::GetDeriviative("CompactionFails/Count");
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/tx/columnshard/counters/indexation.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ class TIndexationCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
NMonitoring::THistogramPtr HistogramCompactionInputBytes;

NMonitoring::THistogramPtr HistogramCompactionCorrectRawBytes;
NMonitoring::THistogramPtr HistogramCompactionHugeRawBytes;
NMonitoring::TDynamicCounters::TCounterPtr CompactionHugePartsCount;

public:
NMonitoring::TDynamicCounters::TCounterPtr CompactionInputBytes;

Expand Down Expand Up @@ -46,6 +51,15 @@ class TIndexationCounters: public TCommonCountersOwner {
HistogramCompactionInputBytes->Collect(size);
CompactionInputBytes->Add(size);
}

// Records into the "CompactionCorrectInput/Bytes" histogram the total raw
// size of a compaction input that was merged in a single pass (i.e. it fit
// within the memory budget — see the caller in general_compaction.cpp).
void OnCompactionCorrectMemory(const ui64 memorySize) const {
    HistogramCompactionCorrectRawBytes->Collect(memorySize);
}

// Records a compaction whose input exceeded the memory budget and had to be
// merged sequentially in parts: memorySize feeds the
// "CompactionHugeInput/Bytes" histogram, partsCount the
// "CompactionHugeInput/Parts" derivative counter.
void OnCompactionHugeMemory(const ui64 memorySize, const ui32 partsCount) const {
    HistogramCompactionHugeRawBytes->Collect(memorySize);
    CompactionHugePartsCount->Add(partsCount);
}
};

}
48 changes: 48 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h>
#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
#include <ydb/core/tx/columnshard/splitter/settings.h>
Expand Down Expand Up @@ -183,6 +184,53 @@ class TConstructionContext: TNonCopyable {
, Counters(counters)
, LastCommittedTx(lastCommittedTx) {
}

// Builds the column subset (as a TFilteredSnapshotSchema over the latest
// schema) that a compaction merge has to materialize for the given input
// portions. The output parameter seqDataColumnIds receives the chosen data
// column ids minus the key columns — presumably the columns that can be
// merged without participating in ordering (TODO confirm against the merger).
std::shared_ptr<TFilteredSnapshotSchema> BuildResultFiltered(
    const std::vector<TPortionDataAccessor>& portionAccessors, std::set<ui32>& seqDataColumnIds) const {
    auto resultSchema = SchemaVersions.GetLastSchema();
    // Primary-key column ids extended with the snapshot service field ids.
    std::set<ui32> pkColumnIds;
    {
        auto pkColumnIdsVector = IIndexInfo::AddSnapshotFieldIds(resultSchema->GetIndexInfo().GetPKColumnIds());
        pkColumnIds = std::set<ui32>(pkColumnIdsVector.begin(), pkColumnIdsVector.end());
    }
    std::set<ui32> dataColumnIds;
    {
        {
            // Seed with columns whose default value differs between any input
            // portion's schema version and the result schema: those must be
            // rewritten even if no portion stores them explicitly.
            THashMap<ui64, ISnapshotSchema::TPtr> schemas;
            for (auto& portion : portionAccessors) {
                auto dataSchema = portion.GetPortionInfo().GetSchema(SchemaVersions);
                schemas.emplace(dataSchema->GetVersion(), dataSchema);
            }
            dataColumnIds = ISnapshotSchema::GetColumnsWithDifferentDefaults(schemas, resultSchema);
        }
        for (auto&& accessor : portionAccessors) {
            // Portions carrying deletion marks force the DELETE_FLAG service column in.
            if (accessor.GetPortionInfo().GetMeta().GetDeletionsCount()) {
                dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
            }
            // Collect every stored column still present in the result schema;
            // once every result-schema column is already chosen the scan of the
            // remaining accessors' column lists can be skipped.
            if (dataColumnIds.size() != resultSchema->GetColumnsCount()) {
                for (auto id : accessor.GetColumnIds()) {
                    if (resultSchema->HasColumnId(id)) {
                        dataColumnIds.emplace(id);
                    }
                }
            }
        }
        AFL_VERIFY(dataColumnIds.size() <= resultSchema->GetColumnsCount());
        // NOTE(review): when DELETE_FLAG is selected it is also added to the key
        // set — presumably so it is excluded from seqDataColumnIds below; confirm.
        if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
            pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
        }
        dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::WRITE_ID);
    }
    // Snapshot service columns are always materialized.
    dataColumnIds.insert(IIndexInfo::GetSnapshotColumnIds().begin(), IIndexInfo::GetSnapshotColumnIds().end());
    auto resultFiltered = std::make_shared<TFilteredSnapshotSchema>(resultSchema, dataColumnIds);
    {
        // seqDataColumnIds = data columns minus key columns; every key column is
        // expected to be present in the data set (the erase is verified).
        seqDataColumnIds = dataColumnIds;
        for (auto&& i : pkColumnIds) {
            AFL_VERIFY(seqDataColumnIds.erase(i))("id", i);
        }
    }
    return resultFiltered;
}
};

class TGranuleMeta;
Expand Down
178 changes: 67 additions & 111 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "general_compaction.h"
#include "merge_subset.h"

#include "compaction/merger.h"
#include "counters/general.h"
Expand All @@ -10,126 +11,29 @@

namespace NKikimr::NOlap::NCompaction {

// Builds the row filter applied to a portion's restored batch before merging:
// an optional sharding filter AND-combined with a deleted-rows filter.
// Returns nullptr when every row passes, letting callers skip filtering.
std::shared_ptr<NArrow::TColumnFilter> TGeneralCompactColumnEngineChanges::BuildPortionFilter(
    const std::optional<NKikimr::NOlap::TGranuleShardingInfo>& shardingActual, const std::shared_ptr<NArrow::TGeneralContainer>& batch,
    const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage, const ISnapshotSchema::TPtr& /* resultSchema */) const {
    std::shared_ptr<NArrow::TColumnFilter> filter;
    if (shardingActual && pInfo.NeedShardingFilter(*shardingActual)) {
        // Restore the sharding key columns and let the sharding info decide
        // which rows still belong to this shard.
        std::set<std::string> fieldNames;
        for (auto&& i : shardingActual->GetShardingInfo()->GetColumnNames()) {
            fieldNames.emplace(i);
        }
        auto table = batch->BuildTableVerified(fieldNames);
        AFL_VERIFY(table);
        filter = shardingActual->GetShardingInfo()->GetFilter(table);
    }
    NArrow::TColumnFilter filterDeleted = NArrow::TColumnFilter::BuildAllowFilter();
    if (pInfo.GetMeta().GetDeletionsCount()) {
        if (pInfo.HasInsertWriteId()) {
            // A portion that still has an insert write id and carries deletions
            // must consist of deletions only — drop every row.
            AFL_VERIFY(pInfo.GetMeta().GetDeletionsCount() == pInfo.GetRecordsCount());
            filterDeleted = NArrow::TColumnFilter::BuildDenyFilter();
        } else {
            // Keep exactly the rows whose boolean DELETE_FLAG column is false.
            auto table = batch->BuildTableVerified(std::set<std::string>({ TIndexInfo::SPEC_COL_DELETE_FLAG }));
            AFL_VERIFY(table);
            auto col = table->GetColumnByName(TIndexInfo::SPEC_COL_DELETE_FLAG);
            AFL_VERIFY(col);
            AFL_VERIFY(col->type()->id() == arrow::Type::BOOL);
            for (auto&& c : col->chunks()) {
                auto bCol = static_pointer_cast<arrow::BooleanArray>(c);
                // NOTE(review): i is ui32 vs arrow's int64_t length() — fine for
                // realistic chunk sizes, but worth confirming upstream limits.
                for (ui32 i = 0; i < bCol->length(); ++i) {
                    filterDeleted.Add(!bCol->GetView(i));
                }
            }
        }
        // NOTE(review): when older intervals outside this compaction still
        // overlap the portion, all rows are kept — presumably the deletion
        // marks must survive for those intervals; confirm.
        if (GranuleMeta->GetPortionsIndex().HasOlderIntervals(pInfo, portionsInUsage)) {
            filterDeleted = NArrow::TColumnFilter::BuildAllowFilter();
        }
    }
    if (filter) {
        *filter = filter->And(filterDeleted);
    } else if (!filterDeleted.IsTotalAllowFilter()) {
        // Allocate a filter object only when it actually removes rows.
        filter = std::make_shared<NArrow::TColumnFilter>(std::move(filterDeleted));
    }
    return filter;
}

void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept {
auto resultSchema = context.SchemaVersions.GetLastSchema();
std::vector<TWritePortionInfoWithBlobsResult> TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstructionContext& context,
std::vector<TPortionToMerge>&& portions, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered,
const std::shared_ptr<NArrow::NSplitter::TSerializationStats>& stats) noexcept {
auto shardingActual = context.SchemaVersions.GetShardingInfoActual(GranuleMeta->GetPathId());
if (portions.empty()) {
return;
return {};
}
std::shared_ptr<NArrow::NSplitter::TSerializationStats> stats = std::make_shared<NArrow::NSplitter::TSerializationStats>();
std::shared_ptr<TFilteredSnapshotSchema> resultFiltered;

NCompaction::TMerger merger(context, SaverContext);
merger.SetPortionExpectedSize(PortionExpectedSize);
{
std::set<ui32> pkColumnIds;
{
auto pkColumnIdsVector = IIndexInfo::AddSnapshotFieldIds(resultSchema->GetIndexInfo().GetPKColumnIds());
pkColumnIds = std::set<ui32>(pkColumnIdsVector.begin(), pkColumnIdsVector.end());
}
std::set<ui32> dataColumnIds;
{
{
THashMap<ui64, ISnapshotSchema::TPtr> schemas;
for (auto& portion : SwitchedPortions) {
auto dataSchema = portion->GetSchema(context.SchemaVersions);
schemas.emplace(dataSchema->GetVersion(), dataSchema);
}
dataColumnIds = ISnapshotSchema::GetColumnsWithDifferentDefaults(schemas, resultSchema);
}
for (auto&& i : SwitchedPortions) {
const auto& accessor = GetPortionDataAccessor(i->GetPortionId());
stats->Merge(accessor.GetSerializationStat(*resultSchema));
if (i->GetMeta().GetDeletionsCount()) {
dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
if (dataColumnIds.size() != resultSchema->GetColumnsCount()) {
for (auto id : accessor.GetColumnIds()) {
if (resultSchema->HasColumnId(id)) {
dataColumnIds.emplace(id);
}
}
}
}
AFL_VERIFY(dataColumnIds.size() <= resultSchema->GetColumnsCount());
if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::WRITE_ID);
}
dataColumnIds.insert(IIndexInfo::GetSnapshotColumnIds().begin(), IIndexInfo::GetSnapshotColumnIds().end());
resultFiltered = std::make_shared<TFilteredSnapshotSchema>(resultSchema, dataColumnIds);
{
auto seqDataColumnIds = dataColumnIds;
for (auto&& i : pkColumnIds) {
AFL_VERIFY(seqDataColumnIds.erase(i))("id", i);
}
THashSet<ui64> usedPortionIds;
for (auto&& i : portions) {
AFL_VERIFY(usedPortionIds.emplace(i.GetPortionInfo().GetPortionId()).second);
}

for (auto&& i : portions) {
auto blobsSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions);
auto batch = i.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds, false).DetachResult();
std::shared_ptr<NArrow::TColumnFilter> filter =
BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), usedPortionIds, resultFiltered);
merger.AddBatch(batch, filter);
}
}
for (auto&& i : portions) {
merger.AddBatch(i.GetBatch(), i.GetFilter());
}

std::optional<ui64> shardingActualVersion;
if (shardingActual) {
shardingActualVersion = shardingActual->GetSnapshotVersion();
}
AppendedPortions = merger.Execute(stats, CheckPoints, resultFiltered, GranuleMeta->GetPathId(), shardingActualVersion);
for (auto&& p : AppendedPortions) {
auto result = merger.Execute(stats, CheckPoints, resultFiltered, GranuleMeta->GetPathId(), shardingActualVersion);
for (auto&& p : result) {
p.GetPortionConstructor().MutablePortionConstructor().MutableMeta().UpdateRecordsMeta(NPortion::EProduced::SPLIT_COMPACTED);
}
return result;
}

TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
Expand All @@ -154,9 +58,60 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
}

{
std::vector<TReadPortionInfoWithBlobs> portions =
TReadPortionInfoWithBlobs::RestorePortions(GetPortionDataAccessors(SwitchedPortions), Blobs, context.SchemaVersions);
BuildAppendedPortionsByChunks(context, std::move(portions));
auto accessors = GetPortionDataAccessors(SwitchedPortions);
std::set<ui32> seqDataColumnIds;
std::shared_ptr<TFilteredSnapshotSchema> resultFiltered = context.BuildResultFiltered(accessors, seqDataColumnIds);
std::shared_ptr<NArrow::NSplitter::TSerializationStats> stats = std::make_shared<NArrow::NSplitter::TSerializationStats>();
for (auto&& accessor : accessors) {
stats->Merge(accessor.GetSerializationStat(*resultFiltered));
}

std::vector<TReadPortionInfoWithBlobs> portions = TReadPortionInfoWithBlobs::RestorePortions(accessors, Blobs, context.SchemaVersions);
THashSet<ui64> usedPortionIds;
std::vector<std::shared_ptr<ISubsetToMerge>> currentToMerge;
for (auto&& i : portions) {
AFL_VERIFY(usedPortionIds.emplace(i.GetPortionInfo().GetPortionId()).second);
currentToMerge.emplace_back(std::make_shared<TReadPortionToMerge>(std::move(i), GranuleMeta));
}
auto shardingActual = context.SchemaVersions.GetShardingInfoActual(GranuleMeta->GetPathId());
while (true) {
std::vector<TPortionToMerge> toMerge;
ui64 sumMemory = 0;
ui64 totalSumMemory = 0;
std::vector<std::shared_ptr<ISubsetToMerge>> appendedToMerge;
ui32 subsetsCount = 0;
for (auto&& i : currentToMerge) {
if (NYDBTest::TControllers::GetColumnShardController()->CheckPortionsToMergeOnCompaction(
sumMemory + i->GetColumnMaxChunkMemory(), subsetsCount) &&
subsetsCount > 1) {
appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(
BuildAppendedPortionsByChunks(context, std::move(toMerge), resultFiltered, stats)));
toMerge.clear();
sumMemory = 0;
}
sumMemory += i->GetColumnMaxChunkMemory();
totalSumMemory += i->GetColumnMaxChunkMemory();
auto mergePortions = i->BuildPortionsToMerge(context, seqDataColumnIds, resultFiltered, usedPortionIds);
toMerge.insert(toMerge.end(), mergePortions.begin(), mergePortions.end());
++subsetsCount;
}
if (toMerge.size() > 1) {
auto merged = BuildAppendedPortionsByChunks(context, std::move(toMerge), resultFiltered, stats);
if (appendedToMerge.size()) {
appendedToMerge.emplace_back(std::make_shared<TWritePortionsToMerge>(std::move(merged)));
} else {
context.Counters.OnCompactionCorrectMemory(totalSumMemory);
AppendedPortions = std::move(merged);
break;
}
} else {
AFL_VERIFY(appendedToMerge.size());
AFL_VERIFY(currentToMerge.size());
appendedToMerge.emplace_back(currentToMerge.back());
}
context.Counters.OnCompactionHugeMemory(totalSumMemory, appendedToMerge.size());
currentToMerge = std::move(appendedToMerge);
}
}

if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
Expand Down Expand Up @@ -210,7 +165,8 @@ std::shared_ptr<TGeneralCompactColumnEngineChanges::IMemoryPredictor> TGeneralCo
ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo::TConstPtr& portionInfo) {
SumMemoryFix += portionInfo->GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo->GetTotalBlobBytes();
SumMemoryRaw += portionInfo->GetTotalRawBytes();
return SumMemoryFix + std::min<ui64>(SumMemoryRaw, ((ui64)500 << 20));
return SumMemoryFix + std::min<ui64>(SumMemoryRaw,
NYDBTest::TControllers::GetColumnShardController()->GetConfig().GetMemoryLimitMergeOnCompactionRawData());
}

} // namespace NKikimr::NOlap::NCompaction
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/general_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

namespace NKikimr::NOlap::NCompaction {

class TPortionToMerge;

class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges,
public NColumnShard::TMonitoringObjectsCounter<TGeneralCompactColumnEngineChanges> {
private:
Expand All @@ -15,7 +17,10 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges,
std::shared_ptr<NPrioritiesQueue::TAllocationGuard> PrioritiesAllocationGuard;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
NArrow::NMerger::TIntervalPositions CheckPoints;
void BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector<TReadPortionInfoWithBlobs>&& portions) noexcept;

[[nodiscard]] std::vector<TWritePortionInfoWithBlobsResult> BuildAppendedPortionsByChunks(TConstructionContext& context,
std::vector<TPortionToMerge>&& portionsToMerge,
const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const std::shared_ptr<NArrow::NSplitter::TSerializationStats>& stats) noexcept;

std::shared_ptr<NArrow::TColumnFilter> BuildPortionFilter(const std::optional<NKikimr::NOlap::TGranuleShardingInfo>& shardingActual,
const std::shared_ptr<NArrow::TGeneralContainer>& batch, const TPortionInfo& pInfo, const THashSet<ui64>& portionsInUsage,
Expand Down
Loading

0 comments on commit 362d8a2

Please sign in to comment.