Skip to content

Commit

Permalink
Initial commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bosma committed Apr 30, 2017
0 parents commit 882558c
Show file tree
Hide file tree
Showing 6 changed files with 369 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea/
cmake-build-debug/
libScheduler.so
11 changes: 11 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
cmake_minimum_required(VERSION 3.7)
project(BosmaScheduler)

set(CMAKE_CXX_STANDARD 14)

# threads
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)

add_executable(example example.cpp)
target_link_libraries(example Threads::Threads)
83 changes: 83 additions & 0 deletions Cron.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#include <chrono>
#include <string>
#include <sstream>
#include <vector>
#include <iterator>

namespace Bosma {
using Clock = std::chrono::system_clock;
using namespace std::chrono_literals;

inline void add(std::tm &tm, std::chrono::nanoseconds time) {
auto tp = Clock::from_time_t(std::mktime(&tm));
auto tm_adjusted = Clock::to_time_t(tp + time);
tm = *std::localtime(&tm_adjusted);
}

class Cron {
public:
Cron(const std::string &expression) {
std::istringstream iss(expression);
std::vector<std::string> tokens{std::istream_iterator<std::string>{iss},
std::istream_iterator<std::string>{}};
minute = (tokens[0] == "*") ? -1 : std::stoi(tokens[0]);
hour = (tokens[1] == "*") ? -1 : std::stoi(tokens[1]);
day = (tokens[2] == "*") ? -1 : std::stoi(tokens[2]);
month = (tokens[3] == "*") ? -1 : std::stoi(tokens[3]);
day_of_week = (tokens[4] == "*") ? -1 : std::stoi(tokens[4]);
}

// http://stackoverflow.com/a/322058/1284550
Clock::time_point cron_to_next() const {
// get current time as a tm object
auto now = Clock::to_time_t(Clock::now());
std::tm next(*std::localtime(&now));
// it will always at least run the next minute
next.tm_sec = 0;
add(next, std::chrono::minutes(1));
while (true) {
if (month != -1 && next.tm_mon != month) {
// add a month
// if this will bring us over a year, increment the year instead and reset the month
if (next.tm_mon + 1 > 11) {
next.tm_mon = 0;
next.tm_year++;
} else
next.tm_mon++;

next.tm_mday = 1;
next.tm_hour = 0;
next.tm_min = 0;
continue;
}
if (day != -1 && next.tm_mday != day) {
add(next, std::chrono::hours(24));
next.tm_hour = 0;
next.tm_min = 0;
continue;
}
if (day_of_week != -1 && next.tm_wday != day_of_week) {
add(next, std::chrono::hours(24));
next.tm_wday++;
next.tm_hour = 0;
next.tm_min = 0;
continue;
}
if (hour != -1 && next.tm_hour != hour) {
add(next, std::chrono::hours(1));
next.tm_min = 0;
continue;
}
if (minute != -1 && next.tm_min != minute) {
add(next, std::chrono::minutes(1));
continue;
}
break;
}

return Clock::from_time_t(std::mktime(&next));
}

int minute, hour, day, month, day_of_week;
};
}
52 changes: 52 additions & 0 deletions InterruptableSleep.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <chrono>
#include <thread>
#include <future>
#include <mutex>
#include <sstream>

