Skip to content

Commit

Permalink
Deprecate pending_msg_que in favor of libquic internal stream buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
dr7ana committed Jan 7, 2024
1 parent e331f0b commit a5b7a7e
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 162 deletions.
64 changes: 32 additions & 32 deletions llarp/exit/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,38 +246,38 @@ namespace llarp::exit
bool
BaseSession::FlushUpstream()
{
auto now = router->now();
auto path = PickEstablishedPath(llarp::path::ePathRoleExit);
if (path)
{
// for (auto& [i, queue] : m_Upstream)
// {
// while (queue.size())
// {
// auto& msg = queue.front();
// msg.sequence_number = path->NextSeqNo();
// path->SendRoutingMessage(msg, router);
// queue.pop_front();
// }
// }
}
else
{
// if (m_Upstream.size())
// llarp::LogWarn("no path for exit session");
// // discard upstream
// for (auto& [i, queue] : m_Upstream)
// queue.clear();
// m_Upstream.clear();

if (numHops == 1)
{
if (const auto maybe = router->node_db()->get_rc(exit_router); maybe.has_value())
router->connect_to(*maybe);
}
else if (UrgentBuild(now))
BuildOneAlignedTo(exit_router);
}
// auto now = router->now();
// auto path = PickEstablishedPath(llarp::path::ePathRoleExit);
// if (path)
// {
// for (auto& [i, queue] : m_Upstream)
// {
// while (queue.size())
// {
// auto& msg = queue.front();
// msg.sequence_number = path->NextSeqNo();
// path->SendRoutingMessage(msg, router);
// queue.pop_front();
// }
// }
// }
// else
// {
// if (m_Upstream.size())
// llarp::LogWarn("no path for exit session");
// // discard upstream
// for (auto& [i, queue] : m_Upstream)
// queue.clear();
// m_Upstream.clear();

// if (numHops == 1)
// {
// if (const auto maybe = router->node_db()->get_rc(exit_router); maybe.has_value())
// router->connect_to(*maybe);
// }
// else if (UrgentBuild(now))
// BuildOneAlignedTo(exit_router);
// }
return true;
}

Expand Down
147 changes: 50 additions & 97 deletions llarp/link/link_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,36 +352,6 @@ namespace llarp
if (auto it = ep.service_conns.find(rid); it != ep.service_conns.end())
{
log::critical(logcat, "Fetched configured outbound connection to relay RID:{}", rid);

auto& conn = it->second->conn;
auto& str = it->second->control_stream;

if (auto itr = pending_conn_msg_queue.find(rid); itr != pending_conn_msg_queue.end())
{
log::critical(logcat, "Clearing pending queue for RID:{}", rid);
auto& que = itr->second;

while (not que.empty())
{
auto& msg = que.front();

if (msg.is_control)
{
log::critical(
logcat, "Dispatching {} request (stream ID: {})!", *msg.endpoint, str->stream_id());
str->command(std::move(*msg.endpoint), std::move(msg.body), std::move(msg.func));
}
else
{
log::critical(logcat, "DIspatching data message: {}", msg.body);
conn->send_datagram(std::move(msg.body));
}

que.pop_front();
}
}

log::warning(logcat, "Pending queue empty for RID:{}", rid);
}
else
{
Expand Down Expand Up @@ -427,10 +397,6 @@ namespace llarp
[this, scid = ci.scid(), rid = RouterID{ci.remote_key()}, error_code = ec]() {
log::critical(quic_cat, "Purging quic connection CID:{} (ec:{})", scid, error_code);

// in case this didn't clear earlier, do it now
if (auto p_itr = pending_conn_msg_queue.find(rid); p_itr != pending_conn_msg_queue.end())
pending_conn_msg_queue.erase(p_itr);

if (auto s_itr = ep.service_conns.find(rid); s_itr != ep.service_conns.end())
{
log::critical(quic_cat, "Quic connection to relay RID:{} purged successfully", rid);
Expand All @@ -453,7 +419,12 @@ namespace llarp
std::string body,
std::function<void(oxen::quic::message m)> func)
{
assert(func); // makes no sense to send control message and ignore response
// DISCUSS: revisit if this assert makes sense. If so, there's no need to if (func) the
// next logic block
assert(func); // makes no sense to send control message and ignore response (maybe gossip?)

if (is_stopping)
return false;

if (func)
{
Expand All @@ -463,19 +434,6 @@ namespace llarp
};
}

return send_control_message_impl(remote, std::move(endpoint), std::move(body), std::move(func));
}

bool
LinkManager::send_control_message_impl(
const RouterID& remote,
std::string endpoint,
std::string body,
std::function<void(oxen::quic::message m)> func)
{
if (is_stopping)
return false;

if (auto conn = ep.get_conn(remote); conn)
{
conn->control_stream->command(std::move(endpoint), std::move(body), std::move(func));
Expand All @@ -487,21 +445,7 @@ namespace llarp
endpoint = std::move(endpoint),
body = std::move(body),
f = std::move(func)]() {
auto pending = PendingMessage(std::move(body), std::move(endpoint), std::move(f));

if (auto it = pending_conn_msg_queue.find(remote); it != pending_conn_msg_queue.end())
{
it->second.push_back(std::move(pending));
log::critical(
logcat, "Connection to RID:{} is pending; message appended to send queue!", remote);
}
else
{
log::critical(logcat, "Connection to RID:{} is pending; creating send queue!", remote);
auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));
connect_to(remote);
}
connect_and_send(remote, std::move(endpoint), std::move(body), std::move(f));
});

