Skip to content

Commit

Permalink
removed annoying type aliases and removed Fetch() function
Browse files Browse the repository at this point in the history
  • Loading branch information
OcelotEmpire committed Feb 3, 2024
1 parent a330c8c commit 61fba82
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 93 deletions.
19 changes: 14 additions & 5 deletions src/RoveComm/RoveCommConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
* @copyright Copyright Mars Rover Design Team 2023 - All Rights Reserved
******************************************************************************/

#ifndef CONSTS_H
#define CONSTS_H
#ifndef ROVECOMM_CONSTS_H
#define ROVECOMM_CONSTS_H

namespace rovecomm
{
Expand All @@ -20,10 +20,19 @@ namespace rovecomm
const int ROVECOMM_VERSION = 3;
const int ROVECOMM_ETHERNET_TCP_MAX_CONNECTIONS = 5;

/******************************************************************************
* @brief RoveComm System Information
*
* @author Missouri S&T - Mars Rover Design Team
* @date 2023-10-21
******************************************************************************/
namespace System
{
// pass to RoveCommServer::Fetch() signifying any data id. This is not a valid data id.
const int ANY = 0;
// pass this value to functions listening for data id's to signal to listen for all data id's
// this variable is in the rovecomm::System namespace to be with other values like SUBSCRIBE_DATA_ID
// this variable is not in RoveCommManifest.h because this constant is not present in the manifests
// of other RoveComm implementations
const int ANY_DATA_ID = 0;
} // namespace System
} // namespace rovecomm
#endif // CONSTS_H
#endif // ROVECOMM_CONSTS_H
16 changes: 8 additions & 8 deletions src/RoveComm/RoveCommEthernetTcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ bool RoveCommEthernetTcp::Init()
int nStatus = getaddrinfo(NULL, std::to_string(m_unPort).c_str(), &hints, &result);
if (nStatus != 0)
{
LOG_ERROR(logging::g_qSharedLogger, "Failed to find IP! Error: {}", gai_strerror(status));
LOG_ERROR(logging::g_qSharedLogger, "Failed to find IP! Error: {}", gai_strerror(nStatus));
return false;
}

