From ee9f2dae9b76385c1dc2b11ce9e74607035b68bf Mon Sep 17 00:00:00 2001 From: Mathias Laurin Date: Fri, 29 Nov 2024 09:24:55 +0100 Subject: [PATCH] Fetch rrd data over UDS socket CMK-18929 Change-Id: I3ded347b4c978ea6ac42037c9b349bbaa9872785 --- packages/livestatus/BUILD | 4 +- packages/livestatus/src/RRDColumn.cc | 315 ++++++++++----------------- 2 files changed, 116 insertions(+), 203 deletions(-) diff --git a/packages/livestatus/BUILD b/packages/livestatus/BUILD index 726e668c506..93331291aed 100644 --- a/packages/livestatus/BUILD +++ b/packages/livestatus/BUILD @@ -43,12 +43,10 @@ cc_library( ], visibility = ["//visibility:public"], deps = [ - ":livestatus_poller", ":livestatus_headers", + ":livestatus_poller", "@asio", "@re2", - # rrdgraph for rrd_xport - "@rrdtool_native//:rrdgraph", ], ) diff --git a/packages/livestatus/src/RRDColumn.cc b/packages/livestatus/src/RRDColumn.cc index 6fc7fb6c80a..b036d4dfa9f 100644 --- a/packages/livestatus/src/RRDColumn.cc +++ b/packages/livestatus/src/RRDColumn.cc @@ -5,27 +5,33 @@ #include "livestatus/RRDColumn.h" -#include - #include #include #include -#include +#include #include +#include #include #include +#include #include #include #include +#include #include "livestatus/ICore.h" #include "livestatus/Interface.h" #include "livestatus/Logger.h" #include "livestatus/Metric.h" #include "livestatus/PnpUtils.h" +#include "livestatus/RRDConsolidate.h" #include "livestatus/RRDFetch.h" +#include "livestatus/RRDRPN.h" +#include "livestatus/RRDUDSSocket.h" +#include "livestatus/StringUtils.h" using namespace std::string_view_literals; +using namespace std::chrono_literals; RRDColumnArgs::RRDColumnArgs(const std::string &arguments, const std::string &column_name) { @@ -109,22 +115,72 @@ std::string replace_all(const std::string &str, const std::string &chars, return result; } -std::pair getVarAndCF(std::string_view str) { +std::pair> getVarAndCF(std::string_view str) { const size_t dot_pos = str.find_last_of('.'); if (dot_pos != std::string::npos) { const Metric::Name head{std::string{str.substr(0, dot_pos)}}; auto tail = str.substr(dot_pos); if (tail == ".max"sv) { - return std::make_pair(head, "MAX"); + return std::make_pair(head, std::make_unique()); } if (tail == ".min"sv) { - return std::make_pair(head, "MIN"); + return std::make_pair(head, std::make_unique()); } if (tail == ".average"sv) { - return std::make_pair(head, "AVERAGE"); + return std::make_pair(head, std::make_unique()); + } + } + return std::make_pair(Metric::Name{std::string{str}}, + std::make_unique()); +} + +std::vector readData(RRDUDSSocket &sock, std::size_t count) { + const std::size_t raw_size = count * sizeof(double); + auto raw = std::string{}; + raw.reserve(raw_size); + while (raw.size() != raw_size) { + const auto part = sock.read(raw_size - raw.size()); + if (part.empty()) { + throw std::runtime_error("invalid payload"); + } + raw.append(part); + } + auto out = std::vector(count); + std::memcpy(out.data(), raw.data(), raw.size()); + return out; +} + +void sendFetchBin(RRDUDSSocket &sock, std::string_view fetchbin, + Logger *logger) { + const auto *it = fetchbin.begin(); + while (it != fetchbin.end()) { + auto written = sock.write({it, fetchbin.end()}, 200ms); + if (written <= 0) { + Warning(logger) << "Error sending RRD data: " << sock.readLine(); + return; } + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + it += written; + } +} + +std::tuple> recvFetchReply( + RRDUDSSocket &sock) { + const std::string status = sock.readLine(); + const int retcode = atoi(status.c_str()); + + auto rawheader = std::vector{}; + if (retcode < 0 || std::size_t(retcode) < RRDFetchHeader::size()) { + throw std::runtime_error{"invalid header"}; } - return std::make_pair(Metric::Name{std::string{str}}, "MAX"); + for (std::size_t ii = 0; ii < RRDFetchHeader::size(); ++ii) { + auto line = sock.readLine(); + rawheader.emplace_back(line); + } + const auto header = RRDFetchHeader::parse(rawheader); + const auto dsname_line = RRDFetchBinPayloadHeader::parse(sock.readLine()); + const auto payload = readData(sock, dsname_line.value_count); + return std::make_tuple(status, header, payload); } struct Data { @@ -147,7 +203,7 @@ struct Data { return result; } }; -}; // namespace +} // namespace // TODO(mk): Convert all of the RPN expressions that are available in RRDTool // and that have a different syntax then we have in our metrics system. @@ -157,31 +213,17 @@ struct Data { std::vector RRDDataMaker::make( const std::string &host_name, const std::string &service_description, std::chrono::seconds timezone_offset) const { - // Prepare the arguments for rrdtool xport in a dynamic array of strings. - // Note: The actual step might be different! - std::vector argv_s{ - "rrdtool xport", // name of program (ignored) - "-s", - std::to_string(args_.start_time), - "-e", - std::to_string(args_.end_time), - "--step", - std::to_string(args_.resolution)}; - - if (args_.max_entries > 0) { - argv_s.emplace_back("-m"); - argv_s.emplace_back(std::to_string(args_.max_entries)); - } + auto *logger = core_->loggerRRD(); - // We have an RPN like fs_used,1024,*. In order for that to work, we need to - // create DEFs for all RRDs of the service first. The we create a CDEF with - // our RPN and finally do the export. One difficulty here: we do not know + // We have an RPN like fs_used,1024,*. + // One difficulty here: we do not know // the exact variable names. The filenames of the RRDs have several // characters replaced with "_". This is a one-way escaping where we cannot // get back the original variable values. So the cleaner (an probably // faster) way is to look for the names of variables within our RPN // expressions and create DEFs just for them - if the according RRD exists. std::string converted_rpn; // convert foo.max -> foo-max + MetricLocation location; std::string_view rpn{args_.rpn}; auto next = [&rpn]() { auto token = rpn.substr(0, rpn.find(',')); @@ -195,7 +237,11 @@ std::vector RRDDataMaker::make( unsigned next_variable_number = 0; std::set touched_rrds; + // default to MAX + std::unique_ptr cf = std::make_unique(); while (!rpn.empty()) { + // That seems unnecessarily complex since AFAIK, we handle + // one DS at a time and only the simplest consolidation functions. auto token = next(); if (!converted_rpn.empty()) { converted_rpn += ","; @@ -206,190 +252,59 @@ std::vector RRDDataMaker::make( } // If the token looks like a variable name, then check if there is a - // matching RRD and create a matching DEF: command if that is the - // case. The token (assumed to be a metrics variable name) can contain a + // matching RRD. The token (assumed to be a metrics variable name) can + // contain a // '.' like e.g. in 'user.max', which select the consolidation function - // MAX. RRDTool does not allow a variable name to contain a '.', but - // strangely enough, it allows an underscore. Therefore, we replace '.' - // by '_' here. - auto [var, cf] = getVarAndCF(token); - auto location = - core_->metricLocation(host_name, service_description, var); + // MAX. + auto [var, cf_] = getVarAndCF(token); + cf.swap(cf_); + location = core_->metricLocation(host_name, service_description, var); + // RRDTool does not allow a variable name to contain a '.' but + // it allows an underscore. Therefore, we replace '.' by '_' here. std::string rrd_varname; if (location.path_.empty() || location.data_source_name_.empty()) { rrd_varname = replace_all(var.string(), ".", '_'); } else { + // We only support `var_1` in rpn_solve. rrd_varname = "var_" + std::to_string(++next_variable_number); - argv_s.push_back(std::string("DEF:") - .append(rrd_varname) - .append("=") - .append(location.path_.string()) - .append(":") - .append(location.data_source_name_) - .append(":") - .append(cf)); touched_rrds.insert(location.path_.string()); } converted_rpn += rrd_varname; } - // Add the two commands for the actual export. - argv_s.push_back("CDEF:xxx=" + converted_rpn); - argv_s.emplace_back("XPORT:xxx:"); - - // Make RRDTool flush the rrdcached if necessary - - // The cache daemon experiences long delays when queries extend over a - // large time range and the underlying RRA are in high resolution. - - // For performance reasons the xport tool will not connect to the daemon - // client to flush the data but will be done in 2 separate steps. First data - // will be flush only. Then the xport tool will directly read the RRD file. - - // The performance issues with the cache daemon have been reported to - // RRDTool on the issue - // https://github.com/oetiker/rrdtool-1.x/issues/1062 - - auto *logger = core_->loggerRRD(); - const auto rrdcached_socket = core_->paths()->rrdcached_socket(); - if (core_->pnp4nagiosEnabled() && !rrdcached_socket.empty() && - !touched_rrds.empty()) { - std::vector daemon_argv_s{ - "rrdtool flushcached", // name of program (ignored) - "--daemon", rrdcached_socket}; - - for (const auto &rrdfile : touched_rrds) { - daemon_argv_s.push_back(rrdfile); - } - - // Convert our dynamic C++ string array to a C-style argv array - std::vector daemon_argv; - daemon_argv.reserve(daemon_argv_s.size()); - for (const auto &arg : daemon_argv_s) { - daemon_argv.push_back(arg.c_str()); - } - daemon_argv.push_back(nullptr); - - if (logger->isLoggable(LogLevel::debug)) { - Debug debug(logger); - debug << "flush RRD data:"; - for (const auto &arg : daemon_argv_s) { - debug << " " << arg; - } - } - - if (rrd_flushcached( - static_cast(daemon_argv_s.size()), - // The RRD library is not const-correct. - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) - const_cast(daemon_argv.data())) != 0) { - Warning(logger) << "Error flushing RRD: " << rrd_get_error(); - } - } - - // Convert our dynamic C++ string array to a C-style argv array - std::vector argv; - argv.reserve(argv_s.size()); - for (const auto &arg : argv_s) { - argv.push_back(arg.c_str()); + std::size_t dsname = 0; + const auto dsname_view = std::string_view{location.data_source_name_}; + auto [ptr, ec] = + std::from_chars(dsname_view.begin(), dsname_view.end(), dsname); + if (ec != std::errc{}) { + Warning(logger) << "Invalid location: " << location.data_source_name_; + return {}; } - argv.push_back(nullptr); - if (logger->isLoggable(LogLevel::debug)) { - Debug debug(logger); - debug << "retrieving RRD data:"; - for (const auto &arg : argv_s) { - debug << " " << arg; - } - } - - // Now do the actual export. The library function rrd_xport mimics the - // command line API of rrd xport, but - fortunately - we get direct access - // to a binary buffer with doubles. No parsing is required. - int xxsize = 0; - time_t start = 0; - time_t end = 0; - unsigned long step = 0; - unsigned long col_cnt = 0; - char **legend_v = nullptr; - rrd_value_t *rrd_data = nullptr; - - // Clear the RRD error float. RRDTool will not do this and immediately fail - // if an error already occurred. - rrd_clear_error(); - - Data data; - if (rrd_xport(static_cast(argv_s.size()), - // The RRD library is not const-correct. - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) - const_cast(argv.data()), &xxsize, &start, &end, - &step, &col_cnt, &legend_v, &rrd_data) != 0) { - const std::string rrd_error{rrd_get_error()}; - if (rrd_error.starts_with("don't understand ")) { - // The error msg "don't understand ''" is logged on - // info lvl only as preventing such queries for non-given metrics is - // not feasible atm - Informational(logger) - << "Error parsing RPN expression: " << rrd_error; - } else { - Warning(logger) << "Error accessing RRD: " << rrd_error; - } - return data.as_vector(timezone_offset); - } - - // Since we have exactly one XPORT command, we expect exactly one column - if (col_cnt != 1) { - Error(logger) << "rrd_xport returned " << col_cnt - << " columns, but exactly one was expected."; - } else { - // XPORT takes a closed timewindow in its query and returns the - // timestamped values that represent an intersection with the query - // window. The returned interval description is right closed. - - // The timestamps associated with a value in RRDtool ALWAYS - // represent the time the sample was taken. Since any value you - // sample will represent some sort of past state your sampling - // apparatus has gathered, the timestamp will always be at the end - // of the sampling period - - // LEGEND - // O timestamps of measurements - // | query values, _start_time and _end_time - // x returned start, no data contained - // v returned data rows, includes end y - - // --O---O---O---O---O---O---O---O - // |---------------| - // x---v---v---v---v---y - - // Exact start time of the represented interval(x). This is <= our - // _start_time(|), but no value is associated to this time. - data.start = std::chrono::system_clock::from_time_t(start); - // Time closing time of the interval(y). This is >= our _end_time, and - // holds the last data value. - data.end = std::chrono::system_clock::from_time_t(end); - // Actual resolution in seconds. This is >= our _resolution - data.step = step; - // Now the actual data - double for double - // Data rows represent past values, thus loop starts with step shift. - // Interval is right closed, thus iterate until end inclusive. - rrd_value_t *ptr = rrd_data; - // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions) - for (time_t ti = start + step; ti <= end; ti += step) { - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - data.values.push_back(*ptr++); - } - } - - // rrd_xport uses malloc, so we *have* to use free. - // NOLINTBEGIN(cppcoreguidelines-no-malloc,cppcoreguidelines-owning-memory) - for (unsigned long j = 0; j < col_cnt; j++) { - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - ::free(legend_v[j]); - } - // NOLINTNEXTLINE(bugprone-multi-level-implicit-pointer-conversion) - ::free(legend_v); - ::free(rrd_data); - // NOLINTEND(cppcoreguidelines-no-malloc,cppcoreguidelines-owning-memory) - return data.as_vector(timezone_offset); + const auto rrdcached_socket = core_->paths()->rrdcached_socket(); + auto sock = + RRDUDSSocket{rrdcached_socket, logger, RRDUDSSocket::verbosity::none}; + sock.connect(); + + const auto fetch = std::ostringstream{} + << "FETCHBIN " << location.path_.string() << " " << *cf + << " " << args_.start_time << " " << args_.end_time + << " " << dsname << "\n"; + sendFetchBin(sock, fetch.view(), logger); + const auto &&[status, header, rawdata] = recvFetchReply(sock); + std::vector values; + values.reserve(rawdata.size()); + std::ranges::transform( + rawdata, std::back_inserter(values), [converted_rpn](auto &&point) { + return rrd_rpn_solve(mk::split(converted_rpn, ','), + std::make_pair("var_1", point)); + }); + const auto &&[out_values, out_resolution] = + rrd_consolidate(cf, values, header.step, args_.resolution); + Data out; + out.start = header.start; + out.end = header.end; + out.step = out_resolution; + out.values = out_values; + return out.as_vector(timezone_offset); }