Skip to content

Commit

Permalink
Add option to set X-User header value for forwarded requests (#1425)
Browse files Browse the repository at this point in the history
Fixes #1422.
  • Loading branch information
kuznetsss authored Jun 11, 2024
1 parent 9d3b4f0 commit 56ab943
Show file tree
Hide file tree
Showing 20 changed files with 205 additions and 62 deletions.
2 changes: 1 addition & 1 deletion docs/configure-clio.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ If you're running Clio and `rippled` on separate machines, in addition to uncomm

2. Open a public, unencrypted WebSocket port on your `rippled` server.

3. In the `rippled` config, change the IP specified for `secure_gateway`, under the `port_grpc` section, to the IP of your Clio server. This entry can take the form of a comma-separated list if you are running multiple Clio nodes.
3. In the `rippled` config, change the IP specified for `secure_gateway`, under the `port_grpc` and websocket server sections, to the IP of your Clio server. This entry can take the form of a comma-separated list if you are running multiple Clio nodes.

## Ledger sequence

Expand Down
2 changes: 1 addition & 1 deletion docs/examples/config/example-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"admin_password": "xrp",
// If local_admin is true, Clio will consider requests come from 127.0.0.1 as admin requests
// It's true by default unless admin_password is set,'local_admin' : true and 'admin_password' can not be set at the same time
"local_amdin": false
"local_admin": false
},
// Time in seconds for graceful shutdown. Defaults to 10 seconds. Not fully implemented yet.
"graceful_period": 10.0,
Expand Down
2 changes: 1 addition & 1 deletion src/etl/ETLState.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ struct ETLState {
fetchETLStateFromSource(Forward& source) noexcept
{
auto const serverInfoRippled = data::synchronous([&source](auto yield) {
return source.forwardToRippled({{"command", "server_info"}}, std::nullopt, yield);
return source.forwardToRippled({{"command", "server_info"}}, std::nullopt, {}, yield);
});

if (serverInfoRippled)
Expand Down
10 changes: 8 additions & 2 deletions src/etl/LoadBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ LoadBalancer::LoadBalancer(
chooseForwardingSource();
},
[this]() { chooseForwardingSource(); },
[this]() { forwardingCache_->invalidate(); }
[this]() {
if (forwardingCache_.has_value())
forwardingCache_->invalidate();
}
);

// checking etl node validity
Expand Down Expand Up @@ -213,6 +216,7 @@ std::optional<boost::json::object>
LoadBalancer::forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
)
{
Expand All @@ -227,9 +231,11 @@ LoadBalancer::forwardToRippled(

auto numAttempts = 0u;

auto xUserValue = isAdmin ? ADMIN_FORWARDING_X_USER_VALUE : USER_FORWARDING_X_USER_VALUE;

std::optional<boost::json::object> response;
while (numAttempts < sources_.size()) {
if (auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, yield)) {
if (auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, xUserValue, yield)) {
response = std::move(res);
break;
}
Expand Down
18 changes: 16 additions & 2 deletions src/etl/LoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#pragma once

#include "data/BackendInterface.hpp"
#include "etl/ETLHelpers.hpp"
#include "etl/ETLState.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp"
#include "etl/impl/ForwardingCache.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
Expand All @@ -44,7 +44,7 @@
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <string_view>
#include <vector>

namespace etl {
Expand All @@ -68,13 +68,25 @@ class LoadBalancer {
util::Logger log_{"ETL"};
// Forwarding cache must be destroyed after sources because sources have a callback to invalidate cache
std::optional<impl::ForwardingCache> forwardingCache_;
std::optional<std::string> forwardingXUserValue_;

std::vector<SourcePtr> sources_;
std::optional<ETLState> etlState_;
std::uint32_t downloadRanges_ =
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
std::atomic_bool hasForwardingSource_{false};

public:
/**
* @brief Value for the X-User header when forwarding admin requests
*/
static constexpr std::string_view ADMIN_FORWARDING_X_USER_VALUE = "clio_admin";

/**
* @brief Value for the X-User header when forwarding user requests
*/
static constexpr std::string_view USER_FORWARDING_X_USER_VALUE = "clio_user";

/**
* @brief Create an instance of the load balancer.
*
Expand Down Expand Up @@ -167,13 +179,15 @@ class LoadBalancer {
*
* @param request JSON-RPC request to forward
* @param clientIp The IP address of the peer, if known
* @param isAdmin Whether the request is from an admin
* @param yield The coroutine context
* @return Response received from rippled node as JSON object on success; nullopt on failure
*/
std::optional<boost::json::object>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
);

Expand Down
3 changes: 3 additions & 0 deletions src/etl/Source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -127,13 +128,15 @@ class SourceBase {
*
* @param request The request to forward
* @param forwardToRippledClientIp IP of the client forwarding this request if known
* @param xUserValue Value of the X-User header
* @param yield The coroutine context
* @return Response wrapped in an optional on success; nullopt otherwise
*/
virtual std::optional<boost::json::object>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& forwardToRippledClientIp,
std::string_view xUserValue,
boost::asio::yield_context yield
) const = 0;
};
Expand Down
5 changes: 5 additions & 0 deletions src/etl/impl/ForwardingSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <utility>

namespace etl::impl {
Expand All @@ -55,6 +56,7 @@ std::optional<boost::json::object>
ForwardingSource::forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& forwardToRippledClientIp,
std::string_view xUserValue,
boost::asio::yield_context yield
) const
{
Expand All @@ -64,6 +66,9 @@ ForwardingSource::forwardToRippled(
{boost::beast::http::field::forwarded, fmt::format("for={}", *forwardToRippledClientIp)}
);
}

connectionBuilder.addHeader({"X-User", std::string{xUserValue}});

auto expectedConnection = connectionBuilder.connect(yield);
if (not expectedConnection) {
return std::nullopt;
Expand Down
3 changes: 3 additions & 0 deletions src/etl/impl/ForwardingSource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <chrono>
#include <optional>
#include <string>
#include <string_view>

namespace etl::impl {

Expand All @@ -49,13 +50,15 @@ class ForwardingSource {
*
* @param request The request to forward
* @param forwardToRippledClientIp IP of the client forwarding this request if known
* @param xUserValue Optional value for X-User header
* @param yield The coroutine context
* @return Response wrapped in an optional on success; nullopt otherwise
*/
std::optional<boost::json::object>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& forwardToRippledClientIp,
std::string_view xUserValue,
boost::asio::yield_context yield
) const;
};
Expand Down
5 changes: 4 additions & 1 deletion src/etl/impl/SourceImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
namespace etl::impl {
Expand Down Expand Up @@ -202,17 +203,19 @@ class SourceImpl : public SourceBase {
*
* @param request The request to forward
* @param forwardToRippledClientIp IP of the client forwarding this request if known
* @param xUserValue Optional value of the X-User header
* @param yield The coroutine context
* @return Response wrapped in an optional on success; nullopt otherwise
*/
std::optional<boost::json::object>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& forwardToRippledClientIp,
std::string_view xUserValue,
boost::asio::yield_context yield
) const final
{
return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, yield);
return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, xUserValue, yield);
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/rpc/common/impl/ForwardingProxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class ForwardingProxy {
auto toForward = ctx.params;
toForward["command"] = ctx.method;

auto res = balancer_->forwardToRippled(toForward, ctx.clientIp, ctx.yield);
auto res = balancer_->forwardToRippled(toForward, ctx.clientIp, ctx.isAdmin, ctx.yield);
if (not res) {
notifyFailedToForward(ctx.method);
return Result{Status{RippledError::rpcFAILED_TO_FORWARD}};
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/handlers/ServerInfo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class BaseServerInfoHandler {
}

auto const serverInfoRippled =
balancer_->forwardToRippled({{"command", "server_info"}}, ctx.clientIp, ctx.yield);
balancer_->forwardToRippled({{"command", "server_info"}}, ctx.clientIp, ctx.isAdmin, ctx.yield);

if (serverInfoRippled && !serverInfoRippled->contains(JS(error))) {
if (serverInfoRippled->contains(JS(result)) &&
Expand Down
2 changes: 1 addition & 1 deletion tests/common/util/MockLoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct MockLoadBalancer {
MOCK_METHOD(
std::optional<boost::json::object>,
forwardToRippled,
(boost::json::object const&, std::optional<std::string> const&, boost::asio::yield_context),
(boost::json::object const&, std::optional<std::string> const&, bool, boost::asio::yield_context),
(const)
);
};
9 changes: 5 additions & 4 deletions tests/common/util/MockSource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#pragma once

#include "data/BackendInterface.hpp"
#include "etl/ETLHelpers.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/config/Config.hpp"
Expand All @@ -39,7 +39,7 @@
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <string_view>
#include <utility>
#include <vector>

Expand All @@ -60,7 +60,7 @@ struct MockSource : etl::SourceBase {
MOCK_METHOD(
std::optional<boost::json::object>,
forwardToRippled,
(boost::json::object const&, std::optional<std::string> const&, boost::asio::yield_context),
(boost::json::object const&, std::optional<std::string> const&, std::string_view, boost::asio::yield_context),
(const, override)
);
};
Expand Down Expand Up @@ -129,10 +129,11 @@ class MockSourceWrapper : public etl::SourceBase {
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& forwardToRippledClientIp,
std::string_view xUserValue,
boost::asio::yield_context yield
) const override
{
return mock_->forwardToRippled(request, forwardToRippledClientIp, yield);
return mock_->forwardToRippled(request, forwardToRippledClientIp, xUserValue, yield);
}
};

Expand Down
37 changes: 34 additions & 3 deletions tests/common/util/TestWsServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,30 @@
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/core/role.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/http/field.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/websocket/error.hpp>
#include <boost/beast/websocket/rfc6455.hpp>
#include <boost/beast/websocket/stream_base.hpp>
#include <gtest/gtest.h>

#include <algorithm>
#include <expected>
#include <iterator>
#include <optional>
#include <string>
#include <utility>
#include <vector>

namespace asio = boost::asio;
namespace websocket = boost::beast::websocket;

TestWsConnection::TestWsConnection(websocket::stream<boost::beast::tcp_stream> wsStream) : ws_(std::move(wsStream))
TestWsConnection::TestWsConnection(
websocket::stream<boost::beast::tcp_stream> wsStream,
std::vector<util::requests::HttpHeader> headers
)
: ws_(std::move(wsStream)), headers_(std::move(headers))
{
}

Expand Down Expand Up @@ -83,6 +93,12 @@ TestWsConnection::close(boost::asio::yield_context yield)
return std::nullopt;
}

std::vector<util::requests::HttpHeader> const&
TestWsConnection::headers() const
{
return headers_;
}

TestWsServer::TestWsServer(asio::io_context& context, std::string const& host, int port) : acceptor_(context)
{
auto endpoint = asio::ip::tcp::endpoint(boost::asio::ip::make_address(host), port);
Expand All @@ -102,13 +118,28 @@ TestWsServer::acceptConnection(asio::yield_context yield)
if (errorCode)
return std::unexpected{util::requests::RequestError{"Accept error", errorCode}};

boost::beast::flat_buffer buffer;
boost::beast::http::request<boost::beast::http::string_body> request;
boost::beast::http::async_read(socket, buffer, request, yield[errorCode]);
if (errorCode)
return std::unexpected{util::requests::RequestError{"Read error", errorCode}};
std::vector<util::requests::HttpHeader> headers;
std::transform(request.begin(), request.end(), std::back_inserter(headers), [](auto const& header) {
if (header.name() == boost::beast::http::field::unknown)
return util::requests::HttpHeader{header.name_string(), header.value()};

return util::requests::HttpHeader{header.name(), header.value()};
});
if (not boost::beast::websocket::is_upgrade(request))
return std::unexpected{util::requests::RequestError{"Not a websocket request"}};

boost::beast::websocket::stream<boost::beast::tcp_stream> ws(std::move(socket));
ws.set_option(websocket::stream_base::timeout::suggested(boost::beast::role_type::server));
ws.async_accept(yield[errorCode]);
ws.async_accept(request, yield[errorCode]);
if (errorCode)
return std::unexpected{util::requests::RequestError{"Handshake error", errorCode}};

return TestWsConnection(std::move(ws));
return TestWsConnection(std::move(ws), std::move(headers));
}

void
Expand Down
10 changes: 9 additions & 1 deletion tests/common/util/TestWsServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,20 @@
#include <functional>
#include <optional>
#include <string>
#include <vector>

class TestWsConnection {
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
std::vector<util::requests::HttpHeader> headers_;

public:
using SendCallback = std::function<void()>;
using ReceiveCallback = std::function<void(std::string)>;

TestWsConnection(boost::beast::websocket::stream<boost::beast::tcp_stream> wsStream);
TestWsConnection(
boost::beast::websocket::stream<boost::beast::tcp_stream> wsStream,
std::vector<util::requests::HttpHeader> headers
);

// returns error message if error occurs
std::optional<std::string>
Expand All @@ -51,6 +56,9 @@ class TestWsConnection {

std::optional<std::string>
close(boost::asio::yield_context yield);

std::vector<util::requests::HttpHeader> const&
headers() const;
};

class TestWsServer {
Expand Down
Loading

0 comments on commit 56ab943

Please sign in to comment.