Skip to content

Commit

Permalink
minor bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
OcelotEmpire committed Jan 21, 2024
1 parent 68982f3 commit 572ff6d
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 34 deletions.
20 changes: 16 additions & 4 deletions src/RoveComm/RoveCommEthernetTcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <unistd.h>
#include <vector>

void RoveCommEthernetTcp::Init()
bool RoveCommEthernetTcp::Init()
{
struct addrinfo hints, *result;
std::memset(&hints, 0, sizeof(hints)); // don't use {0} because it does not set padding bytes
Expand All @@ -38,7 +38,7 @@ void RoveCommEthernetTcp::Init()
if (int status = getaddrinfo(NULL, std::to_string(m_unPort).c_str(), &hints, &result) != 0)
{
LOG_ERROR(logging::g_qSharedLogger, "Failed to find IP! Error: {}", gai_strerror(status));
return;
return false;
}

addrinfo* p = result;
Expand Down Expand Up @@ -69,7 +69,7 @@ void RoveCommEthernetTcp::Init()
if (p == NULL)
{
LOG_ERROR(logging::g_qSharedLogger, "Failed to open TCP socket!");
return;
return false;
}

// not sure what nOptVal actually does?
Expand All @@ -91,7 +91,8 @@ void RoveCommEthernetTcp::Init()
FD_ZERO(&m_sAcceptSet);
FD_SET(m_nListeningSocket, &m_sAcceptSet);

LOG_INFO(logging::g_qSharedLogger, "Opened TCP socket on port {}", m_unPort);
LOG_INFO(logging::g_qSharedLogger, "Opened TCP server on port {}", m_unPort);
return true;
}

void RoveCommEthernetTcp::Shutdown()
Expand Down Expand Up @@ -286,6 +287,17 @@ bool RoveCommEthernetTcp::Connect(const RoveCommAddress& address)
return true;
}

void RoveCommEthernetTcp::Disconnect(const RoveCommAddress& address)
{
if (m_mOpenSockets.contains(address))
{
RoveCommSocket nSocket = m_mOpenSockets.at(address);
shutdown(nSocket, 1);
_unregister_socket(address);
LOG_INFO(logging::g_qSharedLogger, "Terminated connection with: {}.", address.ToString());
}
}