return false;
Expand All @@ -520,12 +464,7 @@ namespace llarp
}

_router.loop()->call([this, body = std::move(body), remote]() {
auto pending = PendingMessage(std::move(body));

auto [itr, b] = pending_conn_msg_queue.emplace(remote, MessageQueue());
itr->second.push_back(std::move(pending));

connect_to(remote);
connect_and_send(remote, std::nullopt, std::move(body));
});

return false;
Expand All @@ -550,12 +489,35 @@ namespace llarp
}

void
LinkManager::connect_to(const RouterID& rid, conn_open_hook hook)
LinkManager::connect_and_send(
const RouterID& router,
std::optional<std::string> endpoint,
std::string body,
std::function<void(oxen::quic::message m)> func)
{
if (auto rc = node_db->get_rc(rid))
connect_to(*rc, std::move(hook));
// by the time we have called this, we have already checked if we have a connection to this RID
// in ::send_control_message_impl, at which point we will dispatch on that stream
if (auto rc = node_db->get_rc(router))
{
const auto& remote_addr = rc->addr();

if (auto rv = ep.establish_and_send(
oxen::quic::RemoteAddress{router.ToView(), remote_addr},
*rc,
std::move(endpoint),
std::move(body),
std::move(func));
rv)
{
log::info(quic_cat, "Begun establishing connection to {}", remote_addr);
return;
}

log::warning(quic_cat, "Failed to begin establishing connection to {}", remote_addr);
}
else
log::warning(quic_cat, "Could not find RouterContact for connection to rid:{}", rid);
log::error(
quic_cat, "Error: Could not find RC for connection to rid:{}, message not sent!", router);
}

void
Expand All @@ -573,8 +535,6 @@ namespace llarp

const auto& remote_addr = rc.addr();

// TODO: confirm remote end is using the expected pubkey (RouterID).
// TODO: ALPN for "client" vs "relay" (could just be set on endpoint creation)
if (auto rv = ep.establish_connection(
oxen::quic::RemoteAddress{rid.ToView(), remote_addr},
rc,
Expand Down Expand Up @@ -767,35 +727,28 @@ namespace llarp
log::critical(link_cat, "Received known or old RC, not storing or forwarding.");
}

// TODO: can probably use ::send_control_message instead. Need to discuss the potential difference
// in calling Endpoint::get_service_conn vs Endpoint::get_conn
void
LinkManager::fetch_bootstrap_rcs(
const RemoteRC& source, std::string payload, std::function<void(oxen::quic::message m)> func)
{
_router.loop()->call([this, source, payload, f = std::move(func)]() mutable {
if (f)
{
f = [this, func = std::move(f)](oxen::quic::message m) mutable {
_router.loop()->call(
[f = std::move(func), msg = std::move(m)]() mutable { f(std::move(msg)); });
};
}

const auto& rid = source.router_id();

if (auto conn = ep.get_service_conn(rid); conn)
{
conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(f));
log::critical(logcat, "Dispatched bootstrap fetch request!");
return;
}
func = [this, f = std::move(func)](oxen::quic::message m) mutable {
_router.loop()->call(
[func = std::move(f), msg = std::move(m)]() mutable { func(std::move(msg)); });
};

log::critical(logcat, "Queuing bootstrap fetch request to {}", rid);
auto pending = PendingMessage(std::move(payload), "bfetch_rcs"s, std::move(f));
const auto& rid = source.router_id();

auto [itr, b] = pending_conn_msg_queue.emplace(rid, MessageQueue());
itr->second.push_back(std::move(pending));
if (auto conn = ep.get_service_conn(rid); conn)
{
conn->control_stream->command("bfetch_rcs"s, std::move(payload), std::move(func));
log::critical(logcat, "Dispatched bootstrap fetch request!");
return;
}

connect_to(source);
_router.loop()->call([this, source, payload, f = std::move(func), rid = rid]() mutable {
connect_and_send(rid, "bfetch_rcs"s, std::move(payload), std::move(f));
});
}

Expand Down
Loading

0 comments on commit a5b7a7e

Please sign in to comment.