Skip to content

Commit

Permalink
🧵 Threadless network transports (#119)
Browse files Browse the repository at this point in the history
This makes the network transport layer very dumb, now the media/control connection classes have threads that monitor and drive the streams. This is hopefully simpler and is also a step towards a more event-loop driven model.
  • Loading branch information
danstiner authored Apr 20, 2021
1 parent fb3c586 commit 133c867
Show file tree
Hide file tree
Showing 25 changed files with 1,434 additions and 1,148 deletions.
1 change: 1 addition & 0 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ sources = files([
'src/Configuration.cpp',
'src/FtlClient.cpp',
'src/FtlControlConnection.cpp',
'src/FtlMediaConnection.cpp',
'src/FtlServer.cpp',
'src/FtlStream.cpp',
'src/JanusFtl.cpp',
Expand Down
19 changes: 12 additions & 7 deletions src/ConnectionCreators/UdpConnectionCreator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ std::unique_ptr<ConnectionTransport> UdpConnectionCreator::CreateConnection(
{
int error = errno;
throw std::runtime_error(fmt::format(
"Couldn't create UDP socket. Error {}: {}", error, Util::ErrnoToString(error)));
"Couldn't create UDP socket. Error {}: {}",
error, Util::ErrnoToString(error)));
}

int bindResult = bind(
Expand All @@ -44,7 +45,8 @@ std::unique_ptr<ConnectionTransport> UdpConnectionCreator::CreateConnection(
{
int error = errno;
throw std::runtime_error(fmt::format(
"Couldn't bind UDP socket. Error {}: {}", error, Util::ErrnoToString(error)));
"Couldn't bind UDP socket. Error {}: {}",
error, Util::ErrnoToString(error)));
}

sockaddr_in target
Expand All @@ -53,9 +55,12 @@ std::unique_ptr<ConnectionTransport> UdpConnectionCreator::CreateConnection(
.sin_port = htons(port),
.sin_addr = targetAddr,
};
return std::make_unique<NetworkSocketConnectionTransport>(
NetworkSocketConnectionKind::Udp,
socketHandle,
target);
auto result = NetworkSocketConnectionTransport::Nonblocking(
NetworkSocketConnectionKind::Udp, socketHandle, target);
if (result.IsError)
{
throw std::runtime_error(result.ErrorMessage);
}
return std::move(result.Value);
}
#pragma endregion ConnectionCreator implementation
#pragma endregion ConnectionCreator implementation
2 changes: 1 addition & 1 deletion src/ConnectionListeners/ConnectionListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ class ConnectionListener
* has been established
*/
virtual void SetOnNewConnection(
std::function<void(ConnectionTransport*)> onNewConnection) = 0;
std::function<void(std::unique_ptr<ConnectionTransport>)> onNewConnection) = 0;
};
13 changes: 10 additions & 3 deletions src/ConnectionListeners/TcpConnectionListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,18 @@ void TcpConnectionListener::Listen(std::promise<void>&& readyPromise)
if (onNewConnection)
{
// Create a ConnectionTransport for this new connection
auto transport = new NetworkSocketConnectionTransport(
auto result = NetworkSocketConnectionTransport::Nonblocking(
NetworkSocketConnectionKind::Tcp,
connectionHandle,
acceptAddress);
onNewConnection(transport);
if (result.IsError)
{
spdlog::error(
"Failed to create transport for accepted socket: {}",
result.ErrorMessage);
break;
}
onNewConnection(std::move(result.Value));
}
else
{
Expand All @@ -130,7 +137,7 @@ void TcpConnectionListener::StopListening()
}

void TcpConnectionListener::SetOnNewConnection(
std::function<void(ConnectionTransport*)> onNewConnection)
std::function<void(std::unique_ptr<ConnectionTransport>)> onNewConnection)
{
this->onNewConnection = onNewConnection;
}
Expand Down
4 changes: 2 additions & 2 deletions src/ConnectionListeners/TcpConnectionListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class TcpConnectionListener : public ConnectionListener
void Listen(std::promise<void>&& readyPromise = std::promise<void>()) override;
void StopListening() override;
void SetOnNewConnection(
std::function<void(ConnectionTransport*)> onNewConnection) override;
std::function<void(std::unique_ptr<ConnectionTransport>)> onNewConnection) override;

private:
const int listenPort;
const int socketQueueLimit;
int listenSocketHandle = 0;
std::function<void(ConnectionTransport*)> onNewConnection;
std::function<void(std::unique_ptr<ConnectionTransport>)> onNewConnection;
};
32 changes: 12 additions & 20 deletions src/ConnectionTransports/ConnectionTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@

#include "../Utilities/Result.h"

#include <chrono>
#include <functional>
#include <netinet/in.h>
#include <optional>
#include <span>
#include <vector>

/**
* @brief
* A generic network transport abstraction, allowing bytes to be read/written
* A generic thread-safe network transport abstraction, allowing bytes to be read/written
* to a connection via a common interface.
*
*/
Expand All @@ -37,32 +39,22 @@ class ConnectionTransport

/**
* @brief
* Starts the connection transport in a new thread.
* This function should block until the transport is ready to receive bytes.
* Shuts down the connection.
* This function should block until the underlying transport/socket has been closed.
*/
virtual Result<void> StartAsync() = 0;
virtual void Stop() = 0;

/**
* @brief
* Shuts down the connection.
* This function should block until the underlying transport/socket has been closed, unless
* noBlock has been set.
Read a set of bytes from the transport into the given buffer.
Will timeout if there is nothing to read and return zero bytes.
*/
virtual void Stop(bool noBlock = false) = 0;
virtual Result<ssize_t> Read(
std::vector<std::byte>& buffer,
std::chrono::milliseconds timeout) = 0;

/**
* @brief Write a set of bytes to the transport
*/
virtual void Write(const std::vector<std::byte>& bytes) = 0;

/**
* @brief Sets the callback that will fire when this connection has been closed.
*/
virtual void SetOnConnectionClosed(std::function<void(void)> onConnectionClosed) = 0;

/**
* @brief Sets the callback that will fire when this connection has received incoming data.
*/
virtual void SetOnBytesReceived(
std::function<void(const std::vector<std::byte>&)> onBytesReceived) = 0;
virtual Result<void> Write(const std::span<const std::byte>& bytes) = 0;
};
Loading

0 comments on commit 133c867

Please sign in to comment.