Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
WAcry committed Nov 26, 2024
0 parents commit 4f988bb
Show file tree
Hide file tree
Showing 7 changed files with 439 additions and 0 deletions.
24 changes: 24 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
cmake_minimum_required(VERSION 3.14)
project(cpp_async_toolkit VERSION 1.0.0 LANGUAGES CXX)

# 设置C++标准
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

# 添加编译选项
if(MSVC)
add_compile_options(/W4 /WX)
else()
add_compile_options(-Wall -Wextra -Wpedantic -Werror)
endif()

# 设置头文件目录
include_directories(${PROJECT_SOURCE_DIR}/include)

# 添加测试
enable_testing()
add_subdirectory(tests)

# 安装配置
install(DIRECTORY include/ DESTINATION include)
57 changes: 57 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# C++ Async Toolkit

一个现代C++高性能异步流式处理和并发工具库,提供了丰富的并发编程工具和抽象。

## 特性

- 💪 高性能异步任务处理器
- 🔄 流式处理链式API
- 🎯 线程池和任务调度器
- 🚀 无锁数据结构
- 📦 异步Future/Promise实现
- 🛠 并发工具集合
- 🎭 协程支持
- ⚡ 零拷贝设计
- 🔒 RAII资源管理

## 要求

- C++20 兼容的编译器
- CMake 3.14 或更高版本
- 支持的编译器:
- GCC 10+
- Clang 10+
- MSVC 2019+

## 构建

```bash
mkdir build
cd build
cmake ..
cmake --build .
```

## 使用示例

```cpp
#include <async_toolkit/task_pool.hpp>
#include <async_toolkit/pipeline.hpp>

// 创建任务池
async_toolkit::TaskPool pool(4); // 4个工作线程

// 创建处理管道
auto pipeline = async_toolkit::Pipeline::create()
.then([](int x) { return x * 2; })
.then([](int x) { return std::to_string(x); })
.then([](std::string x) { return "Result: " + x; });

// 提交任务
auto future = pool.submit(42, pipeline);
auto result = future.get(); // "Result: 84"
```
## 许可
MIT License
78 changes: 78 additions & 0 deletions include/async_toolkit/pipeline.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#pragma once

#include <functional>
#include <type_traits>
#include <concepts>
#include <memory>
#include <utility>

namespace async_toolkit {

template<typename T>
class Pipeline {
public:
using value_type = T;

template<typename F>
requires std::invocable<F, T>
auto then(F&& func) && {
using result_type = std::invoke_result_t<F, T>;
return Pipeline<result_type>(
[prev = std::move(processor_), f = std::forward<F>(func)]
(auto&& input) mutable {
return f(prev(std::forward<decltype(input)>(input)));
}
);
}

template<typename U>
auto process(U&& input) {
return processor_(std::forward<U>(input));
}

static auto create() {
return Pipeline<T>([](T x) { return x; });
}

private:
template<typename F>
explicit Pipeline(F&& func) : processor_(std::forward<F>(func)) {}

std::function<T(T)> processor_;
};

// 辅助函数,用于创建管道
template<typename T>
auto make_pipeline() {
return Pipeline<T>::create();
}

// 并行管道执行器
template<typename... Pipelines>
class ParallelPipeline {
public:
explicit ParallelPipeline(Pipelines... pipes)
: pipelines_(std::make_tuple(std::move(pipes)...)) {}

template<typename T>
auto process(T&& input) {
return std::apply([&input](auto&&... pipes) {
return std::make_tuple(
pipes.process(input)...
);
}, pipelines_);
}

private:
std::tuple<Pipelines...> pipelines_;
};

// 创建并行管道的辅助函数
template<typename... Pipelines>
auto parallel(Pipelines&&... pipes) {
return ParallelPipeline<std::remove_reference_t<Pipelines>...>(
std::forward<Pipelines>(pipes)...
);
}

} // namespace async_toolkit
99 changes: 99 additions & 0 deletions include/async_toolkit/task_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <type_traits>
#include <concepts>

