Skip to content

Commit

Permalink
some refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Jan 8, 2025
1 parent 0a07a78 commit cd4a1fe
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.PeerRpcHandler;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.network.PeerAddress;
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
Expand All @@ -43,15 +43,11 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class LibP2PPeer implements Peer {
private static final Logger LOG = LogManager.getLogger();

private record RpcHandlerAndOutgoingQueue(
RpcHandler<?, ?, ?> rpcHandler, ThrottlingTaskQueue outgoingQueue) {}

private final Map<RpcMethod<?, ?, ?>, RpcHandlerAndOutgoingQueue> rpcHandlersAndOutgoingQueues;
private final Map<RpcMethod<?, ?, ?>, PeerRpcHandler<?, ?, ?>> rpcHandlers;
private final ReputationManager reputationManager;
private final Function<PeerId, Double> peerScoreFunction;
private final Connection connection;
Expand All @@ -76,16 +72,9 @@ public LibP2PPeer(
final ReputationManager reputationManager,
final Function<PeerId, Double> peerScoreFunction) {
this.connection = connection;
this.rpcHandlersAndOutgoingQueues =
this.rpcHandlers =
rpcHandlers.stream()
.collect(
Collectors.toMap(
RpcHandler::getRpcMethod,
handler ->
new RpcHandlerAndOutgoingQueue(
handler,
// https://github.com/ethereum/consensus-specs/pull/3767
ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS))));
.collect(Collectors.toMap(RpcHandler::getRpcMethod, PeerRpcHandler::new));
this.reputationManager = reputationManager;
this.peerScoreFunction = peerScoreFunction;
this.peerId = connection.secureSession().getRemoteId();
Expand Down Expand Up @@ -217,19 +206,15 @@ SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final RpcMethod<TOutgoingHandler, TRequest, RespHandler> rpcMethod,
final TRequest request,
final RespHandler responseHandler) {
final RpcHandlerAndOutgoingQueue rpcHandlerAndOutgoingQueue =
rpcHandlersAndOutgoingQueues.get(rpcMethod);
if (rpcHandlerAndOutgoingQueue == null) {
@SuppressWarnings("unchecked")
final PeerRpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
(PeerRpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlers.get(rpcMethod);
if (rpcHandler == null) {
throw new IllegalArgumentException(
"Unknown rpc method invoked: " + String.join(",", rpcMethod.getIds()));
}

@SuppressWarnings("unchecked")
final RpcHandler<TOutgoingHandler, TRequest, RespHandler> rpcHandler =
(RpcHandler<TOutgoingHandler, TRequest, RespHandler>) rpcHandlerAndOutgoingQueue.rpcHandler;

return rpcHandlerAndOutgoingQueue.outgoingQueue.queueTask(
() -> rpcHandler.sendRequest(connection, request, responseHandler));
return rpcHandler.sendRequest(connection, request, responseHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.p2p.libp2p.rpc;

import io.libp2p.core.Connection;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.ThrottlingTaskQueue;
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class PeerRpcHandler<
TOutgoingHandler extends RpcRequestHandler,
TRequest,
TRespHandler extends RpcResponseHandler<?>> {

private final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate;
private final ThrottlingTaskQueue outgoingQueue =
ThrottlingTaskQueue.create(NetworkConstants.MAX_CONCURRENT_REQUESTS);

public PeerRpcHandler(final RpcHandler<TOutgoingHandler, TRequest, TRespHandler> delegate) {
this.delegate = delegate;
}

public SafeFuture<RpcStreamController<TOutgoingHandler>> sendRequest(
final Connection connection, final TRequest request, final TRespHandler responseHandler) {
return outgoingQueue.queueTask(
() -> delegate.sendRequest(connection, request, responseHandler));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,49 +11,42 @@
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.networking.p2p.libp2p;
package tech.pegasys.teku.networking.p2p.libp2p.rpc;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.libp2p.core.Connection;
import io.libp2p.core.security.SecureChannel.Session;
import java.util.List;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
import tech.pegasys.teku.networking.p2p.reputation.ReputationManager;
import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStreamController;
import tech.pegasys.teku.spec.constants.NetworkConstants;

public class LibP2PPeerTest {
public class PeerRpcHandlerTest {

private final Connection connection = mock(Connection.class);

@SuppressWarnings("unchecked")
private final RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcHandler =
private final RpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> delegate =
mock(RpcHandler.class);

@SuppressWarnings("unchecked")
private final RpcMethod<RpcRequestHandler, Object, RpcResponseHandler<Void>> rpcMethod =
mock(RpcMethod.class);

private LibP2PPeer libP2PPeer;
private PeerRpcHandler<RpcRequestHandler, Object, RpcResponseHandler<Void>> peerRpcHandler;

@BeforeEach
public void init() {
when(rpcHandler.getRpcMethod()).thenReturn(rpcMethod);
final Session secureSession = mock(Session.class);
when(connection.secureSession()).thenReturn(secureSession);
when(connection.closeFuture()).thenReturn(new SafeFuture<>());
libP2PPeer =
new LibP2PPeer(connection, List.of(rpcHandler), ReputationManager.NOOP, peer -> 0.0);
when(delegate.getRpcMethod()).thenReturn(rpcMethod);
peerRpcHandler = new PeerRpcHandler<>(delegate);
}

@SuppressWarnings("unchecked")
Expand All @@ -67,17 +60,17 @@ public void sendRequest_throttlesRequests() {
__ -> {
final SafeFuture<RpcStreamController<RpcRequestHandler>> future =
new SafeFuture<>();
when(rpcHandler.sendRequest(connection, null, null)).thenReturn(future);
libP2PPeer.sendRequest(rpcMethod, null, null);
when(delegate.sendRequest(connection, null, null)).thenReturn(future);
peerRpcHandler.sendRequest(connection, null, null);
return future;
})
.toList();

when(rpcHandler.sendRequest(connection, null, null))
when(peerRpcHandler.sendRequest(connection, null, null))
.thenReturn(SafeFuture.completedFuture(mock(RpcStreamController.class)));

final SafeFuture<RpcStreamController<RpcRequestHandler>> throttledRequest =
libP2PPeer.sendRequest(rpcMethod, null, null);
peerRpcHandler.sendRequest(connection, null, null);

// completed request should be throttled
assertThat(throttledRequest).isNotDone();
Expand Down

0 comments on commit cd4a1fe

Please sign in to comment.