Skip to content

Commit

Permalink
Add tpie::merge_sorter wrapper class (closes #144)
Browse files Browse the repository at this point in the history
This also removes the Buckets = 0 option for the Levelized Priority Queue,
since it spawns a single bucket that is spurious and dead. If we need an
unbucketed Levelized Priority Queue at some point (e.g. Issue #149) we
should do so with a completely new class.
  • Loading branch information
SSoelvsten committed Dec 2, 2021
1 parent 7ad942b commit 60b6787
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 453 deletions.
2 changes: 2 additions & 0 deletions src/adiar/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(HEADERS
internal/product_construction.h
internal/quantify.h
internal/reduce.h
internal/sorter.h
internal/substitution.h
internal/util.h

Expand Down Expand Up @@ -90,6 +91,7 @@ set(SOURCES
internal/pred.cpp
internal/levelized_priority_queue.cpp
internal/reduce.cpp
internal/sorter.cpp

# statistics
statistics.cpp
Expand Down
2 changes: 0 additions & 2 deletions src/adiar/internal/levelized_priority_queue.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <tpie/priority_queue.h> // imports tpie::consecutive_memory_available

#include "levelized_priority_queue.h"

namespace adiar {
Expand Down
145 changes: 34 additions & 111 deletions src/adiar/internal/levelized_priority_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,54 +12,13 @@
#include <adiar/file.h>
#include <adiar/file_stream.h>

#include <adiar/internal/sorter.h>

#include <adiar/statistics.h>

#include <adiar/bdd/bdd.h>

namespace adiar {
extern tpie::dummy_progress_indicator pq_tpie_progress_indicator;

////////////////////////////////////////////////////////////////////////////
// Levelized Priority Queue memory size computations
//
// We set up the merge_sorters such that they have a very low memory footprint
// in Phase 1 (sending sorted blocks to disk) and Phase 3 (merging sorted
// partial results). This allows us to place a lot of memory on Phase 2 (the
// primary sorting step), which in turn allows us to have multiple concurrent
// merge_sorters, since only one of them will ever be running Phase 2 while
// all the others are in Phase 1 or Phase 3 at the same time.

// TODO: Move into an adiar/memory.h file? These computations are similar to
// the ones in reduce.cpp
////////////////////////////////////////////////////////////////////////////
/// \brief Number of bytes set aside for a single low-memory sorter phase.
///
/// \tparam T Type of the elements being sorted.
///
/// \return The size of 128 * 1024 elements of type T plus a fixed 5 MiB. Per
///         the original author's note, this is the minimum that keeps TPIE
///         from complaining.
////////////////////////////////////////////////////////////////////////////
template<typename T>
tpie::memory_size_type m_single_block()
{
  // Space for 128 Ki elements of T ...
  const auto element_bytes = sizeof(T) * 128 * 1024;
  // ... plus a constant 5 MiB on top.
  const auto slack_bytes = 5 * 1024 * 1024;

  return element_bytes + slack_bytes;
}

////////////////////////////////////////////////////////////////////////////
/// \brief Amount of memory to hand to the overflow priority queue.
///
/// \tparam T       Type of the elements stored in the queue.
/// \tparam Sorters Number of bucket sorters; (Sorters + 1) single blocks are
///                 reserved before the overflow queue gets its share.
///
/// \param memory_given Total amount of memory available to the levelized
///                     priority queue.
///
/// \return A tenth of what remains after reserving the single blocks, but
///         never less than one single block. If memory_given does not even
///         cover the reserved blocks, the single-block minimum is returned.
////////////////////////////////////////////////////////////////////////////
template<typename T, size_t Sorters>
tpie::memory_size_type m_overflow_queue(tpie::memory_size_type memory_given)
{
  const tpie::memory_size_type single_block = m_single_block<T>();
  const tpie::memory_size_type reserved = single_block * (Sorters + 1);

  // The sizes are unsigned (tpie::memory_size_type is presumably std::size_t),
  // so subtracting a larger 'reserved' from 'memory_given' would wrap around
  // to a huge value that std::max would then pick. Fall back to the minimum
  // instead of computing the difference in that case.
  if (memory_given <= reserved) {
    return single_block;
  }

  return std::max(// Take at least the same amount of memory as the buckets
                  single_block,
                  // Otherwise take a small fraction of the remaining memory
                  (memory_given - reserved) / 10);
}

////////////////////////////////////////////////////////////////////////////
/// \brief Amount of memory left over for the primary sorting step.
///
/// \tparam T       Type of the elements being sorted.
/// \tparam Sorters Number of bucket sorters.
///
/// \param memory_given            Total amount of memory available.
/// \param memory_occupied_by_meta Memory already taken by the meta streams.
///
/// \return memory_given minus the (Sorters + 1) single blocks, the memory of
///         the meta streams and the share of the overflow queue.
////////////////////////////////////////////////////////////////////////////
template<typename T, size_t Sorters>
tpie::memory_size_type m_sort(tpie::memory_size_type memory_given,
                              tpie::memory_size_type memory_occupied_by_meta)
{
  // Single blocks set aside for all buckets and the priority queue
  const tpie::memory_size_type blocks_reserved = m_single_block<T>() * (Sorters + 1);
  // Share claimed by the overflow queue
  const tpie::memory_size_type overflow_reserved = m_overflow_queue<T, Sorters>(memory_given);

  tpie::memory_size_type remaining = memory_given;
  remaining -= blocks_reserved;
  remaining -= memory_occupied_by_meta;
  remaining -= overflow_reserved;
  return remaining;
}

//////////////////////////////////////////////////////////////////////////////
/// We will use a stream of labels to retrieve the values from. Since this may
/// be one or more streams (Apply, Equality, and Relational Product, and
Expand Down Expand Up @@ -198,6 +157,7 @@ namespace adiar {
: private LabelExt, private pq_label_mgr<File_T, Files, LabelComparator, MetaStreams>
{
static_assert(0 < MetaStreams, "At least one meta stream should be provided");
static_assert(0 < Buckets, "Bucketized levelized priority queue requires at least one bucket");

typedef pq_label_mgr<File_T, Files, LabelComparator, MetaStreams> label_mgr;

Expand All @@ -217,9 +177,11 @@ namespace adiar {

label_t _buckets_label [Buckets + 1];

tpie::memory_size_type _memory_given;
tpie::memory_size_type _memory_occupied_by_overflow;
tpie::memory_size_type _memory_occupied_by_meta;
tpie::memory_size_type _buckets_memory;
std::unique_ptr<tpie::merge_sorter<T, false, TComparator>> _buckets_sorter [Buckets + 1];
tpie::memory_size_type _memory_for_buckets;
std::unique_ptr<external_sorter<T, TComparator>> _buckets_sorter [Buckets + 1];

bool _has_next_from_bucket = false;
T _next_from_bucket;
Expand All @@ -229,15 +191,18 @@ namespace adiar {
////////////////////////////////////////////////////////////////////////////
// Constructors
private:
levelized_priority_queue(tpie::memory_size_type memory_given)
: _memory_occupied_by_meta(tpie::get_memory_manager().available()),
_buckets_memory(memory_given),
_overflow_queue(m_overflow_queue<T, Buckets>(memory_given))
static tpie::memory_size_type m_overflow_queue(tpie::memory_size_type memory_given)
{
adiar_debug(m_single_block<T>() * (Buckets + 2) < _buckets_memory,
"Not enough memory to instantiate all buckets and overflow queue concurrently");
return memory_given / std::max(10lu, Buckets+1 + 1);
}

levelized_priority_queue(tpie::memory_size_type memory_given)
: _memory_given(memory_given),
_memory_occupied_by_overflow(m_overflow_queue(memory_given)),
_memory_occupied_by_meta(tpie::get_memory_manager().available()),
_overflow_queue(m_overflow_queue(memory_given))
{ }

public:
////////////////////////////////////////////////////////////////////////////
/// \brief Instantiate the priority_queue with the given amount of memory.
Expand All @@ -246,22 +211,22 @@ namespace adiar {
/// take.
////////////////////////////////////////////////////////////////////////////
levelized_priority_queue(const meta_file<File_T, Files> (& files) [MetaStreams],
tpie::memory_size_type memory_given)
tpie::memory_size_type memory_given)
: levelized_priority_queue(memory_given)
{
for (const meta_file<File_T, Files> &f : files) {
label_mgr::hook_meta_stream(f);
}
setup_buckets();
init_buckets();
}

levelized_priority_queue(const decision_diagram (& dds) [MetaStreams],
tpie::memory_size_type memory_given)
tpie::memory_size_type memory_given)
: levelized_priority_queue(memory_given) {
for (const decision_diagram& dd : dds) {
label_mgr::hook_meta_stream(dd.file);
}
setup_buckets();
init_buckets();
}

levelized_priority_queue(const meta_file<File_T, Files> (& files) [MetaStreams])
Expand All @@ -273,13 +238,17 @@ namespace adiar {
////////////////////////////////////////////////////////////////////////////
// Private constructor methods
private:
void setup_buckets()
void init_buckets()
{
// This was set in the private constructor above to be the total amount of
// memory. This was done before the label_mgr had created all of its meta
// streams, so we can get how much space they already took of what we are
// given now.
_memory_occupied_by_meta -= tpie::get_memory_manager().available();
_memory_for_buckets = _memory_given - _memory_occupied_by_meta - _memory_occupied_by_overflow;

adiar_debug(_memory_occupied_by_meta + _memory_occupied_by_overflow <= _memory_given,
"the amount of memory used should be within the given bounds");

if (label_mgr::can_pull()) {
label_t label = label_mgr::pull();
Expand All @@ -296,7 +265,7 @@ namespace adiar {
setup_bucket(_back_bucket_idx, label);
}

calc_front_bucket();
sort_front_bucket();
}
}

Expand All @@ -309,10 +278,6 @@ namespace adiar {
void push(const T &t)
{
_size++;
if constexpr (Buckets == 0) {
return _overflow_queue.push(t);
}

label_t label = LabelExt::label_of(t);

adiar_debug(_label_comparator(front_bucket_label(), label),
Expand All @@ -339,10 +304,6 @@ namespace adiar {
////////////////////////////////////////////////////////////////////////////
label_t current_level() const
{
  // A queue with Buckets == 0 is rejected at compile time by the class'
  // static_assert(0 < Buckets, ...), so no unbucketed special case is needed:
  // the current level is always the label of the front bucket.
  return front_bucket_label();
}

Expand All @@ -351,12 +312,6 @@ namespace adiar {
////////////////////////////////////////////////////////////////////////////
bool has_next_level()
{
  // Buckets == 0 cannot be instantiated (see the class' static_assert on
  // Buckets), so only the bucketed check remains: there is a next level if
  // the queue holds any element and a further bucket is available.
  return _size > 0 && has_next_bucket();
}

Expand Down Expand Up @@ -386,14 +341,6 @@ namespace adiar {
adiar_debug(has_next_level(),
"Has no next level to go to");

if constexpr (Buckets == 0) {
while (LabelExt::label_of(_overflow_queue.top()) != _buckets_label[0]
&& (!has_stop_label || _label_comparator(_buckets_label[0], stop_label))) {
_buckets_label[0] = label_mgr::pull();
}
return;
}

adiar_debug(_label_comparator(front_bucket_label(), back_bucket_label()),
"Front bucket run ahead of back bucket");

Expand All @@ -411,7 +358,7 @@ namespace adiar {
&& has_next_bucket();
b++) {
setup_next_bucket();
calc_front_bucket();
sort_front_bucket();
}

// Are we still at an empty bucket and behind the overflow queue and the
Expand Down Expand Up @@ -442,7 +389,7 @@ namespace adiar {
_front_bucket_idx = (_front_bucket_idx + 1) % (Buckets + 1);
}

calc_front_bucket();
sort_front_bucket();
}
}

Expand All @@ -457,12 +404,6 @@ namespace adiar {
////////////////////////////////////////////////////////////////////////////
bool can_pull()
{
if constexpr (Buckets == 0) {
return
!_overflow_queue.empty()
&& _buckets_label[0] == LabelExt::label_of(_overflow_queue.top());
}

return
_has_peeked
// Is the current bucket non-empty?
Expand All @@ -487,10 +428,6 @@ namespace adiar {
{
adiar_debug (can_pull(), "Cannot peek on empty level/queue");

if constexpr (Buckets == 0) {
return _overflow_queue.top();
}

if (!_has_peeked) {
_size++; // Compensate that pull() decrements the size
_peeked = pull();
Expand Down Expand Up @@ -519,12 +456,6 @@ namespace adiar {
adiar_debug (can_pull(), "Cannot pull on empty level/queue");

_size--;
if constexpr (Buckets == 0) {
T t = _overflow_queue.top();
_overflow_queue.pop();
return t;
}

if (_has_peeked) {
_has_peeked = false;
return _peeked;
Expand Down Expand Up @@ -587,13 +518,7 @@ namespace adiar {
void setup_bucket(size_t idx, label_t label)
{
_buckets_label[idx] = label;

_buckets_sorter[idx] = std::make_unique<tpie::merge_sorter<T, false, TComparator>>(_t_comparator);

_buckets_sorter[idx] -> set_available_memory(m_single_block<T>(),
m_sort<T, Buckets>(_buckets_memory, _memory_occupied_by_meta),
m_single_block<T>());
_buckets_sorter[idx] -> begin();
_buckets_sorter[idx] = std::make_unique<external_sorter<T, TComparator>>(_memory_for_buckets, Buckets+1);
}

void setup_next_bucket()
Expand All @@ -614,10 +539,9 @@ namespace adiar {
"Inconsistency in has_next_bucket predicate");
}

void calc_front_bucket()
void sort_front_bucket()
{
_buckets_sorter[_front_bucket_idx] -> end();
_buckets_sorter[_front_bucket_idx] -> calc(pq_tpie_progress_indicator);
_buckets_sorter[_front_bucket_idx] -> sort();

_has_next_from_bucket = _buckets_sorter[_front_bucket_idx] -> can_pull();
if (_has_next_from_bucket) {
Expand All @@ -627,10 +551,6 @@ namespace adiar {

size_t active_buckets() const
{
if constexpr (Buckets == 0) {
return 0;
}

return _front_bucket_idx <= _back_bucket_idx
? _back_bucket_idx - _front_bucket_idx
: (Buckets + 1 - _front_bucket_idx) + _back_bucket_idx;
Expand All @@ -656,6 +576,9 @@ namespace adiar {
typename TComparator = std::less<T>, typename LabelComparator = std::less<label_t>,
size_t MetaStreams = 1u, size_t Buckets = ADIAR_PQ_BUCKETS>
using levelized_arc_priority_queue = levelized_priority_queue<arc_t, ARC_FILE_COUNT, T, LabelExt, TComparator, LabelComparator, MetaStreams, Buckets>;

//////////////////////////////////////////////////////////////////////////////
/// TODO: Make a levelized_priority_queue that does not have any buckets
}

#endif // ADIAR_INTERNAL_LEVELIZED_PRIORITY_QUEUE_H
Loading

0 comments on commit 60b6787

Please sign in to comment.