Skip to content

Commit

Permalink
try improve rt update (#585)
Browse files Browse the repository at this point in the history
* try improve rt update

* better error message
  • Loading branch information
felixguendling authored Oct 18, 2024
1 parent 18a1d72 commit f905abc
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 102 deletions.
1 change: 1 addition & 0 deletions docs/setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ timetable: # if not set, no timetable will be loaded
merge_dupes_inter_src: false # duplicates withing different datasets will be merged
link_stop_distance: 100 # stops will be linked by footpaths if they're less than X meters (default=100m) apart
update_interval: 60 # real-time updates are polled every `update_interval` seconds
http_timeout: 10 # timeout for the HTTP server to respond with a package
incremental_rt_update: false # false = real-time updates are applied to a clean slate, true = no data will be dropped
max_footpath_length: 15 # maximum footpath length when transitively connecting stops or for routing footpaths if `osr_footpath` is set to true
datasets: # map of tag -> dataset
Expand Down
15 changes: 4 additions & 11 deletions exe/server.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#include "boost/asio/co_spawn.hpp"
#include "boost/asio/detached.hpp"
#include "boost/asio/io_context.hpp"
#include "boost/program_options.hpp"

#include "net/run.h"
#include "net/stop_handler.h"
Expand All @@ -11,7 +8,6 @@
#include "utl/init_from.h"

#include "motis/config.h"
#include "motis/cron.h"
#include "motis/endpoints/adr/geocode.h"
#include "motis/endpoints/adr/reverse_geocode.h"
#include "motis/endpoints/elevators.h"
Expand All @@ -33,7 +29,6 @@
#include "motis/rt_update.h"

namespace fs = std::filesystem;
namespace bpo = boost::program_options;
namespace asio = boost::asio;

namespace motis {
Expand Down Expand Up @@ -95,12 +90,10 @@ int server(data d, config const& c) {
auto rt_update_ioc = std::unique_ptr<asio::io_context>{};
if (c.requires_rt_timetable_updates()) {
rt_update_ioc = std::make_unique<asio::io_context>();
cron(*rt_update_ioc, std::chrono::seconds{c.timetable_->update_interval_},
[&]() {
asio::co_spawn(*rt_update_ioc, rt_update(c, *d.tt_, *d.tags_, d.rt_),
asio::detached);
});
rt_update_thread = std::make_unique<std::thread>(net::run(*rt_update_ioc));
rt_update_thread = std::make_unique<std::thread>([&]() {
run_rt_update(*rt_update_ioc, c, *d.tt_, *d.tags_, d.rt_);
rt_update_ioc->run();
});
}

if (ec) {
Expand Down
1 change: 1 addition & 0 deletions include/motis/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ struct config {
bool merge_dupes_inter_src_{false};
unsigned link_stop_distance_{100U};
unsigned update_interval_{60};
unsigned http_timeout_{10};
bool incremental_rt_update_{false};
std::uint16_t max_footpath_length_{15};
std::optional<std::string> default_timezone_{};
Expand Down
16 changes: 0 additions & 16 deletions include/motis/cron.h

This file was deleted.

12 changes: 6 additions & 6 deletions include/motis/http_req.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
#pragma once

#include <chrono>
#include <map>
#include <string>

#include "boost/asio/awaitable.hpp"
#include "boost/asio/co_spawn.hpp"
#include "boost/asio/io_context.hpp"
#include "boost/beast/core.hpp"
#include "boost/beast/http.hpp"
#include "boost/beast/version.hpp"
#include "boost/beast/http/dynamic_body.hpp"
#include "boost/beast/http/message.hpp"
#include "boost/url/url.hpp"

namespace motis {
Expand All @@ -17,6 +15,8 @@ using http_response =
boost::beast::http::response<boost::beast::http::dynamic_body>;

boost::asio::awaitable<http_response> http_GET(
boost::urls::url, std::map<std::string, std::string> const& headers);
boost::urls::url,
std::map<std::string, std::string> const& headers,
std::chrono::seconds timeout);

} // namespace motis
14 changes: 9 additions & 5 deletions include/motis/rt_update.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
#pragma once

#include "boost/asio/awaitable.hpp"
#include <chrono>
#include <memory>

#include "boost/asio/io_context.hpp"

#include "motis/fwd.h"

namespace motis {

boost::asio::awaitable<void> rt_update(config const&,
nigiri::timetable const&,
tag_lookup const& tags,
std::shared_ptr<rt>&);
void run_rt_update(boost::asio::io_context&,
config const&,
nigiri::timetable const&,
tag_lookup const&,
std::shared_ptr<rt>&);

}
48 changes: 0 additions & 48 deletions src/cron.cc

This file was deleted.

18 changes: 11 additions & 7 deletions src/http_req.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,25 @@ asio::awaitable<http_response> req(Stream&&,

asio::awaitable<http_response> req_no_tls(
boost::urls::url const& url,
std::map<std::string, std::string> const& headers) {
std::map<std::string, std::string> const& headers,
std::chrono::seconds const timeout) {
auto executor = co_await asio::this_coro::executor;
auto resolver = asio::ip::tcp::resolver{executor};
auto stream = beast::tcp_stream{executor};

auto const results = co_await resolver.async_resolve(
url.host(), url.has_port() ? url.port() : "80");

stream.expires_after(std::chrono::seconds(30));
stream.expires_after(timeout);

co_await stream.async_connect(results);
co_return co_await req(std::move(stream), url, headers);
}

asio::awaitable<http_response> req_tls(
boost::urls::url const& url,
std::map<std::string, std::string> const& headers) {
std::map<std::string, std::string> const& headers,
std::chrono::seconds const timeout) {
auto ssl_ctx = ssl::context{ssl::context::tlsv12_client};
ssl_ctx.set_default_verify_paths();
ssl_ctx.set_verify_mode(ssl::verify_none);
Expand All @@ -63,7 +65,7 @@ asio::awaitable<http_response> req_tls(
boost::asio::error::get_ssl_category()}};
}

stream.next_layer().expires_after(std::chrono::seconds(30));
stream.next_layer().expires_after(timeout);

auto const results = co_await resolver.async_resolve(
url.host(), url.has_port() ? url.port() : "443");
Expand Down Expand Up @@ -99,14 +101,16 @@ asio::awaitable<http_response> req(
}

asio::awaitable<http::response<http::dynamic_body>> http_GET(
boost::urls::url url, std::map<std::string, std::string> const& headers) {
boost::urls::url url,
std::map<std::string, std::string> const& headers,
std::chrono::seconds const timeout) {
auto n_redirects = 0U;
auto next_url = url;
while (n_redirects < 3U) {
auto const res =
co_await (next_url.scheme_id() == boost::urls::scheme::https
? req_tls(next_url, headers)
: req_no_tls(next_url, headers));
? req_tls(next_url, headers, timeout)
: req_no_tls(next_url, headers, timeout));
auto const code = res.base().result_int();
if (code >= 300 && code < 400) {
next_url = boost::urls::url{res.base()["Location"]};
Expand Down
70 changes: 61 additions & 9 deletions src/rt_update.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "motis/rt_update.h"

#include "boost/asio/co_spawn.hpp"
#include "boost/asio/detached.hpp"
#include "boost/asio/io_context.hpp"
#include "boost/asio/redirect_error.hpp"
#include "boost/asio/steady_timer.hpp"
#include "boost/beast/core/buffers_to_string.hpp"

#include "utl/timer.h"

#include "nigiri/rt/create_rt_timetable.h"
Expand All @@ -21,7 +28,8 @@ namespace motis {
awaitable<void> rt_update(config const& c,
nigiri::timetable const& tt,
tag_lookup const& tags,
std::shared_ptr<rt>& r) {
std::shared_ptr<rt>& r,
std::chrono::seconds const http_timeout) {
auto const t = utl::scoped_timer{"rt_update"};

auto const no_hdr = headers_t{};
Expand All @@ -37,7 +45,8 @@ awaitable<void> rt_update(config const& c,
auto const url = boost::urls::url{ep.url_};
gtfs_rt.emplace_back(
src, url,
http_GET(url, ep.headers_.has_value() ? *ep.headers_ : no_hdr));
http_GET(url, ep.headers_.has_value() ? *ep.headers_ : no_hdr,
http_timeout));
}
}

Expand All @@ -50,18 +59,24 @@ awaitable<void> rt_update(config const& c,

auto statistics = std::vector<n::rt::statistics>{};
for (auto& [src, url, response] : gtfs_rt) {
// alternatively: make_parallel_group
auto const res = co_await std::move(response);
auto const stats = n::rt::gtfsrt_update_buf(
tt, *rtt, src, tags.get_tag(src),
boost::beast::buffers_to_string(res.body().data()));
auto stats = n::rt::statistics{};
auto const tag = tags.get_tag(src);
try {
auto const res = co_await std::move(response);
stats = n::rt::gtfsrt_update_buf(
tt, *rtt, src, tag,
boost::beast::buffers_to_string(res.body().data()));
} catch (std::exception const& e) {
n::log(n::log_lvl::error, "motis.rt", "RT FETCH ERROR: tag={}, error={}",
tag, e.what());
}
statistics.emplace_back(stats);
}

for (auto const [endpoint, stats] : utl::zip(gtfs_rt, statistics)) {
auto const& [src, url, response] = endpoint;
fmt::println("rt update stats for {}: {}", fmt::streamed(url),
fmt::streamed(stats));
n::log(n::log_lvl::info, "motis.rt", "rt update stats for {}: {}",
fmt::streamed(url), fmt::streamed(stats));
}

auto railviz_rt = std::make_unique<railviz_rt_index>(tt, *rtt);
Expand All @@ -71,4 +86,41 @@ awaitable<void> rt_update(config const& c,
co_return;
}

void run_rt_update(boost::asio::io_context& ioc,
config const& c,
nigiri::timetable const& tt,
tag_lookup const& tags,
std::shared_ptr<rt>& r) {
boost::asio::co_spawn(
ioc,
[&c, &tt, &tags, &r]() -> awaitable<void> {
auto executor = co_await asio::this_coro::executor;
auto timer = asio::steady_timer{executor};
auto ec = boost::system::error_code{};
while (true) {
auto const start = std::chrono::steady_clock::now();

try {
co_await rt_update(
c, tt, tags, r,
std::chrono::seconds{c.timetable_->http_timeout_});
} catch (std::exception const& e) {
n::log(n::log_lvl::error, "motis.rt",
"EXCEPTION CAUGHT IN CRON: {}", e.what());
} catch (...) {
n::log(n::log_lvl::error, "motis.rt", "EXCEPTION CAUGHT IN CRON");
}

timer.expires_at(
start + std::chrono::seconds{c.timetable_->update_interval_});
co_await timer.async_wait(
asio::redirect_error(asio::use_awaitable, ec));
if (ec == asio::error::operation_aborted) {
co_return;
}
}
},
boost::asio::detached);
}

} // namespace motis
1 change: 1 addition & 0 deletions test/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ osm: europe-latest.osm.pbf
merge_dupes_inter_src: false
link_stop_distance: 100
update_interval: 60
http_timeout: 10
incremental_rt_update: false
max_footpath_length: 15
datasets:
Expand Down

0 comments on commit f905abc

Please sign in to comment.