From 5c232e0cda56fd269ef35be694a9c231fccd0e36 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Sun, 3 Nov 2024 12:57:37 -0800 Subject: [PATCH 01/23] Use a reference instead of a pointer. --- catkit_core/Synchronization.cpp | 6 +++--- catkit_core/Synchronization.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp index 8da03b86e..54e6aa2b9 100644 --- a/catkit_core/Synchronization.cpp +++ b/catkit_core/Synchronization.cpp @@ -9,15 +9,15 @@ #include "Timing.h" -SynchronizationLock::SynchronizationLock(Synchronization *sync) +SynchronizationLock::SynchronizationLock(Synchronization &sync) : m_Sync(sync) { - m_Sync->Lock(); + m_Sync.Lock(); } SynchronizationLock::~SynchronizationLock() { - m_Sync->Unlock(); + m_Sync.Unlock(); } Synchronization::Synchronization() diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h index 45ec8dfa9..2aa1a8b6f 100644 --- a/catkit_core/Synchronization.h +++ b/catkit_core/Synchronization.h @@ -30,11 +30,11 @@ struct SynchronizationSharedData class SynchronizationLock { public: - SynchronizationLock(Synchronization *sync); + SynchronizationLock(Synchronization &sync); ~SynchronizationLock(); private: - Synchronization *m_Sync; + Synchronization &m_Sync; }; class Synchronization From 54a6265e0e8d0e2fe08f40b1c1f910b42d53d943 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 17:29:29 -0800 Subject: [PATCH 02/23] Better scope the synchronization lock. It should only be around the tme-critical part. --- catkit_core/DataStream.cpp | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index 09d20a822..64c0f1924 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -173,22 +173,27 @@ void DataStream::SubmitFrame(size_t id) DataFrameMetadata *meta = m_Header->m_FrameMetadata + (id % m_Header->m_NumFramesInBuffer); meta->m_TimeStamp = GetTimeStamp(); - // Obtain a lock as we are about to modify the condition of the - // synchronization. - auto lock = SynchronizationLock(&m_Synchronization); - - // Make frame available: - // Use a do-while loop to ensure we are never decrementing the last id. - size_t last_id; - do { - last_id = m_Header->m_LastId; - - if (last_id >= id + 1) - break; - } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + // Obtain a lock as we are about to modify the condition of the + // synchronization. + auto lock = SynchronizationLock(m_Synchronization); + + // Make frame available: + // Use a do-while loop to ensure we are never decrementing the last id. + size_t last_id; + do + { + last_id = m_Header->m_LastId; + + if (last_id >= id + 1) + break; + } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); + + m_Synchronization.Signal(); + } - m_Synchronization.Signal(); + auto ts = GetTimeStamp(); + tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); // Don't update the framerate counter for the first frame. if (id == 0) @@ -207,9 +212,6 @@ void DataStream::SubmitFrame(size_t id) m_Header->m_FrameRateCounter = m_Header->m_FrameRateCounter * std::exp(-FRAMERATE_DECAY * time_delta) + FRAMERATE_DECAY; - - auto ts = GetTimeStamp(); - tracing_proxy.TraceInterval("DataStream::SubmitFrame", GetStreamName(), ts, 0); } void DataStream::SubmitData(const void *data) From 2eec9a7d156b6da3bd7e8744edfb45e1d0e6ede9 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:30:32 -0800 Subject: [PATCH 03/23] Use CRTP instead of preprocessor directives for operating systems. This allows us to separate different types of synchronization within one operating system. --- catkit_core/Synchronization.cpp | 224 -------------------------------- catkit_core/Synchronization.h | 74 +++++------ catkit_core/Synchronization.inl | 121 +++++++++++++++++ 3 files changed, 152 insertions(+), 267 deletions(-) delete mode 100644 catkit_core/Synchronization.cpp create mode 100644 catkit_core/Synchronization.inl diff --git a/catkit_core/Synchronization.cpp b/catkit_core/Synchronization.cpp deleted file mode 100644 index 54e6aa2b9..000000000 --- a/catkit_core/Synchronization.cpp +++ /dev/null @@ -1,224 +0,0 @@ -#include "Synchronization.h" - -#include -#include - -#ifndef _WIN32 - #include -#endif - -#include "Timing.h" - -SynchronizationLock::SynchronizationLock(Synchronization &sync) - : m_Sync(sync) -{ - m_Sync.Lock(); -} - -SynchronizationLock::~SynchronizationLock() -{ - m_Sync.Unlock(); -} - -Synchronization::Synchronization() - : m_IsOwner(false), m_SharedData(nullptr) -{ -} - -Synchronization::~Synchronization() -{ - if (m_SharedData) - { -#ifdef _WIN32 - CloseHandle(m_Semaphore); -#else - -#endif - } -} - -void Synchronization::Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create) -{ - if (create) - { - Create(id, shared_data); - } - else - { - Open(id, shared_data); - } -} - -void Synchronization::Create(const std::string &id, SynchronizationSharedData *shared_data) -{ - if (m_SharedData) - throw std::runtime_error("Create called on an already initialized Synchronization object."); - - if (!shared_data) - throw std::runtime_error("The passed shared data was a nullptr."); - -#ifdef _WIN32 - m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); - - if (m_Semaphore == NULL) - throw std::runtime_error("Something went wrong while creating semaphore."); - - shared_data->m_NumReadersWaiting = 0; -#else - pthread_mutexattr_t mutex_attr; - pthread_mutexattr_init(&mutex_attr); - pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&(shared_data->m_Mutex), &mutex_attr); - pthread_mutexattr_destroy(&mutex_attr); - - pthread_condattr_t cond_attr; - pthread_condattr_init(&cond_attr); - pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); -#ifndef __APPLE__ - pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); -#endif // __APPLE__ - pthread_cond_init(&(shared_data->m_Condition), &cond_attr); - pthread_condattr_destroy(&cond_attr); -#endif // _WIN32 - - m_SharedData = shared_data; -} - -void Synchronization::Open(const std::string &id, SynchronizationSharedData *shared_data) -{ - if (m_SharedData) - throw std::runtime_error("Open called on an already initialized Synchronization object."); - - if (!shared_data) - throw std::runtime_error("The passed shared data was a nullptr."); - -#ifdef _WIN32 - m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); - - if (m_Semaphore == NULL) - throw std::runtime_error("Something went wrong while opening semaphore."); -#else -#endif - - m_SharedData = shared_data; -} - -void Synchronization::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) -{ - if (!m_SharedData) - throw std::runtime_error("Wait() was called before the synchronization was intialized."); - -#ifdef _WIN32 - Timer timer; - DWORD res = WAIT_OBJECT_0; - - while (!condition()) - { - if (res == WAIT_OBJECT_0) - { - // Increment the number of readers that are waiting, making sure the counter - // is at least 1 after the increment. This can occur when a previous reader got - // interrupted and the trigger happening before decrementing the - // m_NumReadersWaiting counter. - while (m_SharedData->m_NumReadersWaiting++ < 0) - { - } - } - - // Wait for a maximum of 20ms to perform periodic error checking. - auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); - - if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) - { - m_SharedData->m_NumReadersWaiting--; - throw std::runtime_error("Waiting time has expired."); - } - - if (res == WAIT_FAILED) - { - m_SharedData->m_NumReadersWaiting--; - throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); - } - - if (error_check != nullptr) - { - try - { - error_check(); - } - catch (...) - { - m_SharedData->m_NumReadersWaiting--; - throw; - } - } - } -#else - Timer timer; - - while (!condition()) - { - // Wait for a maximum of 20ms to perform periodic error checking. - long timeout_wait = std::min(20L, timeout_in_ms); - -#ifdef __APPLE__ - // Relative timespec. - timespec timeout; - timeout.tv_sec = timeout_wait / 1000; - timeout.tv_nsec = 1000000 * (timeout_wait % 1000); - - int res = pthread_cond_timedwait_relative_np(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); -#else - // Absolute timespec. - timespec timeout; - clock_gettime(CLOCK_MONOTONIC, &timeout); - timeout.tv_sec += timeout_wait / 1000; - timeout.tv_nsec += 1000000 * (timeout_wait % 1000); - - int res = pthread_cond_timedwait(&(m_SharedData->m_Condition), &(m_SharedData->m_Mutex), &timeout); -#endif // __APPLE__ - if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) - { - throw std::runtime_error("Waiting time has expired."); - } - - if (error_check != nullptr) - error_check(); - } -#endif // _WIN32 -} - -void Synchronization::Signal() -{ - if (!m_SharedData) - throw std::runtime_error("Signal() was called before the synchronization was intialized."); - -#ifdef _WIN32 - // Notify waiting processes. - long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); - - // If a reader times out in between us reading the number of readers that are waiting - // and us releasing the semaphore, we are releasing one too many readers. This - // results in a future reader being released immediately, which is not a problem, - // as there are checks in place for that. - - if (num_readers_waiting > 0) - ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); -#else - pthread_cond_broadcast(&(m_SharedData->m_Condition)); -#endif // _WIN32 -} - -void Synchronization::Lock() -{ -#ifndef _WIN32 - pthread_mutex_lock(&(m_SharedData->m_Mutex)); -#endif // _WIN32 -} - -void Synchronization::Unlock() -{ -#ifndef _WIN32 - pthread_mutex_unlock(&(m_SharedData->m_Mutex)); -#endif // _WIN32 -} diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h index 2aa1a8b6f..a00fce299 100644 --- a/catkit_core/Synchronization.h +++ b/catkit_core/Synchronization.h @@ -6,47 +6,24 @@ #include #include -#ifdef _WIN32 - #define WIN32_LEAN_AND_MEAN - #define NOMINMAX - #include -#else - #include - #include -#endif // _WIN32 - -class Synchronization; - -struct SynchronizationSharedData -{ -#ifdef _WIN32 - std::atomic_long m_NumReadersWaiting; -#else - pthread_cond_t m_Condition; - pthread_mutex_t m_Mutex; -#endif -}; - -class SynchronizationLock +template +class SynchronizationBase { public: - SynchronizationLock(Synchronization &sync); - ~SynchronizationLock(); + using SharedState = SharedStateType; -private: - Synchronization &m_Sync; -}; + SynchronizationBase(); + SynchronizationBase(const SynchronizationBase &other) = delete; + ~SynchronizationBase(); -class Synchronization -{ -public: - Synchronization(); - Synchronization(const Synchronization &other) = delete; - ~Synchronization(); + SynchronizationBase &operator=(const SynchronizationBase &other) = delete; + + void Initialize(const std::string &id, SharedState *shared_state, bool create); - Synchronization &operator=(const Synchronization &other) = delete; + void Create(const std::string &id, SharedState *shared_state); - void Initialize(const std::string &id, SynchronizationSharedData *shared_data, bool create); + void Open(const std::string &id, SharedState *shared_state); + void Close(); void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); @@ -54,17 +31,28 @@ class Synchronization void Lock(); void Unlock(); -private: - void Create(const std::string &id, SynchronizationSharedData *shared_data); - void Open(const std::string &id, SynchronizationSharedData *shared_data); +protected: + void CreateImpl(const std::string &id, SharedState *shared_state); + + void OpenImpl(const std::string &id, SharedState *shared_state); + void CloseImpl(); bool m_IsOwner; - SynchronizationSharedData *m_SharedData; - std::string m_Id; + bool m_IsOpen; + SharedState *m_SharedState; +}; + +template +class SynchronizationLock +{ +public: + SynchronizationLock(T &sync); + ~SynchronizationLock(); -#ifdef _WIN32 - HANDLE m_Semaphore; -#endif +private: + T &m_Sync; }; +#include "Synchronization.inl" + #endif // SYNCHRONIZATION_H diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl new file mode 100644 index 000000000..be6f89571 --- /dev/null +++ b/catkit_core/Synchronization.inl @@ -0,0 +1,121 @@ +#include "Synchronization.h" + +template +SynchronizationLock::SynchronizationLock(T &sync) + : m_Sync(sync) +{ + m_Sync.Lock(); +} + +template +SynchronizationLock::~SynchronizationLock() +{ + m_Sync.Unlock(); +} + +template +SynchronizationBase::SynchronizationBase() + : m_IsOwner(false), m_SharedState(nullptr), m_IsOpen(false) +{ +} + +template +SynchronizationBase::~SynchronizationBase() +{ + Close(); +} + +template +void SynchronizationBase::Initialize(const std::string &id, SharedState *shared_state, bool create) +{ + if (create) + { + Create(id, shared_state); + } + else + { + Open(id, shared_state); + } +} + +template +void SynchronizationBase::Create(const std::string &id, SharedState *shared_state) +{ + if (m_IsOpen) + throw std::runtime_error("Create called on an already initialized Synchronization object."); + + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + static_cast(this)->CreateImpl(id, shared_state); + + m_IsOpen = true; + m_IsOwner = true; + + m_SharedState = shared_state; +} + +template +void SynchronizationBase::Open(const std::string &id, SharedState *shared_state) +{ + if (m_IsOpen) + throw std::runtime_error("Open called on an already initialized Synchronization object."); + + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + + static_cast(this)->CreateImpl(id, shared_state); + + m_IsOpen = true; + m_IsOwner = false; + + m_SharedState = shared_state; +} + +template +void SynchronizationBase::Close() +{ + if (!m_IsOpen) + return; + + static_cast(this)->CloseImpl(); + + m_IsOpen = false; + m_SharedState = nullptr; +} + +template +void SynchronizationBase::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ +} + +template +void SynchronizationBase::Signal() +{ +} + +template +void SynchronizationBase::Lock() +{ +} + +template +void SynchronizationBase::Unlock() +{ +} + +template +void SynchronizationBase::CreateImpl(const std::string &id, SharedState *shared_state) +{ +} + +template +void SynchronizationBase::OpenImpl(const std::string &id, SharedState *shared_state) +{ +} + +template +void SynchronizationBase::CloseImpl() +{ +} From 399b46b87de4a362fe7f7d1a02e1c253b7c5a4dd Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:31:01 -0800 Subject: [PATCH 04/23] Add Windows semaphore synchronization. --- catkit_core/SynchronizationSemaphore.cpp | 95 ++++++++++++++++++++++++ catkit_core/SynchronizationSemaphore.h | 43 +++++++++++ 2 files changed, 138 insertions(+) create mode 100644 catkit_core/SynchronizationSemaphore.cpp create mode 100644 catkit_core/SynchronizationSemaphore.h diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp new file mode 100644 index 000000000..459821bae --- /dev/null +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -0,0 +1,95 @@ +#include "SynchronizationSemaphore.h" + +#ifdef _WIN32 +void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void *(error_check)()) +{ + Timer timer; + DWORD res = WAIT_OBJECT_0; + + while (!condition()) + { + if (res == WAIT_OBJECT_0) + { + // Increment the number of readers that are waiting, making sure the counter + // is at least 1 after the increment. This can occur when a previous reader got + // interrupted and the trigger happening before decrementing the + // m_NumReadersWaiting counter. + while (m_SharedData->m_NumReadersWaiting++ < 0) + { + } + } + + // Wait for a maximum of 20ms to perform periodic error checking. + auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); + + if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + m_SharedData->m_NumReadersWaiting--; + throw std::runtime_error("Waiting time has expired."); + } + + if (res == WAIT_FAILED) + { + m_SharedData->m_NumReadersWaiting--; + throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); + } + + if (error_check != nullptr) + { + try + { + error_check(); + } + catch (...) + { + m_SharedData->m_NumReadersWaiting--; + throw; + } + } + } +} + +void SynchronizationSemaphore::Signal() +{ + // Notify waiting processes. + long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); + + // If a reader times out in between us reading the number of readers that are waiting + // and us releasing the semaphore, we are releasing one too many readers. This + // results in a future reader being released immediately, which is not a problem, + // as there are checks in place for that. + + if (num_readers_waiting > 0) + ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); +} + +void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +{ + m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while creating semaphore."); + + shared_data->m_NumReadersWaiting = 0; +} + +void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +{ + m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); + + if (m_Semaphore == NULL) + throw std::runtime_error("Something went wrong while opening semaphore."); +} + +void SynchronizationSempahore::CloseImpl() +{ + CloseHandle(m_Semaphore); +} + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +#endif diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h new file mode 100644 index 000000000..c2ad265d2 --- /dev/null +++ b/catkit_core/SynchronizationSemaphore.h @@ -0,0 +1,43 @@ +#ifndef SYNCHRONIZATION_SEMAPHORE_H +#define SYNCHRONIZATION_SEMAPHORE_H + +#include "Synchronization.h" + +#include + +#ifdef _WIN32 + #define WIN32_LEAN_AND_MEAN + #define NOMINMAX + #include +#endif + +#ifdef _WIN32 +struct SharedDataWindowsSemaphore +{ + std::atomic_long m_NumReadersWaiting; +}; + +class SynchronizationWindowsSemaphore : public SynchronizationBase +{ +public: + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + void Create(const std::string &id, SharedState *shared_state); + void Open(const std::string &id, SharedState *shared_state); + + HANDLE m_Semaphore; +}; +#endif // _WIN32 + +#ifdef __linux__ +#endif // __linux__ + +#ifdef __APPLE__ +#endif // __APPLE__ + +# endif // SYNCHRONIZATION_SEMAPHORE_H From b663890c67deb556f43374ea55055fda4c2742e8 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:31:18 -0800 Subject: [PATCH 05/23] Add Posix condition variable synchronization. --- .../SynchronizationConditionVariable.cpp | 75 +++++++++++++++++++ .../SynchronizationConditionVariable.h | 32 ++++++++ 2 files changed, 107 insertions(+) create mode 100644 catkit_core/SynchronizationConditionVariable.cpp create mode 100644 catkit_core/SynchronizationConditionVariable.h diff --git a/catkit_core/SynchronizationConditionVariable.cpp b/catkit_core/SynchronizationConditionVariable.cpp new file mode 100644 index 000000000..8bc97eaff --- /dev/null +++ b/catkit_core/SynchronizationConditionVariable.cpp @@ -0,0 +1,75 @@ +#include "SynchronizationConditionVariable.h" + +#include "Timing.h" + +#if defined(__linux__) || defined(__APPLE__) + +void SynchronizationConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + Timer timer; + + while (!condition()) + { + // Wait for a maximum of 20ms to perform periodic error checking. + long timeout_wait = std::min(20L, timeout_in_ms); + +#ifdef __APPLE__ + // Relative timespec. + timespec timeout; + timeout.tv_sec = timeout_wait / 1000; + timeout.tv_nsec = 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait_relative_np(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#else + // Absolute timespec. + timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_sec += timeout_wait / 1000; + timeout.tv_nsec += 1000000 * (timeout_wait % 1000); + + int res = pthread_cond_timedwait(&(m_SharedState->m_Condition), &(m_SharedState->m_Mutex), &timeout); +#endif // __APPLE__ + if (res == ETIMEDOUT && timer.GetTime() > (timeout_in_ms * 0.001)) + { + throw std::runtime_error("Waiting time has expired."); + } + + if (error_check != nullptr) + error_check(); + } +} + +void SynchronizationConditionVariable::Signal() +{ + pthread_cond_broadcast(&(m_SharedState->m_Condition)); +} + +void SynchronizationConditionVariable::Lock() +{ + pthread_mutex_lock(&(m_SharedState->m_Mutex)); +} + +void SynchronizationConditionVariable::Unlock() +{ + pthread_mutex_unlock(&(m_SharedState->m_Mutex)); +} + +void SynchronizationConditionVariable::CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state) +{ + pthread_mutexattr_t mutex_attr; + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&(shared_state->m_Mutex), &mutex_attr); + pthread_mutexattr_destroy(&mutex_attr); + + pthread_condattr_t cond_attr; + pthread_condattr_init(&cond_attr); + pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); +#endif // __APPLE__ + pthread_cond_init(&(shared_state->m_Condition), &cond_attr); + pthread_condattr_destroy(&cond_attr); +} + +#endif diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h new file mode 100644 index 000000000..f2cabf908 --- /dev/null +++ b/catkit_core/SynchronizationConditionVariable.h @@ -0,0 +1,32 @@ +#ifndef SYNCHRONIZATION_CONDITION_VARIABLE_H +#define SYNCHRONIZATION_CONDITION_VARIABLE_H + +#include "Synchronization.h" + +#if defined(__linux__) || defined(__APPLE__) + +#include + +struct SharedStateConditionVariable +{ + pthread_cond_t m_Condition; + pthread_mutex_t m_Mutex; +}; + +class SynchronizationConditionVariable : public SynchronizationBase +{ + friend SynchronizationBase; +public: + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + void CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state); +}; + +#endif // Linux or Apple + +#endif // SYNCHRONIZATION_CONDITION_VARIABLE_H From 93d68e22b872806cd5446e0c3a44edc5693c3ac4 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:32:45 -0800 Subject: [PATCH 06/23] Set default Synchronization method for DataStreams. --- catkit_core/DataStream.cpp | 2 +- catkit_core/DataStream.h | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index 64c0f1924..b80754b6f 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -334,7 +334,7 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che // Wait until frame becomes available. // Obtain a lock first. - auto lock = SynchronizationLock(&m_Synchronization); + auto lock = SynchronizationLock(m_Synchronization); m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); } diff --git a/catkit_core/DataStream.h b/catkit_core/DataStream.h index 322a2ea2d..440f26000 100644 --- a/catkit_core/DataStream.h +++ b/catkit_core/DataStream.h @@ -8,8 +8,16 @@ #include "SharedMemory.h" #include "Synchronization.h" +#include "SynchronizationConditionVariable.h" +#include "SynchronizationSemaphore.h" #include "Tensor.h" +#ifdef _WIN32 +using Synchronization = SynchronizationSemaphore; +#elif defined(__linux__) or defined(__APPLE__) +using Synchronization = SynchronizationConditionVariable; +#endif + const char * const CURRENT_DATASTREAM_VERSION = "0.2"; const size_t MAX_NUM_FRAMES_IN_BUFFER = 20; const long INFINITE_WAIT_TIME = LONG_MAX; @@ -46,7 +54,7 @@ struct DataStreamHeader double m_FrameRateCounter; - SynchronizationSharedData m_SynchronizationSharedData; + Synchronization::SharedState m_SynchronizationSharedData; }; class DataFrame : public Tensor From 9852b133e6ba75492d4353ccfdf0b6caf5b693a8 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:33:19 -0800 Subject: [PATCH 07/23] Compile synchronization methods. --- catkit_core/CMakeLists.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index 6258d432e..4183440a6 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -14,7 +14,8 @@ add_compile_options($<$:/MP1>) add_library(catkit_core STATIC DataStream.cpp SharedMemory.cpp - Synchronization.cpp + SynchronizationConditionVariable.cpp + SynchronizationSemaphore.cpp Timing.cpp Log.cpp LogConsole.cpp From 0fbbf016cb31b4fb6686a34f9a33532939f66c29 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:48:33 -0800 Subject: [PATCH 08/23] Fix variable names. --- catkit_core/SynchronizationSemaphore.cpp | 10 +++++----- catkit_core/SynchronizationSemaphore.h | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 459821bae..68fdcab3d 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -14,7 +14,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co // is at least 1 after the increment. This can occur when a previous reader got // interrupted and the trigger happening before decrementing the // m_NumReadersWaiting counter. - while (m_SharedData->m_NumReadersWaiting++ < 0) + while (m_SharedState->m_NumReadersWaiting++ < 0) { } } @@ -24,13 +24,13 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw std::runtime_error("Waiting time has expired."); } if (res == WAIT_FAILED) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw std::runtime_error("An error occured during waiting for the semaphore: " + std::to_string(GetLastError())); } @@ -42,7 +42,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co } catch (...) { - m_SharedData->m_NumReadersWaiting--; + m_SharedState->m_NumReadersWaiting--; throw; } } @@ -52,7 +52,7 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co void SynchronizationSemaphore::Signal() { // Notify waiting processes. - long num_readers_waiting = m_SharedData->m_NumReadersWaiting.exchange(0); + long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); // If a reader times out in between us reading the number of readers that are waiting // and us releasing the semaphore, we are releasing one too many readers. This diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index c2ad265d2..0b6881dac 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -12,12 +12,12 @@ #endif #ifdef _WIN32 -struct SharedDataWindowsSemaphore +struct SharedStateSemaphore { std::atomic_long m_NumReadersWaiting; }; -class SynchronizationWindowsSemaphore : public SynchronizationBase +class SynchronizationSemaphore : public SynchronizationBase { public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); From 0663382351ef669b4de3e1e105582512988858f0 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Tue, 5 Nov 2024 23:50:51 -0800 Subject: [PATCH 09/23] Add missing includes. --- catkit_core/Synchronization.inl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl index be6f89571..1f842e62c 100644 --- a/catkit_core/Synchronization.inl +++ b/catkit_core/Synchronization.inl @@ -1,5 +1,8 @@ #include "Synchronization.h" +#include +#include + template SynchronizationLock::SynchronizationLock(T &sync) : m_Sync(sync) From bbc8822a05ea06c1621f7a4395bfa6df62443b13 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 00:00:03 -0800 Subject: [PATCH 10/23] Fix the function pointer type. --- catkit_core/SynchronizationSemaphore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 68fdcab3d..563d44af2 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -1,7 +1,7 @@ #include "SynchronizationSemaphore.h" #ifdef _WIN32 -void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void *(error_check)()) +void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; DWORD res = WAIT_OBJECT_0; From de41e7b5e40f5079b0aa7cb0a6990c4d80d6f1f2 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 10:41:50 -0800 Subject: [PATCH 11/23] Add missing include. --- catkit_core/SynchronizationSemaphore.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 563d44af2..63a986ffb 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -1,4 +1,5 @@ #include "SynchronizationSemaphore.h" +#include "Timing.h" #ifdef _WIN32 void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) From c1aabd8f43902ec190e7d23350ddd20b1433a972 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 6 Nov 2024 11:37:41 -0800 Subject: [PATCH 12/23] Fix function names. --- catkit_core/SynchronizationSemaphore.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index 0b6881dac..778007bad 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -27,8 +27,8 @@ class SynchronizationSemaphore : public SynchronizationBase Date: Wed, 6 Nov 2024 12:11:00 -0800 Subject: [PATCH 13/23] Add friend classes and fix class name typos. --- catkit_core/SynchronizationConditionVariable.h | 1 + catkit_core/SynchronizationSemaphore.cpp | 4 ++-- catkit_core/SynchronizationSemaphore.h | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h index f2cabf908..2027215c0 100644 --- a/catkit_core/SynchronizationConditionVariable.h +++ b/catkit_core/SynchronizationConditionVariable.h @@ -16,6 +16,7 @@ struct SharedStateConditionVariable class SynchronizationConditionVariable : public SynchronizationBase { friend SynchronizationBase; + public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/SynchronizationSemaphore.cpp index 63a986ffb..b381011ff 100644 --- a/catkit_core/SynchronizationSemaphore.cpp +++ b/catkit_core/SynchronizationSemaphore.cpp @@ -71,7 +71,7 @@ void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *sh if (m_Semaphore == NULL) throw std::runtime_error("Something went wrong while creating semaphore."); - shared_data->m_NumReadersWaiting = 0; + shared_state->m_NumReadersWaiting = 0; } void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) @@ -82,7 +82,7 @@ void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shar throw std::runtime_error("Something went wrong while opening semaphore."); } -void SynchronizationSempahore::CloseImpl() +void SynchronizationSemaphore::CloseImpl() { CloseHandle(m_Semaphore); } diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h index 778007bad..46cd2f923 100644 --- a/catkit_core/SynchronizationSemaphore.h +++ b/catkit_core/SynchronizationSemaphore.h @@ -19,16 +19,16 @@ struct SharedStateSemaphore class SynchronizationSemaphore : public SynchronizationBase { + friend SynchronizationBase; + public: void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); void Signal(); - void Lock(); - void Unlock(); - protected: void CreateImpl(const std::string &id, SharedState *shared_state); void OpenImpl(const std::string &id, SharedState *shared_state); + void CloseImpl(); HANDLE m_Semaphore; }; From 72eea33c96ef085326c47f3fc23eae6efb3c7bf7 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 22 Jan 2025 22:17:05 -0800 Subject: [PATCH 14/23] Refactor to use template specialization instead of CRTP. --- catkit_core/Event.h | 13 ++ catkit_core/EventBase.h | 75 +++++++++++ ...ariable.cpp => EventConditionVariable.inl} | 0 catkit_core/EventFutex.inl | 0 ...zationSemaphore.cpp => EventSemaphore.inl} | 0 catkit_core/EventSpinLock.inl | 0 catkit_core/Synchronization.h | 58 -------- catkit_core/Synchronization.inl | 124 ------------------ .../SynchronizationConditionVariable.h | 33 ----- catkit_core/SynchronizationSemaphore.h | 43 ------ 10 files changed, 88 insertions(+), 258 deletions(-) create mode 100644 catkit_core/Event.h create mode 100644 catkit_core/EventBase.h rename catkit_core/{SynchronizationConditionVariable.cpp => EventConditionVariable.inl} (100%) create mode 100644 catkit_core/EventFutex.inl rename catkit_core/{SynchronizationSemaphore.cpp => EventSemaphore.inl} (100%) create mode 100644 catkit_core/EventSpinLock.inl delete mode 100644 catkit_core/Synchronization.h delete mode 100644 catkit_core/Synchronization.inl delete mode 100644 catkit_core/SynchronizationConditionVariable.h delete mode 100644 catkit_core/SynchronizationSemaphore.h diff --git a/catkit_core/Event.h b/catkit_core/Event.h new file mode 100644 index 000000000..157f7712f --- /dev/null +++ b/catkit_core/Event.h @@ -0,0 +1,13 @@ +#ifndef EVENT_H +#define EVENT_H + +#include "EventBase.h" + +// Select which implementation to use based on the platform. +#ifdef _WIN32 +using Event = EventImpl; +#else +using Event = EventImpl; +#endif + +#endif // EVENT_H diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h new file mode 100644 index 000000000..5cd91f6d4 --- /dev/null +++ b/catkit_core/EventBase.h @@ -0,0 +1,75 @@ +#ifndef EVENT_BASE_H +#define EVENT_BASE_H + +#include + +enum EventImplementationType +{ + ET_CONDITION_VARIABLE, + ET_FUTEX, + ET_SEMAPHORE, + ET_SPIN_LOCK +}; + +template +struct EventSharedState +{ +}; + +template +struct EventLocalState +{ +}; + +template +class EventImpl +{ +public: + using SharedState = EventSharedState; + using LocalState = EventLocalState; + +protected: + EventImpl(); + +public: + static std::unique_ptr> Create(const std::string &id, SharedState *shared_state); + static std::unique_ptr> Open(const std::string &id, SharedState *shared_state); + + void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + void Signal(); + + void Lock(); + void Unlock(); + +protected: + SharedState *m_SharedState; + LocalState m_LocalState; +}; + +template +void EventImpl::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +{ + throw std::runtime_error("This type of event implementation wasn't implemented."); +} + +template +void EventImpl::Signal() +{ +} + +template +void EventImpl::Lock() +{ +} + +template +void EventImpl::Unlock() +{ +} + +#include "EventConditionVariable.inl" +#include "EventFutex.inl" +#include "EventSemaphore.inl" +#include "EventSpinLock.inl" + +#endif // EVENT_BASE_H diff --git a/catkit_core/SynchronizationConditionVariable.cpp b/catkit_core/EventConditionVariable.inl similarity index 100% rename from catkit_core/SynchronizationConditionVariable.cpp rename to catkit_core/EventConditionVariable.inl diff --git a/catkit_core/EventFutex.inl b/catkit_core/EventFutex.inl new file mode 100644 index 000000000..e69de29bb diff --git a/catkit_core/SynchronizationSemaphore.cpp b/catkit_core/EventSemaphore.inl similarity index 100% rename from catkit_core/SynchronizationSemaphore.cpp rename to catkit_core/EventSemaphore.inl diff --git a/catkit_core/EventSpinLock.inl b/catkit_core/EventSpinLock.inl new file mode 100644 index 000000000..e69de29bb diff --git a/catkit_core/Synchronization.h b/catkit_core/Synchronization.h deleted file mode 100644 index a00fce299..000000000 --- a/catkit_core/Synchronization.h +++ /dev/null @@ -1,58 +0,0 @@ -#ifndef SYNCHRONZATION_H -#define SYNCHRONZATION_H - -#include -#include -#include -#include - -template -class SynchronizationBase -{ -public: - using SharedState = SharedStateType; - - SynchronizationBase(); - SynchronizationBase(const SynchronizationBase &other) = delete; - ~SynchronizationBase(); - - SynchronizationBase &operator=(const SynchronizationBase &other) = delete; - - void Initialize(const std::string &id, SharedState *shared_state, bool create); - - void Create(const std::string &id, SharedState *shared_state); - - void Open(const std::string &id, SharedState *shared_state); - void Close(); - - void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); - void Signal(); - - void Lock(); - void Unlock(); - -protected: - void CreateImpl(const std::string &id, SharedState *shared_state); - - void OpenImpl(const std::string &id, SharedState *shared_state); - void CloseImpl(); - - bool m_IsOwner; - bool m_IsOpen; - SharedState *m_SharedState; -}; - -template -class SynchronizationLock -{ -public: - SynchronizationLock(T &sync); - ~SynchronizationLock(); - -private: - T &m_Sync; -}; - -#include "Synchronization.inl" - -#endif // SYNCHRONIZATION_H diff --git a/catkit_core/Synchronization.inl b/catkit_core/Synchronization.inl deleted file mode 100644 index 1f842e62c..000000000 --- a/catkit_core/Synchronization.inl +++ /dev/null @@ -1,124 +0,0 @@ -#include "Synchronization.h" - -#include -#include - -template -SynchronizationLock::SynchronizationLock(T &sync) - : m_Sync(sync) -{ - m_Sync.Lock(); -} - -template -SynchronizationLock::~SynchronizationLock() -{ - m_Sync.Unlock(); -} - -template -SynchronizationBase::SynchronizationBase() - : m_IsOwner(false), m_SharedState(nullptr), m_IsOpen(false) -{ -} - -template -SynchronizationBase::~SynchronizationBase() -{ - Close(); -} - -template -void SynchronizationBase::Initialize(const std::string &id, SharedState *shared_state, bool create) -{ - if (create) - { - Create(id, shared_state); - } - else - { - Open(id, shared_state); - } -} - -template -void SynchronizationBase::Create(const std::string &id, SharedState *shared_state) -{ - if (m_IsOpen) - throw std::runtime_error("Create called on an already initialized Synchronization object."); - - if (!shared_state) - throw std::runtime_error("The passed shared data was a nullptr."); - - static_cast(this)->CreateImpl(id, shared_state); - - m_IsOpen = true; - m_IsOwner = true; - - m_SharedState = shared_state; -} - -template -void SynchronizationBase::Open(const std::string &id, SharedState *shared_state) -{ - if (m_IsOpen) - throw std::runtime_error("Open called on an already initialized Synchronization object."); - - if (!shared_state) - throw std::runtime_error("The passed shared data was a nullptr."); - - - static_cast(this)->CreateImpl(id, shared_state); - - m_IsOpen = true; - m_IsOwner = false; - - m_SharedState = shared_state; -} - -template -void SynchronizationBase::Close() -{ - if (!m_IsOpen) - return; - - static_cast(this)->CloseImpl(); - - m_IsOpen = false; - m_SharedState = nullptr; -} - -template -void SynchronizationBase::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) -{ -} - -template -void SynchronizationBase::Signal() -{ -} - -template -void SynchronizationBase::Lock() -{ -} - -template -void SynchronizationBase::Unlock() -{ -} - -template -void SynchronizationBase::CreateImpl(const std::string &id, SharedState *shared_state) -{ -} - -template -void SynchronizationBase::OpenImpl(const std::string &id, SharedState *shared_state) -{ -} - -template -void SynchronizationBase::CloseImpl() -{ -} diff --git a/catkit_core/SynchronizationConditionVariable.h b/catkit_core/SynchronizationConditionVariable.h deleted file mode 100644 index 2027215c0..000000000 --- a/catkit_core/SynchronizationConditionVariable.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef SYNCHRONIZATION_CONDITION_VARIABLE_H -#define SYNCHRONIZATION_CONDITION_VARIABLE_H - -#include "Synchronization.h" - -#if defined(__linux__) || defined(__APPLE__) - -#include - -struct SharedStateConditionVariable -{ - pthread_cond_t m_Condition; - pthread_mutex_t m_Mutex; -}; - -class SynchronizationConditionVariable : public SynchronizationBase -{ - friend SynchronizationBase; - -public: - void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); - void Signal(); - - void Lock(); - void Unlock(); - -protected: - void CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state); -}; - -#endif // Linux or Apple - -#endif // SYNCHRONIZATION_CONDITION_VARIABLE_H diff --git a/catkit_core/SynchronizationSemaphore.h b/catkit_core/SynchronizationSemaphore.h deleted file mode 100644 index 46cd2f923..000000000 --- a/catkit_core/SynchronizationSemaphore.h +++ /dev/null @@ -1,43 +0,0 @@ -#ifndef SYNCHRONIZATION_SEMAPHORE_H -#define SYNCHRONIZATION_SEMAPHORE_H - -#include "Synchronization.h" - -#include - -#ifdef _WIN32 - #define WIN32_LEAN_AND_MEAN - #define NOMINMAX - #include -#endif - -#ifdef _WIN32 -struct SharedStateSemaphore -{ - std::atomic_long m_NumReadersWaiting; -}; - -class SynchronizationSemaphore : public SynchronizationBase -{ - friend SynchronizationBase; - -public: - void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); - void Signal(); - -protected: - void CreateImpl(const std::string &id, SharedState *shared_state); - void OpenImpl(const std::string &id, SharedState *shared_state); - void CloseImpl(); - - HANDLE m_Semaphore; -}; -#endif // _WIN32 - -#ifdef __linux__ -#endif // __linux__ - -#ifdef __APPLE__ -#endif // __APPLE__ - -# endif // SYNCHRONIZATION_SEMAPHORE_H From 95233424e3d8a1fc6168cf42b6316ea28dbe0149 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 22 Jan 2025 23:16:04 -0800 Subject: [PATCH 15/23] Refactor into Event implementation structure. --- catkit_core/EventBase.h | 46 ++++++++++++++++++++++++++ catkit_core/EventConditionVariable.inl | 26 +++++++++++---- catkit_core/EventSemaphore.inl | 25 ++++++++++---- 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h index 5cd91f6d4..92529986a 100644 --- a/catkit_core/EventBase.h +++ b/catkit_core/EventBase.h @@ -32,6 +32,8 @@ class EventImpl EventImpl(); public: + ~EventImpl(); + static std::unique_ptr> Create(const std::string &id, SharedState *shared_state); static std::unique_ptr> Open(const std::string &id, SharedState *shared_state); @@ -42,10 +44,26 @@ class EventImpl void Unlock(); protected: + void CreateImpl(const std::string &id, SharedState *shared_state); + void OpenImpl(const std::string &id, SharedState *shared_state); + + bool m_IsOwner; + SharedState *m_SharedState; LocalState m_LocalState; }; +template +EventImpl::EventImpl() + : m_IsOwner(false), m_SharedState(nullptr) +{ +} + +template +EventImpl::~EventImpl() +{ +} + template void EventImpl::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { @@ -67,6 +85,34 @@ void EventImpl::Unlock() { } +template +void EventImpl::CreateImpl(const std::string &id, EventImpl::SharedState *shared_state) +{ + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + auto obj = std::make_shared>(); + + obj->CreateImpl(id, shared_state); + + obj->m_IsOwner = true; + + obj->m_SharedState = shared_state; +} + +template +void EventImpl::OpenImpl(const std::string &id, EventImpl::SharedState *shared_state) +{ + if (!shared_state) + throw std::runtime_error("The passed shared data was a nullptr."); + + OpenImpl(id, shared_state); + + m_IsOwner = false; + + m_SharedState = shared_state; +} + #include "EventConditionVariable.inl" #include "EventFutex.inl" #include "EventSemaphore.inl" diff --git a/catkit_core/EventConditionVariable.inl b/catkit_core/EventConditionVariable.inl index 8bc97eaff..3be912780 100644 --- a/catkit_core/EventConditionVariable.inl +++ b/catkit_core/EventConditionVariable.inl @@ -1,10 +1,20 @@ -#include "SynchronizationConditionVariable.h" +#include "EventBase.h" #include "Timing.h" +using EventConditionVariable = EventImpl; + #if defined(__linux__) || defined(__APPLE__) -void SynchronizationConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +template<> +struct EventSharedState +{ + pthread_mutex_t m_Mutex; + pthread_cond_t m_Condition; +}; + +template<> +void EventConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; @@ -39,22 +49,26 @@ void SynchronizationConditionVariable::Wait(long timeout_in_ms, std::function +void EventConditionVariable::Signal() { pthread_cond_broadcast(&(m_SharedState->m_Condition)); } -void SynchronizationConditionVariable::Lock() +template<> +void EventConditionVariable::Lock() { pthread_mutex_lock(&(m_SharedState->m_Mutex)); } -void SynchronizationConditionVariable::Unlock() +template<> +void EventConditionVariable::Unlock() { pthread_mutex_unlock(&(m_SharedState->m_Mutex)); } -void SynchronizationConditionVariable::CreateImpl(const std::string &id, SynchronizationConditionVariable::SharedState *shared_state) +template<> +void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) { pthread_mutexattr_t mutex_attr; pthread_mutexattr_init(&mutex_attr); diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl index b381011ff..dc079e89e 100644 --- a/catkit_core/EventSemaphore.inl +++ b/catkit_core/EventSemaphore.inl @@ -1,8 +1,18 @@ -#include "SynchronizationSemaphore.h" +#include "EventBase.h" + #include "Timing.h" +using EventSemaphore = EventImpl; + #ifdef _WIN32 -void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +template<> +struct EventSharedState +{ + std::atomic_long m_NumReadersWaiting; +}; + +template<> +void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; DWORD res = WAIT_OBJECT_0; @@ -50,7 +60,8 @@ void SynchronizationSemaphore::Wait(long timeout_in_ms, std::function co } } -void SynchronizationSemaphore::Signal() +template<> +void EventSemaphore::Signal() { // Notify waiting processes. long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); @@ -64,7 +75,8 @@ void SynchronizationSemaphore::Signal() ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); } -void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +template<> +void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) { m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); @@ -74,7 +86,8 @@ void SynchronizationSemaphore::CreateImpl(const std::string &id, SharedState *sh shared_state->m_NumReadersWaiting = 0; } -void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +template<> +void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) { m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); @@ -82,7 +95,7 @@ void SynchronizationSemaphore::OpenImpl(const std::string &id, SharedState *shar throw std::runtime_error("Something went wrong while opening semaphore."); } -void SynchronizationSemaphore::CloseImpl() +void EventSemaphore::~EventImpl() { CloseHandle(m_Semaphore); } From f6f51577fb94339dd46edbf9b34ac6adb4997513 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Wed, 22 Jan 2025 23:16:20 -0800 Subject: [PATCH 16/23] Remove non-existent files. --- catkit_core/CMakeLists.txt | 2 -- 1 file changed, 2 deletions(-) diff --git a/catkit_core/CMakeLists.txt b/catkit_core/CMakeLists.txt index 4183440a6..b18ae6a99 100644 --- a/catkit_core/CMakeLists.txt +++ b/catkit_core/CMakeLists.txt @@ -14,8 +14,6 @@ add_compile_options($<$:/MP1>) add_library(catkit_core STATIC DataStream.cpp SharedMemory.cpp - SynchronizationConditionVariable.cpp - SynchronizationSemaphore.cpp Timing.cpp Log.cpp LogConsole.cpp From e47ed9ec3e98deae4bbf9581044e208aac42cf9e Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:33:48 -0800 Subject: [PATCH 17/23] Fix base implementation of Create() and Open(). --- catkit_core/EventBase.h | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h index 92529986a..3b6ceef5b 100644 --- a/catkit_core/EventBase.h +++ b/catkit_core/EventBase.h @@ -86,31 +86,35 @@ void EventImpl::Unlock() } template -void EventImpl::CreateImpl(const std::string &id, EventImpl::SharedState *shared_state) +std::unique_ptr> EventImpl::Create(const std::string &id, EventImpl::SharedState *shared_state) { if (!shared_state) throw std::runtime_error("The passed shared data was a nullptr."); - auto obj = std::make_shared>(); + auto obj = std::unique_ptr>(new EventImpl()); obj->CreateImpl(id, shared_state); obj->m_IsOwner = true; - obj->m_SharedState = shared_state; + + return obj; } template -void EventImpl::OpenImpl(const std::string &id, EventImpl::SharedState *shared_state) +std::unique_ptr> EventImpl::Open(const std::string &id, EventImpl::SharedState *shared_state) { if (!shared_state) throw std::runtime_error("The passed shared data was a nullptr."); - OpenImpl(id, shared_state); + auto obj = std::unique_ptr>(new EventImpl()); - m_IsOwner = false; + obj->OpenImpl(id, shared_state); + + obj->m_IsOwner = false; + obj->m_SharedState = shared_state; - m_SharedState = shared_state; + return obj; } #include "EventConditionVariable.inl" From 6a408ddf3333fd17a32fd37bfc81cd0bff785a74 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:34:09 -0800 Subject: [PATCH 18/23] Implement a lock guard for events. --- catkit_core/Event.h | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/catkit_core/Event.h b/catkit_core/Event.h index 157f7712f..faa099745 100644 --- a/catkit_core/Event.h +++ b/catkit_core/Event.h @@ -3,10 +3,29 @@ #include "EventBase.h" +template +class EventLockGuard +{ +public: + inline EventLockGuard(Event &event) + : m_Event(event) + { + m_Event->Lock(); + } + + inline ~EventLockGuard() + { + m_Event->Unlock(); + } + +private: + Event &m_Event; +}; + // Select which implementation to use based on the platform. #ifdef _WIN32 using Event = EventImpl; -#else +#elif defined(__linux__) or defined(__APPLE__) using Event = EventImpl; #endif From a8580c10aba68ba0f73e4f1fd92b69322e2dd34b Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:34:26 -0800 Subject: [PATCH 19/23] Make empty template specializations inline. --- catkit_core/EventBase.h | 14 ++++++++------ catkit_core/EventConditionVariable.inl | 16 +++++++++++----- catkit_core/EventSemaphore.inl | 11 ++++++----- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h index 3b6ceef5b..263dc4c55 100644 --- a/catkit_core/EventBase.h +++ b/catkit_core/EventBase.h @@ -34,18 +34,20 @@ class EventImpl public: ~EventImpl(); + // Note: do not implement these functions for specific implementations. static std::unique_ptr> Create(const std::string &id, SharedState *shared_state); static std::unique_ptr> Open(const std::string &id, SharedState *shared_state); - void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); - void Signal(); + // Note: implement the following functions for specific implementations. + inline void Wait(long timeout_in_ms, std::function condition, void (*error_check)()); + inline void Signal(); - void Lock(); - void Unlock(); + inline void Lock(); + inline void Unlock(); protected: - void CreateImpl(const std::string &id, SharedState *shared_state); - void OpenImpl(const std::string &id, SharedState *shared_state); + inline void CreateImpl(const std::string &id, SharedState *shared_state); + inline void OpenImpl(const std::string &id, SharedState *shared_state); bool m_IsOwner; diff --git a/catkit_core/EventConditionVariable.inl b/catkit_core/EventConditionVariable.inl index 3be912780..da7ce818a 100644 --- a/catkit_core/EventConditionVariable.inl +++ b/catkit_core/EventConditionVariable.inl @@ -14,7 +14,7 @@ struct EventSharedState }; template<> -void EventConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +inline void EventConditionVariable::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; @@ -50,25 +50,25 @@ void EventConditionVariable::Wait(long timeout_in_ms, std::function cond } template<> -void EventConditionVariable::Signal() +inline void EventConditionVariable::Signal() { pthread_cond_broadcast(&(m_SharedState->m_Condition)); } template<> -void EventConditionVariable::Lock() +inline void EventConditionVariable::Lock() { pthread_mutex_lock(&(m_SharedState->m_Mutex)); } template<> -void EventConditionVariable::Unlock() +inline void EventConditionVariable::Unlock() { pthread_mutex_unlock(&(m_SharedState->m_Mutex)); } template<> -void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) +inline void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) { pthread_mutexattr_t mutex_attr; pthread_mutexattr_init(&mutex_attr); @@ -86,4 +86,10 @@ void EventConditionVariable::CreateImpl(const std::string &id, EventConditionVar pthread_condattr_destroy(&cond_attr); } +template<> +inline void EventConditionVariable::OpenImpl(const std::string &id, EventConditionVariable::SharedState *shared_state) +{ + // Nothing to do. +} + #endif diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl index dc079e89e..9868d2b6f 100644 --- a/catkit_core/EventSemaphore.inl +++ b/catkit_core/EventSemaphore.inl @@ -12,7 +12,7 @@ struct EventSharedState }; template<> -void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) +inline void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { Timer timer; DWORD res = WAIT_OBJECT_0; @@ -61,7 +61,7 @@ void EventSemaphore::Wait(long timeout_in_ms, std::function condition, v } template<> -void EventSemaphore::Signal() +inline void EventSemaphore::Signal() { // Notify waiting processes. long num_readers_waiting = m_SharedState->m_NumReadersWaiting.exchange(0); @@ -76,7 +76,7 @@ void EventSemaphore::Signal() } template<> -void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) +inline void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) { m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); @@ -87,7 +87,7 @@ void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state } template<> -void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) +inline void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) { m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); @@ -95,7 +95,8 @@ void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) throw std::runtime_error("Something went wrong while opening semaphore."); } -void EventSemaphore::~EventImpl() +template<> +inline void EventSemaphore::~EventImpl() { CloseHandle(m_Semaphore); } From 68102ab963685628665c2f2be45a99794299204c Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:34:48 -0800 Subject: [PATCH 20/23] Use Events rather than Synchronization classes. --- catkit_core/DataStream.cpp | 15 +++++++++------ catkit_core/DataStream.h | 14 +++----------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/catkit_core/DataStream.cpp b/catkit_core/DataStream.cpp index b80754b6f..9bac19bbc 100644 --- a/catkit_core/DataStream.cpp +++ b/catkit_core/DataStream.cpp @@ -68,6 +68,7 @@ void CopyString(char *dest, const char *src, size_t n) DataStream::DataStream(const std::string &stream_id, std::shared_ptr shared_memory, bool create) : m_SharedMemory(shared_memory), + m_Event(nullptr), m_Header(nullptr), m_Buffer(nullptr), m_NextFrameIdToRead(0), m_BufferHandlingMode(BM_NEWEST_ONLY) @@ -75,8 +76,6 @@ DataStream::DataStream(const std::string &stream_id, std::shared_ptrGetAddress(); m_Header = (DataStreamHeader *) buffer; m_Buffer = ((char *) buffer) + sizeof(DataStreamHeader); - - m_Synchronization.Initialize(stream_id, &(m_Header->m_SynchronizationSharedData), create); } DataStream::~DataStream() @@ -119,6 +118,8 @@ std::shared_ptr DataStream::Create(const std::string &stream_name, c data_stream->UpdateParameters(type, dimensions, num_frames_in_buffer); + data_stream->m_Event = Event::Create(stream_id, &(header->m_EventSharedState)); + return data_stream; } @@ -141,6 +142,8 @@ std::shared_ptr DataStream::Open(const std::string &stream_id) // Don't read frames that already are available at the time the data stream is opened. data_stream->m_NextFrameIdToRead = data_stream->m_Header->m_LastId; + data_stream->m_Event = Event::Open(stream_id, &(data_stream->m_Header->m_EventSharedState)); + return data_stream; } @@ -176,7 +179,7 @@ void DataStream::SubmitFrame(size_t id) { // Obtain a lock as we are about to modify the condition of the // synchronization. - auto lock = SynchronizationLock(m_Synchronization); + auto lock = EventLockGuard(m_Event); // Make frame available: // Use a do-while loop to ensure we are never decrementing the last id. @@ -189,7 +192,7 @@ void DataStream::SubmitFrame(size_t id) break; } while (!m_Header->m_LastId.compare_exchange_strong(last_id, id + 1)); - m_Synchronization.Signal(); + m_Event->Signal(); } auto ts = GetTimeStamp(); @@ -334,8 +337,8 @@ DataFrame DataStream::GetFrame(size_t id, long wait_time_in_ms, void (*error_che // Wait until frame becomes available. // Obtain a lock first. - auto lock = SynchronizationLock(m_Synchronization); - m_Synchronization.Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); + auto lock = EventLockGuard(m_Event); + m_Event->Wait(wait_time_in_ms, [this, id]() { return this->m_Header->m_LastId > id; }, error_check); } size_t offset = (id % m_Header->m_NumFramesInBuffer) * m_Header->m_NumBytesPerFrame; diff --git a/catkit_core/DataStream.h b/catkit_core/DataStream.h index 440f26000..66968d444 100644 --- a/catkit_core/DataStream.h +++ b/catkit_core/DataStream.h @@ -7,17 +7,9 @@ #include #include "SharedMemory.h" -#include "Synchronization.h" -#include "SynchronizationConditionVariable.h" -#include "SynchronizationSemaphore.h" +#include "Event.h" #include "Tensor.h" -#ifdef _WIN32 -using Synchronization = SynchronizationSemaphore; -#elif defined(__linux__) or defined(__APPLE__) -using Synchronization = SynchronizationConditionVariable; -#endif - const char * const CURRENT_DATASTREAM_VERSION = "0.2"; const size_t MAX_NUM_FRAMES_IN_BUFFER = 20; const long INFINITE_WAIT_TIME = LONG_MAX; @@ -54,7 +46,7 @@ struct DataStreamHeader double m_FrameRateCounter; - Synchronization::SharedState m_SynchronizationSharedData; + Event::SharedState m_EventSharedState; }; class DataFrame : public Tensor @@ -125,7 +117,7 @@ class DataStream DataStreamHeader *m_Header; char *m_Buffer; - Synchronization m_Synchronization; + std::shared_ptr m_Event; size_t m_NextFrameIdToRead; BufferHandlingMode m_BufferHandlingMode; From c67223a30f0a1d0692f0eeaa16647866e9247f0f Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:45:48 -0800 Subject: [PATCH 21/23] Use local state structure. --- catkit_core/EventSemaphore.inl | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl index 9868d2b6f..f5eecf6c6 100644 --- a/catkit_core/EventSemaphore.inl +++ b/catkit_core/EventSemaphore.inl @@ -11,6 +11,12 @@ struct EventSharedState std::atomic_long m_NumReadersWaiting; }; +template<> +struct EventLocalState +{ + HANDLE m_Semaphore; +}; + template<> inline void EventSemaphore::Wait(long timeout_in_ms, std::function condition, void (*error_check)()) { @@ -31,7 +37,7 @@ inline void EventSemaphore::Wait(long timeout_in_ms, std::function condi } // Wait for a maximum of 20ms to perform periodic error checking. - auto res = WaitForSingleObject(m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); + res = WaitForSingleObject(m_LocalState.m_Semaphore, (unsigned long) (std::min)(20L, timeout_in_ms)); if (res == WAIT_TIMEOUT && timer.GetTime() > (timeout_in_ms * 0.001)) { @@ -72,15 +78,15 @@ inline void EventSemaphore::Signal() // as there are checks in place for that. if (num_readers_waiting > 0) - ReleaseSemaphore(m_Semaphore, (LONG) num_readers_waiting, NULL); + ReleaseSemaphore(m_LocalState.m_Semaphore, (LONG) num_readers_waiting, NULL); } template<> inline void EventSemaphore::CreateImpl(const std::string &id, SharedState *shared_state) { - m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); + m_LocalState.m_Semaphore = CreateSemaphore(NULL, 0, 9999, (id + ".sem").c_str()); - if (m_Semaphore == NULL) + if (m_LocalState.m_Semaphore == NULL) throw std::runtime_error("Something went wrong while creating semaphore."); shared_state->m_NumReadersWaiting = 0; @@ -89,16 +95,16 @@ inline void EventSemaphore::CreateImpl(const std::string &id, SharedState *share template<> inline void EventSemaphore::OpenImpl(const std::string &id, SharedState *shared_state) { - m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); + m_LocalState.m_Semaphore = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, (id + ".sem").c_str()); - if (m_Semaphore == NULL) + if (m_LocalState.m_Semaphore == NULL) throw std::runtime_error("Something went wrong while opening semaphore."); } template<> -inline void EventSemaphore::~EventImpl() +inline EventSemaphore::~EventImpl() { - CloseHandle(m_Semaphore); + CloseHandle(m_LocalState.m_Semaphore); } #ifdef __linux__ From ac53f12cb3dc2fcd0179a5c861f3db18ed48896a Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 00:46:13 -0800 Subject: [PATCH 22/23] Add missing include. --- catkit_core/EventBase.h | 1 + 1 file changed, 1 insertion(+) diff --git a/catkit_core/EventBase.h b/catkit_core/EventBase.h index 263dc4c55..0c39a0bff 100644 --- a/catkit_core/EventBase.h +++ b/catkit_core/EventBase.h @@ -2,6 +2,7 @@ #define EVENT_BASE_H #include +#include enum EventImplementationType { From 79898078afa9f69f6281a2fdb0e2762078d74238 Mon Sep 17 00:00:00 2001 From: Emiel Por Date: Thu, 23 Jan 2025 12:58:33 -0800 Subject: [PATCH 23/23] Add missing includes. --- catkit_core/EventConditionVariable.inl | 4 ++++ catkit_core/EventSemaphore.inl | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/catkit_core/EventConditionVariable.inl b/catkit_core/EventConditionVariable.inl index da7ce818a..881f13ed0 100644 --- a/catkit_core/EventConditionVariable.inl +++ b/catkit_core/EventConditionVariable.inl @@ -2,6 +2,10 @@ #include "Timing.h" +#if defined(__linux__) || defined(__APPLE__) + #include +#endif + using EventConditionVariable = EventImpl; #if defined(__linux__) || defined(__APPLE__) diff --git a/catkit_core/EventSemaphore.inl b/catkit_core/EventSemaphore.inl index f5eecf6c6..1bff3e115 100644 --- a/catkit_core/EventSemaphore.inl +++ b/catkit_core/EventSemaphore.inl @@ -2,9 +2,16 @@ #include "Timing.h" +#ifdef _WIN32 + #define WIN32_LEAN_AND_MEAN + #define NOMINMAX + #include +#endif // _WIN32 + using EventSemaphore = EventImpl; #ifdef _WIN32 + template<> struct EventSharedState {