Skip to content

Commit

Permalink
Merge pull request #263 from spacetelescope/refactor/synchronization
Browse files Browse the repository at this point in the history
Refactor synchronization structure
  • Loading branch information
ehpor authored Jan 24, 2025
2 parents 6d78393 + 7989807 commit 67833ed
Show file tree
Hide file tree
Showing 11 changed files with 411 additions and 319 deletions.
1 change: 0 additions & 1 deletion catkit_core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ add_compile_options($<$<CXX_COMPILER_ID:MSVC>:/MP1>)
add_library(catkit_core STATIC
DataStream.cpp
SharedMemory.cpp
Synchronization.cpp
Timing.cpp
Log.cpp
LogConsole.cpp
Expand Down
47 changes: 26 additions & 21 deletions catkit_core/DataStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ void CopyString(char *dest, const char *src, size_t n)

DataStream::DataStream(const std::string &stream_id, std::shared_ptr<SharedMemory> shared_memory, bool create)
: m_SharedMemory(shared_memory),
m_Event(nullptr),
m_Header(nullptr), m_Buffer(nullptr),
m_NextFrameIdToRead(0),
m_BufferHandlingMode(BM_NEWEST_ONLY)
{
auto buffer = m_SharedMemory->GetAddress();
m_Header = (DataStreamHeader *) buffer;
m_Buffer = ((char *) buffer) + sizeof(DataStreamHeader);

m_Synchronization.Initialize(stream_id, &(m_Header->m_SynchronizationSharedData), create);
}

DataStream::~DataStream()
Expand Down Expand Up @@ -119,6 +118,8 @@ std::shared_ptr<DataStream> DataStream::Create(const std::string &stream_name, c

data_stream->UpdateParameters(type, dimensions, num_frames_in_buffer);

data_stream->m_Event = Event::Create(stream_id, &(header->m_EventSharedState));

return data_stream;
}

Expand All @@ -141,6 +142,8 @@ std::shared_ptr<DataStream> DataStream::Open(const std::string &stream_id)
// Don't read frames that already are available at the time the data stream is opened.
data_stream->m_NextFrameIdToRead = data_stream->m_Header->m_LastId;

data_stream->m_Event = Event::Open(stream_id, &(data_stream->m_Header->m_EventSharedState));

return data_stream;
}

Expand Down Expand Up @@ -173,22 +176,27 @@ void DataStream::SubmitFrame(size_t id)
DataFrameMetadata *meta = m_Header->m_FrameMetadata + (id % m_Header->m_NumFramesInBuffer);
meta->m_TimeStamp = GetTimeStamp();

// Obtain a lock as we are about to modify the condition of the
// synchronization.
auto lock = SynchronizationLock(&m_Synchronization);

// Make frame available:
// Use a do-while loop to ensure we are never decrementing the last id.
size_t last_id;
do
{
last_id = m_Header->m_LastId;

if (last_id >= id + 1)
break;
} while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1));
// Obtain a lock as we are about to modify the condition of the
// synchronization.
auto lock = EventLockGuard(m_Event);

// Make frame available:
// Use a do-while loop to ensure we are never decrementing the last id.
size_t last_id;
do
{
last_id = m_Header->m_LastId;

if (last_id >= id + 1)
break;
} while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1));

m_Event->Signal();
}

m_Synchronization.Signal();
auto ts = GetTimeStamp();
tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0);

// Don't update the framerate counter for the first frame.
if (id == 0)
Expand All @@ -207,9 +215,6 @@ void DataStream::SubmitFrame(size_t id)
m_Header->m_FrameRateCounter =
m_Header->m_FrameRateCounter * std::exp(-FRAMERATE_DECAY * time_delta)
+ FRAMERATE_DECAY;

auto ts = GetTimeStamp();
tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0);
}

void DataStream::SubmitData(const void *data)
Expand Down Expand Up @@ -332,8 +337,8 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che

// Wait until frame becomes available.
// Obtain a lock first.
auto lock = SynchronizationLock(&m_Synchronization);
m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check);
auto lock = EventLockGuard(m_Event);
m_Event->Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check);
}

