Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Lock-free Queue #253

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
561d370
add libboost-dev dependency
saikishor Dec 19, 2024
2455b16
Add LockFreeBuffer class
saikishor Dec 19, 2024
02db30d
Add LockFreeSPSCQueueBase class
saikishor Dec 28, 2024
1468fe8
Add more functionality and tests
saikishor Dec 29, 2024
2ef8020
Add tests to CMakeLists
saikishor Dec 30, 2024
268138b
Draft: functional lockfree queue (not spsc_queue)
saikishor Dec 30, 2024
5e94cc9
Add more template enabled methods
saikishor Dec 31, 2024
34d03b5
update tests
saikishor Dec 31, 2024
6b629b1
Update the LockFreeQueue to completely support SPSC and MPMC queues +…
saikishor Dec 31, 2024
9a9951d
cleanup and change license
saikishor Jan 1, 2025
8c2c974
Update the documentation and add some minor tests
saikishor Jan 1, 2025
c991609
Merge branch 'ros-controls:master' into boost/lockfree/realtime/buffer
saikishor Jan 1, 2025
860029e
Add is_lock_free method
saikishor Jan 1, 2025
fe75f45
Update documentation
saikishor Jan 1, 2025
52f53b1
Add get_latest method to the LockFreeQueue
saikishor Jan 1, 2025
356d87e
update CMakeLists.txt
saikishor Jan 2, 2025
ac7ae53
Apply suggestions from code review
saikishor Jan 2, 2025
66dcdfd
Change template arg LockFreeSPSCContainer to LockFreeContainer
saikishor Jan 2, 2025
5d990c0
Install boost
christophfroehlich Jan 2, 2025
71321b3
cleanup the commented tests
saikishor Jan 2, 2025
4f61624
Update include/realtime_tools/lock_free_queue.hpp
saikishor Jan 3, 2025
9a8acdf
Update tests
saikishor Jan 3, 2025
ddb5ca9
Update condition for bounded push and update documentation
saikishor Jan 3, 2025
494b47d
Install boost in Windows CI
christophfroehlich Jan 3, 2025
8d1bae4
Unify empty method with internal conditioning of SPSC queue or MPMC q…
saikishor Jan 3, 2025
f90d22b
Try to set CMP0167 for boost on windows
christophfroehlich Jan 3, 2025
081d2f6
Remove quiet and add Boost::boost
christophfroehlich Jan 3, 2025
2f8d285
Remove atomic link libraries
christophfroehlich Jan 3, 2025
fff6ee3
Try to fix msvc atomic
christophfroehlich Jan 3, 2025
2bb6519
Try to fix atomic.lib on msvc
christophfroehlich Jan 3, 2025
551a9fb
Don't link atomic on windows
christophfroehlich Jan 3, 2025
689f63c
Use master branch of CI repo
christophfroehlich Jan 3, 2025
b8becad
Simplify push method and also add fixed_size arg to the MPMC queue
saikishor Jan 9, 2025
bd74a53
reuse tests to test all constructable types of LockFreeQueues
saikishor Jan 9, 2025
0d3fc7e
Merge branch 'master' into boost/lockfree/realtime/buffer
saikishor Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ find_package(ament_cmake REQUIRED)
foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS})
find_package(${Dependency} REQUIRED)
endforeach()
find_package(Boost REQUIRED)

add_library(realtime_tools SHARED
src/realtime_clock.cpp
Expand Down Expand Up @@ -74,6 +75,9 @@ if(BUILD_TESTING)
ament_add_gmock(realtime_buffer_tests test/realtime_buffer_tests.cpp)
target_link_libraries(realtime_buffer_tests realtime_tools)

ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp)
target_link_libraries(lock_free_queue_tests realtime_tools atomic)

ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp)
target_link_libraries(realtime_clock_tests realtime_tools)

Expand Down
362 changes: 362 additions & 0 deletions include/realtime_tools/lock_free_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,362 @@
// Copyright 2025 PAL Robotics S.L.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// \author Sai Kishor Kothakota

#ifndef REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_
#define REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_

#include <chrono>
#include <mutex>
#include <type_traits>
#include <utility>

#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/lockfree/stack.hpp>