namespace Bosma {
class InterruptableSleep {

// InterruptableSleep offers a sleep that can be interrupted by any thread.
// It can be interrupted multiple times
// and be interrupted before any sleep is called (the sleep will immediately complete)
// Has same interface as condition_variables and futures, except with sleep instead of wait.
// Sleep can be called on multiple threads safely, but is not recommended as behaviour is undefined.

public:
InterruptableSleep() : interrupted(false) {
};
InterruptableSleep(const InterruptableSleep &) = delete;
InterruptableSleep(InterruptableSleep &&) noexcept = delete;
~InterruptableSleep() noexcept = default;
InterruptableSleep& operator=(const InterruptableSleep &) noexcept = delete;
InterruptableSleep& operator=(InterruptableSleep &&) noexcept = delete;

void sleep_for(std::chrono::nanoseconds duration) {
std::unique_lock<std::mutex> ul(m);
cv.wait_for(ul, duration, [this] { return interrupted; });
interrupted = false;
}
void sleep_until(std::chrono::system_clock::time_point time) {
std::unique_lock<std::mutex> ul(m);
cv.wait_until(ul, time, [this] { return interrupted; });
interrupted = false;
}
void sleep() {
std::unique_lock<std::mutex> ul(m);
cv.wait(ul, [this] { return interrupted; });
interrupted = false;
}
void interrupt() {
std::lock_guard<std::mutex> lg(m);
interrupted = true;
cv.notify_one();
}
private:
bool interrupted;
std::mutex m;
std::condition_variable cv;
};
}
185 changes: 185 additions & 0 deletions Scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#pragma once

#include <iomanip>
#include <map>

#include "ctpl_stl.h"

#include "InterruptableSleep.h"
#include "Cron.h"