size_t offset = (id % m_Header->m_NumFramesInBuffer) * m_Header->m_NumBytesPerFrame;
Expand Down
6 changes: 3 additions & 3 deletions catkit_core/DataStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <climits>

#include "SharedMemory.h"
#include "Synchronization.h"
#include "Event.h"
#include "Tensor.h"

const char * const CURRENT_DATASTREAM_VERSION = "0.2";
Expand Down Expand Up @@ -46,7 +46,7 @@ struct DataStreamHeader

double m_FrameRateCounter;

SynchronizationSharedData m_SynchronizationSharedData;
Event::SharedState m_EventSharedState;
};

class DataFrame : public Tensor
Expand Down Expand Up @@ -117,7 +117,7 @@ class DataStream
DataStreamHeader *m_Header;
char *m_Buffer;

Synchronization m_Synchronization;
std::shared_ptr<Event> m_Event;

size_t m_NextFrameIdToRead;
BufferHandlingMode m_BufferHandlingMode;
Expand Down
32 changes: 32 additions & 0 deletions catkit_core/Event.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef EVENT_H
#define EVENT_H

#include "EventBase.h"

template <typename Event>
class EventLockGuard
{
public:
inline EventLockGuard(Event &event)
: m_Event(event)
{
m_Event->Lock();
}

inline ~EventLockGuard()
{
m_Event->Unlock();
}

private:
Event &m_Event;
};

// Select which implementation to use based on the platform.
#ifdef _WIN32
using Event = EventImpl<EventImplementationType::ET_SEMAPHORE>;
#elif defined(__linux__) or defined(__APPLE__)
using Event = EventImpl<EventImplementationType::ET_CONDITION_VARIABLE>;
#endif

#endif // EVENT_H
128 changes: 128 additions & 0 deletions catkit_core/EventBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#ifndef EVENT_BASE_H
#define EVENT_BASE_H

#include <functional>
#include <stdexcept>

enum EventImplementationType
{
ET_CONDITION_VARIABLE,
ET_FUTEX,
ET_SEMAPHORE,
ET_SPIN_LOCK
};

template<EventImplementationType Type>
struct EventSharedState
{
};

template<EventImplementationType Type>
struct EventLocalState
{
};

template<EventImplementationType Type>
class EventImpl
{
public:
using SharedState = EventSharedState<Type>;
using LocalState = EventLocalState<Type>;

protected:
EventImpl();

public:
~EventImpl();

// Note: do not implement these functions for specific implementations.
static std::unique_ptr<EventImpl<Type>> Create(const std::string &id, SharedState *shared_state);
static std::unique_ptr<EventImpl<Type>> Open(const std::string &id, SharedState *shared_state);

// Note: implement the following functions for specific implementations.
inline void Wait(long timeout_in_ms, std::function<bool()> condition, void (*error_check)());
inline void Signal();

inline void Lock();
inline void Unlock();

protected:
inline void CreateImpl(const std::string &id, SharedState *shared_state);
inline void OpenImpl(const std::string &id, SharedState *shared_state);

bool m_IsOwner;

SharedState *m_SharedState;
LocalState m_LocalState;
};

template<enum EventImplementationType Type>
EventImpl<Type>::EventImpl()
: m_IsOwner(false), m_SharedState(nullptr)
{
}

template<enum EventImplementationType Type>
EventImpl<Type>::~EventImpl()
{
}

template<enum EventImplementationType Type>
void EventImpl<Type>::Wait(long timeout_in_ms, std::function<bool()> condition, void (*error_check)())
{
throw std::runtime_error("This type of event implementation wasn't implemented.");
}

template<enum EventImplementationType Type>
void EventImpl<Type>::Signal()
{
}

template<enum EventImplementationType Type>
void EventImpl<Type>::Lock()
{
}

template<enum EventImplementationType Type>
void EventImpl<Type>::Unlock()
{
}

