Skip to content

Commit

Permalink
Merge pull request #636 from Altinity/project-antalya-24.12.2_metadat…
Browse files Browse the repository at this point in the history
…a_cache_fixes_1

use parquet metadata cache for parquetmetadata format as well
  • Loading branch information
Enmk authored Mar 1, 2025
2 parents 799e2c4 + 8abf0b1 commit dc3ad7f
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 32 deletions.
9 changes: 9 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@
# include <azure/core/diagnostics/logger.hpp>
#endif

#if USE_PARQUET
# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif


#include <incbin.h>
/// A minimal file used when the server is run without installation
Expand Down Expand Up @@ -286,6 +290,7 @@ namespace ServerSetting
extern const ServerSettingsUInt64 primary_index_cache_size;
extern const ServerSettingsDouble primary_index_cache_size_ratio;
extern const ServerSettingsBool use_legacy_mongodb_integration;
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
}

}
Expand Down Expand Up @@ -2234,6 +2239,10 @@ try
if (dns_cache_updater)
dns_cache_updater->start();

#if USE_PARQUET
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
#endif

/// Set current database name before loading tables and databases because
/// system logs may copy global context.
std::string default_database = server_settings[ServerSetting::default_database].toString();
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class IInputFormat : public SourceWithKeyCondition
void needOnlyCount() { need_only_count = true; }

/// Set additional info/key/id related to underlying storage of the ReadBuffer
virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {}
virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
Expand Down
15 changes: 3 additions & 12 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Processors/Formats/Impl/Parquet/ParquetRecordReader.h>
#include <Processors/Formats/Impl/Parquet/parquetBloomFilterHash.h>
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#include <Interpreters/convertFieldToType.h>

namespace ProfileEvents
Expand Down Expand Up @@ -522,15 +523,6 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}

ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes)
: CacheBase(max_size_bytes) {}

ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes)
{
static ParquetFileMetaDataCache instance(max_size_bytes);
return &instance;
}

std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
{
createArrowFileIfNotCreated();
Expand All @@ -547,7 +539,7 @@ std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData(
return readMetadataFromFile();
}

auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet(
auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
metadata_cache.key,
[&]()
{
Expand Down Expand Up @@ -834,11 +826,10 @@ void ParquetBlockInputFormat::initializeIfNeeded()
}
}

void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_)
void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size];
}

void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
Expand Down
18 changes: 4 additions & 14 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "config.h"
#if USE_PARQUET

#include <Common/CacheBase.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
Expand Down Expand Up @@ -73,7 +72,7 @@ class ParquetBlockInputFormat : public IInputFormat

size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override;
void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;

private:
Chunk read() override;
Expand Down Expand Up @@ -350,8 +349,9 @@ class ParquetBlockInputFormat : public IInputFormat
{
String key;
bool use_cache = false;
UInt64 max_size_bytes{0};
} metadata_cache;
};

Cache metadata_cache;
};

class ParquetSchemaReader : public ISchemaReader
Expand All @@ -370,16 +370,6 @@ class ParquetSchemaReader : public ISchemaReader
std::shared_ptr<parquet::FileMetaData> metadata;
};

class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes);
void clear() {}

private:
ParquetFileMetaDataCache(UInt64 max_size_bytes);
};

}

#endif
20 changes: 20 additions & 0 deletions src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>

#ifdef USE_PARQUET

namespace DB
{

ParquetFileMetaDataCache::ParquetFileMetaDataCache()
: CacheBase<String, parquet::FileMetaData>(0)
{}

ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance()
{
static ParquetFileMetaDataCache instance;
return &instance;
}

}

#endif
30 changes: 30 additions & 0 deletions src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "config.h"

#if USE_PARQUET

namespace parquet
{

class FileMetaData;

}

#include <Common/CacheBase.h>

namespace DB
{

class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
static ParquetFileMetaDataCache * instance();

private:
ParquetFileMetaDataCache();
};

}

#endif
53 changes: 49 additions & 4 deletions src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@
#include <parquet/statistics.h>
#include "ArrowBufferedStreams.h"
#include <DataTypes/NestedUtils.h>
#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>


namespace ProfileEvents
{
extern const Event ParquetMetaDataCacheHits;
extern const Event ParquetMetaDataCacheMisses;
}

namespace DB
{

Expand All @@ -32,6 +41,11 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}

namespace Setting
{
extern const SettingsBool input_format_parquet_use_metadata_cache;
}

static NamesAndTypesList getHeaderForParquetMetadata()
{
NamesAndTypesList names_and_types{
Expand Down Expand Up @@ -129,10 +143,35 @@ void checkHeader(const Block & header)
static std::shared_ptr<parquet::FileMetaData> getFileMetadata(
ReadBuffer & in,
const FormatSettings & format_settings,
std::atomic<int> & is_stopped)
std::atomic<int> & is_stopped,
ParquetMetadataInputFormat::Cache metadata_cache)
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
return parquet::ReadMetaData(arrow_file);
// in-memory cache is not implemented for local file operations, only for remote files
// there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
// and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key
if (!metadata_cache.use_cache || metadata_cache.key.empty())
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
return parquet::ReadMetaData(arrow_file);
}

auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
metadata_cache.key,
[&]()
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
return parquet::ReadMetaData(arrow_file);
}
);

if (loaded)
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
else
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);

return parquet_file_metadata;


}

ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
Expand All @@ -147,7 +186,7 @@ Chunk ParquetMetadataInputFormat::read()
if (done)
return res;

auto metadata = getFileMetadata(*in, format_settings, is_stopped);
auto metadata = getFileMetadata(*in, format_settings, is_stopped, metadata_cache);

const auto & header = getPort().getHeader();
auto names_and_types = getHeaderForParquetMetadata();
Expand Down Expand Up @@ -486,6 +525,12 @@ void ParquetMetadataInputFormat::resetParser()
done = false;
}

void ParquetMetadataInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
}

ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_)
: ISchemaReader(in_)
{
Expand Down
10 changes: 10 additions & 0 deletions src/Processors/Formats/Impl/ParquetMetadataInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class ParquetMetadataInputFormat : public IInputFormat

void resetParser() override;

void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;

struct Cache
{
String key;
bool use_cache = false;
};

private:
Chunk read() override;

Expand All @@ -78,6 +86,8 @@ class ParquetMetadataInputFormat : public IInputFormat
const FormatSettings format_settings;
bool done = false;
std::atomic<int> is_stopped{0};

Cache metadata_cache;
};

class ParquetMetadataSchemaReader : public ISchemaReader
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
input_format->needOnlyCount();

if (!object_info->getPath().empty())
input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);
input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);

builder.init(Pipe(input_format));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
10
10
10
10
10
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache';

SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = ParquetMetadata)
SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_format_metadata_cache';

SYSTEM FLUSH LOGS;

SELECT ProfileEvents['ParquetMetaDataCacheHits']
Expand All @@ -25,4 +29,11 @@ AND type = 'QueryFinish'
ORDER BY event_time desc
LIMIT 1;

SELECT ProfileEvents['ParquetMetaDataCacheHits']
FROM system.query_log
where log_comment = 'test_03262_parquet_metadata_format_metadata_cache'
AND type = 'QueryFinish'
ORDER BY event_time desc
LIMIT 1;

DROP TABLE t_parquet_03262;

0 comments on commit dc3ad7f

Please sign in to comment.