void RoveCommEthernetTcp::AcceptIncomingConnections()
{
sockaddr_in sIncomingAddress;
Expand Down
17 changes: 15 additions & 2 deletions src/RoveComm/RoveCommEthernetTcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class RoveCommEthernetTcp : public RoveCommServer
******************************************************************************/
RoveCommEthernetTcp(RoveCommPort unPort) : RoveCommServer(unPort){};

void Init() override;
bool Init() override;
void Shutdown() override;

int Write(const RoveCommPacket& packet) override;
Expand All @@ -62,6 +62,17 @@ class RoveCommEthernetTcp : public RoveCommServer
******************************************************************************/
bool Connect(const RoveCommAddress& address);

/******************************************************************************
* @brief Close a TCP connection with another device (acting as client)
*
* @param address - The address to disconnect from. If no prior connection exists,
* this function will do nothing.
*
* @author OcelotEmpire ([email protected])
* @date 2024-01-20
******************************************************************************/
void Disconnect(const RoveCommAddress& address);

/******************************************************************************
* @brief Check for other devices trying to connect to this device (acting as server)
* This will be private in a future iteration ;)
Expand All @@ -76,6 +87,8 @@ class RoveCommEthernetTcp : public RoveCommServer
void _unregister_socket(const RoveCommAddress& sAddress);

private:
void OnRoveCommUpdate() override { AcceptIncomingConnections(); }

// Socket for accepting connections from other devices
RoveCommSocket m_nListeningSocket;
// All open connections (outgoing and incoming)
Expand All @@ -96,4 +109,4 @@ class RoveCommEthernetTcp : public RoveCommServer
fd_set m_sAcceptSet;
};

#endif // ROVECOMM_ETHERNET_TCP_H
#endif // ROVECOMM_ETHERNET_TCP_H
22 changes: 14 additions & 8 deletions src/RoveComm/RoveCommEthernetUdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <sys/types.h>
#include <unistd.h>

void RoveCommEthernetUdp::Init()
bool RoveCommEthernetUdp::Init()
{
struct addrinfo hints, *result;
std::memset(&hints, 0, sizeof(hints)); // don't use {0} because it does not set padding bytes
Expand All @@ -38,7 +38,7 @@ void RoveCommEthernetUdp::Init()
if (int status = getaddrinfo(NULL, std::to_string(m_unPort).c_str(), &hints, &result) != 0)
{
LOG_ERROR(logging::g_qSharedLogger, "Failed to find IP! Error: {}", gai_strerror(status));
return;
return false;
}

addrinfo* p = result;
Expand All @@ -60,7 +60,7 @@ void RoveCommEthernetUdp::Init()
if (p == NULL)
{
LOG_ERROR(logging::g_qSharedLogger, "Failed to open UDP socket!");
return;
return false;
}

// not sure what nOptVal actually does?
Expand All @@ -80,7 +80,8 @@ void RoveCommEthernetUdp::Init()
FD_ZERO(&m_sReadSet);
FD_SET(m_nSocket, &m_sReadSet);

LOG_INFO(logging::g_qSharedLogger, "Opened UDP socket on port {}", m_unPort);
LOG_INFO(logging::g_qSharedLogger, "Opened UDP server on port {}", m_unPort);
return true;
}

void RoveCommEthernetUdp::Shutdown()
Expand Down Expand Up @@ -197,7 +198,12 @@ std::vector<RoveCommPacket> RoveCommEthernetUdp::Read()
return packets;
}

// void RoveCommEthernetUdp::Subscribe(const RoveCommAddress& address)
// {
// SendTo(RoveCommPacket(rovecomm::System::SUBSCRIBE_DATA_ID, rovecomm::DataTypes), address);
// }
void RoveCommEthernetUdp::Subscribe(const RoveCommAddress& address)
{
SendTo(RoveCommPacket(rovecomm::System::SUBSCRIBE_DATA_ID), address);
}

void RoveCommEthernetUdp::Unsubscribe(const RoveCommAddress& address)
{
SendTo(RoveCommPacket(rovecomm::System::SUBSCRIBE_DATA_ID), address);
}
7 changes: 3 additions & 4 deletions src/RoveComm/RoveCommEthernetUdp.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@ class RoveCommEthernetUdp : public RoveCommServer
******************************************************************************/
RoveCommEthernetUdp(RoveCommPort unPort) : RoveCommServer(unPort){};

void Init() override;
bool Init() override;
void Shutdown() override;

int Write(const RoveCommPacket& packet) override;
int SendTo(const RoveCommPacket& packet, RoveCommAddress address) override;
std::vector<RoveCommPacket> Read() override;

// TODO: subscribe/unsubscribe functions
// void Subscribe(const RoveCommAddress& address);
// void Unsubscribe(const RoveCommAddress& address);
void Subscribe(const RoveCommAddress& address);
void Unsubscribe(const RoveCommAddress& address);

private:
RoveCommSocket m_nSocket;
Expand Down
2 changes: 0 additions & 2 deletions src/RoveComm/RoveCommPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ size_t rovecomm::DataTypeSize(RoveCommDataType ucType)
}
}

// RoveCommPacket::RoveCommPacket(rovecomm::ManifestEntry sEntry, void* pData): RoveCommPacket({rovecomm::ROVECOMM_VERSION, sEntry

