Skip to content

Commit

Permalink
Merge branch 'XRPLF:develop' into 1219-split-metric-and-stat-analysis…
Browse files Browse the repository at this point in the history
…-docs
  • Loading branch information
maria-robobug authored Jan 30, 2025
2 parents c1beb9c + e549657 commit 742e114
Show file tree
Hide file tree
Showing 53 changed files with 1,041 additions and 199 deletions.
11 changes: 6 additions & 5 deletions docker/ci/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ This image contains an environment to build [Clio](https://github.com/XRPLF/clio
It is used in [Clio Github Actions](https://github.com/XRPLF/clio/actions) but can also be used to compile Clio locally.

The image is based on Ubuntu 20.04 and contains:
- clang 16
- clang 16.0.6
- gcc 12.3
- doxygen 1.10
- doxygen 1.12
- gh 2.40
- ccache 4.8.3
- conan
- ccache 4.10.2
- conan 1.62
- and some other useful tools

Conan is set up to build Clio without any additional steps. There are two preset conan profiles: `clang` and `gcc` to use corresponding compiler.
Conan is set up to build Clio without any additional steps. There are two preset conan profiles: `clang` and `gcc` to use the corresponding compiler. By default conan is set up to use `gcc`.
Sanitizer builds for `ASAN`, `TSAN` and `UBSAN` are enabled via conan profiles for each of the supported compilers. These can be selected using the following pattern (all lowercase): `[compiler].[sanitizer]` (e.g. `--profile gcc.tsan`).
9 changes: 9 additions & 0 deletions docker/ci/conan/clang.asan
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include(clang)

[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=address\" linkflags=\"-fsanitize=address\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=address"
CXXFLAGS="-fsanitize=address"
LDFLAGS="-fsanitize=address"
9 changes: 9 additions & 0 deletions docker/ci/conan/clang.tsan
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include(clang)

[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=thread\" linkflags=\"-fsanitize=thread\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=thread"
CXXFLAGS="-fsanitize=thread"
LDFLAGS="-fsanitize=thread"
9 changes: 9 additions & 0 deletions docker/ci/conan/clang.ubsan
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include(clang)

[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=undefined\" linkflags=\"-fsanitize=undefined\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=undefined"
CXXFLAGS="-fsanitize=undefined"
LDFLAGS="-fsanitize=undefined"
9 changes: 9 additions & 0 deletions docker/ci/conan/gcc.asan
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include(gcc)

[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=address\" linkflags=\"-fsanitize=address\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=address"
CXXFLAGS="-fsanitize=address"
LDFLAGS="-fsanitize=address"
9 changes: 9 additions & 0 deletions docker/ci/conan/gcc.tsan
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include(gcc)

[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=thread\" linkflags=\"-fsanitize=thread\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=thread"
CXXFLAGS="-fsanitize=thread"
LDFLAGS="-fsanitize=thread"
9 changes: 9 additions & 0 deletions docker/ci/conan/gcc.ubsan
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
include(gcc)

[options]
boost:extra_b2_flags="cxxflags=\"-fsanitize=undefined\" linkflags=\"-fsanitize=undefined\""
boost:without_stacktrace=True
[env]
CFLAGS="-fsanitize=undefined"
CXXFLAGS="-fsanitize=undefined"
LDFLAGS="-fsanitize=undefined"
7 changes: 7 additions & 0 deletions docker/ci/dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,10 @@ RUN conan profile new clang --detect \
&& conan profile update "conf.tools.build:compiler_executables={\"c\": \"/usr/bin/clang-16\", \"cpp\": \"/usr/bin/clang++-16\"}" clang

RUN echo "include(gcc)" >> .conan/profiles/default

COPY conan/gcc.asan /root/.conan/profiles
COPY conan/gcc.tsan /root/.conan/profiles
COPY conan/gcc.ubsan /root/.conan/profiles
COPY conan/clang.asan /root/.conan/profiles
COPY conan/clang.tsan /root/.conan/profiles
COPY conan/clang.ubsan /root/.conan/profiles
2 changes: 0 additions & 2 deletions src/app/ClioApplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
#include "web/ng/Server.hpp"

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>

#include <cstdint>
#include <cstdlib>
Expand Down
9 changes: 5 additions & 4 deletions src/data/cassandra/impl/AsyncExecutor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Types.hpp"
#include "data/cassandra/impl/RetryPolicy.hpp"
#include "util/Mutex.hpp"
#include "util/log/Logger.hpp"

#include <boost/asio.hpp>
Expand Down Expand Up @@ -64,8 +65,8 @@ class AsyncExecutor : public std::enable_shared_from_this<AsyncExecutor<Statemen
RetryCallbackType onRetry_;

// does not exist during initial construction, hence optional
std::optional<FutureWithCallbackType> future_;
std::mutex mtx_;
using OptionalFuture = std::optional<FutureWithCallbackType>;
util::Mutex<OptionalFuture> future_;

public:
/**
Expand Down Expand Up @@ -127,8 +128,8 @@ class AsyncExecutor : public std::enable_shared_from_this<AsyncExecutor<Statemen
self = nullptr; // explicitly decrement refcount
};

std::scoped_lock const lck{mtx_};
future_.emplace(handle.asyncExecute(data_, std::move(handler)));
auto future = future_.template lock<std::scoped_lock>();
future->emplace(handle.asyncExecute(data_, std::move(handler)));
}
};

Expand Down
6 changes: 4 additions & 2 deletions src/etl/CacheLoader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class CacheLoader {
void
stop() noexcept
{
loader_->stop();
if (loader_ != nullptr)
loader_->stop();
}

/**
Expand All @@ -139,7 +140,8 @@ class CacheLoader {
void
wait() noexcept
{
loader_->wait();
if (loader_ != nullptr)
loader_->wait();
}
};

Expand Down
1 change: 0 additions & 1 deletion src/etl/impl/SubscriptionSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/beast/http/field.hpp>
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
Expand Down
2 changes: 1 addition & 1 deletion src/etlng/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ add_library(clio_etlng)

target_sources(
clio_etlng PRIVATE impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp
impl/Loading.cpp
impl/Loading.cpp impl/TaskManager.cpp
)

target_link_libraries(clio_etlng PUBLIC clio_data)
49 changes: 49 additions & 0 deletions src/etlng/LedgerPublisherInterface.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#pragma once

#include <chrono>
#include <cstdint>
#include <optional>

namespace etlng {

/**
* @brief The interface of a ledger publisher
*/
struct LedgerPublisherInterface {
virtual ~LedgerPublisherInterface() = default;

/**
* @brief Publish the ledger by its sequence number
*
* @param seq The sequence number of the ledger
* @param maxAttempts The maximum number of attempts to publish the ledger; no limit if nullopt
* @param attemptsDelay The delay between attempts; defaults to one second
*/
virtual void
publish(
uint32_t seq,
std::optional<uint32_t> maxAttempts,
std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1}
) = 0;
};

} // namespace etlng
142 changes: 142 additions & 0 deletions src/etlng/impl/TaskManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include "etlng/impl/TaskManager.hpp"

#include "etlng/ExtractorInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/SchedulerInterface.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp"


#include <chrono>
#include <cstddef>
#include <functional>
#include <ranges>
#include <thread>
#include <utility>
#include <vector>

namespace etlng::impl {

TaskManager::TaskManager(
util::async::AnyExecutionContext&& ctx,
std::reference_wrapper<SchedulerInterface> scheduler,
std::reference_wrapper<ExtractorInterface> extractor,
std::reference_wrapper<LoaderInterface> loader
)
: ctx_(std::move(ctx)), schedulers_(scheduler), extractor_(extractor), loader_(loader)
{
}

// Aborts every spawned operation and waits for them to finish (see stop()).
TaskManager::~TaskManager()
{
    this->stop();
}

// Spawns `settings.numExtractors` extractors and `settings.numLoaders` loaders
// that share one queue, then blocks until all of them have finished.
void
TaskManager::run(Settings settings)
{
    static constexpr auto kQUEUE_SIZE_LIMIT = 2048uz;

    // Both objects are referenced by the spawned operations; they stay alive
    // because this function does not return before wait() completes.
    auto schedulingStrand = ctx_.makeStrand();
    PriorityQueue queue(ctx_.makeStrand(), kQUEUE_SIZE_LIMIT);

    LOG(log_.debug()) << "Starting task manager...\n";

    extractors_.reserve(settings.numExtractors);
    for (auto spawned = 0uz; spawned < settings.numExtractors; ++spawned)
        extractors_.push_back(spawnExtractor(schedulingStrand, queue));

    loaders_.reserve(settings.numLoaders);
    for (auto spawned = 0uz; spawned < settings.numLoaders; ++spawned)
        loaders_.push_back(spawnLoader(queue));

    wait();
    LOG(log_.debug()) << "All finished in task manager..\n";
}

// Starts an extraction loop on the given strand. Each iteration asks the
// scheduler for the next task, extracts the corresponding ledger with its diff
// and pushes the result into `queue`, retrying while the queue rejects it.
util::async::AnyOperation<void>
TaskManager::spawnExtractor(util::async::AnyStrand& strand, PriorityQueue& queue)
{
    // TODO: these values may be extracted to config later and/or need to be fine-tuned on a realistic system
    static constexpr auto kDELAY_BETWEEN_ATTEMPTS = std::chrono::milliseconds{100u};
    static constexpr auto kDELAY_BETWEEN_ENQUEUE_ATTEMPTS = std::chrono::milliseconds{1u};

    return strand.execute([this, &queue](auto stopRequested) {
        while (not stopRequested) {
            auto task = schedulers_.get().next();
            if (not task.has_value()) {
                // Nothing scheduled right now; poll again after a pause.
                // TODO (https://github.com/XRPLF/clio/issues/1852)
                std::this_thread::sleep_for(kDELAY_BETWEEN_ATTEMPTS);
                continue;
            }

            auto maybeBatch = extractor_.get().extractLedgerWithDiff(task->seq);
            if (not maybeBatch.has_value()) {
                // TODO: how do we signal to the loaders that it's time to shutdown? some special task?
                break;  // TODO: handle server shutdown or other node took over ETL
            }

            LOG(log_.debug()) << "Adding data after extracting diff";
            while (not queue.enqueue(*maybeBatch)) {
                // TODO (https://github.com/XRPLF/clio/issues/1852)
                std::this_thread::sleep_for(kDELAY_BETWEEN_ENQUEUE_ATTEMPTS);

                // Give up on this batch if a stop was requested while retrying.
                if (stopRequested)
                    break;
            }
        }
    });
}

// Starts a loading loop on the execution context. Each iteration takes the
// next item from `queue` (if any) and hands it to the loader.
util::async::AnyOperation<void>
TaskManager::spawnLoader(PriorityQueue& queue)
{
    // Pause between polls when the queue has nothing to offer, so that an idle
    // loader does not spin on dequeue(). Mirrors the polling delays that
    // spawnExtractor uses; the value may need fine-tuning on a realistic system.
    static constexpr auto kDELAY_BETWEEN_DEQUEUE_ATTEMPTS = std::chrono::milliseconds{1u};

    return ctx_.execute([this, &queue](auto stopRequested) {
        while (not stopRequested) {
            // TODO (https://github.com/XRPLF/clio/issues/66): does not tell the loader whether it's out of order or not
            if (auto data = queue.dequeue(); data.has_value()) {
                loader_.get().load(*data);
            } else {
                // NOTE(review): assumes dequeue() returns immediately when empty — confirm against PriorityQueue
                std::this_thread::sleep_for(kDELAY_BETWEEN_DEQUEUE_ATTEMPTS);
            }
        }
    });
}

void
TaskManager::wait()
{
for (auto& extractor : extractors_)
extractor.wait();
for (auto& loader : loaders_)
loader.wait();
}

void
TaskManager::stop()
{
for (auto& extractor : extractors_)
extractor.abort();
for (auto& loader : loaders_)
loader.abort();

wait();
}

} // namespace etlng::impl
Loading

0 comments on commit 742e114

Please sign in to comment.