namespace Bosma {
using Clock = std::chrono::system_clock;

class Task {
public:
Task(std::function<void()> &&f, bool recur = false) : f(std::move(f)), recur(recur) {}

virtual Clock::time_point get_new_time() const = 0;

std::function<void()> f;

bool recur;
};

class InTask : public Task {
public:
InTask(std::function<void()> &&f) : Task(std::move(f)) {}

// dummy time_point because it's not used
Clock::time_point get_new_time() const override { return Clock::time_point(0ns); }
};

class EveryTask : public Task {
public:
EveryTask(std::chrono::nanoseconds time, std::function<void()> &&f) : Task(std::move(f), true), time(time) {}

Clock::time_point get_new_time() const override {
return Clock::now() + time;
};
std::chrono::nanoseconds time;
};

class CronTask : public Task {
public:
CronTask(const std::string &expression, std::function<void()> &&f) : Task(std::move(f), true), cron(expression) {}

Clock::time_point get_new_time() const override {
return cron.cron_to_next();
};
Cron cron;
};

inline bool try_parse(std::tm &tm, const std::string& expression, const std::string& format) {
std::stringstream ss(expression);
return !(ss >> std::get_time(&tm, format.c_str())).fail();
}

class Scheduler {
public:
Scheduler(unsigned int max_n_tasks = 4) : done(false), threads(max_n_tasks + 1) {
threads.push([this](int) {
while (!done) {
if (tasks.empty()) {
sleeper.sleep();
} else {
auto time_of_first_task = (*tasks.begin()).first;
sleeper.sleep_until(time_of_first_task);
}
std::lock_guard<std::mutex> l(lock);
manage_tasks();
}
});
}

Scheduler(const Scheduler &) = delete;

Scheduler(Scheduler &&) noexcept = delete;

Scheduler &operator=(const Scheduler &) = delete;

Scheduler &operator=(Scheduler &&) noexcept = delete;

~Scheduler() {
sleeper.interrupt();
done = true;
}

template<typename _Callable, typename... _Args>
void in(const Clock::time_point time, _Callable &&f, _Args &&... args) {
std::shared_ptr<Task> t = std::make_shared<InTask>(std::bind(std::forward<_Callable>(f), std::forward<_Args>(args)...));
add_task(time, std::move(t));
}

template<typename _Callable, typename... _Args>
void in(const std::chrono::nanoseconds time, _Callable &&f, _Args &&... args) {
in(Clock::now() + time, std::forward<_Callable>(f), std::forward<_Args>(args)...);
}

template<typename _Callable, typename... _Args>
void at(const std::string &time, _Callable &&f, _Args &&... args) {
// get current time as a tm object
auto time_now = Clock::to_time_t(Clock::now());
std::tm tm = *std::localtime(&time_now);

// our final time as a time_point
Clock::time_point tp;

if (try_parse(tm, time, "%H:%M:%S")) {
// convert tm back to time_t, then to a time_point and assign to final
tp = Clock::from_time_t(std::mktime(&tm));

// if we've already passed this time, the user will mean next day, so add a day.
if (Clock::now() >= tp)
tp += std::chrono::hours(24);
} else if (try_parse(tm, time, "%Y-%m-%d %H:%M:%S")) {
tp = Clock::from_time_t(std::mktime(&tm));
} else if (try_parse(tm, time, "%Y/%m/%d %H:%M:%S")) {
tp = Clock::from_time_t(std::mktime(&tm));
} else {
// could not parse time
throw std::runtime_error("Cannot parse time string: " + time);
}

in(tp, std::forward<_Callable>(f), std::forward<_Args>(args)...);
}

template<typename _Callable, typename... _Args>
void every(const std::chrono::nanoseconds time, _Callable &&f, _Args &&... args) {
std::shared_ptr<Task> t = std::make_shared<EveryTask>(time, std::bind(std::forward<_Callable>(f), std::forward<_Args>(args)...));
auto next_time = t->get_new_time();
add_task(next_time, std::move(t));
}

template<typename _Callable, typename... _Args>
void cron(const std::string &expression, _Callable &&f, _Args &&... args) {
std::shared_ptr<Task> t = std::make_shared<CronTask>(expression, std::bind(std::forward<_Callable>(f), std::forward<_Args>(args)...));
auto next_time = t->get_new_time();
add_task(next_time, std::move(t));
}

private:
std::atomic<bool> done;

Bosma::InterruptableSleep sleeper;

ctpl::thread_pool threads;
std::multimap<Clock::time_point, std::shared_ptr<Task>> tasks;
std::mutex lock;

void add_task(const Clock::time_point time, std::shared_ptr<Task> t) {
std::lock_guard<std::mutex> l(lock);
tasks.emplace(time, std::move(t));
sleeper.interrupt();
}

void manage_tasks() {
auto end_of_tasks_to_run = tasks.upper_bound(Clock::now());

// if there are any tasks to be run and removed
if (end_of_tasks_to_run != tasks.begin()) {
decltype(tasks) recurred_tasks;

for (auto i = tasks.begin(); i != end_of_tasks_to_run; ++i) {

auto &task = (*i).second;

threads.push([task](int) {
task->f();
});

// calculate time of next run and add the new task to the tasks to be recurred
if (task->recur)
recurred_tasks.emplace(task->get_new_time(), std::move(task));
}

// remove the completed tasks
tasks.erase(tasks.begin(), end_of_tasks_to_run);

// re-add the tasks that are recurring
for (auto &task : recurred_tasks)
tasks.emplace(task.first, std::move(task.second));
}
}
};
}
35 changes: 35 additions & 0 deletions example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include <iostream>

#include "Scheduler.h"

using namespace std::chrono_literals;

void message(const std::string &s) {
std::cout << s << std::endl;
}

int main() {
// number of tasks that can run simultaneously
// Note: not the number of tasks that can be added,
// but number of tasks that can be run in parallel
unsigned int max_n_threads = 12;

// cannot be moved or copied
Bosma::Scheduler s(max_n_threads);

s.every(1s, message, "every second");

s.in(1min, []() { std::cout << "in one minute" << std::endl; });

s.every(1min, []() { std::cout << "every minute" << std::endl; });

// https://en.wikipedia.org/wiki/Cron
s.cron("* * * * *", []() { std::cout << "top of every minute" << std::endl; });

s.at("2017-04-19 12:31:15", []() { std::cout << "at a specific time." << std::endl; });

s.cron("5 0 * * *", []() { std::cout << "every day 5 minutes after midnight" << std::endl; });

// destructor of Bosma::Scheduler will cancel all schedules but finish any tasks currently running
std::this_thread::sleep_for(30min);
}

0 comments on commit 882558c

Please sign in to comment.