namespace
{
// Trait to check if the capacity is set
template <typename T>
struct has_capacity : std::false_type
{
};

template <typename T, std::size_t Capacity>
struct has_capacity<boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
: std::true_type
{
};

template <typename T, std::size_t Capacity>
struct has_capacity<boost::lockfree::queue<T, boost::lockfree::capacity<Capacity>>> : std::true_type
{
};

template <typename T, std::size_t Capacity, bool FixedSize>
struct has_capacity<boost::lockfree::queue<
T, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>> : std::true_type
{
};

// Traits to check if the queue is spsc_queue
template <typename T>
struct is_spsc_queue : std::false_type
{
};

template <typename T>
struct is_spsc_queue<boost::lockfree::spsc_queue<T>> : std::true_type
{
};

template <typename T, std::size_t Capacity>
struct is_spsc_queue<boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
: std::true_type
{
};

// Traits to get the capacity of the queue
// Default case: no capacity
template <typename T>
struct get_boost_lockfree_queue_capacity
{
static constexpr std::size_t value = 0; // Default to 0 if capacity is not defined
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity>
struct get_boost_lockfree_queue_capacity<
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
{
static constexpr std::size_t value = Capacity;
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity>
struct get_boost_lockfree_queue_capacity<
boost::lockfree::queue<T, boost::lockfree::capacity<Capacity>>>
{
static constexpr std::size_t value = Capacity;
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity, bool FixedSize>
struct get_boost_lockfree_queue_capacity<boost::lockfree::queue<
T, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>>
{
static constexpr std::size_t value = Capacity;
};

} // namespace

namespace realtime_tools
{
/**
* @brief Base class for lock-free queues
* @tparam DataType Type of the data to be stored in the queue
* @tparam LockFreeSPSCContainer Type of the lock-free container - Typically boost::lockfree::spsc_queue or boost::lockfree::queue with their own template parameters
*/
template <typename DataType, typename LockFreeSPSCContainer>
class LockFreeQueueBase
{
public:
using T = DataType;

/**
* @brief Construct a new LockFreeQueueBase object
* @note This constructor is enabled only if the LockFreeSPSCContainer has a capacity set
*/
template <
bool HasCapacity = has_capacity<LockFreeSPSCContainer>::value,
typename std::enable_if_t<HasCapacity, int> = 0>
LockFreeQueueBase() : capacity_(get_boost_lockfree_queue_capacity<LockFreeSPSCContainer>::value)
{
}

/**
* @brief Construct a new LockFreeQueueBase object
* @param capacity Capacity of the queue
* @note This constructor is enabled only if the LockFreeSPSCContainer has no capacity set
*/
template <
bool HasCapacity = has_capacity<LockFreeSPSCContainer>::value,
typename std::enable_if_t<!HasCapacity, int> = 1>
explicit LockFreeQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity)
{
}

virtual ~LockFreeQueueBase() = default;

/**
* @brief Pop the data from the queue
* @param data Data to be popped
* @return true If the data is popped successfully
* @return false If the queue is empty or the data could not be popped
*/
[[nodiscard]] bool pop(T & data) { return data_queue_.pop(data); }

/**
* @brief Pop the data from the queue
* @param data Data to be popped
* @return true If the data is popped successfully
* @return false If the queue is empty or the data could not be popped
* @note This function is enabled only if the data type is convertible to the template type of the queue
*/
template <typename U>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> pop(U & data)
{
return data_queue_.pop(data);
}

/**
* @brief Push the data into the queue
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the queue is full or the data could not be pushed
*/

[[nodiscard]] bool push(const T & data) { return data_queue_.push(data); }

/**
* @brief Push the data into the queue
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the queue is full or the data could not be pushed
* @note This function is enabled only if the data type is convertible to the template type of the queue
*/
template <typename U>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> push(const U & data)
{
return data_queue_.push(data);
}

/**
* @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the data could not be pushed
* @note This function is enabled only if the queue is a spsc_queue and only if the data type
* is convertible to the template type of the queue
* @note To be used in a single threaded applications
* @warning This method might not work as expected if it is used with 2 different threads one
* doing bounded_push and the other doing pop. In this case, the queue is no more a single producer
* single consumer queue. So, the behaviour might not be as expected.
*/
template <
typename U, bool IsSPSCQueue = is_spsc_queue<LockFreeSPSCContainer>::value,
typename std::enable_if_t<IsSPSCQueue, int> = 0>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> bounded_push(const U & data)
{
if (!data_queue_.push(data)) {
// data_queue_.pop();
// return data_queue_.push(data);
saikishor marked this conversation as resolved.
Show resolved Hide resolved
T dummy;
data_queue_.pop(dummy);
return data_queue_.push(data);
}
return true;
}

/**
* @brief Get the latest data from the queue
* @param data Data to be filled with the latest data
* @return true If the data is filled with the latest data, false otherwise
* @return false If the queue is empty
* @note This function consumes all the data in the queue until the last data and returns the last element of the queue
*/
[[nodiscard]] bool get_latest(T & data)
{
return data_queue_.consume_all([&data](const T & d) { data = d; }) > 0;
}

/**
* @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the data could not be pushed
* @note This function is enabled only if the queue is of multiple producer and multiple consumer
* type and only if the data type is convertible to the template type of the queue
* @note Can be used in a multi threaded applications
*/
template <
typename U, bool IsSPSCQueue = is_spsc_queue<LockFreeSPSCContainer>::value,
typename std::enable_if_t<!IsSPSCQueue, int> = 1>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> bounded_push(const U & data)
{
saikishor marked this conversation as resolved.
Show resolved Hide resolved
if (!data_queue_.bounded_push(data)) {
// data_queue_.pop();
// return data_queue_.push(data);
saikishor marked this conversation as resolved.
Show resolved Hide resolved
T dummy;
data_queue_.pop(dummy);
return data_queue_.bounded_push(data);
}
return true;
}

/**
* @brief Check if the queue is empty
* @return true If the queue is empty
* @return false If the queue is not empty
* @note This function is enabled only if the queue is a spsc_queue
* @note Should only be called from the consumer thread where pop is called
*/
template <
bool IsSPSCQueue = is_spsc_queue<LockFreeSPSCContainer>::value,
typename std::enable_if_t<IsSPSCQueue, int> = 0>
bool empty() const
{
return data_queue_.read_available() == 0;
}

/**
* @brief Check if the queue is empty
* @return true If the queue is empty
* @return false If the queue is not empty
* @note The result is only accurate, if no other thread modifies the queue.
* Therefore it is rarely practical to use this value in program logic.
* @note This function is enabled only if the queue is of multiple producer and multiple consumer type
*/
template <
bool IsSPSCQueue = is_spsc_queue<LockFreeSPSCContainer>::value,
typename std::enable_if_t<!IsSPSCQueue, int> = 1>
bool empty() const
{
return data_queue_.empty();
}

/**
* @brief Get the capacity of the queue
* @return std::size_t Capacity of the queue
*/
size_t capacity() const { return capacity_; }

/**
* @brief Get the size of the queue
* @return std::size_t Size of the queue
* @note This function is enabled only if the queue is a spsc_queue
*/
template <
bool IsSPSCQueue = is_spsc_queue<LockFreeSPSCContainer>::value,
typename std::enable_if_t<IsSPSCQueue, int> = 0>
std::size_t size() const
{
return data_queue_.read_available();
}

/**
* @brief The method to check if the queue is lock free
* @return true If the queue is lock free, false otherwise
* @warning It only checks, if the queue head and tail nodes and the freelist can
* be modified in a lock-free manner. On most platforms, the whole implementation
* is lock-free, if this is true. Using c++0x-style atomics, there is no possibility
* to provide a completely accurate implementation, because one would need to test
* every internal node, which is impossible if further nodes will be allocated from
* the operating system.
* @link https://www.boost.org/doc/libs/1_74_0/doc/html/boost/lockfree/queue.html
*/
bool is_lock_free() const
{
if constexpr (is_spsc_queue<LockFreeSPSCContainer>::value) {
return true;
} else {
return data_queue_.is_lock_free();
}
}

/**
* @brief Get the lockfree container
* @return const LockFreeSPSCContainer& Reference to the lockfree container
*/
const LockFreeSPSCContainer & get_lockfree_container() const { return data_queue_; }

/**
* @brief Get the lockfree container
* @return LockFreeSPSCContainer& Reference to the lockfree container
*/
LockFreeSPSCContainer & get_lockfree_container() { return data_queue_; }

private:
LockFreeSPSCContainer data_queue_;
std::size_t capacity_;
}; // class

/**
* @brief Lock-free Single Producer Single Consumer Queue
* @tparam DataType Type of the data to be stored in the queue
* @tparam Capacity Capacity of the queue
*/
template <class DataType, std::size_t Capacity = 0>
using LockFreeSPSCQueue = std::conditional_t<
Capacity == 0, LockFreeQueueBase<DataType, boost::lockfree::spsc_queue<DataType>>,
LockFreeQueueBase<
DataType, boost::lockfree::spsc_queue<DataType, boost::lockfree::capacity<Capacity>>>>;

/**
* @brief Lock-free Multiple Producer Multiple Consumer Queue
* @tparam DataType Type of the data to be stored in the queue
* @tparam Capacity Capacity of the queue
* @tparam FixedSize Fixed size of the queue
*/
template <class DataType, std::size_t Capacity = 0, bool FixedSize = true>
using LockFreeMPMCQueue = std::conditional_t<
Capacity == 0, LockFreeQueueBase<DataType, boost::lockfree::queue<DataType>>,
LockFreeQueueBase<
DataType,
boost::lockfree::queue<
DataType, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>>>;

} // namespace realtime_tools
#endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<buildtool_export_depend>ament_cmake</buildtool_export_depend>

<build_depend>libboost-dev</build_depend>
saikishor marked this conversation as resolved.
Show resolved Hide resolved
<depend>rclcpp</depend>
<depend>rclcpp_action</depend>
<depend>libcap-dev</depend>
Expand Down
Loading
Loading