Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

osc: work on adding more features to rate limiter #858

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/ossia/network/rate_limiter_configuration.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once
#include <ossia/detail/config.hpp>

#include <chrono>

namespace ossia::net
{
struct rate_limiter_configuration
{
using clock = std::chrono::steady_clock;

// What is the rate limit
std::chrono::milliseconds duration{};

// Put things in e.g. OSC bundles
bool bundle{};

// When sending, send every parameters from the device, not just the last ones
bool send_all{};

// Always send the last sent values on every tick
bool repeat{};
};

}
229 changes: 155 additions & 74 deletions src/ossia/network/rate_limiting_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,83 +8,162 @@
#if defined(__cpp_exceptions)
namespace ossia::net
{
// FIXME refactor with coalescing_queue
// FIXME refactor with sleep_accurate (MIDISync.hpp)
// FIXME refactor with exp wait in audio_spin_mutex

struct rate_limiter
{
rate_limiting_protocol& self;
void operator()() const noexcept
std::chrono::steady_clock::duration duration;
std::chrono::steady_clock::duration time_to_sleep{duration};

std::chrono::steady_clock::time_point sleep_before()
{
ossia::set_thread_name("ossia ratelim");
using namespace std::literals;
using clock = rate_limiting_protocol::clock;
const auto duration = self.m_duration.load();
thread_local auto time_to_sleep = duration;
while(self.m_running)
auto prev_time = clock::now();
if(time_to_sleep > 1ms)
std::this_thread::sleep_for(time_to_sleep);
return prev_time;
}

void sleep_after(std::chrono::steady_clock::time_point prev_time)
{
using namespace std::literals;
using clock = rate_limiting_protocol::clock;

auto new_time = clock::now();
auto observed_duration
= std::chrono::duration_cast<std::chrono::milliseconds>(new_time - prev_time);
if(observed_duration > duration)
{
if(observed_duration >= 2 * duration)
time_to_sleep = 0ms;
else
time_to_sleep = 2 * duration - observed_duration;
}
else
{
time_to_sleep = duration;
}
}
};

template <bool Bundle, bool Repeat, bool SendAll>
struct rate_limiter_impl;

template <>
struct rate_limiter_impl<false, false, false> : rate_limiter
{
void operator()()
{
// TODO find safe way to handle if a parameter is removed
// TODO instead we should do the value filtering in the parameter ...
// but still have to handle cases that can be optimized, such as midi
{
std::lock_guard lock{self.m_msgMutex};
std::swap(self.m_buffer, self.m_userMessages);
}

// Copy newest messages in local map
for(auto& msg : self.m_buffer)
{
if(msg.second.first.valid())
{
self.m_threadMessages[msg.first] = std::move(msg.second);
msg.second.first = ossia::value{};
}
}

// Push the actual messages
for(auto& v : self.m_threadMessages)
{
auto val = v.second.first;
if(val.valid())
{
self.m_protocol->push(*v.first, v.second.first);
}
}

// Clear both containers (while keeping memory allocated for sent
// messages so that it stays fast)
for(auto& v : self.m_buffer)
if(v.second.first.valid())
v.second.first = ossia::value{};

for(auto& v : self.m_threadMessages)
if(v.second.first.valid())
v.second.first = ossia::value{};
}
};

template <>
struct rate_limiter_impl<true, true, true> : rate_limiter
{
void operator()()
{
std::vector<std::pair<ossia::net::parameter_base*, ossia::value>> vec;
ossia::iterate_all_children(this->self.m_device->get_root_node());

// bundle
// send all
// repeat

// TODO find safe way to handle if a parameter is removed
// TODO instead we should do the value filtering in the parameter ...
// but still have to handle cases that can be optimized, such as midi
{
std::lock_guard lock{self.m_msgMutex};
std::swap(self.m_buffer, self.m_userMessages);
}

// Copy newest messages in local map
for(auto& msg : self.m_buffer)
{
if(msg.second.first.valid())
{
self.m_threadMessages[msg.first] = std::move(msg.second);
msg.second.first = ossia::value{};
}
}

// Push the actual messages
for(auto& v : self.m_threadMessages)
{
auto val = v.second.first;
if(val.valid())
{
self.m_protocol->push(*v.first, v.second.first);
}
}

// Clear both containers (while keeping memory allocated for sent
// messages so that it stays fast)
for(auto& v : self.m_buffer)
if(v.second.first.valid())
v.second.first = ossia::value{};

for(auto& v : self.m_threadMessages)
if(v.second.first.valid())
v.second.first = ossia::value{};
}
};

template <bool Bundle, bool Repeat, bool SendAll>
struct rate_limiter_concrete : rate_limiter_impl<Bundle, Repeat, SendAll>
{
void operator()()
{
ossia::set_thread_name("ossia ratelim");

while(this->self.m_running)
{
try
{
auto prev_time = clock::now();
if(time_to_sleep > 1ms)
std::this_thread::sleep_for(time_to_sleep);

// TODO find safe way to handle if a parameter is removed
// TODO instead we should do the value filtering in the parameter ...
// but still have to handle cases that can be optimized, such as midi
{
std::lock_guard lock{self.m_msgMutex};
std::swap(self.m_buffer, self.m_userMessages);
}

// Copy newest messages in local map
for(auto& msg : self.m_buffer)
{
if(msg.second.first.valid())
{
self.m_threadMessages[msg.first] = std::move(msg.second);
msg.second.first = ossia::value{};
}
}

// Push the actual messages
for(auto& v : self.m_threadMessages)
{
auto val = v.second.first;
if(val.valid())
{
self.m_protocol->push(*v.first, v.second.first);
}
}

// Clear both containers (while keeping memory allocated for sent
// messages so that it stays fast)
for(auto& v : self.m_buffer)
{
if(v.second.first.valid())
{
v.second.first = ossia::value{};
}
}

for(auto& v : self.m_threadMessages)
{
if(v.second.first.valid())
{
v.second.first = ossia::value{};
}
}
auto new_time = clock::now();
auto observed_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
new_time - prev_time);
if(observed_duration > duration)
{
if(observed_duration >= 2 * duration)
time_to_sleep = 0ms;
else
time_to_sleep = 2 * duration - observed_duration;
}
else
{
time_to_sleep = duration;
}
const auto prev_time = this->sleep_before();
rate_limiter_impl<Bundle, Repeat, SendAll>::operator()();
sleep_after(prev_time);
}
catch(...)
{
Expand All @@ -94,17 +173,17 @@ struct rate_limiter
};

rate_limiting_protocol::rate_limiting_protocol(
rate_limiting_protocol::duration d, std::unique_ptr<protocol_base> arg)
rate_limiter_configuration d, std::unique_ptr<protocol_base> arg)
: protocol_base{flags{SupportsMultiplex}}
, m_duration{d}
, m_duration{d.duration}
, m_bundle{d.bundle}
, m_repeat{d.repeat}
, m_send_all{d.send_all}
, m_protocol{std::move(arg)}

{
m_userMessages.reserve(4096);
m_buffer.reserve(4096);
m_threadMessages.reserve(4096);
m_lastTime = clock::now();
m_thread = std::thread{rate_limiter{*this}};
}

rate_limiting_protocol::~rate_limiting_protocol()
Expand All @@ -116,6 +195,7 @@ rate_limiting_protocol::~rate_limiting_protocol()
void rate_limiting_protocol::set_duration(rate_limiting_protocol::duration d)
{
m_duration = d;
// FIXME update thread
}

bool rate_limiting_protocol::pull(ossia::net::parameter_base& address)
Expand Down Expand Up @@ -165,7 +245,8 @@ void rate_limiting_protocol::set_device(device_base& dev)
{
m_device = &dev;
m_protocol->set_device(dev);
m_lastTime = clock::now();
m_thread = std::thread{rate_limiter_concrete<false, false, false>{*this, m_duration}};
}

}
#endif
20 changes: 17 additions & 3 deletions src/ossia/network/rate_limiting_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,29 @@
#include <ossia/network/base/message_queue.hpp>
#include <ossia/network/base/parameter_data.hpp>
#include <ossia/network/base/protocol.hpp>
#include <ossia/network/rate_limiter_configuration.hpp>

#include <readerwriterqueue.h>

#include <chrono>
#include <thread>
#include <vector>

namespace ossia::net
{
struct rate_limiter;

template <bool Bundle, bool Repeat, bool SendAll>
struct rate_limiter_impl;
template <bool Bundle, bool Repeat, bool SendAll>
struct rate_limiter_concrete;

class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_base
{
public:
using clock = std::chrono::high_resolution_clock;
using clock = std::chrono::steady_clock;
using duration = clock::duration;
rate_limiting_protocol(duration d, std::unique_ptr<protocol_base> arg);
rate_limiting_protocol(
rate_limiter_configuration conf, std::unique_ptr<protocol_base> arg);
~rate_limiting_protocol() override;

void set_duration(duration d);
Expand All @@ -46,8 +53,15 @@ class OSSIA_EXPORT rate_limiting_protocol final : public ossia::net::protocol_ba
rate_limiting_protocol& operator=(rate_limiting_protocol&&) = delete;

friend struct rate_limiter;
template <bool Bundle, bool Repeat, bool SendAll>
friend struct rate_limiter_impl;
template <bool Bundle, bool Repeat, bool SendAll>
friend struct rate_limiter_concrete;

std::atomic<duration> m_duration{};
bool m_bundle{};
bool m_send_all{};
bool m_repeat{};
std::unique_ptr<ossia::net::protocol_base> m_protocol;
ossia::net::device_base* m_device{};

Expand Down
Loading