Skip to content

Commit

Permalink
[oscquery] Various fixes from testing with TD oscq server
Browse files Browse the repository at this point in the history
  • Loading branch information
jcelerier committed Apr 10, 2024
1 parent 55466ff commit 16d71e4
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 16 deletions.
1 change: 0 additions & 1 deletion src/ossia/network/oscquery/oscquery_mirror.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ class OSSIA_EXPORT oscquery_mirror_protocol final : public ossia::net::protocol_
std::promise<void> promise;
std::string address{};
};
using promises_map = locked_map<string_map<get_osc_promise>>;

ossia::spsc_queue<get_ws_promise> m_getWSPromises;
ossia::spsc_queue<std::function<void()>> m_functionQueue;
Expand Down
120 changes: 105 additions & 15 deletions src/ossia/protocols/oscquery/oscquery_mirror_asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ struct http_async_answer
}
}
};
struct http_async_value_answer
{
std::weak_ptr<oscquery_shared_async_state> state;
std::string source_address;

template <typename T, typename S>
void operator()(T& req, const S& str)
{
if(auto ptr = state.lock())
{
if(ptr->active)
{
if(ptr->self.on_value_http_message(source_address, str))
req.close();
}
}
}
};

struct http_async_error
{
Expand Down Expand Up @@ -90,6 +108,8 @@ auto wait_for_future(
}
using http_async_request
= ossia::net::http_get_request<http_async_answer, http_async_error>;
using http_async_value_request
= ossia::net::http_get_request<http_async_value_answer, http_async_error>;

struct http_async_client_context
{
Expand All @@ -116,12 +136,21 @@ oscquery_mirror_asio_protocol::oscquery_mirror_asio_protocol(

// for http, host should be only the name, e.g. example.com instead of
// http://example.com:1234

if(port_idx != std::string::npos)
m_httpHost.erase(m_httpHost.begin() + port_idx, m_httpHost.end());

m_protocol_to_use = any_protocol;
if(boost::starts_with(m_httpHost, "http://"))
{
m_httpHost.erase(m_httpHost.begin(), m_httpHost.begin() + 7);
m_protocol_to_use = http;
}
else if(boost::starts_with(m_httpHost, "ws://"))
{
m_httpHost.erase(m_httpHost.begin(), m_httpHost.begin() + 5);
m_protocol_to_use = websockets;
}
}

void oscquery_mirror_asio_protocol::stop()
Expand Down Expand Up @@ -224,8 +253,12 @@ std::future<void> oscquery_mirror_asio_protocol::pull_async(net::parameter_base&
void oscquery_mirror_asio_protocol::request(net::parameter_base& address)
{
auto text = address.get_node().osc_address();
auto answer = http_async_value_answer{m_async_state, text};
text += ossia::oscquery::detail::query_value();
http_send_message(text);

auto hrq = std::make_shared<http_async_value_request>(
std::move(answer), http_async_error{}, m_ctx->context, m_httpHost, text);
hrq->resolve(m_httpHost, m_queryPort);
}

using proto = ossia::oscquery::oscquery_protocol_client<ossia::net::osc_extended_policy>;
Expand Down Expand Up @@ -412,22 +445,25 @@ void oscquery_mirror_asio_protocol::start_http()

void oscquery_mirror_asio_protocol::start_websockets()
{
if(m_protocol_to_use == http)
return;

m_websocketClient = std::make_unique<ossia::net::websocket_client>(
m_ctx->context, [this](
connection_handler hdl, websocketpp::frame::opcode::value op,
std::string& msg) {
switch(op)
{
case websocketpp::frame::opcode::value::TEXT:
this->on_text_ws_message(hdl, msg);
break;
case websocketpp::frame::opcode::value::BINARY:
this->on_binary_ws_message(hdl, msg);
break;
default:
break;
}
});
const connection_handler& hdl,
websocketpp::frame::opcode::value op, std::string& msg) {
switch(op)
{
case websocketpp::frame::opcode::value::TEXT:
this->on_text_ws_message(hdl, msg);
break;
case websocketpp::frame::opcode::value::BINARY:
this->on_binary_ws_message(hdl, msg);
break;
default:
break;
}
});
m_id.identifier = (uintptr_t)m_websocketClient.get();

m_websocketClient->on_close
Expand All @@ -448,6 +484,11 @@ void oscquery_mirror_asio_protocol::start_websockets()
catch(...)
{
// Websocket does not connect, http requests will be used instead

m_websocketClient.reset();
// m_websocketClient->on_open.disconnect(this->on_connection_open);
// m_websocketClient->on_close.disconnect(this->on_connection_closed);
// m_websocketClient->on_fail.disconnect(this->on_connection_failure);
}
}

Expand Down Expand Up @@ -651,4 +692,53 @@ bool oscquery_mirror_asio_protocol::on_text_ws_message(
return true;
}

bool oscquery_mirror_asio_protocol::on_value_http_message(
const std::string& address, const std::string& message)
{
using json_parser = ossia::oscquery::json_parser;
using message_type = ossia::oscquery::message_type;
using host_info = ossia::oscquery::host_info;
try
{
std::shared_ptr<rapidjson::Document> data = json_parser::parse(message);
if(!data->IsObject())
{
if(m_logger.inbound_logger)
m_logger.inbound_logger->warn("Invalid HTTP reply received: {}", message);
return false;
}

auto node = ossia::net::find_node(m_device->get_root_node(), address);

const rapidjson::Value* obj_value = data.get();
if(auto it = obj_value->FindMember("VALUE"); it != obj_value->MemberEnd())
{
obj_value = &it->value;
}

if(node)
{
auto addr = node->get_parameter();
if(addr)
{
json_parser::parse_value(*addr, *obj_value);
m_device->on_message(*addr);
}
else
{
m_device->on_unhandled_message(address, oscquery::detail::ReadValue(*obj_value));
}
}
else
{
m_device->on_unhandled_message(address, oscquery::detail::ReadValue(*obj_value));
}

return true;
}
catch(...)
{
return false;
}
}
}
8 changes: 8 additions & 0 deletions src/ossia/protocols/oscquery/oscquery_mirror_asio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class OSSIA_EXPORT oscquery_mirror_asio_protocol final : public ossia::net::prot

private:
friend struct http_async_answer;
friend struct http_async_value_answer;
using connection_handler = std::weak_ptr<void>;
void on_ws_disconnected() { m_hasWS = false; }

Expand All @@ -135,6 +136,7 @@ class OSSIA_EXPORT oscquery_mirror_asio_protocol final : public ossia::net::prot
// Input
bool on_text_ws_message(connection_handler hdl, const std::string& message);
bool on_binary_ws_message(connection_handler hdl, const std::string& message);
bool on_value_http_message(const std::string& address, const std::string& message);
void on_osc_message(const oscpack::ReceivedMessage& m);
void process_raw_osc_data(const char* data, std::size_t sz);

Expand Down Expand Up @@ -178,6 +180,12 @@ class OSSIA_EXPORT oscquery_mirror_asio_protocol final : public ossia::net::prot
ossia::net::message_origin_identifier m_id;

bool m_zombie_on_remove{true};
enum
{
any_protocol,
http,
websockets
} m_protocol_to_use{any_protocol};
};
}
}

0 comments on commit 16d71e4

Please sign in to comment.