From fa59c37f83320a5cbf77ddcd7d5e8604573e811b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Micha=C3=ABl=20Celerier?= Date: Wed, 15 Jan 2025 22:57:15 -0500 Subject: [PATCH] osc: work on adding more features to rate limiter --- .../network/rate_limiter_configuration.hpp | 25 ++ src/ossia/network/rate_limiting_protocol.cpp | 229 ++++++++++++------ src/ossia/network/rate_limiting_protocol.hpp | 20 +- 3 files changed, 197 insertions(+), 77 deletions(-) create mode 100644 src/ossia/network/rate_limiter_configuration.hpp diff --git a/src/ossia/network/rate_limiter_configuration.hpp b/src/ossia/network/rate_limiter_configuration.hpp new file mode 100644 index 00000000000..8ec4a5713bd --- /dev/null +++ b/src/ossia/network/rate_limiter_configuration.hpp @@ -0,0 +1,25 @@ +#pragma once +#include + +#include + +namespace ossia::net +{ +struct rate_limiter_configuration +{ + using clock = std::chrono::steady_clock; + + // What is the rate limit + std::chrono::milliseconds duration{}; + + // Put things in e.g. OSC bundles + bool bundle{}; + + // When sending, send every parameters from the device, not just the last ones + bool send_all{}; + + // Always send the last sent values on every tick + bool repeat{}; +}; + +} diff --git a/src/ossia/network/rate_limiting_protocol.cpp b/src/ossia/network/rate_limiting_protocol.cpp index 405f843080f..ae00fc6597a 100644 --- a/src/ossia/network/rate_limiting_protocol.cpp +++ b/src/ossia/network/rate_limiting_protocol.cpp @@ -8,83 +8,162 @@ #if defined(__cpp_exceptions) namespace ossia::net { +// FIXME refactor with coalescing_queue +// FIXME refactor with sleep_accurate (MIDISync.hpp) +// FIXME refactor with exp wait in audio_spin_mutex + struct rate_limiter { rate_limiting_protocol& self; - void operator()() const noexcept + std::chrono::steady_clock::duration duration; + std::chrono::steady_clock::duration time_to_sleep{duration}; + + std::chrono::steady_clock::time_point sleep_before() { - ossia::set_thread_name("ossia ratelim"); using namespace std::literals; using clock = rate_limiting_protocol::clock; - const auto duration = self.m_duration.load(); - thread_local auto time_to_sleep = duration; - while(self.m_running) + auto prev_time = clock::now(); + if(time_to_sleep > 1ms) + std::this_thread::sleep_for(time_to_sleep); + return prev_time; + } + + void sleep_after(std::chrono::steady_clock::time_point prev_time) + { + using namespace std::literals; + using clock = rate_limiting_protocol::clock; + + auto new_time = clock::now(); + auto observed_duration + = std::chrono::duration_cast(new_time - prev_time); + if(observed_duration > duration) + { + if(observed_duration >= 2 * duration) + time_to_sleep = 0ms; + else + time_to_sleep = 2 * duration - observed_duration; + } + else + { + time_to_sleep = duration; + } + } +}; + +template +struct rate_limiter_impl; + +template <> +struct rate_limiter_impl : rate_limiter +{ + void operator()() + { + // TODO find safe way to handle if a parameter is removed + // TODO instead we should do the value filtering in the parameter ... + // but still have to handle cases that can be optimized, such as midi + { + std::lock_guard lock{self.m_msgMutex}; + std::swap(self.m_buffer, self.m_userMessages); + } + + // Copy newest messages in local map + for(auto& msg : self.m_buffer) + { + if(msg.second.first.valid()) + { + self.m_threadMessages[msg.first] = std::move(msg.second); + msg.second.first = ossia::value{}; + } + } + + // Push the actual messages + for(auto& v : self.m_threadMessages) + { + auto val = v.second.first; + if(val.valid()) + { + self.m_protocol->push(*v.first, v.second.first); + } + } + + // Clear both containers (while keeping memory allocated for sent + // messages so that it stays fast) + for(auto& v : self.m_buffer) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + + for(auto& v : self.m_threadMessages) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + } +}; + +template <> +struct rate_limiter_impl : rate_limiter +{ + void operator()() + { + std::vector> vec; + ossia::iterate_all_children(this->self.m_device->get_root_node()); + + // bundle + // send all + // repeat + + // TODO find safe way to handle if a parameter is removed + // TODO instead we should do the value filtering in the parameter ... + // but still have to handle cases that can be optimized, such as midi + { + std::lock_guard lock{self.m_msgMutex}; + std::swap(self.m_buffer, self.m_userMessages); + } + + // Copy newest messages in local map + for(auto& msg : self.m_buffer) + { + if(msg.second.first.valid()) + { + self.m_threadMessages[msg.first] = std::move(msg.second); + msg.second.first = ossia::value{}; + } + } + + // Push the actual messages + for(auto& v : self.m_threadMessages) + { + auto val = v.second.first; + if(val.valid()) + { + self.m_protocol->push(*v.first, v.second.first); + } + } + + // Clear both containers (while keeping memory allocated for sent + // messages so that it stays fast) + for(auto& v : self.m_buffer) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + + for(auto& v : self.m_threadMessages) + if(v.second.first.valid()) + v.second.first = ossia::value{}; + } +}; + +template +struct rate_limiter_concrete : rate_limiter_impl +{ + void operator()() + { + ossia::set_thread_name("ossia ratelim"); + + while(this->self.m_running) { try { - auto prev_time = clock::now(); - if(time_to_sleep > 1ms) - std::this_thread::sleep_for(time_to_sleep); - - // TODO find safe way to handle if a parameter is removed - // TODO instead we should do the value filtering in the parameter ... - // but still have to handle cases that can be optimized, such as midi - { - std::lock_guard lock{self.m_msgMutex}; - std::swap(self.m_buffer, self.m_userMessages); - } - - // Copy newest messages in local map - for(auto& msg : self.m_buffer) - { - if(msg.second.first.valid()) - { - self.m_threadMessages[msg.first] = std::move(msg.second); - msg.second.first = ossia::value{}; - } - } - - // Push the actual messages - for(auto& v : self.m_threadMessages) - { - auto val = v.second.first; - if(val.valid()) - { - self.m_protocol->push(*v.first, v.second.first); - } - } - - // Clear both containers (while keeping memory allocated for sent - // messages so that it stays fast) - for(auto& v : self.m_buffer) - { - if(v.second.first.valid()) - { - v.second.first = ossia::value{}; - } - } - - for(auto& v : self.m_threadMessages) - { - if(v.second.first.valid()) - { - v.second.first = ossia::value{}; - } - } - auto new_time = clock::now(); - auto observed_duration = std::chrono::duration_cast( - new_time - prev_time); - if(observed_duration > duration) - { - if(observed_duration >= 2 * duration) - time_to_sleep = 0ms; - else - time_to_sleep = 2 * duration - observed_duration; - } - else - { - time_to_sleep = duration; - } + const auto prev_time = this->sleep_before(); + rate_limiter_impl::operator()(); + sleep_after(prev_time); } catch(...) { @@ -94,17 +173,17 @@ struct rate_limiter }; rate_limiting_protocol::rate_limiting_protocol( - rate_limiting_protocol::duration d, std::unique_ptr arg) + rate_limiter_configuration d, std::unique_ptr arg) : protocol_base{flags{SupportsMultiplex}} - , m_duration{d} + , m_duration{d.duration} + , m_bundle{d.bundle} + , m_repeat{d.repeat} + , m_send_all{d.send_all} , m_protocol{std::move(arg)} - { m_userMessages.reserve(4096); m_buffer.reserve(4096); m_threadMessages.reserve(4096); - m_lastTime = clock::now(); - m_thread = std::thread{rate_limiter{*this}}; } rate_limiting_protocol::~rate_limiting_protocol() @@ -116,6 +195,7 @@ rate_limiting_protocol::~rate_limiting_protocol() void rate_limiting_protocol::set_duration(rate_limiting_protocol::duration d) { m_duration = d; + // FIXME update thread } bool rate_limiting_protocol::pull(ossia::net::parameter_base& address) @@ -165,7 +245,8 @@ void rate_limiting_protocol::set_device(device_base& dev) { m_device = &dev; m_protocol->set_device(dev); + m_lastTime = clock::now(); + m_thread = std::thread{rate_limiter_concrete{*this, m_duration}}; } - } #endif diff --git a/src/ossia/network/rate_limiting_protocol.hpp b/src/ossia/network/rate_limiting_protocol.hpp index 64dddfa7bc7..28b226c755d 100644 --- a/src/ossia/network/rate_limiting_protocol.hpp +++ b/src/ossia/network/rate_limiting_protocol.hpp @@ -4,22 +4,29 @@ #include #include #include +#include #include #include #include -#include namespace ossia::net { struct rate_limiter; + +template +struct rate_limiter_impl; +template +struct rate_limiter_concrete; + class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_base { public: - using clock = std::chrono::high_resolution_clock; + using clock = std::chrono::steady_clock; using duration = clock::duration; - rate_limiting_protocol(duration d, std::unique_ptr arg); + rate_limiting_protocol( + rate_limiter_configuration conf, std::unique_ptr arg); ~rate_limiting_protocol() override; void set_duration(duration d); @@ -46,8 +53,15 @@ class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_ba rate_limiting_protocol& operator=(rate_limiting_protocol&&) = delete; friend struct rate_limiter; + template + friend struct rate_limiter_impl; + template + friend struct rate_limiter_concrete; std::atomic m_duration{}; + bool m_bundle{}; + bool m_send_all{}; + bool m_repeat{}; std::unique_ptr m_protocol; ossia::net::device_base* m_device{};