void RoveCommPacket::WriteHeader(char* pDest, const RoveCommPacketHeader& header)
{
pDest[0] = header.ucVersionId;
Expand Down
20 changes: 18 additions & 2 deletions src/RoveComm/RoveCommPacket.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <cstring>
#include <memory>
#include <ostream>
#include <stdarg.h>
#include <stdlib.h>
#include <string>

Expand Down Expand Up @@ -93,13 +94,28 @@ class RoveCommPacket
{
public:
RoveCommPacket() : RoveCommPacket(rovecomm::System::NO_DATA_DATA_ID, 0, rovecomm::DataTypes::INT8_T, nullptr){};

RoveCommPacket(RoveCommDataId usDataId, RoveCommDataCount usDataCount, RoveCommDataType ucDataType, std::unique_ptr<char>&& pData) :
RoveCommPacket({rovecomm::ROVECOMM_VERSION, usDataId, usDataCount, ucDataType}, std::move(pData)){};

RoveCommPacket(RoveCommPacketHeader sHeader, std::unique_ptr<char>&& pData) : m_sHeader(sHeader), m_pData(std::move(pData)) {}
RoveCommPacket(RoveCommPacketHeader sHeader, std::unique_ptr<char>&& pData) : m_sHeader(sHeader), m_pData(std::move(pData)){};

// convenience constructors:

RoveCommPacket(RoveCommDataId usDataId) : RoveCommPacket(usDataId, 0, rovecomm::DataTypes::UINT8_T, std::unique_ptr<char>{}){};

// RoveCommPacket(rovecomm::ManifestEntry sEntry, void* pData);
// example usage: RoveComm.SendTo(address, RoveCommPacket{rovecomm::AUTONOMY::REACHEDMARKER, 1})
// template<typename T>
// RoveCommPacket(rovecomm::ManifestEntry sEntry, T data...) :
// RoveCommPacket(sEntry.DATA_ID, sEntry.DATA_COUNT, sEntry.DATA_TYPE, std::make_unique<char>(sEntry.DATA_COUNT * rovecomm::DataTypeSize(sEntry.DATA_TYPE)))
// {
// va_list args;
// va_start(args, data);
// char* pRaw = m_pData.get();
// for (int i = 0; i<)
// };


inline RoveCommVersionId GetVersionId() const { return m_sHeader.ucVersionId; }

inline RoveCommDataId GetDataId() const { return m_sHeader.usDataId; }
Expand Down
38 changes: 30 additions & 8 deletions src/RoveComm/RoveCommServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
#include "RoveCommEthernetTcp.h"
#include "RoveCommEthernetUdp.h"

void RoveCommServerManager::Init()
bool RoveCommServerManager::Init()
{
LOG_INFO(logging::g_qSharedLogger, "Initializing RoveComm...");
OpenServerOnPort(rovecomm::General::ETHERNET_TCP_PORT, TCP);
OpenServerOnPort(rovecomm::General::ETHERNET_UDP_PORT, UDP);

m_bStopThread = false;
if (!(OpenServerOnPort(rovecomm::General::ETHERNET_TCP_PORT, TCP) && // init TCP
OpenServerOnPort(rovecomm::General::ETHERNET_UDP_PORT, UDP))) // init UDP
return false;

m_thNetworkThread = std::thread(
[&]()
{
Expand All @@ -45,34 +46,52 @@ void RoveCommServerManager::Shutdown()

void RoveCommServerManager::_loop_func()
{
reinterpret_cast<RoveCommEthernetTcp*>(m_mServers[TCP])->AcceptIncomingConnections();
{
const std::lock_guard lock(m_muQueueMutex);
for (auto& [protocol, server] : m_mServers)
{
server->OnRoveCommUpdate();
}
}
_read_all_to_queue();
}

void RoveCommServerManager::OpenServerOnPort(RoveCommPort port, RoveCommProtocol protocol)
bool RoveCommServerManager::OpenServerOnPort(RoveCommPort port, RoveCommProtocol protocol)
{
const std::lock_guard lock(m_muQueueMutex);
if (m_mServers.contains(protocol))
{
LOG_ERROR(logging::g_qSharedLogger, "Server already open with that protocol.");
return false;
}
switch (protocol)
{
case TCP:
{
(m_mServers[protocol] = new RoveCommEthernetTcp(port))->Init();
RoveCommEthernetTcp* server = new RoveCommEthernetTcp(port);
if (server->Init())
{
m_mServers[protocol] = server;
return true;
}
break;
}
case UDP:
{
(m_mServers[protocol] = new RoveCommEthernetUdp(port))->Init();
RoveCommEthernetUdp* server = new RoveCommEthernetUdp(port);
if (server->Init())
{
m_mServers[protocol] = server;
return true;
}
break;
}
default:
{
LOG_ERROR(logging::g_qSharedLogger, "Tried to open server with an unregistered protocol.");
}
}
return false;
}

int RoveCommServerManager::Write(RoveCommPacket& packet, RoveCommProtocol protocol)
Expand Down Expand Up @@ -105,6 +124,7 @@ int RoveCommServerManager::SendTo(RoveCommPacket& packet, RoveCommAddress addres

void RoveCommServerManager::_read_all_to_queue()
{
const std::lock_guard lock(m_muQueueMutex);
for (auto& [serverProtocol, server] : m_mServers)
{
for (auto& packet : server->Read())
Expand Down Expand Up @@ -139,12 +159,14 @@ std::optional<const RoveCommPacket> RoveCommServerManager::Next()

void RoveCommServerManager::SetCallback(RoveCommDataId unId, std::function<void(RoveCommPacket)> fCallback, bool bRemoveFromQueue)
{
const std::lock_guard lock(m_muQueueMutex);
const std::lock_guard lock(m_muQueueMutex);
m_mCallbacks[unId] = {fCallback, bRemoveFromQueue};
}

void RoveCommServerManager::ClearCallback(RoveCommDataId unId)
{
const std::lock_guard lock(m_muQueueMutex);
const std::lock_guard lock(m_muQueueMutex);
m_mCallbacks.erase(unId);
}
21 changes: 17 additions & 4 deletions src/RoveComm/RoveCommServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ class RoveCommServer
/******************************************************************************
* @brief Initialize listening socket and start thread
*
* @return bool - Whether the server initialized successfully
*
* @author OcelotEmpire ([email protected])
* @date 2023-11-29
******************************************************************************/
virtual void Init() = 0;
virtual bool Init() = 0;
/******************************************************************************
* @brief Close all open sockets and shut down thread
*
Expand Down Expand Up @@ -103,6 +105,17 @@ class RoveCommServer

inline RoveCommPort GetPort() const { return m_unPort; }

friend class RoveCommServerManager;

protected:
/******************************************************************************
* @brief Optional method that RoveCommServerManager calls before each Read()
*
* @author OcelotEmpire ([email protected])
* @date 2024-01-20
******************************************************************************/
virtual void OnRoveCommUpdate() {}

protected:
const RoveCommPort m_unPort;
};
Expand Down Expand Up @@ -133,8 +146,8 @@ class RoveCommServerManager
RoveCommServerManager() {}

public:
void Init();
void OpenServerOnPort(RoveCommPort port, RoveCommProtocol protocol = UDP);
bool Init();
bool OpenServerOnPort(RoveCommPort port, RoveCommProtocol protocol = UDP);
void Shutdown();
int Write(RoveCommPacket& packet, RoveCommProtocol protocol = UDP);
int SendTo(RoveCommPacket& packet, RoveCommAddress address, RoveCommProtocol protocol = UDP);
Expand Down Expand Up @@ -192,7 +205,7 @@ class RoveCommServerManager
std::map<RoveCommDataId, RoveCommCallback> m_mCallbacks;
std::deque<RoveCommPacket> m_dqPacketQueue;

bool m_bStopThread;
bool m_bStopThread = false;
std::thread m_thNetworkThread;
std::mutex m_muQueueMutex;
};
Expand Down

0 comments on commit 572ff6d

Please sign in to comment.