template<enum EventImplementationType Type>
std::unique_ptr<EventImpl<Type>> EventImpl<Type>::Create(const std::string &id, EventImpl<Type>::SharedState *shared_state)
{
if (!shared_state)
throw std::runtime_error("The passed shared data was a nullptr.");

auto obj = std::unique_ptr<EventImpl<Type>>(new EventImpl<Type>());

obj->CreateImpl(id, shared_state);

obj->m_IsOwner = true;
obj->m_SharedState = shared_state;

return obj;
}

template<enum EventImplementationType Type>
std::unique_ptr<EventImpl<Type>> EventImpl<Type>::Open(const std::string &id, EventImpl<Type>::SharedState *shared_state)
{
if (!shared_state)
throw std::runtime_error("The passed shared data was a nullptr.");

auto obj = std::unique_ptr<EventImpl<Type>>(new EventImpl<Type>());

obj->OpenImpl(id, shared_state);

obj->m_IsOwner = false;
obj->m_SharedState = shared_state;

return obj;
}

#include "EventConditionVariable.inl"
#include "EventFutex.inl"
#include "EventSemaphore.inl"
#include "EventSpinLock.inl"

#endif // EVENT_BASE_H
99 changes: 99 additions & 0 deletions catkit_core/EventConditionVariable.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#include "EventBase.h"

#include "Timing.h"

#if defined(__linux__) || defined(__APPLE__)
#include <pthread.h>
#endif

using EventConditionVariable = EventImpl<EventImplementationType::ET_CONDITION_VARIABLE>;

#if defined(__linux__) || defined(__APPLE__)

template<>
struct EventSharedState<EventImplementationType::ET_CONDITION_VARIABLE>
{
pthread_mutex_t m_Mutex;
pthread_cond_t m_Condition;
};

template<>
inline void EventConditionVariable::Wait(long timeout_in_ms, std::function<bool()> condition, void (*error_check)())
{
Timer timer;

while (!condition())
{
// Wait for a maximum of 20ms to perform periodic error checking.
long timeout_wait = std::min(20L, timeout_in_ms);

#ifdef __APPLE__
// Relative timespec.
timespec timeout;
timeout.tv_sec = timeout_wait / 1000;
timeout.tv_nsec = 1000000 * (timeout_wait % 1000);

int res = pthread_cond_timedwait_relative_np(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout);
#else
// Absolute timespec.
timespec timeout;
clock_gettime(CLOCK_MONOTONIC, &timeout);
timeout.tv_sec += timeout_wait / 1000;
timeout.tv_nsec += 1000000 * (timeout_wait % 1000);

int res = pthread_cond_timedwait(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout);
#endif // __APPLE__
if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001))
{
throw std::runtime_error("Waiting time has expired.");
}

if (error_check != nullptr)
error_check();
}
}

template<>
inline void EventConditionVariable::Signal()
{
pthread_cond_broadcast(&(m_SharedState->m_Condition));
}

template<>
inline void EventConditionVariable::Lock()
{
pthread_mutex_lock(&(m_SharedState->m_Mutex));
}

template<>
inline void EventConditionVariable::Unlock()
{
pthread_mutex_unlock(&(m_SharedState->m_Mutex));
}

template<>
inline void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVariable::SharedState *shared_state)
{
pthread_mutexattr_t mutex_attr;
pthread_mutexattr_init(&mutex_attr);
pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&(shared_state->m_Mutex), &mutex_attr);
pthread_mutexattr_destroy(&mutex_attr);

pthread_condattr_t cond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
#ifndef __APPLE__
pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
#endif // __APPLE__
pthread_cond_init(&(shared_state->m_Condition), &cond_attr);
pthread_condattr_destroy(&cond_attr);
}

template<>
inline void EventConditionVariable::OpenImpl(const std::string &id, EventConditionVariable::SharedState *shared_state)
{
// Nothing to do.
}

#endif
Empty file added catkit_core/EventFutex.inl
Empty file.
Loading

0 comments on commit 67833ed

Please sign in to comment.