From 618900756e423a637660c7bd5d262551dd1d13ba Mon Sep 17 00:00:00 2001 From: Pavel Misko Date: Wed, 29 Jan 2025 18:10:27 +0100 Subject: [PATCH] issue-2277: move copying of the request buffer to TIORequestParserActor --- .../disk_agent/actors/io_request_parser.cpp | 48 ++++++++++++++-- .../storage/disk_agent/disk_agent_actor.cpp | 10 +--- .../storage/disk_agent/disk_agent_actor.h | 8 ++- .../disk_agent/disk_agent_actor_io.cpp | 55 ++++++++++++++++++- .../storage/disk_agent/disk_agent_private.h | 15 +++++ .../storage/disk_agent/disk_agent_state.cpp | 11 +++- .../storage/disk_agent/disk_agent_state.h | 5 ++ .../storage/disk_agent/hash_table_storage.cpp | 10 +++- 8 files changed, 142 insertions(+), 20 deletions(-) diff --git a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp index dba36fbf162..a58062f90d9 100644 --- a/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/actors/io_request_parser.cpp @@ -32,12 +32,10 @@ class TIORequestParserActor: public TActor STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { - HFunc(NActors::TEvents::TEvPoisonPill, HandlePoisonPill); + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); case TEvDiskAgent::EvWriteDeviceBlocksRequest: - HandleRequest( - ev, - TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest); + HandleWriteDeviceBlocks(ev); break; case TEvDiskAgent::EvReadDeviceBlocksRequest: @@ -69,6 +67,48 @@ class TIORequestParserActor: public TActor Die(ctx); } + void HandleWriteDeviceBlocks(TAutoPtr& ev) + { + auto request = std::make_unique< + TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest>(); + + // parse protobuf + auto* msg = ev->Get(); + request->Record.Swap(&msg->Record); + + ui64 bytesCount = 0; + for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { + bytesCount += buffer.size(); + } + + request->Storage.reset( + static_cast( + std::aligned_alloc(request->Record.GetBlockSize(), bytesCount)), + std::free); + + char* dst = request->Storage.get(); + for (const auto& buffer: request->Record.GetBlocks().GetBuffers()) { + std::memcpy(dst, buffer.data(), buffer.size()); + dst += buffer.size(); + } + + request->ByteCount = bytesCount; + request->Record.ClearBlocks(); + + auto newEv = std::make_unique( + ev->Recipient, + ev->Sender, + request.release(), + ev->Flags, + ev->Cookie, + nullptr, // forwardOnNondelivery + std::move(ev->TraceId)); + + newEv->Rewrite(newEv->Type, Owner); + + ActorContext().Send(std::move(newEv)); + } + template void HandleRequest(TAutoPtr& ev, ui32 typeRewrite) { diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp index c10c5de69c3..9feac6ccbc8 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.cpp @@ -372,13 +372,9 @@ STFUNC(TDiskAgentActor::StateWork) TEvDiskAgent::TEvDisableConcreteAgentRequest, HandleDisableConcreteAgent); - case TEvDiskAgentPrivate::EvParsedWriteDeviceBlocksRequest: - HandleWriteDeviceBlocks( - *reinterpret_cast< - typename TEvDiskAgent::TEvWriteDeviceBlocksRequest::TPtr*>( - &ev), - ActorContext()); - break; + HFunc( + TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest, + HandleParsedWriteDeviceBlocks); case TEvDiskAgentPrivate::EvParsedReadDeviceBlocksRequest: HandleReadDeviceBlocks( diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h index 5854307216c..768c5abce1d 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor.h @@ -137,10 +137,10 @@ class TDiskAgentActor final void SendRegisterRequest(const NActors::TActorContext& ctx); - template + template void PerformIO( const NActors::TActorContext& ctx, - const typename TMethod::TRequest::TPtr& ev, + const TEv& ev, TOp operation); template @@ -225,6 +225,10 @@ class TDiskAgentActor final const TEvDiskAgentPrivate::TEvCancelSuspensionRequest::TPtr& ev, const NActors::TActorContext& ctx); + void HandleParsedWriteDeviceBlocks( + const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev, + const NActors::TActorContext& ctx); + bool HandleRequests(STFUNC_SIG); bool RejectRequests(STFUNC_SIG); diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp index d5f55290426..ed09227e6f3 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_actor_io.cpp @@ -17,6 +17,24 @@ namespace { //////////////////////////////////////////////////////////////////////////////// +ui64 GetVolumeRequestId( + const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request) +{ + return request.Record.GetVolumeRequestId(); +} + +TBlockRange64 BuildRequestBlockRange( + const TEvDiskAgentPrivate::TParsedWriteDeviceBlocksRequest& request) +{ + Y_ABORT_UNLESS(request.ByteCount % request.Record.GetBlockSize() == 0); + + return TBlockRange64::WithLength( + request.Record.GetStartIndex(), + request.ByteCount / request.Record.GetBlockSize()); +} + +//////////////////////////////////////////////////////////////////////////////// + template constexpr bool IsWriteDeviceMethod = std::is_same_v || @@ -113,10 +131,10 @@ std::pair HandleException( //////////////////////////////////////////////////////////////////////////////// -template +template void TDiskAgentActor::PerformIO( const TActorContext& ctx, - const typename TMethod::TRequest::TPtr& ev, + const TEv& ev, TOp operation) { auto* msg = ev->Get(); @@ -327,6 +345,39 @@ void TDiskAgentActor::HandleWriteDeviceBlocks( PerformIO(ctx, ev, &TDiskAgentState::Write); } +void TDiskAgentActor::HandleParsedWriteDeviceBlocks( + const TEvDiskAgentPrivate::TEvParsedWriteDeviceBlocksRequest::TPtr& ev, + const TActorContext& ctx) +{ + BLOCKSTORE_DISK_AGENT_COUNTER(WriteDeviceBlocks); + + using TMethod = TEvDiskAgent::TWriteDeviceBlocksMethod; + + if (CheckIntersection(ctx, ev)) { + return; + } + + auto* msg = ev->Get(); + + PerformIO( + ctx, + ev, + [storage = std::move(msg->Storage), byteCount = msg->ByteCount]( + TDiskAgentState& self, + TInstant now, + NProto::TWriteDeviceBlocksRequest request) + { + auto future = self.WriteBuffer( + now, + std::move(request), + {storage.get(), byteCount}); + + future.Subscribe([storage](const auto&) { Y_UNUSED(storage); }); + + return future; + }); +} + void TDiskAgentActor::HandleZeroDeviceBlocks( const TEvDiskAgent::TEvZeroDeviceBlocksRequest::TPtr& ev, const TActorContext& ctx) diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h index 94cfa9f17d0..bf3ea889612 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_private.h @@ -157,6 +157,17 @@ struct TEvDiskAgentPrivate struct TCancelSuspensionRequest {}; + // + // ParsedWriteDeviceBlocksRequest + // + + struct TParsedWriteDeviceBlocksRequest + { + NProto::TWriteDeviceBlocksRequest Record; + std::shared_ptr Storage; + ui64 ByteCount = 0; + }; + // // Events declaration // @@ -207,6 +218,10 @@ struct TEvDiskAgentPrivate TCancelSuspensionRequest, EvCancelSuspensionRequest>; + using TEvParsedWriteDeviceBlocksRequest = TRequestEvent< + TParsedWriteDeviceBlocksRequest, + EvParsedWriteDeviceBlocksRequest>; + BLOCKSTORE_DECLARE_EVENTS(UpdateSessionCache) }; diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp index 0c955b57742..d64d3067b44 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.cpp @@ -644,6 +644,14 @@ void TDiskAgentState::WriteProfileLog( TFuture TDiskAgentState::Write( TInstant now, NProto::TWriteDeviceBlocksRequest request) +{ + return WriteBuffer(now, std::move(request), {}); +} + +TFuture TDiskAgentState::WriteBuffer( + TInstant now, + NProto::TWriteDeviceBlocksRequest request, + TStringBuf buffer) { CheckIfDeviceIsDisabled( request.GetDeviceUUID(), @@ -672,8 +680,7 @@ TFuture TDiskAgentState::Write( MakeIntrusive(), std::move(writeRequest), request.GetBlockSize(), - {} // no data buffer - ); + buffer); return result.Apply( [] (const auto& future) { diff --git a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h index e36ce10810b..ec1e5a531e5 100644 --- a/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h +++ b/cloud/blockstore/libs/storage/disk_agent/disk_agent_state.h @@ -103,6 +103,11 @@ class TDiskAgentState TInstant now, NProto::TWriteDeviceBlocksRequest request); + NThreading::TFuture WriteBuffer( + TInstant now, + NProto::TWriteDeviceBlocksRequest request, + TStringBuf buffer); + NThreading::TFuture WriteZeroes( TInstant now, NProto::TZeroDeviceBlocksRequest request); diff --git a/cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp b/cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp index 7cf9cc2e135..7500f974a9c 100644 --- a/cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp +++ b/cloud/blockstore/libs/storage/disk_agent/hash_table_storage.cpp @@ -109,7 +109,7 @@ struct THashTableStorage final return MakeFuture(std::move(response)); } - auto sglist = guard.Get(); + const auto& sglist = guard.Get(); auto b = request->GetStartIndex(); auto e = request->GetStartIndex() + request->BlocksCount; @@ -120,11 +120,15 @@ struct THashTableStorage final return MakeFuture(std::move(response)); } - while (b < e) { - Blocks[b] = sglist[b - request->GetStartIndex()].AsStringBuf(); + TSgList dst(request->BlocksCount); + while (b < e) { + auto& block = Blocks[b]; + block.resize(request->BlockSize); + dst[b - request->GetStartIndex()] = {block.data(), block.size()}; ++b; } + SgListCopy(sglist, dst); return MakeFuture(std::move(response)); }