Skip to content

Commit

Permalink
IOParserActorAllocateStorageEnabled
Browse files Browse the repository at this point in the history
  • Loading branch information
sharpeye committed Jan 30, 2025
1 parent ffeebf5 commit 4780db4
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 38 deletions.
4 changes: 4 additions & 0 deletions cloud/blockstore/config/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ message TDiskAgentConfig

// Settings for traffic shaping.
optional TDiskAgentThrottlingConfig ThrottlingConfig = 38;

// If enabled, IOParserActor allocates a storage buffer and copies the
// request data into it.
optional uint32 IOParserActorAllocateStorageEnabled = 39;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
21 changes: 16 additions & 5 deletions cloud/blockstore/libs/storage/core/proto_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,22 @@ TBlockRange64 BuildRequestBlockRange(

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request)
{
return BuildRequestBlockRange(request.Record);
}

TBlockRange64 BuildRequestBlockRange(
const NProto::TWriteDeviceBlocksRequest& request)
{
ui64 totalSize = 0;
for (const auto& buffer: request.Record.GetBlocks().GetBuffers()) {
for (const auto& buffer: request.GetBlocks().GetBuffers()) {
totalSize += buffer.length();
}
Y_ABORT_UNLESS(totalSize % request.Record.GetBlockSize() == 0);
Y_ABORT_UNLESS(totalSize % request.GetBlockSize() == 0);

return TBlockRange64::WithLength(
request.Record.GetStartIndex(),
totalSize / request.Record.GetBlockSize());
request.GetStartIndex(),
totalSize / request.GetBlockSize());
}

TBlockRange64 BuildRequestBlockRange(
Expand All @@ -436,7 +442,12 @@ TBlockRange64 BuildRequestBlockRange(
ui64 GetVolumeRequestId(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request)
{
return request.Record.GetVolumeRequestId();
return GetVolumeRequestId(request.Record);
}

ui64 GetVolumeRequestId(const NProto::TWriteDeviceBlocksRequest& request)
{
return request.GetVolumeRequestId();
}

ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request)
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/core/proto_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ TBlockRange64 BuildRequestBlockRange(
TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request);

TBlockRange64 BuildRequestBlockRange(
const NProto::TWriteDeviceBlocksRequest& request);

TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request);

ui64 GetVolumeRequestId(const TEvDiskAgent::TEvWriteDeviceBlocksRequest& request);
ui64 GetVolumeRequestId(const NProto::TWriteDeviceBlocksRequest& request);
ui64 GetVolumeRequestId(const TEvDiskAgent::TEvZeroDeviceBlocksRequest& request);

TString LogDevices(const TVector<NProto::TDeviceConfig>& devices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,23 @@ class TIORequestParserActor: public TActor<TIORequestParserActor>
auto* msg = ev->Get<TEvDiskAgent::TEvWriteDeviceBlocksRequest>();
request->Record.Swap(&msg->Record);

ui64 bytesCount = 0;
for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) {
bytesCount += buffer.size();
if (Allocator) {
ui64 bytesCount = 0;
for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) {
bytesCount += buffer.size();
}

request->Storage = Allocator(bytesCount);
request->StorageSize = bytesCount;

char* dst = request->Storage.get();
for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) {
std::memcpy(dst, buffer.data(), buffer.size());
dst += buffer.size();
}
request->Record.ClearBlocks();
}

request->Storage =
Allocator(request->Record.GetBlockSize(), bytesCount);

char* dst = request->Storage.get();
for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) {
std::memcpy(dst, buffer.data(), buffer.size());
dst += buffer.size();
}

request->ByteCount = bytesCount;
request->Record.ClearBlocks();