namespace async_toolkit {

class TaskPool {
public:
explicit TaskPool(size_t thread_count = std::thread::hardware_concurrency())
: stop_(false) {
for(size_t i = 0; i < thread_count; ++i) {
workers_.emplace_back([this] {
while(true) {
std::function<void()> task;
{
std::unique_lock lock(queue_mutex_);
condition_.wait(lock, [this] {
return stop_ || !tasks_.empty();
});

if(stop_ && tasks_.empty()) {
return;
}

task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}

~TaskPool() {
{
std::unique_lock lock(queue_mutex_);
stop_ = true;
}
condition_.notify_all();
for(auto& worker : workers_) {
worker.join();
}
}

template<typename F, typename... Args>
requires std::invocable<F, Args...>
auto submit(F&& f, Args&&... args) {
using return_type = std::invoke_result_t<F, Args...>;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

std::future<return_type> result = task->get_future();
{
std::unique_lock lock(queue_mutex_);
if(stop_) {
throw std::runtime_error("Cannot submit task to stopped ThreadPool");
}
tasks_.emplace([task]() { (*task)(); });
}
condition_.notify_one();
return result;
}

template<typename Pipeline, typename T>
auto submit(T&& initial_value, Pipeline&& pipeline) {
return submit([value = std::forward<T>(initial_value),
pipeline = std::forward<Pipeline>(pipeline)]() mutable {
return pipeline.process(std::move(value));
});
}

size_t thread_count() const noexcept {
return workers_.size();
}

size_t queued_tasks() const {
std::unique_lock lock(queue_mutex_);
return tasks_.size();
}

private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
mutable std::mutex queue_mutex_;
std::condition_variable condition_;
bool stop_;
};

} // namespace async_toolkit
26 changes: 26 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
enable_testing()

# 添加Google Test
include(FetchContent)
FetchContent_Declare(
googletest
GIT_REPOSITORY https://github.com/google/googletest.git
GIT_TAG release-1.12.1
)
FetchContent_MakeAvailable(googletest)

# 添加测试可执行文件
add_executable(async_toolkit_tests
task_pool_test.cpp
pipeline_test.cpp
)

# 链接Google Test
target_link_libraries(async_toolkit_tests
PRIVATE
gtest
gtest_main
)

# 添加测试
add_test(NAME async_toolkit_tests COMMAND async_toolkit_tests)
74 changes: 74 additions & 0 deletions tests/pipeline_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#include <gtest/gtest.h>
#include <async_toolkit/pipeline.hpp>
#include <string>

using namespace async_toolkit;

TEST(Pipeline, BasicPipeline) {
auto pipeline = Pipeline<int>::create()
.then([](int x) { return x * 2; })
.then([](int x) { return x + 1; });

EXPECT_EQ(pipeline.process(20), 41);
}

TEST(Pipeline, TypeTransformation) {
auto pipeline = Pipeline<int>::create()
.then([](int x) { return std::to_string(x); })
.then([](std::string x) { return "Number: " + x; });

EXPECT_EQ(pipeline.process(42), "Number: 42");
}

TEST(Pipeline, EmptyPipeline) {
auto pipeline = Pipeline<int>::create();
EXPECT_EQ(pipeline.process(42), 42);
}

TEST(Pipeline, ComplexTransformation) {
auto pipeline = Pipeline<std::string>::create()
.then([](const std::string& s) { return s.length(); })
.then([](size_t len) { return len * 2; })
.then([](int x) { return x > 10; });

EXPECT_TRUE(pipeline.process("Hello World"));
EXPECT_FALSE(pipeline.process("Hi"));
}

TEST(ParallelPipeline, BasicParallel) {
auto pipe1 = Pipeline<int>::create()
.then([](int x) { return x * 2; });

auto pipe2 = Pipeline<int>::create()
.then([](int x) { return x + 1; });

auto parallel_pipe = parallel(std::move(pipe1), std::move(pipe2));
auto result = parallel_pipe.process(20);

EXPECT_EQ(std::get<0>(result), 40);
EXPECT_EQ(std::get<1>(result), 21);
}

TEST(ParallelPipeline, ComplexParallel) {
auto number_pipe = Pipeline<int>::create()
.then([](int x) { return x * 2; });

auto string_pipe = Pipeline<int>::create()
.then([](int x) { return std::to_string(x); })
.then([](std::string x) { return "Number: " + x; });

auto bool_pipe = Pipeline<int>::create()
.then([](int x) { return x > 50; });

auto parallel_pipe = parallel(
std::move(number_pipe),
std::move(string_pipe),
std::move(bool_pipe)
);

auto result = parallel_pipe.process(42);

EXPECT_EQ(std::get<0>(result), 84);
EXPECT_EQ(std::get<1>(result), "Number: 42");
EXPECT_FALSE(std::get<2>(result));
}
Loading

0 comments on commit 4f988bb

Please sign in to comment.