Expand Down Expand Up @@ -255,7 +255,7 @@ bool RoveCommEthernetTcp::Connect(const RoveCommAddress& address)
}
struct addrinfo hints, *result;
std::memset(&hints, 0, sizeof(hints));
RoveCommSocket nTcpSocketFd;
int nTcpSocketFd;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
int status = getaddrinfo(address.GetIp().ToString().c_str(), std::to_string(m_unPort).c_str(), &hints, &result);
Expand Down Expand Up @@ -293,7 +293,7 @@ void RoveCommEthernetTcp::Disconnect(const RoveCommAddress& address)
{
if (m_mOpenSockets.contains(address))
{
RoveCommSocket nSocket = m_mOpenSockets.at(address);
int nSocket = m_mOpenSockets.at(address);
shutdown(nSocket, 1);
_unregister_socket(address);
LOG_INFO(logging::g_qSharedLogger, "Terminated connection with: {}.", address.ToString());
Expand All @@ -314,7 +314,7 @@ void RoveCommEthernetTcp::AcceptIncomingConnections()
if (!FD_ISSET(m_nListeningSocket, &sAcceptSetCopy))
return;
// accept a connection request from another device, if one exists
RoveCommSocket nIncomingConnection = accept(m_nListeningSocket, (struct sockaddr*) &sIncomingAddress, &sAddressSize);
int nIncomingConnection = accept(m_nListeningSocket, (struct sockaddr*) &sIncomingAddress, &sAddressSize);
if (nIncomingConnection == -1)
{
LOG_ERROR(logging::g_qSharedLogger, "Failed to accept connection!");
Expand All @@ -330,8 +330,8 @@ void RoveCommEthernetTcp::AcceptIncomingConnections()
// The following code is IPv4-specific. If you are a future developer switching to IPv6, use sockaddr_storage instead of sockaddr_in

// Read back the address for storage in m_mIncomingSockets
RoveCommPort unIncomingPort = ntohs(sIncomingAddress.sin_port); // convert to host byte order
char* nReadIp = reinterpret_cast<char*>(&sIncomingAddress.sin_addr.s_addr); // network byte order (1.2.3.4)
uint16_t unIncomingPort = ntohs(sIncomingAddress.sin_port); // convert to host byte order
char* nReadIp = reinterpret_cast<char*>(&sIncomingAddress.sin_addr.s_addr); // network byte order (1.2.3.4)
RoveCommIp sIncomingIp{nReadIp[0], nReadIp[1], nReadIp[2], nReadIp[3]};
RoveCommAddress newRoveCommAddress(sIncomingIp, unIncomingPort);

Expand All @@ -345,7 +345,7 @@ void RoveCommEthernetTcp::AcceptIncomingConnections()
LOG_INFO(logging::g_qSharedLogger, "Successfully accepted connection from: {}.", newRoveCommAddress.ToString());
}

void RoveCommEthernetTcp::_register_socket(const RoveCommAddress& sAddress, RoveCommSocket nSocket, bool bIsIncoming)
void RoveCommEthernetTcp::_register_socket(const RoveCommAddress& sAddress, int nSocket, bool bIsIncoming)
{
m_mOpenSockets[sAddress] = nSocket;
if (bIsIncoming)
Expand All @@ -358,7 +358,7 @@ void RoveCommEthernetTcp::_register_socket(const RoveCommAddress& sAddress, Rove

void RoveCommEthernetTcp::_unregister_socket(const RoveCommAddress& sAddress)
{
RoveCommSocket nSocket = m_mOpenSockets.at(sAddress);
int nSocket = m_mOpenSockets.at(sAddress);
m_mOpenSockets.erase(sAddress);
m_mIncomingSockets.erase(sAddress); // if it exists
m_mReadBuffers.erase(nSocket);
Expand Down
12 changes: 6 additions & 6 deletions src/RoveComm/RoveCommEthernetTcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class RoveCommEthernetTcp : public RoveCommServer
* @author OcelotEmpire ([email protected])
* @date 2023-12-21
******************************************************************************/
RoveCommEthernetTcp(RoveCommPort unPort) : RoveCommServer(unPort){};
RoveCommEthernetTcp(uint16_t unPort) : RoveCommServer(unPort){};

bool Init() override;
void Shutdown() override;
Expand Down Expand Up @@ -83,20 +83,20 @@ class RoveCommEthernetTcp : public RoveCommServer
void AcceptIncomingConnections();

private:
void _register_socket(const RoveCommAddress& sAddress, RoveCommSocket nSocket, bool bIsIncoming);
void _register_socket(const RoveCommAddress& sAddress, int nSocket, bool bIsIncoming);
void _unregister_socket(const RoveCommAddress& sAddress);

private:
void OnRoveCommUpdate() override { AcceptIncomingConnections(); }

// Socket for accepting connections from other devices
RoveCommSocket m_nListeningSocket;
int m_nListeningSocket;
// All open connections (outgoing and incoming)
std::map<RoveCommAddress, RoveCommSocket> m_mOpenSockets;
std::map<RoveCommAddress, int> m_mOpenSockets;
// The sockets that Write() will send() to
std::map<RoveCommAddress, RoveCommSocket> m_mIncomingSockets;
std::map<RoveCommAddress, int> m_mIncomingSockets;
// Buffers to persist incomplete recv()'s
std::map<RoveCommSocket, std::vector<char>> m_mReadBuffers;
std::map<int, std::vector<char>> m_mReadBuffers;

// fd_set's contain all sockets for interfacing with the c library

Expand Down
2 changes: 1 addition & 1 deletion src/RoveComm/RoveCommEthernetUdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ int RoveCommEthernetUdp::SendTo(const RoveCommPacket& packet, RoveCommAddress ad
int nStatus = getaddrinfo(address.GetIp().ToString().c_str(), std::to_string(m_unPort).c_str(), &hints, &result);
if (nStatus != 0)
{
LOG_ERROR(logging::g_qSharedLogger, "Failed to find IP! Error: {}", gai_strerror(status));
LOG_ERROR(logging::g_qSharedLogger, "Failed to find IP! Error: {}", gai_strerror(nStatus));
return 0;
}
// I'll just be lazy so I won't do the for loop shenanigans and use the first value
Expand Down
4 changes: 2 additions & 2 deletions src/RoveComm/RoveCommEthernetUdp.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class RoveCommEthernetUdp : public RoveCommServer
* @author OcelotEmpire ([email protected])
* @date 2023-12-21
******************************************************************************/
RoveCommEthernetUdp(RoveCommPort unPort) : RoveCommServer(unPort){};
RoveCommEthernetUdp(uint16_t unPort) : RoveCommServer(unPort){};

bool Init() override;
void Shutdown() override;
Expand All @@ -53,7 +53,7 @@ class RoveCommEthernetUdp : public RoveCommServer
void Unsubscribe(const RoveCommAddress& address);

private:
RoveCommSocket m_nSocket;
int m_nSocket;
fd_set m_sReadSet;
// these aren't meant to be read outside the class, so I'm being lazy and using the native struct type
std::list<sockaddr_in> m_lSubscribers;
Expand Down
14 changes: 7 additions & 7 deletions src/RoveComm/RoveCommHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ inline RoveCommProtocol operator|(RoveCommProtocol protocol, RoveCommProtocol ot
return static_cast<RoveCommProtocol>(static_cast<unsigned int>(protocol) | static_cast<unsigned int>(other));
}

using RoveCommPort = unsigned short;
// using RoveCommPort = unsigned short;

struct RoveCommIp
{
Expand Down Expand Up @@ -72,9 +72,9 @@ inline bool operator!=(const RoveCommIp& ip, const RoveCommIp& other);
class RoveCommAddress
{
public:
RoveCommAddress(RoveCommIp sOctets, RoveCommPort unPort) : m_sOctets(sOctets), m_unPort(unPort) {}
RoveCommAddress(RoveCommIp sOctets, uint16_t unPort) : m_sOctets(sOctets), m_unPort(unPort) {}

RoveCommAddress(char cFirstOctet, char cSecondOctet, char cThirdOctet, char cFourthOctet, RoveCommPort unPort) :
RoveCommAddress(char cFirstOctet, char cSecondOctet, char cThirdOctet, char cFourthOctet, uint16_t unPort) :
RoveCommAddress({cFirstOctet, cSecondOctet, cThirdOctet, cFourthOctet}, unPort)
{}

Expand All @@ -84,7 +84,7 @@ class RoveCommAddress

inline RoveCommIp GetIp() const { return m_sOctets; }

inline RoveCommPort GetPort() const { return m_unPort; }
inline uint16_t GetPort() const { return m_unPort; }

std::string ToString() const;
friend inline std::ostream& operator<<(std::ostream& out, const RoveCommAddress& address);
Expand All @@ -96,14 +96,14 @@ class RoveCommAddress

private:
RoveCommIp m_sOctets;
RoveCommPort m_unPort;
uint16_t m_unPort;

public:
// pass to RoveCommServer::Fetch() signifying any address. This is not a valid address.
const static RoveCommAddress ANY;
const static RoveCommAddress ANY_ADDRESS;
};

using RoveCommSocket = int;
// using RoveCommSocket = int;

inline std::ostream& operator<<(std::ostream& out, const RoveCommAddress& address);
inline bool operator==(const RoveCommAddress& address, const RoveCommAddress& other);
Expand Down
2 changes: 1 addition & 1 deletion src/RoveComm/RoveCommPacket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <memory>
#include <netinet/in.h>

size_t rovecomm::DataTypeSize(RoveCommDataType ucType)
size_t rovecomm::DataTypeSize(uint8_t ucType)
{
switch (ucType)
{
Expand Down
31 changes: 15 additions & 16 deletions src/RoveComm/RoveCommPacket.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
#include <stdlib.h>
#include <string>

using RoveCommVersionId = uint8_t;
using RoveCommDataId = uint16_t;
using RoveCommDataCount = uint16_t;
using RoveCommDataType = uint8_t;
// using RoveCommVersionId = uint8_t;
// using RoveCommDataId = uint16_t;
// using RoveCommDataCount = uint16_t;
// using RoveCommDataType = uint8_t;

namespace rovecomm
{
Expand All @@ -37,7 +37,7 @@ namespace rovecomm
* @author OcelotEmpire ([email protected])
* @date 2024-01-15
******************************************************************************/
size_t DataTypeSize(RoveCommDataType ucDataType);
size_t DataTypeSize(uint8_t ucDataType);
} // namespace rovecomm

/*
Expand All @@ -63,10 +63,10 @@ namespace rovecomm
******************************************************************************/
struct RoveCommPacketHeader
{
RoveCommVersionId ucVersionId;
RoveCommDataId usDataId;
RoveCommDataCount usDataCount;
RoveCommDataType ucDataType;
uint8_t ucVersionId;
uint16_t usDataId;
uint16_t usDataCount;
uint8_t ucDataType;
};

/******************************************************************************
Expand Down Expand Up @@ -95,14 +95,14 @@ class RoveCommPacket
public:
RoveCommPacket() : RoveCommPacket(rovecomm::System::NO_DATA_DATA_ID, 0, rovecomm::DataTypes::INT8_T, nullptr){};

RoveCommPacket(RoveCommDataId usDataId, RoveCommDataCount usDataCount, RoveCommDataType ucDataType, std::unique_ptr<char>&& pData) :
RoveCommPacket(uint16_t usDataId, uint16_t usDataCount, uint8_t ucDataType, std::unique_ptr<char>&& pData) :
RoveCommPacket({rovecomm::ROVECOMM_VERSION, usDataId, usDataCount, ucDataType}, std::move(pData)){};

RoveCommPacket(RoveCommPacketHeader sHeader, std::unique_ptr<char>&& pData) : m_sHeader(sHeader), m_pData(std::move(pData)){};

// convenience constructors:

RoveCommPacket(RoveCommDataId usDataId) : RoveCommPacket(usDataId, 0, rovecomm::DataTypes::UINT8_T, std::unique_ptr<char>{}){};
RoveCommPacket(uint16_t usDataId) : RoveCommPacket(usDataId, 0, rovecomm::DataTypes::UINT8_T, std::unique_ptr<char>{}){};

// example usage: RoveComm.SendTo(address, RoveCommPacket{rovecomm::AUTONOMY::REACHEDMARKER, 1})
// template<typename T>
Expand All @@ -115,14 +115,13 @@ class RoveCommPacket
// for (int i = 0; i<)
// };


inline RoveCommVersionId GetVersionId() const { return m_sHeader.ucVersionId; }
inline uint8_t GetVersionId() const { return m_sHeader.ucVersionId; }

inline RoveCommDataId GetDataId() const { return m_sHeader.usDataId; }
inline uint16_t GetDataId() const { return m_sHeader.usDataId; }

inline RoveCommDataCount GetDataCount() const { return m_sHeader.usDataCount; }
inline uint16_t GetDataCount() const { return m_sHeader.usDataCount; }

inline RoveCommDataType GetDataType() const { return m_sHeader.ucDataType; }
inline uint8_t GetDataType() const { return m_sHeader.ucDataType; }

/******************************************************************************
* @brief Get the size of the packet's data array (not including the header)
Expand Down
45 changes: 31 additions & 14 deletions src/RoveComm/RoveCommServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bool RoveCommServerManager::Init()
OpenServerOnPort(rovecomm::General::ETHERNET_UDP_PORT, UDP))) // init UDP
return false;

m_thNetworkThread = std::thread(
m_thNetworkThread = std::thread( // init network thread
[&]()
{
while (!m_bStopThread)
Expand Down Expand Up @@ -56,7 +56,7 @@ void RoveCommServerManager::_loop_func()
_read_all_to_queue();
}

bool RoveCommServerManager::OpenServerOnPort(RoveCommPort port, RoveCommProtocol protocol)
bool RoveCommServerManager::OpenServerOnPort(uint16_t port, RoveCommProtocol protocol)
{
const std::lock_guard lock(m_muQueueMutex);
if (m_mServers.contains(protocol))
Expand Down Expand Up @@ -94,7 +94,7 @@ bool RoveCommServerManager::OpenServerOnPort(RoveCommPort port, RoveCommProtocol
return false;
}

int RoveCommServerManager::Write(RoveCommPacket& packet, RoveCommProtocol protocol)
int RoveCommServerManager::Write(const RoveCommPacket& packet, RoveCommProtocol protocol)
{
const std::lock_guard lock(m_muQueueMutex);
int nSent = 0;
Expand All @@ -108,7 +108,7 @@ int RoveCommServerManager::Write(RoveCommPacket& packet, RoveCommProtocol protoc
return nSent;
}

int RoveCommServerManager::SendTo(RoveCommPacket& packet, RoveCommAddress address, RoveCommProtocol protocol)
int RoveCommServerManager::SendTo(const RoveCommPacket& packet, RoveCommAddress address, RoveCommProtocol protocol)
{
const std::lock_guard lock(m_muQueueMutex);
int nSent = 0;
Expand All @@ -124,25 +124,25 @@ int RoveCommServerManager::SendTo(RoveCommPacket& packet, RoveCommAddress addres

void RoveCommServerManager::_read_all_to_queue()
{
const std::lock_guard lock(m_muQueueMutex);
// const std::lock_guard lock(m_muQueueMutex);
for (auto& [serverProtocol, server] : m_mServers)
{
for (auto& packet : server->Read())
for (auto& packet : server->Read()) // for each incoming packet in all servers
{
auto nDataId = packet.GetDataId();
if (m_mCallbacks.contains(nDataId))
if (m_mCallbacks.contains(nDataId)) // check if the dataId matches some callback and invoke it
{
auto callback = m_mCallbacks.at(nDataId);
callback.fInvoke(packet);
callback.fInvoke(packet); // TODO: use AutonomyThread to invoke callbacks asynchronously
if (callback.bRemoveFromQueue)
continue;
continue; // do not propogate packet to queue
}
m_dqPacketQueue.push_back(packet);
}
}
}

std::optional<const RoveCommPacket> RoveCommServerManager::Next()
std::optional<const RoveCommPacket> RoveCommServerManager::NextPacket()
{
const std::lock_guard lock(m_muQueueMutex);
if (m_dqPacketQueue.empty())
Expand All @@ -157,16 +157,33 @@ std::optional<const RoveCommPacket> RoveCommServerManager::Next()
}
}

void RoveCommServerManager::SetCallback(RoveCommDataId unId, std::function<void(RoveCommPacket)> fCallback, bool bRemoveFromQueue)
// std::shared_future<RoveCommPacket> RoveCommServerManager::Fetch(RoveCommDataId unId = rovecomm::System::ANY_DATA_ID,
// RoveCommAddress address = RoveCommAddress::ANY_ADDRESS,
// unsigned long long ulTimeout = 0)
// {
// Fetch([&](const RoveCommPacket& packet, const RoveCommAddress& address) -> bool { return true; }, ulTimeout);
// }

// std::shared_future<RoveCommPacket> RoveCommServerManager::Fetch(std::function<bool(const RoveCommPacket&, const RoveCommAddress&)> fFilter,
// unsigned long long ulTimeout = 0)
// {
// const std::lock_guard lock(m_muQueueMutex);
// RoveCommFetchRequest request{.fPredicate = fFilter,
// .pmPromise = std::promise<RoveCommPacket>{},
// .tmCreated = std::chrono::system_clock::now(),
// .tmTimeout = std::chrono::milliseconds(ulTimeout)};
// m_dqRequestQueue.push_back(request);
// return std::shared_future<RoveCommPacket>(request.pmPromise.get_future());
// }

void RoveCommServerManager::SetCallback(uint16_t unId, std::function<void(RoveCommPacket)> fCallback, bool bRemoveFromQueue)
{
const std::lock_guard lock(m_muQueueMutex);
const std::lock_guard lock(m_muQueueMutex);
m_mCallbacks[unId] = {fCallback, bRemoveFromQueue};
}

void RoveCommServerManager::ClearCallback(RoveCommDataId unId)
void RoveCommServerManager::ClearCallback(uint16_t unId)
{
const std::lock_guard lock(m_muQueueMutex);
const std::lock_guard lock(m_muQueueMutex);
m_mCallbacks.erase(unId);
}
Loading

0 comments on commit 61fba82

Please sign in to comment.