auto newEv = std::make_unique<IEventHandle>(
ev->Recipient,
ev->Sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NCloud::NBlockStore::NStorage::NDiskAgent {
////////////////////////////////////////////////////////////////////////////////

using TStorageBufferAllocator =
std::function<std::shared_ptr<char>(ui32 blockSize, ui64 bytesCount)>;
std::function<std::shared_ptr<char>(ui64 bytesCount)>;

std::unique_ptr<NActors::IActor> CreateIORequestParserActor(
const NActors::TActorId& owner,
Expand Down
25 changes: 16 additions & 9 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,24 @@ void TDiskAgentActor::HandleInitAgentCompleted(
ctx,
TBlockStoreComponents::DISK_AGENT,
"Create " << count << " IORequestParserActor actors");

NDiskAgent::TStorageBufferAllocator allocator;
if (AgentConfig->GetIOParserActorAllocateStorageEnabled() &&
AgentConfig->GetBackend() == NProto::DISK_AGENT_BACKEND_AIO)
{
allocator = [](ui64 byteCount)
{
return std::shared_ptr<char>(
static_cast<char*>(
std::aligned_alloc(DefaultBlockSize, byteCount)),
std::free);
};
}

IOParserActors.reserve(count);
for (ui32 i = 0; i != count; ++i) {
auto actor = NDiskAgent::CreateIORequestParserActor(
ctx.SelfID,
[](ui32 blockSize, ui64 byteCount)
{
return std::shared_ptr<char>(
static_cast<char*>(
std::aligned_alloc(blockSize, byteCount)),
std::free);
});
auto actor =
NDiskAgent::CreateIORequestParserActor(ctx.SelfID, allocator);

IOParserActors.push_back(ctx.Register(
actor.release(),
Expand Down
24 changes: 17 additions & 7 deletions cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ ui64 GetVolumeRequestId(
TBlockRange64 BuildRequestBlockRange(
const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request)
{
Y_ABORT_UNLESS(request.ByteCount % request.Record.GetBlockSize() == 0);
if (!request.StorageSize) {
return NStorage::BuildRequestBlockRange(request.Record);
}

Y_ABORT_UNLESS(request.StorageSize % request.Record.GetBlockSize() == 0);

return TBlockRange64::WithLength(
request.Record.GetStartIndex(),
request.ByteCount / request.Record.GetBlockSize());
request.StorageSize / request.Record.GetBlockSize());
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -357,30 +361,36 @@ void TDiskAgentActor::HandleParsedWriteDeviceBlocks(
return;
}

auto* msg = ev->Get();

if (!msg->Storage) {
PerformIO<TMethod>(ctx, ev, &TDiskAgentState::Write);
return;
}

// Attach storage to NProto::TWriteBlocksRequest
struct TWriteBlocksRequestWithStorage
: NProto::TWriteBlocksRequest
{
TStorageBuffer Storage;
};

auto* msg = ev->Get();

PerformIO<TMethod>(
ctx,
ev,
[storage = std::move(msg->Storage), byteCount = msg->ByteCount](
[storage = std::move(msg->Storage), storageSize = msg->StorageSize](
TDiskAgentState& self,
TInstant now,
NProto::TWriteDeviceBlocksRequest request) mutable
{
auto writeRequest =
std::make_shared<TWriteBlocksRequestWithStorage>();
writeRequest->Storage = std::move(storage);
writeRequest->MutableHeaders()->Swap(request.MutableHeaders());
writeRequest->MutableBlocks()->Swap(request.MutableBlocks());
writeRequest->SetStartIndex(request.GetStartIndex());
writeRequest->Storage = std::move(storage);

TStringBuf buffer{writeRequest->Storage.get(), byteCount};
TStringBuf buffer{writeRequest->Storage.get(), storageSize};

return self.WriteBlocks(
now,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4847,6 +4847,7 @@ Y_UNIT_TEST_SUITE(TDiskAgentTest)
auto config = DiskAgentConfig({deviceId});
config.SetIOParserActorCount(4);
config.SetOffloadAllIORequestsParsingEnabled(true);
config.SetIOParserActorAllocateStorageEnabled(true);

return config;
}();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ struct TEvDiskAgentPrivate
{
NProto::TWriteDeviceBlocksRequest Record;
TStorageBuffer Storage;
ui64 ByteCount = 0;
ui64 StorageSize = 0;
};

//
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/disk_agent/model/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace {
xxx(MaxAIOContextEvents, ui32, 1024 )\
xxx(PathsPerFileIOService, ui32, 0 )\
xxx(DisableBrokenDevices, bool, 0 )\
xxx(IOParserActorAllocateStorageEnabled, bool, 0 )\
// BLOCKSTORE_AGENT_CONFIG

#define BLOCKSTORE_DECLARE_CONFIG(name, type, value) \
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/disk_agent/model/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class TDiskAgentConfig

ui32 GetIOParserActorCount() const;
bool GetOffloadAllIORequestsParsingEnabled() const;
bool GetIOParserActorAllocateStorageEnabled() const;
bool GetDisableNodeBrokerRegistrationOnDevicelessAgent() const;
ui32 GetMaxAIOContextEvents() const;
ui32 GetPathsPerFileIOService() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ NProto::TDiskAgentConfig CreateDefaultAgentConfig()

config.SetIOParserActorCount(4);
config.SetOffloadAllIORequestsParsingEnabled(true);
config.SetIOParserActorAllocateStorageEnabled(true);

return config;
}
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/tests/python/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ def generate_disk_agent_txt(
config.ShutdownTimeout = 0
config.IOParserActorCount = 4
config.OffloadAllIORequestsParsingEnabled = True
config.IOParserActorAllocateStorageEnabled = True
config.PathsPerFileIOService = 1

if device_erase_method is not None:
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/tests/python/lib/nonreplicated_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def setup_disk_agent_config(
config.ShutdownTimeout = get_shutdown_agent_interval()
config.IOParserActorCount = 4
config.OffloadAllIORequestsParsingEnabled = True
config.IOParserActorAllocateStorageEnabled = True
config.PathsPerFileIOService = 2

if cached_sessions_path is not None:
Expand Down

0 comments on commit 4780db4

Please sign in to comment.